# Chapter Your data under a different lens: Window functions
we will disucss
- Window functions and the kind of data transformation they enable
- Summarizing, ranking, and analyzing data using the different classes of window functions
- Building static, growing, and unbounded windows to your functions
- Apply UDF to windows as custom window functions

In [64]:
import sys
from pyspark.sql import SparkSession
from pyspark.sql.utils import AnalysisException
import pyspark.sql.functions as F
import pyspark.sql.types as T
import os
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

# change the account name to your email account
account='sli'

# define a root path to access the data in the DataAnalysisWithPythonAndPySpark
root_path='/net/clusterhn/home/'+account+'/isa460/Data/'

# append path to helper_functions to system path
sys.path.append('/net/clusterhn/home/sli/isa460_sli')

import helper_functions as H

# check if the Spark session is active. If it is activate, close it

try:
    if spark:
        spark.stop()
except:
    pass    

spark = (SparkSession.builder.appName("Multidimensional Data Frame")
        .config("spark.port.maxRetries", "100")
        .config("spark.sql.mapKeyDedupPolicy", "LAST_WIN")  # This configuration allow the duplicate keys in the map data type.
       .config("spark.driver.memory", "16g")
        .getOrCreate())

# confiture the log level (defaulty is WARN)
spark.sparkContext.setLogLevel('ERROR')

# Weather data
For this exercise, we will use the [National Oceanic and Atmospheric Administration’s (NOAA) Global Surface Summary of the Day (GSOD) data set](https://www.ncei.noaa.gov/metadata/geoportal/rest/metadata/item/gov.noaa.ncdc:C00516/html#) from Google BigQuery. We will focus on a particular weather station (Boston Logon 725090) between 2017-2019. See [data definition](https://www.ncei.noaa.gov/data/global-summary-of-the-day/doc/readme.txt) for each field

In [25]:
# load data
gsod=spark.read.parquet(root_path+'window/gsod.parquet').filter(F.col('stn')=='725090')

gsod1=gsod.select('year', 'mo','da', 'temp')

## Identify the coldest day of each year

In [11]:
from pyspark.sql.window import Window

result=gsod1.withColumn('coldestDay', F.min('temp').over(Window.partitionBy('year')))
result.where('temp=coldestDay').drop('coldestDay').show()

+----+---+---+----+
|year| mo| da|temp|
+----+---+---+----+
|2017| 12| 29| 6.4|
|2018| 01| 07| 4.9|
|2019| 01| 21| 9.3|
+----+---+---+----+



In [16]:
# use Window function in select statement

gsod1.select('year', 'mo', 'da', 'temp', F.min('temp').over(Window.partitionBy('year'))
             .alias('coldestDay')).where('temp=coldestDay').drop('coldestDay').show()


+----+---+---+----+----------+
|year| mo| da|temp|coldestDay|
+----+---+---+----+----------+
|2017| 12| 29| 6.4|       6.4|
|2018| 01| 07| 4.9|       4.9|
|2019| 01| 21| 9.3|       9.3|
+----+---+---+----+----------+



## Identify the hottest day of each year

In [18]:
gsod1.select('year', 'mo', 'da', 'temp', F.max('temp').over(Window.partitionBy('year'))
             .alias('hottestDay')).where('temp=hottestDay').drop('hottestDay').show()


+----+---+---+----+
|year| mo| da|temp|
+----+---+---+----+
|2017| 06| 13|84.8|
|2018| 08| 29|89.3|
|2019| 07| 21|90.2|
+----+---+---+----+



## Ranking functions

This section covers ranking functions: nonconsecutive ranks with rank(), consecutive ranks with dense_rank(), percentile ranks with percent_rank(), tiles with ntile(), and finally a bare row number with row_number(). Ranking functions are used for getting the top (or bottom) record for each window partition, or, more generally, for getting an order according to some column’s value.

## Identify the top 3 hottest days per year

In [28]:
windowSpec=Window.partitionBy('year').orderBy(F.desc('temp'))

gsod1.withColumn('rank', F.rank().over(windowSpec)).where('rank<=3').show()

+----+---+---+----+----+
|year| mo| da|temp|rank|
+----+---+---+----+----+
|2017| 06| 13|84.8|   1|
|2017| 06| 12|84.0|   2|
|2017| 07| 20|83.7|   3|
|2017| 05| 18|83.7|   3|
|2018| 08| 29|89.3|   1|
|2018| 08| 07|86.4|   2|
|2018| 08| 28|85.1|   3|
|2019| 07| 21|90.2|   1|
|2019| 07| 20|88.5|   2|
|2019| 07| 30|84.6|   3|
+----+---+---+----+----+



## Identify the top 5% of the hottest day per year

In [32]:
windowSpec=Window.partitionBy('year').orderBy('temp')

gsod1.withColumn('percent_rank', F.percent_rank().over(windowSpec)).where('percent_rank>=0.95').show()

+----+---+---+----+------------------+
|year| mo| da|temp|      percent_rank|
+----+---+---+----+------------------+
|2017| 08| 13|76.3|0.9505494505494505|
|2017| 06| 24|76.3|0.9505494505494505|
|2017| 06| 23|76.5|0.9560439560439561|
|2017| 06| 25|76.7|0.9587912087912088|
|2017| 07| 10|76.8|0.9615384615384616|
|2017| 08| 21|77.2|0.9642857142857143|
|2017| 08| 23|77.7| 0.967032967032967|
|2017| 07| 22|78.4|0.9697802197802198|
|2017| 08| 22|78.6|0.9725274725274725|
|2017| 06| 11|78.6|0.9725274725274725|
|2017| 07| 02|78.9| 0.978021978021978|
|2017| 07| 19|79.3|0.9807692307692307|
|2017| 07| 03|80.1|0.9835164835164835|
|2017| 05| 19|80.2|0.9862637362637363|
|2017| 07| 21|81.2| 0.989010989010989|
|2017| 07| 20|83.7|0.9917582417582418|
|2017| 05| 18|83.7|0.9917582417582418|
|2017| 06| 12|84.0|0.9972527472527473|
|2017| 06| 13|84.8|               1.0|
|2018| 08| 27|80.3|0.9505494505494505|
+----+---+---+----+------------------+
only showing top 20 rows



## Split the temp per year into 10 equal buckets (decile)

In [38]:
windowSpec=Window.partitionBy('year').orderBy('temp')

gsod1.withColumn('decile', F.ntile(10).over(windowSpec)).groupBy('year', 'decile').count().orderBy('year', 'decile').show()

+----+------+-----+
|year|decile|count|
+----+------+-----+
|2017|     1|   37|
|2017|     2|   37|
|2017|     3|   37|
|2017|     4|   37|
|2017|     5|   37|
|2017|     6|   36|
|2017|     7|   36|
|2017|     8|   36|
|2017|     9|   36|
|2017|    10|   36|
|2018|     1|   37|
|2018|     2|   37|
|2018|     3|   37|
|2018|     4|   37|
|2018|     5|   37|
|2018|     6|   36|
|2018|     7|   36|
|2018|     8|   36|
|2018|     9|   36|
|2018|    10|   36|
+----+------+-----+
only showing top 20 rows



In [39]:
# check the temp in decile (10% of the coldest temperature in each year)
windowSpec=Window.partitionBy('year').orderBy('temp')
gsod1.withColumn('decile', F.ntile(10).over(windowSpec)).where('decile=1').show()

+----+---+---+----+------+
|year| mo| da|temp|decile|
+----+---+---+----+------+
|2017| 12| 29| 6.4|     1|
|2017| 12| 28|10.3|     1|
|2017| 12| 30|10.8|     1|
|2017| 12| 31|13.7|     1|
|2017| 01| 09|15.4|     1|
|2017| 02| 10|16.5|     1|
|2017| 03| 12|16.7|     1|
|2017| 12| 27|17.6|     1|
|2017| 01| 08|17.9|     1|
|2017| 03| 05|17.9|     1|
|2017| 03| 11|18.8|     1|
|2017| 03| 04|19.5|     1|
|2017| 02| 11|20.8|     1|
|2017| 12| 15|21.8|     1|
|2017| 03| 13|21.8|     1|
|2017| 01| 07|22.7|     1|
|2017| 01| 10|22.9|     1|
|2017| 12| 14|24.4|     1|
|2017| 01| 31|24.9|     1|
|2017| 03| 06|25.1|     1|
+----+---+---+----+------+
only showing top 20 rows



## Add a row number to your data frame, ignore tie

In [40]:
windowSpec=Window.partitionBy('year').orderBy('temp')
gsod1.withColumn('row_number', F.row_number().over(windowSpec)).show()

+----+---+---+----+----------+
|year| mo| da|temp|row_number|
+----+---+---+----+----------+
|2017| 12| 29| 6.4|         1|
|2017| 12| 28|10.3|         2|
|2017| 12| 30|10.8|         3|
|2017| 12| 31|13.7|         4|
|2017| 01| 09|15.4|         5|
|2017| 02| 10|16.5|         6|
|2017| 03| 12|16.7|         7|
|2017| 12| 27|17.6|         8|
|2017| 01| 08|17.9|         9|
|2017| 03| 05|17.9|        10|
|2017| 03| 11|18.8|        11|
|2017| 03| 04|19.5|        12|
|2017| 02| 11|20.8|        13|
|2017| 12| 15|21.8|        14|
|2017| 03| 13|21.8|        15|
|2017| 01| 07|22.7|        16|
|2017| 01| 10|22.9|        17|
|2017| 12| 14|24.4|        18|
|2017| 01| 31|24.9|        19|
|2017| 03| 06|25.1|        20|
+----+---+---+----+----------+
only showing top 20 rows



## Access the records before or after using lag() and lead()

## Display average daily temp change by month

In [53]:
windowSpec=Window.partitionBy('year', 'mo').orderBy('da')

gsod2=gsod1.select('year', 'mo', 'da', 'temp', F.lag('temp').over(windowSpec).alias('pre_temp'))

gsod3=gsod2.withColumn('temp_change', F.abs(F.col('temp')-F.col('pre_temp')))

gsod3.groupBy('year', 'mo').agg(F.avg('temp_change').alias('avgtempChange')).show()

+----+---+------------------+
|year| mo|     avgtempChange|
+----+---+------------------+
|2017| 01| 4.923333333333332|
|2017| 02| 5.444444444444444|
|2017| 03| 6.413333333333333|
|2017| 04| 5.989655172413792|
|2017| 05| 4.650000000000001|
|2017| 06| 3.886206896551725|
|2017| 07| 4.046666666666666|
|2017| 08|2.3933333333333335|
|2017| 09|3.5896551724137917|
|2017| 10| 4.536666666666665|
|2017| 11| 6.927586206896552|
|2017| 12| 4.906666666666667|
|2018| 02| 7.029629629629629|
|2018| 03|2.7933333333333326|
|2018| 04| 4.641379310344828|
|2018| 05| 6.713333333333335|
|2018| 06| 4.631034482758621|
|2018| 07| 3.256666666666668|
|2018| 08|3.9999999999999996|
|2018| 09| 4.296551724137932|
+----+---+------------------+
only showing top 20 rows



In [54]:
# display avg, max and min temp change by month

gsod3.groupBy('year', 'mo').agg(F.avg('temp_change').alias('avgTempChange')
                                , F.min('temp_change').alias('minTempChange')
                               , F.max('temp_change').alias('maxTempChange')
                               ).show()

+----+---+------------------+-------------------+------------------+
|year| mo|     avgTempChange|      minTempChange|     maxTempChange|
+----+---+------------------+-------------------+------------------+
|2017| 01| 4.923333333333332|0.09999999999999787|              22.8|
|2017| 02| 5.444444444444444|0.20000000000000284|15.899999999999999|
|2017| 03| 6.413333333333333|0.10000000000000142|              18.7|
|2017| 04| 5.989655172413792| 0.4000000000000057|20.200000000000003|
|2017| 05| 4.650000000000001|                0.5|              22.0|
|2017| 06| 3.886206896551725|0.20000000000000284|13.700000000000003|
|2017| 07| 4.046666666666666|0.09999999999999432|10.400000000000006|
|2017| 08|2.3933333333333335|0.09999999999999432|               8.0|
|2017| 09|3.5896551724137917|0.09999999999999432|15.399999999999999|
|2017| 10| 4.536666666666665|                0.0|14.600000000000001|
|2017| 11| 6.927586206896552|0.10000000000000142|18.800000000000004|
|2017| 12| 4.906666666666667|0.199

##Spark also provides the rowsBetween() and rangeBetween() methods to create window frame boundaries.

## Display three days moving average temp for each month

In [63]:
windowSpec=Window.partitionBy('year', 'mo').orderBy('year', 'mo', 'da').rowsBetween(-2,0)

gsod1.withColumn('3_day_moving_avg', F.avg('temp').over(windowSpec)).show(20)

+----+---+---+----+------------------+
|year| mo| da|temp|  3_day_moving_avg|
+----+---+---+----+------------------+
|2017| 01| 01|40.0|              40.0|
|2017| 01| 02|35.1|             37.55|
|2017| 01| 03|41.3|              38.8|
|2017| 01| 04|42.6|39.666666666666664|
|2017| 01| 05|31.9|              38.6|
|2017| 01| 06|28.7|              34.4|
|2017| 01| 07|22.7|27.766666666666666|
|2017| 01| 08|17.9|23.099999999999998|
|2017| 01| 09|15.4|18.666666666666664|
|2017| 01| 10|22.9| 18.73333333333333|
|2017| 01| 11|45.4|27.899999999999995|
|2017| 01| 12|50.1| 39.46666666666667|
|2017| 01| 13|48.6| 48.03333333333333|
|2017| 01| 14|25.8|              41.5|
|2017| 01| 15|31.4| 35.26666666666667|
|2017| 01| 16|31.3|              29.5|
|2017| 01| 17|37.2|33.300000000000004|
|2017| 01| 18|37.8| 35.43333333333333|
|2017| 01| 19|36.6|37.199999999999996|
|2017| 01| 20|38.2| 37.53333333333334|
+----+---+---+----+------------------+
only showing top 20 rows



# Summary

- Window functions are functions that are applied over a portion of a data frame called a window frame. They can perform aggregation, ranking, or analytical operations. A window function will return the data frame with the same number of records, unlike its siblings the groupby-aggregate operation and the group map UDF.
- A window frame is defined through a window spec. A window spec mandates how the data frame is split (partitionBy()), how it’s ordered (orderBy()), and how it’s portioned (rowsBetween()/rangeBetween()).
- By default, an unordered window frame will be unbounded, meaning that the window frame will be equal to the window partition for every record. An ordered window frame will grow to the left, meaning that each record will have a window frame ranging from the first record in the window partition to the current record.
- A window can be bounded by row, meaning that the records included in the window frame are tied to the row boundaries passed as parameters (with the range boundaries added to the row number of the current row), or by range, meaning that the records included in the window frame depend on the value of the current row (with the range boundaries added to the value).