In [0]:
import math
import os
from urllib.parse import quote_plus

import pandas as pd



In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.ml.classification import LogisticRegression, GBTClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.feature import VectorAssembler, StringIndexer, IndexToString, OneHotEncoder, StandardScaler
from pyspark.sql.functions import trim
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
import pyspark.sql.functions as F

# Start (or attach to) a Spark session that has the MongoDB Spark connector
# package on its classpath.
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1")
    .getOrCreate()
)

In [0]:
# Same builder call as the previous cell.
# NOTE(review): getOrCreate() is documented to return an already-running session,
# so this cell most likely only re-binds `spark` — confirm and consider removing it.
spark = SparkSession.builder.config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1").getOrCreate()

## Loading Data from MongoDB as SparkDFs

In [0]:
# MongoDB Atlas connection settings.
#
# Credentials must never live in the notebook: they are read from the
# MONGODB_USER / MONGODB_PASSWORD environment variables, and a missing variable
# fails loudly with a KeyError instead of building an unusable URI.
# NOTE(review): the password that used to be hard-coded here has been exposed in
# the notebook history and should be rotated.
database = "MongoDBAtlas"
user_name = os.environ["MONGODB_USER"]
password = os.environ["MONGODB_PASSWORD"]
ip_address = "chesscluster.ar0uw.mongodb.net"
collection_pos_eval = "pos_evals"
collection_elo_eval = "elo_eval"

# User name and password are percent-encoded so that reserved characters
# (e.g. '@', ':', '/') cannot break the URI.
_credentials = f"{quote_plus(user_name)}:{quote_plus(password)}"
connection_string_pos = f"mongodb+srv://{_credentials}@{ip_address}/{database}.{collection_pos_eval}"
connection_string_elo = f"mongodb+srv://{_credentials}@{ip_address}/{database}.{collection_elo_eval}"

In [0]:
# Read both MongoDB collections through the "mongo" data source; each read
# gets its own connection URI.
df_pos = (
    spark.read
    .format("mongo")
    .option("uri", connection_string_pos)
    .load()
)
df_eval = (
    spark.read
    .format("mongo")
    .option("uri", connection_string_elo)
    .load()
)

## Data Processing and Feature Engineering

Creating a column 'elo_diff' that holds the rating difference between the two players: the Elo of the player with the White pieces minus the Elo of the player with the Black pieces.

In [0]:
# Rating advantage of White over Black (positive when White is the stronger player).
# The Elo columns are only converted to integers in a later cell, so this
# subtraction relies on Spark casting them to numbers.
df_eval = df_eval.withColumn('elo_diff', F.col('White Elo') - F.col('Black Elo'))

Creating a User Defined Function (UDF) that computes the expected score of a game from the Elo difference. The estimate uses the formula that FIDE (the governing body of chess) uses to define the expected score of a game.

In [0]:
def calculate_fide_expected_score(x):
    """Return the expected score for a rating advantage of ``x`` Elo points.

    Uses the formula FIDE (the governing body of chess) applies for the expected
    score of a game: ``erfc(-x / (sigma * sqrt(2))) / 2`` with ``sigma = 2000 / 7``,
    i.e. the normal CDF evaluated at ``x / sigma``.

    Args:
        x: Rating difference (player minus opponent), or ``None``.

    Returns:
        A float between 0 and 1 (0.5 for equal ratings), or ``None`` when ``x`` is
        ``None`` so that a null difference yields a null score instead of raising
        ``TypeError`` inside the UDF.
    """
    if x is None:
        return None
    sigma = 2000.0 / 7
    return math.erfc(-x / (sigma * math.sqrt(2))) / 2

# Wrap the scoring function as a Spark UDF returning a float column.
# `udf` is not imported by name anywhere in this notebook; go through the
# `pyspark.sql.functions` alias `F`, which is.
xScore = F.udf(calculate_fide_expected_score, FloatType())

In [0]:
# Keep only the modelling columns and append the FIDE expected score derived
# from the rating difference.
df_eval = df_eval.select(
    'Black Elo',
    'White Elo',
    'Result',
    'Time Class',
    'Time Control',
    'elo_diff',
    xScore("elo_diff").alias("expected_score_fide"),
)

Transforming categorical variables through StringIndexing followed by OneHotEncoding

In [0]:
def indexStringColumns(df, cols):
    """Replace each listed column with its StringIndexer-encoded version.

    For every name in ``cols`` a StringIndexer (handleInvalid="keep") is fitted on
    the current frame, its output is written to a temporary ``<name>-num`` column,
    the original column is dropped, and the temporary column is renamed back to
    ``<name>``.

    Args:
        df: Input DataFrame.
        cols: Names of the columns to index.

    Returns:
        A DataFrame in which the listed columns hold the indexer output; ``df``
        itself is returned when ``cols`` is empty.
    """
    indexed = df
    for col_name in cols:
        tmp_name = col_name + "-num"
        indexer = StringIndexer(inputCol=col_name, outputCol=tmp_name).setHandleInvalid("keep")
        # Fit and transform on the frame built so far, then swap the encoded
        # column in under the original name.
        indexed = (
            indexer.fit(indexed)
            .transform(indexed)
            .drop(col_name)
            .withColumnRenamed(tmp_name, col_name)
        )
    return indexed

def oneHotEncodeColumns(df, cols):
    """Replace each listed (already indexed) column with its one-hot vector.

    For every name in ``cols`` a OneHotEncoder is fitted with ``dropLast=False``
    (no category is dropped from the encoded vector), its output is written to a
    temporary ``<name>-onehot`` column, the original column is dropped, and the
    temporary column is renamed back to ``<name>``.

    Args:
        df: Input DataFrame.
        cols: Names of the columns to encode.

    Returns:
        A DataFrame in which the listed columns hold the encoder output; ``df``
        itself is returned when ``cols`` is empty.
    """
    encoded = df
    for col_name in cols:
        tmp_name = col_name + "-onehot"
        encoder = OneHotEncoder(inputCol=col_name, outputCol=tmp_name, dropLast=False)
        # Fit and transform on the frame built so far, then swap the encoded
        # column in under the original name.
        encoded = (
            encoder.fit(encoded)
            .transform(encoded)
            .drop(col_name)
            .withColumnRenamed(tmp_name, col_name)
        )
    return encoded

In [0]:
# Categorical game attributes; step 1 turns their string values into indices.
categorical_cols = ["Time Class", "Time Control"]
df_eval_sti = indexStringColumns(df=df_eval, cols=categorical_cols)

In [0]:
# Step 2: one-hot encode the indexed categorical columns.
df_eval_ohe = oneHotEncodeColumns(df=df_eval_sti, cols=categorical_cols)

Creating a UDF to convert the string target variable ('Result') to an integer label (IntegerType): '1-0' → 1, '0-1' → 0, and results containing '5' (draws) → 2

In [0]:
def convert_res_to_binary(x):
    """Map a game-result string to an integer class.

    Args:
        x: Result string, or ``None``.

    Returns:
        1 for '1-0', 0 for '0-1', 2 for any other string containing '5'
        (presumably draw notations such as '0.5-0.5' — confirm against the data),
        and ``None`` for a missing or unrecognised result.
    """
    # A null result would otherwise raise TypeError on the `in` check below.
    if x is None:
        return None
    if x == '1-0':
        return 1
    if x == '0-1':
        return 0
    if '5' in x:
        return 2
    # Unrecognised result strings become null.
    return None

# Spark UDF wrapper returning an integer column.
# `udf` is not imported by name anywhere in this notebook; use the imported
# `pyspark.sql.functions` alias `F` instead.
result_conv = F.udf(convert_res_to_binary, IntegerType())
    

In [0]:
# Attach the integer-encoded game result to the one-hot encoded frame.
df_eval = df_eval_ohe.withColumn('result_int', result_conv(F.col('Result')))

Converting two string features("Black Elo" and "White Elo") to Integer

In [0]:
def convert_to_int(x):
    """Convert ``x`` to ``int``, returning ``None`` when that is not possible.

    Args:
        x: Value to convert (the notebook passes Elo ratings stored as strings).

    Returns:
        ``int(x)``, or ``None`` when ``x`` is missing or not a valid integer.
    """
    try:
        return int(x)
    except (TypeError, ValueError, OverflowError):
        # TypeError covers None (a null cell); ValueError covers non-numeric
        # strings and NaN; OverflowError covers infinite floats.
        return None
    
# Spark UDF wrapper returning an integer column.
# `udf` is not imported by name anywhere in this notebook; use the imported
# `pyspark.sql.functions` alias `F` instead.
int_conv = F.udf(convert_to_int, IntegerType())
    

In [0]:
# Cast both Elo columns to integers (keeping their names) and carry the other
# modelling columns through unchanged.
df_eval = df_eval.select(
    int_conv('Black Elo').alias('Black Elo'),
    int_conv('White Elo').alias('White Elo'),
    'Result',
    'elo_diff',
    'expected_score_fide',
    'Time Class',
    'Time Control',
    'result_int',
)

In [0]:
# Drop games encoded as class 2 so the target is binary (1 or 0).
# NOTE(review): under SQL null semantics, rows whose result_int is null should
# be filtered out by this comparison as well — confirm that is intended.
df_eval = df_eval.filter(F.col("result_int") != 2)

In [0]:
# Preview the processed frame (first 20 rows, long cells truncated).
df_eval.show(n=20, truncate=True)

+---------+---------+------+--------+-------------------+-------------+--------------+----------+
|Black Elo|White Elo|Result|elo_diff|expected_score_fide|   Time Class|  Time Control|result_int|
+---------+---------+------+--------+-------------------+-------------+--------------+----------+
|     2350|     2500|   1-0|   150.0|          0.7002084|(5,[2],[1.0])|(27,[4],[1.0])|         1|
|     2646|     2331|   0-1|  -315.0|         0.13512218|(5,[2],[1.0])|(27,[4],[1.0])|         0|
|     2287|     2317|   0-1|    30.0|          0.5418121|(5,[2],[1.0])|(27,[4],[1.0])|         0|
|     2440|     2406|   1-0|   -34.0|         0.45263767|(5,[2],[1.0])|(27,[4],[1.0])|         1|
|     2386|     2544|   1-0|   158.0|          0.7098683|(5,[2],[1.0])|(27,[4],[1.0])|         1|
|     2778|     2746|   1-0|   -32.0|          0.4554117|(5,[0],[1.0])|(27,[0],[1.0])|         1|
|     2646|     2736|   0-1|    90.0|          0.6236192|(5,[0],[1.0])|(27,[0],[1.0])|         0|
|     2767|     2665

Create a dataframe with features and label

In [0]:
# Experiment: use the raw "Black Elo" and "White Elo" ratings as the only predictors.
# NOTE: the following cell rebinds `va` and `va_df` (elo_diff only), so this
# cell's result is overwritten when the notebook is run top to bottom.
# Wider feature set, kept for reference:
# va = VectorAssembler(outputCol="features", inputCols=["Black Elo", "White Elo", "elo_diff", "Time Class", "Time Control"])
va = VectorAssembler(outputCol="features", inputCols=["Black Elo", "White Elo"])
# Keep only the assembled feature vector and the target, renamed to "label".
va_df = va.transform(df_eval).select("features", "result_int").withColumnRenamed("result_int", "label")

In [0]:
# Experiment: use the rating difference (elo_diff) as the single predictor.
# This rebinds `va` / `va_df`, so it is the feature set the cells below work with.
va = VectorAssembler(inputCols=["elo_diff"], outputCol="features")
va_df = va.transform(df_eval).select("features", F.col("result_int").alias("label"))

In [0]:
# Preview the assembled features and labels (first 20 rows, long cells truncated).
va_df.show(n=20, truncate=True)

## Scale the data for regularized logistic regression

In [0]:
# Instantiate the StandardScaler object
scaler = StandardScaler(inputCol='features', outputCol='scaled_features', withMean=True, withStd=True)

# Fit the scaler to the data
scaler_model = scaler.fit(va_df)

# Transform the data using the scaler
scaled_df = scaler_model.transform(va_df)

# Select the scaled feature vector and 'result_int' columns
scaled_df = scaled_df.select('scaled_features', 'label').withColumnRenamed("scaled_features", "features")

## Modeling Using Logistic Regression

In [0]:
# Create a 70% / 30% train/test split.
# A fixed seed keeps the split (and every metric computed from it) the same
# across kernel restarts; without it each run evaluates on different rows.
splits = scaled_df.randomSplit([0.7, 0.3], seed=42)

# Both parts are reused by several fits/evaluations, so cache them.
train = splits[0].cache()
test = splits[1].cache()

In [0]:
# Baseline: logistic regression with default hyperparameters.
lr = LogisticRegression(labelCol="label", featuresCol="features")
model = lr.fit(train)

# Score the held-out split.
predictions = model.transform(test)

# Area under the ROC curve.
# (The variable is called `accuracy` here too; it is reassigned below.)
evaluator = BinaryClassificationEvaluator(metricName="areaUnderROC", labelCol="label")
accuracy = evaluator.evaluate(predictions)
print("AUC:", accuracy)

# Plain accuracy of the hard predictions.
evaluator = MulticlassClassificationEvaluator().setLabelCol("label").setPredictionCol("prediction")
evaluator.setMetricName("accuracy")
accuracy = evaluator.evaluate(predictions)
print("Accuracy:", accuracy)

## Hyperparameter Tuning Logistic Regression

In [0]:
# The grid covers ridge (elasticNetParam=0.0), LASSO (1.0) and an ElasticNet mix
# (0.5), so there is no need to try them separately.
lr = LogisticRegression()

evaluator = BinaryClassificationEvaluator(labelCol="label", metricName="areaUnderROC")

paramGrid = ParamGridBuilder()\
    .addGrid(lr.elasticNetParam,[0.0, 0.5, 1.0])\
    .addGrid(lr.regParam,[0.01, 0.5, 2.0]) \
    .build()

# Create 5-fold CrossValidator; the seed fixes the fold assignment so the
# selected model is reproducible across runs.
cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5, seed=42)

# Run cross validation (model selection uses areaUnderROC)
model_cv = cv.fit(train)

predict_train = model_cv.transform(train)
predict_test = model_cv.transform(test)

# Every metric is reported for both splits; the labels say which is which.

# auc
auc_train = evaluator.evaluate(predict_train)
auc_test = evaluator.evaluate(predict_test)
print(f"Train AUC score is {auc_train}")
print(f"Test AUC score is {auc_test}")

# pr
evaluator.setMetricName("areaUnderPR")
pr_train = evaluator.evaluate(predict_train)
pr_test = evaluator.evaluate(predict_test)
print(f"Train PR score is {pr_train}")
print(f"Test PR score is {pr_test}")

# f1
evaluator = MulticlassClassificationEvaluator()\
                .setLabelCol("label")\
                .setPredictionCol("prediction")
evaluator.setMetricName("f1") 
f1_train = evaluator.evaluate(predict_train)
f1_test = evaluator.evaluate(predict_test)
# These lines used to be labelled "PR score", hiding the F1 results.
print(f"Train F1 score is {f1_train}")
print(f"Test F1 score is {f1_test}")

# accuracy
evaluator.setMetricName("accuracy")
acc_train = evaluator.evaluate(predict_train)
acc_test = evaluator.evaluate(predict_test)
print(f"Train accuracy is {acc_train}")
print(f"Test accuracy is {acc_test}")

## Testing Accuracy of Raw Elo

In [0]:
# Helper (wrapped as a UDF below) that rounds the expected score to a binary outcome.
def predict_outcome_elo(x):
    """Turn an expected score into a hard 0/1 prediction.

    Args:
        x: Expected score, or ``None``.

    Returns:
        0 when ``x <= 0.5``, 1 when ``x > 0.5``, and ``None`` when ``x`` is
        ``None`` (comparing ``None`` with a float would raise ``TypeError``).
    """
    if x is None:
        return None
    return 0 if x <= 0.5 else 1

# Spark UDF wrapper returning an integer column.
# `udf` is not imported by name anywhere in this notebook; use the imported
# `pyspark.sql.functions` alias `F` instead.
elo_outcome = F.udf(predict_outcome_elo, IntegerType())

# Pair the Elo-only prediction ("outcome_fide") with the actual result ("result_int").
elo_outcome_df = (
    df_eval
    .withColumn("outcome_fide", elo_outcome("expected_score_fide"))
    .select("outcome_fide", "result_int")
)


In [0]:
# Preview Elo-based predictions next to the actual results (first 20 rows).
elo_outcome_df.show(n=20, truncate=True)

In [0]:
# Build an evaluator-compatible frame for the Elo-only baseline.
# The ground truth is the actual game result ("result_int") and the thing being
# scored is the Elo-based guess ("outcome_fide"). The roles were previously
# swapped, which skews the non-symmetric metrics (AUC, PR, F1).
# NOTE(review): rawPrediction is the thresholded 0/1 guess because that is all
# elo_outcome_df carries; feeding the continuous expected score instead would
# give an AUC/PR that is directly comparable with the fitted models.
predictions = (
    elo_outcome_df
    .select("result_int", "outcome_fide")
    .withColumn("label", F.col("result_int").cast(DoubleType()))
    .withColumn("prediction", F.col("outcome_fide").cast(DoubleType()))
    .withColumn("rawPrediction", F.col("outcome_fide").cast(DoubleType()))
)

evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="rawPrediction", metricName="areaUnderROC")

# auc
auc_test = evaluator.evaluate(predictions)
print(f"AUC score is {auc_test}")

# pr
evaluator.setMetricName("areaUnderPR")
pr_test = evaluator.evaluate(predictions)
print(f"PR score is {pr_test}")

# f1
evaluator = MulticlassClassificationEvaluator()\
                .setLabelCol("label")\
                .setPredictionCol("prediction")
evaluator.setMetricName("f1") 
f1_test = evaluator.evaluate(predictions)
print(f"F1 score is {f1_test}")

# accuracy
evaluator.setMetricName("accuracy")
acc_test = evaluator.evaluate(predictions)
print(f"Accuracy is {acc_test}")

## Testing Gradient Boosted Trees

In [0]:
# Tree models get the two raw ratings, unscaled.
va = VectorAssembler(inputCols=["White Elo", "Black Elo"], outputCol="features")
va_df = va.transform(df_eval).select("features", F.col("result_int").alias("label"))

# Preview the assembled features and labels.
va_df.show()

+---------------+-----+
|       features|label|
+---------------+-----+
|[2500.0,2350.0]|    1|
|[2331.0,2646.0]|    0|
|[2317.0,2287.0]|    0|
|[2406.0,2440.0]|    1|
|[2544.0,2386.0]|    1|
|[2746.0,2778.0]|    1|
|[2736.0,2646.0]|    0|
|[2665.0,2767.0]|    1|
|[2637.0,2785.0]|    0|
|[2805.0,2748.0]|    1|
|[2878.0,2771.0]|    1|
|[2781.0,2868.0]|    1|
|[2785.0,2864.0]|    0|
|[2909.0,2748.0]|    1|
|[2761.0,2908.0]|    0|
|[2417.0,2625.0]|    0|
|[2453.0,2529.0]|    1|
|[2770.0,2514.0]|    1|
|[2431.0,2772.0]|    0|
|[2926.0,2767.0]|    1|
+---------------+-----+
only showing top 20 rows



In [0]:
# Create a 70% / 30% train/test split.
# A fixed seed keeps the split (and every metric computed from it) the same
# across kernel restarts; without it each run evaluates on different rows.
splits = va_df.randomSplit([0.7, 0.3], seed=42)

# Both parts are reused by several fits/evaluations, so cache them.
train = splits[0].cache()
test = splits[1].cache()

In [0]:
# Gradient-boosted trees baseline with 10 boosting iterations.
gbt = GBTClassifier(
    featuresCol="features",
    labelCol="label",
    maxIter=10,
)
model = gbt.fit(train)

In [0]:
# Score the held-out split with the fitted GBT model.
predictions = model.transform(test)

# --- ranking metrics (binary evaluator) ---
evaluator = BinaryClassificationEvaluator(metricName="areaUnderROC", labelCol="label")

auc_test = evaluator.evaluate(predictions)
print(f"AUC score is {auc_test}")

evaluator.setMetricName("areaUnderPR")
pr_test = evaluator.evaluate(predictions)
print(f"PR score is {pr_test}")

# --- hard-label metrics (multiclass evaluator on the "prediction" column) ---
evaluator = MulticlassClassificationEvaluator().setLabelCol("label").setPredictionCol("prediction")

evaluator.setMetricName("f1")
f1_test = evaluator.evaluate(predictions)
print(f"F1 score is {f1_test}")

evaluator.setMetricName("accuracy")
acc_test = evaluator.evaluate(predictions)
print(f"Accuracy is {acc_test}")

AUC score is 0.7477670143795642
PR score is 0.7485644497359304
F1 score is 0.683867084367122
Accuracy is 0.6845277963831212


## Hyperparameter Tuning

In [0]:
gbt = GBTClassifier(labelCol="label", featuresCol="features")

# 3 x 3 x 2 = 18 candidate settings: tree depth, minimum rows per node, and
# the boosting step size (learning rate).
param_grid = (ParamGridBuilder()
              .addGrid(gbt.maxDepth, [2, 5, 10])
              .addGrid(gbt.minInstancesPerNode, [1, 5, 10])
              .addGrid(gbt.stepSize, [0.1, 0.01])
              .build())

# Model selection metric, spelled out instead of relying on evaluator defaults.
evaluator = BinaryClassificationEvaluator(labelCol="label", metricName="areaUnderROC")

# 5-fold cross validation; the seed fixes the fold assignment so the selected
# model is reproducible across runs.
cv = CrossValidator(estimator=gbt,
                    estimatorParamMaps=param_grid,
                    evaluator=evaluator,
                    numFolds=5,
                    seed=42)

cv_model = cv.fit(train)

# Best model according to the cross-validated metric.
best_model = cv_model.bestModel

In [0]:
# Score the held-out split with the cross-validated best GBT model.
predictions = best_model.transform(test)

# --- ranking metrics (binary evaluator) ---
evaluator = BinaryClassificationEvaluator(metricName="areaUnderROC", labelCol="label")

auc_test = evaluator.evaluate(predictions)
print(f"AUC score is {auc_test}")

evaluator.setMetricName("areaUnderPR")
pr_test = evaluator.evaluate(predictions)
print(f"PR score is {pr_test}")

# --- hard-label metrics (multiclass evaluator on the "prediction" column) ---
evaluator = MulticlassClassificationEvaluator().setLabelCol("label").setPredictionCol("prediction")

evaluator.setMetricName("f1")
f1_test = evaluator.evaluate(predictions)
print(f"F1 score is {f1_test}")

evaluator.setMetricName("accuracy")
acc_test = evaluator.evaluate(predictions)
print(f"Accuracy is {acc_test}")