# DATASCI W261: Machine Learning at Scale 

**Name: Carlos Eduardo Rodriguez Castillo**

**email: cerodriguez@berkeley.edu**

**Week 3**

**Section 2**

__This notebook attempts to solve the exercises for homework assignment three.__

### HW3.0

How do you merge two sorted lists/arrays of records of the form [key, value]? Where is this used in Hadoop MapReduce? [Hint: within the shuffle]

__ANSWER:__

Two sorted lists of records of the form [key, value] can be merged into a single sorted list as follows:

- While both lists are non-empty:
    - compare the first (smallest remaining) element of one list to the first (smallest remaining) element of the other list
    - remove the smaller of the two elements from its list and append it to the end of a new list that holds the merged result
    - repeat the comparison until one of the two lists is empty
- Finally, append the remaining list to the end of the merged list (it is already sorted, and all of its elements are at least as large as everything merged so far).
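The merge steps above can be sketched in a few lines of Python. This is only an illustration: the function name `merge_sorted` and the sample records are made up for this example, and records are compared on their key only.

```python
def merge_sorted(left, right):
    """Merge two lists of [key, value] records, each already sorted by key."""
    merged = []
    i, j = 0, 0
    while i < len(left) and j < len(right):
        # On equal keys take from `left` first, which keeps the merge stable.
        if left[i][0] <= right[j][0]:
            merged.append(left[i])
            i += 1
        else:
            merged.append(right[j])
            j += 1
    # At most one of these slices is non-empty, and it is already sorted.
    merged.extend(left[i:])
    merged.extend(right[j:])
    return merged


a = [["bar", 1], ["foo", 3]]
b = [["foo", 1], ["labs", 1], ["quux", 2]]
print(merge_sorted(a, b))
```

Each record is inspected once, so the merge is linear in the total number of records.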

This merge technique is used in the shuffle phase of Hadoop MapReduce, which occurs after the mappers have output the intermediate key-value records but before those records are passed to the reducers. Specifically, each mapper task buffers its intermediate records in memory, partitions them by key, sorts each partition, and spills the sorted runs to disk when the buffer fills up; the sorted spill files are then merged. On the reduce side, the sorted map outputs fetched from the different mappers are merged again into a single sorted stream.

What is a combiner function in the context of Hadoop? Give an example where it can be used and justify why it should be used in the context of this problem.

__ANSWER:__

In the context of Hadoop, a combiner is a function that aggregates the intermediate results of a mapper task locally, before they are sent to the reducer tasks downstream. Hadoop treats the combiner as an optional optimization: the framework may run it zero, one, or several times on a mapper's output, regardless of what the programmer requests. It is therefore critical that a combiner not only accept intermediate records in the same format as those fed to the reducers, but also emit aggregated records in exactly the format of the un-aggregated records output by the mappers. As a sort of 'mini reducer', a combiner is meant to (1) minimize the number of key-value pairs that are shuffled across the network from the mappers to the reducers and (2) reduce the risk of reducer tasks lagging because they must aggregate keys that have a very large list of values.

An example of when combiners can (and should) be used is the simple word count exercise! Combiners should be used in word count to achieve tasks (1) and (2) mentioned above. Particularly, the frequency of words varies widely in a corpus; in an English corpus, the token "the" is likely to be encountered a very large number of times and considerably more times than many other tokens in the corpus. As such the reducer assigned to aggregate the counts for the token will likely lag behind other reducers processing rarer tokens; furthermore, there will be a very large number of "the\t1" intermediate records that will be shuffled across the network. Combiner tasks will critically reduce the amount of work that the reducers assigned to process common tokens (such as "the") will need to do while also minimizing the amount of data that is shuffled across the network (e.g. shuffling "the\tN" intermediate records as opposed to "the\t1").

Finally, it is important to note that the word count example (and any larger task that uses combiners, for that matter) can be solved without combiners, albeit while consuming more resources.
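To make the word count argument concrete, here is a small local simulation of what a combiner does to one mapper's output. It is a sketch only: `map_words` and `combine` are hypothetical helper names, the input sentence is invented, and no Hadoop machinery is involved.

```python
from collections import Counter


def map_words(line):
    """Emit one (word, 1) pair per token, as a word count mapper would."""
    return [(word, 1) for word in line.split()]


def combine(pairs):
    """Sum counts per word locally; the output keeps the (word, count) shape."""
    totals = Counter()
    for word, count in pairs:
        totals[word] += count
    return sorted(totals.items())


mapper_output = map_words("the cat and the dog and the bird")
combined = combine(mapper_output)
print("%d records before combining, %d after" % (len(mapper_output), len(combined)))
```

Because the combiner's output has the same shape as its input, the reducer produces the same totals whether the combiner ran or not.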

What is the Hadoop shuffle?

__ANSWER__:

The Hadoop MapReduce shuffle is a critical step in a Hadoop MapReduce job; at a high level, it transfers the intermediate records output by the mappers in the map phase to the reducers, so that they can be used as inputs to the reduce phase. During the shuffle, the intermediate key-value pairs are partitioned by key, sorted, transferred across the network from the mapper nodes to the reducer nodes, and merged so that each reducer receives its keys in sorted order.
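The following is a minimal in-memory sketch of the shuffle's effect, not of Hadoop's actual implementation. The names `partition` and `shuffle` are hypothetical, `zlib.crc32` is a stand-in for the partitioner's hash (chosen only because it is deterministic across runs), and a plain sort replaces the spill-and-merge machinery.

```python
import zlib
from itertools import groupby


def partition(key, num_reducers):
    # Stand-in hash function; an assumption made for this illustration.
    return (zlib.crc32(key.encode("utf-8")) & 0xffffffff) % num_reducers


def shuffle(mapper_outputs, num_reducers):
    """Route each (key, value) pair to a reducer, then sort and group by key."""
    buckets = [[] for _ in range(num_reducers)]
    for pairs in mapper_outputs:
        for key, value in pairs:
            buckets[partition(key, num_reducers)].append((key, value))
    grouped = []
    for bucket in buckets:
        bucket.sort(key=lambda kv: kv[0])
        grouped.append([(key, [v for _, v in group])
                        for key, group in groupby(bucket, key=lambda kv: kv[0])])
    return grouped


mapper_outputs = [[("foo", 1), ("bar", 1)], [("foo", 1), ("quux", 1)]]
result = shuffle(mapper_outputs, 2)
for reducer_id, groups in enumerate(result):
    print("reducer %d: %s" % (reducer_id, groups))
```

The property to notice is that all values for a given key end up at the same reducer, and each reducer sees its keys in sorted order.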

#### HW3.1 Consumer complaints dataset

__Use Counters to do EDA (exploratory data analysis) and to monitor progress__

Counters are lightweight objects in Hadoop that allow you to keep track of system progress in both the map and reduce stages of processing. By default, Hadoop defines a number of standard counters in "groups"; these show up in the jobtracker webapp, giving you information such as "Map input records", "Map output records", etc. 

When processing data with a MapReduce job, it is a challenge to monitor the progress of parallel tasks running across the nodes of a distributed cluster. It is also complicated to distinguish between the data that has been processed and the data that is yet to be processed. The MapReduce framework provides user-defined Counters, which can be used to monitor the progress of data across the nodes of a distributed cluster.
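In Hadoop Streaming, a script updates a user-defined counter by writing a line of the form `reporter:counter:<group>,<counter>,<amount>` to standard error. A minimal sketch of a helper for this follows; the function names `counter_line` and `increment_counter` are hypothetical and exist only for this illustration.

```python
import sys


def counter_line(group, counter, amount=1):
    """Format a Hadoop Streaming counter update line."""
    return "reporter:counter:%s,%s,%d\n" % (group, counter, amount)


def increment_counter(group, counter, amount=1, stream=sys.stderr):
    """Write the counter update to stderr, where Hadoop Streaming reads it."""
    stream.write(counter_line(group, counter, amount))


increment_counter("Product", "Mortgage")
```

Run outside Hadoop, the script simply prints the counter line to the terminal, which is convenient for local testing.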

Use the Consumer Complaints Dataset provided here to complete this question:

     https://www.dropbox.com/s/vbalm3yva2rr86m/Consumer_Complaints.csv?dl=0

The consumer complaints dataset consists of diverse consumer complaints, which have been reported across the United States regarding various types of loans. The dataset consists of records of the form:

|Complaint ID|Product|Sub-product|Issue|Sub-issue|State|ZIP code|Submitted via|Date received|Date sent to company|Company|Company response|Timely response?|Consumer disputed?|
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|1114245|Debt collection|Medical|Disclosure verification of debt|Not given enough info to verify debt|FL|32219|Web|11/13/2014|11/13/2014|"Choice Recovery, Inc."|Closed with explanation|Yes| |
|1114488|Debt collection|Medical|Disclosure verification of debt|Right to dispute notice not received|TX|75006|Web|11/13/2014|11/13/2014|"Expert Global Solutions, Inc."|In progress|Yes| |
|1114255|Bank account or service|Checking account|Deposits and withdrawals| |NY|11102|Web|11/13/2014|11/13/2014|"FNIS (Fidelity National Information Services, Inc.)"|In progress|Yes| |
|1115106|Debt collection|"Other (phone, health club, etc.)"|Communication tactics|Frequent or repeated calls|GA|31721|Web|11/13/2014|11/13/2014|"Expert Global Solutions, Inc."|In progress|Yes| |

__User-defined Counters__

Now, let’s use Hadoop Counters to identify the number of complaints pertaining to debt collection, mortgage and other categories (all other categories get lumped into this one) in the consumer complaints dataset. Basically produce the distribution of the Product column in this dataset using counters (limited to 3 counters here).

Hadoop offers Job Tracker, a UI tool to determine the status and statistics of all jobs. Using the job tracker UI, developers can view the Counters that have been created. Screenshot your job tracker UI as your job completes and include it here. Make sure that your user-defined counters are visible.

__ANSWER:__

Using Hadoop Counters, we identified that the number of complaints pertaining to __debt collection is 44372__, the number of complaints pertaining to __mortgage is 125752__, and the number of complaints pertaining to __other issues is 142789__.

In [2]:
%%writefile counterMapper.py
#!/usr/bin/python
"""
Code for counterMapper.py for W261 HW3.1
"""
__author__ = "Carlos Eduardo Rodriguez Castillo"
__email__ = "cerodriguez@berkeley.edu"

import sys

def map_function(record):
    # Product is the second CSV column. The only column before it is the
    # numeric Complaint ID, so a plain split on "," is enough to extract it.
    record = record.strip()
    record_parameters = record.split(",")
    product = record_parameters[1]
    return (product, 1)

if __name__ == "__main__":
    for line in sys.stdin:
        # Emit "<product>\t1" for every input record.
        product_type, count = map_function(line)
        print "%s\t%d" % (product_type, int(count))

Writing counterMapper.py


In [3]:
!chmod a+x counterMapper.py

In [14]:
%%writefile counterReducer.py
#!/usr/bin/python
"""
Code for counterReducer.py for W261 HW3.1
"""
__author__ = "Carlos Eduardo Rodriguez Castillo"
__email__ = "cerodriguez@berkeley.edu"

import sys

def reduce_function(record):
    record = record.strip()
    record_parameters = record.split("\t")
    product_type = record_parameters[0]
    count = int(record_parameters[1])
    # Increment the matching user-defined counter by the record's count
    # (always 1 here, since no combiner pre-aggregates the mapper output).
    if product_type == "Mortgage":
        sys.stderr.write("reporter:counter:Product,Mortgage,%d\n" % count)
    elif product_type == "Debt collection":
        sys.stderr.write("reporter:counter:Product,Debt collection,%d\n" % count)
    else:
        sys.stderr.write("reporter:counter:Product,Other,%d\n" % count)

if __name__ == "__main__":
    for line in sys.stdin:
        reduce_function(line)

Overwriting counterReducer.py


In [15]:
!chmod a+x counterReducer.py

In [11]:
!hdfs dfs -copyFromLocal /home/cloudera/w261/HW3/data/* \
/user/cloudera/w261/HW3/data

In [74]:
!hdfs dfs -rm -r /user/cloudera/w261/HW3/output-3_0
!hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-mr1.jar \
-D mapred.map.tasks=1 \
-D mapred.reduce.tasks=4 \
-mapper /home/cloudera/w261/HW3/src/counterMapper.py \
-reducer /home/cloudera/w261/HW3/src/counterReducer.py \
-input /user/cloudera/w261/HW3/data/* \
-output /user/cloudera/w261/HW3/output-3_0

16/06/01 10:33:50 INFO fs.TrashPolicyDefault: Moved: 'hdfs://quickstart.cloudera:8020/user/cloudera/w261/HW3/output-3_0' to trash at: hdfs://quickstart.cloudera:8020/user/cloudera/.Trash/Current/user/cloudera/w261/HW3/output-3_0
packageJobJar: [] [/usr/jars/hadoop-streaming-2.6.0-cdh5.7.0.jar] /tmp/streamjob2418945648414801191.jar tmpDir=null
16/06/01 10:33:54 INFO client.RMProxy: Connecting to ResourceManager at quickstart.cloudera/10.0.2.15:8032
16/06/01 10:33:55 INFO client.RMProxy: Connecting to ResourceManager at quickstart.cloudera/10.0.2.15:8032
16/06/01 10:33:55 INFO mapred.FileInputFormat: Total input paths to process : 1
16/06/01 10:33:55 INFO mapreduce.JobSubmitter: number of splits:1
16/06/01 10:33:56 INFO Configuration.deprecation: mapred.reduce.tasks is deprecated. Instead, use mapreduce.job.reduces
16/06/01 10:33:56 INFO Configuration.deprecation: mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
16/06/01 10:33:56 INFO mapreduce.JobSubmitter: Submitting tok

#### HW3.2 Analyze the performance of your Mappers, Combiners and Reducers using Counters

For this brief study the Input file will be one record (the next line only):

foo foo quux labs foo bar quux

Perform a word count analysis of this single-record dataset using a Mapper and Reducer based WordCount (i.e., no combiners are used here), using user-defined Counters to count how many times the mapper and reducer are called. What are the values of your user-defined Mapper Counter and Reducer Counter after completing this word count job? The answer should be 1 and 4 respectively. Please explain.

Please use multiple mappers and reducers for these jobs (at least 2 mappers and 2 reducers).
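One reading of the expected values 1 and 4 is sketched below. It assumes that a "call" means one invocation of the map function per input record and one invocation of the reduce function per distinct key; a streaming script that increments its counter once at start-up counts task launches instead. The variable names are made up for this illustration.

```python
record = "foo foo quux labs foo bar quux"
records = [record]

# One map call per input record.
map_calls = len(records)

# One reduce call per distinct key produced by the mappers.
distinct_keys = sorted(set(word for r in records for word in r.split()))
reduce_calls = len(distinct_keys)

print("map calls: %d, reduce calls: %d" % (map_calls, reduce_calls))
print(distinct_keys)
```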

In [62]:
%%writefile mapperCounter3_2.py
#!/usr/bin/python
"""
Code for mapper counter for W261 HW3.2
"""
__author__ = "Carlos Eduardo Rodriguez Castillo"
__email__ = "cerodriguez@berkeley.edu"

import sys

def map_initialize():
    # Runs once per mapper process, so the counter tracks mapper task launches.
    sys.stderr.write("reporter:counter:Mapper,Calls,1\n")

def map_function(record):
    # Emit one "<term>\t1" record per whitespace-separated term.
    record = record.strip()
    terms = record.split()
    for term in terms:
        print "%s\t1" % term

if __name__ == "__main__":
    map_initialize()
    for line in sys.stdin:
        map_function(line)

Overwriting mapperCounter3_2.py


In [35]:
!chmod a+x mapperCounter3_2.py

In [8]:
%%writefile reducerCounter3_2.py
#!/usr/bin/python
"""
Code for reducer counter for W261 HW3.2
"""
__author__ = "Carlos Eduardo Rodriguez Castillo"
__email__ = "cerodriguez@berkeley.edu"

import sys

def reduce_initialize():
    # Runs once per reducer process, so the counter tracks reducer task launches.
    sys.stderr.write("reporter:counter:Reducer,Calls,1\n")
    init_word = ""
    return init_word

def reduce_function(record,current_word,current_count):
    record = record.strip()
    word_count_pair = record.split("\t")
    word = word_count_pair[0]
    count = int(word_count_pair[1])
    if current_word == "" or current_word == word:
        # First record, or same key as the previous record: keep accumulating.
        current_word = word
        current_count = current_count + count
    else:
        # Key changed: emit the finished total and start a new one.
        print "%s\t%d" % (current_word, current_count)
        current_word = word
        current_count = count
    return (current_word, current_count)

if __name__ == "__main__":
    word = reduce_initialize()
    current_count = 0
    for line in sys.stdin:
        word, current_count = reduce_function(line, word, current_count)
    # Emit the last key, unless this reducer received no input at all.
    if word != "":
        print "%s\t%d" % (word, current_count)

Overwriting reducerCounter3_2.py


In [43]:
!chmod a+x reducerCounter3_2.py

In [49]:
!cat /home/cloudera/w261/HW3/data/document3_2.txt

foo foo quux labs foo bar quux


In [72]:
!cat /home/cloudera/w261/HW3/data/document3_2.txt | ./mapperCounter3_2.py | sort -k1,1 | ./reducerCounter3_2.py

reporter:counter:Mapper,Calls,1
reporter:counter:Reducer,Calls,1
reporter:counter:Reducer,Calls,1
bar	1
foo	3
labs	1
quux	2


In [32]:
!hdfs dfs -copyFromLocal /home/cloudera/w261/HW3/data/document3_2.txt \
/user/cloudera/w261/HW3/data3_2

In [30]:
!hdfs dfs -mkdir /user/cloudera/w261/HW3/data3_2

In [8]:
!hdfs dfs -rm -r /user/cloudera/w261/HW3/output-3_2
!hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-mr1.jar \
-D mapreduce.job.maps=1 \
-numReduceTasks 4 \
-mapper /home/cloudera/w261/HW3/src/mapperCounter3_2.py \
-reducer /home/cloudera/w261/HW3/src/reducerCounter3_2.py \
-input /user/cloudera/w261/HW3/data3_2/* \
-output /user/cloudera/w261/HW3/output-3_2

rm: `/user/cloudera/w261/HW3/output-3_2': No such file or directory
packageJobJar: [] [/usr/jars/hadoop-streaming-2.6.0-cdh5.7.0.jar] /tmp/streamjob5284007959467736255.jar tmpDir=null
16/06/02 19:55:42 INFO client.RMProxy: Connecting to ResourceManager at quickstart.cloudera/10.0.2.15:8032
16/06/02 19:55:43 INFO client.RMProxy: Connecting to ResourceManager at quickstart.cloudera/10.0.2.15:8032
16/06/02 19:55:44 INFO mapred.FileInputFormat: Total input paths to process : 1
16/06/02 19:55:44 INFO mapreduce.JobSubmitter: number of splits:1
16/06/02 19:55:44 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1464465911163_0043
16/06/02 19:55:45 INFO impl.YarnClientImpl: Submitted application application_1464465911163_0043
16/06/02 19:55:45 INFO mapreduce.Job: The url to track the job: http://quickstart.cloudera:8088/proxy/application_1464465911163_0043/
16/06/02 19:55:45 INFO mapreduce.Job: Running job: job_1464465911163_0043
16/06/02 19:55:55 INFO mapreduce.Job: Job job_14644659

In [9]:
!hdfs dfs -cat /user/cloudera/w261/HW3/output-3_2/*

quux	2
foo	3
bar	1
labs	1


Perform a word count analysis of the Issue column of the Consumer Complaints Dataset using a Mapper and Reducer based WordCount (i.e., no combiners used anywhere), using user-defined Counters to count how many times the mapper and reducer are called. What are the values of your user-defined Mapper Counter and Reducer Counter after completing your word count job?

In [5]:
%%writefile mapper3_2_b.py
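#!/usr/bin/python
"""
Code for mapper for W261 HW3.2.b
"""
__author__ = "Carlos Eduardo Rodriguez Castillo"
__email__ = "cerodriguez@berkeley.edu"

import sys

def map_initialize():
    # Runs once per mapper process, so the counter tracks mapper task launches.
    sys.stderr.write("reporter:counter:Mapper,Calls,1\n")

def map_function(record):
    # NOTE: a plain split(",") does not honor CSV quoting, so a quoted field
    # that contains commas (e.g. the Sub-product
    # "Other (phone, health club, etc.)") shifts the column indices
    # for that record.
    record = record.strip()
    record_parameters = record.split(",")
    issue = record_parameters[3]
    issue_words = issue.split()
    # Emit one "<word>\t1" record per word of the Issue column.
    for word in issue_words:
        count = 1
        print "%s\t%d" % (word, int(count))

if __name__ == "__main__":
    map_initialize()
    for line in sys.stdin:
        map_function(line)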

Overwriting mapper3_2_b.py
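Some fields in this dataset are quoted and contain commas (for example the Sub-product "Other (phone, health club, etc.)" in the sample rows shown earlier), which a plain `split(",")` does not handle. As a hedged alternative, not the approach used for the results in this notebook, the standard `csv` module can parse such records. The helper name `issue_words` is hypothetical, and the sample line is reconstructed from the fourth sample row of the table, assuming the raw file uses standard CSV quoting.

```python
import csv


def issue_words(line):
    """Return the whitespace-separated words of the Issue column (index 3)."""
    fields = next(csv.reader([line]))
    return fields[3].split()


sample = ('1115106,Debt collection,"Other (phone, health club, etc.)",'
          'Communication tactics,Frequent or repeated calls,GA,31721,Web,'
          '11/13/2014,11/13/2014,"Expert Global Solutions, Inc.",In progress,Yes,')

print(issue_words(sample))            # csv-aware parsing of the Issue column
print(sample.split(",")[3].split())   # plain split lands inside Sub-product
```

With quoting honored, tokens such as `health` and `club` would no longer be attributed to the Issue column, so the word counts would differ from the ones reported in this notebook.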


In [11]:
!chmod a+x mapper3_2_b.py

In [None]:
!cat ./mapper3_2_b.py

In [3]:
%%writefile reducer3_2_b.py
#!/usr/bin/python
"""
Code for reducer for W261 HW3.2.b
"""
__author__ = "Carlos Eduardo Rodriguez Castillo"
__email__ = "cerodriguez@berkeley.edu"

import sys
import re

def reduce_initialize():
    sys.stderr.write("reporter:counter:Reducer,Calls,1\n")
    init_word = ""
    return init_word

def reduce_function(record,current_word,current_count):
    record = record.strip()
    word_count_pair = record.split("\t")
    word = word_count_pair[0]
    count = int(word_count_pair[1])
    if current_word == "" or current_word == word:
        current_word = word
        current_count = current_count + count
    else:
        print "%s\t%d" % (current_word, current_count)
        current_word = word
        current_count = count
    return (current_word, current_count)
        
if __name__ == "__main__":
    word = reduce_initialize()
    current_count = 0
    for line in sys.stdin:
        word, current_count = reduce_function(line, word, current_count)
    # Emit the last key, unless this reducer received no input at all.
    if word != "":
        print "%s\t%d" % (word, current_count)

Writing reducer3_2_b.py


In [6]:
!chmod a+x reducer3_2_b.py

In [13]:
!hdfs dfs -rm -r /user/cloudera/w261/HW3/output-3_2_b
!hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-mr1.jar \
-D mapreduce.job.maps=1 \
-numReduceTasks 4 \
-mapper /home/cloudera/w261/HW3/src/mapper3_2_b.py \
-reducer /home/cloudera/w261/HW3/src/reducer3_2_b.py \
-input /user/cloudera/w261/HW3/data/* \
-output /user/cloudera/w261/HW3/output-3_2_b

rm: `/user/cloudera/w261/HW3/output-3_2_b': No such file or directory
packageJobJar: [] [/usr/jars/hadoop-streaming-2.6.0-cdh5.7.0.jar] /tmp/streamjob8582929186455445908.jar tmpDir=null
16/06/02 20:02:46 INFO client.RMProxy: Connecting to ResourceManager at quickstart.cloudera/10.0.2.15:8032
16/06/02 20:02:46 INFO client.RMProxy: Connecting to ResourceManager at quickstart.cloudera/10.0.2.15:8032
16/06/02 20:02:47 INFO mapred.FileInputFormat: Total input paths to process : 1
16/06/02 20:02:47 INFO mapreduce.JobSubmitter: number of splits:1
16/06/02 20:02:48 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1464465911163_0044
16/06/02 20:02:48 INFO impl.YarnClientImpl: Submitted application application_1464465911163_0044
16/06/02 20:02:48 INFO mapreduce.Job: The url to track the job: http://quickstart.cloudera:8088/proxy/application_1464465911163_0044/
16/06/02 20:02:48 INFO mapreduce.Job: Running job: job_1464465911163_0044
16/06/02 20:02:58 INFO mapreduce.Job: Job job_146446

In [17]:
!hdfs dfs -ls /user/cloudera/w261/HW3/output-3_2_b
!hdfs dfs -cat /user/cloudera/w261/HW3/output-3_2_b/* | head
!hdfs dfs -cat /user/cloudera/w261/HW3/output-3_2_b/* | tail

Found 5 items
-rw-r--r--   1 cloudera cloudera          0 2016-06-02 20:03 /user/cloudera/w261/HW3/output-3_2_b/_SUCCESS
-rw-r--r--   1 cloudera cloudera        491 2016-06-02 20:03 /user/cloudera/w261/HW3/output-3_2_b/part-00000
-rw-r--r--   1 cloudera cloudera        661 2016-06-02 20:03 /user/cloudera/w261/HW3/output-3_2_b/part-00001
-rw-r--r--   1 cloudera cloudera        548 2016-06-02 20:03 /user/cloudera/w261/HW3/output-3_2_b/part-00002
-rw-r--r--   1 cloudera cloudera        595 2016-06-02 20:03 /user/cloudera/w261/HW3/output-3_2_b/part-00003
"Account	16205
Account	350
Applied	139
Can't	1999
Cash	240
Cont'd	11848
Debt	1343
Delinquent	1061
I	925
Incorrect	29069
cat: Unable to write to output stream.
cat: Unable to write to output stream.
cat: Unable to write to output stream.
"Account	16205
Account	350
Applied	139
Can't	1999
Cash	240
Cont'd	11848
Debt	1343
Delinquent	1061
I	925
Incorrect	29069
"Loan	107254
"Making/receiving	3226
ATM	2422
Communication	6920
Dealing	1944
Improper	

Perform a word count analysis of the Issue column of the Consumer Complaints Dataset using a Mapper, Reducer, and standalone combiner (i.e., not an in-memory combiner) based WordCount, using user-defined Counters to count how many times the mapper, combiner, and reducer are called. What are the values of your user-defined Mapper Counter and Reducer Counter after completing your word count job?

__ANSWER:__

*Note that I am using the mapper and reducer code from 3.2.b as it does not change for this exercise.*

In [21]:
%%writefile combiner3_2_c.py
#!/usr/bin/python
"""
This is the combiner code for HW3.2.c
"""
__author__ = "Carlos Eduardo Rodriguez Castillo"
__email__ = "cerodriguez@berkeley.edu"

import sys
import re

def combine_initialize():
    sys.stderr.write("reporter:counter:Combiner,Calls,1\n")
    init_word = ""
    return init_word

def combine_function(record,current_word,current_count):
    record = record.strip()
    word_count_pair = record.split("\t")
    word = word_count_pair[0]
    count = int(word_count_pair[1])
    if current_word == "" or current_word == word:
        current_word = word
        current_count = current_count + count
    else:
        print "%s\t%d" % (current_word, current_count)
        current_word = word
        current_count = count
    return (current_word, current_count)
    
if __name__ == "__main__":
    word = combine_initialize()
    current_count = 0
    for line in sys.stdin:
        word, current_count = combine_function(line, word, current_count)
    # Emit the last key, unless this combiner received no input at all.
    if word != "":
        print "%s\t%d" % (word, current_count)

Overwriting combiner3_2_c.py


In [20]:
!chmod a+x combiner3_2_c.py

In [23]:
!hdfs dfs -rm -r /user/cloudera/w261/HW3/output-3_2_c
!hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-mr1.jar \
-D mapreduce.job.maps=1 \
-numReduceTasks 4 \
-mapper /home/cloudera/w261/HW3/src/mapper3_2_b.py \
-reducer /home/cloudera/w261/HW3/src/reducer3_2_b.py \
-combiner /home/cloudera/w261/HW3/src/combiner3_2_c.py \
-input /user/cloudera/w261/HW3/data/* \
-output /user/cloudera/w261/HW3/output-3_2_c

16/06/02 20:35:38 INFO fs.TrashPolicyDefault: Moved: 'hdfs://quickstart.cloudera:8020/user/cloudera/w261/HW3/output-3_2_c' to trash at: hdfs://quickstart.cloudera:8020/user/cloudera/.Trash/Current/user/cloudera/w261/HW3/output-3_2_c
packageJobJar: [] [/usr/jars/hadoop-streaming-2.6.0-cdh5.7.0.jar] /tmp/streamjob1137443623450988204.jar tmpDir=null
16/06/02 20:35:43 INFO client.RMProxy: Connecting to ResourceManager at quickstart.cloudera/10.0.2.15:8032
16/06/02 20:35:43 INFO client.RMProxy: Connecting to ResourceManager at quickstart.cloudera/10.0.2.15:8032
16/06/02 20:35:44 INFO mapred.FileInputFormat: Total input paths to process : 1
16/06/02 20:35:44 INFO mapreduce.JobSubmitter: number of splits:1
16/06/02 20:35:44 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1464465911163_0046
16/06/02 20:35:45 INFO impl.YarnClientImpl: Submitted application application_1464465911163_0046
16/06/02 20:35:45 INFO mapreduce.Job: The url to track the job: http://quickstart.cloudera:8088/p

__Using a single reducer:__ What are the top 50 most frequent terms in your word count analysis? Present the top 50 terms with their frequency and relative frequency. If there are ties, please sort the tokens in alphanumeric/string order. Present the bottom 10 tokens (least frequent items).

__ANSWER:__

_Note: I am taking as input to this exercise the results from the word count analysis in the previous exercise._
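The ordering asked for here (count descending, ties broken alphabetically) and the relative frequency can be sketched locally as follows. This is an illustration only: `rank_terms` is a hypothetical helper, and the counts are the HW3.2 word counts for the single-record input rather than the consumer complaints data.

```python
def rank_terms(counts):
    """Return (word, count, relative_frequency), count desc then word asc."""
    total = float(sum(counts.values()))
    ordered = sorted(counts.items(), key=lambda kv: (-kv[1], kv[0]))
    return [(word, count, count / total) for word, count in ordered]


counts = {"foo": 3, "quux": 2, "labs": 1, "bar": 1}
ranked = rank_terms(counts)
for word, count, rel in ranked:
    print("%s\t%d\t%.4f" % (word, count, rel))
```

Negating the count in the sort key gives descending frequency while keeping the alphabetical tie-break ascending.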

In [27]:
%%writefile identityMapper.py
#!/usr/bin/python

# An identity mapper must be specified in order to trigger the sort in Hadoop.
#
# The built-in identity mapper can also be used:  -mapper /bin/cat
#
import sys

# input comes from STDIN
for line in sys.stdin:
    # minimal feature engineering: lower-case all tokens so that the
    # secondary sort is case insensitive
    line = line.lower()
    # print the record to stdout
    print line.strip()

Overwriting identityMapper.py


In [21]:
!chmod a+x identityMapper.py

In [28]:
%%writefile reducer3_2_d.py
#!/usr/bin/python
"""
Code for reducer for W261 HW3.2.d
"""
__author__ = "Carlos Eduardo Rodriguez Castillo"
__email__ = "cerodriguez@berkeley.edu"

import sys

def reduce_initialize():
    sys.stderr.write("reporter:counter:Reducer,Calls,1\n")

def reduce_function(record, current_total_terms):
    # Parse one "<word>\t<count>" record and update the running total.
    record = record.strip()
    word_count_pair = record.split("\t")
    word = word_count_pair[0]
    count = int(word_count_pair[1])
    current_total_terms = current_total_terms + count
    return (word, count, current_total_terms)

def print_terms(terms, total_terms):
    # Print word, count and relative frequency for each term.
    for term in terms:
        print "%s\t%d\t%.10f" % (term["word"], term["count"], term["count"] / float(total_terms))

if __name__ == "__main__":
    reduce_initialize()
    # Records are expected to arrive already sorted by count (descending),
    # so they are only buffered here, not re-sorted.
    output_array = []
    total_terms = 0
    for line in sys.stdin:
        word, word_count, total_terms = reduce_function(line, total_terms)
        output_array.append({"word": word, "count": word_count})
    print "\n########################################################"
    print "### TOP 50 TERMS BY FREQUENCY WITH RELATIVE FREQUENCY###"
    print "########################################################\n"
    # Slicing avoids an IndexError when fewer than 50 terms are present.
    print_terms(output_array[:50], total_terms)
    print "\n###########################################################"
    print "### BOTTOM 10 TERMS BY FREQUENCY WITH RELATIVE FREQUENCY###"
    print "###########################################################\n"
    # Last 10 records, least frequent first.
    print_terms(output_array[-10:][::-1], total_terms)

Overwriting reducer3_2_d.py


In [43]:
!chmod a+x reducer3_2_d.py

In [30]:
# Testing code for HW3.2.d
!hdfs dfs -cat /user/cloudera/w261/HW3/output-3_2_c/*  \
| ./identityMapper.py \
| sort -k2,2nr -k1,1r | ./reducer3_2_d.py

reporter:counter:Reducer,Calls,1

########################################################
### TOP 50 TERMS BY FREQUENCY WITH RELATIVE FREQUENCY###
########################################################

"loan	107254	0.1095956200
modification	70487	0.0720259055
servicing	36767	0.0375697145
credit	36126	0.0369147199
report	30546	0.0312128947
on	29069	0.0297036481
information	29069	0.0297036481
incorrect	29069	0.0297036481
or	22533	0.0230249511
debt	17966	0.0183582422
and	16448	0.0168071005
opening	16205	0.0165587952
"account	16205	0.0165587952
credit	14768	0.0150904220
health	12545	0.0128188884
club	12545	0.0128188884
/	12386	0.0126564170
not	12353	0.0126226965
loan	12237	0.0125041640
owed	11848	0.0121066711
cont'd	11848	0.0121066711
collect	11848	0.0121066711
attempts	11848	0.0121066711
of	10885	0.0111226465
my	10731	0.0109652843
withdrawals	10555	0.0107854417
deposits	10555	0.0107854417
problems	9484	0.0096910592
"application	8625	0.0088133051
to	8401	0.0085844146
billing	8158	0.008

In [32]:
!hdfs dfs -rm -r /user/cloudera/w261/HW3/output-3_2_d
!hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-mr1.jar \
-D stream.num.map.output.key.fields=2 \
-D mapreduce.job.maps=1 \
-D mapreduce.job.output.key.comparator.class=\
org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
-D mapreduce.partition.keycomparator.options="-k2,2nr -k1,1" \
-numReduceTasks 1 \
-mapper /home/cloudera/w261/HW3/src/identityMapper.py \
-reducer /home/cloudera/w261/HW3/src/reducer3_2_d.py \
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
-input /user/cloudera/w261/HW3/output-3_2_c/* \
-output /user/cloudera/w261/HW3/output-3_2_d
!hdfs dfs -cat /user/cloudera/w261/HW3/output-3_2_d/*

16/06/05 06:45:41 INFO fs.TrashPolicyDefault: Moved: 'hdfs://quickstart.cloudera:8020/user/cloudera/w261/HW3/output-3_2_d' to trash at: hdfs://quickstart.cloudera:8020/user/cloudera/.Trash/Current/user/cloudera/w261/HW3/output-3_2_d1465134341181
packageJobJar: [] [/usr/jars/hadoop-streaming-2.6.0-cdh5.7.0.jar] /tmp/streamjob4013988832558737341.jar tmpDir=null
16/06/05 06:45:44 INFO client.RMProxy: Connecting to ResourceManager at quickstart.cloudera/10.0.2.15:8032
16/06/05 06:45:44 INFO client.RMProxy: Connecting to ResourceManager at quickstart.cloudera/10.0.2.15:8032
16/06/05 06:45:45 INFO mapred.FileInputFormat: Total input paths to process : 4
16/06/05 06:45:45 INFO mapreduce.JobSubmitter: number of splits:4
16/06/05 06:45:46 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1465099569246_0010
16/06/05 06:45:46 INFO impl.YarnClientImpl: Submitted application application_1465099569246_0010
16/06/05 06:45:46 INFO mapreduce.Job: The url to track the job: http://quickstart.cl

#### HW3.2.1 Using two reducers

Using two reducers: What are the top 50 most frequent terms in your word count analysis?

Present the top 50 terms and their frequency and their relative frequency. If there are ties please sort the tokens in alphanumeric/string order. Present bottom 10 tokens (least frequent items). Please use a combiner.
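One way to keep a total order across two reducers is to partition records by a count threshold, so that one reducer receives every high-count term and the other every low-count term. The sketch below simulates that idea locally; `partition_by_count` and `sort_desc` are hypothetical names, the threshold is picked by hand, and the sample counts are the HW3.2 word counts.

```python
def partition_by_count(pairs, threshold):
    """Split (word, count) pairs into a low group and a high group."""
    low = [(w, c) for w, c in pairs if c <= threshold]
    high = [(w, c) for w, c in pairs if c > threshold]
    return low, high


def sort_desc(pairs):
    """Sort by count descending, breaking ties alphabetically."""
    return sorted(pairs, key=lambda wc: (-wc[1], wc[0]))


pairs = [("foo", 3), ("quux", 2), ("labs", 1), ("bar", 1)]
low, high = partition_by_count(pairs, threshold=1)

# Each group is sorted independently, as each reducer would do on its own.
total_order = sort_desc(high) + sort_desc(low)
print(total_order)
```

Because every count in the high group exceeds every count in the low group, concatenating the two sorted outputs gives the same order as sorting everything in a single reducer.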

In [103]:
%%writefile mapperWithPartitionTable.py
#!/usr/bin/python

import sys

values = []
# using the word count output to determine the partitions
with open('word_count_output','r') as f:
    for line in f:
        line = line.strip()
        key, value = line.split("\t")
        values.append(int(value))

# the median count is the boundary between the two partitions
values = sorted(values)
median = values[len(values) // 2]

for line in sys.stdin:
    line = line.strip()
    # minimal feature engineering: setting tokens to all be lower case
    # for the purposes of having case insensitive secondary sort
    line = line.lower()
    key, value = line.split("\t")
    value = int(value)
    # prefix each record with its partition key
    if value <= median:
        print "group1\t%s" % (line)
    else:
        print "group2\t%s" % (line)

Overwriting mapperWithPartitionTable.py


In [35]:
!chmod a+x mapperWithPartitionTable.py

In [104]:
!hdfs dfs -cat /user/cloudera/w261/HW3/output-3_2_c/* > word_count_output
!hdfs dfs -cat /user/cloudera/w261/HW3/output-3_2_c/*  \
| ./mapperWithPartitionTable.py | head
!rm word_count_output

group2	"account	16205
group1	account	350
group1	applied	139
group1	can't	1999
group1	cash	240
group2	cont'd	11848
group1	debt	1343
group1	delinquent	1061
group1	i	925
group2	incorrect	29069


In [105]:
%%writefile combineWithPartitionTable.py
#!/usr/bin/python
"""
This is the combiner code for HW3.2.1
"""
__author__ = "Carlos Eduardo Rodriguez Castillo"
__email__ = "cerodriguez@berkeley.edu"

import sys

# NOTE:
# The exercise mentions that a combiner should be used, but for the
# purposes of sorting the output of the word count analysis
# combining is not required, so this combiner passes records through unchanged.

if __name__ == "__main__":
    sys.stderr.write("reporter:counter:Combiner,Calls,1\n")
    for line in sys.stdin:
        line = line.strip()
        print line

Overwriting combineWithPartitionTable.py


In [49]:
!chmod a+x combineWithPartitionTable.py

In [106]:
!hdfs dfs -cat /user/cloudera/w261/HW3/output-3_2_c/* > word_count_output
!hdfs dfs -cat /user/cloudera/w261/HW3/output-3_2_c/*  \
| ./mapperWithPartitionTable.py \
| ./combineWithPartitionTable.py | sort -k3,3nr -k2,2 | head
!rm word_count_output

reporter:counter:Combiner,Calls,1
group2	"loan	107254
group2	modification	70487
group2	servicing	36767
group2	credit	36126
group2	report	30546
group2	incorrect	29069
group2	information	29069
group2	on	29069
group2	or	22533
group2	debt	17966
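
The `sort -k3,3nr -k2,2` step orders records by count in descending order and breaks ties by term in ascending order. The same ordering can be sketched in Python to check tie-breaking locally. `sort_records` is a hypothetical helper, and Python compares strings by code point, which may differ from the locale-dependent collation that `sort` applies.

```python
def sort_records(lines):
    """Order 'group<TAB>term<TAB>count' lines by count desc, then term asc."""
    def sort_key(line):
        group, term, count = line.split("\t")
        return (-int(count), term)
    return sorted(lines, key=sort_key)


# Records with tied counts, taken from the output above in shuffled order.
lines = [
    "group2\ton\t29069",
    "group2\tcredit\t36126",
    "group2\tinformation\t29069",
    "group2\tincorrect\t29069",
]
for line in sort_records(lines):
    print(line)
```

Negating the count in the key gives the descending numeric order while leaving the term comparison ascending, so a single `sorted` call handles both criteria.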


In [107]:
%%writefile reducerWithPartitionKey.py
#!/usr/bin/python

import sys

sys.stderr.write("reporter:counter:Reducer Counters,Calls,1\n")

# Use the word count output (a local side file) to determine the total
# number of tokens, the denominator for relative frequency.
total_tokens = 0
with open('word_count_output', 'r') as f:
    for line in f:
        key, value = line.strip().split("\t")
        total_tokens += int(value)

for line in sys.stdin:
    line = line.strip()
    # Drop the partition key (group); it is only needed to route records.
    group, key, value = line.split("\t")
    value = int(value)
    print "%s\t%d\t%.10f" % (key, value, float(value) / float(total_tokens))

Overwriting reducerWithPartitionKey.py


In [40]:
!chmod a+x reducerWithPartitionKey.py

In [108]:
!hdfs dfs -cat /user/cloudera/w261/HW3/output-3_2_c/* > word_count_output
!hdfs dfs -cat /user/cloudera/w261/HW3/output-3_2_c/*  \
| ./mapperWithPartitionTable.py \
| ./combineWithPartitionTable.py \
| sort -k3,3nr -k2,2 | ./reducerWithPartitionKey.py | head
!rm word_count_output

reporter:counter:Reducer Counters,Calls,1
reporter:counter:Combiner,Calls,1
"loan	107254	0.1095956200
modification	70487	0.0720259055
servicing	36767	0.0375697145
credit	36126	0.0369147199
report	30546	0.0312128947
incorrect	29069	0.0297036481
information	29069	0.0297036481
on	29069	0.0297036481
or	22533	0.0230249511
debt	17966	0.0183582422
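
The output above shows only the head of the ranking. One way the top-N and bottom-N reports with relative frequencies could be assembled from (term, count) pairs is sketched below in plain Python. `frequency_report` and its parameters are hypothetical names, and the small input dictionary is illustrative, not the real word count output.

```python
def frequency_report(counts, top_n=50, bottom_n=10):
    """Return (top, bottom) lists of (term, count, relative_frequency).

    Terms are ranked by count descending, with ties broken by term ascending.
    bottom_n is assumed to be at least 1.
    """
    total = float(sum(counts.values()))
    ranked = sorted(counts.items(), key=lambda kv: (-kv[1], kv[0]))
    rows = [(term, count, count / total) for term, count in ranked]
    return rows[:top_n], rows[-bottom_n:]


# Illustrative counts only.
counts = {"loan": 6, "credit": 2, "debt": 1, "report": 1}
top, bottom = frequency_report(counts, top_n=2, bottom_n=2)
for term, count, rel in top + bottom:
    print("%s\t%d\t%.10f" % (term, count, rel))
```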


#### HW3.3 Shopping Cart Analysis

Product Recommendations: The action or practice of selling additional products or services 
to existing customers is called cross-selling. Giving product recommendations is 
one example of cross-selling that is frequently used by online retailers. 
One simple method to give product recommendations is to recommend products that are frequently
browsed together by the customers.
	
For this homework use the online browsing behavior dataset located at: 

       https://www.dropbox.com/s/zlfyiwa70poqg74/ProductPurchaseData.txt?dl=0

Each line in this dataset represents a browsing session of a customer. 
On each line, each string of 8 characters represents the id of an item browsed during that session. 
The items are separated by spaces.

Here are the first few lines of the ProductPurchaseData:

    FRO11987 ELE17451 ELE89019 SNA90258 GRO99222
    GRO99222 GRO12298 FRO12685 ELE91550 SNA11465 ELE26917 ELE52966 FRO90334 SNA30755 ELE17451 FRO84225 SNA80192
    ELE17451 GRO73461 DAI22896 SNA99873 FRO86643
    ELE17451 ELE37798 FRO86643 GRO56989 ELE23393 SNA11465
    ELE17451 SNA69641 FRO86643 FRO78087 SNA11465 GRO39357 ELE28573 ELE11375 DAI54444


Do some exploratory data analysis of this dataset guided by the following questions:

How many unique items are available from this supplier?

Using a single reducer: Report your findings such as the number of unique products; the largest basket; the top 50 most frequently purchased items, their frequency, and their relative frequency (break ties by sorting the products in alphabetical order), etc., using Hadoop MapReduce.
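
Before writing the Hadoop job, the requested statistics can be sanity-checked locally. The sketch below is plain Python rather than MapReduce, runs only on the five sample sessions quoted above, and uses a hypothetical helper name, `basket_stats`. Results on the full dataset will differ.

```python
from collections import Counter


def basket_stats(sessions):
    """Return (item_counts, largest_basket_size) for space-separated sessions."""
    item_counts = Counter()
    largest = 0
    for session in sessions:
        items = session.split()
        item_counts.update(items)
        largest = max(largest, len(items))
    return item_counts, largest


# The five sample sessions from the assignment text.
sample = [
    "FRO11987 ELE17451 ELE89019 SNA90258 GRO99222",
    "GRO99222 GRO12298 FRO12685 ELE91550 SNA11465 ELE26917 ELE52966 "
    "FRO90334 SNA30755 ELE17451 FRO84225 SNA80192",
    "ELE17451 GRO73461 DAI22896 SNA99873 FRO86643",
    "ELE17451 ELE37798 FRO86643 GRO56989 ELE23393 SNA11465",
    "ELE17451 SNA69641 FRO86643 FRO78087 SNA11465 GRO39357 ELE28573 "
    "ELE11375 DAI54444",
]

item_counts, largest = basket_stats(sample)
total = float(sum(item_counts.values()))
# Rank by frequency descending, breaking ties alphabetically.
ranked = sorted(item_counts.items(), key=lambda kv: (-kv[1], kv[0]))
print("unique items: %d, largest basket: %d" % (len(item_counts), largest))
for item, count in ranked[:3]:
    print("%s\t%d\t%.4f" % (item, count, count / total))
```

In a streaming job with a single reducer, the same counting would be split between a mapper that emits one record per item and a reducer that aggregates them. The local version is only a reference for checking the job's output on a small input.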
