## Importing necessary packages and creating new spark session.

In [8]:
#Using 'findspark' package to locate spark from jupyter notebook.
import findspark
findspark.init()

#Importing the necessary packages.
from pyspark import SparkContext
from pyspark.sql import SparkSession, Window, Row
from pyspark.sql.functions import *
from pyspark.sql.types import *
import matplotlib.pyplot as plt

In [9]:
#Creating new spark session and app.
spark = SparkSession.Builder().master("local").appName("Bundesliga Stats").getOrCreate()
sc = spark.sparkContext
spark

## Creating new dataframe and explore our data.

In [10]:
#function to read data files into pyspark dataframe, takes the file path and format as parameters.
def load_dataframe(filename, fileformat):
    df = spark.read.format(fileformat).options(header='true').load(filename)
    return df

#creating a dataframe
df_matches = load_dataframe('D:/Data Career/Projects/Spark/Bundesliga/Data/Matches.csv','csv')

#showing the first five rows of the dataframe.
df_matches.limit(5).show()

+--------+---+------+----------+-------------+--------------+----+----+---+
|Match_ID|Div|Season|      Date|     HomeTeam|      AwayTeam|FTHG|FTAG|FTR|
+--------+---+------+----------+-------------+--------------+----+----+---+
|       1| D2|  2009|2010-04-04|   Oberhausen|Kaiserslautern|   2|   1|  H|
|       2| D2|  2009|2009-11-01|  Munich 1860|Kaiserslautern|   0|   1|  A|
|       3| D2|  2009|2009-10-04|Frankfurt FSV|Kaiserslautern|   1|   1|  D|
|       4| D2|  2009|2010-02-21|Frankfurt FSV|     Karlsruhe|   2|   1|  H|
|       5| D2|  2009|2009-12-06|        Ahlen|     Karlsruhe|   1|   3|  A|
+--------+---+------+----------+-------------+--------------+----+----+---+



In [11]:
#lets rename some of the columns to be more meaningful.
old_cols = df_matches.columns[-3:]
new_cols = ["HomeTeamGoals", "AwayTeamGoals", "FinalResult"]
old_new_cols = [*zip(old_cols, new_cols)]
for old_col, new_col in old_new_cols:
    df_matches = df_matches.withColumnRenamed(old_col, new_col)

#showing the first five rows of the dataframe.
df_matches.limit(5).show()    

+--------+---+------+----------+-------------+--------------+-------------+-------------+-----------+
|Match_ID|Div|Season|      Date|     HomeTeam|      AwayTeam|HomeTeamGoals|AwayTeamGoals|FinalResult|
+--------+---+------+----------+-------------+--------------+-------------+-------------+-----------+
|       1| D2|  2009|2010-04-04|   Oberhausen|Kaiserslautern|            2|            1|          H|
|       2| D2|  2009|2009-11-01|  Munich 1860|Kaiserslautern|            0|            1|          A|
|       3| D2|  2009|2009-10-04|Frankfurt FSV|Kaiserslautern|            1|            1|          D|
|       4| D2|  2009|2010-02-21|Frankfurt FSV|     Karlsruhe|            2|            1|          H|
|       5| D2|  2009|2009-12-06|        Ahlen|     Karlsruhe|            1|            3|          A|
+--------+---+------+----------+-------------+--------------+-------------+-------------+-----------+



## Who are the winners of the D1 division in the Germany Football Association (Bundesliga) between 2000–2010?

In [15]:
#creating new columns to count results.
df_matches = df_matches \
    .withColumn('HomeTeamWin', when(col('FinalResult') == 'H', 1).otherwise(0)) \
    .withColumn('AwayTeamWin', when(col('FinalResult') == 'A', 1).otherwise(0)) \
    .withColumn('GameTie', when(col('FinalResult') == 'D', 1).otherwise(0))

#bundesliga is a D1 division and we are interested in season <= 2010 and >= 2000
bundesliga = df_matches \
                    .filter((col('Season') >= 2000) & 
                            (col('Season') <= 2010) & 
                            (col('Div') == 'D1'))

# home team features
home = bundesliga.groupby('Season', 'HomeTeam') \
       .agg(sum('HomeTeamWin').alias('TotalHomeWin'),
            sum('AwayTeamWin').alias('TotalHomeLoss'),
            sum('GameTie').alias('TotalHomeTie'),
            sum('HomeTeamGoals').alias('HomeScoredGoals'),
            sum('AwayTeamGoals').alias('HomeAgainstGoals')) \
       .withColumnRenamed('HomeTeam', 'Team')

#away game features 
away =  bundesliga.groupby('Season', 'AwayTeam') \
       .agg(sum('AwayTeamWin').alias('TotalAwayWin'),
            sum('HomeTeamWin').alias('TotalAwayLoss'),
            sum('GameTie').alias('TotalAwayTie'),
            sum('AwayTeamGoals').alias('AwayScoredGoals'),
            sum('HomeTeamGoals').alias('AwayAgainstGoals'))  \
       .withColumnRenamed('AwayTeam', 'Team')

#### Then, inner join them on ‘Team’ and ‘Season’ fields to create a single dataframe containing game level aggregation: table. After that, we use a basic window function to further aggregate game statistics on season level and rank them based on winning percentage and goal differentials.

In [20]:
#season features
window = ['Season']
window = Window.partitionBy(window).orderBy(col('WinPct').desc(), col('GoalDifferentials').desc())
table = home.join(away, ['Team', 'Season'],  'inner') \
    .withColumn('GoalsScored', col('HomeScoredGoals') + col('AwayScoredGoals')) \
    .withColumn('GoalsAgainst', col('HomeAgainstGoals') + col('AwayAgainstGoals')) \
    .withColumn('GoalDifferentials', col('GoalsScored') - col('GoalsAgainst')) \
    .withColumn('Win', col('TotalHomeWin') + col('TotalAwayWin')) \
    .withColumn('Loss', col('TotalHomeLoss') + col('TotalAwayLoss')) \
    .withColumn('Tie', col('TotalHomeTie') + col('TotalAwayTie')) \
    .withColumn('WinPct', round((100* col('Win')/(col('Win') + col('Loss') + col('Tie'))), 2)) \
    .drop('HomeScoredGoals', 'AwayScoredGoals', 'HomeAgainstGoals', 'AwayAgainstGoals') \
    .drop('TotalHomeWin', 'TotalAwayWin', 'TotalHomeLoss', 'TotalAwayLoss', 'TotalHomeTie', 'TotalAwayTie') \
    .withColumn('TeamPosition', rank().over(window)) 

table_df = table.filter(col('TeamPosition') == 1).orderBy(asc('Season'))

#printing the final result.
table_df.show()

+-------------+------+-----------+------------+-----------------+---+----+---+------+------------+
|         Team|Season|GoalsScored|GoalsAgainst|GoalDifferentials|Win|Loss|Tie|WinPct|TeamPosition|
+-------------+------+-----------+------------+-----------------+---+----+---+------+------------+
|Bayern Munich|  2000|       62.0|        37.0|             25.0| 19|   9|  6| 55.88|           1|
|   Leverkusen|  2001|       77.0|        38.0|             39.0| 21|   7|  6| 61.76|           1|
|Bayern Munich|  2002|       70.0|        25.0|             45.0| 23|   5|  6| 67.65|           1|
|Werder Bremen|  2003|       79.0|        38.0|             41.0| 22|   4|  8| 64.71|           1|
|Bayern Munich|  2004|       75.0|        33.0|             42.0| 24|   5|  5| 70.59|           1|
|Bayern Munich|  2005|       67.0|        32.0|             35.0| 22|   3|  9| 64.71|           1|
|    Stuttgart|  2006|       61.0|        37.0|             24.0| 21|   6|  7| 61.76|           1|
|Bayern Mu

## Which teams have been relegated in the past 2000 - 2010 years?

In [22]:
# lets check for 2000s
relegated = table.filter((col('TeamPosition') == 16) | 
             (col('TeamPosition') == 17) |
             (col('TeamPosition') == 18)).orderBy(asc('Season')) 
relegated.filter(col('Season') == 2000)

#printing the final result.
relegated.show()

+-------------+------+-----------+------------+-----------------+---+----+---+------+------------+
|         Team|Season|GoalsScored|GoalsAgainst|GoalDifferentials|Win|Loss|Tie|WinPct|TeamPosition|
+-------------+------+-----------+------------+-----------------+---+----+---+------+------------+
|       Bochum|  2000|       30.0|        67.0|            -37.0|  7|  21|  6| 20.59|          18|
|    Stuttgart|  2000|       42.0|        49.0|             -7.0|  9|  14| 11| 26.47|          16|
| Unterhaching|  2000|       35.0|        59.0|            -24.0|  8|  15| 11| 23.53|          17|
|      FC Koln|  2001|       26.0|        61.0|            -35.0|  7|  19|  8| 20.59|          17|
|     Freiburg|  2001|       37.0|        64.0|            -27.0|  7|  18|  9| 20.59|          16|
|     St Pauli|  2001|       37.0|        70.0|            -33.0|  4|  20| 10| 11.76|          18|
|     Nurnberg|  2002|       33.0|        60.0|            -27.0|  8|  20|  6| 23.53|          17|
|    Biele