# MapReduce for joining data

I will work through two examples of joining data with MapReduce.

## Part 1

Target: (date word, day-count) + (word, total-count) --> (date word day-count total-count)
E.g. data1 (Jan-01 able, 5) + data2 (able, 991) --> data3 (Jan-01 able 5 991)
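
To make the target concrete, here is a minimal in-memory sketch of the same join in plain Python, with no Hadoop involved. The names `join_counts`, `totals`, and `daily` are illustrative and not part of the assignment files, and the sketch assumes every word in the daily data also has a total count.

```python
# Illustrative only: an in-memory version of the Part 1 join (no Hadoop).

def join_counts(totals, daily):
    """Attach each word's total count to its (date, word, day-count) records."""
    joined = []
    for date, word, day_count in daily:
        # Assumption: every word in `daily` also appears in `totals`.
        joined.append((date, word, day_count, totals[word]))
    return joined

totals = {"able": 991, "about": 11}                        # (word, total-count)
daily = [("Jan-01", "able", 5), ("Feb-02", "about", 3)]    # (date, word, day-count)

for record in join_counts(totals, daily):
    print("{0} {1} {2} {3}".format(*record))
# prints:
# Jan-01 able 5 991
# Feb-02 about 3 11
```

The MapReduce version below produces the same kind of record, but it cannot hold a lookup table of totals in memory, so it relies on sorting by word to bring matching rows together.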

### Mapper

join1_mapper.py

In [None]:
#!/usr/bin/env python
import sys

# --------------------------------------------------------------------------
#This mapper code will input a <date word, value> input file, and move date into 
#  the value field for output
#  
#  Note, this program is written in a simple style and does not take full advantage of Python 
#     data structures, but I believe it is more readable
#
#  Note, there is NO error checking of the input, it is assumed to be correct
#     meaning no extra spaces, missing inputs or counts,etc..
#
#  see https://docs.python.org/2/tutorial/index.html for details and python tutorials
#
# --------------------------------------------------------------------------



for line in sys.stdin:
    line       = line.strip()   #strip out carriage return
    key_value  = line.split(",")   #split line, into key and value, returns a list
    key_in     = key_value[0].split(" ")   #key is first item in list
    value_in   = key_value[1]   #value is 2nd item 

    #print key_in
    if len(key_in)>=2:           #if this entry has <date word> in key
        date = key_in[0]      #now get date from key field
        word = key_in[1]
        value_out = date+" "+value_in     #concatenate date, blank, and value_in
        print( '%s\t%s' % (word, value_out) )  #print a string, tab, and string
    else:   #key is only <word> so just pass it through
        print( '%s\t%s' % (key_in[0], value_in) )  #print a string tab and string

#Note that Hadoop expects a tab to separate key value
#but this program assumes the input file has a ',' separating key value
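
The key rearrangement done by the mapper can be checked on single lines. The sketch below wraps the same logic in a hypothetical helper, `map_line` (not part of `join1_mapper.py`), and, like the mapper, assumes well-formed input with exactly one comma per line.

```python
def map_line(line):
    """Return the 'key<TAB>value' string the mapper would emit for one input line."""
    key_part, value_in = line.strip().split(",")
    key_fields = key_part.split(" ")
    if len(key_fields) >= 2:             # input was "<date> <word>,<day-count>"
        date, word = key_fields[0], key_fields[1]
        return "{0}\t{1} {2}".format(word, date, value_in)
    return "{0}\t{1}".format(key_fields[0], value_in)   # input was "<word>,<total-count>"

print(repr(map_line("Jan-01 able,5")))   # 'able\tJan-01 5'
print(repr(map_line("able,991")))        # 'able\t991'
```

Both kinds of record end up keyed by the word alone, which is what lets the sort step place them next to each other for the reducer.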

### Reducer

join1_reducer.py

In [None]:
#!/usr/bin/env python
import sys

# --------------------------------------------------------------------------
#This reducer code will input a <word, value> input file, and join words together
# Note the input will come as a group of lines with same word (ie the key)
# As it reads words it will hold on to the value field
#
# It will keep track of current word and previous word, if word changes
#   then it will perform the 'join' on the set of held values by merely printing out 
#   the word and values.  In other words, there is no need to explicitly match keys b/c
#   Hadoop has already put them sequentially in the input 
#   
# At the end it will perform the last join
#
#
#  Note, there is NO error checking of the input, it is assumed to be correct, meaning
#   it has word with correct and matching entries, no extra spaces, etc.
#
#  see https://docs.python.org/2/tutorial/index.html for python tutorials
#
#  San Diego Supercomputer Center copyright
# --------------------------------------------------------------------------

prev_word          = "  "                #initialize previous word  to blank string
months             = ['Jan','Feb','Mar','Apr','May','Jun','Jul','Aug','Sep','Oct','Nov','Dec']

dates_to_output    = [] #an empty list to hold dates for a given word
day_cnts_to_output = [] #an empty list of day counts for a given word
# see https://docs.python.org/2/tutorial/datastructures.html for list details

line_cnt           = 0  #count input lines

for line in sys.stdin:
    line       = line.strip()       #strip out carriage return
    key_value  = line.split('\t')   #split line, into key and value, returns a list
    line_cnt   = line_cnt+1     

    #note: for simple debugging, add print statements here
    curr_word  = key_value[0]         #key is first item in list, indexed by 0
    value_in   = key_value[1]         #value is 2nd item

    #-----------------------------------------------------
    # Check if its a new word and not the first line 
    #   (b/c for the first line the previous word is not applicable)
    #   if so then print out list of dates and counts
    #----------------------------------------------------
    if curr_word != prev_word:

        # -----------------------
        #now write out the join result, but not for the first line input
        # -----------------------
        if line_cnt>1:
            for i in range(len(dates_to_output)):  #loop thru dates, indexes start at 0
                print('{0} {1} {2} {3}'.format(dates_to_output[i],prev_word,day_cnts_to_output[i],curr_word_total_cnt))
            #now reset lists
            dates_to_output   =[]
            day_cnts_to_output=[]
        prev_word         =curr_word  #set up previous word for the next set of input lines

	
    # ---------------------------------------------------------------
    #whether or not the join result was written out, 
    #   now process the curr word    
  	
    #determine if its from file <word, total-count> or < word, date day-count>
    # and build up list of dates, day counts, and the 1 total count
    # ---------------------------------------------------------------
    if (value_in[0:3] in months): 

        date_day =value_in.split() #split the value field into a date and day-cnt
        
        #add date to lists of the value fields we are building
        dates_to_output.append(date_day[0])
        day_cnts_to_output.append(date_day[1])
    else:
        curr_word_total_cnt = value_in  #if the value field was just the total count then its
                                           #the first (and only) item in this list

# ---------------------------------------------------------------
#now write out the LAST join result
# ---------------------------------------------------------------
for i in range(len(dates_to_output)):  #loop thru dates, indexes start at 0
    print('{0} {1} {2} {3}'.format(dates_to_output[i],prev_word,day_cnts_to_output[i],curr_word_total_cnt))
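
The reducer above tracks the previous word by hand. As an alternative sketch of the same idea (not the course's code), `itertools.groupby` can do the grouping, given input already sorted by word. The function name `reduce_lines` is hypothetical, and this version tells the two record types apart by the number of fields in the value rather than by the month prefix.

```python
from itertools import groupby

def reduce_lines(sorted_lines):
    """Yield joined records from mapper output that is already sorted by word."""
    pairs = (line.rstrip("\n").split("\t", 1) for line in sorted_lines)
    for word, group in groupby(pairs, key=lambda kv: kv[0]):
        total = None
        dated = []                        # (date, day-count) pairs for this word
        for _, value in group:
            fields = value.split()
            if len(fields) == 2:          # value was "<date> <day-count>"
                dated.append((fields[0], fields[1]))
            else:                         # value was "<total-count>"
                total = fields[0]
        for date, day_count in dated:
            yield "{0} {1} {2} {3}".format(date, word, day_count, total)

sample = ["able\t991", "able\tApr-04 13", "able\tJan-01 5",
          "about\t11", "about\tFeb-02 3"]
for record in reduce_lines(sample):
    print(record)
# prints:
# Apr-04 able 13 991
# Jan-01 able 5 991
# Feb-02 about 3 11
```

Because the dated rows are buffered until the whole group has been read, the total-count row may appear anywhere within its group.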
	

### Data

join1_FileA.txt

In [None]:
able,991
about,11
burger,15
actor,22

join1_FileB.txt

In [None]:
Jan-01 able,5
Feb-02 about,3
Mar-03 about,8
Apr-04 able,13
Feb-22 actor,3
Feb-23 burger,5
Mar-08 burger,2
Dec-15 able,100

### Procedure

1) Create the four files from the terminal prompt in the Cloudera VM.

Type the following commands to open a text editor for each file, paste in the corresponding content shown above for join1_mapper.py, join1_reducer.py, join1_FileA.txt, and join1_FileB.txt, then save and exit.

In [None]:
gedit join1_mapper.py
gedit join1_reducer.py
gedit join1_FileA.txt
gedit join1_FileB.txt

2) Enter the following to make the mapper and reducer scripts executable.

In [None]:
chmod +x join1_mapper.py
chmod +x join1_reducer.py

3) Create a directory on the HDFS file system (with `-p`, it is OK if the directory already exists):

In [None]:
hdfs dfs -mkdir -p /user/cloudera/input1

Remark: make sure the input folder contains only the input data.

4) Copy the files from the local filesystem to the HDFS filesystem:

In [None]:
hdfs dfs -put /home/cloudera/join1_FileA.txt /user/cloudera/input1

hdfs dfs -put /home/cloudera/join1_FileB.txt /user/cloudera/input1

You can list your files on HDFS:

In [None]:
hdfs dfs -ls /user/cloudera/input1

5) Test the program in serial execution using the following Unix utilities and piping commands:

In [None]:
cat join1_File*.txt | ./join1_mapper.py | sort | ./join1_reducer.py

We can also test the mapper on its own.

In [None]:
cat join1_File*.txt | ./join1_mapper.py | sort

Remark: `|` pipes the standard output of one command to the standard input of the next, so the output of `cat` feeds the join1_mapper.py program, and so on.

To debug programs in serial execution one should use small datasets and possibly extra print statements in the program. Debugging with map/reduce jobs is harder but hopefully not necessary for this assignment.
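
In the serial test, `sort` stands in for Hadoop's shuffle step: it places lines with the same key next to each other, which is the only ordering the reducer relies on. A minimal sketch of that effect, using a hypothetical helper `shuffle_sort` that orders by key only (Unix `sort` compares whole lines, so the order within a key may differ):

```python
def shuffle_sort(mapper_output):
    """Return mapper output lines ordered so that identical keys are adjacent."""
    # sorted() is stable, so lines with the same key keep their input order.
    return sorted(mapper_output, key=lambda line: line.split("\t", 1)[0])

mapper_output = ["able\t991", "about\t11", "able\tJan-01 5", "about\tFeb-02 3"]
for line in shuffle_sort(mapper_output):
    print(line)
# prints (<TAB> shown as whitespace):
# able    991
# able    Jan-01 5
# about   11
# about   Feb-02 3
```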

6) Run the Hadoop streaming command:

In [None]:
hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
-input /user/cloudera/input1 \
-output /user/cloudera/output_join \
-mapper /home/cloudera/join1_mapper.py \
-reducer /home/cloudera/join1_reducer.py

## Part 2

Target: (TV show, count) + (TV show title, channel) --> (TV show, total-count) for ABC channel
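
As a point of reference for the expected result, here is a small in-memory sketch of this filter-and-sum join in plain Python. The names `abc_totals`, `counts`, and `channels` and the sample rows are made up for illustration. In this sketch, a show that airs on ABC but has no count rows is reported with a total of 0.

```python
def abc_totals(counts, channels, channel="ABC"):
    """Sum viewer counts per show, keeping only shows that air on `channel`."""
    on_channel = set(show for show, chan in channels if chan == channel)
    totals = dict((show, 0) for show in on_channel)
    for show, count in counts:
        if show in on_channel:
            totals[show] += count
    return totals

counts = [("Hot_News", 10), ("Hot_News", 5), ("Dumb_Show", 7)]   # (TV show, count)
channels = [("Hot_News", "ABC"), ("Dumb_Show", "DEF")]           # (TV show title, channel)
print(abc_totals(counts, channels))   # {'Hot_News': 15}
```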

### Mapper

In [None]:
#!/usr/bin/env python
#the above just indicates to use python to interpret this file
import sys

for line in sys.stdin:
    line = line.strip()   #strip out carriage return
    key_value = line.split(",")   #split line, into key and value, returns a list
    key_in = key_value[0]
    value_in = key_value[1]
    if value_in.isdigit():
        print('{0}\t{1}'.format(key_in, value_in) )
    elif value_in == 'ABC':
        print('{0}\t{1}'.format(key_in, value_in) )

### Reducer

In [None]:
#!/usr/bin/env python
#the above just indicates to use python to interpret this file
import sys

last_key      = None              #initialize these variables
running_total = 0
abc_found = False
# -----------------------------------
# Loop thru file
#  --------------------------------
for input_line in sys.stdin:
    input_line = input_line.strip()

    # --------------------------------
    # Get next key and value
    # --------------------------------
    this_key, value = input_line.split("\t", 1)  #the Hadoop default is a tab separating key and value
                          #the split command returns a list of strings, in this case into 2 variables
    
        
    if last_key == this_key:     #check if key has changed ('==' is the logical equality check)
        if value.isdigit():
            value = int(value)
            running_total += value 
        elif value == 'ABC':
            abc_found = True
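    else:
        if abc_found:                #key changed: emit the previous show if it was on ABC
            print( "{0}\t{1}".format(last_key, running_total) )
        abc_found = False
        running_total = 0            #reset even when the first value for the new key is 'ABC'
        if value.isdigit():
            running_total = int(value)
        elif value == 'ABC':
            abc_found = True
        last_key = this_key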

if last_key is not None and abc_found:   #emit the last key if it was on ABC
    print( "{0}\t{1}".format(last_key, running_total))
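
Hadoop sorts the reducer input by key only, so within one show the `ABC` marker may arrive before, between, or after the counts. The sketch below is an alternative formulation with `itertools.groupby` (the name `reduce_abc` is hypothetical, not part of the assignment) that gives the same total regardless of that order.

```python
from itertools import groupby

def reduce_abc(sorted_lines):
    """Yield 'show<TAB>total' for shows on ABC, from input sorted by show."""
    pairs = (line.rstrip("\n").split("\t", 1) for line in sorted_lines)
    for show, group in groupby(pairs, key=lambda kv: kv[0]):
        total, on_abc = 0, False
        for _, value in group:
            if value.isdigit():
                total += int(value)      # a viewer count for this show
            elif value == "ABC":
                on_abc = True            # the channel marker for this show
        if on_abc:
            yield "{0}\t{1}".format(show, total)

sample = ["Dumb_Show\t7", "Hot_News\tABC", "Hot_News\t10", "Hot_News\t5"]
print(list(reduce_abc(sample)))   # ['Hot_News\t15']
```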

### Data

make_join2data.py

In [None]:
#!/usr/bin/env python
import sys

# --------------------------------------------------------------------------
#  (make_join2data.py) Generate a random combination of titles and viewer counts, or channels
# this is a simple version of a congruential generator, 
#   not a great random generator but enough  
# --------------------------------------------------------------------------

chans   = ['ABC','DEF','CNO','NOX','YES','CAB','BAT','MAN','ZOO','XYZ','BOB']
sh1 =['Hot','Almost','Hourly','PostModern','Baked','Dumb','Cold','Surreal','Loud']
sh2 =['News','Show','Cooking','Sports','Games','Talking','Talking']
vwr =range(17,1053)

chvnm=sys.argv[1]  #first argument: 'n' outputs channels (no numbers), anything else outputs viewer counts

lch=len(chans)
lsh1=len(sh1)
lsh2=len(sh2)
lvwr=len(vwr)
ci=1
s1=2
s2=3
vwi=4
ri=int(sys.argv[3])
for i in range(0,int(sys.argv[2])):  #arg 2 is the number of lines to output

    if chvnm=='n':  #'n' means no number: output the channel
        print('{0}_{1},{2}'.format(sh1[s1],sh2[s2],chans[ci]))
    else:
        print('{0}_{1},{2}'.format(sh1[s1],sh2[s2],vwr[vwi])) 
    ci=(5*ci+ri) % lch   
    s1=(4*s1+ri) % lsh1
    s2=(3*s1+ri+i) % lsh2
    vwi=(2*vwi+ri+i) % lvwr
 
    if (vwi==4): vwi=5
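
The index updates in the generator, such as `ci=(5*ci+ri) % lch`, are linear congruential steps of the form x = (a*x + c) mod m. A minimal sketch of one such sequence follows; `lcg_sequence` and its parameter names are hypothetical, and the values 5, 13, and 11 mirror the channel index update for a third argument of 13 with the 11 channels listed above.

```python
def lcg_sequence(seed, multiplier, increment, modulus, n):
    """Return the first n values of x -> (multiplier*x + increment) % modulus."""
    values = []
    x = seed
    for _ in range(n):
        values.append(x)
        x = (multiplier * x + increment) % modulus
    return values

print(lcg_sequence(1, 5, 13, 11, 5))   # [1, 7, 4, 0, 2]
```

The sequence is fully determined by its arguments, so rerunning the generator with the same command-line values reproduces the same data file.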

make_data_join2.txt

In [None]:
python make_join2data.py y 1000 13 > join2_gennumA.txt
python make_join2data.py y 2000 17 > join2_gennumB.txt
python make_join2data.py y 3000 19 > join2_gennumC.txt
python make_join2data.py n 100  23 > join2_genchanA.txt
python make_join2data.py n 200  19 > join2_genchanB.txt
python make_join2data.py n 300  37 > join2_genchanC.txt

### Procedure

The procedure is exactly the same as in Part 1.

Run the Hadoop streaming command:

In [None]:
hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
-input /user/cloudera/input3 \
-output /user/cloudera/output_join2 \
-mapper /home/cloudera/join2_mapper.py \
-reducer /home/cloudera/join2_reducer.py \
-numReduceTasks 1