# Data cleaning and Data engineering

In [1]:
# Uncomment the following lines if you are using Windows!
# import findspark
# findspark.init()
# findspark.find()

import pyspark

from pyspark.sql import SparkSession
from pyspark import SparkContext, SQLContext

appName = "Big Data Analytics"
master = "local"

# Create Configuration object for Spark.
conf = pyspark.SparkConf()\
    .set('spark.driver.host','127.0.0.1')\
    .setAppName(appName)\
    .setMaster(master)\
    .set('spark.jars', 'gs://dataproc-staging-us-central1-159964990471-2n8oqiw8/postgresql-42.6.0.jar')

# Create Spark Context with the new configurations rather than relying on the default one
sc = SparkContext.getOrCreate(conf=conf)

# You need to create SQL Context to conduct some database operations like what we will see later.
sqlContext = SQLContext(sc)

# If you have SQL context, you create the session from the Spark Context
spark = sqlContext.sparkSession.builder.getOrCreate()

# df_train = spark.read.csv("train70_reduced.csv" ,header=True, inferSchema= True)
# df_test = spark.read.csv("test30_reduced.csv", header=True, inferSchema=True)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/11/16 19:08:37 INFO SparkEnv: Registering MapOutputTracker
23/11/16 19:08:37 INFO SparkEnv: Registering BlockManagerMaster
23/11/16 19:08:37 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
23/11/16 19:08:37 INFO SparkEnv: Registering OutputCommitCoordinator


### Loading data from Cloud bucket and the Postgres on cloud

In [2]:
df_train = spark.read.csv("gs://dataproc-staging-us-central1-159964990471-2n8oqiw8/train70_reduced.csv" ,header=True, inferSchema= True)
df_test = spark.read.csv("gs://dataproc-staging-us-central1-159964990471-2n8oqiw8/test30_reduced.csv", header=True, inferSchema=True)

                                                                                

In [3]:
from pyspark.sql.functions import lit

df_train = df_train.withColumn('Train', lit(1))
df_test = df_test.withColumn('Train', lit(0))

In [4]:
db_properties={}
#update your db username
db_properties['username']="postgres"
#update your db password
db_properties['password']="18763kebjeseaya"
#make sure you got the right port number here
db_properties['url']= "jdbc:postgresql://34.136.81.58/postgres"
#make sure you had the Postgres JAR file in the right location
db_properties['driver']="org.postgresql.Driver"
db_properties['table']= "mqtt"


df_train.write.format("jdbc")\
.mode("overwrite")\
.option("url", db_properties['url'])\
.option("dbtable", db_properties['table'])\
.option("user", db_properties['username'])\
.option("password", db_properties['password'])\
.option("Driver", db_properties['driver'])\
.save()

df_test.write.format("jdbc")\
.mode("append")\
.option("url", db_properties['url'])\
.option("dbtable", db_properties['table'])\
.option("user", db_properties['username'])\
.option("password", db_properties['password'])\
.option("Driver", db_properties['driver'])\
.save()

23/11/16 19:08:49 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

In [5]:
df = sqlContext.read.format("jdbc")\
    .option("url", db_properties['url'])\
    .option("dbtable", db_properties['table'])\
    .option("user", db_properties['username'])\
    .option("password", db_properties['password'])\
    .option("Driver", db_properties['driver'])\
    .load()

df.show(1, vertical=True)

-RECORD 0--------------------------------
 tcp.flags                  | 0x00000018 
 tcp.time_delta             | 0.998867   
 tcp.len                    | 10         
 mqtt.conack.flags          | 0          
 mqtt.conack.flags.reserved | 0.0        
 mqtt.conack.flags.sp       | 0.0        
 mqtt.conack.val            | 0.0        
 mqtt.conflag.cleansess     | 0.0        
 mqtt.conflag.passwd        | 0.0        
 mqtt.conflag.qos           | 0.0        
 mqtt.conflag.reserved      | 0.0        
 mqtt.conflag.retain        | 0.0        
 mqtt.conflag.uname         | 0.0        
 mqtt.conflag.willflag      | 0.0        
 mqtt.conflags              | 0          
 mqtt.dupflag               | 0.0        
 mqtt.hdrflags              | 0x00000030 
 mqtt.kalive                | 0.0        
 mqtt.len                   | 8.0        
 mqtt.msg                   | 32         
 mqtt.msgid                 | 0.0        
 mqtt.msgtype               | 3.0        
 mqtt.proto_len             | 0.0 

In [6]:
df_train = df.filter(df.Train == 1)
df_test = df.filter(df.Train == 0)

#dropping the train column
df_train = df_train.drop('Train')
df_test = df_test.drop('Train')

### Renaming cols

In [7]:
cols = df_train.columns

for column in cols:
    new_column = column.replace('.', '_')
    df_train = df_train.withColumnRenamed(column, new_column)
    
# df_train.printSchema()

### Creating Output label column

In [8]:
from pyspark.sql.functions import when
from pyspark.sql.functions import col
df_with_target = df_train.withColumn('label', when(col('target') == 'slowite', 0).when(col('target') == 'bruteforce', 1).\
                               when(col('target') == 'flood', 2).when(col('target') == 'malformed', 3).when(col('target')\
                             == 'dos', 4).when(col('target') == 'legitimate', 5).otherwise(6)).drop('target')
# df_with_target.show(1, vertical=True)


### Deleting null records

In [9]:
from pyspark.sql.functions import *
null_counts_plays_df = df_with_target.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) \
                        for c in df_with_target.columns])
null_counts_plays_df.show(truncate=False, vertical=True)

# No rows with null values

[Stage 7:>                                                          (0 + 1) / 1]

-RECORD 0-------------------------
 tcp_flags                  | 0   
 tcp_time_delta             | 0   
 tcp_len                    | 0   
 mqtt_conack_flags          | 0   
 mqtt_conack_flags_reserved | 0   
 mqtt_conack_flags_sp       | 0   
 mqtt_conack_val            | 0   
 mqtt_conflag_cleansess     | 0   
 mqtt_conflag_passwd        | 0   
 mqtt_conflag_qos           | 0   
 mqtt_conflag_reserved      | 0   
 mqtt_conflag_retain        | 0   
 mqtt_conflag_uname         | 0   
 mqtt_conflag_willflag      | 0   
 mqtt_conflags              | 0   
 mqtt_dupflag               | 0   
 mqtt_hdrflags              | 0   
 mqtt_kalive                | 0   
 mqtt_len                   | 0   
 mqtt_msg                   | 0   
 mqtt_msgid                 | 0   
 mqtt_msgtype               | 0   
 mqtt_proto_len             | 0   
 mqtt_protoname             | 0   
 mqtt_qos                   | 0   
 mqtt_retain                | 0   
 mqtt_sub_qos               | 0   
 mqtt_suback_qos    

                                                                                

### Checking constraints

1. `tcp_time_delta` : Must be > 0, if not set it to 0.
2. [`tcp_len`, `mqtt_len`, `mqtt_proto_len`, `mqtt_willmsg_len`, `mqtt_willtopic_len`] --> Must be >= 0
3. Cols with flag information must be of the format '0x000000xx'. These include [`tcp_flags`, `mqtt_conack_flags`, `mqtt_conflags`, `mqtt_hdrflags`]

In [10]:
# Checking cols with > 0 condition

cols_non_negative = ['tcp_time_delta', 'tcp_len', 'mqtt_len', 'mqtt_proto_len', 'mqtt_willmsg_len', 'mqtt_willtopic_len']
from pyspark.sql.functions import *
df_with_target.select([count(when(col(c) < 0, c)).alias(c) for c in cols_non_negative]).show()

+--------------+-------+--------+--------------+----------------+------------------+
|tcp_time_delta|tcp_len|mqtt_len|mqtt_proto_len|mqtt_willmsg_len|mqtt_willtopic_len|
+--------------+-------+--------+--------------+----------------+------------------+
|             1|      0|       0|             0|               0|                 0|
+--------------+-------+--------+--------------+----------------+------------------+



In [11]:
# Drop that record which has negative value in tcp_time_delta column
df_with_target = df_with_target.filter(df_with_target.tcp_time_delta >= 0)

In [12]:
# Checking for the flag columns constraints

cols_with_hex = ['tcp_flags', 'mqtt_conack_flags', 'mqtt_conflags', 'mqtt_hdrflags']
df_with_target.select([count(when(col(c) < 10, c)).alias(c) for c in cols_with_hex]).show()

+---------+-----------------+-------------+-------------+
|tcp_flags|mqtt_conack_flags|mqtt_conflags|mqtt_hdrflags|
+---------+-----------------+-------------+-------------+
|        0|           229429|       229428|        87052|
+---------+-----------------+-------------+-------------+



In [13]:
# Out of ~230,000 entries ~229,000 entries in mqtt_conack_flags and mqtt_conflags are not in hex format just (0).
# I will drop these two columns as they are not of much use to us.
# But for mqtt_hdrflags : ~ 37% of the entries are not in hex format.
# We could either 1 hot encode them or treat the entire col as binary column.
# 37% seems too big to simply drop the non hex entries.
# So I will 1 hot encode the column

df_with_target_constraints_handled = df_with_target.drop('mqtt_conack_flags', 'mqtt_conflags')

### Investigating which cols are not useful to us.

### (1) Deleting columns with only 1 unique value

In [14]:
from pyspark.sql.functions import col, countDistinct

unique_values_count = df_with_target_constraints_handled.agg(*(countDistinct(col(c)).alias(c)\
                                         for c in df_with_target_constraints_handled.columns))
unique_values_count.show(vertical=True)

[Stage 16:>                                                         (0 + 1) / 1]

-RECORD 0---------------------------
 tcp_flags                  | 8     
 tcp_time_delta             | 8954  
 tcp_len                    | 723   
 mqtt_conack_flags_reserved | 1     
 mqtt_conack_flags_sp       | 1     
 mqtt_conack_val            | 2     
 mqtt_conflag_cleansess     | 2     
 mqtt_conflag_passwd        | 2     
 mqtt_conflag_qos           | 1     
 mqtt_conflag_reserved      | 1     
 mqtt_conflag_retain        | 1     
 mqtt_conflag_uname         | 2     
 mqtt_conflag_willflag      | 1     
 mqtt_dupflag               | 2     
 mqtt_hdrflags              | 14    
 mqtt_kalive                | 7     
 mqtt_len                   | 91    
 mqtt_msg                   | 35695 
 mqtt_msgid                 | 9813  
 mqtt_msgtype               | 11    
 mqtt_proto_len             | 2     
 mqtt_protoname             | 2     
 mqtt_qos                   | 2     
 mqtt_retain                | 2     
 mqtt_sub_qos               | 1     
 mqtt_suback_qos            | 1     
 

                                                                                

In [15]:
# There are some cols with only 1 unique value, lets see what they are then drop them

cols_to_drop = [column for column in unique_values_count.columns if unique_values_count.collect()[0][column] == 1]
print(cols_to_drop)

df_with_target_constraints_handled = df_with_target_constraints_handled.drop(*cols_to_drop)
print(df_with_target_constraints_handled.columns)

                                                                                

['mqtt_conack_flags_reserved', 'mqtt_conack_flags_sp', 'mqtt_conflag_qos', 'mqtt_conflag_reserved', 'mqtt_conflag_retain', 'mqtt_conflag_willflag', 'mqtt_sub_qos', 'mqtt_suback_qos', 'mqtt_willmsg', 'mqtt_willmsg_len', 'mqtt_willtopic', 'mqtt_willtopic_len']
['tcp_flags', 'tcp_time_delta', 'tcp_len', 'mqtt_conack_val', 'mqtt_conflag_cleansess', 'mqtt_conflag_passwd', 'mqtt_conflag_uname', 'mqtt_dupflag', 'mqtt_hdrflags', 'mqtt_kalive', 'mqtt_len', 'mqtt_msg', 'mqtt_msgid', 'mqtt_msgtype', 'mqtt_proto_len', 'mqtt_protoname', 'mqtt_qos', 'mqtt_retain', 'mqtt_ver', 'label']


### (2) Deleting highly correleated columns

In [16]:
# To find the numeric cols in the dataframe
numeric_features = [t[0] for t in df_with_target_constraints_handled.dtypes if t[1] != 'string']
# Don't want to include the label column
numeric_features.remove('label')
numeric_features

['tcp_time_delta',
 'tcp_len',
 'mqtt_conack_val',
 'mqtt_conflag_cleansess',
 'mqtt_conflag_passwd',
 'mqtt_conflag_uname',
 'mqtt_dupflag',
 'mqtt_kalive',
 'mqtt_len',
 'mqtt_msgid',
 'mqtt_msgtype',
 'mqtt_proto_len',
 'mqtt_qos',
 'mqtt_retain',
 'mqtt_ver']

In [17]:
import numpy as np
correlation_matrix = df_with_target_constraints_handled.select(numeric_features).toPandas().corr()
correlation_matrix

                                                                                

Unnamed: 0,tcp_time_delta,tcp_len,mqtt_conack_val,mqtt_conflag_cleansess,mqtt_conflag_passwd,mqtt_conflag_uname,mqtt_dupflag,mqtt_kalive,mqtt_len,mqtt_msgid,mqtt_msgtype,mqtt_proto_len,mqtt_qos,mqtt_retain,mqtt_ver
tcp_time_delta,1.0,-0.006663,-0.006295,-0.00944,-0.006333,-0.006346,-0.017258,-0.004982,-0.03623,-0.046167,0.28774,-0.00944,-0.037255,-0.00186,-0.00944
tcp_len,-0.006663,1.0,-0.010199,-0.012766,-0.008432,-0.00845,0.153574,-0.008053,0.268786,0.15651,0.08372,-0.012766,0.265729,0.008987,-0.012766
mqtt_conack_val,-0.006295,-0.010199,1.0,-0.006508,-0.004366,-0.004375,-0.015572,-0.003435,-0.031787,-0.03383,-0.001747,-0.006508,-0.029217,-0.001283,-0.006508
mqtt_conflag_cleansess,-0.00944,-0.012766,-0.006508,1.0,0.670844,0.672186,-0.02312,0.527816,0.002574,-0.050229,-0.055534,1.0,-0.043379,-0.001905,1.0
mqtt_conflag_passwd,-0.006333,-0.008432,-0.004366,0.670844,1.0,0.998003,-0.01551,-0.002252,-0.00497,-0.033696,-0.037255,0.670844,-0.0291,-0.001278,0.670844
mqtt_conflag_uname,-0.006346,-0.00845,-0.004375,0.672186,0.998003,1.0,-0.015541,-0.002257,-0.005,-0.033763,-0.037329,0.672186,-0.029159,-0.001281,0.672186
mqtt_dupflag,-0.017258,0.153574,-0.015572,-0.02312,-0.01551,-0.015541,1.0,-0.012203,0.527227,0.400831,0.12046,-0.02312,0.532977,-0.004559,-0.02312
mqtt_kalive,-0.004982,-0.008053,-0.003435,0.527816,-0.002252,-0.002257,-0.012203,1.0,-0.001207,-0.026512,-0.029312,0.527816,-0.022896,-0.001006,0.527816
mqtt_len,-0.03623,0.268786,-0.031787,0.002574,-0.00497,-0.005,0.527227,-0.001207,1.0,0.568639,0.259118,0.002574,0.988035,0.002293,0.002574
mqtt_msgid,-0.046167,0.15651,-0.03383,-0.050229,-0.033696,-0.033763,0.400831,-0.026512,0.568639,1.0,0.375774,-0.050229,0.584469,-0.009904,-0.050229


In [18]:
print('--------------------------------------------------------------------------')
row_idx = -1
for index, row in correlation_matrix.iterrows():
    row_idx += 1
    col_idx = 0
    for column, value in row.items():
        if col_idx <= row_idx:
            col_idx += 1
            continue
        if value > 0.9 or value < -0.9:
            print(f'Correlation > 0.9 between {index} and {column}: {value}')
        col_idx += 1
print('--------------------------------------------------------------------------')

--------------------------------------------------------------------------
Correlation > 0.9 between mqtt_conflag_cleansess and mqtt_proto_len: 1.0
Correlation > 0.9 between mqtt_conflag_cleansess and mqtt_ver: 1.0
Correlation > 0.9 between mqtt_conflag_passwd and mqtt_conflag_uname: 0.9980032722174356
Correlation > 0.9 between mqtt_len and mqtt_qos: 0.9880353598330669
Correlation > 0.9 between mqtt_proto_len and mqtt_ver: 1.0
--------------------------------------------------------------------------


- So there is very high correlation (> 0.9) between 
1. [`mqtt_conflag_cleansess`, `mqtt_proto_len`, `mqtt_ver`]
2. [`mqtt_conflag_passwd`, `mqtt_conflag_uname`] 
3. [`mqtt_len`, `mqtt_qos`].
- So from these 3 categories we can drop all but one columns

In [19]:
high_corr_cols_to_drop = ['mqtt_conflag_cleansess', 'mqtt_proto_len', 'mqtt_conflag_passwd', 'mqtt_len']
df_with_target_constraints_handled_sval_high_corr_dropped = df_with_target_constraints_handled.drop(*high_corr_cols_to_drop)

### Further Analysis to find which cols will not contribute much

In [20]:
# To find what % of values in each col are 0

from pyspark.sql.functions import lit, col, sum
zero_count_df = df_with_target_constraints_handled_sval_high_corr_dropped.select(\
                [sum(when(col(c) == 0, 1).otherwise(0)/df_train.count()).alias(c)\
                for c in df_with_target_constraints_handled_sval_high_corr_dropped.columns])
zero_count_df.show(vertical=True)

[Stage 170:>                                                        (0 + 1) / 1]

-RECORD 0----------------------------------
 tcp_flags          | 0.0                  
 tcp_time_delta     | 0.08778912651202521  
 tcp_len            | 0.3730994707442474   
 mqtt_conack_val    | 0.9956312649496492   
 mqtt_conflag_uname | 0.9956485326771299   
 mqtt_dupflag       | 0.9475794963024938   
 mqtt_hdrflags      | 0.3757975531631201   
 mqtt_kalive        | 0.9904250451141924   
 mqtt_msg           | 0.5237388083547482   
 mqtt_msgid         | 0.716770416930577    
 mqtt_msgtype       | 0.3757975531631201   
 mqtt_protoname     | 0.9904250451141924   
 mqtt_qos           | 0.8370056033792851   
 mqtt_retain        | 0.9996201099977106   
 mqtt_ver           | 0.9904250451141924   
 label              | 0.027805358175840077 



                                                                                

1. 99% values are 0 --> `mqtt_conack_val`, `mqtt_conflag_uname`, `mqtt_kalive`, `mqtt_protoname`, `mqtt_retain`, `mqtt_ver`.
2. I will be dropping these columns.

In [21]:
cols_99_percent_zero = [column for column in zero_count_df.columns if zero_count_df.collect()[0][column] > 0.99]
df_with_target_constraints_handled_sval_high_corr_99_percent_zero_dropped =\
      df_with_target_constraints_handled_sval_high_corr_dropped.drop(*cols_99_percent_zero)

                                                                                

`mqtt_msg` col has a lot of numbers and strings values. This column can be dropped as well.

In [22]:
df_with_target_constraints_handled_sval_high_corr_99_percent_zero_dropped = \
    df_with_target_constraints_handled_sval_high_corr_99_percent_zero_dropped.drop('mqtt_msg')

In [23]:
df_with_target_constraints_handled_sval_high_corr_99_percent_zero_dropped.printSchema()

root
 |-- tcp_flags: string (nullable = true)
 |-- tcp_time_delta: double (nullable = true)
 |-- tcp_len: integer (nullable = true)
 |-- mqtt_dupflag: double (nullable = true)
 |-- mqtt_hdrflags: string (nullable = true)
 |-- mqtt_msgid: double (nullable = true)
 |-- mqtt_msgtype: double (nullable = true)
 |-- mqtt_qos: double (nullable = true)
 |-- label: integer (nullable = false)



### Binary, Ordinal and Nominal Values

In [24]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline

nominal_cols = ['tcp_flags', 'mqtt_hdrflags']
nominal_cols_string_indexed = [f'{col}_numeric' for col in nominal_cols]
nominal_cols_one_hot_encoded = [f'{col}_onehot' for col in nominal_cols]

# 1. transform cols to numeric
stage_1 = StringIndexer(inputCols=nominal_cols, outputCols=nominal_cols_string_indexed)

# 2. transform numeric cols to one hot encoded cols
stage_2 = OneHotEncoder(inputCols=stage_1.getOutputCols(), outputCols=nominal_cols_one_hot_encoded)

pipeline = Pipeline(stages=[stage_1, stage_2])

df_with_target_constraints_handled_sval_high_corr_99_percent_zero_dropped = pipeline.fit(df_with_target_constraints_handled_sval_high_corr_99_percent_zero_dropped)\
    .transform(df_with_target_constraints_handled_sval_high_corr_99_percent_zero_dropped)

# Now drop the original nominal cols and string indexed cols

df_with_target_constraints_handled_sval_high_corr_99_percent_zero_dropped = \
    df_with_target_constraints_handled_sval_high_corr_99_percent_zero_dropped.drop(*nominal_cols, *nominal_cols_string_indexed)

                                                                                

### Combining Features in a Vector

In [25]:
feature_list = df_with_target_constraints_handled_sval_high_corr_99_percent_zero_dropped.drop('label').columns

In [26]:
# Now to combine all the features into a single vector

from pyspark.ml.feature import VectorAssembler
vector_assembler = VectorAssembler(inputCols=feature_list, outputCol='vectorized_features')
df_with_assembled_features = vector_assembler.transform(df_with_target_constraints_handled_sval_high_corr_99_percent_zero_dropped)

In [27]:
df_with_assembled_features.select('vectorized_features', 'label').distinct().toPandas()

                                                                                

Unnamed: 0,vectorized_features,label
0,"(0.000288, 1460.0, 1.0, 4505.0, 3.0, 1.0, 0.0,...",4
1,"(0.000184, 14.0, 0.0, 0.0, 3.0, 0.0, 0.0, 0.0,...",3
2,"(0.0004, 1460.0, 0.0, 4775.0, 3.0, 1.0, 0.0, 1...",4
3,"(0.999952, 14.0, 0.0, 0.0, 3.0, 0.0, 1.0, 0.0,...",5
4,"(2.001151, 32760.0, 0.0, 0.0, 3.0, 0.0, 1.0, 0...",2
...,...,...
78861,"(0.000898, 4.0, 0.0, 0.0, 2.0, 0.0, 1.0, 0.0, ...",3
78862,"(0.000919, 4.0, 0.0, 4184.0, 4.0, 0.0, 1.0, 0....",4
78863,"(0.000269, 1460.0, 0.0, 5577.0, 3.0, 1.0, 0.0,...",4
78864,"(1e-06, 102.0, 0.0, 3360.0, 3.0, 1.0, 1.0, 0.0...",4


### Scaling

In [28]:
from pyspark.ml.feature import StandardScaler

standard_scaler = StandardScaler(inputCol= 'vectorized_features', outputCol= 'features')
scaled_model = standard_scaler.fit(df_with_assembled_features)
df_with_scaled_features = scaled_model.transform(df_with_assembled_features)

df_with_scaled_features.select("features", "label").distinct().toPandas()

                                                                                

Unnamed: 0,features,label
0,"(2.999199231941019e-05, 0.0, 0.0, 0.0, 0.0, 0....",4
1,"(0.1725115266724401, 0.010553827953049337, 0.0...",5
2,"(9.99733077313673e-06, 0.3683285955614219, 0.0...",4
3,"(0.0, 0.09603983437274898, 0.0, 3.216123081765...",4
4,"(0.00031784617147696776, 0.0, 0.0, 0.0, 0.0, 0...",3
...,...,...
78861,"(0.0, 0.11609210748354272, 0.0, 2.563671882800...",4
78862,"(6.894710878025331e-07, 0.037993780630977615, ...",4
78863,"(0.00042678260334976794, 0.004221531181219735,...",4
78864,"(0.0004388483473863123, 0.037993780630977615, ...",4


## Now combining all steps in a Pre-process Pipeline

In [29]:
import pyspark
from pyspark.sql import SparkSession, SQLContext
from pyspark.ml import Pipeline,Transformer
from pyspark.ml.feature import Imputer,StandardScaler,StringIndexer,OneHotEncoder, VectorAssembler
from pyspark.sql.functions import *
from pyspark.sql.types import *
import numpy as np


# These are length cols so their values must be 0 or +ve
cols_non_negative = ['tcp_time_delta', 'tcp_len', 'mqtt_len', 'mqtt_proto_len', 'mqtt_willmsg_len', 'mqtt_willtopic_len']

# nominal_cols = ['tcp_flags', 'mqtt_conack_flags', 'mqtt_conflags', 'mqtt_hdrflags']
# The other 2 are not useful --> (for more than 99% of the entries, they are 0)
# mqtt_hdrflags --> 37% of the entries are not in hex format but we will 1 hot encode it
nominal_cols = ['tcp_flags', 'mqtt_hdrflags']

continous_cols = ['tcp_time_delta', 'tcp_len', 'mqtt_conack_flags_reserved', 'mqtt_conack_flags_sp',\
                'mqtt_conflag_cleansess', 'mqtt_conflag_passwd', 'mqtt_conflag_qos', 'mqtt_conflag_reserved', \
                'mqtt_conflag_retain', 'mqtt_conflag_uname', 'mqtt_conflag_willflag', 'mqtt_dupflag', \
                'mqtt_kalive', 'mqtt_len', 'mqtt_msg', 'mqtt_msgid', 'mqtt_msgtype', 'mqtt_proto_len', \
                'mqtt_qos', 'mqtt_retain', 'mqtt_sub_qos', 'mqtt_suback_qos', 'mqtt_willmsg', \
                'mqtt_willmsg_len', 'mqtt_willtopic', 'mqtt_willtopic_len', 'mqtt_ver', 'mqtt_conack_val']

def convert_string(s):
    return 0 if s == '0' else 1

def convert_target_to_label(target):
    if target == 'slowite':
        return 0
    elif target == 'bruteforce':
        return 1
    elif target == 'flood':
        return 2
    elif target == 'malformed':
        return 3
    elif target == 'dos':
        return 4
    elif target == 'legitimate':
        return 5
    else:
        return 6
    
class RenameDatasetCols(Transformer):
    def __init__(self):
        super().__init__()

    def _transform(self, dataset):
        output_df = dataset
        all_cols = output_df.columns
        for column in all_cols:
            new_column = column.replace('.', '_')
            output_df = output_df.withColumnRenamed(column, new_column)
        # print(output_df.columns)
        return output_df

class OutcomeCreator(Transformer):
    def __init__(self):
        super().__init__()

    def _transform(self, dataset):
        label_to_classes = udf(convert_target_to_label, IntegerType())
        output_df = dataset.withColumn('label', label_to_classes(dataset['target'])).drop('target')
        output_df = output_df.withColumn('label', output_df['label'].cast(DoubleType()))
        return output_df
    
class ConstraintChecker(Transformer):
    def __init__(self, columns_to_check = None):
        super().__init__()

    def _transform(self, dataset):
        output_df = dataset
        for col_name in cols_non_negative:
            output_df = output_df.filter(output_df[col_name] >= 0)
        return output_df
    
class ColumnDropper(Transformer):
    def __init__(self, columns_to_drop = None):
        super().__init__()
        self.columns_to_drop = columns_to_drop

    def _transform(self, dataset):
        output_df = dataset
        for col_name in self.columns_to_drop:
            if col_name in output_df.columns:
                output_df = output_df.drop(col_name)
        return output_df
    
def get_preprocess_pipeline():

    # Stage where we rename the columns
    stage_column_renamer = RenameDatasetCols()

    # Stage where we handle nominal values
    nominal_id_cols = [x+"_index" for x in nominal_cols]
    nominal_onehot_cols = [x+"_encoded" for x in nominal_cols]
    stage_nominal_indexer = StringIndexer(inputCols = nominal_cols, outputCols = nominal_id_cols )

    # Stage where the index columns are further transformed using OneHotEncoder
    stage_nominal_onehot_encoder = OneHotEncoder(inputCols=nominal_id_cols, outputCols=nominal_onehot_cols)

    # Stage where all relevant features are assembled into a vector (and dropping a few)
    feature_cols = continous_cols + nominal_onehot_cols

    # These cols have only 1 unique value
    cols_single_val = ['mqtt_conack_flags_reserved', 'mqtt_conack_flags_sp', 'mqtt_conflag_qos', \
                       'mqtt_conflag_reserved', 'mqtt_conflag_retain', 'mqtt_conflag_willflag', 'mqtt_sub_qos', \
                        'mqtt_suback_qos', 'mqtt_willmsg', 'mqtt_willmsg_len', 'mqtt_willtopic', 'mqtt_willtopic_len']
    
    # Highly correlaterd columns
    cols_high_corr = ['mqtt_conflag_cleansess', 'mqtt_proto_len', 'mqtt_conflag_passwd', 'mqtt_len']
    
    # Not useful columns
    cols_not_useful = ['mqtt_msg']

    # These are supposed to be in form 0x0000XX but > 99% are 0
    cols_hex_constraints_not_met = ['mqtt_conack_flags', 'mqtt_conflags']

    # > 99% of the entries are 0
    cols_more_than_99_zero = ['mqtt_conack_val', 'mqtt_conflag_uname', 'mqtt_kalive', 'mqtt_protoname', 'mqtt_retain', 'mqtt_ver']
    
    cols_to_remove = cols_single_val + cols_high_corr + cols_not_useful + cols_hex_constraints_not_met + nominal_cols\
                     + nominal_id_cols + cols_more_than_99_zero
    
    for col_name in cols_to_remove:
        if col_name in feature_cols:
            feature_cols.remove(col_name)

    print(feature_cols)
    # Stage where we assemble all the features into a vector

    stage_vector_assembler = VectorAssembler(inputCols=feature_cols, outputCol='vectorized_features')

    # Stage where we scale the features
    stage_scaler = StandardScaler(inputCol= 'vectorized_features', outputCol= 'features')

    # Stage where we create the label column
    stage_label_creator = OutcomeCreator()

    # Removing all unnecessary columns, only keeping the 'features' and 'outcome' columns
    stage_column_dropper = ColumnDropper(columns_to_drop = continous_cols + ['vectorized_features'] + nominal_cols\
                                         + nominal_id_cols + nominal_onehot_cols + cols_to_remove)

    # Connecting the columns into a pipeline
    pipeline = Pipeline(stages = [stage_column_renamer, stage_nominal_indexer,\
                                   stage_nominal_onehot_encoder, stage_vector_assembler, stage_scaler,\
                                      stage_label_creator, stage_column_dropper])
    
    return pipeline    


### This PreProcess Pipeline will be used for preprocessing before feeding data into the model

In [30]:
mqtt_train_raw = spark.read.csv("gs://dataproc-staging-us-central1-159964990471-2n8oqiw8/train70_reduced.csv" ,header=True, inferSchema= True)
mqtt_test_raw = spark.read.csv("gs://dataproc-staging-us-central1-159964990471-2n8oqiw8/test30_reduced.csv", header=True, inferSchema=True)

preprocess_pipeline = get_preprocess_pipeline()
preprocess_pipeline_model = preprocess_pipeline.fit(mqtt_train_raw)

mqtt_train_df = preprocess_pipeline_model.transform(mqtt_train_raw)
mqtt_test_df = preprocess_pipeline_model.transform(mqtt_test_raw)

                                                                                

['tcp_time_delta', 'tcp_len', 'mqtt_dupflag', 'mqtt_msgid', 'mqtt_msgtype', 'mqtt_qos', 'tcp_flags_encoded', 'mqtt_hdrflags_encoded']


                                                                                