# 1-ая часть

In [1]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer, MinMaxScaler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

spark = SparkSession.builder.master("local[*]").getOrCreate()

In [2]:
filename_data = 'chess.csv'
csv = spark.read.csv(filename_data, inferSchema=True, header=True)
csv = csv.drop(csv._c0).withColumn('label', when(col('BlackElo') >= 1592, 1).otherwise(0))
csv.drop(csv['BlackElo']).show(10)

+----+--------------------+------+------------+--------+---------+-----------+----------+-----+
|GAME|             Opening|Result| Termination|WhiteElo|Game_type|Total_moves|Game_flips|label|
+----+--------------------+------+------------+--------+---------+-----------+----------+-----+
|  11|        Bird Opening|   0-1|Time forfeit|    1180|    Blitz|         66|         8|    0|
|  14|        Réti Opening|   0-1|      Normal|    1381|    Blitz|         64|         6|    0|
|  29|    Philidor Defense|   0-1|Time forfeit|    1485|    Blitz|         70|         5|    1|
|  40|Sicilian Defense:...|   0-1|      Normal|    2040|    Blitz|         86|         8|    1|
|  55|    Alekhine Defense|   1-0|      Normal|    2163|    Rapid|         71|         2|    1|
|  56|Nimzo-Indian Defe...|   0-1|      Normal|    2062|    Rapid|         73|         6|    1|
|  70|   Queen's Pawn Game|   1-0|      Normal|    1651|   Bullet|         39|         3|    1|
| 162|Four Knights Game...|   1-0|      

In [3]:
splits = csv.randomSplit([0.7, 0.3])
train = splits[0]
test = splits[1].withColumnRenamed("label", "trueLabel")
train_rows = train.count()
test_rows = test.count()
print("Training Rows:", train_rows, " Testing Rows:", test_rows)

Training Rows: 2407659  Testing Rows: 1031792


In [4]:
strIdx = StringIndexer(inputCols = ['Game_type'], 
                       outputCols = ['Game_typeIdx'], 
                       handleInvalid = "keep")
catVect = VectorAssembler(inputCols = ['Game_typeIdx'], outputCol="catFeatures")
catIdx = VectorIndexer(inputCol = catVect.getOutputCol(), outputCol = "idxCatFeatures")
numVect = VectorAssembler(inputCols = ['Total_moves', 'Game_flips', 'WhiteElo'], outputCol="numFeatures")
minMax = MinMaxScaler(inputCol = numVect.getOutputCol(), outputCol="normFeatures")
featVect = VectorAssembler(inputCols=["idxCatFeatures", "normFeatures"], outputCol="features")
lr = LogisticRegression(labelCol="label", 
                        featuresCol="features", 
                        maxIter=30,
                        regParam=0.3)
pipeline = Pipeline(stages=[strIdx, catVect, catIdx, numVect, minMax, featVect, lr])

In [5]:
pipelineModel = pipeline.fit(train)

In [6]:
pred_df = pipelineModel.transform(test)
pred_df.select("features", "prediction", "trueLabel").show()

+--------------------+----------+---------+
|            features|prediction|trueLabel|
+--------------------+----------+---------+
|[0.0,0.4491525423...|       1.0|        1|
|[0.0,0.7203389830...|       0.0|        0|
|[0.0,0.6271186440...|       1.0|        1|
|[2.0,0.0508474576...|       1.0|        0|
|[0.0,0.5677966101...|       1.0|        1|
|[2.0,0.8050847457...|       1.0|        1|
|[0.0,0.3220338983...|       1.0|        1|
|[0.0,0.6864406779...|       1.0|        1|
|[2.0,0.7881355932...|       1.0|        1|
|[0.0,0.5169491525...|       0.0|        0|
|[0.0,0.6016949152...|       0.0|        0|
|[0.0,0.6271186440...|       1.0|        1|
|[0.0,0.2966101694...|       1.0|        1|
|[0.0,0.5593220338...|       1.0|        1|
|[2.0,0.6186440677...|       0.0|        0|
|[2.0,0.3728813559...|       0.0|        0|
|[1.0,0.4661016949...|       1.0|        1|
|[0.0,0.6525423728...|       1.0|        1|
|[1.0,0.3644067796...|       0.0|        0|
|[1.0,0.3983050847...|       0.0

In [7]:
print(pred_df)
tp = float(pred_df.filter("prediction == 1.0 AND truelabel == 1").count())
fp = float(pred_df.filter("prediction == 1.0 AND truelabel == 0").count())
tn = float(pred_df.filter("prediction == 0.0 AND truelabel == 0").count())
fn = float(pred_df.filter("prediction == 0.0 AND truelabel == 1").count())
pr = tp / (tp + fp)
re = tp / (tp + fn)
metrics = spark.createDataFrame([
 ("TP", tp),
 ("FP", fp),
 ("TN", tn),
 ("FN", fn),
 ("Precision", pr),
 ("Recall", re),
 ("F1", 2*pr*re/(re+pr))],["metric", "value"])
metrics.show()

DataFrame[GAME: int, BlackElo: int, Opening: string, Result: string, Termination: string, WhiteElo: int, Game_type: string, Total_moves: int, Game_flips: int, trueLabel: int, Game_typeIdx: double, catFeatures: vector, idxCatFeatures: vector, numFeatures: vector, normFeatures: vector, features: vector, rawPrediction: vector, probability: vector, prediction: double]
+---------+------------------+
|   metric|             value|
+---------+------------------+
|       TP|          449408.0|
|       FP|           41887.0|
|       TN|          482153.0|
|       FN|           58344.0|
|Precision|0.9147416521641784|
|   Recall|0.8850935102175865|
|       F1|0.8996733887394688|
+---------+------------------+



In [8]:
evaluator = BinaryClassificationEvaluator(labelCol="trueLabel", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
aur = evaluator.evaluate(pred_df)
print ("AUR = ", aur)

AUR =  0.9524713926047002


In [9]:
paramGrid = ParamGridBuilder().\
    addGrid(lr.maxIter, [30, 40, 60]).\
    addGrid(lr.regParam, [0.6, 0.8, 0.9]).build()
cv = CrossValidator(estimator=pipeline, evaluator=BinaryClassificationEvaluator(metricName='areaUnderPR'), estimatorParamMaps=paramGrid, 
                    numFolds=2)

In [10]:
cv_model = cv.fit(train)

In [11]:
newPrediction = cv_model.transform(test)

In [12]:
# Recalculate confusion matrix
tp2 = float(newPrediction.filter("prediction == 1.0 AND truelabel == 1").count())
fp2 = float(newPrediction.filter("prediction == 1.0 AND truelabel == 0").count())
tn2 = float(newPrediction.filter("prediction == 0.0 AND truelabel == 0").count())
fn2 = float(newPrediction.filter("prediction == 0.0 AND truelabel == 1").count())
pr2 = tp2 / (tp2 + fp2)
re2 = tp2 / (tp2 + fn2)
metrics2 = spark.createDataFrame([
 ("TP", tp2),
 ("FP", fp2),
 ("TN", tn2),
 ("FN", fn2),
 ("Precision", pr2),
 ("Recall", re2),
 ("F1", 2*pr2*re2/(re2+pr2))],["metric", "value"])
metrics2.show()

+---------+------------------+
|   metric|             value|
+---------+------------------+
|       TP|          442338.0|
|       FP|           40344.0|
|       TN|          483696.0|
|       FN|           65414.0|
|Precision|0.9164170199013015|
|   Recall|0.8711693897808379|
|       F1|0.8932205477598709|
+---------+------------------+



In [13]:
# Recalculate the Area Under ROC
evaluator2 = BinaryClassificationEvaluator(labelCol="trueLabel", rawPredictionCol="prediction", metricName="areaUnderROC")
aur2 = evaluator2.evaluate(newPrediction)
print( "AUR2 = ", aur2)

AUR2 =  0.897091450099945


# 2-ая часть

In [14]:
csv = spark.read.csv(filename_data, inferSchema=True, header=True)
# csv = csv.withColumn('Rating', csv.Rating.cast(IntegerType()))
csv.show(10)

+---+----+--------+--------------------+------+------------+--------+---------+-----------+----------+
|_c0|GAME|BlackElo|             Opening|Result| Termination|WhiteElo|Game_type|Total_moves|Game_flips|
+---+----+--------+--------------------+------+------------+--------+---------+-----------+----------+
|  0|  11|    1143|        Bird Opening|   0-1|Time forfeit|    1180|    Blitz|         66|         8|
|  1|  14|    1504|        Réti Opening|   0-1|      Normal|    1381|    Blitz|         64|         6|
|  2|  29|    1933|    Philidor Defense|   0-1|Time forfeit|    1485|    Blitz|         70|         5|
|  3|  40|    1710|Sicilian Defense:...|   0-1|      Normal|    2040|    Blitz|         86|         8|
|  4|  55|    1598|    Alekhine Defense|   1-0|      Normal|    2163|    Rapid|         71|         2|
|  5|  56|    2207|Nimzo-Indian Defe...|   0-1|      Normal|    2062|    Rapid|         73|         6|
|  6|  70|    1632|   Queen's Pawn Game|   1-0|      Normal|    1651|   B

In [15]:
splits = csv.randomSplit([0.7, 0.3])
train = splits[0]
test = splits[1].withColumnRenamed("BlackElo", "trueBlackElo")
train_rows = train.count()
test_rows = test.count()
print("Training Rows:", train_rows, " Testing Rows:", test_rows)

Training Rows: 2408467  Testing Rows: 1030984


In [16]:
strIdx = StringIndexer(inputCols = ['Game_type'], 
                       outputCols = ['Game_typeIdx'], 
                       handleInvalid = "keep")
catVect = VectorAssembler(inputCols = ['Game_typeIdx'], outputCol="catFeatures")
catIdx = VectorIndexer(inputCol = catVect.getOutputCol(), outputCol = "idxCatFeatures")
numVect = VectorAssembler(inputCols = ['WhiteElo', 'Total_moves', 'Game_flips'], outputCol="numFeatures")
minMax = MinMaxScaler(inputCol = numVect.getOutputCol(), outputCol="normFeatures")
featVect = VectorAssembler(inputCols=["idxCatFeatures", "normFeatures"], outputCol="features")
rfr = RandomForestRegressor(featuresCol = 'features', 
                      labelCol='BlackElo',
                      numTrees = 10,
                      maxDepth=2,
                      maxBins = 2207)
pipeline = Pipeline(stages=[strIdx, catVect, catIdx, numVect, minMax, featVect, rfr])

In [17]:
pipelineModel = pipeline.fit(train)

In [18]:
pred_df = pipelineModel.transform(test)
pred_df.select("features", "prediction", "trueBlackElo").show()

+--------------------+------------------+------------+
|            features|        prediction|trueBlackElo|
+--------------------+------------------+------------+
|[0.0,0.3903826266...|1434.1574524473335|        1504|
|[0.0,0.4441571871...|1500.2725358503899|        1933|
|[1.0,0.7425025853...|1839.8695592496101|        2207|
|[1.0,0.7492244053...|1839.8695592496101|        2141|
|[1.0,0.7052740434...| 1815.445441915403|        1943|
|[0.0,0.7042399172...|1824.4574131128186|        2000|
|[2.0,0.4053774560...|1404.7691807863669|        1440|
|[0.0,0.7068252326...|1824.4574131128186|        2293|
|[1.0,0.4519131334...|1430.5314210290076|        1500|
|[0.0,0.4788004136...|1430.5314210290076|        1537|
|[1.0,0.5816959669...|1755.2972331221458|        1778|
|[1.0,0.6964839710...| 1845.515756752596|        1813|
|[0.0,0.4544984488...|1422.8458691667147|        1510|
|[0.0,0.4979317476...|1581.4058977860816|        1139|
|[0.0,0.4519131334...|1422.8458691667147|        1500|
|[2.0,0.61

In [19]:
from pyspark.ml.evaluation import RegressionEvaluator

regressionEvaluator = RegressionEvaluator(predictionCol="prediction", labelCol="trueBlackElo", metricName="rmse")

In [20]:
# RMSE
rmse = regressionEvaluator.evaluate(pred_df)
print(f"The RMSE for the random forest regression model is {rmse:0.2f}")
# MSE
mse = regressionEvaluator.setMetricName("mse").evaluate(pred_df)
print(f"The MSE for the random forest regression model is {mse:0.2f}")
# R2
r2 = regressionEvaluator.setMetricName("r2").evaluate(pred_df)
print(f"The R2 for the random forest regression model is {r2:0.2f}")
# MAE
mae = regressionEvaluator.setMetricName("mae").evaluate(pred_df)
print(f"The MAE for the random forest regression model is {mae:0.2f}")

The RMSE for the random forest regression model is 201.67
The MSE for the random forest regression model is 40671.89
The R2 for the random forest regression model is 0.64
The MAE for the random forest regression model is 149.82


In [21]:
param_grid = ParamGridBuilder().\
    addGrid(rfr.numTrees, [10, 15, 20]).\
    addGrid(rfr.maxDepth, [1, 2, 4]).\
    addGrid(rfr.maxBins , [2207, 4414, 8828]).\
    build()

In [22]:
cv = CrossValidator(estimator=pipeline, \
                    estimatorParamMaps=param_grid, \
                    evaluator=RegressionEvaluator(
                                predictionCol="prediction", \
                                labelCol="BlackElo", \
                                metricName="rmse"), \
                    numFolds=2)

In [23]:
cv_model = cv.fit(train)

In [24]:
newPrediction = cv_model.transform(test)

In [25]:
# RMSE
rmse = regressionEvaluator.evaluate(newPrediction)
print(f"The RMSE for the random forest regression model is {rmse:0.2f}")
# MSE
mse = regressionEvaluator.setMetricName("mse").evaluate(newPrediction)
print(f"The MSE for the random forest regression model is {mse:0.2f}")
# R2
r2 = regressionEvaluator.setMetricName("r2").evaluate(newPrediction)
print(f"The R2 for the random forest regression model is {r2:0.2f}")
# MAE
mae = regressionEvaluator.setMetricName("mae").evaluate(newPrediction)
print(f"The MAE for the random forest regression model is {mae:0.2f}")

The RMSE for the random forest regression model is 106.46
The MSE for the random forest regression model is 24713.13
The R2 for the random forest regression model is 0.78
The MAE for the random forest regression model is 106.46
