**Create a Spark session and load the FIFA Players dataset**

In [2]:
from pyspark.sql import SparkSession

# Obtain the Spark session for this notebook under the application name 'FifaScore'.
spark = (
    SparkSession.builder
    .appName('FifaScore')
    .getOrCreate()
)

In [3]:
# Load the FIFA 20 players CSV; the first row is the header and column types
# are inferred from the data.
df20 = spark.read.csv('/FileStore/tables/players_20.csv',inferSchema=True,header=True)

# Register the DataFrame under the name the %sql cells query. Without this the
# SQL cells only work if `fifa20_players` was created outside the notebook,
# which breaks a fresh "Restart & Run All".
df20.createOrReplaceTempView('fifa20_players')

df20.printSchema()


## Data Visualization

#### Top 5 Countries with Most Number of Players rated 85+ (BEST RATED INTERNATIONAL TEAMS ON FIFA 20)

In [6]:
%sql

-- Count, per nationality, the players whose overall rating is between 85 and 95
-- (inclusive) and return the five nationalities with the highest counts.
-- NOTE(review): the heading says "85+", but BETWEEN also caps the rating at 95;
-- confirm no rating can exceed 95, otherwise `overall >= 85` is what is meant.
SELECT nationality, COUNT(overall) as Number_of_Players
FROM fifa20_players
WHERE overall BETWEEN 85 AND 95
GROUP BY nationality
ORDER BY Number_of_Players DESC
LIMIT 5

nationality,Number_of_Players
Spain,14
Brazil,14
France,12
Germany,9
Belgium,8


#### Top 5 Clubs with Most Number of Players rated 85+ (BEST RATED CLUBS ON FIFA 20)

In [8]:
%sql

-- Count, per club, the players whose overall rating is between 85 and 95
-- (inclusive) and return the five clubs with the highest counts.
-- NOTE(review): the heading says "85+", but BETWEEN also caps the rating at 95;
-- confirm no rating can exceed 95, otherwise `overall >= 85` is what is meant.
SELECT club, COUNT(overall) as Number_of_Players
FROM fifa20_players
WHERE overall BETWEEN 85 AND 95
GROUP BY club
ORDER BY Number_of_Players DESC
LIMIT 5

club,Number_of_Players
Real Madrid,14
FC Barcelona,12
Juventus,10
Manchester City,10
FC Bayern München,8


#### Top 5 Most Valuable Players (in Euros)

In [10]:
%sql

-- Return the five players with the highest value_eur, most valuable first.
SELECT short_name, value_eur
FROM fifa20_players
ORDER BY value_eur DESC
LIMIT 5

short_name,value_eur
Neymar Jr,105500000
L. Messi,95500000
K. Mbappé,93500000
K. De Bruyne,90000000
E. Hazard,90000000


#### TOP Young players with High Potential and Value

In [12]:
%sql

-- Players aged 18 to 26 (inclusive) whose potential is strictly above 90,
-- sorted by value_eur descending; only the five most valuable are returned.
SELECT short_name AS Player_Name, age, club, potential, value_eur AS Player_Value
FROM fifa20_players
WHERE age BETWEEN 18 AND 26
AND potential > 90
ORDER BY value_eur DESC
LIMIT 5

Player_Name,age,club,potential,Player_Value
K. Mbappé,20,Paris Saint-Germain,95,93500000
H. Kane,25,Tottenham Hotspur,91,83000000
J. Oblak,26,Atlético Madrid,93,77500000
P. Dybala,25,Juventus,92,76500000
P. Pogba,26,Manchester United,91,72500000


#### Cheapest and Promising Young Players

In [14]:
%sql

-- Players aged 16 to 24 (inclusive) whose potential is strictly above 85,
-- sorted by value_eur with no explicit direction (i.e. ascending, cheapest
-- first); only the first five rows are returned.
SELECT short_name AS Player_Name, age, club, potential, value_eur AS Player_Value
FROM fifa20_players
WHERE age BETWEEN 16 AND 24
AND potential > 85
ORDER BY value_eur 
LIMIT 5

Player_Name,age,club,potential,Player_Value
E. Millot,16,AS Monaco,86,800000
B. Saka,17,Arsenal,86,1200000
J. Sands,18,New York City FC,86,1200000
J. Doku,17,RSC Anderlecht,86,1300000
E. Ampadu,18,RB Leipzig,86,1500000


In [15]:
# Read the players CSV again into df20 (header row, inferred column types)
# and show the resulting schema.
df20 = (
    spark.read
    .option('header', True)
    .option('inferSchema', True)
    .csv('/FileStore/tables/players_20.csv')
)
df20.printSchema()


**Data pre-processing**

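Creating four new columns: `wrate1` and `wrate2` (the two parts of `work_rate`, i.e. Work Rate Attack and Work Rate Defense), `num_pos` (the number of positions listed for the player) and `best_pos` (Best Position, taken as the first listed position). Goalkeepers are then filtered out into `df_noGK`.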

In [18]:
from pyspark.sql.functions import split, size, col

# work_rate is split on '/', player_positions on ','.
work_rate_parts = split(df20['work_rate'], '/')
position_parts = split(df20['player_positions'], ',')

df20 = (
    df20
    .withColumn('wrate1', work_rate_parts.getItem(0))    # first work-rate component
    .withColumn('wrate2', work_rate_parts.getItem(1))    # second work-rate component
    .withColumn('num_pos', size(position_parts))         # how many positions are listed
    .withColumn('best_pos', position_parts.getItem(0))   # first listed position
)

# Drop players whose first listed position is goalkeeper.
df_noGK = df20.filter("best_pos != 'GK'")




In [19]:
df_noGK.printSchema()

In [20]:
# Keep only the label (value_eur) and the columns considered as predictors.
relevant_cols = [
    'value_eur', 'overall', 'potential', 'age',
    'shooting', 'passing', 'dribbling', 'defending', 'physic',
    'wrate1', 'wrate2',
    'num_pos', 'best_pos', 'international_reputation',
]
data = df_noGK.select(relevant_cols)



## Correlation Coefficients between Numerical Variables

In [22]:
# There is a correlation function in the ml subpackage pyspark.ml.stat. However, it requires you to provide a column of type Vector. So you need to convert your columns into a vector column first using the VectorAssembler and then apply the correlation:

from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler

# Correlation.corr works on a single vector column, so pack the numeric
# columns into one first.
vector_col = "corr_features"
numeric_cols = [
    'value_eur', 'overall', 'potential', 'age',
    'shooting', 'passing', 'dribbling', 'defending', 'physic',
    'num_pos', 'international_reputation',
]
assembler = VectorAssembler(inputCols=numeric_cols, outputCol=vector_col)
df_vector = assembler.transform(data).select(vector_col)

# Correlation matrix of the assembled columns, returned as a DataFrame.
matrix = Correlation.corr(df_vector, vector_col)
display(matrix)

pearson(corr_features)
1.0 0.6459490155742238 0.5819757182620714 ... (11 total) 0.6459490155742238 1.0 0.636025807401127 ... 0.5819757182620714 0.636025807401127 1.0 ... 0.08728796256558598 0.4659774923235365 -0.27597065699814155 ... 0.34393838891913087 0.4978813003302847 0.303999505697894 ... 0.4482582457062205 0.6839544898419754 0.4248159580288013 ... 0.4277209726426948 0.601189612677944 0.4510965963015748 ... 0.1579281614892481 0.3347920259772648 0.15160759154119768 ... 0.2363947243603419 0.5172142467261274 0.14844853310500855 ... 0.0643230831716841 0.14239852720299073 0.06719683332015922 ... 0.6218264428188122 0.4780607062705893 0.33704875947651836 ...


In [23]:
# The correlation result is a one-row, one-column DataFrame: take that single
# cell and convert the matrix into plain nested Python lists.
first_row = Correlation.corr(df_vector, vector_col).collect()[0]
matrix = first_row[0]
corrmatrix = matrix.toArray().tolist()



In [24]:
# Column labels for the correlation table.
columns = [
    'value_eur', 'overall', 'potential', 'age',
    'shooting', 'passing', 'dribbling', 'defending', 'physic',
    'num_pos', 'international_reputation',
]

# One DataFrame row per row of the correlation matrix.
df = spark.createDataFrame(corrmatrix, schema=columns)
df.show()

In [25]:
import matplotlib.pyplot as plt

def plot_corr_matrix(correlations,attr,fig_no):
    """Draw a correlation matrix as a colour-coded grid with labelled axes.

    Args:
        correlations: square matrix (nested lists or array) of correlation
            coefficients; colours are scaled to the range [-1, 1].
        attr: list of attribute names, one per row/column of `correlations`,
            in the same order as the matrix.
        fig_no: matplotlib figure number to draw into.
    """
    fig=plt.figure(fig_no)
    ax=fig.add_subplot(111)
    ax.set_title("Correlation Matrix for Specified Attributes")
    cax=ax.matshow(correlations,vmax=1,vmin=-1)

    # Pin one tick per attribute *after* drawing the matrix. Setting only the
    # tick labels (as before) leaves the tick positions to the automatic
    # locator, which skips cells for larger matrices and so attaches labels to
    # the wrong rows/columns.
    positions = list(range(len(attr)))
    ax.set_xticks(positions)
    ax.set_yticks(positions)
    ax.set_xticklabels(attr, rotation=90)  # vertical so long names do not overlap
    ax.set_yticklabels(attr)

    fig.colorbar(cax)
    plt.show()

plot_corr_matrix(corrmatrix, columns, 234)

In [27]:
# Numeric-only subset of the non-goalkeeper data (no work-rate / position columns).
# NOTE(review): data2 is not referenced by any later cell in this notebook.
numeric_only_cols = [
    'value_eur', 'overall', 'potential', 'age',
    'shooting', 'passing', 'dribbling', 'defending', 'physic',
    'num_pos', 'international_reputation',
]
data2 = df_noGK.select(numeric_only_cols)


In [28]:
data.count()

In [29]:
data = data.dropna()

In [30]:
data.count()

In [31]:
# Create a 70-30 train test split.
# The seed is fixed so that the split, and therefore every metric reported
# further down, is the same each time the notebook is re-run.

train_data,test_data=data.randomSplit([0.7,0.3], seed=42)

**Linear Regression Model**

In [33]:
# Import the required libraries

from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler,StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline

In [34]:
categoricalFeatures = ['wrate1',
                       'wrate2',
                       'best_pos']

In [35]:
# One StringIndexer per categorical column, writing the numeric index of each
# label to '<column>_index'.
def _make_indexer(name):
    """Build the StringIndexer for one categorical column."""
    return StringIndexer(inputCol=name,
                         outputCol=f'{name}_index',
                         handleInvalid='keep')

indexers = list(map(_make_indexer, categoricalFeatures))

In [36]:
# One OneHotEncoder per categorical column: turns '<column>_index' into a
# 0/1 indicator vector stored in '<column>_encoded'.
def _make_encoder(name):
    """Build the OneHotEncoder for one indexed categorical column."""
    return OneHotEncoder(inputCol=f'{name}_index',
                         outputCol=f'{name}_encoded')

encoders = list(map(_make_encoder, categoricalFeatures))

In [37]:
# Combine the numeric columns and the one-hot encoded categorical columns into
# the single 'features' vector the models read.
feature_inputs = [
    'overall', 'potential', 'age', 'shooting',
    'passing', 'dribbling', 'defending', 'physic',
    'wrate1_encoded', 'wrate2_encoded',
    'num_pos', 'best_pos_encoded', 'international_reputation',
]
assembler = VectorAssembler(inputCols=feature_inputs, outputCol="features")


**Instantiating the Linear Model**

In [39]:
from pyspark.ml.regression import LinearRegression

# Regularised linear regression predicting value_eur from the 'features' vector.
lr = LinearRegression(
    featuresCol='features',
    labelCol='value_eur',
    maxIter=100,
    regParam=1.0,
    elasticNetParam=0.8,
)

In [40]:
# The Pipeline chains its stages in order: the string indexers, the one-hot
# encoders, the vector assembler and finally the linear regression. Reusing the
# same pipeline on the test data pre-processes it in the same way as the train data.

pipe = Pipeline(stages= indexers + encoders + [assembler, lr])

In [41]:
fitted_pipe=pipe.fit(train_data)

In [42]:
# The regression is the last stage of the pipeline, so the last stage of the
# fitted pipeline is the fitted linear regression model; pull it out to inspect it.
lrModel = fitted_pipe.stages[-1]

In [43]:
# Goodness of fit on the training data, read from the fitted model's summary.
training_summary = lrModel.summary

print('Training R^2 score =', training_summary.r2)
print('Training RMSE =', training_summary.rootMeanSquaredError)


In [44]:
# Transform the test data using the model to predict the value of a player

predictions=fitted_pipe.transform(test_data)

In [45]:
predictionsDF = predictions.toPandas()
predictionsDF.head()

Unnamed: 0,value_eur,overall,potential,age,shooting,passing,dribbling,defending,physic,wrate1,wrate2,num_pos,best_pos,international_reputation,wrate1_index,wrate2_index,best_pos_index,wrate1_encoded,wrate2_encoded,best_pos_encoded,features,prediction
0,0,68,68,29,34,60,65,68,62,Medium,Medium,3,LB,1,0.0,0.0,5.0,"(1.0, 0.0, 0.0)","(1.0, 0.0, 0.0)","(0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, ...","(68.0, 68.0, 29.0, 34.0, 60.0, 65.0, 68.0, 62....",921602.1
1,0,68,75,23,66,55,75,30,48,Medium,Medium,1,ST,1,0.0,0.0,1.0,"(1.0, 0.0, 0.0)","(1.0, 0.0, 0.0)","(0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(68.0, 75.0, 23.0, 66.0, 55.0, 75.0, 30.0, 48....",3639160.0
2,0,69,69,29,38,61,61,69,67,Medium,Medium,2,CDM,1,0.0,0.0,3.0,"(1.0, 0.0, 0.0)","(1.0, 0.0, 0.0)","(0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(69.0, 69.0, 29.0, 38.0, 61.0, 61.0, 69.0, 67....",1727771.0
3,0,69,73,25,29,43,43,69,70,Medium,Medium,2,CB,1,0.0,0.0,0.0,"(1.0, 0.0, 0.0)","(1.0, 0.0, 0.0)","(1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(69.0, 73.0, 25.0, 29.0, 43.0, 43.0, 69.0, 70....",3577087.0
4,0,69,79,21,45,60,67,69,58,Medium,Medium,1,LB,1,0.0,0.0,5.0,"(1.0, 0.0, 0.0)","(1.0, 0.0, 0.0)","(0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, ...","(69.0, 79.0, 21.0, 45.0, 60.0, 67.0, 69.0, 58....",4436887.0


**Evaluating the model in the Test Dataset**

In [47]:
from pyspark.ml.evaluation import RegressionEvaluator

# Both evaluators compare 'prediction' against 'value_eur'; only the metric differs.
_eval_columns = dict(labelCol='value_eur', predictionCol='prediction')

evaluator = RegressionEvaluator(metricName='r2', **_eval_columns)
rmse_evaluator = RegressionEvaluator(metricName='rmse', **_eval_columns)

In [48]:
# Score the test-set predictions with both evaluators, then report them.
r2 = evaluator.evaluate(predictions)
lr_rmse = rmse_evaluator.evaluate(predictions)

print('Test Data r^2 score is ', r2)
print('Test Data RMSE value is ', lr_rmse)

**HyperParameter Tuning**

In [50]:
from pyspark.ml.tuning import ParamGridBuilder

# Search grid for the linear regression: 3 x 3 x 4 = 36 parameter combinations.
paramGrid = (
    ParamGridBuilder()
    .addGrid(lr.maxIter, [10, 50, 100])
    .addGrid(lr.regParam, [0.1, 0.3, 1.0])
    .addGrid(lr.elasticNetParam, [0.0, 0.3, 0.8, 1.0])
    .build()
)

In [51]:
from pyspark.ml.evaluation import RegressionEvaluator

# R^2 evaluator comparing 'prediction' against 'value_eur'.
evaluator = RegressionEvaluator(
    metricName='r2',
    predictionCol='prediction',
    labelCol='value_eur',
)

**CrossValidating the models - choosing the best one**

In [53]:
from pyspark.ml.tuning import CrossValidator

# 3-fold cross-validation of the whole pipeline over every combination in
# paramGrid, scored with `evaluator`. The seed fixes how rows are assigned to
# folds so the selected best model does not change from run to run.
crossval = CrossValidator(estimator = pipe,
                         estimatorParamMaps = paramGrid,
                         evaluator = evaluator,
                         numFolds = 3,
                         seed = 42)

In [54]:
model = crossval.fit(train_data)

In [55]:
lrModel = model.bestModel.stages[-1]

In [56]:
#maxIter from Best Model
lrModel._java_obj.getMaxIter()

In [57]:
#regParam from Best Model
lrModel._java_obj.getRegParam()

In [58]:
#elasticNetParam from Best Model
lrModel._java_obj.getElasticNetParam()

In [59]:
prediction = model.transform(test_data)

In [60]:
# R^2 of the cross-validated model's predictions.
evaluator = RegressionEvaluator(
    metricName='r2',
    predictionCol='prediction',
    labelCol='value_eur',
)
rsquared = evaluator.evaluate(prediction)

print('Best model r^2 score = ', rsquared)

In [61]:
# RMSE of the cross-validated model's predictions.
evaluator = RegressionEvaluator(
    metricName='rmse',
    predictionCol='prediction',
    labelCol='value_eur',
)
rmse = evaluator.evaluate(prediction)

print('Best model rmse = ', rmse)

#### 2. Generalized Linear Regression
In contrast to linear regression, where the output is assumed to follow a Gaussian distribution, generalized linear models (GLMs) are specifications of linear models where the response variable (the value of a player) follows some distribution from the exponential family of distributions.

* We are creating and testing a model of this type because our response variable, the value of a player, may follow an exponential distribution.
* Note that the model fitted below is configured with `family="gaussian"` and `link="identity"`.

In [63]:
from pyspark.ml.regression import GeneralizedLinearRegression
from pyspark.ml.feature import VectorAssembler,StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline
# Generalized linear model for value_eur with a Gaussian family and identity link.
# NOTE(review): Gaussian + identity is the same specification as ordinary linear
# regression; confirm whether another family/link was intended.
glr = GeneralizedLinearRegression(
    labelCol='value_eur',
    family="gaussian",
    link="identity",
    maxIter=10,
    regParam=0.3,
)

In [64]:
# The Pipeline chains its stages in order: the string indexers, the one-hot
# encoders, the vector assembler and finally the generalized linear regression.
# Reusing the same pipeline on the test data pre-processes it in the same way as the train data.

pipe_glr = Pipeline(stages= indexers + encoders + [assembler, glr])

In [65]:
fp2=pipe_glr.fit(train_data)

In [66]:
# The regression is the last stage of the pipeline, so the last stage of the
# fitted pipeline is the fitted generalized linear regression model.
glrModel = fp2.stages[-1]

In [67]:
# Show the fitted GLM's coefficient vector and intercept.
print(f"Coefficients: {glrModel.coefficients}")
print(f"Intercept: {glrModel.intercept}")

In [68]:
# Training-set diagnostics taken from the fitted GLM's summary.
# Coefficient standard errors, t-values and p-values are not printed here.
summary = glrModel.summary

print(f"Dispersion: {summary.dispersion}")
print(f"Null Deviance: {summary.nullDeviance}")
print(f"Residual Degree Of Freedom Null: {summary.residualDegreeOfFreedomNull}")
print(f"Deviance: {summary.deviance}")
print(f"Residual Degree Of Freedom: {summary.residualDegreeOfFreedom}")
print(f"AIC: {summary.aic}")

print("Deviance Residuals: ")
summary.residuals().show()

#### 3. Decision tree regression

In [70]:
from pyspark.ml.regression import DecisionTreeRegressor

# Configure a decision tree regressor that predicts value_eur from the
# 'features' vector. Nothing is trained on this line; fitting happens when the
# pipeline containing it is fit.
dt = DecisionTreeRegressor(labelCol = 'value_eur',featuresCol = 'features')

In [71]:
# The Pipeline chains its stages in order: the string indexers, the one-hot
# encoders, the vector assembler and finally the decision tree regressor.
# Reusing the same pipeline on the test data pre-processes it in the same way as the train data.

pipe_dt = Pipeline(stages= indexers + encoders + [assembler, dt])

In [72]:
fp3=pipe_dt.fit(train_data)

In [73]:
# The regressor is the last stage of the pipeline, so the last stage of the
# fitted pipeline is the fitted decision tree model.
dtModel = fp3.stages[-1]

In [74]:
# Make predictions.
dt_predictions = fp3.transform(test_data)

**Evaluating the model in the Test Dataset**

In [76]:
from pyspark.ml.evaluation import RegressionEvaluator

# RMSE of the decision tree's predictions on the test data.
dt_evaluator = RegressionEvaluator(
    metricName='rmse',
    predictionCol='prediction',
    labelCol='value_eur',
)
dt_rmse = dt_evaluator.evaluate(dt_predictions)
print("Root Mean Squared Error (RMSE) on test data = {:g}".format(dt_rmse))

RMSE: 1.40679e+06
##### The Decision Tree Regression model's lower RMSE compared to Linear Regression indicates a better fit.


*This is an important criterion and, in layman's terms, we can say that we are able to predict the value of a player more accurately.*

#### 4. Gradient-boosted tree regression

In [79]:
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator

# Configure a gradient-boosted tree regressor (maxIter=10) that predicts
# value_eur from the 'features' vector. Nothing is trained on this line;
# fitting happens when the pipeline containing it is fit.
gbt = GBTRegressor(labelCol = 'value_eur',featuresCol = 'features', maxIter=10)

In [80]:
pipe_gbt = Pipeline(stages= indexers + encoders + [assembler, gbt])

In [81]:
fp4=pipe_gbt.fit(train_data)

In [82]:
# The regressor is the last stage of the pipeline, so the last stage of the
# fitted pipeline is the fitted gradient-boosted tree model.
gbtModel = fp4.stages[-1]

In [83]:
# Make predictions on the test data.
gbt_predictions = fp4.transform(test_data)

#### Evaluating the gradient-boosted tree regression model

In [85]:
# RMSE of the gradient-boosted trees' predictions on the test data.
gbt_evaluator = RegressionEvaluator(
    metricName='rmse',
    predictionCol='prediction',
    labelCol='value_eur',
)
gbt_rmse = gbt_evaluator.evaluate(gbt_predictions)
print("Root Mean Squared Error (RMSE) on test data = {:g}".format(gbt_rmse))

#### Evaluation of the models:

1. Linear Regression RMSE (after cross-validation): 3.22989e+06; R^2 value: 0.62 (62%)
2. Generalized Linear Regression Deviance: 1.3804824047121179e+17
3. Decision Tree Regression RMSE: 1.40679e+06
4. Gradient-boosted tree regression RMSE: 1.31104e+06

The model created using Gradient-boosted tree regression has the lowest RMSE value out of all the models that we have created.


*This is an important criterion and, in layman's terms, we can say that we are able to predict the value of a player more accurately.*

#### Conclusion:

*We hope this research on the FIFA 2020 dataset can serve as an impetus for further research and data collection.*