## Install pyspark and Basic Intro

* https://www.digitalocean.com/community/tutorials/how-to-install-anaconda-on-ubuntu-18-04-quickstart
* http://spark.apache.org/docs/latest/quick-start.html
* https://www.datacamp.com/community/tutorials/apache-spark-python#gs.fMIIqxM

This notebook is based upon [Introduction to Spark with Python, by Jose A. Dianes](https://github.com/jadianes/spark-py-notebooks)

In [1]:
!pip install findspark



Requirement already satisfied: findspark in /home/gong/anaconda3/lib/python3.7/site-packages (1.3.0)

command to start jupyter notebook:
    
    `$ PYSPARK_DRIVER_PYTHON="jupyter" PYSPARK_DRIVER_PYTHON_OPTS="notebook" pyspark`

In [2]:
import findspark
findspark.init()
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

In [None]:
from time import time

In [3]:
spark = SparkSession\
    .builder\
    .appName("rdd")\
    .getOrCreate()

In [4]:
sc = spark.sparkContext

In [5]:
sc

* an RDD is simply a set of Java or Scala objects representing data.

* DataFrame is higher level abstraction built on top of RDD (optimized by spark) `Dataset[Row]`

* Dataset: getting back some type-safety and use of lambda function

## RDD creation

In this notebook we will introduce two different ways of getting data into the basic Spark data structure, the **Resilient Distributed Dataset** or **RDD**. An RDD is a distributed collection of elements. All work in Spark is expressed as either creating new RDDs, transforming existing RDDs, or calling actions on RDDs to compute a result. Spark automatically distributes the data contained in RDDs across your cluster and parallelizes the operations you perform on them.

#### References

The reference book for these and other Spark related topics is *Learning Spark* by Holden Karau, Andy Konwinski, Patrick Wendell, and Matei Zaharia.  

/home/gong/INSIGHT-Bookshelf/Spark_Learning.pdf

The KDD Cup 1999 competition dataset is described in detail [here](http://kdd.ics.uci.edu/databases/kddcup99/kddcup99).

## Getting the data files  

In this notebook we will use the reduced dataset (10 percent) provided for the KDD Cup 1999, containing nearly half million network interactions. The file is provided as a *Gzip* file that we will download locally.  

import urllib

f = urllib.urlretrieve ("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz", "kddcup.data_10_percent.gz")

## Creating a RDD from a file  

The most common way of creating an RDD is to load it from a file. Notice that Spark's `textFile` can handle compressed files directly.    

In [6]:
data_file = "./kddcup.data_10_percent.gz"
raw_data = sc.textFile(data_file)

In [7]:
type(raw_data)

pyspark.rdd.RDD

Now we have our data file loaded into the `raw_data` RDD.

Without getting into Spark *transformations* and *actions*, the most basic thing we can do to check that we got our RDD contents right is to `count()` the number of lines loaded from the file into the RDD.  

In [8]:
raw_data.count()
# 494021

494021

We can also check the first few entries in our data.  

In [9]:
raw_data.take(5)

['0,tcp,http,SF,181,5450,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,9,9,1.00,0.00,0.11,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,239,486,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,19,19,1.00,0.00,0.05,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,235,1337,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,29,29,1.00,0.00,0.03,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,219,1337,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,6,6,0.00,0.00,0.00,0.00,1.00,0.00,0.00,39,39,1.00,0.00,0.03,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,217,2032,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,6,6,0.00,0.00,0.00,0.00,1.00,0.00,0.00,49,49,1.00,0.00,0.02,0.00,0.00,0.00,0.00,0.00,normal.']

In the following notebooks, we will use this raw data to learn about the different Spark transformations and actions.  

## Creating an RDD using `parallelize`

Another way of creating an RDD is to parallelize an already existing list.  

In [10]:
a = range(1000)

data = sc.parallelize(a)

As we did before, we can `count()` the number of elements in the RDD.

In [11]:
data.count()

1000

As before, we can access the first few elements on our RDD.  

In [12]:
data.take(5)

[0, 1, 2, 3, 4]

## The `filter` transformation

This transformation can be applied to RDDs in order to keep just elements that satisfy a certain condition. More concretely, a function is evaluated on every element in the original RDD. The new resulting RDD will contain just those elements that make the function return `True`.

For example, imagine we want to count how many `normal.` interactions we have in our dataset. We can filter our `raw_data` RDD as follows.  

In [13]:
normal_raw_data = raw_data.filter(lambda x: 'normal.' in x)

In [14]:
normal_raw_data.count()   # 97278

97278

In [15]:
t0 = time()
normal_count = normal_raw_data.count()
tt = time() - t0
print("There are {} 'normal' interactions".format(normal_count)) 
print ("Count completed in {} seconds".format(round(tt,3)))

There are 97278 'normal' interactions
Count completed in 1.484 seconds


Notice that we have measured the elapsed time for counting the elements in the RDD. We have done this because we wanted to point out that actual (distributed) computations in Spark take place when we execute *actions* and not *transformations*. In this case `count` is the action we execute on the RDD. We can apply as many transformations as we want on a our RDD and no computation will take place until we call the first action that, in this case takes a few seconds to complete.

## The `map` transformation

By using the `map` transformation in Spark, we can apply a function to every element in our RDD. Python's lambdas are specially expressive for this particular.

In this case we want to read our data file as a CSV formatted one. We can do this by applying a lambda function to each element in the RDD as follows.

In [16]:
from pprint import pprint
csv_data = raw_data.map(lambda x: x.split(","))
t0 = time()
head_rows = csv_data.take(5)
tt = time() - t0
print ("Parse completed in {} seconds".format(round(tt,3)))
pprint(head_rows[0])

Parse completed in 0.05 seconds
['0',
 'tcp',
 'http',
 'SF',
 '181',
 '5450',
 '0',
 '0',
 '0',
 '0',
 '0',
 '1',
 '0',
 '0',
 '0',
 '0',
 '0',
 '0',
 '0',
 '0',
 '0',
 '0',
 '8',
 '8',
 '0.00',
 '0.00',
 '0.00',
 '0.00',
 '1.00',
 '0.00',
 '0.00',
 '9',
 '9',
 '1.00',
 '0.00',
 '0.11',
 '0.00',
 '0.00',
 '0.00',
 '0.00',
 '0.00',
 'normal.']


In [17]:
t0 = time()
head_rows = csv_data.take(10000)
tt = time() - t0
print ("Parse completed in {} seconds".format(round(tt,3)))

Parse completed in 0.606 seconds


### Using `map` with predefined functions

Of course we can use predefined functions with `map`. Imagine we want to have each element in the RDD as a key-value pair where the key is the tag (e.g. *normal*) and the value is the whole list of elements that represents the row in the CSV formatted file. We could proceed as follows.  

In [18]:
row1 = raw_data.first()

In [21]:
type(row1), len(row1.split(',')), row1

(str,
 42,
 '0,tcp,http,SF,181,5450,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,9,9,1.00,0.00,0.11,0.00,0.00,0.00,0.00,0.00,normal.')

In [22]:
def parse_interaction(line):
    elems = line.split(",")
    tag = elems[-1]
    return (tag, elems)

key_csv_data = raw_data.map(parse_interaction)
head_rows = key_csv_data.take(5)
pprint(head_rows[0])

('normal.',
 ['0',
  'tcp',
  'http',
  'SF',
  '181',
  '5450',
  '0',
  '0',
  '0',
  '0',
  '0',
  '1',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '8',
  '8',
  '0.00',
  '0.00',
  '0.00',
  '0.00',
  '1.00',
  '0.00',
  '0.00',
  '9',
  '9',
  '1.00',
  '0.00',
  '0.11',
  '0.00',
  '0.00',
  '0.00',
  '0.00',
  '0.00',
  'normal.'])


## The `collect` action

So far we have used the actions `count` and `take`. Another basic action we need to learn is `collect`. Basically it will get all the elements in the RDD into memory for us to work with them. For this reason it has to be used with care, specially when working with large RDDs.  

In [24]:
t0 = time()
all_raw_data = raw_data.collect()
tt = time() - t0
print ("Data collected in {} seconds".format(round(tt,3)))

Data collected in 5.294 seconds


That took longer as any other action we used before, of course. Every Spark worker node that has a fragment of the RDD has to be coordinated in order to retrieve its part, and then *reduce* everything together.   

As a last example combining all the previous, we want to collect all the normal interactions as key-value pairs

In [25]:
# parse into key-value pairs
key_csv_data = raw_data.map(parse_interaction)

# filter normal key interactions
normal_key_interactions = key_csv_data.filter(lambda x: x[0] == "normal.")

# collect all
t0 = time()
all_normal = normal_key_interactions.collect()
tt = time() - t0
normal_count = len(all_normal)
print ("Data collected in {} seconds".format(round(tt,3)))
print ("There are {} 'normal' interactions".format(normal_count))

Data collected in 4.336 seconds
There are 97278 'normal' interactions


## Sampling RDDs

So far we have introduced RDD creation together with some basic transformations such as map and filter and some actions such as count, take, and collect.

This notebook will show how to sample RDDs. Regarding transformations, sample will be introduced since it will be useful in many statistical learning scenarios. Then we will compare results with the takeSample action.

In Spark, there are two sampling operations, the transformation `sample` and the action `takeSample`. By using a transformation we can tell Spark to apply successive transformation on a sample of a given RDD. By using an action we retrieve a given sample and we can have it in local memory to be used by any other standard library (e.g. Scikit-learn).  

### The `sample` transformation

The `sample` transformation takes up to three parameters. First is whether the sampling is done with replacement or not. Second is the sample size as a fraction. Finally we can optionally provide a *random seed*.  

In [28]:
raw_data_sample = raw_data.sample(False, 0.1, 1234)
sample_size = raw_data_sample.count()
total_size = raw_data.count()
print ("Sample size is {} of {}".format(sample_size, total_size))

Sample size is 49493 of 494021


But the power of sampling as a transformation comes from doing it as part of a sequence of additional transformations. This will show more powerful once we start doing aggregations and key-value pairs operations, and will be specially useful when using Spark's machine learning library MLlib.    

In the meantime, imagine we want to have an approximation of the proportion of `normal.` interactions in our dataset. We could do this by counting the total number of tags as we did in previous notebooks. However we want a quicker response and we don't need the exact answer but just an approximation. We can do it as follows.   

In [29]:
from time import time

# transformations to be applied
raw_data_sample_items = raw_data_sample.map(lambda x: x.split(","))
sample_normal_tags = raw_data_sample_items.filter(lambda x: "normal." in x)

# actions + time
t0 = time()
sample_normal_tags_count = sample_normal_tags.count()
tt = time() - t0

sample_normal_ratio = sample_normal_tags_count / float(sample_size)
print ("The ratio of 'normal' interactions is {}".format(round(sample_normal_ratio,3)) )
print ("Count done in {} seconds".format(round(tt,3)))

The ratio of 'normal' interactions is 0.195
Count done in 1.675 seconds


Let's compare this with calculating the ratio without sampling.

In [30]:
# transformations to be applied
raw_data_items = raw_data.map(lambda x: x.split(","))
normal_tags = raw_data_items.filter(lambda x: "normal." in x)

# actions + time
t0 = time()
normal_tags_count = normal_tags.count()
tt = time() - t0

normal_ratio = normal_tags_count / float(total_size)
print ("The ratio of 'normal' interactions is {}".format(round(normal_ratio,3)) )
print ("Count done in {} seconds".format(round(tt,3)))

The ratio of 'normal' interactions is 0.197
Count done in 3.11 seconds


We can see a gain in time. The more transformations we apply after the sampling the bigger this gain. This is because without sampling all the transformations are applied to the complete set of data.  

### The `takeSample` action  

If what we need is to grab a sample of raw data from our RDD into local memory in order to be used by other non-Spark libraries, `takeSample` can be used.  

The syntax is very similar, but in this case we specify the number of items instead of the sample size as a fraction of the complete data size.  

In [32]:
rows_to_sample = 100000

t0 = time()
raw_data_sample = raw_data.takeSample(False, rows_to_sample, 1234)
normal_data_sample = [x.split(",") for x in raw_data_sample if "normal." in x]
tt = time() - t0

normal_sample_size = len(normal_data_sample)

normal_ratio = normal_sample_size / rows_to_sample
print ("The ratio of 'normal' interactions is {}".format(normal_ratio))
print ("Count done in {} seconds".format(round(tt,3)))

The ratio of 'normal' interactions is 0.19633
Count done in 3.049 seconds


The process was very similar as before. We obtained a sample of about 10 percent of the data, and then filter and split.  

However, it took longer, even with a slightly smaller sample. The reason is that Spark just distributed the execution of the sampling process. The filtering and splitting of the results were done locally in a single node. 

##  Set operations on RDDs

Spark supports many of the operations we have in mathematical sets, such as union and intersection, even when the RDDs themselves are not properly sets. It is important to note that these operations require that the RDDs being operated on are of the same type.  

Set operations are quite straightforward to understand as it work as expected. The only consideration comes from the fact that RDDs are not real sets, and therefore operations such as the union of RDDs doesn't remove duplicates. In this notebook we will have a brief look at `subtract`, `distinct`, and `cartesian`.       

### Getting attack interactions using `subtract`

For illustrative purposes, imagine we already have our RDD with non attack (normal) interactions from some previous analysis.

We can obtain attack interactions by subtracting normal ones from the original unfiltered RDD as follows.  

In [33]:
normal_raw_data = raw_data.filter(lambda x: "normal." in x)
attack_raw_data = raw_data.subtract(normal_raw_data)

In [34]:
normal_raw_data.count(), attack_raw_data.count()

(97278, 396743)

In [35]:
attack_raw_data.take(5)

['305,tcp,telnet,SF,1735,2766,0,0,0,3,0,1,2,1,0,0,1,0,0,0,0,0,1,1,0.00,0.00,0.00,0.00,1.00,0.00,0.00,2,4,1.00,0.00,0.50,0.50,0.00,0.00,0.00,0.00,buffer_overflow.',
 '0,tcp,telnet,S0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,6,5,0.83,1.00,0.00,0.00,0.83,0.33,0.00,5,6,1.00,0.00,0.20,0.33,1.00,0.83,0.00,0.00,neptune.',
 '0,icmp,ecr_i,SF,1032,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,511,511,0.00,0.00,0.00,0.00,1.00,0.00,0.00,255,123,0.48,0.01,0.48,0.00,0.00,0.00,0.00,0.00,smurf.',
 '0,icmp,ecr_i,SF,1032,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,511,511,0.00,0.00,0.00,0.00,1.00,0.00,0.00,255,133,0.52,0.01,0.52,0.00,0.00,0.00,0.00,0.00,smurf.',
 '0,icmp,ecr_i,SF,1032,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,511,511,0.00,0.00,0.00,0.00,1.00,0.00,0.00,255,183,0.72,0.01,0.72,0.00,0.00,0.00,0.00,0.00,smurf.']

### Protocol and service combinations using `cartesian`

We can compute the Cartesian product between two RDDs by using the cartesian transformation. It returns all possible pairs of elements between two RDDs. In our case we will use it to generate all the possible combinations between service and protocol in our network interactions.

First of all we need to isolate each collection of values in two separate RDDs. For that we will use distinct on the CSV-parsed dataset. From the [dataset description](http://kdd.ics.uci.edu/databases/kddcup99/kddcup.names) we know that protocol is the second column and service is the third (tag is the last one and not the first as appears in the page).

In [39]:
# get attach type
csv_data = raw_data.map(lambda x: x.split(","))
attack_types = csv_data.map(lambda x: x[-1]).distinct()
attack_types.collect()

['normal.',
 'buffer_overflow.',
 'loadmodule.',
 'perl.',
 'neptune.',
 'smurf.',
 'guess_passwd.',
 'pod.',
 'teardrop.',
 'portsweep.',
 'ipsweep.',
 'land.',
 'ftp_write.',
 'back.',
 'imap.',
 'satan.',
 'phf.',
 'nmap.',
 'multihop.',
 'warezmaster.',
 'warezclient.',
 'spy.',
 'rootkit.']

In [40]:
# get protocol
csv_data = raw_data.map(lambda x: x.split(","))
protocols = csv_data.map(lambda x: x[1]).distinct()
protocols.collect()

['tcp', 'udp', 'icmp']

In [37]:
services = csv_data.map(lambda x: x[2]).distinct()
services.collect()

['http',
 'smtp',
 'finger',
 'domain_u',
 'auth',
 'telnet',
 'ftp',
 'eco_i',
 'ntp_u',
 'ecr_i',
 'other',
 'private',
 'pop_3',
 'ftp_data',
 'rje',
 'time',
 'mtp',
 'link',
 'remote_job',
 'gopher',
 'ssh',
 'name',
 'whois',
 'domain',
 'login',
 'imap4',
 'daytime',
 'ctf',
 'nntp',
 'shell',
 'IRC',
 'nnsp',
 'http_443',
 'exec',
 'printer',
 'efs',
 'courier',
 'uucp',
 'klogin',
 'kshell',
 'echo',
 'discard',
 'systat',
 'supdup',
 'iso_tsap',
 'hostnames',
 'csnet_ns',
 'pop_2',
 'sunrpc',
 'uucp_path',
 'netbios_ns',
 'netbios_ssn',
 'netbios_dgm',
 'sql_net',
 'vmnet',
 'bgp',
 'Z39_50',
 'ldap',
 'netstat',
 'urh_i',
 'X11',
 'urp_i',
 'pm_dump',
 'tftp_u',
 'tim_i',
 'red_i']

In [41]:
# do the cartesian product.
product = protocols.cartesian(services).collect()
print ("There are {} combinations of protocol X service".format(len(product)))

There are 198 combinations of protocol X service


Obviously, for such small RDDs doesn't really make sense to use Spark cartesian product. We could have perfectly collected the values after using `distinct` and do the cartesian product locally. Moreover, `distinct` and `cartesian` are expensive operations so they must be used with care when the operating datasets are large.   

##  Data aggregations on RDDs

We can aggregate RDD data in Spark by using three different actions: reduce, fold, and aggregate. The last one is the more general one and someway includes the first two.


### Inspecting interaction duration by tag

Both `fold` and `reduce` take a function as an argument that is applied to two elements of the RDD. The `fold` action differs from `reduce` in that it gets and additional initial *zero value* to be used for the initial call. This value should be the identity element for the function provided.  

As an example, imagine we want to know the total duration of our interactions for normal and attack interactions. We can use `reduce` as follows.   

In [42]:
# parse data
csv_data = raw_data.map(lambda x: x.split(","))

# separate into different RDDs
normal_csv_data = csv_data.filter(lambda x: x[41]=="normal.")
attack_csv_data = csv_data.filter(lambda x: x[41]!="normal.")

The function that we pass to `reduce` gets and returns elements of the same type of the RDD. If we want to sum durations we need to extract that element into a new RDD.  

In [43]:
normal_duration_data = normal_csv_data.map(lambda x: int(x[0]))
attack_duration_data = attack_csv_data.map(lambda x: int(x[0]))

In [44]:
total_normal_duration = normal_duration_data.reduce(lambda x, y: x + y)
total_attack_duration = attack_duration_data.reduce(lambda x, y: x + y)

In [45]:
print ("Total duration for 'normal' interactions is {}".\
    format(total_normal_duration))
print ("Total duration for 'attack' interactions is {}".\
    format(total_attack_duration))

Total duration for 'normal' interactions is 21075991
Total duration for 'attack' interactions is 2626792


We can go further and use counts to calculate duration means.  

In [46]:
normal_count = normal_duration_data.count()
attack_count = attack_duration_data.count()

print ("Mean duration for 'normal' interactions is {}".\
    format(round(total_normal_duration/float(normal_count),3)))
print ("Mean duration for 'attack' interactions is {}".\
    format(round(total_attack_duration/float(attack_count),3)))

Mean duration for 'normal' interactions is 216.657
Mean duration for 'attack' interactions is 6.621


### A better way, using `aggregate`  

The `aggregate` action frees us from the constraint of having the return be the same type as the RDD we are working on. Like with `fold`, we supply an initial zero value of the type we want to return. Then we provide two functions. The first one is used to combine the elements from our RDD with the accumulator. The second function is needed to merge two accumulators. Let's see it in action calculating the mean we did before.  

In [47]:
normal_sum_count = normal_duration_data.aggregate(
    (0,0), # the initial value
    (lambda acc, value: (acc[0] + value, acc[1] + 1)), # combine value with acc
    (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])) # combine accumulators
)

print ("Mean duration for 'normal' interactions is {}".\
    format(round(normal_sum_count[0]/float(normal_sum_count[1]),3)))

Mean duration for 'normal' interactions is 216.657


## Working with key/value pair RDDs

Spark provides specific functions to deal with RDDs which elements are key/value pairs. They are usually used to perform aggregations and other processings by key.  

In this notebook we will show how, by working with key/value pairs, we can process our network interactions dataset in a more practical and powerful way than that used in previous notebooks. Key/value pair aggregations will show to be particularly effective when trying to explore each type of tag in our network attacks, in an individual way.  

### Creating a pair RDD for interaction types

In this notebook we want to do some exploratory data analysis on our network interactions dataset. More concretely we want to profile each network interaction type in terms of some of its variables such as duration. In order to do so, we first need to create the RDD suitable for that, where each interaction is parsed as a CSV row representing the value, and is put together with its corresponding tag as a key.  

Normally we create key/value pair RDDs by applying a function using `map` to the original data. This function returns the corresponding pair for a given RDD element. We can proceed as follows.  

In [48]:
csv_data = raw_data.map(lambda x: x.split(","))
key_value_data = csv_data.map(lambda x: (x[41], x)) # x[41] contains the network interaction tag

In [49]:
key_value_data.first()

('normal.',
 ['0',
  'tcp',
  'http',
  'SF',
  '181',
  '5450',
  '0',
  '0',
  '0',
  '0',
  '0',
  '1',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '0',
  '8',
  '8',
  '0.00',
  '0.00',
  '0.00',
  '0.00',
  '1.00',
  '0.00',
  '0.00',
  '9',
  '9',
  '1.00',
  '0.00',
  '0.11',
  '0.00',
  '0.00',
  '0.00',
  '0.00',
  '0.00',
  'normal.'])

### Data aggregations with key/value pair RDDs

We can use all the transformations and actions available for normal RDDs with key/value pair RDDs. We just need to make the functions work with pair elements. Additionally, Spark provides specific functions to work with RDDs containing pair elements. They are very similar to those available for general RDDs.  

For example, we have a `reduceByKey` transformation that we can use as follows to calculate the total duration of each network interaction type.  

In [50]:
key_value_duration = csv_data.map(lambda x: (x[41], float(x[0]))) 
durations_by_key = key_value_duration.reduceByKey(lambda x, y: x + y)

durations_by_key.collect()

[('normal.', 21075991.0),
 ('buffer_overflow.', 2751.0),
 ('loadmodule.', 326.0),
 ('perl.', 124.0),
 ('neptune.', 0.0),
 ('smurf.', 0.0),
 ('guess_passwd.', 144.0),
 ('pod.', 0.0),
 ('teardrop.', 0.0),
 ('portsweep.', 1991911.0),
 ('ipsweep.', 43.0),
 ('land.', 0.0),
 ('ftp_write.', 259.0),
 ('back.', 284.0),
 ('imap.', 72.0),
 ('satan.', 64.0),
 ('phf.', 18.0),
 ('nmap.', 0.0),
 ('multihop.', 1288.0),
 ('warezmaster.', 301.0),
 ('warezclient.', 627563.0),
 ('spy.', 636.0),
 ('rootkit.', 1008.0)]

We have a specific counting action for key/value pairs.  

In [51]:
counts_by_key = key_value_data.countByKey()
counts_by_key

defaultdict(int,
            {'normal.': 97278,
             'buffer_overflow.': 30,
             'loadmodule.': 9,
             'perl.': 3,
             'neptune.': 107201,
             'smurf.': 280790,
             'guess_passwd.': 53,
             'pod.': 264,
             'teardrop.': 979,
             'portsweep.': 1040,
             'ipsweep.': 1247,
             'land.': 21,
             'ftp_write.': 8,
             'back.': 2203,
             'imap.': 12,
             'satan.': 1589,
             'phf.': 4,
             'nmap.': 231,
             'multihop.': 7,
             'warezmaster.': 20,
             'warezclient.': 1020,
             'spy.': 2,
             'rootkit.': 10})

### Using `combineByKey`

This is the most general of the per-key aggregation functions. Most of the other per-key combiners are implemented using it. We can think about it as the `aggregate` equivalent since it allows the user to return values that are not the same type as our input data.

For example, we can use it to calculate per-type average durations as follows.  

In [None]:
sum_counts = key_value_duration.combineByKey(
    (lambda x: (x, 1)), # the initial value, with value x and count 1
    (lambda acc, value: (acc[0]+value, acc[1]+1)), # how to combine a pair value with the accumulator: sum value, and increment count
    (lambda acc1, acc2: (acc1[0]+acc2[0], acc1[1]+acc2[1])) # combine accumulators
)

In [82]:
type(sum_counts)

pyspark.rdd.PipelinedRDD

In [81]:
sum_counts.collect()

[('normal.', (21075991.0, 97278)),
 ('buffer_overflow.', (2751.0, 30)),
 ('loadmodule.', (326.0, 9)),
 ('perl.', (124.0, 3)),
 ('neptune.', (0.0, 107201)),
 ('smurf.', (0.0, 280790)),
 ('guess_passwd.', (144.0, 53)),
 ('pod.', (0.0, 264)),
 ('teardrop.', (0.0, 979)),
 ('portsweep.', (1991911.0, 1040)),
 ('ipsweep.', (43.0, 1247)),
 ('land.', (0.0, 21)),
 ('ftp_write.', (259.0, 8)),
 ('back.', (284.0, 2203)),
 ('imap.', (72.0, 12)),
 ('satan.', (64.0, 1589)),
 ('phf.', (18.0, 4)),
 ('nmap.', (0.0, 231)),
 ('multihop.', (1288.0, 7)),
 ('warezmaster.', (301.0, 20)),
 ('warezclient.', (627563.0, 1020)),
 ('spy.', (636.0, 2)),
 ('rootkit.', (1008.0, 10))]

We can see that the arguments are pretty similar to those passed to `aggregate` in the previous notebook. The result associated to each type is in the form of a pair. If we want to actually get the averages, we need to do the division before collecting the results.  

In [83]:
sum_counts.take(2)

[('normal.', (21075991.0, 97278)), ('buffer_overflow.', (2751.0, 30))]

In [84]:
type(sum_counts)

pyspark.rdd.PipelinedRDD

In [104]:
duration_means_by_type = sum_counts.map(lambda x: (x[0], round(x[1][0]/x[1][1],3)) ).collect()

In [105]:
type(duration_means_by_type)

list

In [106]:
duration_means_by_type[:2]

[('normal.', 216.657), ('buffer_overflow.', 91.7)]

In [110]:
duration_means_by_type.sort(key=lambda tup: tup[1], reverse=True)  # sorts in place

In [111]:
# Print them sorted
for tag,avg in duration_means_by_type:
    print (tag, avg)

portsweep. 1915.299
warezclient. 615.258
spy. 318.0
normal. 216.657
multihop. 184.0
rootkit. 100.8
buffer_overflow. 91.7
perl. 41.333
loadmodule. 36.222
ftp_write. 32.375
warezmaster. 15.05
imap. 6.0
phf. 4.5
guess_passwd. 2.717
back. 0.129
satan. 0.04
ipsweep. 0.034
neptune. 0.0
smurf. 0.0
pod. 0.0
teardrop. 0.0
land. 0.0
nmap. 0.0


## Spark SQL and Data Frames

This notebook will introduce Spark capabilities to deal with data in a structured way. Basically, everything turns around the concept of *Data Frame* and using *SQL language* to query them. We will see how the data frame abstraction, very popular in other data analytics ecosystems (e.g. R and Python/Pandas), it is very powerful when performing exploratory data analysis. In fact, it is very easy to express data queries when used together with the SQL language. Moreover, Spark distributes this column-based data structure transparently, in order to make the querying process as efficient as possible.  

### Getting a Data Frame

A Spark `DataFrame` is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R or Pandas. They can be constructed from a wide array of sources such as a existing RDD in our case.

The entry point into all SQL functionality in Spark is the `SQLContext` class. To create a basic instance, all we need is a `SparkContext` reference. Since we are running Spark in shell mode (using pySpark) we can use the global context object `sc` for this purpose.  

In [112]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

### Inferring the schema

With a `SQLContext`, we are ready to create a `DataFrame` from our existing RDD. But first we need to tell Spark SQL the schema in our data.   

Spark SQL can convert an RDD of `Row` objects to a `DataFrame`. Rows are constructed by passing a list of key/value pairs as *kwargs* to the `Row` class. The keys define the column names, and the types are inferred by looking at the first row. Therefore, it is important that there is no missing data in the first row of the RDD in order to properly infer the schema.

In our case, we first need to split the comma separated data, and then use the information in KDD's 1999 task description to obtain the [column names](http://kdd.ics.uci.edu/databases/kddcup99/kddcup.names).  

Once we have our RDD of Row we can infer and register the schema.

In [113]:
from pyspark.sql import Row

csv_data = raw_data.map(lambda l: l.split(","))
row_data = csv_data.map(lambda p: Row(
    duration=int(p[0]), 
    protocol_type=p[1],
    service=p[2],
    flag=p[3],
    src_bytes=int(p[4]),
    dst_bytes=int(p[5])
    )
)

In [114]:
interactions_df = sqlContext.createDataFrame(row_data)
# interactions_df = row_data.toDF()
interactions_df.registerTempTable("interactions")

In [122]:
type(interactions_df)

pyspark.sql.dataframe.DataFrame

In [120]:
interactions_df.head(5)

[Row(dst_bytes=5450, duration=0, flag='SF', protocol_type='tcp', service='http', src_bytes=181),
 Row(dst_bytes=486, duration=0, flag='SF', protocol_type='tcp', service='http', src_bytes=239),
 Row(dst_bytes=1337, duration=0, flag='SF', protocol_type='tcp', service='http', src_bytes=235),
 Row(dst_bytes=1337, duration=0, flag='SF', protocol_type='tcp', service='http', src_bytes=219),
 Row(dst_bytes=2032, duration=0, flag='SF', protocol_type='tcp', service='http', src_bytes=217)]

In [136]:
interactions_df.printSchema()

root
 |-- dst_bytes: long (nullable = true)
 |-- duration: long (nullable = true)
 |-- flag: string (nullable = true)
 |-- protocol_type: string (nullable = true)
 |-- service: string (nullable = true)
 |-- src_bytes: long (nullable = true)



Now we can run SQL queries over our data frame that has been registered as a table.

In [125]:
%%time
# Select tcp network interactions with more than 1 second duration and no transfer from destination
tcp_interactions = sqlContext.sql("""
    SELECT duration, dst_bytes FROM interactions WHERE protocol_type = 'tcp' AND duration > 1000 AND dst_bytes = 0
""")
tcp_interactions.show()

+--------+---------+
|duration|dst_bytes|
+--------+---------+
|    5057|        0|
|    5059|        0|
|    5051|        0|
|    5056|        0|
|    5051|        0|
|    5039|        0|
|    5062|        0|
|    5041|        0|
|    5056|        0|
|    5064|        0|
|    5043|        0|
|    5061|        0|
|    5049|        0|
|    5061|        0|
|    5048|        0|
|    5047|        0|
|    5044|        0|
|    5063|        0|
|    5068|        0|
|    5062|        0|
+--------+---------+
only showing top 20 rows

CPU times: user 3.3 ms, sys: 482 µs, total: 3.78 ms
Wall time: 2.81 s


In [126]:
type(tcp_interactions)

pyspark.sql.dataframe.DataFrame

In [130]:
tcp_interactions.count()

139

In [135]:
for row in tcp_interactions.collect()[:5]:
    print (f"Duration: {row.duration}, Dest. bytes: {row.dst_bytes}")

Duration: 5057, Dest. bytes: 0
Duration: 5059, Dest. bytes: 0
Duration: 5051, Dest. bytes: 0
Duration: 5056, Dest. bytes: 0
Duration: 5051, Dest. bytes: 0


### Queries as `DataFrame` operations

Spark `DataFrame` provides a domain-specific language for structured data manipulation. This language includes methods we can concatenate in order to do selection, filtering, grouping, etc. For example, let's say we want to count how many interactions are there for each protocol type. We can proceed as follows.  

In [138]:
%%time
interactions_df.select("protocol_type", "duration", "dst_bytes").show(10)

+-------------+--------+---------+
|protocol_type|duration|dst_bytes|
+-------------+--------+---------+
|          tcp|       0|     5450|
|          tcp|       0|      486|
|          tcp|       0|     1337|
|          tcp|       0|     1337|
|          tcp|       0|     2032|
|          tcp|       0|     2032|
|          tcp|       0|     1940|
|          tcp|       0|     4087|
|          tcp|       0|      151|
|          tcp|       0|      786|
+-------------+--------+---------+
only showing top 10 rows

CPU times: user 5 ms, sys: 0 ns, total: 5 ms
Wall time: 212 ms


In [139]:
%%time
interactions_df.select("protocol_type", "duration", "dst_bytes").filter(interactions_df.duration>1000).show(10)

+-------------+--------+---------+
|protocol_type|duration|dst_bytes|
+-------------+--------+---------+
|          tcp|   12454|    21181|
|          tcp|   10774|      919|
|          udp|    1023|       48|
|          udp|    6087|       33|
|          tcp|   13368|    58843|
|          udp|    3029|       17|
|          udp|    2096|       46|
|          udp|    3030|       17|
|          udp|    3030|       17|
|          udp|    3030|       17|
+-------------+--------+---------+
only showing top 10 rows

CPU times: user 5.12 ms, sys: 0 ns, total: 5.12 ms
Wall time: 1.15 s


In [137]:
%%time
interactions_df.select("protocol_type", "duration", "dst_bytes").groupBy("protocol_type").count().show()

+-------------+------+
|protocol_type| count|
+-------------+------+
|          tcp|190065|
|          udp| 20354|
|         icmp|283602|
+-------------+------+

CPU times: user 19.6 ms, sys: 9.28 ms, total: 28.9 ms
Wall time: 10.3 s


We can use this to perform some [exploratory data analysis](http://en.wikipedia.org/wiki/Exploratory_data_analysis). Let's count how many attack and normal interactions we have. First we need to add the label column to our data.    

In [140]:
def get_label_type(label):
    if label!="normal.":
        return "attack"
    else:
        return "normal"
    
row_labeled_data = csv_data.map(lambda p: Row(
    duration=int(p[0]), 
    protocol_type=p[1],
    service=p[2],
    flag=p[3],
    src_bytes=int(p[4]),
    dst_bytes=int(p[5]),
    label=get_label_type(p[41])
    )
)
interactions_labeled_df = sqlContext.createDataFrame(row_labeled_data)

This time we don't need to register the schema since we are going to use the OO query interface.

Let's check the previous actually works by counting attack and normal data in our data frame.

In [141]:
%%time
interactions_labeled_df.select("label").groupBy("label").count().show()

+------+------+
| label| count|
+------+------+
|normal| 97278|
|attack|396743|
+------+------+

CPU times: user 7.14 ms, sys: 20 ms, total: 27.1 ms
Wall time: 10.2 s


Now we want to count them by label and protocol type, in order to see how important the protocol type is to detect when an interaction is or not an attack.  

In [142]:
%%time
interactions_labeled_df.select("label", "protocol_type").groupBy("label", "protocol_type").count().show()

+------+-------------+------+
| label|protocol_type| count|
+------+-------------+------+
|normal|          udp| 19177|
|normal|         icmp|  1288|
|normal|          tcp| 76813|
|attack|         icmp|282314|
|attack|          tcp|113252|
|attack|          udp|  1177|
+------+-------------+------+

CPU times: user 14.8 ms, sys: 12.2 ms, total: 27 ms
Wall time: 10.3 s


At first sight it seems that *udp* interactions are in lower proportion between network attacks versus other protocol types.  

And we can do much more sophisticated groupings. For example, add to the previous a "split" based on data transfer from target. 

In [143]:
%%time
interactions_labeled_df.select("label", "protocol_type", "dst_bytes").groupBy("label", "protocol_type", interactions_labeled_df.dst_bytes==0).count().show()

+------+-------------+---------------+------+
| label|protocol_type|(dst_bytes = 0)| count|
+------+-------------+---------------+------+
|normal|          udp|          false| 15583|
|attack|          udp|          false|    11|
|attack|          tcp|           true|110583|
|normal|          tcp|          false| 67500|
|attack|         icmp|           true|282314|
|attack|          tcp|          false|  2669|
|normal|          tcp|           true|  9313|
|normal|          udp|           true|  3594|
|normal|         icmp|           true|  1288|
|attack|          udp|           true|  1166|
+------+-------------+---------------+------+

CPU times: user 22.8 ms, sys: 4.5 ms, total: 27.3 ms
Wall time: 10.4 s


#### order-by (sort)

* https://stackoverflow.com/questions/34514545/spark-dataframe-groupby-and-sort-in-the-descending-order-pyspark

In [146]:
from pyspark.sql.functions import col

In [148]:
%%time
interactions_labeled_df.select("label", "protocol_type", "dst_bytes")\
    .groupBy("label", "protocol_type", interactions_labeled_df.dst_bytes==0)\
    .count()\
    .sort(col("label"),col("protocol_type").desc())\
    .show()

+------+-------------+---------------+------+
| label|protocol_type|(dst_bytes = 0)| count|
+------+-------------+---------------+------+
|attack|          udp|          false|    11|
|attack|          udp|           true|  1166|
|attack|          tcp|          false|  2669|
|attack|          tcp|           true|110583|
|attack|         icmp|           true|282314|
|normal|          udp|           true|  3594|
|normal|          udp|          false| 15583|
|normal|          tcp|          false| 67500|
|normal|          tcp|           true|  9313|
|normal|         icmp|           true|  1288|
+------+-------------+---------------+------+

CPU times: user 26.5 ms, sys: 4.37 ms, total: 30.8 ms
Wall time: 10.1 s


We see how relevant is this new split to determine if a network interaction is an attack.  

We will stop here, but we can see how powerful this type of queries are in order to explore our data. Actually we can replicate all the splits we saw in previous notebooks, when introducing classification trees, just by selecting, groping, and filtering our dataframe. For a more detailed (but less real-world) list of Spark's `DataFrame` operations and data sources, have a look at the official documentation [here](https://spark.apache.org/docs/latest/sql-programming-guide.html#dataframe-operations).    

## MLlib: Basic Statistics and Exploratory Data Analysis


So far we have used different map and aggregation functions, on simple and key/value pair RDD's, in order to get simple statistics that help us understand our datasets. In this notebook we will introduce Spark's machine learning library [MLlib](https://spark.apache.org/docs/latest/mllib-guide.html) through its basic statistics functionality in order to better understand our dataset. We will use the reduced 10-percent [KDD Cup 1999](http://kdd.ics.uci.edu/databases/kddcup99/kddcup99.html) datasets through the notebook.  

### Local vectors

A [local vector](https://spark.apache.org/docs/latest/mllib-data-types.html#local-vector) is often used as a base type for RDDs in Spark MLlib. A local vector has integer-typed and 0-based indices and double-typed values, stored on a single machine. MLlib supports two types of local vectors: dense and sparse. A dense vector is backed by a double array representing its entry values, while a sparse vector is backed by two parallel arrays: indices and values. 

For dense vectors, MLlib uses either Python *lists* or the *NumPy* `array` type. The later is recommended, so you can simply pass NumPy arrays around.  

For sparse vectors, users can construct a `SparseVector` object from MLlib or pass *SciPy* `scipy.sparse` column vectors if SciPy is available in their environment. The easiest way to create sparse vectors is to use the factory methods implemented in `Vectors`.  

#####  An RDD of dense vectors

Let's represent each network interaction in our dataset as a dense vector. For that we will use the *NumPy* `array` type.  

In [149]:
raw_data.first()

'0,tcp,http,SF,181,5450,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,9,9,1.00,0.00,0.11,0.00,0.00,0.00,0.00,0.00,normal.'

In [150]:
import numpy as np

def parse_interaction(line):
    line_split = line.split(",")
    # keep just numeric and logical values
    symbolic_indexes = [1,2,3,41]
    clean_line_split = [item for i,item in enumerate(line_split) if i not in symbolic_indexes]
    return np.array([float(x) for x in clean_line_split])

vector_data = raw_data.map(parse_interaction)

In [151]:
type(vector_data)

pyspark.rdd.PipelinedRDD

In [152]:
vector_data.take(2)

[array([0.00e+00, 1.81e+02, 5.45e+03, 0.00e+00, 0.00e+00, 0.00e+00,
        0.00e+00, 0.00e+00, 1.00e+00, 0.00e+00, 0.00e+00, 0.00e+00,
        0.00e+00, 0.00e+00, 0.00e+00, 0.00e+00, 0.00e+00, 0.00e+00,
        0.00e+00, 8.00e+00, 8.00e+00, 0.00e+00, 0.00e+00, 0.00e+00,
        0.00e+00, 1.00e+00, 0.00e+00, 0.00e+00, 9.00e+00, 9.00e+00,
        1.00e+00, 0.00e+00, 1.10e-01, 0.00e+00, 0.00e+00, 0.00e+00,
        0.00e+00, 0.00e+00]),
 array([0.00e+00, 2.39e+02, 4.86e+02, 0.00e+00, 0.00e+00, 0.00e+00,
        0.00e+00, 0.00e+00, 1.00e+00, 0.00e+00, 0.00e+00, 0.00e+00,
        0.00e+00, 0.00e+00, 0.00e+00, 0.00e+00, 0.00e+00, 0.00e+00,
        0.00e+00, 8.00e+00, 8.00e+00, 0.00e+00, 0.00e+00, 0.00e+00,
        0.00e+00, 1.00e+00, 0.00e+00, 0.00e+00, 1.90e+01, 1.90e+01,
        1.00e+00, 0.00e+00, 5.00e-02, 0.00e+00, 0.00e+00, 0.00e+00,
        0.00e+00, 0.00e+00])]

### Summary statistics

Spark's MLlib provides column summary statistics for `RDD[Vector]` through the function [`colStats`](https://spark.apache.org/docs/latest/api/python/pyspark.mllib.html#pyspark.mllib.stat.Statistics.colStats) available in [`Statistics`](https://spark.apache.org/docs/latest/api/python/pyspark.mllib.html#pyspark.mllib.stat.Statistics). The method returns an instance of [`MultivariateStatisticalSummary`](https://spark.apache.org/docs/latest/api/python/pyspark.mllib.html#pyspark.mllib.stat.MultivariateStatisticalSummary), which contains the column-wise *max*, *min*, *mean*, *variance*, and *number of nonzeros*, as well as the *total count*.  

In [153]:
from pyspark.mllib.stat import Statistics 
from math import sqrt 

# Compute column summary statistics.
summary = Statistics.colStats(vector_data)

mean = round(summary.mean()[0],3)
std = round(sqrt(summary.variance()[0]),3)
max_ = round(summary.max()[0],3)
min_ = round(summary.min()[0],3)
count = summary.count()
ct_non_zero = summary.numNonzeros()[0]

In [155]:
print(f"""
Duration Statistics:
    Mean: {mean}
    St. deviation: {std}
    Max value: {max_}
    Min value: {min_}
    Total value count: {count}
    Number of non-zero values: {ct_non_zero}
""")


Duration Statistics:
    Mean: 47.979
    St. deviation: 707.746
    Max value: 58329.0
    Min value: 0.0
    Total value count: 494021
    Number of non-zero values: 12350.0



#### Summary statistics by label  

The interesting part of summary statistics, in our case, comes from being able to obtain them by the type of network attack or 'label' in our dataset. By doing so we will be able to better characterise our dataset dependent variable in terms of the independent variables range of values.  

If we want to do such a thing we could filter our RDD containing labels as keys and vectors as values. For that we just need to adapt our `parse_interaction` function to return a tuple with both elements.    

In [156]:
def parse_interaction_with_key(line):
    line_split = line.split(",")
    # keep just numeric and logical values
    symbolic_indexes = [1,2,3,41]
    clean_line_split = [item for i,item in enumerate(line_split) if i not in symbolic_indexes]
    return (line_split[41], np.array([float(x) for x in clean_line_split]))

label_vector_data = raw_data.map(parse_interaction_with_key)

In [157]:
label_vector_data.take(2)

[('normal.', array([0.00e+00, 1.81e+02, 5.45e+03, 0.00e+00, 0.00e+00, 0.00e+00,
         0.00e+00, 0.00e+00, 1.00e+00, 0.00e+00, 0.00e+00, 0.00e+00,
         0.00e+00, 0.00e+00, 0.00e+00, 0.00e+00, 0.00e+00, 0.00e+00,
         0.00e+00, 8.00e+00, 8.00e+00, 0.00e+00, 0.00e+00, 0.00e+00,
         0.00e+00, 1.00e+00, 0.00e+00, 0.00e+00, 9.00e+00, 9.00e+00,
         1.00e+00, 0.00e+00, 1.10e-01, 0.00e+00, 0.00e+00, 0.00e+00,
         0.00e+00, 0.00e+00])),
 ('normal.', array([0.00e+00, 2.39e+02, 4.86e+02, 0.00e+00, 0.00e+00, 0.00e+00,
         0.00e+00, 0.00e+00, 1.00e+00, 0.00e+00, 0.00e+00, 0.00e+00,
         0.00e+00, 0.00e+00, 0.00e+00, 0.00e+00, 0.00e+00, 0.00e+00,
         0.00e+00, 8.00e+00, 8.00e+00, 0.00e+00, 0.00e+00, 0.00e+00,
         0.00e+00, 1.00e+00, 0.00e+00, 0.00e+00, 1.90e+01, 1.90e+01,
         1.00e+00, 0.00e+00, 5.00e-02, 0.00e+00, 0.00e+00, 0.00e+00,
         0.00e+00, 0.00e+00]))]

In [158]:
normal_label_data = label_vector_data.filter(lambda x: x[0]=="normal.")
normal_summary = Statistics.colStats(normal_label_data.values())

mean = round(normal_summary.mean()[0],3)
std = round(sqrt(normal_summary.variance()[0]),3)
max_ = round(normal_summary.max()[0],3)
min_ = round(normal_summary.min()[0],3)
count = normal_summary.count()
ct_non_zero = normal_summary.numNonzeros()[0]

In [159]:
print(f"""
Duration Statistics:
    Mean: {mean}
    St. deviation: {std}
    Max value: {max_}
    Min value: {min_}
    Total value count: {count}
    Number of non-zero values: {ct_non_zero}
""")


Duration Statistics:
    Mean: 216.657
    St. deviation: 1359.213
    Max value: 58329.0
    Min value: 0.0
    Total value count: 97278
    Number of non-zero values: 11690.0

