# Classification in Spark

The intent of this blog is to demonstrate binary classification in PySpark. The steps involved in developing a classification model in PySpark are as follows:

1) Initialize a Spark session

2) Download and read the dataset

3) Develop an initial understanding of the data

4) Handling missing values

5) Scaling the features

6) Train test split

7) Imbalance handling

8) Feature selection

9) Performance evaluation

In [1]:
# Initializing a Spark session
from pyspark.sql import SparkSession
import time
spark = SparkSession.builder.master("local").appName("flight").config("spark.some.config.option","some-value").getOrCreate()

# Download and read the dataset


In [2]:
!pwd
!ls

/home/hadoop
diabetes.csv			    logisticregression-flight.ipynb
hw3.ipynb			    logisticregression-Income.ipynb
launchJupyter.sh		    logisticregression-poker.ipynb
logisticregression_diabeties.ipynb  spark_2.ipynb


In [15]:
start_time = time.time()
raw_data = spark.read.csv('s3://516ml/airlinedelay.csv',
                    header='true', inferSchema='true')
print("--- %s seconds ---" % (time.time() - start_time))
raw_data.columns

--- 13.2190270424 seconds ---


['Year',
 'Month',
 'DayofMonth',
 'DayOfWeek',
 'DepTime',
 'CRSDepTime',
 'ArrTime',
 'CRSArrTime',
 'UniqueCarrier',
 'FlightNum',
 'TailNum',
 'ActualElapsedTime',
 'CRSElapsedTime',
 'AirTime',
 'ArrDelay',
 'DepDelay',
 'Origin',
 'Dest',
 'Distance',
 'TaxiIn',
 'TaxiOut',
 'Cancelled',
 'CancellationCode',
 'Diverted',
 'CarrierDelay',
 'WeatherDelay',
 'NASDelay',
 'SecurityDelay',
 'LateAircraftDelay']

In [16]:
cols = ["DayOfWeek","DepTime","AirTime", "ArrDelay", "DepDelay", "Origin", "Distance", "CarrierDelay", "WeatherDelay", "SecurityDelay", "Cancelled"]

raw_data.select(cols).show()

+---------+-------+-------+--------+--------+------+--------+------------+------------+-------------+---------+
|DayOfWeek|DepTime|AirTime|ArrDelay|DepDelay|Origin|Distance|CarrierDelay|WeatherDelay|SecurityDelay|Cancelled|
+---------+-------+-------+--------+--------+------+--------+------------+------------+-------------+---------+
|        4|   2003|    116|     -14|       8|   IAD|     810|        null|        null|         null|        0|
|        4|    754|    113|       2|      19|   IAD|     810|        null|        null|         null|        0|
|        4|    628|     76|      14|       8|   IND|     515|        null|        null|         null|        0|
|        4|   1829|     77|      34|      34|   IND|     515|           2|           0|            0|        0|
|        4|   1940|     87|      11|      25|   IND|     688|        null|        null|         null|        0|
|        4|   1937|    230|      57|      67|   IND|    1591|          10|           0|            0|   

In [18]:
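from pyspark.sql import functions as F
from pyspark.sql.functions import when

# Binarize the label: 1.0 if the flight arrived late (ArrDelay > 0), otherwise 0.0
raw_data = raw_data.withColumn("ArrDelay", when(raw_data.ArrDelay > 0, 1.0).otherwise(0.0))
# Numeric columns used from here on ("Origin" is left out because it is a string column)
cols = ["DayOfWeek","DepTime","AirTime", "ArrDelay", "DepDelay", "Distance", "CarrierDelay", "WeatherDelay", "SecurityDelay", "Cancelled"]

# Cast every column to float; non-numeric string columns become null
for col in raw_data.columns:
    raw_data = raw_data.withColumn(col, F.col(col).cast("float"))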

In [19]:
raw_data.select(cols).show(5)

+---------+-------+-------+--------+--------+--------+------------+------------+-------------+---------+
|DayOfWeek|DepTime|AirTime|ArrDelay|DepDelay|Distance|CarrierDelay|WeatherDelay|SecurityDelay|Cancelled|
+---------+-------+-------+--------+--------+--------+------------+------------+-------------+---------+
|      4.0| 2003.0|  116.0|     0.0|     8.0|   810.0|        null|        null|         null|      0.0|
|      4.0|  754.0|  113.0|     1.0|    19.0|   810.0|        null|        null|         null|      0.0|
|      4.0|  628.0|   76.0|     1.0|     8.0|   515.0|        null|        null|         null|      0.0|
|      4.0| 1829.0|   77.0|     1.0|    34.0|   515.0|         2.0|         0.0|          0.0|      0.0|
|      4.0| 1940.0|   87.0|     1.0|    25.0|   688.0|        null|        null|         null|      0.0|
+---------+-------+-------+--------+--------+--------+------------+------------+-------------+---------+
only showing top 5 rows



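Several of the selected columns (for example CarrierDelay, WeatherDelay and SecurityDelay) contain null values. We can simply impute these missing values by calling an Imputer, which by default replaces them with the column mean :)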
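
To make the imputation step concrete, here is a minimal plain-Python sketch of mean imputation, which is the default strategy of Spark's `Imputer`. The helper `impute_mean` and the toy values are assumptions made up for illustration; they are not part of the notebook or of the Spark API.

```python
# Toy illustration of mean imputation; None stands in for a Spark null.
def impute_mean(values):
    """Replace None entries with the mean of the observed entries."""
    observed = [v for v in values if v is not None]
    mean = sum(observed) / len(observed)
    return [mean if v is None else v for v in values]

# Hypothetical CarrierDelay-like column (not taken from the real dataset).
carrier_delay = [None, None, 2.0, None, 10.0]
imputed = impute_mean(carrier_delay)
print(imputed)  # [6.0, 6.0, 2.0, 6.0, 10.0]
```

Every missing entry receives the same value, which is why the imputed delay columns show one repeated number in the rows that were originally null.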

In [20]:
from pyspark.ml.feature import Imputer

imputer=Imputer(inputCols=cols,outputCols=cols)
model=imputer.fit(raw_data)
raw_data=model.transform(raw_data)
raw_data.show(5)

+------+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|  Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+------+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|2008.0|  1.0|       3.0|      4.0| 2003.0|    1955.0| 2211.0|    2225.0|         null| 

In [21]:
cols.remove("ArrDelay")
# Let us import the vector assembler
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=cols,outputCol="features")
# Now let us use the transform method to transform our dataset
raw_data=assembler.transform(raw_data)
raw_data.select("features").show(truncate=False)

+--------------------------------------------------------------------------------------------+
|features                                                                                    |
+--------------------------------------------------------------------------------------------+
|[4.0,2003.0,116.0,8.0,810.0,18.870359420776367,3.5680322647094727,0.09328398108482361,0.0]  |
|[4.0,754.0,113.0,19.0,810.0,18.870359420776367,3.5680322647094727,0.09328398108482361,0.0]  |
|[4.0,628.0,76.0,8.0,515.0,18.870359420776367,3.5680322647094727,0.09328398108482361,0.0]    |
|[4.0,1829.0,77.0,34.0,515.0,2.0,0.0,0.0,0.0]                                                |
|[4.0,1940.0,87.0,25.0,688.0,18.870359420776367,3.5680322647094727,0.09328398108482361,0.0]  |
|[4.0,1937.0,230.0,67.0,1591.0,10.0,0.0,0.0,0.0]                                             |
|[4.0,706.0,106.0,6.0,828.0,18.870359420776367,3.5680322647094727,0.09328398108482361,0.0]   |
|[4.0,1644.0,107.0,94.0,828.0,8.0,0.0,0.0,0.0]    

# Standard Scaler

So we have created a feature vector. Now let us use StandardScaler to scale the newly created "features" column.
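
As a rough illustration of what the scaler does with its default settings (`withStd=True`, `withMean=False`), the sketch below divides a column by its sample standard deviation without centering it. The function name `scale_by_std` and the toy column are assumptions for illustration only.

```python
import statistics

def scale_by_std(values):
    """Divide each value by the sample standard deviation (no centering)."""
    std = statistics.stdev(values)  # sample (n - 1) standard deviation
    return [v / std for v in values]

# Hypothetical DayOfWeek-like column (not taken from the real dataset).
day_of_week = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0]
scaled = scale_by_std(day_of_week)

# After scaling, the column has unit standard deviation but a non-zero mean.
print(round(statistics.stdev(scaled), 6))
print(statistics.mean(scaled) > 0)
```

Because the values are only divided and not shifted, a value such as 4.0 stays positive after scaling.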

In [22]:
from pyspark.ml.feature import StandardScaler
standardscaler=StandardScaler().setInputCol("features").setOutputCol("Scaled_features")
raw_data=standardscaler.fit(raw_data).transform(raw_data)
raw_data.select("features","Scaled_features").show(5)

+--------------------+--------------------+
|            features|     Scaled_features|
+--------------------+--------------------+
|[4.0,2003.0,116.0...|[2.02326575050239...|
|[4.0,754.0,113.0,...|[2.02326575050239...|
|[4.0,628.0,76.0,8...|[2.02326575050239...|
|[4.0,1829.0,77.0,...|[2.02326575050239...|
|[4.0,1940.0,87.0,...|[2.02326575050239...|
+--------------------+--------------------+
only showing top 5 rows



# Train, test split

Now that the preprocessing of the data is complete, let us split the dataset into training and test sets.

In [23]:
train, test = raw_data.randomSplit([0.8, 0.2], seed=12345)

Let us check whether there is imbalance in the dataset.

In [24]:
dataset_size=float(train.select("ArrDelay").count())
numPositives=train.select("ArrDelay").where('ArrDelay == 1').count()
per_ones=(float(numPositives)/float(dataset_size))*100
numNegatives=float(dataset_size-numPositives)
print('The number of ones are {}'.format(numPositives))
print('Percentage of ones are {}'.format(per_ones))

The number of ones are 751692
Percentage of ones are 89.6679732699


# Imbalance handling

Since ones make up 89.67 % of the training set, there is clearly imbalance in the dataset. To handle it, we calculate the BalancingRatio as follows:

BalancingRatio= numNegatives/dataset_size

Then for every ArrDelay == 1 we put BalancingRatio in the column "classWeights", and for every ArrDelay == 0 we put 1-BalancingRatio in the column "classWeights".

In this way, we assign a higher weight to the minority class (here the negative class, ArrDelay == 0).
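
The effect of this weighting can be checked on a small made-up example. The sketch below uses hypothetical labels (9 positives, 1 negative) rather than the flight data, and shows that the rule gives both classes the same total weight.

```python
# Toy training labels: 9 positives and 1 negative (hypothetical, for illustration only).
labels = [1.0] * 9 + [0.0]

dataset_size = float(len(labels))
num_negatives = sum(1 for y in labels if y == 0.0)
balancing_ratio = num_negatives / dataset_size

# Same rule as in the post: label 1 gets BalancingRatio, label 0 gets 1 - BalancingRatio.
weights = [balancing_ratio if y == 1.0 else 1.0 - balancing_ratio for y in labels]

total_pos_weight = sum(w for w, y in zip(weights, labels) if y == 1.0)
total_neg_weight = sum(w for w, y in zip(weights, labels) if y == 0.0)
print(total_pos_weight, total_neg_weight)  # both approximately 0.9
```

In general, with P positives and N negatives both totals equal P*N/(P+N), so neither class dominates the weighted loss.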

In [25]:
BalancingRatio= numNegatives/dataset_size
print('BalancingRatio = {}'.format(BalancingRatio))

BalancingRatio = 0.103320267301


In [27]:
from pyspark.sql.functions import when
train=train.withColumn("classWeights", when(train.ArrDelay == 1,BalancingRatio).otherwise(1-BalancingRatio))
train.select("classWeights").show(5)

+-------------------+
|       classWeights|
+-------------------+
|0.10332026730096171|
|0.10332026730096171|
|0.10332026730096171|
| 0.8966797326990383|
|0.10332026730096171|
+-------------------+
only showing top 5 rows



# Building a classification model using Logistic Regression (LR)

In [29]:
from pyspark.ml.classification import LogisticRegression
start_time = time.time()
lr = LogisticRegression(labelCol="ArrDelay", featuresCol="Scaled_features",weightCol="classWeights",maxIter=10)
model = lr.fit(train)    

print("--- %s seconds ---" % (time.time() - start_time))
predict_train=model.transform(train)
predict_test=model.transform(test)

predict_test.select("ArrDelay","prediction").show(10)

--- 30.4018301964 seconds ---
+--------+----------+
|ArrDelay|prediction|
+--------+----------+
|     1.0|       0.0|
|     1.0|       1.0|
|     1.0|       1.0|
|     1.0|       0.0|
|     1.0|       1.0|
|     1.0|       1.0|
|     1.0|       0.0|
|     1.0|       0.0|
|     1.0|       1.0|
|     1.0|       1.0|
+--------+----------+
only showing top 10 rows



# Evaluating the model

Now let us evaluate the model using BinaryClassificationEvaluator class in Spark ML. BinaryClassificationEvaluator by default uses areaUnderROC as the performance metric 
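
For intuition, the area under the ROC curve equals the probability that a randomly chosen positive example receives a higher score than a randomly chosen negative one. The sketch below computes it by brute force on made-up labels and scores. It illustrates the metric only and is not how Spark computes it internally; `area_under_roc` is a hypothetical helper.

```python
def area_under_roc(labels, scores):
    """Fraction of (positive, negative) pairs ranked correctly; ties count as 0.5."""
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))

# Hypothetical labels and scores for the positive class.
toy_labels = [1, 1, 0, 1, 0]
toy_scores = [0.9, 0.8, 0.7, 0.4, 0.2]
auc = area_under_roc(toy_labels, toy_scores)
print(auc)  # 5 of the 6 positive/negative pairs are ordered correctly
```

A value of 0.5 corresponds to random ranking and 1.0 to a perfect ranking.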

In [30]:
# BinaryClassificationEvaluator uses areaUnderROC as its default metric. For now we will keep that default.
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator=BinaryClassificationEvaluator(rawPredictionCol="rawPrediction",labelCol="ArrDelay")

In [32]:
predict_test.select("ArrDelay","rawPrediction","prediction","probability").show(5)

+--------+--------------------+----------+--------------------+
|ArrDelay|       rawPrediction|prediction|         probability|
+--------+--------------------+----------+--------------------+
|     1.0|[1.38552941723670...|       0.0|[0.79987758089153...|
|     1.0|[-2.6706861162332...|       1.0|[0.06472542143979...|
|     1.0|[-15.444727776943...|       1.0|[1.96082965673554...|
|     1.0|[1.94633222302961...|       0.0|[0.87504615703462...|
|     1.0|[-0.5710423256649...|       1.0|[0.36099634844705...|
+--------+--------------------+----------+--------------------+
only showing top 5 rows
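
For binary logistic regression, the "probability" column is the logistic sigmoid of the "rawPrediction" column, and the first component of each vector refers to class 0. The sketch below checks this on the first components visible in the rows above. The values are copied from the truncated output, so the agreement is only approximate, and the third row is skipped because its probability is cut off.

```python
import math

def sigmoid(z):
    """Logistic function mapping a raw score to a probability."""
    return 1.0 / (1.0 + math.exp(-z))

# First component of rawPrediction and of probability, as printed (truncated) above.
raw_first = [1.38552941723670, -2.6706861162332, 1.94633222302961, -0.5710423256649]
prob_first = [0.79987758089153, 0.06472542143979, 0.87504615703462, 0.36099634844705]

for raw, prob in zip(raw_first, prob_first):
    # The two printed numbers should agree to about six decimal places.
    print(round(sigmoid(raw), 6), round(prob, 6))
```

With the default threshold of 0.5, a row is predicted as class 1 whenever this class-0 probability falls below 0.5, which matches the "prediction" column shown.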



In [33]:
print("The area under ROC for train set is {}".format(evaluator.evaluate(predict_train)))
print("The area under ROC for test set is {}".format(evaluator.evaluate(predict_test)))

The area under ROC for train set is 0.90625283217
The area under ROC for test set is 0.90603355168


# Hyperparameters

To this point we have developed a classification model using logistic regression. However, the behaviour of logistic regression depends on a number of parameters. So far we have worked mostly with the default values. Now, let's try to tune the hyperparameters and see whether it makes any difference.

In [None]:
# If you are unsure which parameters to tune, use "print(lr.explainParams())" to list the parameters available for tuning
print(lr.explainParams())

# List of tunable parameters in LR

1) aggregationDepth: suggested depth for treeAggregate (>= 2). (default: 2)

2) elasticNetParam: the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty. (default: 0.0)

3) family: The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial (default: auto)

4) featuresCol: features column name. (default: features, current: Scaled_features)

5) fitIntercept: whether to fit an intercept term. (default: True)

6) labelCol: label column name. (default: label, current: ArrDelay)

7) maxIter: max number of iterations (>= 0). (default: 100, current: 10)

8) predictionCol: prediction column name. (default: prediction)

9) probabilityCol: Column name for predicted class conditional probabilities. Note: Not all models output well-calibrated probability estimates! These probabilities should be treated as confidences, not precise probabilities. (default: probability)

10) rawPredictionCol: raw prediction (a.k.a. confidence) column name. (default: rawPrediction)

11) regParam: regularization parameter (>= 0). (default: 0.0)

12) standardization: whether to standardize the training features before fitting the model. (default: True)

13) threshold: Threshold in binary classification prediction, in range [0, 1]. If threshold and thresholds are both set, they must match, e.g. if threshold is p, then thresholds must be equal to [1-p, p]. (default: 0.5)

14) thresholds: Thresholds in multi-class classification to adjust the probability of predicting each class. Array must have length equal to the number of classes, with values > 0, excepting that at most one value may be 0. The class with largest value p/t is predicted, where p is the original probability of that class and t is the class's threshold. (undefined)

15) tol: the convergence tolerance for iterative algorithms (>= 0). (default: 1e-06)

16) weightCol: weight column name. If this is not set or empty, we treat all instance weights as 1.0. (current: classWeights)
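
To see how regParam and elasticNetParam interact, the sketch below evaluates the elastic-net penalty regParam * (alpha * ||w||_1 + (1 - alpha) / 2 * ||w||_2^2) for a made-up coefficient vector. The helper `elastic_net_penalty` and the coefficients are assumptions for illustration only; Spark applies this penalty internally during optimisation.

```python
def elastic_net_penalty(weights, reg_param, elastic_net_param):
    """Elastic-net penalty: a mix of L1 and L2 terms scaled by reg_param."""
    l1 = sum(abs(w) for w in weights)
    l2_squared = sum(w * w for w in weights)
    return reg_param * (elastic_net_param * l1
                        + (1.0 - elastic_net_param) / 2.0 * l2_squared)

coefficients = [0.5, -1.0, 2.0]  # hypothetical model coefficients

print(elastic_net_penalty(coefficients, 0.5, 0.0))  # pure L2 penalty: 1.3125
print(elastic_net_penalty(coefficients, 0.5, 1.0))  # pure L1 penalty: 1.75
```

Setting regParam to 0.0 (the default) removes the penalty entirely, whatever the value of elasticNetParam.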


Now let us tune some of these parameters and observe their effect on the performance of the algorithm.

For the purpose of hyperparameter tuning we will consider the following parameters:

1) aggregationDepth [2, 5, 10]

2) elasticNetParam [0.0, 0.5, 1.0]

3) fitIntercept [True / False]

4) maxIter [10, 100, 1000]

5) regParam [0.01, 0.5, 2.0]
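
Before running a search over these values it helps to count how many models it implies. The sketch below enumerates the combinations in plain Python; the dictionary simply mirrors the list above, and the fold count assumes the 5-fold cross-validation used in this post.

```python
from itertools import product

# Candidate values, mirroring the list above.
grid = {
    "aggregationDepth": [2, 5, 10],
    "elasticNetParam": [0.0, 0.5, 1.0],
    "fitIntercept": [False, True],
    "maxIter": [10, 100, 1000],
    "regParam": [0.01, 0.5, 2.0],
}

combinations = list(product(*grid.values()))
num_folds = 5  # assumes 5-fold cross-validation

print(len(combinations))              # 162 parameter combinations
print(len(combinations) * num_folds)  # 810 model fits during the search
```

Trimming even one parameter from the grid (for example aggregationDepth, which affects how aggregation is performed rather than the fitted model) cuts the number of fits by a factor of three.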

First of all, let us define a parameter grid as follows:

In [None]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

paramGrid = ParamGridBuilder()\
    .addGrid(lr.aggregationDepth,[2,5,10])\
    .addGrid(lr.elasticNetParam,[0.0, 0.5, 1.0])\
    .addGrid(lr.fitIntercept,[False, True])\
    .addGrid(lr.maxIter,[10, 100, 1000])\
    .addGrid(lr.regParam,[0.01, 0.5, 2.0]) \
    .build()

# https://spark.apache.org/docs/2.1.0/ml-tuning.html

# K-fold cross validation

In [None]:
# Create 5-fold CrossValidator
cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)

# Run cross-validation
cvModel = cv.fit(train)
# This will likely take a long time because of the number of models being fit and evaluated
predict_train=cvModel.transform(train)
predict_test=cvModel.transform(test)
print("The area under ROC for train set after CV  is {}".format(evaluator.evaluate(predict_train)))
print("The area under ROC for test set after CV  is {}".format(evaluator.evaluate(predict_test)))

In [9]:
print((raw_data.count(), len(raw_data.columns)))

(1048575, 29)
