# MLlib: Basic Statistics and Exploratory Data Analysis

This notebook introduces [MLlib](https://spark.apache.org/docs/latest/mllib-guide.html), Spark's machine learning library, through basic statistics and exploratory data analysis.

## Getting the data and creating the RDD

As we did in our first notebook, we will use the reduced dataset provided for the [KDD Cup 1999](http://kdd.ics.uci.edu/databases/kddcup99/kddcup99.html), which contains nearly half a million network interactions.

In [1]:
data_file = "./kddcup.data_10_percent.gz"
raw_data = sc.textFile(data_file)

First, we parse the file, keeping only the numeric and logical fields.

In [16]:
import numpy as np

def parse_interaction(line):
    line_split = line.split(",")
    # keep just numeric and logical values
    symbolic_indexes = [1,2,3,41]
    clean_line_split = [item for i,item in enumerate(line_split) if i not in symbolic_indexes]
    return [float(x) for x in clean_line_split]

vector_data = raw_data.map(parse_interaction)
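
To see what `parse_interaction` keeps, it can help to run the same logic on a single record outside Spark. The sketch below is plain Python and uses a made-up record of 42 comma-separated fields (an assumption for illustration, not a line taken from the dataset). Positions 1, 2, 3 and 41 hold the symbolic fields, which leaves 38 numeric values.

```python
def parse_interaction(line):
    line_split = line.split(",")
    # Positions 1-3 and 41 hold strings (protocol, service, flag, label)
    symbolic_indexes = [1, 2, 3, 41]
    return [float(x) for i, x in enumerate(line_split) if i not in symbolic_indexes]

# Hypothetical record: 42 fields, every numeric value 0 except the first one
sample_fields = ["12", "tcp", "http", "SF"] + ["0"] * 37 + ["normal."]
sample_line = ",".join(sample_fields)

parsed = parse_interaction(sample_line)
print(len(parsed))   # number of numeric features kept
print(parsed[:3])    # first three values, converted to float
```

The length of the parsed list should match the number of entries in `col_names`, otherwise `zip` silently drops the extra values when the rows are built.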

In [22]:
col_names = ["duration","src_bytes","dst_bytes","land","wrong_fragment",
             "urgent","hot","num_failed_logins","logged_in","num_compromised",
             "root_shell","su_attempted","num_root","num_file_creations",
             "num_shells","num_access_files","num_outbound_cmds",
             "is_hot_login","is_guest_login","count","srv_count","serror_rate",
             "srv_serror_rate","rerror_rate","srv_rerror_rate","same_srv_rate",
             "diff_srv_rate","srv_diff_host_rate","dst_host_count","dst_host_srv_count",
             "dst_host_same_srv_rate","dst_host_diff_srv_rate","dst_host_same_src_port_rate",
             "dst_host_srv_diff_host_rate","dst_host_serror_rate","dst_host_srv_serror_rate",
             "dst_host_rerror_rate","dst_host_srv_rerror_rate"]

from pyspark.sql import Row

def toRow(splitLine):
    return Row(**dict(list(zip(col_names, splitLine))))

In [23]:
df = sqlContext.createDataFrame(vector_data.map(toRow))

Caching avoids re-parsing the text file every time the DataFrame is used. Note that `cache()` is lazy: the parsed rows are stored in memory the first time an action runs on the DataFrame, and later actions reuse them.

In [25]:
df.cache()

DataFrame[count: double, diff_srv_rate: double, dst_bytes: double, dst_host_count: double, dst_host_diff_srv_rate: double, dst_host_rerror_rate: double, dst_host_same_src_port_rate: double, dst_host_same_srv_rate: double, dst_host_serror_rate: double, dst_host_srv_count: double, dst_host_srv_diff_host_rate: double, dst_host_srv_rerror_rate: double, dst_host_srv_serror_rate: double, duration: double, hot: double, is_guest_login: double, is_hot_login: double, land: double, logged_in: double, num_access_files: double, num_compromised: double, num_failed_logins: double, num_file_creations: double, num_outbound_cmds: double, num_root: double, num_shells: double, rerror_rate: double, root_shell: double, same_srv_rate: double, serror_rate: double, src_bytes: double, srv_count: double, srv_diff_host_rate: double, srv_rerror_rate: double, srv_serror_rate: double, su_attempted: double, urgent: double, wrong_fragment: double]

Next, we compute some basic statistics on the DataFrame.

In [28]:
result = df.describe().collect()

In [40]:
for l in result:
    print("----------------------")
    r = l.asDict()
    print("Statistics {}".format(r["summary"]))
    for key in r.keys():
        print("{0}: {1}".format(key, r[key]))
    print("----------------------")

----------------------
Statistics count
num_access_files: 494021
src_bytes: 494021
srv_count: 494021
num_outbound_cmds: 494021
rerror_rate: 494021
dst_host_srv_rerror_rate: 494021
dst_host_same_srv_rate: 494021
duration: 494021
srv_rerror_rate: 494021
srv_serror_rate: 494021
num_file_creations: 494021
dst_host_srv_serror_rate: 494021
num_compromised: 494021
is_guest_login: 494021
dst_host_rerror_rate: 494021
diff_srv_rate: 494021
hot: 494021
dst_host_srv_count: 494021
logged_in: 494021
is_hot_login: 494021
num_shells: 494021
dst_host_srv_diff_host_rate: 494021
srv_diff_host_rate: 494021
dst_host_same_src_port_rate: 494021
root_shell: 494021
su_attempted: 494021
dst_host_count: 494021
wrong_fragment: 494021
count: 494021
land: 494021
urgent: 494021
same_srv_rate: 494021
num_failed_logins: 494021
serror_rate: 494021
summary: count
dst_host_diff_srv_rate: 494021
num_root: 494021
dst_bytes: 494021
dst_host_serror_rate: 494021
----------------------
----------------------
Statistics mean
[output truncated]
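
For reference, `describe()` reports a count, mean, standard deviation, minimum and maximum per column. The plain-Python sketch below computes the same five quantities for one small column of hypothetical values. It assumes the standard deviation is the sample version (dividing by n - 1), which is worth checking against the Spark version in use.

```python
import math

def describe_column(values):
    """Return count, mean, sample stddev, min and max for a list of numbers."""
    n = len(values)
    mean = sum(values) / float(n)
    # Sample variance divides by n - 1 (assumed to match Spark's stddev)
    if n > 1:
        variance = sum((v - mean) ** 2 for v in values) / float(n - 1)
    else:
        variance = float("nan")
    return {
        "count": n,
        "mean": mean,
        "stddev": math.sqrt(variance),
        "min": min(values),
        "max": max(values),
    }

durations = [0.0, 0.0, 2.0, 6.0]  # hypothetical values, not taken from the dataset
stats = describe_column(durations)
for name in ["count", "mean", "stddev", "min", "max"]:
    print("{0}: {1}".format(name, stats[name]))
```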

In [12]:
label_list = ["back.","buffer_overflow.","ftp_write.","guess_passwd.",
              "imap.","ipsweep.","land.","loadmodule.","multihop.",
              "neptune.","nmap.","normal.","perl.","phf.","pod.","portsweep.",
              "rootkit.","satan.","smurf.","spy.","teardrop.","warezclient.",
              "warezmaster."]

In [41]:
import numpy as np

def parse_interaction_label(line):
    line_split = line.split(",")
    # keep just numeric and logical values
    symbolic_indexes = [1,2,3,41]
    clean_line_split = [item for i,item in enumerate(line_split) if i not in symbolic_indexes]
    return [float(x) for x in clean_line_split] + [line_split[i] for i in symbolic_indexes]

label_vector_data = raw_data.map(parse_interaction_label)

In [49]:
label_col_names = col_names + ["protocol", "service", "flag", "label"]

from pyspark.sql import Row

def toLabelRow(splitLine):
    return Row(**dict(list(zip(label_col_names, splitLine))))

In [50]:
label_df = sqlContext.createDataFrame(label_vector_data.map(toLabelRow))

Compute statistics by labels

In [51]:
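# Count the interactions labelled "normal."; replace .count() with
# .describe().show() to get the full summary statistics for that label
label_df.where(label_df.label == "normal.").count()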

97278

Count and mean duration by label

In [None]:
label_df.select("label").groupBy("label").count().show()

In [53]:
label_df.select("label", "duration").groupBy("label").mean().show()

+----------------+--------------------+
|           label|       avg(duration)|
+----------------+--------------------+
|    warezmaster.|               15.05|
|          smurf.|                 0.0|
|            pod.|                 0.0|
|           imap.|                 6.0|
|           nmap.|                 0.0|
|   guess_passwd.|  2.7169811320754715|
|        ipsweep.|0.034482758620689655|
|      portsweep.|  1915.2990384615384|
|          satan.|0.040276903713027064|
|           land.|                 0.0|
|     loadmodule.|   36.22222222222222|
|      ftp_write.|              32.375|
|buffer_overflow.|                91.7|
|        rootkit.|               100.8|
|    warezclient.|   615.2578431372549|
|       teardrop.|                 0.0|
|           perl.|  41.333333333333336|
|            phf.|                 4.5|
|       multihop.|               184.0|
|        neptune.|                 0.0|
+----------------+--------------------+
only showing top 20 rows
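
The `groupBy("label").mean()` call above computes one average per label. As a rough illustration of the same aggregation, here is a plain-Python sketch over a handful of hypothetical `(label, duration)` pairs; the helper name `mean_by_key` is made up for this example.

```python
from collections import defaultdict

def mean_by_key(pairs):
    """Average the values of (key, value) pairs separately for each key."""
    sums = defaultdict(float)
    counts = defaultdict(int)
    for key, value in pairs:
        sums[key] += value
        counts[key] += 1
    return {key: sums[key] / counts[key] for key in sums}

# Hypothetical rows, not taken from the dataset
rows = [("phf.", 4.0), ("phf.", 5.0), ("smurf.", 0.0), ("smurf.", 0.0)]
means = mean_by_key(rows)
for label in sorted(means):
    print("{0}: {1}".format(label, means[label]))
```

Spark performs the same computation in a distributed way, combining partial sums and counts from each partition.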



## Machine learning with Apache Spark
Now that the inputs are defined, we can apply some basic (or advanced) data processing functions to classify the type of each interaction (i.e. its `label`).

In [58]:
from pyspark.ml.feature import StringIndexer

s = StringIndexer(inputCol="label", outputCol="idx_label").fit(label_df.select(col_names + ["label"]))

In [59]:
result = s.transform(label_df.select(col_names + ["label"]))
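
`StringIndexer` maps each distinct label string to a numeric index, with the most frequent label receiving index 0.0 by default. The sketch below mimics that mapping in plain Python on hypothetical labels. The alphabetical tie-breaking is an assumption of this sketch and may not match Spark's behaviour.

```python
from collections import Counter

def fit_string_index(labels):
    """Map each label to a float index, most frequent label first."""
    counts = Counter(labels)
    # Descending frequency; ties broken alphabetically (assumption of this sketch)
    ordered = sorted(counts, key=lambda lab: (-counts[lab], lab))
    return {lab: float(i) for i, lab in enumerate(ordered)}

# Hypothetical label column
labels = ["smurf."] * 5 + ["neptune."] * 4 + ["normal."] * 3
index = fit_string_index(labels)
for lab in sorted(index, key=index.get):
    print("{0} -> {1}".format(lab, index[lab]))
```

This is why the values in `idx_label` carry no ordinal meaning: they only reflect how often each label occurs in the data used for fitting.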

In [65]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import VectorAssembler, PCA

assemblor = VectorAssembler(inputCols=col_names, outputCol="features")
rf = RandomForestClassifier(featuresCol="features", labelCol="idx_label", maxDepth=1, maxBins=32, numTrees=1)
pipeline = Pipeline(stages=[s, assemblor, rf])

Train and test splits

In [64]:
train, test = label_df.select(col_names + ["label"]).randomSplit([0.6,0.4])
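
`randomSplit` also accepts an optional `seed` argument, which makes the split reproducible between runs. The split is random per row, so the resulting sizes are only approximately 60% and 40%. The plain-Python sketch below illustrates the idea with a hypothetical `random_split` helper; it is not Spark's implementation.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row independently to one split, with the given weights."""
    total = float(sum(weights))
    # Cumulative normalised boundaries, e.g. [0.6, 1.0] for weights [0.6, 0.4]
    bounds = []
    acc = 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    last = len(bounds) - 1
    for row in rows:
        draw = rng.random()
        for i, bound in enumerate(bounds):
            # The last split catches any rounding error in the boundaries
            if draw < bound or i == last:
                splits[i].append(row)
                break
    return splits

rows = list(range(1000))
train_rows, test_rows = random_split(rows, [0.6, 0.4], seed=42)
print(len(train_rows) + len(test_rows))  # every row lands in exactly one split
```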

In [66]:
model = pipeline.fit(train)

Compute accuracy on the test set (the same steps apply to the train set)

In [70]:
model.transform(test).select("prediction", "idx_label").groupBy("prediction", "idx_label").count().show()

+----------+---------+------+
|prediction|idx_label| count|
+----------+---------+------+
|       1.0|     12.0|    13|
|       1.0|     19.0|     4|
|       1.0|     16.0|     5|
|       1.0|     17.0|     5|
|       1.0|     15.0|     3|
|       1.0|     11.0|    29|
|       1.0|      1.0| 42881|
|       1.0|     14.0|     6|
|       1.0|     18.0|     2|
|       1.0|      6.0|   396|
|       1.0|      8.0|   403|
|       1.0|     10.0|    92|
|       1.0|     13.0|     8|
|       1.0|      0.0|  1843|
|       1.0|     20.0|     1|
|       1.0|      4.0|   640|
|       1.0|      5.0|   472|
|       1.0|     22.0|     2|
|       1.0|      2.0| 39127|
|       0.0|      0.0|110503|
+----------+---------+------+
only showing top 20 rows



In [71]:
preds = model.transform(test)
print(preds.where(preds.prediction == preds.idx_label).count())

153384
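
The number of matching rows becomes an accuracy once it is divided by the size of the test set (in Spark, `preds.count()`). As a rough check, the sketch below redoes the arithmetic in plain Python using only the 20 rows printed in the table above. Since `show()` truncated the table, the total is a lower bound and the resulting ratio is an upper bound on the true accuracy.

```python
# (prediction, idx_label, count) rows copied from the printed table (top 20 rows only)
shown_rows = [
    (1.0, 12.0, 13), (1.0, 19.0, 4), (1.0, 16.0, 5), (1.0, 17.0, 5),
    (1.0, 15.0, 3), (1.0, 11.0, 29), (1.0, 1.0, 42881), (1.0, 14.0, 6),
    (1.0, 18.0, 2), (1.0, 6.0, 396), (1.0, 8.0, 403), (1.0, 10.0, 92),
    (1.0, 13.0, 8), (1.0, 0.0, 1843), (1.0, 20.0, 1), (1.0, 4.0, 640),
    (1.0, 5.0, 472), (1.0, 22.0, 2), (1.0, 2.0, 39127), (0.0, 0.0, 110503),
]

# A prediction is correct when it equals the indexed label
correct = sum(count for pred, label, count in shown_rows if pred == label)
shown_total = sum(count for _, _, count in shown_rows)
accuracy_upper_bound = correct / float(shown_total)

print(correct)
print(round(accuracy_upper_bound, 3))
```

All the rows shown predict either 0.0 or 1.0, which is consistent with a single tree of depth 1: one split can only produce two leaves, so at most two classes can be predicted. Increasing `maxDepth` and `numTrees` is the obvious first thing to try.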


Try applying PCA before training the model

In [76]:
from pyspark.ml.feature import PCA

pca = PCA(k=2, inputCol="features", outputCol="pca_features")
assemblor = VectorAssembler(inputCols=col_names, outputCol="features")
rf = RandomForestClassifier(featuresCol="pca_features", labelCol="idx_label", maxDepth=1, maxBins=32, numTrees=1)
pipeline = Pipeline(stages=[s, assemblor, pca, rf])

In [77]:
model = pipeline.fit(train)

In [None]:
model.transform(test).select("prediction", "idx_label").groupBy("prediction", "idx_label").count().show()
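
With `k=2`, PCA reduces the 38 assembled features to 2 components. The NumPy sketch below shows the usual textbook version (centre the columns, then project onto the leading singular directions) on a small hypothetical matrix. Spark's `PCA` may differ in details such as centring and sign conventions, so treat this as an illustration of the idea only.

```python
import numpy as np

def pca_project(X, k):
    """Project the rows of X onto their first k principal components."""
    X = np.asarray(X, dtype=float)
    centered = X - X.mean(axis=0)  # remove each column's mean
    # Rows of vt are principal directions, ordered by decreasing singular value
    _, _, vt = np.linalg.svd(centered, full_matrices=False)
    return centered.dot(vt[:k].T)

# Hypothetical 5 x 3 feature matrix
X = [[1.0, 2.0, 0.5],
     [2.0, 4.1, 0.4],
     [3.0, 6.2, 0.6],
     [4.0, 7.9, 0.5],
     [5.0, 10.0, 0.4]]
projected = pca_project(X, 2)
print(projected.shape)
```

Because PCA follows the directions of largest variance, unscaled columns with large ranges (byte counts, for instance) may dominate the components. Standardising the features before the PCA stage is a common precaution.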

Try applying k-means to the dataset

In [81]:
from pyspark.ml.clustering import KMeans
kmeans = KMeans(k=2, seed=1, featuresCol="features", predictionCol="kmeans_pred")
assemblor = VectorAssembler(inputCols=col_names, outputCol="features")
kmeans_assemblor = VectorAssembler(inputCols=col_names+["kmeans_pred"], outputCol="kmeans_features")
rf = RandomForestClassifier(featuresCol="kmeans_features", labelCol="idx_label", maxDepth=1, maxBins=32, numTrees=1)
pipeline = Pipeline(stages=[s, assemblor, kmeans, kmeans_assemblor, rf])

In [82]:
model = pipeline.fit(train)

KeyboardInterrupt: 
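
In this pipeline the cluster assignment in `kmeans_pred` is appended to the original features before the random forest is trained. Conceptually, that assignment is the index of the closest cluster centre. The plain-Python sketch below shows this step with hypothetical centroids and points; the helper `nearest_centroid` is made up for this example.

```python
def nearest_centroid(point, centroids):
    """Return the index of the centroid closest to point (squared Euclidean distance)."""
    def sq_dist(a, b):
        return sum((x - y) ** 2 for x, y in zip(a, b))
    distances = [sq_dist(point, c) for c in centroids]
    return distances.index(min(distances))

# Hypothetical centroids for k=2 and two points to assign
centroids = [(0.0, 0.0), (10.0, 10.0)]
points = [(1.0, 2.0), (9.0, 8.0)]
assignments = [nearest_centroid(p, centroids) for p in points]
print(assignments)
```

The cluster index is a category rather than a quantity, so with larger values of `k` it may be worth one-hot encoding it before assembling the features.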

Don't forget the categorical variables

In [83]:
from pyspark.ml.feature import OneHotEncoder
train, test = label_df.select(col_names + ["protocol"] + ["label"]).randomSplit([0.6,0.4])
protocols = StringIndexer(inputCol="protocol", outputCol="idx_protocol")
ohe_protocol = OneHotEncoder(inputCol="idx_protocol", outputCol="ohe_protocol")
assemblor = VectorAssembler(inputCols=col_names+["ohe_protocol"], outputCol="features")
rf = RandomForestClassifier(featuresCol="features", labelCol="idx_label", maxDepth=1, maxBins=32, numTrees=1)
pipeline = Pipeline(stages=[s, protocols, ohe_protocol, assemblor, rf])

In [None]:
model = pipeline.fit(train)
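
The `StringIndexer` and `OneHotEncoder` stages turn `protocol` into a binary vector. By default Spark's `OneHotEncoder` drops the last category (`dropLast=True`), so three protocols yield vectors of length 2, with the last category encoded as all zeros. The plain-Python sketch below illustrates that encoding with dense lists; Spark itself returns sparse vectors, and the helper `one_hot` is made up for this example.

```python
def one_hot(index, num_categories, drop_last=True):
    """Encode a category index as a list of 0.0/1.0 values."""
    size = num_categories - 1 if drop_last else num_categories
    vec = [0.0] * size
    # With drop_last, the last category is represented by the all-zero vector
    if index < size:
        vec[index] = 1.0
    return vec

# Three hypothetical category indexes, as produced by a StringIndexer
encoded = [one_hot(i, 3) for i in range(3)]
for i, vec in enumerate(encoded):
    print("{0} -> {1}".format(i, vec))
```

The same two stages can be repeated for `service` and `flag` to bring the remaining categorical variables into the feature vector.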

World record is at 95%. Show what you've got...