In [None]:
import pandas as pd
import math



In [None]:
from datetime import datetime

import numpy as np

##Pre-Processing
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import VectorAssembler

##Models
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import NaiveBayes

##Evaluation
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.tuning import ParamGridBuilder

In [None]:
spark = SparkSession.builder.config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1").getOrCreate()

## Loading Data from MongoDB as SparkDFs

In [None]:
database = "MongoDBAtlas"
user_name = "XXXX"
password = "XXXXX"
ip_address = "chesscluster.ar0uw.mongodb.net"
collection_pos_eval = "pos_evals"
collection_elo_eval = "elo_eval"
connection_string_pos = f"mongodb+srv://{user_name}:{password}@{ip_address}/{database}.{collection_pos_eval}"
connection_string_elo = f"mongodb+srv://{user_name}:{password}@{ip_address}/{database}.{collection_elo_eval}"

In [None]:
df_pos = spark.read.format("mongo").option("uri", connection_string_pos).load()
df_eval = spark.read.format("mongo").option(
    "uri", connection_string_elo).load()

## Data Processing and Feature Engineering

Creating a column 'elo_diff' which calculates the difference in ELO of the player with White pieces and black pieces

In [None]:
df_eval = df_eval.withColumn('elo_diff',df_eval['White Elo'] - df_eval['Black Elo'])

Creating a User Defined Function (UDF) to record the Expected scores. This metric is estimated using a formula that FIDE(Governing body of chess) uses to define the expected score of a game.

In [None]:
def calculate_fide_expected_score(x):
    # Formula that FIDE(Governing body of chess) uses to calculate expected score of a game.
    return math.erfc(-x / ((2000.0/7) * math.sqrt(2))) / 2


xScore = udf(calculate_fide_expected_score, FloatType())

In [None]:
df_eval = df_eval.select('Black Elo', 'White Elo', 'Result', 'Time Class', 'Time Control','elo_diff',xScore("elo_diff").alias("expected_score_fide"))

Transforming categorical variables through StringIndexing followed by OneHotEncoding

In [None]:
def indexStringColumns(df, cols):
    # variable newdf will be updated several times
    newdf = df

    for c in cols:
        # For each given colum, fits StringIndexerModel.
        si = StringIndexer(inputCol=c, outputCol=c +
                           "-num").setHandleInvalid("keep")
        sm = si.fit(newdf)

        # Creates a DataFame by putting the transformed values in the new colum with suffix "-num"
        # and then drops the original columns.
        # and drop the "-num" suffix.
        newdf = sm.transform(newdf).drop(c)
        newdf = newdf.withColumnRenamed(c+"-num", c)
    return newdf


def oneHotEncodeColumns(df, cols):
    newdf = df
    for c in cols:
        # For each given colum, create OneHotEncoder.
        # dropLast : Whether to drop the last category in the encoded vector (default: true)
        ohe = OneHotEncoder(inputCol=c, outputCol=c+"-onehot", dropLast=False)
        ohe_model = ohe.fit(newdf)
        # Creates a DataFame by putting the transformed values in the new colum with suffix "-onehot"
        # and then drops the original columns.
        # and drop the "-onehot" suffix.
        newdf = ohe_model.transform(newdf).drop(c)
        newdf = newdf.withColumnRenamed(c+"-onehot", c)
    return newdf

In [None]:
categorical_cols = ["Time Class","Time Control"]
df_eval_sti = indexStringColumns(df_eval, categorical_cols)

In [None]:
df_eval_ohe = oneHotEncodeColumns(df_eval_sti, categorical_cols)

Creating a UDF to convert string target variable to FloatType

In [None]:
def convert_res_to_binary(x):
    if x == '1-0':
        return 1
    elif x == '0-1':
        return 0
    elif '5' in x:
        return 2


result_conv = udf(convert_res_to_binary, IntegerType())

In [None]:
df_eval = df_eval_ohe.withColumn('result_int',result_conv('Result'))

Converting two string features("Black Elo" and "White Elo") to Integer

In [None]:
def convert_to_int(x):
    try:
        return int(x)
    except ValueError:
        return None


int_conv = udf(convert_to_int, IntegerType())

In [None]:
df_eval = df_eval.select(int_conv('Black Elo').alias('Black Elo'), int_conv('White Elo').alias('White Elo'), 'Result', 'elo_diff', 'expected_score_fide', 'Time Class', 'Time Control', 'result_int')

In [None]:
df_eval = df_eval.where(df_eval.result_int != 2)

In [None]:
df_eval.show()

+---------+---------+------+--------+-------------------+-------------+--------------+----------+
|Black Elo|White Elo|Result|elo_diff|expected_score_fide|   Time Class|  Time Control|result_int|
+---------+---------+------+--------+-------------------+-------------+--------------+----------+
|     2350|     2500|   1-0|   150.0|          0.7002084|(5,[2],[1.0])|(27,[4],[1.0])|         1|
|     2646|     2331|   0-1|  -315.0|         0.13512218|(5,[2],[1.0])|(27,[4],[1.0])|         0|
|     2287|     2317|   0-1|    30.0|          0.5418121|(5,[2],[1.0])|(27,[4],[1.0])|         0|
|     2440|     2406|   1-0|   -34.0|         0.45263767|(5,[2],[1.0])|(27,[4],[1.0])|         1|
|     2386|     2544|   1-0|   158.0|          0.7098683|(5,[2],[1.0])|(27,[4],[1.0])|         1|
|     2778|     2746|   1-0|   -32.0|          0.4554117|(5,[0],[1.0])|(27,[0],[1.0])|         1|
|     2646|     2736|   0-1|    90.0|          0.6236192|(5,[0],[1.0])|(27,[0],[1.0])|         0|
|     2767|     2665

Create a dataframe with features and label

In [None]:
va = VectorAssembler(outputCol="features", inputCols=["Black Elo", "White Elo","elo_diff", "Time Class", "Time Control"])
va_df = va.transform(df_eval).select("features", "result_int").withColumnRenamed("result_int", "label")

In [None]:
va_df.show()

+--------------------+-----+
|            features|label|
+--------------------+-----+
|(35,[0,1,2,5,12],...|    1|
|(35,[0,1,2,5,12],...|    0|
|(35,[0,1,2,5,12],...|    0|
|(35,[0,1,2,5,12],...|    1|
|(35,[0,1,2,5,12],...|    1|
|(35,[0,1,2,3,8],[...|    1|
|(35,[0,1,2,3,8],[...|    0|
|(35,[0,1,2,3,8],[...|    1|
|(35,[0,1,2,3,8],[...|    0|
|(35,[0,1,2,3,8],[...|    1|
|(35,[0,1,2,3,8],[...|    1|
|(35,[0,1,2,3,8],[...|    1|
|(35,[0,1,2,3,8],[...|    0|
|(35,[0,1,2,3,16],...|    1|
|(35,[0,1,2,3,16],...|    0|
|(35,[0,1,2,5,12],...|    0|
|(35,[0,1,2,5,12],...|    1|
|(35,[0,1,2,3,10],...|    1|
|(35,[0,1,2,3,10],...|    0|
|(35,[0,1,2,3,10],...|    1|
+--------------------+-----+
only showing top 20 rows



In [None]:
va_df.take(2)

Out[20]: [Row(features=SparseVector(35, {0: 2350.0, 1: 2500.0, 2: 150.0, 5: 1.0, 12: 1.0}), label=1),
 Row(features=SparseVector(35, {0: 2646.0, 1: 2331.0, 2: -315.0, 5: 1.0, 12: 1.0}), label=0)]

## Modeling and Evaluation Using Naive Bayes

Getting and Splitting Data for All Features

In [None]:
# Naive Bayes doesn't like negative values
df_eval_naive = df_eval.withColumn('elo_diff', abs('elo_diff'))

va_nb = VectorAssembler(outputCol="features",
                        inputCols=["Black Elo", "White Elo", "Time Class", "Time Control", 'elo_diff'])

va_df_nb = va_nb.transform(df_eval_naive).select(
    "features", "result_int").withColumnRenamed("result_int", "label")

splits = va_df_nb.randomSplit([0.8, 0.2]) # train/test split

train_naive = splits[0]
validation_naive = splits[1]

Fitting and Predicting Using All Features

In [None]:
nb_model = NaiveBayes().fit(train_naive)
nb_preds = nb_model.transform(validation_naive)

Evaluating Model Fit with All Features

In [None]:
#to get accuracy and f1 score easily
evaluator = MulticlassClassificationEvaluator()\
                .setLabelCol("label")\
                .setPredictionCol("prediction") # default is accuracy

print("Accuracy : %s" % evaluator.evaluate(nb_preds))

evaluator.setMetricName("f1") 

print("F1 Score : %s" % evaluator.evaluate(nb_preds))

# to get area under ROC/PR curves easily
bceval = BinaryClassificationEvaluator() # default is AUC
print (bceval.getMetricName() +" : " + str(bceval.evaluate(nb_preds)))

bceval.setMetricName("areaUnderPR")
print (bceval.getMetricName() +" : " + str(bceval.evaluate(nb_preds)))

Accuracy : 0.566497754866941
F1 Score : 0.566497754866941
areaUnderROC : 0.4993156471781005
areaUnderPR : 0.5519124486387013


Geting and Splitting Data using Only Black and White Elo

In [None]:
va_nb = VectorAssembler(outputCol="features",
                        inputCols=["Black Elo", "White Elo"])

va_df_nb = va_nb.transform(df_eval_naive).select(
    "features", "result_int").withColumnRenamed("result_int", "label")

splits = va_df_nb.randomSplit([0.8, 0.2]) # train/test split

train_naive = splits[0]
validation_naive = splits[1]

Fitting and Predicting Using Only Black and White Elo

In [None]:
nb_model = NaiveBayes().fit(train_naive)
nb_preds = nb_model.transform(validation_naive)

Evaluating Model Fit with Only Black and White Elo

In [None]:
# to get accuracy and f1 score easily
evaluator = MulticlassClassificationEvaluator()\
                .setLabelCol("label")\
                .setPredictionCol("prediction") # default is accuracy

print("Accuracy : %s" % evaluator.evaluate(nb_preds))

evaluator.setMetricName("f1") 

print("F1 Score : %s" % evaluator.evaluate(nb_preds))

# to get area under ROC/PR curves easily
bceval = BinaryClassificationEvaluator() # default is AUC
print (bceval.getMetricName() +" : " + str(bceval.evaluate(nb_preds)))

bceval.setMetricName("areaUnderPR")
print (bceval.getMetricName() +" : " + str(bceval.evaluate(nb_preds)))




Accuracy : 0.7044099786445779
F1 Score : 0.7044099786445779
areaUnderROC : 0.49605595047007606
areaUnderPR : 0.5438571767899654


Getting and Splitting Data using Only Difference In Elo

In [None]:
va_nb = VectorAssembler(outputCol="features",
                        inputCols=["elo_diff"])

va_df_nb = va_nb.transform(df_eval_naive).select(
    "features", "result_int").withColumnRenamed("result_int", "label")

splits = va_df_nb.randomSplit([0.8, 0.2]) # train/test split

train_naive = splits[0]
validation_naive = splits[1]

Fitting and Predicting Using Only Difference In Elo

In [None]:
nb_model = NaiveBayes().fit(train_naive)
nb_preds = nb_model.transform(validation_naive)

Evaluating Model Fit with Only Difference in Elo

In [None]:
#to get accuracy and f1 score easily
evaluator = MulticlassClassificationEvaluator()\
                .setLabelCol("label")\
                .setPredictionCol("prediction") # default is Accuracy

print("Accuracy : %s" % evaluator.evaluate(nb_preds))

evaluator.setMetricName("f1") 

print("F1 Score : %s" % evaluator.evaluate(nb_preds))

# to get area under ROC/PR curves easily
bceval = BinaryClassificationEvaluator() # default is AUC
print (bceval.getMetricName() +" : " + str(bceval.evaluate(nb_preds)))

bceval.setMetricName("areaUnderPR")
print (bceval.getMetricName() +" : " + str(bceval.evaluate(nb_preds)))




Accuracy : 0.37093840525646904
F1 Score : 0.37093840525646904
areaUnderROC : 0.5
areaUnderPR : 0.5332671300893744
