In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [3]:
import os

In [4]:
data = spark.read.option("mode", 'DROPMALFORMED').csv("games.csv", header=True, inferSchema=True)

In [5]:
data.printSchema()

root
 |-- id: string (nullable = true)
 |-- rated: boolean (nullable = true)
 |-- created_at: double (nullable = true)
 |-- last_move_at: double (nullable = true)
 |-- turns: integer (nullable = true)
 |-- victory_status: string (nullable = true)
 |-- winner: string (nullable = true)
 |-- increment_code: string (nullable = true)
 |-- white_id: string (nullable = true)
 |-- white_rating: integer (nullable = true)
 |-- black_id: string (nullable = true)
 |-- black_rating: integer (nullable = true)
 |-- moves: string (nullable = true)
 |-- opening_eco: string (nullable = true)
 |-- opening_name: string (nullable = true)
 |-- opening_ply: integer (nullable = true)



In [6]:
data.count()

20058

In [7]:
data.createOrReplaceTempView("data")

In [8]:
from pyspark.sql.functions import length
data = data.where(length("moves") >= 90)

In [9]:
data.count()

17957

In [10]:
from pyspark.sql.functions import split
split_col = split(data['moves'], ' ')
data = data.withColumn('move1', split_col.getItem(0)) \
       .withColumn('move2', split_col.getItem(1))   \
       .withColumn('move3', split_col.getItem(2))   \
       .withColumn('move4', split_col.getItem(3))   \
       .withColumn('move5', split_col.getItem(4))   \
       .withColumn('move6', split_col.getItem(5))   \
       .withColumn('move7', split_col.getItem(6))   \
       .withColumn('move8', split_col.getItem(7))   \
       .withColumn('move9', split_col.getItem(8))   \
       .withColumn('move10', split_col.getItem(9))  \
       .withColumn('move11', split_col.getItem(10)) \
       .withColumn('move12', split_col.getItem(11)) \
       .withColumn('move13', split_col.getItem(12)) \
       .withColumn('move14', split_col.getItem(13)) \
       .withColumn('move15', split_col.getItem(14)) \
       .withColumn('move16', split_col.getItem(15)) \
       .withColumn('move17', split_col.getItem(16)) \
       .withColumn('move18', split_col.getItem(17)) \
       .withColumn('move19', split_col.getItem(18)) \
       .withColumn('move20', split_col.getItem(19)) 
data = data.drop("moves")
data.show(truncate=False) 

+--------+-----+----------+------------+-----+--------------+------+--------------+------------------+------------+------------------+------------+-----------+---------------------------------------------------------------------+-----------+-----+-----+-----+-----+-----+-----+-----+-----+-----+------+------+------+------+------+------+------+------+------+------+------+
|id      |rated|created_at|last_move_at|turns|victory_status|winner|increment_code|white_id          |white_rating|black_id          |black_rating|opening_eco|opening_name                                                         |opening_ply|move1|move2|move3|move4|move5|move6|move7|move8|move9|move10|move11|move12|move13|move14|move15|move16|move17|move18|move19|move20|
+--------+-----+----------+------------+-----+--------------+------+--------------+------------------+------------+------------------+------------+-----------+---------------------------------------------------------------------+-----------+-----+-----+-

Check for null values

In [11]:
from pyspark.sql.functions import isnan, when, count, col
data.select([count(when(col(c).isNull(),c)).alias(c) for c in data.columns]).show()

+---+-----+----------+------------+-----+--------------+------+--------------+--------+------------+--------+------------+-----------+------------+-----------+-----+-----+-----+-----+-----+-----+-----+-----+-----+------+------+------+------+------+------+------+------+------+------+------+
| id|rated|created_at|last_move_at|turns|victory_status|winner|increment_code|white_id|white_rating|black_id|black_rating|opening_eco|opening_name|opening_ply|move1|move2|move3|move4|move5|move6|move7|move8|move9|move10|move11|move12|move13|move14|move15|move16|move17|move18|move19|move20|
+---+-----+----------+------------+-----+--------------+------+--------------+--------+------------+--------+------------+-----------+------------+-----------+-----+-----+-----+-----+-----+-----+-----+-----+-----+------+------+------+------+------+------+------+------+------+------+------+
|  0|    0|         0|           0|    0|             0|     0|             0|       0|           0|       0|           0|     

Remove rows where players drawed

In [12]:
data.createOrReplaceTempView("data")

In [13]:
data = spark.sql("SELECT * FROM data WHERE winner != 'draw'")

In [14]:
data.count()

17106

Remove unnecessary columns and rows

In [15]:
data = data.drop("id","victory_status","rated")
data.count()

17106

One Hot Encoding

In [16]:
data.printSchema()

root
 |-- created_at: double (nullable = true)
 |-- last_move_at: double (nullable = true)
 |-- turns: integer (nullable = true)
 |-- winner: string (nullable = true)
 |-- increment_code: string (nullable = true)
 |-- white_id: string (nullable = true)
 |-- white_rating: integer (nullable = true)
 |-- black_id: string (nullable = true)
 |-- black_rating: integer (nullable = true)
 |-- opening_eco: string (nullable = true)
 |-- opening_name: string (nullable = true)
 |-- opening_ply: integer (nullable = true)
 |-- move1: string (nullable = true)
 |-- move2: string (nullable = true)
 |-- move3: string (nullable = true)
 |-- move4: string (nullable = true)
 |-- move5: string (nullable = true)
 |-- move6: string (nullable = true)
 |-- move7: string (nullable = true)
 |-- move8: string (nullable = true)
 |-- move9: string (nullable = true)
 |-- move10: string (nullable = true)
 |-- move11: string (nullable = true)
 |-- move12: string (nullable = true)
 |-- move13: string (nullable = true)
 

In [17]:
data.head()

Row(created_at=1504130000000.0, last_move_at=1504130000000.0, turns=61, winner='white', increment_code='5+10', white_id='ischia', white_rating=1496, black_id='a-00', black_rating=1500, opening_eco='C20', opening_name="King's Pawn Game: Leonardis Variation", opening_ply=3, move1='e4', move2='e5', move3='d3', move4='d6', move5='Be3', move6='c6', move7='Be2', move8='b5', move9='Nd2', move10='a5', move11='a4', move12='c5', move13='axb5', move14='Nc6', move15='bxc6', move16='Ra6', move17='Nc4', move18='a4', move19='c3', move20='a3')

In [18]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline
import numpy as np
categoricalColumns = [item[0] for item in data.dtypes if item[1].startswith('string') ]

stages = []

for categoricalCol in categoricalColumns:
    stringIndexer = StringIndexer(inputCol = categoricalCol, outputCol = categoricalCol + '_index')
    stages += [stringIndexer]

pipeline = Pipeline(stages = stages)

pipelineModel = pipeline.fit(data)
data = pipelineModel.transform(data)

cols = np.array(categoricalColumns)
data = data.drop(*cols)
data.show()

+----------+------------+-----+------------+------------+-----------+------------+--------------------+--------------+--------------+-----------------+------------------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+
|created_at|last_move_at|turns|white_rating|black_rating|opening_ply|winner_index|increment_code_index|white_id_index|black_id_index|opening_eco_index|opening_name_index|move1_index|move2_index|move3_index|move4_index|move5_index|move6_index|move7_index|move8_index|move9_index|move10_index|move11_index|move12_index|move13_index|move14_index|move15_index|move16_index|move17_index|move18_index|move19_index|move20_index|
+----------+------------+-----+------------+------------+-----------+------------+--------------------+--------------+--------------+-----------------+-----

In [19]:
data.head()

Row(created_at=1504130000000.0, last_move_at=1504130000000.0, turns=61, white_rating=1496, black_rating=1500, opening_ply=3, winner_index=0.0, increment_code_index=16.0, white_id_index=4508.0, black_id_index=1914.0, opening_eco_index=6.0, opening_name_index=18.0, move1_index=0.0, move2_index=0.0, move3_index=10.0, move4_index=3.0, move5_index=39.0, move6_index=12.0, move7_index=11.0, move8_index=22.0, move9_index=51.0, move10_index=64.0, move11_index=44.0, move12_index=15.0, move13_index=177.0, move14_index=2.0, move15_index=330.0, move16_index=340.0, move17_index=93.0, move18_index=172.0, move19_index=13.0, move20_index=669.0)

Minmax Scaler

In [21]:
data.printSchema()

root
 |-- created_at: double (nullable = true)
 |-- last_move_at: double (nullable = true)
 |-- turns: integer (nullable = true)
 |-- white_rating: integer (nullable = true)
 |-- black_rating: integer (nullable = true)
 |-- opening_ply: integer (nullable = true)
 |-- winner_index: double (nullable = false)
 |-- increment_code_index: double (nullable = false)
 |-- white_id_index: double (nullable = false)
 |-- black_id_index: double (nullable = false)
 |-- opening_eco_index: double (nullable = false)
 |-- opening_name_index: double (nullable = false)
 |-- move1_index: double (nullable = false)
 |-- move2_index: double (nullable = false)
 |-- move3_index: double (nullable = false)
 |-- move4_index: double (nullable = false)
 |-- move5_index: double (nullable = false)
 |-- move6_index: double (nullable = false)
 |-- move7_index: double (nullable = false)
 |-- move8_index: double (nullable = false)
 |-- move9_index: double (nullable = false)
 |-- move10_index: double (nullable = false)
 |-

In [22]:
from pyspark.ml.feature import MinMaxScaler, VectorAssembler

columns_to_scale = ["created_at", "last_move_at", "turns", "white_rating", "black_rating", "opening_ply"]
assemblers = [VectorAssembler(inputCols=[col], outputCol=col+"_vec") for col in columns_to_scale]
scalers = [MinMaxScaler(inputCol=col + "_vec", outputCol=col + "_scaled") for col in columns_to_scale]
pipeline = Pipeline(stages=assemblers + scalers)
scalerModel = pipeline.fit(data)
scaledData = scalerModel.transform(data)

data = scaledData.drop("created_at", "last_move_at", "turns", "white_rating", "black_rating", "opening_ply", "rated_vec", "created_at_vec", "last_move_at_vec", "turns_vec", "white_rating_vec", "black_rating_vec", "opening_ply_vec")
data.show()

+------------+--------------------+--------------+--------------+-----------------+------------------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|winner_index|increment_code_index|white_id_index|black_id_index|opening_eco_index|opening_name_index|move1_index|move2_index|move3_index|move4_index|move5_index|move6_index|move7_index|move8_index|move9_index|move10_index|move11_index|move12_index|move13_index|move14_index|move15_index|move16_index|move17_index|move18_index|move19_index|move20_index|   created_at_scaled| last_move_at_scaled|        turns_scaled| white_rating_scaled| black_rating_scaled|  opening_ply_scaled|
+------------+--------------------+-----

In [23]:
data.head()

Row(winner_index=0.0, increment_code_index=16.0, white_id_index=4508.0, black_id_index=1914.0, opening_eco_index=6.0, opening_name_index=18.0, move1_index=0.0, move2_index=0.0, move3_index=10.0, move4_index=3.0, move5_index=39.0, move6_index=12.0, move7_index=11.0, move8_index=22.0, move9_index=51.0, move10_index=64.0, move11_index=44.0, move12_index=15.0, move13_index=177.0, move14_index=2.0, move15_index=330.0, move16_index=340.0, move17_index=93.0, move18_index=172.0, move19_index=13.0, move20_index=669.0, created_at_scaled=DenseVector([0.9972]), last_move_at_scaled=DenseVector([0.9972]), turns_scaled=DenseVector([0.122]), white_rating_scaled=DenseVector([0.3874]), black_rating_scaled=DenseVector([0.3881]), opening_ply_scaled=DenseVector([0.0741]))

Implement Neural Network

In [20]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [21]:
assembler = VectorAssembler(inputCols=[s for s in data.columns if s != "winner_index"], outputCol="features")

In [22]:
output = assembler.transform(data)

In [23]:
output.printSchema()

root
 |-- winner_index: double (nullable = false)
 |-- increment_code_index: double (nullable = false)
 |-- white_id_index: double (nullable = false)
 |-- black_id_index: double (nullable = false)
 |-- opening_eco_index: double (nullable = false)
 |-- opening_name_index: double (nullable = false)
 |-- move1_index: double (nullable = false)
 |-- move2_index: double (nullable = false)
 |-- move3_index: double (nullable = false)
 |-- move4_index: double (nullable = false)
 |-- move5_index: double (nullable = false)
 |-- move6_index: double (nullable = false)
 |-- move7_index: double (nullable = false)
 |-- move8_index: double (nullable = false)
 |-- move9_index: double (nullable = false)
 |-- move10_index: double (nullable = false)
 |-- move11_index: double (nullable = false)
 |-- move12_index: double (nullable = false)
 |-- move13_index: double (nullable = false)
 |-- move14_index: double (nullable = false)
 |-- move15_index: double (nullable = false)
 |-- move16_index: double (nullable 

In [24]:
columns = [s for s in output.columns]
columns.remove("winner_index")
columns.remove("features")
cols = np.array(columns)
data = output.drop(*cols)
data = data.withColumnRenamed("winner_index", "label")

In [25]:
data = data.dropDuplicates() #dropping duplicates slow program down a lot
data.show(truncate=False)

+-----+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|label|features                                                                                                                                                                                                                                               |
+-----+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|1.0  |[9.0,710.0,0.0,9.0,12.0,0.0,0.0,0.0,0.0,3.0,0.0,0.0,5.0,2.0,2.0,4.0,5.0,5.0,5.0,42.0,7.0,2.0,54.0,7.0,108.0,0.9945729966490177,0.9945676541756724,0.25914634146341464,0.10010881392818281,0.2816593886462882,0.25925925925925924]

In [26]:
data.count()

16762

In [27]:
[train, test] = data.randomSplit([0.7, 0.3])

In [28]:
rf = RandomForestClassifier(featuresCol="features", labelCol="label", numTrees=64, maxBins=11772)

In [29]:
model = rf.fit(train)

In [30]:
result = model.transform(test)

In [31]:
print(result.columns)

['label', 'features', 'rawPrediction', 'probability', 'prediction']


In [32]:
result.printSchema()

root
 |-- label: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [33]:
result.show()

+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|[0.0,4.0,639.0,4....|[44.8557905891769...|[0.70087172795588...|       0.0|
|  0.0|[0.0,4.0,781.0,46...|[28.9976079140727...|[0.45308762365738...|       1.0|
|  0.0|[0.0,10.0,1533.0,...|[11.3692810598809...|[0.17764501656064...|       1.0|
|  0.0|[0.0,12.0,1225.0,...|[29.8226535744126...|[0.46597896210019...|       1.0|
|  0.0|[0.0,13.0,698.0,2...|[15.4832032349736...|[0.24192505054646...|       1.0|
|  0.0|[0.0,13.0,1495.0,...|[11.7397836296608...|[0.18343411921345...|       1.0|
|  0.0|[0.0,19.0,884.0,7...|[28.1483630496176...|[0.43981817265027...|       1.0|
|  0.0|[0.0,19.0,1127.0,...|[33.7851038243944...|[0.52789224725616...|       0.0|
|  0.0|[0.0,28.0,1391.0,...|[11.5877739144737...|[0.18105896741365...|       1.0|
|  0.0|[0.0,28.0

In [34]:
predictionAndLabels = result.select("prediction", "label")

In [35]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")

In [36]:
accuracy = evaluator.evaluate(result)

In [37]:
print("Test set accuracy = " + str(accuracy))

Test set accuracy = 0.3910956306694533
