### Setting up

In [None]:
# Root directory under which the data files live; later cells prepend it to
# the file paths they read.
dir_root = '/user/centos/'
# findspark.init() is called before the pyspark imports below -- presumably so
# that the local Spark installation becomes importable; keep this ordering.
import findspark
findspark.init()

from pyspark.sql import SparkSession

# Reuse the active SparkSession if there is one, otherwise create it.
spark = SparkSession.builder.getOrCreate()
# SparkContext that belongs to the session.
sc = spark.sparkContext

from pyspark.sql import Window, Row
from pyspark.sql import functions as sf

## 4. Windows

In [None]:
from pyspark.sql import Window
from pyspark.sql import functions as sf

In [None]:
# Load the heroes CSV, taking the column names from its first line.
ddf_heroes = (spark.read
              .option('header', True)
              .csv(dir_root + 'data/heroes.csv'))

In [None]:
# Print the first rows of the heroes table to inspect its contents.
ddf_heroes.show()

Compute the demeaned attack of each hero, i.e. its attack minus the mean attack of all heroes in the same role

In [None]:
# One row per role, carrying the mean attack of the heroes in that role.
ddf_heroes_agg = ddf_heroes.groupBy('role').agg(
    sf.avg('attack').alias('mean_attack')
)

In [None]:
# Print the per-role mean attack computed in the aggregation.
ddf_heroes_agg.show()

In [None]:
# Attach each role's mean attack to its heroes through a join on 'role',
# then add the difference between a hero's attack and that mean.
(ddf_heroes
 .join(ddf_heroes_agg, on='role')
 .select('*',
         (sf.col('attack') - sf.col('mean_attack')).alias('demeaned_attack'))
 .show(n=5))

### 4.1 partitionBy()
- add aggregation as column (cleaner)
- partition into windows by variable

<img src="images/partitions.png" width="40%" align="left"/>

In [None]:

# Same demeaning as the groupBy/join version, but the per-role mean is
# computed over a window partitioned by role and subtracted in one step.
ddf_role = ddf_heroes.withColumn(
    'demeaned_attack',
    sf.col('attack') - sf.avg('attack').over(Window.partitionBy('role')),
)
ddf_role.show(n=10)

Quick question: how is partitionBy different (in result) from the groupby-join method?

In [None]:
# Order by the '_c0' column as it is stored and print the first five rows.
(ddf_role
 .orderBy('_c0')
 .show(n=5))

In [None]:
# Inspect the (column name, type) pairs of ddf_role.
ddf_role.dtypes

In [None]:
# Order by '_c0' after casting it to an integer, then print five rows.
(ddf_role
 .orderBy(sf.col('_c0').cast('Int'))
 .show(n=5))

### 4.2 orderBy()
- sliding window
- orderby -> sorting
- window functions: first, last etc.

In [None]:
# Small temperature data set: one row per measurement, built from
# (measurement id, month, temperature) triples.
ddf_temp = spark.createDataFrame([
    Row(mid=mid, month=month, temperature=temperature)
    for mid, month, temperature in [
        (1, 1.0, 3.0),
        (2, 1.0, 6.0),
        (3, 2.0, 4.0),
        (4, 3.0, 8.0),
        (5, 3.0, 9.0),
        (6, 3.0, 8.0),
        (7, 3.0, 12.0),
    ]
])

In [None]:
# Print the temperature measurements.
ddf_temp.show()

In [None]:
# Window ordered by measurement id, with no explicit frame. For every row,
# show the mean temperature over its window plus the first and last
# measurement id inside that window.
wspec = Window.orderBy('mid')
(ddf_temp
 .select('*',
         sf.avg('temperature').over(wspec).alias('mean_temp'),
         sf.first('mid').over(wspec).alias('window_start'),
         sf.last('mid').over(wspec).alias('window_end'))
 .orderBy('mid')
 .show())

### 4.3 rowsBetween()
- specify size of window in terms of rows before and after a row in ordering

In [None]:
import sys

In [None]:
# Display the extreme integers that the window cells use as frame boundaries.
-sys.maxsize, sys.maxsize

In [None]:
# Row-based frame from the first row of the ordering up to and including the
# current row. Window.unboundedPreceding / Window.currentRow are the named
# boundary constants PySpark recommends instead of raw integers such as
# -sys.maxsize and 0.
wspec = (Window
         .orderBy('mid')
         .rowsBetween(Window.unboundedPreceding, Window.currentRow))
(ddf_temp
 .withColumn('mean_temp', sf.mean('temperature').over(wspec))
 .withColumn('window_start', sf.first('mid').over(wspec))
 .withColumn('window_end', sf.last('mid').over(wspec))
 # The frame ends at the current row, so last() is evaluated up to that row.
 .withColumn('temperature_minus_last', sf.col('temperature') - sf.last('temperature').over(wspec)) # why all zero?
 .sort('mid')
 .show())

### 4.4 rowsBetween() vs rangeBetween()

rowsBetween: this row and the next row

In [None]:
# Row-based frame: the current row and the one row that follows it in the
# month ordering.
wspec = Window.orderBy('month').rowsBetween(Window.currentRow, 1)
(ddf_temp
 .withColumn('window_start_mid', sf.first(sf.col('mid')).over(wspec))
 .withColumn('window_end_mid', sf.last(sf.col('mid')).over(wspec))
 .withColumn('max_temp', sf.max(sf.col('temperature')).over(wspec))
 .show())

rangeBetween: this value and the next value (of the column specified in orderBy)

In [None]:
# Value-based frame: all rows whose month lies between the current row's
# month and that month plus one.
wspec = Window.orderBy('month').rangeBetween(Window.currentRow, 1)
(ddf_temp
 .withColumn('window_start_mid', sf.first(sf.col('mid')).over(wspec))
 .withColumn('window_end_mid', sf.last(sf.col('mid')).over(wspec))
 .withColumn('max_temp', sf.max(sf.col('temperature')).over(wspec))
 .show())

### 4.5 Expanding window

In [None]:
# Expanding window: a value-based frame from the start of the ordering up to
# the current row's 'mid'. The named constants Window.unboundedPreceding and
# Window.currentRow replace the raw -sys.maxsize / 0 boundaries, as the
# PySpark documentation recommends.
wspec = (Window
         .orderBy('mid')
         .rangeBetween(Window.unboundedPreceding, Window.currentRow))
(ddf_temp
 .withColumn('mean_temp', sf.mean('temperature').over(wspec))
 .withColumn('window_start', sf.first('mid').over(wspec))
 .withColumn('window_end', sf.last('mid').over(wspec))
 .sort('mid')
 .show())

In [None]:
# Same columns as the explicit expanding window, but the frame is left to
# the default that an ordered window gets.
wspec = Window.orderBy('mid')
(ddf_temp
 .select('*',
         sf.avg('temperature').over(wspec).alias('mean_temp'),
         sf.first('mid').over(wspec).alias('window_start'),
         sf.last('mid').over(wspec).alias('window_end'))
 .orderBy('mid')
 .show())

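`orderBy(col)` without a frame specification is implicitly `orderBy(col).rangeBetween(-sys.maxsize, 0)`, i.e. a range frame that runs from the start of the ordering (unbounded preceding) up to the current row's value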

#### Rolling mean across months

In [None]:
# Window ordered by month with the default frame: mean temperature plus the
# first and last measurement id seen in each row's window.
wspec = Window.orderBy('month')
(ddf_temp
 .select('*',
         sf.avg('temperature').over(wspec).alias('mean_temp'),
         sf.first('mid').over(wspec).alias('window_start'),
         sf.last('mid').over(wspec).alias('window_end'))
 .orderBy('mid')
 .show())

#### partitionBy AND orderBy
Rolling means within each month

In [None]:
# Expanding mean inside each month: rows are partitioned by month, ordered by
# measurement id, and the frame runs from the partition start to the current
# row. The named constants replace the raw -sys.maxsize / 0 boundaries, as
# the PySpark documentation recommends.
# order of orderBy and partitionBy clause doesn't matter
wspec = (Window
         .partitionBy('month')
         .orderBy('mid')
         .rowsBetween(Window.unboundedPreceding, Window.currentRow))
(ddf_temp
 .withColumn('mean_temp', sf.mean('temperature').over(wspec))
 .withColumn('window_start', sf.first('mid').over(wspec))
 .withColumn('window_end', sf.last('mid').over(wspec))
 .sort('mid')
 .show())

### 4.6 lead() and lag()
- access specific rows before or after a row
- can access data outside of window

In [None]:
# lag()/lead() read the value a fixed number of rows before/after the current
# row in the window ordering.
wspec = Window.orderBy('mid')
(ddf_temp
 .withColumn('prev_temp', sf.lag('temperature').over(wspec))
 # The offset is passed positionally: its keyword is `count` in Spark 2.x but
 # `offset` in Spark 3.x, so `count=2` fails on newer versions.
 .withColumn('next_in_two_temp', sf.lead('temperature', 2).over(wspec)) # why last 2 null?
 .show())

### *Exercise A*
1. Add a column with the average temperature of the month
2. Compute the temperature delta with the previous measurement
3. Exclude rows of months with an average temperature below 5 degrees

In [None]:
from pyspark.sql import Row

In [None]:
# Rebuild the temperature data set for the exercise from
# (measurement id, month, temperature) triples.
ddf_temp = spark.createDataFrame([
    Row(mid=mid, month=month, temperature=temperature)
    for mid, month, temperature in [
        (1, 1.0, 3.0),
        (2, 1.0, 6.0),
        (3, 2.0, 4.0),
        (4, 3.0, 8.0),
        (5, 3.0, 9.0),
        (6, 3.0, 8.0),
        (7, 3.0, 12.0),
    ]
])

### *Exercise B*
1. Demean the flight delays partitioning by year;
2. Demean the flight delays partitioning by year/carrier;
3. For each year, find the carriers with the most flights cancelled;
4. Same as 3., but normalize by number of flights;
5. Per airline, find the airport with the most delays due to security reasons in a given year/month.

In [None]:
# Load the airlines data set, stored as parquet, for Exercise B.
ddf_air = (spark.read
           .format('parquet')
           .load(dir_root + 'data/airlines.parquet'))

## 5. Output: storing your results

### Hive table

In [None]:
# Store the temperature data as a table named 'table_name'.
# mode='overwrite' keeps the cell re-runnable: the default mode raises an
# error once the table exists. It also matches the parquet write cells.
ddf_temp.write.saveAsTable('table_name', mode='overwrite')

In [None]:
!sh hdfs dfs -ls /user/hive/warehouse

### Parquet

In [None]:
# Write the temperature data to 'ddf/temperature' as parquet, replacing any
# previous output at that path.
(ddf_temp.write
 .format('parquet')  # json
 .mode('overwrite')
 .save('ddf/temperature'))

### coalesce

In [None]:
!sh hdfs dfs -ls /user/centos/ddf/temperature

In [None]:
# Reduce the DataFrame to a single partition before writing it as parquet to
# 'ddf/temperature_coalesce', replacing any previous output at that path.
(ddf_temp
 .coalesce(1)
 .write
 .format('parquet')  # json
 .mode('overwrite')
 .save('ddf/temperature_coalesce'))

In [None]:
!sh hdfs dfs -ls /user/centos/ddf/temperature_coalesce