In [177]:
import findspark

In [250]:
from pyspark import SparkContext
from pyspark.sql import SparkSession, Window, Row
from pyspark.sql.functions import *
from pyspark.sql.types import *
import matplotlib.pyplot as plt

In [179]:
spark = SparkSession\
        .builder\
        .appName("ligatutorial")\
        .getOrCreate()

In [180]:
df_matches =  spark.read.format('csv').options(header='true').load("/Users/user/Desktop/GitHub/pyspark_projects/Matches.csv")
df_matches.show(5)

+--------+---+------+----------+-------------+--------------+----+----+---+
|Match_ID|Div|Season|      Date|     HomeTeam|      AwayTeam|FTHG|FTAG|FTR|
+--------+---+------+----------+-------------+--------------+----+----+---+
|       1| D2|  2009|2010-04-04|   Oberhausen|Kaiserslautern|   2|   1|  H|
|       2| D2|  2009|2009-11-01|  Munich 1860|Kaiserslautern|   0|   1|  A|
|       3| D2|  2009|2009-10-04|Frankfurt FSV|Kaiserslautern|   1|   1|  D|
|       4| D2|  2009|2010-02-21|Frankfurt FSV|     Karlsruhe|   2|   1|  H|
|       5| D2|  2009|2009-12-06|        Ahlen|     Karlsruhe|   1|   3|  A|
+--------+---+------+----------+-------------+--------------+----+----+---+
only showing top 5 rows



In [181]:
df = df_matches.withColumnRenamed("FTHG", "HomeTeamGoals")\
    .withColumnRenamed("FTAG", "AwayTeamGoals")\
        .withColumnRenamed("FTR", "Result")
df.show(3)

+--------+---+------+----------+-------------+--------------+-------------+-------------+------+
|Match_ID|Div|Season|      Date|     HomeTeam|      AwayTeam|HomeTeamGoals|AwayTeamGoals|Result|
+--------+---+------+----------+-------------+--------------+-------------+-------------+------+
|       1| D2|  2009|2010-04-04|   Oberhausen|Kaiserslautern|            2|            1|     H|
|       2| D2|  2009|2009-11-01|  Munich 1860|Kaiserslautern|            0|            1|     A|
|       3| D2|  2009|2009-10-04|Frankfurt FSV|Kaiserslautern|            1|            1|     D|
+--------+---+------+----------+-------------+--------------+-------------+-------------+------+
only showing top 3 rows



In [182]:
df = df\
    .withColumn('HomeTeamWin', when(col('Result') == 'H', 1).otherwise(0)) \
    .withColumn('AwayTeamWin', when(col('Result') == 'A', 1).otherwise(0)) \
    .withColumn('GameTie', when(col('Result') == 'D', 1).otherwise(0))
df.show(3)

+--------+---+------+----------+-------------+--------------+-------------+-------------+------+-----------+-----------+-------+
|Match_ID|Div|Season|      Date|     HomeTeam|      AwayTeam|HomeTeamGoals|AwayTeamGoals|Result|HomeTeamWin|AwayTeamWin|GameTie|
+--------+---+------+----------+-------------+--------------+-------------+-------------+------+-----------+-----------+-------+
|       1| D2|  2009|2010-04-04|   Oberhausen|Kaiserslautern|            2|            1|     H|          1|          0|      0|
|       2| D2|  2009|2009-11-01|  Munich 1860|Kaiserslautern|            0|            1|     A|          0|          1|      0|
|       3| D2|  2009|2009-10-04|Frankfurt FSV|Kaiserslautern|            1|            1|     D|          0|          0|      1|
+--------+---+------+----------+-------------+--------------+-------------+-------------+------+-----------+-----------+-------+
only showing top 3 rows



In [183]:
df = df.filter((col("Season") >= 2000) & (col("Season") <= 2010) & (col("Div") == "D1"))
df.show(2)

+--------+---+------+----------+-------------+----------+-------------+-------------+------+-----------+-----------+-------+
|Match_ID|Div|Season|      Date|     HomeTeam|  AwayTeam|HomeTeamGoals|AwayTeamGoals|Result|HomeTeamWin|AwayTeamWin|GameTie|
+--------+---+------+----------+-------------+----------+-------------+-------------+------+-----------+-----------+-------+
|      21| D1|  2009|2010-02-06|       Bochum|Leverkusen|            1|            1|     D|          0|          0|      1|
|      22| D1|  2009|2009-11-22|Bayern Munich|Leverkusen|            1|            1|     D|          0|          0|      1|
+--------+---+------+----------+-------------+----------+-------------+-------------+------+-----------+-----------+-------+
only showing top 2 rows



In [184]:
home = df.groupby('Season', 'HomeTeam') \
       .agg(sum('HomeTeamWin').alias('TotalHomeWin'),
            sum('AwayTeamWin').alias('TotalHomeLoss'),
            sum('GameTie').alias('TotalHomeTie'),
            sum('HomeTeamGoals').alias('HomeScoredGoals'),
            sum('AwayTeamGoals').alias('HomeAgainstGoals')) \
       .withColumnRenamed('HomeTeam', 'Team')

#away game features 
away =  df.groupby('Season', 'AwayTeam') \
       .agg(sum('AwayTeamWin').alias('TotalAwayWin'),
            sum('HomeTeamWin').alias('TotalAwayLoss'),
            sum('GameTie').alias('TotalAwayTie'),
            sum('AwayTeamGoals').alias('AwayScoredGoals'),
            sum('HomeTeamGoals').alias('AwayAgainstGoals'))  \
       .withColumnRenamed('AwayTeam', 'Team')
away.show()

+------+--------------+------------+-------------+------------+---------------+----------------+
|Season|          Team|TotalAwayWin|TotalAwayLoss|TotalAwayTie|AwayScoredGoals|AwayAgainstGoals|
+------+--------------+------------+-------------+------------+---------------+----------------+
|  2005|Kaiserslautern|           3|           10|           4|           21.0|            38.0|
|  2006|       Cottbus|           5|            9|           3|           17.0|            27.0|
|  2001|      St Pauli|           0|           11|           6|           18.0|            42.0|
|  2005|         Mainz|           3|           10|           4|           15.0|            24.0|
|  2006|       Hamburg|           6|            5|           6|           21.0|            18.0|
|  2003|     Stuttgart|           9|            5|           3|           23.0|            11.0|
|  2003| Hansa Rostock|           2|            8|           7|           21.0|            36.0|
|  2007| Hansa Rostock|       

In [185]:
window = ['Season']
window = Window.partitionBy(window).orderBy(col('WinPct').desc(), col('GoalDifferentials').desc())
table = home.join(away, ['Team', 'Season'],  'inner') \
    .withColumn('GoalsScored', col('HomeScoredGoals') + col('AwayScoredGoals')) \
    .withColumn('GoalsAgainst', col('HomeAgainstGoals') + col('AwayAgainstGoals')) \
    .withColumn('GoalDifferentials', col('GoalsScored') - col('GoalsAgainst')) \
    .withColumn('Win', col('TotalHomeWin') + col('TotalAwayWin')) \
    .withColumn('Loss', col('TotalHomeLoss') + col('TotalAwayLoss')) \
    .withColumn('Tie', col('TotalHomeTie') + col('TotalAwayTie')) \
    .withColumn('WinPct', round((100* col('Win')/(col('Win') + col('Loss') + col('Tie'))), 2)) \
    .drop('HomeScoredGoals', 'AwayScoredGoals', 'HomeAgainstGoals', 'AwayAgainstGoals') \
    .drop('TotalHomeWin', 'TotalAwayWin', 'TotalHomeLoss', 'TotalAwayLoss', 'TotalHomeTie', 'TotalAwayTie') \
    .withColumn('TeamPosition', rank().over(window)) 

table_df = table.filter(col('TeamPosition') == 1).orderBy(asc('Season')).toPandas()
table_df

Unnamed: 0,Team,Season,GoalsScored,GoalsAgainst,GoalDifferentials,Win,Loss,Tie,WinPct,TeamPosition
0,Bayern Munich,2000,62.0,37.0,25.0,19,9,6,55.88,1
1,Leverkusen,2001,77.0,38.0,39.0,21,7,6,61.76,1
2,Bayern Munich,2002,70.0,25.0,45.0,23,5,6,67.65,1
3,Werder Bremen,2003,79.0,38.0,41.0,22,4,8,64.71,1
4,Bayern Munich,2004,75.0,33.0,42.0,24,5,5,70.59,1
5,Bayern Munich,2005,67.0,32.0,35.0,22,3,9,64.71,1
6,Stuttgart,2006,61.0,37.0,24.0,21,6,7,61.76,1
7,Bayern Munich,2007,68.0,21.0,47.0,22,2,10,64.71,1
8,Wolfsburg,2008,80.0,41.0,39.0,21,7,6,61.76,1
9,Bayern Munich,2009,72.0,31.0,41.0,20,4,10,58.82,1


In [251]:
titles_won = spark.createDataFrame(table_df).select('Team', 'Season').groupby("Team").count()\
    .withColumnRenamed("count", "Titles Won").orderBy(desc('count')).show()


+-------------+----------+
|         Team|Titles Won|
+-------------+----------+
|Bayern Munich|         6|
|    Stuttgart|         1|
|   Leverkusen|         1|
|    Wolfsburg|         1|
|Werder Bremen|         1|
|     Dortmund|         1|
+-------------+----------+



Which have been relegated in the past 10 years and how many times?

In [252]:
relegated = table.filter((col('TeamPosition')== 16) | (col('TeamPosition') == 17) | (col('TeamPosition') == 18)).orderBy(asc("Season"))
relegated.groupby("Season")
relegated.groupby("Team").agg(count("Team"))\
    .withColumnRenamed("count(Team)", "Number of Relegations").orderBy(desc("Number of Relegations")).show()


+-------------+---------------------+
|         Team|Number of Relegations|
+-------------+---------------------+
|      FC Koln|                    3|
|    Wolfsburg|                    3|
|    Bielefeld|                    3|
|       Bochum|                    3|
|     Nurnberg|                    3|
|Hansa Rostock|                    2|
|      Cottbus|                    2|
|     Freiburg|                    2|
|   M'gladbach|                    2|
|     St Pauli|                    2|
| Unterhaching|                    1|
|       Hertha|                    1|
|    Stuttgart|                    1|
|     Duisburg|                    1|
|Ein Frankfurt|                    1|
|  Munich 1860|                    1|
|        Mainz|                    1|
+-------------+---------------------+



In [196]:
table.show(3)

+-------------+------+-----------+------------+-----------------+---+----+---+------+------------+
|         Team|Season|GoalsScored|GoalsAgainst|GoalDifferentials|Win|Loss|Tie|WinPct|TeamPosition|
+-------------+------+-----------+------------+-----------------+---+----+---+------+------------+
|Bayern Munich|  2005|       67.0|        32.0|             35.0| 22|   3|  9| 64.71|           1|
|Werder Bremen|  2005|       79.0|        37.0|             42.0| 21|   6|  7| 61.76|           2|
|      Hamburg|  2005|       53.0|        30.0|             23.0| 21|   8|  5| 61.76|           3|
+-------------+------+-----------+------------+-----------------+---+----+---+------+------------+
only showing top 3 rows



Which Season is the most competitive?

In [350]:
import pyspark.sql.functions as f
w =Window.partitionBy('Season').orderBy("TeamPosition")
compete = table.select("*", abs(f.col('WinPct') - f.lag('WinPct').over(w)).alias('h'))
compete = compete.na.fill(0)
most = compete.groupby("Season").sum('h').orderBy(asc("sum(h)"))
most.show()

+------+------------------+
|Season|            sum(h)|
+------+------------------+
|  2000|35.290000000000006|
|  2006| 44.10999999999999|
|  2009|             44.11|
|  2007| 44.11999999999999|
|  2010|44.120000000000005|
|  2003|47.059999999999995|
|  2002|             47.06|
|  2005|49.999999999999986|
|  2001|49.999999999999986|
|  2008|49.999999999999986|
|  2004|61.769999999999996|
+------+------------------+

