# <center> Enhancing DDoS Attack Detection using Big Data

<a class="anchor" id="0.1"></a>
# Table of Contents

### [0. Introduction](#0)
* [**0.1 Notebook Settings/Requirements**](#0.0.1)

### [1. Data Collection and Preprocessing](#1)
* [**1.1 Data Load and DataFrame Creation**](#1.1)
### [2. Exploratory data analysis (EDA)](#2)
* [**2.1 Summary information about the DataFrame**](#2.1)
* [**2.2 Features inspection**](#2.2)
### [3. Machine Learning Model training](#3)
* [**3.1 Transform the labels**](#3.1)
* [**3.2 Split data**](#3.2)
* [**3.3 Feature transformation**](#3.3)
* [**3.4 Train model**](#3.4)
### [4. Feature importance analysis](#4.)
* [**4.1 Feature transformation**](#4.1)
* [**4.2 Create a fitted pipeline model**](#4.2)
* [**4.3 Visualise the result**](#4.5)
### [5. Neural network implementation](#5)
* [**5.1 Filtering features**](#5.1)
* [**5.2 Feature transformation**](#5.2)
* [**5.3 Train the model**](#5.4)
* [**5.4 Assess the result**](#5.5)

<a class="anchor" id="0"></a>
## 0. Introduction
[Back to Table of Contents](#0.1)

The project consists of two parts:
1. DDoS attack detection using artificial intelligence. The code is available here:

   https://github.com/Natidhcp/MSC_DA_sem2_CA1/blob/main/MSc_DA_CA1(No_BigData).ipynb

2. Enhancing DDoS Attack Detection using Big Data (this Jupyter Notebook)

### IMPORTANT!

This notebook follows the same rules and instructions as the main part and reproduces only the essential code needed to achieve the project goals.

<a class="anchor" id="0.0.1"></a>
## 0.1. Notebook Settings/Requirements
[Back to Table of Contents](#0.1)

In [1]:
sc

`SparkContext` is the entry point to Apache Spark and is used to work with distributed datasets. Its output links to the Spark UI, a web-based tool for monitoring the performance of Spark applications, and reports the Spark version (v3.4.2), the master (`local[*]`, i.e. local execution using all available cores) and the application name (`PySparkShell`, because the notebook runs interactively through the PySpark shell).

In [2]:
sc.master

'local[*]'

`sc.master` returns the master URL, which determines the execution mode; `local[*]` means local mode using all available cores.

In [3]:
import logging
import time
import warnings

import matplotlib.pyplot as plt
import pyspark
from pyspark.ml import Pipeline
from pyspark.ml.classification import (LogisticRegression, MultilayerPerceptronClassifier,
                                       RandomForestClassifier)
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import col, format_number, when
from pyspark.sql.functions import sum as spark_sum
from pyspark.sql.types import (DoubleType, FloatType, IntegerType, StringType,
                               StructField, StructType)

# Suppress Python warnings
warnings.filterwarnings("ignore")

# Quiet py4j's Python-side logger; Spark's own JVM log output (e.g. WARN lines)
# is controlled separately, for example with sc.setLogLevel("ERROR")
log = logging.getLogger('py4j')
log.setLevel(logging.ERROR)

This cell imports the libraries needed to work with Apache Spark and machine learning: data type definitions, visualisation, classification algorithms (Logistic Regression, Random Forest, Multilayer Perceptron), evaluators, feature engineering and pipelines. It also suppresses Python warnings and py4j log messages to keep the output clean.

The main libraries imported in the provided code are:

* pyspark.sql.types: For defining data types.
* matplotlib.pyplot: For visualization.
* pyspark.ml.classification: For implementing classification algorithms like Logistic Regression, Random Forest, and Multilayer Perceptron.
* pyspark.ml.evaluation: For evaluating model performance with evaluators such as BinaryClassificationEvaluator and MulticlassClassificationEvaluator.
* pyspark.ml.feature: For feature engineering tasks like vector assembly.
* pyspark.sql.functions: For performing operations on Spark DataFrames.
* pyspark.ml: For building machine learning pipelines.
* pyspark: The main Spark Python library.

<a class="anchor" id="1"></a>
## 1. Data Collection and Preprocessing
[Back to Table of Contents](#0.1)

The same data sample is used as in part 1 (see the notebook linked above).

In [4]:
# Define schema for the instruction timing log
schema = StructType([
    StructField("Instruction", StringType(), True),
    StructField("Count", IntegerType(), True),
    StructField("AverageTime", DoubleType(), True)
])

# Create an empty DataFrame with the defined schema
instruction_execution_df = spark.createDataFrame([], schema)

# Record an instruction's execution time, keep a running average for repeated
# instructions, show the log and update total_average_sum
def print_execution_time(start_time, instruction):
    global instruction_execution_df
    global total_average_sum

    execution_time = time.time() - start_time
    is_match = col("Instruction") == instruction

    if instruction_execution_df.filter(is_match).count() > 0:
        # Repeated instruction: update only the matching row. AverageTime is
        # computed from the previous Count before Count is incremented.
        instruction_execution_df = instruction_execution_df.withColumn(
            "AverageTime",
            when(is_match, (col("AverageTime") * col("Count") + execution_time) / (col("Count") + 1))
            .otherwise(col("AverageTime")))
        instruction_execution_df = instruction_execution_df.withColumn(
            "Count", when(is_match, col("Count") + 1).otherwise(col("Count")))
    else:
        # First occurrence: append a new row using the same schema
        instruction_row = spark.createDataFrame([(instruction, 1, execution_time)], schema)
        instruction_execution_df = instruction_execution_df.union(instruction_row)

    instruction_execution_df.show()

    # Sum of AverageTime over ALL recorded instructions (cumulative across stages)
    total_average_sum = instruction_execution_df.selectExpr("sum(AverageTime)").collect()[0][0]
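`print_execution_time` keeps one row per instruction and, when an instruction is repeated, folds the new measurement into a running mean. The update has to use the count *before* it is incremented: with n earlier runs averaging a, a new run t gives (a·n + t)/(n + 1). A minimal pure-Python sketch of that update (the helper `update_running_mean` is hypothetical, not part of the notebook):

```python
def update_running_mean(avg, count, new_value):
    """Fold new_value into a mean of `count` earlier values.

    `count` is the number of values already averaged, so it is used
    before being incremented. Returns the updated (avg, count) pair.
    """
    new_avg = (avg * count + new_value) / (count + 1)
    return new_avg, count + 1


# Simulated repeated timings (seconds) of one instruction
timings = [2.0, 4.0, 9.0]

avg, count = 0.0, 0
for t in timings:
    avg, count = update_running_mean(avg, count, t)

print(count, avg)  # 3 5.0
```

Incrementing the count first and then reusing it in the formula would weight the old average by n + 1 and divide by n + 2, which drifts away from the true mean.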

<a class="anchor" id="1.1"></a>
## 1.1 Data Load and DataFrame Creation
[Back to Table of Contents](#0.1)

In [5]:
# Initialize total_average_sum
total_average_sum = 0.0

# Start time
start_time = time.time()

# Read CSV files from HDFS
df = spark.read.csv("hdfs://localhost:9000/user1/*.csv", header=True, inferSchema=True)

print_execution_time(start_time, 'Data Load')

+-----------+-----+-----------------+
|Instruction|Count|      AverageTime|
+-----------+-----+-----------------+
|  Data Load|    1|41.91358709335327|
+-----------+-----+-----------------+

In [6]:
# Start time
start_time = time.time()

# Extract the list of imported CSV files
imported_files = df.inputFiles()

# Sort the list of imported CSV files
sorted_files = sorted(imported_files)

# Print the sorted list of imported CSV files
print("Sorted list of imported CSV files:")
for file_path in sorted_files:
    print(file_path)

print_execution_time(start_time, 'Files Enumeration')

Sorted list of imported CSV files:
hdfs://localhost:9000/user1/part-00000-363d1ba3-8ab5-4f96-bc25-4d5862db7cb9-c000.csv
hdfs://localhost:9000/user1/part-00001-363d1ba3-8ab5-4f96-bc25-4d5862db7cb9-c000.csv
hdfs://localhost:9000/user1/part-00002-363d1ba3-8ab5-4f96-bc25-4d5862db7cb9-c000.csv
hdfs://localhost:9000/user1/part-00003-363d1ba3-8ab5-4f96-bc25-4d5862db7cb9-c000.csv
hdfs://localhost:9000/user1/part-00004-363d1ba3-8ab5-4f96-bc25-4d5862db7cb9-c000.csv
hdfs://localhost:9000/user1/part-00005-363d1ba3-8ab5-4f96-bc25-4d5862db7cb9-c000.csv
hdfs://localhost:9000/user1/part-00006-363d1ba3-8ab5-4f96-bc25-4d5862db7cb9-c000.csv
hdfs://localhost:9000/user1/part-00007-363d1ba3-8ab5-4f96-bc25-4d5862db7cb9-c000.csv
hdfs://localhost:9000/user1/part-00008-363d1ba3-8ab5-4f96-bc25-4d5862db7cb9-c000.csv
hdfs://localhost:9000/user1/part-00009-363d1ba3-8ab5-4f96-bc25-4d5862db7cb9-c000.csv
hdfs://localhost:9000/user1/part-00010-363d1ba3-8ab5-4f96-bc25-4d5862db7cb9-c000.csv
hdfs://localhost:9000/user1/pa



+-----------------+-----+------------------+
|      Instruction|Count|       AverageTime|
+-----------------+-----+------------------+
|        Data Load|    1| 41.91358709335327|
|Files Enumeration|    1|0.1537766456604004|
+-----------------+-----+------------------+

<a class="anchor" id="1.5"></a>
### Data Collection and Preprocessing: *total execution time*
[Back to Table of Contents](#0.1)

In [8]:
# Define schema for the DataFrame
schema0 = StructType([
    StructField("stage name", StringType(), True),
    StructField("total execution time", FloatType(), True)
])

# Create an empty DataFrame with the defined schema
df_exec = spark.createDataFrame([], schema0)

# Add data to the DataFrame
data = [("Data Preprocessing", total_average_sum)]
df_exec = df_exec.union(spark.createDataFrame(data, schema0))

# Show the DataFrame
df_exec.show()

+------------------+--------------------+
|        stage name|total execution time|
+------------------+--------------------+
|Data Preprocessing|           42.067364|
+------------------+--------------------+



<a class="anchor" id="2"></a>
## 2. Exploratory data analysis (EDA)
[Back to Table of Contents](#0.1)

<a class="anchor" id="2.1"></a>
## 2.1 Summary information about the DataFrame
[Back to Table of Contents](#0.1)

In [9]:
total_average_sum = 0.0
# Start time
start_time = time.time()

# Data Summary
print("Data Summary:")
df.printSchema()

print_execution_time(start_time, 'Data Summary')

Data Summary:
root
 |-- flow_duration: double (nullable = true)
 |-- Header_Length: double (nullable = true)
 |-- Protocol Type: double (nullable = true)
 |-- Duration: double (nullable = true)
 |-- Rate: double (nullable = true)
 |-- Srate: double (nullable = true)
 |-- Drate: double (nullable = true)
 |-- fin_flag_number: double (nullable = true)
 |-- syn_flag_number: double (nullable = true)
 |-- rst_flag_number: double (nullable = true)
 |-- psh_flag_number: double (nullable = true)
 |-- ack_flag_number: double (nullable = true)
 |-- ece_flag_number: double (nullable = true)
 |-- cwr_flag_number: double (nullable = true)
 |-- ack_count: double (nullable = true)
 |-- syn_count: double (nullable = true)
 |-- fin_count: double (nullable = true)
 |-- urg_count: double (nullable = true)
 |-- rst_count: double (nullable = true)
 |-- HTTP: double (nullable = true)
 |-- HTTPS: double (nullable = true)
 |-- DNS: double (nullable = true)
 |-- Telnet: double (nullable = true)
 |-- SMTP: doubl

+-----------------+-----+--------------------+
|      Instruction|Count|         AverageTime|
+-----------------+-----+--------------------+
|        Data Load|    1|   41.91358709335327|
|Files Enumeration|    1|  0.1537766456604004|
|     Data Summary|    1|0.014063119888305664|
+-----------------+-----+--------------------+

In [10]:
# Start time
start_time = time.time()

# Data Distribution
print("Data Distribution:")
num_cols_per_table = 5
numeric_columns = [col_name for col_name, data_type in df.dtypes if data_type != "string"]
categorical_columns = [col_name for col_name, data_type in df.dtypes if data_type == "string"]

# Process numeric columns
for i in range(0, len(numeric_columns), num_cols_per_table):
    table_columns = numeric_columns[i:i+num_cols_per_table]
    df.select(table_columns).summary().show()

# Process categorical columns
for i in range(0, len(categorical_columns), num_cols_per_table):
    table_columns = categorical_columns[i:i+num_cols_per_table]
    for col_name in table_columns:
        print(col_name + ":")
        df.groupBy(col_name).count().show()
        
print_execution_time(start_time, 'Data Distribution')

Data Distribution:


24/04/07 09:44:51 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.

+-------+------------------+-----------------+-----------------+------------------+------------------+
|summary|     flow_duration|    Header_Length|    Protocol Type|          Duration|              Rate|
+-------+------------------+-----------------+-----------------+------------------+------------------+
|  count|           3802000|          3802000|          3802000|           3802000|           3802000|
|   mean|  5.68731011482405| 77212.4416434508|9.061357030063604| 66.36322665403715| 9037.742662148668|
| stddev|  271.329859890405|463178.7585458275|8.936870396605629|14.078998566276729| 99802.00897849097|
|    min|               0.0|              0.0|              0.0|               0.0|               0.0|
|    25%|               0.0|             54.0|              6.0|              64.0| 2.093256101519056|
|    50%|               0.0|             54.0|              6.0|              64.0|15.766304181066191|
|    75%|0.1046640419960022|           293.34|             14.3|         

+-------+------------------+--------------------+-------------------+-------------------+-------------------+
|summary|             Srate|               Drate|    fin_flag_number|    syn_flag_number|    rst_flag_number|
+-------+------------------+--------------------+-------------------+-------------------+-------------------+
|  count|           3802000|             3802000|            3802000|            3802000|            3802000|
|   mean| 9037.742662148668|3.805486897013727E-6|0.08651657022619674|0.20689978958442926|0.09044950026301947|
| stddev| 99802.00897849097|0.002186489226242...| 0.2811253707677547|  0.405083090012185| 0.2868247022196352|
|    min|               0.0|                 0.0|                0.0|                0.0|                0.0|
|    25%| 2.093256101519056|                 0.0|                0.0|                0.0|                0.0|
|    50%|15.766304181066191|                 0.0|                0.0|                0.0|                0.0|
|    75%|1

+-------+------------------+-------------------+--------------------+--------------------+------------------+
|summary|   psh_flag_number|    ack_flag_number|     ece_flag_number|     cwr_flag_number|         ack_count|
+-------+------------------+-------------------+--------------------+--------------------+------------------+
|  count|           3802000|            3802000|             3802000|             3802000|           3802000|
|   mean|0.0878766438716465|0.12371173066806944|2.367175170962651E-6|1.052077853761178...|0.0904856472623142|
| stddev|0.2831154542155188| 0.3292524364015792|0.001538561077778...|0.001025708059639837|0.2863779252716044|
|    min|               0.0|                0.0|                 0.0|                 0.0|               0.0|
|    25%|               0.0|                0.0|                 0.0|                 0.0|               0.0|
|    50%|               0.0|                0.0|                 0.0|                 0.0|               0.0|
|    75%| 

+-------+-------------------+-------------------+-----------------+-----------------+-------------------+
|summary|          syn_count|          fin_count|        urg_count|        rst_count|               HTTP|
+-------+-------------------+-------------------+-----------------+-----------------+-------------------+
|  count|            3802000|            3802000|          3802000|          3802000|            3802000|
|   mean|0.32989108580616544|0.09901666892267333|6.286085547460638|38.82400703400762|0.04833140452393477|
| stddev| 0.6633607993857262|  0.323812737988614|72.36438850350457|327.5465887591808|0.21446559621157943|
|    min|                0.0|                0.0|              0.0|              0.0|                0.0|
|    25%|                0.0|                0.0|              0.0|              0.0|                0.0|
|    50%|                0.0|                0.0|              0.0|              0.0|                0.0|
|    75%|               0.06|                0

+-------+-------------------+--------------------+-------+-------+--------------------+
|summary|              HTTPS|                 DNS| Telnet|   SMTP|                 SSH|
+-------+-------------------+--------------------+-------+-------+--------------------+
|  count|            3802000|             3802000|3802000|3802000|             3802000|
|   mean|0.05522067332982641|1.307206733298264E-4|    0.0|    0.0|4.576538663861126E-5|
| stddev|0.22841051702780968|0.011432568382169677|    0.0|    0.0|0.006764858032851587|
|    min|                0.0|                 0.0|    0.0|    0.0|                 0.0|
|    25%|                0.0|                 0.0|    0.0|    0.0|                 0.0|
|    50%|                0.0|                 0.0|    0.0|    0.0|                 0.0|
|    75%|                0.0|                 0.0|    0.0|    0.0|                 0.0|
|    max|                1.0|                 1.0|    0.0|    0.0|                 1.0|
+-------+-------------------+---

+-------+-------+------------------+-------------------+--------------------+--------------------+
|summary|    IRC|               TCP|                UDP|                DHCP|                 ARP|
+-------+-------+------------------+-------------------+--------------------+--------------------+
|  count|3802000|           3802000|            3802000|             3802000|             3802000|
|   mean|    0.0|0.5738269331930563|0.21198658600736456|1.841136244082062E-6|6.575486586007364E-5|
| stddev|    0.0|0.4945196136217704|0.40871544783292113|0.001356883686449...|0.008108671867289176|
|    min|    0.0|               0.0|                0.0|                 0.0|                 0.0|
|    25%|    0.0|               0.0|                0.0|                 0.0|                 0.0|
|    50%|    0.0|               1.0|                0.0|                 0.0|                 0.0|
|    75%|    0.0|               1.0|                0.0|                 0.0|                 0.0|
|    max| 

+-------+-------------------+--------------------+--------------------+------------------+-----------------+
|summary|               ICMP|                 IPv|                 LLC|           Tot sum|              Min|
+-------+-------------------+--------------------+--------------------+------------------+-----------------+
|  count|            3802000|             3802000|             3802000|           3802000|          3802000|
|   mean|0.16375960021041558|  0.9998892688058917|  0.9998892688058917|1309.5189164180142| 91.6679482769298|
| stddev|0.37005733281190956|0.010522307818737158|0.010522307818737158|2616.5968888659595|140.0785843059267|
|    min|                0.0|                 0.0|                 0.0|              42.0|             42.0|
|    25%|                0.0|                 1.0|                 1.0|             525.0|             50.0|
|    50%|                0.0|                 1.0|                 1.0|             567.0|             54.0|
|    75%|          

+-------+------------------+------------------+------------------+------------------+--------------------+
|summary|               Max|               AVG|               Std|          Tot size|                 IAT|
+-------+------------------+------------------+------------------+------------------+--------------------+
|  count|           3802000|           3802000|           3802000|           3802000|             3802000|
|   mean|182.39534119279602|124.81607560361284| 33.46550229663197|124.81119805536375| 8.318535577810591E7|
| stddev| 526.8528034552024|241.61356071044517|161.03802691101416|242.09511561396621|1.7070170706106003E7|
|    min|              42.0|              42.0|               0.0|              42.0|                 0.0|
|    25%|              50.0|              50.0|               0.0|              50.0| 8.307156673426197E7|
|    50%|              54.0|              54.0|               0.0|              54.0|  8.31245216069558E7|
|    75%|             55.28|  54.0501

+-------+------------------+------------------+------------------+--------------------+-------------------+
|summary|            Number|          Magnitue|            Radius|          Covariance|           Variance|
+-------+------------------+------------------+------------------+--------------------+-------------------+
|  count|           3802000|           3802000|           3802000|             3802000|            3802000|
|   mean| 9.498585052123945|13.126617237461538|  47.2940447164847|  30988.169070825854|0.09661046979820624|
| stddev|0.8204473918309835| 8.637693479983257| 227.7628787821259|   338036.2997414148|0.23327353226578393|
|    min|               1.0| 9.165151389911676|               0.0|                 0.0|                0.0|
|    25%|               9.5|              10.0|               0.0|                 0.0|                0.0|
|    50%|               9.5|10.392304845413264|               0.0|                 0.0|                0.0|
|    75%|               9.5|

+-------+------------------+
|summary|            Weight|
+-------+------------------+
|  count|           3802000|
|   mean|141.51543239725416|
| stddev|21.098391294202212|
|    min|               1.0|
|    25%|            141.55|
|    50%|            141.55|
|    75%|            141.55|
|    max|             244.6|
+-------+------------------+

label:

+--------------------+------+
|               label| count|
+--------------------+------+
|    CommandInjection|   404|
|    DDoS-RSTFINFlood|329190|
|       DoS-SYN_Flood|163947|
|DDoS-ACK_Fragment...| 23365|
|  Mirai-greeth_flood| 80162|
|DDoS-ICMP_Fragmen...| 36833|
|       DoS-UDP_Flood|270629|
|DDoS-SynonymousIP...|292280|
|      DDoS-SYN_Flood|330999|
|       DoS-TCP_Flood|217539|
|      DDoS-TCP_Flood|367051|
|   DDoS-PSHACK_Flood|333880|
|      DDoS-UDP_Flood|440357|
|                 XSS|   302|
|   Mirai-greip_flood| 61335|
|DDoS-UDP_Fragment...| 23504|
|        DNS_Spoofing| 14662|
|        Recon-OSScan|  8172|
|      DDoS-SlowLoris|  1928|
|      Recon-PortScan|  6640|
+--------------------+------+
only showing top 20 rows

+-----------------+-----+--------------------+
|      Instruction|Count|         AverageTime|
+-----------------+-----+--------------------+
|        Data Load|    1|   41.91358709335327|
|Files Enumeration|    1|  0.1537766456604004|
|     Data Summary|    1|0.014063119888305664|
|Data Distribution|    1|  220.38567852973938|
+-----------------+-----+--------------------+
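`DataFrame.summary()` reports, for each column, the count, mean, stddev, min, the 25%/50%/75% percentiles and max. As a rough local cross-check on a small sample that fits in memory, the same statistics can be computed with Python's `statistics` module. This sketch assumes a plain list of floats for one column and uses the sample standard deviation; Spark computes its quartiles approximately, so quartiles from this exact method may differ slightly. The helper `column_summary` and the sample values are hypothetical.

```python
import statistics


def column_summary(values):
    """Return summary()-style statistics for one numeric column (list of floats)."""
    # Exact quartiles with linear interpolation (Spark's are approximate)
    q1, q2, q3 = statistics.quantiles(values, n=4, method="inclusive")
    return {
        "count": len(values),
        "mean": statistics.fmean(values),
        "stddev": statistics.stdev(values),  # sample standard deviation
        "min": min(values),
        "25%": q1,
        "50%": q2,
        "75%": q3,
        "max": max(values),
    }


# Hypothetical sample of a 0/1 flag column such as syn_flag_number
sample = [0.0, 0.0, 0.0, 1.0, 0.0, 1.0, 0.0, 0.0]
for name, value in column_summary(sample).items():
    print(f"{name:>6}: {value}")
```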

In [11]:
# Start time
start_time = time.time()

# Count the number of rows in the DataFrame
print('Rows number', df.count())

print_execution_time(start_time, 'Rows number')

Rows number 3802000

+-----------------+-----+--------------------+
|      Instruction|Count|         AverageTime|
+-----------------+-----+--------------------+
|        Data Load|    1|   41.91358709335327|
|Files Enumeration|    1|  0.1537766456604004|
|     Data Summary|    1|0.014063119888305664|
|Data Distribution|    1|  220.38567852973938|
|      Rows number|    1|   3.568094491958618|
+-----------------+-----+--------------------+

In [12]:
# Start time
start_time = time.time()

# Initialize a list to store rows of data
missing_values_rows = []

# Populate the list with missing value counts
for col_name in df.columns:
    missing_count = df.where(col(col_name).isNull() | (col(col_name) == "")).count()
    missing_values_rows.append((col_name, missing_count))

# Define the schema for the DataFrame
schema1 = StructType([
    StructField("Column Name", StringType(), True),
    StructField("Missing Count", IntegerType(), True)
])

# Create a DataFrame from the list of rows and the defined schema
missing_values_df = spark.createDataFrame(missing_values_rows, schema1)

# Show all rows of the DataFrame as a table without truncation
missing_values_df.show(missing_values_df.count(), truncate=False)

print_execution_time(start_time, 'Missing Values')

+---------------+-------------+
|Column Name    |Missing Count|
+---------------+-------------+
|flow_duration  |0            |
|Header_Length  |0            |
|Protocol Type  |0            |
|Duration       |0            |
|Rate           |0            |
|Srate          |0            |
|Drate          |0            |
|fin_flag_number|0            |
|syn_flag_number|0            |
|rst_flag_number|0            |
|psh_flag_number|0            |
|ack_flag_number|0            |
|ece_flag_number|0            |
|cwr_flag_number|0            |
|ack_count      |0            |
|syn_count      |0            |
|fin_count      |0            |
|urg_count      |0            |
|rst_count      |0            |
|HTTP           |0            |
|HTTPS          |0            |
|DNS            |0            |
|Telnet         |0            |
|SMTP           |0            |
|SSH            |0            |
|IRC            |0            |
|TCP            |0            |
|UDP            |0            |
|DHCP   

+-----------------+-----+--------------------+
|      Instruction|Count|         AverageTime|
+-----------------+-----+--------------------+
|        Data Load|    1|   41.91358709335327|
|Files Enumeration|    1|  0.1537766456604004|
|     Data Summary|    1|0.014063119888305664|
|Data Distribution|    1|  220.38567852973938|
|      Rows number|    1|   3.568094491958618|
|   Missing Values|    1|   385.2225694656372|
+-----------------+-----+--------------------+
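The missing-value cell launches one Spark job per column (47 full scans of the 3,802,000 rows), which likely explains most of the roughly 385 s recorded for *Missing Values*. Counting every column in a single pass is usually much cheaper; in PySpark this could be expressed as one aggregation, for example `df.select([spark_sum(when(col(c).isNull() | (col(c) == ""), 1).otherwise(0)).alias(c) for c in df.columns])`. The pure-Python sketch below shows the same single-pass idea on a list of row dictionaries; the helper `count_missing` and the sample rows are hypothetical.

```python
def count_missing(rows, columns):
    """Count None/empty-string values per column in a single pass over rows."""
    missing = {c: 0 for c in columns}
    for row in rows:                      # one pass over the data
        for c in columns:
            value = row.get(c)
            if value is None or value == "":
                missing[c] += 1
    return missing


columns = ["flow_duration", "Protocol Type", "label"]
rows = [
    {"flow_duration": 0.0, "Protocol Type": 6.0, "label": "DDoS-SYN_Flood"},
    {"flow_duration": None, "Protocol Type": 17.0, "label": "BenignTraffic"},
    {"flow_duration": 0.1, "Protocol Type": 6.0, "label": ""},
]
print(count_missing(rows, columns))
# {'flow_duration': 1, 'Protocol Type': 0, 'label': 1}
```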

<a class="anchor" id="2.2"></a>
## 2.2 Features inspection
[Back to Table of Contents](#0.1)

In [13]:
# Start time
start_time = time.time()


# Print list of all columns
print(df.columns)
print("\nTotal number of columns:", len(df.columns))

print_execution_time(start_time, 'Total number of columns')


['flow_duration', 'Header_Length', 'Protocol Type', 'Duration', 'Rate', 'Srate', 'Drate', 'fin_flag_number', 'syn_flag_number', 'rst_flag_number', 'psh_flag_number', 'ack_flag_number', 'ece_flag_number', 'cwr_flag_number', 'ack_count', 'syn_count', 'fin_count', 'urg_count', 'rst_count', 'HTTP', 'HTTPS', 'DNS', 'Telnet', 'SMTP', 'SSH', 'IRC', 'TCP', 'UDP', 'DHCP', 'ARP', 'ICMP', 'IPv', 'LLC', 'Tot sum', 'Min', 'Max', 'AVG', 'Std', 'Tot size', 'IAT', 'Number', 'Magnitue', 'Radius', 'Covariance', 'Variance', 'Weight', 'label']

Total number of columns: 47

+--------------------+-----+--------------------+
|         Instruction|Count|         AverageTime|
+--------------------+-----+--------------------+
|           Data Load|    1|   41.91358709335327|
|   Files Enumeration|    1|  0.1537766456604004|
|        Data Summary|    1|0.014063119888305664|
|   Data Distribution|    1|  220.38567852973938|
|         Rows number|    1|   3.568094491958618|
|      Missing Values|    1|   385.2225694656372|
|Total number of c...|    1|8.337497711181641E-4|
+--------------------+-----+--------------------+

<a class="anchor" id="2.3"></a>
### Data Exploration: *total execution time*
[Back to Table of Contents](#0.1)

In [14]:
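# Record the EDA stage total (note: total_average_sum is summed over every row
# of instruction_execution_df, so it also includes the Data Preprocessing times)
new_row_data = [("EDA", total_average_sum)]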

# Create DataFrame for the new row
new_row_df = spark.createDataFrame(new_row_data, schema0)

# Add the new row to the existing df_exec DataFrame
df_exec = df_exec.union(new_row_df)

# Show the DataFrame
df_exec.show()

+------------------+--------------------+
|        stage name|total execution time|
+------------------+--------------------+
|Data Preprocessing|           42.067364|
|               EDA|            651.2586|
+------------------+--------------------+
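Because `print_execution_time` recomputes `total_average_sum` over every row of `instruction_execution_df`, resetting it to `0.0` at the start of a stage does not make it a per-stage figure: the EDA value above (651.2586 s) also includes the Data Load and Files Enumeration times already reported under Data Preprocessing (42.067364 s). A minimal plain-Python sketch that groups the recorded times by stage instead; the times are copied from the instruction log above, and the `stage_of` mapping is an assumption that mirrors the notebook's section headings.

```python
# Average times (seconds) copied from the instruction log above
instruction_times = {
    "Data Load": 41.91358709335327,
    "Files Enumeration": 0.1537766456604004,
    "Data Summary": 0.014063119888305664,
    "Data Distribution": 220.38567852973938,
    "Rows number": 3.568094491958618,
    "Missing Values": 385.2225694656372,
    "Total number of columns": 8.337497711181641e-4,
}

# Assumed mapping of each instruction to the notebook section it belongs to
stage_of = {
    "Data Load": "Data Preprocessing",
    "Files Enumeration": "Data Preprocessing",
    "Data Summary": "EDA",
    "Data Distribution": "EDA",
    "Rows number": "EDA",
    "Missing Values": "EDA",
    "Total number of columns": "EDA",
}

# Sum the times of each stage separately
stage_totals = {}
for instruction, seconds in instruction_times.items():
    stage = stage_of[instruction]
    stage_totals[stage] = stage_totals.get(stage, 0.0) + seconds

# What the EDA row actually reports: the sum over all instructions so far
cumulative = sum(instruction_times.values())

for stage, total in stage_totals.items():
    print(f"{stage}: {total:.4f} s")
print(f"Cumulative: {cumulative:.4f} s")
```

With this grouping, the EDA cells on their own account for about 609.19 s.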



<a class="anchor" id="3"></a>
## 3. Machine Learning Model training
[Back to Table of Contents](#0.1)

<a class="anchor" id="3.1"></a>
## 3.1 Transform the labels
[Back to Table of Contents](#0.1)

In [15]:
total_average_sum = 0.0
# Start time
start_time = time.time()

# Binary labeling
df = df.withColumn('label', when(df['label'] == 'BenignTraffic', 0).otherwise(1))

print_execution_time(start_time, 'Data labeling') 

+--------------------+-----+--------------------+
|         Instruction|Count|         AverageTime|
+--------------------+-----+--------------------+
|           Data Load|    1|   41.91358709335327|
|   Files Enumeration|    1|  0.1537766456604004|
|        Data Summary|    1|0.014063119888305664|
|   Data Distribution|    1|  220.38567852973938|
|         Rows number|    1|   3.568094491958618|
|      Missing Values|    1|   385.2225694656372|
|Total number of c...|    1|8.337497711181641E-4|
|       Data labeling|    1| 0.04415130615234375|
+--------------------+-----+--------------------+
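The `when(...).otherwise(...)` expression maps `BenignTraffic` to 0 and every other label, i.e. all the DDoS, DoS, Mirai, Recon, spoofing and web-attack classes, to 1. One subtlety: if a label were null, the comparison would evaluate to null, `otherwise` would apply and the row would be labelled 1 (attack), which is one reason to check for missing labels before binarising. A pure-Python sketch of the same rule; the function name `to_binary_label` and the sample labels are hypothetical.

```python
def to_binary_label(label):
    """0 for benign traffic, 1 for anything else (mirrors when(...).otherwise(1)).

    A missing label (None) also maps to 1, just as a null comparison
    falls through to otherwise() in Spark.
    """
    return 0 if label == "BenignTraffic" else 1


labels = ["BenignTraffic", "DDoS-SYN_Flood", "Mirai-greeth_flood", "Recon-PortScan", None]
binary = [to_binary_label(lab) for lab in labels]
print(binary)  # [0, 1, 1, 1, 1]

# Class balance after binarisation
print({0: binary.count(0), 1: binary.count(1)})  # {0: 1, 1: 4}
```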

<a class="anchor" id="3.2"></a>
## 3.2 Split data
[Back to Table of Contents](#0.1)

In [16]:
# Start time
start_time = time.time()

# Split data into training and testing sets
train_data, test_data = df.randomSplit([0.7, 0.3], seed=42)

print_execution_time(start_time, 'Split data') 

+--------------------+-----+--------------------+
|         Instruction|Count|         AverageTime|
+--------------------+-----+--------------------+
|           Data Load|    1|   41.91358709335327|
|   Files Enumeration|    1|  0.1537766456604004|
|        Data Summary|    1|0.014063119888305664|
|   Data Distribution|    1|  220.38567852973938|
|         Rows number|    1|   3.568094491958618|
|      Missing Values|    1|   385.2225694656372|
|Total number of c...|    1|8.337497711181641E-4|
|       Data labeling|    1| 0.04415130615234375|
|          Split data|    1| 0.06647634506225586|
+--------------------+-----+--------------------+
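`randomSplit([0.7, 0.3], seed=42)` does not produce an exact 70/30 split. Conceptually, each row draws one uniform random number and goes to the split whose cumulative-weight interval contains it, so split sizes are only approximately proportional to the weights (Spark also normalises weights that do not sum to 1). The pure-Python sketch below illustrates that principle only; it is not Spark's implementation, which samples partition by partition, and `random_split` is a hypothetical helper.

```python
import random


def random_split(rows, weights, seed=None):
    """Route each row to one split using a single uniform draw per row."""
    total = sum(weights)
    bounds, acc = [], 0.0
    for w in weights:                 # normalised cumulative upper bounds
        acc += w / total
        bounds.append(acc)
    bounds[-1] = 1.0                  # guard against floating-point round-off

    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()              # uniform in [0, 1)
        for i, upper in enumerate(bounds):
            if u < upper:
                splits[i].append(row)
                break
    return splits


train, test = random_split(range(100_000), [0.7, 0.3], seed=42)
print(len(train), len(test), len(train) / 100_000)
```

Every row lands in exactly one split, and the same seed reproduces the same assignment, but the 70% share is only approximate.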

<a class="anchor" id="3.3"></a>
## 3.3 Feature transformation
[Back to Table of Contents](#0.1)

The code below prepares the data for PySpark's machine learning algorithms, which expect all predictors in a single vector column. It uses every column except the last one as input (`df.columns[:-1]` assumes that `label` is the final column). A `VectorAssembler` then combines those input columns into one `features` vector. Applying the same assembler to both the training set and the test set ensures that both are encoded identically, so a model trained on one can be evaluated on the other.
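
The plain-Python sketch below makes the assembler's effect concrete by mimicking `VectorAssembler` on a few rows held as dictionaries. It is an illustration only, not Spark's implementation. The helper names `select_input_cols` and `assemble` and the sample rows are hypothetical. The sketch also shows a name-based alternative to `df.columns[:-1]` that does not depend on `label` being the last column.

```python
def select_input_cols(columns, label_col="label"):
    """Return every column except the label, wherever the label sits."""
    return [c for c in columns if c != label_col]


def assemble(rows, input_cols, output_col="features"):
    """Mimic VectorAssembler on dict rows: keep the original fields and
    append one list-valued column holding the input values in order."""
    return [{**row, output_col: [float(row[c]) for c in input_cols]} for row in rows]


# Hypothetical sample rows using a few column names from this dataset
columns = ["flow_duration", "Rate", "label"]
rows = [
    {"flow_duration": 0.5, "Rate": 120.0, "label": 1},
    {"flow_duration": 2.0, "Rate": 3.5, "label": 0},
]

input_cols = select_input_cols(columns)
assembled = assemble(rows, input_cols)
print(assembled[0]["features"])  # [0.5, 120.0]
```

In the notebook, the same name-based selection would read `input_cols = [c for c in df.columns if c != 'label']`.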

In [17]:
# Start time
start_time = time.time()

# Define the input columns (all columns except 'label')
input_cols = df.columns[:-1]  # Exclude the last column ('label')

# Initialize the VectorAssembler
assembler = VectorAssembler(inputCols=input_cols, outputCol='features')

# Transform the DataFrames to include the 'features' column
train_df_with_features = assembler.transform(train_data)
test_df_with_features = assembler.transform(test_data)

print_execution_time(start_time, 'Feature transformation ML')


+--------------------+-----+--------------------+
|         Instruction|Count|         AverageTime|
+--------------------+-----+--------------------+
|           Data Load|    1|   41.91358709335327|
|   Files Enumeration|    1|  0.1537766456604004|
|        Data Summary|    1|0.014063119888305664|
|   Data Distribution|    1|  220.38567852973938|
|         Rows number|    1|   3.568094491958618|
|      Missing Values|    1|   385.2225694656372|
|Total number of c...|    1|8.337497711181641E-4|
|       Data labeling|    1| 0.04415130615234375|
|          Split data|    1| 0.06647634506225586|
|Feature transform...|    1|  0.7770123481750488|
+--------------------+-----+--------------------+


<a class="anchor" id="3.4"></a>
## 3.4 Train model
[Back to Table of Contents](#0.1)

In [18]:
# Start time
start_time = time.time()
# Create logistic regression model
lr = LogisticRegression(featuresCol='features', labelCol='label')

# Train the logistic regression model on the training data
lr_model = lr.fit(train_df_with_features)

# Make predictions on both training and testing data
train_predictions = lr_model.transform(train_df_with_features)
test_predictions = lr_model.transform(test_df_with_features)

# Initialize BinaryClassificationEvaluator. Its metric is the area under the ROC curve
# (areaUnderROC, also the default), so the scores below are AUC values, not accuracy.
evaluator = BinaryClassificationEvaluator(labelCol='label', metricName='areaUnderROC')

# Evaluate the model on training data
train_auc = evaluator.evaluate(train_predictions)
print("Training AUC: {:.3f}".format(train_auc))

# Evaluate the model on testing data
test_auc = evaluator.evaluate(test_predictions)
print("Testing AUC: {:.3f}".format(test_auc))

print_execution_time(start_time, 'ML_model_DDoS_detection')

24/04/07 09:56:55 WARN MemoryStore: Not enough space to cache rdd_831_10 in memory! (computed 17.0 MiB so far)
24/04/07 09:56:55 WARN BlockManager: Persisting block rdd_831_10 to disk instead.

Training AUC: 0.995

Testing AUC: 0.995


+--------------------+-----+--------------------+
|         Instruction|Count|         AverageTime|
+--------------------+-----+--------------------+
|           Data Load|    1|   41.91358709335327|
|   Files Enumeration|    1|  0.1537766456604004|
|        Data Summary|    1|0.014063119888305664|
|   Data Distribution|    1|  220.38567852973938|
|         Rows number|    1|   3.568094491958618|
|      Missing Values|    1|   385.2225694656372|
|Total number of c...|    1|8.337497711181641E-4|
|       Data labeling|    1| 0.04415130615234375|
|          Split data|    1| 0.06647634506225586|
|Feature transform...|    1|  0.7770123481750488|
|ML_model_DDoS_det...|    1|   255.8777415752411|
+--------------------+-----+--------------------+
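
`BinaryClassificationEvaluator` ranks predictions by their raw score and reports the area under the ROC curve. That metric measures how well positive examples are ranked above negative ones. Accuracy instead counts correct hard predictions at a fixed threshold. The two can differ substantially, as the stdlib-only sketch below shows on a toy example. The scores and labels are made up for illustration, and `roc_auc` uses the pairwise formulation, counting ties as one half.

```python
def roc_auc(labels, scores):
    """Probability that a random positive scores above a random negative (ties count 0.5)."""
    pos = [s for s, y in zip(scores, labels) if y == 1]
    neg = [s for s, y in zip(scores, labels) if y == 0]
    wins = sum(1.0 if p > n else 0.5 if p == n else 0.0 for p in pos for n in neg)
    return wins / (len(pos) * len(neg))


def accuracy(labels, scores, threshold=0.5):
    """Share of rows whose thresholded prediction matches the label."""
    preds = [1 if s >= threshold else 0 for s in scores]
    return sum(p == y for p, y in zip(preds, labels)) / len(labels)


labels = [0, 0, 1, 1]
scores = [0.1, 0.6, 0.35, 0.8]  # hypothetical probabilities of the positive class
print(roc_auc(labels, scores))   # 0.75
print(accuracy(labels, scores))  # 0.5
```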


<a class="anchor" id="3.5"></a>
### * Machine learning part *total execution time*
[Back to Table of Contents](#0.1)

In [19]:

# Record this stage in the execution-time summary.
# total_average_sum is the running total of all timings logged so far.
new_row_data = [("Model training", total_average_sum)]

# Create DataFrame for the new row
new_row_df = spark.createDataFrame(new_row_data, schema0)

# Add the new row to the existing df_exec DataFrame
df_exec = df_exec.union(new_row_df)

# Show the DataFrame
df_exec.show()

+------------------+--------------------+
|        stage name|total execution time|
+------------------+--------------------+
|Data Preprocessing|           42.067364|
|               EDA|            651.2586|
|    Model training|             908.024|
+------------------+--------------------+





<a class="anchor" id="4."></a>
## 4. Feature importance analysis
[Back to Table of Contents](#0.1)

<a class="anchor" id="4.1"></a>
## 4.1 Feature transformation
[Back to Table of Contents](#0.1)

The code below sets up the feature importance analysis as a Spark ML `Pipeline`. As in section 3.3, every column except the last one (assumed to be `label`) is used as input, and a `VectorAssembler` combines those columns into a single `features` vector. A `RandomForestClassifier` is configured to read that vector, and the two stages are chained in a `Pipeline`. No data is processed in this cell. The assembler and the Random Forest only run when the pipeline is fitted in the next step, which is why this cell completes in a fraction of a second.
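
The fit/transform contract that `Pipeline` relies on can be illustrated with a few toy classes in plain Python. Everything here (`ToyAssembler`, `MajorityClassifier`, `ToyPipeline`) is hypothetical and much simpler than Spark ML. For example, the classifier just predicts the most frequent training label. The point is the control flow: fitting the pipeline fits each estimator on the output of the stages before it, and the returned model holds the fitted stages. This is why the next step can read the trained forest from `model.stages[-1]`.

```python
class ToyAssembler:
    """Transformer: copies each row and adds a list of the chosen column values."""

    def __init__(self, input_cols, output_col="features"):
        self.input_cols = input_cols
        self.output_col = output_col

    def transform(self, rows):
        return [{**r, self.output_col: [r[c] for c in self.input_cols]} for r in rows]


class MajorityModel:
    """Fitted model: predicts the label it learned during fitting."""

    def __init__(self, majority):
        self.majority = majority

    def transform(self, rows):
        return [{**r, "prediction": self.majority} for r in rows]


class MajorityClassifier:
    """Estimator: fit() learns the most frequent label and returns a MajorityModel."""

    def __init__(self, label_col="label"):
        self.label_col = label_col

    def fit(self, rows):
        labels = [r[self.label_col] for r in rows]
        return MajorityModel(max(set(labels), key=labels.count))


class ToyPipeline:
    """Runs stages in order; estimators are fitted on the output of earlier stages."""

    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        fitted = []
        for stage in self.stages:
            if hasattr(stage, "fit"):
                stage = stage.fit(rows)  # estimator -> fitted model
            fitted.append(stage)
            rows = stage.transform(rows)  # feed this stage's output to the next one
        return ToyPipelineModel(fitted)


class ToyPipelineModel:
    """Applies the fitted stages in order, like a Spark PipelineModel."""

    def __init__(self, stages):
        self.stages = stages

    def transform(self, rows):
        for stage in self.stages:
            rows = stage.transform(rows)
        return rows


train = [
    {"rst_count": 3.0, "urg_count": 1.0, "label": 1},
    {"rst_count": 5.0, "urg_count": 0.0, "label": 1},
    {"rst_count": 0.0, "urg_count": 0.0, "label": 0},
]
pipeline = ToyPipeline([ToyAssembler(["rst_count", "urg_count"]), MajorityClassifier()])
model = pipeline.fit(train)
print(model.stages[-1].majority)  # 1
```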

In [20]:
total_average_sum = 0.0
# Start time
start_time = time.time()

# Define features
feature_columns = df.columns[:-1]  # Assuming the last column is the label column
vector_assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

# Define a Random Forest classifier
rf = RandomForestClassifier(labelCol="label", featuresCol="features")

# Create a pipeline
pipeline = Pipeline(stages=[vector_assembler, rf])

print_execution_time(start_time, 'Feature transformation Features')


+--------------------+-----+--------------------+
|         Instruction|Count|         AverageTime|
+--------------------+-----+--------------------+
|           Data Load|    1|   41.91358709335327|
|   Files Enumeration|    1|  0.1537766456604004|
|        Data Summary|    1|0.014063119888305664|
|   Data Distribution|    1|  220.38567852973938|
|         Rows number|    1|   3.568094491958618|
|      Missing Values|    1|   385.2225694656372|
|Total number of c...|    1|8.337497711181641E-4|
|       Data labeling|    1| 0.04415130615234375|
|          Split data|    1| 0.06647634506225586|
|Feature transform...|    1|  0.7770123481750488|
|ML_model_DDoS_det...|    1|   255.8777415752411|
|Feature transform...|    1| 0.13527297973632812|
+--------------------+-----+--------------------+


<a class="anchor" id="4.2"></a>
## 4.2 Create a fitted pipeline model
[Back to Table of Contents](#0.1)

In [21]:
# Start time
start_time = time.time()

# Train the model
model = pipeline.fit(train_data)

# Evaluate the model on the testing data
predictions = model.transform(test_data)
evaluator = BinaryClassificationEvaluator(labelCol="label")
auc = evaluator.evaluate(predictions)

print("Area Under ROC: ", auc)

# Get feature importance scores
importances = model.stages[-1].featureImportances

print_execution_time(start_time, 'ML_model_Features')

24/04/07 10:02:21 WARN MemoryStore: Not enough space to cache rdd_1461_6 in memory! (computed 28.7 MiB so far)
24/04/07 10:02:21 WARN BlockManager: Persisting block rdd_1461_6 to disk instead.

Area Under ROC:  0.9969769506200666


+--------------------+-----+--------------------+
|         Instruction|Count|         AverageTime|
+--------------------+-----+--------------------+
|           Data Load|    1|   41.91358709335327|
|   Files Enumeration|    1|  0.1537766456604004|
|        Data Summary|    1|0.014063119888305664|
|   Data Distribution|    1|  220.38567852973938|
|         Rows number|    1|   3.568094491958618|
|      Missing Values|    1|   385.2225694656372|
|Total number of c...|    1|8.337497711181641E-4|
|       Data labeling|    1| 0.04415130615234375|
|          Split data|    1| 0.06647634506225586|
|Feature transform...|    1|  0.7770123481750488|
|ML_model_DDoS_det...|    1|   255.8777415752411|
|Feature transform...|    1| 0.13527297973632812|
|   ML_model_Features|    1|  209.25959300994873|
+--------------------+-----+--------------------+


<a class="anchor" id="4.5"></a>
## 4.3 Visualise the result
[Back to Table of Contents](#0.1)

In [22]:


# Start time
start_time = time.time()

# Get feature importance scores from the fitted Random Forest (last pipeline stage)
importances = model.stages[-1].featureImportances.toArray()

# Feature names, in the same order as the VectorAssembler inputs
feature_names = feature_columns

# Create a DataFrame to store feature names and importance scores
schema2 = StructType([
    StructField("Feature", StringType(), True),
    StructField("Importance", FloatType(), True)
])

importance_data = [(name, float(importance)) for name, importance in zip(feature_names, importances)]
importance_df = spark.createDataFrame(importance_data, schema2)

# Sort numerically first, then round to three decimals for display:
# format_number returns a string, so sorting after formatting would compare text
importance_df_sorted = (
    importance_df
    .orderBy(col("Importance").desc())
    .withColumn("Importance", format_number(col("Importance"), 3))
)

# Show the sorted DataFrame with all rows
importance_df_sorted.show(importance_df_sorted.count(), truncate=False)

print_execution_time(start_time, 'feature_sort_values')

+---------------+----------+
|Feature        |Importance|
+---------------+----------+
|rst_count      |0.353     |
|urg_count      |0.187     |
|Duration       |0.083     |
|flow_duration  |0.070     |
|IAT            |0.047     |
|HTTPS          |0.044     |
|Number         |0.029     |
|Header_Length  |0.025     |
|Srate          |0.025     |
|Max            |0.024     |
|Tot size       |0.020     |
|Rate           |0.018     |
|Radius         |0.017     |
|Variance       |0.014     |
|Weight         |0.011     |
|ack_flag_number|0.010     |
|psh_flag_number|0.006     |
|AVG            |0.005     |
|ack_count      |0.002     |
|Covariance     |0.002     |
|syn_count      |0.002     |
|HTTP           |0.002     |
|Tot sum        |0.001     |
|Protocol Type  |0.001     |
|Std            |0.001     |
|fin_count      |0.001     |
|Magnitue       |0.001     |
|Telnet         |0.000     |
|Min            |0.000     |
|ece_flag_number|0.000     |
|Drate          |0.000     |
|cwr_flag_numb


+--------------------+-----+--------------------+
|         Instruction|Count|         AverageTime|
+--------------------+-----+--------------------+
|           Data Load|    1|   41.91358709335327|
|   Files Enumeration|    1|  0.1537766456604004|
|        Data Summary|    1|0.014063119888305664|
|   Data Distribution|    1|  220.38567852973938|
|         Rows number|    1|   3.568094491958618|
|      Missing Values|    1|   385.2225694656372|
|Total number of c...|    1|8.337497711181641E-4|
|       Data labeling|    1| 0.04415130615234375|
|          Split data|    1| 0.06647634506225586|
|Feature transform...|    1|  0.7770123481750488|
|ML_model_DDoS_det...|    1|   255.8777415752411|
|Feature transform...|    1| 0.13527297973632812|
|   ML_model_Features|    1|  209.25959300994873|
| feature_sort_values|    1|  0.8401956558227539|
+--------------------+-----+--------------------+
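
`format_number` returns a string, so sorting on an already formatted column compares text rather than numbers. For the importance scores in this notebook, which all lie between 0 and 1 and are shown with three decimals, text order and numeric order coincide. They diverge once values have different numbers of integer digits. The short stdlib sketch below demonstrates this with made-up values, using Python string formatting as a stand-in for `format_number`.

```python
values = [9.5, 10.0, 0.353]  # made-up scores with different numbers of integer digits

# Format first, then sort: compares strings character by character
as_text = sorted((f"{v:.3f}" for v in values), reverse=True)

# Sort first, then format: compares the numbers themselves
as_numbers = [f"{v:.3f}" for v in sorted(values, reverse=True)]

print(as_text)     # ['9.500', '10.000', '0.353']
print(as_numbers)  # ['10.000', '9.500', '0.353']
```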


<a class="anchor" id="4.3"></a>
### * Feature Importance stage *total execution time*
[Back to Table of Contents](#0.1)


In [23]:
# Record this stage in the execution-time summary.
# total_average_sum is the running total of all timings logged so far.
new_row_data = [("Feature importance", total_average_sum)]

# Create DataFrame for the new row
new_row_df = spark.createDataFrame(new_row_data, schema0)

# Add the new row to the existing df_exec DataFrame
df_exec = df_exec.union(new_row_df)

# Show the DataFrame
df_exec.show()



+------------------+--------------------+
|        stage name|total execution time|
+------------------+--------------------+
|Data Preprocessing|           42.067364|
|               EDA|            651.2586|
|    Model training|             908.024|
|Feature importance|            1118.259|
+------------------+--------------------+





<a class="anchor" id="5"></a>
## 5. Neural network implementation
[Back to Table of Contents](#0.1)

<a class="anchor" id="5.1"></a>
## 5.1 Filtering features
[Back to Table of Contents](#0.1)

In [24]:
total_average_sum = 0.0
# Start time
start_time = time.time()

# Filter features based on importance scores
selected_features = [feature_name for feature_name, importance in zip(feature_names, importances) if importance > 0.005]

print_execution_time(start_time, 'selected_features') 


+--------------------+-----+--------------------+
|         Instruction|Count|         AverageTime|
+--------------------+-----+--------------------+
|           Data Load|    1|   41.91358709335327|
|   Files Enumeration|    1|  0.1537766456604004|
|        Data Summary|    1|0.014063119888305664|
|   Data Distribution|    1|  220.38567852973938|
|         Rows number|    1|   3.568094491958618|
|      Missing Values|    1|   385.2225694656372|
|Total number of c...|    1|8.337497711181641E-4|
|       Data labeling|    1| 0.04415130615234375|
|          Split data|    1| 0.06647634506225586|
|Feature transform...|    1|  0.7770123481750488|
|ML_model_DDoS_det...|    1|   255.8777415752411|
|Feature transform...|    1| 0.13527297973632812|
|   ML_model_Features|    1|  209.25959300994873|
| feature_sort_values|    1|  0.8401956558227539|
|   selected_features|    1|1.120567321777343...|
+--------------------+-----+--------------------+
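
The filter keeps every feature whose Random Forest importance exceeds 0.005. Spark normalizes Random Forest feature importances to sum to 1, so the threshold can be read as "more than 0.5 % of the total importance". The sketch below applies the same rule in plain Python to a few made-up scores. It also reports how much of the total importance the kept features cover. `select_features` is a hypothetical helper.

```python
def select_features(names, importances, threshold=0.005):
    """Keep features whose importance exceeds the threshold and report
    the share of total importance they cover."""
    kept = [(n, imp) for n, imp in zip(names, importances) if imp > threshold]
    coverage = sum(imp for _, imp in kept) / sum(importances)
    return [n for n, _ in kept], coverage


names = ["rst_count", "urg_count", "Telnet", "Drate"]
importances = [0.6, 0.396, 0.004, 0.0]  # made-up scores that sum to 1
selected, coverage = select_features(names, importances)
print(selected)            # ['rst_count', 'urg_count']
print(round(coverage, 3))  # 0.996
```

The comparison is strict, so a feature scoring exactly 0.005 is dropped.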


<a class="anchor" id="5.2"></a>
## 5.2 Feature transformation
[Back to Table of Contents](#0.1)

In [25]:
# Start time
start_time = time.time()

# Initialize a VectorAssembler that uses only the selected features
assembler = VectorAssembler(inputCols=selected_features, outputCol='feat')

# Add the selected-feature vector as a new 'feat' column (the original columns are kept)
train_data = assembler.transform(train_data)
test_data = assembler.transform(test_data)

# The input layer size of the network equals the number of selected features
input_size = len(selected_features)

print_execution_time(start_time, 'Feature transformation NN')


+--------------------+-----+--------------------+
|         Instruction|Count|         AverageTime|
+--------------------+-----+--------------------+
|           Data Load|    1|   41.91358709335327|
|   Files Enumeration|    1|  0.1537766456604004|
|        Data Summary|    1|0.014063119888305664|
|   Data Distribution|    1|  220.38567852973938|
|         Rows number|    1|   3.568094491958618|
|      Missing Values|    1|   385.2225694656372|
|Total number of c...|    1|8.337497711181641E-4|
|       Data labeling|    1| 0.04415130615234375|
|          Split data|    1| 0.06647634506225586|
|Feature transform...|    1|  0.7770123481750488|
|ML_model_DDoS_det...|    1|   255.8777415752411|
|Feature transform...|    1| 0.13527297973632812|
|   ML_model_Features|    1|  209.25959300994873|
| feature_sort_values|    1|  0.8401956558227539|
|   selected_features|    1|1.120567321777343...|
|Feature transform...|    1| 0.16662883758544922|
+--------------------+-----+--------------------+


<a class="anchor" id="5.4"></a>
## 5.3 Train the model
[Back to Table of Contents](#0.1)

In [26]:
# Start time
start_time = time.time()

# Define the layers for the neural network
output_size = 2  # Binary classification: one output node per class
layers = [input_size, 64, 32, output_size]  # Input layer, 2 hidden layers (64 and 32 neurons), output layer

# Create the MultilayerPerceptronClassifier
mlp = MultilayerPerceptronClassifier(layers=layers, seed=1234, labelCol='label', featuresCol='feat')
# Fit the model using the training data
mlp_model = mlp.fit(train_data)

print_execution_time(start_time, 'Train NN model') 

24/04/07 10:05:16 WARN MemoryStore: Not enough space to cache rdd_1793_0 in memory! (computed 26.7 MiB so far)

+--------------------+-----+--------------------+
|         Instruction|Count|         AverageTime|
+--------------------+-----+--------------------+
|           Data Load|    1|   41.91358709335327|
|   Files Enumeration|    1|  0.1537766456604004|
|        Data Summary|    1|0.014063119888305664|
|   Data Distribution|    1|  220.38567852973938|
|         Rows number|    1|   3.568094491958618|
|      Missing Values|    1|   385.2225694656372|
|Total number of c...|    1|8.337497711181641E-4|
|       Data labeling|    1| 0.04415130615234375|
|          Split data|    1| 0.06647634506225586|
|Feature transform...|    1|  0.7770123481750488|
|ML_model_DDoS_det...|    1|   255.8777415752411|
|Feature transform...|    1| 0.13527297973632812|
|   ML_model_Features|    1|  209.25959300994873|
| feature_sort_values|    1|  0.8401956558227539|
|   selected_features|    1|1.120567321777343...|
|Feature transform...|    1| 0.16662883758544922|
|      Train NN model|    1|   7963.170134305954|
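
In Spark's `MultilayerPerceptronClassifier`, `layers` lists the size of every layer. It runs from the input layer (here `input_size`, the number of selected features) through the hidden layers to the output layer. Hidden nodes use a sigmoid activation and the output layer uses softmax, which is why a binary problem still gets two output nodes. Each pair of consecutive layers is fully connected with a bias per node, so the number of trainable weights follows directly from `layers`. The sketch below computes it under that fully connected structure. The input size of 17 is a hypothetical value for illustration, not the count produced by the filter in section 5.1.

```python
def mlp_weight_count(layers):
    """Weights plus biases of a fully connected network with the given layer sizes."""
    return sum((n_in + 1) * n_out for n_in, n_out in zip(layers, layers[1:]))


input_size = 17  # hypothetical; in the notebook this is len(selected_features)
layers = [input_size, 64, 32, 2]
print(mlp_weight_count(layers))  # 3298
```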


<a class="anchor" id="5.5"></a>
## 5.4 Assess the result
[Back to Table of Contents](#0.1)

In [27]:
# Start time
start_time = time.time()

# Make predictions on the test data
predictions = mlp_model.transform(test_data)

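# Evaluate the model with classification accuracy (MulticlassClassificationEvaluator).
# Note: section 3.4 scores the logistic regression with BinaryClassificationEvaluator
# (ROC AUC), so the two numbers measure different things and are not directly comparable.
evaluator = MulticlassClassificationEvaluator(labelCol='label', metricName="accuracy")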
accuracy = evaluator.evaluate(predictions)
print("Test Accuracy:", accuracy)

print_execution_time(start_time, 'MLP results') 


Test Accuracy: 0.9804570544560968


+--------------------+-----+--------------------+
|         Instruction|Count|         AverageTime|
+--------------------+-----+--------------------+
|           Data Load|    1|   41.91358709335327|
|   Files Enumeration|    1|  0.1537766456604004|
|        Data Summary|    1|0.014063119888305664|
|   Data Distribution|    1|  220.38567852973938|
|         Rows number|    1|   3.568094491958618|
|      Missing Values|    1|   385.2225694656372|
|Total number of c...|    1|8.337497711181641E-4|
|       Data labeling|    1| 0.04415130615234375|
|          Split data|    1| 0.06647634506225586|
|Feature transform...|    1|  0.7770123481750488|
|ML_model_DDoS_det...|    1|   255.8777415752411|
|Feature transform...|    1| 0.13527297973632812|
|   ML_model_Features|    1|  209.25959300994873|
| feature_sort_values|    1|  0.8401956558227539|
|   selected_features|    1|1.120567321777343...|
|Feature transform...|    1| 0.16662883758544922|
|      Train NN model|    1|   7963.170134305954|



### Note

<a class="anchor" id="5.5"></a>
### *Total execution time*
[Back to Table of Contents](#0.1)

In [28]:
# Add a summary row for the neural-network stage;
# total_average_sum is computed in an earlier cell
new_row_data = [("Neural network", total_average_sum)]

# Build a one-row DataFrame with the same schema as df_exec
new_row_df = spark.createDataFrame(new_row_data, schema0)

# Append the row (union matches columns by position, so schema0 must follow df_exec's column order)
df_exec = df_exec.union(new_row_df)

# Display the stage summary
df_exec.show()



+------------------+--------------------+
|        stage name|total execution time|
+------------------+--------------------+
|Data Preprocessing|           42.067364|
|               EDA|            651.2586|
|    Model training|             908.024|
|Feature importance|            1118.259|
|    Neural network|            9123.198|
+------------------+--------------------+
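The `total execution time` column appears to be a running total rather than a per-stage time. The Data Preprocessing row (42.067 s) equals Data Load plus Files Enumeration from the instruction table. The EDA row (651.2586 s) equals that figure plus the roughly 609.19 s spent on the five EDA instructions. Assuming this reading, the time spent in each stage is the difference between consecutive rows. The short sketch below uses the values as displayed above; `per_stage_durations` is an illustrative name.

```python
# Values copied from the df_exec table above (seconds, as displayed by show()).
stages = [
    ("Data Preprocessing", 42.067364),
    ("EDA", 651.2586),
    ("Model training", 908.024),
    ("Feature importance", 1118.259),
    ("Neural network", 9123.198),
]


def per_stage_durations(cumulative):
    """Turn running totals into per-stage durations by differencing consecutive rows."""
    durations = []
    previous = 0.0
    for name, total in cumulative:
        durations.append((name, round(total - previous, 3)))
        previous = total
    return durations


for name, seconds in per_stage_durations(stages):
    print(f"{name:<20}{seconds:>10.3f} s")
```

Under this reading, the neural-network stage accounts for about 8005 s of the 9123 s total. Most of that is the 7963 s `Train NN model` step, and the remaining ~41.6 s is presumably the `MLP results` evaluation timed in cell 27.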





In [29]:
# Save the stage summary to HDFS (mode="overwrite" replaces the output of an earlier run)
output_path = "hdfs://localhost:9000/user1/result.csv"
df_exec.write.csv(output_path, header=True, mode="overwrite")
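Spark's `DataFrameWriter.csv` treats `output_path` as a directory. As a result, `result.csv` on HDFS is a folder holding one `part-*.csv` file per partition, plus a `_SUCCESS` marker. With `header=True`, each part file starts with its own header row. If a single local CSV is needed, one option is to copy the folder out of HDFS (for example with `hdfs dfs -get`) and concatenate the parts. The stdlib-only sketch below does that. The function `merge_spark_csv_parts` and any local paths are hypothetical.

```python
import csv
import glob
import os


def merge_spark_csv_parts(part_dir, output_file):
    """Concatenate Spark CSV part files into one CSV, keeping a single header row.

    Assumes every part file was written with header=True, so each one starts
    with the same header line. Returns the number of data rows written.
    """
    # Collect part files before creating the output, in partition order
    part_files = sorted(glob.glob(os.path.join(part_dir, "part-*.csv")))
    header_written = False
    rows_written = 0
    with open(output_file, "w", newline="") as out:
        writer = csv.writer(out)
        for path in part_files:
            with open(path, newline="") as f:
                reader = csv.reader(f)
                header = next(reader, None)
                if header is None:
                    continue  # empty part file (e.g. from an empty partition)
                if not header_written:
                    writer.writerow(header)
                    header_written = True
                for row in reader:
                    writer.writerow(row)
                    rows_written += 1
    return rows_written
```

Alternatively, calling `df_exec.coalesce(1)` before `.write` makes Spark produce a single part file. The output is still placed inside a `result.csv` directory, and all rows are funnelled through one task, which is harmless for a five-row summary like this one.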
