In [10]:
import findspark
findspark.init()
findspark.find()
import pyspark
from pyspark.sql import SparkSession, SQLContext
from pyspark.ml import Pipeline,Transformer
from pyspark.ml.feature import Imputer,StandardScaler,StringIndexer,OneHotEncoder, VectorAssembler

from pyspark.sql.functions import col, SparkContext, udf
from pyspark.sql.types import *
import numpy as np

In [2]:
appName = "Big Data ML"
master = "local"

# Create the configuration object for Spark: driver host, application name and master URL.
conf = (
    pyspark.SparkConf()
    .set('spark.driver.host', '127.0.0.1')
    .setAppName(appName)
    .setMaster(master)
)

# Create the Spark context with the configuration above rather than relying on the default one.
# SparkContext is reached through the `pyspark` package itself, so this cell does not depend on
# the name happening to be importable from `pyspark.sql.functions`.
sc = pyspark.SparkContext.getOrCreate(conf=conf)

# SQL context built on top of the Spark context; kept under this name for later cells that use it.
sqlContext = SQLContext(sc)

# Obtain the Spark session through the SQL context's session builder.
spark = sqlContext.sparkSession.builder.getOrCreate()



In [3]:
def _load_mqtt_csv(path):
    """Read one CSV split and apply the cleaning steps shared by train and test.

    Steps, in order:
      1. Read the file with a header row and inferred column types.
      2. Replace every "." in the column names with "_".
      3. Drop every row that contains a null value.
      4. Drop the "mqtt_protoname" column.
      5. Cast the nominal columns "mqtt_msgtype" and "mqtt_conack_val" to string
         so they can be encoded by the preprocessing transformers.

    Args:
        path: Location of the CSV file, passed straight to ``spark.read.csv``.

    Returns:
        The cleaned Spark DataFrame.
    """
    loaded_df = spark.read.csv(path, header=True, inferSchema=True)

    # Rename all columns in a single projection instead of one withColumnRenamed per column.
    renamed_df = loaded_df.toDF(*[name.replace(".", "_") for name in loaded_df.columns])

    # Rows with NA values are removed first (the notebook author noted only 190 of them),
    # and only afterwards is the unused protocol-name column dropped.
    cleaned_df = renamed_df.na.drop().drop("mqtt_protoname")

    # Converting nominal columns to string so they can be encoded by the transformer
    for nominal_name in ("mqtt_msgtype", "mqtt_conack_val"):
        cleaned_df = cleaned_df.withColumn(nominal_name, cleaned_df[nominal_name].cast("string"))

    return cleaned_df


nslkdd_raw = _load_mqtt_csv("train70_reduced.csv")
nslkdd_test_raw = _load_mqtt_csv("test30_reduced.csv")

In [4]:
# Show the column names and inferred types of the cleaned training frame.
nslkdd_raw.printSchema()

root
 |-- tcp_flags: string (nullable = true)
 |-- tcp_time_delta: double (nullable = true)
 |-- tcp_len: integer (nullable = true)
 |-- mqtt_conack_flags: string (nullable = true)
 |-- mqtt_conack_flags_reserved: double (nullable = true)
 |-- mqtt_conack_flags_sp: double (nullable = true)
 |-- mqtt_conack_val: string (nullable = true)
 |-- mqtt_conflag_cleansess: double (nullable = true)
 |-- mqtt_conflag_passwd: double (nullable = true)
 |-- mqtt_conflag_qos: double (nullable = true)
 |-- mqtt_conflag_reserved: double (nullable = true)
 |-- mqtt_conflag_retain: double (nullable = true)
 |-- mqtt_conflag_uname: double (nullable = true)
 |-- mqtt_conflag_willflag: double (nullable = true)
 |-- mqtt_conflags: string (nullable = true)
 |-- mqtt_dupflag: double (nullable = true)
 |-- mqtt_hdrflags: string (nullable = true)
 |-- mqtt_kalive: double (nullable = true)
 |-- mqtt_len: double (nullable = true)
 |-- mqtt_msg: string (nullable = true)
 |-- mqtt_msgid: double (nullable = true)
 |-

In [5]:
# Every column of the cleaned dataset, in file order, ending with the label column.
col_names = [
    # tcp_* columns
    "tcp_flags", "tcp_time_delta", "tcp_len",
    # mqtt_conack_* columns
    "mqtt_conack_flags", "mqtt_conack_flags_reserved", "mqtt_conack_flags_sp", "mqtt_conack_val",
    # mqtt_conflag_* columns and the combined mqtt_conflags column
    "mqtt_conflag_cleansess", "mqtt_conflag_passwd", "mqtt_conflag_qos", "mqtt_conflag_reserved",
    "mqtt_conflag_retain", "mqtt_conflag_uname", "mqtt_conflag_willflag", "mqtt_conflags",
    # remaining mqtt_* columns
    "mqtt_dupflag", "mqtt_hdrflags", "mqtt_kalive", "mqtt_len", "mqtt_msg", "mqtt_msgid", "mqtt_msgtype",
    "mqtt_proto_len", "mqtt_qos", "mqtt_retain", "mqtt_sub_qos", "mqtt_suback_qos", "mqtt_ver",
    "mqtt_willmsg", "mqtt_willmsg_len", "mqtt_willtopic", "mqtt_willtopic_len",
    # label column
    "target",
]

# Columns that are string-indexed and one-hot encoded by the preprocessing pipeline.
nominal_cols = [
    "mqtt_msgtype",
    "mqtt_conack_val",
    "tcp_flags",
    "mqtt_conack_flags",
    "mqtt_conflags",
    "mqtt_hdrflags",
]

# Columns treated as binary flags; cast to double together with the continuous ones.
binary_cols = ["mqtt_dupflag"]

# Columns cast to double, imputed and fed to the feature vector as-is.
# NOTE(review): "mqtt_msg" is typed as string in the printed schema; confirm that
# casting it to double is intended.
continuous_cols = [
    "tcp_time_delta", "tcp_len",
    "mqtt_conack_flags_reserved", "mqtt_conack_flags_sp",
    "mqtt_conflag_cleansess", "mqtt_conflag_passwd", "mqtt_conflag_qos", "mqtt_conflag_reserved",
    "mqtt_conflag_retain", "mqtt_conflag_uname", "mqtt_conflag_willflag",
    "mqtt_kalive", "mqtt_len", "mqtt_msg", "mqtt_msgid",
    "mqtt_proto_len", "mqtt_qos", "mqtt_retain",
    "mqtt_sub_qos", "mqtt_suback_qos", "mqtt_ver",
    "mqtt_willmsg", "mqtt_willmsg_len", "mqtt_willtopic", "mqtt_willtopic_len",
]

In [6]:
# nslkdd_raw.select("mqtt_hdrflags").distinct().show()

In [7]:
# Names of raw columns to leave out of the feature vector. get_preprocess_pipeline()
# removes each listed column (or its "<name>_encoded" column when it is nominal)
# from the assembled features.
columns_to_drop = [] # Add any columns that we want to just get rid of here

In [8]:
'''
Data preprocessing pipeline
'''
class OutcomeCreater(Transformer): # this defines a transformer that creates the outcome column
    """Replace the string 'target' column with a numeric 'outcome' column.

    Known labels (matched case-insensitively) map to the codes in ``_LABEL_CODES``;
    anything else, including a null label, maps to ``_UNKNOWN_LABEL_CODE``.
    """

    # Numeric code assigned to each known lower-cased label.
    _LABEL_CODES = {
        'normal': 0.0,
        'dos': 1.0,
        'malformed': 2.0,
        'slowite': 3.0,
        'bruteforce': 4.0,
    }
    # Sentinel returned for labels that are not in _LABEL_CODES.
    _UNKNOWN_LABEL_CODE = -100.0

    def __init__(self):
        super().__init__()

    def label_to_vector(self, name):
        """Return the numeric code for one label string.

        Args:
            name: Label text, or None when the source value is null.

        Returns:
            The float code of the label, or -100.0 for unknown or null labels.
        """
        # A null label previously failed on `.lower()`; treat it like any other unknown label.
        if name is None:
            return self._UNKNOWN_LABEL_CODE
        return self._LABEL_CODES.get(name.lower(), self._UNKNOWN_LABEL_CODE)

    def _transform(self, dataset):
        """Add the double-typed 'outcome' column and drop 'target'."""
        # The UDF returns Python floats, so it is declared as DoubleType directly
        # instead of going through a string column and casting back to double.
        label_to_vector_udf = udf(self.label_to_vector, DoubleType())
        return dataset.withColumn('outcome', label_to_vector_udf(col('target'))).drop("target")

class FeatureTypeCaster(Transformer): # this transformer will cast the columns as appropriate types
    """Cast every column listed in ``binary_cols`` and ``continuous_cols`` to double."""

    def __init__(self):
        super().__init__()

    def _transform(self, dataset):
        """Return `dataset` with each binary/continuous column re-typed as double."""
        double_type = DoubleType()
        numeric_names = binary_cols + continuous_cols

        casted_df = dataset
        for numeric_name in numeric_names:
            # Overwrite the column in place (same name) with its double-typed version.
            casted_df = casted_df.withColumn(numeric_name, col(numeric_name).cast(double_type))
        return casted_df

class ColumnDropper(Transformer): # this transformer drops unnecessary columns
    """Remove a fixed set of columns from a DataFrame.

    Args:
        columns_to_drop: Iterable of column names to remove. ``None`` (the default)
            means "drop nothing".
    """

    def __init__(self, columns_to_drop = None):
        super().__init__()
        # Normalise to a private list: the None default used to make _transform fail
        # when it tried to iterate it, and copying avoids aliasing the caller's list.
        self.columns_to_drop = list(columns_to_drop) if columns_to_drop is not None else []

    def _transform(self, dataset):
        """Return `dataset` without the configured columns."""
        if not self.columns_to_drop:
            # Nothing configured: hand the input back unchanged.
            return dataset
        # Drop all names in one call rather than one projection per column.
        return dataset.drop(*self.columns_to_drop)

def get_preprocess_pipeline():
    """Build the unfitted preprocessing Pipeline.

    Reads the module-level lists ``nominal_cols``, ``binary_cols``, ``continuous_cols``
    and ``columns_to_drop``. The stages run in this order: type casting, imputation,
    string indexing, one-hot encoding, vector assembly, scaling, outcome creation,
    and removal of every intermediate column.

    Returns:
        A ``Pipeline`` whose transformed output keeps the 'features' and 'outcome' columns.
    """
    numeric_cols = binary_cols + continuous_cols
    indexed_cols = [f"{name}_index" for name in nominal_cols]
    encoded_cols = [f"{name}_encoded" for name in nominal_cols]

    # Columns that make up the feature vector, minus anything the user asked to drop.
    # A dropped nominal column is represented by its "<name>_encoded" column.
    feature_cols = continuous_cols + binary_cols + encoded_cols
    for dropped_name in columns_to_drop:
        feature_cols.remove(f"{dropped_name}_encoded" if dropped_name in nominal_cols else dropped_name)

    # Everything that is no longer needed once 'features' and 'outcome' exist.
    leftover_cols = (
        nominal_cols + indexed_cols + encoded_cols
        + binary_cols + continuous_cols + columns_to_drop
        + ['vectorized_features']
    )

    stages = [
        # Cast binary and continuous columns to the appropriate type.
        FeatureTypeCaster(),
        # Impute columns that contain missing values (written back to the same names).
        Imputer(inputCols=numeric_cols, outputCols=numeric_cols),
        # Turn nominal columns into index columns.
        StringIndexer(inputCols=nominal_cols, outputCols=indexed_cols),
        # One-hot encode the index columns.
        OneHotEncoder(inputCols=indexed_cols, outputCols=encoded_cols),
        # Assemble all relevant features into a single vector column.
        VectorAssembler(inputCols=feature_cols, outputCol="vectorized_features"),
        # Scale the assembled vector into the final 'features' column.
        StandardScaler(inputCol='vectorized_features', outputCol='features'),
        # Create the 'outcome' column from 'target'.
        OutcomeCreater(),
        # Remove all unnecessary columns.
        ColumnDropper(columns_to_drop=leftover_cols),
    ]
    return Pipeline(stages=stages)

In [11]:
# Build the preprocessing pipeline and fit it on the training frame only.
preprocess_pipeline = get_preprocess_pipeline()
preprocess_pipeline_model = preprocess_pipeline.fit(nslkdd_raw)

# Apply the same fitted model to both the training and the test frame.
nslkdd_df = preprocess_pipeline_model.transform(nslkdd_raw)
nslkdd_df_test = preprocess_pipeline_model.transform(nslkdd_test_raw)

In [13]:
# Check the columns and types of the preprocessed test frame.
nslkdd_df_test.printSchema()

root
 |-- features: vector (nullable = true)
 |-- outcome: double (nullable = true)



In [14]:
# Display one preprocessed test row, one field per line.
nslkdd_df_test.show(1, vertical=True)

-RECORD 0------------------------
 features | (61,[0,27,36,44,4... 
 outcome  | 2.0                  
only showing top 1 row

