Window Functions

In [1]:
import pyspark
from pyspark.sql import  SparkSession

In [2]:
spark = SparkSession.builder \
    .appName("Window Functions exercise 4") \
    .master("local[*]") \
    .getOrCreate()


In [22]:
# Create a schema
from pyspark.sql.types import *
from pyspark.sql.functions import *

schema = StructType([
    StructField("country", StringType(), True),
    StructField("weeknum", IntegerType(), True),
    StructField("numinvoices", IntegerType(), True),
    StructField("totalquantity", IntegerType(), True),
    StructField("invoicevalue", FloatType(), True)
])

In [24]:
# Create a dataset sample for exercise
rdd = spark.sparkContext.parallelize([
    ("Spain", 49, 1, 67, 174.72),
    ("Germany", 48, 11, 1795, 1600.0),
    ("Germany", 48, 15, 1985, 2100.0),
    ("Lithuania", 48, 10, 6000, 1254.2),
    ("Germany", 49, 12, 1852, 1800.0),
    ("Germany", 49, 15, 2114, 2000.0),
    ("France", 50, 8, 1450, 1200.5),
    ("Italy", 48, 6, 980, 850.0),
    ("Spain", 47, 7, 1250, 1350.8),
    ("Finland", 50, 4, 560, 700.3),
    ("Finland", 50, 4, 700, 850.1),
    ("Lithuania", 49, 5, 780, 650.9),
    ("Germany", 47, 15, 2200, 2050.0),
    ("Spain", 50, 3, 430, 560.2),
    ("France", 49, 9, 1600, 1350.0),
    ("France", 49, 11, 1800, 1500.0),
    ("Italy", 47, 10, 1420, 1325.5),
    ("Sweden", 48, 5, 600, 755.6),
    ("Norway", 49, 6, 720, 850.2),
    ("Denmark", 50, 4, 540, 670.1),
    ("Denmark", 50, 6, 700, 820.6),
    ("Finland", 47, 7, 1130, 1230.8),
    ("Poland", 48, 8, 1245, 1360.5),
    ("Czechia", 49, 3, 400, 500.0),
    ("Slovakia", 50, 2, 250, 320.0),
    ("Hungary", 47, 5, 600, 740.2),
    ("Austria", 48, 6, 780, 870.9),
    ("Belgium", 49, 7, 900, 950.0),
    ("Netherlands", 50, 4, 560, 670.8),
    ("Portugal", 47, 3, 420, 510.2),
    ("Switzerland", 48, 8, 1240, 1355.0),
    ("Ireland", 49, 6, 720, 810.3),
    ("Spain", 46, 4, 520, 620.1),
    ("France", 45, 9, 1500, 1400.5),
    ("Germany", 46, 12, 1900, 1750.0),
    ("Italy", 45, 10, 1400, 1290.5),
    ("Sweden", 46, 5, 620, 780.6),
    ("Norway", 47, 6, 750, 890.2),
    ("Denmark", 48, 4, 510, 645.0),
    ("Finland", 49, 7, 1120, 1220.8),
    ("Poland", 50, 8, 1275, 1380.5),
    ("Czechia", 47, 3, 410, 515.0),
    ("Slovakia", 46, 2, 260, 325.0),
    ("Hungary", 45, 5, 620, 750.2),
    ("Austria", 44, 6, 790, 880.9),
    ("Belgium", 43, 7, 920, 970.0),
    ("Netherlands", 42, 4, 570, 680.8),
    ("Portugal", 41, 3, 430, 520.2),
    ("Switzerland", 40, 8, 1260, 1375.0)
])

In [26]:
# Convert RDD to DataFrame
df = spark.createDataFrame(rdd, schema)

In [27]:
df.printSchema()

root
 |-- country: string (nullable = true)
 |-- weeknum: integer (nullable = true)
 |-- numinvoices: integer (nullable = true)
 |-- totalquantity: integer (nullable = true)
 |-- invoicevalue: float (nullable = true)



In [28]:
df.show(5)

+---------+-------+-----------+-------------+------------+
|  country|weeknum|numinvoices|totalquantity|invoicevalue|
+---------+-------+-----------+-------------+------------+
|    Spain|     49|          1|           67|      174.72|
|  Germany|     48|         11|         1795|      1600.0|
|  Germany|     48|         15|         1985|      2100.0|
|Lithuania|     48|         10|         6000|      1254.2|
|  Germany|     49|         12|         1852|      1800.0|
+---------+-------+-----------+-------------+------------+
only showing top 5 rows



In [29]:
df1 = df.groupBy("country").count()
df1.show(5)

+---------+-----+
|  country|count|
+---------+-----+
|  Germany|    6|
|Lithuania|    2|
|    Spain|    4|
|   France|    4|
|  Finland|    4|
+---------+-----+
only showing top 5 rows



In [30]:
df1.orderBy("count",ascending=False).show()

+-----------+-----+
|    country|count|
+-----------+-----+
|    Germany|    6|
|      Spain|    4|
|     France|    4|
|    Finland|    4|
|      Italy|    3|
|    Denmark|    3|
|  Lithuania|    2|
|     Sweden|    2|
|     Norway|    2|
|    Czechia|    2|
|     Poland|    2|
|   Slovakia|    2|
|    Belgium|    2|
|   Portugal|    2|
|    Austria|    2|
|    Hungary|    2|
|Netherlands|    2|
|Switzerland|    2|
|    Ireland|    1|
+-----------+-----+



In [31]:
# import Window
from pyspark.sql.window import Window
window_spec = Window.partitionBy("country").orderBy("weeknum")

In [35]:
# Ranking functions
df.select(
    col("country"),
    col("weeknum"),
    col("invoicevalue"),
    row_number().over(window_spec).alias("row_number"),
    rank().over(window_spec).alias("rank"),
    dense_rank().over(window_spec).alias("dense_rank")) \
    .show(5)

+-------+-------+------------+----------+----+----------+
|country|weeknum|invoicevalue|row_number|rank|dense_rank|
+-------+-------+------------+----------+----+----------+
|Austria|     44|       880.9|         1|   1|         1|
|Austria|     48|       870.9|         2|   2|         2|
|Belgium|     43|       970.0|         1|   1|         1|
|Belgium|     49|       950.0|         2|   2|         2|
|Czechia|     47|       515.0|         1|   1|         1|
+-------+-------+------------+----------+----+----------+
only showing top 5 rows



In [None]:
# Create filter "country" = "Germany"
df.where("country == 'Germany'").select(
    col("country"),
    col("weeknum"),
    col("invoicevalue"),
    row_number().over(window_spec).alias("row_number"),
    rank().over(window_spec).alias("rank"),
    dense_rank().over(window_spec).alias("dense_rank")) \
    .show(5)

+-------+-------+------------+----------+----+----------+
|country|weeknum|invoicevalue|row_number|rank|dense_rank|
+-------+-------+------------+----------+----+----------+
|Germany|     46|      1750.0|         1|   1|         1|
|Germany|     47|      2050.0|         2|   2|         2|
|Germany|     48|      1600.0|         3|   3|         3|
|Germany|     48|      2100.0|         4|   3|         3|
|Germany|     49|      1800.0|         5|   5|         4|
+-------+-------+------------+----------+----+----------+
only showing top 5 rows



In [38]:
# Creating filter: second way
df.filter(df['country'] == 'France').select(
    col("country"),
    col("weeknum"),
    col("invoicevalue"),
    rank().over(window_spec).alias("rank"),
    dense_rank().over(window_spec).alias("dense_rank")
).show()

+-------+-------+------------+----+----------+
|country|weeknum|invoicevalue|rank|dense_rank|
+-------+-------+------------+----+----------+
| France|     45|      1400.5|   1|         1|
| France|     49|      1350.0|   2|         2|
| France|     49|      1500.0|   2|         2|
| France|     50|      1200.5|   4|         3|
+-------+-------+------------+----+----------+



Window: orderby(invoicevalue)

In [39]:
my_window = Window.partitionBy("country").orderBy("invoicevalue")

In [40]:
df.withColumn("rank", rank().over(my_window)).show()

+-------+-------+-----------+-------------+------------+----+
|country|weeknum|numinvoices|totalquantity|invoicevalue|rank|
+-------+-------+-----------+-------------+------------+----+
|Austria|     48|          6|          780|       870.9|   1|
|Austria|     44|          6|          790|       880.9|   2|
|Belgium|     49|          7|          900|       950.0|   1|
|Belgium|     43|          7|          920|       970.0|   2|
|Czechia|     49|          3|          400|       500.0|   1|
|Czechia|     47|          3|          410|       515.0|   2|
|Denmark|     48|          4|          510|       645.0|   1|
|Denmark|     50|          4|          540|       670.1|   2|
|Denmark|     50|          6|          700|       820.6|   3|
|Finland|     50|          4|          560|       700.3|   1|
|Finland|     50|          4|          700|       850.1|   2|
|Finland|     49|          7|         1120|      1220.8|   3|
|Finland|     47|          7|         1130|      1230.8|   4|
| France

In [None]:
# Lowest Sales of each country
df_r = df.withColumn("row_number", row_number().over(my_window))
df_r.where("row_number == 1").show()

+-----------+-------+-----------+-------------+------------+----------+
|    country|weeknum|numinvoices|totalquantity|invoicevalue|row_number|
+-----------+-------+-----------+-------------+------------+----------+
|    Austria|     48|          6|          780|       870.9|         1|
|    Belgium|     49|          7|          900|       950.0|         1|
|    Czechia|     49|          3|          400|       500.0|         1|
|    Denmark|     48|          4|          510|       645.0|         1|
|    Finland|     50|          4|          560|       700.3|         1|
|     France|     50|          8|         1450|      1200.5|         1|
|    Germany|     48|         11|         1795|      1600.0|         1|
|    Hungary|     47|          5|          600|       740.2|         1|
|    Ireland|     49|          6|          720|       810.3|         1|
|      Italy|     48|          6|          980|       850.0|         1|
|  Lithuania|     49|          5|          780|       650.9|    

In [43]:
my_window1 = Window.partitionBy("country").orderBy(desc("invoicevalue"))

In [None]:
# Highest Sales of each country
df_r = df.withColumn("row_number", row_number().over(my_window1)).where("row_number == 1").show()

+-----------+-------+-----------+-------------+------------+----------+
|    country|weeknum|numinvoices|totalquantity|invoicevalue|row_number|
+-----------+-------+-----------+-------------+------------+----------+
|    Austria|     44|          6|          790|       880.9|         1|
|    Belgium|     43|          7|          920|       970.0|         1|
|    Czechia|     47|          3|          410|       515.0|         1|
|    Denmark|     50|          6|          700|       820.6|         1|
|    Finland|     47|          7|         1130|      1230.8|         1|
|     France|     49|         11|         1800|      1500.0|         1|
|    Germany|     48|         15|         1985|      2100.0|         1|
|    Hungary|     45|          5|          620|       750.2|         1|
|    Ireland|     49|          6|          720|       810.3|         1|
|      Italy|     47|         10|         1420|      1325.5|         1|
|  Lithuania|     48|         10|         6000|      1254.2|    

In [46]:
# Highest Sales of each country (delete row_number column which helped us to arrange data from highest to lowest)
df_r  = df.withColumn("row_number", row_number().over(my_window1)).where("row_number == 1").drop("row_number").show()

+-----------+-------+-----------+-------------+------------+
|    country|weeknum|numinvoices|totalquantity|invoicevalue|
+-----------+-------+-----------+-------------+------------+
|    Austria|     44|          6|          790|       880.9|
|    Belgium|     43|          7|          920|       970.0|
|    Czechia|     47|          3|          410|       515.0|
|    Denmark|     50|          6|          700|       820.6|
|    Finland|     47|          7|         1130|      1230.8|
|     France|     49|         11|         1800|      1500.0|
|    Germany|     48|         15|         1985|      2100.0|
|    Hungary|     45|          5|          620|       750.2|
|    Ireland|     49|          6|          720|       810.3|
|      Italy|     47|         10|         1420|      1325.5|
|  Lithuania|     48|         10|         6000|      1254.2|
|Netherlands|     42|          4|          570|       680.8|
|     Norway|     47|          6|          750|       890.2|
|     Poland|     50|   

Lead() and Lag() function

In [51]:
df.printSchema()

root
 |-- country: string (nullable = true)
 |-- weeknum: integer (nullable = true)
 |-- numinvoices: integer (nullable = true)
 |-- totalquantity: integer (nullable = true)
 |-- invoicevalue: float (nullable = true)



In [None]:
# Using lag() function
df.select(
    col("country"),
    col("weeknum"),
    col("invoicevalue"),
    lag("invoicevalue",1).over(window_spec).alias("preceding_sales")
).show(5)

+-------+-------+------------+--------------+
|country|weeknum|invoicevalue|previous_sales|
+-------+-------+------------+--------------+
|Austria|     44|       880.9|          NULL|
|Austria|     48|       870.9|         880.9|
|Belgium|     43|       970.0|          NULL|
|Belgium|     49|       950.0|         970.0|
|Czechia|     47|       515.0|          NULL|
+-------+-------+------------+--------------+
only showing top 5 rows



In [54]:
# Using lead() function
df.select(
    col("country"),
    col("weeknum"),
    col("invoicevalue"),
    lead("invoicevalue",1).over(window_spec).alias("next_sales")
).show(5)

+-------+-------+------------+----------+
|country|weeknum|invoicevalue|next_sales|
+-------+-------+------------+----------+
|Austria|     44|       880.9|     870.9|
|Austria|     48|       870.9|      NULL|
|Belgium|     43|       970.0|     950.0|
|Belgium|     49|       950.0|      NULL|
|Czechia|     47|       515.0|     500.0|
+-------+-------+------------+----------+
only showing top 5 rows



In [None]:
# Aggregation columns:
# First, we need to groupby "country" and then by "weeknum", in order to group the sales of "weeknum", then applying aggregation
df_agg = df.groupBy("country","weeknum").agg(sum("invoicevalue").alias("weekly_invoice"))

df_agg.where("country IN ('Finland','France')").select(
    col("country"),
    col("weeknum"),
    round(col("weekly_invoice"),1),
    round(avg("weekly_invoice").over(window_spec),1).alias("running_avg"),
    round(sum("weekly_invoice").over(window_spec),1).alias("running_sum")).show()

+-------+-------+------------------------+-----------+-----------+
|country|weeknum|round(weekly_invoice, 1)|running_avg|running_sum|
+-------+-------+------------------------+-----------+-----------+
|Finland|     47|                  1230.8|     1230.8|     1230.8|
|Finland|     49|                  1220.8|     1225.8|     2451.6|
|Finland|     50|                  1550.4|     1334.0|     4002.0|
| France|     45|                  1400.5|     1400.5|     1400.5|
| France|     49|                  2850.0|     2125.3|     4250.5|
| France|     50|                  1200.5|     1817.0|     5451.0|
+-------+-------+------------------------+-----------+-----------+



In [69]:
#1: Second way
df.withColumn("previous_week", lag("invoicevalue").over(window_spec)) \
    .withColumn("different_invoice",expr("invoicevalue - previous_week")).show()

+-------+-------+-----------+-------------+------------+-------------+-----------------+
|country|weeknum|numinvoices|totalquantity|invoicevalue|previous_week|different_invoice|
+-------+-------+-----------+-------------+------------+-------------+-----------------+
|Austria|     44|          6|          790|       880.9|         NULL|             NULL|
|Austria|     48|          6|          780|       870.9|        880.9|            -10.0|
|Belgium|     43|          7|          920|       970.0|         NULL|             NULL|
|Belgium|     49|          7|          900|       950.0|        970.0|            -20.0|
|Czechia|     47|          3|          410|       515.0|         NULL|             NULL|
|Czechia|     49|          3|          400|       500.0|        515.0|            -15.0|
|Denmark|     48|          4|          510|       645.0|         NULL|             NULL|
|Denmark|     50|          4|          540|       670.1|        645.0|        25.099976|
|Denmark|     50|    

In [70]:
# Third way with LEAD()
df.withColumn("next_invoice", lead("invoicevalue").over(window_spec)) \
    .withColumn("diff_invoice", expr("invoicevalue - next_invoice")).show()

+-------+-------+-----------+-------------+------------+------------+------------+
|country|weeknum|numinvoices|totalquantity|invoicevalue|next_invoice|diff_invoice|
+-------+-------+-----------+-------------+------------+------------+------------+
|Austria|     44|          6|          790|       880.9|       870.9|        10.0|
|Austria|     48|          6|          780|       870.9|        NULL|        NULL|
|Belgium|     43|          7|          920|       970.0|       950.0|        20.0|
|Belgium|     49|          7|          900|       950.0|        NULL|        NULL|
|Czechia|     47|          3|          410|       515.0|       500.0|        15.0|
|Czechia|     49|          3|          400|       500.0|        NULL|        NULL|
|Denmark|     48|          4|          510|       645.0|       670.1|  -25.099976|
|Denmark|     50|          4|          540|       670.1|       820.6|      -150.5|
|Denmark|     50|          6|          700|       820.6|        NULL|        NULL|
|Fin