In [0]:
#all spark imports
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
import urllib.request 

In [0]:
#instantiate the spark session
spark = SparkSession.builder.appName("Assignment three").getOrCreate()

2 Use python urllib library to extract the KDD Cup 99 data from their web repository, store it in a temporary location and then move it to the Databricks filesystem which can enable easy access to this data for analysis. Use the following commands in Databricks to get your data.

In [0]:
urllib.request.urlretrieve("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz", "/tmp/kddcup_data.gz")
dbutils.fs.mv("file:/tmp/kddcup_data.gz", "dbfs:/kdd/kddcup_data.gz")
display(dbutils.fs.ls("dbfs:/kdd"))

path,name,size
dbfs:/kdd/kddcup_data.gz,kddcup_data.gz,2144903


3 After storing the data into the Databricks filesystem. Load your data from the disk into Spark's RDD. Print 10 values of your RDD and verify the type of data structure of your data (RDD)

In [0]:
rdd=spark.sparkContext.textFile("dbfs:/kdd/kddcup_data.gz")

rdd.take(10)



In [0]:

type(rdd)



5 Split the data. (Each entry in your RDD is a comma-separated line of data, which you first need to split before you can parse and build your dataframe.) Show the total number of features (columns) and print results

In [0]:
mappedRDD=rdd.map(lambda x: x.split(","))
rdd2=mappedRDD.map(lambda x:x[0:6])
rdd3=mappedRDD.map(lambda x:x[41])

newRDD=rdd2.union(rdd3)
newRDD.take(10)


6 Now extract these 6 columns (duration, protocol_type, service, src_bytes, dst_bytes, flag and label) from your dataset. Build a new RDD and dataframe. Print schema and display 10 values.

In [0]:
df1 = rdd2.toDF()
df2 = df1.selectExpr("_1 as duration", "_2 as protocol_type", "_3 as Service", "_4 as Flag" , "_5 as src_bytes", "_6 as dst_bytes")
df3=rdd3.map(lambda x: (x, )).toDF()
df4=df3.selectExpr("_1 as Label")
df5=df2.join(df4)

df5.display()

duration,protocol_type,Service,Flag,src_bytes,dst_bytes,Label
0,tcp,http,SF,181,5450,normal.
0,tcp,http,SF,181,5450,normal.
0,tcp,http,SF,181,5450,normal.
0,tcp,http,SF,181,5450,normal.
0,tcp,http,SF,181,5450,normal.
0,tcp,http,SF,181,5450,normal.
0,tcp,http,SF,181,5450,normal.
0,tcp,http,SF,181,5450,normal.
0,tcp,http,SF,181,5450,normal.
0,tcp,http,SF,181,5450,normal.


6 Get the total number of connections based on the protocol_type and based on the service. Show result in an ascending order. Plot the bar graph for both.

In [0]:
df_protcol= df5.groupBy("protocol_type")\
               .count()\
               .sort(col("count").asc())

df_service=df5.groupBy("Service")\
              .count()\
              .sort(col("count").asc())

display(df_protcol)
display(df_service)

protocol_type,count
udp,10055303434
tcp,93896101365
icmp,140105343642


Service,count
tftp_u,494021
red_i,494021
pm_dump,494021
tim_i,3458147
X11,5434231
urh_i,6916294
IRC,21242903
Z39_50,45449932
netstat,46931995
ctf,47920037


7 Do a further exploratory data analysis, including other columns of this dataset and plot graphs. Plot at least 3 different charts and explain them.

In [0]:
df_protcol.display()

protocol_type,count
udp,10055303434
tcp,93896101365
icmp,140105343642


In [0]:

df_flag= df5.groupBy("Flag")\
         .count()

df_flag.display()

Flag,count
RSTOS0,5434231
S3,4940210
SF,186957307240
S0,42983285147
OTH,3952168
REJ,13276814375
RSTO,286038159
RSTR,446100963
SH,52860247
S2,11856504


In [0]:
from pyspark.sql.types import IntegerType
data_df = df5.withColumn("src_bytes", df5["src_bytes"].cast(IntegerType()))
             
data_df.display()




duration,protocol_type,Service,Flag,src_bytes,dst_bytes,Label
0,tcp,http,SF,181,5450,normal.
0,tcp,http,SF,181,5450,normal.
0,tcp,http,SF,181,5450,normal.
0,tcp,http,SF,181,5450,normal.
0,tcp,http,SF,181,5450,normal.
0,tcp,http,SF,181,5450,normal.
0,tcp,http,SF,181,5450,normal.
0,tcp,http,SF,181,5450,normal.
0,tcp,http,SF,181,5450,normal.
0,tcp,http,SF,181,5450,normal.


8 Look at the label column where label == ‘normal’. Now create a new label column where you have a label == ‘normal’ and everything else is considered as an ‘attack’. Split your data (train/test) and based on your new label column now build a simple machine learning model for intrusion detection (you can use few selected columns for your model out of all). Explain which algorithm you have selected and why? Show the results with some success metrics.

In [0]:



df5.withColumn("New_Label", when((df5.Label == "normal."), lit("Normal")) \
               .otherwise(lit("Attack")) \
               ).show()








In [0]:

from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

categoricalColumns = ['protocol_type', 'service', 'Flag', 'label']
stages = []
for categoricalCol in categoricalColumns:
    stringIndexer = StringIndexer(inputCol = categoricalCol, outputCol = categoricalCol + 'Index')
    encoder = OneHotEncoder(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]
label_stringIdx = StringIndexer(inputCol = 'New_Label', outputCol = 'label')
stages += [label_stringIdx]
numericCols = ['duration', 'src_bytes', 'dst_bytes']
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]



In [0]:
from pyspark.ml import Pipeline
pipeline = Pipeline(stages = stages)
pipelineModel = pipeline.fit(df5)
df5 = pipelineModel.transform(df5)
selectedCols = ['label', 'features'] + cols
df5 = df5.select(selectedCols)
train, test = df5.randomSplit([0.7, 0.3], seed = 2018)

In [0]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
lr = LogisticRegression(featuresCol = 'features', labelCol = 'label', maxIter=10)
lrModel = lr.fit(train)

predictions = lrModel.transform(test)
evaluator = BinaryClassificationEvaluator()
print('Test Area Under ROC', evaluator.evaluate(predictions))



I selected logistic regression since it is simple binary classification algorithm.