In [218]:
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import to_date, col, split, regexp_replace, when, count, sum, concat_ws, least, greatest, lag, collect_list, array_join

In [219]:
# Start a Spark session named "epl", or reuse one that is already active.
spark = (
    SparkSession.builder
    .appName("epl")
    .getOrCreate()
)

In [220]:
# Bare expression: the notebook renders the session object as this cell's output.
spark

### Code for Results

In [221]:
# Location of the raw match-results CSV.
# NOTE(review): this is an absolute, machine-specific path, and the variable name is a
# misspelling of "results_path"; both are kept as-is because other cells may rely on them.
resutls_path = r"C:\Users\Stanisław\Desktop\Programowanie\GitHub\PL_predictions\epl_predictions\data\raw\results.csv"

# Read the file using its first line as the column names. No schema is supplied,
# so columns are cast explicitly in the cells that follow.
df = spark.read.csv(resutls_path, header=True)

In [222]:
# Print the first five rows of the freshly loaded frame.
df.show(5)

+-----+---+---+----------+----+-------------+-----+---------------+----------+---------------+------------+--------------------+-----+-------+-------+---------+
|index| Wk|Day|      Date|Time|         Home|Score|           Away|Attendance|          Venue|     Referee|        Match Report|Notes|xG_Home|xG_Away|   Season|
+-----+---+---+----------+----+-------------+-----+---------------+----------+---------------+------------+--------------------+-----+-------+-------+---------+
|    0|  1|Sat|1995-08-19|NULL|  Southampton|  3–4|Nott'ham Forest|    15,164|       The Dell|Gary Willard|https://fbref.com...| NULL|   NULL|   NULL|1995-1996|
|    1|  1|Sat|1995-08-19|NULL|Newcastle Utd|  3–0|  Coventry City|    36,485|St. James' Park|Roger Dilkes|https://fbref.com...| NULL|   NULL|   NULL|1995-1996|
|    2|  1|Sat|1995-08-19|NULL|    Wimbledon|  3–2|         Bolton|     9,317|  Selhurst Park|Keith Cooper|https://fbref.com...| NULL|   NULL|   NULL|1995-1996|
|    3|  1|Sat|1995-08-19|NULL|   

In [223]:
# Convert the row index column to an integer type.
index_as_integer = col("index").cast("integer")
df = df.withColumn("index", index_as_integer)

Casting the Date column to the date type

In [224]:
# Parse the "Date" column into a date using the yyyy-MM-dd pattern.
parsed_date = to_date(col("Date"), "yyyy-MM-dd")
df = df.withColumn("Date", parsed_date)

Creating Home_score and Away_score columns

In [225]:
# "Score" looks like "3–4"; the separator is an en dash, not an ASCII hyphen.
# Split once and take the two halves as integer goal counts.
score_parts = split(col("Score"), "–")
df = (
    df
    .withColumn("Home_score", score_parts.getItem(0).cast("integer"))
    .withColumn("Away_score", score_parts.getItem(1).cast("integer"))
)

Changing type of xG columns to float

In [226]:
# Convert both expected-goals columns to float.
for xg_column in ("xG_Home", "xG_Away"):
    df = df.withColumn(xg_column, col(xg_column).cast("float"))

Deleting Notes column

In [227]:
# Remove the "Notes" column from the working frame.
columns_to_discard = ["Notes"]
df = df.drop(*columns_to_discard)

Changing type of Attendance column to int

In [228]:
# Attendance arrives as text with thousands separators (e.g. "15,164"):
# strip the commas, then cast the remaining digits to an integer.
attendance_digits = regexp_replace(col("Attendance"), ",", "")
df = df.withColumn("Attendance", attendance_digits.cast("integer"))

Creating column Match_result

In [229]:
# Goal difference from the home side's perspective. It is null whenever either
# score is null (for example when "Score" itself is missing).
df = df.withColumn("Score_diff", col("Home_score") - col("Away_score"))

# Label the match from the home side's point of view: W(in), D(raw) or L(oss).
# Every outcome has an explicit branch and there is no catch-all, so a null
# Score_diff yields a null Match_result instead of being reported as a home loss.
result_condition = (
    when(col("Score_diff") > 0, "W")
    .when(col("Score_diff") == 0, "D")
    .when(col("Score_diff") < 0, "L")
)
df = df.withColumn("Match_result", result_condition)

In [230]:
# Print the first five rows to inspect the newly added Home_score, Away_score,
# Score_diff and Match_result columns.
df.show(5)

+-----+---+---+----------+----+-------------+-----+---------------+----------+---------------+------------+--------------------+-------+-------+---------+----------+----------+----------+------------+
|index| Wk|Day|      Date|Time|         Home|Score|           Away|Attendance|          Venue|     Referee|        Match Report|xG_Home|xG_Away|   Season|Home_score|Away_score|Score_diff|Match_result|
+-----+---+---+----------+----+-------------+-----+---------------+----------+---------------+------------+--------------------+-------+-------+---------+----------+----------+----------+------------+
|    0|  1|Sat|1995-08-19|NULL|  Southampton|  3–4|Nott'ham Forest|     15164|       The Dell|Gary Willard|https://fbref.com...|   NULL|   NULL|1995-1996|         3|         4|        -1|           L|
|    1|  1|Sat|1995-08-19|NULL|Newcastle Utd|  3–0|  Coventry City|     36485|St. James' Park|Roger Dilkes|https://fbref.com...|   NULL|   NULL|1995-1996|         3|         0|         3|         

Creating the Form_last_5_matches column (the results of a team's previous five matches, e.g. "DWDDW") for the home and away team

In [231]:
# Build two per-team views of every match: one row where the team of interest is
# the home side and one where it is the away side.
home_df = df.select(
    col("Date"),
    col("Home").alias("Team"),
    col("Away").alias("Opponent"),
    col("Match_result").alias("Result"),
)

# Match_result is expressed from the home side's perspective, so it is mirrored for
# the away side. Each outcome is mapped explicitly; a null Match_result stays null
# rather than being turned into a draw by a catch-all branch.
away_df = df.select(
    col("Date"),
    col("Away").alias("Team"),
    col("Home").alias("Opponent"),
    when(col("Match_result") == "W", "L")
    .when(col("Match_result") == "L", "W")
    .when(col("Match_result") == "D", "D")
    .alias("Result"),
)

In [232]:
# Stack the home and away views. The result has twice as many rows as the match
# table, because every match now appears once per participating team.
combined_df = home_df.union(away_df)

# League points for the result: win = 3, draw = 1, loss = 0. A null Result leaves
# Points null instead of silently awarding one point through a catch-all branch.
combined_df = combined_df.withColumn(
    "Points",
    when(col("Result") == "W", 3)
    .when(col("Result") == "D", 1)
    .when(col("Result") == "L", 0),
)

In [233]:
# Per team, in date order: every row strictly before the current one.
match_count_window = (
    Window.partitionBy("Team")
    .orderBy("Date")
    .rowsBetween(Window.unboundedPreceding, -1)
)

# Number of earlier rows for the team that have a non-null Points value.
matches_played_before = count("Points").over(match_count_window)
combined_df = combined_df.withColumn("Match_count", matches_played_before)

In [234]:
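# Superseded experiment, kept for reference only (nothing in this cell executes):
# it stored the joined results of the previous five matches in "Match_count".
# match_count_window = Window.partitionBy("Team").orderBy("Date").rowsBetween(-5, -1)

# combined_df = combined_df.withColumn("Match_count", collect_list("Result").over(match_count_window)) \
#                             .withColumn("Match_count", array_join("Match_count", ''))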

In [235]:
# Form string built from the team's previous five results (e.g. "DWDDW").
# The window covers the five rows immediately before the current match.
form_window = Window.partitionBy("Team").orderBy("Date").rowsBetween(-5, -1)

# Only publish the form once the team has at least five earlier matches;
# before that the column stays null.
has_five_prior_matches = col("Match_count") >= 5
combined_df = (
    combined_df
    .withColumn("Form_last_5_matches", collect_list("Result").over(form_window))
    .withColumn(
        "Form_last_5_matches",
        when(has_five_prior_matches, array_join("Form_last_5_matches", '')),
    )
)

In [236]:
# Print the first ten rows of the per-team frame to check Match_count and Form_last_5_matches.
combined_df.show(10)

+----------+-------+---------------+------+------+-----------+-------------------+
|      Date|   Team|       Opponent|Result|Points|Match_count|Form_last_5_matches|
+----------+-------+---------------+------+------+-----------+-------------------+
|1995-08-20|Arsenal|  Middlesbrough|     D|     1|          0|               NULL|
|1995-08-23|Arsenal|        Everton|     W|     3|          1|               NULL|
|1995-08-26|Arsenal|  Coventry City|     D|     1|          2|               NULL|
|1995-08-29|Arsenal|Nott'ham Forest|     D|     1|          3|               NULL|
|1995-09-10|Arsenal|Manchester City|     W|     3|          4|               NULL|
|1995-09-16|Arsenal|       West Ham|     W|     3|          5|              DWDDW|
|1995-09-23|Arsenal|    Southampton|     W|     3|          6|              WDDWW|
|1995-09-30|Arsenal|        Chelsea|     L|     0|          7|              DDWWW|
|1995-10-14|Arsenal|   Leeds United|     W|     3|          8|              DWWWL|
|199

In [237]:
# Keep only the columns needed for the joins below, and rename "Date" so that the
# joined frame does not end up with two columns of the same name.
combined_df_prepared = combined_df.select(
    col("Date").alias("Date_combined"),
    "Team",
    "Form_last_5_matches",
)

In [238]:
# Attach the home team's form to each match: pair rows on the same date where the
# per-team row belongs to the home club.
home_form_match = (df["Date"] == combined_df_prepared["Date_combined"]) & (
    df["Home"] == combined_df_prepared["Team"]
)
df = (
    df.join(combined_df_prepared, home_form_match, "left")
    .withColumnRenamed("Form_last_5_matches", "Home_form_last_5_matches")
    .drop("Date_combined", "Team")
)

In [239]:
# Attach the away team's form to each match: pair rows on the same date where the
# per-team row belongs to the away club.
away_form_match = (df["Date"] == combined_df_prepared["Date_combined"]) & (
    df["Away"] == combined_df_prepared["Team"]
)
df = (
    df.join(combined_df_prepared, away_form_match, "left")
    .withColumnRenamed("Form_last_5_matches", "Away_form_last_5_matches")
    .drop("Date_combined", "Team")
)

In [240]:
# Sort matches chronologically (ascending date).
df = df.orderBy(col("Date").asc())

In [241]:
# Print up to 200 rows to inspect the two form columns.
# NOTE(review): 200 rows is a large dump for a notebook; a smaller preview may be enough.
df.show(200)

+-----+---+---+----------+----+---------------+-----+---------------+----------+-----------------+----------------+--------------------+-------+-------+---------+----------+----------+----------+------------+------------------------+------------------------+
|index| Wk|Day|      Date|Time|           Home|Score|           Away|Attendance|            Venue|         Referee|        Match Report|xG_Home|xG_Away|   Season|Home_score|Away_score|Score_diff|Match_result|Home_form_last_5_matches|Away_form_last_5_matches|
+-----+---+---+----------+----+---------------+-----+---------------+----------+-----------------+----------------+--------------------+-------+-------+---------+----------+----------+----------+------------+------------------------+------------------------+
|    0|  1|Sat|1995-08-19|NULL|    Southampton|  3–4|Nott'ham Forest|     15164|         The Dell|    Gary Willard|https://fbref.com...|   NULL|   NULL|1995-1996|         3|         4|        -1|           L|               

Creating result of the previous match between two teams

In [242]:
# Working copy for the head-to-head feature: the four columns needed, each
# suffixed with "_hth" so they cannot clash with df's own columns in a later join.
hth_source_columns = ["Date", "Home", "Away", "Match_result"]
head_to_head = df.select(
    [col(name).alias(f"{name}_hth") for name in hth_source_columns]
)

head_to_head.show()

+----------+---------------+---------------+----------------+
|  Date_hth|       Home_hth|       Away_hth|Match_result_hth|
+----------+---------------+---------------+----------------+
|1995-08-19|    Southampton|Nott'ham Forest|               L|
|1995-08-19|  Newcastle Utd|  Coventry City|               W|
|1995-08-19|      Wimbledon|         Bolton|               W|
|1995-08-19|      Liverpool| Sheffield Weds|               W|
|1995-08-19|       West Ham|   Leeds United|               L|
|1995-08-19|      Blackburn|            QPR|               W|
|1995-08-19|    Aston Villa| Manchester Utd|               W|
|1995-08-19|Manchester City|      Tottenham|               D|
|1995-08-19|        Chelsea|        Everton|               D|
|1995-08-20|        Arsenal|  Middlesbrough|               D|
|1995-08-21|   Leeds United|      Liverpool|               W|
|1995-08-22|         Bolton|  Newcastle Utd|               L|
|1995-08-23|      Tottenham|    Aston Villa|               L|
|1995-08

In [243]:
# Mirror the home-perspective result to get the same match from the away side's
# point of view. Each outcome is mapped explicitly; a null Match_result_hth stays
# null rather than being turned into a draw by a catch-all branch.
head_to_head = head_to_head.withColumn(
    "Away_match_result",
    when(col("Match_result_hth") == "W", "L")
    .when(col("Match_result_hth") == "L", "W")
    .when(col("Match_result_hth") == "D", "D"),
)

In [244]:
# Order-independent key for a pair of clubs: the smaller name, "_", the larger name,
# so A-vs-B and B-vs-A share one key (e.g. "Arsenal_Aston Villa").
first_club = least("Home_hth", "Away_hth")
second_club = greatest("Home_hth", "Away_hth")
head_to_head = head_to_head.withColumn("Matchup", concat_ws("_", first_club, second_club))

In [245]:
# Per pair of clubs, in date order, with a frame of exactly the previous row.
head_to_head_window = (
    Window.partitionBy("Matchup")
    .orderBy("Date_hth")
    .rowsBetween(-1, -1)
)

In [246]:
# Values taken from the previous meeting of the same two clubs.
previous_home = lag("Home_hth").over(head_to_head_window)
previous_away = lag("Away_hth").over(head_to_head_window)
previous_home_result = lag("Match_result_hth").over(head_to_head_window)
previous_away_result = lag("Away_match_result").over(head_to_head_window)

# Result of the previous meeting as seen by the CURRENT home club:
# - it was also at home last time -> take the previous home-side result;
# - it was away last time         -> take the previous away-side result;
# - there is no previous meeting  -> null.
head_to_head = head_to_head.withColumn(
    "Last_match_between_clubs",
    when(col("Home_hth") == previous_home, previous_home_result)
    .when(col("Home_hth") == previous_away, previous_away_result),
)

In [247]:
# Print the head-to-head frame to check the Last_match_between_clubs column.
head_to_head.show()

+----------+-----------+-----------+----------------+-----------------+-------------------+------------------------+
|  Date_hth|   Home_hth|   Away_hth|Match_result_hth|Away_match_result|            Matchup|Last_match_between_clubs|
+----------+-----------+-----------+----------------+-----------------+-------------------+------------------------+
|1995-10-21|    Arsenal|Aston Villa|               W|                L|Arsenal_Aston Villa|                    NULL|
|1995-12-02|Aston Villa|    Arsenal|               D|                D|Arsenal_Aston Villa|                       L|
|1996-09-07|Aston Villa|    Arsenal|               D|                D|Arsenal_Aston Villa|                       D|
|1996-12-28|    Arsenal|Aston Villa|               D|                D|Arsenal_Aston Villa|                       D|
|1997-10-26|    Arsenal|Aston Villa|               D|                D|Arsenal_Aston Villa|                       D|
|1998-05-10|Aston Villa|    Arsenal|               W|           

In [248]:
# Attach the previous head-to-head result (from the home club's perspective) to each
# match, pairing rows on match date and home club.
# NOTE(review): "Away_match_result" is not in the drop list below, so that helper
# column stays in df; confirm whether this is intended.
head_to_head_match = (df["Date"] == head_to_head["Date_hth"]) & (
    df["Home"] == head_to_head["Home_hth"]
)
df = (
    df.join(head_to_head, head_to_head_match, "left")
    .withColumnRenamed("Last_match_between_clubs", "Home_last_match_between_clubs")
    .drop("Date_hth", "Home_hth", "Matchup", "Away_hth", "Match_result_hth")
)

In [252]:
# Sort the rows by the "index" column (ascending).
df = df.orderBy(col("index").asc())

In [253]:
# Print up to 200 rows of the enriched match table.
# NOTE(review): 200 rows is a large dump for a notebook; a smaller preview may be enough.
df.show(200)

+-----+---+---+----------+----+---------------+-----+---------------+----------+-----------------+----------------+--------------------+-------+-------+---------+----------+----------+----------+------------+------------------------+------------------------+-----------------+-----------------------------+-----------------------------+
|index| Wk|Day|      Date|Time|           Home|Score|           Away|Attendance|            Venue|         Referee|        Match Report|xG_Home|xG_Away|   Season|Home_score|Away_score|Score_diff|Match_result|Home_form_last_5_matches|Away_form_last_5_matches|Away_match_result|Home_last_match_between_clubs|Away_last_match_between_clubs|
+-----+---+---+----------+----+---------------+-----+---------------+----------+-----------------+----------------+--------------------+-------+-------+---------+----------+----------+----------+------------+------------------------+------------------------+-----------------+-----------------------------+--------------------

In [250]:
# Previous head-to-head result from the away club's perspective: the mirror image of
# the home club's value (W <-> L, D stays D, null stays null).
# NOTE(review): this cell's execution count (250) is lower than that of the two cells
# placed before it (252, 253), so the notebook was not run top to bottom; on a fresh
# "Run All" the df.show(200) above would run before this column exists.
home_view = col("Home_last_match_between_clubs")
df = df.withColumn(
    "Away_last_match_between_clubs",
    when(home_view == "W", "L")
    .when(home_view == "D", "D")
    .when(home_view == "L", "W"),
)