# Spark Deeper Concepts
© Explore Data Science Academy

<div align="center" style="width: 600px; font-size: 80%; text-align: center; margin: 0 auto">
<img src="https://github.com/Explore-AI/Pictures/blob/f7a433fa65521cca0016a42da6cffbebc6e65c40/data_engineering/transform/spark_logo.png?raw=True"
     alt="Spark Diagram"
     style="padding-bottom: 0.5em"
     width=600px/>
</div>

## Learning objectives

In this train, you will learn how to:

- understand and use SparkSQL to manipulate datasets;
- understand and use unions, joins, windowing and other modifications in Spark;
- manipulate complex data types using built-in Spark functions;
- apply UDFs and pandas UDFs to datasets; and
- work with JDBC and SQL databases from Spark.



## Introduction

By now, you should be quite familiar with Spark, the benefits of using in-memory processing, and be able to do quite a few transformations using the PySpark implementation of Apache Spark. 

In this train, we go deeper. We move beyond the PySpark DataFrame interface and query Spark directly through the Spark SQL API. Using Spark SQL, it is possible to perform transformations commonly available in SQL, like joins and unions, and to work with complex data types. These transformations are powerful, and because Spark SQL queries go through the same optimisations as the DataFrame API, they are computationally efficient. 

Sometimes Spark SQL is not sufficient, and we want to perform more complex or custom transformations on our data. For these cases, Spark provides user-defined functions (UDFs), which are functions defined programmatically in Python (or another Spark-compatible language). These functions can combine columns in custom ways, perform complex mathematical operations, or use logic to manipulate the dataset. 

Finally, we look at how we can use Spark to output data to databases. 

## Load dependencies

To start, let's set up Spark and read in the Bitcoin dataset.

The dataset can be accessed by downloading the accompanying zip folder. 
The file to access is: `spark_deeper_concepts_transformed_data.snappy.parquet`

In [1]:
import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, StructType, StructField, IntegerType, FloatType

In [2]:
sc = SparkContext.getOrCreate()
spark = SparkSession(sc)

Before running the cell below, make sure that you have downloaded and unzipped the required file and placed it in your current working directory.

In [3]:
bitcoin_data = spark.read.parquet('./spark_deeper_concepts_transformed_data.snappy.parquet')

In [4]:
bitcoin_data.printSchema()

root
 |-- timestamp: timestamp (nullable = true)
 |-- open: float (nullable = true)
 |-- high: float (nullable = true)
 |-- low: float (nullable = true)
 |-- close: float (nullable = true)
 |-- volume_btc: float (nullable = true)
 |-- volume_currency: float (nullable = true)
 |-- weighted_price: float (nullable = true)



In [5]:
bitcoin_data.show(10, False)

+-------------------+----+----+----+-----+----------+---------------+--------------+
|timestamp          |open|high|low |close|volume_btc|volume_currency|weighted_price|
+-------------------+----+----+----+-----+----------+---------------+--------------+
|2011-12-31 09:52:00|4.39|4.39|4.39|4.39 |0.45558086|2.0            |4.39          |
|2011-12-31 09:53:00|NaN |NaN |NaN |NaN  |NaN       |NaN            |NaN           |
|2011-12-31 09:54:00|NaN |NaN |NaN |NaN  |NaN       |NaN            |NaN           |
|2011-12-31 09:55:00|NaN |NaN |NaN |NaN  |NaN       |NaN            |NaN           |
|2011-12-31 09:56:00|NaN |NaN |NaN |NaN  |NaN       |NaN            |NaN           |
|2011-12-31 09:57:00|NaN |NaN |NaN |NaN  |NaN       |NaN            |NaN           |
|2011-12-31 09:58:00|NaN |NaN |NaN |NaN  |NaN       |NaN            |NaN           |
|2011-12-31 09:59:00|NaN |NaN |NaN |NaN  |NaN       |NaN            |NaN           |
|2011-12-31 10:00:00|NaN |NaN |NaN |NaN  |NaN       |NaN         

## Using SparkSQL
*Spark SQL is a base layer in Spark which we can use to interface with data. It provides a similar interface and level of abstraction as the Python API, but in ANSI-compliant SQL.*

Spark SQL has various advantages:

1. It provides an engine on which higher-level APIs are built.
2. It can read and write to many structured formats (for example, JSON, Hive tables, Parquet, Avro, ORC, and CSV).
3. It can be reached through JDBC/ODBC connectors by external tools such as Power BI, Tableau, and Talend, and it can read from and write to RDBMSs such as MySQL or PostgreSQL.
4. It offers an interactive shell to issue SQL queries on your structured data.
5. It supports ANSI SQL:2003-compliant commands and HiveQL.

As mentioned already, the `SparkSession` is the main entry point into programming Spark with the Structured APIs. To run a SQL query, use the `sql()` method on the `SparkSession` instance, `spark`. For example, `spark.sql("SELECT * FROM table")`.

### Starting out – basic queries

In this train, we'll be using the Bitcoin dataset which we've worked with previously. Here we aim to do some basic exploration and manipulation of the data in Spark SQL.

Before we can do this, we need to register the DataFrame as a temporary view. Temporary views can be created using the `createOrReplaceTempView()` method on the DataFrame, which takes one argument: the name of the temporary view.

In [6]:
bitcoin_data.createOrReplaceTempView('bitcoin_tbl')

Now we can execute SQL queries on the table. 

To demonstrate this functionality, let's write a brief query that returns the `timestamp`, `high`, and `low` fields for entries that have a `high` value larger than 5000:

In [7]:
spark.sql("""SELECT timestamp, high, low
          FROM bitcoin_tbl 
          WHERE high > 5000
          ORDER BY timestamp DESC""").show(10)

+-------------------+--------+--------+
|          timestamp|    high|     low|
+-------------------+--------+--------+
|2020-12-31 02:00:00|28928.49|28893.21|
|2020-12-31 01:59:00|28911.52| 28867.6|
|2020-12-31 01:58:00|28900.52|28850.49|
|2020-12-31 01:57:00| 28863.9|28829.42|
|2020-12-31 01:56:00|28829.42|28785.64|
|2020-12-31 01:55:00| 28825.5|28800.01|
|2020-12-31 01:54:00|28832.79| 28800.0|
|2020-12-31 01:53:00|28822.71| 28800.0|
|2020-12-31 01:52:00|28844.25|28816.09|
|2020-12-31 01:51:00|28849.67|28807.78|
+-------------------+--------+--------+
only showing top 10 rows



This is a very familiar SQL interface, and queries are simple to write if you are well versed in SQL.
Here we:
 - used the `SELECT` clause to define which fields to select; 
 - used the `FROM` clause to define which table to select from; 
 - used the `WHERE` clause to define the condition that rows must satisfy; and 
 - used the `ORDER BY` clause to define which field to order on, with the `DESC` keyword specifying descending order.

Note that we pass `10` to the `.show()` method to limit the number of rows printed in our notebook to 10.

To try another query, let's look at the timestamps (at minute resolution) at which no Bitcoin was traded:

In [8]:
# Let's select all minutes in which there was no trading.

spark.sql("""SELECT timestamp, high, low, volume_currency, close
          FROM bitcoin_tbl WHERE volume_currency <= 0
          ORDER BY timestamp DESC""").show(10)

+-------------------+-------+-------+---------------+-------+
|          timestamp|   high|    low|volume_currency|  close|
+-------------------+-------+-------+---------------+-------+
|2017-08-14 03:08:00|4068.49|4068.49|            0.0|4068.49|
|2014-03-25 11:50:00| 573.91| 573.91|            0.0| 573.91|
|2014-01-09 08:37:00| 834.89| 834.89|            0.0| 834.89|
|2013-10-03 12:02:00| 110.52| 110.52|            0.0| 110.52|
|2013-07-04 08:58:00|  74.42|  74.42|            0.0|  74.42|
|2013-06-30 16:40:00|   89.2|   89.2|            0.0|   89.2|
|2013-06-27 11:13:00|   98.4|   98.4|            0.0|   98.4|
|2013-06-26 22:50:00|  99.17|  99.17|            0.0|  99.17|
|2013-06-24 22:36:00|  98.88|  98.88|            0.0|  98.88|
|2013-06-22 21:39:00| 101.16| 101.16|            0.0| 101.16|
+-------------------+-------+-------+---------------+-------+
only showing top 10 rows



The last time no Bitcoin traded was back in 2017, and only for a single minute. Such minutes seem to have been more common before 2017. Let's use the `COUNT` function to find the total number of minutes in which there was no trading.


In [9]:
# Let's count all the minutes in which there was no trading.

spark.sql("""SELECT COUNT(*) as Occurrences
          FROM bitcoin_tbl WHERE volume_currency <= 0
          """).show(10)

+-----------+
|Occurrences|
+-----------+
|         16|
+-----------+



#### \[Exercise 1\]

As an exercise, use Spark SQL to select all entries from `bitcoin_tbl` that have a `close` value larger than 10 000 USD.

> **Note** 💡
 >
 > Spark has a special way of handling **NaNs** while performing computations. Have a look at the [NaN semantics](https://spark.apache.org/docs/3.0.0-preview/sql-ref-nan-semantics.html) provided by Spark.
 

### Increasing complexity – using additional SQL functionality

Next, we create more complex SQL statements using `CASE`.

A `CASE` expression in SQL evaluates a list of `WHEN` conditions in order and returns the result of the first condition that is true, falling back to `ELSE` if none of them match.
This is very similar to an if-then-else conditional found in most programming languages. 

Here we want to define (somewhat arbitrary) categories for when to either sell or acquire Bitcoin based on the `open` value for Bitcoin in a specific minute. 


In [10]:
# We use CASE with multiple WHEN statements to define some categories.

bitcoin_advice = spark.sql("""SELECT timestamp, high, low, open,
            CASE
            WHEN open > 19000 THEN 'Sell now'
            WHEN open > 17000 THEN 'You should really sell'
            WHEN open > 15000 THEN 'Should have already sold'
            WHEN open > 0 THEN 'Buy NOW!'
            WHEN open = 0 THEN 'Lost it all!'
            ELSE 'Something is wrong here...'
            END AS sell_guidance
            FROM bitcoin_tbl
            ORDER BY timestamp DESC""")

Note that the `spark.sql()` method returns a Spark DataFrame, which has methods such as `show()` available and can be used for further operations, for instance, creating new views. 

Before we can run more Spark SQL commands on the DataFrame we just created above, we have to create a view from the DataFrame:

In [11]:
bitcoin_advice.createOrReplaceTempView('bitcoin_advice')

In [12]:
spark.sql("""SELECT *
          FROM bitcoin_advice""").show(10, False)

+-------------------+--------+--------+--------+-------------+
|timestamp          |high    |low     |open    |sell_guidance|
+-------------------+--------+--------+--------+-------------+
|2020-12-31 02:00:00|28928.49|28893.21|28893.21|Sell now     |
|2020-12-31 01:59:00|28911.52|28867.6 |28910.54|Sell now     |
|2020-12-31 01:58:00|28900.52|28850.49|28850.49|Sell now     |
|2020-12-31 01:57:00|28863.9 |28829.42|28829.42|Sell now     |
|2020-12-31 01:56:00|28829.42|28785.64|28801.47|Sell now     |
|2020-12-31 01:55:00|28825.5 |28800.01|28809.07|Sell now     |
|2020-12-31 01:54:00|28832.79|28800.0 |28800.0 |Sell now     |
|2020-12-31 01:53:00|28822.71|28800.0 |28814.36|Sell now     |
|2020-12-31 01:52:00|28844.25|28816.09|28826.49|Sell now     |
|2020-12-31 01:51:00|28849.67|28807.78|28836.97|Sell now     |
+-------------------+--------+--------+--------+-------------+
only showing top 10 rows



Similar to the Python API, we can aggregate in Spark SQL using the `GROUP BY` command. 

In [13]:
# Grouping by the categories created above, and aggregating by count.

spark.sql("""SELECT sell_guidance, COUNT(high)
          FROM bitcoin_advice
          WHERE timestamp > '2020-01-02'
          GROUP BY sell_guidance""").show(10, False)

+------------------------+-----------+
|sell_guidance           |count(high)|
+------------------------+-----------+
|Sell now                |43895      |
|Should have already sold|17811      |
|Buy NOW!                |437267     |
|You should really sell  |25307      |
+------------------------+-----------+



It looks like there were numerous occasions in 2020 when you should have bought, with the value below the thresholds, but also ample times when you could have made a buck by selling. Neither the 'Lost it all!' nor the 'Something is wrong here...' category appears, so the opening price was never zero or negative in this period.


The Spark SQL interface undergoes the same optimisations as the Python, Scala, and other APIs, and you can effectively perform all of the transformations and operations defined in the previous chapter. You can also perform all operations that you normally write in SQL. 

#### \[Exercise 2\]

As an exercise, use a Spark SQL date filter to determine which month of 2018 had the most occurrences of it being ideal to buy Bitcoin, according to the above criteria.

### Tables and views

Tables hold data, but each table also has associated metadata, which includes information on the table schema, description, table name, database name, column names, partitions, physical location, and more. By default, this is stored in a Hive metastore, located at `/user/hive/warehouse` within the host file system. (In a local session without Hive configured, table data is instead written to a `spark-warehouse` directory in the current working directory.) You can change the location of this metastore to an external metastore to leverage more advanced features of Spark and Hive. Generally, such an external metastore is hosted centrally, where multiple teams can use it. Examples of how to set this up exist for both [Microsoft Azure](https://docs.microsoft.com/en-us/azure/databricks/data/metastores/external-hive-metastore) and [AWS](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-metastore-external-hive.html).

It is also important to know that Spark has two types of tables: **managed** and **unmanaged**. 

*Managed* tables have both their metadata and their data in the file store managed by Spark. The file store could be a local file system, HDFS, or an object store such as Azure Storage or Amazon S3. With *unmanaged* tables, Spark only manages the metadata, while you are responsible for managing the data storage and location yourself.

The command `DROP TABLE tbl` deletes both the data and the metadata of a managed table, whereas the same command deletes only the metadata of an unmanaged table.

Managed tables are created by using the `saveAsTable()` method on its own, whereas unmanaged tables are created by also specifying the `path` option.

As you have seen, you can create a view using the `createOrReplaceTempView()` method. This creates a view that will exist as long as the SparkSession is active.

In [14]:
bitcoin_advice.createOrReplaceTempView('bitcoin_advice')

### Databases

All tables have to reside within a database. By default, Spark creates tables in a database named `default`, but databases can also be created and selected manually. 

Because of this default, many novice Spark users never have to think about databases and may be unfamiliar with the concept.

Users of Databricks may already be very familiar with it:

<div align="center" style="width: 600px; font-size: 80%; text-align: center; margin: 0 auto">
<img src="https://github.com/Explore-AI/Pictures/blob/65a436ec1393464b511f3fff4e274f035eb503a1/data_engineering/transform/spark_deeper_concepts/default_database.png?raw=True"
     alt="Default Database"
     style="padding-bottom: 1em"
     width=400px
     />
     <p><em>Databases and tables Microsoft documentation <a href="https://docs.microsoft.com/en-us/azure/databricks/data/tables">here</a>.</em></p>
</div>
<br/><br/>
The following commands create and set which schema to use:

In [15]:
# In Spark, it's called a DATABASE, but this is equivalent to schemas in relational databases.

spark.sql("CREATE DATABASE IF NOT EXISTS bitcoin")
spark.sql("USE bitcoin")

DataFrame[]

Create a managed table:

In [16]:
bitcoin_advice.write.saveAsTable('bitcoin_advice')

This will create the table `bitcoin_advice` in the database `bitcoin`. 

You can find this table in:

`$WORKING_DIR/spark-warehouse/bitcoin.db/bitcoin_advice/`

The table will be stored as a collection of Parquet files in this directory. By default, Spark saves managed tables in Parquet format, irrespective of the format from which the data were read.

### Viewing metadata

As mentioned, Spark stores lots of information on the databases and tables that are saved. Let's have a look at the databases, tables, and other metadata that are stored in Spark.

First, we use the `listDatabases()` method to list the databases created within our Spark environment. Leading on from above, we expect only the `default` and `bitcoin` databases.

Uncomment `spark.catalog.listDatabases()` in the cell below to run it.

In [19]:
# Databases:
#spark.catalog.listDatabases()

Next, we list the tables that we have created within our environment. If we do not pass an argument to the method, it lists the tables in the current database (here, `bitcoin`) together with any temporary views. Alternatively, you can pass a database name as an argument, in which case the tables for that database will be listed.

In [18]:
# Tables:
spark.catalog.listTables()

[Table(name='bitcoin_advice', database='bitcoin', description=None, tableType='MANAGED', isTemporary=False),
 Table(name='bitcoin_advice', database=None, description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='bitcoin_tbl', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]

We can dive deeper into specific tables as well by looking into the columns of a specific table, using the `listColumns()` method, with the single argument being the name of the table. 

In [20]:
# Having a look at the column metadata stored for a specific table:
spark.catalog.listColumns("bitcoin_advice")

[Column(name='timestamp', description=None, dataType='timestamp', nullable=True, isPartition=False, isBucket=False),
 Column(name='high', description=None, dataType='float', nullable=True, isPartition=False, isBucket=False),
 Column(name='low', description=None, dataType='float', nullable=True, isPartition=False, isBucket=False),
 Column(name='open', description=None, dataType='float', nullable=True, isPartition=False, isBucket=False),
 Column(name='sell_guidance', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False)]

We can now have a look at all the properties of the created table. We do this by using the `DESCRIBE` command with the `FORMATTED` keyword, which returns detailed information about the table:

In [21]:
spark.sql('DESCRIBE FORMATTED bitcoin.bitcoin_advice').show(25, True)

+--------------------+--------------------+-------+
|            col_name|           data_type|comment|
+--------------------+--------------------+-------+
|           timestamp|           timestamp|   null|
|                high|               float|   null|
|                 low|               float|   null|
|                open|               float|   null|
|       sell_guidance|              string|   null|
|                    |                    |       |
|# Detailed Table ...|                    |       |
|            Database|             bitcoin|       |
|               Table|      bitcoin_advice|       |
|        Created Time|Fri Oct 08 13:28:...|       |
|         Last Access|             UNKNOWN|       |
|          Created By|         Spark 3.1.2|       |
|                Type|             MANAGED|       |
|            Provider|             parquet|       |
|            Location|file:/Users/maddy...|       |
+--------------------+--------------------+-------+



This is great! 


The information returned about the table includes:
- which database it belongs to;
- when it was created; 
- what created it (in this case Spark); 
- its format (Provider); and 
- the location where it is stored.


Let's have a look to see if we have any additional metadata on the table.

Additional metadata for tables are stored in the table's `TBLPROPERTIES` (which would be listed as `Table Properties` in the `DESCRIBE FORMATTED` output above if any were set).

In [22]:
spark.sql('SHOW TBLPROPERTIES bitcoin.bitcoin_advice').show(10, False)

+---+-----+
|key|value|
+---+-----+
+---+-----+



It does not look like it.  
So, let’s add some:

In [23]:
properties = {'description': 'This is a table that is used to determine whether you should buy Bitcoin or not', 
              'created_by': 'John Smith'}
spark.sql(f'ALTER TABLE bitcoin.bitcoin_advice SET TBLPROPERTIES (PROPERTIES = "{properties}")')

DataFrame[]

In [24]:
import ast  # literal_eval parses Python literals without executing arbitrary code
props = spark.sql('SHOW TBLPROPERTIES bitcoin.bitcoin_advice (PROPERTIES)').collect()[0]['value']
ast.literal_eval(props)

{'description': 'This is a table that is used to determine whether you should buy Bitcoin or not',
 'created_by': 'John Smith'}

Let’s consider everything that happened here.

We first converted the Python dictionary to a string and stored it in the table's `TBLPROPERTIES` under a single key, `PROPERTIES`, using `ALTER TABLE ... SET TBLPROPERTIES`. We then retrieved that key using `SHOW TBLPROPERTIES`, called `collect()` on the returned DataFrame, and took the first row `[0]` (we know there will only be one entry, since the keys stored in `TBLPROPERTIES` are unique). Finally, we retrieved the `value` field and parsed the string back into a Python dictionary.


## SQL-like operations
*Understand and use unions, joins, windowing, and other modifications in Spark.*

Spark also allows SQL-like operations. 

These include:
- unions (sometimes called concatenation); 
- joins (you should know these quite well); 
- windowing (operations that sit between aggregations and single-row operations); and
- other operations that you would normally perform in SQL.


In [25]:
# Let's create some mock datasets to play with. We are going to use this to practise joining, 
# create window functions, and concatenate tables.
# We create a city info table that describes some static features of cities in South Africa.

city_info = spark.createDataFrame([
    (0, 'Johannesburg', '2001', '011', 1886, False),
    (1, 'Pretoria', '0001', '012', 1855, False),
    (2, 'Durban', '4001', '031', 1880, True),
    (3, 'Cape Town', '8000', '021', 1652, True),
    (4, 'Port Elizabeth', '6001', '041', 1820, True),
    (5, 'Bloemfontein', '9300', '051', 1846, False),
    (6, 'Kimberley', '8301', '053', 1873, False),
    (7, 'East London', '5200', '043', 1847, True),
], ['index', 'city', 'postal_code', 'area_code', 'established', 'coastal'])

# Let's create a city data table which contains data for cities from a recent census.
city_data = spark.createDataFrame([
    (0, 'Johannesburg', 10500000, 76000000, 1644.98),
    (1, 'Pretoria', 2921612, 75600000, 687.54),
    (2, 'Durban', 3442361, 83900000, 225.91),
    (3, 'Cape Town', 3740026, 78700000, 2461.0),
    (4, 'Port Elizabeth', 1152915, 45600000, 251.03),
    (5, 'Bloemfontein', 747431, 15600000, 236.17),
    (6, 'Kimberley', 225160, 460000, 164.3),
    (7, 'East London', 755200, 23400000, 168.86),
], ['index', 'city', 'population', 'gdp', 'area'])

city_info.createOrReplaceTempView('city_info')
city_data.createOrReplaceTempView('city_data')

In [26]:
# Selecting data from city info.
spark.sql("SELECT * FROM city_info").show()

+-----+--------------+-----------+---------+-----------+-------+
|index|          city|postal_code|area_code|established|coastal|
+-----+--------------+-----------+---------+-----------+-------+
|    0|  Johannesburg|       2001|      011|       1886|  false|
|    1|      Pretoria|       0001|      012|       1855|  false|
|    2|        Durban|       4001|      031|       1880|   true|
|    3|     Cape Town|       8000|      021|       1652|   true|
|    4|Port Elizabeth|       6001|      041|       1820|   true|
|    5|  Bloemfontein|       9300|      051|       1846|  false|
|    6|     Kimberley|       8301|      053|       1873|  false|
|    7|   East London|       5200|      043|       1847|   true|
+-----+--------------+-----------+---------+-----------+-------+



In [27]:
# Selecting data from city data.
spark.sql("SELECT * FROM city_data").show()

+-----+--------------+----------+--------+-------+
|index|          city|population|     gdp|   area|
+-----+--------------+----------+--------+-------+
|    0|  Johannesburg|  10500000|76000000|1644.98|
|    1|      Pretoria|   2921612|75600000| 687.54|
|    2|        Durban|   3442361|83900000| 225.91|
|    3|     Cape Town|   3740026|78700000| 2461.0|
|    4|Port Elizabeth|   1152915|45600000| 251.03|
|    5|  Bloemfontein|    747431|15600000| 236.17|
|    6|     Kimberley|    225160|  460000|  164.3|
|    7|   East London|    755200|23400000| 168.86|
+-----+--------------+----------+--------+-------+



### Union

A union, as you are aware, stacks the rows of two tables that have the same schema. 

Let's duplicate the `city_info` table and tag each copy with its origin:


In [28]:
# We are adding a column to the city_info table here, assigning it a value of 1 or 2.

city_info_1 = city_info.withColumn('origin', F.lit(1))
city_info_2 = city_info.withColumn('origin', F.lit(2))

# Remember that Spark queries always return a Spark DataFrame.

In [29]:
city_info_1.createOrReplaceTempView('city_info_1')
city_info_2.createOrReplaceTempView('city_info_2')

In [30]:
spark.sql("SELECT * FROM city_info_1 UNION SELECT * FROM city_info_2").show(18, False)

+-----+--------------+-----------+---------+-----------+-------+------+
|index|city          |postal_code|area_code|established|coastal|origin|
+-----+--------------+-----------+---------+-----------+-------+------+
|0    |Johannesburg  |2001       |011      |1886       |false  |1     |
|0    |Johannesburg  |2001       |011      |1886       |false  |2     |
|4    |Port Elizabeth|6001       |041      |1820       |true   |1     |
|2    |Durban        |4001       |031      |1880       |true   |1     |
|7    |East London   |5200       |043      |1847       |true   |2     |
|3    |Cape Town     |8000       |021      |1652       |true   |1     |
|4    |Port Elizabeth|6001       |041      |1820       |true   |2     |
|1    |Pretoria      |0001       |012      |1855       |false  |2     |
|7    |East London   |5200       |043      |1847       |true   |1     |
|2    |Durban        |4001       |031      |1880       |true   |2     |
|5    |Bloemfontein  |9300       |051      |1846       |false  |

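We can do almost the same using the PySpark API. Note that the DataFrame `union()` method behaves like SQL's `UNION ALL` and keeps duplicate rows, whereas SQL's `UNION` removes them. Here the two results contain the same rows, because the `origin` column makes every row unique: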

In [31]:
# Using DataFrames just created, we can perform a union.

city_info_1.union(city_info_2).show(18, False)

+-----+--------------+-----------+---------+-----------+-------+------+
|index|city          |postal_code|area_code|established|coastal|origin|
+-----+--------------+-----------+---------+-----------+-------+------+
|0    |Johannesburg  |2001       |011      |1886       |false  |1     |
|1    |Pretoria      |0001       |012      |1855       |false  |1     |
|2    |Durban        |4001       |031      |1880       |true   |1     |
|3    |Cape Town     |8000       |021      |1652       |true   |1     |
|4    |Port Elizabeth|6001       |041      |1820       |true   |1     |
|5    |Bloemfontein  |9300       |051      |1846       |false  |1     |
|6    |Kimberley     |8301       |053      |1873       |false  |1     |
|7    |East London   |5200       |043      |1847       |true   |1     |
|0    |Johannesburg  |2001       |011      |1886       |false  |2     |
|1    |Pretoria      |0001       |012      |1855       |false  |2     |
|2    |Durban        |4001       |031      |1880       |true   |

The above syntax is straightforward and will feel familiar if you have concatenated DataFrames in Pandas.

The `origin` column also makes it clear which table each row came from.

### Joins

Joining two tables together is a common operation. By default, Spark performs an inner join; the other options include `cross`, `outer`, `full`, `full_outer`, `left`, `left_outer`, `right`, `right_outer`, `left_semi`, and `left_anti`.

Let's join the city info and data tables:

In [32]:
spark.sql("SELECT * FROM city_info ci JOIN city_data cd ON ci.city = cd.city ").show(5, False)

+-----+------------+-----------+---------+-----------+-------+-----+------------+----------+--------+-------+
|index|city        |postal_code|area_code|established|coastal|index|city        |population|gdp     |area   |
+-----+------------+-----------+---------+-----------+-------+-----+------------+----------+--------+-------+
|1    |Pretoria    |0001       |012      |1855       |false  |1    |Pretoria    |2921612   |75600000|687.54 |
|2    |Durban      |4001       |031      |1880       |true   |2    |Durban      |3442361   |83900000|225.91 |
|3    |Cape Town   |8000       |021      |1652       |true   |3    |Cape Town   |3740026   |78700000|2461.0 |
|0    |Johannesburg|2001       |011      |1886       |false  |0    |Johannesburg|10500000  |76000000|1644.98|
|6    |Kimberley   |8301       |053      |1873       |false  |6    |Kimberley   |225160    |460000  |164.3  |
+-----+------------+-----------+---------+-----------+-------+-----+------------+----------+--------+-------+
only showing top 5 rows

Again, we can do the same in PySpark:

In [33]:
joined_cities = city_info.join(city_data, on='city')
joined_cities.show(5, False)

+------------+-----+-----------+---------+-----------+-------+-----+----------+--------+-------+
|city        |index|postal_code|area_code|established|coastal|index|population|gdp     |area   |
+------------+-----+-----------+---------+-----------+-------+-----+----------+--------+-------+
|Pretoria    |1    |0001       |012      |1855       |false  |1    |2921612   |75600000|687.54 |
|Durban      |2    |4001       |031      |1880       |true   |2    |3442361   |83900000|225.91 |
|Cape Town   |3    |8000       |021      |1652       |true   |3    |3740026   |78700000|2461.0 |
|Johannesburg|0    |2001       |011      |1886       |false  |0    |10500000  |76000000|1644.98|
|Kimberley   |6    |8301       |053      |1873       |false  |6    |225160    |460000  |164.3  |
+------------+-----+-----------+---------+-----------+-------+-----+----------+--------+-------+
only showing top 5 rows



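It's just as easy as joining data in Pandas. Also, note that when you pass the join column by name using `on`, that column appears only once in the result, whereas the SQL join above kept both `city` columns. The `index` column, which exists in both tables but was not part of the join condition, is duplicated in both cases.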

### Windowing

A window function computes a value for every row from a set of related rows, called the window. Unlike an aggregation, it does not collapse those rows: each input row keeps its own output value. 


To see why windowing is useful, consider that Spark initially supported two types of transformations: 
- Functions that take a single row as input and then generate a single value as an output. 
- Functions that perform aggregations, take in a group of rows, and then calculate a single return value for each of the groups.


This left a group of operations that were not yet possible in Spark: operating on a group of rows while still returning a value for each individual row. Examples include moving averages, ranking within groups, cumulative sums, and creating lags in time series.


The window function calculates a value for each row based on its `Frame` (the group of rows defined for the window).

We gain access to the `Window` class through the `pyspark.sql.window` module:



In [34]:
from pyspark.sql.window import Window

Let's illustrate its functionality with a couple of examples:

#### Ranking

First, let's use a ranking function. Here we use `dense_rank()` to rank the cities by population, partitioned by whether or not they are coastal.  

Read more about `dense_rank` vs `rank` [here](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=window#pyspark.sql.functions.dense_rank).

Let's first implement it in Spark SQL:

In [35]:
joined_cities.createOrReplaceTempView("joined_cities")

In [36]:
spark.sql("""SELECT
    *,
    dense_rank() OVER (PARTITION BY coastal ORDER BY population DESC) as pop_rank_area
  FROM joined_cities
""").show()

+--------------+-----+-----------+---------+-----------+-------+-----+----------+--------+-------+-------------+
|          city|index|postal_code|area_code|established|coastal|index|population|     gdp|   area|pop_rank_area|
+--------------+-----+-----------+---------+-----------+-------+-----+----------+--------+-------+-------------+
|     Cape Town|    3|       8000|      021|       1652|   true|    3|   3740026|78700000| 2461.0|            1|
|        Durban|    2|       4001|      031|       1880|   true|    2|   3442361|83900000| 225.91|            2|
|Port Elizabeth|    4|       6001|      041|       1820|   true|    4|   1152915|45600000| 251.03|            3|
|   East London|    7|       5200|      043|       1847|   true|    7|    755200|23400000| 168.86|            4|
|  Johannesburg|    0|       2001|      011|       1886|  false|    0|  10500000|76000000|1644.98|            1|
|      Pretoria|    1|       0001|      012|       1855|  false|    1|   2921612|75600000| 687.5

Spark SQL makes it quite intuitive. We are selecting everything using `SELECT *`, then we call the `dense_rank()` function, which requires that we specify over what we `PARTITION` the data and what we `ORDER` the data by. 

Now let's do the same in PySpark:

In [37]:
# To create a `Frame` to partition by, we need to instantiate the `Window` class, 
# and use the partitionBy() and orderBy() methods to define the `Frame`.

coastal_population_window = Window.partitionBy(joined_cities['coastal']) \
                                  .orderBy(joined_cities['population'].desc()) 

# We can then use the instantiated Window class to aggregate over.

joined_cities = joined_cities.withColumn('pop_rank_area', F.dense_rank().over(coastal_population_window))

A lot is happening here. Let’s look at it part by part.

**First**, we call the `partitionBy()` method, which defines the field to partition by (here we select the `coastal` field).

*Remember that you can also use F.col('coastal') to reference the fields.*

**Next**, we define the field by which we want to order the rows within each `Frame`.

**Finally**, we bring it all together by creating a new column using the `withColumn()` method. 

Here we call the `dense_rank()` function, and it has the `over()` method that takes the window object as an argument. This means that we will apply a dense rank over the specified window frame, effectively aggregating within the window frame.

Overall, the results from Spark SQL and PySpark are the same. Only the interface differs slightly, and you can decide which works best for you. Going forward in this section, we are going to use the PySpark implementation, which is more in line with the Python environment we have been using until now.


In [38]:
joined_cities.show()

+--------------+-----+-----------+---------+-----------+-------+-----+----------+--------+-------+-------------+
|          city|index|postal_code|area_code|established|coastal|index|population|     gdp|   area|pop_rank_area|
+--------------+-----+-----------+---------+-----------+-------+-----+----------+--------+-------+-------------+
|     Cape Town|    3|       8000|      021|       1652|   true|    3|   3740026|78700000| 2461.0|            1|
|        Durban|    2|       4001|      031|       1880|   true|    2|   3442361|83900000| 225.91|            2|
|Port Elizabeth|    4|       6001|      041|       1820|   true|    4|   1152915|45600000| 251.03|            3|
|   East London|    7|       5200|      043|       1847|   true|    7|    755200|23400000| 168.86|            4|
|  Johannesburg|    0|       2001|      011|       1886|  false|    0|  10500000|76000000|1644.98|            1|
|      Pretoria|    1|       0001|      012|       1855|  false|    1|   2921612|75600000| 687.5

What happened here?

We calculated `pop_rank_area`, which ranks each city by population within its group (coastal or inland). You will see that the coastal cities are ranked 1 to 4, and the inland cities are ranked separately, starting again at 1. This is because we partitioned by `coastal` and ordered by `population`.

Next, let's not just rank, but rather get into some analytics. 

Let’s calculate the area difference between each city and the largest city for inland and coastal cities.


In [39]:
# Again using the Window class, partitioning by coastal and ordering by area. 
coastal_area_window = Window.partitionBy(F.col('coastal')) \
  .orderBy(F.col('area').desc()) 

# This time, we are using arithmetic to calculate how much smaller the area is 
# for the city in question than the largest one in the coastal or inland group.
joined_cities = \
  joined_cities.withColumn('coastal_rank_area', F.max(F.col('area')).over(coastal_area_window) - F.col('area'))

In [40]:
joined_cities.show()

+--------------+-----+-----------+---------+-----------+-------+-----+----------+--------+-------+-------------+-----------------+
|          city|index|postal_code|area_code|established|coastal|index|population|     gdp|   area|pop_rank_area|coastal_rank_area|
+--------------+-----+-----------+---------+-----------+-------+-----+----------+--------+-------+-------------+-----------------+
|     Cape Town|    3|       8000|      021|       1652|   true|    3|   3740026|78700000| 2461.0|            1|              0.0|
|Port Elizabeth|    4|       6001|      041|       1820|   true|    4|   1152915|45600000| 251.03|            3|          2209.97|
|        Durban|    2|       4001|      031|       1880|   true|    2|   3442361|83900000| 225.91|            2|          2235.09|
|   East London|    7|       5200|      043|       1847|   true|    7|    755200|23400000| 168.86|            4|          2292.14|
|  Johannesburg|    0|       2001|      011|       1886|  false|    0|  10500000|76

It is quite clear that Cape Town and Johannesburg are massive! Maybe coastal vs. inland is not the correct group comparison here. 
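One detail in the example above is worth a closer look. According to the Spark documentation, a window with an `orderBy()` clause defaults to a growing frame (from the start of the partition up to the current row), so `F.max()` is really a running maximum. It returns the group maximum here only because the rows are ordered by `area` in descending order. The plain-Python sketch below (not Spark code; `running_max` is a helper made up for illustration) mimics that behaviour using the coastal areas from the output above.

```python
from itertools import accumulate

# Coastal city areas, taken from the output above.
coastal_areas = [2461.0, 251.03, 225.91, 168.86]

def running_max(values):
    """Maximum of all values seen so far, one result per input value."""
    return list(accumulate(values, max))

# Ordered descending: the first row is the largest, so every row sees it.
desc = running_max(sorted(coastal_areas, reverse=True))

# Ordered ascending: each row only sees itself and smaller values.
asc = running_max(sorted(coastal_areas))

print(desc)  # [2461.0, 2461.0, 2461.0, 2461.0]
print(asc)   # [168.86, 225.91, 251.03, 2461.0]
```

With an ascending order, the subtraction would give 0 for every row. A window defined with `Window.partitionBy('coastal')` alone, without an ordering, uses the whole partition as its frame and does not depend on the sort direction.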

#### \[Exercise 3\]

As an exercise, create another grouping: high vs. low GDP (you can choose an arbitrary threshold). Then perform similar comparisons to those above to see if the differences are smaller or more comparable.

## Getting into complex data types
*Spark has an amazing functionality of storing complex data within DataFrames.*

Complex data are made up of simpler data types, and as such, there are intuitively two approaches for processing complex data:

1. Exploding the data, performing a transformation and recreating the structure.
2. Using user-defined functions to process the data.

However, both of these methods are computationally expensive, as the first requires you to collect all the items in the transformation and structure recreation, and the second, in turn, requires you to define a UDF, which Spark has limited capacity to optimise.


> 💡 &nbsp; **Introducing UDFs**
>
> UDFs or user-defined functions are functions defined by the user that perform specific transformations. More specifically, UDFs are functions that operate on columns to extend the vocabulary of the Spark DSL for data transformation.


Fortunately, Spark has built-in functions for dealing with complex data types. 


Let's first create a DataFrame with complex data:

In [41]:
# Create a DataFrame containing complex data types.

df = spark.createDataFrame([
    (0, 'ford', {'length': 2.4, 'height': 1.3, 'color': 'blue'}, [1, 4, 2, 5, 1], [1, 2, 4, 5, 1], [11, 22, 43, 54, 15]),
    (1, 'isuzu', {'length': 2.9, 'height': 1.6, 'color': 'white'}, [15, 2, 55, 12, 2], [55, 15, 16, 171, 2], [55, 15, 16, 171, 22]),
    (2, 'toyota', {'length': 2.4, 'height': 1.3, 'color': 'white'}, [1, 2, 1, 1, 1], [1, 2, 2, 2, 1], [11, 22, 23, 24, 15]),
    (3, 'mini', {'length': 1.2, 'height': 1.3, 'color': 'black'}, [1, 4, 4, 1, 4], [9, 8, 7, 6, 5], [91, 82, 73, 64, 55]),
    (4, 'ford', {'length': 2.4, 'height': 1.3, 'color': 'blue'}, [1, 2, 3, 4, 5], [0, 1, 2, 3, 4], [10, 21, 32, 43, 54]),
], ['index', 'make', 'description', 'count', 'anti_count', 'index_count'])

In [42]:
df.show(5, False)

+-----+------+---------------------------------------------+------------------+--------------------+---------------------+
|index|make  |description                                  |count             |anti_count          |index_count          |
+-----+------+---------------------------------------------+------------------+--------------------+---------------------+
|0    |ford  |{length -> 2.4, color -> null, height -> 1.3}|[1, 4, 2, 5, 1]   |[1, 2, 4, 5, 1]     |[11, 22, 43, 54, 15] |
|1    |isuzu |{length -> 2.9, color -> null, height -> 1.6}|[15, 2, 55, 12, 2]|[55, 15, 16, 171, 2]|[55, 15, 16, 171, 22]|
|2    |toyota|{length -> 2.4, color -> null, height -> 1.3}|[1, 2, 1, 1, 1]   |[1, 2, 2, 2, 1]     |[11, 22, 23, 24, 15] |
|3    |mini  |{length -> 1.2, color -> null, height -> 1.3}|[1, 4, 4, 1, 4]   |[9, 8, 7, 6, 5]     |[91, 82, 73, 64, 55] |
|4    |ford  |{length -> 2.4, color -> null, height -> 1.3}|[1, 2, 3, 4, 5]   |[0, 1, 2, 3, 4]     |[10, 21, 32, 43, 54] |
+-----+------+--

Let's consider the code used above.

First, we are using the `createDataFrame()` method on the existing `SparkSession`. It requires a list of tuples that will make up the data in the DataFrame, and a list of column names. 

In this instance, we are providing it with six columns:
- an index (which is an integer);
- a car make column (string type);
- some attributes of the car (map type – which are dictionaries in Python); and
- three miscellaneous columns (containing array types – lists in Python). 

We can have a look at how Spark inferred the schema:

In [43]:
df.printSchema()

root
 |-- index: long (nullable = true)
 |-- make: string (nullable = true)
 |-- description: map (nullable = true)
 |    |-- key: string
 |    |-- value: double (valueContainsNull = true)
 |-- count: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- anti_count: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- index_count: array (nullable = true)
 |    |-- element: long (containsNull = true)



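Note that Spark inferred the `description` map as `string -> double`, so the string-valued `color` entries could not be stored and appear as `null` in the output above.

To use Spark SQL to manipulate this DataFrame, let's register it as a temp view: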

In [44]:
df.createOrReplaceTempView('_temp_df')

Now for the fun part. We are going to do a range of manipulations:

1. Select the distinct elements from each array in the DataFrame in the count field.

In [45]:
# array_distinct is a SparkSQL function that operates on complex data 
# within DataFrames on type array, getting distinct elements of each array.

spark.sql("SELECT array_distinct(count) from _temp_df").show()

+---------------------+
|array_distinct(count)|
+---------------------+
|         [1, 4, 2, 5]|
|      [15, 2, 55, 12]|
|               [1, 2]|
|               [1, 4]|
|      [1, 2, 3, 4, 5]|
+---------------------+



2. Select the maximum for each array in the count field:

In [46]:
# array_max is a SparkSQL function that operates on complex data 
# within DataFrames on type array, getting the numerical maximum for each array.

spark.sql("SELECT array_max(count) from _temp_df").show()

+----------------+
|array_max(count)|
+----------------+
|               5|
|              55|
|               2|
|               4|
|               5|
+----------------+



3. Sort the arrays in the anti_count field:

In [47]:
# array_sort is a SparkSQL function that operates on complex data 
# within DataFrames on type array, sorting the elements in the array.

spark.sql("SELECT array_sort(anti_count) from _temp_df").show()

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|array_sort(anti_count, lambdafunction((IF(((namedlambdavariable() IS NULL) AND (namedlambdavariable() IS NULL)), 0, (IF((namedlambdavariable() IS NULL), 1, (IF((namedlambdavariable() IS NULL), -1, (IF((namedlambdavariable() < namedlambdavariable()), -1, (IF((namedlambdavariable() > namedlambdavariable()), 1, 0)))))))))), namedlambdavariable(), namedlambdavariable()))|
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

4. Get the cardinality of the array fields:

In [48]:
# cardinality is a SparkSQL function that operates on complex data 
# within DataFrames on type array, calculating the cardinality of each array.

spark.sql("SELECT cardinality(count), cardinality(anti_count), cardinality(index_count) from _temp_df").show()

+------------------+-----------------------+------------------------+
|cardinality(count)|cardinality(anti_count)|cardinality(index_count)|
+------------------+-----------------------+------------------------+
|                 5|                      5|                       5|
|                 5|                      5|                       5|
|                 5|                      5|                       5|
|                 5|                      5|                       5|
|                 5|                      5|                       5|
+------------------+-----------------------+------------------------+



> 💡 &nbsp; **Remember**
>
> Cardinality is a measure of the number of elements in a set, array or list.
>
> For example, this list [41, 22, 67, 5] will have a cardinality of 4.

5. Find the intersection between `count` and `anti_count`, as well as the values in the `count` array that are not in `anti_count`:

In [49]:
# array_intersect is a SparkSQL function that operates on complex data 
# within DataFrames on type array, finding the intersect between two arrays.

# array_except is the opposite, selecting values contained in the first 
# array and not the second.

spark.sql("SELECT array_intersect(count, anti_count), array_except(count, anti_count) from _temp_df").show()

+----------------------------------+-------------------------------+
|array_intersect(count, anti_count)|array_except(count, anti_count)|
+----------------------------------+-------------------------------+
|                      [1, 4, 2, 5]|                             []|
|                       [15, 2, 55]|                           [12]|
|                            [1, 2]|                             []|
|                                []|                         [1, 4]|
|                      [1, 2, 3, 4]|                            [5]|
+----------------------------------+-------------------------------+



6. Create map types from the index_count and anti_count fields:

In [50]:
# map_from_arrays creates a one-to-one mapping from the elements of the first
# array (the keys) to the elements of the second array (the values).

spark.sql("SELECT map_from_arrays(index_count, anti_count) from _temp_df").show(5, False)

+---------------------------------------------------+
|map_from_arrays(index_count, anti_count)           |
+---------------------------------------------------+
|{11 -> 1, 22 -> 2, 43 -> 4, 54 -> 5, 15 -> 1}      |
|{55 -> 55, 15 -> 15, 16 -> 16, 171 -> 171, 22 -> 2}|
|{11 -> 1, 22 -> 2, 23 -> 2, 24 -> 2, 15 -> 1}      |
|{91 -> 9, 82 -> 8, 73 -> 7, 64 -> 6, 55 -> 5}      |
|{10 -> 0, 21 -> 1, 32 -> 2, 43 -> 3, 54 -> 4}      |
+---------------------------------------------------+



7. Have a look at the cardinality of the description map field:

In [51]:
spark.sql("SELECT cardinality(description) from _temp_df").show()

+------------------------+
|cardinality(description)|
+------------------------+
|                       3|
|                       3|
|                       3|
|                       3|
|                       3|
+------------------------+



8. Retrieve the length from each of the map types in the description field:

In [52]:
# element_at retrieves the value stored under a given key in a map field.

spark.sql('SELECT element_at(description, "length") from _temp_df').show()

+-------------------------------+
|element_at(description, length)|
+-------------------------------+
|                            2.4|
|                            2.9|
|                            2.4|
|                            1.2|
|                            2.4|
+-------------------------------+



It may not be immediately obvious where to use this yet, but accessing dictionaries and lists from within fields in Spark can significantly improve your performance and widen the range of operations you can perform with Spark.
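If you are more at home in Python than in SQL, it can help to map these functions onto familiar list and dictionary operations. The sketch below is plain Python, not Spark code: it mirrors the semantics of the functions above for the `isuzu` row, ignoring null handling. The helper names (`distinct`, `intersect`, `except_`) are ours, chosen for illustration.

```python
# Values of the `isuzu` row from the DataFrame created above.
count = [15, 2, 55, 12, 2]
anti_count = [55, 15, 16, 171, 2]
index_count = [55, 15, 16, 171, 22]
description = {"length": 2.9, "height": 1.6, "color": "white"}

def distinct(values):
    """Keep the first occurrence of each element (like array_distinct)."""
    return list(dict.fromkeys(values))

def intersect(left, right):
    """Distinct elements of `left` that also occur in `right` (like array_intersect)."""
    right_set = set(right)
    return [v for v in distinct(left) if v in right_set]

def except_(left, right):
    """Distinct elements of `left` that do not occur in `right` (like array_except)."""
    right_set = set(right)
    return [v for v in distinct(left) if v not in right_set]

results = {
    "array_distinct": distinct(count),
    "array_max": max(count),
    "array_sort": sorted(anti_count),
    "cardinality": len(count),
    "array_intersect": intersect(count, anti_count),
    "array_except": except_(count, anti_count),
    "map_from_arrays": dict(zip(index_count, anti_count)),
    "element_at": description["length"],
}

for name, value in results.items():
    print(f"{name}: {value}")
```

The difference is that the Spark functions run inside the engine on every row of the DataFrame, without moving the data into Python.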

### Higher-order functions

The above functions for transforming complex data types are quite useful, but you may require some level of customisation. This is where higher-order functions come in, where you can pass anonymous user-defined or lambda functions as arguments. 

Let's start with `transform()`, which allows us to apply a lambda function to each element in an array within a Spark field. This is similar to the `map()` Python function. 

In [53]:
# Here, we divide each element in the array within the count field by 100.

spark.sql("SELECT make, transform(count, t -> t / 100) AS dim_count FROM _temp_df").show(5, False)

+------+------------------------------+
|make  |dim_count                     |
+------+------------------------------+
|ford  |[0.01, 0.04, 0.02, 0.05, 0.01]|
|isuzu |[0.15, 0.02, 0.55, 0.12, 0.02]|
|toyota|[0.01, 0.02, 0.01, 0.01, 0.01]|
|mini  |[0.01, 0.04, 0.04, 0.01, 0.04]|
|ford  |[0.01, 0.02, 0.03, 0.04, 0.05]|
+------+------------------------------+



We can also filter the elements of each array with a lambda expression:

In [54]:
# Here, we keep only the elements of each array in the count field that are larger than 10.

spark.sql("SELECT make, filter(count, t -> t > 10) AS large_count FROM _temp_df").show(5, False)

+------+------------+
|make  |large_count |
+------+------------+
|ford  |[]          |
|isuzu |[15, 55, 12]|
|toyota|[]          |
|mini  |[]          |
|ford  |[]          |
+------+------------+



We can also check boolean conditions:

In [55]:
# Here, we check if an array within the count field contains the 
# value 12 and return a boolean value based on that.

spark.sql("SELECT make, count, exists(count, t -> t = 12) AS check_count FROM _temp_df").show(5, False)

+------+------------------+-----------+
|make  |count             |check_count|
+------+------------------+-----------+
|ford  |[1, 4, 2, 5, 1]   |false      |
|isuzu |[15, 2, 55, 12, 2]|true       |
|toyota|[1, 2, 1, 1, 1]   |false      |
|mini  |[1, 4, 4, 1, 4]   |false      |
|ford  |[1, 2, 3, 4, 5]   |false      |
+------+------------------+-----------+
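For comparison, the three higher-order functions above correspond closely to Python idioms you already know. The sketch below is plain Python, not Spark code, and applies the same lambdas to the `count` array of the `isuzu` row.

```python
# The `count` array of the `isuzu` row.
count = [15, 2, 55, 12, 2]

# transform(count, t -> t / 100): apply a function to every element.
dim_count = [t / 100 for t in count]

# filter(count, t -> t > 10): keep the elements that satisfy a condition.
large_count = [t for t in count if t > 10]

# exists(count, t -> t = 12): check whether any element satisfies a condition.
check_count = any(t == 12 for t in count)

print(dim_count)    # [0.15, 0.02, 0.55, 0.12, 0.02]
print(large_count)  # [15, 55, 12]
print(check_count)  # True
```

From Spark 3.1 onwards, the same higher-order functions are also available in the Python API as `F.transform()`, `F.filter()` and `F.exists()`, which accept Python lambdas over column expressions.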



## User-defined functions – when Spark can't help you
*Sometimes Spark just does not have the functionality that you require.  This is where UDFs and Pandas UDFs come in.*


User-defined functions (UDFs) are functions that you write yourself in Python and that Spark applies to your data in a distributed manner. Similarly, you can create Pandas UDFs, which were greatly enhanced in Spark 3. They operate on batches of data as Pandas objects rather than row by row, which typically makes them much faster than plain Python UDFs.


The benefit of creating UDFs is that you will also be able to use them in Spark SQL. As a data scientist or engineer, you can write functions that other team members later reuse without the need for them to understand the exact logic encapsulated in the function.


It is important to note that a registered UDF only exists for as long as the SparkSession remains active. 

Let us apply the same logic as we did in the Bitcoin section, but this time using a UDF instead of a SQL CASE statement.

We will do this in several steps:


1. Define the strings that the lambda function will return.
2. Define a lambda function as a sequence of IF statements. This can also be written as a collection of IF statements in a normal function.
3. Create a UDF object that can be used in the Python API.
4. Register the UDF with the SparkSession so that it can be used in Spark SQL.
5. Apply the UDF to the DataFrame.


In [56]:
from pyspark.sql.types import StringType

# Define the Bitcoin advice strings.
mesg1 = 'Sell now!'
mesg2 = 'You should really sell'
mesg3 = 'Should have already sold'
mesg4 = 'Buy NOW!'
mesg5 = 'Lost it all!'

# Define a lambda function that translates the Bitcoin price into the advice strings.
func = lambda x : mesg1 if x > 19000 else (mesg2 if x > 17000 else (mesg3 if x > 15000 else (mesg4 if x > 0 else mesg5)))

# Instantiate the UDF. 
udf_mesg = F.udf(func)

# Register the UDF to allow reuse in Spark SQL.
spark.udf.register('udf_mesg', func, StringType())

bitcoin_data = bitcoin_data.withColumn('sell_guidance', udf_mesg('open'))

In [57]:
bitcoin_data.select('timestamp', 'open', 'sell_guidance').show(10)

+-------------------+----+-------------+
|          timestamp|open|sell_guidance|
+-------------------+----+-------------+
|2011-12-31 09:52:00|4.39|     Buy NOW!|
|2011-12-31 09:53:00| NaN| Lost it all!|
|2011-12-31 09:54:00| NaN| Lost it all!|
|2011-12-31 09:55:00| NaN| Lost it all!|
|2011-12-31 09:56:00| NaN| Lost it all!|
|2011-12-31 09:57:00| NaN| Lost it all!|
|2011-12-31 09:58:00| NaN| Lost it all!|
|2011-12-31 09:59:00| NaN| Lost it all!|
|2011-12-31 10:00:00| NaN| Lost it all!|
|2011-12-31 10:01:00| NaN| Lost it all!|
+-------------------+----+-------------+
only showing top 10 rows
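Step 2 above mentions that the lambda can also be written as a normal function. A minimal sketch of that version follows. The name `price_to_advice` is ours, and the thresholds and messages are copied from the cell above. It also shows why the `NaN` rows end up as `Lost it all!`: every comparison with `NaN` is false, so those rows fall through to the final branch.

```python
def price_to_advice(price):
    """Translate a Bitcoin price into an advice string (same logic as the lambda)."""
    if price > 19000:
        return 'Sell now!'
    if price > 17000:
        return 'You should really sell'
    if price > 15000:
        return 'Should have already sold'
    if price > 0:
        return 'Buy NOW!'
    # Reached for zero, negative and NaN prices.
    return 'Lost it all!'

print(price_to_advice(4.39))          # Buy NOW!
print(price_to_advice(19500.0))       # Sell now!
print(price_to_advice(float('nan')))  # Lost it all!
```

Such a function could be wrapped in the same way as the lambda, for example with `F.udf(price_to_advice, StringType())`. If `NaN` prices should not be labelled at all, they would need to be filtered out or handled explicitly before the comparisons.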



We can do something similar using the temporary view, `bitcoin_tbl`, that we created earlier. Here we apply the UDF directly to the temporary view using Spark SQL.

In [58]:
spark.sql("SELECT timestamp, open, udf_mesg(open) AS sell_guidance FROM bitcoin_tbl").show(10)

+-------------------+----+-------------+
|          timestamp|open|sell_guidance|
+-------------------+----+-------------+
|2011-12-31 09:52:00|4.39|     Buy NOW!|
|2011-12-31 09:53:00| NaN| Lost it all!|
|2011-12-31 09:54:00| NaN| Lost it all!|
|2011-12-31 09:55:00| NaN| Lost it all!|
|2011-12-31 09:56:00| NaN| Lost it all!|
|2011-12-31 09:57:00| NaN| Lost it all!|
|2011-12-31 09:58:00| NaN| Lost it all!|
|2011-12-31 09:59:00| NaN| Lost it all!|
|2011-12-31 10:00:00| NaN| Lost it all!|
|2011-12-31 10:01:00| NaN| Lost it all!|
+-------------------+----+-------------+
only showing top 10 rows



Now to perform some additional magic:

First, we add a `year` column to the DataFrame. We then group by the newly created categorical field (`sell_guidance`), pivot the table on the year column, and aggregate by taking the mean of the `open` column. That is quite a bit to do, but let's add one more operation: instead of only calculating the mean, we also round it.


*This last bit was just to give you something so you can make sense of what the data will look like and draw some inferences from it.*

In [59]:
# Add a year column, filled by the year values from the timestamp column.
bitcoin_data = bitcoin_data.withColumn('year', F.year('timestamp'))

# Group by the sell_guidance field created above and pivot on the year field.
# Once pivoted, Spark requires an aggregation function. Here we get the mean
# and round it to two decimal places.

bitcoin_data.dropna().groupBy('sell_guidance').pivot('year').agg(F.round(F.mean('open'), 2)).show()

+--------------------+----+-----+------+------+------+------+--------+--------+-------+--------+
|       sell_guidance|2011| 2012|  2013|  2014|  2015|  2016|    2017|    2018|   2019|    2020|
+--------------------+----+-----+------+------+------+------+--------+--------+-------+--------+
|You should really...|null| null|  null|  null|  null|  null|18006.24|17077.87|   null|18252.32|
|Should have alrea...|null| null|  null|  null|  null|  null|16030.03|15957.52|   null| 15881.4|
|            Buy NOW!|4.46|10.09|254.14|527.23|274.35|558.79| 3565.59| 7468.63|7426.42| 9525.09|
|           Sell now!|null| null|  null|  null|  null|  null|19230.81|    null|   null|22261.03|
+--------------------+----+-----+------+------+------+------+--------+--------+-------+--------+



One of the issues specific to PySpark is that UDFs tend to be significantly slower than their Scala counterparts. This is due to the data movement between the Java Virtual Machine (JVM) and Python. Pandas UDFs were introduced to address this: they use Apache Arrow to transfer data. You can define the function using the `pandas_udf` decorator, which transfers the data in the Apache Arrow format, which Python can consume efficiently. Instead of operating on individual rows, the function operates on a Pandas Series or DataFrame, speeding up both transfer and processing.


Spark 3 also introduced a further distinction – Pandas UDFs and Pandas Function APIs. 

With Pandas UDFs, Spark infers the type of UDF from Python type hints, such as `pandas.Series` or `pandas.DataFrame`. The currently supported combinations are Series to Series, Iterator of Series to Iterator of Series, Iterator of Multiple Series to Iterator of Series, and Series to Scalar.

For this to work, make sure you have a version of PyArrow installed that is compatible with your Spark release (Spark 3.0 requires pyarrow >= 0.15.1). [Here are the instructions to follow](https://arrow.apache.org/docs/python/install.html).

In [60]:
import pandas as pd
from pyspark.sql.types import FloatType

# Declare the difference function. Both inputs are Pandas series objects, 
# similarly the output. This will map to fields in Spark.
def difference(a: pd.Series, b: pd.Series) -> pd.Series:
    return a - b

# Create the Pandas UDF for the difference function.
cubed_udf = F.pandas_udf(difference, returnType=FloatType())

In [61]:
# Apply the difference UDF function to `close` and `open` fields.

bitcoin_data = bitcoin_data.withColumn('change_in_price', cubed_udf('close', 'open'))

In [62]:
bitcoin_data.where(F.col('change_in_price') != 0).show()

+-------------------+----+----+----+-----+----------+---------------+--------------+-------------+----+---------------+
|          timestamp|open|high| low|close|volume_btc|volume_currency|weighted_price|sell_guidance|year|change_in_price|
+-------------------+----+----+----+-----+----------+---------------+--------------+-------------+----+---------------+
|2011-12-31 18:59:00| 4.5|4.57| 4.5| 4.57| 37.862297|      171.38034|     4.5264115|     Buy NOW!|2011|     0.07000017|
|2012-01-04 18:00:00|5.36|5.37|5.36| 5.37| 13.629423|          73.06|     5.3604617|     Buy NOW!|2012|    0.009999752|
|2012-01-04 19:51:00|5.37|5.57|5.37| 5.57| 43.312195|      235.74707|      5.442972|     Buy NOW!|2012|     0.20000029|
|2012-01-05 09:19:00|5.75|5.79|5.75| 5.79|      14.8|           85.5|      5.777027|     Buy NOW!|2012|     0.03999996|
|2012-01-05 12:10:00|6.19|6.23|6.19| 6.23|      16.0|       99.28572|     6.2053576|     Buy NOW!|2012|     0.03999996|
|2012-01-05 12:48:00|6.23|6.25|6.23| 6.2

Pandas Function APIs allow you to apply a local Python function to a PySpark DataFrame, where both the input and output are Pandas instances. 

## Connecting to relational databases

The following information and tables on the connection properties are referenced from the *Learning Spark Textbook* that can be found [here](https://github.com/Explore-AI/Pictures/blob/a700f77ae9331147029cbad145f4293650ac8eab/data_engineering/transform/spark_deeper_concepts/LearningSpark2.0.pdf?raw=true).

*While the Hive metastore is a great interface into the data that are underlying Spark, sometimes you want to connect to the data from the outside.*

To connect to Spark from the outside world, you have to connect to the Thrift JDBC/ODBC server, also called the Spark Thrift Server (STS). This allows JDBC/ODBC clients to execute SQL queries on Spark over the JDBC and ODBC protocols. 
This is the toolkit you will be using if you want to connect BI tools, for example, PowerBI or Tableau, to Spark. 


In addition to using the Thrift server to connect external tools to Spark, Spark SQL includes a data source API that allows you to read and write data from external databases using JDBC. When running queries, the connector will return a DataFrame object, along with all performance optimisations and the ability to join to datasets already in Spark as DataFrames.

To instantiate this, you first have to specify the JDBC driver for your JDBC data source. 
For example: 

`cd $SPARK_HOME`

`./bin/spark-shell --driver-class-path $database.jar --jars $database.jar`

JDBC connection properties will then be specified in the data source options. Options include:

<div align="center" style="width: 800px; font-size: 100%; text-align: center">
<img src="https://github.com/Explore-AI/Pictures/blob/4a0a9f0f76ab12bb3dc2684a41c901adaec63164/data_engineering/transform/spark_deeper_concepts/common_connection_properties.png?raw=True"
     alt="Common Connection Properties"
     style="padding-bottom=1em"
     width=750px/>
     <em>Learning Spark <a href="https://github.com/Explore-AI/Pictures/blob/a700f77ae9331147029cbad145f4293650ac8eab/data_engineering/transform/spark_deeper_concepts/LearningSpark2.0.pdf?raw=true">here</a>.</em>
</div>

**Notes on partitioning** 

Partitioning is especially important when transferring or writing large amounts of data between Spark and an external data source (not just SQL). When transferring data through the JDBC connector, all the data go through a single driver node, which can easily be saturated and slow down the operation. It is advised that you include some of the following properties to speed up the operation. 


<div align="left" style="width: 800px; font-size: 100%; text-align: left">
<img src="https://github.com/Explore-AI/Pictures/blob/4a0a9f0f76ab12bb3dc2684a41c901adaec63164/data_engineering/transform/spark_deeper_concepts/partitioning_connection_properties.png?raw=True"
     alt="Common Connection Properties"
     style="padding-bottom=1em"
     width=750px/>
     <em>Learning Spark <a href="https://github.com/Explore-AI/Pictures/blob/a700f77ae9331147029cbad145f4293650ac8eab/data_engineering/transform/spark_deeper_concepts/LearningSpark2.0.pdf?raw=true">here</a>.</em>
</div>
<br/><br/>

Reads are partitioned on the `partitionColumn`. `numPartitions` sets the maximum number of partitions, and therefore the maximum number of concurrent JDBC connections, while `lowerBound` and `upperBound` set the minimum and maximum values used to calculate the partition stride.


For example, if you define the following configuration:
- `numPartitions: 10`
- `lowerBound: 2011`
- `upperBound: 2020`
- `partitionColumn: year`

Conceptually, Spark splits the read into one query per year, along the lines of:
- `SELECT * FROM table WHERE year = 2011`
- `SELECT * FROM table WHERE year = 2012`
- ...
- `SELECT * FROM table WHERE year = 2020`

That is one `SELECT` statement per partition, so the partitions can be read concurrently.

Some guiding principles:
- Start with the same number of partitions as the number of workers you have in your cluster, or a multiple thereof. But keep in mind the concurrency potential of your source/sink system. 
- Base the `lowerBound` and `upperBound` values on the actual values in the `partitionColumn`. The bounds only determine the partition stride and do not filter rows, so inaccurate values lead to skewed or empty partitions. 
- Select a `partitionColumn` whose values are spread roughly evenly (in the example above, year is a good choice if we expect a similar number of entries per year). Times and dates make excellent partition columns in a system where data has been ingested consistently. Alternatively, you can create a hash from your primary key or a set of columns.
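The partitioning properties above are passed to the JDBC reader as ordinary data source options. The sketch below collects them in a dictionary for the `year` example. The helper name, the URL and the table name are placeholders made up for illustration, and the read itself is shown only as a comment because it needs a live database and the JDBC driver on the classpath.

```python
def partitioned_jdbc_options(url, table, partition_column,
                             lower_bound, upper_bound, num_partitions):
    """Collect the JDBC options for a partitioned read in one dictionary."""
    return {
        "url": url,
        "dbtable": table,
        "partitionColumn": partition_column,
        "lowerBound": str(lower_bound),
        "upperBound": str(upper_bound),
        "numPartitions": str(num_partitions),
    }

# Placeholder connection details; replace these with your own values.
options = partitioned_jdbc_options(
    url="jdbc:sqlserver://<host>:<port>;databaseName=<database>",
    table="some_table",
    partition_column="year",
    lower_bound=2011,
    upper_bound=2020,
    num_partitions=10,
)

# With an active SparkSession, the partitioned read would then look like:
# readDF = spark.read.format("jdbc").options(**options).load()
print(options)
```

Keeping the options in one place makes it easier to tune `numPartitions` against what the source database can handle.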

We include a code example that connects to a database (Microsoft SQL Server). However, the code below is not meant to be run in this notebook unless you have a database connection set up on which you can test it.

The JDBC driver JAR can be downloaded from Maven and passed to Spark:
`bin/spark-shell --jars mssql-jdbc-7.2.2.jre8.jar`

```python
# Placeholder connection details; replace these with your own values.
databaseUrl = "<URL-TO-TABLE>"
tbl = "some_table"
uname = "joe_doe"
pwd = "some_intricate_password"

# Loading data from a JDBC source.
readDF = (spark.read
    .format("jdbc")
    .option("url", databaseUrl)
    .option("dbtable", tbl)
    .option("user", uname)
    .option("password", pwd)
    .load())
```

```python
# Saving data to a JDBC source.
# `writeDF` is the DataFrame to be written; the connection variables are defined above.
(writeDF.write
    .format("jdbc")
    .option("url", databaseUrl)
    .option("dbtable", tbl)
    .option("user", uname)
    .option("password", pwd)
    .save())
```

And that's all, folks!

## Conclusion

Thanks for joining us for lots of Spark SQL.

In this train, we dived into the details of what is possible in Spark SQL, how to replicate the capabilities of the Python API in Spark SQL, and how some things are simpler in Spark SQL than they are in the other APIs.
We also looked at how to write custom functions when you can't find the function you need in Spark, and finally, at interacting with external databases through Spark connectors.

## Appendix

Exercise solutions

#### \[Exercise 1\]

For this exercise, we have to select all records with a close value larger than 10 000.
An additional catch is that Spark treats NaN as larger than any other numeric value, so NaN rows would pass the `close > 10000` filter. We thus have to filter them out explicitly.

```python
spark.sql("""SELECT *
          FROM bitcoin_tbl WHERE close > 10000
          AND isNaN(close) = false
          """).show(10)
```

```
+-------------------+--------+--------+--------+--------+----------+---------------+--------------+
|          timestamp|    open|    high|     low|   close|volume_btc|volume_currency|weighted_price|
+-------------------+--------+--------+--------+--------+----------+---------------+--------------+
|2017-11-29 04:39:00|  9999.0| 10010.0|  9992.0| 10010.0| 165.63174|      1656279.1|      9999.769|
|2017-11-29 04:40:00| 10010.0|10035.39| 10010.0|10035.39|  76.33833|      765076.94|     10022.187|
|2017-11-29 04:41:00|10035.39| 10059.0|10035.39| 10058.0|  37.38189|      375654.78|     10049.111|
|2017-11-29 04:42:00|10058.99|10069.12|10042.87| 10069.0| 12.911021|      129891.89|     10060.544|
|2017-11-29 04:43:00| 10068.0|10069.12| 10059.0| 10069.0| 26.123903|       263039.8|     10068.933|
|2017-11-29 04:44:00|10069.12|10069.12|10050.75|10054.46| 12.534665|     126085.984|     10058.983|
|2017-11-29 04:45:00|10054.47|10054.98| 10050.0|10054.42| 3.3103845|       33275.46|     10051.842|
```


#### \[Exercise 2\]

Here we had to find the best month to buy Bitcoin in 2018, in other words, the month with the most occurrences of `Buy NOW!`.

First, we use the `BETWEEN` operator to keep only records from 2018. Note that `BETWEEN` is inclusive on both ends, so a record stamped exactly at midnight on 2019-01-01 would also match. We also filter for the `Buy NOW!` category in the `sell_guidance` field.
Finally, we group by month using the `month()` function and order by the same field. The month with the highest count is the answer.

```python
spark.sql("""SELECT month(timestamp), COUNT(high)
          FROM bitcoin_advice
          WHERE timestamp BETWEEN '2018-01-01' and '2019-01-01' AND sell_guidance = 'Buy NOW!'
          GROUP BY month(timestamp)
          ORDER BY month(timestamp)""").show(12, False)
```

```
+------------------------------+-----------+
|month(CAST(timestamp AS DATE))|count(high)|
+------------------------------+-----------+
|1                             |37982      |
|2                             |40242      |
|3                             |44352      |
|4                             |43121      |
|5                             |44530      |
|6                             |42853      |
|7                             |44179      |
|8                             |43044      |
|9                             |39392      |
|10                            |37962      |
|11                            |39171      |
|12                            |42407      |
+------------------------------+-----------+
```
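
The question asks for a single best month, which can be read off the table above. As a small plain-Python sketch (the dictionary simply copies the counts printed above, and the variable names are illustrative):

```python
# Counts of 'Buy NOW!' records per month in 2018, copied from the output above.
buy_now_counts = {
    1: 37982, 2: 40242, 3: 44352, 4: 43121, 5: 44530, 6: 42853,
    7: 44179, 8: 43044, 9: 39392, 10: 37962, 11: 39171, 12: 42407,
}

# The best month is the key with the largest count.
best_month = max(buy_now_counts, key=buy_now_counts.get)
print(best_month, buy_now_counts[best_month])  # prints: 5 44530
```

In Spark SQL, the same answer could be obtained by ordering by `COUNT(high) DESC` instead of by month, so that the best month appears in the first row.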



#### \[Exercise 3\]

Here we have to window over a new column, which divides the GDP figures into two or more groups. 

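First, we create those groups based on an arbitrary threshold (a GDP of 75 000 000 in the code below) that splits the data into a `high` and a `low` group.

Next, we window by that new field and order by GDP, in descending order, within that window.

Finally, we calculate the difference between the GDP of the group’s largest member and the member in question. Because the window is ordered by descending GDP, the running maximum at every row is the largest GDP in the group.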

```python
joined_cities = joined_cities.withColumn('gdp_group', F.when(F.col('gdp') > 75000000, 'high').otherwise('low'))

gdp_area_window = Window.partitionBy(F.col('gdp_group')) \
  .orderBy(F.col('gdp').desc())

joined_cities = \
  joined_cities.withColumn('gdp_rank_area', F.max(F.col('gdp')).over(gdp_area_window) - F.col('gdp'))

joined_cities.show()
```

+--------------+-----+-----------+---------+-----------+-------+-----+----------+--------+-------+-------------+-----------------+---------+-------------+
|          city|index|postal_code|area_code|established|coastal|index|population|     gdp|   area|pop_rank_area|coastal_rank_area|gdp_group|gdp_rank_area|
+--------------+-----+-----------+---------+-----------+-------+-----+----------+--------+-------+-------------+-----------------+---------+-------------+
|Port Elizabeth|    4|       6001|      041|       1820|   true|    4|   1152915|45600000| 251.03|            3|          2209.97|      low|            0|
|   East London|    7|       5200|      043|       1847|   true|    7|    755200|23400000| 168.86|            4|          2292.14|      low|     22200000|
|  Bloemfontein|    5|       9300|      051|       1846|  false|    5|    747431|15600000| 236.17|            3|          1408.81|      low|     30000000|
|     Kimberley|    6|       8301|      053|       1873|  false|    6|