## Dataset Install

🚀 Dataset Install:
<ol> 
  <li> sudo and the AWS CLI (v2) installed in the runtime </li>
  <li> AWS CLI installer run with sudo so the aws command becomes available </li>
  <li> CIC IDS 2018 processed CSVs downloaded from the public AWS S3 bucket </li>
</ol> 

In [1]:
import time

In [2]:
# %%capture (used below to hide cell output) is a built-in IPython cell magic and does not need gwpy
!pip install -q gwpy

  Building wheel for ligo-segments (setup.py) ... done
ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
albumentations 0.1.12 requires imgaug<0.2.7,>=0.2.5, but you have imgaug 0.2.9 which is incompatible.


In [3]:
%%capture
!apt-get install sudo -y

In [4]:
%%capture
!curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip"
!unzip awscliv2.zip

In [5]:
!sudo ./aws/install

You can now run: /usr/local/bin/aws --version


In [6]:
%%capture
!mkdir cicids2018
!aws s3 cp "s3://cse-cic-ids2018/Processed Traffic Data for ML Algorithms/" ./cicids2018  --recursive --no-sign-request

In [7]:
!ls cicids2018

Friday-02-03-2018_TrafficForML_CICFlowMeter.csv
Friday-16-02-2018_TrafficForML_CICFlowMeter.csv
Friday-23-02-2018_TrafficForML_CICFlowMeter.csv
Thuesday-20-02-2018_TrafficForML_CICFlowMeter.csv
Thursday-01-03-2018_TrafficForML_CICFlowMeter.csv
Thursday-15-02-2018_TrafficForML_CICFlowMeter.csv
Thursday-22-02-2018_TrafficForML_CICFlowMeter.csv
Wednesday-14-02-2018_TrafficForML_CICFlowMeter.csv
Wednesday-21-02-2018_TrafficForML_CICFlowMeter.csv
Wednesday-28-02-2018_TrafficForML_CICFlowMeter.csv


In [8]:
#@title
import pandas as pd 
import numpy as np
import plotly.express as px 

# df = pd.read_csv("cicids2018/Wednesday-14-02-2018_TrafficForML_CICFlowMeter.csv")
# value_counts = df['Label'].value_counts().reset_index()
# fig = px.pie(value_counts, values='Label', names='index', color='index',
#              title='Label Counts in the Dataset',
#              color_discrete_map={'Benign':'lightcyan',
#                                  'FTP-BruteForce':'cyan',
#                                  'SSH-BruteForce':'royalblue'},
#              width = 500,
#              height = 500)
# # fig.show()

# df['Flow Byts/s'].fillna(df['Flow Byts/s'].median(),inplace = True)
# df.replace([np.inf, -np.inf], np.nan, inplace=True)
# df['Flow Pkts/s'].fillna(400000, inplace = True)
# df['Label'] = df['Label'].map({
#     'Benign' : 0,
#     'FTP-BruteForce'  :1,
#     'SSH-BruteForce' : 2
# })
# df['Label'].fillna(2,inplace = True)

# px.imshow(df.corr(),title = "Correlation between features ", width = 500,height = 500)


## PySpark Setup

🚀 PySpark Setup:
<ol> 
  <li> Java 8 and Spark 3.1.2 installed </li>
  <li> JAVA_HOME and SPARK_HOME environment variables set </li>
  <li> findspark used to make the Spark installation importable, then a SparkSession created </li>
</ol>
In [9]:
%%capture
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Old Spark releases are kept on archive.apache.org; regional mirrors remove them over time
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz
!tar xf spark-3.1.2-bin-hadoop2.7.tgz

In [10]:
import os 
os.environ["JAVA_HOME"] =  "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"

In [11]:
!pip install -q findspark
import findspark
findspark.init()

In [12]:
from pyspark.sql import SparkSession
spark = SparkSession.builder\
        .master("local[*]")\
        .appName("ISAA_CIC_IDS_2018")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()
print("Spark Session Created")
spark

Spark Session Created


In [13]:
import numpy as np

from pyspark.sql.functions import col, count, first, isnan, lit, udf, when
from pyspark.sql.functions import max as sparkMax
from pyspark.sql.functions import min as sparkMin
from pyspark.sql.types import DoubleType

from pyspark.ml.feature import Imputer

## Dataset Preprocessing

🚀 Dataset preprocessing:
<ol>
  <li> Dataset read and checked for NaN and infinite values</li>
  <li> Infinite values in Flow Pkts/s replaced with the largest finite value in that column </li>
  <li> Timestamp column dropped </li>
  <li> Missing values in Flow Byts/s imputed with the column median using Imputer</li>
  <li> The categorical Label column encoded to numeric indices (done with StringIndexer in the Modelling section) </li>
</ol>


In [14]:
spark_df = spark.read.csv("cicids2018/Wednesday-14-02-2018_TrafficForML_CICFlowMeter.csv", header=True, inferSchema=True)
spark_df.show(2)


In [15]:
spark_df.printSchema()

root
 |-- Dst Port: integer (nullable = true)
 |-- Protocol: integer (nullable = true)
 |-- Timestamp: string (nullable = true)
 |-- Flow Duration: long (nullable = true)
 |-- Tot Fwd Pkts: integer (nullable = true)
 |-- Tot Bwd Pkts: integer (nullable = true)
 |-- TotLen Fwd Pkts: integer (nullable = true)
 |-- TotLen Bwd Pkts: integer (nullable = true)
 |-- Fwd Pkt Len Max: integer (nullable = true)
 |-- Fwd Pkt Len Min: integer (nullable = true)
 |-- Fwd Pkt Len Mean: double (nullable = true)
 |-- Fwd Pkt Len Std: double (nullable = true)
 |-- Bwd Pkt Len Max: integer (nullable = true)
 |-- Bwd Pkt Len Min: integer (nullable = true)
 |-- Bwd Pkt Len Mean: double (nullable = true)
 |-- Bwd Pkt Len Std: double (nullable = true)
 |-- Flow Byts/s: double (nullable = true)
 |-- Flow Pkts/s: double (nullable = true)
 |-- Flow IAT Mean: double (nullable = true)
 |-- Flow IAT Std: double (nullable = true)
 |-- Flow IAT Max: long (nullable = true)
 |-- Flow IAT Min: long (nullable = true)

In [16]:
# Count NaN or null values per column (isnan alone does not catch nulls)
spark_df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in spark_df.columns]).show()


In [17]:
seq_of_columns = spark_df.columns

# List comprehension over (name, type) pairs to collect the string-typed columns
string_columns = [name for name, dtype in spark_df.dtypes if dtype == 'string']

# Non-string columns, kept in their original order (a set difference would lose it)
non_string_columns = [c for c in seq_of_columns if c not in string_columns]

In [18]:
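print("Checking for the presence of infinite values")
# Column-wise max for numeric columns (first non-null value for string columns).
# Spark sorts NaN above +inf, so a column containing NaN reports NaN here rather than inf.
df = spark_df.select(*[sparkMax(col(c)).alias(c) for c in non_string_columns],
                     *[first(col(c), ignorenulls=True).alias(c) for c in string_columns])
df = df.select(seq_of_columns)  # restore the original column order
df.show()
print("Flow Pkts/s contains infinity")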

Checking for the presence of infinite values

In [19]:
print("Checking for the presence of negative infinite values")
df = spark_df.select(*[sparkMin(col(c)).alias(c) for c in non_string_columns],
                     *[first(col(c), ignorenulls=True).alias(c) for c in string_columns])
df = df.select(seq_of_columns)  # restore the original column order
df.show()
print("No column contains negative infinity")

Checking for the presence of negative infinite values

In [20]:
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [21]:
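# UDF that swaps +/-inf for v; nulls, zeros and finite values pass through unchanged
replace_infs_udf = udf(
    lambda x, v: float(v) if x and np.isinf(x) else x, DoubleType()
)
# Temporarily map infinities to -100 so that max() below returns the largest finite value
temp = spark_df.withColumn("Flow", replace_infs_udf(col("Flow Pkts/s"), lit(-100)))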

In [22]:
des = temp.agg({"Flow":"max"}).collect()[0]
des

Row(max(Flow)=4000000.0)
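The two cells above find the largest finite value of Flow Pkts/s by first mapping every infinity to a sentinel (-100) and then taking the column maximum. The following is a pure-Python sketch of the same logic on a hypothetical list of values, where `None` stands in for a Spark null; it assumes at least one finite value is larger than the sentinel, and the helper names are illustrative only.

```python
import math

def replace_infs(x, v):
    """Mirror of replace_infs_udf: swap +/-inf for v, pass None and finite values through."""
    # `x and ...` short-circuits for None and 0.0, which are returned unchanged
    return float(v) if x and math.isinf(x) else x

def max_finite(values, sentinel=-100.0):
    """Largest finite value, found by first replacing infinities with a small sentinel."""
    replaced = [replace_infs(x, sentinel) for x in values]
    return max(x for x in replaced if x is not None)

flow_pkts = [12.5, math.inf, 4000000.0, None, 0.0, -math.inf]  # hypothetical sample
cap = max_finite(flow_pkts)
cleaned = [replace_infs(x, cap) for x in flow_pkts]
print(cap)      # 4000000.0
print(cleaned)  # [12.5, 4000000.0, 4000000.0, None, 0.0, 4000000.0]
```

Note that, like the UDF, this also maps -inf to the positive cap; the minimum check above found no negative infinities, so that does not arise here. Inside Spark, a `when(...).otherwise(...)` column expression could likely do the same replacement without the overhead of a Python UDF.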

In [23]:
# Replace infinities in Flow Pkts/s with the largest finite value found above
spark_df = spark_df.withColumn("Flow Pkts/s", replace_infs_udf(col("Flow Pkts/s"), lit(4000000.0)))
spark_df.agg({"Flow Pkts/s":"max"}).collect()[0]
print("Infinite Values replaced")

Infinite Values replaced


In [24]:
columns_to_drop = ['Timestamp']
spark_df = spark_df.drop(*columns_to_drop)

In [25]:
imputer = Imputer(inputCol="Flow Byts/s", outputCol="Flow Byts/s",strategy = "median")
imputer_mod = imputer.fit(spark_df)
spark_df = imputer_mod.transform(spark_df)
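With `strategy="median"`, `Imputer` fills missing entries (nulls and, by default, NaN) with the column median. The sketch below shows that behaviour in plain Python on a hypothetical list; the function name is illustrative. One difference worth keeping in mind: Spark computes the median approximately (controlled by Imputer's `relativeError` parameter), whereas this sketch uses the exact `statistics.median`.

```python
import math
import statistics

def impute_median(values):
    """Replace None/NaN with the median of the remaining values (exact median)."""
    present = [x for x in values if x is not None and not math.isnan(x)]
    median = statistics.median(present)
    return [median if x is None or math.isnan(x) else x for x in values]

flow_byts = [100.0, None, 300.0, float("nan"), 200.0]  # hypothetical sample
print(impute_median(flow_byts))  # [100.0, 200.0, 300.0, 200.0, 200.0]
```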

## Modelling

🚀 Modelling:
<ol>
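<li> All independent features assembled into a single vector using VectorAssembler </li> 
<li> Features min-max scaled to the range [0, 1] (the scaled vector is used by the MLP and Naive Bayes models) </li>
<li> Data split into train and validation sets in an 80/20 ratio</li>
<li> Multilayer Perceptron, Random Forest, Decision Tree and Naive Bayes classifiers trained </li>
<li> Validation predictions evaluated using F1, per-label precision and recall, and accuracy </li>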
</ol>
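`MinMaxScaler` rescales each feature independently as (x - min) / (max - min), using the minimum and maximum seen when the scaler is fitted. Spark's documentation states that a constant feature (max equal to min) is mapped to 0.5 * (max + min) of the output range, i.e. 0.5 for the default [0, 1]. A pure-Python sketch of that rule for a single feature column (the function name is illustrative):

```python
def min_max_scale(column, lo=0.0, hi=1.0):
    """Rescale one feature column to [lo, hi], following MinMaxScaler's formula."""
    c_min, c_max = min(column), max(column)
    if c_max == c_min:
        # constant feature: every value goes to the midpoint of the output range
        return [0.5 * (hi + lo)] * len(column)
    return [(x - c_min) / (c_max - c_min) * (hi - lo) + lo for x in column]

print(min_max_scale([0.0, 5.0, 10.0]))  # [0.0, 0.5, 1.0]
print(min_max_scale([3.0, 3.0]))        # [0.5, 0.5]
```

Because the scaler in this notebook is fitted on final_data before randomSplit, the validation rows also influence each feature's min and max; fitting on the training split alone would avoid that small leak.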

In [26]:
from pyspark.ml.feature import VectorAssembler
# Every column except the last one (Label) becomes part of the feature vector
feature_assembler = VectorAssembler(inputCols=spark_df.columns[:-1],
                                    outputCol="independent_features")
training = feature_assembler.transform(spark_df)
training.show()


In [27]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import MinMaxScaler

final_data = training.select("independent_features","Label")

scaler = MinMaxScaler(inputCol="independent_features", outputCol="scaled_features")
final_data = scaler.fit(final_data).transform(final_data)

label_encoder = StringIndexer(inputCol = 'Label',outputCol = 'encoded_label')
final_data = label_encoder.fit(final_data).transform(final_data)
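By default `StringIndexer` orders labels by descending frequency (`stringOrderType="frequencyDesc"`), with ties sorted alphabetically, so the most common label receives index 0.0; in the output below, Benign is mapped to 0.0. A plain-Python sketch of that ordering on hypothetical labels (the helper is illustrative, not Spark's implementation):

```python
from collections import Counter

def string_index(labels):
    """Map each label to an index: most frequent first, ties broken alphabetically."""
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda s: (-counts[s], s))
    mapping = {label: float(i) for i, label in enumerate(ordered)}
    return mapping, [mapping[s] for s in labels]

labels = ["Benign", "Benign", "SSH-BruteForce", "Benign", "FTP-BruteForce", "SSH-BruteForce"]
mapping, encoded = string_index(labels)
print(mapping)  # {'Benign': 0.0, 'SSH-BruteForce': 1.0, 'FTP-BruteForce': 2.0}
```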

In [28]:
final_data.show()

+--------------------+------+--------------------+-------------+
|independent_features| Label|     scaled_features|encoded_label|
+--------------------+------+--------------------+-------------+
|(78,[2,3,16,17,18...|Benign|(78,[2,3,15,16,17...|          0.0|
|(78,[2,3,16,17,18...|Benign|(78,[2,3,15,16,17...|          0.0|
|(78,[2,3,16,17,18...|Benign|(78,[2,3,15,16,17...|          0.0|
|(78,[0,1,2,3,4,5,...|Benign|[3.35708726900950...|          0.0|
|(78,[0,1,2,3,4,5,...|Benign|[3.35708726900950...|          0.0|
|(78,[0,1,2,3,4,5,...|Benign|[3.35708726900950...|          0.0|
|(78,[2,3,16,17,18...|Benign|(78,[2,3,15,16,17...|          0.0|
|(78,[2,3,16,17,18...|Benign|(78,[2,3,15,16,17...|          0.0|
|(78,[0,1,2,3,4,5,...|Benign|[0.00122075900691...|          0.0|
|(78,[0,1,2,3,4,5,...|Benign|[0.00122075900691...|          0.0|
|(78,[0,1,2,3,4,5,...|Benign|[0.00122075900691...|          0.0|
|(78,[0,1,2,3,4,5,...|Benign|[0.00122075900691...|          0.0|

In [29]:
train_data, valid_data = final_data.randomSplit([0.8,0.2])
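`randomSplit([0.8, 0.2])` does not cut the data at an exact 80/20 boundary: each row is assigned to a split by a random draw, so the split sizes are only approximately proportional, and without a `seed` argument they change on every run. The following pure-Python sketch shows per-row weighted assignment; it is a simplification (Spark samples per partition), and the function name is illustrative.

```python
import random

def random_split(rows, weights, seed=None):
    """Assign each row to a split with probability proportional to its weight."""
    total = sum(weights)
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)           # cumulative upper bounds, e.g. [0.8, 1.0]
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()             # uniform draw in [0, 1)
        for i, b in enumerate(bounds):
            if u < b:
                splits[i].append(row)
                break
        else:
            splits[-1].append(row)   # guard against float rounding in the last bound
    return splits

train, valid = random_split(range(10_000), [0.8, 0.2], seed=42)
print(len(train), len(valid))  # roughly 8000 and 2000, not exactly
```

Passing a fixed seed in the notebook, e.g. `final_data.randomSplit([0.8, 0.2], seed=42)`, would make the split, and therefore the metrics reported below, repeatable.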

#### MultiLayer Perceptron

In [30]:
layers = [len(feature_assembler.getInputCols()), 4, 2, 3]
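In `layers`, the first entry must equal the feature-vector length (78 here, taken from the VectorAssembler inputs) and the last must equal the number of classes (3: Benign, FTP-BruteForce, SSH-BruteForce); the entries in between are hidden-layer widths. A small sketch that counts the trainable parameters of such a fully connected network, assuming one bias per non-input neuron (the function name is illustrative):

```python
def mlp_param_count(layers):
    """Weights plus biases of a fully connected net with the given layer sizes."""
    return sum(n_in * n_out + n_out for n_in, n_out in zip(layers, layers[1:]))

print(mlp_param_count([78, 4, 2, 3]))  # 335
```

At 335 parameters this network is very small, which is worth keeping in mind when comparing it with the tree-based models.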

In [31]:
%%time
from pyspark.ml.classification import MultilayerPerceptronClassifier

classifier = MultilayerPerceptronClassifier(labelCol='encoded_label',
                                            featuresCol='scaled_features',
                                            maxIter=100,
                                            layers=layers,
                                            blockSize=128,
                                            seed=1234)
mlp_model = classifier.fit(train_data)
mlp_predictions = mlp_model.transform(valid_data)

CPU times: user 3.29 s, sys: 391 ms, total: 3.68 s
Wall time: 9min 26s


#### Random Forest

In [32]:
%%time

from pyspark.ml.classification import RandomForestClassifier, DecisionTreeClassifier
# Reuse the train/validation split created above so every model is evaluated on the same rows

rf = RandomForestClassifier(featuresCol = 'independent_features', 
                            labelCol = 'encoded_label' )

model = rf.fit(train_data)
rf_predictions = model.transform(valid_data)

CPU times: user 899 ms, sys: 106 ms, total: 1.01 s
Wall time: 2min 39s


#### Decision Tree Algorithm

In [33]:
%%time
# Despite the gbt/gb_ names, this is a single DecisionTreeClassifier, not gradient-boosted trees
gbt = DecisionTreeClassifier(labelCol="encoded_label",
                             featuresCol="independent_features")
gb_ = gbt.fit(train_data)
gb_predictions = gb_.transform(valid_data)

CPU times: user 731 ms, sys: 93.7 ms, total: 825 ms
Wall time: 2min 11s


#### Naive Bayes

In [34]:
%%time
from pyspark.ml.classification import NaiveBayes
# Multinomial Naive Bayes requires non-negative features, so the [0, 1]-scaled vector is used
nb = NaiveBayes(smoothing=1.0, modelType="multinomial",
                labelCol="encoded_label", featuresCol="scaled_features")

nb_model = nb.fit(train_data)
nb_predictions = nb_model.transform(valid_data)

CPU times: user 305 ms, sys: 34.5 ms, total: 339 ms
Wall time: 45 s


#### Evaluation

In [35]:
# Display names; 'GBT' labels the DecisionTreeClassifier predictions (gb_predictions)
NAMES = ['RF', 'GBT', 'NB', 'MLP']
DATA = [rf_predictions, gb_predictions, nb_predictions, mlp_predictions]

METRICS = [
           'f1',
           'precisionByLabel',
           'recallByLabel',
           'accuracy'
]


In [36]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

def evaluate(data, metric_name):
  # precisionByLabel/recallByLabel report the class given by metricLabel, which defaults to 0.0 (Benign);
  # 'f1' is the F1 score weighted by each label's frequency
  evaluator = MulticlassClassificationEvaluator(labelCol="encoded_label",
                                                predictionCol="prediction",
                                                metricName=metric_name)
  print(f"################ {metric_name} #################")
  for name, predictions in zip(NAMES, data):
    print(f"{name} : {evaluator.evaluate(predictions)}")


In [37]:
for i in METRICS:
  evaluate(DATA, i)

################ f1 #################
RF : 0.9999141550458501
GBT : 0.9999618494145136
NB : 0.8936871068237494
MLP : 0.9962710266951234
################ precisionByLabel #################
RF : 0.9999249699879952
GBT : 1.0
NB : 0.8751777270773402
MLP : 1.0
################ recallByLabel #################
RF : 0.9999849930968245
GBT : 0.9999849930968245
NB : 0.9930067831202353
MLP : 0.9941716111707035
################ accuracy #################
RF : 0.9999141610434202
GBT : 0.9999618493526312
NB : 0.9055151529602518
MLP : 0.996263075651214
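When reading these numbers, note that `precisionByLabel` and `recallByLabel` describe a single class chosen by the evaluator's `metricLabel` parameter (default 0.0, which is Benign here), while `f1` averages per-class F1 scores weighted by how often each true label occurs. The following plain-Python sketch implements those definitions on hypothetical (encoded_label, prediction) pairs; the helper names are illustrative, not Spark's API.

```python
from collections import Counter

def precision_by_label(pairs, label):
    """Share of rows predicted as `label` whose true label is `label`."""
    predicted = [y for y, p in pairs if p == label]
    return sum(1 for y in predicted if y == label) / len(predicted) if predicted else 0.0

def recall_by_label(pairs, label):
    """Share of rows truly labelled `label` that were predicted as `label`."""
    actual = [p for y, p in pairs if y == label]
    return sum(1 for p in actual if p == label) / len(actual) if actual else 0.0

def weighted_f1(pairs):
    """F1 per true label, averaged with weights equal to each label's share of rows."""
    counts = Counter(y for y, _ in pairs)
    total = 0.0
    for label, n in counts.items():
        p, r = precision_by_label(pairs, label), recall_by_label(pairs, label)
        f1 = 2 * p * r / (p + r) if p + r else 0.0
        total += f1 * n / len(pairs)
    return total

def accuracy(pairs):
    return sum(1 for y, p in pairs if y == p) / len(pairs)

# hypothetical (encoded_label, prediction) pairs
pairs = [(0.0, 0.0), (0.0, 0.0), (0.0, 0.0), (0.0, 1.0), (1.0, 1.0), (2.0, 2.0), (2.0, 0.0)]
print(precision_by_label(pairs, 0.0))  # 0.75
print(weighted_f1(pairs), accuracy(pairs))  # both about 0.714
```

To report the attack classes instead, the evaluator can be created with `metricLabel=1.0` or `metricLabel=2.0`.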
