In [15]:
from pyspark.sql import SparkSession
# Obtain a SparkSession for this notebook under the application name "Lead_Lag".
spark = (
    SparkSession.builder
    .appName("Lead_Lag")
    .getOrCreate()
)

In [16]:
# Column names for the sample DataFrame (names only; no types are declared here).
schema = ["country", "week_number", "number_of_invoices", "total_quantity", "invoice_values"]

# Sample rows. Every (country, week_number) pair appears twice, and both rows of
# a pair carry the same invoice_values.
data = [
    ("Portugal", 22, 19, 1525, 4975.32),
    ("Portugal", 22, 18, 1230, 4975.32),
    ("Germany", 2, 14, 1680, 6040.99),
    ("Germany", 2, 15, 1890, 6040.99),
    ("Netherlands", 49, 5, 28, 103.78),
    ("Netherlands", 49, 7, 42, 103.78),
    ("Spain", 4, 16, 730, 1909.05),
    ("Spain", 4, 15, 680, 1909.05),
    ("Netherlands", 29, 4, 1723, 3435.96),
    ("Netherlands", 29, 3, 1543, 3435.96),
    ("Germany", 25, 5, 1169, 5154.49),
    ("Germany", 25, 4, 987, 5154.49),
    ("France", 47, 9, 1644, 3987.62),
    ("France", 47, 8, 1456, 3987.62),
    ("Spain", 49, 11, 657, 2881.72),
    ("Spain", 49, 10, 590, 2881.72),
    ("France", 18, 3, 1882, 9171.42),
    ("France", 18, 2, 1673, 9171.42),
    ("Germany", 42, 18, 1719, 1383.78),
    ("Germany", 42, 17, 1532, 1383.78),
]

# Build the DataFrame from the tuples and preview it.
df = spark.createDataFrame(data, schema)

df.show()

+-----------+-----------+------------------+--------------+--------------+
|    country|week_number|number_of_invoices|total_quantity|invoice_values|
+-----------+-----------+------------------+--------------+--------------+
|   Portugal|         22|                19|          1525|       4975.32|
|   Portugal|         22|                18|          1230|       4975.32|
|    Germany|          2|                14|          1680|       6040.99|
|    Germany|          2|                15|          1890|       6040.99|
|Netherlands|         49|                 5|            28|        103.78|
|Netherlands|         49|                 7|            42|        103.78|
|      Spain|          4|                16|           730|       1909.05|
|      Spain|          4|                15|           680|       1909.05|
|Netherlands|         29|                 4|          1723|       3435.96|
|Netherlands|         29|                 3|          1543|       3435.96|
|    Germany|         25|

In [21]:
# Register df under the name "dftable" so the spark.sql(...) cells below can
# query it; an existing view with that name is replaced.
df.createOrReplaceTempView("dftable")

In [17]:
from pyspark.sql import Window
from pyspark.sql import functions as F
from pyspark.sql.functions import *

In [18]:
# Window spec shared by the lag/lead cells: one partition per country, rows
# sorted by ascending week_number.
# NOTE(review): the sample data holds two rows for every (country, week_number)
# and this spec has no tie-breaker, so the relative order of those two rows is
# not fixed by the ordering — confirm whether a secondary sort key is wanted.
my_window = (
    Window
    .partitionBy("country")
    .orderBy(asc("week_number"))
)

In [19]:
# Append previous_week_sale: the invoice_values of the row one position earlier
# in my_window (null on the first row of each country).
# NOTE(review): the offset counts rows, not weeks; with two rows per week in the
# sample data the "previous" row can belong to the same week.
df.select(
    "*",
    lag("invoice_values", 1).over(my_window).alias("previous_week_sale"),
).show()

+-----------+-----------+------------------+--------------+--------------+------------------+
|    country|week_number|number_of_invoices|total_quantity|invoice_values|previous_week_sale|
+-----------+-----------+------------------+--------------+--------------+------------------+
|     France|         18|                 3|          1882|       9171.42|              null|
|     France|         18|                 2|          1673|       9171.42|           9171.42|
|     France|         47|                 9|          1644|       3987.62|           9171.42|
|     France|         47|                 8|          1456|       3987.62|           3987.62|
|    Germany|          2|                14|          1680|       6040.99|              null|
|    Germany|          2|                15|          1890|       6040.99|           6040.99|
|    Germany|         25|                 5|          1169|       5154.49|           6040.99|
|    Germany|         25|                 4|           987| 

In [20]:
# Append previous_2week_sale: the invoice_values of the row two positions
# earlier in my_window (null on the first two rows of each country).
df.select(
    "*",
    lag("invoice_values", 2).over(my_window).alias("previous_2week_sale"),
).show()

+-----------+-----------+------------------+--------------+--------------+-------------------+
|    country|week_number|number_of_invoices|total_quantity|invoice_values|previous_2week_sale|
+-----------+-----------+------------------+--------------+--------------+-------------------+
|     France|         18|                 3|          1882|       9171.42|               null|
|     France|         18|                 2|          1673|       9171.42|               null|
|     France|         47|                 9|          1644|       3987.62|            9171.42|
|     France|         47|                 8|          1456|       3987.62|            9171.42|
|    Germany|          2|                14|          1680|       6040.99|               null|
|    Germany|          2|                15|          1890|       6040.99|               null|
|    Germany|         25|                 5|          1169|       5154.49|            6040.99|
|    Germany|         25|                 4|      

In [9]:
# Append upcoming_week_sale: the invoice_values of the row one position later
# in my_window (null on the last row of each country).
df.select(
    "*",
    lead("invoice_values", 1).over(my_window).alias("upcoming_week_sale"),
).show()

+-----------+-----------+------------------+--------------+--------------+------------------+
|    country|week_number|number_of_invoices|total_quantity|invoice_values|upcoming_week_sale|
+-----------+-----------+------------------+--------------+--------------+------------------+
|     France|         18|                 3|          1882|       9171.42|           9171.42|
|     France|         18|                 2|          1673|       9171.42|           3987.62|
|     France|         47|                 9|          1644|       3987.62|           3987.62|
|     France|         47|                 8|          1456|       3987.62|              null|
|    Germany|          2|                14|          1680|       6040.99|           6040.99|
|    Germany|          2|                15|          1890|       6040.99|           5154.49|
|    Germany|         25|                 5|          1169|       5154.49|           5154.49|
|    Germany|         25|                 4|           987| 

In [10]:
# Append upcoming_2week_sale: the invoice_values of the row two positions later
# in my_window (null on the last two rows of each country).
df.select(
    "*",
    lead("invoice_values", 2).over(my_window).alias("upcoming_2week_sale"),
).show()

+-----------+-----------+------------------+--------------+--------------+-------------------+
|    country|week_number|number_of_invoices|total_quantity|invoice_values|upcoming_2week_sale|
+-----------+-----------+------------------+--------------+--------------+-------------------+
|     France|         18|                 3|          1882|       9171.42|            3987.62|
|     France|         18|                 2|          1673|       9171.42|            3987.62|
|     France|         47|                 9|          1644|       3987.62|               null|
|     France|         47|                 8|          1456|       3987.62|               null|
|    Germany|          2|                14|          1680|       6040.99|            5154.49|
|    Germany|          2|                15|          1890|       6040.99|            5154.49|
|    Germany|         25|                 5|          1169|       5154.49|            1383.78|
|    Germany|         25|                 4|      

In [28]:
# Same lead-by-one column as the DataFrame-API cell, with the window written
# inline as a SQL expression instead of using my_window.
df.select(
    "*",
    expr("lead(invoice_values ,1) over(partition by country order by week_number )").alias("upcoming_week_sale"),
).show()

+-----------+-----------+------------------+--------------+--------------+------------------+
|    country|week_number|number_of_invoices|total_quantity|invoice_values|upcoming_week_sale|
+-----------+-----------+------------------+--------------+--------------+------------------+
|     France|         18|                 3|          1882|       9171.42|           9171.42|
|     France|         18|                 2|          1673|       9171.42|           3987.62|
|     France|         47|                 9|          1644|       3987.62|           3987.62|
|     France|         47|                 8|          1456|       3987.62|              null|
|    Germany|          2|                14|          1680|       6040.99|           6040.99|
|    Germany|          2|                15|          1890|       6040.99|           5154.49|
|    Germany|         25|                 5|          1169|       5154.49|           5154.49|
|    Germany|         25|                 4|           987| 

In [27]:
# Lead-by-one computed in plain SQL against the "dftable" temp view; the result
# column is named upcoming_sales.
lead_sql = """
          select *,
          lead(invoice_values ,1) over(partition by country order by week_number ) as upcoming_sales
          from dftable
          """
spark.sql(lead_sql).show()

+-----------+-----------+------------------+--------------+--------------+--------------+
|    country|week_number|number_of_invoices|total_quantity|invoice_values|upcoming_sales|
+-----------+-----------+------------------+--------------+--------------+--------------+
|     France|         18|                 3|          1882|       9171.42|       9171.42|
|     France|         18|                 2|          1673|       9171.42|       3987.62|
|     France|         47|                 9|          1644|       3987.62|       3987.62|
|     France|         47|                 8|          1456|       3987.62|          null|
|    Germany|          2|                14|          1680|       6040.99|       6040.99|
|    Germany|          2|                15|          1890|       6040.99|       5154.49|
|    Germany|         25|                 5|          1169|       5154.49|       5154.49|
|    Germany|         25|                 4|           987|       5154.49|       1383.78|
|    Germa

In [11]:
# Step 1: attach the previous row's invoice_values (per country, in week order).
results1_df = df.select(
    "*",
    lag("invoice_values", 1).over(my_window).alias("previous_week_sale"),
)

# Step 2: difference between the current and the previous row's value; it is
# null wherever previous_week_sale is null.
results2_df = results1_df.select(
    "*",
    expr("invoice_values - previous_week_sale").alias("invoice_diff"),
)
results2_df.show()

+-----------+-----------+------------------+--------------+--------------+------------------+-----------------+
|    country|week_number|number_of_invoices|total_quantity|invoice_values|previous_week_sale|     invoice_diff|
+-----------+-----------+------------------+--------------+--------------+------------------+-----------------+
|     France|         18|                 3|          1882|       9171.42|              null|             null|
|     France|         18|                 2|          1673|       9171.42|           9171.42|              0.0|
|     France|         47|                 9|          1644|       3987.62|           9171.42|          -5183.8|
|     France|         47|                 8|          1456|       3987.62|           3987.62|              0.0|
|    Germany|          2|                14|          1680|       6040.99|              null|             null|
|    Germany|          2|                15|          1890|       6040.99|           6040.99|           

In [29]:
# Same lag-by-one column as the DataFrame-API cell, with the window written
# inline as a SQL expression instead of using my_window.
df.select(
    "*",
    expr("lag(invoice_values ,1) over(partition by country order by week_number )").alias("previous_week_sale"),
).show()

+-----------+-----------+------------------+--------------+--------------+------------------+
|    country|week_number|number_of_invoices|total_quantity|invoice_values|previous_week_sale|
+-----------+-----------+------------------+--------------+--------------+------------------+
|     France|         18|                 3|          1882|       9171.42|              null|
|     France|         18|                 2|          1673|       9171.42|           9171.42|
|     France|         47|                 9|          1644|       3987.62|           9171.42|
|     France|         47|                 8|          1456|       3987.62|           3987.62|
|    Germany|          2|                14|          1680|       6040.99|              null|
|    Germany|          2|                15|          1890|       6040.99|           6040.99|
|    Germany|         25|                 5|          1169|       5154.49|           6040.99|
|    Germany|         25|                 4|           987| 

In [30]:
# Lag-by-one computed in plain SQL against the "dftable" temp view; the result
# column is named previous_sales.
lag_sql = """
          select *,
          lag(invoice_values ,1) over(partition by country order by week_number ) as previous_sales
          from dftable
          """
spark.sql(lag_sql).show()

+-----------+-----------+------------------+--------------+--------------+--------------+
|    country|week_number|number_of_invoices|total_quantity|invoice_values|previous_sales|
+-----------+-----------+------------------+--------------+--------------+--------------+
|     France|         18|                 3|          1882|       9171.42|          null|
|     France|         18|                 2|          1673|       9171.42|       9171.42|
|     France|         47|                 9|          1644|       3987.62|       9171.42|
|     France|         47|                 8|          1456|       3987.62|       3987.62|
|    Germany|          2|                14|          1680|       6040.99|          null|
|    Germany|          2|                15|          1890|       6040.99|       6040.99|
|    Germany|         25|                 5|          1169|       5154.49|       6040.99|
|    Germany|         25|                 4|           987|       5154.49|       5154.49|
|    Germa

In [12]:
# Step 1: attach the next row's invoice_values (per country, in week order).
# Note: this rebinds results1_df / results2_df from the lag-based cell above.
results1_df = df.select(
    "*",
    lead("invoice_values", 1).over(my_window).alias("next_week_sale"),
)

# Step 2: difference between the next and the current row's value; it is null
# wherever next_week_sale is null.
results2_df = results1_df.select(
    "*",
    expr("next_week_sale - invoice_values").alias("invoice_diff"),
)
results2_df.show()

+-----------+-----------+------------------+--------------+--------------+--------------+-----------------+
|    country|week_number|number_of_invoices|total_quantity|invoice_values|next_week_sale|     invoice_diff|
+-----------+-----------+------------------+--------------+--------------+--------------+-----------------+
|     France|         18|                 3|          1882|       9171.42|       9171.42|              0.0|
|     France|         18|                 2|          1673|       9171.42|       3987.62|          -5183.8|
|     France|         47|                 9|          1644|       3987.62|       3987.62|              0.0|
|     France|         47|                 8|          1456|       3987.62|          null|             null|
|    Germany|          2|                14|          1680|       6040.99|       6040.99|              0.0|
|    Germany|          2|                15|          1890|       6040.99|       5154.49|           -886.5|
|    Germany|         25|   

In [14]:
# Running total of invoice_values per country, accumulated in week order.
#
# F.sum is referenced explicitly: a bare `sum(...)` only resolves to Spark's
# aggregate because the wildcard import shadows Python's builtin sum, and would
# fail if that import were ever narrowed.
#
# The frame is spelled out rather than left implicit. It is the frame Spark
# applies by default to an ordered window, so the displayed values are
# unchanged: rows sharing a week_number are peers and receive the same total.
running_total_window = my_window.rangeBetween(Window.unboundedPreceding, Window.currentRow)
df.withColumn("total_invoice_value", F.sum("invoice_values").over(running_total_window)).show()

+-----------+-----------+------------------+--------------+--------------+-------------------+
|    country|week_number|number_of_invoices|total_quantity|invoice_values|total_invoice_value|
+-----------+-----------+------------------+--------------+--------------+-------------------+
|     France|         18|                 3|          1882|       9171.42|           18342.84|
|     France|         18|                 2|          1673|       9171.42|           18342.84|
|     France|         47|                 9|          1644|       3987.62| 26318.079999999998|
|     France|         47|                 8|          1456|       3987.62| 26318.079999999998|
|    Germany|          2|                14|          1680|       6040.99|           12081.98|
|    Germany|          2|                15|          1890|       6040.99|           12081.98|
|    Germany|         25|                 5|          1169|       5154.49|           22390.96|
|    Germany|         25|                 4|      