# Machine Learning Modelling with Pyspark

# India vs Bangladesh Cricket Match and Predictive outcomes

In [42]:
# Source Analytics Vidya

In [43]:
import pyspark.sql
from pyspark.sql import SparkSession
# create Spark Reader
spark = SparkSession.Builder().appName('Sparky').getOrCreate()
# read a csv file
my_data = spark.read.csv(r'C:\Users\muham\Downloads\ind-ban-comment.csv',header=True)

# see the default schema of the dataframe
my_data.printSchema()

root
 |-- Batsman: string (nullable = true)
 |-- Batsman_Name: string (nullable = true)
 |-- Bowler: string (nullable = true)
 |-- Bowler_Name: string (nullable = true)
 |-- Commentary: string (nullable = true)
 |-- Detail: string (nullable = true)
 |-- Dismissed: string (nullable = true)
 |-- Id: string (nullable = true)
 |-- Isball: string (nullable = true)
 |-- Isboundary: string (nullable = true)
 |-- Iswicket: string (nullable = true)
 |-- Over: string (nullable = true)
 |-- Runs: string (nullable = true)
 |-- Timestamp: string (nullable = true)



In [44]:
import pyspark.sql.types as tp

# define the schema
my_schema = tp.StructType([
    tp.StructField(name= 'Batsman',      dataType= tp.IntegerType(),   nullable= True),
    tp.StructField(name= 'Batsman_Name', dataType= tp.StringType(),    nullable= True),
    tp.StructField(name= 'Bowler',       dataType= tp.IntegerType(),   nullable= True),
    tp.StructField(name= 'Bowler_Name',  dataType= tp.StringType(),    nullable= True),
    tp.StructField(name= 'Commentary',   dataType= tp.StringType(),    nullable= True),
    tp.StructField(name= 'Detail',       dataType= tp.StringType(),    nullable= True),
    tp.StructField(name= 'Dismissed',    dataType= tp.IntegerType(),   nullable= True),
    tp.StructField(name= 'Id',           dataType= tp.IntegerType(),   nullable= True),
    tp.StructField(name= 'Isball',       dataType= tp.BooleanType(),   nullable= True),
    tp.StructField(name= 'Isboundary',   dataType= tp.BinaryType(),   nullable= True),
    tp.StructField(name= 'Iswicket',     dataType= tp.BinaryType(),   nullable= True),
    tp.StructField(name= 'Over',         dataType= tp.DoubleType(),    nullable= True),
    tp.StructField(name= 'Runs',         dataType= tp.IntegerType(),   nullable= True),
    tp.StructField(name= 'Timestamp',    dataType= tp.TimestampType(), nullable= True)    
])

# read the data again with the defined schema
my_data = spark.read.csv(r'C:\Users\muham\Downloads\ind-ban-comment.csv',schema= my_schema,header= True)

# print the schema
my_data.printSchema()

root
 |-- Batsman: integer (nullable = true)
 |-- Batsman_Name: string (nullable = true)
 |-- Bowler: integer (nullable = true)
 |-- Bowler_Name: string (nullable = true)
 |-- Commentary: string (nullable = true)
 |-- Detail: string (nullable = true)
 |-- Dismissed: integer (nullable = true)
 |-- Id: integer (nullable = true)
 |-- Isball: boolean (nullable = true)
 |-- Isboundary: binary (nullable = true)
 |-- Iswicket: binary (nullable = true)
 |-- Over: double (nullable = true)
 |-- Runs: integer (nullable = true)
 |-- Timestamp: timestamp (nullable = true)



In [45]:
# drop the columns that are not required
my_data = my_data.drop(*['Batsman', 'Bowler', 'Id'])
my_data.columns

['Batsman_Name',
 'Bowler_Name',
 'Commentary',
 'Detail',
 'Dismissed',
 'Isball',
 'Isboundary',
 'Iswicket',
 'Over',
 'Runs',
 'Timestamp']

# Using pyspark for Data Exploration

In [46]:
# get the dimensions of the data
(my_data.count() , len(my_data.columns))
# >> (605, 11)

(605, 11)

In [47]:
# get the summary of the numerical columns
my_data.select('Isball', 'Isboundary', 'Runs').describe().show()

+-------+------------------+
|summary|              Runs|
+-------+------------------+
|  count|               605|
|   mean|0.9917355371900827|
| stddev| 1.342725481259329|
|    min|                 0|
|    max|                 6|
+-------+------------------+



In [48]:
# import sql function pyspark
import pyspark.sql.functions as f

# null values in each column
data_agg = my_data.agg(*[f.count(f.when(f.isnull(c), c)).alias(c) for c in my_data.columns])

In [49]:
data_agg

DataFrame[Batsman_Name: bigint, Bowler_Name: bigint, Commentary: bigint, Detail: bigint, Dismissed: bigint, Isball: bigint, Isboundary: bigint, Iswicket: bigint, Over: bigint, Runs: bigint, Timestamp: bigint]

In [50]:
# value counts of Batsman_Name column
my_data.groupBy('Batsman_Name').count().show()

+------------------+-----+
|      Batsman_Name|count|
+------------------+-----+
|     Soumya Sarkar|   39|
|  Mashrafe Mortaza|    5|
|   Shakib Al Hasan|   75|
|   Mushfiqur Rahim|   23|
|Mohammad Saifuddin|   42|
|         Liton Das|   24|
|      Rishabh Pant|   43|
|    Mohammed Shami|    2|
|       Tamim Iqbal|   31|
|     Hardik Pandya|    2|
|          KL Rahul|   93|
| Bhuvneshwar Kumar|    4|
|     Rubel Hossain|   11|
|      Rohit Sharma|   94|
|    Dinesh Karthik|    9|
|       Virat Kohli|   27|
|          MS Dhoni|   33|
|     Sabbir Rahman|   40|
|  Mosaddek Hossain|    7|
| Mustafizur Rahman|    1|
+------------------+-----+



# Encode Categorical Variables using PySpark

In [51]:
# String Indexing
# String Indexing is similar to Label Encoding. 
# It assigns a unique integer value to each category.
# 0 is assigned to the most frequent category, 1 to the next most frequent value
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder
# create object of StringIndexer class and specify input and output column
SI_batsman = StringIndexer(inputCol='Batsman_Name',outputCol='Batsman_Index')
SI_bowler = StringIndexer(inputCol='Bowler_Name',outputCol='Bowler_Index')

# transform the data
my_data = SI_batsman.fit(my_data).transform(my_data)
my_data = SI_bowler.fit(my_data).transform(my_data)

# view the transformed data
my_data.select('Batsman_Name', 'Batsman_Index', 'Bowler_Name', 'Bowler_Index').show(10)

+-----------------+-------------+------------------+------------+
|     Batsman_Name|Batsman_Index|       Bowler_Name|Bowler_Index|
+-----------------+-------------+------------------+------------+
|   Mohammed Shami|         18.0| Mustafizur Rahman|         0.0|
|Bhuvneshwar Kumar|         16.0| Mustafizur Rahman|         0.0|
|   Mohammed Shami|         18.0| Mustafizur Rahman|         0.0|
|Bhuvneshwar Kumar|         16.0| Mustafizur Rahman|         0.0|
|         MS Dhoni|          7.0| Mustafizur Rahman|         0.0|
|         MS Dhoni|          7.0| Mustafizur Rahman|         0.0|
|         MS Dhoni|          7.0| Mustafizur Rahman|         0.0|
|         MS Dhoni|          7.0|Mohammad Saifuddin|         8.0|
|         MS Dhoni|          7.0|Mohammad Saifuddin|         8.0|
|         MS Dhoni|          7.0|Mohammad Saifuddin|         8.0|
+-----------------+-------------+------------------+------------+
only showing top 10 rows



# First, we need to use the String Indexer to convert the variable into numerical form and then use OneHotEncoderEstimator to encode multiple columns of the dataset.

In [52]:
# create object and specify input and output column
OHE = OneHotEncoder(inputCols=['Batsman_Index', 'Bowler_Index'],outputCols=['Batsman_OHE', 'Bowler_OHE'])

# transform the data
my_data = OHE.fit(my_data).transform(my_data)

# view and transform t
my_data.select('Batsman_Name', 'Batsman_Index', 'Batsman_OHE', 'Bowler_Name', 'Bowler_Index', 'Bowler_OHE').show(10)


+-----------------+-------------+---------------+------------------+------------+--------------+
|     Batsman_Name|Batsman_Index|    Batsman_OHE|       Bowler_Name|Bowler_Index|    Bowler_OHE|
+-----------------+-------------+---------------+------------------+------------+--------------+
|   Mohammed Shami|         18.0|(19,[18],[1.0])| Mustafizur Rahman|         0.0|(11,[0],[1.0])|
|Bhuvneshwar Kumar|         16.0|(19,[16],[1.0])| Mustafizur Rahman|         0.0|(11,[0],[1.0])|
|   Mohammed Shami|         18.0|(19,[18],[1.0])| Mustafizur Rahman|         0.0|(11,[0],[1.0])|
|Bhuvneshwar Kumar|         16.0|(19,[16],[1.0])| Mustafizur Rahman|         0.0|(11,[0],[1.0])|
|         MS Dhoni|          7.0| (19,[7],[1.0])| Mustafizur Rahman|         0.0|(11,[0],[1.0])|
|         MS Dhoni|          7.0| (19,[7],[1.0])| Mustafizur Rahman|         0.0|(11,[0],[1.0])|
|         MS Dhoni|          7.0| (19,[7],[1.0])| Mustafizur Rahman|         0.0|(11,[0],[1.0])|
|         MS Dhoni|          7

# The Vector Assembler now converts them into a single feature column in order to train the machine learning model

In [53]:
from pyspark.ml.feature import VectorAssembler
# Since ISwicket and IsBowler are not supported here, we thus remove them

# specify the input and output columns of the vector assembler
assembler = VectorAssembler(inputCols=['Over',
                                       'Runs',
                                       'Batsman_Index',
                                       'Bowler_Index',
                                       'Batsman_OHE',
                                       'Bowler_OHE'],
                           outputCol='vector')

# fill the null values
my_data = my_data.fillna(0)

# transform the data
final_data = assembler.transform(my_data)

# view the transformed vector
final_data.select('vector').show()

+--------------------+
|              vector|
+--------------------+
|(34,[0,2,22,23],[...|
|(34,[0,1,2,20,23]...|
|(34,[0,1,2,22,23]...|
|(34,[0,1,2,20,23]...|
|(34,[0,2,11,23],[...|
|(34,[0,2,11,23],[...|
|(34,[0,2,11,23],[...|
|(34,[0,1,2,3,11,3...|
|(34,[0,1,2,3,11,3...|
|(34,[0,2,3,11,31]...|
|(34,[0,2,3,11,31]...|
|(34,[0,1,2,3,11,3...|
|(34,[0,1,2,3,11,3...|
|(34,[0,2,20,23],[...|
|(34,[0,1,2,11,23]...|
|(34,[0,2,11,23],[...|
|(34,[0,1,2,20,23]...|
|(34,[0,2,17,23],[...|
|(34,[0,1,2,11,23]...|
|(34,[0,1,2,3,11,3...|
+--------------------+
only showing top 20 rows



# Building Machine Learning Pipelines using PySpark

In [54]:
# A pipeline allows us to maintain the data flow of all the relevant transformations 
# that are required to reach the end result.

In [55]:
from pyspark.ml import Pipeline
from sklearn.tree import DecisionTreeClassifier

# Let’s create a sample dataframe with three columns as shown below. Here, we will define some of the stages in which we want to transform the data and see how to set up the pipeline:

In [56]:
# define stage 3 : one hot encode the numeric category_2 column
# create a sample dataframe
sample_df = spark.createDataFrame([
    (1, 'L101', 'R'),
    (2, 'L201', 'C'),
    (3, 'D111', 'R'),
    (4, 'F210', 'R'),
    (5, 'D110', 'C')
], ['id', 'category_1', 'category_2'])

sample_df.show()

+---+----------+----------+
| id|category_1|category_2|
+---+----------+----------+
|  1|      L101|         R|
|  2|      L201|         C|
|  3|      D111|         R|
|  4|      F210|         R|
|  5|      D110|         C|
+---+----------+----------+



In [57]:
# define stage 1 : transform the column category_1 to numeric
stage_1 = StringIndexer(inputCol= 'category_1', outputCol= 'category_1_index')
# define stage 2 : transform the column category_2 to numeric
stage_2 = StringIndexer(inputCol= 'category_2', outputCol= 'category_2_index')
# define stage 3 : one hot encode the numeric category_2 column
stage_3 = OneHotEncoder(inputCols=['category_2_index'], outputCols=['category_2_OHE'])

# setup the pipeline
pipeline = Pipeline(stages=[stage_1, stage_2, stage_3])

# fit the pipeline model and transform the data as defined
pipeline_model = pipeline.fit(sample_df)
sample_df_updated = pipeline_model.transform(sample_df)

# view the transformed data
sample_df_updated.show()

+---+----------+----------+----------------+----------------+--------------+
| id|category_1|category_2|category_1_index|category_2_index|category_2_OHE|
+---+----------+----------+----------------+----------------+--------------+
|  1|      L101|         R|             3.0|             0.0| (1,[0],[1.0])|
|  2|      L201|         C|             4.0|             1.0|     (1,[],[])|
|  3|      D111|         R|             1.0|             0.0| (1,[0],[1.0])|
|  4|      F210|         R|             2.0|             0.0| (1,[0],[1.0])|
|  5|      D110|         C|             0.0|             1.0|     (1,[],[])|
+---+----------+----------+----------------+----------------+--------------+



In [58]:
from pyspark.ml.classification import LogisticRegression

# create a sample dataframe with 4 features and 1 label column
sample_data_train = spark.createDataFrame([
    (2.0, 'A', 'S10', 40, 1.0),
    (1.0, 'X', 'E10', 25, 1.0),
    (4.0, 'X', 'S20', 10, 0.0),
    (3.0, 'Z', 'S10', 20, 0.0),
    (4.0, 'A', 'E10', 30, 1.0),
    (2.0, 'Z', 'S10', 40, 0.0),
    (5.0, 'X', 'D10', 10, 1.0),
], ['feature_1', 'feature_2', 'feature_3', 'feature_4', 'label'])

# view the data
sample_data_train.show()

+---------+---------+---------+---------+-----+
|feature_1|feature_2|feature_3|feature_4|label|
+---------+---------+---------+---------+-----+
|      2.0|        A|      S10|       40|  1.0|
|      1.0|        X|      E10|       25|  1.0|
|      4.0|        X|      S20|       10|  0.0|
|      3.0|        Z|      S10|       20|  0.0|
|      4.0|        A|      E10|       30|  1.0|
|      2.0|        Z|      S10|       40|  0.0|
|      5.0|        X|      D10|       10|  1.0|
+---------+---------+---------+---------+-----+



In [59]:
# define stage 1: transform the column feature_2 to numeric
stage_1 = StringIndexer(inputCol= 'feature_2', outputCol= 'feature_2_index')
# define stage 2: transform the column feature_3 to numeric
stage_2 = StringIndexer(inputCol= 'feature_3', outputCol= 'feature_3_index')
# define stage 3: one hot encode the numeric versions of feature 2 and 3 generated from stage 1 and stage 2
stage_3 = OneHotEncoder(inputCols=[stage_1.getOutputCol(), stage_2.getOutputCol()], 
                                 outputCols= ['feature_2_encoded', 'feature_3_encoded'])
# define stage 4: create a vector of all the features required to train the logistic regression model 
stage_4 = VectorAssembler(inputCols=['feature_1', 'feature_2_encoded', 'feature_3_encoded', 'feature_4'],
                          outputCol='features')
# define stage 5: logistic regression model                          
stage_5 = LogisticRegression(featuresCol='features',labelCol='label')

# setup the pipeline
regression_pipeline = Pipeline(stages= [stage_1, stage_2, stage_3, stage_4, stage_5])

# fit the pipeline for the trainind data
model = regression_pipeline.fit(sample_data_train)
# transform the data
sample_data_train = model.transform(sample_data_train)

# view some of the columns generated
sample_data_train.select('features', 'label', 'rawPrediction', 'probability', 'prediction').show()

+--------------------+-----+--------------------+--------------------+----------+
|            features|label|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|[2.0,0.0,1.0,1.0,...|  1.0|[-18.956871848873...|[5.84972108437742...|       1.0|
|[1.0,1.0,0.0,0.0,...|  1.0|[-20.158269476976...|[1.75944137519452...|       1.0|
|(7,[0,1,6],[4.0,1...|  0.0|[18.0148602858562...|[0.99999998499466...|       0.0|
|(7,[0,3,6],[3.0,1...|  0.0|[24.5051237560211...|[0.99999999997721...|       0.0|
|[4.0,0.0,1.0,0.0,...|  1.0|[-50.288624611182...|[1.44519958724377...|       1.0|
|(7,[0,3,6],[2.0,1...|  0.0|[18.3280841853911...|[0.99999998902980...|       0.0|
|[5.0,1.0,0.0,0.0,...|  1.0|[-17.986823547341...|[1.54319845459317...|       1.0|
+--------------------+-----+--------------------+--------------------+----------+



# To Test the model

In [60]:
# create a sample data without the labels
sample_data_test = spark.createDataFrame([
    (3.0, 'Z', 'S10', 40),
    (1.0, 'X', 'E10', 20),
    (4.0, 'A', 'S20', 10),
    (3.0, 'A', 'S10', 20),
    (4.0, 'X', 'D10', 30),
    (1.0, 'Z', 'E10', 20),
    (4.0, 'A', 'S10', 30),
], ['feature_1', 'feature_2', 'feature_3', 'feature_4'])

# transform the data using the pipeline
sample_data_test = model.transform(sample_data_test)

# see the prediction on the test data
sample_data_test.select('features', 'rawPrediction', 'probability', 'prediction').show()

+--------------------+--------------------+--------------------+----------+
|            features|       rawPrediction|         probability|prediction|
+--------------------+--------------------+--------------------+----------+
|(7,[0,3,6],[3.0,1...|[21.9361235191363...|[0.99999999970265...|       0.0|
|[1.0,1.0,0.0,0.0,...|[-19.516019417755...|[3.34426325212871...|       1.0|
|(7,[0,2,6],[4.0,1...|[-22.297362790363...|[2.07194574533260...|       1.0|
|[3.0,0.0,1.0,1.0,...|[-12.779832278243...|[2.81700837724637...|       1.0|
|[4.0,1.0,0.0,0.0,...|[-24.163863117971...|[3.20455394170236...|       1.0|
|(7,[0,4,6],[1.0,1...|[-22.543286459710...|[1.62022409523199...|       1.0|
|[4.0,0.0,1.0,1.0,...|[-10.456293062940...|[2.87658445082044...|       1.0|
+--------------------+--------------------+--------------------+----------+



# Thus the Model works with Pyspark so lets test our data in the model now

In [75]:
# define stage 1: transform the column feature_2 to numeric
stage_1 = StringIndexer(inputCol= 'Batsman_Name', outputCol='Batsman_Index' )
# define stage 2: transform the column feature_3 to numeric
stage_2 = StringIndexer(inputCol= 'Bowler_Name', outputCol= 'Bowler_Index')
                               
# define stage 4: create a vector of all the features required to train the logistic regression model 
stage_4 = VectorAssembler(inputCols=[ 'Batsman_Name', 'Bowler_Index'],
                          outputCol='features')
# define stage 5: logistic regression model                          
stage_5 = LogisticRegression(featuresCol='features',labelCol='Bowler_Index')

# setup the pipeline
regression_pipeline = Pipeline(stages= [stage_1, stage_2, stage_4, stage_5])

# fit the pipeline for the trainind data
model = regression_pipeline.fit(data_agg)
# transform the data
sample_data_train = model.transform(data_agg)

# view some of the columns generated
sample_data_train.select('rawPrediction', 'probability', 'prediction').show()

+--------------------+-----------+----------+
|       rawPrediction|probability|prediction|
+--------------------+-----------+----------+
|[Infinity,-Infinity]|  [1.0,0.0]|       0.0|
+--------------------+-----------+----------+



In [None]:
Thus Our Mo