<img style="float: right" src="images/surfsara.png">
<br/>
<hr style="clear: both" />

# Spark Structured API: DataFrames and SQL
In this notebook we will look at Spark's Structured API. We will see how you can use DataFrames and SQL to perform common data processing operations.

First, we will need to create a `SparkSession` object:

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .getOrCreate()

This `SparkSession` object is the entry point for our Spark cluster. We can use a `SparkSession` to create DataFrames, as we will soon see.

## DataFrames from Python collections
We can create a DataFrame from an existing Python collection, such as a list or a dictionary. In addition to the collection itself we will also describe (part of) the structure of the data by naming the columns. Additionally, we can specify the data types of the columns. However, in this case we will let Spark infer this automatically.

First, a list of tuples in Python is created, called `phone_stock`. Next, we create a list called `columns` that contain the name of all columns of the DataFrame. Then we use these two lists as input for [`createDataFrame`](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.SparkSession.createDataFrame). The result is the DataFrame `phone_df`. Next we print the type of both `phone_stock` and `phone_df`:

In [3]:
phone_stock = [
    ('iPhone 6', 'Apple', 6, 549.00),
    ('iPhone 6s', 'Apple', 5, 585.00),
    ('iPhone 7', 'Apple', 11, 739.00),
    ('Pixel', 'Google', 8, 859.00),
    ('Pixel XL', 'Google', 2, 959.00),
    ('Galaxy S7', 'Samsung', 10, 539.00),
    ('Galaxy S6', 'Samsung', 5, 414.00),
    ('Galaxy A5', 'Samsung', 7, 297.00),
    ('Galaxy Note 7', 'Samsung', 0, 841.00)
]

columns = ['model', 'brand', 'stock', 'unit_price']

phone_df = spark.createDataFrame(phone_stock, columns)

print('the type of phone_stock: ' + str(type(phone_stock)))
print('the type of phone_df: ' + str(type(phone_df)))

the type of phone_stock: <class 'list'>
the type of phone_df: <class 'pyspark.sql.dataframe.DataFrame'>


We can see that the `phone_stock` variable is a Python `list`, kept in memory. Using `createDataFrame`, it is converted into a DataFrame, which is now located on our Spark cluster. It is important to distinguish between data available locally (driver-side), such as `phone_stock`, and data available on the cluster (cluster-side), such as `phone_df`.<br>
Although data can be downloaded from the cluster to the driver, and uploaded from the driver to the cluster, this is often a costly operation. In order to reduce computation time, it is important to know where our data 'lives', and how Spark operation affect the location of our data. We will come back to this issue later in the notebook.

In order to see a few rows of a DataFrame we use [`show()`](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.show). By default it shows 20 rows, but you can give the desired number of rows that you want to see as an argument.

In [6]:
phone_df.show(5)

+---------+------+-----+----------+
|    model| brand|stock|unit_price|
+---------+------+-----+----------+
| iPhone 6| Apple|    6|     549.0|
|iPhone 6s| Apple|    5|     585.0|
| iPhone 7| Apple|   11|     739.0|
|    Pixel|Google|    8|     859.0|
| Pixel XL|Google|    2|     959.0|
+---------+------+-----+----------+
only showing top 5 rows



The [`collect()`](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.collect) _action_ returns all data from a DataFrame to the driver. Notice that the result is a Python `list` containing `Row` objects. In turn, each `Row` object consists of pairs of column names and attribute values.

**Note**: `collect` is a costly operation. Each executor in the cluster will return its part of the DataFrame to the driver in its entirety. This is fine for small DataFrames, such as on the one we are using, but can cause serious problems on larger DataFrames.

In [7]:
all_phones = phone_df.collect()
all_phones

[Row(model='iPhone 6', brand='Apple', stock=6, unit_price=549.0),
 Row(model='iPhone 6s', brand='Apple', stock=5, unit_price=585.0),
 Row(model='iPhone 7', brand='Apple', stock=11, unit_price=739.0),
 Row(model='Pixel', brand='Google', stock=8, unit_price=859.0),
 Row(model='Pixel XL', brand='Google', stock=2, unit_price=959.0),
 Row(model='Galaxy S7', brand='Samsung', stock=10, unit_price=539.0),
 Row(model='Galaxy S6', brand='Samsung', stock=5, unit_price=414.0),
 Row(model='Galaxy A5', brand='Samsung', stock=7, unit_price=297.0),
 Row(model='Galaxy Note 7', brand='Samsung', stock=0, unit_price=841.0)]

Working directly with a list of row objects is cumbersome. To work directly with data on the driver's side, we usually convert the Spark DataFrame to a `pandas` DataFrame. [`pandas`](https://pandas.pydata.org/) is a data processing library that allows us to manipulate tabular table. It is suitable for processing that isn't too intensive and data that isn't too large to fit into local memory (otherwise, why would we want to use Spark?).

Spark DataFrames have a [`toPandas()`](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.toPandas) action defined on them, that will pull all data to the driver and convert it to a `pandas` DataFrame:

In [8]:
phone_df.toPandas()

Unnamed: 0,model,brand,stock,unit_price
0,iPhone 6,Apple,6,549.0
1,iPhone 6s,Apple,5,585.0
2,iPhone 7,Apple,11,739.0
3,Pixel,Google,8,859.0
4,Pixel XL,Google,2,959.0
5,Galaxy S7,Samsung,10,539.0
6,Galaxy S6,Samsung,5,414.0
7,Galaxy A5,Samsung,7,297.0
8,Galaxy Note 7,Samsung,0,841.0


## Schemas

There are several methods to inspect the structure of a DataFrame: `printSchema`, `schema` and `describe`. [`printSchema`](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.printSchema) is especially useful with complicated nested structures, because it provides a human-readable form:

In [9]:
phone_df.printSchema()

root
 |-- model: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- stock: long (nullable = true)
 |-- unit_price: double (nullable = true)



Note that all columns are listed, together with their type and a boolean value that indicates whether the value for that column can be NULL. Although this DataFrame does not contain nested structures, we will see a more involved example later in the notebook.

Schema's can also be listed programmatically. By calling [`schema`](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.schema) we get to see the structure of the DataFrame in Spark's internal types. It is possible to define a schema in code by making use of these types, although we won't do this here.

In [10]:
phone_df.schema

StructType(List(StructField(model,StringType,true),StructField(brand,StringType,true),StructField(stock,LongType,true),StructField(unit_price,DoubleType,true)))

The `schema`'s `fields` attribute will provide a more readable way to inspect the DataFrame's structure:

In [11]:
phone_df.schema.fields

[StructField(model,StringType,true),
 StructField(brand,StringType,true),
 StructField(stock,LongType,true),
 StructField(unit_price,DoubleType,true)]

[`describe`](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.describe) will compute summary statistics for numeric and string columns:

In [14]:
phone_df.describe().toPandas()

Unnamed: 0,summary,model,brand,stock,unit_price
0,count,9,9,9.0,9.0
1,mean,,,6.0,642.4444444444445
2,stddev,,,3.5355339059327378,220.82295573100583
3,min,Galaxy A5,Apple,0.0,297.0
4,max,iPhone 7,Samsung,11.0,959.0


## Data extraction

Now that we have our data in a DataFrame, we want to use it to manipulate the data. Let's start by selecting subsets of the data: specific columns and/or rows.

### Selecting columns

Often we are not interested in all the columns of our data. DataFrames make it very easy to select a subset of the data by using the [`select`](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.describe) method. Realise that we are not modifying the original DataFrame, but creating a new one:

In [15]:
# Select only the model column
model_df = phone_df.select('model')
model_df.show()

+-------------+
|        model|
+-------------+
|     iPhone 6|
|    iPhone 6s|
|     iPhone 7|
|        Pixel|
|     Pixel XL|
|    Galaxy S7|
|    Galaxy S6|
|    Galaxy A5|
|Galaxy Note 7|
+-------------+



We can also rename a column by using [`expr`](https://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html#pyspark.sql.functions.expr).

In [16]:
from pyspark.sql.functions import expr
mymodel_df = phone_df.select('brand', expr('model as mymodel'))
mymodel_df.show()

+-------+-------------+
|  brand|      mymodel|
+-------+-------------+
|  Apple|     iPhone 6|
|  Apple|    iPhone 6s|
|  Apple|     iPhone 7|
| Google|        Pixel|
| Google|     Pixel XL|
|Samsung|    Galaxy S7|
|Samsung|    Galaxy S6|
|Samsung|    Galaxy A5|
|Samsung|Galaxy Note 7|
+-------+-------------+



In [17]:
# Select both the brand and model columns
bm_df = phone_df.select(['brand', 'model'])
bm_df.show()

+-------+-------------+
|  brand|        model|
+-------+-------------+
|  Apple|     iPhone 6|
|  Apple|    iPhone 6s|
|  Apple|     iPhone 7|
| Google|        Pixel|
| Google|     Pixel XL|
|Samsung|    Galaxy S7|
|Samsung|    Galaxy S6|
|Samsung|    Galaxy A5|
|Samsung|Galaxy Note 7|
+-------+-------------+



#### Note: Columns specifications
In the previous examples we have used various _column specifications_ for selecting data. For example, the following column specifications for selecting the `brand` and `model` columns are all equivalent:
```
bm_df = phone_df.select('brand', 'model')
bm_df = phone_df.select(['brand', 'model'])
bm_df = phone_df.select([phone_df.brand, phone_df.model])
bm_df = phone_df.select(phone_df['brand'], phone_df['model'])
```
Although the first two are shorter, we will sometimes need to use the last two column specifications for more complex queries, and to resolve any ambiguities for Spark's parser. In fact, the Spark documentation states that the last column specification is preferred, although you are free to use the specification you see fit.

## Assignment 1
Select the `model` and `stock` columns from `phone_df`, with whatever column specification you prefer:

In [18]:
# TODO: Replace <FILL IN> with appropriate code
# Select the model and stock columns
ms_df = phone_df.select('model', 'stock')
ms_df.show()

+-------------+-----+
|        model|stock|
+-------------+-----+
|     iPhone 6|    6|
|    iPhone 6s|    5|
|     iPhone 7|   11|
|        Pixel|    8|
|     Pixel XL|    2|
|    Galaxy S7|   10|
|    Galaxy S6|    5|
|    Galaxy A5|    7|
|Galaxy Note 7|    0|
+-------------+-----+



### Filtering rows

We can filter specific rows by using the DataFrame [`filter`](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.filter) method. Please note that the [`where`](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.where) method is an alias for `filter`. As with the column specifications for the `select` method, there are several ways of filtering with the same query:

In [19]:
# Select rows with phones from Google
google_df = phone_df.filter(phone_df['brand'] == 'Google')
google_df.show()

+--------+------+-----+----------+
|   model| brand|stock|unit_price|
+--------+------+-----+----------+
|   Pixel|Google|    8|     859.0|
|Pixel XL|Google|    2|     959.0|
+--------+------+-----+----------+



In [20]:
# Select rows with phones from Google, this time with a query string. Note the double quotes to specify a string
google_df = phone_df.filter('brand = "Google"')
google_df.show()

+--------+------+-----+----------+
|   model| brand|stock|unit_price|
+--------+------+-----+----------+
|   Pixel|Google|    8|     859.0|
|Pixel XL|Google|    2|     959.0|
+--------+------+-----+----------+



## Assignment 2
Select the rows with `unit_price` less than 550.00

In [24]:
# TODO: Replace <FILL IN> with appropriate code

cheap_df = phone_df.filter('unit_price < 550.00')
cheap_df.show()

+---------+-------+-----+----------+
|    model|  brand|stock|unit_price|
+---------+-------+-----+----------+
| iPhone 6|  Apple|    6|     549.0|
|Galaxy S7|Samsung|   10|     539.0|
|Galaxy S6|Samsung|    5|     414.0|
|Galaxy A5|Samsung|    7|     297.0|
+---------+-------+-----+----------+



Multiple filter conditions can be specified using Python's [bitwise operators](https://docs.python.org/3/library/stdtypes.html#bitwise-operations-on-integer-types), such as `|` (or) and `&` (and). Of course, we are not manipulating bits. Instead, think of the bitwise operators as applying a boolean operation on each pair of elements between two columns.

Also, please be aware that due to Python's operator precedence rules, individual expressions must be wrapped in enclosing brackets:

In [25]:
phone_df.filter((phone_df.brand == 'Apple') | (phone_df.brand == 'Google')).show()

+---------+------+-----+----------+
|    model| brand|stock|unit_price|
+---------+------+-----+----------+
| iPhone 6| Apple|    6|     549.0|
|iPhone 6s| Apple|    5|     585.0|
| iPhone 7| Apple|   11|     739.0|
|    Pixel|Google|    8|     859.0|
| Pixel XL|Google|    2|     959.0|
+---------+------+-----+----------+



### Ordering rows

We can use the [`orderBy`](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.orderBy) method to sort data:

In [26]:
phone_df.orderBy('unit_price').show()

+-------------+-------+-----+----------+
|        model|  brand|stock|unit_price|
+-------------+-------+-----+----------+
|    Galaxy A5|Samsung|    7|     297.0|
|    Galaxy S6|Samsung|    5|     414.0|
|    Galaxy S7|Samsung|   10|     539.0|
|     iPhone 6|  Apple|    6|     549.0|
|    iPhone 6s|  Apple|    5|     585.0|
|     iPhone 7|  Apple|   11|     739.0|
|Galaxy Note 7|Samsung|    0|     841.0|
|        Pixel| Google|    8|     859.0|
|     Pixel XL| Google|    2|     959.0|
+-------------+-------+-----+----------+



In the next cell we use a chain of DataFrame methods that are very similar to the SQL query language used for certain databases.

In [27]:
phone_df.select("model", "unit_price").where("brand='Apple'").orderBy('stock', ascending=False).show()

+---------+----------+
|    model|unit_price|
+---------+----------+
| iPhone 7|     739.0|
| iPhone 6|     549.0|
|iPhone 6s|     585.0|
+---------+----------+



An alternative way of doing the same as the cell above is using `phone_df["brand"]` in the where clause. This is longer to type but intuitively more clear and easier to read. There is no ambiguity for the Spark parser with this notation.

In [28]:
phone_df.select("model", "unit_price").where(phone_df["brand"]=="Apple").orderBy('stock', ascending=False).show()

+---------+----------+
|    model|unit_price|
+---------+----------+
| iPhone 7|     739.0|
| iPhone 6|     549.0|
|iPhone 6s|     585.0|
+---------+----------+



## Assignment 3
Select all phones with a unit price larger than 300 and of which there are more than two in stock. Display the remaining phones, ordered by brand, followed by stock. Use whatever column specification syntax you prefer.

In [33]:
phone_df.select('brand', 'model', 'unit_price', 'stock').filter((phone_df.unit_price > 300) & (phone_df.stock > 2)).orderBy('brand').orderBy('stock').show()

+-------+---------+----------+-----+
|  brand|    model|unit_price|stock|
+-------+---------+----------+-----+
|  Apple|iPhone 6s|     585.0|    5|
|Samsung|Galaxy S6|     414.0|    5|
|  Apple| iPhone 6|     549.0|    6|
| Google|    Pixel|     859.0|    8|
|Samsung|Galaxy S7|     539.0|   10|
|  Apple| iPhone 7|     739.0|   11|
+-------+---------+----------+-----+



## Aggregating data
An important part of data processing is the ability to combine multiple records, like we did with `reduceByKey`. In the DataFrame API this is a two-step process: grouping data, and then applying a function to the data.

First we group the data using the [`groupBy`](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.groupBy) method. `groupBy` can operate on one or multiple columns. It will not actually perform the grouping but create a reference to a [`GroupedData`](http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.GroupedData) object:

In [34]:
grouped_df = phone_df.groupBy('brand')
print(type(grouped_df))

<class 'pyspark.sql.group.GroupedData'>


After the data is grouped we can apply one of the standard aggregation functions on it. They are listed at the [`GroupedData`](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.GroupedData) API documentation. These are: `min`, `max`, `mean`, `sum` and `count`. We can apply an aggregation to all columns or to a subset of the columns.

In [35]:
# Minimum for all columns
min_df = grouped_df.min('unit_price')

min_df.toPandas()

Unnamed: 0,brand,min(unit_price)
0,Samsung,297.0
1,Google,859.0
2,Apple,549.0


Notice that the `min(unit_price)` is the name of the new column. We can rename this column with the [`withColumnRenamed`](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.withColumnRenamed) method:

In [36]:
min_df.withColumnRenamed('min(unit_price)', 'min_unit_price').show()

+-------+--------------+
|  brand|min_unit_price|
+-------+--------------+
|Samsung|         297.0|
| Google|         859.0|
|  Apple|         549.0|
+-------+--------------+



## Assignment 4

Compute the maximum  of the unit_price per brand and rename the resulting column to `max`.
(We assume you can do this in one line. Feel free to adapt the cell and use more lines if you want.)

In [38]:
# TODO: Replace <FILL IN> with appropriate code
max_df = phone_df.groupBy('brand').max('unit_price')
max_df.toPandas()

Unnamed: 0,brand,max(unit_price)
0,Samsung,841.0
1,Google,959.0
2,Apple,739.0


To calculate multiple aggregations over our data, we use the [`agg`](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.GroupedData.agg) method:

In [39]:
# Take the sum of the stock column, and calculate the mean of the unit_price column, in one go
sum_df = grouped_df.agg({'stock': 'sum', 'unit_price': 'mean'})

sum_df.show()

+-------+----------+-----------------+
|  brand|sum(stock)|  avg(unit_price)|
+-------+----------+-----------------+
|Samsung|        22|           522.75|
| Google|        10|            909.0|
|  Apple|        22|624.3333333333334|
+-------+----------+-----------------+



## SQL
The SQL API aims to be ANSI-SQL SQL2003 and Hive-SQL compatible. The expressiveness is very similar to the DataFrame methods we just discussed. You can access the SQL API from the SparkSession by using `spark.sql`. Below is a query performed using Spark's DataFrame API:

In [40]:
# DataFrame version
res_df = phone_df.filter(phone_df['stock'] > 7).select('model')
res_df.show()

+---------+
|    model|
+---------+
| iPhone 7|
|    Pixel|
|Galaxy S7|
+---------+



The SQL version of the query requires us to 'register' the DataFrame as an SQL table first: 

In [41]:
# SQL version

# Register the phone_df DataFrame within SQL as a table with name 'phones'
phone_df.createOrReplaceTempView('phones')

# Perform the SQL query on the 'phones' table
res_df = spark.sql('SELECT model FROM phones WHERE stock > 7')
res_df.show()

+---------+
|    model|
+---------+
| iPhone 7|
|    Pixel|
|Galaxy S7|
+---------+



## Assignment 5
Transform the following query into SQL syntax, and run it using `spark.sql`. Show the resulting data frame.
```
phone_df.select("model", "unit_price").where(phone_df["brand"]=="Apple").orderBy('stock', ascending=False)
```
**Hint**: use the `phones` table in your SQL query, similar to the cell above.
<br>
**Hint**: for an SQL syntax cheat sheet, see this [pdf](http://files.zeroturnaround.com/pdf/zt_sql_cheat_sheet.pdf).

In [43]:
sql_df = spark.sql('SELECT model, unit_price FROM phones WHERE brand = "Apple" ORDER BY stock')
sql_df.show()

+---------+----------+
|    model|unit_price|
+---------+----------+
|iPhone 6s|     585.0|
| iPhone 6|     549.0|
| iPhone 7|     739.0|
+---------+----------+



## Joining with other data sets
Often you want to combine multiple datasets on a shared column. In this example we create an extra table with information about phone manufacturers:

In [44]:
companies = [
    ('Google', 'USA', 1998, 'Sundar Pichai'),
    ('Samsung', 'South Korea', 1938 ,'Oh-Hyun Kwon' ),
    ('Apple', 'USA', 1976 ,'Tim Cook')
]

columns = ['company_name', 'hq_country', 'founding_year', 'ceo']

company_df = spark.createDataFrame(companies, columns)
company_df.show()

+------------+-----------+-------------+-------------+
|company_name| hq_country|founding_year|          ceo|
+------------+-----------+-------------+-------------+
|      Google|        USA|         1998|Sundar Pichai|
|     Samsung|South Korea|         1938| Oh-Hyun Kwon|
|       Apple|        USA|         1976|     Tim Cook|
+------------+-----------+-------------+-------------+



To join two DataFrames, we use the [`join`](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.join) method on one of the DataFrames. This method takes two arguments: (1) the other DataFrame and (2) a join relation, providing an expression on what columns to join on from both DataFrames. Here we join the two DataFrames on the brand/company_name columns:

In [45]:
joined_df = phone_df.join(company_df, phone_df['brand'] == company_df['company_name'])
joined_df.show()

+-------------+-------+-----+----------+------------+-----------+-------------+-------------+
|        model|  brand|stock|unit_price|company_name| hq_country|founding_year|          ceo|
+-------------+-------+-----+----------+------------+-----------+-------------+-------------+
|    Galaxy S7|Samsung|   10|     539.0|     Samsung|South Korea|         1938| Oh-Hyun Kwon|
|    Galaxy S6|Samsung|    5|     414.0|     Samsung|South Korea|         1938| Oh-Hyun Kwon|
|    Galaxy A5|Samsung|    7|     297.0|     Samsung|South Korea|         1938| Oh-Hyun Kwon|
|Galaxy Note 7|Samsung|    0|     841.0|     Samsung|South Korea|         1938| Oh-Hyun Kwon|
|        Pixel| Google|    8|     859.0|      Google|        USA|         1998|Sundar Pichai|
|     Pixel XL| Google|    2|     959.0|      Google|        USA|         1998|Sundar Pichai|
|     iPhone 6|  Apple|    6|     549.0|       Apple|        USA|         1976|     Tim Cook|
|    iPhone 6s|  Apple|    5|     585.0|       Apple|       

## Assignment 6
Join the phone data frame on the company data frame. Select only the models for which the stock is greater than 7, and of which the company HQ is located in the USA.

In [49]:
result = phone_df.join(company_df, phone_df['brand'] == company_df['company_name']).filter((phone_df.stock > 7) & (company_df.hq_country == 'USA'))

result.show()

+--------+------+-----+----------+------------+----------+-------------+-------------+
|   model| brand|stock|unit_price|company_name|hq_country|founding_year|          ceo|
+--------+------+-----+----------+------------+----------+-------------+-------------+
|   Pixel|Google|    8|     859.0|      Google|       USA|         1998|Sundar Pichai|
|iPhone 7| Apple|   11|     739.0|       Apple|       USA|         1976|     Tim Cook|
+--------+------+-----+----------+------------+----------+-------------+-------------+



### Spark's logical and physical plans
Under the hood, Spark performs query optimization using its Catalyst optimizer. Catalyst will spot opportunities for simplification and speed-up, and generate an optimized _physical plan_ that is then executed by Spark.

One of the optimizations Catalyst performs is _predicate pushdown_. It is easiest to show this mechanism by means of the example above. We can have Spark _explain_ to us what a logical and physical plan look like for a given query, using the [`explain`](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.explain) method:

In [50]:
phone_df \
    .join(company_df, phone_df['brand'] == company_df['company_name']) \
    .filter(company_df['hq_country'] == 'USA') \
    .filter(phone_df['stock'] > 7) \
    .select('model') \
    .explain(extended=True)

== Parsed Logical Plan ==
'Project [unresolvedalias('model, None)]
+- Filter (stock#11L > cast(7 as bigint))
   +- Filter (hq_country#937 = USA)
      +- Join Inner, (brand#10 = company_name#936)
         :- LogicalRDD [model#9, brand#10, stock#11L, unit_price#12]
         +- LogicalRDD [company_name#936, hq_country#937, founding_year#938L, ceo#939]

== Analyzed Logical Plan ==
model: string
Project [model#9]
+- Filter (stock#11L > cast(7 as bigint))
   +- Filter (hq_country#937 = USA)
      +- Join Inner, (brand#10 = company_name#936)
         :- LogicalRDD [model#9, brand#10, stock#11L, unit_price#12]
         +- LogicalRDD [company_name#936, hq_country#937, founding_year#938L, ceo#939]

== Optimized Logical Plan ==
Project [model#9]
+- Join Inner, (brand#10 = company_name#936)
   :- Project [model#9, brand#10]
   :  +- Filter ((isnotnull(stock#11L) && (stock#11L > 7)) && isnotnull(brand#10))
   :     +- LogicalRDD [model#9, brand#10, stock#11L, unit_price#12]
   +- Project [company_

The first block of text is on **the parsed logical plan**. Each plan is a tree of operations, where operations in the leafs of the tree (the right-most entries) are executed first. As you can see, the parsed logical plan will involve a join on two `LogicalRDD` objects (the company and phone data frames), after which there are two filter operations. In sequence, they filter the `hq_country` column, followed by the `stock` column.

After resolving references for the query strings, we end up with the **analyzed logical plan**. Please note that the `unresolvedalias('model` in the parsed logical plan is now resolved to its appropriate type (`string`). Also note that the tree of operations is not changed going from parsed to analyzed logical plan.

Catalyst will now optimize the logical plan. In this case, Catalyst will notice that the two filter operations (`hq_country` on `company_df`) and (`stock` on `phone_df`) are performed _after joining_. However, since both operations only use columns from their own data frame, these filter operations can be performed **before** joining. In this way, the result of the filter operations are much smaller tables, and the resulting join will be much quicker to do as well. <br>
Catalyst will move these filter operations down into the operations tree, such that we will filter first, and join later. This is known as _predicate pushdown_, and its effect can be seen in the **optimized logical plan**. The `Join Inner` on the second line of the plan will now contain two subtrees, one filtering the `company_df` data frame, the other filtering the `phone_df` data frame.

Predicate pushdown is just one of the optimizations Catalyst can perform. Also, if the execution of a query takes an inordinate amount of time, it may help to use the [`explain`](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.explain) method to see its logical and physical plans.

## Assignment 7

The problem below was taken from Coursera's MOOC [Big Data Analysis with Scala and Spark](https://www.coursera.org/learn/scala-spark-big-data) by the École Polytechnique Fédérale de Lausanne. We adapted the problem for PySpark, and extended it.

Let's assume we have a dataset with posts from a discussion forum. The entries of the dataset consist of an `author_id`, a `subforum_id`, the number of likes and a date. The data frame is constructed in the following cell.

**We would like to know how many likes each author posted on each subforum. The table should show per `subforum_id` how many likes each author has, the highest number of likes first.**

In [51]:
posts_df = spark.createDataFrame(
    [
        (4, 1, 5, 'sept 5'),
        (1, 2, 3, 'sept 4'),
        (2, 2, 35, 'sept 3'),
        (3, 1, 1, 'sept 5'),
        (4, 1, 14, 'sept 5'),
        (3, 2, 12, 'sept 3'),
        (3, 1, 14, 'sept 5'),
        (3, 1, 10, 'sept 5'),
        (2, 2, 21, 'sept 5')
    ],
    ['author_id', 'subforum_id', 'likes', 'date']
)
posts_df.show()

+---------+-----------+-----+------+
|author_id|subforum_id|likes|  date|
+---------+-----------+-----+------+
|        4|          1|    5|sept 5|
|        1|          2|    3|sept 4|
|        2|          2|   35|sept 3|
|        3|          1|    1|sept 5|
|        4|          1|   14|sept 5|
|        3|          2|   12|sept 3|
|        3|          1|   14|sept 5|
|        3|          1|   10|sept 5|
|        2|          2|   21|sept 5|
+---------+-----------+-----+------+



Please use a [`groupBy`](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.groupBy), the [`sum`](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.GroupedData.sum) aggregation function and an [`orderBy`](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.orderBy) to come up with the desired dataFrame. Note that you want to order the results descendingly.
Also note that you can use `groupBy` and `orderBy` on more than one column.

If you get confused, break the problem into steps.

In [69]:
posts_df.groupBy('author_id', 'subforum_id').agg({'likes': 'sum'}).orderBy('sum(likes)', ascending=False).show()


+---------+-----------+----------+
|author_id|subforum_id|sum(likes)|
+---------+-----------+----------+
|        2|          2|        56|
|        3|          1|        25|
|        4|          1|        19|
|        3|          2|        12|
|        1|          2|         3|
+---------+-----------+----------+



## Assignment 8
We also have two data frames containing author names and subforum names. Join these two tables on the DataFrame and repeat the query, now showing the author's name and forum's name instead of their IDs.

In [78]:
authors_df = spark.createDataFrame(
    [
        (1, 'Brian Watt'),
        (2, 'Esmeralda Andrews'),
        (3, 'Mabel Frank'),
        (4, 'Jan Lyndon'),
        (5, 'Dione Thorpe')
    ],
    ['author_id', 'author_name']
)

subforums_df = spark.createDataFrame(
    [
        (1, 'java'),
        (2, 'python')
    ],
    ['subforum_id', 'subforum_name']
)

In [91]:
posts_df.join(subforums_df, 'subforum_id').join(authors_df, 'author_id').groupBy('subforum_name', 'author_name').sum('likes').orderBy('sum(likes)').show()

+-------------+-----------------+----------+
|subforum_name|      author_name|sum(likes)|
+-------------+-----------------+----------+
|       python|       Brian Watt|         3|
|       python|      Mabel Frank|        12|
|         java|       Jan Lyndon|        19|
|         java|      Mabel Frank|        25|
|       python|Esmeralda Andrews|        56|
+-------------+-----------------+----------+



## Multi-dimensional aggregations

When working with multi-dimensional data sets, we often may be interested in aggregrating across multiple dimensions of our data. One of the simplest operations to do so is [`pivot`](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.GroupedData.pivot). Continuing with our forum posts data sets, we can _pivot_ a `GroupedData` object on another column. Please have look at the following example:



In [92]:
# Join all relevant data frames

posts_joined_df = posts_df.join(subforums_df, 'subforum_id').join(authors_df, 'author_id')

# Group by forum name, and pivot on the date, then sum the number of likes

posts_joined_df \
    .groupBy('subforum_name') \
    .pivot('date') \
    .sum('likes') \
    .show()

+-------------+------+------+------+
|subforum_name|sept 3|sept 4|sept 5|
+-------------+------+------+------+
|         java|  null|  null|    44|
|       python|    47|     3|    21|
+-------------+------+------+------+



If we want to look at our data across more than two dimensions, we will need to look at a number of different methods. The following cell calculates the subtotals and grand totals of the number of likes for each subforum and author using the [`rollup`](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.rollup) method:

In [93]:
# Roll up over subforum name and author name

posts_joined_df \
    .rollup('subforum_name', 'author_name') \
    .sum('likes') \
    .orderBy('subforum_name', 'author_name') \
    .show()

+-------------+-----------------+----------+
|subforum_name|      author_name|sum(likes)|
+-------------+-----------------+----------+
|         null|             null|       115|
|         java|             null|        44|
|         java|       Jan Lyndon|        19|
|         java|      Mabel Frank|        25|
|       python|             null|        71|
|       python|       Brian Watt|         3|
|       python|Esmeralda Andrews|        56|
|       python|      Mabel Frank|        12|
+-------------+-----------------+----------+



We can see that the grand total number of likes across all forums is 115, where the Java subforum has 44 and the Python subforum has 71. Per subforum we also have a subtotal of the number of likes per person.

As you can see, `rollup` takes into account an **hierarchy**. It will output:

* a grand total (`null`, `null`);
* a total for each forum (`<subforum_name>`, `<null>`);
* a subtotal for each author and subforum (`<subforum_name>`, `<author_name>`)

However, because of the same hierarchy it will not output a total for each author across all forums.

In contrast, the [cube](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.cube) method will aggregate over _all possible combinations_ of dimensions:

In [94]:
posts_joined_df \
    .cube('subforum_name', 'author_name') \
    .sum('likes') \
    .orderBy('subforum_name', 'author_name') \
    .show()

+-------------+-----------------+----------+
|subforum_name|      author_name|sum(likes)|
+-------------+-----------------+----------+
|         null|             null|       115|
|         null|       Brian Watt|         3|
|         null|Esmeralda Andrews|        56|
|         null|       Jan Lyndon|        19|
|         null|      Mabel Frank|        37|
|         java|             null|        44|
|         java|       Jan Lyndon|        19|
|         java|      Mabel Frank|        25|
|       python|             null|        71|
|       python|       Brian Watt|         3|
|       python|Esmeralda Andrews|        56|
|       python|      Mabel Frank|        12|
+-------------+-----------------+----------+



As we can see, `cube` calculates the aggregate for each possible combination of subforum name and author name. As with `rollup`, we still have our grand total and totals per subforum. In addition, though, we also have a subtotal for each author across all forums (`null`, `<author_name>`).

## Reading structured files/sources
One of the advantages of using `DataFrames` is the ability to read already structured data and automatically import the structure in Spark. Spark contains readers for a number of formats such as csv, json, parquet, orc, text and jdbc. There are also third-party readers/connectors for databases such as MongoDB and Cassandra.

Here we read some json-formatted tweets. The nested json structure is inferred, as you can see:

In [95]:
tweet_df = spark.read.format("json").load('../data/tweets.json')
tweet_df.printSchema()

root
 |-- contributors: string (nullable = true)
 |-- coordinates: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- entities: struct (nullable = true)
 |    |-- hashtags: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- indices: array (nullable = true)
 |    |    |    |    |-- element: long (containsNull = true)
 |    |    |    |-- text: string (nullable = true)
 |    |-- media: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- display_url: string (nullable = true)
 |    |    |    |-- expanded_url: string (nullable = true)
 |    |    |    |-- id: long (nullable = true)
 |    |    |    |-- id_str: string (nullable = true)
 |    |    |    |-- indices: array (nullable = true)
 |    |    |    |    |-- element: long (containsNull = true)
 |    |    |    |-- media_url: string (nullable = true)
 |    |    |    |-- media_url_https: string (nullable = true)
 |    |    |    |-- s

This structure is squeezed into a table. When we convert to Pandas we can see what the first tweet looks like in a DataFrame.

In [96]:
tweet_df.toPandas().head(1)

Unnamed: 0,contributors,coordinates,created_at,entities,extended_entities,favorite_count,favorited,filter_level,geo,id,...,place,possibly_sensitive,retweet_count,retweeted,retweeted_status,source,text,timestamp_ms,truncated,user
0,,,Wed Apr 29 13:26:48 +0000 2015,"([], None, [], [], [], [(48305190, 48305190, [...",,0,False,low,,593406077439516672,...,,False,0,False,,"<a href=""http://twitter.com"" rel=""nofollow"">Tw...",@OdekedeJong Omdat ik het zelf ook ervaar en m...,1430314008470,False,"(False, Thu Mar 04 11:16:36 +0000 2010, False,..."


As you can see, the elements of a tweet are flattened to fit into the row of a table. We can also convert a tweet to a Python dictionary to see its nested structure:

In [166]:
tweet_df.head(198)[0].asDict(recursive=True)

{'contributors': None,
 'coordinates': None,
 'created_at': 'Wed Apr 29 13:26:48 +0000 2015',
 'entities': {'hashtags': [],
  'media': None,
  'symbols': [],
  'trends': [],
  'urls': [],
  'user_mentions': [{'id': 48305190,
    'id_str': '48305190',
    'indices': [0, 12],
    'name': 'Odeke de Jong',
    'screen_name': 'OdekedeJong'}]},
 'extended_entities': None,
 'favorite_count': 0,
 'favorited': False,
 'filter_level': 'low',
 'geo': None,
 'id': 593406077439516672,
 'id_str': '593406077439516672',
 'in_reply_to_screen_name': 'OdekedeJong',
 'in_reply_to_status_id': 593403994481020928,
 'in_reply_to_status_id_str': '593403994481020928',
 'in_reply_to_user_id': 48305190,
 'in_reply_to_user_id_str': '48305190',
 'lang': 'nl',
 'place': None,
 'possibly_sensitive': False,
 'retweet_count': 0,
 'retweeted': False,
 'retweeted_status': None,
 'source': '<a href="http://twitter.com" rel="nofollow">Twitter Web Client</a>',
 'text': '@OdekedeJong Omdat ik het zelf ook ervaar en mijn omge

## Assignment 9
Select the screen name and language of the user, as well as the text field.

**Hint**: nested fields can be selected using the dot notation, i.e. `df.select('<parent>.<child>')`.

In [102]:
name_df = tweet_df.select('user.screen_name', 'user.lang', 'text')
name_df.toPandas().head(15)

Unnamed: 0,screen_name,lang,text
0,Claudia_NL,en,@OdekedeJong Omdat ik het zelf ook ervaar en m...
1,MiesjeB,nl,"RT @RHoogland: The game is on, vrienden. Vrijd..."
2,martine_vandijk,nl,@deBeschaving Snap ik! Ben nou eenmaal wat ong...
3,Tessaaatje,en,Jeminee ik word nu pas wakker wat is dit. Zo l...
4,juradoscrime,nl,@mayravdzwaag hij was helemaal niet leuk en ik...
5,dewestkrant,nl,"Het lijken wel kogelgaten, maar volgens de pol..."
6,PronkRijpstra,nl,@bvpuntcom ja hoor! Deze is van net ❤️ http://...
7,PostNL,nl,"@mamarije30 Dat ga ik even voor je kijken, heb..."
8,hildafeenstra,nl,"Na ruim 1,5 jaar vandaag mijn laatste werkdag ..."
9,GewoonBasten,nl,@RowfeyVFX @Jerry_Kuijper heb ik ook gezegt! i...


## Assignment 10
Count the number of tweets per user, and display the top 10 most-tweeting users.

In [109]:
status_df = tweet_df.select(expr('user.screen_name as name'), expr('user.statuses_count as num_tweets')).groupBy('name').sum('num_tweets').orderBy('sum(num_tweets)', ascending=False).show(10)

+---------------+---------------+
|           name|sum(num_tweets)|
+---------------+---------------+
|     news24hnld|        6541613|
|            KLM|        2235614|
|      NS_online|        1438124|
|   loveinground|         813134|
|tmobile_webcare|         499221|
|    atomsoffice|         417921|
|      NLvandaag|         395668|
|   nieuws_media|         312414|
|    mannenNETbe|         300335|
|     KPNwebcare|         265483|
+---------------+---------------+
only showing top 10 rows



## Word count in DataFrames

It is also possible to use DataFrames for less-structured data such as text. Here we show how you could do a word count with DataFrames.

The following chained query contains a number of methods you haven't seen before, and we'll go through it line by line in the subsequent cells:

In [110]:
from pyspark.sql.functions import explode, split

spark \
    .read.text('../data/shakespeare.txt') \
    .select(explode(split("value", "\W+")).alias("word")) \
    .groupBy("word") \
    .count() \
    .orderBy("count", ascending=0).show()

+----+------+
|word| count|
+----+------+
|    |198753|
| the| 23288|
|   I| 22225|
| and| 18653|
|  to| 16373|
|  of| 15725|
|   a| 12796|
| you| 12186|
|  my| 10839|
|  in| 10016|
|   d|  8954|
|  is|  8414|
|that|  8343|
| not|  8038|
|  me|  7752|
|   s|  7487|
| And|  7457|
|with|  6802|
|  it|  6760|
|  be|  6412|
+----+------+
only showing top 20 rows



To see what happens here, we break it down into steps. First we read in the data file and inspect the DataFrame. It contains one column, called `value` by default:

In [111]:
swan_df = spark.read.text('../data/shakespeare.txt')
swan_df.show()

+--------------------+
|               value|
+--------------------+
|This is the 100th...|
|is presented in c...|
|Library of the Fu...|
|often releases Et...|
|                    |
|         Shakespeare|
|                    |
|*This Etext has c...|
|                    |
|<<THIS ELECTRONIC...|
|SHAKESPEARE IS CO...|
|PROVIDED BY PROJE...|
|WITH PERMISSION. ...|
|DISTRIBUTED SO LO...|
|PERSONAL USE ONLY...|
|COMMERCIALLY.  PR...|
|SERVICE THAT CHAR...|
|                    |
|*Project Gutenber...|
|in the presentati...|
+--------------------+
only showing top 20 rows



The column name `value` explains why it is mentioned inside the [`split`](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.split) function. Let's call the `select` method but omit `explode` and see what happens. Please note that with [`alias`](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.alias) we rename the column.

In [112]:
split_df = swan_df.select(split("value", "\W+").alias("word"))
split_df.show()

+--------------------+
|                word|
+--------------------+
|[This, is, the, 1...|
|[is, presented, i...|
|[Library, of, the...|
|[often, releases,...|
|                  []|
|       [Shakespeare]|
|                  []|
|[, This, Etext, h...|
|                  []|
|[, THIS, ELECTRON...|
|[SHAKESPEARE, IS,...|
|[PROVIDED, BY, PR...|
|[WITH, PERMISSION...|
|[DISTRIBUTED, SO,...|
|[PERSONAL, USE, O...|
|[COMMERCIALLY, PR...|
|[SERVICE, THAT, C...|
|                  []|
|[, Project, Guten...|
|[in, the, present...|
+--------------------+
only showing top 20 rows



Looking at the schema, we can see that `word` is actually an array of strings:

In [113]:
split_df.printSchema()

root
 |-- word: array (nullable = true)
 |    |-- element: string (containsNull = true)



Instead, we would like to have a row for each word, which is where [`explode`](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.explode) comes in. `explode` will return a new row for each element in the `value` list, containing all the words.

In [115]:
swan_df.select(explode(split("value", "\W+")).alias("word")).show()

+-----------+
|       word|
+-----------+
|       This|
|         is|
|        the|
|      100th|
|      Etext|
|       file|
|  presented|
|         by|
|    Project|
|  Gutenberg|
|        and|
|         is|
|  presented|
|         in|
|cooperation|
|       with|
|      World|
|    Library|
|        Inc|
|       from|
+-----------+
only showing top 20 rows



### User-defined functions

In the previous example we used the built-in `split` function. It is also possible to define and use a custom user-defined function, or UDF. We'll show an example for the phone stock DataFrame first:

In [116]:
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf

exp_udf = udf(lambda price: "Expensive" if price >= 500 else "Inexpensive", StringType())

phone_df.withColumn("cost", exp_udf(phone_df['unit_price'])).show()

+-------------+-------+-----+----------+-----------+
|        model|  brand|stock|unit_price|       cost|
+-------------+-------+-----+----------+-----------+
|     iPhone 6|  Apple|    6|     549.0|  Expensive|
|    iPhone 6s|  Apple|    5|     585.0|  Expensive|
|     iPhone 7|  Apple|   11|     739.0|  Expensive|
|        Pixel| Google|    8|     859.0|  Expensive|
|     Pixel XL| Google|    2|     959.0|  Expensive|
|    Galaxy S7|Samsung|   10|     539.0|  Expensive|
|    Galaxy S6|Samsung|    5|     414.0|Inexpensive|
|    Galaxy A5|Samsung|    7|     297.0|Inexpensive|
|Galaxy Note 7|Samsung|    0|     841.0|  Expensive|
+-------------+-------+-----+----------+-----------+



In this manner, we can apply specialized function, like tokenizers, on DataFrames. However, we first must register them as UDFs and cannot simply pass them as (lambda) functions.

Below we define a very simple tokenizer, just as an example. It uses Python's string `split`, and also lowers the case of the text.

In [122]:
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

def my_tokenize(s):
    s = s.lower()
    words = s.split()
    return words

returnType = ArrayType(StringType())

tokenize_udf = udf(my_tokenize, returnType)

+--------------------+
|               value|
+--------------------+
|This is the 100th...|
|is presented in c...|
|Library of the Fu...|
|often releases Et...|
|                    |
|         Shakespeare|
|                    |
|*This Etext has c...|
|                    |
|<<THIS ELECTRONIC...|
|SHAKESPEARE IS CO...|
|PROVIDED BY PROJE...|
|WITH PERMISSION. ...|
|DISTRIBUTED SO LO...|
|PERSONAL USE ONLY...|
|COMMERCIALLY.  PR...|
|SERVICE THAT CHAR...|
|                    |
|*Project Gutenber...|
|in the presentati...|
+--------------------+
only showing top 20 rows



## Assignment 11
Use the `tokenize_udf` function from the last cell to count words on the Shakespeare DataFrame `swan_df` instead of usng the `split` function. Display the top 10 most occurring words.

In [147]:
word_df = swan_df.select(explode(tokenize_udf('value')).alias('word'))
word_df.groupBy('word').count().orderBy('count', ascending=False).show(10)

+----+-----+
|word|count|
+----+-----+
| the|27549|
| and|26037|
|   i|19540|
|  to|18700|
|  of|18010|
|   a|14383|
|  my|12455|
|  in|10671|
| you|10630|
|that|10487|
+----+-----+
only showing top 10 rows



## Bonus exercise
This exercise is optional. Perform a word count on the tweets' hashtags and show the top 10 most-used hashtags.

In [174]:
#tweet_df.head(1)[0].asDict(recursive=True)
#hashtags_df = tweet_df.select('entities.hashtags.text')
hashtags_df = tweet_df.select(explode('entities.hashtags.text').alias('hashtag'))
hashtags_df.groupBy('hashtag').count().orderBy('count', ascending=False).show(10)

+-----------+-----+
|    hashtag|count|
+-----------+-----+
|   pgbdebat|   10|
|   pgbalarm|   10|
|partnerruil|    4|
|  freelance|    4|
|        zzp|    4|
|   vacature|    3|
|maagdenhuis|    3|
|        ZZP|    3|
|        pgb|    2|
|         NS|    2|
+-----------+-----+
only showing top 10 rows

