In [142]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lag, date_format, concat_ws, to_timestamp, to_date, expr, StringType, collect_list, when
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("FB").getOrCreate()

df = spark.read.csv("C:/Users/ThinkPad/GitProjects/FootballPredictions/E0.csv", header=True, inferSchema=True)
df2 = spark.read.csv("C:/Users/ThinkPad/GitProjects/FootballPredictions/E0 (1).csv", header=True, inferSchema=True)
df3 = spark.read.csv("C:/Users/ThinkPad/GitProjects/FootballPredictions/E0 (2).csv", header=True, inferSchema=True)
df4 = spark.read.csv("C:/Users/ThinkPad/GitProjects/FootballPredictions/E0 (3).csv", header=True, inferSchema=True)


In [143]:
df.show(5)

+---+----------+-------------------+-----------+-------------+----+----+---+----+----+---+---------+---+---+---+---+---+---+---+---+---+---+---+---+-----+-----+-----+----+----+----+----+----+----+----+----+-----+----+---+----+----+----+----+----+----+----+----+----+-----+--------+--------+-----+-----+-------+-------+-------+-------+-----+-------+-------+----+----+------+------+------+------+------+------+------+----+----+----+----+----+----+----+----+-----+----+----+----+----+----+----+-----+-----+-----+-----+-----+-----+---------+---------+------+------+--------+--------+--------+--------+-----+--------+--------+-----+-----+-------+-------+-------+-------+
|Div|      Date|               Time|   HomeTeam|     AwayTeam|FTHG|FTAG|FTR|HTHG|HTAG|HTR|  Referee| HS| AS|HST|AST| HF| AF| HC| AC| HY| AY| HR| AR|B365H|B365D|B365A| BWH| BWD| BWA| IWH| IWD| IWA| PSH| PSD|  PSA| WHH|WHD| WHA| VCH| VCD| VCA|MaxH|MaxD|MaxA|AvgH|AvgD| AvgA|B365>2.5|B365<2.5|P>2.5|P<2.5|Max>2.5|Max<2.5|Avg>2.5|Avg<2.5|

In [144]:
df = df.union(df2)
df = df.union(df3)
df = df.union(df4)

In [145]:
df = df.drop("Div")
df = df.select("Date", "Time", "HomeTeam", "AwayTeam", "FTHG", "FTAG", "FTR", "HTHG", "HTAG", "HTR", "Referee", "HS", "AS", "HST", "AST", "HF", "AF", "HC", "AC", "HY", "AY", "HR", "AR")

In [146]:
df = df.withColumn("corrected_date", expr("concat('20', substr(date, 9, 2), '-', substr(date, 4, 2), '-', substr(date, 1, 2))"))
df = df.withColumn("corrected_date_type", to_date(col("corrected_date")))
df = df.withColumn("updated_datetime", (to_timestamp(concat_ws(' ', date_format(col("corrected_date_type"), 'yyyy-MM-dd'), date_format(col("Time"), 'HH:mm:ss')))))
df = df.drop("Date", "Time", "corrected_date")
df = df.withColumnRenamed("corrected_date_type", "Date")
df = df.withColumnRenamed("updated_datetime", "Time")

In [147]:
dfHome = df.select(col("Time"), col("HomeTeam").alias("Team"), col("FTR"))
dfAway = df.select(col("Time"), col("AwayTeam").alias("Team"), col("FTR"))

dfHome = dfHome.withColumn("Result", when(dfHome.FTR == "H", "W")
                           .when(dfHome.FTR == "A", "L")
                           .when(dfHome.FTR == "D", "D"))

dfAway = dfAway.withColumn("Result", when(dfAway.FTR == "H", "W")
                           .when(dfAway.FTR == "A", "L")
                           .when(dfAway.FTR == "D", "D"))

dfForm = dfHome.union(dfAway).orderBy("Time")
dfForm = dfForm.select("Time", "Team", "Result")

#Last 5 game results for the home team
window_spec = Window.partitionBy("Team").orderBy("Time").rowsBetween(Window.unboundedPreceding, Window.currentRow)

dfForm = dfForm.withColumn("results_list", collect_list("Result").over(window_spec))

dfForm.createOrReplaceTempView("games")
query = """
SELECT *,
    CASE 
        WHEN size(results_list) >= 5 THEN concat_ws('', slice(results_list, size(results_list) - 4, 5))
        ELSE concat_ws('', results_list)
    END AS TeamForm
FROM games
"""

dfForm = spark.sql(query)
dfForm = dfForm.drop("results_list")

home_form_df = dfForm.withColumnRenamed("Team", "HomeTeam").withColumnRenamed("TeamForm", "HomeTeamForm").withColumnRenamed("Result", "FTR1").withColumnRenamed("Time", "Time1")
away_form_df = dfForm.withColumnRenamed("Team", "AwayTeam").withColumnRenamed("TeamForm", "AwayTeamForm").withColumnRenamed("Result", "FTR1").withColumnRenamed("Time", "Time1")

# Join on HomeTeam and Time
df = df.join(home_form_df, on=[df.HomeTeam == home_form_df.HomeTeam, df.Time == home_form_df.Time1], how="left").drop(home_form_df.HomeTeam).drop(home_form_df.Time1).drop(home_form_df.FTR1)

# Join on AwayTeam and Time
df = df.join(away_form_df, on=[df.AwayTeam == away_form_df.AwayTeam, df.Time == away_form_df.Time1], how="left").drop(away_form_df.AwayTeam).drop(away_form_df.Time1).drop(away_form_df.FTR1)



df.orderBy(["Time"], ascending=True).show()

+----------------+----------------+----+----+---+----+----+---+----------+---+---+---+---+---+---+---+---+---+---+---+---+----------+-------------------+------------+------------+
|        HomeTeam|        AwayTeam|FTHG|FTAG|FTR|HTHG|HTAG|HTR|   Referee| HS| AS|HST|AST| HF| AF| HC| AC| HY| AY| HR| AR|      Date|               Time|HomeTeamForm|AwayTeamForm|
+----------------+----------------+----+----+---+----+----+---+----------+---+---+---+---+---+---+---+---+---+---+---+---+----------+-------------------+------------+------------+
|          Fulham|         Arsenal|   0|   3|  A|   0|   1|  A|C Kavanagh|  5| 13|  2|  6| 12| 12|  2|  3|  2|  2|  0|  0|2020-09-12|2020-09-12 12:30:00|           L|           L|
|  Crystal Palace|     Southampton|   1|   0|  H|   1|   0|  H|    J Moss|  5|  9|  3|  5| 14| 11|  7|  3|  2|  1|  0|  0|2020-09-12|2020-09-12 15:00:00|           W|           W|
|       Liverpool|           Leeds|   4|   3|  H|   3|   2|  H|  M Oliver| 22|  6|  6|  3|  9|  6|  

In [148]:
df.groupBy("Referee").count().orderBy("count", ascending=False).show(300)

+------------+-----+
|     Referee|count|
+------------+-----+
|    A Taylor|  113|
|    M Oliver|  108|
|   P Tierney|  103|
|    C Pawson|   89|
|   S Attwell|   85|
|    S Hooper|   81|
|    A Madley|   79|
|     D Coote|   78|
|  C Kavanagh|   73|
|     R Jones|   69|
|    P Bankes|   59|
|  A Marriner|   57|
|   D England|   56|
|  M Atkinson|   52|
|      M Dean|   51|
|      J Moss|   49|
|   J Gillett|   46|
|    J Brooks|   45|
|    K Friend|   43|
|     G Scott|   37|
| M Salisbury|   32|
|  T Robinson|   23|
|   T Bramall|   19|
|T Harrington|   19|
|   S Barrott|   15|
|     L Mason|   11|
|      D Bond|    9|
|     J Smith|    5|
|   S Allison|    3|
|    R Madley|    2|
|     R Welch|    2|
|     L Smith|    2|
|   M Donohue|    2|
|  J Gillett |    1|
|     S Singh|    1|
|      A Moss|    1|
+------------+-----+



In [149]:
df.head(5)

[Row(HomeTeam='Burnley', AwayTeam='Man City', FTHG=0, FTAG=3, FTR='A', HTHG=0, HTAG=2, HTR='A', Referee='C Pawson', HS=6, AS=17, HST=1, AST=8, HF=11, AF=8, HC=6, AC=5, HY=0, AY=0, HR=1, AR=0, Date=datetime.date(2023, 8, 11), Time=datetime.datetime(2023, 8, 11, 20, 0), HomeTeamForm='LWDLL', AwayTeamForm='LWDWL'),
 Row(HomeTeam='Arsenal', AwayTeam="Nott'm Forest", FTHG=2, FTAG=1, FTR='H', HTHG=2, HTAG=0, HTR='H', Referee='M Oliver', HS=15, AS=6, HST=7, AST=2, HF=12, AF=12, HC=8, AC=3, HY=2, AY=2, HR=0, AR=0, Date=datetime.date(2023, 8, 12), Time=datetime.datetime(2023, 8, 12, 12, 30), HomeTeamForm='LLWWW', AwayTeamForm='WDWDW'),
 Row(HomeTeam='Bournemouth', AwayTeam='West Ham', FTHG=1, FTAG=1, FTR='D', HTHG=0, HTAG=0, HTR='D', Referee='P Bankes', HS=14, AS=16, HST=5, AST=3, HF=9, AF=14, HC=10, AC=4, HY=1, AY=4, HR=0, AR=0, Date=datetime.date(2023, 8, 12), Time=datetime.datetime(2023, 8, 12, 15, 0), HomeTeamForm='LWLWD', AwayTeamForm='WWWWD'),
 Row(HomeTeam='Brighton', AwayTeam='Luton', F