#### Feature Extraction, Creation, and Normalization

In the data exploration stage we identified the relevant features. In this combined notebook we will first prepare the features, and then build and evaluate our model. 

In [17]:
import pyspark.sql
import numpy as np 
from pyspark.sql.functions import col

In [18]:
# We start by loading the CSV file that we are going to use.
#
# The Cloud Object Storage credentials are read from environment variables so
# that no secret is stored in (or shared with) the notebook itself. Set
# COS_SERVICE_ID and COS_API_KEY before starting the kernel; the two endpoint
# variables are optional and fall back to the endpoints used so far.
# NOTE(review): the API key that used to be embedded in this cell should be
# treated as leaked and rotated.

import os

import ibmos2spark
from pyspark.sql import SparkSession

# @hidden_cell
credentials = {
    'endpoint': os.environ.get(
        'COS_ENDPOINT',
        'https://s3-api.us-geo.objectstorage.service.networklayer.com'),
    # A missing variable raises KeyError naming it, instead of failing later
    # with an opaque authentication error.
    'service_id': os.environ['COS_SERVICE_ID'],
    'iam_service_endpoint': os.environ.get(
        'COS_IAM_ENDPOINT',
        'https://iam.ng.bluemix.net/oidc/token'),
    'api_key': os.environ['COS_API_KEY'],
}

configuration_name = 'os_01e0daecbfe749718a79c33cdabd6510_configs'
# `sc` is the SparkContext supplied by the notebook environment.
cos = ibmos2spark.CloudObjectStorage(sc, credentials, configuration_name, 'bluemix_cos')

spark = SparkSession.builder.getOrCreate()
# No schema is inferred, so every column is loaded as a string.
df = spark.read\
  .format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')\
  .option('header', 'true')\
  .load(cos.url('Sample2Capture.csv', 'default-donotdelete-pr-fuwiqkd4yylvy3'))

df.printSchema()

root
 |-- StartTime: string (nullable = true)
 |-- Dur: string (nullable = true)
 |-- Proto: string (nullable = true)
 |-- SrcAddr: string (nullable = true)
 |-- Sport: string (nullable = true)
 |-- Dir: string (nullable = true)
 |-- DstAddr: string (nullable = true)
 |-- Dport: string (nullable = true)
 |-- State: string (nullable = true)
 |-- sTos: string (nullable = true)
 |-- dTos: string (nullable = true)
 |-- TotPkts: string (nullable = true)
 |-- TotBytes: string (nullable = true)
 |-- SrcBytes: string (nullable = true)
 |-- Label: string (nullable = true)



In [19]:
# First we add the label to the dataset. A flow is labelled Bot = 1 when its
# source address is the IP address of the infected PC, otherwise Bot = 0.
# StartTime, SrcAddr, DstAddr and the original Label column are not selected,
# so they are absent from df_current.

# Register the raw DataFrame as the temp view 'df' so it can be queried by SQL.
df.createOrReplaceTempView('df') 
infected_addr = "147.32.84.165"

# infected_addr is a constant defined just above, so interpolating it into the
# query text is safe here; do not reuse this pattern with external input.
sql = f"""
SELECT Dur, Proto, Sport, Dir, Dport, State, sTos, dTos, TotPkts,TotBytes,SrcBytes, 
CASE WHEN SrcAddr = '{infected_addr}' THEN 1 ELSE 0 END AS Bot
from df

"""
df_current = spark.sql(sql)



In [20]:
# Now we convert the numeric columns from string to float and replace nulls in
# the two type-of-service columns with the sentinel value '-1'.

from pyspark.sql.types import FloatType

# sTos / dTos stay strings: they are treated as categorical and indexed later.
# One fillna is enough; the casts below do not touch these two columns.
df_current = df_current.fillna({'sTos': '-1', 'dTos': '-1'})

# withColumn on an existing name replaces the column in place, so the column
# order of the DataFrame is preserved.
for numeric_col in ("Dur", "TotPkts", "TotBytes", "SrcBytes"):
    df_current = df_current.withColumn(numeric_col, df_current[numeric_col].cast(FloatType()))

df_current.createOrReplaceTempView('df_current')
df_current.printSchema()

root
 |-- Dur: float (nullable = true)
 |-- Proto: string (nullable = true)
 |-- Sport: string (nullable = true)
 |-- Dir: string (nullable = true)
 |-- Dport: string (nullable = true)
 |-- State: string (nullable = true)
 |-- sTos: string (nullable = false)
 |-- dTos: string (nullable = false)
 |-- TotPkts: float (nullable = true)
 |-- TotBytes: float (nullable = true)
 |-- SrcBytes: float (nullable = true)
 |-- Bot: integer (nullable = false)



In [21]:
# Build one StringIndexer per categorical column. Each indexer writes its
# result to a new column named "<column>Index".

from pyspark.ml.feature import Normalizer, OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml.linalg import Vectors


def _make_indexer(column):
    """Return a StringIndexer that maps `column` to `<column>Index`."""
    return StringIndexer(inputCol=column, outputCol=f"{column}Index")


state_indexer = _make_indexer("State")
proto_indexer = _make_indexer("Proto")
dir_indexer = _make_indexer("Dir")
sTos_indexer = _make_indexer("sTos")
dTos_indexer = _make_indexer("dTos")




In [22]:
# Chain the five indexers into a single pipeline, fit it on the data and
# append the index columns.

from pyspark.ml import Pipeline

indexer_stages = [
    state_indexer,
    proto_indexer,
    dir_indexer,
    sTos_indexer,
    dTos_indexer,
]
pipeline = Pipeline(stages=indexer_stages)

# Fit first (learns the label-to-index mapping), then transform the same frame.
indexer_model = pipeline.fit(df_current)
df_indexed = indexer_model.transform(df_current)


In [23]:
# Define the two feature stages used by the model pipeline: the assembler
# packs the listed columns into one vector column, and the normalizer rescales
# each vector to unit L1 norm (p=1.0).
feature_columns = [
    "StateIndex",
    "ProtoIndex",
    "DirIndex",
    "Dur",
    "TotBytes",
    "SrcBytes",
    "sTosIndex",
    "dTosIndex",
]
vectorAssembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
normalizer = Normalizer(
    inputCol="features",
    outputCol="features_norm",
    p=1.0,
)


#### The features are now  prepared. 
#### We are now going to create a balanced training data set and a test set  

In [24]:
# Drop the raw string columns that are not fed to the model, split the data
# into train / test, then down-sample the majority class in the training split
# only (the test split keeps its natural class distribution).

SPLIT_SEED = 42                 # fixed seed so the split and the sampling can be repeated
NORMAL_SAMPLE_FRACTION = 0.01   # keep roughly 1% of the normal flows

# first remove unused cols
columns_to_drop = ['Proto', 'Sport', 'Dir', 'Dport', 'State', 'sTos', 'dTos']
df_indexed = df_indexed.drop(*columns_to_drop)

# 80/20 train/test split
splits = df_indexed.randomSplit([0.8, 0.2], seed=SPLIT_SEED)
df_train = splits[0]
df_test = splits[1]

# now balance the classes in the training data set
bot_data = df_train.filter(df_train.Bot == 1)  # all bot flows in the training split
# sample without replacement; the fraction is approximate, not an exact row count
normal_data_sampled = df_train.filter(df_train.Bot == 0).sample(
    False, NORMAL_SAMPLE_FRACTION, seed=SPLIT_SEED)
df_train_balanced = bot_data.union(normal_data_sampled)  # bot flows + down-sampled normal flows



In [25]:
# Report the size of each data set. Every count() triggers a Spark job.
print(f"Total Data:{df_indexed.count()}")
print(f"Train Set:{df_train.count()}")
print(f"Test Set:{df_test.count()}")
# bot_data is filtered from df_train, so this is the number of bot flows in
# the training split (it was previously mislabelled as "Test").
print(f"Bot Req in Train:{bot_data.count()}")
print(f"Normal Req Sampled:{normal_data_sampled.count()}")
print(f"Total Balanced Train Set:{df_train_balanced.count()}")


Total Data:372715
Train Set:298137
Test Set:74578
Bot Req in Test:2143
Normal Req Sampled:2903
Total Balanced Train Set:5046


#### Build and evaluate the model


In [26]:
from pyspark.ml.classification import GBTClassifier, LinearSVC

# Gradient-boosted trees on the normalized feature vector. The linear SVM
# below is kept as an alternative that was tried:
#classifier = LinearSVC(maxIter=10, regParam=0.1, featuresCol='features_norm', labelCol='Bot')
classifier = GBTClassifier(
    featuresCol="features_norm",
    labelCol="Bot",
    maxIter=10,
)

# Full model pipeline: assemble features -> normalize -> classify.
model_stages = [vectorAssembler, normalizer, classifier]
pipeline = Pipeline(stages=model_stages)

# Train on the balanced training set, then score that same set.
model = pipeline.fit(df_train_balanced)
prediction = model.transform(df_train_balanced)

In [27]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Area under the ROC curve (the evaluator's default metric, spelled out here)
# computed from the raw prediction scores against the Bot label.
evaluator = BinaryClassificationEvaluator(
    rawPredictionCol="rawPrediction",
    labelCol="Bot",
    metricName="areaUnderROC",
)
evaluator.evaluate(prediction)



0.98546397285766

In [28]:
# Score the held-out test split with the fitted pipeline. This overwrites
# `prediction`, which previously held the predictions on the training set, so
# the cells below report test-set figures.
prediction = model.transform(df_test)
#prediction.show()
# Same evaluator as for the training set, so the two scores are comparable.
evaluator.evaluate(prediction)



0.9812975061402408

In [29]:
# Detection rate (recall): share of the actual bot flows in `prediction` that
# the model flagged as bot.
bot_predictions = prediction.filter("Bot == 1")
false_negatives = bot_predictions.filter("prediction = 0.0").count()
true_positives = bot_predictions.filter("prediction = 1.0").count()
total_bots = true_positives + false_negatives

if total_bots:
    print(f"{true_positives} bot requests out of {total_bots} were correctly identified ({true_positives / total_bots * 100} %)")
else:
    # A random split can leave the scored set without any bot rows; avoid a
    # ZeroDivisionError and say so explicitly.
    print("No bot requests in the scored data; the detection rate is undefined.")

505 bot requests out of 548 were correctly identified (92.15328467153284 %)


In [30]:
# False positive rate: share of the normal flows in `prediction` that the
# model wrongly flagged as bot.
normal_predictions = prediction.filter("Bot == 0")
false_positives = normal_predictions.filter("prediction = 1").count()
true_negatives = normal_predictions.filter("prediction = 0").count()
total_normal = false_positives + true_negatives

if total_normal:
    print(f"{false_positives} normal requests out of {total_normal} were wrongly identified ({false_positives / total_normal * 100} %)")
else:
    # Guard against an empty negative class so the cell cannot fail with a
    # ZeroDivisionError.
    print("No normal requests in the scored data; the false positive rate is undefined.")

5341 normal requests out of 74030 were wrongly identified (7.2146427124138865 %)


#### To summarize: our model wrongly identified about 7% of the normal requests as infected by malware, while it correctly identified 92% of the malware-infected requests.

#### The detection rate is quite satisfying, though it's up to the stakeholders to decide whether a false positive rate of 7% is acceptable. 