# Set operations on RDDs

[Introduction to Spark with Python, by Jose A. Dianes](https://github.com/jadianes/spark-py-notebooks)

Spark supports many of the operations we have in mathematical sets, such as union and intersection, even when the RDDs themselves are not proper sets. It is important to note that these operations require that the RDDs being operated on are of the same type.

Set operations are quite straightforward to understand, as they work as expected. The only consideration comes from the fact that RDDs are not real sets, and therefore operations such as the union of RDDs do not remove duplicates. In this notebook we will have a brief look at `subtract`, `distinct`, and `cartesian`.
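To make the bag (multiset) semantics concrete, here is a plain-Python model of what `union`, `intersection`, `subtract` and `distinct` return, using lists in place of RDDs. It is a sketch of the semantics only, not Spark code. The sample values are made up, and the sorting is there only to make the output deterministic, because Spark does not guarantee element order.

```python
# Plain-Python model of RDD set-operation semantics (not Spark code).
# The two lists are hypothetical stand-ins for the contents of two RDDs.
a = ["x", "y", "y", "z"]
b = ["y", "w"]

def union(left, right):
    # Like RDD.union: plain concatenation, duplicates are kept.
    return left + right

def intersection(left, right):
    # Like RDD.intersection: elements present in both, duplicates removed.
    return sorted(set(left) & set(right))

def subtract(left, right):
    # Like RDD.subtract: every element of left (duplicates included)
    # that does not appear anywhere in right.
    excluded = set(right)
    return [v for v in left if v not in excluded]

def distinct(values):
    # Like RDD.distinct: one copy of each element.
    return sorted(set(values))

print(union(a, b))            # ['x', 'y', 'y', 'z', 'y', 'w']
print(intersection(a, b))     # ['y']
print(subtract(a, b))         # ['x', 'z']
print(distinct(union(a, b)))  # ['w', 'x', 'y', 'z']
```

Note how `union` keeps all three copies of `"y"`, while `distinct` is what you would apply afterwards to get a true set.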

## Getting the data and creating the RDD

As we did in our first notebook, we will use the reduced dataset (10 percent) provided for the KDD Cup 1999, containing nearly half a million network interactions. The file is provided as a Gzip file that we will download locally.

In [None]:
import urllib
f = urllib.urlretrieve("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz", "kddcup.data_10_percent.gz")

# Create an external table from the data file

The downloaded file, "kddcup.data_10_percent.gz", will be located in the SparkProject directory.


1. As we have done in the lab exercises, create a Hadoop directory "/user/hive/warehouse/kddcup_10_percent". This will be the source directory for our external table.

2. You now have to create an external table definition describing the data, as we did in the labs with "CreatHousingExt.sql". Be careful to use the correct data types. Use the file "KDDCUP data schema.txt" as a basis for your external table definition.

You should do this task in the Hive editor by logging onto localhost:7180.

3. Load the data into HDFS as we did in the labs (see the file "LoadHousingData" for the use of the -copyFromLocal command). 

Question 1 - Hadoop natively supports various compression algorithms. Does it natively support .gz files or do you have to unpack the file in order to load it into HDFS?

# Use Hive to build your raw_data RDD

Use the HiveContext sqlContext to get the data from your external table instead of loading it from the file.

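A HiveContext query has the form `<result> = sqlContext.sql("<select statement>")`. Note that `sqlContext.sql` returns a DataFrame; if you need an RDD, use its `.rdd` attribute, which holds `Row` objects rather than raw text lines.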

In [None]:
hive_raw_data  = sqlContext.sql("<your select statement>")

# How many records are in your RDD?

In [None]:
hive_raw_data_count = <your code>

In [None]:
data_file = "./kddcup.data_10_percent.gz"
raw_data = sc.textFile(data_file)

# Compare your hive count to the record count from the data file

In [None]:
raw_data_count = <your code>
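As an independent cross-check outside both Hive and Spark, you could count the records of the downloaded gzip file with Python's standard `gzip` module. This is a sketch: `count_gzip_lines` is a hypothetical helper, and it assumes one record per line, which is how `sc.textFile` splits the file above.

```python
import gzip

def count_gzip_lines(path):
    """Hypothetical helper: count the lines of a gzip file without Spark."""
    count = 0
    # Binary mode: no text decoding is needed just to count lines.
    with gzip.open(path, "rb") as f:
        for _ in f:
            count += 1
    return count
```

For example, `count_gzip_lines("./kddcup.data_10_percent.gz")` should agree with both the Spark count and your Hive count if the external table loaded every record.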

## Getting attack interactions using `subtract`

For illustrative purposes, imagine we already have our RDD with non-attack (normal) interactions from some previous analysis.

In [None]:
normal_raw_data = raw_data.filter(lambda x: "normal." in x)

In [None]:
attack_raw_data = raw_data.subtract(normal_raw_data)

# Instead of using a Python filter, extract the "normal." interactions using Hive

Hint: use a "where" predicate in your select statement.

In [None]:
normal_raw_data_hive = sqlContext.sql("<your select statement>")

As shown above, we can obtain attack interactions by subtracting the normal ones from the original unfiltered RDD.
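Because every record containing "normal." ends up in `normal_raw_data`, subtracting it from `raw_data` should leave exactly the records that the complementary filter would keep, duplicates included. The plain-Python sketch below models this on a few made-up records shaped like KDD lines. It illustrates the semantics and is not Spark code. In Spark, `raw_data.filter(lambda x: "normal." not in x)` would likely be cheaper, since `subtract` has to shuffle data to compare the two RDDs.

```python
# Hypothetical records in the KDD CSV layout (made-up, truncated columns;
# only the trailing label matters here).
raw = [
    "0,tcp,http,SF,normal.",
    "0,icmp,ecr_i,SF,smurf.",
    "0,icmp,ecr_i,SF,smurf.",
    "0,tcp,private,S0,neptune.",
    "0,tcp,http,SF,normal.",
]

# Models raw_data.filter(lambda x: "normal." in x)
normal = [x for x in raw if "normal." in x]

# Models raw_data.subtract(normal_raw_data): keep every record of raw,
# duplicates included, that does not appear in normal.
normal_set = set(normal)
attack_by_subtract = [x for x in raw if x not in normal_set]

# The complementary filter keeps the same records.
attack_by_filter = [x for x in raw if "normal." not in x]

print("{} normal, {} attacks".format(len(normal), len(attack_by_subtract)))
# 2 normal, 3 attacks
```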

# Extracting the attack interactions

We can extract attack interactions by using the `subtract` method as above. Alternatively, we can use a Hive statement to extract the attack interactions directly. Again, use a "where" predicate in your select statement.


In [None]:
attack_raw_data_hive = sqlContext.sql("<your select statement>")
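To see how a "where" predicate splits a table, here is a small sketch that uses Python's built-in `sqlite3` as a stand-in for Hive. The table `interactions` and its columns `protocol`, `service` and `label` are hypothetical; your external table's schema will use its own column names.

```python
import sqlite3

# In-memory SQLite database standing in for Hive.
# Table and column names are hypothetical.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE interactions (protocol TEXT, service TEXT, label TEXT)")
conn.executemany(
    "INSERT INTO interactions VALUES (?, ?, ?)",
    [("tcp", "http", "normal."),
     ("icmp", "ecr_i", "smurf."),
     ("tcp", "private", "neptune."),
     ("udp", "domain_u", "normal.")],
)

def count_rows(sql):
    # Run a query and return the single value of its single result row.
    return conn.execute(sql).fetchone()[0]

total = count_rows("SELECT COUNT(*) FROM interactions")
normal = count_rows("SELECT COUNT(*) FROM interactions WHERE label = 'normal.'")
attack = count_rows("SELECT COUNT(*) FROM interactions WHERE label <> 'normal.'")
print("{} total, {} normal, {} attacks".format(total, normal, attack))
# 4 total, 2 normal, 2 attacks
```

One caveat that carries over to Hive: a row whose label is NULL matches neither predicate, so the normal and attack counts would then add up to less than the total.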

Let's do some counts to check our results.  

In [None]:
from time import time

# count all
t0 = time()
raw_data_count = raw_data.count()
tt = time() - t0
print "All count in {} secs".format(round(tt,3))

In [None]:
# count normal
t0 = time()
normal_raw_data_count = normal_raw_data.count()
tt = time() - t0
print "Normal count in {} secs".format(round(tt,3))

In [None]:
# count attacks
t0 = time()
attack_raw_data_count = attack_raw_data.count()
tt = time() - t0
print "Attack count in {} secs".format(round(tt,3))

In [None]:
print "There are {} normal interactions and {} attacks, \
from a total of {} interactions".format(normal_raw_data_count,attack_raw_data_count,raw_data_count)
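The timing pattern repeated in the cells above could be factored into a small helper. This is a sketch: `timed` is a hypothetical name, and the commented lines show how it might be used with the RDDs defined earlier. Because the attacks were obtained by subtracting the normal records from the full set, the two partial counts should add up to the total, which makes a useful sanity check.

```python
from time import time

def timed(label, action):
    """Hypothetical helper: run action(), print its duration, return its result."""
    t0 = time()
    result = action()
    print("{} in {} secs".format(label, round(time() - t0, 3)))
    return result

# Possible usage with the RDDs above (not run here):
# raw_data_count = timed("All count", raw_data.count)
# normal_raw_data_count = timed("Normal count", normal_raw_data.count)
# attack_raw_data_count = timed("Attack count", attack_raw_data.count)
# assert normal_raw_data_count + attack_raw_data_count == raw_data_count

# Self-contained demo on a plain Python list:
n = timed("List length", lambda: len([1, 2, 3]))
```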

# Check the results you have extracted using Hive

In [None]:
# count all
t0 = time()
hive_raw_data_count = <...your code...>
tt = time() - t0
print "All hive count in {} secs".format(round(tt,3))

In [None]:
# count normal
t0 = time()
normal_raw_data_count_hive = <...your code...>
tt = time() - t0
print "Normal hive count in {} secs".format(round(tt,3))

In [None]:
# count attacks
t0 = time()
attack_raw_data_count_hive = <...your code...>
tt = time() - t0
print "Attack hive count in {} secs".format(round(tt,3))

In [None]:
print "For our Hive data - there are {} normal interactions and {} attacks, \
from a total of {} interactions".format(normal_raw_data_count_hive,attack_raw_data_count_hive,hive_raw_data_count)

So now we have two RDDs, one with normal interactions and another one with attacks.  

## Protocol and service combinations using `cartesian`

We can compute the Cartesian product between two RDDs by using the `cartesian` transformation. It returns all possible pairs of elements between two RDDs. In our case we will use it to generate all the possible combinations between service and protocol in our network interactions.  

First of all we need to isolate each collection of values into two separate RDDs. For that we will use `distinct` on the CSV-parsed dataset. From the [dataset description](http://kdd.ics.uci.edu/databases/kddcup99/kddcup.names) we know that protocol is the second column and service is the third (the tag is the last column, not the first as the page suggests).

So first, let's get the protocols.  

In [None]:
csv_data = raw_data.map(lambda x: x.split(","))
protocols = csv_data.map(lambda x: x[1]).distinct()
protocols.collect()

Now we do the same for services.  

In [None]:
services = csv_data.map(lambda x: x[2]).distinct()
services.collect()

A longer list in this case.
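The two cells above chain a CSV split, a column projection and `distinct`. Below is a plain-Python model of that pipeline, not Spark code. The records are made up and truncated, and sets are sorted only for deterministic output.

```python
# Hypothetical records in the KDD CSV layout (made-up, truncated columns).
raw = [
    "0,tcp,http,SF,normal.",
    "0,udp,private,SF,normal.",
    "0,tcp,http,SF,normal.",
    "0,icmp,ecr_i,SF,smurf.",
]

# Models raw_data.map(lambda x: x.split(","))
csv_rows = [line.split(",") for line in raw]

# Models csv_data.map(lambda x: x[1]).distinct() and the same for x[2]
protocols = sorted({row[1] for row in csv_rows})
services = sorted({row[2] for row in csv_rows})

print(protocols)  # ['icmp', 'tcp', 'udp']
print(services)   # ['ecr_i', 'http', 'private']
```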

Now we can do the cartesian product.  

In [None]:
product = protocols.cartesian(services).collect()
print "There are {} combinations of protocol X service".format(len(product))

Obviously, for such small RDDs it doesn't really make sense to use Spark's cartesian product. We could just as well have collected the values after using `distinct` and done the cartesian product locally. Moreover, `distinct` and `cartesian` are expensive operations, so they must be used with care when the datasets involved are large.
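Doing the product locally needs only the standard library. In the sketch below, the two lists stand in for the results of `protocols.collect()` and `services.collect()`, and their values are hypothetical. The code calls `itertools.product` through the module name because the notebook cell above already binds the name `product` to the collected Spark result.

```python
import itertools

# Hypothetical stand-ins for protocols.collect() and services.collect().
protocol_values = ["icmp", "tcp", "udp"]
service_values = ["ecr_i", "http", "private"]

# Cartesian product computed on the driver: every (protocol, service) pair.
local_product = list(itertools.product(protocol_values, service_values))

print(len(local_product))  # 9
print(local_product[:2])   # [('icmp', 'ecr_i'), ('icmp', 'http')]
```

With the real collected lists, `len(local_product)` should equal `len(product)` from the Spark cell, since both are the number of protocols times the number of services.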

# Hive can simplify this process
A cartesian product can be generated with a single Hive statement, so the cartesian result can be created in one step.


You simply select the distinct values of protocols and services using nested queries and join the two results without a join predicate.

An example

select a.firstname, b.lastname
from
/* first nested query with the table alias a */
(select distinct customer_fname as firstname from customers) a JOIN
/* second nested query with the table alias b */
(select distinct customer_lname as lastname from customers) b

Notice there is no join predicate: neither an "on ..." condition nor a "where ..." clause relates the two subqueries. This is what triggers the cartesian join.
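The same query shape can be tried quickly with Python's built-in `sqlite3`, which also treats a `JOIN` without an `ON` or `USING` clause as a cartesian product. The `customers` table and its columns come from the example above, but the rows are made up, and SQLite is only a stand-in for Hive here.

```python
import sqlite3

# In-memory SQLite stand-in for Hive, with made-up customer rows.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE customers (customer_fname TEXT, customer_lname TEXT)")
conn.executemany(
    "INSERT INTO customers VALUES (?, ?)",
    [("Ann", "Lee"), ("Bob", "Lee"), ("Ann", "Diaz")],
)

# Two DISTINCT subqueries joined with no predicate -> every combination.
rows = conn.execute(
    "SELECT a.firstname, b.lastname "
    "FROM (SELECT DISTINCT customer_fname AS firstname FROM customers) a "
    "JOIN (SELECT DISTINCT customer_lname AS lastname FROM customers) b "
    "ORDER BY a.firstname, b.lastname"
).fetchall()

print(rows)
# [('Ann', 'Diaz'), ('Ann', 'Lee'), ('Bob', 'Diaz'), ('Bob', 'Lee')]
```

Two distinct first names times two distinct last names gives four rows. Depending on the engine and its settings, an unconditioned join may be rejected; Hive's strict mode, for instance, disallows cartesian products. Writing `CROSS JOIN` makes the intent explicit.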

Rewrite the above cells to generate a full combination of protocols and services from the Hive table you have created.

Notice in the cell below how to generate a multiline SQL statement and pass that statement as a variable. We usually do this for readability. 

In [None]:
cartesianSQL = ("select ...<your code>... "
                "...<your code>... "
                "...<your code>...")
cartesian_join = sqlContext.sql(cartesianSQL)
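The multiline form works because Python joins adjacent string literals inside parentheses into one string. A common pitfall is forgetting the trailing space in each fragment, which silently glues keywords together. The table names `t1` and `t2` below are hypothetical.

```python
# Adjacent string literals are concatenated at compile time.
good_sql = ("select a.x, b.y "
            "from t1 a join t2 b")
# Missing trailing space: "b.y" and "from" run together.
bad_sql = ("select a.x, b.y"
           "from t1 a join t2 b")

print(good_sql)  # select a.x, b.y from t1 a join t2 b
print(bad_sql)   # select a.x, b.yfrom t1 a join t2 b
```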

Display your results. They should be the same as the product calculation above.

In [None]:
print "Using Hive, there are {} combinations of protocol X service".format(...<your code>...)