In [11]:
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import SQLContext 


In [6]:
# create Spark context with Spark configuration
conf = SparkConf().setAppName("cricket project")
sc = SparkContext(conf=conf)

In [12]:
sqlContext = SQLContext(sc)

##### Reading dataset

In [42]:
# read a csv file
my_data = sqlContext.read.csv('/home/hasan/DATA SET/ind-ban-comment.csv', header=True)

In [43]:
# see the default schema of the dataframe
my_data.printSchema()

root
 |-- Batsman: string (nullable = true)
 |-- Batsman_Name: string (nullable = true)
 |-- Bowler: string (nullable = true)
 |-- Bowler_Name: string (nullable = true)
 |-- Commentary: string (nullable = true)
 |-- Detail: string (nullable = true)
 |-- Dismissed: string (nullable = true)
 |-- Id: string (nullable = true)
 |-- Isball: string (nullable = true)
 |-- Isboundary: string (nullable = true)
 |-- Iswicket: string (nullable = true)
 |-- Over: string (nullable = true)
 |-- Runs: string (nullable = true)
 |-- Timestamp: string (nullable = true)



##### Defining the Schema

In [44]:
import pyspark.sql.types as tp

my_schema = tp.StructType([
    tp.StructField(name= 'Batsman',      dataType= tp.IntegerType(),   nullable= True),
    tp.StructField(name= 'Batsman_Name', dataType= tp.StringType(),    nullable= True),
    tp.StructField(name= 'Bowler',       dataType= tp.IntegerType(),   nullable= True),
    tp.StructField(name= 'Bowler_Name',  dataType= tp.StringType(),    nullable= True),
    tp.StructField(name= 'Commentary',   dataType= tp.StringType(),    nullable= True),
    tp.StructField(name= 'Detail',       dataType= tp.StringType(),    nullable= True),
    tp.StructField(name= 'Dismissed',    dataType= tp.IntegerType(),   nullable= True),
    tp.StructField(name= 'Id',           dataType= tp.IntegerType(),   nullable= True),
    tp.StructField(name= 'Isball',       dataType= tp.BooleanType(),   nullable= True),
    tp.StructField(name= 'Isboundary',   dataType= tp.IntegerType(),   nullable= True),
    tp.StructField(name= 'Iswicket',     dataType= tp.IntegerType(),   nullable= True),
    tp.StructField(name= 'Over',         dataType= tp.DoubleType(),    nullable= True),
    tp.StructField(name= 'Runs',         dataType= tp.IntegerType(),   nullable= True),
    tp.StructField(name= 'Timestamp',    dataType= tp.TimestampType(), nullable= True)    
])


##### read the data again with the defined schema

In [45]:
# read the data again with the defined schema
my_data = sqlContext.read.csv('/home/hasan/DATA SET/ind-ban-comment.csv', schema=my_schema, header= True)


In [46]:
# print the schema
my_data.printSchema()


root
 |-- Batsman: integer (nullable = true)
 |-- Batsman_Name: string (nullable = true)
 |-- Bowler: integer (nullable = true)
 |-- Bowler_Name: string (nullable = true)
 |-- Commentary: string (nullable = true)
 |-- Detail: string (nullable = true)
 |-- Dismissed: integer (nullable = true)
 |-- Id: integer (nullable = true)
 |-- Isball: boolean (nullable = true)
 |-- Isboundary: integer (nullable = true)
 |-- Iswicket: integer (nullable = true)
 |-- Over: double (nullable = true)
 |-- Runs: integer (nullable = true)
 |-- Timestamp: timestamp (nullable = true)



##### Drop columns from the data

In [47]:
# drop the columns that are not required
my_data = my_data.drop(*['Batsman', 'Bowler', 'Id'])
my_data.columns


['Batsman_Name',
 'Bowler_Name',
 'Commentary',
 'Detail',
 'Dismissed',
 'Isball',
 'Isboundary',
 'Iswicket',
 'Over',
 'Runs',
 'Timestamp']

# Data Exploration using PySpark

##### Check the Data Dimensions

In [48]:
# get the dimensions of the data
(my_data.count() , len(my_data.columns))

(605, 11)

##### Describe the Data

In [49]:
# get the summary of the numerical columns
my_data.select('Isball', 'Isboundary', 'Runs').describe().show()

+-------+----------+------------------+
|summary|Isboundary|              Runs|
+-------+----------+------------------+
|  count|        67|               605|
|   mean|       1.0|0.9917355371900827|
| stddev|       0.0| 1.342725481259329|
|    min|         1|                 0|
|    max|         1|                 6|
+-------+----------+------------------+



##### Missing Values Count

In [50]:
# import sql function pyspark
import pyspark.sql.functions as f

# null values in each column
data_agg = my_data.agg(*[f.count(f.when(f.isnull(c), c)).alias(c) for c in my_data.columns])
data_agg


DataFrame[Batsman_Name: bigint, Bowler_Name: bigint, Commentary: bigint, Detail: bigint, Dismissed: bigint, Isball: bigint, Isboundary: bigint, Iswicket: bigint, Over: bigint, Runs: bigint, Timestamp: bigint]

##### Value Counts of a Column

In [51]:
# value counts of Batsman_Name column
my_data.groupBy('Batsman_Name').count().show()

+------------------+-----+
|      Batsman_Name|count|
+------------------+-----+
|     Soumya Sarkar|   39|
|  Mashrafe Mortaza|    5|
|   Shakib Al Hasan|   75|
|   Mushfiqur Rahim|   23|
|Mohammad Saifuddin|   42|
|         Liton Das|   24|
|      Rishabh Pant|   43|
|    Mohammed Shami|    2|
|       Tamim Iqbal|   31|
|     Hardik Pandya|    2|
|          KL Rahul|   93|
| Bhuvneshwar Kumar|    4|
|     Rubel Hossain|   11|
|      Rohit Sharma|   94|
|    Dinesh Karthik|    9|
|       Virat Kohli|   27|
|          MS Dhoni|   33|
|     Sabbir Rahman|   40|
|  Mosaddek Hossain|    7|
| Mustafizur Rahman|    1|
+------------------+-----+



##### Encode Categorical Variables using PySpark

In [52]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder



In [53]:
# create object of StringIndexer class and specify input and output column
SI_batsman = StringIndexer(inputCol='Batsman_Name',outputCol='Batsman_Index')
SI_bowler = StringIndexer(inputCol='Bowler_Name',outputCol='Bowler_Index')

# transform the data
my_data = SI_batsman.fit(my_data).transform(my_data)
my_data = SI_bowler.fit(my_data).transform(my_data)

# view the transformed data
my_data.select('Batsman_Name', 'Batsman_Index', 'Bowler_Name', 'Bowler_Index').show(10)


+-----------------+-------------+------------------+------------+
|     Batsman_Name|Batsman_Index|       Bowler_Name|Bowler_Index|
+-----------------+-------------+------------------+------------+
|   Mohammed Shami|         18.0| Mustafizur Rahman|         0.0|
|Bhuvneshwar Kumar|         16.0| Mustafizur Rahman|         0.0|
|   Mohammed Shami|         18.0| Mustafizur Rahman|         0.0|
|Bhuvneshwar Kumar|         16.0| Mustafizur Rahman|         0.0|
|         MS Dhoni|          7.0| Mustafizur Rahman|         0.0|
|         MS Dhoni|          7.0| Mustafizur Rahman|         0.0|
|         MS Dhoni|          7.0| Mustafizur Rahman|         0.0|
|         MS Dhoni|          7.0|Mohammad Saifuddin|         8.0|
|         MS Dhoni|          7.0|Mohammad Saifuddin|         8.0|
|         MS Dhoni|          7.0|Mohammad Saifuddin|         8.0|
+-----------------+-------------+------------------+------------+
only showing top 10 rows



##### One-Hot Encoding

In [54]:
# create object and specify input and output column
OHE = OneHotEncoder(inputCols=['Batsman_Index', 'Bowler_Index'],outputCols=['Batsman_OHE', 'Bowler_OHE'])

# transform the data
my_data = OHE.fit(my_data).transform(my_data)

# view and transform the data
my_data.select('Batsman_Name', 'Batsman_Index', 'Batsman_OHE', 'Bowler_Name', 'Bowler_Index', 'Bowler_OHE').show(10)


+-----------------+-------------+---------------+------------------+------------+--------------+
|     Batsman_Name|Batsman_Index|    Batsman_OHE|       Bowler_Name|Bowler_Index|    Bowler_OHE|
+-----------------+-------------+---------------+------------------+------------+--------------+
|   Mohammed Shami|         18.0|(19,[18],[1.0])| Mustafizur Rahman|         0.0|(11,[0],[1.0])|
|Bhuvneshwar Kumar|         16.0|(19,[16],[1.0])| Mustafizur Rahman|         0.0|(11,[0],[1.0])|
|   Mohammed Shami|         18.0|(19,[18],[1.0])| Mustafizur Rahman|         0.0|(11,[0],[1.0])|
|Bhuvneshwar Kumar|         16.0|(19,[16],[1.0])| Mustafizur Rahman|         0.0|(11,[0],[1.0])|
|         MS Dhoni|          7.0| (19,[7],[1.0])| Mustafizur Rahman|         0.0|(11,[0],[1.0])|
|         MS Dhoni|          7.0| (19,[7],[1.0])| Mustafizur Rahman|         0.0|(11,[0],[1.0])|
|         MS Dhoni|          7.0| (19,[7],[1.0])| Mustafizur Rahman|         0.0|(11,[0],[1.0])|
|         MS Dhoni|          7

##### Vector Assembler

A vector assembler combines a given list of columns into a single vector column.

In [57]:
from pyspark.ml.feature import VectorAssembler

# specify the input and output columns of the vector assembler
assembler = VectorAssembler(inputCols=['Isboundary',
                                       'Iswicket',
                                       'Over',
                                       'Runs',
                                       'Batsman_Index',
                                       'Bowler_Index',
                                       'Batsman_OHE',
                                       'Bowler_OHE'],
                           outputCol='vector')


In [58]:
# fill the null values
my_data = my_data.fillna(0)

# transform the data
final_data = assembler.transform(my_data)

# view the transformed vector
final_data.select('vector').show()


+--------------------+
|              vector|
+--------------------+
|(36,[1,2,4,24,25]...|
|(36,[1,2,3,4,22,2...|
|(36,[2,3,4,24,25]...|
|(36,[2,3,4,22,25]...|
|(36,[1,2,4,13,25]...|
|(36,[2,4,13,25],[...|
|(36,[2,4,13,25],[...|
|(36,[2,3,4,5,13,3...|
|(36,[0,2,3,4,5,13...|
|(36,[2,4,5,13,33]...|
|(36,[2,4,5,13,33]...|
|(36,[0,2,3,4,5,13...|
|(36,[2,3,4,5,13,3...|
|(36,[2,4,22,25],[...|
|(36,[2,3,4,13,25]...|
|(36,[2,4,13,25],[...|
|(36,[2,3,4,22,25]...|
|(36,[1,2,4,19,25]...|
|(36,[2,3,4,13,25]...|
|(36,[2,3,4,5,13,3...|
+--------------------+
only showing top 20 rows



### Building Machine Learning Pipelines using PySpark

A pipeline allows us to maintain the data flow of all the relevant transformations that are required to reach the end result.

##### How to make Pipeline in pyspark

In [60]:
from pyspark.ml import Pipeline

# create a sample dataframe
sample_df = sqlContext.createDataFrame([
    (1, 'L101', 'R'),
    (2, 'L201', 'C'),
    (3, 'D111', 'R'),
    (4, 'F210', 'R'),
    (5, 'D110', 'C')
], ['id', 'category_1', 'category_2'])

sample_df.show()

+---+----------+----------+
| id|category_1|category_2|
+---+----------+----------+
|  1|      L101|         R|
|  2|      L201|         C|
|  3|      D111|         R|
|  4|      F210|         R|
|  5|      D110|         C|
+---+----------+----------+



In [61]:
# define stage 1 : transform the column category_1 to numeric
stage_1 = StringIndexer(inputCol= 'category_1', outputCol= 'category_1_index')

# define stage 2 : transform the column category_2 to numeric
stage_2 = StringIndexer(inputCol= 'category_2', outputCol= 'category_2_index')

# define stage 3 : one hot encode the numeric category_2 column
stage_3 = OneHotEncoder(inputCols=['category_2_index'], outputCols=['category_2_OHE'])



In [62]:
# setup the pipeline
pipeline = Pipeline(stages=[stage_1, stage_2, stage_3])

# fit the pipeline model and transform the data as defined
pipeline_model = pipeline.fit(sample_df)
sample_df_updated = pipeline_model.transform(sample_df)

# view the transformed data
sample_df_updated.show()



+---+----------+----------+----------------+----------------+--------------+
| id|category_1|category_2|category_1_index|category_2_index|category_2_OHE|
+---+----------+----------+----------------+----------------+--------------+
|  1|      L101|         R|             3.0|             0.0| (1,[0],[1.0])|
|  2|      L201|         C|             4.0|             1.0|     (1,[],[])|
|  3|      D111|         R|             1.0|             0.0| (1,[0],[1.0])|
|  4|      F210|         R|             2.0|             0.0| (1,[0],[1.0])|
|  5|      D110|         C|             0.0|             1.0|     (1,[],[])|
+---+----------+----------+----------------+----------------+--------------+



##### Example of Pipeline

In [75]:
from pyspark.ml.classification import LogisticRegression



In [76]:
# create a sample dataframe with 4 features and 1 label column
sample_data_train = sqlContext.createDataFrame([
    (2.0, 'A', 'S10', 40, 1.0),
    (1.0, 'X', 'E10', 25, 1.0),
    (4.0, 'X', 'S20', 10, 0.0),
    (3.0, 'Z', 'S10', 20, 0.0),
    (4.0, 'A', 'E10', 30, 1.0),
    (2.0, 'Z', 'S10', 40, 0.0),
    (5.0, 'X', 'D10', 10, 1.0),], 
    ['feature_1', 'feature_2', 'feature_3', 'feature_4', 'label'])

# view the data
sample_data_train.show()

+---------+---------+---------+---------+-----+
|feature_1|feature_2|feature_3|feature_4|label|
+---------+---------+---------+---------+-----+
|      2.0|        A|      S10|       40|  1.0|
|      1.0|        X|      E10|       25|  1.0|
|      4.0|        X|      S20|       10|  0.0|
|      3.0|        Z|      S10|       20|  0.0|
|      4.0|        A|      E10|       30|  1.0|
|      2.0|        Z|      S10|       40|  0.0|
|      5.0|        X|      D10|       10|  1.0|
+---------+---------+---------+---------+-----+



In [77]:
# define stage 1: transform the column feature_2 to numeric
stage_1 = StringIndexer(inputCol= 'feature_2', outputCol= 'feature_2_index')

# define stage 2: transform the column feature_3 to numeric
stage_2 = StringIndexer(inputCol= 'feature_3', outputCol= 'feature_3_index')

# define stage 3: one hot encode the numeric versions of feature 2 and 3 generated from stage 1 and stage 2
stage_3 = OneHotEncoder(inputCols=[stage_1.getOutputCol(), stage_2.getOutputCol()], 
                                 outputCols= ['feature_2_encoded', 'feature_3_encoded'])

# define stage 4: create a vector of all the features required to train the logistic regression model 
stage_4 = VectorAssembler(inputCols=['feature_1', 'feature_2_encoded', 'feature_3_encoded', 'feature_4'],
                          outputCol='features')


In [78]:
# define stage 5: logistic regression model                          
stage_5 = LogisticRegression(featuresCol='features',labelCol='label')

# setup the pipeline
regression_pipeline = Pipeline(stages= [stage_1, stage_2, stage_3, stage_4, stage_5])


In [79]:
# fit the pipeline for the trainind data
model = regression_pipeline.fit(sample_data_train)

# transform the data
sample_data_train = model.transform(sample_data_train)


In [80]:
# view some of the columns generated
sample_data_train.select('features', 'label', 'rawPrediction', 'probability', 'prediction').show()


+--------------------+-----+--------------------+--------------------+----------+
|            features|label|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|[2.0,0.0,1.0,1.0,...|  1.0|[-18.956871848873...|[5.84972108437700...|       1.0|
|[1.0,1.0,0.0,0.0,...|  1.0|[-20.158269476976...|[1.75944137519423...|       1.0|
|(7,[0,1,6],[4.0,1...|  0.0|[18.0148602858562...|[0.99999998499466...|       0.0|
|(7,[0,3,6],[3.0,1...|  0.0|[24.5051237560212...|[0.99999999997721...|       0.0|
|[4.0,0.0,1.0,0.0,...|  1.0|[-50.288624611182...|[1.44519958724360...|       1.0|
|(7,[0,3,6],[2.0,1...|  0.0|[18.3280841853911...|[0.99999998902980...|       0.0|
|[5.0,1.0,0.0,0.0,...|  1.0|[-17.986823547341...|[1.54319845459279...|       1.0|
+--------------------+-----+--------------------+--------------------+----------+



##### Predict with Test Data

In [82]:
# create a sample data without the labels
sample_data_test = sqlContext.createDataFrame([
    (3.0, 'Z', 'S10', 40),
    (1.0, 'X', 'E10', 20),
    (4.0, 'A', 'S20', 10),
    (3.0, 'A', 'S10', 20),
    (4.0, 'X', 'D10', 30),
    (1.0, 'Z', 'E10', 20),
    (4.0, 'A', 'S10', 30),
], ['feature_1', 'feature_2', 'feature_3', 'feature_4'])


In [83]:
# transform the data using the pipeline
sample_data_test = model.transform(sample_data_test)


In [84]:
# see the prediction on the test data
sample_data_test.select('features', 'rawPrediction', 'probability', 'prediction').show()


+--------------------+--------------------+--------------------+----------+
|            features|       rawPrediction|         probability|prediction|
+--------------------+--------------------+--------------------+----------+
|(7,[0,3,6],[3.0,1...|[21.9361235191363...|[0.99999999970265...|       0.0|
|[1.0,1.0,0.0,0.0,...|[-19.516019417755...|[3.34426325212824...|       1.0|
|(7,[0,2,6],[4.0,1...|[-22.297362790363...|[2.07194574533262...|       1.0|
|[3.0,0.0,1.0,1.0,...|[-12.779832278243...|[2.81700837724653...|       1.0|
|[4.0,1.0,0.0,0.0,...|[-24.163863117971...|[3.20455394170119...|       1.0|
|(7,[0,4,6],[1.0,1...|[-22.543286459710...|[1.62022409523184...|       1.0|
|[4.0,0.0,1.0,1.0,...|[-10.456293062940...|[2.87658445082052...|       1.0|
+--------------------+--------------------+--------------------+----------+

