<img src="images/BDG_LOGO.png" alt="drawing" align="right" width="200"/>

# H2020 RIA BigDataGrapes - Predictive Data Analytics (T4.3)

### This deliverable (D4.3) shows how to train machine learning models on the BigDataGrapes distributed processing architecture. In particular, it shows how to train classifiers with MLlib (https://spark.apache.org/mllib/).

In [1]:
from pyspark import SparkContext

In [2]:
import math
import urllib
import random
import numpy as np
import pydoop.hdfs as hdfs

from numpy import array
from pyspark.mllib.regression import LabeledPoint

In [3]:
%matplotlib inline
import matplotlib.pyplot as plt

## Connecting to the BDG Apache Spark cluster

In [4]:
# local mode below (runs Spark inside this process, using all available cores)
#sc = SparkContext(appName="Classification-KDDCUP1999", master="local[*]")

# distributed mode below
sc = SparkContext(appName="Classification-KDDCUP1999", master="spark://spark-master:7077")

# setting logger level
sc.setLogLevel("ERROR")

# Classification on a real dataset

## We employ the KDD Cup 1999 dataset (http://kdd.ics.uci.edu/databases/kddcup99/kddcup99.html).

### This is the data set used for The Third International Knowledge Discovery and Data Mining Tools Competition, which was held in conjunction with KDD-99 The Fifth International Conference on Knowledge Discovery and Data Mining. The competition task was to build a network intrusion detector, a predictive model capable of distinguishing between "bad" connections, called intrusions or attacks, and "good" normal connections. This database contains a standard set of data to be audited, which includes a wide variety of intrusions simulated in a military network environment. 

## Downloading the dataset from the Web and Reading it with Apache Spark (creating RDDs)

In [6]:
# train data
path_on_disk = "/tmp/kddcup.data.gz"
path_on_hdfs = "hdfs://namenode:8020/user/root/kddcup11.data.gz"
urllib.urlretrieve("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data.gz", path_on_disk)
hdfs.put(path_on_disk, path_on_hdfs)
train_data = sc.textFile(path_on_hdfs)

# test data
test_path_on_disk = "/tmp/corrected.gz"
test_path_on_hdfs = "hdfs://namenode:8020/user/root/corrected11.data.gz"
urllib.urlretrieve("http://kdd.ics.uci.edu/databases/kddcup99/corrected.gz", test_path_on_disk)
hdfs.put(test_path_on_disk, test_path_on_hdfs)
test_data = sc.textFile(test_path_on_hdfs)

In [7]:
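# Parse one CSV record into a LabeledPoint with a binary label:
# 1.0 for any attack, 0.0 for a normal connection.
def parse_interaction(line):
    line_split = line.split(",")
    # Keep field 0 and fields 4..40; fields 1-3 are symbolic and are dropped.
    clean_line_split = line_split[0:1] + line_split[4:41]
    # Field 41 holds the connection label, e.g. 'normal.' or an attack name.
    attack = 1.0
    if line_split[41]=='normal.':
        attack = 0.0
    return LabeledPoint(attack, array([float(x) for x in clean_line_split]))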
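To make the column selection concrete, the sketch below applies the same slicing to a single record without Spark. The helper name `parse_fields` and the sample record are illustrative assumptions: the record is synthetic, built only to match the 42-field layout (41 features plus the label), and the function returns a plain tuple instead of a `LabeledPoint` so that it runs without `pyspark`.

```python
def parse_fields(line):
    """Return (label, features) using the same slicing as parse_interaction."""
    fields = line.split(",")
    # Keep field 0 and fields 4..40. Fields 1-3 (protocol type, service, flag)
    # are symbolic strings such as "tcp" and cannot be cast to float.
    numeric = fields[0:1] + fields[4:41]
    # The last field is the label: 'normal.' maps to 0.0, anything else to 1.0.
    label = 0.0 if fields[41] == "normal." else 1.0
    return label, [float(x) for x in numeric]


# Synthetic record (not taken from the dataset): 4 leading fields,
# 37 numeric placeholders, and the label, for 42 fields in total.
sample = ",".join(["0", "tcp", "http", "SF"] + ["1"] * 37 + ["normal."])

label, features = parse_fields(sample)
print("label={}, features={}".format(label, len(features)))
```

Each parsed record therefore carries 38 numeric features: one from field 0 and 37 from fields 4 to 40.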

In [8]:
parsed_train_data = train_data.map(parse_interaction)
parsed_test_data = test_data.map(parse_interaction)

# Binary Classification

## Training a Logistic Regression classifier

### We also measure the time MLlib needs to train it.

In [None]:
from pyspark.mllib.classification import LogisticRegressionWithLBFGS

%time logit_model = LogisticRegressionWithLBFGS.train(parsed_train_data, iterations=1)

In [9]:
labels_and_preds = parsed_test_data.map(lambda p: (p.label, logit_model.predict(p.features)))
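As a rough sketch of what `predict` computes for a binary logistic regression model, the pure-Python function below takes the dot product of the weights and the features, adds the intercept, applies the logistic function, and compares the result with a threshold. The name `predict_binary`, the weights, and the intercept are hypothetical values chosen for illustration, and the 0.5 threshold is an assumption matching the usual default, not a value read from the trained model.

```python
import math


def predict_binary(weights, intercept, features, threshold=0.5):
    """Return 1 (attack) or 0 (normal) for one feature vector."""
    margin = sum(w * x for w, x in zip(weights, features)) + intercept
    # Evaluate the logistic function in a form that avoids overflow
    # for large positive or negative margins.
    if margin > 0:
        probability = 1.0 / (1.0 + math.exp(-margin))
    else:
        exp_margin = math.exp(margin)
        probability = exp_margin / (1.0 + exp_margin)
    return 1 if probability > threshold else 0


# Hypothetical two-feature model, for illustration only.
weights = [0.8, -0.4]
intercept = -0.1

pred_a = predict_binary(weights, intercept, [1.0, 0.5])  # margin = 0.5
pred_b = predict_binary(weights, intercept, [0.0, 1.0])  # margin = -0.5
print("pred_a={}, pred_b={}".format(pred_a, pred_b))
```

A positive margin gives a probability above 0.5 and hence class 1. A negative margin gives class 0.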

## Evaluation of the performance of the classifier (Accuracy)

In [10]:
%time test_accuracy = labels_and_preds.filter(lambda vp: vp[0] == vp[1]).count() / float(test_data.count())
print("Accuracy on test data is {}".format(round(test_accuracy, 4)))

CPU times: user 8 ms, sys: 20 ms, total: 28 ms
Wall time: 17.2 s
Accuracy on test data is 0.8052
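The accuracy above is the fraction of test records whose predicted label equals the true label. The sketch below reproduces that computation on a small in-memory list and also counts true and false positives and negatives, which helps when one class dominates the data. The function names and the five `(label, prediction)` pairs are made up for illustration and are not results from the dataset.

```python
def accuracy(pairs):
    """Fraction of (label, prediction) pairs where the two values agree."""
    pairs = list(pairs)
    correct = sum(1 for label, pred in pairs if label == pred)
    return correct / float(len(pairs))


def confusion_counts(pairs):
    """Count true/false positives and negatives, with 1 meaning 'attack'."""
    counts = {"tp": 0, "tn": 0, "fp": 0, "fn": 0}
    for label, pred in pairs:
        if label == 1.0:
            counts["tp" if pred == 1 else "fn"] += 1
        else:
            counts["fp" if pred == 1 else "tn"] += 1
    return counts


# Hypothetical (label, prediction) pairs, not taken from the experiment.
pairs = [(1.0, 1), (1.0, 0), (0.0, 0), (0.0, 0), (1.0, 1)]

acc = accuracy(pairs)
counts = confusion_counts(pairs)
print("accuracy={}".format(acc))
print("tp={tp}, tn={tn}, fp={fp}, fn={fn}".format(**counts))
```

On an RDD, the same counts could be obtained by mapping each pair to its category and counting by value, though that variant is not shown here.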


## Disconnecting from the BDG Apache Spark cluster

In [11]:
sc.stop()