In [1]:
#importing libraries

import findspark
import pyspark
import pyspark.sql.functions as F
import pyspark
from pyspark.sql import SparkSession, SQLContext
from pyspark.ml import Pipeline,Transformer
from pyspark.ml.feature import Imputer,StandardScaler,StringIndexer,OneHotEncoder, VectorAssembler

from pyspark.sql.functions import *
from pyspark.sql.types import *
import numpy as np

from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [2]:
#creating spark session
findspark.init()
findspark.find()

spark = SparkSession.builder \
    .master("local[*]") \
    .appName("MQTT") \
    .getOrCreate()

spark = SparkSession.builder.appName("mqttProject").getOrCreate()
sc    = spark.sparkContext

sqlContext = SQLContext(sc)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/11/28 08:43:14 INFO org.apache.spark.SparkEnv: Registering MapOutputTracker
22/11/28 08:43:14 INFO org.apache.spark.SparkEnv: Registering BlockManagerMaster
22/11/28 08:43:14 INFO org.apache.spark.SparkEnv: Registering BlockManagerMasterHeartbeat
22/11/28 08:43:14 INFO org.apache.spark.SparkEnv: Registering OutputCommitCoordinator


In [3]:
train = spark.read.csv( ("gs://dataproc-staging-us-west3-650974721448-eojcphee/train70_augmented.csv"),header=True, inferSchema= True)
test = spark.read.csv( ("gs://dataproc-staging-us-west3-650974721448-eojcphee/test30_augmented.csv"),header=True, inferSchema= True)
DF = train.union(test)
DF = DF.toDF(*(c.replace('.', '_') for c in DF.columns))
train = train.toDF(*(c.replace('.', '_') for c in train.columns))
test = test.toDF(*(c.replace('.', '_') for c in test.columns))

                                                                                

In [4]:
remove= [ "mqtt_hdrflags", "tcp_flags", 'mqtt_conack_flags','mqtt_conflags','mqtt_msg',
        'mqtt_protoname']
train = train.drop("mqtt_hdrflags", "tcp_flags", 'mqtt_conack_flags','mqtt_conflags','mqtt_msg',
        'mqtt_protoname')
test = test.drop("mqtt_hdrflags", "tcp_flags", 'mqtt_conack_flags','mqtt_conflags','mqtt_msg',
        'mqtt_protoname')

In [5]:
train = train.limit(10000)
test = test.limit(1000)

In [6]:
numeric_features = [feature[0] for feature in train.dtypes if feature[1] not in ('string')]
string_features = [feature[0] for feature in train.dtypes if feature[1] in ('string')]
to_drop =  ["mqtt_conflag_cleansess","mqtt_proto_len","mqtt_conflag_passwd","mqtt_qos"]

In [7]:
col_names = ['tcp_time_delta','tcp_len','mqtt_conack_flags','mqtt_conack_flags_reserved','mqtt_conack_flags_sp',
 'mqtt_conack_val','mqtt_conflag_cleansess','mqtt_conflag_passwd','mqtt_conflag_qos','mqtt_conflag_reserved',
 'mqtt_conflag_retain','mqtt_conflag_uname','mqtt_conflag_willflag','mqtt_conflags','mqtt_dupflag', 
 'mqtt_kalive', 'mqtt_len','mqtt_msg','mqtt_msgid', 'mqtt_msgtype', 'mqtt_proto_len', 'mqtt_qos', 'mqtt_retain',
 'mqtt_sub_qos', 'mqtt_suback_qos', 'mqtt_ver', 'mqtt_willmsg', 'mqtt_willmsg_len', 'mqtt_willtopic', 'mqtt_willtopic_len',
 'target']

# nominal_cols = ['mqtt_conack_flags','mqtt_conflags', 'mqtt_msg', 'mqtt_protoname']
nominal_cols = []

continuous_cols = ['tcp_time_delta', 'tcp_len', 'mqtt_conack_flags_reserved', 'mqtt_conack_flags_sp', 'mqtt_conack_val',
 'mqtt_conflag_cleansess', 'mqtt_conflag_passwd', 'mqtt_conflag_qos', 'mqtt_conflag_reserved', 'mqtt_conflag_retain',
 'mqtt_conflag_uname', 'mqtt_conflag_willflag', 'mqtt_dupflag', 'mqtt_kalive', 'mqtt_len', 'mqtt_msgid',
 'mqtt_msgtype', 'mqtt_proto_len', 'mqtt_qos', 'mqtt_retain', 'mqtt_sub_qos', 'mqtt_suback_qos', 'mqtt_ver',
 'mqtt_willmsg', 'mqtt_willmsg_len', 'mqtt_willtopic', 'mqtt_willtopic_len']

In [8]:
class OutcomeCreater_binary(Transformer): # this defines a transformer that creates the outcome column
    
    def __init__(self):
        super().__init__()
  
    def _transform(self, dataset):
        label_to_binary = udf(lambda name: 0.0 if name == 'legitimate' else 1.0)
        output_df = dataset.withColumn('outcome', label_to_binary(col('target'))).drop("target")  
        output_df = output_df.withColumn('outcome', col('outcome').cast(DoubleType()))
        return output_df
    
class OutcomeCreater_multi(Transformer): # this defines a transformer that creates the outcome column
    
    def __init__(self):
        super().__init__()
  
    def _transform(self, dataset):
        label_to_multiple = udf(lambda name: 0.0 if name == 'legitimate' else (1.0 if name == "flood" else(2.0 if name == "dos" else(3.0 if name == "bruteforce" else(4.0 if name == "slowite" else (5.0))))))
        output_df = dataset.withColumn('outcome', label_to_multiple(col('target'))).drop("target")  
        output_df = output_df.withColumn('outcome', col('outcome').cast(DoubleType()))
        return output_df
        
class FeatureTypeCaster(Transformer): # this transformer will cast the columns as appropriate types  
    def __init__(self):
        super().__init__()

    def _transform(self, dataset):
        output_df = dataset
        for col_name in continuous_cols:
            output_df = output_df.withColumn(col_name,col(col_name).cast(DoubleType()))

        return output_df
    
class ColumnDropper(Transformer): # this transformer drops unnecessary columns
    def __init__(self, columns_to_drop = None):
        super().__init__()
        self.columns_to_drop=columns_to_drop
    def _transform(self, dataset):
        output_df = dataset
        for col_name in self.columns_to_drop:
            output_df = output_df.drop(col_name)
            
        return output_df
    
def get_preprocess_pipeline(classification):
    # Stage where columns are casted as appropriate types
    stage_typecaster = FeatureTypeCaster()

    # Stage where nominal columns are transformed to index columns using StringIndexer
    nominal_id_cols = [x+"_index" for x in nominal_cols]
    nominal_onehot_cols = [x+"_encoded" for x in nominal_cols]
    stage_nominal_indexer = StringIndexer(inputCols = nominal_cols, outputCols = nominal_id_cols )
    
    # Stage where the index columns are further transformed using OneHotEncoder
    stage_nominal_onehot_encoder = OneHotEncoder(inputCols=nominal_id_cols, outputCols=nominal_onehot_cols)

    # Stage where all relevant features are assembled into a vector (and dropping a few)
    feature_cols = continuous_cols+nominal_onehot_cols
    corelated_cols_to_remove = to_drop
    
    for col_name in corelated_cols_to_remove:
        feature_cols.remove(col_name)
    stage_vector_assembler = VectorAssembler(inputCols=feature_cols, outputCol="vectorized_features")

    # Stage where we scale the columns
    stage_scaler = StandardScaler(inputCol= 'vectorized_features', outputCol= 'features')
    

    # Stage for creating the outcome column representing whether there is attack
    if(classification == "binary"): 
        stage_outcome = OutcomeCreater_binary()
    else:
        stage_outcome = OutcomeCreater_multi()

    # Removing all unnecessary columbs, only keeping the 'features' and 'outcome' columns
    stage_column_dropper = ColumnDropper(columns_to_drop = nominal_cols+nominal_id_cols+
        nominal_onehot_cols + continuous_cols + ['vectorized_features'])
    
    pipeline = Pipeline(stages=[stage_typecaster,stage_nominal_indexer,stage_nominal_onehot_encoder,
        stage_vector_assembler,stage_scaler,stage_outcome,stage_column_dropper])
    
    return pipeline 

### Multiclass Classification

In [9]:
preprocess_pipeline = get_preprocess_pipeline("multi")
preprocess_pipeline_model = preprocess_pipeline.fit(train)

train_df = preprocess_pipeline_model.transform(train)
test_df = preprocess_pipeline_model.transform(test)

22/11/28 08:44:34 WARN org.apache.spark.sql.catalyst.util.package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

In [10]:
train_df = train_df.limit(10000)
test_df = test_df.limit(1000)

### Decision Tree

In [11]:
from pyspark.ml.classification import DecisionTreeClassifier

dt = DecisionTreeClassifier(featuresCol="features",labelCol = "outcome")
dt_model = dt.fit(train_df)

                                                                                

In [12]:
dt_prediction_train = dt_model.transform(train_df)
dt_prediction_test = dt_model.transform(test_df)

dt_accuracy_train = (dt_prediction_train.filter(
    dt_prediction_train.outcome == dt_prediction_train.prediction).count() / 
    float(dt_prediction_train.count()))
dt_accuracy_test = (dt_prediction_test.filter(
    dt_prediction_test.outcome == dt_prediction_test.prediction).count()
    / float(dt_prediction_test.count()))

print(f"Train accuracy = {np.round(dt_accuracy_train*100,2)}%")
print(f"Test accuracy = {np.round(dt_accuracy_test*100,2)}%")

                                                                                

Train accuracy = 74.1%
Test accuracy = 72.3%


#### Hyper Parameter Tuning with Crossvalidation

In [13]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
dt = DecisionTreeClassifier(featuresCol = 'features', labelCol = 'outcome')

dt_paramGrid = (ParamGridBuilder()
             .addGrid(dt.maxDepth, [5, 10])# maximum depth for each tree
             .addGrid(dt.maxBins,[5,  10])
             .build())

evaluator = MulticlassClassificationEvaluator(labelCol='outcome',predictionCol='prediction', metricName="accuracy")

dt_cv = CrossValidator(estimator=dt, estimatorParamMaps=dt_paramGrid, 
                    evaluator=evaluator, numFolds=5)

dt_cv_model = dt_cv.fit(train_df)

dt_cv_prediction_test = dt_cv_model.transform(test_df)

accuracy = evaluator.evaluate(dt_cv_prediction_test)

print(f"Test accuracy after tuning= {accuracy * 100 :1.2f}")   ### After tuning

                                                                                

Test accuracy after tuning= 72.90


### Logistic Regression

In [14]:
from pyspark.ml.classification import LogisticRegression


lr = LogisticRegression(labelCol = 'outcome')


lrModel = lr.fit(train_df)

22/11/28 08:45:24 WARN com.github.fommil.netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
22/11/28 08:45:24 WARN com.github.fommil.netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS

In [15]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
predictions = lrModel.transform(test_df)

# compute accuracy on the test set
evaluator = MulticlassClassificationEvaluator(labelCol="outcome", predictionCol="prediction",
                                              metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy * 100))

Test set accuracy = 65.10000000000001


                                                                                

#### Hyper Parameter Tuning with Crossvalidation

In [16]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [17]:
# Create ParamGrid for Cross Validation
lr_paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01,  2.0])
             .addGrid(lr.maxIter, [3, 10])
             .build())


lr = LogisticRegression(featuresCol = 'features', labelCol = 'outcome')


evaluator = MulticlassClassificationEvaluator(predictionCol='prediction', 
    labelCol='outcome', metricName='accuracy')

lr_cv = CrossValidator(estimator=lr, estimatorParamMaps=lr_paramGrid, 
                    evaluator=evaluator, numFolds=5)

lr_cv_model = lr_cv.fit(train_df)

lr_cv_prediction_test = lr_cv_model.transform(test_df)

accuracy = evaluator.evaluate(lr_cv_prediction_test)

print(f"Test accuracy after tuning= {accuracy * 100 :1.2f}")   ### After tuning



Test accuracy after tuning= 65.10


                                                                                

### Binary Classification

In [18]:
preprocess_pipeline = get_preprocess_pipeline("binary")
preprocess_pipeline_model = preprocess_pipeline.fit(train)

train_df = preprocess_pipeline_model.transform(train)
test_df = preprocess_pipeline_model.transform(test)

                                                                                

In [19]:
train_df= train_df.limit(14000)
test_df = test_df.limit(6000)

### Decision Tree

In [20]:
from pyspark.ml.classification import DecisionTreeClassifier

dt = DecisionTreeClassifier(featuresCol="features",labelCol = "outcome")
dt_model = dt.fit(train_df)

                                                                                

In [21]:
dt_prediction_train = dt_model.transform(train_df)
dt_prediction_test = dt_model.transform(test_df)

dt_accuracy_train = (dt_prediction_train.filter(
    dt_prediction_train.outcome == dt_prediction_train.prediction).count() / 
    float(dt_prediction_train.count()))
dt_accuracy_test = (dt_prediction_test.filter(
    dt_prediction_test.outcome == dt_prediction_test.prediction).count()
    / float(dt_prediction_test.count()))

dt_auc = evaluator.evaluate(dt_prediction_test)

print(f"Train accuracy = {np.round(dt_accuracy_train*100,2)}%, test accuracy = {np.round(dt_accuracy_test*100,2)}%, AUC = {np.round(dt_auc,2)}")

                                                                                

Train accuracy = 91.11%, test accuracy = 90.7%, AUC = 0.91


##### Tuning with crossvalidation

In [22]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
dt = DecisionTreeClassifier(featuresCol = 'features', labelCol = 'outcome')

dt_paramGrid = (ParamGridBuilder()
             .addGrid(dt.maxDepth, [5, 10])# maximum depth for each tree
             .addGrid(dt.maxBins,[5,  10])
             .build())

evaluator = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction', 
    labelCol='outcome', metricName='areaUnderROC')

dt_cv = CrossValidator(estimator=dt, estimatorParamMaps=dt_paramGrid, 
                    evaluator=evaluator, numFolds=5)

dt_cv_model = dt_cv.fit(train_df)

dt_cv_prediction_test = dt_cv_model.transform(test_df)


dt_cv_auc = evaluator.evaluate(dt_cv_prediction_test)

                                                                                

In [23]:
print(f"Before cross-validation and parameter tuning, AUC={np.round(dt_auc,2)}")
print(f"After cross-validation and parameter tuning, AUC={np.round(dt_cv_auc,2)}")

Before cross-validation and parameter tuning, AUC=0.91
After cross-validation and parameter tuning, AUC=0.92


### Linear SVC

In [24]:
from pyspark.ml.classification import LinearSVC

svm = LinearSVC(featuresCol="features",labelCol = "outcome")

svm_model = svm.fit(train_df)

22/11/28 08:47:32 ERROR breeze.optimize.OWLQN: Failure! Resetting history: breeze.optimize.NaNHistory: 
22/11/28 08:47:33 ERROR breeze.optimize.OWLQN: Failure! Resetting history: breeze.optimize.NaNHistory: 
22/11/28 08:47:34 ERROR breeze.optimize.OWLQN: Failure! Resetting history: breeze.optimize.NaNHistory: 
22/11/28 08:47:35 ERROR breeze.optimize.OWLQN: Failure! Resetting history: breeze.optimize.NaNHistory: 
22/11/28 08:47:36 ERROR breeze.optimize.OWLQN: Failure! Resetting history: breeze.optimize.NaNHistory: 
22/11/28 08:47:36 ERROR breeze.optimize.OWLQN: Failure! Resetting history: breeze.optimize.NaNHistory: 

In [25]:
# make predictions on training dataset and test data set
svm_prediction_train = svm_model.transform(train_df)
svm_prediction_test = svm_model.transform(test_df)

# calculate train and test accuracy

svm_accuracy_train = (svm_prediction_train.filter(
    svm_prediction_train.outcome == svm_prediction_train.prediction).count() / 
    float(svm_prediction_train.count()))
svm_accuracy_test = (svm_prediction_test.filter(
    svm_prediction_test.outcome == svm_prediction_test.prediction).count()
    / float(svm_prediction_test.count()))
    
# calculate AUC
svm_auc = evaluator.evaluate(svm_prediction_test)

print(f"Train accuracy = {np.round(svm_accuracy_train*100,2)}%")
print(f"Test accuracy = {np.round(svm_accuracy_test*100,2)}%")
print(f"AUC = {np.round(svm_auc,2)}")

                                                                                

Train accuracy = 77.64%
Test accuracy = 78.9%
AUC = 0.81


#### Tuning with Crossvalidation

In [26]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

svm = LinearSVC(featuresCol="features",labelCol = "outcome")

svm_paramGrid = (ParamGridBuilder()
             .addGrid(svm.regParam, [0.01, 0.5, 2.0])# regularization parameter
             .addGrid(svm.maxIter, [10, 50, 100])#Number of iterations
             .build())

evaluator = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction', 
    labelCol='outcome', metricName='areaUnderROC')

svm_cv = CrossValidator(estimator=svm, estimatorParamMaps=svm_paramGrid, 
                    evaluator=evaluator, numFolds=5)

svm_cv_model = svm_cv.fit(train_df)



In [27]:
svm_cv_prediction_test = svm_cv_model.transform(test_df)

svm_cv_auc = evaluator.evaluate(svm_cv_prediction_test)

print(f"Before cross-validation and parameter tuning, AUC={np.round(svm_auc,2)}")
print(f"After cross-validation and parameter tuning, AUC={np.round(svm_cv_auc,2)}")

Before cross-validation and parameter tuning, AUC=0.81
After cross-validation and parameter tuning, AUC=0.8


                                                                                