In [19]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, row_number, lead, lag, count, avg

spark = (SparkSession.builder
         .appName("apply-window-functions")
         .master("spark://spark-master:7077")
         .config("spark.executor.memory", "512m")
         .getOrCreate())

spark.sparkContext.setLogLevel("ERROR")

In [20]:
df = (spark.read
      .format("csv")
      .option("header", "true")
      .option("nullValue", "null")
      .option("dateFormat", "LLLL d, y")
      .load("../data/netflix_titles.csv"))

                                                                                

In [21]:
df = df.filter(col('country').isNotNull() & col('date_added').isNotNull())

In [22]:
from pyspark.sql.window import Window
window_spec = Window.partitionBy("country").orderBy("date_added")

In [23]:
# Assign row numbers within each partition
result = df.withColumn("row_number", row_number().over(window_spec))
result.select("title","country","date_added","row_number").show()

[Stage 1:>                                                          (0 + 1) / 1]

+--------------------+--------------------+--------------------+----------+
|               title|             country|          date_added|row_number|
+--------------------+--------------------+--------------------+----------+
| Beasts of No Nation|     Ama K. Abebrese|  Kobina Amissah Sam|         1|
|Get Him to the Greek|         Aziz Ansari|         Carla Gallo|         1|
|      Rhyme & Reason|            Chuck D.|     Desiree Densiti|         1|
|            Backfire|       Dominic Costa|        Nick Ferraro|         1|
|Hurricane Bianca:...|          Doug Plaut|    Cheyenne Jackson|         1|
|Offering to the S...|     Francesc Orella|        Imanol Arias|         1|
|        An Easy Girl|  Henri-Noël Tabary"|              France|         1|
| An Imperfect Murder|       James Toback"|       United States|         1|
|    Hurricane Bianca| Justin ""Alyssa ...|         Molly Ryman|         1|
|             Dayveon|  Lachion Buckingham|       Chasity Moore|         1|
|The Legacy 

                                                                                

In [24]:
# Add lead column
df = df.withColumn("lead_date_added", lead("date_added").over(window_spec))
# Add lag column
df = df.withColumn("lag_date_added", lag("date_added").over(window_spec))

df.select("title","country","date_added","lead_date_added","lag_date_added").show(3)

+--------------------+----------------+-------------------+---------------+--------------+
|               title|         country|         date_added|lead_date_added|lag_date_added|
+--------------------+----------------+-------------------+---------------+--------------+
| Beasts of No Nation| Ama K. Abebrese| Kobina Amissah Sam|           null|          null|
|Get Him to the Greek|     Aziz Ansari|        Carla Gallo|           null|          null|
|      Rhyme & Reason|        Chuck D.|    Desiree Densiti|           null|          null|
+--------------------+----------------+-------------------+---------------+--------------+
only showing top 3 rows



### Nested Window Functions

In [25]:
from pyspark.sql.functions import sum, lead
from pyspark.sql.window import Window

window_spec = Window.partitionBy("country").orderBy("release_year")
df = df.withColumn("running_total", count("show_id").over(window_spec))
df = df.withColumn("next_running_total", lead("running_total").over(window_spec))
df = df.withColumn("diff", df["next_running_total"] - df["running_total"])

### Window Frames

In [26]:
data = [(1, 10), (2, 15), (3, 20), (4, 25), (5, 30)]
df = spark.createDataFrame(data, ["id", "value"])

windowSpec = Window.orderBy("id").rowsBetween(-2, 0)
df = df.withColumn("rolling_avg", avg(df["value"]).over(windowSpec))

df.show()

[Stage 9:>                                                          (0 + 1) / 1]

+---+-----+-----------+
| id|value|rolling_avg|
+---+-----+-----------+
|  1|   10|       10.0|
|  2|   15|       12.5|
|  3|   20|       15.0|
|  4|   25|       20.0|
|  5|   30|       25.0|
+---+-----+-----------+



                                                                                

In [27]:
spark.stop()