# Create Spark Context

In [1]:
import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession

# create the SparkContext, the entry point to Spark's core functionality
sc = SparkContext()


Let's initialize our SparkSession now:


In [2]:
spark = SparkSession \
    .builder \
    .appName("how to read csv file") \
    .getOrCreate()

Next, let's check the Spark version:

In [3]:
spark.version

'3.1.1'

Reading a CSV file

In [4]:
#read csv
df = spark.read.csv('ind-ban-comment.csv', header=True)

#see the default Schema
df.printSchema()

root
 |-- Batsman: string (nullable = true)
 |-- Batsman_Name: string (nullable = true)
 |-- Bowler: string (nullable = true)
 |-- Bowler_Name: string (nullable = true)
 |-- Commentary: string (nullable = true)
 |-- Detail: string (nullable = true)
 |-- Dismissed: string (nullable = true)
 |-- Id: string (nullable = true)
 |-- Isball: string (nullable = true)
 |-- Isboundary: string (nullable = true)
 |-- Iswicket: string (nullable = true)
 |-- Over: string (nullable = true)
 |-- Runs: string (nullable = true)
 |-- Timestamp: string (nullable = true)



Defining the Schema
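As the output above shows, spark.read.csv reads every column as a string by default (unless inferSchema=True is set). We do not want all the columns in our dataset to be treated as strings, so what can we do about that?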

We can define a custom schema for our dataframe in Spark. For this, we need to create a StructType object, which takes a list of StructField objects. Each StructField is defined with a column name, the data type of the column, and whether null values are allowed in that column.

Refer to the below code snippet to understand how to create this custom schema:

In [5]:
import pyspark.sql.types as tp

# define the schema: one StructField(name, dataType, nullable) per column
my_schema = tp.StructType([
    tp.StructField(name='Batsman', dataType=tp.IntegerType(), nullable=True),
    tp.StructField(name='Batsman_Name', dataType=tp.StringType(), nullable=True),
    tp.StructField(name='Bowler', dataType=tp.IntegerType(), nullable=True),
    tp.StructField(name='Bowler_Name', dataType=tp.StringType(), nullable=True),
    tp.StructField(name='Commentary', dataType=tp.StringType(), nullable=True),
    tp.StructField(name='Detail', dataType=tp.StringType(), nullable=True),
    tp.StructField(name='Dismissed', dataType=tp.IntegerType(), nullable=True),
    tp.StructField(name='Id', dataType=tp.IntegerType(), nullable=True),
    tp.StructField(name='Isball', dataType=tp.BooleanType(), nullable=True),
    tp.StructField(name='Isboundary', dataType=tp.IntegerType(), nullable=True),
    tp.StructField(name='Iswicket', dataType=tp.IntegerType(), nullable=True),
    tp.StructField(name='Over', dataType=tp.DoubleType(), nullable=True),
    tp.StructField(name='Runs', dataType=tp.IntegerType(), nullable=True),
    tp.StructField(name='Timestamp', dataType=tp.TimestampType(), nullable=True)
])

# read the data again with the defined schema
my_data = spark.read.csv('ind-ban-comment.csv', schema=my_schema, header=True)

# print the schema
my_data.printSchema()

root
 |-- Batsman: integer (nullable = true)
 |-- Batsman_Name: string (nullable = true)
 |-- Bowler: integer (nullable = true)
 |-- Bowler_Name: string (nullable = true)
 |-- Commentary: string (nullable = true)
 |-- Detail: string (nullable = true)
 |-- Dismissed: integer (nullable = true)
 |-- Id: integer (nullable = true)
 |-- Isball: boolean (nullable = true)
 |-- Isboundary: integer (nullable = true)
 |-- Iswicket: integer (nullable = true)
 |-- Over: double (nullable = true)
 |-- Runs: integer (nullable = true)
 |-- Timestamp: timestamp (nullable = true)
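To make the effect of a schema concrete, here is a plain-Python sketch of what applying one to raw CSV text means. It is not Spark's parser: the `SCHEMA` list, the converters, and the two sample rows are all hypothetical. Empty fields become `None`, and a field that cannot be converted also becomes `None`, roughly like Spark's default PERMISSIVE mode, which sets malformed fields to null.

```python
import csv
import io


def to_bool(text):
    """Accept 'true'/'false' in any letter case; anything else is treated as malformed."""
    lowered = text.strip().lower()
    if lowered in ("true", "false"):
        return lowered == "true"
    raise ValueError(f"not a boolean: {text!r}")


# Hypothetical schema: (column name, converter) for a subset of the columns above
SCHEMA = [
    ("Batsman_Name", str),
    ("Isball", to_bool),
    ("Isboundary", int),
    ("Over", float),
    ("Runs", int),
]


def parse_row(fields):
    """Convert one row of raw CSV strings into typed values according to SCHEMA."""
    row = {}
    for (name, convert), raw in zip(SCHEMA, fields):
        if raw == "":
            row[name] = None  # empty field -> null
            continue
        try:
            row[name] = convert(raw)
        except ValueError:
            row[name] = None  # unparseable field -> null
    return row


def read_csv(text):
    reader = csv.reader(io.StringIO(text))
    next(reader)  # skip the header line (like header=True)
    return [parse_row(fields) for fields in reader]


# made-up sample data with the same column names
sample = (
    "Batsman_Name,Isball,Isboundary,Over,Runs\n"
    "MS Dhoni,True,,49.6,1\n"
    "KL Rahul,True,1,12.3,four\n"
)
rows = read_csv(sample)
```

The second row shows why typing matters: "four" cannot be read as an integer, so the Runs value ends up null instead of silently staying a string.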



Drop Columns from the data

In any machine learning project, there are always a few columns that are not required for solving the problem. I'm sure you've come across this situation before as well, whether that's in industry or in an online hackathon.

In our case, we can use the drop function to remove columns from the data. Put an asterisk (*) before the list to unpack it, so that multiple columns are dropped at once:

In [6]:
#Drop the columns that are not required
my_data = my_data.drop(*['Batsman', 'Bowler', 'Id'])
my_data.columns

['Batsman_Name',
 'Bowler_Name',
 'Commentary',
 'Detail',
 'Dismissed',
 'Isball',
 'Isboundary',
 'Iswicket',
 'Over',
 'Runs',
 'Timestamp']
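Why the asterisk? `DataFrame.drop` takes column names as separate positional arguments (`*cols`), and in Spark 3.1 PySpark raises a `TypeError` if you pass it a bare list instead. The hypothetical `drop_columns` helper below mimics that calling convention in plain Python, assuming column names are plain strings:

```python
def drop_columns(columns, *cols):
    """Hypothetical stand-in for DataFrame.drop(*cols): return the column names that remain."""
    for c in cols:
        # each positional argument must be a single column name, not a list
        if not isinstance(c, str):
            raise TypeError(f"expected a column name, got {type(c).__name__}")
    return [c for c in columns if c not in cols]


all_columns = ['Batsman', 'Batsman_Name', 'Bowler', 'Bowler_Name', 'Commentary',
               'Detail', 'Dismissed', 'Id', 'Isball', 'Isboundary', 'Iswicket',
               'Over', 'Runs', 'Timestamp']
to_remove = ['Batsman', 'Bowler', 'Id']

# the asterisk unpacks the list: same as drop_columns(all_columns, 'Batsman', 'Bowler', 'Id')
remaining = drop_columns(all_columns, *to_remove)
```

Calling `drop_columns(all_columns, to_remove)` without the asterisk passes the whole list as one argument and fails, just as `my_data.drop(['Batsman', 'Bowler', 'Id'])` would.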

# Data Exploration using PySpark

Check the Data Dimensions

Unlike Pandas, Spark dataframes do not have a shape attribute to check the dimensions of the data. Instead, we can combine count() (the number of rows) with the length of the columns list:

In [7]:
#get the dimensions of the data
(my_data.count(), len(my_data.columns))


(605, 11)

Describe the Data

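Spark's describe() function gives us the most common summary statistics: count, mean, standard deviation, min, and max. You can use the summary() function to get the quartiles of the numeric variables as well. Note that describe() only reports numeric and string columns, so the boolean Isball column does not appear in the output below: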

In [8]:
#get the summary of the numerical columns
my_data.select('Isball', 'Isboundary', 'Runs').describe().show()

+-------+----------+------------------+
|summary|Isboundary|              Runs|
+-------+----------+------------------+
|  count|        67|               605|
|   mean|       1.0|0.9917355371900827|
| stddev|       0.0| 1.342725481259329|
|    min|         1|                 0|
|    max|         1|                 6|
+-------+----------+------------------+
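The describe() statistics can be reproduced in plain Python, which also explains the Isboundary column above: nulls are ignored, so its count is only 67 and every non-null value is 1. This is a sketch for a single numeric column, assuming nulls are represented as `None`; it uses the sample standard deviation (dividing by n - 1), which is what describe() reports as stddev. The quartiles from summary() are approximate percentiles in Spark and are not reproduced here.

```python
import math


def describe(values):
    """Sketch of describe() for one numeric column, assuming nulls are stored as None."""
    present = [v for v in values if v is not None]  # nulls are ignored entirely
    n = len(present)
    if n == 0:
        return {"count": 0, "mean": None, "stddev": None, "min": None, "max": None}
    mean = sum(present) / n
    # sample standard deviation (divides by n - 1); left undefined for a single value
    stddev = math.sqrt(sum((v - mean) ** 2 for v in present) / (n - 1)) if n > 1 else None
    return {"count": n, "mean": mean, "stddev": stddev, "min": min(present), "max": max(present)}


# made-up column shaped like Isboundary: 1 for a boundary, null otherwise
boundary_stats = describe([1, None, None, 1, 1])
```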



Missing Values Count

It's rare to get a dataset without any missing values. Can you remember the last time that happened?


It is important to check the number of missing values present in all the columns. Knowing the count helps us treat the missing values before building any machine learning model using that data.

So you can use the code below to find the null value count in your dataset:

In [9]:
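# import the PySpark SQL functions
import pyspark.sql.functions as f

# count the null values in each column: when() returns the literal string c for
# null rows (and null otherwise), so count() counts exactly the null rows
data_agg = my_data.agg(*[f.count(f.when(f.isnull(c), c)).alias(c) for c in my_data.columns])
data_agg.show()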

+------------+-----------+----------+------+---------+------+----------+--------+----+----+---------+
|Batsman_Name|Bowler_Name|Commentary|Detail|Dismissed|Isball|Isboundary|Iswicket|Over|Runs|Timestamp|
+------------+-----------+----------+------+---------+------+----------+--------+----+----+---------+
|           0|          0|         0|   565|      586|     0|       538|     586|   0|   0|        0|
+------------+-----------+----------+------+---------+------+----------+--------+----+----+---------+
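The aggregation above counts, for each column, the rows where the value is null. The same bookkeeping in plain Python, assuming hypothetical rows stored as dictionaries with `None` for missing values, looks like this:

```python
def null_counts(rows, columns):
    """Number of rows whose value is None, per column (a missing key also counts as null)."""
    return {c: sum(1 for row in rows if row.get(c) is None) for c in columns}


# hypothetical rows shaped like a few of the columns above
rows = [
    {"Batsman_Name": "Rohit Sharma", "Detail": None, "Isboundary": 1},
    {"Batsman_Name": "KL Rahul", "Detail": "W", "Isboundary": None},
    {"Batsman_Name": "MS Dhoni", "Detail": None, "Isboundary": None},
]
counts = null_counts(rows, ["Batsman_Name", "Detail", "Isboundary"])
```

In PySpark, `f.sum(f.col(c).isNull().cast('int'))` is an equivalent and arguably more explicit way to write each per-column count.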



Value Counts of a Column

Unlike Pandas, we do not have the value_counts() function in Spark dataframes. You can use the groupBy function to calculate the unique value counts of categorical variables:

In [10]:
#Value counts of Batsman_Name column
my_data.groupBy('Batsman_Name').count().show()

+------------------+-----+
|      Batsman_Name|count|
+------------------+-----+
|     Soumya Sarkar|   39|
|  Mashrafe Mortaza|    5|
|   Shakib Al Hasan|   75|
|   Mushfiqur Rahim|   23|
|Mohammad Saifuddin|   42|
|         Liton Das|   24|
|      Rishabh Pant|   43|
|    Mohammed Shami|    2|
|       Tamim Iqbal|   31|
|     Hardik Pandya|    2|
|          KL Rahul|   93|
| Bhuvneshwar Kumar|    4|
|     Rubel Hossain|   11|
|      Rohit Sharma|   94|
|    Dinesh Karthik|    9|
|       Virat Kohli|   27|
|          MS Dhoni|   33|
|     Sabbir Rahman|   40|
|  Mosaddek Hossain|    7|
| Mustafizur Rahman|    1|
+------------------+-----+
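As the output above shows, groupBy(...).count() returns the groups in no particular order; in PySpark you can append `.orderBy('count', ascending=False)` to sort by frequency. For comparison, a pandas-style value_counts() can be sketched in plain Python with `collections.Counter` on a small made-up list of names:

```python
from collections import Counter

# made-up sample of batsman names
names = ["Rohit Sharma", "KL Rahul", "Rohit Sharma", "MS Dhoni", "KL Rahul", "Rohit Sharma"]

counts = Counter(names)
# most_common() sorts by count, highest first, like value_counts()
top = counts.most_common()
```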



Encode Categorical Variables using PySpark

Most machine learning algorithms accept the data only in numerical form. So, it is essential to convert any categorical variable present in our dataset into numbers.

Remember that we cannot simply drop them from our dataset as they might contain useful information. It would be a nightmare to lose that just because we don't want to figure out how to use them!

Let's look at some of the methods to encode categorical variables using PySpark.

# String Indexing

String Indexing is similar to Label Encoding. It assigns a unique index to each category: 0 goes to the most frequent category, 1 to the next most frequent, and so on, with ties broken alphabetically (the indices are stored as doubles, e.g. 7.0). We have to define the input column name that we want to index and the output column name in which we want the results:

In [11]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder

#create object of StringIndexer class and specify input and output column
SI_batsman = StringIndexer(inputCol = 'Batsman_Name', outputCol='Batsman_Index')
SI_bowler = StringIndexer(inputCol='Bowler_Name', outputCol='Bowler_Index')

#transform the data
my_data = SI_batsman.fit(my_data).transform(my_data)
my_data = SI_bowler.fit(my_data).transform(my_data)

#view the transformed data
my_data.select('Batsman_Name', 'Batsman_Index', 'Bowler_Name', 'Bowler_Index').show(10)

+-----------------+-------------+------------------+------------+
|     Batsman_Name|Batsman_Index|       Bowler_Name|Bowler_Index|
+-----------------+-------------+------------------+------------+
|   Mohammed Shami|         18.0| Mustafizur Rahman|         0.0|
|Bhuvneshwar Kumar|         16.0| Mustafizur Rahman|         0.0|
|   Mohammed Shami|         18.0| Mustafizur Rahman|         0.0|
|Bhuvneshwar Kumar|         16.0| Mustafizur Rahman|         0.0|
|         MS Dhoni|          7.0| Mustafizur Rahman|         0.0|
|         MS Dhoni|          7.0| Mustafizur Rahman|         0.0|
|         MS Dhoni|          7.0| Mustafizur Rahman|         0.0|
|         MS Dhoni|          7.0|Mohammad Saifuddin|         8.0|
|         MS Dhoni|          7.0|Mohammad Saifuddin|         8.0|
|         MS Dhoni|          7.0|Mohammad Saifuddin|         8.0|
+-----------------+-------------+------------------+------------+
only showing top 10 rows
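The ordering rule can be checked against the output above. The sketch below is a plain-Python model of StringIndexer's default `frequencyDesc` ordering (not Spark's implementation): sort labels by descending count, break ties alphabetically, and number them from 0. Fed the Batsman_Name counts from the value-counts output earlier, it reproduces the indices shown (MS Dhoni 7.0, Bhuvneshwar Kumar 16.0, Mohammed Shami 18.0). Mohammed Shami and Hardik Pandya both appear twice, and the alphabetical tie-break puts Hardik Pandya first.

```python
from collections import Counter


def string_index(counts):
    """Model of frequencyDesc ordering: most frequent label gets 0.0, ties sorted alphabetically."""
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


# Batsman_Name counts copied from the value-counts output earlier in this notebook
batsman_counts = {
    "Soumya Sarkar": 39, "Mashrafe Mortaza": 5, "Shakib Al Hasan": 75,
    "Mushfiqur Rahim": 23, "Mohammad Saifuddin": 42, "Liton Das": 24,
    "Rishabh Pant": 43, "Mohammed Shami": 2, "Tamim Iqbal": 31,
    "Hardik Pandya": 2, "KL Rahul": 93, "Bhuvneshwar Kumar": 4,
    "Rubel Hossain": 11, "Rohit Sharma": 94, "Dinesh Karthik": 9,
    "Virat Kohli": 27, "MS Dhoni": 33, "Sabbir Rahman": 40,
    "Mosaddek Hossain": 7, "Mustafizur Rahman": 1,
}
batsman_index = string_index(batsman_counts)

# raw values work too: count them with Counter first
category_2 = ['R', 'c', 'R', 'R', 'C']
category_2_index = string_index(Counter(category_2))
```

The `category_2` values are the ones used in the pipeline example later on. Note that 'C' sorts before 'c' because uppercase letters come first in code-point order.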



# One-Hot Encoding

One-hot encoding is a concept every data scientist should know. I've relied on it multiple times when dealing with categorical variables. It's a lifesaver!

Here's the caveat: Spark's OneHotEncoder does not encode string columns directly.

"First, we need to use StringIndexer to convert the variable into numerical form, and then use OneHotEncoder to encode one or more of the indexed columns."

It creates a sparse vector for each row:

In [12]:
#create object and specify input and output column
OHE = OneHotEncoder(inputCols=['Batsman_Index', 'Bowler_Index'], outputCols=['Batsman_OHE', 'Bowler_OHE'])

#transform the data
my_data = OHE.fit(my_data).transform(my_data)

# view the transformed data
my_data.select('Batsman_Name', 'Batsman_Index', 'Batsman_OHE', 'Bowler_Name', 'Bowler_Index', 'Bowler_OHE').show(10)



+-----------------+-------------+---------------+------------------+------------+--------------+
|     Batsman_Name|Batsman_Index|    Batsman_OHE|       Bowler_Name|Bowler_Index|    Bowler_OHE|
+-----------------+-------------+---------------+------------------+------------+--------------+
|   Mohammed Shami|         18.0|(19,[18],[1.0])| Mustafizur Rahman|         0.0|(11,[0],[1.0])|
|Bhuvneshwar Kumar|         16.0|(19,[16],[1.0])| Mustafizur Rahman|         0.0|(11,[0],[1.0])|
|   Mohammed Shami|         18.0|(19,[18],[1.0])| Mustafizur Rahman|         0.0|(11,[0],[1.0])|
|Bhuvneshwar Kumar|         16.0|(19,[16],[1.0])| Mustafizur Rahman|         0.0|(11,[0],[1.0])|
|         MS Dhoni|          7.0| (19,[7],[1.0])| Mustafizur Rahman|         0.0|(11,[0],[1.0])|
|         MS Dhoni|          7.0| (19,[7],[1.0])| Mustafizur Rahman|         0.0|(11,[0],[1.0])|
|         MS Dhoni|          7.0| (19,[7],[1.0])| Mustafizur Rahman|         0.0|(11,[0],[1.0])|
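|         MS Dhoni|          7.0| (19,[7],[1.0])|Mohammad Saifuddin|         8.0|(11,[8],[1.0])|
|         MS Dhoni|          7.0| (19,[7],[1.0])|Mohammad Saifuddin|         8.0|(11,[8],[1.0])|
|         MS Dhoni|          7.0| (19,[7],[1.0])|Mohammad Saifuddin|         8.0|(11,[8],[1.0])|
+-----------------+-------------+---------------+------------------+------------+--------------+
only showing top 10 rows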
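The `(19,[18],[1.0])` notation is Spark's sparse vector format: (size, [indices of non-zero entries], [their values]). By default OneHotEncoder uses `dropLast=True`, so a column with k categories produces vectors of size k - 1, and the last category is encoded as all zeros. That is why the 20 batsmen give vectors of size 19, and the size of 11 for bowlers implies 12 distinct bowlers. Here is a plain-Python sketch of this encoding, using a hypothetical `one_hot` helper:

```python
def one_hot(index, num_categories, drop_last=True):
    """Encode a category index in Spark-style sparse notation: (size, [indices], [values])."""
    size = num_categories - 1 if drop_last else num_categories
    i = int(index)
    if i < size:
        return (size, [i], [1.0])
    return (size, [], [])  # with drop_last, the last category becomes the all-zero vector


def to_dense(sparse):
    """Expand (size, indices, values) into a full list of floats."""
    size, indices, values = sparse
    dense = [0.0] * size
    for i, v in zip(indices, values):
        dense[i] = v
    return dense


shami = one_hot(18.0, 20)      # Batsman_Index 18.0, 20 batsmen
mustafizur = one_hot(0.0, 12)  # Bowler_Index 0.0, 12 bowlers
```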

# Vector Assembler

"A vector assembler combines a given list of columns into a single vector column."

This is typically used at the end of the data exploration and pre-processing steps. At this stage, we usually work with a few raw or transformed features that can be used to train our model.

The VectorAssembler combines them into a single feature column in order to train the machine learning model (such as logistic regression). It accepts numeric, boolean, and vector type columns:

In [13]:
from pyspark.ml.feature import VectorAssembler

#specify the input and output columns of the vector assembler
assembler = VectorAssembler(inputCols=['Isboundary', 'Iswicket', 'Over', 'Runs', 'Batsman_Index', 'Bowler_Index', 'Batsman_OHE', 'Bowler_OHE'], outputCol='vector')

# fill nulls in the numeric columns with 0 (VectorAssembler raises an error on nulls by default)
my_data = my_data.fillna(0)

#transform the data
final_data = assembler.transform(my_data)

#view the transformed vector
final_data.select('vector').show()

+--------------------+
|              vector|
+--------------------+
|(36,[1,2,4,24,25]...|
|(36,[1,2,3,4,22,2...|
|(36,[2,3,4,24,25]...|
|(36,[2,3,4,22,25]...|
|(36,[1,2,4,13,25]...|
|(36,[2,4,13,25],[...|
|(36,[2,4,13,25],[...|
|(36,[2,3,4,5,13,3...|
|(36,[0,2,3,4,5,13...|
|(36,[2,4,5,13,33]...|
|(36,[2,4,5,13,33]...|
|(36,[0,2,3,4,5,13...|
|(36,[2,3,4,5,13,3...|
|(36,[2,4,22,25],[...|
|(36,[2,3,4,13,25]...|
|(36,[2,4,13,25],[...|
|(36,[2,3,4,22,25]...|
|(36,[1,2,4,19,25]...|
|(36,[2,3,4,13,25]...|
|(36,[2,3,4,5,13,3...|
+--------------------+
only showing top 20 rows
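The 36 in each vector above is just the total width of the inputs: six scalar columns plus the 19-wide Batsman_OHE and the 11-wide Bowler_OHE (6 + 19 + 11 = 36). The plain-Python sketch below, with a hypothetical `assemble` helper, concatenates scalars and sparse vectors the same way, shifting each vector's indices by the current offset. For the third row (Mohammed Shami facing Mustafizur Rahman, so Batsman_Index 18.0 and Bowler_Index 0.0) it yields the indices [2,3,4,24,25] seen in the output. The Over and Runs values used here are placeholders, because the printed vectors are truncated. Spark then stores each assembled vector in whichever of dense or sparse form is more compact, which is why both notations appear in the logistic regression output later.

```python
def assemble(parts):
    """Concatenate scalars and sparse vectors (size, indices, values) into one sparse vector."""
    size, indices, values = 0, [], []
    for part in parts:
        if isinstance(part, tuple):
            # sparse vector: shift its indices by the width assembled so far
            part_size, part_indices, part_values = part
            for i, v in zip(part_indices, part_values):
                if v != 0.0:
                    indices.append(size + i)
                    values.append(float(v))
            size += part_size
        else:
            # numeric or boolean scalar: occupies exactly one slot
            if part != 0:
                indices.append(size)
                values.append(float(part))
            size += 1
    return (size, indices, values)


# Isboundary, Iswicket, Over, Runs, Batsman_Index, Bowler_Index, Batsman_OHE, Bowler_OHE
# (Over=49.3 and Runs=1 are placeholder values)
row = [0, 0, 49.3, 1, 18.0, 0.0, (19, [18], [1.0]), (11, [0], [1.0])]
vector = assemble(row)
```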



# Building Machine Learning Pipelines using PySpark

A machine learning project typically involves steps like data preprocessing, feature extraction, model fitting, and evaluating results. We need to perform a lot of transformations on the data in sequence. As you can imagine, keeping track of them can become a tedious task.

This is where machine learning pipelines come in.

"A pipeline allows us to maintain the data flow of all the relevant transformations that are required to reach the end result."

We need to define the stages of the pipeline, which act as a chain of commands for Spark to run. Each stage is either a Transformer or an Estimator.

Transformers and Estimators

As the name suggests, Transformers convert one dataframe into another, typically by adding new columns derived from existing ones using some defined logic (like converting a categorical column to numeric). Every Transformer implements a transform() method.

An Estimator implements a fit() method that takes a dataframe and produces a model, and that fitted model is itself a Transformer. For example, LogisticRegression is an Estimator that trains a classification model when we call the fit() method.
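The Estimator/Transformer contract is easy to model in plain Python. The sketch below is a simplified, hypothetical imitation: the class names mirror the concepts, not pyspark.ml's actual classes, and `MeanImputer` and `Doubler` are made up for illustration. Calling `fit()` on an Estimator returns a fitted model that is a Transformer, and a Pipeline fits its stages in order, feeding each stage the output of the previous one. Rows are plain dictionaries.

```python
class Transformer:
    def transform(self, rows):
        raise NotImplementedError


class Estimator:
    def fit(self, rows):
        raise NotImplementedError  # must return a Transformer (the fitted model)


class MeanImputerModel(Transformer):
    """Fitted model: replaces None in `col` with the mean learned during fit()."""
    def __init__(self, col, mean):
        self.col, self.mean = col, mean

    def transform(self, rows):
        return [{**r, self.col: self.mean if r[self.col] is None else r[self.col]} for r in rows]


class MeanImputer(Estimator):
    """Hypothetical Estimator: learns the mean of the non-null values of `col`."""
    def __init__(self, col):
        self.col = col

    def fit(self, rows):
        present = [r[self.col] for r in rows if r[self.col] is not None]
        return MeanImputerModel(self.col, sum(present) / len(present))


class Doubler(Transformer):
    """Hypothetical Transformer: adds a column `out` holding twice the value of `col`."""
    def __init__(self, col, out):
        self.col, self.out = col, out

    def transform(self, rows):
        return [{**r, self.out: r[self.col] * 2} for r in rows]


class PipelineModel(Transformer):
    def __init__(self, stages):
        self.stages = stages

    def transform(self, rows):
        for stage in self.stages:
            rows = stage.transform(rows)
        return rows


class Pipeline(Estimator):
    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        fitted = []
        for stage in self.stages:
            if isinstance(stage, Estimator):
                stage = stage.fit(rows)  # an Estimator is replaced by the model it produces
            fitted.append(stage)
            rows = stage.transform(rows)  # later stages see the transformed data
        return PipelineModel(fitted)


train = [{"x": 1.0}, {"x": None}, {"x": 3.0}]
model = Pipeline([MeanImputer("x"), Doubler("x", "x2")]).fit(train)
out = model.transform([{"x": None}, {"x": 5.0}])
```

The mean learned during fit() (2.0) is stored in the fitted model and reused on new data. The same mechanism lets the test set later in this notebook go through `model.transform()` without redefining any stage.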


Let's understand this with the help of some examples.

# Example of Pipelines

Let's create a sample dataframe with three columns as shown below. Here, we will define some of the stages in which we want to transform the data and see how to set up the pipeline:

In [14]:
from pyspark.ml import Pipeline

#create a sample dataframe
sample_df = spark.createDataFrame([
    (1, 'L101', 'R'),
    (2, 'L201', 'c'),
    (3, 'D111', 'R'),
    (4, 'F210', 'R'),
    (5, 'D110', 'C')
], ['id', 'category_1', 'category_2'])

sample_df.show()

+---+----------+----------+
| id|category_1|category_2|
+---+----------+----------+
|  1|      L101|         R|
|  2|      L201|         c|
|  3|      D111|         R|
|  4|      F210|         R|
|  5|      D110|         C|
+---+----------+----------+



We have created the dataframe. Suppose we have to transform the data in the following order:
1. stage_1: Label Encode or String Index the column category_1 
2. stage_2: Label Encode or String Index the column category_2
3. stage_3: One-Hot Encode the indexed column category_2


At each stage, we will pass the input and output column names, and then set up the pipeline by passing the list of defined stages to the Pipeline object.

The pipeline then performs these steps one by one in sequence and gives us the end result. Let's see how to implement the pipeline:

In [15]:
#define stage 1 : transform the column category_1 to numeric
stage_1 = StringIndexer(inputCol= 'category_1', outputCol='category_1_index')

#define stage 2 : transform the column category_2 to numeric
stage_2 = StringIndexer(inputCol='category_2', outputCol='category_2_index')

#define stage 3 : one hot encode the numeric category_2 column
stage_3 = OneHotEncoder(inputCols=['category_2_index'], outputCols=['category_2_OHE'])

#setup the pipeline
pipeline = Pipeline(stages=[stage_1, stage_2, stage_3])

#fit the pipeline model and transform the data as defined
pipeline_model = pipeline.fit(sample_df)
sample_df_updated = pipeline_model.transform(sample_df)

#view the transformed data
sample_df_updated.show()

+---+----------+----------+----------------+----------------+--------------+
| id|category_1|category_2|category_1_index|category_2_index|category_2_OHE|
+---+----------+----------+----------------+----------------+--------------+
|  1|      L101|         R|             3.0|             0.0| (2,[0],[1.0])|
|  2|      L201|         c|             4.0|             2.0|     (2,[],[])|
|  3|      D111|         R|             1.0|             0.0| (2,[0],[1.0])|
|  4|      F210|         R|             2.0|             0.0| (2,[0],[1.0])|
|  5|      D110|         C|             0.0|             1.0| (2,[1],[1.0])|
+---+----------+----------+----------------+----------------+--------------+



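Notice that StringIndexer is case-sensitive, so 'c' and 'C' are indexed as two separate categories. Also, 'c' (index 2.0, the last of the three categories) is encoded as the empty vector (2,[],[]) because OneHotEncoder drops the last category by default (dropLast=True).

Now, let's take a more complex example of setting up a pipeline. Here we will transform the data and build a logistic regression model.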

For this, we will create a sample dataframe which will be our training dataset with four features and the target label:

In [16]:
from pyspark.ml.classification import LogisticRegression

#create a sample dataframe with 4 features and 1 label column
sample_data_train = spark.createDataFrame([
    (2.0, 'A', 'S10', 40, 1.0),
    (1.0, 'X', 'E10', 25, 1.0),
    (4.0, 'X', 'S20', 10, 0.0),
    (3.0, 'Z', 'S10', 20, 0.0),
    (4.0, 'A', 'E10', 30, 1.0),
    (2.0, 'Z', 'S10', 40, 0.0),
    (5.0, 'X', 'D10', 10, 1.0),
], ['feature_1', 'feature_2', 'feature_3', 'feature_4', 'label'])

#view the data
sample_data_train.show()

+---------+---------+---------+---------+-----+
|feature_1|feature_2|feature_3|feature_4|label|
+---------+---------+---------+---------+-----+
|      2.0|        A|      S10|       40|  1.0|
|      1.0|        X|      E10|       25|  1.0|
|      4.0|        X|      S20|       10|  0.0|
|      3.0|        Z|      S10|       20|  0.0|
|      4.0|        A|      E10|       30|  1.0|
|      2.0|        Z|      S10|       40|  0.0|
|      5.0|        X|      D10|       10|  1.0|
+---------+---------+---------+---------+-----+



Now, suppose this is the order of our pipeline:
1. stage_1: Label Encode or String Index the column feature_2
2. stage_2: Label Encode or String Index the column feature_3
3. stage_3: One-Hot Encode the indexed columns of feature_2 and feature_3
4. stage_4: create a vector of all the features required to train a Logistic Regression model
5. stage_5: Build a Logistic Regression model

We have to define each stage by providing the input and output column names. The final stage builds the logistic regression model. In the end, when we run the pipeline on the training dataset, it runs the steps in sequence and adds new columns to the dataframe (like rawPrediction, probability, and prediction).

In [17]:
#define stage 1: transform the column feature_2 to numeric
stage_1 = StringIndexer(inputCol='feature_2', outputCol='feature_2_index')
#define stage 2: transform the column feature_3 to numeric
stage_2 = StringIndexer(inputCol='feature_3', outputCol='feature_3_index')
#define stage 3: one hot encode the numeric versions of feature 2 and 3 generated from stage 1 and stage 2
stage_3 = OneHotEncoder(inputCols=[stage_1.getOutputCol(), stage_2.getOutputCol()], outputCols=['feature_2_encoded', 'feature_3_encoded'])
#define stage 4: create a vector of all the features required to train the logistic regression model
stage_4 = VectorAssembler(inputCols=['feature_1', 'feature_2_encoded', 'feature_3_encoded', 'feature_4'], outputCol='features')

#define stage 5: logistic regression model
stage_5 = LogisticRegression(featuresCol='features', labelCol='label')

# Setup the pipeline
regression_pipeline = Pipeline(stages=[stage_1, stage_2, stage_3, stage_4, stage_5])

# fit the pipeline for the training data
model = regression_pipeline.fit(sample_data_train)

# transform the data
sample_data_train = model.transform(sample_data_train)

#view some of the columns generated
sample_data_train.select('features', 'label', 'rawPrediction', 'probability', 'prediction').show()



+--------------------+-----+--------------------+--------------------+----------+
|            features|label|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|[2.0,0.0,1.0,1.0,...|  1.0|[-18.956871848873...|[5.84972108437796...|       1.0|
|[1.0,1.0,0.0,0.0,...|  1.0|[-20.158269476976...|[1.75944137519465...|       1.0|
|(7,[0,1,6],[4.0,1...|  0.0|[18.0148602858563...|[0.99999998499466...|       0.0|
|(7,[0,3,6],[3.0,1...|  0.0|[24.5051237560211...|[0.99999999997721...|       0.0|
|[4.0,0.0,1.0,0.0,...|  1.0|[-50.288624611182...|[1.44519958724398...|       1.0|
|(7,[0,3,6],[2.0,1...|  0.0|[18.3280841853911...|[0.99999998902980...|       0.0|
|[5.0,1.0,0.0,0.0,...|  1.0|[-17.986823547341...|[1.54319845459293...|       1.0|
+--------------------+-----+--------------------+--------------------+----------+
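The rawPrediction and probability columns are directly related. For a binary LogisticRegression model, Spark's rawPrediction is [-m, m], where m is the model's margin for the row. Each probability is the logistic sigmoid of the corresponding raw value, and prediction is 1.0 when the probability of class 1 exceeds the default threshold of 0.5. Here is a quick plain-Python check against the first and fourth rows above; the raw values are truncated in the output, so the match is approximate:

```python
import math


def sigmoid(x):
    return 1.0 / (1.0 + math.exp(-x))


def probability_from_raw(raw):
    """Binary logistic regression: rawPrediction is [-m, m]; apply the sigmoid to each entry."""
    return [sigmoid(r) for r in raw]


def predict(probability, threshold=0.5):
    """Predict class 1.0 when P(class 1) exceeds the threshold (0.5 is Spark's default)."""
    return 1.0 if probability[1] > threshold else 0.0


# margins taken from the first and fourth rows of the training output (truncated as printed)
m1 = 18.956871848873
row1 = probability_from_raw([-m1, m1])
m4 = -24.5051237560211
row4 = probability_from_raw([-m4, m4])
```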



Congrats! We have successfully set up the pipeline. Let's create a sample test dataset without the labels. This time, we do not need to define all the steps again: we just pass the data through the fitted pipeline model and we are done!

In [18]:
# create a sample data without the labels
sample_data_test = spark.createDataFrame([
    (3.0, 'Z', 'S10', 40),
    (1.0, 'X', 'E10', 20),
    (4.0, 'A', 'S20', 10),
    (3.0, 'A', 'S10', 20),
    (4.0, 'X', 'D10', 30),
    (1.0, 'Z', 'E10', 20),
    (4.0, 'A', 'S10', 30),
], ['feature_1', 'feature_2', 'feature_3', 'feature_4'])

#transform the data using the pipeline
sample_data_test = model.transform(sample_data_test)

#see the prediction on the test data
sample_data_test.select('features', 'rawPrediction', 'probability', 'prediction').show()

+--------------------+--------------------+--------------------+----------+
|            features|       rawPrediction|         probability|prediction|
+--------------------+--------------------+--------------------+----------+
|(7,[0,3,6],[3.0,1...|[21.9361235191363...|[0.99999999970265...|       0.0|
|[1.0,1.0,0.0,0.0,...|[-19.516019417755...|[3.34426325212895...|       1.0|
|(7,[0,2,6],[4.0,1...|[-22.297362790363...|[2.07194574533280...|       1.0|
|[3.0,0.0,1.0,1.0,...|[-12.779832278243...|[2.81700837724663...|       1.0|
|[4.0,1.0,0.0,0.0,...|[-24.163863117971...|[3.20455394170191...|       1.0|
|(7,[0,4,6],[1.0,1...|[-22.543286459710...|[1.62022409523207...|       1.0|
|[4.0,0.0,1.0,1.0,...|[-10.456293062940...|[2.87658445082073...|       1.0|
+--------------------+--------------------+--------------------+----------+

