In [34]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.window import Window


In [35]:
spark = SparkSession.builder.appName("Football Window Functions").getOrCreate()


In [37]:
# url = 'https://raw.githubusercontent.com/footballcsv/world/main/WorldCup.csv' # Original URL
# football_df = spark.read.csv(url, header=True, inferSchema=True) # Original code

# Reading from a sample CSV file available in the Colab environment
football_df = spark.read.csv("/content/nl.1.csv", header=True, inferSchema=True)
football_df.show(10)
football_df.createOrReplaceTempView('football_table')

+-----+--------------+-------------------+---+-----------------+
|Round|          Date|             Team 1| FT|           Team 2|
+-----+--------------+-------------------+---+-----------------+
|    1|Fri Aug 2 2019|         PEC Zwolle|1-3|Willem II Tilburg|
|    1|Sat Aug 3 2019|     Vitesse Arnhem|2-2|   Ajax Amsterdam|
|    1|Sat Aug 3 2019|           FC Emmen|0-1|     FC Groningen|
|    1|Sat Aug 3 2019|          VVV Venlo|3-1|     RKC Waalwijk|
|    1|Sat Aug 3 2019|          FC Twente|1-1|    PSV Eindhoven|
|    1|Sun Aug 4 2019|    Heracles Almelo|0-4|    SC Heerenveen|
|    1|Sun Aug 4 2019|Feyenoord Rotterdam|2-2| Sparta Rotterdam|
|    1|Sun Aug 4 2019|       ADO Den Haag|2-4|       FC Utrecht|
|    1|Sun Aug 4 2019|         AZ Alkmaar|4-0|  Fortuna Sittard|
|    2|Fri Aug 9 2019|   Sparta Rotterdam|4-1|        VVV Venlo|
+-----+--------------+-------------------+---+-----------------+
only showing top 10 rows



In [48]:
# Drop any potentially ambiguous 'Goals' columns that might exist from previous steps
football_df = football_df.drop('Goals')

# Re-extract Year from Date and HomeGoals from FT within this cell
from pyspark.sql.types import IntegerType
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY") # Ensure date parsing works
football_df = football_df.withColumn('Year', F.year(F.to_date('Date', 'EEE MMM d yyyy')))
football_df = football_df.withColumn('HomeGoals', F.split(football_df['FT'], '-').getItem(0).cast(IntegerType()))

# Rename HomeGoals to Goals for use in the window function
football_df_cleaned = football_df.withColumnRenamed('HomeGoals', 'Goals')

# Select necessary columns for the window function
football_df_cleaned = football_df_cleaned.select('Round', 'Date', 'Team 1', 'FT', 'Team 2', 'Year', 'Goals', 'AwayGoals')


windowSpec = Window.partitionBy('Year').orderBy(F.desc('Goals'))
# Apply window function on the cleaned DataFrame
football_ranked = football_df_cleaned.withColumn('Rank', F.rank().over(windowSpec))
football_ranked.show(10)

+-----+---------------+-------------------+---+----------------+----+-----+---------+----+
|Round|           Date|             Team 1| FT|          Team 2|Year|Goals|AwayGoals|Rank|
+-----+---------------+-------------------+---+----------------+----+-----+---------+----+
|    6|Sun Sep 15 2019|         PEC Zwolle|6-2|    RKC Waalwijk|2019|    6|        2|   1|
|   12| Sun Nov 3 2019|         FC Utrecht|6-0| Fortuna Sittard|2019|    6|        0|   1|
|   13| Sat Nov 9 2019|    Heracles Almelo|6-1|       VVV Venlo|2019|    6|        1|   1|
|   18|Sun Dec 22 2019|     Ajax Amsterdam|6-1|    ADO Den Haag|2019|    6|        1|   1|
|    2|Sat Aug 10 2019|     Ajax Amsterdam|5-0|        FC Emmen|2019|    5|        0|   5|
|    6|Sat Sep 14 2019|         AZ Alkmaar|5-1|Sparta Rotterdam|2019|    5|        1|   5|
|    6|Sat Sep 14 2019|      PSV Eindhoven|5-0|  Vitesse Arnhem|2019|    5|        0|   5|
|    4|Wed Sep 25 2019|     Ajax Amsterdam|5-0| Fortuna Sittard|2019|    5|        0|   5|

In [50]:
from pyspark.sql.types import IntegerType

# Ensure legacy time parser policy is set
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

# Re-extract Year from Date and HomeGoals/AwayGoals from FT within this cell
football_df_cumulative = football_df.withColumn('Year', F.year(F.to_date('Date', 'EEE MMM d yyyy')))
football_df_cumulative = football_df_cumulative.withColumn('HomeGoals', F.split(football_df_cumulative['FT'], '-').getItem(0).cast(IntegerType()))
football_df_cumulative = football_df_cumulative.withColumn('AwayGoals', F.split(football_df_cumulative['FT'], '-').getItem(1).cast(IntegerType()))

# Rename HomeGoals to Goals for use in the window function for cumulative home goals
football_df_cumulative = football_df_cumulative.withColumnRenamed('HomeGoals', 'Goals')

# Define the window specification: partition by Team 1, order by Year and Date for cumulative sum
windowSpec2 = Window.partitionBy('Team 1').orderBy('Year', 'Date').rowsBetween(Window.unboundedPreceding, Window.currentRow)

# Apply window function to calculate cumulative goals (home goals for Team 1)
football_cumulative = football_df_cumulative.withColumn('CumulativeGoals', F.sum('Goals').over(windowSpec2))

football_cumulative.show(10)

+-----+---------------+------------+----+-----------------+----+---------+-----+---------------+
|Round|           Date|      Team 1|  FT|           Team 2|Year|AwayGoals|Goals|CumulativeGoals|
+-----+---------------+------------+----+-----------------+----+---------+-----+---------------+
|    3|Sat Aug 17 2019|ADO Den Haag| 1-2| Sparta Rotterdam|2019|        2|    1|              1|
|    5|Sat Aug 31 2019|ADO Den Haag| 1-0|        VVV Venlo|2019|        0|    1|              2|
|   17|Sat Dec 14 2019|ADO Den Haag| 1-1|     FC Groningen|2019|        1|    1|              3|
|   16| Sat Dec 7 2019|ADO Den Haag| 0-0|        FC Twente|2019|        0|    0|              3|
|   14|Sat Nov 23 2019|ADO Den Haag| 3-3|Willem II Tilburg|2019|        3|    3|              6|
|    1| Sun Aug 4 2019|ADO Den Haag| 2-4|       FC Utrecht|2019|        4|    2|              8|
|   12| Sun Nov 3 2019|ADO Den Haag| 1-1|    SC Heerenveen|2019|        1|    1|              9|
|    9| Sun Oct 6 2019|ADO Den

In [54]:
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY") # Ensure date parsing works in SQL
query = '''
SELECT *,
       YEAR(TO_DATE(Date, 'EEE MMM d yyyy')) as Year,
       CAST(SPLIT(FT, '-')[0] AS INT) as HomeGoals,
       LAG(CAST(SPLIT(FT, '-')[0] AS INT), 1) OVER(PARTITION BY "Team 1" ORDER BY YEAR(TO_DATE(Date, 'EEE MMM d yyyy'))) as PrevHomeGoals,
       LEAD(CAST(SPLIT(FT, '-')[0] AS INT), 1) OVER(PARTITION BY "Team 1" ORDER BY YEAR(TO_DATE(Date, 'EEE MMM d yyyy'))) as NextHomeGoals
FROM football_table
'''
football_lag_lead = spark.sql(query)
football_lag_lead.show(10)

+-----+--------------+-------------------+---+-----------------+----+---------+---------+----+---------+-------------+-------------+
|Round|          Date|             Team 1| FT|           Team 2|Year|AwayGoals|HomeGoals|Year|HomeGoals|PrevHomeGoals|NextHomeGoals|
+-----+--------------+-------------------+---+-----------------+----+---------+---------+----+---------+-------------+-------------+
|    1|Fri Aug 2 2019|         PEC Zwolle|1-3|Willem II Tilburg|2019|        3|        1|2019|        1|         NULL|            2|
|    1|Sat Aug 3 2019|     Vitesse Arnhem|2-2|   Ajax Amsterdam|2019|        2|        2|2019|        2|            1|            0|
|    1|Sat Aug 3 2019|           FC Emmen|0-1|     FC Groningen|2019|        1|        0|2019|        0|            2|            3|
|    1|Sat Aug 3 2019|          VVV Venlo|3-1|     RKC Waalwijk|2019|        1|        3|2019|        3|            0|            1|
|    1|Sat Aug 3 2019|          FC Twente|1-1|    PSV Eindhoven|2019|