# Supervised Machine Learning Project with PySpark
## - Black Belts Team

We will be using Player Attribute Table from the European Player Database for this Project. This dataset has 183,978 rows and 43 columns.

Learning Outcomes:
1. Obtain data using PySpark
2. Clean data using PySpark
3. Data Exploration using PySpark
4. Model Building with PySpark
5. Model Evaluation with PySpark

In [None]:
import warnings
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd

import pyspark
from pyspark.sql import functions as F
from pyspark.sql import types

from pyspark.ml.feature import Imputer, VectorAssembler, StringIndexer
from pyspark.ml.regression import RandomForestRegressor, DecisionTreeRegressor, GBTRegressor, LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Suppress all warnings for the rest of the session so cell output stays readable.
warnings.simplefilter('ignore')

## Connect to the Spark server

We will be using the SparkSession (`spark`) to access our Spark cluster.

In [None]:
# Reuse the active SparkSession if there is one; otherwise create a new one
# with the default configuration.
spark = (
    pyspark.sql.SparkSession
    .builder
    .getOrCreate()
)

## Obtain the Data

We connect to the database and get all available tables

In [None]:
import sqlite3

# Path of the SQLite file, relative to the notebook's working directory.
database = 'database.sqlite'
# The connection is intentionally left open: a later cell reuses `conn`
# to read the Player_Attributes table.
conn = sqlite3.connect(database)

# sqlite_master is SQLite's schema catalogue; keeping only the rows with
# type='table' lists every table stored in the database.
tables = pd.read_sql("""SELECT *
                        FROM sqlite_master
                        WHERE type='table';""", conn)
tables

Unnamed: 0,type,name,tbl_name,rootpage,sql
0,table,sqlite_sequence,sqlite_sequence,4,"CREATE TABLE sqlite_sequence(name,seq)"
1,table,Player_Attributes,Player_Attributes,11,"CREATE TABLE ""Player_Attributes"" (\n\t`id`\tIN..."
2,table,Player,Player,14,CREATE TABLE `Player` (\n\t`id`\tINTEGER PRIMA...
3,table,Match,Match,18,CREATE TABLE `Match` (\n\t`id`\tINTEGER PRIMAR...
4,table,League,League,24,CREATE TABLE `League` (\n\t`id`\tINTEGER PRIMA...
5,table,Country,Country,26,CREATE TABLE `Country` (\n\t`id`\tINTEGER PRIM...
6,table,Team,Team,29,"CREATE TABLE ""Team"" (\n\t`id`\tINTEGER PRIMARY..."
7,table,Team_Attributes,Team_Attributes,2,CREATE TABLE `Team_Attributes` (\n\t`id`\tINTE...


We Select the Dataset Table (Player_Attributes) and save it as a CSV file to be used in spark

In [None]:
# Load the whole Player_Attributes table into pandas, then write it to disk
# as CSV so that Spark can read it.
# NOTE(review): to_csv is called with its defaults, so the pandas row index is
# presumably what shows up as the unnamed first column ('_c0') on the Spark side.
df = pd.read_sql(sql="""SELECT * FROM Player_Attributes;""", con=conn)
df.to_csv(path_or_buf="Player_Attributes.csv")

Read the dataset using spark

In [None]:
# Parse the exported CSV: comma-separated, first row holds the column names,
# column types are inferred from the data, and a record may span several lines.
data = (
    spark.read
    .options(sep=',', inferSchema=True, header=True, multiLine=True)
    .csv("Player_Attributes.csv")
)

data.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- id: integer (nullable = true)
 |-- player_fifa_api_id: integer (nullable = true)
 |-- player_api_id: integer (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- overall_rating: double (nullable = true)
 |-- potential: double (nullable = true)
 |-- preferred_foot: string (nullable = true)
 |-- attacking_work_rate: string (nullable = true)
 |-- defensive_work_rate: string (nullable = true)
 |-- crossing: double (nullable = true)
 |-- finishing: double (nullable = true)
 |-- heading_accuracy: double (nullable = true)
 |-- short_passing: double (nullable = true)
 |-- volleys: double (nullable = true)
 |-- dribbling: double (nullable = true)
 |-- curve: double (nullable = true)
 |-- free_kick_accuracy: double (nullable = true)
 |-- long_passing: double (nullable = true)
 |-- ball_control: double (nullable = true)
 |-- acceleration: double (nullable = true)
 |-- sprint_speed: double (nullable = true)
 |-- agility: double (nullable = tru

Now lets see the shape of the data

In [None]:
# Print the dataset shape as "(rows, columns)".
print("({}, {})".format(data.count(), len(data.columns)))

(183978, 43)


## Data Cleaning


Drop useless columns

In [None]:
# Columns that carry no information about a player's rating: '_c0'
# (presumably the row index written by pandas) and the three identifier columns.
drop_cols = ['_c0', 'id', 'player_fifa_api_id', 'player_api_id']

# Keep every remaining column, in its original order.
data = data.select([name for name in data.columns if name not in drop_cols])

Drop Missing Data

In [None]:
# Discard every row that has a null in at least one column.
data = data.dropna()

Lets see the effect of the previous steps on the shape of the dataset

In [None]:
# Print the dataset shape as "(rows, columns)" after dropping columns and null rows.
print("({}, {})".format(data.count(), len(data.columns)))

(180354, 39)


We don't need the full timestamp, so we will add two columns for the year and month, then delete the date column.

In [None]:
# Replace the 'date' timestamp with its year and month components.
data = (
    data
    .withColumn('year', F.year('date'))
    .withColumn('month', F.month('date'))
    .drop('date')
)

### Descriptive Statistics
We are looking at the summary of some of the important numerical columns

In [None]:
# Show summary statistics for a selection of the numerical columns.
(
    data
    .select([
        'overall_rating',
        'potential',
        'finishing',
        'crossing',
        'balance',
        'vision',
        'strength',
        'year',
    ])
    .summary()
    .show()
)

+-------+-----------------+-----------------+-----------------+-----------------+------------------+------------------+------------------+------------------+
|summary|   overall_rating|        potential|        finishing|         crossing|           balance|            vision|          strength|              year|
+-------+-----------------+-----------------+-----------------+-----------------+------------------+------------------+------------------+------------------+
|  count|           180354|           180354|           180354|           180354|            180354|            180354|            180354|            180354|
|   mean|68.63531720948801|73.47945706776673|49.96213557780809|55.14207059449749| 65.19008172815684| 57.86817592068931| 67.43247723920733|2012.5871175576922|
| stddev|7.027950024610432|6.581962574918881|19.04176034040408|17.24723051650521|13.076191953149287|15.152408257341522|12.085131456050377|2.5694132962377934|
|    min|             33.0|             39.0|       

## Data Exploration


looking for the preferred_foot column distribution

In [None]:
# Expose the DataFrame to Spark SQL under the name 'data'.
# createOrReplaceTempView replaces registerTempTable, which has been
# deprecated since Spark 2.0.
data.createOrReplaceTempView('data')

# Number of rows per preferred_foot value, most frequent first.
state_counts = spark.sql(r"""SELECT preferred_foot, COUNT(preferred_foot) AS total 
                                     FROM data 
                                     GROUP BY preferred_foot 
                                     ORDER BY total desc """)
state_counts.show()

+--------------+------+
|preferred_foot| total|
+--------------+------+
|         right|136247|
|          left| 44107|
+--------------+------+



looking for the attacking_work_rate column distribution

In [None]:
# Expose the DataFrame to Spark SQL under the name 'data'.
# createOrReplaceTempView replaces registerTempTable, which has been
# deprecated since Spark 2.0.
data.createOrReplaceTempView('data')

# Number of rows per attacking_work_rate value, most frequent first.
state_counts = spark.sql(r"""SELECT attacking_work_rate, COUNT(attacking_work_rate) AS total 
                                     FROM data 
                                     GROUP BY attacking_work_rate 
                                     ORDER BY total desc """)
state_counts.show()

+-------------------+------+
|attacking_work_rate| total|
+-------------------+------+
|             medium|125070|
|               high| 42823|
|                low|  8569|
|               None|  3317|
|               norm|   317|
|                  y|    94|
|               stoc|    86|
|                 le|    78|
+-------------------+------+



looking for the defensive_work_rate column distribution

In [None]:
# Expose the DataFrame to Spark SQL under the name 'data'.
# createOrReplaceTempView replaces registerTempTable, which has been
# deprecated since Spark 2.0.
data.createOrReplaceTempView('data')

# Number of rows per defensive_work_rate value, most frequent first.
state_counts = spark.sql(r"""SELECT defensive_work_rate, COUNT(defensive_work_rate) AS total 
                                     FROM data 
                                     GROUP BY defensive_work_rate 
                                     ORDER BY total desc """)
state_counts.show()

+-------------------+------+
|defensive_work_rate| total|
+-------------------+------+
|             medium|130846|
|               high| 27041|
|                low| 18432|
|                  o|  1328|
|                  1|   421|
|                  2|   334|
|              ormal|   317|
|                  3|   243|
|                  5|   231|
|                  7|   207|
|                  0|   188|
|                  6|   179|
|                  9|   143|
|                  4|   116|
|                 es|    94|
|              tocky|    86|
|                ean|    78|
|                  8|    70|
+-------------------+------+



## Data Preparation for ML

### Encoding Categorical Features

We will encode all the categorical columns using StringIndexer and drop the original columns.

In [None]:
cat_cols = ['preferred_foot', 'defensive_work_rate', 'attacking_work_rate']

# Fit one StringIndexer per categorical column and append its numeric
# '<column>_idx' output to the DataFrame.
for cat_col in cat_cols:
    fitted_indexer = StringIndexer(inputCol=cat_col, outputCol=cat_col + '_idx').fit(data)
    data = fitted_indexer.transform(data)

# Only the indexed versions are kept; the original string columns are removed.
data = data.drop(*cat_cols)

### Combining Feature Columns


In [None]:
# Every column except 'overall_rating' is a model input; 'overall_rating'
# is held back because it becomes the label.
cols1 = [name for name in data.columns if name != 'overall_rating']

# Pack all input columns into a single vector column named 'features'.
assembler = VectorAssembler(outputCol='features', inputCols=cols1)
data1 = assembler.transform(data)

### Set columns and Split

Before modelling, the data are split into train and test data sets. We will make the train set bigger at this point as we will incorporate cross validation later on.

In [None]:
# Keep only the assembled feature vector and the target column;
# 'overall_rating' is renamed to 'label', the label column name used by the
# regressors and evaluators in the cells below.
df_data = data1.select(F.col('features'), F.col('overall_rating').alias('label'))

# 75% train / 25% test. The seed is fixed so that, for the same input data,
# re-running the notebook yields the same split and comparable metrics.
df_train, df_test = df_data.randomSplit([0.75, 0.25], seed=42)

## Model Building


### Initialize Evaluator and Grid
As we are dealing with continuous values, we will be using Regressors to be trained on the data and then be used in prediction. Accordingly, we will have to use Regression Evaluator to evaluate all the Regressors we will be using.

In [None]:
# Root Mean Squared Error is RegressionEvaluator's default metric; it is
# spelled out here so the metric used during cross validation is visible.
evaluator = RegressionEvaluator(metricName='rmse')

# No hyper-parameters are added to the builder, so every CrossValidator below
# evaluates its estimator only with the parameters it was constructed with.
grid = ParamGridBuilder().build()

In [None]:
# Random Forest Regressor, scored with 10-fold cross validation on the training set.
classifier_rf = RandomForestRegressor(labelCol='label', featuresCol='features')
cv_rf = CrossValidator(
    numFolds=10,
    estimator=classifier_rf,
    estimatorParamMaps=grid,
    evaluator=evaluator,
)
cv_model_rf = cv_rf.fit(df_train)

In [None]:
# Gradient Boosted Tree Regressor (10 boosting iterations), scored with
# 10-fold cross validation on the training set.
classifier_gbt = GBTRegressor(maxIter=10, labelCol='label', featuresCol='features')
cv_gbt = CrossValidator(
    numFolds=10,
    estimator=classifier_gbt,
    estimatorParamMaps=grid,
    evaluator=evaluator,
)
cv_model_gbt = cv_gbt.fit(df_train)

In [None]:
# Decision Tree Regressor, scored with 10-fold cross validation on the training set.
classifier_dt = DecisionTreeRegressor(labelCol='label', featuresCol='features')
cv_dt = CrossValidator(
    numFolds=10,
    estimator=classifier_dt,
    estimatorParamMaps=grid,
    evaluator=evaluator,
)
cv_model_dt = cv_dt.fit(df_train)

In [None]:
# Linear Regression with elastic-net regularisation, scored with 10-fold
# cross validation on the training set. The feature/label column names are
# the estimator's defaults, written out to match the other regressors.
classifier_lr = LinearRegression(
    featuresCol='features',
    labelCol='label',
    maxIter=10,
    regParam=0.3,
    elasticNetParam=0.8,
)
cv_lr = CrossValidator(
    numFolds=10,
    estimator=classifier_lr,
    estimatorParamMaps=grid,
    evaluator=evaluator,
)
cv_model_lr = cv_lr.fit(df_train)

### Evaluation

We will now get the average metrics of all models created by the regressors in the last step. And we will use the best model from each of the cross validated regressors to make predictions on the testing set. Lastly, this will all be presented in a dataframe for us to compare.

In [None]:
models = [cv_model_rf, cv_model_gbt, cv_model_dt, cv_model_lr]
model_names = ['Random Forest Regressor', 'Gradient Boosted Tree Regressor', 'Decision Tree Regressor', 'Linear Regression']

# Copy each avgMetrics list before extending it. Appending to the original
# list would mutate the fitted CrossValidatorModel itself, and re-running this
# cell would then produce rows with too many values for the DataFrame below.
metrics = [list(model.avgMetrics) for model in models]
print (metrics)

# One evaluator per test-set metric, created once and shared by all models.
test_evaluators = [
    RegressionEvaluator(predictionCol='prediction', labelCol='label', metricName=metric_name)
    for metric_name in ('r2', 'rmse', 'mae')
]

for model_metrics, model in zip(metrics, models):
    # Build the best model's test-set predictions once and score them with
    # every evaluator, in the same order as the DataFrame columns below.
    predictions = model.bestModel.transform(df_test)
    model_metrics.extend(test_evaluator.evaluate(predictions) for test_evaluator in test_evaluators)

# One row per regressor: cross-validation average first, then the test-set metrics.
df = pd.DataFrame(metrics, index=model_names, columns=['Average Metrics (CV)', 'Best Model R2 on Test Set', 'Best Model RMSE on Test Set', 'Best Model MAE on Test Set'])

df

[[2.7407853093463674], [2.512697899440387], [3.336886836746062], [2.9751355333918026]]


Unnamed: 0,Average Metrics (CV),Best Model R2 on Test Set,Best Model RMSE on Test Set,Best Model MAE on Test Set
Random Forest Regressor,2.740785,0.848606,2.731802,1.942337
Gradient Boosted Tree Regressor,2.512698,0.871742,2.514421,1.887638
Decision Tree Regressor,3.336887,0.774089,3.337055,2.470341
Linear Regression,2.975136,0.820623,2.973568,2.249735
