In [1]:
import pyspark

In [2]:
# Importing various libraries

import numpy as np # For creating n-dimensional array object
import seaborn as sns  # For making statistical graphics
import matplotlib.pyplot as plt  # Plotting the graphs
import pandas as pd # For creating array 

In [3]:
#Spark session creation for providing the single point entry to interact with underlying spark functionalities and allows programming-
# Spark with dataframe and dataset APIs

from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("Python Spark on European football") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [4]:
# importing the data type
from pyspark.sql.types import *

# Creating Spark context.It is the entry gate of Apache Spark functionalities.
sc = spark.sparkContext

In [5]:

# Read the CSV file name 'Events.csv' having detail of events such as shot_place, shot_outcome ,is_game etc.
# These details will be further used in visualization.

eventsDf = spark.read.options(header="true",\
                              inferSchema="true",\
                              nullValue="NA",\
                              mode="failfast")\
                             .csv("/FileStore/tables/events.csv")

In [6]:
eventsDf.show(5)

In [7]:
# Counting the number of Rows and columns in dataset.

print('Number of rows', eventsDf.count(), ",and", len(eventsDf.columns), 'columns' )

In [8]:
# Printing the data schema

eventsDf.printSchema()

In [9]:
# Getting number of partitions in event dataset.
# RDDs are immutable, we can not change the partition of RDDs. 
# New RDDs can be create with the desired number of partitions

eventsDf.rdd.getNumPartitions()

In [10]:
eventsDf.show(1)

In [11]:
# Verifying that null values are present in the required variables or not.

from pyspark.sql.functions import isnan, when, count, col

# Creating temporary data files. Transfering all columns in temporary data file.

tempDF = eventsDf[['player', 'event_team', 'opponent', 'event_type', 'event_type2', 'shot_place', 
                   'shot_outcome', 'location', 'bodypart', 'assist_method', 'situation']]

# Doing alliasing of temporary schema 

tempDF.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in tempDF.columns]).show()

In [12]:
# Standrd feature for filling 'NA' values.

eventsDf = eventsDf.na.fill({'player': 'NA', 'event_team': 'NA', 'opponent': 'NA', 'event_type': 'NA', 'event_type2': 'NA', 'shot_place': 'NA',
                             'shot_outcome': 'NA', 'location': 'NA', 'bodypart': 'NA','assist_method': 'NA', 'situation': 'NA'}) 


In [13]:
# Reading the Game data set.

gameDf = spark.read.options(header="true",\
                              inferSchema="true",\
                              nullValue="NA",\
                              mode="failfast")\
                             .csv("/FileStore/tables/ginf.csv")

In [14]:
# Repeating Above procedure on Game CSV file.

In [15]:
# Printing the CSV file

gameDf.printSchema()

In [16]:
# Getting number of partitions in event dataset.
# RDDs are immutable, we can not change the partition of RDDs. 
# New RDDs can be create with the desired number of partitions


gameDf.rdd.getNumPartitions()

In [17]:
# Printing the Game data frame

gameDf.show(5)

In [18]:
# Taking Desired Columns

gameDf = gameDf[['id_odsp', 'country', 'adv_stats', 'date', 'league', 'season']]

In [19]:
gameDf.show(5)

In [20]:
gameDf.select([count(when(col(c).isNull(), c)).alias(c) for c in gameDf.columns]).show()

In [21]:
# Dictionary.txt file :- 
# event_type
# 0	Announcement
# 1	Attempt
# 2	Corner
# 3	Foul
# 4	Yellow card
# 5	Second yellow card
# 6	Red card
# 7	Substitution
# 8	Free kick won
# 9	Offside
# 10	Hand ball
# 11	Penalty conceded

# In below code Dictionary.txt file had been converted into Python Dictionaries.


In [22]:
#Converting Text file into Python Dictionaries

# Mapping Various events of football
evtTypeMap = {0:'Announcement', 1:'Attempt', 2:'Corner', 3:'Foul', 4:'Yellow card', 5:'Second yellow card', 6:'Red card', 7:'Substitution', 8:'Free kick won', 9:'Offside', 10:'Hand ball', 11:'Penalty conceded'}

evtTyp2Map = {12:'Key Pass', 13:'Failed through ball', 14:'Sending off', 15:'Own goal'}

sideMap = {1:'Home', 2:'Away'}

#  Type of football kick
shotPlaceMap = {1:'Bit too high', 2:'Blocked', 3:'Bottom left corner', 4:'Bottom right corner', 5:'Centre of the goal', 6:'High and wide', 7:'Hits the bar', 8:'Misses to the left', 9:'Misses to the right', 10:'Too high', 11:'Top centre of the goal', 12:'Top left corner', 13:'Top right corner'}

# Mapping of type of shot
shotOutcomeMap = {1:'On target', 2:'Off target', 3:'Blocked', 4:'Hit the bar'}

# Location form where football had been kicked 
locationMap = {1:'Attacking half', 2:'Defensive half', 3:'Centre of the box', 4:'Left wing', 5:'Right wing', 6:'Difficult angle and long range', 7:'Difficult angle on the left', 8:'Difficult angle on the right', 9:'Left side of the box', 10:'Left side of the six yard box', 11:'Right side of the box', 12:'Right side of the six yard box', 13:'Very close range', 14:'Penalty spot', 15:'Outside the box', 16:'Long range', 17:'More than 35 yards', 18:'More than 40 yards', 19:'Not recorded'}

# Which body part had been used, while kicking the football
bodyPartMap = {1:'Right foot', 2:'Left foot', 3:'Head'}

# Mapping of type of football kick
assistMethodMap = {0:'None', 1:'Pass', 2:'Cross', 3:'Headed pass', 4:'Through ball'}

# Mapping of Different football playgorund situations
situationMap = {1:'Open play', 2:'Set piece', 3:'Corner', 4:'Free kick'}

# Given Code names to the countries that are taking participation in Football leagus matches
countryCodeMap = {'germany':'DEU', 'france':'FRA', 'england':'GBR', 'spain':'ESP', 'italy':'ITA'}

In [23]:
# Counting Number of Event types from 'eventsDf' file

eventsDf.groupBy("event_type").count().show()

In [24]:
# Defining the UDF for mapping keys to value in dictionary
# UDFs work in a similar way as the pandas . map()

from pyspark.sql.types import StringType
from pyspark.sql.functions import udf

def translateKeys(mapping):
    def translateKeys_(col):
        return mapping.get(col)
    return udf(translateKeys_, StringType())

In [25]:
#Creating country code for quick country column access and country name errors 

gameDf = gameDf.withColumn("country_code", translateKeys(countryCodeMap)("country"))
display(gameDf['id_odsp','country','country_code'])

id_odsp,country,country_code
UFot0hit/,germany,DEU
Aw5DflLH/,germany,DEU
bkjpaC6n/,germany,DEU
CzPV312a/,france,FRA
GUOdmtII/,france,FRA
lOpzwMkp/,germany,DEU
M7PhlM2C/,france,FRA
QuWqjrYa/,france,FRA
UBZQ4smg/,france,FRA
Wn69eU5B/,germany,DEU


In [26]:
## Add additional columns with the corresponding dict type name
# Performing a lookups using UDFs and joining Dataframes

eventsDf = (
             eventsDf.
             withColumn("event_type_str", translateKeys(evtTypeMap)("event_type")).
             withColumn("event_type2_str", translateKeys(evtTyp2Map)("event_type2")).
             withColumn("side_str", translateKeys(sideMap)("side")).
             withColumn("shot_place_str", translateKeys(shotPlaceMap)("shot_place")).
             withColumn("shot_outcome_str", translateKeys(shotOutcomeMap)("shot_outcome")).
             withColumn("location_str", translateKeys(locationMap)("location")).
             withColumn("bodypart_str", translateKeys(bodyPartMap)("bodypart")).
             withColumn("assist_method_str", translateKeys(assistMethodMap)("assist_method")).
             withColumn("situation_str", translateKeys(situationMap)("situation"))
           )

In [27]:
# See if all column for corresponding dictionary mapping generated with string sort.

# eventsDf.show(5)
eventsDf.printSchema()

In [28]:
# Verifying whether core values properly expressed or not

eventsDf.groupBy("event_type_str").count().show()

In [29]:
# Creating final data frame by adding Game and Event data frame.

finalDf = (
  eventsDf.join(gameDf, eventsDf.id_odsp == gameDf.id_odsp, 'inner').
  select(eventsDf.id_odsp, eventsDf.id_event, eventsDf.sort_order, eventsDf.time, eventsDf.event_type, 
         eventsDf.event_type_str, eventsDf.event_type2, eventsDf.event_type2_str, eventsDf.side, 
         eventsDf.side_str, eventsDf.event_team, eventsDf.opponent, eventsDf.player, eventsDf.player2, 
         eventsDf.player_in, eventsDf.player_out, eventsDf.shot_place, eventsDf.shot_place_str, 
         eventsDf.shot_outcome, eventsDf.shot_outcome_str, eventsDf.is_goal, eventsDf.location, 
         eventsDf.location_str, eventsDf.bodypart, eventsDf.bodypart_str, eventsDf.assist_method,
         eventsDf.assist_method_str, eventsDf.situation, eventsDf.situation_str, gameDf.date, gameDf.league,
         gameDf.season, gameDf.country, gameDf.country_code, gameDf.adv_stats)
)

In [30]:
finalDf.printSchema()

In [31]:
#QuantileDiscretizer takes a column with continuous features and outputs a column.
from pyspark.ml.feature import QuantileDiscretizer

# The bin ranges are chosen based on quantiles, that is, each bin will have the same number of observations.
finalDf = QuantileDiscretizer(numBuckets=10, inputCol="time", outputCol="time_bin").fit(finalDf).transform(finalDf)

In [32]:
# Checking the Null values if Dataset has any.

finalDf.select([count(when(col(c).isNull(), c)).alias(c) for c in finalDf.columns]).show()

In [33]:
# Handling the null values 

finalDf = finalDf.na.fill({'player': 'NA', 'event_team': 'NA', 'opponent': 'NA', 
                             'event_type_str': 'NA', 'event_type2_str': 'NA', 'shot_place_str': 'NA', 
                             'shot_outcome_str': 'NA', 'location_str': 'NA', 'bodypart_str': 'NA', 
                             'assist_method_str': 'NA', 'situation_str': 'NA'})

In [34]:
finalDf.rdd.getNumPartitions()

In [36]:
# Accessing the SQL Temporary Dataframe
finalDf.createOrReplaceTempView("soccer")

# Using SQL query for getting the total number of Goals at Particular Shot Place  

que1 = spark.sql("SELECT CASE WHEN shot_place_str == 'NA' THEN 'Unknown' ELSE shot_place_str END shot_place,\
                    COUNT(1) AS TOT_GOALS \
                    FROM soccer WHERE is_goal = 1 \
                    GROUP BY shot_place_str \
                    ORDER BY TOT_GOALS DESC")
que1.show()

In [37]:
# By looking at above data, we can clearly infer that Maixmum number of goals had been done from 'Bottom Left corner'

In [38]:
display(que1)

shot_place,TOT_GOALS
Bottom left corner,7212
Bottom right corner,6932
Centre of the goal,4446
Top right corner,2157
Top left corner,2023
Unknown,1676


In [39]:

# SQL query for selecting countries that had done maximum Goals 

que2 = spark.sql("SELECT country, COUNT(1) AS TOT_GOALS FROM soccer WHERE is_goal = 1 GROUP BY country\
                    ORDER BY 2 DESC")
que2.show()

In [40]:
# Plot above result in pie chart
display(que2)

country,TOT_GOALS
spain,5583
italy,5491
france,5199
germany,4621
england,3552


In [41]:
# By Looking at pie chart and table , we can say that 'Spain' had done the maximum Number of Goals in league Matches

In [42]:
# SQL query for selecting time and maximum number of goals. We had set the limit to 10 records only.

que3 = spark.sql("SELECT time, count(1) as num_of_goals FROM soccer WHERE is_goal = 1 GROUP BY time ORDER BY 2 DESC LIMIT 10")
                 
que3.show()

In [43]:
# Plot above result
res3 = que3.toPandas()
plot3 = res3.plot.bar(x='time', y='num_of_goals', figsize=(20,5))
display(plot3)

In [44]:
# Maximum number of goals had been done in 90th minute followed by 45th minute.

In [45]:
# Selecting the columns Time and Num_of_yellow_cards for getting the desired results

que4 = spark.sql("SELECT time, count(1) as num_of_yellow_cards FROM soccer \
                WHERE event_type_str == 'Yellow card' OR event_type_str == 'Second yellow card' \
                GROUP BY time ORDER BY 2 DESC LIMIT 10")
                 
que4.show()

In [46]:
# Plotting the bar graph using above results

res4 = que4.toPandas()
plot4 = res4.plot.bar(x='time', y='num_of_yellow_cards', figsize=(18,7))
display(plot4)

In [47]:
# By looking at Graph, we can say that Maximum number of Yellow cards had been given in 90th Minute.

# We can also illustrate that Maximum number of goals and maximum number of yellow cards had been given on same time frame i.e. 90th Minute. 


In [48]:
# Selecting the Time and Maximum number of Red cards columns from Final Data file.  

que5 = spark.sql("SELECT time, count(1) as num_of_red_card FROM soccer \
                WHERE event_type_str == 'Red card' \
                GROUP BY time ORDER BY 2 DESC LIMIT 10")
                 
que5.show()

In [49]:
res5 = que5.toPandas()

plot5 = res5.plot.bar(x='time', y='num_of_red_card', figsize=(18,7))

display(plot5)

In [50]:
# In 90th Minute maximum number of red card had been given and it is followed by 82th minute for maximum red cards.

In [51]:
#  We will use GTB classifier as machine learning algorithm.

#  GBT is a supervised machine learning algorithm for classiﬁcation. This constructs the model in a phase-by-phase way like many other boosting techniques #  and generalizes it through optimization.

In [52]:
# Importing necessary libraries for  GBT Classification model

from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [53]:

# Build a categorical characteristic list
categFeatures = ["event_type_str", "event_team", "shot_place_str", "location_str", "assist_method_str", "situation_str", "country_code"]

# Encode the string cols for categorical mark indices
stringIndexers = [StringIndexer().setInputCol(baseFeature).setOutputCol(baseFeature + "_idx") for baseFeature in categFeatures]

# Used to transform indexes to binary vectors. 
encoders = [OneHotEncoder().setInputCol(baseFeature + "_idx").setOutputCol(baseFeature + "_vec") for baseFeature in categFeatures]


In [54]:
# Combine all columns into a single feature vector
featureAssembler = VectorAssembler()
featureAssembler.setInputCols([baseFeature + "_vec" for baseFeature in categFeatures])
featureAssembler.setOutputCol("features")

In [55]:
from pyspark.ml.classification import GBTClassifier


gbtClassifier = GBTClassifier(labelCol="is_goal", featuresCol="features", maxDepth=5, maxIter=20)

pipelineStages = stringIndexers + encoders + [featureAssembler, gbtClassifier]
pipeline = Pipeline(stages=pipelineStages)


# (trainingData, testData) = joinedDf.randomSplit([0.75, 0.30])
# eventsDf
(trainingData, testData) = finalDf.randomSplit([0.70, 0.30])

In [56]:
# model = pipeline.fit(trainingData)
#Is_goal in events_df

model = pipeline.fit(trainingData)

In [57]:
# Validate the model on test data, display predictions
predictions = model.transform(testData)
display(predictions.select("prediction", "is_goal", "features"))

prediction,is_goal,features
0.0,0,"List(0, 195, List(0, 15, 151, 168, 183, 187, 191), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))"
0.0,0,"List(0, 195, List(2, 15, 152, 167, 186, 188, 191), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))"
0.0,0,"List(0, 195, List(2, 12, 153, 175, 184, 188, 191), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))"
0.0,0,"List(0, 195, List(6, 12, 151, 164, 183, 187, 191), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))"
0.0,0,"List(0, 195, List(1, 15, 151, 164, 183, 187, 191), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))"
0.0,0,"List(0, 195, List(0, 12, 151, 165, 183, 187, 191), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))"
0.0,0,"List(0, 195, List(5, 12, 151, 164, 183, 187, 191), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))"
0.0,0,"List(0, 195, List(2, 15, 152, 167, 185, 188, 191), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))"
0.0,0,"List(0, 195, List(1, 12, 151, 164, 183, 187, 191), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))"
0.0,0,"List(0, 195, List(0, 15, 151, 168, 183, 187, 191), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))"


In [58]:
evaluator = BinaryClassificationEvaluator(
    labelCol="is_goal", rawPredictionCol="prediction")
evaluator.evaluate(predictions)

In [59]:
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(featuresCol = 'features', labelCol = 'is_goal', maxIter=10)

pipelineStages1 = stringIndexers + encoders + [featureAssembler, lr]
pipeline1 = Pipeline(stages=pipelineStages)


In [60]:
(trainingData, testData) = finalDf.randomSplit([0.70, 0.30])

In [61]:
lrModel = pipeline1.fit(trainingData)

In [62]:
predictions1 = lrModel.transform(testData)
display(predictions1.select("prediction", "is_goal", "features"))

prediction,is_goal,features
0.0,0,"List(0, 195, List(5, 12, 151, 164, 183, 187, 191), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))"
0.0,0,"List(0, 195, List(2, 12, 158, 177, 184, 188, 191), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))"
0.0,1,"List(0, 195, List(2, 12, 162, 167, 185, 189, 191), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))"
0.0,0,"List(0, 195, List(1, 12, 151, 164, 183, 187, 191), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))"
0.0,0,"List(0, 195, List(2, 16, 152, 167, 186, 188, 191), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))"
0.0,0,"List(0, 195, List(3, 16, 151, 164, 183, 187, 191), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))"
1.0,1,"List(0, 195, List(2, 16, 156, 173, 183, 188, 191), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))"
0.0,0,"List(0, 195, List(4, 12, 151, 164, 183, 187, 191), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))"
0.0,0,"List(0, 195, List(1, 12, 151, 164, 183, 187, 191), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))"
0.0,0,"List(0, 195, List(1, 12, 151, 164, 183, 187, 191), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))"


In [63]:
# Evaluating logistic regression model
evaluator = BinaryClassificationEvaluator(
    labelCol="is_goal", rawPredictionCol="prediction")
evaluator.evaluate(predictions1)