In [None]:
# import libraries
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
from pylab import rcParams
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import StandardScaler, VectorAssembler
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from pyspark.sql.functions import col



In [None]:
# Read the training and testing CSV partitions into pandas DataFrames.
train_df, test_df = (
    pd.read_csv(csv_path)
    for csv_path in (
        'GA2Datasets/UNSW_NB15_training-set.csv',
        'GA2Datasets/UNSW_NB15_testing-set.csv',
    )
)

### Custom pipeline for data pre-processing

In [None]:
class PreProcessPipeline:
    """Pandas pre-processing shared by the training and testing frames.

    ``transform`` performs, in order:

    1. drop the ``id`` column;
    2. if ``label_encode`` is set, replace the string columns ``proto``,
       ``service`` and ``state`` with integer codes;
    3. if ``process_label`` is set, overwrite ``label`` with a binary target
       derived from ``attack_cat`` and drop ``attack_cat``.

    The integer codes are learned once -- either explicitly through
    ``fit(df)`` or lazily from the first frame handed to ``transform`` -- and
    reused for every later frame, so a given category always receives the same
    code in the training and the testing data. Categories that were not
    present when the codes were learned are encoded as ``UNSEEN_CODE``.

    Parameters
    ----------
    label_encode : bool, default True
        Integer-encode the categorical columns.
    process_label : bool, default True
        Build the binary ``label`` column from ``attack_cat``.
    """

    # String-valued columns that are replaced by integer codes.
    CATEGORICAL_COLUMNS = ('proto', 'service', 'state')

    # Code assigned to a category that was absent when the codes were learned.
    UNSEEN_CODE = -1

    # attack_cat -> binary target (same grouping as the original if/elif chain).
    LABEL_BY_CATEGORY = {
        'Normal': 0,
        'Reconnaissance': 0,
        'Analysis': 0,
        'Fuzzers': 0,
        'Shellcode': 0,
        'Generic': 0,
        'Backdoor': 1,
        'DoS': 1,
        'Exploits': 1,
        'Worms': 1,
    }

    def __init__(self, label_encode = True, process_label = True):
        self.label_encode = label_encode
        self.process_label = process_label
        # {column: {category: code}}; stays None until codes have been learned.
        self.mappings = None

    def fit(self, df=None):
        """Learn the category codes from ``df`` and return ``self``.

        Calling ``fit()`` without a frame is still allowed and does nothing, so
        existing callers keep working; in that case the codes are learned from
        the first frame passed to ``transform``.
        """
        if df is not None and self.label_encode:
            self.mappings = {
                column: {
                    value: index
                    for index, value in enumerate(df[column].unique())
                }
                for column in self.CATEGORICAL_COLUMNS
            }
        return self

    def transform(self, df):
        """Return a pre-processed copy of ``df``; the input frame is not modified.

        Raises
        ------
        KeyError
            If ``id`` or another required column is missing.
        ValueError
            If ``process_label`` is set and ``attack_cat`` contains a value
            that has no entry in ``LABEL_BY_CATEGORY``.
        """
        # drop() already returns a new frame, so no extra copy is needed.
        df = df.drop('id', axis=1)

        if self.label_encode:
            if self.mappings is None:
                # First frame seen (the training set in this notebook):
                # learn the codes here and reuse them from now on.
                self.fit(df)
            for column, mapping in self.mappings.items():
                codes = df[column].map(mapping)
                # map() yields NaN for categories missing from the mapping.
                df[column] = codes.fillna(self.UNSEEN_CODE).astype('int64')

        if self.process_label:
            labels = df['attack_cat'].map(self.LABEL_BY_CATEGORY)
            unknown = df.loc[labels.isna(), 'attack_cat'].unique().tolist()
            if unknown:
                # Fail loudly instead of silently producing NaN labels.
                raise ValueError(f"Unrecognised attack_cat value(s): {unknown}")
            df['label'] = labels.astype('int64')
            df = df.drop('attack_cat', axis=1)

        return df

In [None]:
# Count the missing values in each column of the training frame.
train_df.isna().sum()

In [None]:
# Run both splits through one shared pre-processing pipeline, training frame first.
pipeline = PreProcessPipeline(process_label=True, label_encode=True)
train_df, test_df = (pipeline.transform(frame) for frame in (train_df, test_df))

In [None]:
# Histogram of every numeric column of the training frame.
rcParams["figure.figsize"] = (20, 22)  # large canvas so the many subplots stay legible
# DataFrame.hist draws grid lines itself (grid=True). The former bare
# plt.grid() call toggled the grid on the current axes instead of enabling it,
# so the grid is requested explicitly here.
train_df.hist(grid=True)
plt.show()

In [None]:
# Start (or reuse) the Spark session used by the rest of the notebook.
spark = (
    SparkSession.builder
    .appName("CSCI316GP2")
    .getOrCreate()
)

In [None]:
# Vectorize the data

# Convert the pandas frames into Spark DataFrames
spark_train_df = spark.createDataFrame(train_df)
spark_test_df = spark.createDataFrame(test_df)

# Define the feature columns: every column except the target. The label is
# excluded by name rather than by position, so it cannot leak into the
# feature vector if the column order ever changes.
feature_columns = [name for name in spark_train_df.columns if name != "label"]

# Assemble features into a single vector column
feature_assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
train = feature_assembler.transform(spark_train_df)
test = feature_assembler.transform(spark_test_df)

In [None]:
# Preview the Spark training DataFrame (first 20 rows, long values truncated).
spark_train_df.show(n=20, truncate=True)

In [None]:
# Build the two scikit-learn scalers. The bare name StandardScaler resolves to
# the sklearn class here because that import comes last in the import cell.
minmax_scaler, standard_scaler = MinMaxScaler(), StandardScaler()

# Every column except the last one is treated as a feature
feature_columns = train_df.columns[:-1]

# Min-max scaling first, then standardization. Each scaler is fitted on the
# training frame only and then applied unchanged to the testing frame.
for scaler in (minmax_scaler, standard_scaler):
    train_df[feature_columns] = scaler.fit_transform(train_df[feature_columns])
    test_df[feature_columns] = scaler.transform(test_df[feature_columns])


In [None]:
# Disabled cell: the statements below are wrapped in a bare triple-quoted
# string, so they are evaluated as a string literal and never executed.
'''
sparktrain_df = spark.createDataFrame(train_df)
sparktest_df = spark.createDataFrame(test_df)
'''

In [None]:
'''
Author: Khanh Nguyen
Name: PySpark Dataframe Pipeline
Description:
    This class builds a preprocessing pipeline for PySpark dataframes. It accepts 2 boolean parameters: smote & standardize.
    Features
        (Default)
        - Repartition: Repartition the training dataframe
        - Vectorize: Assemble the feature columns into a single vector column
        (activated by setting the parameter to True):
        - SMOTE: Oversample the minority class by random sampling with replacement (no synthetic samples are generated)
        - Standardize: Standardize the dataframe using z-score
'''

from pyspark.sql.functions import col
class SparkDFPipeline:
    """Turn Spark train/test DataFrames into ``features`` / ``label`` frames.

    Both input DataFrames must contain a ``label`` column; every other column
    is treated as a feature. With ``standardize=True`` they must also contain
    the categorical columns ``proto``, ``service`` and ``state``.

    Parameters
    ----------
    smote : bool, default False
        Balance the training data by randomly oversampling, with replacement,
        whichever of the label classes 0 / 1 is smaller. Despite the name, no
        synthetic samples are generated.
    standardize : bool, default False
        z-score the non-categorical features using statistics fitted on the
        training data only.
    num_partitions : int, default 500
        Number of partitions the training DataFrame is repartitioned into.
    """

    LABEL_COLUMN = 'label'
    # Columns that are passed through unscaled when standardizing.
    CATEGORICAL_COLUMNS = ['proto', 'service', 'state']
    # Seed for the oversampling draw, so repeated runs sample the same way.
    SAMPLE_SEED = 42

    def __init__(self, smote=False, standardize=False, num_partitions=500):
        self.smote = smote
        self.standardize = standardize
        self.num_partitions = num_partitions

    def fit(self):
        """No-op kept for API symmetry; all work happens in ``transform``."""
        return self

    def transform(self, train_df, test_df):
        """Return ``(train_df, test_df)``, each reduced to ``features`` and ``label``.

        Both frames are always vectorized with the same feature columns, so
        the returned test frame can be fed straight to a model fitted on the
        returned training frame.
        """
        if self.smote:
            train_df = self._oversample_minority(train_df)

        train_df = train_df.repartition(self.num_partitions)

        # Select features by name so the label can never end up in the vector.
        feature_names = [
            name for name in train_df.columns if name != self.LABEL_COLUMN
        ]

        if self.standardize:
            train_df, test_df = self._standardize(train_df, test_df, feature_names)
        else:
            assembler = VectorAssembler(inputCols=feature_names, outputCol='features')
            train_df = assembler.transform(train_df)
            test_df = assembler.transform(test_df)

        return (
            train_df.select('features', self.LABEL_COLUMN),
            test_df.select('features', self.LABEL_COLUMN),
        )

    def _oversample_minority(self, train_df):
        """Randomly duplicate rows of the smaller class until both are about equal."""
        negatives = train_df.filter(col(self.LABEL_COLUMN) == 0)
        positives = train_df.filter(col(self.LABEL_COLUMN) == 1)
        negative_count = negatives.count()
        positive_count = positives.count()

        if negative_count >= positive_count:
            majority, minority = negatives, positives
            majority_count, minority_count = negative_count, positive_count
        else:
            majority, minority = positives, negatives
            majority_count, minority_count = positive_count, negative_count

        # Nothing to resample from, or already balanced: leave the data alone
        # (this also avoids a division by zero below).
        if minority_count == 0 or minority_count == majority_count:
            return train_df

        # With replacement, `fraction` is the expected number of times each
        # minority row is drawn; this value adds roughly the missing
        # (majority - minority) rows.
        fraction = (majority_count - minority_count) / minority_count
        extra = minority.sample(
            withReplacement=True, fraction=fraction, seed=self.SAMPLE_SEED
        )
        return majority.union(minority).union(extra).orderBy(self.LABEL_COLUMN)

    def _standardize(self, train_df, test_df, feature_names):
        """z-score the numeric features and re-attach the categorical columns."""
        # Imported locally under an alias: the notebook's import cell imports
        # sklearn's StandardScaler after PySpark's, so the bare name refers to
        # the sklearn class, which has no inputCol / outputCol parameters.
        from pyspark.ml.feature import StandardScaler as SparkStandardScaler

        numeric_names = [
            name for name in feature_names if name not in self.CATEGORICAL_COLUMNS
        ]
        numeric_assembler = VectorAssembler(inputCols=numeric_names, outputCol='features')
        train_df = numeric_assembler.transform(train_df)
        test_df = numeric_assembler.transform(test_df)

        scaler = SparkStandardScaler(
            inputCol='features', outputCol='scaled_features',
            withStd=True, withMean=True,
        )
        # Fit on the training data only and reuse the same statistics for the
        # test data; re-fitting on the test set would scale the two frames
        # differently.
        scaler_model = scaler.fit(train_df)
        train_df = scaler_model.transform(train_df).drop('features')
        test_df = scaler_model.transform(test_df).drop('features')

        # Put back the categorical columns next to the scaled numeric vector.
        final_assembler = VectorAssembler(
            inputCols=['scaled_features'] + list(self.CATEGORICAL_COLUMNS),
            outputCol='features',
        )
        return final_assembler.transform(train_df), final_assembler.transform(test_df)

In [None]:
# Disabled usage example: the code below is wrapped in a bare triple-quoted
# string, so it is evaluated as a string literal and never executed.
'''

# Pipeline example
pipeline = SparkDFPipeline(smote=True, standardize=False)
train, test = pipeline.transform(sparktrain_df, sparktest_df)

'''

In [None]:
# Tools for SVM
from pyspark.ml.classification import LinearSVC
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.mllib.evaluation import BinaryClassificationMetrics

In [None]:
# Fit a linear SVM on the assembled training features (at most 100 iterations)
svm = LinearSVC(
    maxIter=100,
    labelCol="label",
    featuresCol="features",
)
svm_model = svm.fit(train)

In [None]:
# Make prediction

predictions = svm_model.transform(test)

# Evaluate the model's accuracy
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)

print("Accuracy:", accuracy)

# Area under the ROC curve. The evaluator is pointed at the model's
# "rawPrediction" column (the SVM margins) rather than the thresholded 0/1
# "prediction" column, which would reduce the curve to a single operating point.
roc_evaluator = BinaryClassificationEvaluator(
    labelCol="label", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
roc_score = roc_evaluator.evaluate(predictions)
print("Area under ROC = %g" % roc_score)

# Area under the precision-recall curve, computed from the same raw scores
pr_evaluator = BinaryClassificationEvaluator(
    labelCol="label", rawPredictionCol="rawPrediction", metricName="areaUnderPR")
pr_score = pr_evaluator.evaluate(predictions)
print("Area under PR = %g" % pr_score)


In [None]:
# With hyper parameter

# Refit the linear SVM on the same training data, this time with regParam=0.01
svm = LinearSVC(
    regParam=0.01,
    maxIter=100,
    labelCol="label",
    featuresCol="features",
)
svm_model = svm.fit(train)

In [None]:
# spark.stop()