# Executive Certificate Big Data - Centrale Supelec - Octobre 2018
# Détection d’intrusion réseau à l’aide de l’apprentissage automatique
### Notebook : sp5-Spark-ml-classification
### Auteur : Ahmed Mekaouar

In [2]:
# Spark runs inside this Jupyter notebook in local mode (started with `pyspark --master local[2]`).
# `sc` (the SparkContext) is never created in this notebook -- presumably provided by the
# pyspark shell startup; displaying it confirms the session is up.
sc

# Building an IDS model using Spark

## Load & Explore data

In [3]:
# Paths to the cleaned training CSVs, located in ../data relative to this notebook.
# `os` is imported explicitly instead of relying on the pyspark shell having imported it,
# so the cell also works on a fresh kernel.
import os

DATA_DIR = os.path.join(os.path.pardir, 'data')
baseline_file = os.path.join(DATA_DIR, 'train_baseline.csv')
attacks_file = os.path.join(DATA_DIR, 'train_attacks.csv')

In [4]:
df_baseline = spark.read.options(header=True, sep=",", inferSchema=True).csv(baseline_file)  # baseline (normal) traffic

In [5]:
df_attacks = spark.read.options(header=True, sep=",", inferSchema=True).csv(attacks_file)  # attack traffic

In [6]:
# Stack baseline and attack rows. unionByName (Spark >= 2.3) matches columns by name;
# plain union() matches by position and would silently misalign the two files if their
# column order ever differed.
df_all = df_baseline.unionByName(df_attacks)

In [10]:
#df_attacks.show()

In [6]:
#COLUMNS = ("TotalBackwardPackets", "Label")

In [9]:
#df_attacks.select(*COLUMNS).show()

In [11]:
#df_attacks.describe().show()

In [132]:
#df_all.columns[0:-1]

In [16]:
#col=df_all.columns[0:7]

### Data transformation

In [8]:
# Spark resolves column names case-insensitively, so "Label" would collide with the
# numeric "label" column that StringIndexer creates below -> rename it to "stringLabel".
# TODO: apply this rename directly in the source CSV files.
df_all = df_all.withColumnRenamed(existing="Label", new="stringLabel")

In [9]:
# Display the DataFrame repr to check column names and inferred types after the rename.
df_all

DataFrame[Init_Win_bytes_backward: int, Init_Win_bytes_forward: int, FwdPacketLengthMax: double, min_seg_size_forward: int, PacketLengthMean: double, SubflowFwdBytes: int, BwdPacketLengthMean: double, BwdPacketLengthMax: double, AvgBwdSegmentSize: double, BwdPackets/s: double, FlowIATMean: double, BwdPacketLengthMin: double, FlowIATMin: double, BwdHeaderLength: int, FwdIATMin: double, TotalLengthofBwdPackets: double, FlowPackets/s: double, TotalLengthofFwdPackets: double, SubflowBwdBytes: int, FwdIATMean: double, AveragePacketSize: double, FlowDuration: int, MaxPacketLength: double, FlowIATStd: double, FwdIATTotal: double, FwdPacketLengthMean: double, FlowBytesPs: double, PacketLengthVariance: double, FwdIATStd: double, FlowIATMax: double, FwdIATMax: double, PacketLengthStd: double, FwdPackets/s: double, FwdHeaderLength: int, SubflowFwdPackets: int, AvgFwdSegmentSize: double, SubflowBwdPackets: int, act_data_pkt_fwd: int, FwdPacketLengthStd: double, TotalFwdPackets: int, TotalBackwardP

In [10]:
# Spark ML requires a numeric label, so a StringIndexer is fitted on "stringLabel"
# to produce the numeric "label" column.
from pyspark.ml.feature import StringIndexer

labelIndexer = StringIndexer(inputCol="stringLabel", outputCol="label").fit(df_all)

In [11]:
#labelIndexer.labels

In [12]:
indexed_df_all = labelIndexer.transform(dataset=df_all)  # adds the numeric "label" column

In [53]:
# Feature columns = every column except the string label and its numeric index.
# Selecting by name (rather than slicing off the last two positions) stays correct
# even if the column order changes.
ft_col = [c for c in indexed_df_all.columns if c not in ("stringLabel", "label")]

In [14]:
#indexed_df_all.take(2)

In [15]:
# VectorAssembler concatenates all feature columns into one vector column named
# "features" -- this single vector input is mandatory for Spark ML estimators.
from pyspark.ml.feature import VectorAssembler

# Reuse the feature-column list computed above instead of re-deriving it here.
vectorAssembler = VectorAssembler(inputCols=ft_col, outputCol="features")
data_ml = vectorAssembler.transform(indexed_df_all)

In [16]:
#data_ml.take(1)

In [17]:
#data_ml.schema

In [19]:
# 70% train / 30% test, with a fixed seed so the split is reproducible.
data_ml_train, df_ml_test = data_ml.randomSplit(weights=[0.7, 0.3], seed=33)

In [51]:
data_ml_train.count()
# Number of rows in the training split.

1761879

In [137]:
#data_ml_train.groupBy("label").count().collect()

In [20]:
# Cache the training split before the model is fitted on it below.
# The trailing ';' suppresses the very long DataFrame repr that cache() returns.
data_ml_train.cache();

DataFrame[Init_Win_bytes_backward: int, Init_Win_bytes_forward: int, FwdPacketLengthMax: double, min_seg_size_forward: int, PacketLengthMean: double, SubflowFwdBytes: int, BwdPacketLengthMean: double, BwdPacketLengthMax: double, AvgBwdSegmentSize: double, BwdPackets/s: double, FlowIATMean: double, BwdPacketLengthMin: double, FlowIATMin: double, BwdHeaderLength: int, FwdIATMin: double, TotalLengthofBwdPackets: double, FlowPackets/s: double, TotalLengthofFwdPackets: double, SubflowBwdBytes: int, FwdIATMean: double, AveragePacketSize: double, FlowDuration: int, MaxPacketLength: double, FlowIATStd: double, FwdIATTotal: double, FwdPacketLengthMean: double, FlowBytesPs: double, PacketLengthVariance: double, FwdIATStd: double, FlowIATMax: double, FwdIATMax: double, PacketLengthStd: double, FwdPackets/s: double, FwdHeaderLength: int, SubflowFwdPackets: int, AvgFwdSegmentSize: double, SubflowBwdPackets: int, act_data_pkt_fwd: int, FwdPacketLengthStd: double, TotalFwdPackets: int, TotalBackwardP

## Decision Tree model

In [21]:
# Decision tree predicting the indexed "label" from the assembled "features" vector;
# fixed seed for reproducibility.
from pyspark.ml.classification import DecisionTreeClassifier

decisiontree = (DecisionTreeClassifier()
                .setLabelCol("label")
                .setFeaturesCol("features")
                .setSeed(42))

In [22]:
%%time
dt_model = decisiontree.fit(data_ml_train)
# fitting the Decision Tree classificatio model

CPU times: user 32 ms, sys: 0 ns, total: 32 ms
Wall time: 1min 52s


In [28]:
# Apply the fitted tree to the held-out test split to obtain predictions.
y_test_pred = dt_model.transform(dataset=df_ml_test)

In [29]:
# Cache the predictions: model_metrics() evaluates them four times.
# The trailing ';' suppresses the very long DataFrame repr that cache() returns.
y_test_pred.cache();

DataFrame[Init_Win_bytes_backward: int, Init_Win_bytes_forward: int, FwdPacketLengthMax: double, min_seg_size_forward: int, PacketLengthMean: double, SubflowFwdBytes: int, BwdPacketLengthMean: double, BwdPacketLengthMax: double, AvgBwdSegmentSize: double, BwdPackets/s: double, FlowIATMean: double, BwdPacketLengthMin: double, FlowIATMin: double, BwdHeaderLength: int, FwdIATMin: double, TotalLengthofBwdPackets: double, FlowPackets/s: double, TotalLengthofFwdPackets: double, SubflowBwdBytes: int, FwdIATMean: double, AveragePacketSize: double, FlowDuration: int, MaxPacketLength: double, FlowIATStd: double, FwdIATTotal: double, FwdPacketLengthMean: double, FlowBytesPs: double, PacketLengthVariance: double, FwdIATStd: double, FlowIATMax: double, FwdIATMax: double, PacketLengthStd: double, FwdPackets/s: double, FwdHeaderLength: int, SubflowFwdPackets: int, AvgFwdSegmentSize: double, SubflowBwdPackets: int, act_data_pkt_fwd: int, FwdPacketLengthStd: double, TotalFwdPackets: int, TotalBackwardP

## Model evaluation

In [131]:
#y_test_pred.take(1)

In [117]:
# Shared evaluator comparing the "prediction" column against the numeric "label" column.
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

model_evaluator = (MulticlassClassificationEvaluator()
                   .setPredictionCol("prediction")
                   .setLabelCol("label"))

In [155]:
def model_metrics(y_test_pred, evaluator=None):
    """Print accuracy and weighted precision / recall / F1 for a predictions DataFrame.

    Parameters
    ----------
    y_test_pred : DataFrame
        Output of a fitted model's ``transform``; must contain the label and
        prediction columns the evaluator is configured with.
    evaluator : MulticlassClassificationEvaluator, optional
        Evaluator to use. Defaults to the notebook-level ``model_evaluator``
        (predictionCol="prediction", labelCol="label").
    """
    if evaluator is None:
        evaluator = model_evaluator
    # (printed name, Spark metricName) pairs, evaluated one at a time by overriding
    # the evaluator's metricName through a param map.
    metrics = (("Accuracy", "accuracy"),
               ("Precision", "weightedPrecision"),
               ("Recall", "weightedRecall"),
               ("F1", "f1"))
    for display_name, metric_name in metrics:
        value = evaluator.evaluate(y_test_pred, {evaluator.metricName: metric_name})
        # '%.4f' prints 4 decimal places; the former '%4f' only set a minimum width of 4.
        print('%s : %.4f' % (display_name, value))

In [156]:
model_metrics(y_test_pred=y_test_pred)
# These are weighted metrics, comparable to scikit-learn's "weighted" averaging.
# The Spark model scores slightly lower than the scikit-learn one, most likely because
# no class-balancing was done here. It still performs very well.

Accuracy : 0.959411
Precision : 0.954285
Recall : 0.959411
F1 : 0.952071


## Creating and fitting a pipeline

In [None]:
# To keep the code simple, majority-class undersampling is not included here.

In [164]:
from pyspark.ml import Pipeline
from pyspark.ml import PipelineModel

In [142]:
# One Pipeline chaining every step: the label indexer (already fitted on the full df_all),
# the feature assembler and the decision tree estimator.
ids_pipeline = Pipeline(stages=[labelIndexer, vectorAssembler, decisiontree])

In [144]:
df_all_train, df_all_test = df_all.randomSplit(weights=[0.7, 0.3], seed=33)  # same ratio and seed as the manual split

In [145]:
%%time
ids_model = ids_pipeline.fit(df_all_train)

CPU times: user 48 ms, sys: 4 ms, total: 52 ms
Wall time: 2min 51s


In [146]:
predict = ids_model.transform(dataset=df_all_test)  # runs every pipeline stage on the test split

In [150]:
# Cache the pipeline predictions: model_metrics() evaluates them four times.
# The trailing ';' suppresses the very long DataFrame repr that cache() returns.
predict.cache();

DataFrame[Init_Win_bytes_backward: int, Init_Win_bytes_forward: int, FwdPacketLengthMax: double, min_seg_size_forward: int, PacketLengthMean: double, SubflowFwdBytes: int, BwdPacketLengthMean: double, BwdPacketLengthMax: double, AvgBwdSegmentSize: double, BwdPackets/s: double, FlowIATMean: double, BwdPacketLengthMin: double, FlowIATMin: double, BwdHeaderLength: int, FwdIATMin: double, TotalLengthofBwdPackets: double, FlowPackets/s: double, TotalLengthofFwdPackets: double, SubflowBwdBytes: int, FwdIATMean: double, AveragePacketSize: double, FlowDuration: int, MaxPacketLength: double, FlowIATStd: double, FwdIATTotal: double, FwdPacketLengthMean: double, FlowBytesPs: double, PacketLengthVariance: double, FwdIATStd: double, FlowIATMax: double, FwdIATMax: double, PacketLengthStd: double, FwdPackets/s: double, FwdHeaderLength: int, SubflowFwdPackets: int, AvgFwdSegmentSize: double, SubflowBwdPackets: int, act_data_pkt_fwd: int, FwdPacketLengthStd: double, TotalFwdPackets: int, TotalBackwardP

In [151]:
model_metrics(y_test_pred=predict)  # pipeline predictions, scored exactly like the manual run

Accuracy : 0.959411
Precision : 0.954285
Recall : 0.959411
F1 : 0.952071


## Persisting the pipeline and the model

In [159]:
# Target directories under ../models for the Pipeline definition and the fitted PipelineModel.
pipeline_path_file = os.path.join(os.path.pardir, 'models', 'ids_pipeline')
model_path_file = os.path.join(os.path.pardir, 'models', 'ids_ml_model')

# Save both, replacing any previous copies.
ids_pipeline.write().overwrite().save(pipeline_path_file)
ids_model.write().overwrite().save(model_path_file)

In [170]:
# Reload both saved artefacts and check the save/load round trip kept every stage.
loaded_pipeline = Pipeline.load(pipeline_path_file)
loaded_ids_model = PipelineModel.load(model_path_file)

assert len(loaded_pipeline.getStages()) == len(ids_pipeline.getStages())
assert len(loaded_ids_model.stages) == len(ids_model.stages)