In [47]:
import pyspark
import findspark
findspark.init()

from pyspark.sql import SparkSession

In [48]:
spark=SparkSession.builder.getOrCreate()

In [49]:
print(spark.version)

3.0.0-preview2


In [50]:
##Preprocessing

In [51]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType,DoubleType

schema = StructType([StructField("duration",IntegerType()),StructField("protocal_type",StringType()),
                     StructField("service",StringType()),StructField("flat",StringType()),StructField("src_bytes",IntegerType()),
    StructField("dst_bytes",IntegerType()),StructField("land",IntegerType()),StructField("wrong_fragment",IntegerType()),
    StructField("urgent",IntegerType()),StructField("hot",IntegerType()),StructField("num_failed_logins",IntegerType()),
    StructField("logged_in",IntegerType()),StructField("num_compromised",IntegerType()),StructField("root_shell",IntegerType()),
    StructField("su_attempted",IntegerType()),StructField("num_root",IntegerType()),
    StructField("num_file_creations",IntegerType()),StructField("num_shells",IntegerType()),
    StructField("num_access_files",IntegerType()),StructField("num_outbound_cmds",IntegerType()),
    StructField("is_host_login",IntegerType()),StructField("is_guest_login",IntegerType()),StructField("count",IntegerType()),
    StructField("srv_count",IntegerType()),StructField("serror_rate",DoubleType()),
    StructField("srv_serror_rate",DoubleType()),StructField("rerror_rate",DoubleType()),
    StructField("srv_rerror_rate",DoubleType()),StructField("same_srv_rate",DoubleType()),
    StructField("diff_srv_rate",DoubleType()),StructField("srv_diff_host_rate",DoubleType()),
    StructField("dst_host_count",IntegerType()),StructField("dst_host_srv_count",IntegerType()),
    StructField("dst_host_same_srv_rate",DoubleType()),StructField("dst_host_diff_srv_rate",DoubleType()),
    StructField("dst_host_same_src_port_rate",DoubleType()),
    StructField("dst_host_srv_diff_host_rate",DoubleType()),StructField("dst_host_serror_rate",DoubleType()),
    StructField("dst_host_srv_serror_rate",DoubleType()),
    StructField("dst_host_rerror_rate",DoubleType()),StructField("dst_host_srv_rerror_rate",DoubleType()),StructField("normal",StringType())
])

In [52]:

kdd=spark.read.csv("kddcup.data_10_percent",header=False,schema=schema)
kdd.printSchema()

root
 |-- duration: integer (nullable = true)
 |-- protocal_type: string (nullable = true)
 |-- service: string (nullable = true)
 |-- flat: string (nullable = true)
 |-- src_bytes: integer (nullable = true)
 |-- dst_bytes: integer (nullable = true)
 |-- land: integer (nullable = true)
 |-- wrong_fragment: integer (nullable = true)
 |-- urgent: integer (nullable = true)
 |-- hot: integer (nullable = true)
 |-- num_failed_logins: integer (nullable = true)
 |-- logged_in: integer (nullable = true)
 |-- num_compromised: integer (nullable = true)
 |-- root_shell: integer (nullable = true)
 |-- su_attempted: integer (nullable = true)
 |-- num_root: integer (nullable = true)
 |-- num_file_creations: integer (nullable = true)
 |-- num_shells: integer (nullable = true)
 |-- num_access_files: integer (nullable = true)
 |-- num_outbound_cmds: integer (nullable = true)
 |-- is_host_login: integer (nullable = true)
 |-- is_guest_login: integer (nullable = true)
 |-- count: integer (nullable = true

In [53]:
# Get number of records
print("The data contain %d records." % kdd.count())

The data contain 494021 records.


In [54]:
#terminate spark
#spark.stop()

In [55]:
# View the first five records
kdd.show(5)

+--------+-------------+-------+----+---------+---------+----+--------------+------+---+-----------------+---------+---------------+----------+------------+--------+------------------+----------+----------------+-----------------+-------------+--------------+-----+---------+-----------+---------------+-----------+---------------+-------------+-------------+------------------+--------------+------------------+----------------------+----------------------+---------------------------+---------------------------+--------------------+------------------------+--------------------+------------------------+-------+
|duration|protocal_type|service|flat|src_bytes|dst_bytes|land|wrong_fragment|urgent|hot|num_failed_logins|logged_in|num_compromised|root_shell|su_attempted|num_root|num_file_creations|num_shells|num_access_files|num_outbound_cmds|is_host_login|is_guest_login|count|srv_count|serror_rate|srv_serror_rate|rerror_rate|srv_rerror_rate|same_srv_rate|diff_srv_rate|srv_diff_host_rate|dst_hos

In [56]:
#check column data types
kdd.dtypes

[('duration', 'int'),
 ('protocal_type', 'string'),
 ('service', 'string'),
 ('flat', 'string'),
 ('src_bytes', 'int'),
 ('dst_bytes', 'int'),
 ('land', 'int'),
 ('wrong_fragment', 'int'),
 ('urgent', 'int'),
 ('hot', 'int'),
 ('num_failed_logins', 'int'),
 ('logged_in', 'int'),
 ('num_compromised', 'int'),
 ('root_shell', 'int'),
 ('su_attempted', 'int'),
 ('num_root', 'int'),
 ('num_file_creations', 'int'),
 ('num_shells', 'int'),
 ('num_access_files', 'int'),
 ('num_outbound_cmds', 'int'),
 ('is_host_login', 'int'),
 ('is_guest_login', 'int'),
 ('count', 'int'),
 ('srv_count', 'int'),
 ('serror_rate', 'double'),
 ('srv_serror_rate', 'double'),
 ('rerror_rate', 'double'),
 ('srv_rerror_rate', 'double'),
 ('same_srv_rate', 'double'),
 ('diff_srv_rate', 'double'),
 ('srv_diff_host_rate', 'double'),
 ('dst_host_count', 'int'),
 ('dst_host_srv_count', 'int'),
 ('dst_host_same_srv_rate', 'double'),
 ('dst_host_diff_srv_rate', 'double'),
 ('dst_host_same_src_port_rate', 'double'),
 ('dst_h

In [57]:
# Remove records with missing values in any column and get the number of remaining rows
kdd_none_missing = kdd.dropna()
print(kdd.count())
#It is equal to the previous value, which means that there isn'n any missing value

494021


In [58]:
# transform the categorical data into indexed numerical values.
from pyspark.ml.feature import StringIndexer

# Create an indexer
indexer = StringIndexer(inputCol='protocal_type', outputCol='protocal_type_idx')

# Indexer identifies categories in the data
indexer_model = indexer.fit(kdd)

# Indexer creates a new column with numeric index values
kdd_indexed = indexer_model.transform(kdd)

# Repeat the process for the other categorical feature
kdd_indexed = StringIndexer(inputCol='service', outputCol='service_idx').fit(kdd_indexed).transform(kdd_indexed)
kdd_indexed = StringIndexer(inputCol='flat', outputCol='flat_idx').fit(kdd_indexed).transform(kdd_indexed)


In [59]:
# Import the one hot encoder class
from pyspark.ml.feature import OneHotEncoder

# Create an instance of the one hot encoder
onehot = OneHotEncoder(inputCols=['protocal_type_idx'], outputCols=['protocal_type_dummy'])

# Apply the one hot encoder to the flights data
onehot = onehot.fit(kdd_indexed)
kdd_onehot = onehot.transform(kdd_indexed)

# Check the results
kdd_onehot.select('protocal_type', 'protocal_type_idx', 'protocal_type_dummy').distinct().sort('protocal_type_idx').show()

+-------------+-----------------+-------------------+
|protocal_type|protocal_type_idx|protocal_type_dummy|
+-------------+-----------------+-------------------+
|         icmp|              0.0|      (2,[0],[1.0])|
|          tcp|              1.0|      (2,[1],[1.0])|
|          udp|              2.0|          (2,[],[])|
+-------------+-----------------+-------------------+



In [60]:
# Repeat the process for the other categorical feature
kdd_onehot=OneHotEncoder(inputCols=['service_idx'],outputCols=['service_dummy']).fit(kdd_onehot).transform(kdd_onehot)
kdd_onehot=OneHotEncoder(inputCols=['flat_idx'],outputCols=['flat_dummy']).fit(kdd_onehot).transform(kdd_onehot)

In [61]:
kdd_onehot.show(5)

+--------+-------------+-------+----+---------+---------+----+--------------+------+---+-----------------+---------+---------------+----------+------------+--------+------------------+----------+----------------+-----------------+-------------+--------------+-----+---------+-----------+---------------+-----------+---------------+-------------+-------------+------------------+--------------+------------------+----------------------+----------------------+---------------------------+---------------------------+--------------------+------------------------+--------------------+------------------------+-------+-----------------+-----------+--------+-------------------+--------------+--------------+
|duration|protocal_type|service|flat|src_bytes|dst_bytes|land|wrong_fragment|urgent|hot|num_failed_logins|logged_in|num_compromised|root_shell|su_attempted|num_root|num_file_creations|num_shells|num_access_files|num_outbound_cmds|is_host_login|is_guest_login|count|srv_count|serror_rate|srv_serro

In [62]:
# Create 'label' column indicating whether kdd are normal (1) or not (0)
kdd = kdd_onehot.withColumn('label', (kdd["normal"]!="normal.").cast('int'))
kdd.show(5)
kdd.count

+--------+-------------+-------+----+---------+---------+----+--------------+------+---+-----------------+---------+---------------+----------+------------+--------+------------------+----------+----------------+-----------------+-------------+--------------+-----+---------+-----------+---------------+-----------+---------------+-------------+-------------+------------------+--------------+------------------+----------------------+----------------------+---------------------------+---------------------------+--------------------+------------------------+--------------------+------------------------+-------+-----------------+-----------+--------+-------------------+--------------+--------------+-----+
|duration|protocal_type|service|flat|src_bytes|dst_bytes|land|wrong_fragment|urgent|hot|num_failed_logins|logged_in|num_compromised|root_shell|su_attempted|num_root|num_file_creations|num_shells|num_access_files|num_outbound_cmds|is_host_login|is_guest_login|count|srv_count|serror_rate|srv

<bound method DataFrame.count of DataFrame[duration: int, protocal_type: string, service: string, flat: string, src_bytes: int, dst_bytes: int, land: int, wrong_fragment: int, urgent: int, hot: int, num_failed_logins: int, logged_in: int, num_compromised: int, root_shell: int, su_attempted: int, num_root: int, num_file_creations: int, num_shells: int, num_access_files: int, num_outbound_cmds: int, is_host_login: int, is_guest_login: int, count: int, srv_count: int, serror_rate: double, srv_serror_rate: double, rerror_rate: double, srv_rerror_rate: double, same_srv_rate: double, diff_srv_rate: double, srv_diff_host_rate: double, dst_host_count: int, dst_host_srv_count: int, dst_host_same_srv_rate: double, dst_host_diff_srv_rate: double, dst_host_same_src_port_rate: double, dst_host_srv_diff_host_rate: double, dst_host_serror_rate: double, dst_host_srv_serror_rate: double, dst_host_rerror_rate: double, dst_host_srv_rerror_rate: double, normal: string, protocal_type_idx: double, service_i

In [63]:
#Finally,consolidate all of the predictor columns into a single column
# Import the necessary class
from pyspark.ml.feature import VectorAssembler

# Create an assembler object
assembler = VectorAssembler(inputCols=[
    "duration","src_bytes",
    "dst_bytes","land","wrong_fragment","urgent","hot","num_failed_logins",
    "logged_in","num_compromised","root_shell","su_attempted","num_root",
    "num_file_creations","num_shells","num_access_files","num_outbound_cmds",
    "is_host_login","is_guest_login","count","srv_count","serror_rate",
    "srv_serror_rate","rerror_rate","srv_rerror_rate","same_srv_rate",
    "diff_srv_rate","srv_diff_host_rate","dst_host_count","dst_host_srv_count",
    "dst_host_same_srv_rate","dst_host_diff_srv_rate","dst_host_same_src_port_rate",
    "dst_host_srv_diff_host_rate","dst_host_serror_rate","dst_host_srv_serror_rate",
    "dst_host_rerror_rate","dst_host_srv_rerror_rate","protocal_type_dummy",
    "service_dummy","flat_dummy"
], outputCol='features')

# Consolidate predictor columns
kdd_assembled = assembler.transform(kdd)

# Check the resulting column
kdd_assembled.select('features', 'normal').show(5, truncate=False)

+----------------------------------------------------------------------------------------------------------+-------+
|features                                                                                                  |normal |
+----------------------------------------------------------------------------------------------------------+-------+
|(115,[1,2,8,19,20,25,28,29,30,32,39,42,105],[181.0,5450.0,1.0,8.0,8.0,1.0,9.0,9.0,1.0,0.11,1.0,1.0,1.0])  |normal.|
|(115,[1,2,8,19,20,25,28,29,30,32,39,42,105],[239.0,486.0,1.0,8.0,8.0,1.0,19.0,19.0,1.0,0.05,1.0,1.0,1.0]) |normal.|
|(115,[1,2,8,19,20,25,28,29,30,32,39,42,105],[235.0,1337.0,1.0,8.0,8.0,1.0,29.0,29.0,1.0,0.03,1.0,1.0,1.0])|normal.|
|(115,[1,2,8,19,20,25,28,29,30,32,39,42,105],[219.0,1337.0,1.0,6.0,6.0,1.0,39.0,39.0,1.0,0.03,1.0,1.0,1.0])|normal.|
|(115,[1,2,8,19,20,25,28,29,30,32,39,42,105],[217.0,2032.0,1.0,6.0,6.0,1.0,49.0,49.0,1.0,0.02,1.0,1.0,1.0])|normal.|
+---------------------------------------------------------------

In [64]:
##Machine Learning

In [65]:
# Split into training and testing sets in a 80:20 ratio
kdd_train, kdd_test = kdd_assembled.randomSplit([0.8, 0.2], seed=17)

# Check that training set has around 80% of records
training_ratio = kdd_train.count() / kdd.count()
print(training_ratio)

0.7998728798978181


In [66]:
kdd_train.show(5)

+--------+-------------+-------+----+---------+---------+----+--------------+------+---+-----------------+---------+---------------+----------+------------+--------+------------------+----------+----------------+-----------------+-------------+--------------+-----+---------+-----------+---------------+-----------+---------------+-------------+-------------+------------------+--------------+------------------+----------------------+----------------------+---------------------------+---------------------------+--------------------+------------------------+--------------------+------------------------+--------+-----------------+-----------+--------+-------------------+--------------+--------------+-----+--------------------+
|duration|protocal_type|service|flat|src_bytes|dst_bytes|land|wrong_fragment|urgent|hot|num_failed_logins|logged_in|num_compromised|root_shell|su_attempted|num_root|num_file_creations|num_shells|num_access_files|num_outbound_cmds|is_host_login|is_guest_login|count|srv

In [67]:
##Decision Tree

In [68]:
%%time

# Import the Decision Tree Classifier class
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.tuning import CrossValidator,ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator
import numpy as np

# Create a classifier object and fit to the training data
tree = DecisionTreeClassifier(labelCol="label")

#grid search
params_tree = ParamGridBuilder()
params_tree=params_tree.addGrid(tree.maxDepth, [int(x) for x in np.linspace(start = 10, stop = 30, num = 5)]) \
    .addGrid(tree.maxBins, [20, 60]) \
    .build()

#the evaluator of the model
evaluator_tree = BinaryClassificationEvaluator()

#act cross-validation
cv_tree = CrossValidator(estimator=tree,
estimatorParamMaps=params_tree,
evaluator=evaluator_tree,
numFolds=2, seed=13)

tree_model = cv_tree.fit(kdd_train)


CPU times: user 859 ms, sys: 255 ms, total: 1.11 s
Wall time: 1min 20s


In [69]:
# Create predictions for the testing data and take a look at the predictions
prediction_tree = tree_model.transform(kdd_test)
prediction_tree.select('label', 'prediction', 'probability').show(5, False)

+-----+----------+-----------+
|label|prediction|probability|
+-----+----------+-----------+
|1    |1.0       |[0.0,1.0]  |
|1    |1.0       |[0.0,1.0]  |
|1    |1.0       |[0.0,1.0]  |
|1    |1.0       |[0.0,1.0]  |
|1    |1.0       |[0.0,1.0]  |
+-----+----------+-----------+
only showing top 5 rows



In [70]:
# Create a confusion matrix
prediction_tree.groupBy('label', 'prediction').count().show()

# Calculate the elements of the confusion matrix
TN_tree = prediction_tree.filter('prediction = 0 AND label = prediction').count()
TP_tree = prediction_tree.filter('prediction = 1 AND label = prediction').count()
FN_tree = prediction_tree.filter('prediction = 0 AND label != prediction').count()
FP_tree = prediction_tree.filter('prediction = 1 AND label != prediction').count()

# Accuracy measures the proportion of correct predictions
accuracy_tree = (TN_tree + TP_tree) / (TN_tree + TP_tree + FN_tree + FP_tree)
print(accuracy_tree)

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    1|       0.0|   16|
|    0|       0.0|19480|
|    1|       1.0|79351|
|    0|       1.0|   20|
+-----+----------+-----+

0.9996358744576047


In [71]:
##Random Forest

In [74]:
%%time

# Import the Random Forest Classifier class
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Create a classifier object and fit to the training data
rf= RandomForestClassifier(labelCol="label")

#grid search
params_rf = ParamGridBuilder()
params_rf=params_rf.addGrid(rf.numTrees, [int(x) for x in np.linspace(start = 10, stop = 50, num = 3)]) \
    .addGrid(rf.maxDepth, [int(x) for x in np.linspace(start = 5, stop = 25, num = 3)]) \
    .build()

#the evaluator of the model
evaluator = BinaryClassificationEvaluator()

#act cross-validation
cv_rf = CrossValidator(estimator=rf,
estimatorParamMaps=params_rf,
evaluator=evaluator,
numFolds=2, seed=13)

rf_model = cv_rf.fit(kdd_train)

CPU times: user 723 ms, sys: 193 ms, total: 916 ms
Wall time: 3min 33s


In [75]:
# Create predictions for the testing data and take a look at the predictions
prediction_rf = rf_model.transform(kdd_test)
prediction_rf.select('label', 'prediction', 'probability').show(5, False)

+-----+----------+----------------------------------------+
|label|prediction|probability                             |
+-----+----------+----------------------------------------+
|1    |1.0       |[0.07092756229685554,0.9290724377031445]|
|1    |1.0       |[0.07092756229685554,0.9290724377031445]|
|1    |1.0       |[0.0749361991687001,0.9250638008313]    |
|1    |1.0       |[0.0749361991687001,0.9250638008313]    |
|1    |1.0       |[0.0749361991687001,0.9250638008313]    |
+-----+----------+----------------------------------------+
only showing top 5 rows



In [76]:
# Create a confusion matrix
prediction_rf.groupBy('label', 'prediction').count().show()

# Calculate the elements of the confusion matrix
TN_rf = prediction_rf.filter('prediction = 0 AND label = prediction').count()
TP_rf = prediction_rf.filter('prediction = 1 AND label = prediction').count()
FN_rf = prediction_rf.filter('prediction = 0 AND label != prediction').count()
FP_rf = prediction_rf.filter('prediction = 1 AND label != prediction').count()

# Accuracy measures the proportion of correct predictions
accuracy_rf = (TN_rf + TP_rf) / (TN_rf + TP_rf + FN_rf + FP_rf)
print(accuracy_rf)

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    1|       0.0|   30|
|    0|       0.0|19491|
|    1|       1.0|79337|
|    0|       1.0|    9|
+-----+----------+-----+

0.9996055306624051


In [77]:
##The Gridient Boost Machine

In [78]:
%%time
from pyspark.ml.classification import DecisionTreeClassifier, GBTClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Create model objects and train on training data
gbt = GBTClassifier().fit(kdd_train)

# Compare AUC on testing data
evaluator = BinaryClassificationEvaluator()

print(evaluator.evaluate(gbt.transform(kdd_test)))


0.9998923966655392
CPU times: user 52.1 ms, sys: 14.4 ms, total: 66.5 ms
Wall time: 29.9 s


In [79]:
# Find the number of trees and the relative importance of features
print(gbt.getNumTrees)
print(gbt.featureImportances)

20
(115,[0,1,2,4,6,7,8,9,10,12,13,14,15,18,19,20,21,22,23,25,26,27,28,29,30,31,32,33,34,35,36,37,38,40,41,42,44,45,46,47,48,50,55,58,59,70,76,80,84,86,92,105,107,108,109,111],[0.002751875861547349,0.01803385728168464,0.12426711483718648,0.0031067198057758718,0.023847395195590818,2.1585230926436794e-05,0.00022816013503532993,0.0008050491695247735,2.197413695550031e-05,5.1137258162508456e-05,0.00013534051606924933,8.187292409324189e-05,0.0001296414641145253,6.73487613265647e-05,0.674066595550602,0.0002673845850830423,0.01963966256095271,0.0036529728073646153,3.128024553271156e-05,1.720175548165228e-05,9.742339852631042e-06,2.2910847310336145e-10,0.010999001981195501,9.006503014234089e-05,0.001004938274371096,0.0009132711533957076,0.012712234658846782,0.030762772210460212,0.0036018164909978256,0.0026928987607885207,0.0006570754318193563,3.344875664247773e-05,0.00040443223429824615,0.003339633527250274,0.007287180497742161,0.020813929321595644,0.002251469529303596,0.00029516710205600263,0.