# PySpark Window Functions
PySpark window functions are useful when you want to examine relationships within groups of data rather than between groups of data (as groupBy does).

To use them, you first define a window specification, then choose a function or set of functions to apply over that window.

NB: this notebook is designed to run on Databricks Community Edition.

In [52]:
import sys
import pandas as pd
import pyspark.sql.functions as fn
from pyspark.sql import SparkSession
from pyspark.sql import Window

#### Create spark session

In [2]:
spark = SparkSession.builder.appName("window").getOrCreate()

#### Sample data

In [3]:
df_data = {'partition': ['a','a', 'a', 'a', 'b', 'b', 'b', 'c', 'c',],
           'col_1': [1,1,1,1,2,2,2,3,3,], 
           'aggregation': [1,2,3,4,5,6,7,8,9,],
           'ranking': [4,3,2,1,1,1,3,1,5,],
           'lagging': [9,8,7,6,5,4,3,2,1,],
           'cumulative': [1,2,4,6,1,1,1,20,30,],
          }

#### Create pandas and Spark DataFrames

In [7]:
df_pandas = pd.DataFrame(df_data)
# create spark dataframe
df = spark.createDataFrame(df_pandas)
df.show()

+---------+-----+-----------+-------+-------+----------+
|partition|col_1|aggregation|ranking|lagging|cumulative|
+---------+-----+-----------+-------+-------+----------+
|        a|    1|          1|      4|      9|         1|
|        a|    1|          2|      3|      8|         2|
|        a|    1|          3|      2|      7|         4|
|        a|    1|          4|      1|      6|         6|
|        b|    2|          5|      1|      5|         1|
|        b|    2|          6|      1|      4|         1|
|        b|    2|          7|      3|      3|         1|
|        c|    3|          8|      1|      2|        20|
|        c|    3|          9|      5|      1|        30|
+---------+-----+-----------+-------+-------+----------+



## Simple aggregation functions
We can use the standard groupBy aggregations as window functions. They use the simplest form of window, which defines only the grouping. Unlike groupBy, every input row is kept, and each row receives the aggregate value of its partition.
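The contrast with groupBy can be sketched in plain Python, without Spark. This is only an illustrative model using the `partition` and `aggregation` values from the sample data; the names `groups`, `grouped` and `windowed` are made up for the example.

```python
from collections import defaultdict

# (partition, aggregation) pairs taken from the sample data
rows = [("a", 1), ("a", 2), ("a", 3), ("a", 4),
        ("b", 5), ("b", 6), ("b", 7),
        ("c", 8), ("c", 9)]

# Collect the values of each partition, as a partitioned window would
groups = defaultdict(list)
for part, value in rows:
    groups[part].append(value)

# groupBy-style result: one row per partition
grouped = {part: sum(values) for part, values in groups.items()}

# window-style result: every input row is kept and gains the partition total
windowed = [(part, value, sum(groups[part])) for part, value in rows]

print(grouped)        # {'a': 10, 'b': 18, 'c': 17}
print(len(windowed))  # 9
print(windowed[0])    # ('a', 1, 10)
```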

In [17]:
windowSpec = Window.partitionBy('partition')
df_aggregations = df.select(
    'partition','aggregation'
).withColumn('aggregate_sum', fn.sum('aggregation').over(windowSpec)
).withColumn('aggregate_avg', fn.avg('aggregation').over(windowSpec)
).withColumn('aggregate_min', fn.min('aggregation').over(windowSpec)
).withColumn('aggregate_max', fn.max('aggregation').over(windowSpec)
)

In [18]:
df_aggregations.orderBy('partition').show()

+---------+-----------+-------------+-------------+-------------+-------------+
|partition|aggregation|aggregate_sum|aggregate_avg|aggregate_min|aggregate_max|
+---------+-----------+-------------+-------------+-------------+-------------+
|        a|          3|           10|          2.5|            1|            4|
|        a|          4|           10|          2.5|            1|            4|
|        a|          1|           10|          2.5|            1|            4|
|        a|          2|           10|          2.5|            1|            4|
|        b|          5|           18|          6.0|            5|            7|
|        b|          6|           18|          6.0|            5|            7|
|        b|          7|           18|          6.0|            5|            7|
|        c|          8|           17|          8.5|            8|            9|
|        c|          9|           17|          8.5|            8|            9|
+---------+-----------+-------------+-------------+-------------+-------------+

## Row wise ordering and ranking functions
We can also use window functions to order and rank data. These functions add an element to the definition of the window, which now defines both grouping AND ordering.
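As a rough guide to how the ranking functions used in this section (row_number, rank, dense_rank, percent_rank and ntile) treat ties, here is a plain-Python model for a single partition. It is a sketch of the semantics rather than Spark's implementation, and `ranking_columns` is a hypothetical helper name.

```python
def ranking_columns(values, buckets=2):
    """Model row_number, rank, dense_rank, percent_rank and ntile for one partition."""
    ordered = sorted(values)
    n = len(ordered)
    # ntile: the first `extra` buckets each hold one row more than the rest
    base, extra = divmod(n, buckets)
    big = extra * (base + 1)  # number of rows covered by the larger buckets

    result = []
    rank = dense_rank = 0
    previous = object()  # sentinel that compares unequal to every value
    for i, value in enumerate(ordered):
        row_number = i + 1
        if value != previous:
            rank = row_number   # ties share a rank; the next rank skips ahead
            dense_rank += 1     # ties share a rank; no gaps afterwards
            previous = value
        percent_rank = (rank - 1) / (n - 1) if n > 1 else 0.0
        if i < big:
            ntile = i // (base + 1) + 1
        else:
            ntile = extra + (i - big) // base + 1
        result.append((value, row_number, rank, dense_rank, percent_rank, ntile))
    return result


# 'ranking' values of partition 'b' in the sample data
for row in ranking_columns([1, 1, 3]):
    print(row)
# (1, 1, 1, 1, 0.0, 1)
# (1, 2, 1, 1, 0.0, 1)
# (3, 3, 3, 2, 1.0, 2)
```

The tied values show the difference: row_number always increases, rank leaves a gap after a tie (1, 1, 3), and dense_rank does not (1, 1, 2).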

In [21]:
rank_window = Window.partitionBy('partition').orderBy('ranking')

df_rank = df.select('partition', 'aggregation', 'ranking'
).withColumn('ranking_row_num', fn.row_number().over(rank_window)
).withColumn('ranking_rank', fn.rank().over(rank_window)
).withColumn('ranking_dense_rank', fn.dense_rank().over(rank_window)
).withColumn('ranking_per_rank', fn.percent_rank().over(rank_window)
).withColumn('ranking_ntile_rank', fn.ntile(2).over(rank_window)
)

In [28]:
df_rank.show()

+---------+-----------+-------+---------------+------------+------------------+------------------+------------------+
|partition|aggregation|ranking|ranking_row_num|ranking_rank|ranking_dense_rank|  ranking_per_rank|ranking_ntile_rank|
+---------+-----------+-------+---------------+------------+------------------+------------------+------------------+
|        c|          8|      1|              1|           1|                 1|               0.0|                 1|
|        c|          9|      5|              2|           2|                 2|               1.0|                 2|
|        b|          5|      1|              1|           1|                 1|               0.0|                 1|
|        b|          6|      1|              2|           1|                 1|               0.0|                 1|
|        b|          7|      3|              3|           3|                 2|               1.0|                 2|
|        a|          4|      1|              1|         

## Creating lagged columns
If we want to calculate something like the difference between consecutive rows in a group, we can use window functions to create the lagged values the calculation needs. Where there is no preceding row to take the lagged value from, the result is null, not zero.

The inverse of lag is lead: fn.lag(col, n) is equivalent to fn.lead(col, -n).
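The relationship between lag and lead, and the nulls at the edge of a partition, can be modelled in a few lines of plain Python. The `lag` and `lead` helpers below are illustrative stand-ins, not the Spark functions; their `default` parameter mirrors the optional default argument that Spark's lag and lead accept.

```python
def lag(values, offset=1, default=None):
    """Value `offset` rows before each row, or `default` when there is none."""
    n = len(values)
    return [values[i - offset] if 0 <= i - offset < n else default
            for i in range(n)]


def lead(values, offset=1, default=None):
    """Value `offset` rows after each row: a lag with the sign flipped."""
    return lag(values, -offset, default)


# 'lagging' values of partition 'a', already sorted by 'lagging'
lagging_a = [6, 7, 8, 9]

previous = lag(lagging_a, 1)
following = lead(lagging_a, 1)
# The difference stays None (null) wherever there is no previous row
differences = [None if p is None else v - p
               for v, p in zip(lagging_a, previous)]

print(previous)     # [None, 6, 7, 8]
print(following)    # [7, 8, 9, None]
print(differences)  # [None, 1, 1, 1]
```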

In [43]:
lag_window = Window.partitionBy('partition').orderBy('lagging')

df_lag = df.select('partition', 'aggregation', 'lagging'
).withColumn('lagging_lag_1', fn.lag('lagging', 1).over(lag_window)
).withColumn('lagging_lag_m1', fn.lag('lagging', -1).over(lag_window)
).withColumn('lagging_lead_1', fn.lead('lagging', 1).over(lag_window)
).withColumn('difference_between', fn.col('lagging') - fn.lag('lagging', 1).over(lag_window)
)

df_lag.show()

+---------+-----------+-------+-------------+--------------+--------------+------------------+
|partition|aggregation|lagging|lagging_lag_1|lagging_lag_m1|lagging_lead_1|difference_between|
+---------+-----------+-------+-------------+--------------+--------------+------------------+
|        c|          9|      1|         null|             2|             2|              null|
|        c|          8|      2|            1|          null|          null|                 1|
|        b|          7|      3|         null|             4|             4|              null|
|        b|          6|      4|            3|             5|             5|                 1|
|        b|          5|      5|            4|          null|          null|                 1|
|        a|          4|      6|         null|             7|             7|              null|
|        a|          3|      7|            6|             8|             8|                 1|
|        a|          2|      8|            7|     

## Cumulative Calculations (Running totals and averages)
There are often good reasons to want to create a running total or running average column. In some cases we might want running totals for subsets of data. Window functions can be useful for that sort of thing.

To calculate these we add one more element to the window. The window now accounts for the partition, the order, and which rows around the current row the function should cover (the frame). The frame can be defined in two ways:

- rangeBetween selects rows by value: a row is included when its ordering-column value lies within the given offsets of the current row's value.
- rowsBetween selects rows by position: the current row is row zero, following rows are numbered positively and preceding rows negatively.

For cumulative calculations you can specify "all previous rows" with Window.unboundedPreceding and "all following rows" with Window.unboundedFollowing.

Note that the frame may vary in size as it moves over the rows, since at the start and end of a partition part of the frame may "extend past" the existing rows.
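To make the shrinking frame concrete, the following plain-Python sketch lists the rows that rowsBetween(-1, 1) covers for each row of one partition. It assumes the rows are already ordered by value; `rows_between` is a hypothetical helper, not part of PySpark.

```python
def rows_between(values, start, end):
    """Return the frame (list of values) seen by each row of an ordered partition."""
    n = len(values)
    frames = []
    for i in range(n):
        lo = max(0, i + start)    # clip the frame at the first row
        hi = min(n - 1, i + end)  # clip the frame at the last row
        frames.append(values[lo:hi + 1])
    return frames


# 'cumulative' values of partition 'a', ordered by value
cumulative_a = [1, 2, 4, 6]

frames = rows_between(cumulative_a, -1, 1)
averages = [sum(frame) / len(frame) for frame in frames]

print(frames)    # [[1, 2], [1, 2, 4], [2, 4, 6], [4, 6]]
print(averages)  # [1.5, 2.3333333333333335, 4.0, 5.0]
```

The first and last frames hold two rows rather than three, which is why the averages at the edges are taken over fewer values.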

In [54]:
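# Order within each partition so the frame is deterministic, then average the
# previous row, the current row and the next row (a moving average)
cumulative_window = Window.partitionBy('partition').orderBy('cumulative').rowsBetween(-1, 1)

df_cumulative_avg = df.select('partition', 'cumulative'
).withColumn('cumulative_avg', fn.avg('cumulative').over(cumulative_window))

df_cumulative_avg.show()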

+---------+----------+------------------+
|partition|cumulative|    cumulative_avg|
+---------+----------+------------------+
|        c|        20|              25.0|
|        c|        30|              25.0|
|        b|         1|               1.0|
|        b|         1|               1.0|
|        b|         1|               1.0|
|        a|         1|               1.5|
|        a|         2|2.3333333333333335|
|        a|         4|               4.0|
|        a|         6|               5.0|
+---------+----------+------------------+



### Using Window.unboundedPreceding 

In [55]:
cumulative_window_2 = Window.partitionBy('partition').orderBy('cumulative').rowsBetween(Window.unboundedPreceding, Window.currentRow)

# keep the DataFrame in the variable; show() only prints and returns None
df_cumulative_sum = df.select('partition', 'cumulative'
).withColumn('cumulative_sum', fn.sum('cumulative').over(cumulative_window_2))

df_cumulative_sum.show()

+---------+----------+--------------+
|partition|cumulative|cumulative_sum|
+---------+----------+--------------+
|        c|        20|            20|
|        c|        30|            50|
|        b|         1|             1|
|        b|         1|             2|
|        b|         1|             3|
|        a|         1|             1|
|        a|         2|             3|
|        a|         4|             7|
|        a|         6|            13|
+---------+----------+--------------+
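Partition 'b' is a useful case for comparing the two frame types, because all of its values are tied. The plain-Python sketch below models a running total under a row-based frame and under a value-based frame; it illustrates the semantics only and does not call Spark.

```python
from itertools import accumulate

# 'cumulative' values of partition 'b', ordered by value (all tied)
cumulative_b = [1, 1, 1]

# Row-based frame (unbounded preceding to current row): rows are added one by one
rows_total = list(accumulate(cumulative_b))

# Value-based frame (unbounded preceding to current row): every row whose
# ordering value is <= the current value is included, so ties are summed together
range_total = [sum(v for v in cumulative_b if v <= current)
               for current in cumulative_b]

print(rows_total)   # [1, 2, 3]
print(range_total)  # [3, 3, 3]
```

This matters in practice because a window with orderBy but no explicit frame defaults to the value-based frame, so a running total over tied values would not increase row by row unless rowsBetween is specified.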



## Combining different windows and functions

In [56]:
aggregation_window = Window.partitionBy('partition')
grouping_window = Window.partitionBy('partition').orderBy('aggregation')

# then we can use these windows for our row numbering and aggregations
df_aggregations = df.select(
  'partition', 'aggregation'
).withColumn(
  # note that we calculate row number over the grouping_window
  'group_rank', fn.row_number().over(grouping_window) 
).withColumn(
  # but we calculate other columns over the aggregation_window
  'aggregation_sum', fn.sum('aggregation').over(aggregation_window),
).withColumn(
  'aggregation_avg', fn.avg('aggregation').over(aggregation_window),
).withColumn(
  'aggregation_min', fn.min('aggregation').over(aggregation_window),
).withColumn(
  'aggregation_max', fn.max('aggregation').over(aggregation_window),
).where(
  fn.col('group_rank') == 1
).select(
  'partition', 
  'aggregation_sum', 
  'aggregation_avg', 
  'aggregation_min', 
  'aggregation_max'
)

df_aggregations.show()

# this is equivalent to the much simpler groupBy expression below
df_groupby = df.select(
  'partition', 'aggregation'
).groupBy(
  'partition'
).agg(
  fn.sum('aggregation').alias('aggregation_sum'),
  fn.avg('aggregation').alias('aggregation_avg'),
  fn.min('aggregation').alias('aggregation_min'),
  fn.max('aggregation').alias('aggregation_max'),
)

df_groupby.show()

+---------+---------------+---------------+---------------+---------------+
|partition|aggregation_sum|aggregation_avg|aggregation_min|aggregation_max|
+---------+---------------+---------------+---------------+---------------+
|        c|             17|            8.5|              8|              9|
|        b|             18|            6.0|              5|              7|
|        a|             10|            2.5|              1|              4|
+---------+---------------+---------------+---------------+---------------+

+---------+---------------+---------------+---------------+---------------+
|partition|aggregation_sum|aggregation_avg|aggregation_min|aggregation_max|
+---------+---------------+---------------+---------------+---------------+
|        c|             17|            8.5|              8|              9|
|        b|             18|            6.0|              5|              7|
|        a|             10|            2.5|              1|              4|
+---------+---------------+---------------+---------------+---------------+