In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName('firstProject').getOrCreate()

In [3]:
def load_dataframe(filename):
    df = spark.read.format('csv').options(header='true').load(filename)
    return df

In [4]:
df_matches = load_dataframe('./Data/Matches.csv')

In [5]:
df_matches.limit(5).show()

+--------+---+------+----------+-------------+--------------+----+----+---+
|Match_ID|Div|Season|      Date|     HomeTeam|      AwayTeam|FTHG|FTAG|FTR|
+--------+---+------+----------+-------------+--------------+----+----+---+
|       1| D2|  2009|2010-04-04|   Oberhausen|Kaiserslautern|   2|   1|  H|
|       2| D2|  2009|2009-11-01|  Munich 1860|Kaiserslautern|   0|   1|  A|
|       3| D2|  2009|2009-10-04|Frankfurt FSV|Kaiserslautern|   1|   1|  D|
|       4| D2|  2009|2010-02-21|Frankfurt FSV|     Karlsruhe|   2|   1|  H|
|       5| D2|  2009|2009-12-06|        Ahlen|     Karlsruhe|   1|   3|  A|
+--------+---+------+----------+-------------+--------------+----+----+---+



In [6]:
# rename some columns

old_cols = df_matches.columns[-3:]
new_cols = ['HomeTeamGoals', 'AwayTeamGoals', 'FinalResult']
old_new_cols = [*zip(old_cols, new_cols)]

for old_col, new_col in old_new_cols:
    df_matches = df_matches.withColumnRenamed(old_col, new_col)
    
df_matches.show()

+--------+---+------+----------+------------------+--------------+-------------+-------------+-----------+
|Match_ID|Div|Season|      Date|          HomeTeam|      AwayTeam|HomeTeamGoals|AwayTeamGoals|FinalResult|
+--------+---+------+----------+------------------+--------------+-------------+-------------+-----------+
|       1| D2|  2009|2010-04-04|        Oberhausen|Kaiserslautern|            2|            1|          H|
|       2| D2|  2009|2009-11-01|       Munich 1860|Kaiserslautern|            0|            1|          A|
|       3| D2|  2009|2009-10-04|     Frankfurt FSV|Kaiserslautern|            1|            1|          D|
|       4| D2|  2009|2010-02-21|     Frankfurt FSV|     Karlsruhe|            2|            1|          H|
|       5| D2|  2009|2009-12-06|             Ahlen|     Karlsruhe|            1|            3|          A|
|       6| D2|  2009|2010-04-03|      Union Berlin|     Karlsruhe|            1|            1|          D|
|       7| D2|  2009|2009-08-14|     

**Who are the winners of the D1 division in the Germany Football Association (Bundesliga) between 2000–2010?**

In [7]:
from pyspark.sql.functions import col, when

df_matches = (df_matches
              .withColumn('HomeTeamWin', when(col('FinalResult')=='H', 1).otherwise(0))
              .withColumn('AwayTeamWin', when(col('FinalResult')=='A', 1).otherwise(0))
              .withColumn('GameTie', when(col('FinalResult')=='D', 1).otherwise(0))
             )

In [8]:
df_matches.show(5)

+--------+---+------+----------+-------------+--------------+-------------+-------------+-----------+-----------+-----------+-------+
|Match_ID|Div|Season|      Date|     HomeTeam|      AwayTeam|HomeTeamGoals|AwayTeamGoals|FinalResult|HomeTeamWin|AwayTeamWin|GameTie|
+--------+---+------+----------+-------------+--------------+-------------+-------------+-----------+-----------+-----------+-------+
|       1| D2|  2009|2010-04-04|   Oberhausen|Kaiserslautern|            2|            1|          H|          1|          0|      0|
|       2| D2|  2009|2009-11-01|  Munich 1860|Kaiserslautern|            0|            1|          A|          0|          1|      0|
|       3| D2|  2009|2009-10-04|Frankfurt FSV|Kaiserslautern|            1|            1|          D|          0|          0|      1|
|       4| D2|  2009|2010-02-21|Frankfurt FSV|     Karlsruhe|            2|            1|          H|          1|          0|      0|
|       5| D2|  2009|2009-12-06|        Ahlen|     Karlsruhe| 

In [9]:
# bundesliga is a D1 division and we are interested in season <= 2010 and >= 2000

bundesliga = (df_matches
             .filter((col('Season') <= 2010) & (col('Season') >= 2000) & (col('Div') == 'D1')))

In [10]:
bundesliga.agg({'Season': 'min'}).show()

+-----------+
|min(Season)|
+-----------+
|       2000|
+-----------+



In [11]:
bundesliga.printSchema()

root
 |-- Match_ID: string (nullable = true)
 |-- Div: string (nullable = true)
 |-- Season: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- HomeTeam: string (nullable = true)
 |-- AwayTeam: string (nullable = true)
 |-- HomeTeamGoals: string (nullable = true)
 |-- AwayTeamGoals: string (nullable = true)
 |-- FinalResult: string (nullable = true)
 |-- HomeTeamWin: integer (nullable = false)
 |-- AwayTeamWin: integer (nullable = false)
 |-- GameTie: integer (nullable = false)



In [12]:
# Home team Features
from pyspark.sql.functions import sum

home = bundesliga.groupby('Season', 'HomeTeam') \
       .agg(sum('HomeTeamWin').alias('TotalHomeWin'),
            sum('AwayTeamWin').alias('TotalHomeLoss'),
            sum('GameTie').alias('TotalHomeTie'),
            sum('HomeTeamGoals').alias('HomeScoredGoals'),
            sum('AwayTeamGoals').alias('HomeAgainstGoals')) \
       .withColumnRenamed('HomeTeam', 'Team')

In [13]:
home.show(5, truncate=False)

+------+--------------+------------+-------------+------------+---------------+----------------+
|Season|Team          |TotalHomeWin|TotalHomeLoss|TotalHomeTie|HomeScoredGoals|HomeAgainstGoals|
+------+--------------+------------+-------------+------------+---------------+----------------+
|2005  |Kaiserslautern|5           |7            |5           |26.0           |33.0            |
|2006  |Cottbus       |6           |6            |5           |21.0           |22.0            |
|2001  |St Pauli      |4           |9            |4           |19.0           |28.0            |
|2005  |Mainz         |6           |4            |7           |31.0           |23.0            |
|2006  |Hamburg       |4           |4            |9           |22.0           |19.0            |
+------+--------------+------------+-------------+------------+---------------+----------------+
only showing top 5 rows



In [14]:
# Away Team Features

away = (bundesliga.groupby('Season', 'AwayTeam')
       .agg(sum('AwayTeamWin').alias('TotalAwayWin'),
           sum('HomeTeamWin').alias('TotalAwayLoss'),
           sum('GameTie').alias('TotalAwayTie'),
           sum('AwayTeamGoals').alias('AwayScoredGoals'),
           sum('HomeTeamGoals').alias('AwayAgainstGoals'))
       .withColumnRenamed('AwayTeam', 'Team'))

In [15]:
away.show(5, truncate=False)

+------+--------------+------------+-------------+------------+---------------+----------------+
|Season|Team          |TotalAwayWin|TotalAwayLoss|TotalAwayTie|AwayScoredGoals|AwayAgainstGoals|
+------+--------------+------------+-------------+------------+---------------+----------------+
|2005  |Kaiserslautern|3           |10           |4           |21.0           |38.0            |
|2006  |Cottbus       |5           |9            |3           |17.0           |27.0            |
|2001  |St Pauli      |0           |11           |6           |18.0           |42.0            |
|2005  |Mainz         |3           |10           |4           |15.0           |24.0            |
|2006  |Hamburg       |6           |5            |6           |21.0           |18.0            |
+------+--------------+------------+-------------+------------+---------------+----------------+
only showing top 5 rows



In [16]:
#season features
from pyspark.sql.window import Window

window_col = ['Season']
window = Window.partitionBy(window_col).orderBy(col('WinPct').desc(), col('GoalDifferentials').desc())

In [17]:
from pyspark.sql.functions import round, rank

table = (home.join(away, ['Team', 'Season'], 'inner')
        .withColumn('GoalsScored', col('HomeScoredGoals') + col('AwayScoredGoals'))
        .withColumn('GoalsAgainst', col('HomeAgainstGoals') + col('AwayAgainstGoals'))
        .withColumn('GoalDifferentials', col('GoalsScored') - col('GoalsAgainst'))
        .withColumn('Win', col('TotalHomeWin') + col('TotalAwayWin'))
        .withColumn('Loss', col('TotalHomeLoss') + col('TotalAwayLoss'))
        .withColumn('Tie', col('TotalHomeTie') + col('TotalAwayTie'))
        .withColumn('WinPct', round((100* col('Win')/(col('Win') + col('Loss') + col('Tie'))), 2))
        .drop('HomeScoredGoals', 'AwayScoredGoals', 'HomeAgainstGoals', 'AwayAgainstGoals',
             'TotalHomeWin', 'TotalAwayWin', 'TotalHomeLoss', 'TotalAwayLoss', 'TotalHomeTie', 'TotalAwayTie')
        .withColumn('TeamPosition', rank().over(window)))

In [18]:
table.show()

+--------------+------+-----------+------------+-----------------+---+----+---+------+------------+
|          Team|Season|GoalsScored|GoalsAgainst|GoalDifferentials|Win|Loss|Tie|WinPct|TeamPosition|
+--------------+------+-----------+------------+-----------------+---+----+---+------+------------+
| Bayern Munich|  2005|       67.0|        32.0|             35.0| 22|   3|  9| 64.71|           1|
| Werder Bremen|  2005|       79.0|        37.0|             42.0| 21|   6|  7| 61.76|           2|
|       Hamburg|  2005|       53.0|        30.0|             23.0| 21|   8|  5| 61.76|           3|
|    Schalke 04|  2005|       47.0|        31.0|             16.0| 16|   5| 13| 47.06|           4|
|    Leverkusen|  2005|       64.0|        49.0|             15.0| 14|  10| 10| 41.18|           5|
|        Hertha|  2005|       52.0|        48.0|              4.0| 12|  10| 12| 35.29|           6|
|      Nurnberg|  2005|       49.0|        51.0|             -2.0| 12|  14|  8| 35.29|           7|


In [19]:
table_df = table.filter(col('TeamPosition') == 1).orderBy(col('Season').asc())

In [20]:
table_df.show()

+-------------+------+-----------+------------+-----------------+---+----+---+------+------------+
|         Team|Season|GoalsScored|GoalsAgainst|GoalDifferentials|Win|Loss|Tie|WinPct|TeamPosition|
+-------------+------+-----------+------------+-----------------+---+----+---+------+------------+
|Bayern Munich|  2000|       62.0|        37.0|             25.0| 19|   9|  6| 55.88|           1|
|   Leverkusen|  2001|       77.0|        38.0|             39.0| 21|   7|  6| 61.76|           1|
|Bayern Munich|  2002|       70.0|        25.0|             45.0| 23|   5|  6| 67.65|           1|
|Werder Bremen|  2003|       79.0|        38.0|             41.0| 22|   4|  8| 64.71|           1|
|Bayern Munich|  2004|       75.0|        33.0|             42.0| 24|   5|  5| 70.59|           1|
|Bayern Munich|  2005|       67.0|        32.0|             35.0| 22|   3|  9| 64.71|           1|
|    Stuttgart|  2006|       61.0|        37.0|             24.0| 21|   6|  7| 61.76|           1|
|Bayern Mu

In [21]:
spark.stop()