# Pyspark Experimentation - Data Cleaning and Analysis


# Importing all Libararies

In [63]:
from pyspark.sql import SparkSession

In [67]:
import findspark 

In [76]:
import findspark
from pyspark import SparkContext
from pyspark.sql import Window, Row
from pyspark.sql.functions import *
from pyspark.sql.types import *
import matplotlib.pyplot as plt

In [9]:
pip install findspark

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1
Note: you may need to restart the kernel to use updated packages.


# Starting Spark Session

In [64]:
spark = SparkSession.builder.appName('Proto').getOrCreate()

In [75]:
spark

# Importing CSV Data and converting to Dataframe

In [68]:
##
def loaddata(filename):
    df = spark.read.format('csv').options(header=True, inferSchema = True).load(filename)
    return df

df_matches = loaddata('./Matches.csv')
df_matches.show()

+--------+---+------+-------------------+------------------+--------------+----+----+---+
|Match_ID|Div|Season|               Date|          HomeTeam|      AwayTeam|FTHG|FTAG|FTR|
+--------+---+------+-------------------+------------------+--------------+----+----+---+
|       1| D2|  2009|2010-04-04 00:00:00|        Oberhausen|Kaiserslautern|   2|   1|  H|
|       2| D2|  2009|2009-11-01 00:00:00|       Munich 1860|Kaiserslautern|   0|   1|  A|
|       3| D2|  2009|2009-10-04 00:00:00|     Frankfurt FSV|Kaiserslautern|   1|   1|  D|
|       4| D2|  2009|2010-02-21 00:00:00|     Frankfurt FSV|     Karlsruhe|   2|   1|  H|
|       5| D2|  2009|2009-12-06 00:00:00|             Ahlen|     Karlsruhe|   1|   3|  A|
|       6| D2|  2009|2010-04-03 00:00:00|      Union Berlin|     Karlsruhe|   1|   1|  D|
|       7| D2|  2009|2009-08-14 00:00:00|         Paderborn|     Karlsruhe|   2|   0|  H|
|       8| D2|  2009|2010-03-08 00:00:00|         Bielefeld|     Karlsruhe|   0|   1|  A|
|       9|

In [69]:
##Renaming Columns

old_cols = df_matches.columns[-3:]
new_cols = ["HomeTeamGoals", "AwayTeamGoals", "FinalResult"]

old_new_cols = [*zip(old_cols, new_cols)]
for old_col, new_col in old_new_cols:
    df_matches = df_matches.withColumnRenamed(old_col, new_col)

df_matches.limit(5).toPandas()

Unnamed: 0,Match_ID,Div,Season,Date,HomeTeam,AwayTeam,HomeTeamGoals,AwayTeamGoals,FinalResult
0,1,D2,2009,2010-04-04,Oberhausen,Kaiserslautern,2,1,H
1,2,D2,2009,2009-11-01,Munich 1860,Kaiserslautern,0,1,A
2,3,D2,2009,2009-10-04,Frankfurt FSV,Kaiserslautern,1,1,D
3,4,D2,2009,2010-02-21,Frankfurt FSV,Karlsruhe,2,1,H
4,5,D2,2009,2009-12-06,Ahlen,Karlsruhe,1,3,A


# Who are the winners of the D1 division in the Germany Football Association (Bundesliga) between 2000–2010?


In [70]:
df_matches = df_matches \
    .withColumn('HomeTeamWin', when(col('FinalResult') == 'H', 1).otherwise(0)) \
    .withColumn('AwayTeamWin', when(col('FinalResult') == 'A', 1).otherwise(0)) \
    .withColumn('GameTie', when(col('FinalResult') == 'D', 1).otherwise(0))


#bundesliga is a D1 division and we are interested in season <= 2010 and >= 2000
bundesliga = df_matches \
                    .filter((col('Season') >= 2000) & 
                            (col('Season') <= 2010) & 
                            (col('Div') == 'D1'))

# home team features
home = bundesliga.groupby('Season', 'HomeTeam') \
       .agg(sum('HomeTeamWin').alias('TotalHomeWin'),
            sum('AwayTeamWin').alias('TotalHomeLoss'),
            sum('GameTie').alias('TotalHomeTie'),
            sum('HomeTeamGoals').alias('HomeScoredGoals'),
            sum('AwayTeamGoals').alias('HomeAgainstGoals')) \
       .withColumnRenamed('HomeTeam', 'Team')

#away game features 
away =  bundesliga.groupby('Season', 'AwayTeam') \
       .agg(sum('AwayTeamWin').alias('TotalAwayWin'),
            sum('HomeTeamWin').alias('TotalAwayLoss'),
            sum('GameTie').alias('TotalAwayTie'),
            sum('AwayTeamGoals').alias('AwayScoredGoals'),
            sum('HomeTeamGoals').alias('AwayAgainstGoals'))  \
       .withColumnRenamed('AwayTeam', 'Team')

In [72]:
window = ['Season']
window = Window.partitionBy(window).orderBy(col('WinPct').desc(), col('GoalDifferentials').desc())
table = home.join(away, ['Team', 'Season'],  'inner') \
    .withColumn('GoalsScored', col('HomeScoredGoals') + col('AwayScoredGoals')) \
    .withColumn('GoalsAgainst', col('HomeAgainstGoals') + col('AwayAgainstGoals')) \
    .withColumn('GoalDifferentials', col('GoalsScored') - col('GoalsAgainst')) \
    .withColumn('Win', col('TotalHomeWin') + col('TotalAwayWin')) \
    .withColumn('Loss', col('TotalHomeLoss') + col('TotalAwayLoss')) \
    .withColumn('Tie', col('TotalHomeTie') + col('TotalAwayTie')) \
    .withColumn('WinPct', round((100* col('Win')/(col('Win') + col('Loss') + col('Tie'))), 2)) \
    .drop('HomeScoredGoals', 'AwayScoredGoals', 'HomeAgainstGoals', 'AwayAgainstGoals') \
    .drop('TotalHomeWin', 'TotalAwayWin', 'TotalHomeLoss', 'TotalAwayLoss', 'TotalHomeTie', 'TotalAwayTie') \
    .withColumn('TeamPosition', rank().over(window)) 

table_df = table.filter(col('TeamPosition') == 1).orderBy(asc('Season')).toPandas()
table_df

Unnamed: 0,Team,Season,GoalsScored,GoalsAgainst,GoalDifferentials,Win,Loss,Tie,WinPct,TeamPosition
0,Bayern Munich,2000,62,37,25,19,9,6,55.88,1
1,Leverkusen,2001,77,38,39,21,7,6,61.76,1
2,Bayern Munich,2002,70,25,45,23,5,6,67.65,1
3,Werder Bremen,2003,79,38,41,22,4,8,64.71,1
4,Bayern Munich,2004,75,33,42,24,5,5,70.59,1
5,Bayern Munich,2005,67,32,35,22,3,9,64.71,1
6,Stuttgart,2006,61,37,24,21,6,7,61.76,1
7,Bayern Munich,2007,68,21,47,22,2,10,64.71,1
8,Wolfsburg,2008,80,41,39,21,7,6,61.76,1
9,Bayern Munich,2009,72,31,41,20,4,10,58.82,1


# Conclusion