In [1]:
# Importing necessary libraries
import findspark
findspark.init()
findspark.find()

import pyspark

from pyspark.sql import SparkSession
from pyspark import SparkContext, SQLContext
from pyspark.sql.functions import lit

appName = "Project_2"
master = "local"

# Explicit Spark configuration (application name, master and driver host)
# instead of whatever defaults the environment would supply.
conf = (
    pyspark.SparkConf()
    .setMaster(master)
    .setAppName(appName)
    .set('spark.driver.host', '127.0.0.1')
)

# Reuse a running SparkContext if there is one, otherwise start it with `conf`.
sc = SparkContext.getOrCreate(conf=conf)

# SQLContext wrapper around the context; later cells read the JDBC table through it.
sqlContext = SQLContext(sc)

# SparkSession obtained from the SQL context; used for CSV loading and DataFrame creation.
spark = sqlContext.sparkSession.builder.getOrCreate()



In [2]:
# Load the train and test CSV files from the working directory into DataFrames.
# header=True means the first row of each file holds the column names.
# inferSchema=True asks Spark to infer column types rather than reading every column as a string.


df_train = spark.read.csv("train70_reduced.csv",header=True, inferSchema= True)
df_test = spark.read.csv("test30_reduced.csv",header=True, inferSchema= True)
# Tag every row with the file it came from so the split can be recovered after the union.
df_train = df_train.withColumn("dataset", lit("train"))
df_test = df_test.withColumn("dataset", lit("test"))
# NOTE(review): union() is presumed to pair columns by position, so both files
# are assumed to share the same column order — confirm.
df = df_train.union(df_test)


In [3]:
import os
from getpass import getpass

# PostgreSQL connection settings.
# The password is never stored in the notebook: it is read from the
# POSTGRES_PASSWORD environment variable, falling back to an interactive prompt.
# User name and JDBC URL can be overridden through the environment as well.
db_properties = {
    'username': os.environ.get("POSTGRES_USER", "postgres"),
    'password': os.environ.get("POSTGRES_PASSWORD") or getpass("PostgreSQL password: "),
    'url': os.environ.get("POSTGRES_JDBC_URL", "jdbc:postgresql://localhost:5432/postgres"),
    # Destination table for the combined train + test rows.
    'table': "MQTT",
    'driver': "org.postgresql.Driver",
}

# Write the combined DataFrame to PostgreSQL, replacing the table if it already exists.
(
    df.write.format("jdbc")
    .mode("overwrite")
    .option("url", db_properties['url'])
    .option("dbtable", db_properties['table'])
    .option("user", db_properties['username'])
    .option("password", db_properties['password'])
    .option("Driver", db_properties['driver'])
    .save()
)


In [4]:
# Connection options for reading the table back, taken from db_properties.
jdbc_read_options = {
    "url": db_properties['url'],
    "dbtable": db_properties['table'],
    "user": db_properties['username'],
    "password": db_properties['password'],
    "Driver": db_properties['driver'],
}

# Read the table written in the previous cell back into a DataFrame.
df_read = sqlContext.read.format("jdbc").options(**jdbc_read_options).load()

# Show the first record vertically, then the schema of the loaded table.
df_read.show(1, vertical=True)
df_read.printSchema()

-RECORD 0--------------------------------
 tcp.flags                  | 0x00000018 
 tcp.time_delta             | 0.998867   
 tcp.len                    | 10         
 mqtt.conack.flags          | 0          
 mqtt.conack.flags.reserved | 0.0        
 mqtt.conack.flags.sp       | 0.0        
 mqtt.conack.val            | 0.0        
 mqtt.conflag.cleansess     | 0.0        
 mqtt.conflag.passwd        | 0.0        
 mqtt.conflag.qos           | 0.0        
 mqtt.conflag.reserved      | 0.0        
 mqtt.conflag.retain        | 0.0        
 mqtt.conflag.uname         | 0.0        
 mqtt.conflag.willflag      | 0.0        
 mqtt.conflags              | 0          
 mqtt.dupflag               | 0.0        
 mqtt.hdrflags              | 0x00000030 
 mqtt.kalive                | 0.0        
 mqtt.len                   | 8.0        
 mqtt.msg                   | 32         
 mqtt.msgid                 | 0.0        
 mqtt.msgtype               | 3.0        
 mqtt.proto_len             | 0.0 

Renaming columns: replace `.` with `_` in the column names

In [5]:
from pyspark.sql.functions import col

def replace_dot_with_underscore(data_df):
    """Return `data_df` with every '.' in its column names replaced by '_'.

    Columns keep their position; names without a '.' are left unchanged.
    """
    sanitized_names = [name.replace('.', '_') for name in data_df.columns]
    # toDF renames all columns positionally in a single step.
    return data_df.toDF(*sanitized_names)
# Apply the rename to the table that was read back over JDBC.
renamed_columns_df = replace_dot_with_underscore(df_read)

In [6]:
# Print the schema to confirm the column names no longer contain '.'.
renamed_columns_df.printSchema()

root
 |-- tcp_flags: string (nullable = true)
 |-- tcp_time_delta: double (nullable = true)
 |-- tcp_len: integer (nullable = true)
 |-- mqtt_conack_flags: string (nullable = true)
 |-- mqtt_conack_flags_reserved: double (nullable = true)
 |-- mqtt_conack_flags_sp: double (nullable = true)
 |-- mqtt_conack_val: double (nullable = true)
 |-- mqtt_conflag_cleansess: double (nullable = true)
 |-- mqtt_conflag_passwd: double (nullable = true)
 |-- mqtt_conflag_qos: double (nullable = true)
 |-- mqtt_conflag_reserved: double (nullable = true)
 |-- mqtt_conflag_retain: double (nullable = true)
 |-- mqtt_conflag_uname: double (nullable = true)
 |-- mqtt_conflag_willflag: double (nullable = true)
 |-- mqtt_conflags: string (nullable = true)
 |-- mqtt_dupflag: double (nullable = true)
 |-- mqtt_hdrflags: string (nullable = true)
 |-- mqtt_kalive: double (nullable = true)
 |-- mqtt_len: double (nullable = true)
 |-- mqtt_msg: string (nullable = true)
 |-- mqtt_msgid: double (nullable = true)
 |-

In [7]:
from pyspark.sql.functions import avg


# Keep only the rows tagged as coming from the training file.
train_df = renamed_columns_df.filter(col('dataset') == 'train')

# Mean of the 'mqtt_len' column over the training rows; the aggregate yields one row.
average_length = train_df.agg(avg('mqtt_len')).first()[0]

# Report the result.
print("Average length for 'train' dataset:", average_length)

Average length for 'train' dataset: 31.435725201384873


In [8]:
# Mean 'tcp_len' for each value of 'target'.
average_lengths = renamed_columns_df.groupBy('target').agg(avg('tcp_len').alias('average_tcp_length'))
# Rebind the global `df` to the renamed table; get_X_most_frequent_tcp_flags reads this global.
df = renamed_columns_df
average_lengths.show()


+----------+------------------+
|    target|average_tcp_length|
+----------+------------------+
|   slowite|3.9993479678330797|
|bruteforce|3.9871043376318873|
|     flood|13313.415986949429|
| malformed| 20.97491761259612|
|       dos|312.65759830457716|
|legitimate| 7.776101001432345|
+----------+------------------+



In [9]:
from pyspark.sql.functions import dense_rank, desc
from pyspark.sql.window import Window


def get_X_most_frequent_tcp_flags(x, data_df=None):
    """Show the TCP flag values whose frequency dense-rank is at most `x`.

    Dense ranking means flags with equal counts share a rank, so more than
    `x` rows can be shown when there are ties.

    Args:
        x: Number of ranks to show. If it exceeds the number of distinct
            flag values, it is clamped and a notice is printed.
        data_df: DataFrame with a 'tcp_flags' column. Defaults to the
            notebook-level `df`; passing it explicitly avoids depending on
            whatever `df` happens to be bound to when the function runs.
    """
    source_df = df if data_df is None else data_df

    # One aggregation serves both the distinct-flag count and the ranking.
    flag_counts = source_df.groupBy('tcp_flags').count()
    distinct_flags = flag_counts.count()
    if x > distinct_flags:
        print(f"There are only {distinct_flags} distinct TCP flags in this dataset. Showing all {distinct_flags} TCP flags ranked with dense ranking.")
        x = distinct_flags

    # Rank over the whole (small) aggregated table by descending frequency.
    window_spec = Window.orderBy(desc('count'))
    ranked_df = flag_counts.withColumn("dense_rank", dense_rank().over(window_spec))

    # Sort explicitly so the displayed order does not depend on the physical plan.
    result = ranked_df.filter(col("dense_rank") <= x).orderBy("dense_rank", "tcp_flags")
    result.show()

In [10]:
# Show the TCP flag values whose dense rank by frequency is 4 or better.
get_X_most_frequent_tcp_flags(4)

+----------+------+----------+
| tcp_flags| count|dense_rank|
+----------+------+----------+
|0x00000018|183076|         1|
|0x00000010|134547|         2|
|0x00000011|  4198|         3|
|0x00000002|  3372|         4|
|0x00000012|  3372|         4|
+----------+------+----------+



In [11]:
from pyspark.sql.functions import when

# Spell out two target labels in their hyphenated long form; all other labels stay as they are.
current_target = col("target")
renamed_columns_df = renamed_columns_df.withColumn(
    "target",
    when(current_target == "dos", "denial-of-service")
    .when(current_target == "bruteforce", "brute-force")
    .otherwise(current_target),
)



In [12]:
# Distinct values of the 'target' column, kept as a DataFrame.
unique_target_types = renamed_columns_df.select('target').distinct()

# Bring the distinct values to the driver as a plain Python list.
unique_target_types_list = [row['target'] for row in unique_target_types.collect()]

# List the distinct targets, one per line.
print("Unique Targets we have from the dataset:")
for target in unique_target_types_list:
    print(target)




Unique Targets we have from the dataset:
slowite
flood
brute-force
malformed
denial-of-service
legitimate


In [13]:
from confluent_kafka import Producer
import socket
import os

# Confluent Cloud connection settings.
# The API key and secret are read from the environment (KAFKA_API_KEY /
# KAFKA_API_SECRET) so that credentials are never stored in the notebook.
# A missing variable raises KeyError here, before any connection is attempted.
KAFKA_CONFIG = {
    "bootstrap.servers": os.environ.get(
        "KAFKA_BOOTSTRAP_SERVERS", "pkc-lzvrd.us-west4.gcp.confluent.cloud:9092"
    ),
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms": "PLAIN",
    "sasl.username": os.environ["KAFKA_API_KEY"],
    "sasl.password": os.environ["KAFKA_API_SECRET"],
    "session.timeout.ms": "45000",
    "group.id": "python-group-1",
    'auto.offset.reset': 'smallest',
    'client.id': socket.gethostname()
}

# Topic that the producer writes to and the consumer later subscribes to.
topic_name = "topic_0"
producer = Producer(KAFKA_CONFIG)

In [None]:
import feedparser
import time

# Google News RSS search feed for the query "popular cyber attacks".
feed_url = "https://news.google.com/rss/search?q=popular+cyber+attacks"
def extract_news_feed(feed_url, runtime_minutes, poll_interval_seconds=30):
    """Poll an RSS feed and publish each new article to Kafka.

    Each unseen entry is produced to the notebook-level `topic_name` through
    the notebook-level `producer`, with the ASCII-only title as the message
    key and the article link as the value.

    Args:
        feed_url: URL of the RSS feed to poll.
        runtime_minutes: How long to keep polling, in minutes.
        poll_interval_seconds: Pause between two fetches of the feed.
    """
    seen_articles = set()
    deadline = time.time() + (runtime_minutes * 60)  # minutes -> seconds

    while time.time() < deadline:
        # Fetch the feed on every pass so that newly published entries are picked up.
        feed = feedparser.parse(feed_url)
        for entry in feed.entries:
            link = entry.link
            # Drop non-ASCII characters from the title.
            title = entry.title.encode('ascii', 'ignore').decode()
            unique_id = f'{link}-{title}'
            if unique_id in seen_articles:
                continue
            seen_articles.add(unique_id)
            producer.produce(topic_name, key=title, value=link)
        producer.flush()

        # Wait before the next fetch instead of spinning, without overshooting the deadline.
        remaining = deadline - time.time()
        if remaining > 0:
            time.sleep(min(poll_interval_seconds, remaining))

# Publish feed entries to Kafka for a 5-minute window.
extract_news_feed(feed_url, runtime_minutes=5)

In [None]:
from confluent_kafka import Consumer
from pyspark.sql.types import *
import string


# Translation table that deletes every punctuation character except '-',
# so hyphenated labels such as "brute-force" survive the cleaning.
translator = str.maketrans("", "", string.punctuation.replace('-', ''))


emp_RDD = spark.sparkContext.emptyRDD()
# Schema of the DataFrame that holds the consumed messages.
columns = StructType([StructField('key', StringType(), False),
                      StructField('value', StringType(), False)])

# Start from an empty DataFrame so `df` is defined even if no message arrives.
df = spark.createDataFrame(data=emp_RDD,
                                   schema=columns)

# Show the (still empty) DataFrame.
df.show()

consumer = Consumer(KAFKA_CONFIG)
consumer.subscribe([topic_name])

# Messages are gathered in a Python list and converted to a DataFrame once,
# rather than creating and unioning a one-row DataFrame per message.
received_rows = []

try:
    i = 0
    # Stop after five polls that returned no message.
    while i < 5:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            i = i + 1
            print("Waiting...")
            continue
        if msg.error():
            # Error events carry no article payload; report and move on.
            print(f"Consumer error: {msg.error()}")
            continue
        if msg.key() is None or msg.value() is None:
            # Both schema fields are non-nullable, so skip incomplete messages.
            continue
        key = msg.key().decode('utf-8').lower().translate(translator)
        # Collapse runs of whitespace into single spaces.
        cleaned_key = " ".join(key.split())
        value = msg.value().decode('utf-8')
        received_rows.append((cleaned_key, value))

except KeyboardInterrupt:
    pass
finally:
    consumer.close()
    if received_rows:
        df = spark.createDataFrame(received_rows, columns)
    df.show()


In [None]:
from pyspark.sql.functions import *
# import nltk 
# nltk.download('stopwords')

# stop_words = nltk.corpus.stopwords.words('english')
# One row per word of each consumed title (the message key).
title_words = df.withColumn('word', explode(split(col('key'), ' ')))

# Keep only the words that match one of the dataset's target labels.
matching_words = title_words.filter(col('word').isin(unique_target_types_list))

# Count occurrences per label, most frequent first.
streamed_data = matching_words.groupBy('word').count().sort('count', ascending=False)

streamed_data.show()

Renaming the target values back to their original labels

In [None]:
from pyspark.sql.functions import when

# Restore the two hyphenated long-form labels to their original short spelling.
current_target = col("target")
renamed_columns_df = renamed_columns_df.withColumn(
    "target",
    when(current_target == "denial-of-service", "dos")
    .when(current_target == "brute-force", "bruteforce")
    .otherwise(current_target),
)



Task- III Machine Learning Modeling

Feature Engineering

*Check for null and NaN values in the columns*


In [None]:
from pyspark.sql.functions import *

def _missing_value_count(column_name):
    """Aggregate expression counting the NaN or null entries of one column."""
    is_missing = isnan(column_name) | col(column_name).isNull()
    return count(when(is_missing, column_name)).alias(column_name)

# A single-row DataFrame holding the number of missing values per column.
null_df = renamed_columns_df.select(
    [_missing_value_count(c) for c in renamed_columns_df.columns]
)

null_df.show(truncate=False, vertical=True)

There are no null/NaN values in the columns of our dataset. Thus, we do not need to impute missing values.

*Checking duplicate rows*

In [None]:
# Row count before de-duplication.
row_count = renamed_columns_df.count()
print(f"Current number of rows:{row_count}")

# Remove rows that are identical across all columns; the global `df` is rebound to the result.
df = renamed_columns_df.dropDuplicates()
# Row count after de-duplication (reuses the `row_count` name).
row_count = df.count()
print(f"After eliminating duplicates, current number of rows:{row_count}")

*Classifying our variables*

To classify our variables, let's first check the unique values in each column.

In [None]:
# Show the distinct values of every column of the de-duplicated DataFrame.
# The column names are taken from `df` itself, the same DataFrame that is
# queried, so the loop stays valid when `df` has had columns dropped.
for column in df.columns:
    unique_values = df.select(column).distinct()
    print(f"Column: {column}")
    unique_values.show(truncate=False)
    print("\n")


We can see that there are columns with only one unique value. Let's drop these columns.

In [None]:
# Columns dropped from the dataset in this step.
single_valued_columns = [
    "mqtt_conack_flags_reserved",
    "mqtt_conack_flags_sp",
    "mqtt_conflag_qos",
    "mqtt_conflag_reserved",
    "mqtt_conflag_retain",
    "mqtt_conflag_willflag",
    "mqtt_sub_qos",
    "mqtt_suback_qos",
    "mqtt_willmsg",
    "mqtt_willmsg_len",
    "mqtt_willtopic",
    "mqtt_willtopic_len",
]
df = df.drop(*single_valued_columns)
df.printSchema()

Some binary columns hold their two values as strings or as numbers other than 0 and 1. Therefore, let's encode these columns with the binary values 0 and 1. We will also cast these columns to the double datatype.

In [None]:
from pyspark.sql.functions import col, when

# Each source column is replaced by a "<name>_encoded_binary" column; the original is dropped.
conversion_map1 = {"0": 0, "0x00000000": 1}
conversion_map2 = {"train": 0, "test": 1}
conversion_map3 = {"0.0": 0, "5.0": 1}
conversion_map4 = {"0.0": 0, "4.0": 1}
conversion_map5 = {"MQTT": 0, "0": 1}

# (source column, value that selects the first code, code when it matches, code otherwise).
# String columns are compared with string literals, double columns with 0.0.
binary_encodings = [
    ("mqtt_conack_flags", "0", conversion_map1["0"], conversion_map1["0x00000000"]),
    ("dataset", "train", conversion_map2["train"], conversion_map2["test"]),
    ("mqtt_conack_val", 0.0, conversion_map3["0.0"], conversion_map3["5.0"]),
    ("mqtt_proto_len", 0.0, conversion_map4["0.0"], conversion_map4["4.0"]),
    ("mqtt_protoname", 'MQTT', conversion_map5["MQTT"], conversion_map5["0"]),
]

df_binary = df
for source_col, match_value, code_if_match, code_otherwise in binary_encodings:
    encoded = when(col(source_col) == match_value, code_if_match).otherwise(code_otherwise)
    df_binary = df_binary.withColumn(source_col + "_encoded_binary", encoded).drop(source_col)

# Cast the encodings of the three former string columns to double.
for encoded_col in ('mqtt_conack_flags_encoded_binary',
                    'dataset_encoded_binary',
                    'mqtt_protoname_encoded_binary'):
    df_binary = df_binary.withColumn(encoded_col, col(encoded_col).cast('double'))


df_binary.printSchema()
df = df_binary


Let's investigate the column mqtt_msg further

In [None]:
# Number of distinct values in the 'mqtt_msg' column.
unique_values_count = df.select(countDistinct(col('mqtt_msg')).alias('unique_values_count')).collect()[0]['unique_values_count']

# The message names the column that was actually counted.
print(f"Number of unique values in 'mqtt_msg': {unique_values_count}")

As we can see, roughly half of the values in this column are unique, and the individual values are very large numbers, so this column would cause the pipeline to fail. Hence, we drop this column.

In [None]:
# Remove the 'mqtt_msg' column and show the remaining schema.
df = df.drop("mqtt_msg")
df.printSchema()

# Count the columns that remain.
num_columns = len(df.columns)

# Print the number of columns
print("Number of columns:", num_columns)

Now, let's classify our variables

In [None]:
# Columns treated as nominal (categorical) features.
nominal_cols = [
    'tcp_flags',
    'mqtt_conflags',
    'mqtt_hdrflags',
]

# Columns treated as continuous features.
continuous_cols = [
    'tcp_time_delta',
    'tcp_len',
    'mqtt_len',
    'mqtt_msgid',
    'mqtt_kalive',
    'mqtt_msgtype',
]

# Columns treated as binary features.
binary_cols = [
    'mqtt_protoname_encoded_binary',
    'mqtt_proto_len_encoded_binary',
    'mqtt_conack_val_encoded_binary',
    'dataset_encoded_binary',
    'mqtt_conack_flags_encoded_binary',
    'mqtt_conflag_cleansess',
    'mqtt_conflag_passwd',
    'mqtt_conflag_uname',
    'mqtt_dupflag',
    'mqtt_qos',
    'mqtt_retain',
    'mqtt_ver',
]

Summary table for our dataset

In [None]:
# Summary statistics for the DataFrame, printed vertically and without truncation.
df.summary().show(truncate=False, vertical=True)

Boxplots

In [None]:
import matplotlib.pyplot as plt

# Names of the columns whose Spark dtype is 'int' or 'double'.
numeric_features = [name for name, dtype in df.dtypes if dtype in ('int', 'double')]

# Extract the data and convert it to pandas for visualization.
converted_data = df[numeric_features].toPandas()

# One box per numeric column, labelled with the column name so the figure
# can be read on its own.
fig, ax = plt.subplots(figsize=(12, 6))
figure = ax.boxplot(converted_data)
ax.set_xticklabels(numeric_features, rotation=90)
ax.set_title("Distribution of numeric features")
ax.set_ylabel("Value")
fig.tight_layout()


Let's look at the outliers and check whether we need to handle them.

In [None]:
from functools import reduce

def column_add(a,b):
    """Return `a + b`.

    Uses the `+` operator rather than calling `a.__add__(b)` directly, so the
    reflected `b.__radd__` is tried when `a` cannot handle `b`; a direct
    dunder call would hand back `NotImplemented` in that case.
    """
    return a + b
    
def find_outliers(df, numeric_types=('int',), iqr_multiplier=1.5):
    """Add a 'total_outliers' column counting, per row, the outlying values.

    A value is an outlier for its column when it lies below
    Q1 - iqr_multiplier * IQR or above Q3 + iqr_multiplier * IQR, with Q1 and
    Q3 the exact first and third quartiles of that column.

    Args:
        df: Spark DataFrame to analyse.
        numeric_types: Spark dtype names of the columns to inspect. The
            default inspects only 'int' columns.
        iqr_multiplier: Width of the accepted band in units of the IQR.

    Returns:
        `df` with one extra integer column 'total_outliers'. The per-column
        helper flags created along the way are dropped again.
    """
    numeric_columns = [name for name, dtype in df.dtypes if dtype in numeric_types]

    outlier_flag_cols = []
    for column in numeric_columns:
        # Both quartiles in one exact pass (relativeError=0) instead of two.
        quartiles = df.approxQuantile(column, [0.25, 0.75], relativeError=0)
        if len(quartiles) < 2:
            # No quantiles were returned for this column; nothing to flag.
            continue
        q1, q3 = quartiles
        iqr = q3 - q1
        lower_bound = q1 - iqr_multiplier * iqr
        upper_bound = q3 + iqr_multiplier * iqr

        flag_col = 'is_outlier_{}'.format(column)
        is_outlier = (df[column] > upper_bound) | (df[column] < lower_bound)
        df = df.withColumn(flag_col, when(is_outlier, 1).otherwise(0))
        outlier_flag_cols.append(flag_col)

    if outlier_flag_cols:
        # Sum the 0/1 flags created above into a per-row total.
        total_outliers = reduce(column_add, (df[name] for name in outlier_flag_cols))
    else:
        # No column was inspected: every row has zero outliers.
        total_outliers = lit(0)
    df = df.withColumn('total_outliers', total_outliers)

    # Drop only the helper flags created here, leaving the input columns untouched.
    return df.drop(*outlier_flag_cols)

# Add the per-row 'total_outliers' count and look at the first record.
df_with_outlier_handling = find_outliers(df)
df_with_outlier_handling.show(1, vertical=True)

# Number of rows for each value of 'total_outliers'.
df_with_outlier_handling.groupby("total_outliers").count().show()

*As there are no rows with more than 3 outliers, we will not drop any rows on the basis of outliers.*

Correlation Matrix

In [None]:
# Columns whose Spark dtype is 'double' or 'int'.
numeric_columns = [name for name, dtype in df.dtypes if dtype in ('double', 'int')]
numeric_df = df.select(numeric_columns)

# Pairwise correlations, computed in pandas on the collected data.
numeric_pd = numeric_df.toPandas()
correlation_matrix = numeric_pd.corr()

# Print the correlation matrix
print(correlation_matrix)

List of correlated columns that needs to be removed.

In [None]:
# Columns to exclude from the feature set because of correlation with other columns.
correlated_col = [
    'mqtt_proto_len_encoded_binary',
    'mqtt_protoname_encoded_binary',
    'mqtt_conflag_uname',
    'mqtt_qos',
    'mqtt_len',
    'mqtt_ver',
]

Now, let's handle further feature engineering steps including removing correlated columns, one hot encoding, vectorizing the features and outcomes in our pipeline transformer setup.

In [None]:
from pyspark.sql.functions import col, when
import pyspark
from pyspark.sql import SparkSession, SQLContext
from pyspark.ml import Pipeline,Transformer
from pyspark.ml.feature import Imputer,StandardScaler,StringIndexer,OneHotEncoder, VectorAssembler

from pyspark.sql.functions import *
from pyspark.sql.types import *
import numpy as np


# Single-element lists, one per target label name.
# NOTE(review): within this section they are referenced only by the
# commented-out lambda in OutcomeCreater, which also expects a
# `denial_of_service` list that is not defined here — confirm whether
# these can be removed.
normal = ['normal']
slowite = ['slowite']
brute_force = ['bruteforce']
flood = ['flood']
malformed = ['malformed']
dos = ['dos']
legitimate = ['legitimate']

class OutcomeCreater(Transformer):
    """Transformer that turns the string 'target' column into a numeric 'outcome'.

    Each known label is mapped to a fixed double code; any other value
    (including null) is mapped to -1.0. The 'target' column is dropped, and
    so is 'mqtt_msg' when present.

    The mapping is built from native Spark column expressions rather than a
    Python UDF, so rows are not shipped to a Python worker and the result is
    produced as a double directly.
    """

    # Label name -> numeric class code.
    _LABEL_CODES = {
        'normal': 0.0,
        'slowite': 1.0,
        'bruteforce': 2.0,
        'flood': 3.0,
        'malformed': 4.0,
        'dos': 5.0,
        'legitimate': 6.0,
    }
    # Code for labels that are not listed above.
    _UNKNOWN_CODE = -1.0

    def __init__(self):
        super().__init__()

    def _transform(self, dataset):
        """Return `dataset` with 'outcome' added and 'target'/'mqtt_msg' removed."""
        target = col('target')

        # Build when(...).when(...)...otherwise(...) from the mapping.
        outcome = None
        for label, code in self._LABEL_CODES.items():
            matches = target == label
            if outcome is None:
                outcome = when(matches, lit(code))
            else:
                outcome = outcome.when(matches, lit(code))
        outcome = outcome.otherwise(lit(self._UNKNOWN_CODE)).cast(DoubleType())

        output_df = dataset.withColumn('outcome', outcome).drop("target")
        output_df = output_df.drop('mqtt_msg')
        return output_df

class FeatureTypeCaster(Transformer):
    """Transformer that casts every binary and continuous feature column to double.

    The column names come from the notebook-level `binary_cols` and
    `continuous_cols` lists.
    """

    def __init__(self):
        super().__init__()

    def _transform(self, dataset):
        """Return `dataset` with the listed feature columns cast to DoubleType."""
        casted_df = dataset
        columns_to_cast = [*binary_cols, *continuous_cols]
        for feature_name in columns_to_cast:
            as_double = col(feature_name).cast(DoubleType())
            casted_df = casted_df.withColumn(feature_name, as_double)
        return casted_df
class ColumnDropper(Transformer):
    """Transformer that removes a fixed set of columns from a DataFrame.

    Args:
        columns_to_drop: Column name or iterable of column names to remove.
            `None` (the default) means nothing is dropped.
    """

    def __init__(self, columns_to_drop = None):
        super().__init__()
        self.columns_to_drop=columns_to_drop

    def _transform(self, dataset):
        """Return `dataset` without the configured columns."""
        columns = self.columns_to_drop
        if columns is None:
            # The default would otherwise fail when iterated.
            return dataset
        if isinstance(columns, str):
            # A bare string is one column name, not a sequence of characters.
            columns = [columns]

        output_df = dataset
        for col_name in columns:
            output_df = output_df.drop(col_name)
        return output_df

def get_preprocess_pipeline(correlated_cols_to_remove=None):
    """Build the preprocessing pipeline that yields 'features' and 'outcome'.

    Stages, in order: cast features to double, index the nominal columns,
    one-hot encode the indices, assemble the feature vector, scale it,
    create the numeric outcome, and drop the intermediate columns.

    Args:
        correlated_cols_to_remove: Feature names to leave out of the feature
            vector. Defaults to the notebook-level `correlated_col` list, so
            the correlated columns identified earlier are excluded. Pass an
            empty list to keep every feature.

    Returns:
        An unfitted `pyspark.ml.Pipeline`.
    """
    if correlated_cols_to_remove is None:
        correlated_cols_to_remove = correlated_col
    excluded = set(correlated_cols_to_remove)

    # Stage where columns are cast to the appropriate types
    stage_typecaster = FeatureTypeCaster()

    # Stage where nominal columns are transformed to index columns using StringIndexer
    nominal_id_cols = [x+"_index" for x in nominal_cols]
    nominal_onehot_cols = [x+"_encoded" for x in nominal_cols]
    stage_nominal_indexer = StringIndexer(inputCols = nominal_cols, outputCols = nominal_id_cols )

    # Stage where the index columns are further transformed using OneHotEncoder
    stage_nominal_onehot_encoder = OneHotEncoder(inputCols=nominal_id_cols, outputCols=nominal_onehot_cols)

    # Stage where the relevant features are assembled into a vector.
    # Filtering (rather than list.remove) tolerates names that are not features.
    feature_cols = [name for name in continuous_cols+binary_cols+nominal_onehot_cols
                    if name not in excluded]
    stage_vector_assembler = VectorAssembler(inputCols=feature_cols, outputCol="vectorized_features", handleInvalid="keep")

    # Stage where we scale the assembled vector
    stage_scaler = StandardScaler(inputCol= 'vectorized_features', outputCol= 'features')

    # Stage that creates the numeric 'outcome' column from the 'target' label
    stage_outcome = OutcomeCreater()

    # Remove the source and intermediate columns, which are no longer needed
    # once 'features' and 'outcome' exist.
    stage_column_dropper = ColumnDropper(columns_to_drop = nominal_cols+nominal_id_cols+
        nominal_onehot_cols+ binary_cols + continuous_cols + ['vectorized_features'])

    # Connect the stages into a pipeline
    pipeline = Pipeline(stages=[stage_typecaster,stage_nominal_indexer,stage_nominal_onehot_encoder,
        stage_vector_assembler,stage_scaler,stage_outcome,stage_column_dropper])
    return pipeline

In [None]:
from pyspark.sql.functions import col, when
import os
import sys

# Point PySpark's worker and driver at the interpreter running this notebook.
# NOTE(review): a SparkContext was already created in the first cell; these
# variables may need to be set before that point to have any effect — confirm.
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

# NOTE(review): getOrCreate() presumably returns the session that is already
# active, in which case the master and app name given here are not applied — confirm.
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("SystemsToolChains") \
    .getOrCreate()

Load the training and test dataframe using the pipeline

In [None]:
column_names = df.columns
# Encoded split flag: 0.0 marks rows from the training file, 1.0 rows from the test file.
split_column = 'dataset_encoded_binary'

# The flag is a double column, so compare it with numeric literals instead of
# relying on an implicit string-to-double cast. The previous toDF(*column_names)
# re-applied the names the DataFrame already had and has been dropped.
train_set = df.filter(col(split_column) == 0.0)
test_set = df.filter(col(split_column) == 1.0)


Creating a preprocess pipeline for train and test dataset

In [None]:
# Build the preprocessing pipeline and fit it on the training rows only.
preprocess_pipeline = get_preprocess_pipeline()
preprocess_pipeline_model = preprocess_pipeline.fit(train_set)

In [None]:
# Apply the pipeline fitted on the training rows to both splits.
pipeline_df = preprocess_pipeline_model.transform(train_set)
pipeline_df_test = preprocess_pipeline_model.transform(test_set)

In [None]:
# Inspect the schema and the first rows of the transformed training data.
pipeline_df.printSchema()
pipeline_df.show()

Using logistic regression as the first classification model.

First we are fitting the model

In [None]:
from pyspark.ml.classification import LogisticRegression

# Logistic regression reading the 'features' vector and the numeric 'outcome' label.
lr = LogisticRegression(featuresCol = 'features', labelCol = 'outcome')

lrModel = lr.fit(pipeline_df) # fit the logistic regression model to the training dataset


Creating predictions based on the above model

In [None]:
# Score the transformed test set with the fitted logistic regression model.
predictions = lrModel.transform(pipeline_df_test)

In [None]:
# Show which columns the model added to the test set.
predictions.printSchema()

In [None]:
# Preview five scored rows. The limit is applied in Spark so that only those
# rows are collected to the driver, not the whole test set.
predictions.select("rawPrediction","probability","prediction","outcome").limit(5).toPandas()

Test and Train accuracy

In [None]:
# Score the training set so both accuracies come from the same fitted model.
predictions_train = lrModel.transform(pipeline_df)

def _lr_accuracy(scored_df):
    """Fraction of rows in `scored_df` whose prediction equals the outcome."""
    correct = scored_df.filter(scored_df.outcome == scored_df.prediction).count()
    return correct / float(scored_df.count())

accuracy_train = _lr_accuracy(predictions_train)
accuracy_test = _lr_accuracy(predictions)

print(f"Train Accuracy : {np.round(accuracy_train*100,2)}%")
print(f"Test Accuracy : {np.round(accuracy_test*100,2)}%")



Confusion Matrix

In [None]:
import matplotlib.pyplot as plt
import numpy as np
from sklearn.metrics import confusion_matrix

import itertools
def plot_confusion_matrix(cm, classes,
                          normalize=False,
                          title='Confusion matrix',
                          cmap=plt.cm.Blues):
    """Print and plot a confusion matrix.

    Args:
        cm: Square array of counts, true labels on rows and predicted labels
            on columns.
        classes: Tick labels, one per row/column of `cm`.
        normalize: When True, each row is divided by its total so that cells
            show fractions of the true class. Rows with no samples stay at
            zero rather than becoming NaN.
        title: Title of the plot.
        cmap: Matplotlib colormap used for the cells.
    """
    if normalize:
        row_totals = cm.sum(axis=1)[:, np.newaxis]
        # Divide only where the row total is non-zero; a class with no true
        # samples would otherwise produce a 0/0 row.
        cm = np.divide(cm.astype('float'), row_totals,
                       out=np.zeros(cm.shape, dtype=float),
                       where=row_totals != 0)
        print("Normalized confusion matrix")
    else:
        print('Confusion matrix, without normalization')

    print(cm)

    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()
    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=45)
    plt.yticks(tick_marks, classes)

    # Cell text switches to white above half of the largest cell value.
    fmt = '.2f' if normalize else 'd'
    thresh = cm.max() / 2.
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(j, i, format(cm[i, j], fmt),
                 horizontalalignment="center",
                 color="white" if cm[i, j] > thresh else "black")

    plt.tight_layout()
    plt.ylabel('True label')
    plt.xlabel('Predicted label')

In [None]:
# Numeric class codes and the label each one stands for, in the same order.
class_names=[0.0,1.0,2.0,3.0,4.0,5.0,6.0]
class_names_str=["normal","slowite", "bruteforce", "flood", "malformed","dos","legitimate"]

# Collect the true labels and the predictions in ONE Spark job so that both
# columns come from the same rows in the same order. Two separate
# toPandas() calls carry no guarantee that their row orders match.
lr_results_pd = predictions.select("outcome", "prediction").toPandas()
outcome_true = lr_results_pd["outcome"]
pred = lr_results_pd["prediction"]

cnf_matrix = confusion_matrix(outcome_true, pred,labels=class_names)
plt.figure()
plot_confusion_matrix(cnf_matrix, classes=class_names_str,
                      title='Confusion matrix')
plt.show()

F1 score

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# The evaluator is configured with metricName='f1', so the value printed
# below is an F1 score and is labelled as such.
evaluator = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='outcome', metricName='f1')
print("F1 score is: ", evaluator.evaluate(predictions))

Cross-validation

In [None]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

lr = LogisticRegression(featuresCol = 'features', labelCol = 'outcome')

# Create ParamGrid for Cross Validation: 3 regularisation strengths x 3 iteration limits.
lr_paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.0001, 0.001, 0.1])
             .addGrid(lr.maxIter, [10, 100, 1000])
             .build())

evaluator = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='outcome', metricName='f1')

# Create a CrossValidator for multi-class classification.
# A fixed seed makes the fold assignment, and with it the selected model,
# reproducible across runs.
lr_cv = CrossValidator(estimator=lr, estimatorParamMaps=lr_paramGrid, evaluator=evaluator,
                       numFolds=5, seed=42)

# Fit the CrossValidator on the training data
cv_model = lr_cv.fit(pipeline_df)

# Make predictions on the test data with the best model found
predictions = cv_model.transform(pipeline_df_test)

# Evaluate the tuned model with the F1 score
f1_score = evaluator.evaluate(predictions)

print(predictions)

In [None]:
# `f1_score` is the value returned by the F1 evaluator, so it is labelled as an F1 score.
print(f"F1 score is: {f1_score}")


Random Forest Classifier is our second classification model

In [None]:
from pyspark.ml.classification import RandomForestClassifier

# Random forest on the same 'features' / 'outcome' columns.
# A fixed seed makes the fitted trees reproducible across runs.
rf = RandomForestClassifier(featuresCol = 'features', labelCol = 'outcome', seed=42)
rf_model = rf.fit(pipeline_df)

In [None]:
# Score the test set with the random forest model fitted above, so the cells
# that follow inspect random forest output rather than logistic regression output.
predictions = rf_model.transform(pipeline_df_test)


In [None]:
# Show the columns of the scored test set.
predictions.printSchema()

In [None]:
# Preview five scored rows. The limit is applied in Spark so that only those
# rows are collected to the driver, not the whole test set.
predictions.select("rawPrediction","probability","prediction","outcome").limit(5).toPandas()

Test and Train accuracy

In [None]:
# Score both splits with the fitted random forest.
rf_prediction_train = rf_model.transform(pipeline_df)
rf_prediction_test = rf_model.transform(pipeline_df_test)

def _rf_accuracy(scored_df):
    """Fraction of rows in `scored_df` whose prediction equals the outcome."""
    correct = scored_df.filter(scored_df.outcome == scored_df.prediction).count()
    return correct / float(scored_df.count())

rf_accuracy_train = _rf_accuracy(rf_prediction_train)
rf_accuracy_test = _rf_accuracy(rf_prediction_test)

# Score of the untuned model on the test set, using the notebook-level evaluator.
rf_auc = evaluator.evaluate(rf_prediction_test)

print(f"Train accuracy = {np.round(rf_accuracy_train*100,2)}%, test accuracy = {np.round(rf_accuracy_test*100,2)}%")

Confusion Matrix

In [None]:
import matplotlib.pyplot as plt
import numpy as np
from sklearn.metrics import confusion_matrix

import itertools
def plot_confusion_matrix(cm, classes,
                          normalize=False,
                          title='Confusion matrix',
                          cmap=plt.cm.Blues):
    """Print and plot a confusion matrix.

    Args:
        cm: Square array of counts, true labels on rows and predicted labels
            on columns.
        classes: Tick labels, one per row/column of `cm`.
        normalize: When True, each row is divided by its total so that cells
            show fractions of the true class. Rows with no samples stay at
            zero rather than becoming NaN.
        title: Title of the plot.
        cmap: Matplotlib colormap used for the cells.
    """
    if normalize:
        row_totals = cm.sum(axis=1)[:, np.newaxis]
        # Divide only where the row total is non-zero; a class with no true
        # samples would otherwise produce a 0/0 row.
        cm = np.divide(cm.astype('float'), row_totals,
                       out=np.zeros(cm.shape, dtype=float),
                       where=row_totals != 0)
        print("Normalized confusion matrix")
    else:
        print('Confusion matrix, without normalization')

    print(cm)

    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()
    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=45)
    plt.yticks(tick_marks, classes)

    # Cell text switches to white above half of the largest cell value.
    fmt = '.2f' if normalize else 'd'
    thresh = cm.max() / 2.
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(j, i, format(cm[i, j], fmt),
                 horizontalalignment="center",
                 color="white" if cm[i, j] > thresh else "black")

    plt.tight_layout()
    plt.ylabel('True label')
    plt.xlabel('Predicted label')

In [None]:
# Numeric class codes and the label each one stands for, in the same order.
class_names=[0.0,1.0,2.0,3.0,4.0,5.0,6.0]
class_names_str=["normal","slowite", "bruteforce", "flood", "malformed","dos","legitimate"]

# Use the random forest's own test predictions (`rf_prediction_test`), and
# collect labels and predictions in ONE Spark job so that both columns come
# from the same rows in the same order.
rf_results_pd = rf_prediction_test.select("outcome", "prediction").toPandas()
outcome_true = rf_results_pd["outcome"]
pred = rf_results_pd["prediction"]

cnf_matrix = confusion_matrix(outcome_true, pred,labels=class_names)
plt.figure()
plot_confusion_matrix(cnf_matrix, classes=class_names_str,
                      title='Confusion matrix')
plt.show()

Cross validation

In [None]:
# Search space: every combination of the tree depths and forest sizes below.
rf_paramGrid = (
    ParamGridBuilder()
    .addGrid(rf.maxDepth, [5, 10, 15])   # maximum depth of each tree
    .addGrid(rf.numTrees, [10, 20, 40])  # number of trees in the forest
    .build()
)

# 5-fold cross-validation over the grid, scored with `evaluator`.
rf_cv = CrossValidator(
    numFolds=5,
    evaluator=evaluator,
    estimatorParamMaps=rf_paramGrid,
    estimator=rf,
)

rf_cv_model = rf_cv.fit(pipeline_df)

# Apply the fitted cross-validation model to the test set and score it.
rf_cv_prediction_test = rf_cv_model.transform(pipeline_df_test)
rf_cv_auc = evaluator.evaluate(rf_cv_prediction_test)

In [None]:
# Compare the evaluator score of the single random forest with the score of the
# cross-validated model, both measured on the test set and rounded to 2 decimals.
# NOTE(review): the label says "AUC/accuracy" — which one it is depends on how
# `evaluator` was configured earlier; confirm and name the metric explicitly.
print(f"Before cross-validation and parameter tuning, AUC/accuracy={np.round(rf_auc,2)}")
print(f"After cross-validation and parameter tuning, AUC/accuracy={np.round(rf_cv_auc,2)}")

PyTorch ML modelling

Converting the pipeline output to pandas and splitting the test set into validation_data and test_data

In [None]:
import pyspark
from pyspark.sql import SparkSession, SQLContext
from pyspark.ml import Pipeline, Transformer
from pyspark.ml.feature import Imputer, StandardScaler, StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.sql.functions import *
from pyspark.sql.types import *
import numpy as np
import pandas as pd
import torch 
from torch.utils.data import Dataset, DataLoader, TensorDataset

# Collect the Spark train / test DataFrames to the driver as pandas DataFrames.
nslkdd_pd, nslkdd_test_pd = pipeline_df.toPandas(), pipeline_df_test.toPandas()

# The test frame is cut in two by position: the first half becomes the
# validation set, the second half stays as the final test set.
# NOTE(review): the rows are not shuffled before the cut — confirm the test
# frame is not ordered by class.
split_ratio = 0.5
split_index = int(split_ratio * len(nslkdd_test_pd))

validation_data = nslkdd_test_pd.iloc[:split_index]
test_data = nslkdd_test_pd.iloc[split_index:]

Creating tensors for the training, validation, and test datasets.

In [None]:
# Features become float32 tensors; labels become int64 tensors.
# Training split
x_train = torch.from_numpy(np.asarray(nslkdd_pd['features'].tolist(), dtype=np.float32))
y_train = torch.from_numpy(np.asarray(nslkdd_pd['outcome'].tolist(), dtype=np.int64))
# Validation split
x_validate = torch.from_numpy(np.asarray(validation_data['features'].tolist(), dtype=np.float32))
y_validate = torch.from_numpy(np.asarray(validation_data['outcome'].tolist(), dtype=np.int64))
# Test split
x_test = torch.from_numpy(np.asarray(test_data['features'].tolist(), dtype=np.float32))
y_test = torch.from_numpy(np.asarray(test_data['outcome'].tolist(), dtype=np.int64))

In [None]:
# Sanity check: print the shape of the training feature tensor.
print(x_train.shape)


In [None]:
from torch.utils.data import Dataset, DataLoader, TensorDataset

class MyDataset(Dataset):
    """Map-style dataset that pairs each feature row with its label.

    Parameters
    ----------
    x : indexable with a `.shape` attribute (e.g. torch.Tensor)
        Features; the first dimension indexes samples.
    y : indexable with a `.shape` attribute (e.g. torch.Tensor)
        Labels; the first dimension indexes samples.

    Raises
    ------
    ValueError
        If `x` and `y` do not hold the same number of samples. Without this
        check a mismatch would either fail late with an IndexError or silently
        ignore the surplus labels.
    """
    def __init__(self,x,y):
        if x.shape[0] != y.shape[0]:
            raise ValueError(
                f"x and y must have the same number of samples, "
                f"got {x.shape[0]} and {y.shape[0]}"
            )
        self.x = x
        self.y = y
    def __len__(self):
        # Number of samples = size of the first dimension of the features.
        return self.x.shape[0]
    def __getitem__(self,idx):
        # One (features, label) pair.
        return (self.x[idx],self.y[idx])

# Wrap each (features, labels) tensor pair in a MyDataset so it can be indexed
# sample by sample.
train_dataset = MyDataset(x_train,y_train)
validate_dataset = MyDataset(x_validate,y_validate)
test_dataset = MyDataset(x_test,y_test)

Deep Neural Network

In [None]:
import torch.nn as nn

class myMultilayerPerceptron(nn.Module):
    """Fully connected network: input_dm -> 60 -> 30 -> 15 -> 7 -> output_dm.

    A Tanh follows every linear layer except the last one, so `forward`
    returns the raw outputs of the final linear layer.
    """

    def __init__(self,input_dm,output_dm):
        super().__init__()
        widths = [input_dm, 60, 30, 15, 7]
        stack = []
        # One Linear + Tanh pair per consecutive pair of widths.
        for fan_in, fan_out in zip(widths[:-1], widths[1:]):
            stack.append(nn.Linear(fan_in, fan_out))
            stack.append(nn.Tanh())
        # Output layer, not followed by an activation.
        stack.append(nn.Linear(widths[-1], output_dm))
        self.sequential = nn.Sequential(*stack)

    def forward(self,x):
        """Run `x` through the layer stack and return the result."""
        return self.sequential(x)

Initializing an instance of our model

In [None]:
# Input width = number of feature columns in x_train; output width = 7.
# NOTE(review): 7 is presumably the number of distinct `outcome` classes — confirm.
mymodel = myMultilayerPerceptron(x_train.shape[1],7)
print(mymodel)

In [None]:
# hyperparameters
lr = 0.1
batch_size = 64
N_epochs = 10

loss_fun = nn.CrossEntropyLoss()

train_dataloader = DataLoader(train_dataset, batch_size = batch_size, shuffle = True)
# Evaluation metrics do not depend on sample order, so validation is not shuffled.
validate_dataloader = DataLoader(validate_dataset, batch_size = batch_size, shuffle = False)

# Adam Optimizer
optimizer = torch.optim.Adam(mymodel.parameters(),lr = lr)

# Per-epoch history (one entry per epoch).
losses = []
accuracies = []

validate_losses = []
validate_accuracies = []

# Best validation accuracy seen so far in this run; drives checkpointing below.
current_best_accuracy = 0.0

import numpy as np
for epoch in range(N_epochs):

    # ---- training pass ----
    mymodel.train()
    batch_loss = []       # per-batch loss (reset every epoch)
    batch_accuracy = []   # per-batch accuracy (reset every epoch)
    batch_sizes = []      # samples per batch, used as weights for the epoch mean

    for x_batch,y_batch in train_dataloader:
        prediction_score = mymodel(x_batch)
        loss = loss_fun(prediction_score,y_batch)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        prediction_label = torch.argmax(prediction_score.detach(),dim=1)
        batch_loss.append(loss.item())
        batch_accuracy.append((prediction_label == y_batch).sum().item()/x_batch.shape[0])
        batch_sizes.append(x_batch.shape[0])

    # ---- validation pass: inference mode, no autograd graph is built ----
    mymodel.eval()
    validate_batch_loss = []
    validate_batch_accuracy = []
    validate_batch_sizes = []

    with torch.no_grad():
        for x_batch,y_batch in validate_dataloader:
            prediction_score = mymodel(x_batch)
            loss = loss_fun(prediction_score,y_batch)
            prediction_label = torch.argmax(prediction_score,dim=1)
            validate_batch_loss.append(loss.item())
            validate_batch_accuracy.append((prediction_label == y_batch).sum().item()/x_batch.shape[0])
            validate_batch_sizes.append(x_batch.shape[0])

    # Weight every batch by its size so a short final batch does not count as
    # much as a full one: the epoch values are true per-sample means.
    losses.append(np.average(batch_loss, weights=batch_sizes))
    validate_losses.append(np.average(validate_batch_loss, weights=validate_batch_sizes))

    accuracies.append(np.average(batch_accuracy, weights=batch_sizes))
    validate_accuracies.append(np.average(validate_batch_accuracy, weights=validate_batch_sizes))

    print(f"Epoch={epoch},train_loss={losses[-1]},validate_loss={validate_losses[-1]}")
    print(f"Train_accuracy={np.round(accuracies[-1]*100,2)}%,validate_accuracy={np.round(validate_accuracies[-1]*100,2)}%")

    # Checkpoint whenever the validation accuracy improves.
    if validate_accuracies[-1]>current_best_accuracy:
        print("Current epoch is best so far, saving model...")
        torch.save(mymodel.state_dict(),'current_best_model')
        current_best_accuracy = validate_accuracies[-1]

In [None]:
import matplotlib.pyplot as plt
import numpy as np

# `batch_loss` is re-created at the start of every epoch by the training cell,
# so this series holds the per-batch training losses of the LAST epoch only.
train_loss_per_iteration = np.array(batch_loss).reshape(-1)

# Epoch axis derived from the recorded history, so it always matches the number
# of points plotted even if N_epochs has been changed since training.
epoch_axis = np.arange(1, len(losses) + 1)

# Create the plots
plt.figure(figsize=(15, 5))

# Plot 1: The train loss per SGD iteration (last epoch)
plt.subplot(1, 3, 1)
plt.plot(train_loss_per_iteration)
plt.xlabel('SGD Iteration')
plt.ylabel('Train Loss')
plt.title('Train Loss per SGD Iteration (last epoch)')

# Plot 2: The train and validate loss across different epochs
plt.subplot(1, 3, 2)
plt.plot(epoch_axis, losses, label='Train Loss')
plt.plot(epoch_axis, validate_losses, label='Validate Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Train and Validate Loss across Different Epochs')
plt.legend()

# Plot 3: The train and validate metric across different epochs
plt.subplot(1, 3, 3)
plt.plot(epoch_axis, accuracies, label='Train Accuracy')
plt.plot(epoch_axis, validate_accuracies, label='Validate Accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.title('Train and Validate Accuracy across Different Epochs')
plt.legend()

# Must be CALLED to take effect; the bare attribute reference was a no-op.
plt.tight_layout()
plt.show()

Hyperparameter tuning

In [None]:
# hyperparameters
lr = 0.05
batch_size = 128
N_epochs = 10

# NOTE(review): this cell does not create a new model, so training continues
# from whatever weights `mymodel` currently holds. Because
# `current_best_accuracy` restarts at 0.0, the first epoch always overwrites the
# 'current_best_model' checkpoint. Re-instantiate `mymodel` first if an
# independent trial of these hyperparameters is intended.

loss_fun = nn.CrossEntropyLoss()

train_dataloader = DataLoader(train_dataset, batch_size = batch_size, shuffle = True)
# Evaluation metrics do not depend on sample order, so validation is not shuffled.
validate_dataloader = DataLoader(validate_dataset, batch_size = batch_size, shuffle = False)

# Adam Optimizer
optimizer = torch.optim.Adam(mymodel.parameters(),lr = lr)

# Per-epoch history (one entry per epoch).
losses = []
accuracies = []

validate_losses = []
validate_accuracies = []

# Best validation accuracy seen so far in this run; drives checkpointing below.
current_best_accuracy = 0.0

import numpy as np
for epoch in range(N_epochs):

    # ---- training pass ----
    mymodel.train()
    batch_loss = []       # per-batch loss (reset every epoch)
    batch_accuracy = []   # per-batch accuracy (reset every epoch)
    batch_sizes = []      # samples per batch, used as weights for the epoch mean

    for x_batch,y_batch in train_dataloader:
        prediction_score = mymodel(x_batch)
        loss = loss_fun(prediction_score,y_batch)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        prediction_label = torch.argmax(prediction_score.detach(),dim=1)
        batch_loss.append(loss.item())
        batch_accuracy.append((prediction_label == y_batch).sum().item()/x_batch.shape[0])
        batch_sizes.append(x_batch.shape[0])

    # ---- validation pass: inference mode, no autograd graph is built ----
    mymodel.eval()
    validate_batch_loss = []
    validate_batch_accuracy = []
    validate_batch_sizes = []

    with torch.no_grad():
        for x_batch,y_batch in validate_dataloader:
            prediction_score = mymodel(x_batch)
            loss = loss_fun(prediction_score,y_batch)
            prediction_label = torch.argmax(prediction_score,dim=1)
            validate_batch_loss.append(loss.item())
            validate_batch_accuracy.append((prediction_label == y_batch).sum().item()/x_batch.shape[0])
            validate_batch_sizes.append(x_batch.shape[0])

    # Weight every batch by its size so a short final batch does not count as
    # much as a full one: the epoch values are true per-sample means.
    losses.append(np.average(batch_loss, weights=batch_sizes))
    validate_losses.append(np.average(validate_batch_loss, weights=validate_batch_sizes))

    accuracies.append(np.average(batch_accuracy, weights=batch_sizes))
    validate_accuracies.append(np.average(validate_batch_accuracy, weights=validate_batch_sizes))

    print(f"Epoch={epoch},train_loss={losses[-1]},validate_loss={validate_losses[-1]}")
    print(f"Train_accuracy={np.round(accuracies[-1]*100,2)}%,validate_accuracy={np.round(validate_accuracies[-1]*100,2)}%")

    # Checkpoint whenever the validation accuracy improves.
    if validate_accuracies[-1]>current_best_accuracy:
        print("Current epoch is best so far, saving model...")
        torch.save(mymodel.state_dict(),'current_best_model')
        current_best_accuracy = validate_accuracies[-1]

In [None]:
import matplotlib.pyplot as plt
import numpy as np

# `batch_loss` is re-created at the start of every epoch by the training cell,
# so this series holds the per-batch training losses of the LAST epoch only.
train_loss_per_iteration = np.array(batch_loss).reshape(-1)

# Epoch axis derived from the recorded history, so it always matches the number
# of points plotted even if N_epochs has been changed since training.
epoch_axis = np.arange(1, len(losses) + 1)

# Create the plots
plt.figure(figsize=(15, 5))

# Plot 1: The train loss per SGD iteration (last epoch)
plt.subplot(1, 3, 1)
plt.plot(train_loss_per_iteration)
plt.xlabel('SGD Iteration')
plt.ylabel('Train Loss')
plt.title('Train Loss per SGD Iteration (last epoch)')

# Plot 2: The train and validate loss across different epochs
plt.subplot(1, 3, 2)
plt.plot(epoch_axis, losses, label='Train Loss')
plt.plot(epoch_axis, validate_losses, label='Validate Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Train and Validate Loss across Different Epochs')
plt.legend()

# Plot 3: The train and validate metric across different epochs
plt.subplot(1, 3, 3)
plt.plot(epoch_axis, accuracies, label='Train Accuracy')
plt.plot(epoch_axis, validate_accuracies, label='Validate Accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.title('Train and Validate Accuracy across Different Epochs')
plt.legend()

# Must be CALLED to take effect; the bare attribute reference was a no-op.
plt.tight_layout()
plt.show()

In [None]:
# Load the best model: rebuild the architecture, then restore the weights that
# the training loop saved to 'current_best_model'.
mybestmodel = myMultilayerPerceptron(x_train.shape[1],7)
mybestmodel.load_state_dict(torch.load("current_best_model"))
mybestmodel.eval()  # inference mode

# Accuracy does not depend on sample order, so the test set is not shuffled.
test_dataloader = DataLoader(test_dataset, batch_size = batch_size, shuffle = False)
test_batch_accuracy = []
test_batch_sizes = []

# No gradients are needed for evaluation.
with torch.no_grad():
    for x_batch, y_batch in test_dataloader:
        prediction_score = mybestmodel(x_batch)
        prediction_label = torch.argmax(prediction_score,dim=1)
        test_batch_accuracy.append((prediction_label == y_batch).sum().item()/x_batch.shape[0])
        test_batch_sizes.append(x_batch.shape[0])

# Weight each batch by its size so a short final batch is not over-represented;
# the result is the exact fraction of correctly classified test samples.
test_accuracy = np.average(test_batch_accuracy, weights=test_batch_sizes)

print(f"Test accuracy = {np.round(test_accuracy*100,2)}%")

Shallow Model

In [None]:
import torch.nn as nn

class myMultilayerPerceptron(nn.Module):
    """Shallow fully connected network: input_dm -> 30 -> 15 -> output_dm.

    A Tanh follows each of the two hidden linear layers; `forward` returns the
    raw outputs of the final linear layer.
    """

    def __init__(self,input_dm,output_dm):
        super().__init__()
        widths = [input_dm, 30, 15]
        stack = []
        # One Linear + Tanh pair per consecutive pair of widths.
        for fan_in, fan_out in zip(widths[:-1], widths[1:]):
            stack.append(nn.Linear(fan_in, fan_out))
            stack.append(nn.Tanh())
        # Output layer, not followed by an activation.
        stack.append(nn.Linear(widths[-1], output_dm))
        self.sequential = nn.Sequential(*stack)

    def forward(self,x):
        """Run `x` through the layer stack and return the result."""
        return self.sequential(x)

In [None]:
# Rebinds `mymodel` to a fresh instance of the class defined in the cell above.
# Input width = number of feature columns in x_train; output width = 7.
# NOTE(review): 7 is presumably the number of distinct `outcome` classes — confirm.
mymodel = myMultilayerPerceptron(x_train.shape[1],7)
print(mymodel)

In [None]:
# hyperparameters
lr = 0.05
batch_size = 64
N_epochs = 10

# NOTE(review): the checkpoint below is written to the same 'current_best_model'
# path used by the earlier training cells, so it replaces any previously saved
# weights at that path.

loss_fun = nn.CrossEntropyLoss()

train_dataloader = DataLoader(train_dataset, batch_size = batch_size, shuffle = True)
# Evaluation metrics do not depend on sample order, so validation is not shuffled.
validate_dataloader = DataLoader(validate_dataset, batch_size = batch_size, shuffle = False)

# Adam Optimizer
optimizer = torch.optim.Adam(mymodel.parameters(),lr = lr)

# Per-epoch history (one entry per epoch).
losses = []
accuracies = []

validate_losses = []
validate_accuracies = []

# Best validation accuracy seen so far in this run; drives checkpointing below.
current_best_accuracy = 0.0

import numpy as np
for epoch in range(N_epochs):

    # ---- training pass ----
    mymodel.train()
    batch_loss = []       # per-batch loss (reset every epoch)
    batch_accuracy = []   # per-batch accuracy (reset every epoch)
    batch_sizes = []      # samples per batch, used as weights for the epoch mean

    for x_batch,y_batch in train_dataloader:
        prediction_score = mymodel(x_batch)
        loss = loss_fun(prediction_score,y_batch)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        prediction_label = torch.argmax(prediction_score.detach(),dim=1)
        batch_loss.append(loss.item())
        batch_accuracy.append((prediction_label == y_batch).sum().item()/x_batch.shape[0])
        batch_sizes.append(x_batch.shape[0])

    # ---- validation pass: inference mode, no autograd graph is built ----
    mymodel.eval()
    validate_batch_loss = []
    validate_batch_accuracy = []
    validate_batch_sizes = []

    with torch.no_grad():
        for x_batch,y_batch in validate_dataloader:
            prediction_score = mymodel(x_batch)
            loss = loss_fun(prediction_score,y_batch)
            prediction_label = torch.argmax(prediction_score,dim=1)
            validate_batch_loss.append(loss.item())
            validate_batch_accuracy.append((prediction_label == y_batch).sum().item()/x_batch.shape[0])
            validate_batch_sizes.append(x_batch.shape[0])

    # Weight every batch by its size so a short final batch does not count as
    # much as a full one: the epoch values are true per-sample means.
    losses.append(np.average(batch_loss, weights=batch_sizes))
    validate_losses.append(np.average(validate_batch_loss, weights=validate_batch_sizes))

    accuracies.append(np.average(batch_accuracy, weights=batch_sizes))
    validate_accuracies.append(np.average(validate_batch_accuracy, weights=validate_batch_sizes))

    print(f"Epoch={epoch},train_loss={losses[-1]},validate_loss={validate_losses[-1]}")
    print(f"Train_accuracy={np.round(accuracies[-1]*100,2)}%,validate_accuracy={np.round(validate_accuracies[-1]*100,2)}%")

    # Checkpoint whenever the validation accuracy improves.
    if validate_accuracies[-1]>current_best_accuracy:
        print("Current epoch is best so far, saving model...")
        torch.save(mymodel.state_dict(),'current_best_model')
        current_best_accuracy = validate_accuracies[-1]

In [None]:
# hyperparameters
lr = 0.001
batch_size = 128
N_epochs = 50

# NOTE(review): this cell does not create a new model, so training continues
# from whatever weights `mymodel` currently holds. Because
# `current_best_accuracy` restarts at 0.0, the first epoch always overwrites the
# 'current_best_model' checkpoint. Re-instantiate `mymodel` first if an
# independent trial of these hyperparameters is intended.

loss_fun = nn.CrossEntropyLoss()

train_dataloader = DataLoader(train_dataset, batch_size = batch_size, shuffle = True)
# Evaluation metrics do not depend on sample order, so validation is not shuffled.
validate_dataloader = DataLoader(validate_dataset, batch_size = batch_size, shuffle = False)

# Adam Optimizer
optimizer = torch.optim.Adam(mymodel.parameters(),lr = lr)

# Per-epoch history (one entry per epoch).
losses = []
accuracies = []

validate_losses = []
validate_accuracies = []

# Best validation accuracy seen so far in this run; drives checkpointing below.
current_best_accuracy = 0.0

import numpy as np
for epoch in range(N_epochs):

    # ---- training pass ----
    mymodel.train()
    batch_loss = []       # per-batch loss (reset every epoch)
    batch_accuracy = []   # per-batch accuracy (reset every epoch)
    batch_sizes = []      # samples per batch, used as weights for the epoch mean

    for x_batch,y_batch in train_dataloader:
        prediction_score = mymodel(x_batch)
        loss = loss_fun(prediction_score,y_batch)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        prediction_label = torch.argmax(prediction_score.detach(),dim=1)
        batch_loss.append(loss.item())
        batch_accuracy.append((prediction_label == y_batch).sum().item()/x_batch.shape[0])
        batch_sizes.append(x_batch.shape[0])

    # ---- validation pass: inference mode, no autograd graph is built ----
    mymodel.eval()
    validate_batch_loss = []
    validate_batch_accuracy = []
    validate_batch_sizes = []

    with torch.no_grad():
        for x_batch,y_batch in validate_dataloader:
            prediction_score = mymodel(x_batch)
            loss = loss_fun(prediction_score,y_batch)
            prediction_label = torch.argmax(prediction_score,dim=1)
            validate_batch_loss.append(loss.item())
            validate_batch_accuracy.append((prediction_label == y_batch).sum().item()/x_batch.shape[0])
            validate_batch_sizes.append(x_batch.shape[0])

    # Weight every batch by its size so a short final batch does not count as
    # much as a full one: the epoch values are true per-sample means.
    losses.append(np.average(batch_loss, weights=batch_sizes))
    validate_losses.append(np.average(validate_batch_loss, weights=validate_batch_sizes))

    accuracies.append(np.average(batch_accuracy, weights=batch_sizes))
    validate_accuracies.append(np.average(validate_batch_accuracy, weights=validate_batch_sizes))

    print(f"Epoch={epoch},train_loss={losses[-1]},validate_loss={validate_losses[-1]}")
    print(f"Train_accuracy={np.round(accuracies[-1]*100,2)}%,validate_accuracy={np.round(validate_accuracies[-1]*100,2)}%")

    # Checkpoint whenever the validation accuracy improves.
    if validate_accuracies[-1]>current_best_accuracy:
        print("Current epoch is best so far, saving model...")
        torch.save(mymodel.state_dict(),'current_best_model')
        current_best_accuracy = validate_accuracies[-1]

In [None]:
# Load the best model: rebuild the architecture, then restore the weights that
# the training loop saved to 'current_best_model'.
mybestmodel = myMultilayerPerceptron(x_train.shape[1],7)
mybestmodel.load_state_dict(torch.load("current_best_model"))
mybestmodel.eval()  # inference mode

# Accuracy does not depend on sample order, so the test set is not shuffled.
test_dataloader = DataLoader(test_dataset, batch_size = batch_size, shuffle = False)
test_batch_accuracy = []
test_batch_sizes = []

# No gradients are needed for evaluation.
with torch.no_grad():
    for x_batch, y_batch in test_dataloader:
        prediction_score = mybestmodel(x_batch)
        prediction_label = torch.argmax(prediction_score,dim=1)
        test_batch_accuracy.append((prediction_label == y_batch).sum().item()/x_batch.shape[0])
        test_batch_sizes.append(x_batch.shape[0])

# Weight each batch by its size so a short final batch is not over-represented;
# the result is the exact fraction of correctly classified test samples.
test_accuracy = np.average(test_batch_accuracy, weights=test_batch_sizes)

print(f"Test accuracy = {np.round(test_accuracy*100,2)}%")