# Window Functions

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.window import Window

In [0]:
file = "dbfs:/databricks-datasets/flights/"
# reads the csv file and creates a Spark DataFrame
df = spark \
    .read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load(file)

df = df.withColumn("year", lit(2024))

df = df.withColumn(
    "FlightTimestamp",
    expr("try_to_timestamp(concat('2024', date), 'yyyyMMddHHmm')") # invalid values are set to NULL
)

display(df.limit(10))

date,delay,distance,origin,destination,year,FlightTimestamp
1011245,6,602,ABE,ATL,2024,2024-01-01T12:45:00.000Z
1020600,-8,369,ABE,DTW,2024,2024-01-02T06:00:00.000Z
1021245,-2,602,ABE,ATL,2024,2024-01-02T12:45:00.000Z
1020605,-4,602,ABE,ATL,2024,2024-01-02T06:05:00.000Z
1031245,-4,602,ABE,ATL,2024,2024-01-03T12:45:00.000Z
1030605,0,602,ABE,ATL,2024,2024-01-03T06:05:00.000Z
1041243,10,602,ABE,ATL,2024,2024-01-04T12:43:00.000Z
1040605,28,602,ABE,ATL,2024,2024-01-04T06:05:00.000Z
1051245,88,602,ABE,ATL,2024,2024-01-05T12:45:00.000Z
1050605,9,602,ABE,ATL,2024,2024-01-05T06:05:00.000Z


In [0]:
df.printSchema()

root
 |-- date: string (nullable = true)
 |-- delay: string (nullable = true)
 |-- distance: string (nullable = true)
 |-- origin: string (nullable = true)
 |-- destination: string (nullable = true)
 |-- year: integer (nullable = false)
 |-- FlightTimestamp: timestamp (nullable = true)



In [0]:
# clean data. remove null values for FlightTimestamp as this ones did not started yet.
clean_df = df.filter(df.FlightTimestamp.isNotNull())

## Window function with row_number()

In [0]:
row_number_window = Window.partitionBy("origin").orderBy(desc("FlightTimestamp"))
row_number_df = clean_df.withColumn("row_number", row_number().over(row_number_window))
display(row_number_df.limit(30))

date,delay,distance,origin,destination,year,FlightTimestamp,row_number
3311910,-16,165,AEX,IAH,2024,2024-03-31T19:10:00.000Z,1
3311820,-9,247,AEX,DFW,2024,2024-03-31T18:20:00.000Z,2
3311720,-10,435,AEX,ATL,2024,2024-03-31T17:20:00.000Z,3
3311409,77,435,AEX,ATL,2024,2024-03-31T14:09:00.000Z,4
3311201,-7,165,AEX,IAH,2024,2024-03-31T12:01:00.000Z,5
3311135,-7,435,AEX,ATL,2024,2024-03-31T11:35:00.000Z,6
3311045,-10,247,AEX,DFW,2024,2024-03-31T10:45:00.000Z,7
3310600,-11,435,AEX,ATL,2024,2024-03-31T06:00:00.000Z,8
3310600,-12,247,AEX,DFW,2024,2024-03-31T06:00:00.000Z,9
3310530,-10,165,AEX,IAH,2024,2024-03-31T05:30:00.000Z,10


## Window Function with rank()

In [0]:
ranking_window = Window.partitionBy("origin").orderBy(desc("delay"))
ranking_df = clean_df.withColumn("rank", rank().over(ranking_window))
display(ranking_df.limit(30))

date,delay,distance,origin,destination,year,FlightTimestamp,rank
3161910,98,165,AEX,IAH,2024,2024-03-16T19:10:00.000Z,1
2061030,90,247,AEX,DFW,2024,2024-02-06T10:30:00.000Z,2
1021116,9,435,AEX,ATL,2024,2024-01-02T11:16:00.000Z,3
1221415,9,435,AEX,ATL,2024,2024-01-22T14:15:00.000Z,3
2231115,9,435,AEX,ATL,2024,2024-02-23T11:15:00.000Z,3
2090615,9,247,AEX,DFW,2024,2024-02-09T06:15:00.000Z,3
3031030,9,247,AEX,DFW,2024,2024-03-03T10:30:00.000Z,3
3041720,9,435,AEX,ATL,2024,2024-03-04T17:20:00.000Z,3
3121045,9,247,AEX,DFW,2024,2024-03-12T10:45:00.000Z,3
3281409,9,435,AEX,ATL,2024,2024-03-28T14:09:00.000Z,3


## Window Function with dense_rank()

In [0]:
dense_ranking = Window.partitionBy("origin").orderBy(desc("delay"))
dense_ranking_df = clean_df.withColumn("dense_rank", dense_rank().over(dense_ranking))
display(dense_ranking_df.limit(30))

date,delay,distance,origin,destination,year,FlightTimestamp,dense_rank
3161910,98,165,AEX,IAH,2024,2024-03-16T19:10:00.000Z,1
2061030,90,247,AEX,DFW,2024,2024-02-06T10:30:00.000Z,2
1021116,9,435,AEX,ATL,2024,2024-01-02T11:16:00.000Z,3
1221415,9,435,AEX,ATL,2024,2024-01-22T14:15:00.000Z,3
2231115,9,435,AEX,ATL,2024,2024-02-23T11:15:00.000Z,3
2090615,9,247,AEX,DFW,2024,2024-02-09T06:15:00.000Z,3
3031030,9,247,AEX,DFW,2024,2024-03-03T10:30:00.000Z,3
3041720,9,435,AEX,ATL,2024,2024-03-04T17:20:00.000Z,3
3121045,9,247,AEX,DFW,2024,2024-03-12T10:45:00.000Z,3
3281409,9,435,AEX,ATL,2024,2024-03-28T14:09:00.000Z,3


## Window Function with percent_rank()

In [0]:
percent_ranking = Window.partitionBy("origin").orderBy(desc("delay"))
percent_ranking_df = clean_df.withColumn("percent_rank", percent_rank().over(percent_ranking))
display(percent_ranking_df.limit(30))

date,delay,distance,origin,destination,year,FlightTimestamp,percent_rank
3161910,98,165,AEX,IAH,2024,2024-03-16T19:10:00.000Z,0.0
2061030,90,247,AEX,DFW,2024,2024-02-06T10:30:00.000Z,0.0011402508551881
1021116,9,435,AEX,ATL,2024,2024-01-02T11:16:00.000Z,0.0022805017103762
1221415,9,435,AEX,ATL,2024,2024-01-22T14:15:00.000Z,0.0022805017103762
2231115,9,435,AEX,ATL,2024,2024-02-23T11:15:00.000Z,0.0022805017103762
2090615,9,247,AEX,DFW,2024,2024-02-09T06:15:00.000Z,0.0022805017103762
3031030,9,247,AEX,DFW,2024,2024-03-03T10:30:00.000Z,0.0022805017103762
3041720,9,435,AEX,ATL,2024,2024-03-04T17:20:00.000Z,0.0022805017103762
3121045,9,247,AEX,DFW,2024,2024-03-12T10:45:00.000Z,0.0022805017103762
3281409,9,435,AEX,ATL,2024,2024-03-28T14:09:00.000Z,0.0022805017103762


## Window Function with ntile()

In [0]:
ntile_window = Window.partitionBy("origin", "destination").orderBy(desc("delay"))
ntile_df = clean_df.withColumn("ntile", ntile(300).over(ntile_window))
display(ntile_df.limit(30))

date,delay,distance,origin,destination,year,FlightTimestamp,ntile
3211135,99,494,ABQ,DFW,2024,2024-03-21T11:35:00.000Z,1
1240730,97,494,ABQ,DFW,2024,2024-01-24T07:30:00.000Z,1
2061135,96,494,ABQ,DFW,2024,2024-02-06T11:35:00.000Z,1
2201135,90,494,ABQ,DFW,2024,2024-02-20T11:35:00.000Z,2
1051135,9,494,ABQ,DFW,2024,2024-01-05T11:35:00.000Z,2
1051655,9,494,ABQ,DFW,2024,2024-01-05T16:55:00.000Z,2
1091030,9,494,ABQ,DFW,2024,2024-01-09T10:30:00.000Z,3
3041405,9,494,ABQ,DFW,2024,2024-03-04T14:05:00.000Z,3
1020730,87,494,ABQ,DFW,2024,2024-01-02T07:30:00.000Z,3
1021655,87,494,ABQ,DFW,2024,2024-01-02T16:55:00.000Z,4


# Window Analytic Funtions
## lag() and lead()

In [0]:
# lag will return null if there is no previous value to lag
# lead will return null if there is no next value to lead
lag_function = Window.partitionBy("origin").orderBy(("FlightTimestamp"))
lead_function = Window.partitionBy("origin").orderBy(("FlightTimestamp"))
lead_lag_df = clean_df.withColumn("lead", lead("delay", 1).over(lead_function)).withColumn("lag", lag("delay", 1).over(lag_function))
display(lead_lag_df.limit(30))

date,delay,distance,origin,destination,year,FlightTimestamp,lead,lag
1010615,-3,247,AEX,DFW,2024,2024-01-01T06:15:00.000Z,63,
1011030,63,247,AEX,DFW,2024,2024-01-01T10:30:00.000Z,-3,-3.0
1011115,-3,435,AEX,ATL,2024,2024-01-01T11:15:00.000Z,-8,63.0
1011204,-8,165,AEX,IAH,2024,2024-01-01T12:04:00.000Z,18,-3.0
1011710,18,435,AEX,ATL,2024,2024-01-01T17:10:00.000Z,25,-8.0
1011830,25,247,AEX,DFW,2024,2024-01-01T18:30:00.000Z,-14,18.0
1011838,-14,165,AEX,IAH,2024,2024-01-01T18:38:00.000Z,-7,25.0
1020537,-7,165,AEX,IAH,2024,2024-01-02T05:37:00.000Z,-6,-14.0
1020600,-6,435,AEX,ATL,2024,2024-01-02T06:00:00.000Z,-8,-7.0
1020615,-8,247,AEX,DFW,2024,2024-01-02T06:15:00.000Z,-5,-6.0


## GroupBy()

In [0]:
display(clean_df.groupBy("origin")
    .agg(avg("delay").alias("avg_delay"), count("*").alias("flight_count"),
    min("delay").alias("min_delay"),
    max("delay").alias("max_delay"))
    .orderBy(desc("flight_count")))

origin,avg_delay,flight_count,min_delay,max_delay
ATL,12.58238599099296,91484,-1,99
DFW,9.918650156245436,68482,-1,99
ORD,18.588917606028524,64228,-1,99
LAX,10.45538586695263,54086,-1,99
DEN,16.922668773989614,53148,-1,99
IAH,13.56006549664445,43361,-1,99
PHX,9.067936745112688,40155,-1,99
SFO,12.705974723298636,39483,-1,99
LAS,13.34451928595161,33107,-1,99
CLT,8.681008379691571,28402,-1,99


## where()

In [0]:
# using where to filter data
display(clean_df.where('origin = "MDW"').where('delay < 0').orderBy(desc('delay')).limit(30))

date,delay,distance,origin,destination,year,FlightTimestamp
2251200,-9,198,MDW,DTW,2024,2024-02-25T12:00:00.000Z
3071135,-9,455,MDW,ROC,2024,2024-03-07T11:35:00.000Z
2110625,-9,630,MDW,LGA,2024,2024-02-11T06:25:00.000Z
2101749,-9,860,MDW,MCO,2024,2024-02-10T17:49:00.000Z
3230730,-9,501,MDW,IAD,2024,2024-03-23T07:30:00.000Z
3040620,-9,1014,MDW,FLL,2024,2024-03-04T06:20:00.000Z
1111335,-9,623,MDW,ALB,2024,2024-01-11T13:35:00.000Z
2190705,-9,349,MDW,PIT,2024,2024-02-19T07:05:00.000Z
2260645,-9,266,MDW,CLE,2024,2024-02-26T06:45:00.000Z
2190710,-9,218,MDW,STL,2024,2024-02-19T07:10:00.000Z


In [0]:
# for each airport show the row with the biggest delay
max_delay_window = Window.partitionBy("origin").orderBy(desc("delay"))
max_delay_row_df = clean_df.withColumn("rank", rank().over(max_delay_window)).filter(col("rank") == 1)
display(max_delay_row_df.orderBy(desc("delay")).limit(30))

date,delay,distance,origin,destination,year,FlightTimestamp,rank
1090600,995,462,SMF,SLC,2024,2024-01-09T06:00:00.000Z,1
3191420,994,1590,SJC,ORD,2024,2024-03-19T14:20:00.000Z,1
1200645,993,525,MOT,DEN,2024,2024-01-20T06:45:00.000Z,1
2211840,99,714,ATL,OMA,2024,2024-02-21T18:40:00.000Z,1
1302144,99,390,ATL,BTR,2024,2024-01-30T21:44:00.000Z,1
2101449,99,92,ATL,CHA,2024,2024-02-10T14:49:00.000Z,1
2150929,99,516,ATL,DTW,2024,2024-02-15T09:29:00.000Z,1
1021010,99,1133,ATL,ASE,2024,2024-01-02T10:10:00.000Z,1
1061635,99,516,ATL,DTW,2024,2024-01-06T16:35:00.000Z,1
1291500,99,369,ATL,MSY,2024,2024-01-29T15:00:00.000Z,1


## describe()

In [0]:
clean_df.describe().show()

+-------+-----------------+------------------+-----------------+-------+-----------+--------------------+
|summary|             date|             delay|         distance| origin|destination|                year|
+-------+-----------------+------------------+-----------------+-------+-----------+--------------------+
|  count|          1391578|           1391578|          1391578|1391578|    1391578|             1391578|
|   mean|2180446.584000322|12.079802928761449|690.5508264718184|   NULL|       NULL|              2024.0|
| stddev|838031.1536740946| 38.80773374985644|513.6628153663218|   NULL|       NULL|1.823773136043662...|
|    min|         01010005|                -1|              100|    ABE|        ABE|                2024|
|    max|         03312359|               995|              999|    YUM|        YUM|                2024|
+-------+-----------------+------------------+-----------------+-------+-----------+--------------------+



In [0]:
clean_df.filter(col("origin") == "ATL").describe().show()

+-------+-----------------+------------------+-----------------+------+-----------+------+
|summary|             date|             delay|         distance|origin|destination|  year|
+-------+-----------------+------------------+-----------------+------+-----------+------+
|  count|            91484|             91484|            91484| 91484|      91484| 91484|
|   mean| 2179146.73735298|12.582385990992961|550.9688032880066|  NULL|       NULL|2024.0|
| stddev|837398.5766726806|35.304948727664325|387.6414892003325|  NULL|       NULL|   0.0|
|    min|         01010600|                -1|             1029|   ATL|        ABE|  2024|
|    max|         03312312|                99|              976|   ATL|        XNA|  2024|
+-------+-----------------+------------------+-----------------+------+-----------+------+

