### If you use pycharm, first you need to set Environment Variables in run-> Edit Configurations ...

- SPARK_HOME = /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/pyspark
- PYTHONPATH = $SPARK_HOME/python 
- JAVA_HOME = /Library/Java/JavaVirtualMachines/jdk1.8.0_181.jdk/Contents/Home
- PYSPARK_PYTHON = python3
- PYSPARK_DRIVER_PYTHON = python3

In [1]:
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local")
sc = SparkContext(conf = conf)

In [2]:
sc.version

'3.0.0-preview2'

# MLlib: Decision Trees
original https://github.com/jadianes/spark-py-notebooks/blob/master/nb9-mllib-trees/nb9-mllib-trees.ipynb

##  Getting the data and creating the RDD

In [3]:
import urllib.request
f = urllib.request.urlretrieve ("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data.gz", "kddcup.data.gz")

In [4]:
data_file = "./kddcup.data.gz"
raw_data = sc.textFile(data_file)

print ("Train data size is {}".format(raw_data.count()))

Train data size is 4898431


In [5]:
ft = urllib.request.urlretrieve("http://kdd.ics.uci.edu/databases/kddcup99/corrected.gz", "corrected.gz")

In [6]:
test_data_file = "./corrected.gz"
test_raw_data = sc.textFile(test_data_file)

print ("Test data size is {}".format(test_raw_data.count()))

Test data size is 311029


##  Detecting network attacks using Decision Trees

In this section we will train a classification tree that, as we did with logistic regression, will predict if a network interaction is either normal or attack.


Training a classification tree using MLlib requires some parameters:


- Training data
- Num classes
- Categorical features info: a map from column to categorical variables arity. This is optional, although it should increase model accuracy. However it requires that we know the levels in our categorical variables in advance. second we need to parse our data to convert labels to integer values within the arity range.
- Impurity metric
- Tree maximum depth
- And tree maximum number of bins


In the next section we will see how to obtain all the labels within a dataset and convert them to numerical factors.

## Preparing the data


In [7]:
from pyspark.mllib.regression import LabeledPoint
from numpy import array

csv_data = raw_data.map(lambda x: x.split(","))
test_csv_data = test_raw_data.map(lambda x: x.split(","))

protocols = csv_data.map(lambda x: x[1]).distinct().collect()
services = csv_data.map(lambda x: x[2]).distinct().collect()
flags = csv_data.map(lambda x: x[3]).distinct().collect()

In [8]:
def create_labeled_point(line_split):
    # leave_out = [41]
    clean_line_split = line_split[0:41]
    
    # convert protocol to numeric categorical variable
    try: 
        clean_line_split[1] = protocols.index(clean_line_split[1])
    except:
        clean_line_split[1] = len(protocols)
        
    # convert service to numeric categorical variable
    try:
        clean_line_split[2] = services.index(clean_line_split[2])
    except:
        clean_line_split[2] = len(services)
    
    # convert flag to numeric categorical variable
    try:
        clean_line_split[3] = flags.index(clean_line_split[3])
    except:
        clean_line_split[3] = len(flags)
    
    # convert label to binary label
    attack = 1.0
    if line_split[41]=='normal.':
        attack = 0.0
        
    return LabeledPoint(attack, array([float(x) for x in clean_line_split]))

training_data = csv_data.map(create_labeled_point)
test_data = test_csv_data.map(create_labeled_point)

## Training a classifier

In [9]:
from pyspark.mllib.tree import DecisionTree, DecisionTreeModel
from time import time

# Build the model
t0 = time()
tree_model = DecisionTree.trainClassifier(training_data, numClasses=2, 
                                          categoricalFeaturesInfo={1: len(protocols), 2: len(services), 3: len(flags)},
                                          impurity='gini', maxDepth=4, maxBins=100)
tt = time() - t0

print ("Classifier trained in {} seconds".format(round(tt,3)))

Classifier trained in 135.877 seconds


## Evaluating the model

In [10]:
predictions = tree_model.predict(test_data.map(lambda p: p.features))
labels_and_preds = test_data.map(lambda p: p.label).zip(predictions)

In [11]:
labels_and_preds.top(10)

[(1.0, 1.0),
 (1.0, 1.0),
 (1.0, 1.0),
 (1.0, 1.0),
 (1.0, 1.0),
 (1.0, 1.0),
 (1.0, 1.0),
 (1.0, 1.0),
 (1.0, 1.0),
 (1.0, 1.0)]

In [13]:
t0 = time()
test_accuracy = labels_and_preds.filter(lambda v, p: v == p).count() / float(test_data.count())
tt = time() - t0

print ("Prediction made in {} seconds. Test accuracy is {}".format(round(tt,3), round(test_accuracy,4)))

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 22.0 failed 1 times, most recent failure: Lost task 0.0 in stage 22.0 (TID 22, DEUS, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\spark-3.0.0-preview2-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 597, in main
  File "C:\spark-3.0.0-preview2-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 587, in process
  File "C:\spark-3.0.0-preview2-bin-hadoop2.7\python\pyspark\rdd.py", line 2583, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "C:\spark-3.0.0-preview2-bin-hadoop2.7\python\pyspark\rdd.py", line 2583, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "C:\spark-3.0.0-preview2-bin-hadoop2.7\python\pyspark\rdd.py", line 2583, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "C:\spark-3.0.0-preview2-bin-hadoop2.7\python\pyspark\rdd.py", line 425, in func
    return f(iterator)
  File "C:\spark-3.0.0-preview2-bin-hadoop2.7\python\pyspark\rdd.py", line 1128, in <lambda>
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "C:\spark-3.0.0-preview2-bin-hadoop2.7\python\pyspark\rdd.py", line 1128, in <genexpr>
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "C:\spark-3.0.0-preview2-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\util.py", line 107, in wrapper
    return f(*args, **kwargs)
TypeError: <lambda>() missing 1 required positional argument: 'p'

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:484)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:619)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:602)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:437)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1004)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2156)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:441)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:444)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:830)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:1989)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:1977)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:1976)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1976)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:956)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:956)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:956)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2206)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2155)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2144)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:758)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2116)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2137)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2156)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2181)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1004)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1003)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:168)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:567)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:830)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\spark-3.0.0-preview2-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 597, in main
  File "C:\spark-3.0.0-preview2-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 587, in process
  File "C:\spark-3.0.0-preview2-bin-hadoop2.7\python\pyspark\rdd.py", line 2583, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "C:\spark-3.0.0-preview2-bin-hadoop2.7\python\pyspark\rdd.py", line 2583, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "C:\spark-3.0.0-preview2-bin-hadoop2.7\python\pyspark\rdd.py", line 2583, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "C:\spark-3.0.0-preview2-bin-hadoop2.7\python\pyspark\rdd.py", line 425, in func
    return f(iterator)
  File "C:\spark-3.0.0-preview2-bin-hadoop2.7\python\pyspark\rdd.py", line 1128, in <lambda>
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "C:\spark-3.0.0-preview2-bin-hadoop2.7\python\pyspark\rdd.py", line 1128, in <genexpr>
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "C:\spark-3.0.0-preview2-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\util.py", line 107, in wrapper
    return f(*args, **kwargs)
TypeError: <lambda>() missing 1 required positional argument: 'p'

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:484)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:619)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:602)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:437)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1004)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2156)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:441)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:444)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more


In [14]:
print ("Learned classification tree model:")
print (tree_model.toDebugString())

Learned classification tree model:
DecisionTreeModel classifier of depth 4 with 25 nodes
  If (feature 22 <= 35.5)
   If (feature 38 <= 0.875)
    If (feature 36 <= 0.445)
     If (feature 34 <= 0.925)
      Predict: 0.0
     Else (feature 34 > 0.925)
      Predict: 1.0
    Else (feature 36 > 0.445)
     If (feature 2 in {0.0,5.0,24.0,25.0,14.0,20.0,29.0,1.0,21.0,13.0,2.0,17.0,22.0,27.0,7.0,3.0,11.0,26.0,23.0,8.0,19.0,4.0})
      Predict: 0.0
     Else (feature 2 not in {0.0,5.0,24.0,25.0,14.0,20.0,29.0,1.0,21.0,13.0,2.0,17.0,22.0,27.0,7.0,3.0,11.0,26.0,23.0,8.0,19.0,4.0})
      Predict: 1.0
   Else (feature 38 > 0.875)
    If (feature 3 in {0.0,1.0,2.0})
     Predict: 0.0
    Else (feature 3 not in {0.0,1.0,2.0})
     If (feature 36 <= 0.255)
      Predict: 1.0
     Else (feature 36 > 0.255)
      Predict: 0.0
  Else (feature 22 > 35.5)
   If (feature 5 <= 2.0)
    If (feature 2 in {11.0,66.0})
     Predict: 0.0
    Else (feature 2 not in {11.0,66.0})
     If (feature 11 <= 0.5)
     