# Join in Spark
This notebook explains a simple join operation in Spark. The code here is a solution to a programming assignment in the Coursera course `Hadoop Platform and Application Framework`.

#### Simple `join` operation
The dataset for this section is in the repository; the relevant files are `join1_FileA.txt` and `join1_FileB.txt`. These are two different word-count datasets, and we'll perform a join on them. The meaning of the file contents isn't entirely clear to me, so I'm not including a description here. Since the dataset is very small, that shouldn't get in the way of understanding the results.

#### Load datasets
Load the two datasets and check the contents using the `collect` method.

In [42]:
fileA = sc.textFile("./data/simple_join/join1_FileA.txt")
fileB = sc.textFile("./data/simple_join/join1_FileB.txt")

In [43]:
fileA.collect()

[u'able,991', u'about,11', u'burger,15', u'actor,22']

In [44]:
fileB.collect()

[u'Jan-01 able,5',
 u'Feb-02 about,3',
 u'Mar-03 about,8',
 u'Apr-04 able,13',
 u'Feb-22 actor,3',
 u'Feb-23 burger,5',
 u'Mar-08 burger,2',
 u'Dec-15 able,100']

#### Mapper for fileA
Create a map function for fileA that takes a line, splits it on the comma and turns the count to an integer.

In [55]:
def split_fileA(line):
    # split the input line into word and count on the comma
    key_value = line.split(",")
    word = key_value[0]
    # turn the count into an integer
    count = int(key_value[1])
    return (word, count)

#### Mapper for fileB

In [56]:
def split_fileB(line):
    # split the input line into word, date and count_string
    key_value = line.split(",")
    count_string = key_value[1]
    date_word = key_value[0].split(" ")
    date = date_word[0]
    word = date_word[1]
    return (word, date + " " + count_string)
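As a quick sanity check that needs no Spark context, the mapper can be called on a single line taken from the `fileB.collect()` output above. The function is repeated here only so that the snippet runs on its own.

```python
def split_fileB(line):
    # split the input line into word, date and count_string
    key_value = line.split(",")
    count_string = key_value[1]
    date_word = key_value[0].split(" ")
    date = date_word[0]
    word = date_word[1]
    return (word, date + " " + count_string)

# A line copied from the fileB output shown earlier
test_line = "Jan-01 able,5"
result = split_fileB(test_line)
print(result)  # ('able', 'Jan-01 5')
```

The word becomes the key, so that it lines up with the keys produced by `split_fileA`, while the date and count are kept together as the value.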

#### Map transformations on the two files

In [57]:
# Test if the split_fileA() returns a desired output
test_line = "able,991"
split_fileA(test_line)

('able', 991)

In [58]:
fileA_data = fileA.map(split_fileA)
fileA_data.collect()

[(u'able', 991), (u'about', 11), (u'burger', 15), (u'actor', 22)]

In [47]:
fileB_data = fileB.map(split_fileB)
fileB_data.collect()

[(u'able', u'Jan-01 5'),
 (u'about', u'Feb-02 3'),
 (u'about', u'Mar-03 8'),
 (u'able', u'Apr-04 13'),
 (u'actor', u'Feb-22 3'),
 (u'burger', u'Feb-23 5'),
 (u'burger', u'Mar-08 2'),
 (u'able', u'Dec-15 100')]

#### Run join

The goal is to join the two datasets using the words as keys and, for each word, print the word count for a specific date followed by the total count from fileA.

In other words, for each word in fileB, we want the date and count from fileB together with the total count from fileA.

Spark implements the `join` transformation: given an RDD of (K, V) pairs joined with another RDD of (K, W) pairs, it returns a dataset of (K, (V, W)) pairs.

In [48]:
fileB_joined_fileA = fileB_data.join(fileA_data)

In [49]:
fileB_joined_fileA.collect()

[(u'able', (u'Jan-01 5', 991)),
 (u'able', (u'Apr-04 13', 991)),
 (u'able', (u'Dec-15 100', 991)),
 (u'burger', (u'Feb-23 5', 15)),
 (u'burger', (u'Mar-08 2', 15)),
 (u'about', (u'Feb-02 3', 11)),
 (u'about', (u'Mar-03 8', 11)),
 (u'actor', (u'Feb-22 3', 22))]
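To make the (K, (V, W)) shape concrete, here is a plain-Python sketch of the same inner-join semantics on the two small datasets. The helper `inner_join` is a hypothetical stand-in written for illustration; it is not how Spark implements `join`, and it ignores partitioning entirely.

```python
from collections import defaultdict

def inner_join(left, right):
    """Pair every left value with every right value that shares its key."""
    right_by_key = defaultdict(list)
    for key, value in right:
        right_by_key[key].append(value)
    # Keys missing from the right side produce no rows (inner join)
    return [(key, (v, w)) for key, v in left for w in right_by_key.get(key, [])]

# The (word, "date count") pairs from fileB and (word, total) pairs from fileA
fileB_pairs = [("able", "Jan-01 5"), ("about", "Feb-02 3"), ("about", "Mar-03 8"),
               ("able", "Apr-04 13"), ("actor", "Feb-22 3"), ("burger", "Feb-23 5"),
               ("burger", "Mar-08 2"), ("able", "Dec-15 100")]
fileA_pairs = [("able", 991), ("about", 11), ("burger", 15), ("actor", 22)]

joined = inner_join(fileB_pairs, fileA_pairs)
for row in sorted(joined):
    print(row)
```

This yields the same eight rows as the Spark output, though not necessarily in the same order, since the order of rows in a joined RDD depends on how Spark partitions the keys.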

### Advanced `join` operation
The dataset is in the repository under `./data/advanced_join`. It consists of several files whose names contain the string `gennum` or `genchan`. The gennum files contain show names and their viewer counts; the genchan files contain show names and the channel on which each show is broadcast. We want to find the total number of viewers across all shows for the channel BAT.

#### Read files
We'll read both types of files using pattern matching, signified by the '?' in the snippet below.

In [39]:
show_views_file = sc.textFile("./data/advanced_join/join2_gennum?.txt")
# Read first 5 elements. This copies some elements of an RDD back to the driver program,
# which is the PySpark console.
show_views_file.take(5)

[u'Hourly_Sports,21',
 u'Hot_Talking,44',
 u'Almost_Cooking,91',
 u'Dumb_Show,186',
 u'PostModern_Sports,377']

In [40]:
show_channel_file = sc.textFile("./data/advanced_join/join2_genchan?.txt")
show_channel_file.take(5)

[u'Hourly_Sports,DEF',
 u'Hot_Cooking,XYZ',
 u'Almost_Talking,CAB',
 u'Dumb_Talking,MAN',
 u'PostModern_News,BAT']
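The `?` in the paths above is a glob wildcard that matches exactly one character. As a rough illustration, Python's `fnmatch` module treats `?` the same way for this pattern, so it can show which names would be picked up. The file names below are hypothetical and are used only to demonstrate the matching rule.

```python
from fnmatch import fnmatchcase

pattern = "join2_gennum?.txt"

# Hypothetical file names, for illustration only
candidates = [
    "join2_gennumA.txt",   # one character in place of '?': matches
    "join2_gennumB.txt",   # matches
    "join2_gennum.txt",    # nothing in place of '?': no match
    "join2_gennumAB.txt",  # two characters in place of '?': no match
    "join2_genchanA.txt",  # different prefix: no match
]

matches = [name for name in candidates if fnmatchcase(name, pattern)]
print(matches)  # ['join2_gennumA.txt', 'join2_gennumB.txt']
```

All matching files end up in a single RDD, which is why one `sc.textFile` call is enough for each file type.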

#### Parse and split

In [41]:
def split_show_views(line):
    """
    Parse and split the show files. Fields are separated by a comma.
    """
    show_views = line.split(",")
    show = show_views[0]
    views = show_views[1]
    return (show, views)

In [42]:
show_views = show_views_file.map(split_show_views)
show_views.take(5)

[(u'Hourly_Sports', u'21'),
 (u'Hot_Talking', u'44'),
 (u'Almost_Cooking', u'91'),
 (u'Dumb_Show', u'186'),
 (u'PostModern_Sports', u'377')]

In [43]:
def split_show_channel(line):
    """
    Parse and split the channel files similar to what was done for the show files. 
    """
    show_channel = line.split(",")
    show = show_channel[0]
    channel = show_channel[1]
    return (show, channel)

In [44]:
show_channel = show_channel_file.map(split_show_channel)
show_channel.take(5)

[(u'Hourly_Sports', u'DEF'),
 (u'Hot_Cooking', u'XYZ'),
 (u'Almost_Talking', u'CAB'),
 (u'Dumb_Talking', u'MAN'),
 (u'PostModern_News', u'BAT')]

#### Join the datasets
Join the two datasets with the show name as the key. The datasets can be joined in either order, as long as later steps index the joined values consistently.

In [45]:
joined_dataset = show_channel.join(show_views)

In [46]:
joined_dataset.take(5)

[(u'PostModern_Cooking', (u'MAN', u'938')),
 (u'PostModern_Cooking', (u'MAN', u'259')),
 (u'PostModern_Cooking', (u'MAN', u'644')),
 (u'PostModern_Cooking', (u'MAN', u'161')),
 (u'PostModern_Cooking', (u'MAN', u'686'))]
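The join order only changes the position of the two values inside each tuple. A small plain-Python sketch, using a few rows that appear in the outputs above, shows the difference; `join_pairs` is a hypothetical nested-loop helper standing in for `join`, not Spark's implementation.

```python
def join_pairs(left, right):
    # Nested-loop stand-in for left.join(right): values come out as (left, right)
    return [(k1, (v, w)) for k1, v in left for k2, w in right if k1 == k2]

show_channel = [("Hourly_Sports", "DEF"), ("PostModern_Cooking", "MAN")]
show_views = [("Hourly_Sports", "21"), ("PostModern_Cooking", "938"),
              ("PostModern_Cooking", "259")]

channel_first = join_pairs(show_channel, show_views)
views_first = join_pairs(show_views, show_channel)

print(channel_first[0])  # ('Hourly_Sports', ('DEF', '21'))
print(views_first[0])    # ('Hourly_Sports', ('21', 'DEF'))
```

With `show_channel.join(show_views)`, as used in this notebook, the channel sits at index 0 of the value tuple and the view count at index 1. Any later function that unpacks the tuple has to assume the same order.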

#### Extract channel as key
We want the total number of viewers by channel, so we need an RDD with the channel as the key and the viewer count as the value, regardless of the show.

In [47]:
def extract_channel_views(show_views_channel):
    # the joined value is a (channel, views) tuple
    channel = show_views_channel[1][0]
    views = show_views_channel[1][1]
    return (channel, views)

In [53]:
# Apply the function above to create an RDD of channel and viewers.
channel_views = joined_dataset.map(extract_channel_views)

In [49]:
channel_views.take(5)

[(u'MAN', u'938'),
 (u'MAN', u'259'),
 (u'MAN', u'644'),
 (u'MAN', u'161'),
 (u'MAN', u'686')]

#### Sum viewers for each channel
Finally, we need to sum the viewer counts for each channel.

In [50]:
def sum_channel_views(a, b):
    # viewer counts are still strings at this point, so convert before adding
    channel_views = int(a) + int(b)
    return channel_views

In [52]:
# Copy the results back to the driver with collect. 
channel_views.reduceByKey(sum_channel_views).collect()

[(u'XYZ', 5208016),
 (u'DEF', 8032799),
 (u'CNO', 3941177),
 (u'BAT', 5099141),
 (u'NOX', 2583583),
 (u'CAB', 3940862),
 (u'BOB', 2591062),
 (u'ABC', 1115974),
 (u'MAN', 6566187)]
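One detail worth noting: `reduceByKey` only calls the reducing function when a key has at least two values, so a channel with a single record would keep its count as a string. The plain-Python sketch below illustrates this with a hypothetical `reduce_by_key` helper (a stand-in, not Spark's implementation) and a handful of rows taken from the outputs above. Converting the counts to integers before reducing is one way to avoid the problem.

```python
def reduce_by_key(pairs, func):
    """Fold values that share a key; func is never called for single-value keys."""
    totals = {}
    for key, value in pairs:
        totals[key] = func(totals[key], value) if key in totals else value
    return totals

# A few (channel, views) rows from the outputs above; DEF has only one record here
channel_views = [("MAN", "938"), ("MAN", "259"), ("MAN", "644"),
                 ("MAN", "161"), ("MAN", "686"), ("DEF", "21")]

# Converting inside the reducer leaves the single-record key as a string
raw = reduce_by_key(channel_views, lambda a, b: int(a) + int(b))
print(repr(raw["MAN"]))  # 2688
print(repr(raw["DEF"]))  # '21'

# Converting up front gives integers for every key
int_views = [(channel, int(views)) for channel, views in channel_views]
totals = reduce_by_key(int_views, lambda a, b: a + b)
print(repr(totals["DEF"]))  # 21
```

In the full dataset every channel has many records, so the results above are all integers. To isolate the channel we set out to find, something like `channel_views.reduceByKey(sum_channel_views).filter(lambda kv: kv[0] == u'BAT').collect()` should return just the BAT row, which the output above shows as 5099141.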