In [None]:
%pylab inline
from time import time
from pyspark import StorageLevel  # used by the persist() calls below

# Introduction

In this tutorial we look at how to perform joins in Spark, and at how to speed up the work that follows them by minimizing the data transferred (shuffled) between partitions.

We are going to be using the dating profiles dataset for this tutorial. More information on this dataset can be found [here](https://sites.google.com/a/insightdatascience.com/spark-lab/s3-data/dating-profiles).


# Load in the data
We need to specify 50 partitions for the gender table; otherwise it is loaded as a single partition, because a gzipped file cannot be split. This dataset is rather small, so to make the performance differences between Spark queries visible we use the union transformation to replicate the ratings data five times.
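A small pure-Python model can make the partition arithmetic concrete. It is not Spark code: an "RDD" here is just a list of partitions (each a list of lines), the sample lines are hypothetical, and `union` and `repartition` are our own simplified stand-ins. `union` keeps every input partition (as Spark's union does for RDDs without a shared partitioner), and `repartition` is a plain round-robin redistribution standing in for Spark's shuffle-based `repartition`.

```python
# Pure-Python model, NOT Spark code: an "RDD" is a list of partitions,
# and each partition is a list of records.

def union(rdd_a, rdd_b):
    # Keeps every partition of both inputs, so partition counts add up
    return rdd_a + rdd_b

def repartition(rdd, n):
    # Simplified round-robin redistribution of all records into n partitions
    records = [rec for part in rdd for rec in part]
    parts = [[] for _ in range(n)]
    for i, rec in enumerate(records):
        parts[i % n].append(rec)
    return parts

# A single gzipped file arrives as ONE partition (hypothetical lines)
ratings = [["1,133,8", "1,720,6", "2,133,9"]]

replicated = []  # plays the role of sc.emptyRDD(): zero partitions
for _ in range(5):
    replicated = union(replicated, ratings)
print(len(replicated))  # 5 partitions, one per copy

spread = repartition(replicated, 50)
print(len(spread), sum(len(p) for p in spread))  # 50 15
```

Without the final repartition, the replicated data would sit in only five partitions, so at most five tasks could work on it at once.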

In [None]:
# gzipped files are not splittable, so spread the gender data over 50 partitions
gender_raw = sc.textFile("s3n://insight-spark-after-dark/gender.csv.gz").repartition(50)
ratings_raw = sc.textFile("s3n://insight-spark-after-dark/ratings.csv.gz")\
                .persist(StorageLevel.MEMORY_AND_DISK_SER)

# Replicate the ratings five times to enlarge the dataset
ratings_rep_raw = sc.emptyRDD()
for i in range(5):
    ratings_rep_raw = ratings_rep_raw.union(ratings_raw)

ratings_rep_raw = ratings_rep_raw.repartition(50)

In [None]:
def parse_gender(row):
    split_row = row.split(",")
        
    return (int(split_row[0]), split_row[1])

def parse_ratings(row):
    split_row = row.split(",")
        
    return (int(split_row[0]), int(split_row[1]), int(split_row[2]))

We persist the parsed ratings and parsed gender RDDs with the cache() method, so that later jobs can reuse them instead of re-reading and re-parsing the raw files.
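Before mapping the parsers over the full RDDs, they can be checked locally on a couple of lines. The functions below are copies of the parsers defined above; the sample lines are hypothetical (made-up IDs and ratings), assuming the files hold plain comma-separated values with no header row.

```python
# Copies of the notebook's parsers, checked on hypothetical sample lines
def parse_gender(row):
    split_row = row.split(",")
    return (int(split_row[0]), split_row[1])

def parse_ratings(row):
    split_row = row.split(",")
    return (int(split_row[0]), int(split_row[1]), int(split_row[2]))

print(parse_gender("7,F"))       # (7, 'F')
print(parse_ratings("7,133,8"))  # (7, 133, 8)
```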

In [None]:
parsed_gender = gender_raw.map(parse_gender)\
                          .setName("gender")\
                          .cache()
parsed_ratings = ratings_rep_raw.map(parse_ratings).setName("ratings").cache()

# Comparing group by key and reduce by key

## Are men more critical in ratings or the other way around?
We are going to join the ratings table with the gender table on user ID. Spark's join works only on key-value pair RDDs, so we reorder the ratings RDD so that each row is a tuple whose first element is the user ID.
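The re-keying step is a plain tuple rearrangement. Here is the same lambda applied locally to a few hypothetical parsed rows of the form `(userID, profileID, rating)`:

```python
# Hypothetical parsed rating rows: (userID, profileID, rating)
parsed_ratings_sample = [(7, 133, 8), (7, 720, 6), (9, 133, 10)]

# Same lambda as the notebook's map: move userID into the key position
rekey = lambda r: (r[0], (r[1], r[2]))
from_rows = [rekey(r) for r in parsed_ratings_sample]
print(from_rows)  # [(7, (133, 8)), (7, (720, 6)), (9, (133, 10))]
```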

In [None]:
# are men more critical in ratings or the other way around?
from_RDD = parsed_ratings.map(lambda r: (r[0], (r[1], r[2])))
#from_RDD.sample(False, 0.001, 20).collect()

Join the ratings with the gender table.

Format of ratings table: **(userID, (profileID, rating) )**

Format of gender table: **(userID, gender)**

The output from the join: **(userID, ( (profileID, rating), gender_user) )**
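The semantics of a pair-RDD inner join can be modelled in a few lines of plain Python: each left value is paired with every right value sharing its key, and keys missing from either side are dropped. `pair_join` is our own illustrative helper, not PySpark API, and the rows are hypothetical; unlike this model, Spark does not guarantee any output order.

```python
from collections import defaultdict

def pair_join(left, right):
    """Model of an inner join on (key, value) pairs, returning
    (key, (left_value, right_value)) for every matching combination."""
    right_by_key = defaultdict(list)
    for k, v in right:
        right_by_key[k].append(v)
    return [(k, (lv, rv)) for k, lv in left for rv in right_by_key.get(k, [])]

# Hypothetical rows: ratings keyed by userID, and the gender table
from_rows = [(7, (133, 8)), (7, (720, 6)), (9, (133, 10)), (42, (133, 5))]
gender = [(7, "F"), (9, "M"), (133, "F"), (720, "M")]

print(pair_join(from_rows, gender))
# [(7, ((133, 8), 'F')), (7, ((720, 6), 'F')), (9, ((133, 10), 'M'))]
```

User 42 has no gender row, so its rating disappears from the output, as it would with `join` (use `leftOuterJoin` to keep such rows).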

In [None]:
joined_from_RDD = from_RDD.join(parsed_gender)
#joined_from_RDD.sample(False, 0.001, 20).collect()

Now let's get the **gender** for each **profileID** by joining the gender table on the **profileID**.

First we keep only **profileID**, **gender**, and **rating** from the join above (dropping **userID** so less data is carried into the next join), again organized as a key-value tuple, this time keyed by **profileID**.
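To check the index arithmetic in the notebook's lambda, here it is applied to one hypothetical joined row, next to an equivalent version that unpacks the nested tuple by name. `to_profile_key` is our own illustrative helper, not part of the notebook.

```python
# One hypothetical row from the first join: (userID, ((profileID, rating), gender_user))
joined_row = (7, ((133, 8), "F"))

# Index-based version, as in the notebook's map
r = joined_row
to_row = (r[1][0][0], (r[1][1], r[1][0][1]))

# Equivalent version with named unpacking, which is easier to read
def to_profile_key(row):
    user_id, ((profile_id, rating), gender_user) = row
    return (profile_id, (gender_user, rating))

print(to_row)                      # (133, ('F', 8))
print(to_profile_key(joined_row))  # (133, ('F', 8))
```

A named function like this can be passed to `map` in place of the lambda; Python 3 lambdas cannot unpack nested tuples in their parameter list, which is why the notebook indexes into `r`.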

In [None]:
# tuple from join: (userID, ((profileID, rating), gender))
to_RDD = joined_from_RDD.map(lambda r: (r[1][0][0], (r[1][1], r[1][0][1]))) # accessing profileID, gender, rating
#to_RDD.sample(False, 0.001, 20).collect()

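Then execute the join.

Format of to_RDD: **(profileID, (gender_user, rating))**

Format of gender table: **(userID, gender)**; here it is matched on **profileID**, since each profile belongs to a user, which gives the gender of the rated profile.

The output from the join: **(profileID, ((gender_user, rating), gender_profile))**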

In [None]:
joined_to_RDD = to_RDD.join(parsed_gender)
#joined_to_RDD.sample(False, 0.001, 20).collect()

We want the gender of the user, the gender of the profile, and the rating.

Map to the format: **((gender_user, gender_profile), rating)**
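The second join and the final reshaping can be traced end to end on a few hypothetical rows. This is a plain-Python model, not Spark code: `pair_join` is an illustrative helper mimicking an inner pair-RDD join, and the rows are made up.

```python
from collections import defaultdict

def pair_join(left, right):
    # Inner join on keys: (key, (left_value, right_value)) for each match
    right_by_key = defaultdict(list)
    for k, v in right:
        right_by_key[k].append(v)
    return [(k, (lv, rv)) for k, lv in left for rv in right_by_key.get(k, [])]

# Hypothetical to_RDD rows (profileID, (gender_user, rating)) and gender table
to_rows = [(133, ("F", 8)), (720, ("F", 6)), (133, ("M", 10))]
gender = [(7, "F"), (9, "M"), (133, "F"), (720, "M")]

# (profileID, ((gender_user, rating), gender_profile))
joined_to = pair_join(to_rows, gender)

# Same lambda as the notebook: ((gender_user, gender_profile), rating)
from_to = [((r[1][0][0], r[1][1]), r[1][0][1]) for r in joined_to]
print(from_to)  # [(('F', 'F'), 8), (('F', 'M'), 6), (('M', 'F'), 10)]
```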

In [None]:
from_to_RDD = joined_to_RDD.map(lambda r: ((r[1][0][0], r[1][1]), r[1][0][1]))\
                           .setName("from_to")\
                           .persist(StorageLevel.MEMORY_AND_DISK_SER)

Let's check the format and the steps so far by looking at a sample (good practice).

Use the sample transformation (withReplacement = **False**, fraction = **0.001**, seed = **20**) and trigger the job by calling **collect()**.
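Sampling without replacement behaves like Bernoulli sampling: each record is kept independently with probability equal to the fraction, so the sample size is only approximately fraction × count. The sketch below models this in plain Python with a seeded generator. `bernoulli_sample` is a hypothetical helper; in our understanding PySpark applies this per partition with a per-partition seed, so the exact rows returned can also depend on the partitioning.

```python
import random

def bernoulli_sample(records, fraction, seed):
    """Keep each record independently with probability `fraction`,
    using a seeded generator so the result is reproducible."""
    rng = random.Random(seed)
    return [rec for rec in records if rng.random() < fraction]

data = list(range(100000))
sample = bernoulli_sample(data, 0.001, 20)
print(len(sample))  # close to 100, but not exactly fixed by the fraction
```

This is why `sample` is a cheap sanity check rather than a way to get an exact number of rows (`takeSample` returns a fixed-size sample instead).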

In [None]:
from_to_RDD.sample(False, 0.001, 20).collect()

## Grouping pairs of genders to find average rating per pair. 

### Naive approach: group by key. Let's time it to see performance
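A plain-Python model shows where groupByKey spends its effort: every rating value has to cross the shuffle before any averaging happens, and all values for a key end up in one in-memory collection. The data (5 hypothetical partitions of 1,000 records) and the helper `group_by_key_average` are illustrative assumptions, not Spark code; PySpark may batch values per key before shuffling, but every value is still serialized and sent.

```python
from collections import defaultdict

# Hypothetical stand-in for from_to_RDD: 5 partitions of 1,000 records,
# each record shaped ((gender_user, gender_profile), rating)
PAIRS = [("F", "M"), ("M", "F"), ("F", "F"), ("M", "M")]
partitions = [
    [(PAIRS[i % 4], i % 10 + 1) for i in range(p * 1000, (p + 1) * 1000)]
    for p in range(5)
]

def group_by_key_average(partitions):
    """Model of groupByKey followed by an average: every rating is shuffled,
    then each key's full list of values is averaged."""
    groups = defaultdict(list)
    values_shuffled = 0
    for part in partitions:
        for key, rating in part:
            groups[key].append(rating)  # each value crosses the shuffle
            values_shuffled += 1
    averages = {k: (sum(v) / float(len(v)), len(v)) for k, v in groups.items()}
    return averages, values_shuffled

averages, values_shuffled = group_by_key_average(partitions)
print(values_shuffled)  # 5000: grows with the number of ratings
```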

In [None]:
start_time = time()
# groupByKey ships every rating across the network, then averages each group
tot_cnt_RDD2 = from_to_RDD.groupByKey()\
                          .mapValues(lambda vals: (sum(vals)/float(len(vals)), len(vals))) # average, number
print(tot_cnt_RDD2.collect())
end_time = time()

### Time for job

In [None]:
print(end_time - start_time)

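### Better approach: aggregate by key (a generalized reduce by key)

We use a (sum, count) tuple as the per-key accumulator, starting from the zero value (0, 0). This is a plain value folded by aggregateByKey, not a Spark Accumulator object.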
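The two lambdas passed to aggregateByKey play different roles: the first (seqOp) folds one rating into a partition-local (sum, count) accumulator, and the second (combOp) merges accumulators from different partitions after the shuffle. The plain-Python model below, using the hypothetical helper `aggregate_by_key` and the same made-up data shape as a 5-partition stand-in for from_to_RDD, counts what crosses the shuffle.

```python
# Hypothetical stand-in for from_to_RDD: 5 partitions of 1,000 records
PAIRS = [("F", "M"), ("M", "F"), ("F", "F"), ("M", "M")]
partitions = [
    [(PAIRS[i % 4], i % 10 + 1) for i in range(p * 1000, (p + 1) * 1000)]
    for p in range(5)
]

def aggregate_by_key(partitions, zero, seq_op, comb_op):
    """Model of aggregateByKey with map-side combining."""
    merged = {}
    accumulators_shuffled = 0
    for part in partitions:
        # Map side: fold this partition's values into one accumulator per key
        local = {}
        for key, value in part:
            local[key] = seq_op(local.get(key, zero), value)
        # Shuffle + reduce side: one accumulator per key per partition
        for key, acc in local.items():
            accumulators_shuffled += 1
            merged[key] = comb_op(merged[key], acc) if key in merged else acc
    return merged, accumulators_shuffled

tot_cnt, accumulators_shuffled = aggregate_by_key(
    partitions, (0, 0),
    lambda acc, val: (acc[0] + val, acc[1] + 1),             # seqOp
    lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]))  # combOp
avg = {k: (s / float(c), c) for k, (s, c) in tot_cnt.items()}
print(accumulators_shuffled)  # 20 = 4 keys x 5 partitions
```

Only small tuples move between partitions, and their number grows with keys × partitions rather than with the number of ratings. With a handful of gender pairs and 50 partitions, that is a few hundred tuples at most.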

In [None]:
start_time = time()
tot_cnt_RDD = from_to_RDD.aggregateByKey((0,0),
                                         lambda acc, val: (acc[0]+val, acc[1]+1),
                                         lambda acc1, acc2: (acc1[0]+acc2[0], acc1[1]+acc2[1]))
avg_RDD = tot_cnt_RDD.mapValues(lambda r: (r[0]/float(r[1]), r[1])) # average, number

print(avg_RDD.collect())
end_time = time()

### Time for job

In [None]:
print(end_time - start_time)

### The difference between the job times is small here, but on a big dataset it can be substantial.
### Why is the second approach faster?
Hint: check the Spark UI (port 4040 on the driver) while each of these approaches is running; in particular, look at the stages of the job triggered by `.collect()` and compare their shuffle read and write sizes.