In [30]:
from pyspark.sql import SparkSession
from pyspark.sql import SparkSession, Window, Row
from pyspark.sql.functions import *
from pyspark.sql.types import *
import matplotlib.pyplot as plt

In [5]:
spark = SparkSession \
    .builder \
    .appName("PythonSparkSQLFighting") \
    .getOrCreate()

In [12]:
def load_spark_dataframe(file_path):
    return spark.read.format('csv').options(header='true', inframe=True).load(file_path)

In [13]:
df_matches = load_spark_dataframe('./Data/Matches.csv')

In [28]:
df_matches.limit(5).show()

+--------+---+------+----------+-------------+--------------+-------------+-------------+-----------+
|Match_ID|Div|Season|      Date|     HomeTeam|      AwayTeam|HomeTeamGoals|AwayTeamGoals|FinalResult|
+--------+---+------+----------+-------------+--------------+-------------+-------------+-----------+
|       1| D2|  2009|2010-04-04|   Oberhausen|Kaiserslautern|            2|            1|          H|
|       2| D2|  2009|2009-11-01|  Munich 1860|Kaiserslautern|            0|            1|          A|
|       3| D2|  2009|2009-10-04|Frankfurt FSV|Kaiserslautern|            1|            1|          D|
|       4| D2|  2009|2010-02-21|Frankfurt FSV|     Karlsruhe|            2|            1|          H|
|       5| D2|  2009|2009-12-06|        Ahlen|     Karlsruhe|            1|            3|          A|
+--------+---+------+----------+-------------+--------------+-------------+-------------+-----------+



In [25]:
old_cols = df_matches.columns[-3:]
new_cols = ["HomeTeamGoals", "AwayTeamGoals", "FinalResult"]
old_new_cols = [*zip(old_cols, new_cols)] 
for old_col, new_col in old_new_cols:
    df_matches = df_matches.withColumnRenamed(old_col, new_col)

In [31]:
df_matches = df_matches \
            .withColumn('HomeTeamWin', when (col('FinalResult') == 'H', 1).otherwise(0)) \
            .withColumn('AwayTeamWin', when (col('FinalResult') == 'A', 1).otherwise(0)) \
            .withColumn('GameTie', when (col('FinalResult') == 'D', 1).otherwise(0))

In [35]:
df_matches.count()

24625

In [36]:
bundesliga = df_matches.filter((col('Season')>=2000) & 
                               (col('Season')<=2010) &
                               (col('Div')=='D1'))

In [39]:
bundesliga.show(5)

+--------+---+------+----------+-------------+----------+-------------+-------------+-----------+-----------+-----------+-------+
|Match_ID|Div|Season|      Date|     HomeTeam|  AwayTeam|HomeTeamGoals|AwayTeamGoals|FinalResult|HomeTeamWin|AwayTeamWin|GameTie|
+--------+---+------+----------+-------------+----------+-------------+-------------+-----------+-----------+-----------+-------+
|      21| D1|  2009|2010-02-06|       Bochum|Leverkusen|            1|            1|          D|          0|          0|      1|
|      22| D1|  2009|2009-11-22|Bayern Munich|Leverkusen|            1|            1|          D|          0|          0|      1|
|      23| D1|  2009|2010-05-08|   M'gladbach|Leverkusen|            1|            1|          D|          0|          0|      1|
|      24| D1|  2009|2009-08-08|        Mainz|Leverkusen|            2|            2|          D|          0|          0|      1|
|      25| D1|  2009|2009-10-17|      Hamburg|Leverkusen|            0|            0|     

In [40]:
home = bundesliga.groupby('Season', 'HomeTeam') \
        .agg(sum('HomeTeamWin').alias('TotalHomeWin'),
             sum('AwayTeamWin').alias('TotalHomeLoss'),
            sum('GameTie').alias('TotalHomeTie'),
            sum('HomeTeamGoals').alias('HomeScoredGoals'),
            sum('AwayTeamGoals').alias('HomeAgainstGoals')) \
       .withColumnRenamed('HomeTeam', 'Team')

In [41]:
home.show(5)

+------+--------------+------------+-------------+------------+---------------+----------------+
|Season|          Team|TotalHomeWin|TotalHomeLoss|TotalHomeTie|HomeScoredGoals|HomeAgainstGoals|
+------+--------------+------------+-------------+------------+---------------+----------------+
|  2005|Kaiserslautern|           5|            7|           5|           26.0|            33.0|
|  2006|       Cottbus|           6|            6|           5|           21.0|            22.0|
|  2001|      St Pauli|           4|            9|           4|           19.0|            28.0|
|  2005|         Mainz|           6|            4|           7|           31.0|            23.0|
|  2006|       Hamburg|           4|            4|           9|           22.0|            19.0|
+------+--------------+------------+-------------+------------+---------------+----------------+
only showing top 5 rows



In [42]:
away = bundesliga.groupby('Season', 'AwayTeam') \
        .agg(sum('AwayTeamWin').alias('TotalAwayWin'),
             sum('HomeTeamWin').alias('TotalAwayLoss'),
            sum('GameTie').alias('TotalAwayTie'),
            sum('AwayTeamGoals').alias('AwayScoredGoals'),
            sum('HomeTeamGoals').alias('AwayAgainstGoals')) \
       .withColumnRenamed('AwayTeam', 'Team')

In [43]:
away.show(5)

+------+--------------+------------+-------------+------------+---------------+----------------+
|Season|          Team|TotalAwayWin|TotalAwayLoss|TotalAwayTie|AwayScoredGoals|AwayAgainstGoals|
+------+--------------+------------+-------------+------------+---------------+----------------+
|  2005|Kaiserslautern|           3|           10|           4|           21.0|            38.0|
|  2006|       Cottbus|           5|            9|           3|           17.0|            27.0|
|  2001|      St Pauli|           0|           11|           6|           18.0|            42.0|
|  2005|         Mainz|           3|           10|           4|           15.0|            24.0|
|  2006|       Hamburg|           6|            5|           6|           21.0|            18.0|
+------+--------------+------------+-------------+------------+---------------+----------------+
only showing top 5 rows



In [44]:
window = ['Season']
window = Window.partitionBy(window).orderBy(col('WinPct').desc(), col('GoalDifferentials').desc())

In [50]:
table = home.join(away, ['Team', 'Season'],  'inner') \
    .withColumn('GoalsScored', col('HomeScoredGoals') + col('AwayScoredGoals')) \
    .withColumn('GoalsAgainst', col('HomeAgainstGoals') + col('AwayAgainstGoals')) \
    .withColumn('GoalDifferentials', col('GoalsScored') - col('GoalsAgainst')) \
    .withColumn('Win', col('TotalHomeWin') + col('TotalAwayWin')) \
    .withColumn('Loss', col('TotalHomeLoss') + col('TotalAwayLoss')) \
    .withColumn('Tie', col('TotalHomeTie') + col('TotalAwayTie')) \
    .withColumn('WinPct', round((100* col('Win')/(col('Win') + col('Loss') + col('Tie'))), 2)) \
    .drop('HomeScoredGoals', 'AwayScoredGoals', 'HomeAgainstGoals', 'AwayAgainstGoals') \
    .drop('TotalHomeWin', 'TotalAwayWin', 'TotalHomeLoss', 'TotalAwayLoss', 'TotalHomeTie', 'TotalAwayTie') \
    .withColumn('TeamPosition', rank().over(window)) \
    .withColumn('Pts', 3*col('Win')+col('Tie'))


In [51]:
table.show()

+--------------+------+-----------+------------+-----------------+---+----+---+------+------------+---+
|          Team|Season|GoalsScored|GoalsAgainst|GoalDifferentials|Win|Loss|Tie|WinPct|TeamPosition|Pts|
+--------------+------+-----------+------------+-----------------+---+----+---+------+------------+---+
| Bayern Munich|  2000|       62.0|        37.0|             25.0| 19|   9|  6| 55.88|           1| 63|
|    Schalke 04|  2000|       65.0|        35.0|             30.0| 18|   8|  8| 52.94|           2| 62|
|        Hertha|  2000|       58.0|        52.0|              6.0| 18|  14|  2| 52.94|           3| 56|
|    Leverkusen|  2000|       54.0|        40.0|             14.0| 17|  11|  6|  50.0|           4| 57|
|      Dortmund|  2000|       62.0|        42.0|             20.0| 16|   8| 10| 47.06|           5| 58|
|      Freiburg|  2000|       54.0|        37.0|             17.0| 15|   9| 10| 44.12|           6| 55|
| Werder Bremen|  2000|       53.0|        48.0|              5.

In [74]:
table_df = table.filter(col('TeamPosition') == 1).orderBy(asc('Season'))

In [75]:
table_df.toPandas()

Unnamed: 0,Team,Season,GoalsScored,GoalsAgainst,GoalDifferentials,Win,Loss,Tie,WinPct,TeamPosition,Pts
0,Bayern Munich,2000,62.0,37.0,25.0,19,9,6,55.88,1,63
1,Leverkusen,2001,77.0,38.0,39.0,21,7,6,61.76,1,69
2,Bayern Munich,2002,70.0,25.0,45.0,23,5,6,67.65,1,75
3,Werder Bremen,2003,79.0,38.0,41.0,22,4,8,64.71,1,74
4,Bayern Munich,2004,75.0,33.0,42.0,24,5,5,70.59,1,77
5,Bayern Munich,2005,67.0,32.0,35.0,22,3,9,64.71,1,75
6,Stuttgart,2006,61.0,37.0,24.0,21,6,7,61.76,1,70
7,Bayern Munich,2007,68.0,21.0,47.0,22,2,10,64.71,1,76
8,Wolfsburg,2008,80.0,41.0,39.0,21,7,6,61.76,1,69
9,Bayern Munich,2009,72.0,31.0,41.0,20,4,10,58.82,1,70
