In [1]:
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from pyspark.ml import feature, evaluation, Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from functools import reduce
from pyspark.sql import DataFrame
from pyspark.sql import Row

In [2]:
conf = (SparkConf()
            .setAppName('random_forest')
            .setMaster('spark://spark-master:7077')
       )
conf.set("spark.executor.memory", "6g")
conf.set("spark.driver.maxResultSize", "0")
conf.set("spark.sql.shuffle.partitions", "6")
conf.set("spark.default.parallelism", "6")
conf.set("spark.driver.memory", "3g") 

<pyspark.conf.SparkConf at 0x7f468a620b70>

In [3]:
spark = SparkSession.builder.config(conf=conf).getOrCreate()

In [4]:
# load original dataset without bootstrapped samples
df = (spark.read.format('csv')
      .option('inferSchema', 'true')
      .option('header', 'true')
      .option('escape', '"')
      .load('hdfs://namenode:9000/data/no_bootstrap.csv') 
     )

# Cleaned dataset from previous model
+ regex and split operations transform the string representation of cmd_line_tokens back to an arraytype

In [5]:
df = (df.select('*')
          .withColumn('class_label', df.class_label.cast(T.DoubleType()))
          .withColumn('cmd_line_tokens', F.regexp_replace(F.col('cmd_line_tokens'), r"(\[)|(\]|\'|\s+)", ''))
          .withColumn('cmd_line_tokens', F.split(F.col('cmd_line_tokens'), ','))
     )
df.select('cmd_line_tokens').show(5, False)

+-----------------------------------------------------------------+
|cmd_line_tokens                                                  |
+-----------------------------------------------------------------+
|[svchost.exe, -k, localservicenonetwork, -p]                     |
|[svchost.exe, -k, localservice, -p, -s, dispbrokerdesktopsvc]    |
|[oobe, windeploy.exe]                                            |
|[oobe, setup.exe]                                                |
|[svchost.exe, -k, localservicenetworkrestricted, -p, -s, lmhosts]|
+-----------------------------------------------------------------+
only showing top 5 rows



# Data engineering pipelines 
+ Term Frequency (one-hot): value indicates if feature is present in observation
+ Feature has to be present at least once in dataset and in at least 50 documents

In [6]:
cv_transformer = feature.CountVectorizer(minTF=1, minDF=50, binary=True, inputCol='cmd_line_tokens', outputCol='tf')

In [7]:
estimator = Pipeline(stages=[cv_transformer]).fit(df)

In [8]:
estimator.transform(df).select('cmd_line_tokens','tf').sample(.1).show(5, False)

+-----------------------------------------------------------------------+---------------------------------------------+
|cmd_line_tokens                                                        |tf                                           |
+-----------------------------------------------------------------------+---------------------------------------------+
|[svchost.exe, -k, localservicenonetwork, -p]                           |(288,[26,27,60],[1.0,1.0,1.0])               |
|[svchost.exe, -k, localservice, -p, -s, dispbrokerdesktopsvc]          |(288,[26,27,60,63,120],[1.0,1.0,1.0,1.0,1.0])|
|[svchost.exe, -k, localsystemnetworkrestricted, -p, -s, ncbservice]    |(288,[26,27,60,63,114],[1.0,1.0,1.0,1.0,1.0])|
|[svchost.exe, -k, localservicenetworkrestricted, -p, -s, timebrokersvc]|(288,[26,27,60,63,111],[1.0,1.0,1.0,1.0,1.0])|
|[svchost.exe, -k, netsvcs, -p, -s, themes]                             |(288,[26,27,60,63,77],[1.0,1.0,1.0,1.0,1.0]) |
+---------------------------------------

In [9]:
len(estimator.stages[0].vocabulary)

288

In [10]:
training_df, validation_df, testing_df = df.randomSplit([0.6, 0.3, 0.1], seed=0)

In [11]:
rf = RandomForestClassifier(featuresCol='tf', labelCol='class_label', numTrees=100, 
                            featureSubsetStrategy='sqrt', impurity='gini', seed=0)
rf_estimator = Pipeline(stages=[cv_transformer, rf])
rf_model = rf_estimator.fit(training_df)

In [12]:
rf_model.transform(testing_df).\
    select(F.avg(F.expr('float(class_label = prediction)')).alias('accuracy')).\
    first()

Row(accuracy=0.9976617303195635)

In [13]:
print(rf_model.stages[-1].trees[1].toDebugString)

DecisionTreeClassificationModel (uid=dtc_d932652de21e) of depth 4 with 11 nodes
  If (feature 162 <= 0.5)
   If (feature 115 <= 0.5)
    Predict: 0.0
   Else (feature 115 > 0.5)
    Predict: 1.0
  Else (feature 162 > 0.5)
   If (feature 3 <= 0.5)
    If (feature 165 <= 0.5)
     If (feature 20 <= 0.5)
      Predict: 1.0
     Else (feature 20 > 0.5)
      Predict: 0.0
    Else (feature 165 > 0.5)
     Predict: 1.0
   Else (feature 3 > 0.5)
    Predict: 0.0



# Inference

+ The tokens with the most importance all indicate a malicious log and are very similar to the tokens identified in the LR model

In [14]:
vocab = rf_model.stages[0].vocabulary
feature_importance = rf_model.stages[-1].featureImportances.toArray()
vocab_importance_df = pd.DataFrame({'vocab': vocab, 'weight': feature_importance})
vocab_importance_df.sort_values('weight', ascending=False).head(20)

Unnamed: 0,vocab,weight
116,-executionpolicy,0.254961
115,bypass,0.24708
162,/c,0.154412
108,-c,0.130766
194,select-object,0.119997
195,net,0.035477
15,https,0.012194
165,share,0.010667
199,/r,0.007573
3,files,0.005825


# Model Tuning

In [15]:
paramGrid = (ParamGridBuilder() 
                 .addGrid(rf_model.stages[0].minDF, [25, 50, 75, 100]) 
                 .addGrid(rf_model.stages[1].numTrees, [75, 100, 150, 200]) 
                 .build()
            )

In [16]:
models = []
for grid in range(len(paramGrid)):
    print("Fitting model {}".format(grid))
    _model = rf_estimator.fit(validation_df, paramGrid[grid])
    models.append(_model)

Fitting model 0
Fitting model 1
Fitting model 2
Fitting model 3
Fitting model 4
Fitting model 5
Fitting model 6
Fitting model 7
Fitting model 8
Fitting model 9
Fitting model 10
Fitting model 11
Fitting model 12
Fitting model 13
Fitting model 14
Fitting model 15


In [17]:
evaluator = BinaryClassificationEvaluator(labelCol='class_label', metricName='areaUnderROC')
auc_scores = [evaluator.evaluate(model.transform(validation_df)) for model in models]

In [18]:
auc_scores

[0.9996269915822306,
 0.9996769236381544,
 0.9995775155268174,
 0.9995727275214548,
 0.989375416100466,
 0.9910815420113271,
 0.9914331184050926,
 0.9903273171665952,
 0.9903223011609772,
 0.9910081259291011,
 0.991248666198506,
 0.9904420012950415,
 0.9891503798484254,
 0.991151766089978,
 0.9893685760928053,
 0.9902739651068408]

In [19]:
best_model_idx = np.argmax(auc_scores)
best_model = models[best_model_idx]
print("Best params: \n\n{}\n".format(paramGrid[best_model_idx]))
print("Best AUC: \n\n{}".format(auc_scores[best_model_idx]))

Best params: 

{Param(parent='CountVectorizer_64866bd78246', name='minDF', doc='Specifies the minimum number of different documents a term must appear in to be included in the vocabulary. If this is an integer >= 1, this specifies the number of documents the term must appear in; if this is a double in [0,1), then this specifies the fraction of documents. Default 1.0'): 25.0, Param(parent='RandomForestClassifier_28597608eb0b', name='numTrees', doc='Number of trees to train (>= 1)'): 100}

Best AUC: 

0.9996769236381544


## Best model
+ minDF: 25
+ numTrees: 100

# Cross Validation
+ test best_model performance on training dataset

In [20]:
evaluator.evaluate(best_model.transform(training_df))

0.9990428223888843

# Plotly Dash Code