# Working with key/value pair RDDs

[Introduction to Spark with Python, by Jose A. Dianes](https://github.com/jadianes/spark-py-notebooks)

Spark provides specific functions to deal with RDDs which elements are key/value pairs. They are usually used to perform aggregations and other processings by key.  

In this notebook we will show how, by working with key/value pairs, we can process our network interactions dataset in a more practical and powerful way than that used in previous notebooks. Key/value pair aggregations will show to be particularly effective when trying to explore each type of tag in our network attacks, in an individual way.  

## Getting the data and creating the RDD

As we did in our first notebook, we will use the reduced dataset (10 percent) provided for the [KDD Cup 1999](http://kdd.ics.uci.edu/databases/kddcup99/kddcup99.html), containing nearly half million network interactions. The file is provided as a Gzip file that we will download locally.  

In [1]:
import urllib
f = urllib.urlretrieve ("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz", "kddcup.data_10_percent.gz")

In [1]:
data_file = "./kddcup.data_10_percent.gz"
raw_data = sc.textFile(data_file)

## Creating a pair RDD for interaction types

In this notebook we want to do some exploratory data analysis on our network interactions dataset. More concretely we want to profile each network interaction type in terms of some of its variables such as duration. In order to do so, we first need to create the RDD suitable for that, where each interaction is parsed as a CSV row representing the value, and is put together with its corresponding tag as a key.  

Normally we create key/value pair RDDs by applying a function using `map` to the original data. This function returns the corresponding pair for a given RDD element. We can proceed as follows.  

In [2]:
csv_data = raw_data.map(lambda x: x.split(","))
key_value_data = csv_data.map(lambda x: (x[41], x)) # x[41] contains the network interaction tag

We have now our key/value pair data ready to be used. Let's get the first element in order to see how it looks like.  

In [3]:
key_value_data.take(1)

[(u'normal.',
  [u'0',
   u'tcp',
   u'http',
   u'SF',
   u'181',
   u'5450',
   u'0',
   u'0',
   u'0',
   u'0',
   u'0',
   u'1',
   u'0',
   u'0',
   u'0',
   u'0',
   u'0',
   u'0',
   u'0',
   u'0',
   u'0',
   u'0',
   u'8',
   u'8',
   u'0.00',
   u'0.00',
   u'0.00',
   u'0.00',
   u'1.00',
   u'0.00',
   u'0.00',
   u'9',
   u'9',
   u'1.00',
   u'0.00',
   u'0.11',
   u'0.00',
   u'0.00',
   u'0.00',
   u'0.00',
   u'0.00',
   u'normal.'])]

## Data aggregations with key/value pair RDDs

We can use all the transformations and actions available for normal RDDs with key/value pair RDDs. We just need to make the functions work with pair elements. Additionally, Spark provides specific functions to work with RDDs containing pair elements. They are very similar to those available for general RDDs.  

For example, we have a `reduceByKey` transformation that we can use as follows to calculate the total duration of each network interaction type.  

In [4]:
key_value_duration = csv_data.map(lambda x: (x[41], float(x[0]))) 
durations_by_key = key_value_duration.reduceByKey(lambda x, y: x + y)

durations_by_key.collect()

[(u'guess_passwd.', 144.0),
 (u'nmap.', 0.0),
 (u'warezmaster.', 301.0),
 (u'rootkit.', 1008.0),
 (u'warezclient.', 627563.0),
 (u'smurf.', 0.0),
 (u'pod.', 0.0),
 (u'neptune.', 0.0),
 (u'normal.', 21075991.0),
 (u'spy.', 636.0),
 (u'ftp_write.', 259.0),
 (u'phf.', 18.0),
 (u'portsweep.', 1991911.0),
 (u'teardrop.', 0.0),
 (u'buffer_overflow.', 2751.0),
 (u'land.', 0.0),
 (u'imap.', 72.0),
 (u'loadmodule.', 326.0),
 (u'perl.', 124.0),
 (u'multihop.', 1288.0),
 (u'back.', 284.0),
 (u'ipsweep.', 43.0),
 (u'satan.', 64.0)]

We have a specific counting action for key/value pairs.  

In [5]:
counts_by_key = key_value_data.countByKey()
counts_by_key

defaultdict(<type 'int'>, {u'guess_passwd.': 53, u'nmap.': 231, u'warezmaster.': 20, u'rootkit.': 10, u'warezclient.': 1020, u'smurf.': 280790, u'pod.': 264, u'neptune.': 107201, u'normal.': 97278, u'spy.': 2, u'ftp_write.': 8, u'phf.': 4, u'portsweep.': 1040, u'teardrop.': 979, u'buffer_overflow.': 30, u'land.': 21, u'imap.': 12, u'loadmodule.': 9, u'perl.': 3, u'multihop.': 7, u'back.': 2203, u'ipsweep.': 1247, u'satan.': 1589})

### Using `combineByKey`

This is the most general of the per-key aggregation functions. Most of the other per-key combiners are implemented using it. We can think about it as the `aggregate` equivalent since it allows the user to return values that are not the same type as our input data.

For example, we can use it to calculate per-type average durations as follows.  

In [6]:
sum_counts = key_value_duration.combineByKey(
    (lambda x: (x, 1)), # the initial value, with value x and count 1
    (lambda acc, value: (acc[0]+value, acc[1]+1)), # how to combine a pair value with the accumulator: sum value, and increment count
    (lambda acc1, acc2: (acc1[0]+acc2[0], acc1[1]+acc2[1])) # combine accumulators
)

sum_counts.collectAsMap()

{u'back.': (284.0, 2203),
 u'buffer_overflow.': (2751.0, 30),
 u'ftp_write.': (259.0, 8),
 u'guess_passwd.': (144.0, 53),
 u'imap.': (72.0, 12),
 u'ipsweep.': (43.0, 1247),
 u'land.': (0.0, 21),
 u'loadmodule.': (326.0, 9),
 u'multihop.': (1288.0, 7),
 u'neptune.': (0.0, 107201),
 u'nmap.': (0.0, 231),
 u'normal.': (21075991.0, 97278),
 u'perl.': (124.0, 3),
 u'phf.': (18.0, 4),
 u'pod.': (0.0, 264),
 u'portsweep.': (1991911.0, 1040),
 u'rootkit.': (1008.0, 10),
 u'satan.': (64.0, 1589),
 u'smurf.': (0.0, 280790),
 u'spy.': (636.0, 2),
 u'teardrop.': (0.0, 979),
 u'warezclient.': (627563.0, 1020),
 u'warezmaster.': (301.0, 20)}

We can see that the arguments are pretty similar to those passed to `aggregate` in the previous notebook. The result associated to each type is in the form of a pair. If we want to actually get the averages, we need to do the division before collecting the results.  

In [7]:
duration_means_by_type = sum_counts.map(lambda (key,value): (key, round(value[0]/value[1],3))).collectAsMap()

# Print them sorted
for tag in sorted(duration_means_by_type, key=duration_means_by_type.get, reverse=True):
    print tag, duration_means_by_type[tag]

portsweep. 1915.299
warezclient. 615.258
spy. 318.0
normal. 216.657
multihop. 184.0
rootkit. 100.8
buffer_overflow. 91.7
perl. 41.333
loadmodule. 36.222
ftp_write. 32.375
warezmaster. 15.05
imap. 6.0
phf. 4.5
guess_passwd. 2.717
back. 0.129
satan. 0.04
ipsweep. 0.034
nmap. 0.0
smurf. 0.0
pod. 0.0
neptune. 0.0
teardrop. 0.0
land. 0.0


A small step into understanding what makes a network interaction be considered an attack.