## **This is the jupyter notebook running our implementation of the ISA project.**

It is divided in two parts, first the preprocessing then the training of the classifier, and finally the classification of testing data.

The first part, the pre processing depends on a unix like system with the command line utilities wireshark and tshark installed. Further dependencies are the python packets pandas, jupyter, pyspark and findspark

The setup is describe in more detail in the Readme file which is also included in the next cell:

# ISA Project 2019 Group 2
These are the project results of Group 2.

## This Folder
The directory structure is as follows:
```
./data/
    |->.csv/
    |   |-> extracted csv files
    |
    |->.tshark_csv/
            |-> files extracted by tshark
    |
    |->labels/
    |   |-> label files from the public modbus dataset
    |
    |-> complete.ipynb
    |
    |-> backup_complete.py
    |
    |-> spark_dataframe/
    |    |-> this is a preprocessed version of the dataset. If wireshark is not available, this can be used.
    |     
    |-> testData.pq/
         |-> this is stored testdata to only test the classifier
    |-> logRegModel
``` 

Some of these directories might not be present before the project ran for the first time, they are created during the execution.

## Running the Classifier
The final result we consolidated in the complete.ipynb notebook consist of 4 major parts:
 1. data preprocessing
 2. loading the data into spark
 3. training the classifier
 4. testing the classifier
 
 Part 1 depends on a bash environment, any unix machine should work, however we could only test it on linux.
 This is the case because we used the experimental "info column" feature, which can not be generated without the wireshark command line utility "tshark". The preprocessing part can be skipped if a spark dataframe we already generated is used, the dataframe is included in this folder. This makes it possible to test the classifier on hosts without a bash environment or wireshark.
 
### Dependencies for Running the complete notebook
To run the complete project, please install wireshark. On Ubuntu, this can be done with `sudo apt install wireshark`, for other distributions or Mac OS other package managers are available.

To check if the installation of tshark was successfull, run the following command: `tshark --v`. The output should look similar to this:

```
viktor@yp /m/v/d/U/i/i/final> tshark --v
TShark (Wireshark) 2.6.5 (Git v2.6.5 packaged as 2.6.5-1~ubuntu16.04.0)

Copyright 1998-2018 Gerald Combs <gerald@wireshark.org> and contributors.
License GPLv2+: GNU GPL version 2 or later <http://www.gnu.org/licenses/old-licenses/gpl-2.0.html>
This is free software; see the source for copying conditions. There is NO
warranty; not even for MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.

Compiled (64-bit) with libpcap, with POSIX capabilities (Linux), with libnl 3,
with GLib 2.48.2, with zlib 1.2.8, with SMI 0.4.8, with c-ares 1.10.0, with Lua
5.2.4, with GnuTLS 3.4.10, with Gcrypt 1.6.5, with MIT Kerberos, with MaxMind DB
resolver, with nghttp2 1.7.1, with LZ4, with Snappy, with libxml2 2.9.3.

Running on Linux 4.4.0-141-generic, with Intel(R) Core(TM) i7-4500U CPU @
1.80GHz (with SSE4.2), with 7895 MB of physical memory, with locale
LC_CTYPE=en_US.UTF-8, LC_NUMERIC=de_DE.UTF-8, LC_TIME=de_DE.UTF-8,
LC_COLLATE=en_US.UTF-8, LC_MONETARY=de_DE.UTF-8, LC_MESSAGES=en_US.UTF-8,
LC_PAPER=de_DE.UTF-8, LC_NAME=de_DE.UTF-8, LC_ADDRESS=de_DE.UTF-8,
LC_TELEPHONE=de_DE.UTF-8, LC_MEASUREMENT=de_DE.UTF-8,
LC_IDENTIFICATION=de_DE.UTF-8, with libpcap version 1.7.4, with GnuTLS 3.4.10,
with Gcrypt 1.6.5, with zlib 1.2.8, binary plugins supported (13 loaded).

Built using gcc 5.4.0 20160609.
```
### Dependencies for training and testing the classifier
For all other parts of the project some python packages are required. They can be installed with the following command on most systems: `pip3 install numpy pandas pyspark findspark jupyter`

Finally spark has to be installed.
For that please follow the instructions on this website: https://spark.apache.org/docs/latest/

In the installation process, the archived is unpacked to some folder on the filesystem where from then spark is located. The path to this top level directory is needed for our jupyter notebook to connect to spark and to access the spark api from python. If spark is not installed to `/usr/local/spark`, then this path in the first cell of the  notebook has to be adjusted.

In case that the notebook can't connect to spark have included backup scripts called `backup_complete.py` and `backup_complete.py` which can be run by the `./bin/spark-submit` binary in the spark folder.


### The complete Jupyter Notebook
Our programm is consolidated in a jupyter notebook. To open this, please enter `jupyter notebook  complete.ipynb` into a terminal while beeing in our project folder.

To run all the parts of the programm, select Cell -> Run All.
Cells can be run individually by placing the cursor in them and pressing CTRL + Enter.

### Only running the Classifier
The classification process is a part of the complete notebook. It loads the Testdata from the 'testData.pq' file in the project folder which is saved there by the Complete notebook.
It also loads a stored Logistic Regression Model saved there by the Complete Notebook and then predicts attacks on that test data and prints the performance results.
To only start the Classification Process, please scroll down in the Notebook to the big 'Classification' headline and run the remaining cell from there.

In [2]:
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

import numpy as np
import pandas as pd
pd.options.display.max_columns = None

import findspark
findspark.init("/usr/local/spark/")
from pyspark import SparkContext
from pyspark import SQLContext

from pyspark.context import SparkContext
sc = SparkContext.getOrCreate()


spark = SQLContext(sc)

In [3]:
import os
#extracting the pcap files to csv
os.system("./wireshark_extraction.sh")
os.system("./tshark_extraction.sh")

0

0

In [4]:
#loading wireshark extracted CSVs
csv_path = './data/csv'
label_path = './data/labels'
tshark_path = './data/tshark_csv'
print("Looking up files in path", os.path.abspath(label_path))
label_files = os.listdir(label_path)
csv_files = os.listdir(csv_path)

l = []
for file in sorted(label_files):
    print("found:",file)
    if 'labeled' in file:
        csv_file = file.replace('_labeled', '.pcap_extracted')
        print("found:",csv_file)
        tshark_file = file.replace('_labeled', '.pcap_tshark')
        print("found:",tshark_file)
        t = os.path.join(label_path,file),os.path.join(csv_path,csv_file),os.path.join(tshark_path,tshark_file)
        l.append(t)

Looking up files in path /media/viktor/data/Uni/intellisec/ids/data/labels
found: CnC_uploading_exe_modbus_6RTU_with_operate_labeled.csv
found: CnC_uploading_exe_modbus_6RTU_with_operate.pcap_extracted.csv
found: CnC_uploading_exe_modbus_6RTU_with_operate.pcap_tshark.csv
found: Modbus_polling_only_6RTU_labeled.csv
found: Modbus_polling_only_6RTU.pcap_extracted.csv
found: Modbus_polling_only_6RTU.pcap_tshark.csv
found: characterization_modbus_6RTU_with_operate_labeled.csv
found: characterization_modbus_6RTU_with_operate.pcap_extracted.csv
found: characterization_modbus_6RTU_with_operate.pcap_tshark.csv
found: exploit_ms08_netapi_modbus_6RTU_with_operate_labeled.csv
found: exploit_ms08_netapi_modbus_6RTU_with_operate.pcap_extracted.csv
found: exploit_ms08_netapi_modbus_6RTU_with_operate.pcap_tshark.csv
found: moving_two_files_modbus_6RTU_labeled.csv
found: moving_two_files_modbus_6RTU.pcap_extracted.csv
found: moving_two_files_modbus_6RTU.pcap_tshark.csv
found: run11_labeled.csv
found: r

In [5]:
#these are helper functions for the data preprocessing
def merge(traffic,tshark,labeled):
    attacks = labeled.iloc[:,1]
    merged = pd.concat([extracted,tshark,attacks], axis=1)
    return merged

def add(all_extracted,merged):
    added = pd.concat([all_extracted,merged],axis=0)
    return added

#this function generates the sport and dport features.
#they described source and destination port regardlerss whether udp or tcp was used.
#the tcp and udp features are set if the corresponding transport protocol was used
def extract_ports(df):
    tshark = df
    import math

    for i in tshark.index:
        if  ((not math.isnan(tshark.iloc[i,:]["tcp.srcport"])) or (not math.isnan(tshark.iloc[i,:]["tcp.dstport"]))):
            #print("tcp")
            tshark["sport"] = int(tshark.iloc[i,:]["tcp.srcport"])
            tshark["dport"] = int(tshark.iloc[i,:]["tcp.dstport"])
            tshark["tcp"] = 1
            tshark["udp"] = 0

            pass
        elif ((not math.isnan(tshark.iloc[i,:]["udp.srcport"])) or (not math.isnan(tshark.iloc[i,:]["udp.dstport"]))):
            tshark["sport"] = int(tshark.iloc[i,:]["udp.srcport"])
            tshark["dport"] = int(tshark.iloc[i,:]["udp.dstport"])
            tshark["udp"] = 1
            tshark["tcp"] = 0
        
        else:
            tshark["sport"] = 0
            tshark["dport"] = 0
            tshark["udp"] = 0
            tshark["tcp"] = 0

In [6]:
#now we load all csv files, to the port processing on the tshark file and merge them.
#the merged files are appended together to one big dataframe containing all the features.


#this takes a while, depending on the harddrive speed and CPU of the computer where it is executed.

all_extracted = pd.DataFrame()

for p in l:
    print("reading" + p[0])
    extracted = pd.read_csv(p[1], sep=';;',engine="python")
    
    tshark_extracted = pd.read_csv(p[2], sep=',')
    extract_ports(tshark_extracted)
    tshark_extracted = tshark_extracted[["sport","dport","udp","tcp","eth.src","eth.dst"]]
    print(tshark_extracted.shape)
    
    labeled = pd.read_csv(p[0], sep=';',names=["nr","attack"])
    print(extracted.shape)
    merged = merge(extracted,tshark_extracted,labeled)
    
    print(labeled.shape)
    
    
    if all_extracted.empty:
        all_extracted = merged
    else:
         all_extracted = add(all_extracted,merged)
    print("new size of complete dataframe:",all_extracted.shape)
all_extracted

reading./data/labels/CnC_uploading_exe_modbus_6RTU_with_operate_labeled.csv
(1426, 6)
(1426, 5)
(1426, 2)
new size of complete dataframe: (1426, 12)
reading./data/labels/Modbus_polling_only_6RTU_labeled.csv
(58325, 6)
(58325, 5)
(58325, 2)
new size of complete dataframe: (59751, 12)
reading./data/labels/characterization_modbus_6RTU_with_operate_labeled.csv
(12296, 6)
(12296, 5)
(12296, 2)
new size of complete dataframe: (72047, 12)
reading./data/labels/exploit_ms08_netapi_modbus_6RTU_with_operate_labeled.csv
(1856, 6)
(1856, 5)
(1856, 2)
new size of complete dataframe: (73903, 12)
reading./data/labels/moving_two_files_modbus_6RTU_labeled.csv
(3319, 6)
(3319, 5)
(3319, 2)
new size of complete dataframe: (77222, 12)
reading./data/labels/run11_labeled.csv
(72498, 6)
(72498, 5)
(72498, 2)
new size of complete dataframe: (149720, 12)
reading./data/labels/run1_12rtu_labeled.csv
(238360, 6)
(238360, 5)
(238360, 2)
new size of complete dataframe: (388080, 12)
reading./data/labels/run1_3rtu_2s_

Unnamed: 0,Source,Destination,Protocol,Length,Info,sport,dport,udp,tcp,eth.src,eth.dst,attack
0,192.168.1.100,192.168.1.101,TCP,62,1355 → 502 [SYN] Seq=0 Win=64240 Len=0 MSS=146...,1500,502,0,1,00:0c:29:ee:b7:84,00:0c:29:f9:a8:75,0
1,192.168.1.101,192.168.1.100,TCP,62,"502 → 1355 [SYN, ACK] Seq=0 Ack=1 Win=64240 Le...",1500,502,0,1,00:0c:29:f9:a8:75,00:0c:29:ee:b7:84,0
2,192.168.1.100,192.168.1.101,TCP,54,1355 → 502 [ACK] Seq=1 Ack=1 Win=64240 Len=0,1500,502,0,1,00:0c:29:ee:b7:84,00:0c:29:f9:a8:75,0
3,192.168.1.100,192.168.1.101,Modbus/TCP,66,"Query: Trans: 2260; Unit: 1, Func: 3: ...",1500,502,0,1,00:0c:29:ee:b7:84,00:0c:29:f9:a8:75,0
4,192.168.1.100,192.168.1.106,TCP,62,1356 → 502 [SYN] Seq=0 Win=64240 Len=0 MSS=146...,1500,502,0,1,00:0c:29:ee:b7:84,00:0c:29:58:97:2a,0
5,192.168.1.106,192.168.1.100,TCP,62,"502 → 1356 [SYN, ACK] Seq=0 Ack=1 Win=64240 Le...",1500,502,0,1,00:0c:29:58:97:2a,00:0c:29:ee:b7:84,0
6,192.168.1.101,192.168.1.100,Modbus/TCP,71,"Response: Trans: 2260; Unit: 1, Func: 3: ...",1500,502,0,1,00:0c:29:f9:a8:75,00:0c:29:ee:b7:84,0
7,192.168.1.100,192.168.1.106,TCP,54,1356 → 502 [ACK] Seq=1 Ack=1 Win=64240 Len=0,1500,502,0,1,00:0c:29:ee:b7:84,00:0c:29:58:97:2a,0
8,192.168.1.100,192.168.1.106,Modbus/TCP,66,"Query: Trans: 2288; Unit: 1, Func: 3: ...",1500,502,0,1,00:0c:29:ee:b7:84,00:0c:29:58:97:2a,0
9,192.168.1.100,192.168.1.104,TCP,62,1357 → 502 [SYN] Seq=0 Win=64240 Len=0 MSS=146...,1500,502,0,1,00:0c:29:ee:b7:84,00:0c:29:3c:11:3f,0


In [None]:
try:
    all_extracted.to_csv("./data/preprocessed.csv")
except:
    pass

In [8]:
all_extracted = pd.read_csv("./data/preprocessed.csv")


In [9]:
all_extracted

Unnamed: 0.1,Unnamed: 0,Source,Destination,Protocol,Length,Info,sport,dport,udp,tcp,eth.src,eth.dst,attack
0,0,192.168.1.100,192.168.1.101,TCP,62,1355 → 502 [SYN] Seq=0 Win=64240 Len=0 MSS=146...,1500,502,0,1,00:0c:29:ee:b7:84,00:0c:29:f9:a8:75,0
1,1,192.168.1.101,192.168.1.100,TCP,62,"502 → 1355 [SYN, ACK] Seq=0 Ack=1 Win=64240 Le...",1500,502,0,1,00:0c:29:f9:a8:75,00:0c:29:ee:b7:84,0
2,2,192.168.1.100,192.168.1.101,TCP,54,1355 → 502 [ACK] Seq=1 Ack=1 Win=64240 Len=0,1500,502,0,1,00:0c:29:ee:b7:84,00:0c:29:f9:a8:75,0
3,3,192.168.1.100,192.168.1.101,Modbus/TCP,66,"Query: Trans: 2260; Unit: 1, Func: 3: ...",1500,502,0,1,00:0c:29:ee:b7:84,00:0c:29:f9:a8:75,0
4,4,192.168.1.100,192.168.1.106,TCP,62,1356 → 502 [SYN] Seq=0 Win=64240 Len=0 MSS=146...,1500,502,0,1,00:0c:29:ee:b7:84,00:0c:29:58:97:2a,0
5,5,192.168.1.106,192.168.1.100,TCP,62,"502 → 1356 [SYN, ACK] Seq=0 Ack=1 Win=64240 Le...",1500,502,0,1,00:0c:29:58:97:2a,00:0c:29:ee:b7:84,0
6,6,192.168.1.101,192.168.1.100,Modbus/TCP,71,"Response: Trans: 2260; Unit: 1, Func: 3: ...",1500,502,0,1,00:0c:29:f9:a8:75,00:0c:29:ee:b7:84,0
7,7,192.168.1.100,192.168.1.106,TCP,54,1356 → 502 [ACK] Seq=1 Ack=1 Win=64240 Len=0,1500,502,0,1,00:0c:29:ee:b7:84,00:0c:29:58:97:2a,0
8,8,192.168.1.100,192.168.1.106,Modbus/TCP,66,"Query: Trans: 2288; Unit: 1, Func: 3: ...",1500,502,0,1,00:0c:29:ee:b7:84,00:0c:29:58:97:2a,0
9,9,192.168.1.100,192.168.1.104,TCP,62,1357 → 502 [SYN] Seq=0 Win=64240 Len=0 MSS=146...,1500,502,0,1,00:0c:29:ee:b7:84,00:0c:29:3c:11:3f,0


In [10]:
#rename the wrongly labeled attack column (pandas par)
df = all_extracted
df.rename({"nr":"attack"}, axis="columns", inplace=True)
df.rename({"eth.src":"eth_src"}, axis="columns", inplace=True)
df.rename({"eth.dst":"eth_dst"}, axis="columns", inplace=True)
#make shure we have only the columns we want
cols = ['Source', 'Destination', 'Protocol', 'Length', 'Info',  "sport","dport", "tcp","udp","eth_src","eth_dst",'attack']
df = df[cols]

#print the dataframe in the current form
df

Unnamed: 0,Source,Destination,Protocol,Length,Info,sport,dport,tcp,udp,eth_src,eth_dst,attack
0,192.168.1.100,192.168.1.101,TCP,62,1355 → 502 [SYN] Seq=0 Win=64240 Len=0 MSS=146...,1500,502,1,0,00:0c:29:ee:b7:84,00:0c:29:f9:a8:75,0
1,192.168.1.101,192.168.1.100,TCP,62,"502 → 1355 [SYN, ACK] Seq=0 Ack=1 Win=64240 Le...",1500,502,1,0,00:0c:29:f9:a8:75,00:0c:29:ee:b7:84,0
2,192.168.1.100,192.168.1.101,TCP,54,1355 → 502 [ACK] Seq=1 Ack=1 Win=64240 Len=0,1500,502,1,0,00:0c:29:ee:b7:84,00:0c:29:f9:a8:75,0
3,192.168.1.100,192.168.1.101,Modbus/TCP,66,"Query: Trans: 2260; Unit: 1, Func: 3: ...",1500,502,1,0,00:0c:29:ee:b7:84,00:0c:29:f9:a8:75,0
4,192.168.1.100,192.168.1.106,TCP,62,1356 → 502 [SYN] Seq=0 Win=64240 Len=0 MSS=146...,1500,502,1,0,00:0c:29:ee:b7:84,00:0c:29:58:97:2a,0
5,192.168.1.106,192.168.1.100,TCP,62,"502 → 1356 [SYN, ACK] Seq=0 Ack=1 Win=64240 Le...",1500,502,1,0,00:0c:29:58:97:2a,00:0c:29:ee:b7:84,0
6,192.168.1.101,192.168.1.100,Modbus/TCP,71,"Response: Trans: 2260; Unit: 1, Func: 3: ...",1500,502,1,0,00:0c:29:f9:a8:75,00:0c:29:ee:b7:84,0
7,192.168.1.100,192.168.1.106,TCP,54,1356 → 502 [ACK] Seq=1 Ack=1 Win=64240 Len=0,1500,502,1,0,00:0c:29:ee:b7:84,00:0c:29:58:97:2a,0
8,192.168.1.100,192.168.1.106,Modbus/TCP,66,"Query: Trans: 2288; Unit: 1, Func: 3: ...",1500,502,1,0,00:0c:29:ee:b7:84,00:0c:29:58:97:2a,0
9,192.168.1.100,192.168.1.104,TCP,62,1357 → 502 [SYN] Seq=0 Win=64240 Len=0 MSS=146...,1500,502,1,0,00:0c:29:ee:b7:84,00:0c:29:3c:11:3f,0


In [13]:
#now we generate the spark dataframe and save it to disk. This can take a while
data = spark.createDataFrame(df)
#The below command will fail if the spark dataframe was already written earlier. 
#Then you can just ontinue with the next cell!
try:
    data.write.parquet("spark_dataframe.pq")
except:
    print("spark dataframe not writte, it probably already exists?")
    pass

spark dataframe not writte, it probably already exists?


In [14]:
data = spark.read.parquet('spark_dataframe.pq')

In [15]:
#import the necessary pakages for the data processing
from pyspark.ml.feature import RegexTokenizer,StringIndexer, OneHotEncoder, VectorAssembler,StopWordsRemover, CountVectorizer
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression

In [16]:
############INFO COLUMN #########
# regular expression tokenizer
regexTokenizer = [RegexTokenizer(inputCol="Info", outputCol="words", pattern="\\W")]
# stop words
add_stopwords = ["->"] 
stopwordsRemover = [StopWordsRemover(inputCol="words", outputCol="filtered").setStopWords(add_stopwords)]
# bag of words count
countVectors = [CountVectorizer(inputCol="filtered", outputCol="info_features", vocabSize=10000, minDF=5)]

In [17]:
##########Categorical Columns################

categorical_columns = ["Source", "Destination", "Protocol", "eth_dst", "eth_src","sport","dport",]
#categorical_columns.remove('Info')
print("Categorical columns:", categorical_columns)

####Build StringIndexe stages


stringindexer_stages = [StringIndexer(inputCol=c, outputCol='stringindexed_' + c) for c in categorical_columns]
# encode label column and add it to stringindexer stages
stringindexer_stages += [StringIndexer(inputCol='attack', outputCol='label')]

print("stringindexer_stages:",stringindexer_stages)


#####Build OneHotEncoder stages


onehotencoder_stages = [OneHotEncoder(inputCol='stringindexed_' + c, outputCol='onehot_'+c) for c in categorical_columns]

print("onehotencoder_stages", onehotencoder_stages)

#####Build VectorAssembler stage

feature_columns = ['onehot_' + c for c in categorical_columns]
feature_columns += ["info_features", "Length", "tcp", "udp"]
vectorassembler_stage = VectorAssembler(inputCols=feature_columns, outputCol='features')

print("vectorassembler_stage", vectorassembler_stage)

Categorical columns: ['Source', 'Destination', 'Protocol', 'eth_dst', 'eth_src', 'sport', 'dport']
stringindexer_stages: [StringIndexer_40568534f45f21069993, StringIndexer_4eca9ed8d33e802a440d, StringIndexer_48c4a98202f5a57e576c, StringIndexer_42219625823d3b889d81, StringIndexer_44bfabdc6f6579e22283, StringIndexer_4d379732a23ac57568b1, StringIndexer_4811bdf9fdb9feb61060, StringIndexer_403282391c76dc3c98f8]
onehotencoder_stages [OneHotEncoder_400db2af3f6de88f06fd, OneHotEncoder_444e9d96a61a18d430b9, OneHotEncoder_4a6f960ca872ca0a254d, OneHotEncoder_4a4eab3dc1b01d226a5c, OneHotEncoder_41a2babda15adcde7f78, OneHotEncoder_4c32882dd951009816ac, OneHotEncoder_43b89fb70fb032fdb457]
vectorassembler_stage VectorAssembler_4263a4233ca7ffd7fc7a


In [18]:
#####Build pipeline model

all_stages = regexTokenizer + stopwordsRemover + countVectors + stringindexer_stages + onehotencoder_stages + [vectorassembler_stage]

print(all_stages)

pipeline = Pipeline(stages=all_stages)


#######Fit pipeline model

pipeline_model = pipeline.fit(data)


#######Transform data
print("Pipeline assembled, now starting transformation")
final_columns = ['features', 'label']
cuse_df = pipeline_model.transform(data).select(final_columns)
print("data preprocessing is finished")
cuse_df.show(5)

[RegexTokenizer_4371bcc23e6a4ea3c821, StopWordsRemover_4fbebefc36928665d43c, CountVectorizer_4aa4882638feffbc0d4b, StringIndexer_40568534f45f21069993, StringIndexer_4eca9ed8d33e802a440d, StringIndexer_48c4a98202f5a57e576c, StringIndexer_42219625823d3b889d81, StringIndexer_44bfabdc6f6579e22283, StringIndexer_4d379732a23ac57568b1, StringIndexer_4811bdf9fdb9feb61060, StringIndexer_403282391c76dc3c98f8, OneHotEncoder_400db2af3f6de88f06fd, OneHotEncoder_444e9d96a61a18d430b9, OneHotEncoder_4a6f960ca872ca0a254d, OneHotEncoder_4a4eab3dc1b01d226a5c, OneHotEncoder_41a2babda15adcde7f78, OneHotEncoder_4c32882dd951009816ac, OneHotEncoder_43b89fb70fb032fdb457, VectorAssembler_4263a4233ca7ffd7fc7a]
Pipeline assembled, now starting transformation
data preprocessing is finished
+--------------------+-----+
|            features|label|
+--------------------+-----+
|(10219,[8,64,119,...|  0.0|
|(10219,[0,72,119,...|  0.0|
|(10219,[0,72,119,...|  0.0|
|(10219,[8,64,119,...|  0.0|
|(10219,[0,74,119,...|  0

In [19]:
###############Train & Test ###################################


from pyspark.sql.functions import lit


print("length of dataset:",cuse_df.count())
                                                           

(trainingData, testData) = cuse_df.randomSplit([0.7, 0.3], seed = 100)



print("length of training data:",trainingData.count())
print("length of test data:",testData.count())


from pyspark.sql.functions import desc

attacks = trainingData.filter(cuse_df.label == 1)
no_attacks = trainingData.filter(cuse_df.label == 0)
training_attacks = attacks.count()
print("how many attacks in training dataset? ",training_attacks)
#these are fixed lengths, only fitting to the dataset of length 902754!
(taken, not_taken) = no_attacks.randomSplit([(training_attacks/637997),1-(training_attacks/637997)], seed = 100)
print("how many of the non attack packets do we use for the traiing dataset?",str(taken.count()))


trainingData = attacks.union(taken)

testattacks = testData.filter("label == 1").count()
testratio = testattacks/testData.count()
print("length of test data set: ",str(testData.count()))
print("test ratio:",testratio)

trainattacks = trainingData.filter("label == 1").count()
print(trainattacks)
trainratio = trainattacks/trainingData.count()
print("length of training data set: ",trainingData.count())
print("train ratio:",trainratio)


length of dataset: 912054
length of training data: 637997
length of test data: 274057
how many attacks in training dataset?  1003
how many of the non attack packets do we use for the traiing dataset? 962
length of test data set:  274057
test ratio: 0.0014668481374312643
1003
length of training data set:  1965
train ratio: 0.5104325699745547


In [21]:
try:
    testData.write.parquet("testData.pq")
except:
    print("testData already saved!")
    pass

testData already saved!


# Classification starts here
## If you only want to try out the classification, run the remaining cells from here

In [22]:
#start or get spark context so notebook can be started from here
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

import numpy as np
import pandas as pd
pd.options.display.max_columns = None

import findspark
findspark.init("/usr/local/spark/")
from pyspark import SparkContext
from pyspark import SQLContext

from pyspark.context import SparkContext
sc = SparkContext.getOrCreate()


spark = SQLContext(sc)

In [23]:
testData = spark.read.parquet('testData.pq')
    

In [24]:
#train it!
lr = LogisticRegression(labelCol="label", maxIter=20)

from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator

paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.01, 0.3, 0.5]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])\
    .build()
    
    
crossval = CrossValidator(estimator=lr,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=3)  # use 3+ folds in practice

# Run cross-validation, and choose the best set of parameters.
lrModel = crossval.fit(trainingData)

In [25]:
#this code block extracts the best threshold value for a maximum recall value
trainingSummary = lrModel.bestModel.summary
recall = trainingSummary.recallByThreshold
maxRecall = recall.groupBy().max('recall').select('max(recall)').head()
bestThreshold = recall.where(recall['recall'] == maxRecall['max(recall)']) \
    .select('threshold').head()['threshold']
print("setting the optimal threshold for maximum recall: ",bestThreshold)
_ = lr.setThreshold(bestThreshold)


setting the optimal threshold for maximum recall:  0.5214534489620104


In [26]:
#this model is now saved so it can be quickly acessed in future runs
try:
    lrModel.save("LogRegModel") 
except:
    print("model was already saved")

model was already saved


In [27]:
from pyspark.ml.tuning import CrossValidatorModel
CrossValidatorModel.read().load("LogRegModel")
#now the model ist loaded

CrossValidatorModel_49ad884bb52690933b7f

In [28]:
#Now we use our Model and predict the test data with it
pred = lrModel.transform(testData)

In [29]:
TP = pred.filter("label == 1 and prediction == 1").count()
TN = pred.filter("label == 0 and prediction == 0").count()
FP = pred.filter("label == 0 and prediction == 1").count()
FN = pred.filter("label == 1 and prediction == 0").count()


print("we have some results!\n")
print("True Positive", TP)

print("True Negative", TN)

print("False Positive", FP)

print("False Negative", FN)

print("\nrecall",TP/(TP+FN))
print("precision:",(TP/(TP+FP)))


we have some results!

True Positive 399
True Negative 273439
False Positive 216
False Negative 3

recall 0.9925373134328358
precision: 0.6487804878048781


In [30]:
import os
try:
    os.system('spd-say -i -10 "dear human, the classification is finished"')
except:
    pass

0