In [0]:
import urllib.request

# Download the gzip archive named in the URL to local /tmp, then move it into DBFS.
source_url = "http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz"
local_copy = "/tmp/kddcup_data.gz"
urllib.request.urlretrieve(source_url, local_copy)
dbutils.fs.mv("file:/tmp/kddcup_data.gz", "dbfs:/kdd/kddcup_data.gz")

# List the target directory so the moved file (and its size) is visible in the output.
kdd_listing = dbutils.fs.ls("dbfs:/kdd")
display(kdd_listing)

path,name,size
dbfs:/kdd/kddcup_data.gz,kddcup_data.gz,2144903


In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Obtain (or reuse) a Spark session named "Intrusion_detection".
session_builder = SparkSession.builder.appName("Intrusion_detection")
spark = session_builder.getOrCreate()

# Read the archive from DBFS as an RDD of text lines, asking for at least 4 partitions.
dataRDD = spark.sparkContext.textFile("dbfs:/kdd/kddcup_data.gz", minPartitions=4)

# Peek at the first ten raw records.
dataRDD.take(10)

In [0]:
# Column names: the text before the first ':' on each line of the schema file.
schema_lines = spark.sparkContext.textFile("/FileStore/tables/schema-1.txt", 4)
schema = [line.split(':')[0] for line in schema_lines.collect()]

# Split every comma-separated record into a list of string fields.
# NOTE(review): dataRDD is rebound to its own transformation, so re-running this
# cell without re-running the load cell will fail (lists have no .split).
dataRDD = dataRDD.map(lambda record: record.split(","))

# Name the fields using the schema list; every field is still a string at this point.
df = dataRDD.toDF(schema)

num_features = len(df.columns)
print("The total number of features present in the dataset: ", num_features)

In [0]:
# Project the full DataFrame down to seven columns of interest.
selected_columns = ["duration", "protocol_type", "service",
                    "src_bytes", "dst_bytes", "flag", "label"]
new_df = df.select(*selected_columns)

# RDD whose elements are plain Python lists, one per row.
# NOTE(review): this is derived from the full `df`, not from `new_df` — confirm that is intended.
new_rdd = df.rdd.map(list)

# Print the schema of the reduced DataFrame.
new_df.printSchema()

In [0]:
# Print the first ten rows of the reduced DataFrame as a text table.
new_df.show(n=10)

In [0]:
# Connection counts per protocol type, smallest count first.
protocol_counts = new_df.groupBy('protocol_type').count()
protocol_counts.orderBy('count').show()

In [0]:
# Same per-protocol counts, rendered through the notebook's display().
protocol_counts_sorted = new_df.groupBy('protocol_type').count().orderBy('count')
display(protocol_counts_sorted)

protocol_type,count
udp,20354
tcp,190065
icmp,283602


In [0]:
# Connection counts per service, smallest count first.
service_counts = new_df.groupBy('service').count()
service_counts.orderBy('count').show()

In [0]:
# Same per-service counts, rendered through the notebook's display().
service_counts_sorted = new_df.groupBy('service').count().orderBy('count')
display(service_counts_sorted)

service,count
tftp_u,1
red_i,1
pm_dump,1
tim_i,7
X11,11
urh_i,14
IRC,43
Z39_50,92
netstat,95
ctf,97


In [0]:
# Row counts per value of the 'flag' column (taken from the full DataFrame), ascending.
flag_counts_sorted = df.groupBy('flag').count().orderBy('count')
display(flag_counts_sorted)

flag,count
OTH,8
S3,10
RSTOS0,11
S2,24
S1,57
SH,107
RSTO,579
RSTR,903
REJ,26875
S0,87007


In [0]:
# Row counts per raw label value, ascending.
label_counts_sorted = new_df.groupBy('label').count().orderBy('count')
display(label_counts_sorted)

label,count
spy.,2
perl.,3
phf.,4
multihop.,7
ftp_write.,8
loadmodule.,9
rootkit.,10
imap.,12
warezmaster.,20
land.,21


In [0]:
# Row counts per value of 'urgent'; uses the full DataFrame because
# 'urgent' is not among the columns selected into new_df.
urgent_counts_sorted = df.groupBy('urgent').count().orderBy('count')
display(urgent_counts_sorted)

urgent,count
3,1
2,1
1,2
0,494017


In [0]:
from pyspark.ml.feature import *
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [0]:
# Data Preprocessing
# Collapse the label to two classes: 'normal.' becomes 'normal', everything else 'attack'.
# The condition uses col("label") so it is resolved against new_df itself, instead of
# borrowing a Column object from the parent DataFrame `df` (which only works while
# new_df keeps its lineage to that exact `df`).
dataframe = new_df.withColumn(
    'label',
    when(col("label") == 'normal.', 'normal').otherwise('attack'))

# Replace each string column with a numeric "<name>_index" column produced by StringIndexer;
# the original string column is dropped once its index exists.
inputs = ["protocol_type", "service", "flag", "label"]
for column in inputs:
  indexer = StringIndexer(inputCol=column, outputCol=column+"_index")
  dataframe = indexer.fit(dataframe).transform(dataframe)
  dataframe = dataframe.drop(column)

# Pack the three indexed predictors into a single vector column named "features".
# NOTE(review): the indices are passed on as plain numeric values (no one-hot encoding),
# and duration/src_bytes/dst_bytes are not part of the feature vector — confirm intended.
vecAssembler = VectorAssembler()\
                .setInputCols(["protocol_type_index", "service_index", "flag_index"])\
                .setOutputCol("features")
dataframe = vecAssembler.transform(dataframe)

In [0]:
# Logistic Regression
from pyspark.ml.classification import LogisticRegression

# 80/20 train/test split. A fixed seed is passed so that re-running the notebook
# requests the same split and the reported metrics are comparable between runs
# (without it, randomSplit picks a new random seed on every execution).
(trainset, testset) = dataframe.randomSplit([0.8, 0.2], seed=42)

# Classifier over the assembled "features" vector, predicting the indexed label.
lr = LogisticRegression(maxIter=10, featuresCol='features', labelCol='label_index')

# Fit on the training portion, then score the held-out portion.
model = lr.fit(trainset)
predictions = model.transform(testset)

In [0]:
# Model Co-efficients
# Print the fitted model's coefficient vector and intercept.
coefficients_text = str(model.coefficients)
intercept_text = str(model.intercept)
print("Coefficients: " + coefficients_text)
print("Intercept: " + intercept_text)

In [0]:
# Model Summary
# Read accuracy and the weighted TPR / F-measure / precision from model.summary.
trainingSummary = model.summary
accuracy, truePositiveRate, fMeasure, precision = (
    trainingSummary.accuracy,
    trainingSummary.weightedTruePositiveRate,
    trainingSummary.weightedFMeasure(),
    trainingSummary.weightedPrecision,
)
summary_template = "Accuracy: %s\nTPR: %s\nF-measure: %s\nPrecision: %s"
print(summary_template % (accuracy, truePositiveRate, fMeasure, precision))

In [0]:
# Evaluation on Test set
# Accuracy of the "prediction" column against "label_index" on the scored test rows.
# Note: this rebinds `accuracy`, replacing the value taken from model.summary.
evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy", labelCol="label_index", predictionCol="prediction")
accuracy = evaluator.evaluate(predictions)
print("Test Accuracy = %g " % accuracy)