# Using Decision Trees to classify Network Intrusions with Spark
KDD Cup 1999 Data
"The competition task was to build a network intrusion detector, a predictive model capable of distinguishing between ``bad'' connections, called intrusions or attacks, and ``good'' normal connections. This database contains a standard set of data to be audited, which includes a wide variety of intrusions simulated in a military network environment."
http://kdd.ics.uci.edu/databases/kddcup99/kddcup99

In [1]:


data_file = "./kddcup.data.gz"
raw_data = sc.textFile(data_file)

print "Train data size is {}".format(raw_data.count())

Train data size is 4898431


In [2]:
# import urllib
# ft = urllib.urlretrieve("http://kdd.ics.uci.edu/databases/kddcup99/corrected.gz", "corrected.gz")


test_data_file = "./corrected.gz"
test_raw_data = sc.textFile(test_data_file)

print "Test data size is {}".format(test_raw_data.count())

Test data size is 311029


In [3]:
from pyspark.mllib.regression import LabeledPoint
from numpy import array

csv_data = raw_data.map(lambda x: x.split(","))
test_csv_data = test_raw_data.map(lambda x: x.split(","))

protocols = csv_data.map(lambda x: x[1]).distinct().collect()
services = csv_data.map(lambda x: x[2]).distinct().collect()
flags = csv_data.map(lambda x: x[3]).distinct().collect()

In [4]:
def create_labeled_point(line_split):
    # leave_out = [41]
    clean_line_split = line_split[0:41]
    
    # convert protocol to numeric categorical variable
    try: 
        clean_line_split[1] = protocols.index(clean_line_split[1])
    except:
        clean_line_split[1] = len(protocols)
        
    # convert service to numeric categorical variable
    try:
        clean_line_split[2] = services.index(clean_line_split[2])
    except:
        clean_line_split[2] = len(services)
    
    # convert flag to numeric categorical variable
    try:
        clean_line_split[3] = flags.index(clean_line_split[3])
    except:
        clean_line_split[3] = len(flags)
    
    # convert label to binary label
    attack = 1.0
    if line_split[41]=='normal.':
        attack = 0.0
        
    return LabeledPoint(attack, array([float(x) for x in clean_line_split]))

In [5]:
training_data = csv_data.map(create_labeled_point)
test_data = test_csv_data.map(create_labeled_point)

## Create the Decision Tree with the trainning data

In [14]:
from pyspark.mllib.tree import DecisionTree, DecisionTreeModel
from time import time

# Build the model
t0 = time()
tree_model = DecisionTree.trainClassifier(training_data, numClasses=2, 
                                          categoricalFeaturesInfo={1: len(protocols),
                                                                   2: len(services),
                                                                   3: len(flags)},
                                          impurity='gini', maxDepth=4, maxBins=100)
tt = time() - t0

print "Classifier trained in {} seconds".format(round(tt,3))


Classifier trained in 371.58 seconds


## Predict the output using the test data and evaluate the results

In [7]:
predictions = tree_model.predict(test_data.map(lambda p: p.features))
labels_and_preds = test_data.map(lambda p: p.label).zip(predictions)

In [8]:
t0 = time()
test_accuracy = labels_and_preds.filter(lambda (v, p): v == p).count() / float(test_data.count())
tt = time() - t0

print "Prediction made in {} seconds. Test accuracy is {}".format(round(tt,3), round(test_accuracy,4))


Prediction made in 33.752 seconds. Test accuracy is 0.915


## Display the tree

In [9]:
print "Learned classification tree model:"
print tree_model.toDebugString()

Learned classification tree model:
DecisionTreeModel classifier of depth 4 with 23 nodes
  If (feature 22 <= 88.5)
   If (feature 38 <= 0.7949999999999999)
    If (feature 36 <= 0.49)
     If (feature 34 <= 0.9550000000000001)
      Predict: 0.0
     Else (feature 34 > 0.9550000000000001)
      Predict: 1.0
    Else (feature 36 > 0.49)
     If (feature 2 in {0.0,42.0,24.0,20.0,46.0,57.0,60.0,44.0,27.0,12.0,7.0,3.0,18.0,67.0,43.0,26.0,55.0,58.0,36.0,4.0,47.0,15.0})
      Predict: 0.0
     Else (feature 2 not in {0.0,42.0,24.0,20.0,46.0,57.0,60.0,44.0,27.0,12.0,7.0,3.0,18.0,67.0,43.0,26.0,55.0,58.0,36.0,4.0,47.0,15.0})
      Predict: 1.0
   Else (feature 38 > 0.7949999999999999)
    If (feature 3 in {10.0,1.0,9.0,3.0,4.0})
     Predict: 0.0
    Else (feature 3 not in {10.0,1.0,9.0,3.0,4.0})
     Predict: 1.0
  Else (feature 22 > 88.5)
   If (feature 5 <= 2.0)
    If (feature 11 <= 0.5)
     Predict: 1.0
    Else (feature 11 > 0.5)
     If (feature 2 in {12.0})
      Predict: 0.0
     Els

In [12]:
from pyspark.mllib.tree import RandomForest, RandomForestModel

# Cria o modelo - Classificação ou Regressão
model = RandomForest.trainClassifier(training_data, numClasses=2, numTrees=10, impurity="gini", featureSubsetStrategy="auto", categoricalFeaturesInfo={})

predictions = model.predict(test_data.map(lambda p: p.features))
labels_and_preds = test_data.map(lambda p: p.label).zip(predictions)

t0 = time()
test_accuracy = labels_and_preds.filter(lambda (v, p): v == p).count() / float(test_data.count())
tt = time() - t0

print "Prediction made in {} seconds. Test accuracy is {}".format(round(tt,3), round(test_accuracy,4))

Prediction made in 33.644 seconds. Test accuracy is 0.9198


In [17]:
from pyspark.mllib.tree import RandomForest, RandomForestModel

# Cria o modelo - Classificação ou Regressão
model = RandomForest.trainClassifier(training_data, numClasses=2, maxBins=32, numTrees=100, impurity="gini", featureSubsetStrategy="auto", categoricalFeaturesInfo={})

predictions = model.predict(test_data.map(lambda p: p.features))
labels_and_preds = test_data.map(lambda p: p.label).zip(predictions)

t0 = time()
test_accuracy = labels_and_preds.filter(lambda (v, p): v == p).count() / float(test_data.count())
tt = time() - t0

print "Prediction made in {} seconds. Test accuracy is {}".format(round(tt,3), round(test_accuracy,4))

Prediction made in 39.166 seconds. Test accuracy is 0.9183


In [16]:
from pyspark.mllib.tree import RandomForest, RandomForestModel

# Cria o modelo - Classificação ou Regressão
model = RandomForest.trainClassifier(training_data, numClasses=2, numTrees=10, impurity="gini", featureSubsetStrategy="auto", categoricalFeaturesInfo={1: len(protocols), 2: len(services), 3: len(flags)},)

predictions = model.predict(test_data.map(lambda p: p.features))
labels_and_preds = test_data.map(lambda p: p.label).zip(predictions)

t0 = time()
test_accuracy = labels_and_preds.filter(lambda (v, p): v == p).count() / float(test_data.count())
tt = time() - t0

print "Prediction made in {} seconds. Test accuracy is {}".format(round(tt,3), round(test_accuracy,4))

IllegalArgumentException: u'requirement failed: DecisionTree requires maxBins (= 32) to be at least as large as the number of values in each categorical feature, but categorical feature 2 has 70 values. Considering remove this and other categorical features with a large number of values, or add more training examples.'