# ASSIGNMENT FOR THE COURSE KNOWLEDGE REPRESENTATION AND BIG DATA (PART B)
## Παππάς Παντελεήμων mscaidl-0030@uniwa.gr

This notebook demonstrates how Spark can be used to process big data in a DDoS analysis scenario, including training a model with Spark's ML libraries.

It is set up to run in an Anaconda environment, as the installation is fairly painless and straightforward.

If you wish to run this notebook, I recommend Anaconda paired with VSCode, as VSCode will resolve dependencies automatically.
I also recommend using a clean conda environment for the Spark installation.

I'll include two helpful links just in case: 
* https://medium.com/@divya.chandana/easy-install-pyspark-in-anaconda-e2d427b3492f
* https://www.youtube.com/watch?v=KCo59BnKxkk

If there are any problems, please contact me. Otherwise you should be able to see my saved results in the notebook.

## Imports

In [394]:
import sys

import pyspark
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, countDistinct, lit, mean, regexp_extract, split, when,
)
from pyspark.sql.types import FloatType

from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import (
    BinaryClassificationEvaluator,
    MulticlassClassificationEvaluator,
)
from pyspark.ml.feature import (
    OneHotEncoder,
    QuantileDiscretizer,
    StringIndexer,
    VectorAssembler,
    VectorIndexer,
)
from pyspark.mllib.evaluation import MulticlassMetrics

In [312]:
# Helper function

import pyspark.sql.functions as F

def groupby_apply_describe(df, groupby_col, stat_col):
    """Show describe-style statistics of one column for each group.

    Parameters
    ----------
    df : Spark DataFrame
    groupby_col : name of the column to group by
    stat_col : name of the column to compute statistics on

    Returns
    -------
    Spark DataFrame with one row per group (count, mean, std, min, max).
    """
    output = df.groupby(groupby_col).agg(
        F.count(stat_col).alias("count"),
        F.mean(stat_col).alias("mean"),
        F.stddev(stat_col).alias("std"),
        F.min(stat_col).alias("min"),
        F.max(stat_col).alias("max"),
    )
    # show() prints the table itself and returns None, so it is not wrapped in print().
    output.orderBy(groupby_col).show()
    return output
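
To make clear what the helper computes, here is a minimal plain-Python sketch of the same per-group statistics. It is only an illustration: `describe_by_group` and the toy rows are hypothetical names and values, not part of the notebook or the dataset. It uses the sample standard deviation, which is also what Spark's `F.stddev` computes.

```python
from collections import defaultdict
from statistics import mean, stdev


def describe_by_group(rows, group_key, value_key):
    """Return {group: stats} with count, mean, sample std, min and max."""
    grouped = defaultdict(list)
    for row in rows:
        grouped[row[group_key]].append(row[value_key])

    summary = {}
    for group, values in sorted(grouped.items()):
        summary[group] = {
            "count": len(values),
            "mean": mean(values),
            # Sample standard deviation (n - 1 denominator); needs two or more values.
            "std": stdev(values) if len(values) > 1 else float("nan"),
            "min": min(values),
            "max": max(values),
        }
    return summary


# Hypothetical toy rows, not taken from the dataset.
toy_rows = [
    {"Label": "Benign", "Flow Duration": 10},
    {"Label": "Benign", "Flow Duration": 30},
    {"Label": "ddos", "Flow Duration": 2},
    {"Label": "ddos", "Flow Duration": 4},
    {"Label": "ddos", "Flow Duration": 6},
]
toy_summary = describe_by_group(toy_rows, "Label", "Flow Duration")
for label, stats in toy_summary.items():
    print(label, stats)
```

The Spark version does the same work, but distributed over partitions instead of in a single Python loop.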

## Creating Spark Session and Spark context

In [126]:
# Create SparkSession and sparkcontext
from pyspark.sql import SparkSession
spark = SparkSession.builder\
                    .master("local")\
                    .appName('Pantelis Pappas Spark Project')\
                    .getOrCreate()
sc=spark.sparkContext


In [127]:
#Suppressing log warnings
sc.setLogLevel("ERROR")

## Stop the Spark session and Spark context (if needed)

In [124]:
# Stopping Spark-Session and Spark context
sc.stop()
spark.stop()

## Simple word counting program (implementation of Part A in Spark)

In [5]:
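# Read the text file
input_file = sc.textFile("hadoopPPtext.txt")
# Map: split each line on single spaces and emit (word, 1) pairs.
# split(" ") keeps empty strings, so blank lines and consecutive spaces produce an empty key.
# The RDD is named `pairs` to avoid shadowing Python's built-in map().
pairs = input_file.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1))
# Reduce: sum the counts for each word
counts = pairs.reduceByKey(lambda a, b: a + b)
# counts.saveAsTextFile("/home/dante/SparkProject/")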

In [6]:
#Print the output
output = counts.collect()
for (word, count) in output:
    print("%s: %i" % (word, count))

====: 5
Transaction: 3
Inputs: 1
: 86
((("transactions",: 1
"outputs: 2
and: 21
inputs",: 4
"input: 4
components")))((("outputs: 1
components")))((("unspent: 1
transaction: 24
outputs: 2
(UTXO)")))((("UTXO: 1
sets")))Transaction: 1
inputs: 11
identify: 3
(by: 1
reference): 1
which: 8
UTXO: 26
will: 6
be: 7
consumed: 2
provide: 1
proof: 1
of: 18
ownership: 2
through: 1
an: 8
unlocking: 10
script.: 2
To: 3
build: 3
a: 18
transaction,: 4
wallet: 6
selects: 1
from: 9
the: 73
it: 7
controls,: 1
with: 7
enough: 1
value: 6
to: 29
make: 3
requested: 1
payment.: 1
Sometimes: 1
one: 7
is: 22
enough,: 1
other: 2
times: 1
more: 1
than: 2
needed.: 1
For: 2
each: 1
that: 20
this: 7
payment,: 1
creates: 1
input: 12
pointing: 1
unlocks: 1
Let's: 1
look: 1
at: 2
components: 1
in: 36
greater: 1
detail.: 1
The: 14
first: 5
part: 3
pointer: 1
by: 6
reference: 3
hash: 1
output: 4
index,: 1
identifies: 1
specific: 2
transaction.: 2
second: 1
script,: 3
constructs: 1
order: 4
satisfy: 2
spending: 3
condition
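
The output above contains an empty key (86 occurrences) and tokens that are still glued to punctuation, such as `transaction,` and `script.`. This happens because the lines are split on single spaces only. A minimal sketch of a stricter tokenizer in plain Python follows. The token rule (runs of letters, digits and apostrophes, lower-cased) and the sample lines are my assumptions, not part of the original program.

```python
import re
from collections import Counter

# Assumed token rule: runs of letters, digits and apostrophes.
TOKEN_PATTERN = re.compile(r"[A-Za-z0-9']+")


def tokenize(line):
    """Lower-case a line and return its word tokens without punctuation."""
    return TOKEN_PATTERN.findall(line.lower())


def word_count(lines):
    """Count tokens across an iterable of lines."""
    counts = Counter()
    for line in lines:
        counts.update(tokenize(line))
    return counts


# Hypothetical sample lines in the style of the input text.
sample_lines = [
    "Transaction inputs identify which UTXO will be consumed.",
    "",
    "The  transaction,  the UTXO and the inputs.",
]
sample_counts = word_count(sample_lines)
for word, count in sample_counts.most_common(4):
    print("%s: %i" % (word, count))
```

In the Spark program, a function like `tokenize` could be passed to `flatMap` in place of the `split(" ")` lambda. The counts would then differ from the saved output above.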

## DDoS dataset 
https://www.kaggle.com/datasets/devendra416/ddos-datasets?resource=download

This public domain dataset contains log information from servers that underwent cyber attacks.
It is a combination of several cyber attack datasets:
* CSE-CIC-IDS2018-AWS: https://www.unb.ca/cic/datasets/ids-2018.html
* CICIDS2017: https://www.unb.ca/cic/datasets/ids-2017.html
* CIC DoS dataset (2016): https://www.unb.ca/cic/datasets/dos-dataset.html

In this particular case, the dataset focuses on Distributed Denial of Service (DDoS) attacks.
For our purposes, we will be using the balanced dataset.

In [246]:
# Locate and load the CSV file
path = "archive/ddos_balanced/final_dataset.csv"
df = spark.read.csv(path, header=True, inferSchema=True)

In [158]:
type(df)

pyspark.sql.dataframe.DataFrame

In [258]:
# We also cache the data so that we only read it from disk once.
df.cache()
df.is_cached            # Checks if df is cached

True

In [268]:
# Shape of our data
print((df.count(), len(df.columns)))

(12794627, 85)


We are dealing with a massive amount of data (about 12.8 million rows and 85 columns), and yet Spark lets us process it in seconds. Incidentally, pandas was not able to handle this amount of data when I tried.

In [269]:
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- Flow ID: string (nullable = true)
 |-- Src IP: string (nullable = true)
 |-- Src Port: integer (nullable = true)
 |-- Dst IP: string (nullable = true)
 |-- Dst Port: integer (nullable = true)
 |-- Protocol: integer (nullable = true)
 |-- Timestamp: string (nullable = true)
 |-- Flow Duration: integer (nullable = true)
 |-- Tot Fwd Pkts: integer (nullable = true)
 |-- Tot Bwd Pkts: integer (nullable = true)
 |-- TotLen Fwd Pkts: double (nullable = true)
 |-- TotLen Bwd Pkts: double (nullable = true)
 |-- Fwd Pkt Len Max: double (nullable = true)
 |-- Fwd Pkt Len Min: double (nullable = true)
 |-- Fwd Pkt Len Mean: double (nullable = true)
 |-- Fwd Pkt Len Std: double (nullable = true)
 |-- Bwd Pkt Len Max: double (nullable = true)
 |-- Bwd Pkt Len Min: double (nullable = true)
 |-- Bwd Pkt Len Mean: double (nullable = true)
 |-- Bwd Pkt Len Std: double (nullable = true)
 |-- Flow Byts/s: double (nullable = true)
 |-- Flow Pkts/s: double (nul

In [272]:
df.columns

['_c0',
 'Flow ID',
 'Src IP',
 'Src Port',
 'Dst IP',
 'Dst Port',
 'Protocol',
 'Timestamp',
 'Flow Duration',
 'Tot Fwd Pkts',
 'Tot Bwd Pkts',
 'TotLen Fwd Pkts',
 'TotLen Bwd Pkts',
 'Fwd Pkt Len Max',
 'Fwd Pkt Len Min',
 'Fwd Pkt Len Mean',
 'Fwd Pkt Len Std',
 'Bwd Pkt Len Max',
 'Bwd Pkt Len Min',
 'Bwd Pkt Len Mean',
 'Bwd Pkt Len Std',
 'Flow Byts/s',
 'Flow Pkts/s',
 'Flow IAT Mean',
 'Flow IAT Std',
 'Flow IAT Max',
 'Flow IAT Min',
 'Fwd IAT Tot',
 'Fwd IAT Mean',
 'Fwd IAT Std',
 'Fwd IAT Max',
 'Fwd IAT Min',
 'Bwd IAT Tot',
 'Bwd IAT Mean',
 'Bwd IAT Std',
 'Bwd IAT Max',
 'Bwd IAT Min',
 'Fwd PSH Flags',
 'Bwd PSH Flags',
 'Fwd URG Flags',
 'Bwd URG Flags',
 'Fwd Header Len',
 'Bwd Header Len',
 'Fwd Pkts/s',
 'Bwd Pkts/s',
 'Pkt Len Min',
 'Pkt Len Max',
 'Pkt Len Mean',
 'Pkt Len Std',
 'Pkt Len Var',
 'FIN Flag Cnt',
 'SYN Flag Cnt',
 'RST Flag Cnt',
 'PSH Flag Cnt',
 'ACK Flag Cnt',
 'URG Flag Cnt',
 'CWE Flag Count',
 'ECE Flag Cnt',
 'Down/Up Ratio',
 'Pkt Size 

The entirety of the dataset can't be properly displayed, as it is too large.

## Data Analysis

It is hard to distinguish heavy benign traffic from an actual DDoS attack.

Therefore, we will try to find some telltale signs that can indicate a DDoS attack.
The most important features to examine are the following:

* Dst Port (destination port)
* Protocol
* Flow Duration
* Tot Fwd Pkts (total forward packets)
* Tot Bwd Pkts (total backward packets)
* Label (Benign or ddos)


* General summary

In [259]:
num_cols = ['Dst Port','Protocol', "Flow Duration", 'Tot Fwd Pkts', 'Tot Bwd Pkts', 'Label']
df.select(num_cols).describe().show()

+-------+------------------+-----------------+--------------------+------------------+------------------+--------+
|summary|          Dst Port|         Protocol|       Flow Duration|      Tot Fwd Pkts|      Tot Bwd Pkts|   Label|
+-------+------------------+-----------------+--------------------+------------------+------------------+--------+
|  count|          12794627|         12794627|            12794627|          12794627|          12794627|12794627|
|   mean| 14642.89653078593|7.828587812681057|   8219593.071095234|27.196363285932446|4.9742809227654705|    null|
| stddev|23063.826083140073|4.206167602129172|2.4773266927709173E7|1720.5765273371583| 250.9204417677606|    null|
|    min|                 0|                0|                  -1|                 0|                 0|  Benign|
|    max|             65535|               17|           120000000|            309628|            291923|    ddos|
+-------+------------------+-----------------+--------------------+-------------

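The summary statistics for the Dst Port and Protocol columns are not meaningful, since these columns hold categorical values that happen to be encoded as integers.

However, the statistics for the rest of the columns are very useful.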

* Examining port frequency

In [294]:
df.groupBy('Label', 'Dst Port').count().show()

+-----+--------+-------+
|Label|Dst Port|  count|
+-----+--------+-------+
| ddos|   38092|    141|
| ddos|   38088|    101|
| ddos|   37760|    150|
| ddos|   37778|    148|
| ddos|   37672|    135|
| ddos|   38154|    121|
| ddos|   37784|    138|
| ddos|   38192|    115|
| ddos|   37654|    162|
| ddos|   37880|    147|
| ddos|   37846|    115|
| ddos|   38106|    116|
| ddos|   37804|    135|
| ddos|   37958|    133|
| ddos|   37796|    162|
| ddos|   38050|    132|
| ddos|   52114|    185|
| ddos|   38004|    128|
| ddos|   38180|    144|
| ddos|      80|3681958|
+-----+--------+-------+
only showing top 20 rows



Among the rows shown, port 80 (the standard HTTP port) is used far more frequently during attacks than any other port, with 3,681,958 ddos flows.
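
Since `show()` returns the grouped rows in no particular order, the dominant port is easy to miss. Below is a small plain-Python sketch that ranks a handful of rows copied from the table above by flow count. The helper name `top_ports` is hypothetical. In Spark, ordering the grouped result by `count` in descending order before calling `show()` should give the same kind of ranking over the full dataset.

```python
# (label, destination port, flow count) rows copied from the table above.
port_counts = [
    ("ddos", 38092, 141),
    ("ddos", 37654, 162),
    ("ddos", 52114, 185),
    ("ddos", 38180, 144),
    ("ddos", 80, 3681958),
]


def top_ports(rows, n=3):
    """Return the n rows with the highest flow count, largest first."""
    return sorted(rows, key=lambda row: row[2], reverse=True)[:n]


ranked = top_ports(port_counts)
shown_total = sum(count for _, _, count in port_counts)
# Share of port 80 within these five rows only, not within the whole dataset.
port_80_share = ranked[0][2] / shown_total

for label, port, count in ranked:
    print(f"{label:>5}  port {port:>5}  {count:>9,}")
```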

* Source IP traffic

DDoS attacks use the same source IPs over and over again: the ddos flows come from only 36 distinct source IPs, whereas the benign flows come from 36,726. Of course, we expected that.

In [206]:
df.groupBy('Label').agg(countDistinct('Src IP')).show()

+------+-------------+
| Label|count(Src IP)|
+------+-------------+
|  ddos|           36|
|Benign|        36726|
+------+-------------+
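
To put the distinct-IP counts in perspective, the sketch below divides the number of flows per label by the number of distinct source IPs. The flow counts are the per-label `count` values reported by the grouped statistics in this notebook. The variable names are mine. The result is a rough average and says nothing about how evenly the flows are spread over the addresses.

```python
# Distinct source IPs per label, from the table above.
distinct_src_ips = {"ddos": 36, "Benign": 36726}
# Flows per label, from the per-label "count" column reported in this notebook.
flow_counts = {"ddos": 6472647, "Benign": 6321980}

flows_per_ip = {
    label: flow_counts[label] / distinct_src_ips[label]
    for label in distinct_src_ips
}

for label, value in flows_per_ip.items():
    print(f"{label:>6}: {value:,.1f} flows per source IP")
```

On average, each attacking address is responsible for roughly a thousand times more flows than a benign one.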




* Flow Duration

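One of the biggest hints of a DDoS attack is the flow duration of the traffic, so we look at the difference in flow duration between normal users and an attacker. In the statistics below, the mean flow duration of the ddos flows (about 3.1 million) is less than a quarter of the benign mean (about 13.4 million).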

In [316]:
groupby_apply_describe(df, groupby_col='Label', stat_col='Flow Duration')

+------+-------+--------------------+--------------------+---+---------+
| Label|  count|                mean|                 std|min|      max|
+------+-------+--------------------+--------------------+---+---------+
|Benign|6321980|1.3440838487992212E7|3.3109731615055606E7| -1|120000000|
|  ddos|6472647|  3119885.1616859375|   9474900.206741018|  0|119999998|
+------+-------+--------------------+--------------------+---+---------+

None


DataFrame[Label: string, count: bigint, mean: double, std: double, min: int, max: int]

In [315]:
df.describe(['Flow Duration']).show()

+-------+--------------------+
|summary|       Flow Duration|
+-------+--------------------+
|  count|            12794627|
|   mean|   8219593.071095234|
| stddev|2.4773266927709173E7|
|    min|                  -1|
|    max|           120000000|
+-------+--------------------+



* Protocol used

In [193]:
# df.groupBy('Label').count('Protocol').collect()
df.groupBy("Label","Protocol").count().show()

+------+--------+-------+
| Label|Protocol|  count|
+------+--------+-------+
|  ddos|      17|   4125|
|  ddos|       6|6468522|
|Benign|       6|4020622|
|Benign|      17|2185816|
|Benign|       0| 115542|
+------+--------+-------+
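
These counts also show why the mean of Protocol in the general summary (about 7.83) carries no information. It is just a weighted average of the codes 0, 6 and 17, and corresponds to no actual protocol. The sketch below recomputes that mean from the table above and then derives the share of TCP flows per label, which is the more useful view. Protocol numbers 6 and 17 are TCP and UDP in the IANA registry. The variable names are my own.

```python
# (label, protocol number, flow count) rows from the table above.
protocol_counts = [
    ("ddos", 17, 4125),
    ("ddos", 6, 6468522),
    ("Benign", 6, 4020622),
    ("Benign", 17, 2185816),
    ("Benign", 0, 115542),
]

# Weighted average of the protocol codes, as describe() computes it.
total_flows = sum(count for _, _, count in protocol_counts)
mean_protocol = sum(proto * count for _, proto, count in protocol_counts) / total_flows

# Share of TCP (protocol 6) flows within each label.
tcp_share = {}
for label in ("ddos", "Benign"):
    label_total = sum(c for lab, _, c in protocol_counts if lab == label)
    tcp_flows = sum(c for lab, p, c in protocol_counts if lab == label and p == 6)
    tcp_share[label] = tcp_flows / label_total

print(f"mean protocol code: {mean_protocol:.4f}")
for label, share in tcp_share.items():
    print(f"{label:>6}: {share:.2%} TCP")
```

Almost all ddos flows use TCP, while benign traffic is a mix of TCP and UDP.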



* Total Forward Packets

In [317]:
groupby_apply_describe(df, groupby_col='Label', stat_col='Tot Fwd Pkts')

+------+-------+-----------------+------------------+---+------+
| Label|  count|             mean|               std|min|   max|
+------+-------+-----------------+------------------+---+------+
|Benign|6321980|6.263011904498274|249.79944810340533|  0|219758|
|  ddos|6472647|47.64243871170481|2406.2551835248937|  0|309628|
+------+-------+-----------------+------------------+---+------+

None


DataFrame[Label: string, count: bigint, mean: double, std: double, min: int, max: int]

* Total Backward Packets

In [318]:
groupby_apply_describe(df, groupby_col='Label', stat_col='Tot Bwd Pkts')

+------+-------+-----------------+-----------------+---+------+
| Label|  count|             mean|              std|min|   max|
+------+-------+-----------------+-----------------+---+------+
|Benign|6321980|7.431544547752445|355.2472252712231|  0|291923|
|  ddos|6472647|2.574216236417651|34.37956924413345|  0| 29695|
+------+-------+-----------------+-----------------+---+------+

None


DataFrame[Label: string, count: bigint, mean: double, std: double, min: int, max: int]
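
The two packet tables are easier to read side by side. The sketch below takes the mean values printed above and computes the ratio of mean forward packets to mean backward packets for each label. This is a ratio of means, not a per-flow ratio, so treat it as a rough indicator only. The variable names are mine.

```python
# Mean packets per flow, copied from the two tables above.
mean_fwd_pkts = {"Benign": 6.263011904498274, "ddos": 47.64243871170481}
mean_bwd_pkts = {"Benign": 7.431544547752445, "ddos": 2.574216236417651}

fwd_bwd_ratio = {
    label: mean_fwd_pkts[label] / mean_bwd_pkts[label]
    for label in mean_fwd_pkts
}

for label, ratio in fwd_bwd_ratio.items():
    print(f"{label:>6}: {ratio:.2f} forward packets per backward packet")
```

On average, ddos flows send far more packets than they receive, while benign flows are close to balanced.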

## Using Spark MLlib to train an ML model for DDoS detection

Keeping relevant features

In [372]:
data = df.select('Dst Port','Protocol', "Flow Duration", 'Tot Fwd Pkts', 'Tot Bwd Pkts', 'Label')
data.show(5)

+--------+--------+-------------+------------+------------+-----+
|Dst Port|Protocol|Flow Duration|Tot Fwd Pkts|Tot Bwd Pkts|Label|
+--------+--------+-------------+------------+------------+-----+
|      80|       6|      3974862|          29|          44| ddos|
|      80|       6|           63|           1|           1| ddos|
|      80|       6|       476078|           2|           6| ddos|
|      80|       6|          151|           2|           1| ddos|
|      80|       6|       472507|           2|           5| ddos|
+--------+--------+-------------+------------+------------+-----+
only showing top 5 rows



In [373]:
data.printSchema()

root
 |-- Dst Port: integer (nullable = true)
 |-- Protocol: integer (nullable = true)
 |-- Flow Duration: integer (nullable = true)
 |-- Tot Fwd Pkts: integer (nullable = true)
 |-- Tot Bwd Pkts: integer (nullable = true)
 |-- Label: string (nullable = true)



Preprocessing data into vectors

In [374]:
features = data.columns[:-1]

va = VectorAssembler(inputCols = features, outputCol='features')

va_df = va.transform(data)
va_df = va_df.select(['features', 'label'])
va_df.show(3)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[80.0,6.0,3974862...| ddos|
|[80.0,6.0,63.0,1....| ddos|
|[80.0,6.0,476078....| ddos|
+--------------------+-----+
only showing top 3 rows



Encoding Label

In [375]:
# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(va_df)

Encoding features

In [376]:
# Automatically identify categorical features, and index them.
# We specify maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(va_df)
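
The `maxCategories` rule can be illustrated without Spark. In the sketch below, a column counts as categorical when it has at most `max_categories` distinct values, and as continuous otherwise. `split_by_cardinality` and the toy columns are hypothetical. Only the Protocol values (0, 6, 17) mirror what the dataset actually contains.

```python
def split_by_cardinality(columns, max_categories):
    """Partition column names by their number of distinct values.

    Columns with at most max_categories distinct values are treated as
    categorical, the rest as continuous.
    """
    categorical, continuous = [], []
    for name, values in columns.items():
        if len(set(values)) <= max_categories:
            categorical.append(name)
        else:
            continuous.append(name)
    return categorical, continuous


# Hypothetical toy columns; only the Protocol values mirror the dataset.
toy_columns = {
    "Dst Port": [80, 443, 53, 22, 8080, 3389],
    "Protocol": [6, 6, 17, 6, 0, 17],
    "Tot Fwd Pkts": [29, 1, 2, 6, 5, 44],
}
categorical, continuous = split_by_cardinality(toy_columns, max_categories=4)
print("categorical:", categorical)
print("continuous:", continuous)
```

With `maxCategories=4`, Protocol (three distinct values in this dataset) should be indexed as categorical. Dst Port, which takes many more values, is left as a continuous number even though it is conceptually categorical.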

Train-test split

In [377]:
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = va_df.randomSplit([0.7, 0.3])
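
The split above is not seeded, so each run produces a different partition and a slightly different accuracy. `randomSplit` accepts an optional `seed` argument for reproducibility. The plain-Python sketch below shows the idea only. `random_split` is a hypothetical helper that mimics a weighted random assignment; it is not Spark's implementation.

```python
import random


def random_split(rows, weights, seed):
    """Assign each row to one split at random, with probability proportional to its weight."""
    rng = random.Random(seed)
    total = sum(weights)

    # Cumulative probability thresholds, e.g. [0.7, 1.0] for weights [0.7, 0.3].
    thresholds = []
    cumulative = 0.0
    for weight in weights:
        cumulative += weight / total
        thresholds.append(cumulative)

    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for index, threshold in enumerate(thresholds):
            # The last split also catches any floating-point rounding leftovers.
            if draw < threshold or index == len(thresholds) - 1:
                splits[index].append(row)
                break
    return splits


rows = list(range(1000))  # hypothetical row ids
train_a, test_a = random_split(rows, [0.7, 0.3], seed=42)
train_b, test_b = random_split(rows, [0.7, 0.3], seed=42)
print(len(train_a), len(test_a), train_a == train_b)
```

The same seed always gives the same partition. The split sizes are only approximately 70/30, because each row is assigned independently.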

Decision Tree model instance

In [378]:
# Train a DecisionTree model.
dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures")

Creating a pipeline

In [379]:
# Chain indexers and tree in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, dt])

Training the model

In [380]:
# Train model.  This also runs the indexers.
model = pipeline.fit(trainingData)

Model predictions

In [381]:
# Make predictions.
predictions = model.transform(testData)

In [382]:
# Select example rows to display.
predictions.select("prediction", "indexedLabel", "features").show(5)

+----------+------------+--------------------+
|prediction|indexedLabel|            features|
+----------+------------+--------------------+
|       0.0|         0.0|[80.0,6.0,0.0,0.0...|
|       0.0|         0.0|[80.0,6.0,0.0,0.0...|
|       0.0|         0.0|[80.0,6.0,0.0,0.0...|
|       0.0|         0.0|[80.0,6.0,0.0,0.0...|
|       0.0|         0.0|[80.0,6.0,0.0,0.0...|
+----------+------------+--------------------+
only showing top 5 rows



Evaluation of our model

In [392]:
# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Accuracy = %g" % (accuracy))
print("Test Error = %g" % (1.0 - accuracy))

Test Accuracy = 0.951356
Test Error = 0.0486441
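
Accuracy alone does not tell us whether the errors are missed attacks or false alarms. Below is a minimal sketch of the confusion-matrix counts and the precision and recall derived from them, in plain Python on hypothetical toy labels (not the notebook's predictions). I treat index 0.0 as the positive class because `StringIndexer` by default gives index 0.0 to the most frequent label, which should be ddos here given the row counts. That is an inference, not something I verified in the notebook.

```python
def safe_ratio(numerator, denominator):
    """Divide, returning 0.0 when the denominator is zero."""
    return numerator / denominator if denominator else 0.0


def binary_metrics(labels, predictions, positive):
    """Return confusion-matrix counts and derived metrics for a binary task."""
    tp = fp = tn = fn = 0
    for actual, predicted in zip(labels, predictions):
        if predicted == positive and actual == positive:
            tp += 1
        elif predicted == positive:
            fp += 1
        elif actual == positive:
            fn += 1
        else:
            tn += 1
    return {
        "confusion": {"tp": tp, "fp": fp, "tn": tn, "fn": fn},
        "accuracy": safe_ratio(tp + tn, tp + fp + tn + fn),
        "precision": safe_ratio(tp, tp + fp),
        "recall": safe_ratio(tp, tp + fn),
    }


# Hypothetical toy labels and predictions (0.0 assumed to mean ddos).
toy_labels = [0.0, 0.0, 0.0, 0.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0]
toy_predictions = [0.0, 0.0, 0.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0]
metrics = binary_metrics(toy_labels, toy_predictions, positive=0.0)
print(metrics)
```

In Spark, `MulticlassClassificationEvaluator` can report related metrics by changing `metricName`, for example to `"f1"`, `"weightedPrecision"` or `"weightedRecall"`.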


In [400]:
treeModel = model.stages[2]
print(treeModel) # summary only

DecisionTreeClassificationModel: uid=DecisionTreeClassifier_e7ab9ce24721, depth=5, numNodes=39, numClasses=2, numFeatures=5
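
The model summary can be unpacked with a little arithmetic. Spark's decision trees are binary, so every internal node has exactly two children and the number of leaves is one more than the number of internal nodes. The sketch below applies this to the `depth` and `numNodes` values printed above. The variable names are mine.

```python
depth = 5       # from the model summary above
num_nodes = 39  # from the model summary above

# A binary tree of depth d has at most 2**(d + 1) - 1 nodes.
max_nodes = 2 ** (depth + 1) - 1

# Every internal node has exactly two children, so leaves = internal + 1.
num_leaves = (num_nodes + 1) // 2
num_internal = num_nodes - num_leaves

print(f"{num_internal} split rules, {num_leaves} leaves, "
      f"{num_nodes} of at most {max_nodes} nodes used")
```

A depth of 5 matches the default `maxDepth` of `DecisionTreeClassifier`, so the tree has grown to its depth limit. A larger `maxDepth` might change the accuracy, but I have not tested that here.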


We achieve 95.14% accuracy with our machine learning model using Spark.

Spark was able to handle 63,973,135 data points (12,794,627 rows × 5 features, not including labels) effortlessly during preprocessing and training. In addition, Spark could be configured to monitor server log entries as they arrive. So, theoretically, this could be a (relatively simple) first line of defence against DDoS attacks for a server.

Thank you for your attention.

Pantelis Pappas.
