In [5]:
col_names = [
    'tcp_flags',
    'tcp_time_delta',
    'tcp_len',
    'mqtt_conack_flags',
    'mqtt_conack_flags_reserved',
    'mqtt_conack_flags_sp',
    'mqtt_conack_val',
    'mqtt_conflag_cleansess',
    'mqtt_conflag_passwd',
    'mqtt_conflag_qos',
    'mqtt_conflag_reserved',
    'mqtt_conflag_retain',
    'mqtt_conflag_uname',
    'mqtt_conflag_willflag',
    'mqtt_conflags',
    'mqtt_dupflag',
    'mqtt_hdrflags',
    'mqtt_kalive',
    'mqtt_len',
    'mqtt_msg',
    'mqtt_msgid',
    'mqtt_msgtype',
    'mqtt_proto_len',
    'mqtt_protoname',
    'mqtt_qos',
    'mqtt_retain',
    'mqtt_sub_qos',
    'mqtt_suback_qos',
    'mqtt_ver',
    'mqtt_willmsg',
    'mqtt_willmsg_len',
    'mqtt_willtopic',
    'mqtt_willtopic_len',
    'target',
    'dataset',
    'tcp_flags_decimal',
    'mqtt_conflags_decimal',
    'mqtt_hdrflags_decimal'
]

nominal_cols = ["mqtt_msg"]

corelated_cols_to_remove = ["mqtt_conack_flags",
                     "mqtt_conack_flags_reserved",
                     "mqtt_conack_flags_sp", 
                     "mqtt_conflag_qos", 
                     "mqtt_conflag_reserved", 
                     "mqtt_conflag_retain", 
                     "mqtt_conflag_willflag", 
                     "mqtt_willtopic", 
                     "mqtt_willtopic_len", 
                     "mqtt_sub_qos", 
                     "mqtt_suback_qos", 
                     "mqtt_willmsg",
                     "mqtt_willmsg_len"]



binary_cols = ["mqtt_conack_val", 
                  "mqtt_conflag_cleansess", 
                  "mqtt_conflag_passwd", 
                  "mqtt_conflag_uname", 
                  "mqtt_dupflag", 
                  "mqtt_proto_len", 
                  "mqtt_protoname", 
                  "mqtt_qos", 
                  "mqtt_retain", 
                  "mqtt_ver"]


continuous_cols = ["tcp_flags_decimal",
                     'tcp_time_delta',
                     'tcp_len',
                     'mqtt_kalive',
                     'mqtt_len',
                     'mqtt_msgid',
                     'mqtt_msgtype',
                     'mqtt_conflags_decimal', 
                     "mqtt_hdrflags_decimal", ]

In [8]:
import pyspark
from pyspark.sql import SparkSession, SQLContext
from pyspark.ml import Pipeline,Transformer
from pyspark.ml.feature import Imputer,StandardScaler,StringIndexer,OneHotEncoder, VectorAssembler

from pyspark.sql.functions import *
from pyspark.sql.types import *

from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

import numpy as np
from matplotlib import pyplot as plt


class OutcomeCreater(Transformer): # this defines a transformer that creates the outcome column
    
    def __init__(self):
        super().__init__()

    def _transform(self, dataset):
        # label_to_binary = udf(lambda name: 0.0 if name == 'normal' else 1.0)
        # output_df = dataset.withColumn('outcome', label_to_binary(col('class'))).drop("class")  
        # output_df = output_df.withColumn('outcome', col('outcome').cast(DoubleType()))
        # output_df = output_df.drop('difficulty')
        # return output_df

        def converter(name):
            if name == 'legitimate':
                return 0
            elif name == 'dos':
                return 1      
            elif name == 'malformed':
                return 2     
            elif name == 'flood':
                return 3
            elif name == 'bruteforce':
                return 4   
            elif name == 'slowite':
                return 5  
            else:
                print("out of range")
                return None
            
        label_to_binary = udf(lambda name: converter(name))
        output_df = dataset.withColumn('label', label_to_binary(col('target')))
        output_df = output_df.drop("target")  
        output_df = output_df.withColumn('label', col('label').cast(DoubleType()))
        return output_df

class FeatureTypeCaster(Transformer): # this transformer will cast the columns as appropriate types  
    def __init__(self):
        super().__init__()

    def _transform(self, dataset):
        output_df = dataset
        for col_name in binary_cols + continuous_cols:
            output_df = output_df.withColumn(col_name,col(col_name).cast(DoubleType()))

        return output_df
class ColumnDropper(Transformer): # this transformer drops unnecessary columns
    def __init__(self, columns_to_drop = None):
        super().__init__()
        self.columns_to_drop=columns_to_drop
    def _transform(self, dataset):
        output_df = dataset
        for col_name in self.columns_to_drop:
            output_df = output_df.drop(col_name)
        return output_df

# Stage where columns are casted as appropriate types
stage_typecaster = FeatureTypeCaster()

# Stage where nominal columns are transformed to index columns using StringIndexer
nominal_id_cols = [x+"_index" for x in nominal_cols]
nominal_onehot_cols = [x+"_encoded" for x in nominal_cols]
print(nominal_cols)
print(nominal_id_cols)
stage_nominal_indexer = StringIndexer(inputCols = nominal_cols, outputCols = nominal_id_cols )

# Stage where the index columns are further transformed using OneHotEncoder
stage_nominal_onehot_encoder = OneHotEncoder(inputCols=nominal_id_cols, outputCols=nominal_onehot_cols)

# Stage where all relevant features are assembled into a vector (and dropping a few)
feature_cols = continuous_cols+binary_cols+nominal_onehot_cols
for col_name in corelated_cols_to_remove:
    feature_cols.remove(col_name)
stage_vector_assembler = VectorAssembler(inputCols=feature_cols, outputCol="vectorized_features")

# Stage where we scale the columns
stage_scaler = StandardScaler(inputCol= 'vectorized_features', outputCol= 'features')


# Stage for creating the outcome column representing whether there is attack 
stage_outcome = OutcomeCreater()

# Removing all unnecessary columbs, only keeping the 'features' and 'outcome' columns
stage_column_dropper = ColumnDropper(columns_to_drop = nominal_cols+nominal_id_cols+
    nominal_onehot_cols+ binary_cols + continuous_cols + ['vectorized_features'])


# Estimator
lr = LogisticRegression(featuresCol = 'features', labelCol = 'outcome')

# ParameterGrid
lr_paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.5, 2.0])
             .addGrid(lr.maxIter, [1, 5, 10])
             .build())

# Evaluator
evaluator = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction', 
    labelCol='outcome', metricName='areaUnderROC')

# CrossValidator
lr_cv = CrossValidator(estimator=lr, estimatorParamMaps=lr_paramGrid, 
                    evaluator=evaluator, numFolds=5)


# Connect the columns into a pipeline
ml_pipeline = Pipeline(stages=[stage_typecaster,stage_nominal_indexer,stage_nominal_onehot_encoder,
    stage_vector_assembler,stage_scaler,stage_outcome,stage_column_dropper,lr_cv])


['mqtt_msg']
['mqtt_msg_index']


AssertionError: 