![](http://spark.apache.org/images/spark-logo.png) ![](https://upload.wikimedia.org/wikipedia/commons/f/f8/Python_logo_and_wordmark.svg)

In [4]:
%autosave 10

Autosaving every 10 seconds


# MLlib: Classification with Logistic Regression

In this notebook we will use Spark's machine learning library MLlib to build a **Logistic Regression classifier** for network attack detection. We will use the complete [KDD Cup 1999 datasets](http://kdd.ics.uci.edu/databases/kddcup99/kddcup99.html) in order to test Spark capabilities with large datasets.

Additionally, we will introduce two ways of performing model selection: 
- by using a correlation matrix and 
- by using hypothesis testing.

# Getting the data and creating the RDD

In [5]:
import urllib
f = urllib.urlretrieve ("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data.gz", "kddcup.data.gz")

In [6]:
data_file = "./kddcup.data.gz"
raw_data = sc.textFile(data_file)

print "Train data size is {}".format(raw_data.count())

Train data size is 4898431


In [7]:
# Test dataset
ft = urllib.urlretrieve("http://kdd.ics.uci.edu/databases/kddcup99/corrected.gz", "corrected.gz")

In [8]:
test_data_file = "./corrected.gz"
test_raw_data = sc.textFile(test_data_file)

print "Test data size is {}".format(test_raw_data.count())

Test data size is 311029


# Labeled Points
A labeled point is a local vector associated with a label/response. In MLlib, labeled points are used in supervised learning algorithms and they are stored as doubles. For binary classification, a label should be either 0 (negative) or 1 (positive).

# Training Data Preprocessing

In our case, we are interested in detecting network attacks in general. We don't need to detect which type of attack we are dealing with. Therefore we will tag each network interaction as non attack (i.e. 'normal' tag) or attack (i.e. anything else but 'normal').

In [9]:
from pyspark.mllib.regression import LabeledPoint
import numpy as np


In [10]:
def parse_interaction(line):
    line_split = line.split(",")
    # leave out =[1.,2,3, 41]
    clean_line_split = line_split[0:1]+line_split[4:41]
    attack = 1.0
    if line_split[41] =="normal.":
        attack = 0.0
        
    return LabeledPoint(attack, np.array([float(x) for x in clean_line_split]))



In [18]:
training_data = raw_data.map(parse_interaction)

In [22]:
training_data.take(5)

[LabeledPoint(0.0, [0.0,215.0,45076.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,1.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0]),
 LabeledPoint(0.0, [0.0,162.0,4528.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,2.0,2.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,1.0,1.0,1.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0]),
 LabeledPoint(0.0, [0.0,236.0,1228.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,1.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,2.0,2.0,1.0,0.0,0.5,0.0,0.0,0.0,0.0,0.0]),
 LabeledPoint(0.0, [0.0,233.0,2032.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,2.0,2.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,3.0,3.0,1.0,0.0,0.33,0.0,0.0,0.0,0.0,0.0]),
 LabeledPoint(0.0, [0.0,239.0,486.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,3.0,3.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,4.0,4.0,1.0,0.0,0.25,0.0,0.0,0.0,0.0,0.0])]

# Test Data Preprocessing


In [19]:
test_data = test_raw_data.map(parse_interaction)

In [21]:
test_data.take(1)

[LabeledPoint(0.0, [0.0,105.0,146.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,1.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,255.0,254.0,1.0,0.01,0.0,0.0,0.0,0.0,0.0,0.0])]

# Detecting network attacks using Logistic Regression
Logistic regression is widely used to predict a binary response. Spark implements two [algorithms](https://spark.apache.org/docs/latest/mllib-linear-methods.html#logistic-regression) to solve logistic regression: mini-batch gradient descent and L-BFGS. L-BFGS is recommended over mini-batch gradient descent for faster convergence.

# Training a classifier

In [23]:
from pyspark.mllib.classification import LogisticRegressionWithLBFGS
from time import time

# Build the model
t0 = time()
logit_model = LogisticRegressionWithLBFGS.train(training_data)
tt = time() - t0

print "Classifier trained in {} seconds".format(round(tt,3))

Classifier trained in 617.151 seconds


# Evaluating the model on new data
In order to measure the classification error on our test data, we use map on the ```test_data``` RDD and the model to predict each test point class.

In [41]:
labels_and_preds = test_data.map(lambda p: (p.label, logit_model.predict(p.features)))

Classification results are returned in pars, with the actual test label and the predicted one. This is used to calculate the classification error by using ```filter``` and ```count``` as follows:

In [43]:
t0 = time()
test_accuracy = labels_and_preds.filter(
             lambda (v, p): v==p).count() / float(test_data.count())
tt = time() - t0

print "Prediction made in {} seconds. Test accuracy is {}".format(round(tt,3), round(test_accuracy,4))

Prediction made in 21.294 seconds. Test accuracy is 0.9164


# Model Selection

Model or feature selection helps us building more interpretable and efficient models (or classifier in this case). For illustrative purposes, we will follow two different approaches, correlation matrices and hypothesis testing.


## Using a correlation matrix
In this [notebook](https://github.com/andersy005/spark-quest/blob/master/pyspark-notebooks/06-mllib-statistics.ipynb) we calculated a correlation in order to find predictors that are highly correlated. There are many possible choices there in order to simplify our model.

In [44]:
import pandas as pd
pd.set_option('display.max_columns', 50)
highly_correlated_df = pd.read_csv("highly_correlated.csv")
highly_correlated_df

Unnamed: 0.1,Unnamed: 0,duration,src_bytes,dst_bytes,land,wrong_fragment,urgent,hot,num_failed_logins,logged_in,num_compromised,root_shell,su_attempted,num_root,num_file_creations,num_shells,num_access_files,num_outbound_cmds,is_hot_login,is_guest_login,count,srv_count,serror_rate,srv_serror_rate,rerror_rate,srv_rerror_rate,same_srv_rate,diff_srv_rate,srv_diff_host_rate,dst_host_count,dst_host_srv_count,dst_host_same_srv_rate,dst_host_diff_srv_rate,dst_host_same_src_port_rate,dst_host_srv_diff_host_rate,dst_host_serror_rate,dst_host_srv_serror_rate,dst_host_rerror_rate,dst_host_srv_rerror_rate
0,duration,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False
1,src_bytes,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,True,False,False,False,False,False
2,dst_bytes,False,False,False,False,False,False,False,False,True,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False
3,land,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False
4,wrong_fragment,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False
5,urgent,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False
6,hot,False,False,False,False,False,False,False,False,False,True,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False
7,num_failed_logins,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False
8,logged_in,False,False,True,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False
9,num_compromised,False,False,False,False,False,False,True,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False,False


We can pick different combinations of correlated variables and leave just those that represent them.

## Trial1:
Here we will choose the following:

- From the description of the [KDD Cup 99 task](http://kdd.ics.uci.edu/databases/kddcup99/task.html) we know that the variable ```dst_host_same_src_port_rate``` references the percentage of the last 100 connections to the same port, for the same destination host. In our correlation matrix( and dataframes) we find that this one is highly and positively correlated to ```src_bytes``` and ```srv_count```. The former is the number of bytes sent from source to destination. The later is the number of connections to the same service as the current connection in the past 2 seconds. We decide not to include ```dst_host_same_src_port_rate``` in our model since we include the other two.


- Variables ```serror_rate``` and ```srv_error_rate```(% of connections that have SYN errors for same host and same service respectively) are highly positively correlated. Moreover, the set of variables that they highly correlate with are pretty much the same. They look like contributing very similarly to our model. We will keep just ```serror_rate```.


- A similar situation happens with ```rerror_rate``` and ```srv_rerror_rate```(% of connections that have REJ errors) so we will keep just ```rerror_rate```.


- Same thing with variables prefixed with ```dst_host_``` for the previous ones (e.g. ```dst_host_srv_serror_rate```).

Our list of variables we will drop includes:
- ```same_srv_rate```
- ```diff_srv_rate```
- ```dst_host_same_src_port_rate``` (column 35).
- ```srv_serror_rate``` (column 25).
- ```srv_rerror_rate``` (column 27).
- ```dst_host_srv_serror_rate``` (column 38).
- ```dst_host_srv_rerror_rate``` (column 40).


# Model Evaluation
First, we need to provide training and testing datasets containing just the selected variables. For that we will define a new function to parse the raw data that keeps just what we need .

In [54]:
def parse_interaction_corr(line):
    line_split = line.split(",")
    # leave_out = [1,2,3,25,27,35,38,40,41]
    clean_line_split = line_split[0:1]+line_split[4:25]+line_split[26:27]+line_split[28:35]+line_split[36:38]+line_split[39:40]
    attack = 1.0
    if line_split[41] =="normal.":
        attack = 0.0
        
    return LabeledPoint(attack, np.array([float(x) for x in clean_line_split]))



In [55]:
corr_reduced_training_data = raw_data.map(parse_interaction_corr)
corr_reduced_test_data = test_raw_data.map(parse_interaction_corr)

In [56]:
corr_reduced_test_data.take(1)

[LabeledPoint(0.0, [0.0,105.0,146.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,1.0,0.0,0.0,1.0,0.0,0.0,255.0,254.0,1.0,0.01,0.0,0.0,0.0])]

*Note: when selecting elements in the split, a list comprehension with a ```leave_out``` list for filtering is more Pythonic than slicing and concatenation indeed, but we have found it less efficient. This is very important when dealing with large datasets. The ```parse_interaction``` functions will be called for every element in the RDD, so we need to make them as efficient as possible.*

In [57]:
# Build the model
t0 = time()
logit_model_2 = LogisticRegressionWithLBFGS.train(corr_reduced_training_data)
tt = time() - t0

print "Classifier trained in {} seconds".format(round(tt,3))

Classifier trained in 581.163 seconds


In [58]:
# Model Evalutation
labels_and_preds = corr_reduced_test_data.map(lambda p: (p.label, logit_model_2.predict(p.features)))
t0 = time()
test_accuracy = labels_and_preds.filter(lambda (v, p): v == p).count() / float(corr_reduced_test_data.count())
tt = time() - t0
print "Prediction made in {} seconds. Test accuracy is {}".format(round(tt,3), round(test_accuracy,4))

Prediction made in 21.072 seconds. Test accuracy is 0.8599


As expected, we have reduced accuracy and also training time. However this doesn't seem a good trade! At least not for logistic regression and considering the predictors we decided to leave out. We have lost quite a lot of accuracy and have not gained a lot of execution time during training. Moreover prediction time didn't improve.


# Using Hypothesis testing

Hypothesis testing is a powerful tool in statistical inference and learning to determine whether a result is statistically significant. MLlib supports Pearson's chi-squared($\tilde{\chi}^{2}$) tests for goodness of fit and independence. The goodness of fit test requires an input type of ```Vector```, whereas the independence test requires a ```Matrix``` as input. Moreover, MLlib also supports the input type ```RDD[LabeledPoint]``` to enable feature selection via chi-squared independence tests. Again, these methods are part of the ```Statistics``` package.

In our case we want to perform some sort of feature selection, so we will provide an RDD of ```LabeledPoint```. Internally, MLlib will calculate a contingency matrix and perform the Person's chi-squared (($\tilde{\chi}^{2}$) test. Features need to be categorical. Real-valued features will be treated as categorical in each of its different values. There is a limit of 1000 different values, so we need either to leave out some features or categorise them. In this case, we will consider just features that either take boolean values or just a few different numeric values in our dataset. We could overcome this limitation by defining a more complex ```parse_interaction``` function that categorises each feature properly.

In [59]:
feature_names = ["land","wrong_fragment",
             "urgent","hot","num_failed_logins","logged_in","num_compromised",
             "root_shell","su_attempted","num_root","num_file_creations",
             "num_shells","num_access_files","num_outbound_cmds",
             "is_hot_login","is_guest_login","count","srv_count","serror_rate",
             "srv_serror_rate","rerror_rate","srv_rerror_rate","same_srv_rate",
             "diff_srv_rate","srv_diff_host_rate","dst_host_count","dst_host_srv_count",
             "dst_host_same_srv_rate","dst_host_diff_srv_rate","dst_host_same_src_port_rate",
             "dst_host_srv_diff_host_rate","dst_host_serror_rate","dst_host_srv_serror_rate",
             "dst_host_rerror_rate","dst_host_srv_rerror_rate"]

In [66]:
def parse_interaction_categorical(line):
    line_split = line.split(",")
    clean_line_split = line_split[6:41]
    attack = 1.0
    if line_split[41] =="normal.":
        attack = 0.0
        
    return LabeledPoint(attack, np.array([float(x) for x in clean_line_split]))

In [67]:
training_data_categorical = raw_data.map(parse_interaction_categorical)

In [68]:
from pyspark.mllib.stat import Statistics

chi = Statistics.chiSqTest(training_data_categorical)

In [69]:
#  Now we can check the resulting values after putting them into a Pandas data frame.

records = [(result.statistic, result.pValue) for result in chi]

chi_df = pd.DataFrame(data=records, index=feature_names, columns=["Statistic", "p-value"])

chi_df.to_csv("chi.csv", index=False)

In [70]:
chi_df

Unnamed: 0,Statistic,p-value
land,0.4649835,0.4953041
wrong_fragment,306.8555,0.0
urgent,38.71844,2.705761e-07
hot,19463.31,0.0
num_failed_logins,127.7691,0.0
logged_in,3273098.0,0.0
num_compromised,2011.863,0.0
root_shell,1044.918,0.0
su_attempted,434.0,0.0
num_root,22871.68,0.0


From that we conclude that predictors land and num_outbound_cmds could be removed from our model without affecting our accuracy dramatically. Let's try this.

# Evaluating the new model
So the only modification to our first parse_interaction function will be to remove columns 6 and 19, corresponding to the two predictors that we want not to be part of our model.

In [78]:
def parse_interaction_chi(line):
    line_split = line.split(",")
    # leave_out = [1,2,3,6,19,41]
    clean_line_split = line_split[0:1] + line_split[4:6] + line_split[7:19] + line_split[20:41]
    attack = 1.0
    if line_split[41] =="normal.":
        attack = 0.0
        
    return LabeledPoint(attack, np.array([float(x) for x in clean_line_split]))

In [79]:
training_data_chi = raw_data.map(parse_interaction_chi)
test_data_chi = test_raw_data.map(parse_interaction_chi)

In [80]:
# Build the model
t0 = time()
logit_model_chi = LogisticRegressionWithLBFGS.train(training_data_chi)
tt = time() - t0

print "Classifier trained in {} seconds".format(round(tt,3))

Classifier trained in 613.6 seconds


In [81]:
# Model Evalutation
labels_and_preds = test_data_chi.map(lambda p: (p.label, logit_model_chi.predict(p.features)))
t0 = time()
test_accuracy = labels_and_preds.filter(lambda (v, p): v == p).count() / float(test_data_chi.count())
tt = time() - t0
print "Prediction made in {} seconds. Test accuracy is {}".format(round(tt,3), round(test_accuracy,4))

Prediction made in 21.379 seconds. Test accuracy is 0.9164


So we can see that, by using hypothesis testing, we have been able to remove two predictors without diminishing testing accuracy at all. Training time didn't change. This might now seem like a big model reduction, but it is something when dealing with big data sources. Moreover, we should be able to categorise those five predictors we have left out for different reasons and, either include them in the model or leave them out if they aren't statistically significant.


Additionally, we could try to remove some of those predictors that are highly correlated, trying not to reduce accuracy too much. In the end, we should end up with a model easier to understand and use.