# **Machine Learning on Big Data (CN7030) CRWK 23-24 Term B [60% weighting]**
# **Group ID: [Group13]**
1.   Student 1: Philip Acquaye-Mensah 2640756
2.   Student 2: Mohamed Jareer MOHAMED  ZEENAM 2596353

---

If you want to add comments on your group work, please write it here for us:


# **Initiate and Configure Spark**

In [261]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler, MinMaxScaler, RobustScaler, Imputer
from pyspark.ml.classification import LogisticRegression, NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.regression import LinearRegression


# Initialize SparkSession
spark = SparkSession.builder \
                    .appName("Group 13 CRWK") \
                    .master("local[*]") \
                    .config("spark.executor.memory", "4g") \
                    .config("spark.driver.memory", "2g") \
                    .config("spark.executor.cores", "2") \
                    .config("spark.sql.inMemoryColumnarStorage.compressed", "true") \
                    .getOrCreate()


---
# **Task 1 - Data Loading and Preprocessing (15 marks)**
---

In [262]:
#Identify the student who made a contribution and mention their name in the appropriate section of the code.

## Philip Acquaye-Mensah

# Load the compressed file as a text file
df = spark.read.csv("CourseWork_Dataset_Machine_Learning.csv", header = True)
# Display the DataFrame
df.show(20, truncate= False)

# more info
print(df.count(), df.rdd.getNumPartitions())

+--------+--------+-------------------+-------------+------------+------------+---------------+---------------+---------------+---------------+----------------+---------------+---------------+---------------+----------------+---------------+-----------+-----------+-------------+------------+------------+------------+-----------+------------+-----------+-----------+-----------+-----------+------------+-----------+-----------+-----------+-------------+-------------+-------------+-------------+--------------+--------------+-----------+-----------+-----------+-----------+------------+-----------+-----------+------------+------------+------------+------------+------------+------------+--------------+------------+-------------+------------+----------------+----------------+--------------+--------------+----------------+--------------+--------------+----------------+----------------+----------------+----------------+----------------+-----------------+-----------------+-----------------+------

In [263]:
from pyspark.sql.functions import col
from pyspark.sql.types import FloatType

# Exclude 'Label' column from casting
columns_to_cast = [column for column in df.columns if column != 'Label']

for column in columns_to_cast:
    new_column = column.replace(' ', '_').replace('/', '_per_')
    df = df.withColumnRenamed(column, new_column).withColumn(new_column, col(new_column).cast(FloatType()))

In [264]:
# Check Multi class label
# Label is the column named Label
df.select("Label").distinct().show(20)

+--------------+
|         Label|
+--------------+
|        Benign|
|FTP-BruteForce|
|SSH-Bruteforce|
+--------------+



In [265]:
from pyspark.sql.functions import when, col

# Convert "Label" into a binary classification where "Benign" is 0 and any attack is 1
df = df.withColumn("Label_binary", when(col("Label") == "Benign", 0).otherwise(1))

# Display the updated DataFrame to verify the transformation
df.select("Label", "Label_binary").distinct().show()

+--------------+------------+
|         Label|Label_binary|
+--------------+------------+
|FTP-BruteForce|           1|
|        Benign|           0|
|SSH-Bruteforce|           1|
+--------------+------------+



In [266]:
# Drop Label column because of new binary label and Timestamp column
df = df.drop("Label","Timestamp")

# Print the schema to see the data types and structure
df.printSchema()

root
 |-- Dst_Port: float (nullable = true)
 |-- Protocol: float (nullable = true)
 |-- Flow_Duration: float (nullable = true)
 |-- Tot_Fwd_Pkts: float (nullable = true)
 |-- Tot_Bwd_Pkts: float (nullable = true)
 |-- TotLen_Fwd_Pkts: float (nullable = true)
 |-- TotLen_Bwd_Pkts: float (nullable = true)
 |-- Fwd_Pkt_Len_Max: float (nullable = true)
 |-- Fwd_Pkt_Len_Min: float (nullable = true)
 |-- Fwd_Pkt_Len_Mean: float (nullable = true)
 |-- Fwd_Pkt_Len_Std: float (nullable = true)
 |-- Bwd_Pkt_Len_Max: float (nullable = true)
 |-- Bwd_Pkt_Len_Min: float (nullable = true)
 |-- Bwd_Pkt_Len_Mean: float (nullable = true)
 |-- Bwd_Pkt_Len_Std: float (nullable = true)
 |-- Flow_Byts_per_s: float (nullable = true)
 |-- Flow_Pkts_per_s: float (nullable = true)
 |-- Flow_IAT_Mean: float (nullable = true)
 |-- Flow_IAT_Std: float (nullable = true)
 |-- Flow_IAT_Max: float (nullable = true)
 |-- Flow_IAT_Min: float (nullable = true)
 |-- Fwd_IAT_Tot: float (nullable = true)
 |-- Fwd_IAT_Mean:

In [267]:
# Check for missing values in each column
from pyspark.sql.functions import isnan, when, count, col
df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+--------+--------+-------------+------------+------------+---------------+---------------+---------------+---------------+----------------+---------------+---------------+---------------+----------------+---------------+---------------+---------------+-------------+------------+------------+------------+-----------+------------+-----------+-----------+-----------+-----------+------------+-----------+-----------+-----------+-------------+-------------+-------------+-------------+--------------+--------------+--------------+--------------+-----------+-----------+------------+-----------+-----------+------------+------------+------------+------------+------------+------------+--------------+------------+-----------------+------------+----------------+----------------+------------------+------------------+----------------+------------------+------------------+----------------+----------------+----------------+----------------+----------------+-----------------+-----------------+----------

In [268]:
from pyspark.sql.functions import col
from pyspark.sql.types import FloatType


# Now, use the Imputer on the renamed column
imputer = Imputer(
    inputCols=["Flow_Byts_per_s"],
    outputCols=["Flow_Byts_s_imputed"]
).setStrategy("median")

df = imputer.fit(df).transform(df)

# Drop the 'Flow_Byts_per_s' column
df = df.drop('Flow_Byts_per_s')

# Print Schema again to check the changes
df.printSchema()

root
 |-- Dst_Port: float (nullable = true)
 |-- Protocol: float (nullable = true)
 |-- Flow_Duration: float (nullable = true)
 |-- Tot_Fwd_Pkts: float (nullable = true)
 |-- Tot_Bwd_Pkts: float (nullable = true)
 |-- TotLen_Fwd_Pkts: float (nullable = true)
 |-- TotLen_Bwd_Pkts: float (nullable = true)
 |-- Fwd_Pkt_Len_Max: float (nullable = true)
 |-- Fwd_Pkt_Len_Min: float (nullable = true)
 |-- Fwd_Pkt_Len_Mean: float (nullable = true)
 |-- Fwd_Pkt_Len_Std: float (nullable = true)
 |-- Bwd_Pkt_Len_Max: float (nullable = true)
 |-- Bwd_Pkt_Len_Min: float (nullable = true)
 |-- Bwd_Pkt_Len_Mean: float (nullable = true)
 |-- Bwd_Pkt_Len_Std: float (nullable = true)
 |-- Flow_Pkts_per_s: float (nullable = true)
 |-- Flow_IAT_Mean: float (nullable = true)
 |-- Flow_IAT_Std: float (nullable = true)
 |-- Flow_IAT_Max: float (nullable = true)
 |-- Flow_IAT_Min: float (nullable = true)
 |-- Fwd_IAT_Tot: float (nullable = true)
 |-- Fwd_IAT_Mean: float (nullable = true)
 |-- Fwd_IAT_Std: flo

In [269]:
df.describe("Flow_Pkts_per_s").show(10)

+-------+---------------+
|summary|Flow_Pkts_per_s|
+-------+---------------+
|  count|        1048575|
|   mean|       Infinity|
| stddev|            NaN|
|    min|   -0.001029761|
|    max|       Infinity|
+-------+---------------+



In [270]:
from pyspark.sql.functions import col, isnan

# Iterate through each column in the DataFrame
for column in df.columns:
    # Filter rows where the column contains problematic values
    problematic_rows = df.filter(
        (col(column).isNull()) |   # Check for null values
        (isnan(col(column))) |     # Check for NaN values
        (col(column) < 0)          # Check for negative values
    )
    
    # If there are any rows containing problematic values, print the column name
    if problematic_rows.count() > 0:
        print(f"Column '{column}' has {problematic_rows.count()} rows with problematic values.")

Column 'Flow_Duration' has 5 rows with problematic values.
Column 'Flow_Pkts_per_s' has 5 rows with problematic values.
Column 'Flow_IAT_Mean' has 5 rows with problematic values.
Column 'Flow_IAT_Max' has 2 rows with problematic values.
Column 'Flow_IAT_Min' has 5 rows with problematic values.
Column 'Fwd_IAT_Tot' has 5 rows with problematic values.
Column 'Fwd_IAT_Mean' has 5 rows with problematic values.
Column 'Fwd_IAT_Max' has 2 rows with problematic values.
Column 'Fwd_IAT_Min' has 5 rows with problematic values.
Column 'Init_Fwd_Win_Byts' has 219266 rows with problematic values.
Column 'Init_Bwd_Win_Byts' has 347370 rows with problematic values.


In [271]:
from pyspark.sql.functions import when, col

# List of columns to replace Infinity values with NaN
columns = ["Flow_Byts_s_imputed", "Flow_Pkts_per_s"]

#! Loop through each column and replace Infinity with NaN
for col_name in columns:
    df = df.withColumn(col_name, when(col(col_name) == float('inf'), float('nan')).otherwise(col(col_name)))
    
#! Replace negative values with zero for all columns in the DataFrame
for col_name in df.columns:
    df = df.withColumn(col_name, when(df[col_name] < 0, 0).otherwise(df[col_name]))


output_cols = ["Flow_Byts_s_nan_imputed", "Flow_Pkts_per_s_nan_imputed"]


#! impute nan values with mean
imputer = Imputer(
    inputCols=columns,
    outputCols=output_cols
).setStrategy("median")



df = imputer.fit(df).transform(df)

df.describe("Flow_Byts_s_nan_imputed").show()

+-------+-----------------------+
|summary|Flow_Byts_s_nan_imputed|
+-------+-----------------------+
|  count|                1048575|
|   mean|     193096.85716243155|
| stddev|      3096074.866298894|
|    min|                    0.0|
|    max|           4.45249984E8|
+-------+-----------------------+



In [272]:
df = df.drop('Flow_Byts_s_imputed', 'Flow_Pkts_per_s')

df.show(10, truncate=False)

+--------+--------+-------------+------------+------------+---------------+---------------+---------------+---------------+----------------+---------------+---------------+---------------+----------------+---------------+-------------+------------+------------+------------+------------+------------+-----------+-----------+-----------+-----------+------------+-----------+-----------+-----------+-------------+-------------+-------------+-------------+--------------+--------------+--------------+--------------+-----------+-----------+------------+-----------+-----------+------------+------------+------------+------------+------------+------------+--------------+------------+-----------------+------------+----------------+----------------+------------------+------------------+----------------+------------------+------------------+----------------+----------------+----------------+----------------+----------------+-----------------+-----------------+-----------------+----------------+------

In [273]:
# Check for missing values again
df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+--------+--------+-------------+------------+------------+---------------+---------------+---------------+---------------+----------------+---------------+---------------+---------------+----------------+---------------+-------------+------------+------------+------------+-----------+------------+-----------+-----------+-----------+-----------+------------+-----------+-----------+-----------+-------------+-------------+-------------+-------------+--------------+--------------+--------------+--------------+-----------+-----------+------------+-----------+-----------+------------+------------+------------+------------+------------+------------+--------------+------------+-----------------+------------+----------------+----------------+------------------+------------------+----------------+------------------+------------------+----------------+----------------+----------------+----------------+----------------+-----------------+-----------------+-----------------+----------------+-------

In [274]:
from pyspark.sql.functions import when, col

# List of columns to replace Infinity values with NaN
columns = ["Flow_Byts_s_imputed", "Flow_Pkts_per_s"]

# Loop through each column and replace Infinity with NaN
for col_name in columns:
    df = df.withColumn(col_name, when(col(col_name) == float('inf'), float('nan')).otherwise(col(col_name)))


output_cols = ["Flow_Byts_s_nan_imputed", "Flow_Pkts_per_s_nan_imputed"]


#! impute nan values with mean
imputer = Imputer(
    inputCols=columns,
    outputCols=output_cols
).setStrategy("median")

df = imputer.fit(df).transform(df)

df.describe("Flow_Byts_s_nan_imputed").show()

+-------+-----------------+------------------+--------------------+------------------+------------------+------------------+-----------------+------------------+------------------+------------------+-----------------+-----------------+------------------+-----------------+------------------+------------------+-------------------+--------------------+------------------+--------------------+-----------------+-------------------+--------------------+-----------------+-------------------+------------------+-----------------+------------------+------------------+-------------------+-------------+-------------+-------------+------------------+-----------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+--------------------+-------------------+--------------------+-------------------+-------------------+-------------------+--------------+--------------------+------------------+-----------------+----------

In [275]:
# Now proceed with your VectorAssembler and the rest of the pipeline
from pyspark.ml.feature import VectorAssembler

# Assuming 'feature_columns' is updated to exclude 'Flow_Byts_s' and includes 'Flow_Byts_s_imputed'
feature_columns = [column for column in df.columns if column not in ['Label_binary']]  # Assuming you've already removed 'Flow_Byts_s'

vectorAssembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

data = vectorAssembler.transform(df)

data = data.select('features', 'Label_binary')
data.show(5)


+--------+--------+-------------+------------+------------+---------------+---------------+---------------+---------------+----------------+---------------+---------------+---------------+----------------+---------------+-------------+------------+------------+------------+-----------+------------+-----------+-----------+-----------+-----------+------------+-----------+-----------+-----------+-------------+-------------+-------------+-------------+--------------+--------------+--------------+--------------+-----------+-----------+------------+-----------+-----------+------------+------------+------------+------------+------------+------------+--------------+------------+-----------------+------------+----------------+----------------+------------------+------------------+----------------+------------------+------------------+----------------+----------------+----------------+----------------+----------------+-----------------+-----------------+-----------------+----------------+-------

In [276]:
# Check again for any null or NaN values that might have been introduced
df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]).show()


+--------+--------+-------------+------------+------------+---------------+---------------+---------------+---------------+----------------+---------------+---------------+---------------+----------------+---------------+-------------+------------+------------+------------+-----------+------------+-----------+-----------+-----------+-----------+------------+-----------+-----------+-----------+-------------+-------------+-------------+-------------+--------------+--------------+--------------+--------------+-----------+-----------+------------+-----------+-----------+------------+------------+------------+------------+------------+------------+--------------+------------+-----------------+------------+----------------+----------------+------------------+------------------+----------------+------------------+------------------+----------------+----------------+----------------+----------------+----------------+-----------------+-----------------+-----------------+----------------+-------

In [277]:
# Sparse Vectors
data.show(10, truncate = False)

+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------+
|features                                                                                                                                                                                                                                                                                                                          

In [278]:
# Extract the values from Sparse Vectors to the list format
data_select = data.select('features').limit(2).collect()
for val in data_select:
    dense_vector = val[0].toArray()
    print(dense_vector)

[0.00000000e+00 0.00000000e+00 1.12641720e+08 3.00000000e+00
 0.00000000e+00 0.00000000e+00 0.00000000e+00 0.00000000e+00
 0.00000000e+00 0.00000000e+00 0.00000000e+00 0.00000000e+00
 0.00000000e+00 0.00000000e+00 0.00000000e+00 5.63208600e+07
 1.39300034e+02 5.63209600e+07 5.63207600e+07 1.12641720e+08
 5.63208600e+07 1.39300034e+02 5.63209600e+07 5.63207600e+07
 0.00000000e+00 0.00000000e+00 0.00000000e+00 0.00000000e+00
 0.00000000e+00 0.00000000e+00 0.00000000e+00 0.00000000e+00
 0.00000000e+00 0.00000000e+00 0.00000000e+00 2.66331155e-02
 0.00000000e+00 0.00000000e+00 0.00000000e+00 0.00000000e+00
 0.00000000e+00 0.00000000e+00 0.00000000e+00 0.00000000e+00
 0.00000000e+00 0.00000000e+00 0.00000000e+00 0.00000000e+00
 0.00000000e+00 0.00000000e+00 0.00000000e+00 0.00000000e+00
 0.00000000e+00 0.00000000e+00 0.00000000e+00 0.00000000e+00
 0.00000000e+00 0.00000000e+00 0.00000000e+00 0.00000000e+00
 3.00000000e+00 0.00000000e+00 0.00000000e+00 0.00000000e+00
 0.00000000e+00 0.000000

### StandardScaler

In [279]:
# Normalizing feature vectors
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withStd=True, withMean=False)
scaler_model = scaler.fit(data)
data = scaler_model.transform(data)

data = data.select("scaledFeatures", "Label_binary")
data.show(3, truncate = False)

+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------+
|scaledFeatures                                                                                                                                                                                                                                                                                                                                                                                                              |Label_binary|
+-----------------------------------------------------------------------------------------------------------------------------------------------

In [280]:
# Split the data into training and testing sets
train_data, test_data = data.randomSplit([0.7, 0.3], seed = 42)

train_data.printSchema()

root
 |-- scaledFeatures: vector (nullable = true)
 |-- Label_binary: integer (nullable = false)



In [281]:
train_data.show(2, truncate = False)


+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------+
|scaledFeatures                                                                                                                                                  

In [282]:
# ! Double check for any Negative, Infinity or NaN values

# Iterate through each column in the DataFrame
for column in df.columns:
    # Filter rows where the column contains problematic values
    problematic_rows = df.filter(
        (col(column).isNull()) |   # Check for null values
        (isnan(col(column))) |     # Check for NaN values
        (col(column) < 0)          # Check for negative values
    )
    
    # If there are any rows containing problematic values, print the column name
    if problematic_rows.count() > 0:
        print(f"Column '{column}' has {problematic_rows.count()} rows with problematic values.")



---
# **Task 2 - Model Selection and Implementation (25 marks)**
---


In [283]:
##1st student name: Philip Acquaye-Mensah
# add the code here


2nd student name: Mohamed Jareer Mohamed Zeenam

## Apply Naive Bayes model

In [284]:
from pyspark.ml.classification import NaiveBayes

# Initialize Naive Bayes model
nb = NaiveBayes(featuresCol='scaledFeatures', labelCol='Label_binary', smoothing=1.0)  

# Fit the model to the training data
nb_model = nb.fit(train_data)

# Make predictions on the test data
nb_predictions = nb_model.transform(test_data)

In [285]:
nb_predictions.select('Label_binary', 'prediction').distinct().show(25)


+------------+----------+
|Label_binary|prediction|
+------------+----------+
|           0|       0.0|
|           1|       1.0|
|           1|       0.0|
|           0|       1.0|
+------------+----------+



#### Confusion Matrix

In [286]:
cm = nb_predictions.groupBy('Label_binary', 'prediction').count()
cm.show(cm.count(), truncate=False)

+------------+----------+------+
|Label_binary|prediction|count |
+------------+----------+------+
|0           |0.0       |177092|
|1           |1.0       |114141|
|1           |0.0       |4     |
|0           |1.0       |22660 |
+------------+----------+------+



In [287]:
#  Check the Missclassification rate
confusion_matrix = cm.groupBy('Label_binary').pivot('prediction').sum('count').na.fill(0)
confusion_matrix.show()

+------------+------+------+
|Label_binary|   0.0|   1.0|
+------------+------+------+
|           1|     4|114141|
|           0|177092| 22660|
+------------+------+------+



---
# **Task 3 - Model Parameter Tuning (20 marks)**
---


In [288]:
##1st student name: Philip Acquaye-Mensah
# add the code here


#### 2nd student name: Mohamed Jareer Mohamed Zeenam

##### Random Search results

In [289]:
import random
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Define the evaluator for binary classification
evaluator = BinaryClassificationEvaluator(rawPredictionCol='prediction', labelCol='Label_binary', metricName='areaUnderROC')

# Perform random search manually
best_auc_roc = 0.0
best_params = None

for smoothing in random.sample([0.1, 0.15, 0.2, 0.27, 0.3, 0.33, 0.4, 0.48, 0.5, 0.6, 0.7, 0.72, 0.76, 0.8, 0.9, 1], 5):
    nb = NaiveBayes(featuresCol='scaledFeatures', labelCol='Label_binary', smoothing=smoothing)
    model = nb.fit(train_data)

    # Make predictions
    predictions = model.transform(test_data)

    # Evaluate
    accuracy = evaluator.evaluate(predictions)

    # Check if the current model is the best so far
    if accuracy > best_auc_roc:
        best_auc_roc = accuracy
        best_params = smoothing




---
# **Task 4 - Model Evaluation and Accuracy Calculation (20 marks)**
---

In [290]:
##1st student name: Philip Acquaye-Mensah
# add the code here


---
# **Task 5 - Results Visualization or Printing (5 marks)**
---

In [291]:
##1st student name: Philip Acquaye-Mensah
# add the code here


In [293]:
##2nd student name: Mohamed Jareer MOHAMED  ZEENAM
# add the code here

# Print the results
print("Best smoothing value: ", best_params)
print("Area under ROC curve:", round(best_auc_roc, 5) * 100)
print("Accuracy:", round(accuracy, 5) * 100)

Best smoothing value:  0.33
Area under ROC curve: 94.33
Accuracy: 94.328


---
# **Task 6 - LSEP Considerations (5 marks)**
---

# Student 1: **Type the chosen issue**

add contribution here ...

# Student 2: **Type the chosen issue**

add contribution here ...

---

# **Task 7 - Convert ipynb to HTML for Turnitin submission [5 marks]**

---



In [None]:
# install nbconvert (if facing the conversion error)
#!pip3 install nbconvert



In [None]:
# convert ipynb to html and submit this HTML file
#!jupyter nbconvert --to html Your_Group_ID_CRWK_CN7030.ipynb

This application is used to convert notebook files (*.ipynb)
        to various other formats.


Options
The options below are convenience aliases to configurable class-options,
as listed in the "Equivalent to" description-line of the aliases.
To see all configurable class-options for some <cmd>, use:
    <cmd> --help-all

--debug
    set log level to logging.DEBUG (maximize logging output)
    Equivalent to: [--Application.log_level=10]
--show-config
    Show the application's configuration (human-readable format)
    Equivalent to: [--Application.show_config=True]
--show-config-json
    Show the application's configuration (json format)
    Equivalent to: [--Application.show_config_json=True]
--generate-config
    generate default config file
    Equivalent to: [--JupyterApp.generate_config=True]
-y
    Answer yes to any questions instead of prompting.
    Equivalent to: [--JupyterApp.answer_yes=True]
--execute
    Execute the notebook prior to export.
    Equivalent to: [--ExecutePr