<a id="2"></a>
# <p style="background-color:lightpink;font-family:newtimeroman;color:#FFF9ED;font-size:150%;text-align:center;border-radius:10px 10px;">CLASSIFICATION WITH LOGISTIC REGRESSION</p>

# Classification in Spark

The steps involved in developing a classification model in PySpark are as follows:

1) Initialize a Spark session

2) Download and read the dataset

3) Develop an initial understanding of the data

4) Handle missing values

5) Scale the features

6) Split the data into training and test sets

7) Handle class imbalance

8) Select features

9) Evaluate performance

In [2]:
from pyspark.sql import SparkSession

# Local session with a single worker thread and 10 GB of off-heap memory
spark = (
    SparkSession.builder
    .appName("Major Project")
    .master("local[1]")
    .config("spark.memory.offHeap.enabled", "true")
    .config("spark.memory.offHeap.size", "10g")
    .getOrCreate()
)

spark

In [3]:
raw_data =spark.read.csv(r"C:/Users/abhi/Desktop/project feb2023/major project code/small dataset/train2.csv",inferSchema=True,header=True)

The task is to build a machine learning model that accurately predicts the binary `Target` label of each network flow in the dataset.

In [4]:
raw_data.describe()  # lazy: returns the DataFrame shown below; append .show() to print the statistics

DataFrame[summary: string, bidirectional_duration_ms: string, bidirectional_packets: string, bidirectional_bytes: string, src2dst_first_seen_ms: string, src2dst_last_seen_ms: string, src2dst_packets: string, src2dst_bytes: string, dst2src_first_seen_ms: string, dst2src_last_seen_ms: string, dst2src_packets: string, dst2src_bytes: string, dst2src_duration_ms: string, src2dst_duration_ms: string, Target: string]

In [5]:
cols=raw_data.columns
cols.remove("Target")
# Let us import the vector assembler

from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=cols,outputCol="features")
# Now let us use the transform method to transform our dataset
raw_data=assembler.transform(raw_data)
raw_data.select("features").show(truncate=False)

+----------------------------------------------------------------------------+
|features                                                                    |
+----------------------------------------------------------------------------+
|(13,[1,2,3,4,5,6],[1.0,100.0,1.537542439378E12,1.537542439378E12,1.0,100.0])|
|(13,[1,2,3,4,5,6],[1.0,100.0,1.537542439402E12,1.537542439402E12,1.0,100.0])|
|(13,[1,2,3,4,5,6],[1.0,100.0,1.537542439415E12,1.537542439415E12,1.0,100.0])|
|(13,[1,2,3,4,5,6],[1.0,100.0,1.537542443387E12,1.537542443387E12,1.0,100.0])|
|(13,[1,2,3,4,5,6],[1.0,100.0,1.537542443391E12,1.537542443391E12,1.0,100.0])|
|(13,[1,2,3,4,5,6],[1.0,100.0,1.537542455693E12,1.537542455693E12,1.0,100.0])|
|(13,[1,2,3,4,5,6],[1.0,110.0,1.532103708659E12,1.532103708659E12,1.0,110.0])|
|(13,[1,2,3,4,5,6],[1.0,110.0,1.532103730254E12,1.532103730254E12,1.0,110.0])|
|(13,[1,2,3,4,5,6],[1.0,110.0,1.532106434466E12,1.532106434466E12,1.0,110.0])|
|(13,[1,2,3,4,5,6],[1.0,110.0,1.532106807748E12,1.53
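The rows above are printed in Spark's sparse-vector notation `(size, [indices], [values])`: every position not listed is 0.0. The minimal pure-Python sketch below expands the first row into a dense list and pairs it with the feature names (in PySpark itself, `pyspark.ml.linalg.SparseVector.toArray()` does the expansion). The column order is taken from the `predict_test.columns` output further down, minus `Target`; the helper name `sparse_to_dense` is hypothetical.

```python
def sparse_to_dense(size, indices, values):
    """Expand Spark's sparse notation (size, [indices], [values]) into a dense list."""
    dense = [0.0] * size
    for i, v in zip(indices, values):
        dense[i] = v
    return dense


# Column order used by the VectorAssembler (every column except Target)
feature_names = [
    "bidirectional_duration_ms", "bidirectional_packets", "bidirectional_bytes",
    "src2dst_first_seen_ms", "src2dst_last_seen_ms", "src2dst_packets",
    "src2dst_bytes", "dst2src_first_seen_ms", "dst2src_last_seen_ms",
    "dst2src_packets", "dst2src_bytes", "dst2src_duration_ms", "src2dst_duration_ms",
]

# First row printed above: 13 slots, only positions 1-6 are non-zero
row = sparse_to_dense(
    13, [1, 2, 3, 4, 5, 6],
    [1.0, 100.0, 1.537542439378e12, 1.537542439378e12, 1.0, 100.0],
)

for name, value in zip(feature_names, row):
    print(f"{name:26s} {value}")
```

Read this way, the first record is a single 100-byte packet from source to destination with no reply and zero duration.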

# Standard Scaler

So we have created a feature vector. Now let us use StandardScaler to scale the newly created "features" column.

In [6]:
from pyspark.ml.feature import StandardScaler
standardscaler=StandardScaler().setInputCol("features").setOutputCol("Scaled_features")
raw_data=standardscaler.fit(raw_data).transform(raw_data)
raw_data.select("features","Scaled_features").show(5)

+--------------------+--------------------+
|            features|     Scaled_features|
+--------------------+--------------------+
|(13,[1,2,3,4,5,6]...|(13,[1,2,3,4,5,6]...|
|(13,[1,2,3,4,5,6]...|(13,[1,2,3,4,5,6]...|
|(13,[1,2,3,4,5,6]...|(13,[1,2,3,4,5,6]...|
|(13,[1,2,3,4,5,6]...|(13,[1,2,3,4,5,6]...|
|(13,[1,2,3,4,5,6]...|(13,[1,2,3,4,5,6]...|
+--------------------+--------------------+
only showing top 5 rows
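With its defaults (`withMean=False`, `withStd=True`), Spark's StandardScaler does not center the data: it divides each feature by its corrected (n - 1) sample standard deviation, and a feature with zero standard deviation is output as 0.0. Because nothing is subtracted, zeros stay zeros, which is why `Scaled_features` above keeps the same sparse indices as `features`. A minimal pure-Python sketch of that behaviour on hypothetical toy rows (`fit_std` and `scale` are illustrative names, not Spark API):

```python
import math


def fit_std(rows):
    """Corrected (n - 1) sample standard deviation of each column."""
    n = len(rows)
    stds = []
    for j in range(len(rows[0])):
        col = [r[j] for r in rows]
        mean = sum(col) / n
        var = sum((x - mean) ** 2 for x in col) / (n - 1)
        stds.append(math.sqrt(var))
    return stds


def scale(rows, stds):
    """Divide by the std without centering (withMean=False); zero-std columns become 0.0."""
    return [[x / s if s != 0.0 else 0.0 for x, s in zip(r, stds)] for r in rows]


# Toy rows: column 0 contains a zero, column 2 is constant
rows = [[0.0, 100.0, 5.0], [1.0, 110.0, 5.0], [3.0, 90.0, 5.0]]
stds = fit_std(rows)
scaled = scale(rows, stds)
print(stds)
print(scaled)
```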



# Train, test split

Now that the preprocessing of the data is complete, let us split the dataset into training and test sets.


In [7]:
train, test = raw_data.randomSplit([0.8, 0.2], seed=12345)
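Roughly speaking, `randomSplit` normalizes the weights and sends each row to a split independently, based on a seeded random draw. The 80/20 sizes are therefore only approximate, and the split is not stratified by `Target`, so with very few class-0 rows the test set may contain only a handful of them. The conceptual pure-Python sketch below illustrates the idea. It is not Spark's actual implementation, which samples partition by partition, and `random_split` is a hypothetical name.

```python
import random


def random_split(rows, weights, seed):
    """Assign each row independently to a split with probability proportional to its weight."""
    total = sum(weights)
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)  # cumulative upper bound of each split in [0, 1]
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for k, b in enumerate(bounds):
            if u < b:
                splits[k].append(row)
                break
        else:
            splits[-1].append(row)  # guard against floating-point rounding of the last bound
    return splits


train_rows, test_rows = random_split(list(range(10_000)), [0.8, 0.2], seed=12345)
print(len(train_rows), len(test_rows))
```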

Let us check whether there is class imbalance in the dataset.

In [8]:
dataset_size=float(train.select("Target").count())
numPositives=train.select("Target").where('Target == 1').count()
per_ones=(float(numPositives)/float(dataset_size))*100
numNegatives=float(dataset_size-numPositives)
print('The number of ones are {}'.format(numPositives))
print('Percentage of ones are {}'.format(per_ones))

The number of ones are 5310104
Percentage of ones are 99.99167693868013


In [9]:
BalancingRatio= numNegatives/dataset_size
print('BalancingRatio = {}'.format(BalancingRatio))

BalancingRatio = 8.323061319871818e-05


In [10]:

from pyspark.sql.functions import when
train=train.withColumn("classWeights",when(train.Target == 0,BalancingRatio).otherwise(1-BalancingRatio))
train.select("classWeights").show(5)

+--------------------+
|        classWeights|
+--------------------+
|8.323061319871818E-5|
|8.323061319871818E-5|
|  0.9999167693868013|
|  0.9999167693868013|
|  0.9999167693868013|
+--------------------+
only showing top 5 rows
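The usual goal of class weighting is to give each class the same *total* weight, so the rare class gets the large per-row weight. A common recipe for binary labels weights each class by the share of the other class. Class 0 is the minority here (about 0.008% of the training rows), but the cell above maps `Target == 0` to `BalancingRatio`, the smaller of the two weights. Swapping the branches, `when(train.Target == 1, BalancingRatio).otherwise(1 - BalancingRatio)`, would match the sketch below. This is a minimal pure-Python sketch on hypothetical toy labels; `balancing_weights` is an illustrative name.

```python
from collections import Counter


def balancing_weights(labels):
    """Binary labels only: weight each class by the share of the *other* class.

    The rare class gets a weight close to 1 and the dominant class a weight
    close to 0, so both classes contribute the same total weight.
    """
    counts = Counter(labels)
    if len(counts) != 2:
        raise ValueError("expected exactly two classes")
    n = len(labels)
    return {c: 1.0 - counts[c] / n for c in counts}


# Hypothetical toy data: class 0 is the rare class, as in this dataset
labels = [1] * 9990 + [0] * 10
weights = balancing_weights(labels)
print(weights)
```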



# Feature selection

Spark ML provides the ChiSqSelector for selecting significant features. It is not applied in this notebook: the model below is trained on all 13 scaled features. Please refer to my previous blog for more details about how the ChiSqSelector works.

# Building a classification model using Logistic Regression (LR)

In [16]:
from pyspark.ml.classification import LogisticRegression
import time
lr = LogisticRegression(labelCol="Target", featuresCol='Scaled_features',weightCol="classWeights",maxIter=10)
model=lr.fit(train)





In [17]:
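import time

# model.transform() is lazy: it only builds the query plan for the prediction columns.
# No rows are scored until an action that needs them runs (e.g. evaluator.evaluate
# further down), so the time printed here does not include the prediction work.
start = time.time()
predict_train = model.transform(train)
end = time.time()
print("total time", end - start)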

total time 0.12219858169555664


In [18]:
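import time

# transform() is lazy: this times building the plan for scoring the test set,
# not the scoring itself
start = time.time()
predict_test = model.transform(test)
end = time.time()
print("total time", end - start)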

total time 0.08529520034790039
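Spark transformations such as `model.transform()` are lazy, which is why both timings above are a fraction of a second even for millions of rows. As an analogy only (plain Python, not Spark), the built-in `map` is lazy in the same way. Creating it does no work, and the per-row function runs only when the result is consumed, just as Spark scores rows only when an action needs the predictions. `score` is a hypothetical stand-in for model scoring.

```python
import time

calls = 0


def score(x):
    """Stand-in for scoring one row; counts how often it actually runs."""
    global calls
    calls += 1
    time.sleep(0.001)  # pretend each row costs about 1 ms of model work
    return x * 2


rows = range(200)

start = time.perf_counter()
lazy_predictions = map(score, rows)  # like model.transform(): a plan only, nothing runs yet
plan_time = time.perf_counter() - start
calls_after_plan = calls

start = time.perf_counter()
predictions = list(lazy_predictions)  # like a Spark action: the work happens here
action_time = time.perf_counter() - start

print(f"plan: {plan_time:.6f}s, {calls_after_plan} rows scored")
print(f"action: {action_time:.3f}s, {calls} rows scored")
```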


In [19]:
predict_test.select("Target","prediction").show(10)

+------+----------+
|Target|prediction|
+------+----------+
|     1|       1.0|
|     1|       1.0|
|     1|       1.0|
|     1|       1.0|
|     1|       1.0|
|     1|       1.0|
|     1|       1.0|
|     1|       1.0|
|     1|       1.0|
|     1|       1.0|
+------+----------+
only showing top 10 rows



In [20]:
predict_test.columns

['bidirectional_duration_ms',
 'bidirectional_packets',
 'bidirectional_bytes',
 'src2dst_first_seen_ms',
 'src2dst_last_seen_ms',
 'src2dst_packets',
 'src2dst_bytes',
 'dst2src_first_seen_ms',
 'dst2src_last_seen_ms',
 'dst2src_packets',
 'dst2src_bytes',
 'dst2src_duration_ms',
 'src2dst_duration_ms',
 'Target',
 'features',
 'Scaled_features',
 'rawPrediction',
 'probability',
 'prediction']

In [None]:
# paramGrid = ParamGridBuilder()\
#     .addGrid(lr.regParam, [0.1, 0.01]) \
#     .addGrid(lr.fitIntercept, [False, True])\
#     .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])\
#     .build()

# https://spark.apache.org/docs/2.1.0/ml-tuning.html

# Evaluating the model

Now let us evaluate the model using the BinaryClassificationEvaluator class in Spark ML. By default, BinaryClassificationEvaluator uses areaUnderROC as its performance metric.

In [21]:
# BinaryClassificationEvaluator uses areaUnderROC as the default metric; for now we will continue with it
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator=BinaryClassificationEvaluator(rawPredictionCol="rawPrediction",labelCol="Target")

In [22]:
predict_test.select("Target","rawPrediction","prediction","probability").show(5)

+------+--------------------+----------+--------------------+
|Target|       rawPrediction|prediction|         probability|
+------+--------------------+----------+--------------------+
|     1|[-18.845521373899...|       1.0|[6.53873980482537...|
|     1|[-18.845521373566...|       1.0|[6.53873980700126...|
|     1|[-18.845521373552...|       1.0|[6.53873980709190...|
|     1|[-18.845521359562...|       1.0|[6.53873989857046...|
|     1|[-18.845521359493...|       1.0|[6.53873989902375...|
+------+--------------------+----------+--------------------+
only showing top 5 rows



In [23]:
print("The area under ROC for train set is {}".format(evaluator.evaluate(predict_train)))
print("The area under ROC for test set is {}".format(evaluator.evaluate(predict_test)))

The area under ROC for train set is 0.9605533880332757
The area under ROC for test set is 0.970945446623839
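The area under the ROC curve equals the probability that a randomly chosen positive example is scored higher than a randomly chosen negative one, with ties counting one half. It depends only on how the scores *rank* the examples, so it is unaffected by the 0.5 decision threshold. For logistic regression it is also the same whether it is computed from the margin in `rawPrediction` or from `probability`, because the probability is a monotonic function of the margin. The minimal pairwise sketch below runs on hypothetical toy scores. Spark builds the ROC curve instead and may bin the scores, so tiny numerical differences are possible. `area_under_roc` is an illustrative name.

```python
def area_under_roc(scores, labels):
    """Probability that a random positive outscores a random negative (ties count 0.5).

    This pairwise form is O(n_pos * n_neg): fine for illustration, not for millions of rows.
    """
    pos = [s for s, y in zip(scores, labels) if y == 1]
    neg = [s for s, y in zip(scores, labels) if y == 0]
    if not pos or not neg:
        raise ValueError("need at least one positive and one negative example")
    wins = 0.0
    for p in pos:
        for q in neg:
            if p > q:
                wins += 1.0
            elif p == q:
                wins += 0.5
    return wins / (len(pos) * len(neg))


# Toy scores (e.g. the class-1 margin from rawPrediction) and true labels
scores = [0.9, 0.8, 0.7, 0.3, 0.6]
labels = [1, 1, 0, 0, 1]
print(area_under_roc(scores, labels))  # → 0.8333333333333334
```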


# Hyperparameters

So far we have developed a classification model using logistic regression. However, the behavior of logistic regression depends on a number of parameters, and apart from setting maxIter=10 and the weight column, we have used only the defaults. Now let's tune the hyperparameters and see whether it makes any difference.

In [24]:
# If you are unsure which parameters to tune, use print(lr.explainParams()) to list the parameters available for tuning
#print(lr.explainParams())

# List of tunable parameters in LR

1) aggregationDepth: suggested depth for treeAggregate (>= 2). (default: 2)

2) elasticNetParam: the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty. (default: 0.0)

3) family: The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial (default: auto)

4) featuresCol: features column name. (default: features, current: Scaled_features)

5) fitIntercept: whether to fit an intercept term. (default: True)

6) labelCol: label column name. (default: label, current: Target)

7) maxIter: max number of iterations (>= 0). (default: 100, current: 10)

8) predictionCol: prediction column name. (default: prediction)

9) probabilityCol: Column name for predicted class conditional probabilities. Note: Not all models output well-calibrated probability estimates! These probabilities should be treated as confidences, not precise probabilities. (default: probability)

10) rawPredictionCol: raw prediction (a.k.a. confidence) column name. (default: rawPrediction)

11) regParam: regularization parameter (>= 0). (default: 0.0)

12) standardization: whether to standardize the training features before fitting the model. (default: True)

13) threshold: Threshold in binary classification prediction, in range [0, 1]. If threshold and thresholds are both set, they must match, e.g. if threshold is p, then thresholds must be equal to [1-p, p]. (default: 0.5)

14) thresholds: Thresholds in multi-class classification to adjust the probability of predicting each class. Array must have length equal to the number of classes, with values > 0, excepting that at most one value may be 0. The class with largest value p/t is predicted, where p is the original probability of that class and t is the class's threshold. (undefined)

15) tol: the convergence tolerance for iterative algorithms (>= 0). (default: 1e-06)

16) weightCol: weight column name. If this is not set or empty, we treat all instance weights as 1.0. (current: classWeights)
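Two of the parameters above interact. In the elastic-net formulation given in the Spark ML documentation, regParam ($\lambda$) scales the regularization penalty added to the loss, and elasticNetParam ($\alpha$) mixes its L1 and L2 parts:

$$
R(\mathbf{w}) = \lambda \left( \alpha \lVert \mathbf{w} \rVert_1 + \frac{1 - \alpha}{2} \lVert \mathbf{w} \rVert_2^2 \right), \qquad \lambda = \texttt{regParam} \ge 0, \quad \alpha = \texttt{elasticNetParam} \in [0, 1]
$$

With the default regParam = 0.0, the penalty vanishes whatever elasticNetParam is, so the model trained above is unregularized. Setting $\alpha = 0$ gives a pure L2 (ridge) penalty and $\alpha = 1$ a pure L1 (lasso) penalty.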


Now let us tune some of these parameters and observe their effect on the performance of the algorithm.

For the purpose of hyperparameter tuning we will consider the following parameters:

1) aggregationDepth [2, 5, 10]

2) elasticNetParam [0.0, 0.5, 1.0]

3) fitIntercept [True / False]

4) maxIter [10, 100, 1000]

5) regParam [0.01, 0.5, 2.0]

First of all, let us define a parameter grid as follows:

In [25]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

paramGrid = ParamGridBuilder()\
    .addGrid(lr.aggregationDepth,[2,5,10])\
    .addGrid(lr.elasticNetParam,[0.0, 0.5, 1.0])\
    .addGrid(lr.fitIntercept,[False, True])\
    .addGrid(lr.maxIter,[10, 100, 1000])\
    .addGrid(lr.regParam,[0.01, 0.5, 2.0]) \
    .build()

# https://spark.apache.org/docs/2.1.0/ml-tuning.html
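Before running cross-validation, it helps to estimate the cost of this grid. ParamGridBuilder builds the Cartesian product of all the value lists. CrossValidator fits one model per parameter map per fold, then refits the best parameter map once on the whole training set. The quick pure-Python count below assumes the 5 folds used in the next cell. `aggregationDepth` only controls how partial results are combined across partitions (via treeAggregate), so it should not materially change the fitted model, and dropping it from the grid would cut the cost to a third.

```python
from itertools import product

# Same value lists as the ParamGridBuilder above
grid = {
    "aggregationDepth": [2, 5, 10],
    "elasticNetParam": [0.0, 0.5, 1.0],
    "fitIntercept": [False, True],
    "maxIter": [10, 100, 1000],
    "regParam": [0.01, 0.5, 2.0],
}
combos = list(product(*grid.values()))

num_folds = 5
fits_during_cv = len(combos) * num_folds
total_fits = fits_during_cv + 1  # plus the final refit of the best param map

print(len(combos), fits_during_cv, total_fits)  # → 162 810 811
```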

# K-fold cross validation

In [None]:
# Create a 5-fold CrossValidator
cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)

# Run cross-validation. This will likely take a long time because of the
# number of models that are trained and evaluated.
cvModel = cv.fit(train)
predict_train = cvModel.transform(train)
predict_test = cvModel.transform(test)
print("The area under ROC for train set after CV is {}".format(evaluator.evaluate(predict_train)))
print("The area under ROC for test set after CV is {}".format(evaluator.evaluate(predict_test)))
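Conceptually, CrossValidator partitions the training data into `numFolds` folds. For every parameter map it trains on k - 1 folds, scores the held-out fold with the evaluator, and averages the k scores. It keeps the parameter map with the best average (the largest, for areaUnderROC) and refits it on all the training data. The pure-Python sketch below shows only the fold bookkeeping and the selection step. Spark assigns rows to folds randomly, so its fold sizes are not exactly equal. `evaluate` is a hypothetical callback standing in for "fit on these rows, score on those rows", and all function names are illustrative.

```python
import random


def k_fold_indices(n, k, seed=0):
    """Shuffle row indices 0..n-1 and deal them into k roughly equal folds."""
    idx = list(range(n))
    random.Random(seed).shuffle(idx)
    return [idx[i::k] for i in range(k)]


def cv_splits(n, k, seed=0):
    """Yield one (train_idx, validation_idx) pair per fold."""
    folds = k_fold_indices(n, k, seed)
    for i, validation in enumerate(folds):
        train = [j for f, fold in enumerate(folds) if f != i for j in fold]
        yield train, validation


def select_best(param_maps, evaluate, n, k, seed=0):
    """Return the param map with the highest mean validation metric over the k folds."""
    splits = list(cv_splits(n, k, seed))

    def mean_metric(params):
        return sum(evaluate(params, tr, va) for tr, va in splits) / len(splits)

    return max(param_maps, key=mean_metric)


# Toy usage: a fake metric that peaks at regParam == 0.01
best = select_best([0.01, 0.5, 2.0], lambda reg, tr, va: -abs(reg - 0.01), n=100, k=5)
print(best)  # → 0.01
```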

# Hyperparameters and K-fold cross validation

For this problem we have not seen any significant improvement in the performance metric after tuning the hyperparameters. It looks like the default parameters work well for this problem. However, hyperparameter tuning is an important part of solving problems with ML and must not be ignored.

# Future work:

A few important things remain that I have not covered in this blog:

1) Outlier detection

2) Imbalance handling

The class-weighting technique used in this work relies on the estimator's weightCol parameter, which not every Spark ML classifier supports in every release (for example, RandomForestClassifier only gained weightCol in Spark 3.0). For algorithms or versions without it, we may need techniques such as the Synthetic Minority Oversampling Technique (SMOTE).
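Spark ML does not ship a SMOTE transformer. The technique creates synthetic minority-class rows by picking a minority sample, choosing one of its k nearest minority neighbours, and placing a new point at a random position on the segment between them. The minimal pure-Python sketch below works on hypothetical toy points. It is an illustration only, since at this dataset's scale the neighbour search would need a distributed approach. SMOTE should be applied to the training split only, so that synthetic rows never leak into the test set. `smote` is an illustrative name.

```python
import math
import random


def smote(minority, n_synthetic, k=5, seed=0):
    """Minimal SMOTE: interpolate between a random minority sample and one of
    its k nearest minority neighbours.

    minority: list of equal-length numeric tuples, all from the minority class.
    """
    if len(minority) < 2:
        raise ValueError("need at least two minority samples")
    k = min(k, len(minority) - 1)
    rng = random.Random(seed)
    synthetic = []
    for _ in range(n_synthetic):
        i = rng.randrange(len(minority))
        base = minority[i]
        # k nearest minority neighbours of `base` (excluding itself), by Euclidean distance
        neighbours = sorted(
            (p for j, p in enumerate(minority) if j != i),
            key=lambda p: math.dist(base, p),
        )[:k]
        neighbour = rng.choice(neighbours)
        gap = rng.random()  # random position on the segment from base to neighbour
        synthetic.append(tuple(b + gap * (q - b) for b, q in zip(base, neighbour)))
    return synthetic


# Toy minority class: the four corners of the unit square
minority = [(0.0, 0.0), (1.0, 0.0), (0.0, 1.0), (1.0, 1.0)]
new_points = smote(minority, n_synthetic=6, k=2, seed=1)
print(new_points)
```

With k=2, each corner's two nearest neighbours are the adjacent corners, so every synthetic point lands on an edge of the square.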

# Useful links

https://spark.apache.org/docs/2.1.0/ml-tuning.html

https://spark.apache.org/docs/2.2.0/mllib-evaluation-metrics.html 

https://docs.databricks.com/spark/latest/mllib/binary-classification-mllib-pipelines.html

In [None]:
cols=raw_data.columns

In [None]:
cols
