# Working with key/value pair RDDs

**NOTE: This notebook is worth 10% of the grade of project 2.**

[Introduction to Spark with Python, by Jose A. Dianes](https://github.com/jadianes/spark-py-notebooks)

Spark provides specific functions to deal with RDDs whose elements are key/value pairs. They are usually used to perform aggregations and other processing by key.  

In this notebook we will show how, by working with key/value pairs, we can process our network interactions dataset in a more practical and powerful way than in previous notebooks. Key/value pair aggregations prove particularly effective when exploring each type of tag in our network attacks individually.  

## Getting the data and creating the RDD

In this notebook we will use the reduced dataset (1 percent) provided for the KDD Cup 1999, containing nearly half a million network interactions. The file is provided as a *Gzip* file in the parent of the current working directory.  

In [None]:
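import os
import gzip

# The dataset sits one directory above the current working directory.
data_file = os.path.join(os.getcwd(), "..", "kddcup.data_1_percent.gz")
# TODO: read the textFile from 'data_file' into 'raw_data'. Hint: use sc.textFile(<local path>)
# Pure-Python stand-in: load every line of the gzip file into a list.
with gzip.open(data_file, "rt") as f:
    raw_data = [row.strip() for row in f]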

## Creating a pair RDD for interaction types

In this notebook we want to do some exploratory data analysis on our network interactions dataset. More concretely, we want to profile each network interaction type in terms of some of its variables, such as duration. To do so, we first need to create an RDD suitable for that, where each interaction is parsed as a CSV row representing the value and is paired with its corresponding tag as the key.  

Normally we create key/value pair RDDs by applying a function using `map` to the original data. This function returns the corresponding pair for a given RDD element. We can proceed as follows.  

In [None]:
csv_data = [row.split(",") for row in raw_data]
# TODO: From each row of the data, generate a key-value pair. The key will be the tag and the value will be the CSV data.
# HINT: x[41] contains the network interaction tag
key_value_data = [(row[41], row) for row in csv_data]
print(type(key_value_data))
print("The correct answer should be an RDD object")

We now have our key/value pair data ready to be used. Let's get the first element to see what it looks like.  

In [None]:
key_value_data[:1]
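
The TODO hints above point at a Spark version of these steps. A minimal sketch of what that might look like follows, assuming `sc` is an already-created `SparkContext`; `build_pair_rdd` is a hypothetical helper name introduced here for illustration, and the sketch is not necessarily the expected solution.

```python
def build_pair_rdd(sc, path):
    """Return an RDD of (tag, fields) pairs read from a CSV text file.

    Assumptions: `sc` is an existing SparkContext and column 41 of each
    row holds the interaction tag, as stated in the hint above.
    """
    raw = sc.textFile(path)                          # one element per line
    fields = raw.map(lambda line: line.split(","))   # parse each CSV row
    return fields.map(lambda row: (row[41], row))    # key = tag, value = row
```

With a live context, `build_pair_rdd(sc, data_file).take(1)` would return the first pair as a one-element list.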

## Data aggregations with key/value pair RDDs

We can use all the transformations and actions available for normal RDDs with key/value pair RDDs. We just need to make the functions work with pair elements. Additionally, Spark provides specific functions to work with RDDs containing pair elements. They are very similar to those available for general RDDs.  

For example, we have a `reduceByKey` transformation that we can use as follows to calculate the total duration of each network interaction type.  

In [None]:
key_value_duration = [(row[41], float(row[0])) for row in csv_data] 
# TODO: Aggregate the durations of network interactions which have the same key (i.e., tag) 
# HINT: The argument of reduceByKey is a lambda which takes two values and returns the reduced result.
durations_by_key = []
for key in {tag for tag, _ in key_value_duration}:
    # Sum the durations of every interaction carrying this tag
    total = sum(duration for tag, duration in key_value_duration if tag == key)
    durations_by_key.append((key, total))

durations_by_key
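
To make the semantics of `reduceByKey` concrete, here is a small pure-Python model of it. This is only a sketch of the behaviour, not Spark's implementation: `reduce_by_key` is a hypothetical helper and the sample pairs are made up for illustration. In Spark, the corresponding call on a pair RDD would pass the same kind of two-argument function, for example `reduceByKey(lambda a, b: a + b)`.

```python
def reduce_by_key(pairs, func):
    """Fold the values of each key with a two-argument function."""
    result = {}
    for key, value in pairs:
        if key in result:
            # Key already seen: combine the running result with the new value
            result[key] = func(result[key], value)
        else:
            # First value for this key becomes the initial result
            result[key] = value
    return result

# Made-up (tag, duration) pairs
demo_pairs = [("normal.", 0.0), ("smurf.", 2.0), ("normal.", 5.0), ("smurf.", 1.0)]
demo_totals = reduce_by_key(demo_pairs, lambda a, b: a + b)
```

Unlike the loop in the cell above, this model makes a single pass over the data, which is closer to how a per-key reduction behaves.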

Spark also provides a specific counting action for key/value pairs, `countByKey`.  

In [None]:
# TODO: count the number of rows for each key
counts_by_key = []
for key in {tag for tag, _ in key_value_duration}:
    # Count the interactions carrying this tag
    count = sum(1 for tag, _ in key_value_duration if tag == key)
    counts_by_key.append((key, count))

counts_by_key
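
Counting by key is an action rather than a transformation: in Spark, `countByKey` returns a dictionary of counts to the driver instead of a new RDD, so it suits cases where the number of distinct keys is small. A minimal pure-Python model of the result, using made-up sample pairs, could look like this:

```python
from collections import Counter

# Made-up (tag, duration) pairs; only the keys matter for counting
demo_pairs = [("normal.", 0.0), ("smurf.", 2.0), ("normal.", 5.0), ("smurf.", 1.0), ("normal.", 3.0)]

# Count how many pairs carry each key, ignoring the values
demo_counts = Counter(key for key, _ in demo_pairs)
```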

### Using `combineByKey`

This is the most general of the per-key aggregation functions. Most of the other per-key combiners are implemented using it. We can think of it as the per-key equivalent of `aggregate`, since it allows the user to return values whose type differs from that of the input data.

`combineByKey` is essentially a combination of `map` and `reduce`: it first turns each value into a combiner and then merges combiners per key. It takes three functions as arguments:
  - **createCombiner**: given a value V, returns the initial combiner C for its key (called the first time a key is seen in a partition)
  - **mergeValue**: given a combiner C and a value V, returns the updated combiner
  - **mergeCombiners**: given two combiners C1 and C2 built in different partitions, returns their merged combiner
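
The interplay of the three functions can be sketched in pure Python. The model below is an illustration under stated assumptions, not Spark's implementation: partitions are represented as plain lists, `combine_by_key` is a hypothetical helper, and the sample data is made up.

```python
def combine_by_key(partitions, create_combiner, merge_value, merge_combiners):
    """Model combineByKey over a list of partitions (lists of pairs)."""
    # Phase 1: inside each partition, build one combiner per key.
    per_partition = []
    for part in partitions:
        combiners = {}
        for key, value in part:
            if key in combiners:
                combiners[key] = merge_value(combiners[key], value)
            else:
                combiners[key] = create_combiner(value)
        per_partition.append(combiners)

    # Phase 2: merge the combiners of the same key across partitions.
    merged = {}
    for combiners in per_partition:
        for key, comb in combiners.items():
            if key in merged:
                merged[key] = merge_combiners(merged[key], comb)
            else:
                merged[key] = comb
    return merged

# Two made-up partitions of (tag, duration) pairs
demo_partitions = [
    [("normal.", 0.0), ("smurf.", 2.0), ("normal.", 4.0)],
    [("normal.", 2.0), ("smurf.", 4.0)],
]
demo_sum_counts = combine_by_key(
    demo_partitions,
    lambda v: (v, 1),                                # createCombiner
    lambda c, v: (c[0] + v, c[1] + 1),               # mergeValue
    lambda c1, c2: (c1[0] + c2[0], c1[1] + c2[1]),   # mergeCombiners
)
```

Each combiner here is a `(sum, count)` tuple, so its type differs from the `float` values that go in.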

For example, we can use it to calculate per-type average durations as follows.  

In [None]:
# TODO: generate `sum_counts` by summing the values that share a key, returning a (sum, count) tuple for each key.
# sum_counts = key_value_duration.combineByKey(
#     ..., # createCombiner: V -> (V, 1)
#     ..., # mergeValue: C, V -> (C[0] + V, C[1] + 1)
#     ..., # mergeCombiners: C1, C2 -> (C1[0] + C2[0], C1[1] + C2[1])
# )

sum_counts = []
for key in {tag for tag, _ in key_value_duration}:
    # Collect the durations for this tag, then store (sum, count)
    durations = [duration for tag, duration in key_value_duration if tag == key]
    sum_counts.append((key, (sum(durations), len(durations))))

dict(sum_counts)

We can see that the arguments are quite similar to those passed to `aggregate` in the previous notebook. The result associated with each type is a (sum, count) pair. If we want the actual averages, we need to do the division before collecting the results.  

In [None]:
# TODO: create an RDD 'duration_means_by_type' by mapping each key-value pair of `sum_counts` to the key and the duration mean
# HINT: duration mean = sum / count
# `total` is used instead of `sum` to avoid shadowing the built-in
duration_means_by_type = dict([(key, total / count) for (key, (total, count)) in sum_counts])

# Print them sorted by mean duration, largest first
for tag, mean in sorted(duration_means_by_type.items(), key=lambda tup: tup[1], reverse=True):
    print(tag + ": " + str(mean))

This is a small step towards understanding what makes a network interaction be considered an attack.