# DATASCI W261: Machine Learning at Scale
## Section 3
## Homework, Week 3
## Name: T.Thomas
## Email: tgthomas@berkeley.edu
## Submission Date:  ****** PST

### nbviewer link:


### pdf link:



## Week 3 ASSIGNMENTS using Hadoop Streaming and Python

#### Reference Links



## HW 3.0.  

### What is a merge sort? Where is it used in Hadoop?

Merge sort (also commonly spelled mergesort) is an efficient, general-purpose, comparison-based sorting algorithm that works recursively.
Conceptually there are two steps:

   1. Divide the unsorted list into **n** sublists, each containing 1 element (the base case is a list of one element, which is considered sorted).
   2. Repeatedly merge sublists to generate new sorted sublists until 1 sublist remains which will be the sorted list.
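
The two steps above can be sketched in a few lines of Python. This is a minimal top-down illustration, not code from the assignment; the names `merge` and `merge_sort` and the sample list are chosen here for the example.

```python
def merge(left, right):
    """Merge two already-sorted lists into one sorted list."""
    merged = []
    i = j = 0
    while i < len(left) and j < len(right):
        # <= keeps equal elements in their original order (stable sort)
        if left[i] <= right[j]:
            merged.append(left[i])
            i += 1
        else:
            merged.append(right[j])
            j += 1
    # one list is exhausted; append whatever remains of the other
    merged.extend(left[i:])
    merged.extend(right[j:])
    return merged


def merge_sort(items):
    """Return a new sorted list using top-down merge sort."""
    if len(items) <= 1:  # base case: 0 or 1 element is already sorted
        return list(items)
    mid = len(items) // 2
    return merge(merge_sort(items[:mid]), merge_sort(items[mid:]))


example = [54, 26, 93, 17, 77, 31, 44, 55, 20]
sorted_example = merge_sort(example)
print(sorted_example)
```

The `merge` helper is the part that matters for Hadoop, since it only ever looks at the front of each sorted input.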


<img src="http://interactivepython.org/runestone/static/pythonds/_images/mergesortB.png" height="300" width="300" align=right  hspace=100 vspace=10 border=100>
<img src="http://interactivepython.org/runestone/static/pythonds/_images/mergesortA.png" height="300" width="300" align=right  hspace=100 vspace=10 border=100>



**Source:** http://interactivepython.org/runestone/static/pythonds/SortSearch/TheMergeSort.html



Merging lists that are already sorted is much cheaper, in both operations and memory consumption, than sorting the complete list from scratch.

If the reducer gets 4 sorted lists, it only needs to compare the current head of each list and pick the smallest one. If the number of lists is constant, this merge is an O(N) operation.

In Hadoop, merge sort is used during the shuffle: sorted spill files are merged on the map side, and each reducer merges the sorted map outputs it fetches before the reduce function runs. The merge can proceed in several rounds, like a tree, and the reducers run in parallel, so the work is parallelized too.
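
The merge of several sorted lists can be sketched with the standard library's `heapq.merge`, which repeatedly picks the smallest head among its inputs. The four runs below are hypothetical stand-ins for sorted map outputs; Hadoop's own merge is implemented in Java and is not shown here.

```python
import heapq

# Hypothetical sorted runs, standing in for sorted map output files.
sorted_runs = [
    [("bar", 1), ("foo", 1)],
    [("foo", 1), ("quux", 1)],
    [("foo", 1), ("labs", 1)],
    [("quux", 1)],
]

# heapq.merge is lazy: it only holds one pending item per run in memory.
merged = list(heapq.merge(*sorted_runs))
for key, value in merged:
    print("%s\t%s" % (key, value))
```

Because every run is already sorted, the merged stream comes out sorted by key without re-sorting the full data set.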

### What is a combiner function in the context of Hadoop? 


**Combiner Functions**

Many MapReduce jobs are limited by the bandwidth available on the cluster, so it pays to minimize the data transferred between map and reduce tasks. Hadoop allows the user to specify a combiner function to be run on the map output, and the combiner function’s output forms the input to the reduce function. Because the combiner function is an optimization, Hadoop does not provide a guarantee of how many times it will call it for a particular map output record, if at all. In other words, calling the combiner function zero, one, or many times should produce the same output from the reducer.

Running the combiner function makes for a more compact map output, so there is less data to write to local disk and to transfer to the reducer.


If a combiner function is used, then it has the same form as the reduce function (and is an implementation of Reducer), except its output types are the intermediate key and value types (K2 and V2), so they can feed the reduce function:

    map: $(K1, V1) → list(K2, V2)$
    combiner: $(K2, list(V2)) → list(K2, V2)$
    reduce: $(K2, list(V2)) → list(K3, V3)$
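
As a rough illustration of why the combiner may run zero, one, or many times, here is a sketch for word count, where the same summing function can serve as both combiner and reducer. The function name `sum_by_key` and the two map outputs are made up for this example.

```python
from collections import defaultdict


def sum_by_key(pairs):
    """Sum the values for each key; returns sorted (key, total) pairs."""
    totals = defaultdict(int)
    for key, value in pairs:
        totals[key] += value
    return sorted(totals.items())


# Hypothetical outputs of two map tasks.
map_output_1 = [("foo", 1), ("foo", 1), ("quux", 1)]
map_output_2 = [("labs", 1), ("foo", 1), ("bar", 1), ("quux", 1)]

# Combiner never runs: the reducer sees the raw map output.
no_combiner = sum_by_key(map_output_1 + map_output_2)

# Combiner runs once on each map output.
one_pass = sum_by_key(sum_by_key(map_output_1) + sum_by_key(map_output_2))

# Combiner runs twice on the first map output and once on the second.
two_pass = sum_by_key(
    sum_by_key(sum_by_key(map_output_1)) + sum_by_key(map_output_2)
)

print(no_combiner == one_pass == two_pass)
```

This works because addition is associative and commutative. A function such as a mean does not have that property and cannot be reused as a combiner unchanged.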

### Give an example where it can be used and justify why it should be used in the context of this problem.

### What is the Hadoop shuffle?

The shuffle is the heart of MapReduce.

Mappers and reducers have a contract: all records with the same key go to the same reducer. This means that after the mappers finish, there must be an intermediate step in which the map output is partitioned and sorted by key to meet this contract.

The process by which the system performs the sort and transfers the map outputs to the reducers as inputs is known as the shuffle.

The MR framework accomplishes this in 3 steps:

- Partition
- Sort
- Combine (in memory if possible, otherwise on disk)
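
The partition and sort steps can be simulated locally as follows. This is only a sketch: `zlib.crc32` is an assumption standing in for Hadoop's default hash partitioner, and the function names and sample record are chosen for the example.

```python
import zlib
from itertools import groupby
from operator import itemgetter


def partition(key, num_reducers):
    """Pick a reducer for a key (crc32 is a stand-in for Hadoop's key hash)."""
    return zlib.crc32(key.encode("utf-8")) % num_reducers


def shuffle(map_output, num_reducers):
    """Partition (key, value) pairs, sort each partition, group values by key."""
    partitions = [[] for _ in range(num_reducers)]
    for key, value in map_output:
        partitions[partition(key, num_reducers)].append((key, value))

    reducer_inputs = []
    for part in partitions:
        part.sort(key=itemgetter(0))  # sort by key within the partition
        grouped = [
            (key, [value for _, value in group])
            for key, group in groupby(part, key=itemgetter(0))
        ]
        reducer_inputs.append(grouped)
    return reducer_inputs


map_output = [(word, 1) for word in "foo foo quux labs foo bar quux".split()]
reducer_inputs = shuffle(map_output, num_reducers=2)
for reducer_id, grouped in enumerate(reducer_inputs):
    print(reducer_id, grouped)
```

Every key lands in exactly one partition, which is what lets each reducer produce a complete count for the keys it receives.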


## HW3.1 Use Counters to do EDA (exploratory data analysis and to monitor progress)

Counters are lightweight objects in Hadoop that allow you to keep track of system progress in both the map and reduce stages of processing. By default, Hadoop defines a number of standard counters in "groups"; these show up in the jobtracker webapp, giving you information such as "Map input records", "Map output records", etc. 

While processing data with a MapReduce job, it is a challenge to monitor the progress of parallel tasks running across the nodes of a distributed cluster. It is also hard to distinguish between the data that has been processed and the data that is yet to be processed. The MapReduce framework provides user-defined Counters, which can be used to monitor the progress of data processing across the nodes of the cluster.
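
In Hadoop Streaming, a script updates a user-defined counter by writing a line of the form `reporter:counter:<group>,<counter>,<amount>` to standard error, which is the format the mapper and reducer in this notebook use. A small helper might look like the sketch below; the name `increment_counter` and the `stream` parameter (added so the output can be captured for testing) are assumptions made for this example.

```python
import io
import sys


def increment_counter(group, counter, amount=1, stream=None):
    """Write a Hadoop Streaming counter update line (to stderr by default)."""
    stream = sys.stderr if stream is None else stream
    stream.write("reporter:counter:{0},{1},{2}\n".format(group, counter, amount))


# Capture the line in a buffer instead of stderr to inspect it locally.
buffer = io.StringIO()
increment_counter("ComplaintCategory", "Mortgage", 1, stream=buffer)
counter_line = buffer.getvalue()
print(repr(counter_line))
```

Keeping counts in a dictionary and reporting them once at the end of the task, as the mapper in this notebook does, keeps the number of lines written to stderr small.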

Use the Consumer Complaints Dataset provided here to complete this question:

     https://www.dropbox.com/s/vbalm3yva2rr86m/Consumer_Complaints.csv?dl=0

The consumer complaints dataset consists of diverse consumer complaints, which have been reported across the United States regarding various types of loans. The dataset consists of records of the form:

Complaint ID,Product,Sub-product,Issue,Sub-issue,State,ZIP code,Submitted via,Date received,Date sent to company,Company,Company response,Timely response?,Consumer disputed?

Here are the first few lines of the Consumer Complaints Dataset:


    Complaint ID,Product,Sub-product,Issue,Sub-issue,State,ZIP code,Submitted via,Date received,Date sent to company,Company,Company response,Timely response?,Consumer disputed?
    1114245,Debt collection,Medical,Disclosure verification of debt,Not given enough info to verify debt,FL,32219,Web,11/13/2014,11/13/2014,"Choice Recovery, Inc.",Closed with explanation,Yes,
    1114488,Debt collection,Medical,Disclosure verification of debt,Right to dispute notice not received,TX,75006,Web,11/13/2014,11/13/2014,"Expert Global Solutions, Inc.",In progress,Yes,
    1114255,Bank account or service,Checking account,Deposits and withdrawals,,NY,11102,Web,11/13/2014,11/13/2014,"FNIS (Fidelity National Information Services, Inc.)",In progress,Yes,
    1115106,Debt collection,"Other (phone, health club, etc.)",Communication tactics,Frequent or repeated calls,GA,31721,Web,11/13/2014,11/13/2014,"Expert Global Solutions, Inc.",In progress,Yes,
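
Note that some fields, such as the company name, contain commas inside quotes, so splitting a line on `,` gives the wrong number of fields. The sketch below uses the header and first record shown above to compare a naive split with Python's `csv` module; the variable names are chosen for the example.

```python
import csv
import io

sample = (
    "Complaint ID,Product,Sub-product,Issue,Sub-issue,State,ZIP code,"
    "Submitted via,Date received,Date sent to company,Company,"
    "Company response,Timely response?,Consumer disputed?\n"
    "1114245,Debt collection,Medical,Disclosure verification of debt,"
    "Not given enough info to verify debt,FL,32219,Web,11/13/2014,11/13/2014,"
    '"Choice Recovery, Inc.",Closed with explanation,Yes,\n'
)

# csv.reader honours the quotes around "Choice Recovery, Inc."
header, first_row = list(csv.reader(io.StringIO(sample)))

# A plain split breaks the quoted company name into two fields.
naive_fields = sample.splitlines()[1].split(",")

print(len(header), len(first_row), len(naive_fields))
print(first_row[1], "|", first_row[3], "|", first_row[10])
```

With `csv.reader`, index 1 is the Product column and index 3 is the Issue column, which are the two fields the mappers in this notebook read.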

### User-defined Counters

#### Now, let’s use Hadoop Counters to identify the number of complaints pertaining to debt collection, mortgage and other categories (all other categories get lumped into this one) in the consumer complaints dataset. Basically produce the distribution of the Product column in this dataset using counters (limited to 3 counters here).

#### Hadoop offers Job Tracker, a UI tool to determine the status and statistics of all jobs. Using the job tracker UI, developers can view the Counters that have been created. Screenshot your job tracker UI as your job completes and include it here. Make sure that your user-defined counters are visible. 


#### Mapper to test counters in Hadoop

In [33]:
%%writefile counter_test_mapper.py
#!/usr/bin/python
## counter_test_mapper.py
## Author: Tigi Thomas
## Description: mapper code for HW 3.1 
import csv
import sys

def printcounters(counters):
    for counter, total in counters.iteritems():
        sys.stderr.write("reporter:counter:ComplaintCategory,{0},{1}\n".format(counter,total))
        
counters = {} #keep all the counters.
header_done = False #Just keep tracks if we are done reading the header

# input comes from STDIN (standard input)
for row in csv.reader(iter(sys.stdin.readline, '')):
    if not header_done:
        # skip the CSV header row (assumes it is the first line this mapper reads)
        header_done = True
    else:
        category = row[1].lower()
        if category == 'debt collection' or category == 'mortgage':
            counters[row[1]] = counters.get(row[1], 0) + 1
        else:
            counters["Other"] = counters.get("Other", 0) + 1
        print row[1] + '\t1'

printcounters(counters)

Overwriting counter_test_mapper.py


#### Reducer with counter 

In [50]:
%%writefile wordcount_reducer.py
#!/usr/bin/python
## wordcount_reducer.py
##     - This is a very basic reducer to reduce
##       Word,count
## Author: Tigi Thomas
## Description: reducer code for HW 3.1 

import sys

#This custom counter , counts how many reducers.
sys.stderr.write("reporter:counter:HowMany,Reducer,1\n")

current_word = None
current_count = 0
word = None

# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()

    # parse the input we got from mapper.py
    word, count = line.split('\t', 1)

    # convert count (currently a string) to int
    try:
        count = int(count)
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        continue

    # this IF-switch only works because Hadoop sorts map output
    # by key (here: word) before it is passed to the reducer
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # write result to STDOUT
            print '%s\t%s' % (current_word, current_count)
        current_count = count
        current_word = word

# do not forget to output the last word, if any input was received
# (a reducer with empty input must not print "None 0")
if current_word is not None:
    print '%s\t%s' % (current_word, current_count)

Overwriting wordcount_reducer.py


#### Test Mapper & Reducer using unix commandline.

In [36]:
# Test
#!cat Consumer_Complaints.csv | python counter_test_mapper.py | sort -k1,1 | python wordcount_reducer.py

#### Run MapReduce Job via Hadoop Streaming

In [21]:
# Check contents of hdfs folder.
#!hdfs dfs -ls /user/dsq

In [37]:
#Folder already exists
#!hdfs dfs -mkdir -p /user/dsq

#Clear up prev inputs/outputs.
!echo
!echo "Cleaning out old input and output files..."
!echo "------------------------------------------"
!echo
!hdfs dfs -rm /user/dsq/Consumer_Complaints.csv
!hdfs dfs -rm /user/dsq/countertestoutput/*
!hdfs dfs -rmdir /user/dsq/countertestoutput
!hdfs dfs -put Consumer_Complaints.csv /user/dsq


Cleaning out old input and output files...
------------------------------------------

16/01/31 19:32:30 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/01/31 19:32:31 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted /user/dsq/Consumer_Complaints.csv
16/01/31 19:32:33 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/01/31 19:32:34 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted /user/dsq/countertestoutput/_SUCCESS
16/01/31 19:32:34 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted /user/dsq/countertestoutput/part-00000
16/01/31 19:32:34 INFO fs.TrashPolicyDefault: Namenode trash configurat

In [38]:
!$HADOOP_INSTALL/bin/hadoop jar $HADOOP_INSTALL/hadoop-streaming-2.7.1.jar \
-D mapreduce.job.maps=2 \
-D mapreduce.job.reduces=2 \
-D mapred.job.name="Testing Counters" \
-mapper counter_test_mapper.py -file counter_test_mapper.py \
-reducer wordcount_reducer.py -file wordcount_reducer.py \
-input Consumer_Complaints.csv -file Consumer_Complaints.csv \
-output countertestoutput


16/01/31 19:32:52 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
16/01/31 19:32:52 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
packageJobJar: [counter_test_mapper.py, counter_test_reducer.py, Consumer_Complaints.csv] [] /tmp/streamjob8197482647832701843.jar tmpDir=null
16/01/31 19:32:55 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
16/01/31 19:32:55 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
16/01/31 19:32:55 INFO jvm.JvmMetrics: Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
16/01/31 19:32:55 INFO mapred.FileInputFormat: Total input paths to process : 1
16/01/31 19:32:56 INFO mapreduce.JobSubmitter: number of splits:1
16/01/31 19:32:56 INFO Configuration.deprecation: mapred.job.name is deprecated. Instead, use mapreduc

***From Log***

    ComplaintCategory
		Debt collection=44372
		Mortgage=125752
		Other=142788
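
The three counter values above can be turned into the requested distribution of the Product column with a few lines of Python. The dictionary below simply copies the logged values; the variable names are chosen for the example.

```python
# Counter values copied from the job log above.
counts = {"Debt collection": 44372, "Mortgage": 125752, "Other": 142788}

total = sum(counts.values())  # 312912 complaints in total
shares = {name: count / float(total) for name, count in counts.items()}

# Print categories from most to least frequent.
for name in sorted(counts, key=counts.get, reverse=True):
    print("%-16s %7d  %6.2f%%" % (name, counts[name], 100 * shares[name]))
```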

## HW 3.2 Analyze the performance of your Mappers, Combiners and Reducers using Counters

For this brief study the Input file will be one record (the next line only): 

   **foo foo quux labs foo bar quux**

* Perform a word count analysis of this single-record dataset using a Mapper and Reducer based WordCount (i.e., no combiners are used here), using user-defined Counters to count how many times the mapper and reducer are called. 
* What are the values of your user-defined Mapper Counter and Reducer Counter after completing this word count job? 

The answer should be 1 and 4 respectively. Please explain.

Please use multiple mappers and reducers for these jobs (at least 2 mappers and 2 reducers).

* **3.2.a** Perform a word count analysis of the Issue column of the Consumer Complaints Dataset using a Mapper and Reducer based WordCount (i.e., no combiners used anywhere), using user-defined Counters to count how many times the mapper and reducer are called. What are the values of your user-defined Mapper Counter and Reducer Counter after completing your word count job? 

* **3.2.b** Perform a word count analysis of the Issue column of the Consumer Complaints Dataset using a Mapper, Reducer, and standalone combiner (i.e., not an in-memory combiner) based WordCount, using user-defined Counters to count how many times the mapper, combiner, and reducer are called. What are the values of your user-defined Mapper Counter and Reducer Counter after completing your word count job?

* **3.2.c** Using a single reducer: What are the top 50 most frequent terms in your word count analysis? Present the top 50 terms with their frequency and relative frequency. If there are ties, please sort the tokens in alphanumeric/string order. Present the bottom 10 tokens (least frequent items). 


### (3.2) Perform a word count analysis of the single-record dataset using a Mapper and Reducer based WordCount (i.e., no combiners are used), using user-defined Counters to count how many times the mapper and reducer are called. What are the values of your user-defined Mapper Counter and Reducer Counter after completing this word count job? 

In [79]:
%%writefile howmanymrinput.txt
foo foo quux labs foo bar quux

Writing howmanymrinput.txt


#### Mapper

In [41]:
%%writefile howmany_mapper.py
#!/usr/bin/python
## howmany_mapper.py
##   - simple mapper with mapper level counter to see 
##     how many mappers are created.
## Author: Tigi Thomas
## Description: mapper code for HW 3.2 

import sys
# input comes from STDIN (standard input)

sys.stderr.write("reporter:counter:HowMany,Mapper,1\n")
    
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # split the line into words
    words = line.split()
    # increase counters
    for word in words:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        #
        # tab-delimited; the trivial word count is 1
        print '%s\t%s' % (word, 1)

Overwriting howmany_mapper.py


#### Reducer : We use the simple Wordcount Reducer from HW 3.1

#### Test 

In [45]:
#Generate the sorted output just with the reducer
#!cat howmanymrinput.txt | python howmany_mapper.py | sort -k1,1  | python wordcount_reducer.py

In [46]:
#Create folder if needed (already exists)
#!hdfs dfs -mkdir -p /user/dsq

#Clear up prev inputs/outputs.
!echo
!echo "Cleaning out old input and output files..."
!echo "------------------------------------------"
!echo
!hdfs dfs -rm /user/dsq/howmanymrinput.txt
!hdfs dfs -rm /user/dsq/howmanymroutput/*
!hdfs dfs -rmdir /user/dsq/howmanymroutput
!hdfs dfs -put howmanymrinput.txt /user/dsq


Cleaning out old input and output files...
------------------------------------------

16/01/31 19:37:40 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/01/31 19:37:41 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted /user/dsq/howmanymrinput.txt
16/01/31 19:37:43 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/01/31 19:37:44 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted /user/dsq/howmanymroutput/_SUCCESS
16/01/31 19:37:44 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted /user/dsq/howmanymroutput/part-00000
16/01/31 19:37:44 INFO fs.TrashPolicyDefault: Namenode trash configuration: Dele

In [47]:
!echo
!echo "Running Hadoop Map Reducer for 3.2..."
!echo "------------------------------------------"
!echo
!$HADOOP_INSTALL/bin/hadoop jar $HADOOP_INSTALL/hadoop-streaming-2.7.1.jar \
-D mapreduce.job.maps=1 \
-D mapreduce.job.reduces=4 \
-D mapreduce.job.name="Testing Counters How Many Mappers and Reducers 3.2.a" \
-mapper howmany_mapper.py -file howmany_mapper.py \
-reducer wordcount_reducer.py -file wordcount_reducer.py \
-input howmanymrinput.txt -file howmanymrinput.txt \
-output howmanymroutput


Running Hadoop Map Reducer for 3.2.a...
------------------------------------------

16/01/31 19:41:11 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
16/01/31 19:41:11 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
packageJobJar: [howmany_mapper.py, howmany_reducer.py, howmanymrinput.txt] [] /tmp/streamjob8442593972214384666.jar tmpDir=null
16/01/31 19:41:12 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
16/01/31 19:41:12 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
16/01/31 19:41:12 INFO jvm.JvmMetrics: Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
16/01/31 19:41:13 INFO mapred.FileInputFormat: Total input paths to process : 1
16/01/31 19:41:13 INFO mapreduce.JobSubmitter: number of splits:1
16/01/31 19:41:13 INFO mapreduce

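#### 3.2.a Number of Mappers and Reducers    

    HowMany
		Mapper=1
		Reducer=4

Each script increments its counter once, when the process starts, so the counters record how many map and reduce tasks were launched rather than how many records were processed. The single-line input forms one split, which gives one map task, and `mapreduce.job.reduces=4` starts four reduce tasks.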

### (3.2.a) Perform a word count analysis of the Issue column of the Consumer Complaints Dataset using a Mapper and Reducer based WordCount (i.e., no combiners used anywhere), using user-defined Counters to count how many times the mapper and reducer are called. What are the values of your user-defined Mapper Counter and Reducer Counter after completing your word count job? 


In [105]:
%%writefile issuecount_mapper.py
#!/usr/bin/python
## issuecount_mapper.py
##    - mapper for word & count from 
##      issue field in the Complaints dataset.csv
## Author: Tigi Thomas
## Description: mapper code for HW 3.2.a
import sys
import re
import csv

#This will be our mapper count.
sys.stderr.write("reporter:counter:HowMany,Mapper,1\n")

#fixed list of stop words
# ---------------------------------------
#Not used...
'''
with open (stopwords, "rU") as stopwordf:
     for line in stopwordf:
        line = line.lower()
        stopwords = line.split(",")
'''
# --------------------------------------

#'''
# Stop words to use 
stopwords = ['a','able','about','across','after','all','almost','also','am','among',
             'an','and','any','are','as','at','be','because','been','but','by','can',
             'cannot','could','dear','did','do','does','either','else','ever','every',
             'for','from','get','got','had','has','have','he','her','hers','him','his',
             'how','however','i','if','in','into','is','it','its','just','least','let',
             'like','likely','may','me','might','most','must','my','neither','no','nor',
             'not','of','off','often','on','only','or','other','our','own','rather','said',
             'say','says','she','should','since','so','some','than','that','the','their',
             'them','then','there','these','they','this','tis','to','too','twas','us',
             'wants','was','we','were','what','when','where','which','while','who',
             'whom','why','will','with','would','yet','you','your']
#'''

#Use some pre-compiled regexes for punctuation and numbers
#punctuation (including period and comma) is replaced by a space,
#so it acts as a delimiter in addition to whitespace
punctpattern = re.compile(r'[\.\^\$\*\+\-\=\{\}\[\]\\\|\(\)<>-@&#%_=!?:;,/\'\"]')
#anything that is not whitespace or printable ASCII
nonprintable = re.compile(r'[^\s!-~]')
#take out numbers..
numpatt1 = re.compile(r'\b[0-9]{1,100}\b')
numpatt2 = re.compile(r'[0-9]{1,100}\b')
numpatt3 = re.compile(r'\b[0-9]{1,100}')

# Preprocess any text to remove punctuations, numbers, convert to lower etc.
def preprocess_txt(s): 
    s = s.lower()
    s = re.sub(nonprintable, r' ', s)
    s = re.sub(punctpattern, r' ', s)
    s = re.sub(numpatt1, r'', s )
    s = re.sub(numpatt2, r'', s )
    s = re.sub(numpatt3, r'', s )
    return s

# Print a set of counters...
def printcounters(counters):
    for counter, total in counters.iteritems():
        sys.stderr.write("reporter:counter:ComplaintCategory,{0},{1}\n".format(counter,total))

#Just keep tracks if we are done reading the header
header_done = False 
total_words = 0

# input comes from STDIN (standard input)
#   since the data is comma separated, we use the csv.reader
#   to get worry free comma delimitted values , even those
#   inside quotes.
for row in csv.reader(iter(sys.stdin.readline, '')):
    if not header_done:
        header_done = True;
    else:
        
        # issue text is in the 4th field.
        # remove leading and trailing whitespace
        issuelog = row[3].strip()

        # pre-process the line
        issuelog = preprocess_txt(issuelog)

        # split the line into words
        words = issuelog.split()

        #filter out the stop words
        filtered_words = [tk for tk in words if tk not in stopwords]

        # increase counters
        for word in filtered_words:
            # write the results to STDOUT 
            # tab-delimited; the trivial word count is 1
            total_words += 1
            print word + '\t1'

print '**total\t{0}'.format(total_words)

Overwriting issuecount_mapper.py


#### Reducer : We use the simple Wordcount Reducer from HW 3.1

In [52]:
#Clear up prev inputs/outputs.
!echo
!echo "Cleaning out old input and output files..."
!echo "------------------------------------------"
!echo
!hdfs dfs -rm /user/dsq/Consumer_Complaints.csv
!hdfs dfs -rm /user/dsq/issuecountoutput/*
!hdfs dfs -rmdir /user/dsq/issuecountoutput



Cleaning out old input and output files...
------------------------------------------

16/01/31 19:53:35 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/01/31 19:53:36 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted /user/dsq/Consumer_Complaints.csv
16/01/31 19:53:37 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/01/31 19:53:39 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted /user/dsq/issuecountoutput/_SUCCESS
16/01/31 19:53:39 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted /user/dsq/issuecountoutput/part-00000
16/01/31 19:53:40 WARN util.NativeCodeLoader: Unable to load native-hadoo

In [53]:
!echo
!echo "Running Hadoop Map Reducer for 3.2.a..."
!echo "------------------------------------------"
!echo

!$HADOOP_INSTALL/bin/hadoop jar $HADOOP_INSTALL/hadoop-streaming-2.7.1.jar \
-D mapreduce.job.maps=2 \
-D mapreduce.job.reduces=2 \
-D mapreduce.job.name="Issue word count Mapper with Counters 3.2.a" \
-mapper issuecount_mapper.py  -file issuecount_mapper.py \
-reducer wordcount_reducer.py -file wordcount_reducer.py \
-input Consumer_Complaints.csv -file Consumer_Complaints.csv \
-output issuecountoutput


Running Hadoop Map Reducer for 3.2.b...
------------------------------------------

16/01/31 19:59:48 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
16/01/31 19:59:48 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
packageJobJar: [issuecount_mapper.py, wordcount_reducer.py, Consumer_Complaints.csv] [] /tmp/streamjob4828918602925064173.jar tmpDir=null
16/01/31 19:59:50 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
16/01/31 19:59:51 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
16/01/31 19:59:51 INFO jvm.JvmMetrics: Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
16/01/31 19:59:51 INFO mapred.FileInputFormat: Total input paths to process : 1
16/01/31 19:59:51 INFO mapreduce.JobSubmitter: number of splits:1
16/01/31 19:59:52 INFO

#### Number of Mappers and Reducers    

    HowManyMappers
		Mapper=1
	HowManyReducers
		Reducer=2

In [54]:
#!hdfs dfs -ls issuecountoutput

In [55]:
#!hdfs dfs -cat issuecountoutput/part-00000 | head -3

### (3.2.b) Perform a word count analysis of the Issue column of the Consumer Complaints Dataset using a Mapper, Reducer, and standalone combiner (i.e., not an in-memory combiner) based WordCount, using user-defined Counters to count how many times the mapper, combiner, and reducer are called. What are the values of your user-defined Mapper Counter and Reducer Counter after completing your word count job?

***NOTE:***

**For this question, we create a separate stand-alone combiner and use it with the previously created mapper and reducer.** 

In [30]:
%%writefile issuecount_combiner.py
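#!/usr/bin/python
## issuecount_combiner.py
##    - simple word count combiner: sums the counts
##      for each word in the sorted map output.
## Author: Tigi Thomas
## Description: combiner code for HW 3.2.b

import sys

#This will be our combiner count.
sys.stderr.write("reporter:counter:HowMany,Combiner,1\n")

current_word = None
current_count = 0

# input comes from STDIN, sorted by key (word)
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    try:
        # parse the word<TAB>count pair emitted by the mapper
        word, count = line.split('\t', 1)
        count = int(count)
    except ValueError:
        # malformed line, so skip it
        continue
    if current_word == word:
        current_count += count
    else:
        if current_word is not None:
            # emit a partial count in the same format as the mapper output
            print '%s\t%s' % (current_word, current_count)
        current_word = word
        current_count = count

# emit the last word, if any input was received
if current_word is not None:
    print '%s\t%s' % (current_word, current_count)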

Writing issuecount_combiner.py


In [77]:
#Clear up prev inputs/outputs.
!echo
!echo "Cleaning out old input and output files..."
!echo "------------------------------------------"
!echo
!hdfs dfs -rm /user/dsq/Consumer_Complaints.csv
!hdfs dfs -rm /user/dsq/issuecountoutput/*
!hdfs dfs -rmdir /user/dsq/issuecountoutput
!hdfs dfs -put Consumer_Complaints.csv /user/dsq


Cleaning out old input and output files...
------------------------------------------

16/01/31 22:52:22 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/01/31 22:52:24 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted /user/dsq/Consumer_Complaints.csv
16/01/31 22:52:25 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
rm: `/user/dsq/issuecountoutput/*': No such file or directory
16/01/31 22:52:28 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/01/31 22:52:30 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [78]:
!echo
!echo "Running Hadoop Map Reducer for 3.2.b.. "
!echo "---------------------------------------"
!echo

!$HADOOP_INSTALL/bin/hadoop jar $HADOOP_INSTALL/hadoop-streaming-2.7.1.jar \
-D mapreduce.job.maps=2 \
-D mapreduce.job.reduces=2 \
-D mapreduce.job.name="Testing How Many Mappers and Reducers again 3.2.b" \
-mapper issuecount_mapper.py  -file issuecount_mapper.py \
-reducer wordcount_reducer.py -file wordcount_reducer.py \
-combiner wordcount_reducer.py -file wordcount_reducer.py \
-input Consumer_Complaints.csv -file Consumer_Complaints.csv \
-output issuecountoutput


Running Hadoop Map Reducer for 3.2.b.. 
---------------------------------------

16/01/31 22:53:50 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
16/01/31 22:53:50 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
packageJobJar: [issuecount_mapper.py, wordcount_reducer.py, wordcount_reducer.py, Consumer_Complaints.csv] [] /tmp/streamjob1191859889868039290.jar tmpDir=null
16/01/31 22:53:53 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
16/01/31 22:53:53 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
16/01/31 22:53:53 INFO jvm.JvmMetrics: Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
16/01/31 22:53:54 INFO mapred.FileInputFormat: Total input paths to process : 1
16/01/31 22:53:54 INFO mapreduce.JobSubmitter: number of splits:1
16/

#### Number of Combiners, Mappers and Reducers    
    HowManyCombiners
		Combiner=2
	HowManyMappers
		Mapper=1
	HowManyReducers
		Reducer=2

### (3.2.c) Using a single reducer: What are the top 50 most frequent terms in your word count analysis? Present the top 50 terms with their frequency and relative frequency. If there are ties, please sort the tokens in alphanumeric/string order. Present the bottom 10 tokens (least frequent items). 
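
One way to produce such a ranking from a finished word count is to sort on the pair (negative count, term), which orders by descending frequency and breaks ties alphabetically. The sketch below is a local illustration only; the function name `rank_terms` and the toy counts are assumptions, not output from the Consumer Complaints job.

```python
def rank_terms(counts, top_n=50, bottom_n=10):
    """Return (top, bottom) lists of (term, count, relative_frequency)."""
    total = float(sum(counts.values()))
    # Descending by count; ties broken by term in string order.
    ranked = sorted(counts.items(), key=lambda item: (-item[1], item[0]))
    with_rel = [(term, count, count / total) for term, count in ranked]
    bottom = with_rel[-bottom_n:] if bottom_n else []
    return with_rel[:top_n], bottom


# Toy counts from the single-record example "foo foo quux labs foo bar quux".
toy_counts = {"foo": 3, "quux": 2, "labs": 1, "bar": 1}
top, bottom = rank_terms(toy_counts, top_n=2, bottom_n=2)

for term, count, rel in top:
    print("%s\t%d\t%.4f" % (term, count, rel))
```

In a Hadoop job the same ordering needs all counts in one place, which is why the question asks for a single reducer.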

In [133]:
%%writefile issuecount_mapper2.py
#!/usr/bin/python
## issuecount_mapper2.py
##    - mapper for word & count from 
##      issue field in the Complaints dataset.csv
## Author: Tigi Thomas
## Description: mapper code for HW 3.2.c
import sys
import re
import csv

#This will be our mapper count.
sys.stderr.write("reporter:counter:HowMany,Mapper,1\n")

#fixed list of stop words
# ---------------------------------------
#Not used...
'''
with open (stopwords, "rU") as stopwordf:
     for line in stopwordf:
        line = line.lower()
        stopwords = line.split(",")
'''
# --------------------------------------

#'''
# Stop words to use 
stopwords = ['a','able','about','across','after','all','almost','also','am','among',
             'an','and','any','are','as','at','be','because','been','but','by','can',
             'cannot','could','dear','did','do','does','either','else','ever','every',
             'for','from','get','got','had','has','have','he','her','hers','him','his',
             'how','however','i','if','in','into','is','it','its','just','least','let',
             'like','likely','may','me','might','most','must','my','neither','no','nor',
             'not','of','off','often','on','only','or','other','our','own','rather','said',
             'say','says','she','should','since','so','some','than','that','the','their',
             'them','then','there','these','they','this','tis','to','too','twas','us',
             'wants','was','we','were','what','when','where','which','while','who',
             'whom','why','will','with','would','yet','you','your']
#'''

# Pre-compiled regexes for cleaning the text.
# Punctuation (including period and comma) is replaced with a space,
# so it acts as a token delimiter in addition to whitespace.
punctpattern = re.compile(r'[\.\^\$\*\+\-\=\{\}\[\]\\\|\(\)<>-@&#%_=!?:;,/\'\"]')
# Anything that is neither whitespace nor printable ASCII ('!' through '~')
nonprintable = re.compile(r'[^\s!-~]')
# Take out digit runs that start and/or end at a word boundary
numpatt1 = re.compile(r'\b[0-9]{1,100}\b')
numpatt2 = re.compile(r'[0-9]{1,100}\b')
numpatt3 = re.compile(r'\b[0-9]{1,100}')

# Preprocess any text to remove punctuations, numbers, convert to lower etc.
def preprocess_txt(s): 
    s = s.lower()
    s = re.sub(nonprintable, r' ', s)
    s = re.sub(punctpattern, r' ', s)
    s = re.sub(numpatt1, r'', s )
    s = re.sub(numpatt2, r'', s )
    s = re.sub(numpatt3, r'', s )
    return s

# Print a set of counters...
def printcounters(counters):
    for counter, total in counters.iteritems():
        sys.stderr.write("reporter:counter:ComplaintCategory,{0},{1}\n".format(counter,total))

# Track whether the header row has been skipped yet
header_done = False 
total_words = 0

# input comes from STDIN (standard input)
#   since the data is comma separated, we use csv.reader
#   to get worry-free comma-delimited values, even those
#   inside quotes.
for row in csv.reader(iter(sys.stdin.readline, '')):
    if not header_done:
        header_done = True
    else:
        
        # issue text is in the 4th field.
        # remove leading and trailing whitespace
        issuelog = row[3].strip()

        # pre-process the line
        issuelog = preprocess_txt(issuelog)

        # split the line into words
        words = issuelog.split()

        #filter out the stop words
        filtered_words = [tk for tk in words if tk not in stopwords]

        # increase counters
        for word in filtered_words:
            # write the results to STDOUT 
            # tab-delimited; the trivial word count is 1
            total_words += 1
            print word + '\t1'

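# Emit the grand total under a special key; the reducer expects this
# record to arrive before any real word so it can compute relative frequencies.
print '**total\t{0}'.format(total_words)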

Overwriting issuecount_mapper2.py
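
The cleaning steps in the mapper above can be tried out locally on a single string. The block below is a minimal sketch rather than the mapper itself: it assumes a shortened, hypothetical stop-word set (`STOPWORDS`) and collapses the mapper's several regexes into one simplified pattern (`NON_LETTERS`) that turns every non-letter into a delimiter, so it will not match the mapper's behaviour on every input (for example, digits embedded inside a word). The sample string is chosen for illustration only.

```python
import re

# Hypothetical, shortened stop-word set for illustration only;
# the mapper uses a much longer list.
STOPWORDS = {"a", "and", "not", "of", "or", "the", "to"}

# Simplified assumption: anything that is not a lower-case ASCII letter
# or whitespace becomes a space, so punctuation and digits split tokens.
NON_LETTERS = re.compile(r"[^a-z\s]")


def tokenize(text, stopwords=STOPWORDS):
    """Lower-case the text, strip punctuation/digits, drop stop words."""
    cleaned = NON_LETTERS.sub(" ", text.lower())
    return [tok for tok in cleaned.split() if tok not in stopwords]


if __name__ == "__main__":
    # The apostrophe is treated as a delimiter, so "Cont'd" yields two tokens.
    print(tokenize("Cont'd attempts collect debt not owed"))
```

Because the apostrophe is replaced by a space, a contraction such as `Cont'd` is split into the tokens `cont` and `d`, which is one way short tokens like `d` can end up in a word count.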


In [134]:
%%writefile issuecount_reducer.py
#!/usr/bin/python
## issuecount_reducer.py
##     - This reducer is for counting word
##       freq and relative word freq
## Author: Tigi Thomas
## Description: reducer code for HW 3.2.c 

import sys

# Custom counter: incremented once per reducer task.
sys.stderr.write("reporter:counter:HowMany,Reducer,1\n")

current_word = None
current_count = 0
word = None
total_count = 0

# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()

    # parse the input we got from mapper.py
    word, count = line.split('\t')
    # convert count (currently a string) to int
    try:
        count = int(count)
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        continue

    # this IF-switch only works because Hadoop sorts map output
    # by key (here: word) before it is passed to the reducer
    if current_word == word:        
        current_count += count
    else:
        if current_word and current_word != "**total":
            print '{0}\t{1}\t{2:.5f}'.format(current_word, current_count, current_count/float(total_count))
        elif current_word == "**total":
            # the total arrives before any real word; keep it as the denominator
            total_count = current_count
        current_count = count
        current_word = word

# do not forget to output the last word if needed!
# (test against the marker rather than `word`, which may hold a discarded line)
if current_word and current_word != "**total":
    print '{0}\t{1}\t{2:.5f}'.format(current_word, current_count, current_count/float(total_count))

Overwriting issuecount_reducer.py
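
The reducer's logic depends on the `**total` records arriving before any real word, so that the denominator is known by the time the first word is emitted. The following is a minimal sketch of that idea using `itertools.groupby` instead of the hand-written key-change test. The function name `relative_frequencies` and the sample lines are made up for illustration, and the sketch assumes its input is already sorted by key with `**total` first (as `LC_ALL=C sort` would place it, since `*` sorts before letters).

```python
from itertools import groupby

TOTAL_KEY = "**total"  # same marker key the mapper emits


def relative_frequencies(sorted_lines):
    """Yield (word, count, relative_freq) from key-sorted 'word<TAB>count' lines.

    Assumes every TOTAL_KEY record sorts ahead of every real word.
    """
    total = 0
    pairs = (line.rstrip("\n").split("\t") for line in sorted_lines)
    for word, group in groupby(pairs, key=lambda kv: kv[0]):
        count = sum(int(c) for _, c in group)
        if word == TOTAL_KEY:
            # Several mappers may each emit a partial total; they are summed.
            total = count
        elif total:
            yield word, count, count / float(total)


if __name__ == "__main__":
    sample = ["**total\t4", "debt\t1", "loan\t1", "loan\t1", "loan\t1"]
    for word, count, freq in relative_frequencies(sample):
        print("{0}\t{1}\t{2:.5f}".format(word, count, freq))
```

Unlike the reducer above, this sketch silently skips words when no total has been seen; whether that or a hard failure is preferable is a design choice.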


In [129]:
# Test
!cat Consumer_Complaints.csv | \
python issuecount_mapper2.py | \
LC_ALL=C sort -k1,1 | \
python issuecount_reducer.py > issuereduceroutput.txt

#!cat Consumer_Complaints.csv | python issuecount_mapper.py | sort -k1,1 | python wordcount_reducer.py

reporter:counter:HowMany,Mapper,1
reporter:counter:HowMany,Reducer,1


In [135]:
#Clear up prev inputs/outputs.
!echo
!echo "Cleaning out old input and output files..."
!echo "------------------------------------------"
!echo
!hdfs dfs -rm /user/dsq/Consumer_Complaints.csv
!hdfs dfs -rm /user/dsq/issuecountoutput/*
!hdfs dfs -rmdir /user/dsq/issuecountoutput
!hdfs dfs -put Consumer_Complaints.csv /user/dsq


Cleaning out old input and output files...
------------------------------------------

16/02/01 00:15:00 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/02/01 00:15:02 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted /user/dsq/Consumer_Complaints.csv
16/02/01 00:15:03 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
rm: `/user/dsq/issuecountoutput/*': No such file or directory
16/02/01 00:15:06 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/02/01 00:15:09 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


#### Display the top 50

**NOTE**


In [136]:
#-D stream.num.map.output.key.fields=2 \
#-D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
#-D mapred.text.key.comparator.options="-k2,2nr -k1,1" \
!echo
!echo "Running Hadoop Map Reducer for 3.2.c.. "
!echo "---------------------------------------"
!echo

!$HADOOP_INSTALL/bin/hadoop jar $HADOOP_INSTALL/hadoop-streaming-2.7.1.jar \
-D mapreduce.job.maps=2 \
-D mapreduce.job.reduces=1 \
-D mapreduce.job.name="Top 50 word count 3.2" \
-D stream.num.map.output.key.fields=2 \
-D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
-D mapred.text.key.comparator.options="-k2,2nr -k1,1" \
-mapper issuecount_mapper.py -file issuecount_mapper.py \
-reducer issuecount_reducer.py -file issuecount_reducer.py \
-input Consumer_Complaints.csv -file Consumer_Complaints.csv \
-output issuecountoutput


Running Hadoop Map Reducer for 3.2.c.. 
---------------------------------------

16/02/01 00:15:21 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
16/02/01 00:15:22 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
packageJobJar: [issuecount_mapper.py, issuecount_reducer.py, Consumer_Complaints.csv] [] /tmp/streamjob7798050576392304059.jar tmpDir=null
16/02/01 00:15:24 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
16/02/01 00:15:24 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
16/02/01 00:15:24 INFO jvm.JvmMetrics: Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
16/02/01 00:15:26 INFO mapred.FileInputFormat: Total input paths to process : 1
16/02/01 00:15:26 INFO mapreduce.JobSubmitter: number of splits:1
16/02/01 00:15:26 INFO C

In [167]:
# Check the output from first mapper reducer
!hdfs dfs -cat issuecountoutput/part-00000 | head -10

16/02/01 00:39:58 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
account	57448	0.04852
acct	163	0.00014
action	2964	0.00250
advance	240	0.00020
advertising	1193	0.00101
amount	98	0.00008
amt	71	0.00006
application	8868	0.00749
applied	139	0.00012
apply	118	0.00010


#### Write another mapper/reducer, just for sorting.

In [177]:
%%writefile sort_mapper.py
#!/usr/bin/python
import sys
# Identity mapper: pass each (word, count, freq) record from STDIN through
# unchanged; the ordering is left to Hadoop's key comparator.
for line in sys.stdin:
    word, count, freq = line.strip().split('\t')
    print '{0}\t{1}\t{2}'.format(word, count, freq)

Overwriting sort_mapper.py


In [178]:
%%writefile sort_reducer.py
#!/usr/bin/python
import sys
# Identity reducer: records arrive from STDIN already sorted by the
# key comparator, so they are written out unchanged.
for line in sys.stdin:
    word, count, freq = line.strip().split('\t')
    print '{0}\t{1}\t{2}'.format(word, count, freq)

Overwriting sort_reducer.py


In [179]:
!rm HW32c_Issuecountoutput_Result.txt
!hdfs dfs -copyToLocal issuecountoutput/part-00000 HW32c_Issuecountoutput_Result.txt
!cat HW32c_Issuecountoutput_Result.txt | head -10

16/02/01 00:51:12 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/02/01 00:51:14 WARN hdfs.DFSClient: DFSInputStream has been closed already
account	57448	0.04852
acct	163	0.00014
action	2964	0.00250
advance	240	0.00020
advertising	1193	0.00101
amount	98	0.00008
amt	71	0.00006
application	8868	0.00749
applied	139	0.00012
apply	118	0.00010


In [186]:
# Test
!cat HW32c_Issuecountoutput_Result.txt | python sort_mapper.py | python sort_reducer.py > sortreducerout.txt
!cat sortreducerout.txt| head -10
#| \
#python issuecount_reducer.py > issuereduceroutput.txt

account	57448	0.04852
acct	163	0.00014
action	2964	0.00250
advance	240	0.00020
advertising	1193	0.00101
amount	98	0.00008
amt	71	0.00006
application	8868	0.00749
applied	139	0.00012
apply	118	0.00010


In [185]:
!echo
!echo "Cleaning up previous run results.. "
!echo "---------------------------------------"
!echo
!hdfs dfs -rm /user/dsq/issuecountoutputsorted/*
!hdfs dfs -rmdir /user/dsq/issuecountoutputsorted

!echo
!echo "Running Hadoop Map Reducer for 3.2.c.. "
!echo "---------------------------------------"
!echo

!$HADOOP_INSTALL/bin/hadoop jar $HADOOP_INSTALL/hadoop-streaming-2.7.1.jar \
-D mapreduce.job.maps=2 \
-D mapreduce.job.reduces=1 \
-D mapreduce.job.name="Top 50 word count" \
-D stream.num.map.output.key.fields=2 \
-D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
-D mapred.text.key.comparator.options="-k2,2nr -k1,1" \
-mapper sort_mapper.py -file sort_mapper.py \
-reducer sort_reducer.py -file sort_reducer.py \
-input issuecountoutput/* \
-output issuecountoutputsorted



Cleaning up previous run results.. 
---------------------------------------

16/02/01 00:53:15 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/02/01 00:53:17 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted /user/dsq/issuecountoutputsorted/_SUCCESS
16/02/01 00:53:17 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted /user/dsq/issuecountoutputsorted/part-00000
16/02/01 00:53:18 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

Running Hadoop Map Reducer for 3.2.c.. 
---------------------------------------

16/02/01 00:53:21 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
16/02/01 00:53:21 WARN util.NativeCodeLoader: Unable t
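
The comparator options `-k2,2nr -k1,1` used in the job above ask for the second field (the count) in reverse numeric order, with ties broken by the first field (the word) in ascending order. As a rough local equivalent, and only as a sketch, the same ordering can be expressed in Python with a composite sort key. The helper name `sort_by_count_then_word` is hypothetical, and plain Python string comparison is assumed to agree with the comparator's ordering for the lower-case ASCII tokens used here.

```python
def sort_by_count_then_word(rows):
    """Order (word, count, rel_freq) rows by count descending, then word ascending."""
    return sorted(rows, key=lambda row: (-int(row[1]), row[0]))


if __name__ == "__main__":
    # Three rows taken from the word-count results in this notebook.
    rows = [
        ("modification", "70487", "0.05953"),
        ("loan", "119630", "0.10104"),
        ("foreclosure", "70487", "0.05953"),
    ]
    for row in sort_by_count_then_word(rows):
        print("\t".join(row))
```

Negating the count gives the descending order without a second pass, and the word in the second position of the key resolves ties alphabetically.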

#### Present the top 50 terms and their frequency and their relative frequency. If there are ties please sort the tokens in alphanumeric/string order. 

In [189]:
!hdfs dfs -cat issuecountoutputsorted/part-00000 | head -50

16/02/01 00:56:46 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
loan	119630	0.10104
collection	72394	0.06114
foreclosure	70487	0.05953
modification	70487	0.05953
account	57448	0.04852
credit	55251	0.04666
payments	39993	0.03378
escrow	36767	0.03105
servicing	36767	0.03105
report	34903	0.02948
incorrect	29133	0.02460
information	29069	0.02455
debt	27874	0.02354
closing	19000	0.01605
attempts	17972	0.01518
collect	17972	0.01518
cont	17972	0.01518
d	17972	0.01518
owed	17972	0.01518
management	16205	0.01369
opening	16205	0.01369
deposits	10555	0.00891
withdrawals	10555	0.00891
problems	9484	0.00801
application	8868	0.00749
communication	8671	0.00732
tactics	8671	0.00732
broker	8625	0.00728
mortgage	8625	0.00728
originator	8625	0.00728
unable	8178	0.00691
billing	8158	0.00689
disclosure	7655	0.00647
verification	7655	0.00647
disputes	6938	0.00586
reporting	6559	0.00554
lease	6337	0.00535
being	5663	0.00478
c

#### Present bottom 10 tokens (least frequent items). 

In [190]:
!hdfs dfs -cat issuecountoutputsorted/part-00000 | tail -10

16/02/01 00:56:54 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
apply	118	0.00010
amount	98	0.00008
credited	92	0.00008
payment	92	0.00008
checks	75	0.00006
convenience	75	0.00006
amt	71	0.00006
day	71	0.00006
disclosures	64	0.00005
missing	64	0.00005
