In [1]:
!pip install pyspark


Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m4.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=c91ce247a91a83b8ad1d3ae3426f724f8d80d8a81db083f84b0906e586105543
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


Create a spark session for code and appname for build and create

In [20]:
from pyspark.sql import SparkSession, Window
spark = SparkSession.builder.appName("DataAnalysis").getOrCreate()

In [3]:
#import necessary libraries
import pandas as pd
from pyspark.sql.functions import *

In [4]:
#Load the Dataset
df_matches = spark.read.format('csv').options(header = 'True').load('/content/drive/MyDrive/Colab Notebooks/Data/Matches.csv')
df_matches.show()

+--------+---+------+----------+------------------+--------------+----+----+---+
|Match_ID|Div|Season|      Date|          HomeTeam|      AwayTeam|FTHG|FTAG|FTR|
+--------+---+------+----------+------------------+--------------+----+----+---+
|       1| D2|  2009|2010-04-04|        Oberhausen|Kaiserslautern|   2|   1|  H|
|       2| D2|  2009|2009-11-01|       Munich 1860|Kaiserslautern|   0|   1|  A|
|       3| D2|  2009|2009-10-04|     Frankfurt FSV|Kaiserslautern|   1|   1|  D|
|       4| D2|  2009|2010-02-21|     Frankfurt FSV|     Karlsruhe|   2|   1|  H|
|       5| D2|  2009|2009-12-06|             Ahlen|     Karlsruhe|   1|   3|  A|
|       6| D2|  2009|2010-04-03|      Union Berlin|     Karlsruhe|   1|   1|  D|
|       7| D2|  2009|2009-08-14|         Paderborn|     Karlsruhe|   2|   0|  H|
|       8| D2|  2009|2010-03-08|         Bielefeld|     Karlsruhe|   0|   1|  A|
|       9| D2|  2009|2009-09-26|    Kaiserslautern|     Karlsruhe|   2|   0|  H|
|      10| D2|  2009|2009-11

Meaning of the features :

Match_ID : Unique identifier for each match

Div : Championship level (D1 = first level, D2 = second level, E0 = third level)

Season : Starting year of the season in which the match took place

Date : Date of the match

HomeTeam : Team playing in its stadium

AwayTeam : Team playing in the opposing team's stadium

FTHG : Number of goals of Home Team

FTAG : Number of goals of Away Team

FTR : Final Result

In [5]:
# Rename columns
df_matches = df_matches.withColumnRenamed("FTHG", "Home_Goals")
df_matches = df_matches.withColumnRenamed("FTAG", "Away_Goals")
df_matches = df_matches.withColumnRenamed("FTR", "FinalResult")
df_matches.show()

+--------+---+------+----------+------------------+--------------+----------+----------+-----------+
|Match_ID|Div|Season|      Date|          HomeTeam|      AwayTeam|Home_Goals|Away_Goals|FinalResult|
+--------+---+------+----------+------------------+--------------+----------+----------+-----------+
|       1| D2|  2009|2010-04-04|        Oberhausen|Kaiserslautern|         2|         1|          H|
|       2| D2|  2009|2009-11-01|       Munich 1860|Kaiserslautern|         0|         1|          A|
|       3| D2|  2009|2009-10-04|     Frankfurt FSV|Kaiserslautern|         1|         1|          D|
|       4| D2|  2009|2010-02-21|     Frankfurt FSV|     Karlsruhe|         2|         1|          H|
|       5| D2|  2009|2009-12-06|             Ahlen|     Karlsruhe|         1|         3|          A|
|       6| D2|  2009|2010-04-03|      Union Berlin|     Karlsruhe|         1|         1|          D|
|       7| D2|  2009|2009-08-14|         Paderborn|     Karlsruhe|         2|         0|   

In [6]:
#preview the data in pandas datafram
pandas_df = df_matches.toPandas()
pandas_df.head(10)

Unnamed: 0,Match_ID,Div,Season,Date,HomeTeam,AwayTeam,Home_Goals,Away_Goals,FinalResult
0,1,D2,2009,2010-04-04,Oberhausen,Kaiserslautern,2,1,H
1,2,D2,2009,2009-11-01,Munich 1860,Kaiserslautern,0,1,A
2,3,D2,2009,2009-10-04,Frankfurt FSV,Kaiserslautern,1,1,D
3,4,D2,2009,2010-02-21,Frankfurt FSV,Karlsruhe,2,1,H
4,5,D2,2009,2009-12-06,Ahlen,Karlsruhe,1,3,A
5,6,D2,2009,2010-04-03,Union Berlin,Karlsruhe,1,1,D
6,7,D2,2009,2009-08-14,Paderborn,Karlsruhe,2,0,H
7,8,D2,2009,2010-03-08,Bielefeld,Karlsruhe,0,1,A
8,9,D2,2009,2009-09-26,Kaiserslautern,Karlsruhe,2,0,H
9,10,D2,2009,2009-11-21,Hansa Rostock,Karlsruhe,2,1,H


In [7]:
# Add new column in pyspark
df_matches = df_matches.withColumn("HomeTeamWin", when(col("FinalResult") == "H", 1).otherwise(0)) \
    .withColumn("AwayTeamWin", when(col("FinalResult") == "A", 1).otherwise(0)) \
    .withColumn("Draw", when(col("FinalResult") == "D", 1).otherwise(0))

In [8]:
df_matches.show()


+--------+---+------+----------+------------------+--------------+----------+----------+-----------+-----------+-----------+----+
|Match_ID|Div|Season|      Date|          HomeTeam|      AwayTeam|Home_Goals|Away_Goals|FinalResult|HomeTeamWin|AwayTeamWin|Draw|
+--------+---+------+----------+------------------+--------------+----------+----------+-----------+-----------+-----------+----+
|       1| D2|  2009|2010-04-04|        Oberhausen|Kaiserslautern|         2|         1|          H|          1|          0|   0|
|       2| D2|  2009|2009-11-01|       Munich 1860|Kaiserslautern|         0|         1|          A|          0|          1|   0|
|       3| D2|  2009|2009-10-04|     Frankfurt FSV|Kaiserslautern|         1|         1|          D|          0|          0|   1|
|       4| D2|  2009|2010-02-21|     Frankfurt FSV|     Karlsruhe|         2|         1|          H|          1|          0|   0|
|       5| D2|  2009|2009-12-06|             Ahlen|     Karlsruhe|         1|         3|  

In [9]:
df_bundesliga = df_matches.filter((col("Div") == "D1") &
                                 (col("Season") >= 2000) &
                                 (col("Season") <= 2015))
df_bundesliga.show()

+--------+---+------+----------+-------------+----------+----------+----------+-----------+-----------+-----------+----+
|Match_ID|Div|Season|      Date|     HomeTeam|  AwayTeam|Home_Goals|Away_Goals|FinalResult|HomeTeamWin|AwayTeamWin|Draw|
+--------+---+------+----------+-------------+----------+----------+----------+-----------+-----------+-----------+----+
|      21| D1|  2009|2010-02-06|       Bochum|Leverkusen|         1|         1|          D|          0|          0|   1|
|      22| D1|  2009|2009-11-22|Bayern Munich|Leverkusen|         1|         1|          D|          0|          0|   1|
|      23| D1|  2009|2010-05-08|   M'gladbach|Leverkusen|         1|         1|          D|          0|          0|   1|
|      24| D1|  2009|2009-08-08|        Mainz|Leverkusen|         2|         2|          D|          0|          0|   1|
|      25| D1|  2009|2009-10-17|      Hamburg|Leverkusen|         0|         0|          D|          0|          0|   1|
|      26| D1|  2009|2010-04-17|

In [10]:
#Analyzing Home Performances of teams
df_home_matches = df_bundesliga.groupBy('Season', 'HomeTeam') \
    .agg(sum('HomeTeamWin').alias('TotalHomeWin'),
         sum('AwayTeamWin').alias('TotalHomeLoss'),
         sum('Draw').alias('TotalHomeDraw'),
         sum('Home_Goals').alias('HomeScoredGoals'),
         sum('Away_Goals').alias('HomeAgainstGoals')) \
         .withColumnRenamed('HomeTeam', 'Team')

df_home_matches.toPandas().head(10)

Unnamed: 0,Season,Team,TotalHomeWin,TotalHomeLoss,TotalHomeDraw,HomeScoredGoals,HomeAgainstGoals
0,2011,Hamburg,3,7,7,19.0,29.0
1,2005,Kaiserslautern,5,7,5,26.0,33.0
2,2006,Cottbus,6,6,5,21.0,22.0
3,2001,St Pauli,4,9,4,19.0,28.0
4,2005,Mainz,6,4,7,31.0,23.0
5,2006,Hamburg,4,4,9,22.0,19.0
6,2003,Stuttgart,9,1,7,29.0,13.0
7,2015,Hertha,9,3,5,24.0,15.0
8,2003,Hansa Rostock,10,6,1,34.0,18.0
9,2012,Hannover,9,3,5,34.0,23.0


In [11]:
#Analyzing Away Performances of teams
df_away_matches = df_bundesliga.groupBy('Season', 'AwayTeam') \
    .agg(sum('AwayTeamWin').alias('TotalAwayWin'),
         sum('HomeTeamWin').alias('TotalAwayLoss'),
         sum('Draw').alias('TotalAwayDraw'),
         sum('Away_Goals').alias('AwayScoredGoals'),
         sum('Home_Goals').alias('AwayAgainstdGoals')) \
         .withColumnRenamed('AwayTeam', 'Team')

df_away_matches.toPandas().head(10)

Unnamed: 0,Season,Team,TotalAwayWin,TotalAwayLoss,TotalAwayDraw,AwayScoredGoals,AwayAgainstdGoals
0,2011,Hamburg,5,7,5,16.0,28.0
1,2005,Kaiserslautern,3,10,4,21.0,38.0
2,2006,Cottbus,5,9,3,17.0,27.0
3,2001,St Pauli,0,11,6,18.0,42.0
4,2005,Mainz,3,10,4,15.0,24.0
5,2006,Hamburg,6,5,6,21.0,18.0
6,2003,Stuttgart,9,5,3,23.0,11.0
7,2015,Hertha,5,9,3,18.0,27.0
8,2003,Hansa Rostock,2,8,7,21.0,36.0
9,2012,Hannover,4,12,1,26.0,39.0


In [12]:
#Merging home and Away df to 1 table for further analyzing
df_merged = df_home_matches.join(df_away_matches, ['Season', 'Team'], 'inner')
df_merged.toPandas().head(10)

Unnamed: 0,Season,Team,TotalHomeWin,TotalHomeLoss,TotalHomeDraw,HomeScoredGoals,HomeAgainstGoals,TotalAwayWin,TotalAwayLoss,TotalAwayDraw,AwayScoredGoals,AwayAgainstdGoals
0,2011,Hamburg,3,7,7,19.0,29.0,5,7,5,16.0,28.0
1,2005,Kaiserslautern,5,7,5,26.0,33.0,3,10,4,21.0,38.0
2,2006,Cottbus,6,6,5,21.0,22.0,5,9,3,17.0,27.0
3,2001,St Pauli,4,9,4,19.0,28.0,0,11,6,18.0,42.0
4,2005,Mainz,6,4,7,31.0,23.0,3,10,4,15.0,24.0
5,2006,Hamburg,4,4,9,22.0,19.0,6,5,6,21.0,18.0
6,2003,Stuttgart,9,1,7,29.0,13.0,9,5,3,23.0,11.0
7,2015,Hertha,9,3,5,24.0,15.0,5,9,3,18.0,27.0
8,2003,Hansa Rostock,10,6,1,34.0,18.0,2,8,7,21.0,36.0
9,2012,Hannover,9,3,5,34.0,23.0,4,12,1,26.0,39.0


In [13]:
#create column for total score and result
df_totals = df_merged.withColumn('GoalsScore', col('HomeScoredGoals') + col('AwayScoredGoals')) \
                     .withColumn('GoalsAgainst', col('HomeAgainstGoals') + col('AwayAgainstdGoals')) \
                     .withColumn('Win', col('TotalHomeWin') + col('TotalAwayWin')) \
                     .withColumn('Loss', col('TotalHomeLoss') + col('TotalAwayLoss')) \
                     .withColumn('Draw', col('TotalHomeDraw') + col('TotalAwayDraw'))

df_totals.toPandas()

Unnamed: 0,Season,Team,TotalHomeWin,TotalHomeLoss,TotalHomeDraw,HomeScoredGoals,HomeAgainstGoals,TotalAwayWin,TotalAwayLoss,TotalAwayDraw,AwayScoredGoals,AwayAgainstdGoals,GoalsScore,GoalsAgainst,Win,Loss,Draw
0,2011,Hamburg,3,7,7,19.0,29.0,5,7,5,16.0,28.0,35.0,57.0,8,14,12
1,2005,Kaiserslautern,5,7,5,26.0,33.0,3,10,4,21.0,38.0,47.0,71.0,8,17,9
2,2006,Cottbus,6,6,5,21.0,22.0,5,9,3,17.0,27.0,38.0,49.0,11,15,8
3,2001,St Pauli,4,9,4,19.0,28.0,0,11,6,18.0,42.0,37.0,70.0,4,20,10
4,2005,Mainz,6,4,7,31.0,23.0,3,10,4,15.0,24.0,46.0,47.0,9,14,11
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
283,2004,Bielefeld,7,7,3,21.0,21.0,4,9,4,16.0,28.0,37.0,49.0,11,16,7
284,2004,Werder Bremen,9,4,4,33.0,15.0,9,7,1,35.0,22.0,68.0,37.0,18,11,5
285,2013,Augsburg,9,5,3,27.0,22.0,6,7,4,20.0,25.0,47.0,47.0,15,12,7
286,2004,Stuttgart,12,3,2,34.0,15.0,5,7,5,20.0,25.0,54.0,40.0,17,10,7


In [14]:
#Drop Un necessary columns
cols_to_drop = ['HomeScoredGoals', 'AwayScoredGoals', 'AwayAgainstdGoals', 'HomeAgainstGoals', 'TotalHomeWin', 'TotalAwayWin'
                , 'TotalHomeLoss', 'TotalAwayLoss', 'TotalHomeDraw', 'TotalAwayDraw']

#cleaned df
df_totals_cleaned = df_totals.drop(*cols_to_drop)


df_totals_cleaned.toPandas()

Unnamed: 0,Season,Team,GoalsScore,GoalsAgainst,Win,Loss,Draw
0,2011,Hamburg,35.0,57.0,8,14,12
1,2005,Kaiserslautern,47.0,71.0,8,17,9
2,2006,Cottbus,38.0,49.0,11,15,8
3,2001,St Pauli,37.0,70.0,4,20,10
4,2005,Mainz,46.0,47.0,9,14,11
...,...,...,...,...,...,...,...
283,2004,Bielefeld,37.0,49.0,11,16,7
284,2004,Werder Bremen,68.0,37.0,18,11,5
285,2013,Augsburg,47.0,47.0,15,12,7
286,2004,Stuttgart,54.0,40.0,17,10,7


In [15]:
#Create percentage columns


In [16]:
df_processed = df_totals_cleaned.withColumn('GOalDifferentials', col('GoalsScore') - col('GoalsAgainst')) \
                                .withColumn('WinPercentage', round(( 100* col('Win') / (col('Win') + col('Loss') + col('Draw'))), 2))

df_processed.toPandas()

Unnamed: 0,Season,Team,GoalsScore,GoalsAgainst,Win,Loss,Draw,GOalDifferentials,WinPercentage
0,2011,Hamburg,35.0,57.0,8,14,12,-22.0,23.53
1,2005,Kaiserslautern,47.0,71.0,8,17,9,-24.0,23.53
2,2006,Cottbus,38.0,49.0,11,15,8,-11.0,32.35
3,2001,St Pauli,37.0,70.0,4,20,10,-33.0,11.76
4,2005,Mainz,46.0,47.0,9,14,11,-1.0,26.47
...,...,...,...,...,...,...,...,...,...
283,2004,Bielefeld,37.0,49.0,11,16,7,-12.0,32.35
284,2004,Werder Bremen,68.0,37.0,18,11,5,31.0,52.94
285,2013,Augsburg,47.0,47.0,15,12,7,0.0,44.12
286,2004,Stuttgart,54.0,40.0,17,10,7,14.0,50.00


In [26]:
#set window partition
window_partition = Window.partitionBy('Season').orderBy(col('WinPercentage').desc(), col('GoalDifferentials').desc() )

#Rank Teams by season
df_ranked = df_processed.withColumn('TeamPosition', rank().over(window_partition))


df_rankP = df_ranked.toPandas()


In [27]:
#Reindexing the columns
df_rankP.columns

Index(['Season', 'Team', 'GoalsScore', 'GoalsAgainst', 'Win', 'Loss', 'Draw',
       'GOalDifferentials', 'WinPercentage', 'TeamPosition'],
      dtype='object')

In [30]:
df_ranked_pandas = df_rankP.reindex(columns=['Team', 'Season', 'TeamPosition', 'GoalsScore', 'GoalsAgainst', 'Win', 'Loss', 'Draw',
       'GOalDifferentials', 'WinPercentage'])
df_ranked_pandas

Unnamed: 0,Team,Season,TeamPosition,GoalsScore,GoalsAgainst,Win,Loss,Draw,GOalDifferentials,WinPercentage
0,Bayern Munich,2000,1,62.0,37.0,19,9,6,25.0,55.88
1,Schalke 04,2000,2,65.0,35.0,18,8,8,30.0,52.94
2,Hertha,2000,3,58.0,52.0,18,14,2,6.0,52.94
3,Leverkusen,2000,4,54.0,40.0,17,11,6,14.0,50.00
4,Dortmund,2000,5,62.0,42.0,16,8,10,20.0,47.06
...,...,...,...,...,...,...,...,...,...,...
283,Hoffenheim,2015,14,39.0,54.0,9,15,10,-15.0,26.47
284,Darmstadt,2015,14,38.0,53.0,9,14,11,-15.0,26.47
285,Ein Frankfurt,2015,16,34.0,52.0,9,16,9,-18.0,26.47
286,Stuttgart,2015,17,50.0,75.0,9,19,6,-25.0,26.47
