#DATASCI W261: Machine Learning at Scale

Nick Hamlin
nickhamlin@gmail.com  
Time of Submission: 12:15 AM EST, Tuesday, January 26, 2016  
W261-3, Spring 2016  
Week 3 Homework

###Submission Notes:
- For each problem, I've included a summary of the question as posed in the instructions.  In many cases, I have not included the full text, to keep the final submission as uncluttered as possible.  For reference, I've included a link to the original instructions in the "Useful References" section below.
- Problem statements are listed in *italics*, while my responses are shown in plain text. 
- I have written driver functions for each problem where a solution is provided in pure Python.  For simplicity, I have omitted them for the sections that use Bash commands either directly or to create files.

###Useful References:
- **[Original Assignment Instructions](https://www.dropbox.com/sh/uev2esasn7bvtnd/AAAlxSC1J3ZHCm7EgkJbemAZa/HW3-Questions.txt?dl=0)**
- [Counter Example in mrjob](http://nbviewer.jupyter.org/urls/dl.dropbox.com/s/5thl14n4pqvhzt5/Counter.ipynb)

###Handy Hadoop Links:
- [Jobtracker](http://localhost:8088/cluster)  
- [Namenode](http://localhost:50070/dfshealth.html#tab-overview)

##HW3.0.  


*What is a merge sort? Where is it used in Hadoop?* 

Merge sort is a divide-and-conquer sorting algorithm: it recursively splits a list into halves, sorts each half, and merges the sorted halves back together.  The core merge step takes two sorted lists and combines them into a single sorted list by keeping a pointer at the head of each list. In each iteration, the values at the two pointers are compared, the smaller of the two is appended to the final list, and that pointer advances.  When one list is exhausted, the remainder of the other is appended, so the end result is a single sorted list. Hadoop uses this merge step during the shuffle process: on the map side, the sorted spill files that are generated for each partition are merged together into a single sorted output (the combiner, if one is configured, may be applied along the way).  Merge sort is also used on the reducer side to combine the sorted files received from multiple mappers into a single sorted stream.
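
As a minimal sketch of the two-pointer merge step described above (the function name `merge_sorted` is made up for illustration, and this is plain in-memory Python rather than Hadoop's own implementation):

```python
def merge_sorted(left, right):
    """Merge two already-sorted lists into one sorted list."""
    merged = []
    i = j = 0  # one pointer per input list
    while i < len(left) and j < len(right):
        # Append the smaller head value and advance that pointer
        if left[i] <= right[j]:
            merged.append(left[i])
            i += 1
        else:
            merged.append(right[j])
            j += 1
    # One list is exhausted; the rest of the other is already sorted
    merged.extend(left[i:])
    merged.extend(right[j:])
    return merged

if __name__ == "__main__":
    print(merge_sorted([1, 4, 9], [2, 3, 10, 11]))
```

Because each input is consumed strictly front to back, the same idea works on sorted files that are streamed from disk, which is why it suits merging spill files.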

*What is a combiner function in the context of Hadoop? Give an example where it can be used and justify why it should be used in the context of this problem.*

A combiner function is used to combine records with the same key on the map side before they are transferred to the reducer.  For example, in a word count implementation, if a mapper generates two key-value pairs for the same word, a combiner would merge those two records into a single pair with the same key and a value equal to the sum of the two input values.  In Hadoop, combiners can be used to reduce the amount of information that must be sent over the network from the mappers to the reducers.  In the word count example, if the combiner were omitted, the mapper would have to send two pairs across the network.  With a combiner, only one pair per word per mapper needs to be sent.  This is safe here because addition is associative and commutative, so summing partial sums gives the same total.  This reduction in traffic can make a dramatic difference in the processing speed of a job, especially at large scale.
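
To make the saving concrete, here is a small sketch of what a word count combiner does to one mapper's output. The `combine` function and the sample pairs are illustrative assumptions; a real streaming combiner would read sorted lines from stdin instead of a Python list.

```python
from collections import Counter

def combine(mapper_output):
    """Sum the counts for each key emitted by a single mapper."""
    totals = Counter()
    for word, count in mapper_output:
        totals[word] += count
    return sorted(totals.items())

# Pairs a mapper might emit for the line "foo foo quux labs foo bar quux"
mapper_output = [("foo", 1), ("foo", 1), ("quux", 1), ("labs", 1),
                 ("foo", 1), ("bar", 1), ("quux", 1)]
combined = combine(mapper_output)

if __name__ == "__main__":
    print("%d pairs before, %d pairs after" % (len(mapper_output), len(combined)))
    print(combined)
```

Seven pairs shrink to four, one per distinct word, and the reducer still arrives at the same totals.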

*What is the Hadoop shuffle?* 

The shuffle is the process by which Hadoop sorts the output of the mappers and transfers it to the reducers.  It consists of three main steps: the partition, the sort, and the combine. The partition step takes the output of the mapper and partitions the results, by key, into separate files (one file for each reducer). In the sort step, records within the same partition are sorted.  Finally, the combine step takes records with the same key and merges them together.  Once these steps are completed, the output can then be transferred to the reducer.
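
The partition and sort steps can be sketched in a few lines. This is only a simulation under stated assumptions: `toy_partition` is a made-up deterministic stand-in for Hadoop's hash partitioner, and everything happens in memory rather than through spill files.

```python
def toy_partition(key, num_reducers):
    # Hypothetical stand-in for Hadoop's default hash partitioner
    return sum(ord(ch) for ch in key) % num_reducers

def shuffle(mapper_outputs, num_reducers):
    """Route every (key, value) pair to a reducer, then sort each partition."""
    partitions = [[] for _ in range(num_reducers)]
    for output in mapper_outputs:
        for key, value in output:
            partitions[toy_partition(key, num_reducers)].append((key, value))
    # Sorting brings identical keys together inside each partition
    return [sorted(part) for part in partitions]

if __name__ == "__main__":
    outputs = [[("b", 1), ("a", 1)], [("a", 1)]]
    print(shuffle(outputs, 2))
```

All pairs for a given key land in the same partition, which is what lets a reducer (or combiner) process one key at a time from a sorted stream.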


##HW3.1. Using Counters to do EDA

*Counters are lightweight objects in Hadoop that allow you to keep track of system progress in both the map and reduce stages of processing. By default, Hadoop defines a number of standard counters in "groups"; these show up in the jobtracker webapp, giving you information such as "Map input records", "Map output records", etc.* 

*While processing information/data using MapReduce job, it is a challenge to monitor the progress of parallel threads running across nodes of distributed clusters. Moreover, it is also complicated to distinguish between the data that has been processed and the data which is yet to be processed. The MapReduce Framework offers a provision of user-defined Counters, which can be effectively utilized to monitor the progress of data across nodes of distributed clusters.*

*The [consumer complaints](https://www.dropbox.com/s/vbalm3yva2rr86m/Consumer_Complaints.csv?dl=0) dataset consists of diverse consumer complaints, which have been reported across the United States regarding various types of loans.*

###User-defined Counters

*Now, let’s use Hadoop Counters to identify the number of complaints pertaining to debt collection, mortgage and other categories (all other categories get lumped into this one) in the consumer complaints dataset. Basically produce the distribution of the Product column in this dataset using counters (limited to 3 counters here).*

*Hadoop offers Job Tracker, an UI tool to determine the status and statistics of all jobs. Using the job tracker UI, developers can view the Counters that have been created. Screenshot your  job tracker UI as your job completes and include it here. Make sure that your user defined counters are visible.*


####HW 3.1 - Mapper and Reducer
Here, we simply use the type of product that the mapper sees to increment the appropriate counter.  After that, the reducer just passes the results through, since we're primarily interested in the counter outputs, not the data itself.
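
Hadoop Streaming picks up counter updates from lines written to stderr in the form `reporter:counter:<group>,<counter>,<amount>`. A minimal sketch of that mechanism follows; the helper names `counter_line`, `product_group`, and `increment_counter` are made up for illustration and are not part of any Hadoop API.

```python
import sys

def counter_line(group, counter, amount=1):
    """Build the stderr line that Hadoop Streaming reads as a counter update."""
    return "reporter:counter:%s,%s,%d\n" % (group, counter, amount)

def product_group(product):
    """Map a Product value onto exactly one of the three counter groups."""
    if product == 'Debt collection':
        return 'Debt'
    elif product == 'Mortgage':
        return 'Mortgage'
    return 'Other'

def increment_counter(group, counter, amount=1):
    sys.stderr.write(counter_line(group, counter, amount))

if __name__ == "__main__":
    for product in ['Debt collection', 'Mortgage', 'Credit card']:
        increment_counter(product_group(product), 'Total')
```

Keeping the product-to-group mapping in one function makes it easy to check that every record updates exactly one counter.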

In [21]:
%%writefile mapper.py
#!/usr/bin/python

#HW 3.1 - Mapper Function Code
import sys
for line in sys.stdin:
    line=line.strip()
    product=line.split(',')[1] #extract the product from the second CSV field
    
    #Increment exactly one counter per record, depending on the product
    if product=='Debt collection':
        sys.stderr.write("reporter:counter:Debt,Total,1\n")
    elif product=='Mortgage':
        sys.stderr.write("reporter:counter:Mortgage,Total,1\n")
    else:
        sys.stderr.write("reporter:counter:Other,Total,1\n")
    print product+'\t1'

Overwriting mapper.py


In [None]:
%%writefile reducer.py
#!/usr/bin/python

#HW 3.1 - Reducer Function Code
import sys
for line in sys.stdin:
    line=line.strip()
    print line

In [None]:
#Load the input data into HDFS and make sure the output directory is clear
#!bin/hdfs dfs -put Consumer_Complaints.csv
!bin/hdfs dfs -rm -r hw_3_1_final_output

In [None]:
%%bash
#Run the job
bin/hadoop jar share/hadoop/tools/lib/hadoop-streaming-2.6.3.jar \
-file ./mapper.py    -mapper ./mapper.py \
-file ./reducer.py   -reducer ./reducer.py \
-input /user/nicholashamlin/Consumer_Complaints.csv \
-output /user/nicholashamlin/hw_3_1_final_output

##HW 3.2 Analyze the performance of your Mappers, Combiners and Reducers using Counters

###Part A
*For this brief study the Input file will be one record (the next line only): 
foo foo quux labs foo bar quux*

*Perform a word count analysis of this single record dataset using a Mapper and Reducer based WordCount (i.e., no combiners are used here) using user defined Counters to count up how many time the mapper and reducer are called. What is the value of your user defined Mapper Counter, and Reducer Counter after completing this word count job. The answer  should be 1 and 4 respectively. Please explain.*

#TODO:
- Confirm proper counter use (increment on print or add reduce tasks?)
- Add explanation

In [None]:
#Create a test file that we can use to test our code
! echo "foo foo quux labs foo bar quux" > testfile.txt

####HW 3.2 Part A - Mapper and Reducer
For this section, as well as the subsequent ones, we've included two counters: one that increments once each time the script itself is launched ("Script Calls") and one that increments for each line the mapper reads or the reducer emits ("Line Calls").  While it's useful to be able to see the contrast between the two, we're mainly interested in the Script Calls counter for the purposes of this question.

In [22]:
%%writefile mapper.py
#!/usr/bin/python

#HW 3.2.A - Mapper Function Code
import sys

#Increment script call counter once when the file runs
sys.stderr.write("reporter:counter:Mapper,Script Calls,1\n")
for line in sys.stdin:
    sys.stderr.write("reporter:counter:Mapper,Line Calls,1\n")    
    line=line.strip()
    words=line.split()
    for word in words:
        print word+'\t1'

Overwriting mapper.py


In [23]:
%%writefile reducer.py
#!/usr/bin/python

#HW 3.2.A - Reducer Function Code
import sys
current_word=''
count = 0 #Running total of occurrences for the current word

#Increment script call counter once when the file runs
sys.stderr.write("reporter:counter:Reducer,Script Calls,1\n")
for line in sys.stdin:
    line=line.strip().split('\t') #Parse line into a list of fields
    word,sub_count=line
    if current_word==word:
        count+=int(sub_count) #Extract chunk count from the second field of each incoming line
    else:
        if current_word:
            
            #Increment line call counter whenever we emit a record
            sys.stderr.write("reporter:counter:Reducer,Line Calls,1\n")
            print current_word+'\t'+str(count)
        current_word=word
        count=int(sub_count)

#Make sure to emit final record and increment counter accordingly.
if current_word:
    sys.stderr.write("reporter:counter:Reducer,Line Calls,1\n")
    print current_word+'\t'+str(count)

Overwriting reducer.py


In [None]:
#Use the command line to test that our modified mapper/reducer files still work right
!chmod +x ./mapper.py ./reducer.py
!cat testfile.txt | ./mapper.py | sort| ./reducer.py

In [26]:
#Load the input data into HDFS and make sure the output directory is clear
#!bin/hdfs dfs -put testfile.txt
!bin/hdfs dfs -rm -r hw_3_2_a_output

16/02/01 23:14:32 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/02/01 23:14:33 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted hw_3_2_a_output


In [27]:
%%bash
#Run the job in Hadoop using 4 reducers!
bin/hadoop jar share/hadoop/tools/lib/hadoop-streaming-2.6.3.jar \
-D mapred.map.tasks=1 \
-D mapred.reduce.tasks=4 \
-file ./mapper.py    -mapper ./mapper.py \
-file ./reducer.py   -reducer ./reducer.py \
-input /user/nicholashamlin/testfile.txt -output /user/nicholashamlin/hw_3_2_a_output

packageJobJar: [./mapper.py, ./reducer.py, /var/folders/rz/drh189k95919thyy3gs3tq400000gn/T/hadoop-unjar6400658890163302828/] [] /var/folders/rz/drh189k95919thyy3gs3tq400000gn/T/streamjob3903753916758367105.jar tmpDir=null


16/02/01 23:14:35 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
16/02/01 23:14:35 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/02/01 23:14:36 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
16/02/01 23:14:36 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
16/02/01 23:14:36 INFO mapred.FileInputFormat: Total input paths to process : 1
16/02/01 23:14:36 INFO mapreduce.JobSubmitter: number of splits:1
16/02/01 23:14:36 INFO Configuration.deprecation: mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
16/02/01 23:14:36 INFO Configuration.deprecation: mapred.reduce.tasks is deprecated. Instead, use mapreduce.job.reduces
16/02/01 23:14:37 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1454033924139_0068
16/02/01 23:14:37 INFO impl.YarnClientImpl: Submitted application application_14540339241

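Sure enough, with a single mapper and four reducers, we see the corresponding values in the Script Calls counters in the Hadoop output above: 1 for the mapper and 4 for the reducer.  The Script Calls counter increments once each time Hadoop launches the script, which happens once per task.  The one-line input file fits in a single split, so there is one map task.  We explicitly requested four reduce tasks, so the reducer script is launched four times, regardless of how the four distinct words are partitioned among those tasks.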
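
The reasoning can be sketched as a toy simulation. The assumptions here are that each task launches its script exactly once, and that `toy_partition` (a made-up function) stands in for Hadoop's real partitioner; the actual key-to-reducer assignment on a cluster will differ.

```python
def toy_partition(key, num_reducers):
    # Hypothetical stand-in for Hadoop's default hash partitioner
    return sum(ord(ch) for ch in key) % num_reducers

def simulate_script_calls(record, num_reducers):
    calls = {"Mapper": 0, "Reducer": 0}
    calls["Mapper"] += 1  # a single small file gives one split, so one map task
    pairs = [(word, 1) for word in record.split()]
    partitions = [[] for _ in range(num_reducers)]
    for word, one in pairs:
        partitions[toy_partition(word, num_reducers)].append((word, one))
    for _ in partitions:
        calls["Reducer"] += 1  # one launch per reduce task, even if it gets no keys
    return calls, partitions

calls, partitions = simulate_script_calls("foo foo quux labs foo bar quux", 4)

if __name__ == "__main__":
    print(calls)
```

The Script Calls totals depend only on the number of tasks, not on how many records or distinct keys each task receives.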

###Part B
*Please use multiple mappers and reducers for these jobs (at least 2 mappers and 2 reducers).
Perform a word count analysis of the Issue column of the Consumer Complaints  Dataset using a Mapper and Reducer based WordCount (i.e., no combiners used anywhere)  using user defined Counters to count up how many time the mapper and reducer are called. What is the value of your user defined Mapper Counter, and Reducer Counter after completing your word count job.*


In [28]:
%%writefile mapper.py
#!/usr/bin/python

#HW 3.2.B - Mapper Function Code
import sys
import re
from csv import reader
WORD_RE = re.compile(r"[\w']+")

sys.stderr.write("reporter:counter:Mapper,Script Count,1\n") 
for line in reader(sys.stdin):
    sys.stderr.write("reporter:counter:Mapper,Line Count,1\n")    
    try:
        int(line[0]) #check if the ID field is an integer, skip the record if not
    except ValueError:
        continue
    
    words = re.findall(WORD_RE, line[3])
    for word in words:
        print word.lower()+'\t1'

Overwriting mapper.py


In [29]:
%%writefile reducer.py
#!/usr/bin/python

#HW 3.2.B - Reducer Function Code
import sys
current_word=''
count = 0 #Running total of occurrences for the current word
sys.stderr.write("reporter:counter:Reducer,Script Count,1\n")
for line in sys.stdin:
    line=line.strip().split('\t') #Parse line into a list of fields
    word,sub_count=line
    if current_word==word:
        count+=int(sub_count) #Extract chunk count from the second field of each incoming line
    else:
        if current_word:
            sys.stderr.write("reporter:counter:Reducer,Line Count,1\n")
            print current_word+'\t'+str(count)
        current_word=word
        count=int(sub_count)
if current_word:
    sys.stderr.write("reporter:counter:Reducer,Line Count,1\n")
    print current_word+'\t'+str(count)

Overwriting reducer.py


In [30]:
#Make sure the output directory is clear
!bin/hdfs dfs -rm -r hw_3_2_b_output

16/02/01 23:19:11 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/02/01 23:19:12 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted hw_3_2_b_output


In [31]:
%%bash
#Run the job in Hadoop, this time making sure to specify multiple mappers/reducers
bin/hadoop jar share/hadoop/tools/lib/hadoop-streaming-2.6.3.jar \
-D mapred.map.tasks=2 \
-D mapred.reduce.tasks=2 \
-file ./mapper.py    -mapper ./mapper.py \
-file ./reducer.py   -reducer ./reducer.py \
-input /user/nicholashamlin/Consumer_Complaints.csv \
-output /user/nicholashamlin/hw_3_2_b_output

packageJobJar: [./mapper.py, ./reducer.py, /var/folders/rz/drh189k95919thyy3gs3tq400000gn/T/hadoop-unjar7114918443370059801/] [] /var/folders/rz/drh189k95919thyy3gs3tq400000gn/T/streamjob4918872333976241133.jar tmpDir=null


16/02/01 23:19:16 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
16/02/01 23:19:16 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/02/01 23:19:17 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
16/02/01 23:19:17 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
16/02/01 23:19:18 INFO mapred.FileInputFormat: Total input paths to process : 1
16/02/01 23:19:18 INFO mapreduce.JobSubmitter: number of splits:2
16/02/01 23:19:18 INFO Configuration.deprecation: mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
16/02/01 23:19:18 INFO Configuration.deprecation: mapred.reduce.tasks is deprecated. Instead, use mapreduce.job.reduces
16/02/01 23:19:18 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1454033924139_0069
16/02/01 23:19:18 INFO impl.YarnClientImpl: Submitted application application_14540339241

In [61]:
# Examine the output of the job in HDFS and print the results
! echo "HW 3.2 PART B RESULTS:"
! echo "10 First Results:"
!bin/hdfs dfs -cat hw_3_2_b_output/* | head -10

HW 3.2 PART B RESULTS:
10 First Results:
16/01/31 20:36:00 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
a	3503
account	57448
acct	163
an	2964
and	16448
applied	139
apr	3431
arbitration	168
available	274
bankruptcy	222
cat: Unable to write to output stream.


In the two cells above, we can see that the job has run successfully, and that, as we'd expect, the script call counters for both the mapper and reducer correspond to the number of tasks we chose for the job (2 each).

###Part C
*Perform a word count analysis of the Issue column of the Consumer Complaints  Dataset using a Mapper, Reducer, and standalone combiner (i.e., not an in-memory combiner) based WordCount using user defined Counters to count up how many time the mapper, combiner, reducer are called. What is the value of your user defined Mapper Counter, and Reducer Counter after completing your word count job.*

Here, we use exactly the same mapper and reducer as the previous job, but now we add a combiner.  This combiner uses the same logic as the reducer, and is only separated into its own file so that it can increment its own counter.  If we didn't care about this distinction, we could simply point the job to use the reducer as the combiner, and the collective work would be tracked under a single counter.

In [32]:
%%writefile mapper.py
#!/usr/bin/python

#HW 3.2.C - Mapper Function Code
import sys
import re
from csv import reader
WORD_RE = re.compile(r"[\w']+")

#Increment script count once when the file runs
sys.stderr.write("reporter:counter:Mapper,Script Count,1\n")
for line in reader(sys.stdin):
    sys.stderr.write("reporter:counter:Mapper,Line Count,1\n")    
    try:
        int(line[0]) #check if the ID field is an integer, skip the record if not
    except ValueError:
        continue
    
    words = re.findall(WORD_RE, line[3])
    for word in words:
        print word.lower()+'\t1'

Overwriting mapper.py


In [33]:
%%writefile combiner.py
#!/usr/bin/python

#HW 3.2.C - Combiner Function Code (same logic as the reducer)
import sys
current_word=''
count = 0 #Running total of occurrences for the current word
sys.stderr.write("reporter:counter:Combiner,Script Count,1\n")
for line in sys.stdin:
    line=line.strip().split('\t') #Parse line into a list of fields
    word,sub_count=line
    if current_word==word:
        count+=int(sub_count) #Extract chunk count from the second field of each incoming line
    else:
        if current_word:
            sys.stderr.write("reporter:counter:Combiner,Line Count,1\n")
            print current_word+'\t'+str(count)
        current_word=word
        count=int(sub_count)
if current_word:
    sys.stderr.write("reporter:counter:Combiner,Line Count,1\n")
    print current_word+'\t'+str(count)

Overwriting combiner.py


In [34]:
%%writefile reducer.py
#!/usr/bin/python

#HW 3.2.C - Reducer Function Code
import sys
current_word=''
count = 0 #Running total of occurrences for the current word
sys.stderr.write("reporter:counter:Reducer,Script Count,1\n")
for line in sys.stdin:
    line=line.strip().split('\t') #Parse line into a list of fields
    word,sub_count=line
    if current_word==word:
        count+=int(sub_count) #Extract chunk count from the second field of each incoming line
    else:
        if current_word:
            sys.stderr.write("reporter:counter:Reducer,Line Count,1\n")
            print current_word+'\t'+str(count)
        current_word=word
        count=int(sub_count)
if current_word:
    sys.stderr.write("reporter:counter:Reducer,Line Count,1\n")
    print current_word+'\t'+str(count)

Overwriting reducer.py


In [35]:
#Make sure combiner is executable and the HDFS output directory is clear
#!chmod +x ./combiner.py
!bin/hdfs dfs -rm -r hw_3_2_c_output

16/02/01 23:42:14 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/02/01 23:42:15 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted hw_3_2_c_output


In [36]:
%%bash
#Run the word count job in Hadoop with 2 mappers, 2 reducers, and a standalone combiner
bin/hadoop jar share/hadoop/tools/lib/hadoop-streaming-2.6.3.jar \
-D mapred.map.tasks=2 \
-D mapred.reduce.tasks=2 \
-file ./mapper.py    -mapper ./mapper.py \
-file ./reducer.py   -reducer ./reducer.py \
-file ./combiner.py   -combiner ./combiner.py \
-input /user/nicholashamlin/Consumer_Complaints.csv \
-output /user/nicholashamlin/hw_3_2_c_output

packageJobJar: [./mapper.py, ./reducer.py, ./combiner.py, /var/folders/rz/drh189k95919thyy3gs3tq400000gn/T/hadoop-unjar2476349128554665704/] [] /var/folders/rz/drh189k95919thyy3gs3tq400000gn/T/streamjob2811663075173071386.jar tmpDir=null


16/02/01 23:42:26 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
16/02/01 23:42:26 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/02/01 23:42:27 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
16/02/01 23:42:27 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
16/02/01 23:42:28 INFO mapred.FileInputFormat: Total input paths to process : 1
16/02/01 23:42:28 INFO mapreduce.JobSubmitter: number of splits:2
16/02/01 23:42:28 INFO Configuration.deprecation: mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
16/02/01 23:42:28 INFO Configuration.deprecation: mapred.reduce.tasks is deprecated. Instead, use mapreduce.job.reduces
16/02/01 23:42:28 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1454033924139_0070
16/02/01 23:42:28 INFO impl.YarnClientImpl: Submitted application application_14540339241

In [68]:
# Examine the output of the job in HDFS and print the results
! echo "HW 3.2 PART C RESULTS:"
! echo "10 First Results:"
!bin/hdfs dfs -cat hw_3_2_c_output/* | head -10

HW 3.2 PART C RESULTS:
10 First Results:
16/01/31 20:39:58 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
a	3503
account	57448
acct	163
an	2964
and	16448
applied	139
apr	3431
arbitration	168
available	274
bankruptcy	222
cat: Unable to write to output stream.


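This time, when we add the combiner, we see that its script runs four times, in addition to the two map tasks and two reduce tasks.  Had we used the reducer script as the combiner, those four calls would have been recorded under the Reducer counter instead, giving a Mapper Script Count of 2 and a Reducer Script Count of 6.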

### Part D
*Using a single reducer: What are the top 50 most frequent terms in your word count analysis? Present the top 50 terms and their frequency and their relative frequency. If there are ties please sort the tokens in alphanumeric/string order. Present bottom 10 tokens (least frequent items).*

#### HW 3.2 Part D - Mapper and Reducer
This code works very similarly to the word count jobs we ran in the previous homework, with the addition of the counters.  Since we need to produce both raw frequencies and relative frequencies for each word, we need to use order inversion to make sure that the total word count is available to the reducer before it begins iterating through the individual words.  We can accomplish this by taking advantage of the Hadoop shuffle and assigning the total a special key that is automatically sorted to the top before the data is passed to the reducer.  Since we only have one reducer, no custom partitioning is necessary to make this work.
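
A minimal sketch of the order inversion idea, assuming a hypothetical special key `!total` (chosen because `!` sorts before letters, digits, and apostrophes in ASCII) and a made-up function name `relative_frequencies`. It works on an in-memory list of `(key, count)` pairs that has already been aggregated and sorted by key, as the shuffle would deliver them to a single reducer.

```python
TOTAL_KEY = '!total'  # hypothetical special key that sorts ahead of every word

def relative_frequencies(sorted_records):
    """Consume (key, count) pairs sorted by key; the total arrives first."""
    total = 0
    results = []
    for key, count in sorted_records:
        if key == TOTAL_KEY:
            total += count  # several mappers may each emit a partial total
        elif total > 0:
            results.append((key, count, float(count) / total))
    return results

# sorted() plays the role of the shuffle's sort here
records = sorted([("loan", 3), (TOTAL_KEY, 4), ("credit", 1)])
frequencies = relative_frequencies(records)

if __name__ == "__main__":
    for word, count, share in frequencies:
        print("%s\t%d\t%.2f" % (word, count, share))
```

Because the total is known before the first word is processed, each relative frequency can be emitted immediately without holding the word counts in memory.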

In [70]:
%%writefile mapper.py
#!/usr/bin/python

#HW 3.2.D - Mapper Function Code
import sys
import re
from csv import reader
WORD_RE = re.compile(r"[\w']+")

#Increment script count once when the file runs
sys.stderr.write("reporter:counter:Mapper,Script Count,1\n")
for line in reader(sys.stdin):
    sys.stderr.write("reporter:counter:Mapper,Line Count,1\n")    
    try:
        int(line[0]) #check if the ID field is an integer, skip the record if not
    except ValueError:
        continue
    
    words = re.findall(WORD_RE, line[3])
    
    for word in words:
        print word.lower()+'\t1' #emit one record for each word

Overwriting mapper.py


In [71]:
%%writefile reducer.py
#!/usr/bin/python

#HW 3.2.D - Reducer Function Code
import sys
current_word=''
count = 0 #Running total of occurrences for the current word
sys.stderr.write("reporter:counter:Reducer,Script Count,1\n")
for line in sys.stdin:
    line=line.strip().split('\t') #Parse line into a list of fields
    word,sub_count=line
    if current_word==word:
        count+=int(sub_count) #Extract chunk count from the second field of each incoming line
    else:
        if current_word:
            sys.stderr.write("reporter:counter:Reducer,Line Count,1\n")
            print current_word+'\t'+str(count)
        current_word=word
        count=int(sub_count)
if current_word:
    sys.stderr.write("reporter:counter:Reducer,Line Count,1\n")
    print current_word+'\t'+str(count)

Overwriting reducer.py


In [None]:
#!chmod +x ./mapper2.py ./reducer2.py
!cat complaints_test.csv | ./mapper.py |sort -k1,1| ./reducer.py #| ./mapper2.py | sort -nr | ./reducer2.py > test_output.txt
#!cat test_output.txt | head -30 
#!rm test_output.txt

In [72]:
#Load the input data into HDFS and make sure the output directory is clear
#!bin/hdfs dfs -put testfile.txt
!bin/hdfs dfs -rm -r hw_3_2_d_tmp_output

16/01/31 20:41:43 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/01/31 20:41:44 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted hw_3_2_d_tmp_output


In [73]:
%%bash
#Run the word count job in Hadoop with a single reducer
bin/hadoop jar share/hadoop/tools/lib/hadoop-streaming-2.6.3.jar \
-D mapred.map.tasks=2 \
-D mapred.reduce.tasks=1 \
-file ./mapper.py    -mapper ./mapper.py \
-file ./reducer.py   -reducer ./reducer.py \
-input /user/nicholashamlin/Consumer_Complaints.csv -output /user/nicholashamlin/hw_3_2_d_tmp_output

packageJobJar: [./mapper.py, ./reducer.py, /var/folders/rz/drh189k95919thyy3gs3tq400000gn/T/hadoop-unjar4337502379901276165/] [] /var/folders/rz/drh189k95919thyy3gs3tq400000gn/T/streamjob1531995029869945457.jar tmpDir=null


16/01/31 20:41:49 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
16/01/31 20:41:49 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/01/31 20:41:51 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
16/01/31 20:41:51 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
16/01/31 20:41:51 INFO mapred.FileInputFormat: Total input paths to process : 1
16/01/31 20:41:51 INFO mapreduce.JobSubmitter: number of splits:2
16/01/31 20:41:51 INFO Configuration.deprecation: mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
16/01/31 20:41:51 INFO Configuration.deprecation: mapred.reduce.tasks is deprecated. Instead, use mapreduce.job.reduces
16/01/31 20:41:52 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1454033924139_0045
16/01/31 20:41:52 INFO impl.YarnClientImpl: Submitted application application_14540339241

In [74]:
%%writefile mapper2.py
#!/usr/bin/python

#HW 3.2.D - Second Mapper Function Code (swap key and value so Hadoop sorts by count)
import sys
for line in sys.stdin:
    clean_line=line.strip() #strip whitespace, just to be safe
    fields=clean_line.split('\t') #parse remaining line
    print fields[1]+'\t'+fields[0] #reverse key-value from previous job

Overwriting mapper2.py


In [75]:
%%writefile reducer2.py
#!/usr/bin/python

#HW 3.2.D - Second Reducer Function Code (identity pass-through)
import sys
for line in sys.stdin:
    print "%s" % (line.strip()) #pass line through unchanged

Overwriting reducer2.py


In [76]:
#Load the input data into HDFS and make sure the output directory is clear
#!bin/hdfs dfs -put testfile.txt
!bin/hdfs dfs -rm -r hw_3_2_d_final_output

16/01/31 20:43:18 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/01/31 20:43:18 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted hw_3_2_d_final_output


In [77]:
%%bash
#Run the sorting job on the output of the previous job in Hadoop with a single reducer
bin/hadoop jar share/hadoop/tools/lib/hadoop-streaming-2.6.3.jar \
-D mapred.map.tasks=2 \
-D mapred.reduce.tasks=1 \
-D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
-D  mapred.text.key.comparator.options=-nr \
-file ./mapper2.py    -mapper ./mapper2.py \
-file ./reducer2.py   -reducer ./reducer2.py \
-input /user/nicholashamlin/hw_3_2_d_tmp_output -output /user/nicholashamlin/hw_3_2_d_final_output

packageJobJar: [./mapper2.py, ./reducer2.py, /var/folders/rz/drh189k95919thyy3gs3tq400000gn/T/hadoop-unjar1628639992031426564/] [] /var/folders/rz/drh189k95919thyy3gs3tq400000gn/T/streamjob3923105184832607127.jar tmpDir=null


16/01/31 20:43:27 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
16/01/31 20:43:27 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/01/31 20:43:27 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
16/01/31 20:43:28 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
16/01/31 20:43:28 INFO mapred.FileInputFormat: Total input paths to process : 1
16/01/31 20:43:28 INFO mapreduce.JobSubmitter: number of splits:2
16/01/31 20:43:28 INFO Configuration.deprecation: mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
16/01/31 20:43:28 INFO Configuration.deprecation: mapred.text.key.comparator.options is deprecated. Instead, use mapreduce.partition.keycomparator.options
16/01/31 20:43:28 INFO Configuration.deprecation: mapred.reduce.tasks is deprecated. Instead, use mapreduce.job.reduces
16/01/31 20:43:28 INFO Configur

In [78]:
# Examine the output of the job in HDFS and print the results
! echo "HW 3.2 PART D RESULTS:"
! echo "50 Largest Numbers:"
!bin/hdfs dfs -cat hw_3_2_d_final_output/* | head -50
! echo "==================================="
! echo "10 Smallest Numbers:"
!bin/hdfs dfs -cat hw_3_2_d_final_output/* | tail -10

HW 3.2 PART D RESULTS:
50 Largest Numbers:
16/01/31 20:44:30 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
119630	loan
72394	collection
70487	foreclosure
70487	modification
57448	account
55251	credit
40508	or
39993	payments
36767	servicing
36767	escrow
34903	report
29133	incorrect
29069	information
29069	on
27874	debt
19000	closing
18477	not
17972	owed
17972	cont'd
17972	attempts
17972	collect
16448	and
16205	opening
16205	management
13983	of
10731	my
10555	withdrawals
10555	deposits
9484	problems
8868	application
8671	communication
8671	tactics
8625	mortgage
8625	originator
8625	broker
8401	to
8178	unable
8158	billing
7886	other
7655	verification
7655	disclosure
6938	disputes
6559	reporting
6337	lease
6248	the
5663	low
5663	by
5663	being
5663	funds
5663	caused
10 Smallest Numbers:
16/01/31 20:44:32 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java 
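
The question asks for ties to be broken in string order. Sorting on the count alone does not guarantee that: in the listing above, "servicing" appears before "escrow" at 36767. As a hedged sketch of the intended ordering (plain in-memory Python with a made-up function name `rank_terms`, not a change to the Hadoop job itself), a compound sort key handles both criteria:

```python
def rank_terms(counts):
    """Sort (word, count) pairs by count descending, then word ascending."""
    return sorted(counts, key=lambda pair: (-pair[1], pair[0]))

# A few pairs taken from the results listing
sample = [("servicing", 36767), ("escrow", 36767), ("loan", 119630)]
ranked = rank_terms(sample)

if __name__ == "__main__":
    for word, count in ranked:
        print("%d\t%s" % (count, word))
```

Negating the count turns the descending numeric order into an ascending one, so a single ascending sort covers both the primary key and the tie-breaker.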

##HW3.3. Shopping Cart Analysis
Product Recommendations: The action or practice of selling additional products or services 
to existing customers is called cross-selling. Giving product recommendation is 
one of the examples of cross-selling that are frequently used by online retailers. 
One simple method to give product recommendations is to recommend products that are frequently
browsed together by the customers.

For this homework use the [online browsing behavior dataset](https://www.dropbox.com/s/zlfyiwa70poqg74/ProductPurchaseData.txt?dl=0)

      
Each line in this dataset represents a browsing session of a customer. 
On each line, each string of 8 characters represents the id of an item browsed during that session. The items are separated by spaces.

Do some exploratory data analysis of this dataset. How many unique items are available from this supplier?

Using a single reducer: Report your findings such as number of unique products; largest basket; report the top 50 most frequently purchased items, their frequency, and their relative frequency (break ties by sorting the products in alphabetical order) etc. using Hadoop Map-Reduce. 


### HW 3.3 Mapper and Reducer #1
The first MapReduce job does the majority of the work.  The mapper emits one space-delimited row per product with the format `product_id cart_id 1 number_of_products_in_cart`, where cart_id is an auto-incremented identifier for the row that makes the largest cart easy to find at the end.  By passing the number of products in the cart along with each product, we can track the largest cart in the reducer as we iterate through all the mapper output.  In addition, we use an order-inversion pattern: each mapper emits its running total of products under the special key `**Total`, which sorts ahead of every product ID, so these totals are the first record(s) that the reducer sees.  This enables lazy calculation of product relative frequencies without the need for storing many intermediate values in memory.
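
As a rough illustration of the order-inversion idea, the sketch below replays it in plain Python outside of Hadoop. The function name `relative_frequencies`, the simplified `(key, count)` record shape, and the toy counts are assumptions made for the example; the actual job uses the four-field rows described above.

```python
def relative_frequencies(sorted_records):
    """Return (product, count, relative_frequency) tuples from key-sorted records.

    Each record is (key, count). The special key '**Total' sorts before
    every product ID, so the grand total is known before the first
    product is finalized and nothing else has to be buffered.
    """
    results = []
    total = 0
    current, count = None, 0
    for key, value in sorted_records:
        if key == '**Total':
            total += value  # one total record arrives per mapper
            continue
        if key == current:
            count += value
        else:
            if current is not None:
                results.append((current, count, count / float(total)))
            current, count = key, value
    if current is not None:
        results.append((current, count, count / float(total)))
    return results

# Hypothetical output of two mappers that saw 3 and 1 products respectively
records = [('DAI62779', 1), ('ELE17451', 1), ('DAI62779', 1), ('**Total', 3),
           ('DAI62779', 1), ('**Total', 1)]
result = relative_frequencies(sorted(records))
```

Here `sorted` stands in for Hadoop's shuffle and sort. Because `*` sorts before any letter or digit in ASCII, both `**Total` records reach the loop first and the divisions can happen as soon as each product's rows end.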

In [42]:
%%writefile mapper.py
#!/usr/bin/python

#HW 3.3 - Mapper Function Code
import sys
count = 0 #Running total of occurrences for the chosen product
product_count=0
cart_id=1 #The carts don't have IDs of their own, but we can make our own
for line in sys.stdin:
    line=line.strip()
    products=line.split() #split on whitespace

    for product in products:
        product_count+=1
        print product+' '+str(cart_id)+' 1 '+str(len(products)) #emit one row per product per cart
    cart_id+=1
print '**Total '+'0'+' '+str(product_count)+' 0' #emit total number of products for order inversion

Overwriting mapper.py


In [43]:
%%writefile reducer.py
#!/usr/bin/python

#HW 3.3 - Reducer Function Code
from __future__ import division
import sys
current_product=None
count = 0 #Running total of occurrences for the chosen product
largest_basket_id=0
largest_basket_size=0
unique_products=0
total_product_count=0

for line in sys.stdin:
    #Parse line into fields
    product,cart_id,sub_count,cart_total=line.strip().split(' ')
    sub_count=int(sub_count)
    cart_total=int(cart_total)
    
    if product=='**Total': #Extract total products for order inversion
        total_product_count+=sub_count
        continue
        
    #If we find a cart that's bigger than any we've seen so far, record it
    if cart_total>largest_basket_size: 
        largest_basket_size=cart_total
        largest_basket_id=cart_id
  
    if current_product==product:
        count+=int(sub_count)
    else:
        if current_product and current_product!='**Total':
            print current_product+'\t'+str(count)+'\t'+str(count/total_product_count)
            unique_products+=1
        current_product=product
        count=int(sub_count)
if current_product:
    print current_product+'\t'+str(count)+'\t'+str(count/total_product_count)
    unique_products+=1
    
print '*Largest Cart\t'+str(largest_basket_id)+'\t'+str(largest_basket_size)
print '*Unique Products'+'\t'+str(unique_products)
#print '*Total Products'+'\t'+str(total_product_count)

Overwriting reducer.py


#### HW 3.3 Mapper and Reducer #2
As in previous problems, we run a second job focused solely on sorting the output of the first job. 
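
The ordering itself is produced by Hadoop through the comparator options `-k1,1nr -k2,2` passed when the job is launched. The sketch below only mimics that ordering in plain Python so the tie-breaking rule is easy to see; the function name `sort_like_job_two` and the sample rows are made up for illustration.

```python
def sort_like_job_two(rows):
    """Order (count, product_id, rel_freq) rows the way '-k1,1nr -k2,2' would:
    count descending (numeric), then product ID ascending."""
    return sorted(rows, key=lambda row: (-row[0], row[1]))

# Hypothetical rows; the two products with count 5 exercise the tie-break
rows = [(5, 'GRO73461', 0.25), (9, 'DAI62779', 0.45),
        (5, 'ELE17451', 0.25), (1, 'SNA80324', 0.05)]
ordered = sort_like_job_two(rows)
```

Negating the count gives a descending numeric sort, while the product ID is left as-is so that ties fall back to ascending alphabetical order.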

In [44]:
%%writefile mapper2.py
#!/usr/bin/python

#HW 3.3 - Mapper #2 Function Code
import sys

largest_basket_id=0
largest_basket_size=0
unique_products=0
for line in sys.stdin:
    clean_line=line.strip() #strip whitespace, just to be safe
    fields=clean_line.split('\t') #parse remaining line
    if fields[0]=="*Largest Cart":
        largest_basket_id=fields[1]
        largest_basket_size=fields[2]
        continue
    elif fields[0]=="*Unique Products":
        unique_products=fields[1]
        continue
    else:
        try:
            print fields[1]+'\t'+fields[0]+'\t'+fields[2] #reverse key-value from previous job
        except IndexError: #malformed row with fewer than three fields
            print fields
#pass these values through
print '*Largest Cart\t'+'count\t'+str(largest_basket_size)
print '*Unique Products\t'+'count\t'+str(unique_products)


Overwriting mapper2.py


In [45]:
%%writefile reducer2.py
#!/usr/bin/python

#HW 3.3 - Reducer #2 Function Code
import sys
for line in sys.stdin:
    print "%s" % (line.strip()) #pass line through unchanged

Overwriting reducer2.py


#### HW 3.3 - Running the jobs

In [46]:
### Make sure data is available and 1st job output directory is clear in HDFS
#!bin/hdfs dfs -put ProductPurchaseData.txt
!bin/hdfs dfs -rm -r hw_3_3_tmp_output

16/02/02 00:00:28 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/02/02 00:00:29 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted hw_3_3_tmp_output


In [47]:
%%bash
#Run the word count job in Hadoop with a single reducer, as per the instructions
bin/hadoop jar share/hadoop/tools/lib/hadoop-streaming-2.6.3.jar \
-D mapred.map.tasks=2 \
-D mapred.reduce.tasks=1 \
-file ./mapper.py    -mapper ./mapper.py \
-file ./reducer.py   -reducer ./reducer.py \
-input /user/nicholashamlin/ProductPurchaseData.txt -output /user/nicholashamlin/hw_3_3_tmp_output

packageJobJar: [./mapper.py, ./reducer.py, /var/folders/rz/drh189k95919thyy3gs3tq400000gn/T/hadoop-unjar772246691824913389/] [] /var/folders/rz/drh189k95919thyy3gs3tq400000gn/T/streamjob4584940453826406318.jar tmpDir=null


16/02/02 00:00:32 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
16/02/02 00:00:32 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/02/02 00:00:33 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
16/02/02 00:00:33 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
16/02/02 00:00:33 INFO mapred.FileInputFormat: Total input paths to process : 1
16/02/02 00:00:33 INFO mapreduce.JobSubmitter: number of splits:2
16/02/02 00:00:33 INFO Configuration.deprecation: mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
16/02/02 00:00:33 INFO Configuration.deprecation: mapred.reduce.tasks is deprecated. Instead, use mapreduce.job.reduces
16/02/02 00:00:33 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1454033924139_0073
16/02/02 00:00:33 INFO impl.YarnClientImpl: Submitted application application_14540339241

In [48]:
#Make sure the destination directory for the second job is clear
!bin/hdfs dfs -rm -r hw_3_3_final_output

16/02/02 00:04:08 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/02/02 00:04:08 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted hw_3_3_final_output


In [49]:
%%bash
#Run the sorting job using the output of the previous data in Hadoop with a single reducer
#We also use the secondary sort to make sure the output arrives in the correct order
bin/hadoop jar share/hadoop/tools/lib/hadoop-streaming-2.6.3.jar \
-D mapred.map.tasks=2 \
-D mapred.reduce.tasks=1 \
-D stream.num.map.output.key.fields=2 \
-D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
-D mapred.text.key.comparator.options='-k1,1nr -k2,2' \
-file ./mapper2.py    -mapper ./mapper2.py \
-file ./reducer2.py   -reducer ./reducer2.py \
-input /user/nicholashamlin/hw_3_3_tmp_output -output /user/nicholashamlin/hw_3_3_final_output

packageJobJar: [./mapper2.py, ./reducer2.py, /var/folders/rz/drh189k95919thyy3gs3tq400000gn/T/hadoop-unjar6783790101450448729/] [] /var/folders/rz/drh189k95919thyy3gs3tq400000gn/T/streamjob2322646805457870257.jar tmpDir=null


16/02/02 00:04:12 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
16/02/02 00:04:12 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/02/02 00:04:13 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
16/02/02 00:04:13 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
16/02/02 00:04:14 INFO mapred.FileInputFormat: Total input paths to process : 1
16/02/02 00:04:14 INFO mapreduce.JobSubmitter: number of splits:2
16/02/02 00:04:14 INFO Configuration.deprecation: mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
16/02/02 00:04:14 INFO Configuration.deprecation: mapred.text.key.comparator.options is deprecated. Instead, use mapreduce.partition.keycomparator.options
16/02/02 00:04:14 INFO Configuration.deprecation: mapred.reduce.tasks is deprecated. Instead, use mapreduce.job.reduces
16/02/02 00:04:14 INFO Configur

In [50]:
# Examine the output of the job in HDFS and print the results
! echo "HW 3.3 RESULTS:"
! echo ""
! echo "50 Most Frequent Products:"
! echo "Raw Frequency | Product ID | Relative Frequency"
!bin/hdfs dfs -cat hw_3_3_final_output/* | head -50
! echo "==================================="
! echo "Summary Stats:"
# We have this odd syntax here because we're trying to do everything in 1 job
!bin/hdfs dfs -cat hw_3_3_final_output/* | tail -4 |head -2

HW 3.3 RESULTS:

50 Most Frequent Products:
Raw Frequency | Product ID | Relative Frequency
16/02/02 00:07:16 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
6667	DAI62779	0.0175067747831
3881	FRO40251	0.010191059387
3875	ELE17451	0.0101753040775
3602	GRO73461	0.00945843749344
3044	SNA80324	0.00799319370628
2851	ELE32164	0.0074863979161
2736	DAI75645	0.00718442114993
2455	SNA45677	0.0064465474865
2330	FRO31317	0.0061183118711
2293	DAI85309	0.00602115412894
2292	ELE26917	0.00601852824402
2233	FRO80039	0.00586360103355
2115	GRO21487	0.00555374661261
2083	SNA99873	0.00546971829507
2004	GRO59710	0.00526227338613
1920	GRO71621	0.00504169905258
1918	FRO85978	0.00503644728273
1840	GRO30386	0.00483162825872
1816	ELE74009	0.00476860702057
1784	GRO56726	0.00468457870302
1773	DAI63921	0.00465569396887
1756	GRO46854	0.00461105392517
1713	ELE66600	0.00449814087347
1712	DAI83733	0.00449551498855
1702	FRO32293	0.0044692

Based on these results, we have 12592 unique products browsed.  The largest session covered 37 products, and the most commonly browsed product was DAI62779, which was seen 6667 times for a relative frequency of 0.0175.

##HW3.4. Pairs
(Computationally prohibitive but then again Hadoop can handle this) 

Suppose we want to recommend new products to the customer based on the products they
have already browsed on the online website. Write a map-reduce program 
to find products which are frequently browsed together. Fix the support count (co-occurrence count) to s = 100 
(i.e. product pairs need to occur together at least 100 times to be considered frequent) 
and find pairs of items (sometimes referred to as itemsets of size 2 in association rule mining) that have a support count of 100 or more.

List the top 50 product pairs with their support count (aka frequency, the number of records where they co-occur) and relative frequency or support (the number of records where they co-occur / the number of baskets in the dataset), in decreasing order of support, for frequent (count >= 100) itemsets of size 2. 

Use the Pairs pattern (lecture 3) to extract these frequent itemsets of size 2. Feel free to use combiners if they bring value. Instrument your code with counters to count the number of times your mapper, combiner and reducers are called.  

Please output records of the following form for the top 50 pairs (itemsets of size 2): 

      item1, item2, support count, support

Fix the ordering of the pairs lexicographically (left to right), 
and break ties in support (between pairs, if any exist) 
by taking the first ones in lexicographically increasing order. 

Report  the compute time for the Pairs job. Describe the computational setup used (E.g., single computer; dual core; linux, number of mappers, number of reducers)
Instrument your mapper, combiner, and reducer to count how many times each is called using Counters and report these counts.


####HW 3.4 - Mapper and Reducer #1
The first job does most of the work here.  Each cart is broken into a list of products.  As we iterate through the list, a pair is emitted for each combination of the current product and every product that follows it.  This ensures that we don't double count pairs.  Similarly, each pair is sorted lexicographically before being sent to the reducer.  As before, we use order inversion to ensure that the reducers can access the overall total and compute relative frequency efficiently. 
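
The pair generation can be sketched in a few lines of plain Python. This version swaps the nested loops for `itertools.combinations`, which enumerates the same position pairs; the helper name `cart_pairs` and the three-item cart are illustrative only.

```python
from itertools import combinations

def cart_pairs(line):
    """Return every unordered product pair from one cart line,
    with each pair ordered lexicographically."""
    products = line.split()
    return [tuple(sorted(pair)) for pair in combinations(products, 2)]

pairs = cart_pairs('FRO40251 DAI62779 ELE17451')
```

A cart with three products yields three pairs. As with the nested loops, a product that appears twice in the same cart would also be paired with itself.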

In the reducer, we take the same approach as we have with past word counts, with the key difference being that two pieces of information are compared (both products) to the current state of each iteration.  If the incoming pair matches the present pair, we increment our totals.  If not, we emit the record.
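
A compact way to picture this reducer logic is shown below. It is a sketch rather than the submitted code: it uses `itertools.groupby` in place of explicit current-key tracking, takes the number of carts as an argument instead of reading it from the stream, and lowers the threshold to 2 so the toy input produces output. The name `frequent_pairs` and the sample records are assumptions.

```python
from itertools import groupby

def frequent_pairs(sorted_records, number_of_carts, s=100):
    """Sum support per (a, b) key over key-sorted records and keep pairs with support >= s."""
    results = []
    for (a, b), group in groupby(sorted_records, key=lambda r: (r[0], r[1])):
        support = sum(r[2] for r in group)
        if support >= s:
            results.append((a, b, support, support / float(number_of_carts)))
    return results

# Hypothetical mapper output, sorted the way the shuffle would deliver it
records = sorted([('DAI62779', 'ELE17451', 2),
                  ('DAI62779', 'ELE17451', 1),
                  ('DAI62779', 'FRO40251', 1)])
frequent = frequent_pairs(records, number_of_carts=4, s=2)
```

`groupby` only merges adjacent records, so it depends on the input being sorted by both products, just as the streaming reducer does.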

In [94]:
%%writefile mapper.py
#!/usr/bin/python

#HW 3.4 - Mapper Function Code
import sys
number_of_carts=0
sys.stderr.write("reporter:counter:Mapper,Script Count,1\n") 

for line in sys.stdin:
    line=line.strip()
    products=line.split() #split on whitespace

    for i,product in enumerate(products):
        product_a=product
        
        #Only iterate through products we haven't seen yet to avoid duplicates
        for product_b in products[i+1:]:
            
            #Sort output in alphabetical order
            output=sorted([product_a,product_b])
            sys.stderr.write("reporter:counter:Mapper,Line Count,1\n")
            #Emit one pair for every result
            print output[0]+'\t'+output[1]+'\t1'
    number_of_carts+=1

#Output total number of carts with a special key for order-inversion purposes
#Two versions will enable one record for each reducer.  If we wanted to be more
#precise here, we could add an additional field to the output and use a custom partitioner
print '**Total0'+'\t'+'**Total0'+'\t'+str(number_of_carts)
print '**Total1'+'\t'+'**Total1'+'\t'+str(number_of_carts)
sys.stderr.write("reporter:counter:Mapper,Line Count,2\n") #two total records were emitted above

Overwriting mapper.py


In [95]:
%%writefile reducer.py
#!/usr/bin/python

#HW 3.4 - Reducer Function Code
from __future__ import division
import sys

sys.stderr.write("reporter:counter:Reducer,Script Count,1\n") 

s=100 #cutoff for "frequent"

#We want to track two products at a time as they come in from the mapper
current_product_a=None 
current_product_b=None
count = 0 #Running total of occurrences for the chosen pair

number_of_carts=0

for line in sys.stdin:
    #Parse line into fields
    product_a,product_b,support=line.strip().split('\t')
    support=int(support)
    
    #Extract total products for order inversion
    if product_a[:7]=='**Total' and product_b[:7]=='**Total': 
        number_of_carts+=support
        continue
    
    #Only increment counter if both products match
    if current_product_a==product_a and current_product_b==product_b:
        count+=support
    else:
        if current_product_a and current_product_b and current_product_a[:7]!='**Total':
            if count>=s:
                print current_product_a+'\t'+current_product_b+'\t'+str(count)+'\t'+str(count/number_of_carts)
                sys.stderr.write("reporter:counter:Reducer,Line Count,1\n") 
        current_product_a=product_a
        current_product_b=product_b
        count=support
        
#Make sure to emit final result as well
if current_product_a and current_product_b and current_product_a[:7]!='**Total':
    if count>=s:
        print current_product_a+'\t'+current_product_b+'\t'+str(count)+'\t'+str(count/number_of_carts)
        sys.stderr.write("reporter:counter:Reducer,Line Count,1\n") 


Overwriting reducer.py


####HW 3.4 - Mapper and Reducer #2
This second job takes care of the sorting.  We need to wait until the first job is completed to do this sort, otherwise we won't have calculated the overall totals for each pair.  

In [96]:
%%writefile mapper2.py
#!/usr/bin/python

#HW 3.4 - Mapper #2 Function Code
import sys
for line in sys.stdin:
    clean_line=line.strip() #strip whitespace, just to be safe
    fields=clean_line.split('\t') #parse remaining line
    print fields[2]+'\t'+fields[0]+'\t'+fields[1]+'\t'+fields[3]


Overwriting mapper2.py


In [97]:
%%writefile reducer2.py
#!/usr/bin/python

#HW 3.4 - Reducer #2 Function Code
import sys
for line in sys.stdin:
    clean_line=line.strip() #strip whitespace, just to be safe
    fields=clean_line.split('\t') #parse remaining line
    print fields[1]+'\t'+fields[2]+'\t'+fields[0]+'\t'+fields[3]

Overwriting reducer2.py


####HW 3.4 - Running the jobs
The first job is the one we're interested in tracking, since it's where we've implemented the pairs approach.  To monitor how long it takes, we use the simple `time` command available in bash.

In [98]:
### Make sure 1st job output directory is clear in HDFS
#!bin/hdfs dfs -put purchase_test.txt
!bin/hdfs dfs -rm -r hw_3_4_tmp_output
#!bin/hdfs dfs -ls

16/02/02 00:57:20 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
rm: `hw_3_4_tmp_output': No such file or directory


In [99]:
#!chmod +x ./mapper2.py ./reducer2.py
#!echo "FRO11987 ELE17451 ELE89019 FRO11987 SNA90258 GRO99222 " | ./mapper.py |sort | ./reducer.py
!cat purchase_test.txt  | ./mapper.py |sort -k 1,1 -k2,2 | ./reducer.py #| sort -k 3,3nr -k 1,1 -k 2,2
#!cat purchase_test.txt | ./mapper.py |sort  | ./reducer.py # | ./mapper2.py | sort -nr | ./reducer2.py  > test_output.txt
#!rm test_output.txt

reporter:counter:Mapper,Script Count,1
reporter:counter:Mapper,Line Count,1
reporter:counter:Mapper,Line Count,1
reporter:counter:Mapper,Line Count,1
reporter:counter:Mapper,Line Count,1
reporter:counter:Mapper,Line Count,1
reporter:counter:Mapper,Line Count,1
reporter:counter:Mapper,Line Count,1
reporter:counter:Mapper,Line Count,1
reporter:counter:Mapper,Line Count,1
reporter:counter:Mapper,Line Count,1
reporter:counter:Mapper,Line Count,1
reporter:counter:Mapper,Line Count,1
reporter:counter:Mapper,Line Count,1
reporter:counter:Mapper,Line Count,1
reporter:counter:Mapper,Line Count,1
reporter:counter:Mapper,Line Count,1
reporter:counter:Mapper,Line Count,1
reporter:counter:Mapper,Line Count,1
reporter:counter:Mapper,Line Count,1
reporter:counter:Mapper,Line Count,1
reporter:counter:Mapper,Line Count,1
reporter:counter:Mapper,Line Count,1
reporter:counter:Mapper,Line Count,1
reporter:counter:Mapper,Line Count,1
reporter:counter:Mapper,Line Count,1
reporter:c

In [100]:
%%bash
#Run the pairs job in Hadoop, making sure to time the results
time bin/hadoop jar share/hadoop/tools/lib/hadoop-streaming-2.6.3.jar \
-D mapred.map.tasks=2 \
-D mapred.reduce.tasks=2 \
-D stream.num.map.output.key.fields=2 \
-D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
-D mapred.text.key.comparator.options='-k1,1 -k2,2' \
-file ./mapper.py    -mapper ./mapper.py \
-file ./reducer.py   -reducer ./reducer.py \
-input /user/nicholashamlin/ProductPurchaseData.txt -output /user/nicholashamlin/hw_3_4_tmp_output

packageJobJar: [./mapper.py, ./reducer.py, /var/folders/rz/drh189k95919thyy3gs3tq400000gn/T/hadoop-unjar4553872691828371481/] [] /var/folders/rz/drh189k95919thyy3gs3tq400000gn/T/streamjob2886759158133234087.jar tmpDir=null


16/02/02 00:57:26 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
16/02/02 00:57:26 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/02/02 00:57:27 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
16/02/02 00:57:28 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
16/02/02 00:57:28 INFO mapred.FileInputFormat: Total input paths to process : 1
16/02/02 00:57:28 INFO mapreduce.JobSubmitter: number of splits:2
16/02/02 00:57:28 INFO Configuration.deprecation: mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
16/02/02 00:57:28 INFO Configuration.deprecation: mapred.text.key.comparator.options is deprecated. Instead, use mapreduce.partition.keycomparator.options
16/02/02 00:57:28 INFO Configuration.deprecation: mapred.reduce.tasks is deprecated. Instead, use mapreduce.job.reduces
16/02/02 00:57:28 INFO Configur

In [60]:
#Make sure final output directory is clear
!bin/hdfs dfs -rm -r hw_3_4_final_output

16/02/02 00:44:01 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/02/02 00:44:02 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted hw_3_4_final_output


In [61]:
%%bash
#Run the sorting job using the output of the previous data in Hadoop with a single reducer
time bin/hadoop jar share/hadoop/tools/lib/hadoop-streaming-2.6.3.jar \
-D stream.num.map.output.key.fields=3 \
-D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
-D mapred.text.key.comparator.options='-k 1,1nr -k 2,2 -k 3,3' \
-file ./mapper2.py    -mapper ./mapper2.py \
-file ./reducer2.py   -reducer ./reducer2.py \
-input /user/nicholashamlin/hw_3_4_tmp_output -output /user/nicholashamlin/hw_3_4_final_output

packageJobJar: [./mapper2.py, ./reducer2.py, /var/folders/rz/drh189k95919thyy3gs3tq400000gn/T/hadoop-unjar7620061275795980411/] [] /var/folders/rz/drh189k95919thyy3gs3tq400000gn/T/streamjob3264147358446414775.jar tmpDir=null


16/02/02 00:44:06 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
16/02/02 00:44:06 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/02/02 00:44:07 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
16/02/02 00:44:07 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
16/02/02 00:44:08 INFO mapred.FileInputFormat: Total input paths to process : 0
16/02/02 00:44:08 INFO mapreduce.JobSubmitter: number of splits:0
16/02/02 00:44:08 INFO Configuration.deprecation: mapred.text.key.comparator.options is deprecated. Instead, use mapreduce.partition.keycomparator.options
16/02/02 00:44:08 INFO Configuration.deprecation: mapred.output.key.comparator.class is deprecated. Instead, use mapreduce.job.output.key.comparator.class
16/02/02 00:44:08 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1454033924139_0076
16/02/02 00:4

In [62]:
# Examine the output of the job in HDFS and print the results
! echo "HW 3.4 RESULTS:"
! echo ""
! echo "50 Most Frequent Pairs:"
! echo "Product 1   |   Product 2 | Raw Freq. | Support "
!bin/hdfs dfs -cat hw_3_4_final_output/* | head -50

HW 3.4 RESULTS:

50 Most Frequent Pairs:
Product 1   |   Product 2 | Raw Freq. | Support 
16/02/02 00:44:27 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


##HW3.5. Stripes
*Repeat 3.4 using the stripes design pattern for finding co-occurring pairs.*

*Report  the compute times for stripes job versus the Pairs job. Describe the computational setup used (E.g., single computer; dual core; linux, number of mappers, number of reducers)*

*Instrument your mapper, combiner, and reducer to count how many times each is called using Counters and report these counts. Discuss the differences in these counts between the Pairs and Stripes jobs*


#### HW 3.5 Mapper and Reducer #1
For the stripes implementation, the mapper emits an associative array for each product that contains the products paired with it and their counts.  In the reducer, these arrays are unpacked and summed to produce the final total for each pair of products.  We implement this simply in Python using the Counter object, which enables us to increment the marginal product counts using similar syntax to the pairs approach and without needing to worry about locating and incrementing each individual value for each individual product.
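
The round trip from mapper stripe to merged totals can be sketched as follows. This is an illustration under a few assumptions: the helper names `make_stripes` and `merge_stripes` are invented, the two carts are toy data, and `ast.literal_eval` is used to parse the serialized dictionary because it accepts only Python literals.

```python
import ast
from collections import Counter

def make_stripes(line):
    """Mapper side: one (product, serialized stripe) record per product
    that has at least one later neighbour in the sorted cart."""
    products = sorted(line.split())
    stripes = []
    for i, product_a in enumerate(products):
        stripe = Counter(products[i + 1:])
        if stripe:
            stripes.append((product_a, str(dict(stripe))))
    return stripes

def merge_stripes(records):
    """Reducer side: parse each serialized stripe and sum stripes that share a key."""
    merged = {}
    for product, text in records:
        merged.setdefault(product, Counter()).update(ast.literal_eval(text))
    return merged

carts = ['DAI62779 ELE17451 FRO40251', 'ELE17451 DAI62779']
records = [rec for cart in carts for rec in make_stripes(cart)]
totals = merge_stripes(records)
```

`Counter.update` adds counts for keys that already exist and creates the ones that don't, which is the behaviour the reducer relies on when it combines stripes for the same product.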

In [108]:
%%writefile mapper.py
#!/usr/bin/python

#HW 3.5 - Mapper #1 Function Code
import sys
sys.stderr.write("reporter:counter:Mapper,Script Count,1\n") 
number_of_carts=0
for line in sys.stdin:
    line=line.strip()
    products=line.split() #split on whitespace
    products.sort() #This protects against double-counting; sorting an empty list is a no-op
    for i,product_a in enumerate(products):
        line_output={}
        for product_b in products[i+1:]:
            try:
                line_output[product_b]+=1
            except KeyError: #If we haven't seen a product before, add it to the dictionary
                line_output[product_b]=1
        if line_output.keys(): #only emit a record if we find more than one product in the cart
            print product_a+'\t'+str(line_output)
            sys.stderr.write("reporter:counter:Mapper,Line Count,1\n") 
    
    number_of_carts+=1
print '**Total0'+'\t{"number_of_carts":'+str(number_of_carts)+'}'
print '**Total1'+'\t{"number_of_carts":'+str(number_of_carts)+'}'
sys.stderr.write("reporter:counter:Mapper,Line Count,2\n") #two total records were emitted above

Overwriting mapper.py


In [109]:
%%writefile reducer.py
#!/usr/bin/python

#HW 3.5 - Reducer #1 Function Code
from __future__ import division
import sys
from collections import Counter, OrderedDict
sys.stderr.write("reporter:counter:Reducer,Script Count,1\n") 
s=100 #cutoff for "frequent"
current_product_dict=Counter({}) #Counters make tracking individual product counts easier
current_product=None
count = 0 #Running total of occurrences for the chosen product

number_of_carts=0

for line in sys.stdin:
    #Parse line into fields
    product,product_dict=line.strip().split('\t')
    #The dict is passed from the mapper as a string, so we need to convert it back to a dict
    product_dict=Counter(eval(product_dict))  
    
    if product[:7]=='**Total': #Extract total products for order inversion
        number_of_carts+=product_dict['number_of_carts']
        continue
  
    if current_product==product:
        #The counter is smart enough to increment keys when they exist, and create them when they don't
        current_product_dict+=Counter(product_dict)
    else:
        if current_product and current_product[:7]!='**Total':
            #Ordering the results ensures we maintain lexicographic sorting through the reducer
            for i in OrderedDict(sorted(current_product_dict.items())):
                if current_product_dict[i]>=s:
                    sys.stderr.write("reporter:counter:Reducer,Line Count,1\n") 
                    print current_product+'\t'+i+'\t'+str(current_product_dict[i])+'\t'+str(current_product_dict[i]/number_of_carts)
        current_product=product
        current_product_dict=product_dict

#Make sure to emit final row
if current_product and current_product[:7]!='**Total':
    for i in OrderedDict(sorted(current_product_dict.items())):
        if current_product_dict[i]>=s:
            sys.stderr.write("reporter:counter:Reducer,Line Count,1\n") 
            print current_product+'\t'+i+'\t'+str(current_product_dict[i])+'\t'+str(current_product_dict[i]/number_of_carts)

Overwriting reducer.py


In [None]:
#!chmod +x ./mapper2.py ./reducer2.py
#!echo "FRO11987 ELE17451 ELE89019 FRO11987 SNA90258 GRO99222 " | ./mapper.py |sort | ./reducer.py
!cat purchase_test.txt  | ./mapper.py |sort -k 1,1 -k2,2 | ./reducer.py #| sort -k 3,3nr -k 1,1 -k 2,2
#!cat purchase_test.txt | ./mapper.py |sort  | ./reducer.py # | ./mapper2.py | sort -nr | ./reducer2.py  > test_output.txt
#!rm test_output.txt

In [110]:
### Make sure 1st job output directory is clear in HDFS
!bin/hdfs dfs -rm -r hw_3_5_tmp_output
#!bin/hdfs dfs -ls

16/02/02 01:07:07 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/02/02 01:07:08 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted hw_3_5_tmp_output


In [111]:
%%bash
#Run the stripes job in Hadoop, making sure to time the results
time bin/hadoop jar share/hadoop/tools/lib/hadoop-streaming-2.6.3.jar \
-D mapred.map.tasks=2 \
-D mapred.reduce.tasks=2 \
-D stream.num.map.output.key.fields=1 \
-D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
-D mapred.text.key.comparator.options='-k1,1' \
-file ./mapper.py    -mapper ./mapper.py \
-file ./reducer.py   -reducer ./reducer.py \
-input /user/nicholashamlin/ProductPurchaseData.txt -output /user/nicholashamlin/hw_3_5_tmp_output

packageJobJar: [./mapper.py, ./reducer.py, /var/folders/rz/drh189k95919thyy3gs3tq400000gn/T/hadoop-unjar7418117565516238624/] [] /var/folders/rz/drh189k95919thyy3gs3tq400000gn/T/streamjob4690028250125051951.jar tmpDir=null


16/02/02 01:07:09 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
16/02/02 01:07:09 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/02/02 01:07:11 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
16/02/02 01:07:11 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
16/02/02 01:07:11 INFO mapred.FileInputFormat: Total input paths to process : 1
16/02/02 01:07:11 INFO mapreduce.JobSubmitter: number of splits:2
16/02/02 01:07:11 INFO Configuration.deprecation: mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
16/02/02 01:07:11 INFO Configuration.deprecation: mapred.text.key.comparator.options is deprecated. Instead, use mapreduce.partition.keycomparator.options
16/02/02 01:07:11 INFO Configuration.deprecation: mapred.reduce.tasks is deprecated. Instead, use mapreduce.job.reduces
16/02/02 01:07:11 INFO Configur

In [105]:
#Make sure the final output directory is clear
!bin/hdfs dfs -rm -r hw_3_5_final_output

16/02/02 01:02:51 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/02/02 01:02:52 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted hw_3_5_final_output


In [106]:
%%bash
#Run the sorting job using the output of the previous data in Hadoop with a single reducer
time bin/hadoop jar share/hadoop/tools/lib/hadoop-streaming-2.6.3.jar \
-D stream.num.map.output.key.fields=3 \
-D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
-D mapred.text.key.comparator.options='-k 1,1nr -k 2,2 -k 3,3' \
-file ./mapper2.py    -mapper ./mapper2.py \
-file ./reducer2.py   -reducer ./reducer2.py \
-input /user/nicholashamlin/hw_3_5_tmp_output -output /user/nicholashamlin/hw_3_5_final_output

packageJobJar: [./mapper2.py, ./reducer2.py, /var/folders/rz/drh189k95919thyy3gs3tq400000gn/T/hadoop-unjar2717343439363103815/] [] /var/folders/rz/drh189k95919thyy3gs3tq400000gn/T/streamjob1093526335151344432.jar tmpDir=null


16/02/02 01:02:57 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
16/02/02 01:02:57 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/02/02 01:02:58 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
16/02/02 01:02:58 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
16/02/02 01:02:59 INFO mapred.FileInputFormat: Total input paths to process : 2
16/02/02 01:02:59 INFO mapreduce.JobSubmitter: number of splits:2
16/02/02 01:02:59 INFO Configuration.deprecation: mapred.text.key.comparator.options is deprecated. Instead, use mapreduce.partition.keycomparator.options
16/02/02 01:02:59 INFO Configuration.deprecation: mapred.output.key.comparator.class is deprecated. Instead, use mapreduce.job.output.key.comparator.class
16/02/02 01:02:59 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1454033924139_0082
16/02/02 01:0

In [107]:
# Examine the output of the job in HDFS and print the results
! echo "HW 3.5 RESULTS:"
! echo ""
! echo "50 Most Frequent Pairs:"
! echo "Product 1   |   Product 2 | Raw Freq. | Support "
!bin/hdfs dfs -cat hw_3_5_final_output/* | head -50

HW 3.5 RESULTS:

50 Most Frequent Pairs:
Product 1   |   Product 2 | Raw Freq. | Support 
16/02/02 01:04:02 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
DAI62779	ELE17451	1592	0.0511880646925
FRO40251	SNA80324	1412	0.0454004694383
DAI75645	FRO40251	1254	0.0403202469374
FRO40251	GRO85051	1213	0.0390019613517
DAI62779	GRO73461	1139	0.0366226166361
DAI75645	SNA80324	1130	0.0363332368734
DAI62779	FRO40251	1070	0.0344040384554
DAI62779	SNA80324	923	0.0296775023311
DAI62779	DAI85309	918	0.0295167357963
ELE32164	GRO59710	911	0.0292916626475
DAI62779	DAI75645	882	0.0283592167454
FRO40251	GRO73461	882	0.0283592167454
DAI62779	ELE92920	877	0.0281984502106
FRO40251	FRO92469	835	0.026848011318
DAI62779	ELE32164	832	0.0267515513971
DAI75645	GRO73461	712	0.0228931545609
DAI43223	ELE32164	711	0.022861001254
DAI62779	GRO30386	709	0.02279669464
ELE17451	FRO40251	697	0.0224108549564
DAI85309	ELE99737	659	0.0211890292917
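
The Support column in the listing above appears to be each pair's raw frequency divided by the total number of baskets. The sketch below checks that reading against the first two rows. The basket total is not stated in this section, so it is inferred from the first row; treat both the formula and the inferred total as assumptions rather than as the logic of the actual reducer.

```python
# Values copied from the first two rows of the HW 3.5 output above.
rows = [
    ("DAI62779", "ELE17451", 1592, 0.0511880646925),
    ("FRO40251", "SNA80324", 1412, 0.0454004694383),
]

# Assumption: support = raw frequency / total number of baskets.
# The total is inferred from the first row, not read from the data set.
first_freq, first_support = rows[0][2], rows[0][3]
inferred_total = int(round(first_freq / first_support))

# Recompute support for every row using the inferred total and
# record how far each value is from the one printed by the job.
errors = []
for item1, item2, freq, support in rows:
    recomputed = freq / float(inferred_total)
    errors.append(abs(recomputed - support))

max_error = max(errors)
print(inferred_total, max_error)
```

If the assumption holds, `max_error` is on the order of the rounding in the printed values, which means a single basket total explains both rows.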

###HW 3.5 - Discussion of results
Again, we are running this job on a single MacBook Pro with 2 cores, using two mappers and two reducers.  This time, though, the job takes 3 minutes and 2 seconds to run, with the mapper and reducer each getting called twice.  The extra runtime is likely because reducing is more computationally complex with stripes: each reducer has to merge whole stripes rather than sum simple counts.  The pairs approach, by contrast, generates many more intermediate records for the reducer to process, though each one has a simpler structure.  The main benefit of the stripes paradigm is that it reduces the amount of data that has to move across the network during the shuffle, and that is where its efficiency gains come from.  However, since we're running this job on a single machine rather than a cluster, the network traffic savings don't outweigh the additional computational complexity of the stripes reducer step.
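
To make the trade-off concrete, here is a minimal in-memory sketch of the two patterns. It is not the mapper and reducer code used for this homework: the toy baskets and the names `pairs_map` and `stripes_map` are hypothetical, and no Hadoop is involved. It only shows that stripes emits fewer, larger records and needs a dictionary merge in the reducer, while pairs emits more records and needs only a sum.

```python
from collections import Counter, defaultdict
from itertools import combinations

# Hypothetical toy baskets; not taken from the assignment data.
baskets = [
    ["A", "B", "C"],
    ["A", "B"],
    ["B", "C"],
]

def pairs_map(basket):
    """Pairs: emit one ((item1, item2), 1) record per co-occurring pair."""
    for a, b in combinations(sorted(set(basket)), 2):
        yield (a, b), 1

def stripes_map(basket):
    """Stripes: emit one (item, {neighbor: count}) record per item."""
    items = sorted(set(basket))
    for i, a in enumerate(items):
        stripe = Counter(items[i + 1:])
        if stripe:  # skip the last item, whose stripe is empty
            yield a, stripe

pair_records = [rec for basket in baskets for rec in pairs_map(basket)]
stripe_records = [rec for basket in baskets for rec in stripes_map(basket)]

# Pairs reducer: a plain sum per key.
pair_counts = Counter()
for key, one in pair_records:
    pair_counts[key] += one

# Stripes reducer: an element-wise merge of dictionaries per key.
stripe_counts = defaultdict(Counter)
for item, stripe in stripe_records:
    stripe_counts[item].update(stripe)

# Flatten the merged stripes so the two results can be compared directly.
stripes_as_pairs = Counter(
    {(a, b): n for a, stripe in stripe_counts.items() for b, n in stripe.items()}
)

print(len(pair_records), len(stripe_records))
print(pair_counts == stripes_as_pairs)
```

On these toy baskets the stripes mapper emits fewer records than the pairs mapper, and both approaches arrive at the same co-occurrence counts. On a single machine the smaller record count saves little, which fits the runtime observed above.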

###End of Submission