#### Initialize Spark session

In [119]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("xor") \
    .config("spark.executor.memory", '2g') \
    .config('spark.executor.cores', '1') \
    .config('spark.cores.max', '1') \
    .config("spark.driver.memory",'1g') \
    .getOrCreate()

sc = spark.sparkContext


#### Read in data and merge dataset on ab_id

In [120]:
pitches = spark.read.option("inferSchema", "true").csv('Data/pitches.csv', header = True)
atbats = spark.read.option("inferSchema", "true").csv('Data/atbats.csv', header = True).select("ab_id", "batter_id", 
                                                                                               "inning", "p_score", 
                                                                                               "p_throws", "pitcher_id",
                                                                                               "stand", "top")

df = pitches.join(atbats, "ab_id")

#### Drop unnecessary variables

In [121]:
df = df.drop("ax", "ay", "az", "batter_id", "break_angle", "break_length", "break_y", "code", "event", "g_id", "o",
            
             "p_throws", 'nasty',"pfx_x", "pfx_z", "px", "pz", "spin_dir", "end_speed", "start_speed"
             
             "sz_bot", "sz_top", "vx0", "vy0", "vz0", "x", "x0", "y", "y0", "z", "z0", "zone", "spin_rate")

#### Create new variable score_difference

In [122]:
df=df.withColumn("score_difference", df.p_score-df.b_score)

#### Remove low frequency observations (look at pitch_type to decide which ones to remove)

In [123]:
from pyspark.sql.types import StringType, IntegerType
from pyspark.sql.functions import udf
from pyspark.sql import functions as f
from pyspark.sql.functions import col

In [124]:
df = df.filter(
    (col('pitch_type') != 'UN') &
    (col('pitch_type') != 'EP') &
    (col('pitch_type') != 'AB') &
    (col('pitch_type') != 'FA') &
    (col('pitch_type') != 'IN') &
    (col('pitch_type') != 'SC'))

#### FO and PO are the same so consolidate

In [125]:
df=df.na.replace(['FO'], ['PO'], 'pitch_type')

#### Create new column that is a latent variable based on pitch_type

In [126]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml.tuning import CrossValidator

indexer = StringIndexer(inputCol="pitch_type", outputCol="latent_pitch_type")
df = indexer.fit(df).transform(df)
df.head(2)

[Row(ab_id=2015000044, b_count=0, b_score=0, on_1b=False, on_2b=True, on_3b=False, outs=1, pitch_num=1, pitch_type='FC', s_count=0, start_speed=84.6, sz_bot=1.52, type='B', type_confidence=2.0, inning=5, p_score=3, pitcher_id=425794, stand='L', top=False, score_difference=3, latent_pitch_type=6.0),
 Row(ab_id=2015000044, b_count=1, b_score=0, on_1b=False, on_2b=True, on_3b=False, outs=1, pitch_num=2, pitch_type='FC', s_count=0, start_speed=88.4, sz_bot=1.52, type='B', type_confidence=2.0, inning=5, p_score=3, pitcher_id=425794, stand='L', top=False, score_difference=3, latent_pitch_type=6.0)]

In [127]:
df.groupBy("latent_pitch_type").count().show()
df.groupBy("pitch_type").count().show()

+-----------------+-------+
|latent_pitch_type|  count|
+-----------------+-------+
|              8.0|  43705|
|              0.0|1014880|
|              7.0|  66484|
|              1.0| 450581|
|              4.0| 242506|
|              3.0| 292789|
|              2.0| 337983|
|             10.0|   1438|
|              6.0| 149756|
|              5.0| 234391|
|              9.0|  11260|
+-----------------+-------+

+----------+-------+
|pitch_type|  count|
+----------+-------+
|        FT| 337983|
|        SL| 450581|
|        FC| 149756|
|        FF|1014880|
|        FS|  43705|
|        PO|   1438|
|        KC|  66484|
|        CH| 292789|
|        CU| 234391|
|        KN|  11260|
|        SI| 242506|
+----------+-------+



In [128]:
## udf_latent_base = udf(lambda z: if)

#### Create new column that is latent variable based on balls and strikes

In [129]:
def count_status(b_count, s_count):
    if   b_count==0 and s_count==0: return 0
    elif b_count==1 and s_count==0: return 1
    elif b_count==0 and s_count==1: return 2
    elif b_count==1 and s_count==1: return 3
    elif b_count==2 and s_count==0: return 4
    elif b_count==0 and s_count==2: return 5
    elif b_count==3 and s_count==0: return 6
    elif b_count==2 and s_count==1: return 7
    elif b_count==1 and s_count==2: return 8
    elif b_count==3 and s_count==1: return 9
    elif b_count==2 and s_count==2: return 10
    elif b_count==3 and s_count==2: return 11
    
udfcount_status = udf(count_status, IntegerType())
df = df.withColumn("count_status", udfcount_status("b_count", "s_count"))

#### Create new column that is latent variable based on on_1b, on_2b, and on_3b

In [130]:
def base_status(on_1b, on_2b, on_3b):
    if   on_1b==0 and on_2b==0 and on_3b==0: return 0
    elif on_1b==1 and on_2b==0 and on_3b==0: return 1
    elif on_1b==0 and on_2b==1 and on_3b==0: return 2
    elif on_1b==0 and on_2b==0 and on_3b==1: return 3
    elif on_1b==1 and on_2b==1 and on_3b==0: return 4
    elif on_1b==1 and on_2b==0 and on_3b==1: return 5
    elif on_1b==0 and on_2b==1 and on_3b==1: return 6
    elif on_1b==1 and on_2b==1 and on_3b==1: return 7
    
udfbase_status = udf(base_status, IntegerType())
df = df.withColumn("base_status", udfbase_status("on_1b", "on_2b", "on_3b"))

#### Create new column binning score_difference

In [131]:
def bin_score(score_difference):
    if score_difference<(-4): return (-5)
    elif score_difference>(4): return (5)
    elif score_difference==(-4): return (-4)
    elif score_difference==(-3): return (-3) 
    elif score_difference==(-2): return (-2)
    elif score_difference==(-1): return (-1) 
    elif score_difference==(0): return (0) 
    elif score_difference==(1): return (1)
    elif score_difference==(2): return (2)
    elif score_difference==(3): return (3)
    elif score_difference==(4): return (4)
udfbin_score = udf(bin_score, IntegerType())
df = df.withColumn("binned_score_difference", udfbin_score("score_difference"))

#### Create new column binning pitch_num

In [132]:
def bin_pitch_num(pitch_num):
    if pitch_num>(14): 
        return (14)
    else: 
        return(pitch_num)
udfpitch_num = udf(bin_pitch_num, IntegerType())
df = df.withColumn("pitch_num", udfpitch_num("pitch_num"))

In [133]:
df.groupby("pitch_num").count().show()

+---------+------+
|pitch_num| count|
+---------+------+
|       12|   409|
|        1|735198|
|       13|   150|
|        6|145005|
|        3|539590|
|        5|268666|
|        9|  8831|
|        4|406352|
|        8| 23528|
|        7| 60001|
|       10|  3210|
|       11|  1130|
|       14|    80|
|        2|653623|
+---------+------+



#### Convert data to dense vector

##### We need to dummy encode some of these because it offers useful info

In [134]:
df = df.drop('on_1b', 'on_2b', 'on_3b', 'pitch_type', 'type', 'stand', 'top', 'start_speed', 'sz_bot', 'type_confidence')


In [135]:
df.dtypes

[('ab_id', 'int'),
 ('b_count', 'int'),
 ('b_score', 'int'),
 ('outs', 'int'),
 ('pitch_num', 'int'),
 ('s_count', 'int'),
 ('inning', 'int'),
 ('p_score', 'int'),
 ('pitcher_id', 'int'),
 ('score_difference', 'int'),
 ('latent_pitch_type', 'double'),
 ('count_status', 'int'),
 ('base_status', 'int'),
 ('binned_score_difference', 'int')]

In [136]:
df = df.select("ab_id","b_count","b_score","outs", 'pitch_num', 's_count', 'inning', 'p_score', 'pitcher_id', 'score_difference',
              'count_status', 'base_status', 'binned_score_difference', 'latent_pitch_type')

In [137]:
df.dtypes

[('ab_id', 'int'),
 ('b_count', 'int'),
 ('b_score', 'int'),
 ('outs', 'int'),
 ('pitch_num', 'int'),
 ('s_count', 'int'),
 ('inning', 'int'),
 ('p_score', 'int'),
 ('pitcher_id', 'int'),
 ('score_difference', 'int'),
 ('count_status', 'int'),
 ('base_status', 'int'),
 ('binned_score_difference', 'int'),
 ('latent_pitch_type', 'double')]

In [138]:
df.select('binned_score_difference').distinct().rdd.map(lambda r: r[0]).collect()

[-1, 1, 3, -5, 5, 4, -4, -2, 2, -3, 0]

In [139]:
df = df.na.drop(subset=["count_status"])

In [140]:
df.groupby('binned_score_difference').count().show()

+-----------------------+------+
|binned_score_difference| count|
+-----------------------+------+
|                     -1|340572|
|                      1|344147|
|                      3|160611|
|                     -5|194611|
|                      5|203694|
|                      4|112138|
|                     -4|106880|
|                     -2|232629|
|                      2|238861|
|                     -3|154079|
|                      0|757537|
+-----------------------+------+



In [141]:
def transData(data):
    return data.rdd.map(lambda r: [r[-1], Vectors.dense(r[:-1])]).\
           toDF(['label','features'])

from pyspark.sql import Row
from pyspark.ml.linalg import Vectors

data= transData(df)
data.show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  6.0|[2.015000044E9,0....|
|  6.0|[2.015000044E9,1....|
|  6.0|[2.015000044E9,2....|
|  6.0|[2.015000044E9,3....|
|  0.0|[2.015000044E9,3....|
|  0.0|[2.015000044E9,3....|
|  0.0|[2.015000059E9,0....|
|  0.0|[2.015000059E9,0....|
|  2.0|[2.015000059E9,1....|
|  3.0|[2.015000059E9,2....|
|  3.0|[2.015000059E9,2....|
|  3.0|[2.015000059E9,3....|
|  2.0|[2.015000183E9,0....|
|  2.0|[2.015000183E9,1....|
|  2.0|[2.015000183E9,2....|
|  0.0|[2.015000183E9,3....|
|  2.0|[2.015000183E9,3....|
|  2.0|[2.015000183E9,3....|
|  2.0|[2.015000294E9,0....|
|  2.0|[2.015000294E9,1....|
+-----+--------------------+
only showing top 20 rows



#### Split data into training and test data

In [142]:
splits = data.randomSplit([0.6, 0.4], 1234)
train = splits[0]
test = splits[1]

#### specify layers for the neural network: input layer of size 11 (features), two intermediate of size 5 and 4 and output of size 7 (classes)

# Currently not working so experimenting with other code
from pyspark.ml.feature import IndexToString, StringIndexer
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

layers = [11, 5, 4, 4, 3 , 11]

# create the trainer and set its parameters
FNN = MultilayerPerceptronClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures",\
                                         maxIter=100, layers=layers, blockSize=128, seed=1234)
# Convert indexed labels back to original labels.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                               labels=labelIndexer.labels)
# Chain indexers and forest in a Pipeline
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, FNN, labelConverter])
# train the model
# Train model.  This also runs the indexers.
model = pipeline.fit(trainingData)


In [149]:
from pyspark.ml.feature import IndexToString, StringIndexer
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

layers = [13, 5, 4, 11]

trainer = MultilayerPerceptronClassifier(maxIter=100, layers=layers, blockSize=128, seed=1234)

#### Train the model

In [150]:
model = trainer.fit(train)

#### Compute accuracy on the test set

In [151]:
result = model.transform(test)
predictionAndLabels = result.select("prediction", "label")
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
print("Test set accuracy = " + str(evaluator.evaluate(predictionAndLabels)))

Test set accuracy = 0.3570523656068907
