In [0]:
# Question 2
# Download the KDD Cup 1999 archive (the file name indicates the 10 percent
# subset) to local disk, then move it into DBFS.
import urllib.request

source_url = "http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz"
local_archive = "/tmp/kddcup_data.gz"
urllib.request.urlretrieve(source_url, local_archive)

# Move the downloaded archive from the local path into the dbfs:/kdd directory.
dbutils.fs.mv("file:/tmp/kddcup_data.gz", "dbfs:/kdd/kddcup_data.gz")

# List the target directory to confirm the archive is there.
kdd_listing = dbutils.fs.ls("dbfs:/kdd")
display(kdd_listing)

path,name,size
dbfs:/kdd/kddcup_data.gz,kddcup_data.gz,2144903


In [0]:
# Question 3
# Read the archive stored in DBFS into an RDD of text lines
path = "dbfs:/kdd/kddcup_data.gz"
rdd = sc.textFile(path)
# Preview the first 10 records of the RDD
first_records = rdd.take(10)
print(first_records)

In [0]:
# Verify the type of the data structure that holds the loaded data
type(rdd)

In [0]:
# Question 4
# Split every comma-separated line into its fields, then turn the RDD into a
# DataFrame. No schema is supplied; later cells address the columns as _1, _2, ...
fields_rdd = rdd.map(lambda line: line.split(","))
df = fields_rdd.toDF()

In [0]:
# Show the total number of columns in the DataFrame
# (this count includes the label column, which later cells select as _42)
feature_count = len(df.columns)
print("Total number of features:", feature_count)

In [0]:
# Preview the contents of the raw, all-string DataFrame
df.show()

In [0]:
# Question 5
# Keep the first 6 feature columns plus the label column (_42),
# exposed as an RDD of rows
selected_columns = ['_1', '_2', '_3', '_4', '_5', '_6', '_42']
rdd2 = df.select(*selected_columns).rdd

In [0]:
# Add a header: rebuild the DataFrame with descriptive column names
column_names = ['duration', 'protocol_type', 'service', 'flag', 'src_bytes', 'dst_bytes', 'label']
df2 = rdd2.toDF(schema=column_names)

In [0]:
# Change the type from string to integer for the numeric columns:
# duration, src_bytes, and dst_bytes
for numeric_column in ("duration", "src_bytes", "dst_bytes"):
    # withColumn with an existing name replaces that column in place
    df2 = df2.withColumn(numeric_column, df2[numeric_column].cast('integer'))

In [0]:
# Print the schema to confirm the three numeric columns were cast to integer
df2.printSchema()

In [0]:
# Display the first 10 rows of the named, typed DataFrame
df2.show(10)

In [0]:
# Question 6
# Protocol_type: number of connections per protocol, smallest group first
protocol_counts = df2.groupBy('protocol_type').count()
df_protocol = protocol_counts.orderBy('count', ascending=True)
df_protocol.show()

In [0]:
# Bar graph for protocol_type
# NOTE(review): no plot options appear in code, so the bar-chart view is
# presumably selected in the Databricks notebook UI.
display(df_protocol)

protocol_type,count
udp,20354
tcp,190065
icmp,283602


Output can only be rendered in Databricks

In [0]:
# Service: number of connections per service, smallest group first
service_counts = df2.groupBy('service').count()
df_service = service_counts.orderBy('count', ascending=True)
df_service.show()

In [0]:
# Bar graph for service
# NOTE(review): no plot options appear in code, so the bar-chart view is
# presumably selected in the Databricks notebook UI.
display(df_service)

service,count
tftp_u,1
red_i,1
pm_dump,1
tim_i,7
X11,11
urh_i,14
IRC,43
Z39_50,92
netstat,95
ctf,97


Output can only be rendered in Databricks

In [0]:
# Question 7 - Further exploratory data analysis

# Relationship between average duration and connection types
# (_42 is the label column and _1 is the duration column of the raw DataFrame)
from pyspark.sql import functions as F

duration_by_label = df.groupBy("_42").agg(F.mean('_1'), F.count('_1'))
# Longest average duration first
df_duration = duration_by_label.orderBy('avg(_1)', ascending=False)
df_duration.show(20)

In [0]:
# Plot scatter plot
# NOTE(review): the chart type and axes are not set in code; they are
# presumably configured in the Databricks notebook UI.
display(df_duration)

# y-axis shows the number of occurrences of each label activity, x-axis shows the average duration of each label activity

_42,avg(_1),count(_1)
portsweep.,1915.2990384615384,1040
warezclient.,615.2578431372549,1020
spy.,318.0,2
normal.,216.65732231336992,97278
multihop.,184.0,7
rootkit.,100.8,10
buffer_overflow.,91.7,30
perl.,41.333333333333336,3
loadmodule.,36.22222222222222,9
ftp_write.,32.375,8


Output can only be rendered in Databricks

In [0]:
# Relationship between average dst_host_same_src_port_rate and label types
# (the rate is read from column _36 of the raw DataFrame; _42 is the label)
port_rate_by_label = df.groupBy("_42").agg(F.mean('_36'), F.count('_36'))
# Highest average rate first
df_dst_host_same_src_port_rate = port_rate_by_label.orderBy('avg(_36)', ascending=False)
df_dst_host_same_src_port_rate.show(20)

In [0]:
# Plot bar graph
# NOTE(review): the chart type and axes are not set in code; they are
# presumably configured in the Databricks notebook UI.
display(df_dst_host_same_src_port_rate)

# y-axis shows the average port rate each label experienced, x-axis shows the top 20 labels

_42,avg(_36),count(_36)
smurf.,0.9996929733964888,280790
nmap.,0.9597835497835498,231
ipsweep.,0.93028067361668,1247
warezmaster.,0.9,20
portsweep.,0.8873269230769232,1040
ftp_write.,0.875,8
land.,0.87,21
loadmodule.,0.7355555555555555,9
multihop.,0.7142857142857143,7
buffer_overflow.,0.6806666666666668,30


Output can only be rendered in Databricks

In [0]:
# Relationship between average duration and service
# (_3 is the service column and _1 is the duration column of the raw DataFrame)
duration_by_service = df.groupBy("_3").agg(F.mean('_1'), F.count('_1'))
# Longest average duration first
df_service_time = duration_by_service.orderBy('avg(_1)', ascending=False)
df_service_time.show(10)

In [0]:
# Plot bar graph
# NOTE(review): the chart type and axes are not set in code; they are
# presumably configured in the Databricks notebook UI.
display(df_service_time)
# y-axis shows the average duration, x-axis shows the top 20 services

_3,avg(_1),count(_1)
IRC,7343.093023255814,43
other,2815.952604670444,7237
telnet,890.2085769980507,513
ftp,423.1390977443609,798
supdup,387.4857142857143,105
csnet_ns,321.04761904761904,126
efs,299.36893203883494,103
uucp,286.9622641509434,106
printer,283.8073394495413,109
courier,283.50925925925924,108


Output can only be rendered in Databricks

In [0]:
# Question 8 - Preprocess data: index the categorical string columns with
# StringIndexer (one-hot encoding would also be possible but is not applied here)
from pyspark.ml.feature import StringIndexer

protocol_type_indexer = StringIndexer(inputCol="protocol_type", outputCol="protocol_type_index")
service_indexer = StringIndexer(inputCol="service", outputCol="service_index")
flag_indexer = StringIndexer(inputCol="flag", outputCol="flag_index")

# Every indexer is fitted on df2; the fits are independent of each other.
protocol_type_model = protocol_type_indexer.fit(df2)
service_model = service_indexer.fit(df2)
flag_model = flag_indexer.fit(df2)

# Apply the fitted models one after another so that the final frame carries
# all three *_index columns next to the original columns.
protocol_type_indexed = protocol_type_model.transform(df2)
service_indexed = service_model.transform(protocol_type_indexed)
flag_indexed = flag_model.transform(service_indexed)

flag_indexed.show()

In [0]:
# Convert the indexed Spark DataFrame into a pandas DataFrame for the
# pandas / scikit-learn steps that follow
df_Q8 = flag_indexed.toPandas()

In [0]:
# The raw string columns are replaced by their *_index counterparts, so drop them
df_Q8 = df_Q8.drop(columns=['protocol_type', 'service', 'flag'])

In [0]:
# Binarize the label: 1 for "normal." connections, 0 for every other label
is_normal = df_Q8["label"] == "normal."
df_Q8.loc[~is_normal, "label"] = 0
df_Q8.loc[is_normal, "label"] = 1

In [0]:
# Inspect the pandas frame after dropping the raw categorical columns and
# encoding the label (1 = "normal.", 0 = anything else)
df_Q8

Unnamed: 0,duration,src_bytes,dst_bytes,label,protocol_type_index,service_index,flag_index
0,0,181,5450,1,1.0,2.0,0.0
1,0,239,486,1,1.0,2.0,0.0
2,0,235,1337,1,1.0,2.0,0.0
3,0,219,1337,1,1.0,2.0,0.0
4,0,217,2032,1,1.0,2.0,0.0
...,...,...,...,...,...,...,...
494016,0,310,1881,1,1.0,2.0,0.0
494017,0,282,2286,1,1.0,2.0,0.0
494018,0,203,1200,1,1.0,2.0,0.0
494019,0,291,1200,1,1.0,2.0,0.0


In [0]:
# Keep the binary label as a float Series; it is the classification target
target = df_Q8['label'].astype('float')

In [0]:
# Remove the label so that df_Q8 holds only the feature columns
df_Q8 = df_Q8.drop(columns='label')

In [0]:
# Min-max normalize every feature column into [0, 1]; the chi-squared test
# can't take negative values
column_min = df_Q8.min()
# A constant column has max == min, so dividing by its range would give
# 0/0 = NaN, which the chi-squared feature selection does not accept.
# Use a range of 1 for such columns so they scale to 0 instead; columns with a
# non-zero range are scaled exactly as before.
column_range = (df_Q8.max() - column_min).replace(0, 1)
normalized_df_Q8 = (df_Q8 - column_min) / column_range

In [0]:
# Inspect the min-max scaled feature frame
normalized_df_Q8

Unnamed: 0,duration,src_bytes,dst_bytes,protocol_type_index,service_index,flag_index
0,0.0,2.610418e-07,0.001057,0.5,0.030769,0.0
1,0.0,3.446905e-07,0.000094,0.5,0.030769,0.0
2,0.0,3.389216e-07,0.000259,0.5,0.030769,0.0
3,0.0,3.158461e-07,0.000259,0.5,0.030769,0.0
4,0.0,3.129617e-07,0.000394,0.5,0.030769,0.0
...,...,...,...,...,...,...
494016,0.0,4.470881e-07,0.000365,0.5,0.030769,0.0
494017,0.0,4.067060e-07,0.000443,0.5,0.030769,0.0
494018,0.0,2.927706e-07,0.000233,0.5,0.030769,0.0
494019,0.0,4.196859e-07,0.000233,0.5,0.030769,0.0


In [0]:
### Split the data  ###
# Ideally the data is split first and preprocessed afterwards (so the test rows
# do not influence the scaling); here it was processed first to save time.

import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split

# 70/30 train/test split with a fixed random_state
split_parts = train_test_split(normalized_df_Q8, target, test_size=0.3, random_state=0)
X_train, X_test, y_train, y_test = split_parts

In [0]:
# Chi-squared method for feature selection, select the top 4 features
# (scores are computed on the training split only)

# NOTE(review): `datasets` is not used in the cells visible here.
from sklearn import datasets
from sklearn.feature_selection import chi2
from sklearn.feature_selection import SelectKBest

# `test` is the selector object; `fit` holds the return value of test.fit(...)
test = SelectKBest(score_func=chi2, k=4)
fit = test.fit(X_train, y_train)
# One chi-squared score per feature column, in the column order of X_train
fit.scores_

# It seems like all features except for src_bytes are important

In [0]:
# Refit the selector on the training data and reduce the training matrix to
# the k=4 selected feature columns
X_new_train=test.fit_transform(X_train, y_train)

In [0]:
#Import svm model
from sklearn import svm

#Create a svm Classifier
clf = svm.SVC(kernel='linear') # Linear Kernel

# Train the model on the chi-squared-selected training features.
# The model used to be fitted on the full X_train, which meant the feature
# selection step had no effect on the classifier.
clf.fit(X_new_train, y_train)

# Apply the same fitted selector to the test set so that its columns match the
# ones the model was trained on, then predict the response for the test data.
X_new_test = test.transform(X_test)
y_pred = clf.predict(X_new_test)

In [0]:
# Import the scikit-learn metrics module for the evaluation scores
from sklearn import metrics

# Model Accuracy: how often is the classifier correct?
accuracy = metrics.accuracy_score(y_test, y_pred)
print("Accuracy:", accuracy)

# Model Precision: of the tuples predicted as positive, what fraction is truly positive?
# (the positive class is label 1, which an earlier cell assigns to "normal.")
precision = metrics.precision_score(y_test, y_pred)
print("Precision:", precision)

# Model Recall: of the truly positive tuples, what fraction is predicted as positive?
recall = metrics.recall_score(y_test, y_pred)
print("Recall:", recall)