In [2]:
%%HTML
<!-- Load the Quicksand web font and the notebook's local stylesheet.
     The font URL uses an explicit https scheme: a protocol-relative "//" URL
     inherits the scheme of the page, which fails when the notebook (or an
     HTML export of it) is opened from disk. -->
<link rel="stylesheet" type="text/css" href="https://fonts.googleapis.com/css?family=Quicksand:300" />
<link rel="stylesheet" type="text/css" href="custom.css">

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import os
import seaborn as sns

%matplotlib inline 

from pylab import rcParams
# Notebook-wide plot defaults: ggplot theme first, then the figure size.
plt.style.use('ggplot')
rcParams.update({'figure.figsize': (10, 6)})

In [None]:
# Settings a reader may want to tune: where the datasets live and which
# Spark master to connect to.
data_dir = '/data'
master = 'local[2]'

import pyspark
import pyspark.sql.functions as sf

# Create a Spark session for `master`, or pick up the one that already exists.
spark = pyspark.sql.SparkSession.builder.master(master).getOrCreate()

# DataFrames Windows

Analytical functions that work on parts of your DataFrame.

- Saves you from groupby-agg-join.

We'll try it with our heroes.

__Goal__: compute attack demeaned per role.

In [None]:
# Read heroes.csv from the data directory. The first line is treated as the
# header and 'NA' is handed to the CSV reader as its NaN marker.
# NOTE(review): no schema or inferSchema is given, so the columns presumably
# load as strings -- confirm this is intended for the numeric columns.
heroes_path = os.path.join(data_dir, 'heroes.csv')
heroes = (
    spark.read
    .options(header=True, nanValue='NA')
    .csv(heroes_path)
)

First compute the average attack per role:

In [None]:
# Group the heroes by role and take the mean attack of each group,
# exposed as the column 'avg_attack'.
attack_per_role = heroes.groupBy('role').agg(
    sf.mean('attack').alias('avg_attack')
)
attack_per_role.toPandas()

Join it back on and subtract:

In [None]:
# Step 1: attach the per-role average to every hero of that role.
heroes_with_avg = heroes.join(attack_per_role, on=['role'])

# Step 2: subtract that average from the hero's own attack.
groupby_heroes = heroes_with_avg.withColumn(
    'demeaned_attack',
    sf.col('attack') - sf.col('avg_attack'),
)
groupby_heroes.toPandas()

Doing it all at once with a window:

In [None]:
from pyspark.sql import Window

# A window with one partition per role; the mean is evaluated over that
# window, so no separate aggregate-and-join step is needed.
role_window = Window.partitionBy('role')
role_mean_attack = sf.mean('attack').over(role_window)

window_heroes = heroes.withColumn(
    'demeaned_attack',
    sf.col('attack') - role_mean_attack,
)
window_heroes.toPandas()

__Question__: Are the groupby-join and window approaches the same?

The results are the same, but the ordering of the columns and rows is different:

In [None]:
# Column order of both results.
print('window columns:', window_heroes.columns)
print('groupby columns:', groupby_heroes.columns)

# First two hero names of both results, to show the differing row order.
window_first = window_heroes.select('name').limit(2).toPandas()
groupby_first = groupby_heroes.select('name').limit(2).toPandas()
print('\nwindow first:\n', window_first)
print('\ngroupby first:\n', groupby_first)

### How did we do it?

1. Define a window
    - Partitions
    - Ordering within partitions
    - Window size going over ordered partitions
1. Execute with an analytical function.

In [None]:
# Two-row example DataFrame with columns a, b and c.
sdf = spark.createDataFrame(
    [
        ['first', 1.5, 3],
        ['second', 0.5, 1],
    ],
    ['a', 'b', 'c'],
)

# Step 1: define the window.
window = (
    Window
    .partitionBy('a')  # Partitions.
    .orderBy('b')      # Ordering within partitions.
)                      # No window size specified.

# Step 2: execute an analytical function over the window.
sdf.withColumn('d', sf.sum('c').over(window))

<img src="images/partitions.png" height="100%" align="left"/>

We'll illustrate windows on a new DataFrame:

In [None]:
# Seven toy measurements; each row is [mid, month, temperature].
temperature_rows = [
    [1, 1.0, 3.0],
    [2, 1.0, 6.0],
    [3, 2.0, 4.0],
    [4, 3.0, 8.0],
    [5, 3.0, 9.0],
    [6, 3.0, 8.0],
    [7, 3.0, 12.0],
]
temperatures = spark.createDataFrame(
    temperature_rows,
    schema=['mid', 'month', 'temperature'],
)
temperatures.toPandas()

### `Window.rowsBetween()`: Window size

Specify the size of the window in terms of rows before and after the current row in the ordering.

<img src="images/rolling_window.png" width="60%" align="left"/>

In [None]:
mid_window = (
    Window               # No partitions: full DataFrame.
    .orderBy('mid')      # Ordered by mid.
    .rowsBetween(-1, 0)  # Moving window: previous row and current row (2 rows).
)

# Show which `mid` values open and close each window, and the mean
# temperature inside it.
(
    temperatures
    .withColumn('window_start', sf.first('mid').over(mid_window))
    .withColumn('window_end', sf.last('mid').over(mid_window))
    .withColumn('mean_temp', sf.mean('temperature').over(mid_window))
    .sort('mid')
    .toPandas()
)

Or use an expanding window:

<img src="images/expanding_window.png" width="60%" align="left"/>

In [None]:
import sys
# Expanding window: every frame starts at the first row of the ordering and
# ends at the current row. The boundaries use the named Window constants
# rather than raw integers such as -sys.maxsize and 0, which is what the
# PySpark documentation recommends for these special boundary values.
expanding_window = (
    Window
    .orderBy('mid')
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

# Show which `mid` values open and close each window, and the mean
# temperature inside it.
(
    temperatures
    .withColumn('window_start', sf.first('mid').over(expanding_window))
    .withColumn('window_end', sf.last('mid').over(expanding_window))
    .withColumn('mean_temp', sf.mean('temperature').over(expanding_window))
    .sort('mid')
    .toPandas()
)

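`.orderBy(col)` without a window size implicitly uses an expanding *range* window: `.orderBy(col).rangeBetween(Window.unboundedPreceding, Window.currentRow)`. Because `mid` has no duplicate values, this gives the same result as the expanding `rowsBetween` window above: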

In [None]:
# Ordering only: no partitions and no explicit window size, so the default
# window size for an ordered window applies.
alternative_window = Window.orderBy('mid')

(
    temperatures
    .withColumn('window_start', sf.first('mid').over(alternative_window))
    .withColumn('window_end', sf.last('mid').over(alternative_window))
    .withColumn('mean_temp', sf.mean('temperature').over(alternative_window))
    .sort('mid')
    .toPandas()
)

Rolling mean across months (rows with the same `month` value share the same window):

In [None]:
# Same construction as before, but ordered by `month`, a column that
# contains repeated values.
month_window = Window.orderBy('month')

(
    temperatures
    .withColumn('window_start', sf.first('mid').over(month_window))
    .withColumn('window_end', sf.last('mid').over(month_window))
    .withColumn('mean_temp', sf.mean('temperature').over(month_window))
    .sort('mid')
    .toPandas()
)

`.partitionBy` followed by `.orderBy`: rolling means within each month.

In [None]:
# One partition per month; rows inside each partition are ordered by mid.
rolling_window = (
    Window
    .partitionBy('month')
    .orderBy('mid')
)

(
    temperatures
    .withColumn('window_start', sf.first('mid').over(rolling_window))
    .withColumn('window_end', sf.last('mid').over(rolling_window))
    .withColumn('mean_temp', sf.mean('temperature').over(rolling_window))
    .sort('mid')
    .show()
)

### `Window.rangeBetween()`: Window size on values

- `.rowsBetween()`: this row and the next row
- `.rangeBetween()`: this value and the next value (of the column specified in orderBy)

<img src="images/rangebetween.png" width="80%" align="left"/>

In [None]:
# Value-based window on `month`: boundaries are offsets 0 and 1 relative to
# the current row's month value, not row counts.
range_window = (
    Window
    .orderBy('month')
    .rangeBetween(0, 1)
)

# Show the first and last `mid` inside each window and the maximum
# temperature found there.
(
    temperatures
    .withColumn('window_start_mid', sf.first('mid').over(range_window))
    .withColumn('window_end_mid', sf.last('mid').over(range_window))
    .withColumn('max_temp', sf.max('temperature').over(range_window))
    .toPandas()
)


## 6. Comments about the API

Spark is a powerful framework, but the PySpark API can be confusing.

- Multiple ways to do the same things:
    - `sdf.filter()` vs `sdf.where()`
    - `sdf.groupBy()` vs `sdf.groupby()`
    - `sf.isnull('name')` vs `sf.col('name').isNull()`
    - `sf.col('a')` vs `sdf.a`
- Functionality hidden in Spark functions.
- Inconsistencies:
    - Many functions accept a 'string' or `sf.col`, but some don't.
    - Many functions accept many arguments or a list with arguments, but some don't.

You'll have to deal with it! ;-)

Try to be consistent in how you write your Spark code.

## Exercise: Windows

> #### `temperatures`
>
> 1. Add a column with the average temperature of the month.
> 1. Compute the temperature delta with the previous measurement (hint: look at what `sf.lag()` and `sf.lead()` do).
> 1. Exclude rows of months with an average temperature below 5 degrees.

> #### `airlines`
>
> 1. Demean the flight delays in `arr_delay` partitioning by year (ignore missing values and NaNs);
> 1. Demean the flight delays partitioning by year/carrier;
> 1. For each year, find the top 5 carriers with the most flights cancelled (hint: check whether you should use `sf.rank()` or `sf.row_number()`);
> 1. Same as previous, but rank by cancelled divided by the total number of flights per carrier/year;
> 1. Per airline, find the airport with the most delays due to security reasons in a given year/month.

In [None]:
# %load ../answers/01_windows.py

## Exercise: Advanced windows

> 1. For a numeric column of your choice, create a new column with the difference from the previous month within the `'airport', 'carrier'` window. When there is no previous element, put a 0 (not mathematically correct, but still).
> 1. Remove all the groups of `'airport', 'carrier', 'year'` where more than 20% of flights are delayed by 2000 (make the parameters adjustable!).
> 1. Take a look at the NA patterns in the `airlines` DataFrame. It seems like low-volume airports do not have flights every month! Let's do something barbaric then: fill all the NA columns with the minimum historically known (i.e. expanding window) with partitions of `'airport', 'carrier'` ordered by `'year', 'month'`.


In [None]:
# %load ../answers/01_windows_advanced.py