### ML with Pyspark
+ classify/predict winning team 

### Data Source
+ https://www.kaggle.com/hikne707/big-five-european-soccer-leagues 

In [1]:
# Load our Packages
from pyspark import SparkContext

In [None]:
#do not want it to use all the cores on my machine so only specified 2
sc = SparkContext(master='local[2]')

In [3]:
sc

In [4]:
from pyspark.sql import SparkSession

In [5]:
spark = SparkSession.builder.appName("Spark_Spain").getOrCreate()

### workflow
* Data Prep
+ Feature Engineering
+ Build Model 
+ Evaluate 

# Task 
+ predict what team will win in the spain league/country based on parameters
+ multi-classification problem win, lose, or tie

In [41]:
# load our dataset other data set that can be used ...
# France_league_V1.csv
# Spain_league_V1.csv
# Germany_league_V1.csv
# Italy_league_V1.csv
df = spark.read.csv("D:/Senior/Capstone/data-science-enviroment/data/Leagues/England_league_V1.csv", header=True, inferSchema=True)

In [42]:
# data prieview
df.show()

+-----+--------------------+------+------+----+-------+---------+---------+---------+---------+---+------------+------------+-------+
|Round|                Date|Team_1|Team_2|Year|Country|FT_Team_1|FT_Team_2|HT_Team_1|HT_Team_2|GGD|Team_1_(pts)|Team_2_(pts)|Outcome|
+-----+--------------------+------+------+----+-------+---------+---------+---------+---------+---+------------+------------+-------+
|    1|(Sat) 19 Aug 1995...|    28|   130|1995|      0|        3|        1|        3|        0|  2|           3|           0|      1|
|    1|(Sat) 19 Aug 1995...|    38|   150|1995|      0|        1|        0|        1|        0|  1|           3|           0|      1|
|    1|(Sat) 19 Aug 1995...|    63|    80|1995|      0|        0|        0|        0|        0|  0|           1|           1|      3|
|    1|(Sat) 19 Aug 1995...|   127|   184|1995|      0|        1|        0|        0|        0|  1|           3|           0|      1|
|    1|(Sat) 19 Aug 1995...|   129|   198|1995|      0|       

In [43]:
print(df.columns)

['Round', 'Date', 'Team_1', 'Team_2', 'Year', 'Country', 'FT_Team_1', 'FT_Team_2', 'HT_Team_1', 'HT_Team_2', 'GGD', 'Team_1_(pts)', 'Team_2_(pts)', 'Outcome']


In [44]:
# check datatypes
# inferschema changes type string to int, creates less work
df.dtypes

[('Round', 'int'),
 ('Date', 'string'),
 ('Team_1', 'int'),
 ('Team_2', 'int'),
 ('Year', 'int'),
 ('Country', 'int'),
 ('FT_Team_1', 'int'),
 ('FT_Team_2', 'int'),
 ('HT_Team_1', 'int'),
 ('HT_Team_2', 'int'),
 ('GGD', 'int'),
 ('Team_1_(pts)', 'int'),
 ('Team_2_(pts)', 'int'),
 ('Outcome', 'int')]

In [45]:
#descriptive summary
print(df.describe().show())

+-------+------------------+--------------------+------------------+------------------+------------------+-------+------------------+-----------------+------------------+-------------------+------------------+------------------+------------------+-----------------+
|summary|             Round|                Date|            Team_1|            Team_2|              Year|Country|         FT_Team_1|        FT_Team_2|         HT_Team_1|          HT_Team_2|               GGD|      Team_1_(pts)|      Team_2_(pts)|          Outcome|
+-------+------------------+--------------------+------------------+------------------+------------------+-------+------------------+-----------------+------------------+-------------------+------------------+------------------+------------------+-----------------+
|  count|              9408|                9408|              9408|              9408|              9408|   9408|              9408|             9408|              9408|               9408|            

In [46]:
# Value Count (check if dataSet is balance)
df.groupBy('Outcome').count().show()

+-------+-----+
|Outcome|count|
+-------+-----+
|      1| 4374|
|      3| 2413|
|      2| 2621|
+-------+-----+



### Feature Engineering
* Numericcal values
* Vectorization (the process of converting an algorithm from operating on a single value at a time to operating on a set of values (vector) at one time.)
* Scaling

In [47]:
import pyspark.ml
# load Ml packages
from pyspark.ml.feature import VectorAssembler, StringIndexer


In [48]:
# 2nd half goals full time - half time
df= df.withColumn("H2_Team_1", df['FT_Team_1']-df['HT_Team_1'])
df= df.withColumn("H2_Team_2", df['FT_Team_2']-df['HT_Team_2'])

In [49]:
#Rename HT to represent 1/2 halves, sorry for the redundency spark beginner
df= df.withColumnRenamed('HT_Team_1','H1_Team_1')
df= df.withColumnRenamed('HT_Team_2','H1_Team_2')

In [50]:
#Goal difference given, creating + for home win and - for away win
df= df.withColumn('FT_GD', df['FT_Team_1']-df['FT_Team_2'])
df= df.withColumn('H1_GD', df['H1_Team_1']-df['H1_Team_2'])
df= df.withColumn('H2_GD', df['FT_GD']-df['H1_GD'])



In [51]:
# addded colums include: 
#second half goal difference for home and away
#Full time goal differnce (displays negative if away team scores more)
# goals sccored second half 
# I thought about stripping the date in this case but i dont find it nesscary since we already have the year to work with, date will get dropped later on.
print(df.columns)

['Round', 'Date', 'Team_1', 'Team_2', 'Year', 'Country', 'FT_Team_1', 'FT_Team_2', 'H1_Team_1', 'H1_Team_2', 'GGD', 'Team_1_(pts)', 'Team_2_(pts)', 'Outcome', 'H2_Team_1', 'H2_Team_2', 'FT_GD', 'H1_GD', 'H2_GD']


In [52]:
#rearranging column so the output = outcome is at the end
df = df.select('Round', 'Date', 'Team_1', 'Team_2', 'Year', 'Country', 'FT_Team_1', 'FT_Team_2', 'H1_Team_1', 'H1_Team_2', 'GGD', 'Team_1_(pts)', 'Team_2_(pts)', 'H2_Team_1', 'H2_Team_2', 'FT_GD', 'H1_GD', 'H2_GD', 'Outcome')

In [53]:
df.show(5)

+-----+--------------------+------+------+----+-------+---------+---------+---------+---------+---+------------+------------+---------+---------+-----+-----+-----+-------+
|Round|                Date|Team_1|Team_2|Year|Country|FT_Team_1|FT_Team_2|H1_Team_1|H1_Team_2|GGD|Team_1_(pts)|Team_2_(pts)|H2_Team_1|H2_Team_2|FT_GD|H1_GD|H2_GD|Outcome|
+-----+--------------------+------+------+----+-------+---------+---------+---------+---------+---+------------+------------+---------+---------+-----+-----+-----+-------+
|    1|(Sat) 19 Aug 1995...|    28|   130|1995|      0|        3|        1|        3|        0|  2|           3|           0|        0|        1|    2|    3|   -1|      1|
|    1|(Sat) 19 Aug 1995...|    38|   150|1995|      0|        1|        0|        1|        0|  1|           3|           0|        0|        0|    1|    1|    0|      1|
|    1|(Sat) 19 Aug 1995...|    63|    80|1995|      0|        0|        0|        0|        0|  0|           1|           1|        0|     

As you can see home and away team have been label enconded already and converted into numbers(this data was worked on before bringinto jupyter notebook, it is not the raw file, all that is left to do is the date column and as I mention above the initial plan was to drop it but month and year(which we already) can be useful.

In [54]:
# Split the date column to get month, then label encode month
from pyspark.sql.functions import split
df = df.withColumn('Month', split(df['Date'],' ').getItem(2))

In [55]:
# convert the string Month into numbers
# label encoding
monthEncoder = StringIndexer(inputCol='Month',outputCol='Game_Month').fit(df)

In [56]:
df = monthEncoder.transform(df)

In [57]:
df.show(5)

+-----+--------------------+------+------+----+-------+---------+---------+---------+---------+---+------------+------------+---------+---------+-----+-----+-----+-------+-----+----------+
|Round|                Date|Team_1|Team_2|Year|Country|FT_Team_1|FT_Team_2|H1_Team_1|H1_Team_2|GGD|Team_1_(pts)|Team_2_(pts)|H2_Team_1|H2_Team_2|FT_GD|H1_GD|H2_GD|Outcome|Month|Game_Month|
+-----+--------------------+------+------+----+-------+---------+---------+---------+---------+---+------------+------------+---------+---------+-----+-----+-----+-------+-----+----------+
|    1|(Sat) 19 Aug 1995...|    28|   130|1995|      0|        3|        1|        3|        0|  2|           3|           0|        0|        1|    2|    3|   -1|      1|  Aug|       8.0|
|    1|(Sat) 19 Aug 1995...|    38|   150|1995|      0|        1|        0|        1|        0|  1|           3|           0|        0|        0|    1|    1|    0|      1|  Aug|       8.0|
|    1|(Sat) 19 Aug 1995...|    63|    80|1995|      0|

In [58]:
#get encoded labels, annoying since labels are not from 0-11 jan-dec
monthEncoder.labels

['Dec', 'Apr', 'Nov', 'Jan', 'Mar', 'Sep', 'Oct', 'Feb', 'Aug', 'May']

In [59]:
print(df.columns)

['Round', 'Date', 'Team_1', 'Team_2', 'Year', 'Country', 'FT_Team_1', 'FT_Team_2', 'H1_Team_1', 'H1_Team_2', 'GGD', 'Team_1_(pts)', 'Team_2_(pts)', 'H2_Team_1', 'H2_Team_2', 'FT_GD', 'H1_GD', 'H2_GD', 'Outcome', 'Month', 'Game_Month']


In [60]:
required_features = ['Round', 'Team_1', 'Team_2', 'Year', 'Country', 'FT_Team_1', 'FT_Team_2', 'H1_Team_1', 'H1_Team_2', 'GGD', 'Team_1_(pts)', 'Team_2_(pts)', 'H2_Team_1', 'H2_Team_2', 'FT_GD', 'H1_GD', 'H2_GD', 'Game_Month', 'Outcome']

In [61]:
# VectorAssembly
vec_assembler = VectorAssembler(inputCols=required_features,outputCol='features')

In [62]:
#sticking entire feature required in a vector which I will be using to Model 
vec_df = vec_assembler.transform(df)

In [63]:
vec_df.show()

+-----+--------------------+------+------+----+-------+---------+---------+---------+---------+---+------------+------------+---------+---------+-----+-----+-----+-------+-----+----------+--------------------+
|Round|                Date|Team_1|Team_2|Year|Country|FT_Team_1|FT_Team_2|H1_Team_1|H1_Team_2|GGD|Team_1_(pts)|Team_2_(pts)|H2_Team_1|H2_Team_2|FT_GD|H1_GD|H2_GD|Outcome|Month|Game_Month|            features|
+-----+--------------------+------+------+----+-------+---------+---------+---------+---------+---+------------+------------+---------+---------+-----+-----+-----+-------+-----+----------+--------------------+
|    1|(Sat) 19 Aug 1995...|    28|   130|1995|      0|        3|        1|        3|        0|  2|           3|           0|        0|        1|    2|    3|   -1|      1|  Aug|       8.0|[1.0,28.0,130.0,1...|
|    1|(Sat) 19 Aug 1995...|    38|   150|1995|      0|        1|        0|        1|        0|  1|           3|           0|        0|        0|    1|    1|   

### Train, Test, Split

In [64]:
train_df,test_df = vec_df.randomSplit([0.7,0.3])

In [65]:
train_df.count()

6520

### Model Building
+ Pyspark.ml: Dataframe


In [66]:
from pyspark.ml.classification import LogisticRegression 

In [67]:
# logistic model
lr = LogisticRegression(featuresCol='features',labelCol='Outcome')

In [68]:
lr_model = lr.fit(train_df)

In [69]:
y_pred = lr_model.transform(test_df)

In [70]:
y_pred.show()

+-----+--------------------+------+------+----+-------+---------+---------+---------+---------+---+------------+------------+---------+---------+-----+-----+-----+-------+-----+----------+--------------------+--------------------+--------------------+----------+
|Round|                Date|Team_1|Team_2|Year|Country|FT_Team_1|FT_Team_2|H1_Team_1|H1_Team_2|GGD|Team_1_(pts)|Team_2_(pts)|H2_Team_1|H2_Team_2|FT_GD|H1_GD|H2_GD|Outcome|Month|Game_Month|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+------+------+----+-------+---------+---------+---------+---------+---+------------+------------+---------+---------+-----+-----+-----+-------+-----+----------+--------------------+--------------------+--------------------+----------+
|    1|(Fri) 11 Aug 2017...|    26|   124|2017|      0|        4|        3|        2|        2|  1|           3|           0|        2|        1|    1|    0|    1|      1|  Aug|       8.0|[1.0,26.0,124.0,2...|[-

In [71]:
print (y_pred.columns)

['Round', 'Date', 'Team_1', 'Team_2', 'Year', 'Country', 'FT_Team_1', 'FT_Team_2', 'H1_Team_1', 'H1_Team_2', 'GGD', 'Team_1_(pts)', 'Team_2_(pts)', 'H2_Team_1', 'H2_Team_2', 'FT_GD', 'H1_GD', 'H2_GD', 'Outcome', 'Month', 'Game_Month', 'features', 'rawPrediction', 'probability', 'prediction']


In [72]:
y_pred.select('Outcome','rawPrediction', 'probability', 'prediction').show()

+-------+--------------------+--------------------+----------+
|Outcome|       rawPrediction|         probability|prediction|
+-------+--------------------+--------------------+----------+
|      1|[-628.40693778929...|[0.0,1.0,9.626270...|       1.0|
|      2|[-627.79755928057...|[0.0,1.1119943613...|       2.0|
|      1|[-623.80941974546...|[0.0,1.0,5.372925...|       1.0|
|      3|[-628.94281199670...|[0.0,1.1233144221...|       3.0|
|      3|[-625.31596269836...|[0.0,4.1390756847...|       3.0|
|      1|[-625.28852800621...|[0.0,1.0,1.558144...|       1.0|
|      2|[-625.30910218822...|[0.0,3.7223603023...|       2.0|
|      1|[-628.63279495177...|[0.0,1.0,3.951659...|       1.0|
|      2|[-628.71020253365...|[0.0,1.8834591570...|       2.0|
|      2|[-628.35420647851...|[0.0,4.6349473246...|       2.0|
|      3|[-624.64852986195...|[0.0,1.9497280301...|       3.0|
|      2|[-624.69159778021...|[0.0,1.4270148594...|       2.0|
|      2|[-624.69950109315...|[0.0,3.1770200805...|    

### Model Evaluation

In [73]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [74]:
#check accuracy
multi_eval = MulticlassClassificationEvaluator(labelCol='Outcome',metricName='accuracy')

In [75]:
multi_eval.evaluate(y_pred)

0.9996537396121884