<table style="border: none" align="center">
   <tr style="border: none">
      <th style="border: none"><font face="verdana" size="4" color="black"><b>Network Intrusion Detection</b></font></th>
      <th style="border: none"><img src="https://github.com/pmservice/customer-satisfaction-prediction/blob/master/app/static/images/ml_icon_gray.png?raw=true" alt="Watson Machine Learning icon" height="40" width="40"></th>
   </tr> 
   <tr style="border: none">
       <td style="border: none"><img src="https://github.com/pmservice/wml-sample-models/raw/master/tensorflow/hand-written-digit-recognition/images/experiment_banner.png" width="600" height = "200" alt="Icon"></td>
   </tr>
</table>

This notebook shows how to use the Spark ML library to build classification models on the KDD Cup 1999 data.

**Notebook environment:** Scala 2.11 + Spark 2.2

**Platform:** IBM Watson Studio

**Dataset:**
[UCI kddcup data](http://kdd.ics.uci.edu/databases/kddcup99/kddcup99.html) (743MB)

**Purpose:**
Build models that detect network intrusions.

**Classification algorithms:**
- Random Forest Classifier
- Multilayer Perceptron Classifier

**Contents:**

This notebook contains the following parts:

1.	[Download data](#download)
2.	[Load and prepare data](#load)
3.	[Building models](#build)
  * [Random Forest](#rf)
  * [MLP](#mlp)


<a id="download"></a>
## Download data
First, download the compressed dataset (18 MB) to Watson Studio's shared directory */opt/ibm/user-libs/shared-data*. If the "shared-data" folder does not exist, run the commented-out line to create it.

In [None]:
import sys.process._
import java.net.URL
import java.io.File

// "mkdir /opt/ibm/user-libs/shared-data".!
val url = "http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data.gz"
val filename = "/opt/ibm/user-libs/shared-data/kddcup.data.gz"
new URL(url) #> new File(filename) !!
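
The download above assumes that the target directory already exists and that the file arrives. As a minimal sketch, the two checks could be done with the Java standard library instead of shell commands. The helper names `ensureDir` and `describeFile` are hypothetical and not part of the original notebook.

```scala
import java.nio.file.{Files, Path, Paths}

// Hypothetical helper: create the directory (and any missing parents).
// createDirectories does nothing if the directory already exists.
def ensureDir(dir: String): Path = Files.createDirectories(Paths.get(dir))

// Hypothetical helper: report whether a file exists and how large it is.
def describeFile(path: String): String = {
  val p = Paths.get(path)
  if (Files.isRegularFile(p)) s"$path: ${Files.size(p)} bytes"
  else s"$path: missing"
}
```

In this notebook one would call `ensureDir("/opt/ibm/user-libs/shared-data")` before the download and `println(describeFile(filename))` after it.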

**Once the compressed dataset is downloaded, use *gunzip* to decompress it in the same directory, then use *ls* to confirm that the file is there.**

In [None]:
// gunzip replaces kddcup.data.gz with kddcup.data in the same directory
"gunzip /opt/ibm/user-libs/shared-data/kddcup.data.gz".!
"ls /opt/ibm/user-libs/shared-data/".!

<a id="load"></a>
## Load and prepare data
The data is comma-separated, so we can read it directly into a DataFrame with SparkSession.

In [1]:
import org.apache.spark.sql.SparkSession

val spark = SparkSession.
    builder().
    getOrCreate()
val df = spark.
    read.format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat").
    option("inferSchema", "true").
    load("/opt/ibm/user-libs/shared-data/kddcup.data")
df.show(5)


+---+---+----+---+---+-----+---+---+---+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+-------+
|_c0|_c1| _c2|_c3|_c4|  _c5|_c6|_c7|_c8|_c9|_c10|_c11|_c12|_c13|_c14|_c15|_c16|_c17|_c18|_c19|_c20|_c21|_c22|_c23|_c24|_c25|_c26|_c27|_c28|_c29|_c30|_c31|_c32|_c33|_c34|_c35|_c36|_c37|_c38|_c39|_c40|   _c41|
+---+---+----+---+---+-----+---+---+---+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+-------+
|  0|tcp|http| SF|215|45076|  0|  0|  0|  0|   0|   1|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   1|   1| 0.0| 0.0| 0.0| 0.0| 1.0| 0.0| 0.0|   0|   0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0|normal.|
|  0|tcp|http| SF|162| 4528|  0|  0|  0|  0|   0|   1|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   2|   2| 0.0| 0.0| 0.0| 0.0| 1.0| 0.0| 0.0|   1|   1| 1.0| 0.0

spark = org.apache.spark.sql.SparkSession@250ac6eb
df = [_c0: int, _c1: string ... 40 more fields]


[_c0: int, _c1: string ... 40 more fields]

**Let's look at the schema and the labels (the last column, "_c41").**

In [2]:
df.printSchema

+----------------+-------+
|            _c41|  count|
+----------------+-------+
|    warezmaster.|     20|
|          smurf.|2807886|
|            pod.|    264|
|           imap.|     12|
|           nmap.|   2316|
|   guess_passwd.|     53|
|        ipsweep.|  12481|
|      portsweep.|  10413|
|          satan.|  15892|
|           land.|     21|
|     loadmodule.|      9|
|      ftp_write.|      8|
|buffer_overflow.|     30|
|        rootkit.|     10|
|    warezclient.|   1020|
|       teardrop.|    979|
|           perl.|      3|
|            phf.|      4|
|       multihop.|      7|
|        neptune.|1072017|
+----------------+-------+
only showing top 20 rows



In [None]:
df.select("_c41").groupBy("_c41").count().show()

**According to the [description](http://kdd.ics.uci.edu/databases/kddcup99/training_attack_types), the labels should be recoded into five categories. A SQL query can do this. The new column is named "label_s", short for *label as string*.**

In [3]:
df.createOrReplaceTempView("attack")

val query = """SELECT *, 
    CASE _c41 
        WHEN 'back.' THEN 'dos'
        WHEN 'buffer_overflow.' THEN 'u2r'
        WHEN 'ftp_write.' THEN 'r2l'
        WHEN 'guess_passwd.' THEN 'r2l'
        WHEN 'imap.' THEN 'r2l'
        WHEN 'ipsweep.' THEN 'probe'
        WHEN 'land.' THEN 'dos'
        WHEN 'loadmodule.' THEN 'u2r'
        WHEN 'multihop.' THEN 'r2l'
        WHEN 'neptune.' THEN 'dos'
        WHEN 'nmap.' THEN 'probe'
        WHEN 'perl.' THEN 'u2r'
        WHEN 'phf.' THEN 'r2l'
        WHEN 'pod.' THEN 'dos'
        WHEN 'portsweep.' THEN 'probe'
        WHEN 'rootkit.' THEN 'u2r'
        WHEN 'satan.' THEN 'probe'
        WHEN 'smurf.' THEN 'dos'
        WHEN 'spy.' THEN 'r2l'
        WHEN 'teardrop.' THEN 'dos'
        WHEN 'warezclient.' THEN 'r2l'
        WHEN 'warezmaster.' THEN 'r2l'
        ELSE 'normal'
END AS label_s 
FROM attack"""
val labeled = spark.sql(query)
labeled.select("label_s").groupBy("label_s").count().show()

+-------+-------+
|label_s|  count|
+-------+-------+
|    u2r|     52|
| normal| 972781|
|    r2l|   1126|
|  probe|  41102|
|    dos|3883370|
+-------+-------+
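
The same recoding can be expressed as a plain lookup table, which is easier to inspect than a long `CASE` expression. The sketch below copies the groups from the query above; the names `attackCategory` and `recode` are hypothetical.

```scala
// Attack-type groups copied from the SQL query (raw labels end with '.').
val attackCategory: Map[String, String] = Map(
  "dos"   -> Seq("back.", "land.", "neptune.", "pod.", "smurf.", "teardrop."),
  "u2r"   -> Seq("buffer_overflow.", "loadmodule.", "perl.", "rootkit."),
  "r2l"   -> Seq("ftp_write.", "guess_passwd.", "imap.", "multihop.",
                 "phf.", "spy.", "warezclient.", "warezmaster."),
  "probe" -> Seq("ipsweep.", "nmap.", "portsweep.", "satan.")
).flatMap { case (category, attacks) => attacks.map(_ -> category) }

// Anything not listed falls back to "normal", like the ELSE branch of the query.
def recode(rawLabel: String): String =
  attackCategory.getOrElse(rawLabel, "normal")

println(recode("smurf."))   // dos
println(recode("normal."))  // normal
```

If preferred, such a function could probably be wrapped with `org.apache.spark.sql.functions.udf` and applied to `_c41`, although the SQL version avoids the overhead of a user-defined function.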




**Intuitively, we would split the data into *training* and *testing* sets before building the ML pipeline. However, a categorical variable may have a different set of categories in each subset, which causes errors when stages fitted on one subset are applied to the other.**

Therefore, we build a pipeline that prepares the full dataset before splitting it.

**The pipeline:**
* StringIndexers: `_c1`, `_c2` and `_c3` are categorical strings, so they are indexed first
* OneHotEncoders: once the categorical strings are indexed, the indexed columns are one-hot encoded
* VectorAssembler: assembles the selected columns into a single feature vector
* labelIndexer: another StringIndexer that indexes the "label_s" column and outputs it as the "label" column
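
To make the first two stages concrete, here is a small sketch in plain Scala (no Spark) of what indexing followed by one-hot encoding does to a categorical column. It mirrors two Spark defaults: StringIndexer gives the most frequent value index 0, and OneHotEncoder drops the last category. The helper names and the alphabetical tie-break are assumptions made for this illustration.

```scala
// Assign indices by descending frequency (most frequent value gets 0).
// Ties are broken alphabetically here, an assumption made for determinism.
def fitIndex(values: Seq[String]): Map[String, Int] =
  values.groupBy(identity).toSeq
    .map { case (v, occurrences) => (v, occurrences.size) }
    .sortBy { case (v, n) => (-n, v) }
    .map(_._1)
    .zipWithIndex
    .toMap

// Encode an index as a vector of length numCategories - 1:
// the last category maps to all zeros.
def oneHot(index: Int, numCategories: Int): Vector[Double] =
  Vector.tabulate(numCategories - 1)(i => if (i == index) 1.0 else 0.0)

val protocols = Seq("tcp", "tcp", "icmp", "udp", "icmp", "tcp")
val index = fitIndex(protocols)
val encoded = protocols.map(p => oneHot(index(p), index.size))
encoded.foreach(println)
```

A value that never appeared when `fitIndex` was computed has no entry in the map, which is the plain-Scala analogue of the unseen-category error described above.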

In [4]:
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorAssembler, OneHotEncoder}
import org.apache.spark.ml.Pipeline

val indexer1 = new StringIndexer()
  .setInputCol("_c1")
  .setOutputCol("i_c1")
//   .setHandleInvalid("skip")
//   .fit(labeled)
val indexer2 = new StringIndexer()
  .setInputCol("_c2")
  .setOutputCol("i_c2")
//   .setHandleInvalid("skip")
//   .fit(labeled)
val indexer3 = new StringIndexer()
  .setInputCol("_c3")
  .setOutputCol("i_c3")
//   .setHandleInvalid("skip")
//   .fit(labeled)

val encoder1 = new OneHotEncoder()
  .setInputCol("i_c1")
  .setOutputCol("v_c1")

val encoder2 = new OneHotEncoder()
  .setInputCol("i_c2")
  .setOutputCol("v_c2")

val encoder3 = new OneHotEncoder()
  .setInputCol("i_c3")
  .setOutputCol("v_c3")

val featurenames = Array("_c0", "v_c1", "v_c2", "v_c3", "_c4", "_c5", "_c6", 
                         "_c7", "_c8", "_c9", "_c10", "_c11", "_c12", "_c13", 
                         "_c14", "_c15", "_c16", "_c17", "_c18", "_c19",
                         "_c22", "_c23", "_c24", "_c25", "_c26", "_c27", 
                         "_c28", "_c29", "_c30", "_c31", "_c32", "_c33", "_c34", 
                         "_c35", "_c36", "_c37", "_c38", "_c39", "_c40")
val assembler = new VectorAssembler()
  .setInputCols(featurenames)
  .setOutputCol("features")

val labelIndexer = new StringIndexer()
  .setInputCol("label_s")
  .setOutputCol("label")
//   .fit(labeled)

val pipeline_prepare = new Pipeline()
  .setStages(Array(indexer1,indexer2,indexer3,encoder1,encoder2,encoder3,assembler,labelIndexer))


indexer1 = strIdx_2a69cf734c93
indexer2 = strIdx_a24dff38fb3d
indexer3 = strIdx_38a1e08311dd
encoder1 = oneHot_8d9af096694e
encoder2 = oneHot_800fa6c0dbb4
encoder3 = oneHot_2aff1afb3a8f
featurenames = Array(_c0, v_c1, v_c2, v_c3, _c4, _c5, _c6, _c7, _c8, _c9, _c10, _c11, _c12, _c13, _c14, _c15, _c16, _c17, _c18, _c19, _c22, _c23, _c24, _c25, _c26, _c27, _c28, _c29, _c30, _c31, _c32, _c33...


[_c0, v_c1, v_c2, v_c3, _c4, _c5, _c6, _c7, _c8, _c9, _c10, _c11, _c12, _c13, _c14, _c15, _c16, _c17, _c18, _c19, _c22, _c23, _c24, _c25, _c26, _c27, _c28, _c29, _c30, _c31, _c32, _c33, _c34, _c35, _c36, _c37, _c38, _c39, _c40]

**Fit the pipeline, transform the data, and keep only "label" and "features" for training.**

In [5]:
val prepare = pipeline_prepare.fit(labeled)
val data = prepare.transform(labeled).select("label","features")

prepare = pipeline_c4326f9e05c2
data = [label: double, features: vector]


[label: double, features: vector]

<a id="build"></a>
## Building models
**First we split the data. Because the dataset is rather large, 60% goes to the training set and the remaining 40% to the test set.**

In [6]:
val Array(train, test) = data.randomSplit(Array(0.6, 0.4))

train = [label: double, features: vector]
test = [label: double, features: vector]


[label: double, features: vector]

<a id="rf"></a>
### Random Forest
Let's start with the random forest algorithm. Spark ML provides it, so all we need to do is configure it. Note that the `_c2` column has 70 categories, so the default _maxBins_ (32) is not enough: it must be at least as large as the largest number of categories of any categorical feature. We set _maxBins_ to 72 to avoid errors.

In [7]:
import org.apache.spark.ml.classification.{RandomForestClassificationModel, RandomForestClassifier}
val t = System.nanoTime
val rf = new RandomForestClassifier()
  .setLabelCol("label")
  .setFeaturesCol("features")
  .setNumTrees(5)
  .setMaxBins(72)

val rf_model = rf.fit(train)
val duration = (System.nanoTime - t) / 1e9d
println(s"Training process takes $duration secs")

Training process takes 243.952950419 secs


t = 1487575431530246
rf = rfc_50de58a56dcf
rf_model = RandomForestClassificationModel (uid=rfc_50de58a56dcf) with 5 trees
duration = 243.952950419


243.952950419
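
The start/stop timing pattern in the cell above is repeated later for the MLP. As a sketch, it could be factored into a small helper; `timed` is a hypothetical name, not part of Spark or of the original notebook.

```scala
// Hypothetical helper: evaluate a block, return its result and the elapsed seconds.
def timed[A](block: => A): (A, Double) = {
  val start = System.nanoTime
  val result = block
  val seconds = (System.nanoTime - start) / 1e9
  (result, seconds)
}

// Stand-in workload; in the notebook the block would be rf.fit(train).
val (total, seconds) = timed { (1 to 1000).sum }
println(f"Computation took $seconds%.3f secs")
```

With this helper, training would read `val (rf_model, rfSeconds) = timed(rf.fit(train))`, which also avoids redefining `t` and `duration` in later cells.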

**Training takes about 4 minutes (244 seconds in this run). Now we can check the model performance.**

**Apply the model to the test set to get predictions.**

In [8]:
val rf_predictions = rf_model.transform(test)

rf_predictions = [label: double, features: vector ... 3 more fields]


[label: double, features: vector ... 3 more fields]

**Check the error and accuracy. The model reaches about 99.3% accuracy on the test set.**

In [9]:
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
val evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("accuracy")
val accuracy = evaluator.evaluate(rf_predictions)
println("Test Error of RF = " + (1.0 - accuracy))

Test Error of RF = 0.007372298792682086


evaluator = mcEval_d9c9f4bc9fe9
accuracy = 0.9926277012073179


0.9926277012073179
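
Because roughly 79% of the rows are "dos", overall accuracy is dominated by the largest classes and says little about rare ones such as "u2r". The sketch below shows, in plain Scala on made-up (label, prediction) pairs, how accuracy and per-class recall are computed. The data and the helper names are illustrative only and are not results from this notebook.

```scala
// (label, prediction) pairs; the values are made up for illustration.
val pairs: Seq[(Double, Double)] = Seq(
  (0.0, 0.0), (0.0, 0.0), (0.0, 0.0), (0.0, 1.0),
  (1.0, 1.0), (1.0, 1.0), (2.0, 0.0), (2.0, 2.0)
)

// Accuracy: fraction of rows where the prediction equals the label.
def accuracy(rows: Seq[(Double, Double)]): Double =
  rows.count { case (label, pred) => label == pred }.toDouble / rows.size

// Recall per class: among rows with a given true label, the fraction predicted correctly.
def recallByClass(rows: Seq[(Double, Double)]): Map[Double, Double] =
  rows.groupBy(_._1).map { case (label, group) => label -> accuracy(group) }

println(s"accuracy = ${accuracy(pairs)}")
recallByClass(pairs).toSeq.sortBy(_._1).foreach { case (label, r) =>
  println(s"recall for class $label = $r")
}
```

On the real predictions, the same idea could be applied by grouping `rf_predictions` by "label" and "prediction" and counting the rows in each group.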

<a id="mlp"></a>
### Multilayer Perceptron Classifier
Before building the MLP model, we need to know how many nodes are needed for the input layer. Check the feature vector:

In [10]:
train.select("features").show(5)

+--------------------+
|            features|
+--------------------+
|(117,[0,2,5,72,82...|
|(117,[0,2,5,72,82...|
|(117,[0,2,5,72,82...|
|(117,[0,2,5,72,82...|
|(117,[1,3,72,82,8...|
+--------------------+
only showing top 5 rows



**The input layer needs 117 nodes (the length of the feature vector) and the output layer needs 5 nodes (one per label category). We add one hidden layer with 10 nodes.**

In [11]:
import org.apache.spark.ml.classification.MultilayerPerceptronClassifier
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
val layers = Array[Int](117, 10, 5)
val mlp = new MultilayerPerceptronClassifier()
  .setLayers(layers)
  .setBlockSize(128)
  .setSeed(1234L)
  .setMaxIter(100)

layers = Array(117, 10, 5)
mlp = mlpc_7c6c1276365e


mlpc_7c6c1276365e
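
The input width of 117 can be cross-checked by hand. The sketch below assumes 3 distinct values in `_c1` and 11 in `_c3` (assumptions about this dataset; only the 70 for `_c2` is stated in this notebook) and relies on OneHotEncoder dropping the last category by default. It also counts the weights and biases of the resulting fully connected network. The names are hypothetical.

```scala
// Distinct values per categorical column. The 70 for _c2 comes from the text above;
// 3 for _c1 and 11 for _c3 are assumptions about this dataset.
val categoryCounts = Map("_c1" -> 3, "_c2" -> 70, "_c3" -> 11)

// Entries of featurenames that are plain numeric columns (39 entries minus 3 vectors).
val numericColumns = 36

// With the last category dropped, each encoded column contributes (n - 1) slots.
val inputWidth = numericColumns + categoryCounts.values.map(_ - 1).sum

// Weights plus one bias per output node between consecutive layers.
def parameterCount(layers: Seq[Int]): Int =
  layers.sliding(2).map { case Seq(in, out) => (in + 1) * out }.sum

val layerSizes = Seq(inputWidth, 10, 5)
println(s"input width = $inputWidth, parameters = ${parameterCount(layerSizes)}")
```

Under these assumptions the computed width matches the 117 shown in the feature vectors, and the network has 1235 trainable parameters.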

**Train the model (about 12.5 minutes in this run).**

In [12]:
val t = System.nanoTime
val mlp_model = mlp.fit(train)
val duration = (System.nanoTime - t) / 1e9d
println(s"Training process takes $duration secs")

Training process takes 753.023877001 secs


t = 1487954321358991
mlp_model = mlpc_7c6c1276365e
duration = 753.023877001


753.023877001

**Evaluate the model. The test accuracy (about 99.2%) is close to that of the random forest (about 99.3%).**

In [13]:
val mlp_predictions = mlp_model.transform(test)
val accuracy = evaluator.evaluate(mlp_predictions)
println("Test Error of MLP = " + (1.0 - accuracy))

Test Error of MLP = 0.008306689363822728


mlp_predictions = [label: double, features: vector ... 1 more field]
accuracy = 0.9916933106361773


0.9916933106361773

<a id="summary"></a>
## Summary and next steps     

**This notebook builds two models that both exceed 99% accuracy on the test set. The Spark ML API makes building them straightforward.**

**There is no need to configure the Spark environment in Watson Studio. Just provision the Spark environment, create the notebook and you are ready to write your code!**

**The Spark environment is fast, especially with Scala. A follow-up demo will build the same models with PySpark in the same environment.**

Next steps:
* Save or download the models locally
* Save and deploy the models using the Watson Machine Learning (WML) service
