# Scaled ML Prototype

This notebook implements the scaled version of the ML Prototype created in the previous notebook.  

The scaled version of the ML Prototype uses PySpark to create a [Pipeline](https://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/ml/Pipeline.html) that cleans the dataset and to train a [GBTClassifier](https://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/ml/classification/GBTClassifier.html) using grid search.  
Both the pipeline and the classifier are serialized to disk for later use.  

This notebook is separated into four parts:
1. Common code: Contains the common code used in all other parts of the notebook.
2. Dataset creation: The [CIC-IDS-2018](https://drive.google.com/open?id=1HrTPh0YRSZ4T9DLa_c47lubheKUcPl0r) and [CIC-IDS-2017](https://drive.google.com/open?id=1Q2J_pPB0K0PHjq0YO5BPwYQwrvoZgYqo) datasets are combined and split into training and holdout datasets. The training dataset is used to train and evaluate a classifier, whereas the holdout dataset, in the absence of further real-world data, serves to demonstrate how the classifier is used in practice.
3. Model training: In this section the pipeline to clean and impute the data is created and a GBTClassifier is trained on the data via grid-search. The pipeline and the classifier are serialized to disk as a last step.
4. Model Usage: This part of the notebook demonstrates the usage of the classifier. Both the pipeline and the classifier are read from disk to perform predictions on the holdout dataset. This section can be used as a basis for a separate driver program.

## 1. Common code

The following section contains the common code used in the later parts of the notebook.

In [1]:
# Base Paths
# The '?' wildcard matches both the ids-2017 and ids-2018 dataset directories
dataset_path = r'/home/glados/Development/Projects/ids-201?/processed/*.csv'
spark_output_path = r'/home/glados/Development/Projects/ids-spark/'

In [2]:
import os
from functools import reduce

import findspark

# findspark has to locate the Spark installation before pyspark is imported
findspark.init()

from pyspark.sql.session import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, LongType, DoubleType, StringType
from pyspark.sql.functions import count, when, col
from pyspark.ml import Pipeline, Transformer, PipelineModel
from pyspark.ml.param.shared import HasOutputCols, HasInputCols, Param, Params
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
# OneHotEncoderEstimator is the Spark 2.x name; Spark 3.0 renamed it to OneHotEncoder
from pyspark.ml.feature import Imputer, OneHotEncoderEstimator, VectorAssembler
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, CrossValidatorModel
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import BinaryClassificationMetrics, MulticlassMetrics

In [3]:
training_path = os.path.join(spark_output_path, 'training')
holdout_path = os.path.join(spark_output_path, 'holdout')
pipeline_model_path = 'models/spark/pipeline-model'
gb_model_path = 'models/spark/gb-model'

### 1.1 Features

The following features were used to train the ML Prototype (LINK) and are subsequently used to train the GBTClassifier.

In [3]:
selected_features = [
    'protocol',
    'flow_duration',
    'tot_fwd_pkts',
    'tot_bwd_pkts',
    'totlen_fwd_pkts',
    'totlen_bwd_pkts',
    'fwd_pkt_len_mean',
    'fwd_pkt_len_std',
    'bwd_pkt_len_mean',
    'flow_byts_s',
    'flow_pkts_s',
    'flow_iat_std',
    'flow_iat_min',
    'fwd_iat_tot',
    'fwd_iat_min',
    'bwd_iat_tot',
    'bwd_iat_min',
    'fwd_psh_flags',
    'fwd_urg_flags',
    'bwd_pkts_s',
    'fin_flag_cnt',
    'rst_flag_cnt',
    'psh_flag_cnt',
    'ack_flag_cnt',
    'urg_flag_cnt',
    'down_up_ratio',
    'init_fwd_win_byts',
    'init_bwd_win_byts',
    'fwd_seg_size_min',
    'active_mean',
    'idle_mean'
]

selected_columns = selected_features + ['label']

### 1.2 Schema

The following cell defines the schema of the complete dataset.

In [4]:
schema = StructType([
    StructField('dst_port', IntegerType()),
    StructField('protocol', IntegerType()),
    StructField('timestamp', StringType()),
    StructField('flow_duration', LongType()),
    StructField('tot_fwd_pkts', IntegerType()),
    StructField('tot_bwd_pkts', IntegerType()),
    StructField('totlen_fwd_pkts', DoubleType()),
    StructField('totlen_bwd_pkts', DoubleType()),
    StructField('fwd_pkt_len_max', DoubleType()),
    StructField('fwd_pkt_len_min', DoubleType()),
    StructField('fwd_pkt_len_mean', DoubleType()),
    StructField('fwd_pkt_len_std', DoubleType()),
    StructField('bwd_pkt_len_max', DoubleType()),
    StructField('bwd_pkt_len_min', DoubleType()),
    StructField('bwd_pkt_len_mean', DoubleType()),
    StructField('bwd_pkt_len_std', DoubleType()),
    StructField('flow_byts_s', StringType()),
    StructField('flow_pkts_s', StringType()),
    StructField('flow_iat_mean', DoubleType()),
    StructField('flow_iat_std', DoubleType()),
    StructField('flow_iat_max', DoubleType()),
    StructField('flow_iat_min', DoubleType()),
    StructField('fwd_iat_tot', DoubleType()),
    StructField('fwd_iat_mean', DoubleType()),
    StructField('fwd_iat_std', DoubleType()),
    StructField('fwd_iat_max', DoubleType()),
    StructField('fwd_iat_min', DoubleType()),
    StructField('bwd_iat_tot', DoubleType()),
    StructField('bwd_iat_mean', DoubleType()),
    StructField('bwd_iat_std', DoubleType()),
    StructField('bwd_iat_max', DoubleType()),
    StructField('bwd_iat_min', DoubleType()),
    StructField('fwd_psh_flags', IntegerType()),
    StructField('bwd_psh_flags', IntegerType()),
    StructField('fwd_urg_flags', IntegerType()),
    StructField('bwd_urg_flags', IntegerType()),
    StructField('fwd_header_len', LongType()),
    StructField('bwd_header_len', IntegerType()),
    StructField('fwd_pkts_s', DoubleType()),
    StructField('bwd_pkts_s', DoubleType()),
    StructField('pkt_len_min', DoubleType()),
    StructField('pkt_len_max', DoubleType()),
    StructField('pkt_len_mean', DoubleType()),
    StructField('pkt_len_std', DoubleType()),
    StructField('pkt_len_var', DoubleType()),
    StructField('fin_flag_cnt', IntegerType()),
    StructField('syn_flag_cnt', IntegerType()),
    StructField('rst_flag_cnt', IntegerType()),
    StructField('psh_flag_cnt', IntegerType()),
    StructField('ack_flag_cnt', IntegerType()),
    StructField('urg_flag_cnt', IntegerType()),
    StructField('cwe_flag_count', IntegerType()),
    StructField('ece_flag_cnt', IntegerType()),
    StructField('down_up_ratio', DoubleType()),
    StructField('pkt_size_avg', DoubleType()),
    StructField('fwd_seg_size_avg', DoubleType()),
    StructField('bwd_seg_size_avg', DoubleType()),
    StructField('fwd_byts_b_avg', IntegerType()),
    StructField('fwd_pkts_b_avg', IntegerType()),
    StructField('fwd_blk_rate_avg', IntegerType()),
    StructField('bwd_byts_b_avg', IntegerType()),
    StructField('bwd_pkts_b_avg', IntegerType()),
    StructField('bwd_blk_rate_avg', IntegerType()),
    StructField('subflow_fwd_pkts', IntegerType()),
    StructField('subflow_fwd_byts', IntegerType()),
    StructField('subflow_bwd_pkts', IntegerType()),
    StructField('subflow_bwd_byts', IntegerType()),
    StructField('init_fwd_win_byts', IntegerType()),
    StructField('init_bwd_win_byts', IntegerType()),
    StructField('fwd_act_data_pkts', IntegerType()),
    StructField('fwd_seg_size_min', IntegerType()),
    StructField('active_mean', DoubleType()),
    StructField('active_std', DoubleType()),
    StructField('active_max', DoubleType()),
    StructField('active_min', DoubleType()),
    StructField('idle_mean', DoubleType()),
    StructField('idle_std', DoubleType()),
    StructField('idle_max', DoubleType()),
    StructField('idle_min', DoubleType()),
    StructField('label', StringType())
])

### 1.3 Common classes and functions

The following cell contains common classes and functions used in the later sections of the notebook.

In [5]:
class BinaryLabelMaker(Transformer, HasOutputCols, HasInputCols, DefaultParamsReadable, DefaultParamsWritable):
    '''
    A transformer that adds binary labels (0|1) based on the value of the input columns.
    The "classLabel" parameter specifies the value of the input columns used to determine class 0.
    '''
    
    classLabel = Param(Params._dummy(), 'classLabel', 'label for class 0')
    
    def __init__(self, inputCols=None, outputCols=None, classLabel=''):
        super(BinaryLabelMaker, self).__init__()
        self._set(inputCols=inputCols)
        self._set(outputCols=outputCols)
        self._set(classLabel=classLabel)

    def setClassLabel(self, classLabel):
        return self._set(classLabel=classLabel)

    def getClassLabel(self):
        return self.getOrDefault(self.classLabel)
    
    def _make_label(self, df, inputCol, outputCol):
        # 0.0 if the input value equals classLabel, 1.0 otherwise
        return df.withColumn(outputCol, when(df[inputCol] == self.getClassLabel(), 0).otherwise(1).cast(DoubleType()))
        
    def _transform(self, df):
        # Apply _make_label once per (inputCol, outputCol) pair
        col_pairs = zip(self.getInputCols(), self.getOutputCols())
        return reduce(lambda acc, pair: self._make_label(acc, pair[0], pair[1]), col_pairs, df)
    
    
class ValueCleaner(Transformer, HasOutputCols, HasInputCols, DefaultParamsReadable, DefaultParamsWritable):
    '''
    A transformer that removes invalid values from the input columns.
    Invalid values are "inf" and values < 0; they are replaced by None so that they can be imputed later.
    '''
    
    def __init__(self, inputCols=None, outputCols=None):
        super(ValueCleaner, self).__init__()
        self._set(inputCols=inputCols)
        self._set(outputCols=outputCols)
        
    @staticmethod
    def _replace_invalid_values(df, inputCol, outputCol, replacement):
        return (df.withColumn(outputCol, 
                              when(df[inputCol] == 'inf', replacement)
                              .when(df[inputCol] < 0.0, replacement)
                              .otherwise(df[inputCol])
                              .cast(DoubleType())
                             ))
    
    def _transform(self, df):
        # Replace invalid values with None once per (inputCol, outputCol) pair
        col_pairs = zip(self.getInputCols(), self.getOutputCols())
        return reduce(lambda acc, pair: ValueCleaner._replace_invalid_values(acc, pair[0], pair[1], None), col_pairs, df)
    

def load_csv(path, columns=None):
    df = (spark.read
          .schema(schema)
          .option('inferSchema', 'false')
          .option('header', 'true')
          .option('sep', ',')
          .csv(path))
    
    if columns:
        return df.select(*columns)
    else:
        return df
    

def find_columns_to_impute(df):
    col_values = df.select([count(when(col(c).isNull(), c).when(col(c) < 0.0, c).when(col(c) == 'inf', c)).alias(c) for c in df.columns]).collect()[0].asDict()
    return [k for k, v in col_values.items() if v > 0]


def cat_column_name(c):
    return f"{c}_cat"


def index_column_name(c):
    return f"{c}_idx"


def imputed_column_name(c):
    return f"{c}_imputed"


def print_classification_report(predictions, pred_col, label_col, report_type):
    predictionAndLabels = predictions.select(pred_col, label_col).rdd
    
    binary_metrics = BinaryClassificationMetrics(predictionAndLabels)
    multi_metrics = MulticlassMetrics(predictionAndLabels)
    
    evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol=label_col)
    aupr = evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderPR"})
    
    print(f'Classification Report ({report_type}):')
    print(f'Recall: {multi_metrics.weightedRecall}')
    print(f'Precision: {multi_metrics.weightedPrecision}')
    print(f'F1: {multi_metrics.weightedFMeasure()}')
    print(f'FPR: {multi_metrics.weightedFalsePositiveRate}')
    print(f'TPR: {multi_metrics.weightedTruePositiveRate}')
    print()
    print(f'Area under PR (raw predictions): {aupr}')
    print(f'Area under PR: {binary_metrics.areaUnderPR}')
    print(f'Accuracy = {multi_metrics.accuracy}')
    print()
    print('Confusion Matrix:')
    print(multi_metrics.confusionMatrix())

    
def find_columns_with_value(df, value):
    col_values = df.select([count(when(col(c) == value, c)).alias(c) for c in df.columns]).collect()[0].asDict()
    return [k for k, v in col_values.items() if v > 0]


def find_neg_columns(df):
    col_values = df.select([count(when(col(c) < 0.0, c)).alias(c) for c in df.columns]).collect()[0].asDict()
    return [k for k, v in col_values.items() if v > 0]


def find_null_columns(df):
    col_values = df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).collect()[0].asDict()
    return [k for k, v in col_values.items() if v > 0]


def assert_column_validity(df):
    impute_cols = find_columns_to_impute(df)
    assert len(impute_cols) == 0, f'Invalid columns found {impute_cols}'
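
The helpers `find_columns_to_impute` and `assert_column_validity` treat a column as invalid when it contains at least one null, negative or `inf` value. A rough pure-Python approximation of that rule, assuming rows are plain dicts instead of a Spark DataFrame (`is_invalid` and `columns_to_impute` are hypothetical names used only for illustration):

```python
import math
from typing import Any, Dict, Iterable, List


def is_invalid(value: Any) -> bool:
    """Approximate the conditions counted by find_columns_to_impute:
    missing (None), 'inf', or a negative number."""
    if value is None:
        return True
    try:
        number = float(value)  # also parses strings such as '120.5' or 'inf'
    except (TypeError, ValueError):
        return False  # non-numeric values are not counted
    return math.isinf(number) or number < 0.0


def columns_to_impute(rows: Iterable[Dict[str, Any]], columns: List[str]) -> List[str]:
    """Return the columns that contain at least one invalid value, in column order."""
    counts = {c: 0 for c in columns}
    for row in rows:
        for c in columns:
            if is_invalid(row.get(c)):
                counts[c] += 1
    return [c for c in columns if counts[c] > 0]


rows = [
    {'flow_duration': 10.0, 'flow_byts_s': 'inf', 'tot_fwd_pkts': 3},
    {'flow_duration': -1.0, 'flow_byts_s': '120.5', 'tot_fwd_pkts': 4},
    {'flow_duration': 5.0, 'flow_byts_s': None, 'tot_fwd_pkts': 2},
]
print(columns_to_impute(rows, ['flow_duration', 'flow_byts_s', 'tot_fwd_pkts']))
# ['flow_duration', 'flow_byts_s']
```

The Spark version computes all per-column counts in a single `select` of `count(when(...))` aggregates, so the dataset is scanned once rather than once per column.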

### 1.4 Spark Session

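A SparkSession is created that requests 3 executors with 3 cores and 15 GB of memory each. Note that with `master('local[*]')` Spark runs in local mode, where all tasks execute inside the driver JVM using all available cores; the executor settings only take effect when running on a cluster manager.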

In [6]:
spark = (SparkSession.builder
    .master('local[*]')
    .appName('ml-ids')
    .config('spark.executor.instances', '3')
    .config('spark.executor.cores', '3')
    .config('spark.executor.memory', '15g')
    .config('spark.driver.memory', '15g')
    .getOrCreate())

## 2. Dataset creation

In this part, the [CIC-IDS-2018](https://drive.google.com/open?id=1HrTPh0YRSZ4T9DLa_c47lubheKUcPl0r) and [CIC-IDS-2017](https://drive.google.com/open?id=1Q2J_pPB0K0PHjq0YO5BPwYQwrvoZgYqo) datasets are loaded, combined and split into a training and a holdout dataset with a ratio of `90%/10%`.  
Both datasets are then saved to disk.

In [60]:
df = load_csv(dataset_path)

(training_df, hold_df) = df.randomSplit([0.9, 0.1], seed=42)

print(f"Data samples: {training_df.count()}")
print(f"Holdout samples: {hold_df.count()}")

print('Writing training set...')
training_df.write.csv(training_path, header='true', mode='overwrite')

print('Writing holdout set...')
hold_df.write.csv(holdout_path, header='true', mode='overwrite')

Data samples: 17157022
Holdout samples: 1906664
Writing training set...
Writing holdout set...
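
`randomSplit` only approximates the requested ratio: Spark normalizes the weights and assigns each row to the split whose range of cumulative weights contains a seeded random draw, so here 1,906,664 of 19,063,686 rows (roughly 10.0%) end up in the holdout set. A minimal pure-Python sketch of that idea, assuming an in-memory sequence instead of a partitioned DataFrame (`random_split` is a hypothetical helper, not Spark's implementation):

```python
import random
from typing import List, Sequence, TypeVar

T = TypeVar('T')


def random_split(items: Sequence[T], weights: Sequence[float], seed: int) -> List[List[T]]:
    """Assign each item to exactly one split by comparing one uniform draw
    per item against the normalized cumulative weights."""
    total = sum(weights)
    bounds: List[float] = []
    acc = 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    bounds[-1] = 1.0  # guard against floating-point drift in the last bound

    rng = random.Random(seed)
    splits: List[List[T]] = [[] for _ in weights]
    for item in items:
        u = rng.random()  # uniform in [0.0, 1.0)
        for i, upper in enumerate(bounds):
            if u < upper:
                splits[i].append(item)
                break
    return splits


train, holdout = random_split(range(100_000), [0.9, 0.1], seed=42)
print(len(train), len(holdout))  # close to, but usually not exactly, 90000 and 10000
```

Because every item gets a single draw, the splits are disjoint and together cover the whole input, while a fixed seed makes the assignment reproducible.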


Since both datasets contain invalid values (`inf` and negative numbers) as well as empty values, the selected feature columns that need to be cleaned and imputed are determined.

In [26]:
impute_cols = find_columns_to_impute(df.select(*selected_features))
print(f'Columns to impute:')
impute_cols

Columns to impute:


['flow_duration',
 'flow_byts_s',
 'flow_pkts_s',
 'flow_iat_min',
 'fwd_iat_tot',
 'fwd_iat_min',
 'init_fwd_win_byts',
 'init_bwd_win_byts',
 'fwd_seg_size_min']

## 3. Model Training

In this section a pipeline is created in order to remove invalid values and impute missing values in the dataset. Afterwards a GBTClassifier is trained on the data using grid-search.  
Both the pipeline and the classifier are serialized to disk as a last step.

### 3.1 Dataset Loading

In [7]:
# Result of find_columns_to_impute in section 2, hard-coded to avoid rescanning the dataset
impute_cols = [
    'flow_duration',
    'flow_byts_s',
    'flow_pkts_s',
    'flow_iat_min',
    'fwd_iat_tot',
    'fwd_iat_min',
    'init_fwd_win_byts',
    'init_bwd_win_byts',
    'fwd_seg_size_min'
]

In [8]:
train_df = load_csv(os.path.join(training_path, '*.csv'), selected_columns).cache()

### 3.2 Feature columns

The feature columns of the classifier are defined in order to create the pipeline in the next step.

In [10]:
cat_cols = ['protocol']

cleaned_impute_cols = [f'{c}_clean' for c in impute_cols]

processed_cols = cat_cols + impute_cols
unprocessed_cols = [c for c in selected_features if c not in processed_cols] 
feature_cols = (unprocessed_cols + 
                [cat_column_name(c) for c in cat_cols] +
                [imputed_column_name(c) for c in impute_cols])

print(f'Number of feature columns: {len(feature_cols)}')
print('Feature columns:')
feature_cols

Number of feature columns: 31
Feature columns:


['tot_fwd_pkts',
 'tot_bwd_pkts',
 'totlen_fwd_pkts',
 'totlen_bwd_pkts',
 'fwd_pkt_len_mean',
 'fwd_pkt_len_std',
 'bwd_pkt_len_mean',
 'flow_iat_std',
 'bwd_iat_tot',
 'bwd_iat_min',
 'fwd_psh_flags',
 'fwd_urg_flags',
 'bwd_pkts_s',
 'fin_flag_cnt',
 'rst_flag_cnt',
 'psh_flag_cnt',
 'ack_flag_cnt',
 'urg_flag_cnt',
 'down_up_ratio',
 'active_mean',
 'idle_mean',
 'protocol_cat',
 'flow_duration_imputed',
 'flow_byts_s_imputed',
 'flow_pkts_s_imputed',
 'flow_iat_min_imputed',
 'fwd_iat_tot_imputed',
 'fwd_iat_min_imputed',
 'init_fwd_win_byts_imputed',
 'init_bwd_win_byts_imputed',
 'fwd_seg_size_min_imputed']

### 3.3 Pipeline Creation

In this section the pipeline to process the input data is created. The pipeline consists of the following stages:
1. `ValueCleaner`: Sets all values that are `inf` or negative to `None` so that they are imputed in the next stage.
2. `Imputer`: Imputes all missing values with the mean value of the column.
3. `OneHotEncoderEstimator`: One-hot encodes the `protocol` category column.
4. `VectorAssembler`: Merges all feature columns into a feature vector column. 
5. `BinaryLabelMaker`: Adds a binary label with value `0 = Benign` and `1 = Attack`.
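
The row-level effect of stages 1, 2 and 5 can be illustrated without Spark. The following is a minimal pure-Python sketch, assuming a single numeric column held in a list; `clean`, `fit_mean`, `impute` and `make_label` are hypothetical helpers that only mirror what `ValueCleaner`, `Imputer` (with its default `mean` strategy) and `BinaryLabelMaker` do. It also shows why the fitted pipeline is reused later: the mean is learned once at fit time and then applied to new data.

```python
import math
from typing import List, Optional


def clean(values: List[Optional[float]]) -> List[Optional[float]]:
    """Stage 1 (like ValueCleaner): infinite or negative values become None."""
    return [None if v is None or math.isinf(v) or v < 0.0 else v for v in values]


def fit_mean(values: List[Optional[float]]) -> float:
    """Stage 2 fit (like Imputer with strategy 'mean'): mean of the non-missing values."""
    present = [v for v in values if v is not None and not math.isnan(v)]
    if not present:
        raise ValueError('cannot compute a surrogate: all values are missing')
    return sum(present) / len(present)


def impute(values: List[Optional[float]], surrogate: float) -> List[float]:
    """Stage 2 transform: replace missing values with the surrogate learned at fit time."""
    return [surrogate if v is None or math.isnan(v) else v for v in values]


def make_label(label: str, class_label: str = 'Benign') -> float:
    """Stage 5 (like BinaryLabelMaker): 0.0 for class_label, 1.0 for everything else."""
    return 0.0 if label == class_label else 1.0


train_col = clean([2.0, float('inf'), -1.0, 4.0])            # [2.0, None, None, 4.0]
surrogate = fit_mean(train_col)                               # 3.0
holdout_col = impute(clean([float('inf'), 8.0]), surrogate)   # [3.0, 8.0]
labels = [make_label(label) for label in ['Benign', 'DDoS', 'Benign']]  # [0.0, 1.0, 0.0]
```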

In [11]:
stages = []

# ValueCleaner
cleaner = ValueCleaner(inputCols=impute_cols, outputCols=cleaned_impute_cols)
stages += [cleaner]

# Imputer
imputer = Imputer(
    inputCols=cleaned_impute_cols, 
    outputCols=[imputed_column_name(c) for c in impute_cols]
)
stages += [imputer]

# OneHotEncoderEstimator
for c in cat_cols:
    encoder = OneHotEncoderEstimator(inputCols=[c], outputCols=[cat_column_name(c)])
    stages += [encoder]
    
# VectorAssembler
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
stages += [assembler]    

# LabelMaker
label_maker = BinaryLabelMaker(
    inputCols=['label'], 
    outputCols=['label_is_attack'],
    classLabel='Benign'    
)
stages += [label_maker]

pipeline = Pipeline(stages=stages)

### 3.4 Pipeline Fitting

In this step the pipeline is fitted, the training dataset is transformed and the validity of the resulting dataset is asserted.

In [12]:
pipeline_model = pipeline.fit(train_df)
train_transf_df = pipeline_model.transform(train_df)

In [13]:
# protocol_cat holds one-hot vectors, so it is excluded from the numeric validity checks
assert_column_validity(train_transf_df.select(feature_cols).drop('protocol_cat'))

### 3.5 Dataset Preparation

The dataset is split into training (80%) and test (20%) datasets.

In [14]:
(training_data, test_data) = train_transf_df.randomSplit([0.8, 0.2], seed=42)
print(f'Training samples: {training_data.count()}')
print(f'Test samples: {test_data.count()}')

Training samples: 13728637
Test samples: 3428385


### 3.6 Model Training

In this section the GBTClassifier is trained using grid-search with cross-validation.

In [15]:
gb = GBTClassifier(labelCol='label_is_attack', featuresCol='features', seed=42)

param_grid = (ParamGridBuilder()
              .addGrid(gb.maxDepth, [1, 3, 5])
              .addGrid(gb.maxIter, [10, 20])
              .addGrid(gb.stepSize, [0.5, 0.1, 0.05])
              .build())

evaluator = (MulticlassClassificationEvaluator(predictionCol="prediction", 
                                               labelCol='label_is_attack',
                                               metricName='weightedRecall'))

cv = (CrossValidator(estimator=gb,
                     estimatorParamMaps=param_grid,
                     evaluator=evaluator,
                     numFolds=3))

gb_model = cv.fit(training_data)

# The tuned hyperparameters are read from the underlying JVM object of the best model
print('Best parameters:')
print(f'Max depth: {gb_model.bestModel._java_obj.getMaxDepth()}')
print(f'Max iterations: {gb_model.bestModel._java_obj.getMaxIter()}')
print(f'Step size: {gb_model.bestModel._java_obj.getStepSize()}')

Best parameters:
Max depth: 5
Max iterations: 20
Step size: 0.5
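
`ParamGridBuilder.build()` returns the Cartesian product of the grid values, and `CrossValidator` fits one model per combination and fold, then refits the best combination on the full training data. A small pure-Python sketch of the resulting workload, with the grid values copied from the cell above (`grid` is a plain dict used only for illustration):

```python
from itertools import product

# Grid values copied from the ParamGridBuilder call above
grid = {
    'maxDepth': [1, 3, 5],
    'maxIter': [10, 20],
    'stepSize': [0.5, 0.1, 0.05],
}
num_folds = 3

# Every combination of one value per parameter, like ParamGridBuilder.build()
param_maps = [dict(zip(grid, values)) for values in product(*grid.values())]

cv_fits = len(param_maps) * num_folds  # one fit per combination and fold
total_fits = cv_fits + 1               # plus the final refit of the best combination

print(len(param_maps), cv_fits, total_fits)  # 18 54 55
```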


### 3.7 Model Evaluation

In [16]:
train_pred = gb_model.transform(training_data)
print_classification_report(train_pred, 'prediction', 'label_is_attack', 'Train')

Classification Report (Train):
Recall: 0.9875890082897523
Precision: 0.9875789062812305
F1: 0.9874730933711836
FPR: 0.04899706171213246
TPR: 0.9875890082897523

Area under PR (raw predictions): 0.9776131473914916
Area under PR: 0.9626597197081999
Accuracy = 0.9875890082897523

Confusion Matrix:
DenseMatrix([[11317165.,    30602.],
             [  139784.,  2241086.]])
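
The `Recall`, `TPR` and `Accuracy` lines of the report are identical, which is expected for the weighted metrics of `MulticlassMetrics`. With $n_c$ the number of samples whose true label is $c$, $TP_c$ the number of those predicted correctly and $N = \sum_c n_c$, the recall (and TPR) of class $c$ is $TP_c / n_c$, and weighting by $n_c / N$ gives:

$$
\text{weightedRecall} = \sum_{c} \frac{n_c}{N} \cdot \frac{TP_c}{n_c} = \frac{1}{N} \sum_{c} TP_c = \text{accuracy}
$$

As a consequence, the `weightedRecall` metric used by the cross-validation evaluator ranks the candidate models by accuracy.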


In [17]:
test_pred = gb_model.transform(test_data)
print_classification_report(test_pred, 'prediction', 'label_is_attack', 'Test')

Classification Report (Test):
Recall: 0.9875626570528105
Precision: 0.9875525094295057
F1: 0.9874458377349316
FPR: 0.04922211493411625
TPR: 0.9875626570528105

Area under PR (raw predictions): 0.97758372423304
Area under PR: 0.9625211546722784
Accuracy = 0.9875626570528105

Confusion Matrix:
DenseMatrix([[2827204.,    7643.],
             [  34997.,  558541.]])


The performance of the model is promising, with a weighted recall of `0.988` and a weighted precision of `0.988` on the test set.  
The following table summarizes the performance on the test set:  

|Weighted precision|Weighted recall|Weighted F1|Area under PR (raw predictions)|
|---------|------|--|-------------|
|0.988|0.988|0.987|0.978|
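
The precision, recall and F1 values printed by `print_classification_report` are weighted averages over both classes and are therefore dominated by the benign class, which makes up roughly 83% of the test samples. The attack class can be inspected directly from the confusion matrix, where `MulticlassMetrics.confusionMatrix()` puts the true labels in rows and the predicted labels in columns, ordered by ascending label (`0 = Benign`, `1 = Attack`). A minimal sketch with the test-set counts copied from the output above (`attack_class_metrics` is a hypothetical helper):

```python
from typing import Dict, List


def attack_class_metrics(matrix: List[List[float]]) -> Dict[str, float]:
    """Metrics for label 1 (attack) from a 2x2 matrix with true labels in rows
    and predicted labels in columns, both ordered [0 = Benign, 1 = Attack]."""
    (tn, fp), (fn, tp) = matrix
    return {
        'precision': tp / (tp + fp),  # share of attack predictions that are attacks
        'recall': tp / (tp + fn),     # share of attacks that are detected
        'fpr': fp / (fp + tn),        # share of benign flows flagged as attacks
        'f1': 2 * tp / (2 * tp + fp + fn),
    }


# Test-set confusion matrix from the classification report above
test_matrix = [[2827204.0, 7643.0],
               [34997.0, 558541.0]]

for name, value in attack_class_metrics(test_matrix).items():
    print(f'{name}: {value:.4f}')
```

On these counts the attack-class recall is about 0.941: roughly 5.9% of the attacks in the test set (34,997 of 593,538) are classified as benign, while only about 0.27% of benign flows are flagged as attacks.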

### 3.8 Model Persistence

In the last step of this section, the fitted pipeline and the GBTClassifier model are persisted to disk so that they can be used independently of training.

In [18]:
pipeline_model.write().overwrite().save(pipeline_model_path)
gb_model.write().overwrite().save(gb_model_path)

## 4. Model Usage

This section demonstrates the usage of the classifier trained in the previous section. Models for the pipeline and the GBTClassifier are read from disk in order to perform predictions on the holdout dataset.  
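This section can be used as a basis for a separate driver program. Such a program has to define or import `ValueCleaner` and `BinaryLabelMaker` before calling `PipelineModel.load`, because PySpark restores these Python-only stages by importing their classes under the module path recorded at save time (here `__main__`). Since `BinaryLabelMaker` is part of the pipeline, the input data also has to contain a `label` column.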

### 4.1 Pipeline and GBTClassifier Loading

In [22]:
pipeline_model = PipelineModel.load(pipeline_model_path)
gb_model = CrossValidatorModel.load(gb_model_path)

### 4.2 Holdout Dataset Loading

In [23]:
hold_df = load_csv(os.path.join(holdout_path, '*.csv'), selected_columns).cache()
print(f"Holdout samples: {hold_df.count()}")

Holdout samples: 1906664


### 4.3 Pipeline Application

The fitted pipeline is applied to the holdout dataset, and the validity of all resulting values is asserted in the next cell.

In [24]:
hold_transf_df = pipeline_model.transform(hold_df)

In [25]:
# protocol_cat holds one-hot vectors, so it is excluded from the numeric validity checks
assert_column_validity(hold_transf_df.select(feature_cols).drop('protocol_cat'))

### 4.4 Predictions on the Holdout Dataset

The classifier is applied to the holdout dataset, printing the classification report for the resulting predictions.

In [26]:
hold_pred = gb_model.transform(hold_transf_df)
print_classification_report(hold_pred, 'prediction', 'label_is_attack', 'Holdout')

Classification Report (Holdout):
Recall: 0.9876931646058246
Precision: 0.9876838348438507
F1: 0.9875794596235876
FPR: 0.04846458013698347
TPR: 0.9876931646058246

Area under PR (raw predictions): 0.9777125841930321
Area under PR: 0.9630935075027023
Accuracy = 0.9876931646058246

Confusion Matrix:
DenseMatrix([[1570985.,    4206.],
             [  19259.,  312214.]])


In [27]:
spark.stop()