## PySpark Window Functions Examples
This notebook contains practical examples demonstrating key PySpark window function concepts, including window specification, common window functions, frame definitions, and advanced use cases.



### Basic Window Functions
- Define window partitioning and ordering
- Use row_number(), rank(), dense_rank() for row numbering and ranking
- Use lag() and lead() to access previous and next rows

### Window Frames and Aggregations
- Specify window frames with rowsBetween and rangeBetween
- Calculate rolling averages, cumulative sums, and rolling max/min
- Understand Window.unboundedPreceding for cumulative calculations

### Advanced Window Use Cases
- Weighted moving averages
- Ranking top N within rolling period
- Calculating differences vs rolling averages
- Complex frame management
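
As a rough illustration of the first item above, a weighted moving average over a trailing 3-row frame can be sketched in plain Python, with no Spark session needed. The helper name `weighted_moving_average`, the weights 1, 2, 3 and the sample values are assumptions chosen for illustration; one possible PySpark equivalent combines `lag()` columns with the same weights.

```python
def weighted_moving_average(values, weights):
    """Weighted average over a trailing frame of len(weights) rows.

    weights[-1] applies to the current row, weights[-2] to the row before it,
    and so on. Rows near the start use only the weights that have a value.
    """
    result = []
    for i in range(len(values)):
        start = max(0, i - len(weights) + 1)
        frame = values[start:i + 1]
        used = weights[-len(frame):]  # most recent weights for a short frame
        result.append(sum(v * w for v, w in zip(frame, used)) / sum(used))
    return result


temps = [26.5, 27.2, 26.6, 24.0]  # illustrative sample values
wma = weighted_moving_average(temps, [1, 2, 3])
```

The most recent row carries the largest weight here, so a sudden drop such as the 24.0 reading pulls the result down faster than a plain average would.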

### Notes
- lag() and lead() are offset-based and do not accept custom frame boundaries
- Ranking functions (row_number(), rank(), dense_rank()) are evaluated over the whole ordered partition and do not accept a custom frame either
- Choose frame boundaries based on use case (sliding windows, growing windows, etc.)

In [0]:
df=spark.read.format("csv")\
    .option("header", "true")\
    .option("inferSchema", "true")\
    .load('/Volumes/sandeshmsdatabricks/sourcefiles/sourcevolume/weather/Brazil_weather_data.csv')

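Window frame and rolling calculation: a 4-row rolling average of Temp_Max (the current row plus the 3 preceding rows) within each Country, ordered by Date.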

In [0]:

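from pyspark.sql.window import Window
# The wildcard import is kept because later cells rely on it; note that it
# shadows Python built-ins such as sum, max, and min.
from pyspark.sql.functions import *

# 4-row frame: the current row plus the 3 preceding rows, per Country, by Date
windowspec = Window.partitionBy("Country").orderBy("Date").rowsBetween(-3, 0)

df = df.withColumn("Rolling_Avg_Temp", avg("Temp_Max").over(windowspec))

df.show(10)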

+-------+----------+--------+--------+---------+-----------------+-------------+-------------+-----------------+------------------+
|Country|      Date|Temp_Max|Temp_Min|Temp_Mean|Precipitation_Sum|Windspeed_Max|Windgusts_Max|Sunshine_Duration|  Rolling_Avg_Temp|
+-------+----------+--------+--------+---------+-----------------+-------------+-------------+-----------------+------------------+
| Brazil|2000-01-01|    26.5|    21.8|     23.7|             15.7|          8.8|         24.8|         10559.94|              26.5|
| Brazil|2000-01-02|    27.2|    21.8|     23.5|             11.7|         12.7|         32.8|         14654.83|             26.85|
| Brazil|2000-01-03|    26.6|    21.6|     23.3|             20.9|         15.0|         38.2|         16619.55| 26.76666666666667|
| Brazil|2000-01-04|    24.0|    21.3|     22.3|             12.0|         13.4|         29.2|          7564.03|26.075000000000003|
| Brazil|2000-01-05|    27.8|    21.1|     23.7|             16.6|         1
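
The Rolling_Avg_Temp values above can be checked by hand. The following plain-Python sketch (the helper name `rolling_avg` is an assumption, not part of PySpark) mimics a `rowsBetween(-3, 0)` frame on the first four Temp_Max values from the output.

```python
def rolling_avg(values, preceding):
    """Average over the current row and up to `preceding` earlier rows."""
    out = []
    for i in range(len(values)):
        # Near the start of the partition the frame simply has fewer rows.
        frame = values[max(0, i - preceding):i + 1]
        out.append(sum(frame) / len(frame))
    return out


temp_max = [26.5, 27.2, 26.6, 24.0]  # first four Temp_Max values shown above
averages = rolling_avg(temp_max, preceding=3)
```

The results are approximately 26.5, 26.85, 26.77 and 26.075, which agrees with the Rolling_Avg_Temp column: early rows average over however many rows exist so far rather than returning NULL.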

Using the rank() window function, rank Temp_Max values within each Country in descending order (highest temperature ranked 1). The cell below stores the result in a column named RankedDate and adds a dense_rank() column, DenseRankedDate, for comparison.

In [0]:
windowspec = Window.partitionBy("Country").orderBy(desc("Temp_Max"))

df = df.withColumn(
    "RankedDate",
    rank().over(windowspec)
).withColumn(
    "DenseRankedDate",
    dense_rank().over(windowspec)
)

df.show(10)

+-------+----------+--------+--------+---------+-----------------+-------------+-------------+-----------------+------------------+----------+---------------+
|Country|      Date|Temp_Max|Temp_Min|Temp_Mean|Precipitation_Sum|Windspeed_Max|Windgusts_Max|Sunshine_Duration|  Rolling_Avg_Temp|RankedDate|DenseRankedDate|
+-------+----------+--------+--------+---------+-----------------+-------------+-------------+-----------------+------------------+----------+---------------+
| Brazil|2021-09-21|    40.8|    21.0|     31.1|              0.0|         13.8|         36.4|         39774.77|39.349999999999994|         1|              1|
| Brazil|2019-09-14|    40.7|    21.5|     30.4|              0.0|          9.6|         25.6|          39600.0|39.900000000000006|         2|              2|
| Brazil|2019-09-21|    40.4|    25.7|     32.6|              0.0|         10.8|         32.4|         38496.34|            39.725|         3|              3|
| Brazil|2019-09-22|    40.2|    26.0|     32.
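
The two ranking columns only diverge when values tie. A minimal plain-Python sketch of the difference follows; the helper name and the tied sample values are hypothetical and not taken from the dataset.

```python
def rank_and_dense_rank(values):
    """Return (value, rank, dense_rank) tuples, highest value first."""
    ordered = sorted(values, reverse=True)
    rows = []
    dense = 0
    current_rank = 0
    previous = None
    for position, value in enumerate(ordered, start=1):
        if value != previous:
            dense += 1               # dense_rank moves up by one per distinct value
            current_rank = position  # rank jumps to the row position, leaving gaps
            previous = value
        rows.append((value, current_rank, dense))
    return rows


ranked = rank_and_dense_rank([40.8, 40.7, 40.7, 40.4])
```

Here the two 40.7 readings share rank 2, the next value gets rank 4 but dense rank 3. This is why RankedDate can run far ahead of DenseRankedDate on a column with many repeated temperatures.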

Question 1 (Easy):
Create a new column Row_Number (named Rownum in the cell below) that assigns a row number to each row within each Country, ordered by Date (earliest first).

Use the row_number() window function and appropriate window specification.

In [0]:
from pyspark.sql.window import Window

windowspec = Window.partitionBy("Country").orderBy("Date")

df = df.withColumn("Rownum", row_number().over(windowspec))
df.show(10)

+-------+----------+--------+--------+---------+-----------------+-------------+-------------+-----------------+------------------+----------+---------------+------+
|Country|      Date|Temp_Max|Temp_Min|Temp_Mean|Precipitation_Sum|Windspeed_Max|Windgusts_Max|Sunshine_Duration|  Rolling_Avg_Temp|RankedDate|DenseRankedDate|Rownum|
+-------+----------+--------+--------+---------+-----------------+-------------+-------------+-----------------+------------------+----------+---------------+------+
| Brazil|2000-01-01|    26.5|    21.8|     23.7|             15.7|          8.8|         24.8|         10559.94|              26.5|      8591|            141|     1|
| Brazil|2000-01-02|    27.2|    21.8|     23.5|             11.7|         12.7|         32.8|         14654.83|             26.85|      8450|            134|     2|
| Brazil|2000-01-03|    26.6|    21.6|     23.3|             20.9|         15.0|         38.2|         16619.55| 26.76666666666667|      8572|            140|     3|
| Br

### LEAD and LAG Use Case

In [0]:
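windowspec = Window.partitionBy("Country").orderBy("Date")

# lead() reads the next row's Temp_Max and lag() the previous row's;
# the last and first rows of each partition get NULL respectively.
df = (
    df.withColumn("Leaddata", lead("Temp_Max", 1).over(windowspec))
      .withColumn("LagData", lag("Temp_Max", 1).over(windowspec))
)
df.show(10)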

+-------+----------+--------+--------+---------+-----------------+-------------+-------------+-----------------+------------------+----------+---------------+------+--------+-------+
|Country|      Date|Temp_Max|Temp_Min|Temp_Mean|Precipitation_Sum|Windspeed_Max|Windgusts_Max|Sunshine_Duration|  Rolling_Avg_Temp|RankedDate|DenseRankedDate|Rownum|Leaddata|LagData|
+-------+----------+--------+--------+---------+-----------------+-------------+-------------+-----------------+------------------+----------+---------------+------+--------+-------+
| Brazil|2000-01-01|    26.5|    21.8|     23.7|             15.7|          8.8|         24.8|         10559.94|              26.5|      8591|            141|     1|    27.2|   NULL|
| Brazil|2000-01-02|    27.2|    21.8|     23.5|             11.7|         12.7|         32.8|         14654.83|             26.85|      8450|            134|     2|    26.6|   26.5|
| Brazil|2000-01-03|    26.6|    21.6|     23.3|             20.9|         15.0|     
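
What lead() and lag() return at the edges of a partition can be seen in a small plain-Python sketch. The helper names `lag_values` and `lead_values` are assumptions for illustration; Python's `None` stands in for Spark's NULL.

```python
def lag_values(values, offset=1):
    """Value `offset` rows earlier, or None when there is no such row."""
    return [values[i - offset] if i - offset >= 0 else None
            for i in range(len(values))]


def lead_values(values, offset=1):
    """Value `offset` rows later, or None when there is no such row."""
    return [values[i + offset] if i + offset < len(values) else None
            for i in range(len(values))]


temp_max = [26.5, 27.2, 26.6]  # first three Temp_Max values shown above
lagged = lag_values(temp_max)
led = lead_values(temp_max)
```

The first row has no previous value and the last row has no next value, matching the NULL in LagData for 2000-01-01. In the real DataFrame the third row's lead is not NULL, because the partition continues past the three values used here.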

Write PySpark code to calculate a cumulative sum of the column Temp_Max within each Country, ordered by Date, and add it as a new column named SumOfTemp to the DataFrame.

Use window functions with an appropriate window frame to include all rows from the beginning of the partition up to the current row.

In [0]:
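# Growing frame: every row from the start of the partition up to the current row
windowspec = (
    Window.partitionBy("Country")
    .orderBy("Date")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)
# sum here is pyspark.sql.functions.sum (from the earlier wildcard import),
# not Python's built-in sum
df = df.withColumn("SumOfTemp", sum("Temp_Max").over(windowspec))

df.show(10)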

+-------+----------+--------+--------+---------+-----------------+-------------+-------------+-----------------+------------------+----------+---------------+------+--------+-------+------------------+
|Country|      Date|Temp_Max|Temp_Min|Temp_Mean|Precipitation_Sum|Windspeed_Max|Windgusts_Max|Sunshine_Duration|  Rolling_Avg_Temp|RankedDate|DenseRankedDate|Rownum|Leaddata|LagData|         SumOfTemp|
+-------+----------+--------+--------+---------+-----------------+-------------+-------------+-----------------+------------------+----------+---------------+------+--------+-------+------------------+
| Brazil|2000-01-01|    26.5|    21.8|     23.7|             15.7|          8.8|         24.8|         10559.94|              26.5|      8591|            141|     1|    27.2|   NULL|              26.5|
| Brazil|2000-01-02|    27.2|    21.8|     23.5|             11.7|         12.7|         32.8|         14654.83|             26.85|      8450|            134|     2|    26.6|   26.5|          
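
A growing frame from `Window.unboundedPreceding` to `Window.currentRow` is a running total. A minimal plain-Python equivalent, using the first four Temp_Max values from the output, is sketched below.

```python
from itertools import accumulate

temp_max = [26.5, 27.2, 26.6, 24.0]  # first four Temp_Max values shown above

# Each element is the sum of all values up to and including that position,
# which is what the unboundedPreceding..currentRow frame computes per row.
running_total = list(accumulate(temp_max))
```

When a window has an orderBy but no explicit frame, Spark defaults to a range-based frame from unboundedPreceding to currentRow, which treats rows that tie on the ordering column as one group. Spelling out rowsBetween, as the cell above does, keeps the total strictly row by row.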

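Find the difference between the current day’s Windgusts_Max and the previous day’s Windgusts_Max for each Country in a new column Windgusts_Diff (named diffWindgusts_Max in the cell below). The first row of each Country has no previous day, so its difference is NULL.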

In [0]:
from pyspark.sql.functions import col, lag

windowspec = Window.partitionBy("Country").orderBy("Date")

df = df.withColumn(
    "diffWindgusts_Max",
    col("Windgusts_Max") - lag("Windgusts_Max", 1).over(windowspec)
)

df.show(10)

+-------+----------+--------+--------+---------+-----------------+-------------+-------------+-----------------+------------------+----------+---------------+------+--------+-------+------------------+------------------+
|Country|      Date|Temp_Max|Temp_Min|Temp_Mean|Precipitation_Sum|Windspeed_Max|Windgusts_Max|Sunshine_Duration|  Rolling_Avg_Temp|RankedDate|DenseRankedDate|Rownum|Leaddata|LagData|         SumOfTemp| diffWindgusts_Max|
+-------+----------+--------+--------+---------+-----------------+-------------+-------------+-----------------+------------------+----------+---------------+------+--------+-------+------------------+------------------+
| Brazil|2000-01-01|    26.5|    21.8|     23.7|             15.7|          8.8|         24.8|         10559.94|              26.5|      8591|            141|     1|    27.2|   NULL|              26.5|              NULL|
| Brazil|2000-01-02|    27.2|    21.8|     23.5|             11.7|         12.7|         32.8|         14654.83|    

For each Country, find the max Temp_Max value within the last 7 days including current day (rolling max).

In [0]:
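from pyspark.sql.functions import col, max  # shadows the built-in max

# 7-row frame: the current row plus the 6 preceding rows
# (rowsBetween(-7, 0) would span 8 rows)
windowspec = Window.partitionBy("Country").orderBy("Date").rowsBetween(-6, 0)

df = df.withColumn("Max0fseven", max(col("Temp_Max")).over(windowspec))

df.show(10)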

+-------+----------+--------+--------+---------+-----------------+-------------+-------------+-----------------+------------------+----------+---------------+------+--------+-------+------------------+------------------+----------+
|Country|      Date|Temp_Max|Temp_Min|Temp_Mean|Precipitation_Sum|Windspeed_Max|Windgusts_Max|Sunshine_Duration|  Rolling_Avg_Temp|RankedDate|DenseRankedDate|Rownum|Leaddata|LagData|         SumOfTemp| diffWindgusts_Max|Max0fseven|
+-------+----------+--------+--------+---------+-----------------+-------------+-------------+-----------------+------------------+----------+---------------+------+--------+-------+------------------+------------------+----------+
| Brazil|2000-01-01|    26.5|    21.8|     23.7|             15.7|          8.8|         24.8|         10559.94|              26.5|      8591|            141|     1|    27.2|   NULL|              26.5|              NULL|      26.5|
| Brazil|2000-01-02|    27.2|    21.8|     23.5|             11.7|      
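
A `rowsBetween(start, 0)` frame holds `0 - start + 1` rows, so a 7-row window that includes the current row needs `start = -6`, while `start = -7` spans 8 rows. The plain-Python sketch below shows where the two differ; the helper names and the sample values are hypothetical.

```python
def frame_size(start, end):
    """Number of rows in a fully populated rowsBetween(start, end) frame."""
    return end - start + 1


def rolling_max(values, preceding):
    """Max over the current row and up to `preceding` earlier rows."""
    return [max(values[max(0, i - preceding):i + 1])
            for i in range(len(values))]


# One spike followed by flat readings (illustrative values only)
readings = [9, 1, 1, 1, 1, 1, 1, 1, 1]
seven_row = rolling_max(readings, preceding=6)
eight_row = rolling_max(readings, preceding=7)
```

At index 7 the spike has already left the 7-row frame but is still inside the 8-row one, so the two results disagree on exactly that row.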

Previous 7-Day Average (Excluding Current Day)
Task: Create a column Prev7Day_Avg_Temp (named Previous_Day_Avg_Temp in the cell below) that calculates the average Temp_Max of the previous 7 days ONLY (excluding the current day) for each Country, ordered by Date.

Challenge: Window frame must exclude the current row but include exactly the 7 previous rows.

In [0]:
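# Frame of the 7 rows before the current row; the current row is excluded.
# The first row of each partition has an empty frame, so its average is NULL.
windowspec = Window.partitionBy("Country").orderBy("Date").rowsBetween(-7, -1)

df = df.withColumn("Previous_Day_Avg_Temp", avg(col("Temp_Max")).over(windowspec))
df.show(10)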

+-------+----------+--------+--------+---------+-----------------+-------------+-------------+-----------------+------------------+----------+---------------+------+--------+-------+------------------+------------------+----------+---------------------+
|Country|      Date|Temp_Max|Temp_Min|Temp_Mean|Precipitation_Sum|Windspeed_Max|Windgusts_Max|Sunshine_Duration|  Rolling_Avg_Temp|RankedDate|DenseRankedDate|Rownum|Leaddata|LagData|         SumOfTemp| diffWindgusts_Max|Max0fseven|Previous_Day_Avg_Temp|
+-------+----------+--------+--------+---------+-----------------+-------------+-------------+-----------------+------------------+----------+---------------+------+--------+-------+------------------+------------------+----------+---------------------+
| Brazil|2000-01-01|    26.5|    21.8|     23.7|             15.7|          8.8|         24.8|         10559.94|              26.5|      8591|            141|     1|    27.2|   NULL|              26.5|              NULL|      26.5|       
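
The effect of ending the frame at -1 is easiest to see on the first few rows. A plain-Python sketch follows; the helper name `prev_avg` is an assumption, and `None` stands in for Spark's NULL.

```python
def prev_avg(values, n):
    """Average of up to `n` rows before the current row, excluding it."""
    out = []
    for i in range(len(values)):
        frame = values[max(0, i - n):i]  # slice stops before index i
        out.append(sum(frame) / len(frame) if frame else None)
    return out


temp_max = [26.5, 27.2, 26.6, 24.0]  # first four Temp_Max values shown above
previous_avg = prev_avg(temp_max, n=7)
```

The first entry is `None` because nothing precedes it, and the second is simply the first day's value. Until 7 earlier rows exist, the average is taken over fewer rows.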

Top 3 Precipitation Days in Last 30 Days
Create Top3_Precip_Rank column that ranks Precipitation_Sum within each Country over the last 30 days (including current), highest first. Rank 1-3 = top precipitation days.

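Note: rank() does not accept a rowsBetween frame, so the hinted combination of rowsBetween(-29, 0) with rank() and desc("Precipitation_Sum") is not valid as written. The cell below ranks Precipitation_Sum over the whole Country partition instead, so it returns the all-time top 3 precipitation days rather than a rolling 30-day rank.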

In [0]:
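from pyspark.sql.window import Window
from pyspark.sql.functions import col, desc, rank

# Ranks over the whole Country partition (no rolling frame), highest first
windowspec = Window.partitionBy("Country").orderBy(desc("Precipitation_Sum"))
df = df.withColumn(
    "Top3_Precip_Rank",
    rank().over(windowspec)
)

# display() is the Databricks notebook helper for rendering a DataFrame
display(df.filter(col("Top3_Precip_Rank") <= 3))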

Country,Date,Temp_Max,Temp_Min,Temp_Mean,Precipitation_Sum,Windspeed_Max,Windgusts_Max,Sunshine_Duration,Rolling_Avg_Temp,RankedDate,DenseRankedDate,Rownum,Leaddata,LagData,SumOfTemp,diffWindgusts_Max,Max0fseven,Previous_Day_Avg_Temp,Top3_Precip_Rank
Brazil,2023-03-19,22.9,21.0,21.7,124.4,16.9,37.4,0.0,27.0,8760,171,8479,26.7,27.5,270590.80000000075,3.6000000000000014,32.7,30.085714285714285,1
Brazil,2013-01-25,23.1,22.3,22.8,101.3,15.1,32.8,0.0,26.725,8758,170,4774,25.2,27.8,151034.80000000037,5.399999999999999,31.2,28.585714285714285,2
Brazil,2008-02-03,22.8,21.9,22.4,94.0,20.3,39.6,488.38,25.525,8762,172,2956,25.6,25.2,93350.6,15.1,28.5,26.628571428571423,3
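
Because rank() cannot take a rolling frame, a rank within the last 30 rows has to be computed another way. One possible definition, sketched here in plain Python, is 1 plus the number of values in the frame that are strictly larger than the current one, which matches how rank() treats ties. The helper name `rolling_rank` is an assumption; in PySpark a similar result could be approached by collecting the frame with `collect_list` over `rowsBetween(-29, 0)` and counting the larger elements.

```python
def rolling_rank(values, window):
    """Rank of each value among itself and the previous window - 1 rows.

    1 means highest in the frame; ties share a rank.
    """
    ranks = []
    for i, current in enumerate(values):
        frame = values[max(0, i - window + 1):i + 1]
        ranks.append(1 + sum(1 for v in frame if v > current))
    return ranks


# First five Precipitation_Sum values shown earlier in this notebook
precip = [15.7, 11.7, 20.9, 12.0, 16.6]
ranks_30 = rolling_rank(precip, window=30)
ranks_2 = rolling_rank(precip, window=2)
```

With a 30-row window all five sample rows fall into one frame, while the 2-row window compares each day only with the day before, so the same value can rank differently depending on the frame.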


Create a column Temp_Vs_7DayAvg showing the difference between the current Temp_Max and the average of the previous 7 days only (excluding the current day).

In [0]:
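# Same frame as the previous-7-day average: 7 rows before the current row
windowspec = Window.partitionBy("Country").orderBy("Date").rowsBetween(-7, -1)

# Positive values mean the current day is warmer than the prior 7-row average;
# the first row of each partition is NULL because its frame is empty.
df = df.withColumn("Temp_Vs_7DayAvg", col("Temp_Max") - avg(col("Temp_Max")).over(windowspec))
df.show(10)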

+-------+----------+--------+--------+---------+-----------------+-------------+-------------+-----------------+------------------+----------+---------------+------+--------+-------+------------------+------------------+----------+---------------------+----------------+-------------------+
|Country|      Date|Temp_Max|Temp_Min|Temp_Mean|Precipitation_Sum|Windspeed_Max|Windgusts_Max|Sunshine_Duration|  Rolling_Avg_Temp|RankedDate|DenseRankedDate|Rownum|Leaddata|LagData|         SumOfTemp| diffWindgusts_Max|Max0fseven|Previous_Day_Avg_Temp|Top3_Precip_Rank|    Temp_Vs_7DayAvg|
+-------+----------+--------+--------+---------+-----------------+-------------+-------------+-----------------+------------------+----------+---------------+------+--------+-------+------------------+------------------+----------+---------------------+----------------+-------------------+
| Brazil|2000-01-01|    26.5|    21.8|     23.7|             15.7|          8.8|         24.8|         10559.94|              2
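
The cells above use rowsBetween, which counts rows, so "previous 7 days" holds only if every Country has exactly one row per calendar day. The plain-Python sketch below contrasts row-based and day-based frames on hypothetical data with a gap. The helper names and dates are assumptions, and whether the real dataset has gaps is not known from this notebook. In PySpark, a rangeBetween frame over a numeric day number is one way to express the day-based version.

```python
from datetime import date, timedelta


def prev_avg_by_rows(rows, n):
    """Average of the values in up to `n` rows before the current row."""
    out = []
    for i in range(len(rows)):
        frame = [value for _, value in rows[max(0, i - n):i]]
        out.append(sum(frame) / len(frame) if frame else None)
    return out


def prev_avg_by_days(rows, days):
    """Average of values dated within `days` calendar days before the current row."""
    out = []
    for current_date, _ in rows:
        earliest = current_date - timedelta(days=days)
        frame = [value for d, value in rows if earliest <= d < current_date]
        out.append(sum(frame) / len(frame) if frame else None)
    return out


# Hypothetical readings with an 18-day gap before the last row
rows = [
    (date(2000, 1, 1), 26.5),
    (date(2000, 1, 2), 27.2),
    (date(2000, 1, 20), 24.0),
]
by_rows = prev_avg_by_rows(rows, n=7)
by_days = prev_avg_by_days(rows, days=7)
```

For the last row, the row-based frame still averages the two January readings, while the day-based frame is empty and yields `None`. If the data has gaps, the subtraction in Temp_Vs_7DayAvg compares against stale days under rowsBetween.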