In [1]:
import os
import glob
from pathlib import Path

!pip install inquirer
import inquirer

import pyspark

from pyspark.sql.functions import *
from pyspark.ml import Pipeline

from pyspark.sql import SQLContext
from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.classification import  RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
from pyspark.ml.feature import StringIndexer, VectorAssembler, VectorSlicer

import pandas as pd
from sklearn.preprocessing import OrdinalEncoder
%autosave 60



Autosaving every 60 seconds


In [3]:
class SparkFactory:
    """Interactive helper that loads a dataset into Spark and applies a chosen transform.

    Attributes (all set in ``__init__``):
        base: Directory that holds the dataset folders (``./datasets``).
        aviable_datasets: Dataset names offered to the user. The attribute name
            (typo included) is kept unchanged for backward compatibility.
        spark: The ``SparkSession`` used for every read.
    """

    def __init__(self):
        self.base = Path('./datasets')
        self.aviable_datasets = ["CICIDS", "NET", "All"]
        self.spark = SparkSession.builder.appName(__name__).getOrCreate()

    def validate_available_dataset(self):
        """Return a new list with the names of the selectable datasets."""
        return list(self.aviable_datasets)

    @staticmethod
    def preprocess(dataframe):
        """Impute numeric NaNs with column means and ordinal-encode both target columns.

        The frame is modified in place and also returned.

        Args:
            dataframe: pandas DataFrame containing ``binary_class`` and
                ``multi_class`` columns.

        Returns:
            The same ``dataframe`` object.
        """
        # numeric_only=True keeps non-numeric columns (e.g. string labels) out of
        # the mean; recent pandas raises a TypeError for them otherwise.
        dataframe.fillna(dataframe.mean(numeric_only=True), inplace=True)
        ordinal_encoder = OrdinalEncoder()
        for target_column in ('binary_class', 'multi_class'):
            # fit_transform refits the encoder for every column.
            dataframe[target_column] = ordinal_encoder.fit_transform(dataframe[[target_column]]).astype('int')
        return dataframe

    def read_dataset(self, dataset, infer_schema=False):
        """Load a CSV file with a header row into a Spark DataFrame.

        Args:
            dataset: Path handed to Spark's CSV reader.
            infer_schema: When True, ask Spark to infer column types. The default
                (False) matches the DataFrame this method has always returned.

        Returns:
            The Spark DataFrame produced by the reader. Errors raised by Spark
            propagate to the caller unchanged.
        """
        print(f'Reading dataset: {dataset}')
        # A single read: the earlier version issued an extra load whose result was discarded.
        reader = self.spark.read.option("header", "true")
        if infer_schema:
            reader = reader.option("inferSchema", "true")
        return reader.csv(dataset)

    def read_df(self, dataset):
        """Read a CSV with pandas and convert the result to a Spark DataFrame."""
        df = self.spark.createDataFrame(pd.read_csv(dataset))
        return df

    @staticmethod
    def _one_hot_endcoder(data):
        """Placeholder for the one-hot encoding step; returns ``data`` unchanged.

        TODO: implement the encoding. The earlier draft only constructed an unused
        ``CountVectorizer(inputCol="Color_Array", outputCol="Color_OneHotEncoded",
        vocabSize=4, minDF=1.0)`` and never applied it.
        """
        return data

    def run(self):
        """Drive the interactive flow: pick a dataset, load it, pick a method.

        Returns:
            The loaded (and, for ``OHE``, transformed) Spark DataFrame.

        Raises:
            AssertionError: If the user does not answer ``Y`` to the first prompt.
        """
        print(self.base)
        # validate and choose dataset to work with
        # NOTE: assert statements are skipped when Python runs with -O.
        assert input('Hi! Would u like to see available datasets to process? Y/n: ') == 'Y', 'Okay, see u later!'
        print(self.validate_available_dataset())
        dataset = input('Good! Choose one of datasets to work with pasting name shown in previous step: ')

        # NOTE(review): the entered value is passed to read_dataset as-is, so it has
        # to be something Spark can load; it is not resolved against self.base yet.
        print(f'Nice, we gonna to preprocess {dataset}')
        data = self.read_dataset(dataset)

        # validate and choose method to use
        print('Available methods to apply: [OHE, B, C, D]')
        method = input('Now choose one of methods to apply ')
        if method == 'OHE':
            data = self._one_hot_endcoder(data)
        return data


In [9]:
# SparkFactory().run()

### Load the transformed train/test datasets

In [2]:
def read_pd(base_dir='datasets'):
    """Load the transformed CICIDS and NET train/test CSV files with pandas.

    Args:
        base_dir: Directory containing the ``TRANSFORMED_CICIDS`` and
            ``TRANSFORMED_NET`` folders. Defaults to ``'datasets'``, the location
            that used to be hard-coded.

    Returns:
        tuple: ``(cicids_train, cicids_test, net_train, net_test)`` DataFrames.
    """
    root = Path(base_dir)
    # Nested order (dataset outer, split inner) matches the documented return order.
    return tuple(
        pd.read_csv(root / f'TRANSFORMED_{dataset_name}' / f'{split}.csv')
        for dataset_name in ('CICIDS', 'NET')
        for split in ('train', 'test')
    )

# Bind the four raw pandas frames returned by read_pd, in its return order.
(
    TRANSFORMED_CICIDS_TRAIN,
    TRANSFORMED_CICIDS_TEST,
    TRANSFORMED_NET_TRAIN,
    TRANSFORMED_NET_TEST,
) = read_pd()

### Replace NaN values with column means and encode the targets

In [26]:
def preprocess(dataframe, is_test = False):
    """Impute numeric NaNs with column means and optionally encode the targets.

    The frame is modified in place and also returned.

    Args:
        dataframe: pandas DataFrame to clean.
        is_test: When truthy, ``binary_class`` and ``multi_class`` are
            ordinal-encoded to integers. Despite the name, this notebook passes
            ``True`` for the *training* frames and leaves the default for the
            test frames.

    Returns:
        The same ``dataframe`` object.
    """
    # numeric_only=True keeps non-numeric columns (e.g. string labels) out of the
    # mean; recent pandas raises a TypeError for them otherwise.
    dataframe.fillna(dataframe.mean(numeric_only=True), inplace=True)

    if is_test:
        ordinal_encoder = OrdinalEncoder()
        for target_column in ('binary_class', 'multi_class'):
            # fit_transform refits the encoder for every column.
            dataframe[target_column] = ordinal_encoder.fit_transform(dataframe[[target_column]]).astype('int')
    return dataframe


# Training frames: impute NaNs and encode the label columns (is_test=True switches the encoding on).
pd_cicids_train = preprocess(TRANSFORMED_CICIDS_TRAIN, is_test=True)
pd_net_train = preprocess(TRANSFORMED_NET_TRAIN, is_test=True)

# Test frames: impute NaNs only.
pd_cicids_test = preprocess(TRANSFORMED_CICIDS_TEST)
pd_net_test = preprocess(TRANSFORMED_NET_TEST)

In [38]:
# Keep both label columns of each training frame aside; they are re-attached before saving.
target_columns_cicids, target_columns_net = (
    frame[['binary_class', 'multi_class']]
    for frame in (pd_cicids_train, pd_net_train)
)

### Initialize Spark session

In [4]:
# Reuse the active Spark session if one exists, otherwise start a new one.
spark = (
    SparkSession.builder
    .appName(__name__)
    .getOrCreate()
)

### Convert the pandas DataFrames to Spark DataFrames

In [5]:
# Only the training frames (CICIDS, then NET) are converted to Spark DataFrames.
# The test frames stay in pandas: later cells index them by column name and write them to CSV.
# NOTE: the names are rebound from pandas to Spark objects, so this cell is not safe to run twice.
pd_cicids_train, pd_net_train = (
    spark.createDataFrame(frame)
    for frame in (pd_cicids_train, pd_net_train)
)

### Drop the `multi_class` target before fitting the feature-importance model

In [6]:
# Columns removed before training; only binary_class remains as the label.
drop_list = ['multi_class']

pd_cicids_train = pd_cicids_train.select(
    *[name for name in pd_cicids_train.columns if name not in drop_list]
)
pd_net_train = pd_net_train.select(
    *[name for name in pd_net_train.columns if name not in drop_list]
)

### Pipeline for Feature Importance model

In [7]:
def pipeline_preparation(df):
    """Build the pipeline stages for the random-forest feature-importance model.

    Args:
        df: DataFrame whose ``dtypes`` yields ``(column name, type name)`` pairs.
            Columns typed ``int``, ``bigint`` or ``double`` become features,
            except ``binary_class``, which is the label.

    Returns:
        list: ``[VectorAssembler, StringIndexer, RandomForestClassifier]``, in the
        order the pipeline runs them.
    """
    numeric_types = ('int', 'bigint', 'double')
    feature_columns = [
        column_name
        for column_name, column_type in df.dtypes
        if column_type in numeric_types and column_name != 'binary_class'
    ]

    # binary_class -> 'label'
    to_label = StringIndexer(inputCol='binary_class', outputCol='label', handleInvalid='keep')
    # numeric columns -> single 'features' vector
    to_features = VectorAssembler(inputCols=feature_columns, outputCol="features")
    forest = RandomForestClassifier(
        labelCol="label",
        featuresCol="features",
        seed=8464,
        numTrees=10,
        cacheNodeIds=True,
        subsamplingRate=0.7,
    )

    return [to_features, to_label, forest]

In [8]:
# One pipeline per dataset; the stages are derived from each training frame's columns.
cicids_pipeline, net_pipeline = (
    Pipeline(stages=pipeline_preparation(frame))
    for frame in (pd_cicids_train, pd_net_train)
)

In [9]:
# Fit each pipeline on its own training frame.
# The last stage of every fitted model is the trained random forest.

cicids_model = cicids_pipeline.fit(pd_cicids_train)
net_model    = net_pipeline.fit(pd_net_train)

In [10]:
# Run the fitted pipelines over the training frames.
# The outputs carry the assembled "features" column that later cells read and slice.

pd_cicids_train_ = cicids_model.transform(pd_cicids_train)
pd_net_train_ = net_model.transform(pd_net_train)

### Check feature importance vector

In [11]:
# Importance vector of the CICIDS random forest (last pipeline stage), indexed by feature position.
cicids_model.stages[-1].featureImportances

SparseVector(37, {0: 0.0027, 1: 0.0101, 2: 0.1089, 3: 0.0952, 4: 0.0041, 5: 0.1645, 6: 0.0011, 7: 0.0096, 8: 0.0004, 9: 0.0086, 10: 0.0003, 11: 0.0898, 12: 0.018, 13: 0.0106, 15: 0.0, 17: 0.0116, 18: 0.028, 19: 0.0002, 20: 0.0055, 21: 0.065, 22: 0.093, 23: 0.0791, 24: 0.0583, 25: 0.0763, 27: 0.008, 28: 0.0025, 29: 0.0055, 30: 0.0014, 32: 0.0021, 33: 0.0031, 34: 0.0, 35: 0.0059, 36: 0.0305})

In [12]:
# Importance vector of the NET random forest (last pipeline stage), indexed by feature position.
net_model.stages[-1].featureImportances

SparseVector(37, {0: 0.0078, 2: 0.0501, 3: 0.1213, 4: 0.0032, 5: 0.0003, 6: 0.018, 7: 0.1068, 8: 0.0208, 9: 0.0246, 10: 0.1439, 11: 0.1664, 12: 0.0035, 13: 0.0011, 14: 0.0071, 15: 0.0041, 16: 0.0341, 17: 0.0437, 18: 0.0119, 19: 0.003, 20: 0.0201, 21: 0.001, 22: 0.0401, 23: 0.0, 28: 0.004, 29: 0.0009, 31: 0.0313, 32: 0.0035, 33: 0.0194, 35: 0.0191, 36: 0.0889})

### Extracting features

In [13]:
def ExtractFeatureImp(featureImp, dataset, featuresCol):
    """Pair every assembled feature with its importance score, highest first.

    Args:
        featureImp: Object indexable by feature position (the notebook passes a
            fitted model's ``featureImportances``).
        dataset: Object whose ``schema[featuresCol].metadata["ml_attr"]["attrs"]``
            maps attribute groups to lists of attribute records.
        featuresCol: Name of the features column to inspect.

    Returns:
        pandas.DataFrame: The attribute records plus a ``score`` column, sorted
        by ``score`` in descending order.
    """
    attribute_groups = dataset.schema[featuresCol].metadata["ml_attr"]["attrs"]
    # Flatten the per-group record lists, keeping group order.
    records = [record for group_name in attribute_groups for record in attribute_groups[group_name]]
    ranking = pd.DataFrame(records)
    ranking['score'] = ranking['idx'].apply(lambda position: featureImp[position])
    return ranking.sort_values(by='score', ascending=False)

In [14]:
# Ranked feature tables (one row per assembled feature, best score first).
varlist_cicids = ExtractFeatureImp(
    featureImp=cicids_model.stages[-1].featureImportances,
    dataset=pd_cicids_train_,
    featuresCol="features",
)
varlist_net = ExtractFeatureImp(
    featureImp=net_model.stages[-1].featureImportances,
    dataset=pd_net_train_,
    featuresCol="features",
)

In [15]:
# Vector positions of the 15 highest-scoring features per dataset.
# NOTE(review): the exported feature-name lists further down keep 20 features, not 15 - confirm which is intended.
varidx_cicids = varlist_cicids['idx'].head(15).tolist()
varidxt_net = varlist_net['idx'].head(15).tolist()

In [16]:
# One slicer per dataset: copy the selected positions of "features" into "features2".
slicer_cicids = VectorSlicer(indices=varidx_cicids, inputCol="features", outputCol="features2")
slicer_net = VectorSlicer(indices=varidxt_net, inputCol="features", outputCol="features2")

selected_cicids_train_ = slicer_cicids.transform(pd_cicids_train_)
selected_net_train_ = slicer_net.transform(pd_net_train_)

In [17]:
# Names of the 20 highest-scoring features per dataset, used to subset the pandas frames.
import_features_cicids = (
    ExtractFeatureImp(cicids_model.stages[-1].featureImportances, pd_cicids_train_, "features")
    .head(20)['name']
    .tolist()
)
import_features_net = (
    ExtractFeatureImp(net_model.stages[-1].featureImportances, pd_net_train_, "features")
    .head(20)['name']
    .tolist()
)

In [21]:
# Collect the sliced Spark frames on the driver as pandas DataFrames.
# NET used to convert the unsliced pd_net_train_; it now uses the sliced frame
# so both datasets follow the same path and the variable name matches its content.
pd_selected_cicids_train = selected_cicids_train_.toPandas()
pd_selected_net_train = selected_net_train_.toPandas()

In [22]:
# Keep only the selected feature columns, in ranking order.
pd_selected_cicids_train = pd_selected_cicids_train.loc[:, import_features_cicids]
pd_selected_net_train = pd_selected_net_train.loc[:, import_features_net]

In [50]:
def save_csv(dataset, dataset_name, dataset_type):
    """Write ``dataset`` to ``datasets/SELECTED_<dataset_name>/<dataset_type>.csv``.

    The target folder is created when missing and the index is not written.

    Args:
        dataset: Object exposing ``to_csv`` (a pandas DataFrame in this notebook).
        dataset_name: Suffix of the ``SELECTED_`` folder.
        dataset_type: File stem, e.g. ``'train'`` or ``'test'``.

    Returns:
        bool: Always ``True``.
    """
    target_dir = Path('datasets') / f'SELECTED_{dataset_name}'
    target_dir.mkdir(parents=True, exist_ok=True)
    dataset.to_csv(target_dir / f'{dataset_type}.csv', index=False)
    return True

In [51]:
# Attach the label columns to the selected training features.
# `frame + frame` adds element-wise after aligning on column names; the feature
# and label columns are disjoint, so every cell would become NaN.
# pd.concat(axis=1) places the columns side by side instead.
# Assumes the frames coming back from Spark kept the original row order - TODO confirm.
assert pd_selected_cicids_train.shape[0] == target_columns_cicids.shape[0], 'CICIDS: feature/label row counts differ'
assert pd_selected_net_train.shape[0] == target_columns_net.shape[0], 'NET: feature/label row counts differ'

train_cicids = pd.concat(
    [pd_selected_cicids_train.reset_index(drop=True), target_columns_cicids.reset_index(drop=True)],
    axis=1,
)
# NOTE(review): the test sets are exported with feature columns only (no labels) - confirm this is intended.
test_cicids = pd_cicids_test[import_features_cicids]

train_net = pd.concat(
    [pd_selected_net_train.reset_index(drop=True), target_columns_net.reset_index(drop=True)],
    axis=1,
)
test_net = pd_net_test[import_features_net]

In [52]:
# Persist the selected train/test sets under datasets/SELECTED_<name>/.
save_csv(train_cicids, 'CICIDS', 'train')
save_csv(test_cicids, 'CICIDS', 'test')

save_csv(train_net, 'NET', 'train')
save_csv(test_net, 'NET', 'test')

True