# <span style="color:darkgreen">DATSCIW261 ASSIGNMENT 2</span>
#### MIDS UC Berkeley, Machine Learning at Scale

<b>AUTHOR</b> : Rajesh Thallam <br>
<b>EMAIL</b>  : rajesh.thallam@ischool.berkeley.edu <br>
<b>WEEK</b>   : 2 <br>
<b>DATE</b>   : 15-Sep-15

<h3><span style="color:dodgerblue;font:12px">HW2.0</span></h3> 
<span style="color:firebrick">What is a race condition in the context of parallel computation? Give an example. What is MapReduce? How does it differ from Hadoop? Which programming paradigm is Hadoop based on? Explain and give a simple example in code and show the code running.</span>

<span style="color:CornflowerBlue "><b>What is a race condition in the context of parallel computation? Give an example.</b></span><br>
A **race condition** is *a consequence of two or more asynchronous (parallel) threads attempting to access and modify a shared resource at the same time*. Because the application cannot know the order in which the threads will access and modify the resource, the result is nondeterministic. One way to avoid a race condition is to use a mutex, which lets a thread acquire a lock on the shared resource before touching it and release the lock afterwards.

A common example I have encountered is multiple threads attempting to increment the value of a global variable. Imagine a global variable **p** that two threads, A and B, each increment by 1 using the ++ (increment) operator. The increment operator performs three steps: (i) read the variable, (ii) increment the value, and (iii) store the variable. So increment is not an atomic operation.

```
# global variable p with current value
p = 18

# THREAD A
p++ 
# value will be 19

# THREAD B at same time as THREAD A or just little after
p++ 
# it still sees p as 18 and attempts to increment p to 19
```
At the end of the operation, both threads have written 19 to p, whereas the expected sequence is 19 after A and 20 after B. One of the two increments is lost.
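
The lost update described above can be replayed deterministically in Python. This is only a sketch: the interleaving is simulated by hand with two local reads (`read_a` and `read_b` are names of my choosing) rather than relying on real thread timing, and the second half shows the mutex fix using `threading.Lock`.

```python
import threading

# Replay of the bad interleaving: both threads read p
# before either one stores its result.
p = 18
read_a = p          # thread A reads 18
read_b = p          # thread B reads 18 (A has not stored yet)
p = read_a + 1      # thread A stores 19
p = read_b + 1      # thread B also stores 19, so A's update is lost
lost_update_result = p

# The same increment guarded by a mutex: only one thread at a
# time can run the read-increment-store sequence.
counter = 18
lock = threading.Lock()

def safe_increment():
    global counter
    with lock:
        counter += 1

threads = [threading.Thread(target=safe_increment) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print("without lock: %d, with lock: %d" % (lost_update_result, counter))
```

With the lock in place the two increments are serialized, so the final value reflects both of them.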

<span style="color:CornflowerBlue "><b>What is MapReduce? How does it differ from Hadoop?</b></span><br>

**MapReduce** is a programming paradigm borrowed from functional programming, in which the framework accepts user-defined functions (a mapper and a reducer) as arguments. It allows parallel processing of embarrassingly parallel data problems. In the **map** phase the input is split into chunks and the mapper function is applied to each chunk in parallel, with the degree of parallelism set by the number of mappers. The **reduce** phase then folds or combines the mapper results to generate the final result of the problem. <br>

**Hadoop** is a framework built on the MapReduce programming paradigm (data processing) and the Hadoop Distributed File System, HDFS (data storage). It solves large data set problems in an embarrassingly parallel way by moving the MapReduce program close to where the data is stored. The framework combines distributed data handling with distributed computation while hiding system-level details from the programmer, and it builds in the necessary fault tolerance and resiliency.

<span style="color:CornflowerBlue "><b>Explain and give a simple example in code and show the code running.</b></span><br>

In [1]:
# this simple example calculates word counts in given strings
import itertools

# define mapper to split word and count as 1
def mapper(key, value):
    return [(word,1) for word in value.split()]

# define reducer to sum counts of a given word
def reducer(key, values):
    return (key, sum(values))

# tie map and reduce phases
def map_reduce(lines, mapper,reducer):
    map_out = []
    
    # call mapper
    for (key,value) in lines.items():
        map_out.extend(mapper(key, value))

    # partition mapper output
    groups = {}
    for key, group in itertools.groupby(sorted(map_out), lambda x: x[0]):
        groups[key] = list([y for x, y in group])
  
    # reduce phase to output counts
    return [reducer(key, groups[key]) for key in groups] 

# feed input and call map reduce
lines = {}
lines["1"] = "foo bar foo bar foo bar foo foo foo bax lines line"
lines["2"] = "hello world this is foo bar"
map_reduce(lines, mapper, reducer)

[('bar', 4),
 ('this', 1),
 ('is', 1),
 ('lines', 1),
 ('bax', 1),
 ('world', 1),
 ('line', 1),
 ('foo', 7),
 ('hello', 1)]

<b>Preparation for HW2_*</b>

In [2]:
# stop hadoop
!ssh hduser@rtubuntu /usr/local/hadoop/sbin/stop-yarn.sh
!ssh hduser@rtubuntu /usr/local/hadoop/sbin/stop-dfs.sh

stopping yarn daemons
no resourcemanager to stop
localhost: no nodemanager to stop
no proxyserver to stop
Stopping namenodes on [localhost]
localhost: no namenode to stop
localhost: no datanode to stop
Stopping secondary namenodes [0.0.0.0]
0.0.0.0: stopping secondarynamenode


In [3]:
# start hadoop
!ssh hduser@rtubuntu /usr/local/hadoop/sbin/start-yarn.sh
!ssh hduser@rtubuntu /usr/local/hadoop/sbin/start-dfs.sh

starting yarn daemons
starting resourcemanager, logging to /usr/local/hadoop/logs/yarn-hduser-resourcemanager-rtubuntu.out
localhost: starting nodemanager, logging to /usr/local/hadoop/logs/yarn-hduser-nodemanager-rtubuntu.out
Starting namenodes on [localhost]
localhost: starting namenode, logging to /usr/local/hadoop/logs/hadoop-hduser-namenode-rtubuntu.out
localhost: starting datanode, logging to /usr/local/hadoop/logs/hadoop-hduser-datanode-rtubuntu.out
Starting secondary namenodes [0.0.0.0]
0.0.0.0: starting secondarynamenode, logging to /usr/local/hadoop/logs/hadoop-hduser-secondarynamenode-rtubuntu.out


In [4]:
# create necessary directories
!hdfs dfs -mkdir /hw2

15/09/15 01:30:15 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
mkdir: `/hw2': File exists


<h3><span style="color:dodgerblue;font:12px">HW2.1</span></h3> 
<span style="color:firebrick">Sort in Hadoop MapReduce. Given as input: Records of the form (integer, "NA"), where integer is any integer, and "NA" is just the empty string. Output: sorted key value pairs of the form (integer, "NA"); what happens if you have multiple reducers? Do you need additional steps? Explain.</span><br><br>
<span style="color:firebrick">Write code to generate N  random records of the form (integer, "NA"). Let N = 10,000.</span><br>
<span style="color:firebrick">Write the python Hadoop streaming map-reduce job to perform this sort.</span><br>

<span style="color:CornflowerBlue "><b>What happens if you have multiple reducers? Do you need additional steps? Explain.</b></span><br>
When there are multiple reducers, each reducer sorts only the chunk of data it receives from the partition phase of MapReduce. The default partitioner assigns a key to a reducer using the key's hash code mod the number of reducers, so with 5 reducers there will be 5 output files, each sorted internally but covering overlapping key ranges. To avoid the overlapping ranges we need either a single reducer or a partitioner that is aware of the nature of the keys, for example one that directs all keys within a range (say 1 to 2000) to the same partition. There will still be multiple output files, but each file is sorted and the ranges do not overlap, so reading the files in partition order gives a fully sorted result. <br>
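
To make the difference concrete, here is a small pure-Python sketch (not Hadoop code). `hash_partition` is a simplified stand-in for the default behaviour, `range_partition` is a hypothetical range-aware partitioner, and the keys and the key space of 20 are made up for illustration.

```python
def hash_partition(key, num_reducers):
    # simplified stand-in for the default partitioner: hash mod reducer count
    return hash(key) % num_reducers

def range_partition(key, num_reducers, key_space):
    # keys are assumed to lie in [0, key_space)
    width = -(-key_space // num_reducers)  # ceiling division
    return key // width

def run_reducers(keys, num_reducers, partition):
    buckets = [[] for _ in range(num_reducers)]
    for key in keys:
        buckets[partition(key)].append(key)
    # each reducer sorts only its own bucket
    return [sorted(bucket) for bucket in buckets]

def concat(parts):
    # read the output files in partition order
    return [key for part in parts for key in part]

keys = [17, 3, 12, 8, 0, 19, 5, 14, 9, 1]
hashed = run_reducers(keys, 4, lambda k: hash_partition(k, 4))
ranged = run_reducers(keys, 4, lambda k: range_partition(k, 4, 20))

print("hash partitions : %s" % hashed)
print("range partitions: %s" % ranged)
print("range output globally sorted: %s" % (concat(ranged) == sorted(keys)))
```

Each hash partition is sorted on its own but the partitions interleave, whereas the range partitions can simply be concatenated.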

<span style="color:CornflowerBlue "><b>Generate Input</b></span><br>
The `gen_in_hw2_1.py` script generates the input file for the MapReduce job: 10,000 random integers, each paired with "NA".

In [5]:
%%writefile gen_in_hw2_1.py
#!/usr/bin/python
import random

N = 10000
# random.sample draws without replacement, so no integer is repeated
r = random.sample(range(N), N)

for n in r:
    print "{0} {1}".format(n, "NA")

Overwriting gen_in_hw2_1.py


In [6]:
!./gen_in_hw2_1.py > hw2_1.txt
!head hw2_1.txt

9662 NA
4030 NA
6587 NA
9595 NA
9528 NA
6418 NA
2197 NA
5853 NA
9532 NA
4327 NA


<span style="color:CornflowerBlue "><b>Mapper</b></span><br>
This is an identity mapper, since Hadoop streaming needs at least one mapper. It simply prints its input.

In [26]:
%%writefile mapper.py
#!/usr/bin/env python
import sys

for line in sys.stdin:    
    print "%s" % (line.strip())

Overwriting mapper.py


<span style="color:CornflowerBlue "><b>Reducer</b></span><br>
This is an identity reducer, since the intention is to sort the mapper output as is; the shuffle/sort phase is handled by Hadoop streaming (that is, by the Hadoop framework).

In [27]:
%%writefile reducer.py
#!/usr/bin/env python
import sys

for line in sys.stdin:    
    print "%s" % (line.strip())

Overwriting reducer.py


<span style="color:CornflowerBlue "><b>Preparing to run the job</b></span><br>

In [9]:
# Use chmod for permissions
!chmod a+x mapper.py
!chmod a+x reducer.py

In [None]:
!hdfs dfs -mkdir /hw2/hw2_1
!hdfs dfs -mkdir /hw2/hw2_1/src
!hdfs dfs -put ./hw2_1.txt /hw2/hw2_1/src

<span style="color:CornflowerBlue "><b>Driver Function</b></span><br>
The driver function calls the Hadoop streaming job after purging previously generated target files (to avoid the `'File Already Exists'` error). A few points to note:

- `KeyFieldBasedComparator` with the key comparator options set to `-k1,1n` sorts the mapper output numerically on the first field; this comparator class ships with Hadoop
- the number of mappers is set to 10
- the number of reducers is set to 1
- the driver prints the first few lines of the job output
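
The numeric option matters because the keys travel through the shuffle as text. The sketch below uses plain Python `sorted` (not Hadoop itself) on four keys taken from the sample input to show how text ordering and numeric ordering differ.

```python
# keys as they appear in the input file, i.e. as strings
keys = ["1956", "953", "9687", "1389"]

# text (lexicographic) order compares character by character
text_order = sorted(keys)

# numeric order compares the integer values
numeric_order = sorted(keys, key=int)

print("text order   : %s" % text_order)
print("numeric order: %s" % numeric_order)
```

In text order `953` lands between `1956` and `9687`, which is not what a sort of integers should produce.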

In [28]:
# HW 2.1: execute hadoop streaming job to generate and sort 
#         10K random integers
def hw2_1():
    # cleanup target directory
    !hdfs dfs -rm -R /hw2/hw2_1/tgt
    
    !echo "sample input data"
    !hdfs dfs -cat /hw2/hw2_1/src/hw2_1.txt | head

    # run map reduce job
    !hadoop jar /usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.6.0.jar \
    -D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
    -D mapred.text.key.comparator.options=-k1,1n \
    -Dmapreduce.job.maps=10 \
    -Dmapreduce.job.reduces=1 \
    -files mapper.py,reducer.py \
    -mapper mapper.py \
    -reducer reducer.py \
    -input /hw2/hw2_1/src/hw2_1.txt \
    -output /hw2/hw2_1/tgt
    
    print "\n"
    !echo "partial output data"
    !hdfs dfs -cat /hw2/hw2_1/tgt/part-00000 | head

hw2_1()

15/09/15 01:42:16 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/09/15 01:42:17 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted /hw2/hw2_1/tgt
sample input data
15/09/15 01:42:19 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
1956 NA
2198 NA
2266 NA
2762 NA
6692 NA
1838 NA
953 NA
1389 NA
4361 NA
9687 NA
cat: Unable to write to output stream.
15/09/15 01:42:22 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/09/15 01:42:22 INFO Configuration.deprecation: mapred.output.key.comparator.class is deprecated. Instead, use mapreduce.job.output.key.comparator.class
15/09/15 01:42:22 INFO Configuration.deprecation: mapred.text.key.comparator.options is deprecated. Inste

<h3><span style="color:dodgerblue;font:12px">HW2.2</span></h3> 
<span style="color:firebrick">Using the Enron data from HW1 and Hadoop MapReduce streaming, write mapper/reducer pair that  will determine the number of occurrences of a single, user-specified word. Examine the word “assistance” and report your results. To do so, make sure that</span><br><br>
<span style="color:firebrick">- mapper.py counts all occurrences of a single word, and</span><br>
<span style="color:firebrick">- reducer.py collates the counts of the single word.</span>

<span style="color:CornflowerBlue "><b>Assumptions</b></span>

1. For this problem, both the email subject and the email body are searched for the word
2. Punctuation and special characters are removed from the email content
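
As a quick illustration of the second assumption, the sketch below applies the same character filter to a made-up sentence and counts exact matches of one word. `count_word` is a helper name chosen for this example only.

```python
import re

def count_word(text, word):
    # drop everything except letters, digits and whitespace
    cleaned = re.sub(r'[^A-Za-z0-9\s]+', '', text)
    # count exact, case-sensitive token matches
    return sum(1 for token in cleaned.split() if token == word)

sample = "Need assistance? Call for assistance, now!"
hits = count_word(sample, "assistance")
print("occurrences: %d" % hits)
```

Note that stripping punctuation also joins hyphenated tokens (for example `e-mail` becomes `email`), and the match is case-sensitive.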

<span style="color:CornflowerBlue "><b>Mapper</b></span>

In [29]:
%%writefile mapper.py
#!/usr/bin/python
import traceback
import sys
import re

# read input parameters
find_word = sys.argv[1]

try:    
    for email in sys.stdin:
        # split email by tab (\t)
        mail = email.split('\t')
            
        # handle missing email content
        if len(mail) == 3:
            mail.append(mail[2])
            mail[2] = ""
        assert len(mail) == 4

        # email id
        email_id = mail[0]
        # email content - remove special characters and punctuations
        content = re.sub('[^A-Za-z0-9\s]+', '', mail[2] + " " +  mail[3])

        # find word with counts
        for word in content.split():
            if word == find_word:
                print '{}\t{}'.format(word, 1)
except Exception: 
    traceback.print_exc()

Overwriting mapper.py


In [30]:
%%writefile reducer.py
#!/usr/bin/python
import traceback
import sys

try:
    word_counts = {}

    # read each map output
    for line in sys.stdin:
        # parse mapper output
        word, count = line.strip('\n').split('\t')
        
        # accumulate the count, starting from 0 for a word not seen before
        word_counts[word] = word_counts.get(word, 0) + int(count)
        
    print word_counts
except Exception: 
    traceback.print_exc()

Overwriting reducer.py


<span style="color:CornflowerBlue "><b>Preparing to run the job</b></span><br>

In [None]:
# move source file to hdfs
!hdfs dfs -mkdir /hw2/hw2_2
!hdfs dfs -mkdir /hw2/hw2_2/src
!hdfs dfs -put ./enronemail_1h.txt /hw2/hw2_2/src

<span style="color:CornflowerBlue "><b>Driver Function</b></span><br>

In [15]:
# HW 2.2  Mapper/reducer pair to determine the number of occurrences 
#         of a single, user-specified word

def hw2_2(word):
    # cleanup target directory
    !hdfs dfs -rm -R /hw2/hw2_2/tgt
    
    # run map reduce job
    !hadoop jar /usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.6.0.jar \
    -Dmapreduce.job.maps=10 \
    -Dmapreduce.job.reduces=1 \
    -files mapper.py,reducer.py \
    -mapper 'mapper.py {word}' \
    -reducer reducer.py \
    -input /hw2/hw2_2/src/enronemail_1h.txt \
    -output /hw2/hw2_2/tgt

    print "\nOUTPUT"
    # display count on the screen
    print "output from mapper/reducer to determine the number of occurrences of word assistance"
    !hdfs dfs -cat /hw2/hw2_2/tgt/part-00000

    # CROSSCHECK
    print "\nCROSSCHECK"
    print "output from command line mapper/reducer"
    ! grep assistance enronemail_1h.txt | awk -F'\t' '{print $3, $4}' | grep -o assistance | wc -l
        
hw2_2("assistance")

15/09/15 01:30:45 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/09/15 01:30:46 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted /hw2/hw2_2/tgt
15/09/15 01:30:48 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/09/15 01:30:49 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
15/09/15 01:30:49 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
15/09/15 01:30:49 INFO jvm.JvmMetrics: Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
15/09/15 01:30:49 INFO mapred.FileInputFormat: Total input paths to process : 1
15/09/15 01:30:50 INFO mapreduce.JobSubmitter: number of splits:1
15/09/15 01:30:50 INFO mapreduce.JobSubmitter: Subm

<h3><span style="color:dodgerblue;font:12px">HW2.3</span></h3> 
<span style="color:firebrick">Using the Enron data from HW1 and Hadoop MapReduce that will classify the email messages by a single, user-specified word. Examine the word “assistance” and report your results. To do so, make sure that:</span><br>

   <span style="color:firebrick">  - mapper.py</span><br>
   <span style="color:firebrick">  - reducer.py that performs a single word multinomial Naive Bayes classification</span>

<span style="color:CornflowerBlue "><b>Assumptions</b></span>

1. Based on the instructions on LMS, only email body is considered for classification
2. The mapper handles a single user-specified word, multiple words, or all words (*)
3. The reducer would require additional logic to handle special requirements when all words are used in the classifier

<span style="color:CornflowerBlue "><b>Mapper</b></span><br>
I chose the mapper output to contain the following fields for each email in the input data set
- email_id (as key)
- spam/ham indicator
- total number of words in each email
- number of occurrences of each word in the vocab
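
The record layout listed above can be sketched as a round trip: format one record the way the mapper emits it, then parse it back the way the reducer does. `format_record` and `parse_record` are illustrative helper names, and the email id and counts are made up.

```python
import ast

def format_record(email_id, is_spam, content_wc, hits):
    # fields are delimited by " | "; hits is written as a dict literal
    return "{} | {} | {} | {}".format(email_id, is_spam, content_wc, dict(hits))

def parse_record(line):
    email_id, is_spam, content_wc, hits = line.rstrip("\n").split(" | ")
    # literal_eval safely turns the dict literal back into a dict
    return email_id, int(is_spam), int(content_wc), ast.literal_eval(hits)

line = format_record("0001.sample", 1, 120, {"assistance": 2})
record = parse_record(line)
print(line)
```

This layout assumes the email id never contains the ` | ` delimiter.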

In [16]:
%%writefile mapper.py
#!/usr/bin/python
import traceback
import sys
import re

from collections import Counter

# read input parameters
find_words = sys.argv[1:]

try:
    search_all = 0

    # custom logic to handle all words (*)
    if find_words[0] == "*":
        search_all = 1
        word_list = []
    else:
        word_list = find_words
    
    for email in sys.stdin:
        # split email by tab (\t)
        mail = email.split('\t')
            
        # handle missing email content
        if len(mail) == 3:
            mail.append(mail[2])
            mail[2] = ""
        assert len(mail) == 4

        # email id
        email_id = mail[0]
        # spam/ham binary indicator
        is_spam = mail[1]
        # email content - remove special characters and punctuations
        #content = re.sub('[^A-Za-z0-9\s]+', '', mail[2] + " " +  mail[3])
        content = re.sub('[^A-Za-z0-9\s]+', '', mail[3])
        # count number of words
        content_wc = len(content.split())

        # find words with counts - works for single word or list of words
        # custom logic to handle all words (*)
        if search_all == 1:
            hits = Counter(content.split())
        else:
            find_words = re.compile("|".join(r"\b%s\b" % w for w in word_list))
            hits = Counter(re.findall(find_words, content))

        hits = {k: v for k, v in hits.iteritems()}
        
        # emit tuple delimited by |
        # (email id, spam ind, content word count, word hit counts)
        print "{} | {} | {} | {}".format(email_id, is_spam, content_wc, hits)
except Exception: 
    traceback.print_exc()

Overwriting mapper.py


<span style="color:CornflowerBlue "><b>Reducer</b></span><br>
The reducer does all the work of training the classifier and making predictions. It stores the mapper output in a list as it reads from standard input, so that the same records can be used first for training and then for prediction. The vocabulary size is set dynamically based on the search terms. The reducer outputs each email id with its actual spam/ham indicator and the prediction, followed by the overall accuracy. 

**NOTE** Even if a search term does not occur in the training data set, the vocabulary still includes the missing search term so that it is counted during Laplace smoothing.
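
A minimal sketch of the training and prediction arithmetic follows, with add-one (Laplace) smoothing. The counts, class totals and equal priors are made-up numbers for illustration, not results from the Enron data, and `train` and `predict` are helper names that do not appear in the reducer.

```python
import math

def train(term_counts, class_totals, priors, vocab):
    # log10 priors and smoothed log10 conditionals for ham (0) and spam (1)
    V = float(len(vocab))
    model = {}
    for label in (0, 1):
        cond = {}
        for word in vocab:
            count = term_counts[label].get(word, 0)
            cond[word] = math.log10((count + 1.0) / (class_totals[label] + V))
        model[label] = {"prior": math.log10(priors[label]), "cond": cond}
    return model

def predict(model, hits):
    # score = log P(C) + sum over words of count(w) * log P(w|C)
    scores = {}
    for label in (0, 1):
        cond = model[label]["cond"]
        scores[label] = model[label]["prior"] + sum(
            cond[w] * n for w, n in hits.items() if w in cond)
    return 1 if scores[1] > scores[0] else 0

vocab = ["assistance", "enlargementWithATypo"]   # second term never occurs
term_counts = {1: {"assistance": 8}, 0: {"assistance": 2}}
class_totals = {1: 1000, 0: 2000}                # total words per class
priors = {1: 0.5, 0: 0.5}

model = train(term_counts, class_totals, priors, vocab)
print("P(assistance|spam) = %.4f" % 10 ** model[1]["cond"]["assistance"])
print("prediction for one hit: %d" % predict(model, {"assistance": 1}))
```

Because of the smoothing, the term that never occurs still gets a finite log probability instead of `log(0)`.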

In [17]:
%%writefile reducer.py
#!/usr/bin/python
import traceback
import math
import sys
import ast

from collections import Counter

# read input parameters
find_words = sys.argv[1:]

# vocab
vocab = find_words

try:
    spam_count = 0
    ham_count = 0
    spam_all_wc = 0
    ham_all_wc = 0
    spam_term_wc = {}
    ham_term_wc = {}
    pr_word_given_spam = {}
    pr_word_given_ham = {}

    # read each mapper output to loop during the prediction phase
    # after training the model
    map_output = []
    for line in sys.stdin:
        map_output.append(line)
    
    for email in map_output:
        # parse mapper output
        mail = email.split(" | ")
        # read spam/ham indicator, content word count, 
        is_spam = int(mail[1])
        content_wc = int(mail[2])
        hits = ast.literal_eval(mail[3])

        # capture counts required for naive bayes probabilities
        if is_spam:
            # spam mail count
            spam_count += 1
            # term count when spam
            spam_term_wc = dict(Counter(hits) + Counter(spam_term_wc))
            # all word count when spam
            spam_all_wc += content_wc
        else:
            # ham email count
            ham_count += 1
            # term count when ham
            ham_term_wc = dict(Counter(hits) + Counter(ham_term_wc))
            # all word count when ham
            ham_all_wc += content_wc

    vocab = dict(Counter(vocab) + Counter(spam_term_wc) + Counter(ham_term_wc))
    V = len(vocab) * 1.0
    print "vocab size = {}".format(V)
                        
    # calculate priors
    pr_spam_prior = (1.0 * spam_count) / (spam_count + ham_count)
    pr_ham_prior = (1.0 - pr_spam_prior)
    pr_spam_prior = math.log10(pr_spam_prior)
    pr_ham_prior = math.log10(pr_ham_prior)
    
    # calculate conditional probabilities with laplace smoothing = 1
    # pr_word_given_class = ( count(w, c) + 1 ) / (count(c) + 1 * |V|)
    for word in vocab:
        pr_word_given_spam[word] = math.log10((spam_term_wc.get(word, 0) + 1.0) / (spam_all_wc + V))
        pr_word_given_ham[word] = math.log10((ham_term_wc.get(word, 0) + 1.0) / (ham_all_wc + V))
    
    print "/*log probabilities*/"
    print "pr_spam_prior = {}".format(pr_spam_prior)
    print "pr_ham_prior = {}".format(pr_ham_prior)
    
    print "\n"
    print "{0: <50} | {1} | {2}".format("ID", "TRUTH", "CLASS")
    print "{0: <50}-+-{1}-+-{2}".format("-" * 50, "-" * 7, "-" * 10)

    # spam/ham prediction using Multinomial Naive Bayes priors and conditional probabilities
    accuracy = []

    for email in map_output:
        # initialize
        word_count = 0
        pred_is_spam = 0
        pr_spam = pr_spam_prior
        pr_ham = pr_ham_prior

        # parse mapper output
        mail = email.split(" | ")
        email_id = mail[0]
        is_spam = int(mail[1])
        hits = ast.literal_eval(mail[3])

        # number of search words
        word_count = sum(hits.values())

        # probability for each class for a given email
        # argmax [ log P(C) + sum( count(Wi) * log P(Wi|C) ) ]
        for word in vocab:
            pr_spam += (pr_word_given_spam.get(word, 0) * hits.get(word, 0))
            pr_ham += (pr_word_given_ham.get(word, 0) * hits.get(word, 0))

        # predict based on maximum likelihood
        if pr_spam > pr_ham: 
            pred_is_spam = 1

        # calculate accuracy
        accuracy.append(pred_is_spam==is_spam)
        
        print '{0:<50} | {1:<7} | {2:<10}'.format(email_id, is_spam, pred_is_spam)

    print "\n"
    print "/*accuracy*/"
    print "accuracy = {:.2f}".format(sum(accuracy) / float(len(accuracy)))
    
except Exception: 
    traceback.print_exc()

Overwriting reducer.py


<span style="color:CornflowerBlue "><b>Preparing to run the job</b></span><br>

In [None]:
!hdfs dfs -mkdir /hw2/hw2_3

<span style="color:CornflowerBlue "><b>Driver Function</b></span><br>

In [19]:
# HW 2.3  Mapper/reducer pair to classify the email messages by a single, 
#         user-specified word using the Naive Bayes Formulation
def hw2_3(word):
    # cleanup target directory
    !hdfs dfs -rm -R /hw2/hw2_3/tgt
    
    # run map reduce job
    !hadoop jar /usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.6.0.jar \
    -Dmapreduce.job.maps=10 \
    -Dmapreduce.job.reduces=1 \
    -files mapper.py,reducer.py \
    -mapper 'mapper.py {word}' \
    -reducer 'reducer.py {word}' \
    -input /hw2/hw2_2/src/enronemail_1h.txt \
    -output /hw2/hw2_3/tgt

    print "\nOUTPUT"
    # display accuracy on the console
    print "Accuracy of the Naive Bayes classifier with single word '{}'\n".format(word)
    !hdfs dfs -cat /hw2/hw2_3/tgt/part-00000
        
hw2_3("assistance")

15/09/15 01:31:02 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/09/15 01:31:03 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted /hw2/hw2_3/tgt
15/09/15 01:31:04 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/09/15 01:31:05 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
15/09/15 01:31:05 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
15/09/15 01:31:05 INFO jvm.JvmMetrics: Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
15/09/15 01:31:06 INFO mapred.FileInputFormat: Total input paths to process : 1
15/09/15 01:31:06 INFO mapreduce.JobSubmitter: number of splits:1
15/09/15 01:31:06 INFO mapreduce.JobSubmitter: Subm

<h3><span style="color:dodgerblue;font:12px">HW2.4</span></h3> 
<span style="color:firebrick">Using the Enron data from HW1 and in the Hadoop MapReduce framework, write  a mapper/reducer pair that will classify the email messages using multinomial Naive Bayes Classifier using a list of one or more user-specified words. Examine the words "assistance", "valium", and "enlargementWithATypo" and report your results. To do so, make sure that</span><br>

   <span style="color:firebrick">  - mapper.py counts all occurrences of a list of words, and</span><br>
   <span style="color:firebrick">  - reducer.py</span><br>
   <span style="color:firebrick"> that performs a multiple-word multinomial Naive Bayes classification via the chosen list</span>

<span style="color:CornflowerBlue "><b>Preparing to run the job</b></span><br>

In [None]:
!hdfs dfs -mkdir /hw2/hw2_4

<span style="color:CornflowerBlue "><b>Driver Function</b></span><br>

In [21]:
# HW 2.4  Mapper/reducer pair to classify the email messages by a 
#         list of multiple words using the multinomial Naive Bayes 
#         classification

def hw2_4(word):
    # cleanup target directory
    !hdfs dfs -rm -R /hw2/hw2_4/tgt
    
    # run map reduce job
    !hadoop jar /usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.6.0.jar \
    -Dmapreduce.job.maps=10 \
    -Dmapreduce.job.reduces=1 \
    -files mapper.py,reducer.py \
    -mapper 'mapper.py {word}' \
    -reducer 'reducer.py {word}' \
    -input /hw2/hw2_2/src/enronemail_1h.txt \
    -output /hw2/hw2_4/tgt

    print "\nOUTPUT"
    # display accuracy on the console
    print "Accuracy of the Naive Bayes classifier with words '{}'\n".format(word)
    !hdfs dfs -cat /hw2/hw2_4/tgt/part-00000
        
hw2_4("assistance valium enlargementWithATypo")

15/09/15 01:31:19 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/09/15 01:31:20 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted /hw2/hw2_4/tgt
15/09/15 01:31:21 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/09/15 01:31:22 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
15/09/15 01:31:22 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
15/09/15 01:31:22 INFO jvm.JvmMetrics: Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
15/09/15 01:31:23 INFO mapred.FileInputFormat: Total input paths to process : 1
15/09/15 01:31:23 INFO mapreduce.JobSubmitter: number of splits:1
15/09/15 01:31:23 INFO mapreduce.JobSubmitter: Subm

<h3><span style="color:dodgerblue;font:12px">HW2.5</span></h3> 
<span style="color:firebrick">Using the Enron data from HW1 and in the Hadoop MapReduce framework, write a mapper/reducer for a multinomial Naive Bayes Classifier that will classify the email messages using words present. Also drop words with a frequency of less than three (3). How does it affect the misclassification error of learnt naive multinomial Bayesian Classifiers on the training dataset?</span><br>

<span style="color:CornflowerBlue "><b>Reducer</b></span><br>
The reducer in this problem handles all the words in the emails. Additionally, the classifier drops words with a frequency of less than three (3). The reducer outputs each email id with its actual spam/ham indicator and the prediction, followed by the overall accuracy. <br>

**When the classifier drops words with frequency less than 3**, I see **there is NO change in accuracy** even though the vocabulary size shrinks by ~60%.
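
The pruning step can be sketched on its own with `collections.Counter`. The per-class counts below are made up for illustration; the threshold of 3 is the one from the problem statement.

```python
from collections import Counter

# made-up per-class term counts
spam_term_wc = Counter({"viagra": 5, "hello": 1})
ham_term_wc = Counter({"meeting": 4, "hello": 1, "rare": 2})

# total frequency of each word across both classes
total_wc = spam_term_wc + ham_term_wc

# keep only words seen at least three times overall
vocab = {word: count for word, count in total_wc.items() if count >= 3}

print("vocab size before = %d, after = %d" % (len(total_wc), len(vocab)))
```

Here `hello` and `rare` each occur twice in total and are dropped, so only the more frequent words contribute to the class scores.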

In [22]:
%%writefile reducer.py
#!/usr/bin/python
import traceback
import math
import sys
import ast

from collections import Counter

# read input parameters
find_words = sys.argv[1:]

# vocab: start empty when all words (*) are requested
# (find_words is a list, so compare against a list)
if find_words == ["*"]:
    vocab = []
else:
    vocab = find_words

try:
    spam_count = 0
    ham_count = 0
    spam_all_wc = 0
    ham_all_wc = 0
    spam_term_wc = {}
    ham_term_wc = {}
    pr_word_given_spam = {}
    pr_word_given_ham = {}

    # read each mapper output to loop during the prediction phase
    # after training the model
    map_output = []
    for line in sys.stdin:
        map_output.append(line)
    
    for email in map_output:
        # parse mapper output
        mail = email.split(" | ")
        # read spam/ham indicator, content word count, 
        is_spam = int(mail[1])
        content_wc = int(mail[2])
        hits = ast.literal_eval(mail[3])

        # capture counts required for naive bayes probabilities
        if is_spam:
            # spam mail count
            spam_count += 1
            # term count when spam
            spam_term_wc = dict(Counter(hits) + Counter(spam_term_wc))
            # all word count when spam
            spam_all_wc += content_wc
        else:
            # ham email count
            ham_count += 1
            # term count when ham
            ham_term_wc = dict(Counter(hits) + Counter(ham_term_wc))
            # all word count when ham
            ham_all_wc += content_wc

    vocab = dict(Counter(vocab) + Counter(spam_term_wc) + Counter(ham_term_wc))
    vocab = {k:v for (k,v) in vocab.items() if v >= 3}
    V = len(vocab) * 1.0
    print "vocab size = {}".format(V)
                        
    # calculate priors
    pr_spam_prior = (1.0 * spam_count) / (spam_count + ham_count)
    pr_ham_prior = (1.0 - pr_spam_prior)
    pr_spam_prior = math.log10(pr_spam_prior)
    pr_ham_prior = math.log10(pr_ham_prior)
    
    # calculate conditional probabilities with laplace smoothing = 1
    # pr_word_given_class = ( count(w, c) + 1 ) / (count(c) + 1 * |V|)
    for word in vocab:
        #if (vocab[word] >= 3):
        pr_word_given_spam[word] = math.log10((spam_term_wc.get(word, 0) + 1.0) / (spam_all_wc + V))
        pr_word_given_ham[word] = math.log10((ham_term_wc.get(word, 0) + 1.0) / (ham_all_wc + V))
    
    print "/*log probabilities*/"
    print "pr_spam_prior = {}".format(pr_spam_prior)
    print "pr_ham_prior = {}".format(pr_ham_prior)
    
    print "\n"
    print "{0: <50} | {1} | {2}".format("email id", "actuals", "predictions")
    print "{0: <50}-+-{1}-+-{2}".format("-" * 50, "-" * 7, "-" * 10)

    # spam/ham prediction using Multinomial Naive Bayes priors and conditional probabilities
    accuracy = []

    for email in map_output:
        # initialize
        word_count = 0
        pred_is_spam = 0
        pr_spam = pr_spam_prior
        pr_ham = pr_ham_prior

        # parse mapper output
        mail = email.split(" | ")
        email_id = mail[0]
        is_spam = int(mail[1])
        hits = ast.literal_eval(mail[3])

        # number of search words
        word_count = sum(hits.values())

        # probability for each class for a given email
        # argmax [ log P(C) + sum( count(Wi) * log P(Wi|C) ) ]
        for word in vocab:
            pr_spam += (pr_word_given_spam.get(word, 0) * hits.get(word, 0))
            pr_ham += (pr_word_given_ham.get(word, 0) * hits.get(word, 0))

        # predict based on maximum likelihood
        if pr_spam > pr_ham: 
            pred_is_spam = 1

        # calculate accuracy
        accuracy.append(pred_is_spam==is_spam)
        
        print '{0:<50} | {1:<7} | {2:<10}'.format(email_id, is_spam, pred_is_spam)

    print "\n"
    print "/*accuracy*/"
    print "accuracy = {:.2f}".format(sum(accuracy) / float(len(accuracy)))
    
except Exception: 
    traceback.print_exc()

Overwriting reducer.py


<span style="color:CornflowerBlue "><b>Preparing to run the job</b></span><br>

In [None]:
!hdfs dfs -mkdir /hw2/hw2_5

<span style="color:CornflowerBlue "><b>Driver Function</b></span><br>

In [24]:
# HW 2.5  Mapper/reducer pair to classify the email messages using 
#         all words present to perform a word-distribution-wide Naive 
#         Bayes classification

def hw2_5(word):
    # cleanup target directory
    !hdfs dfs -rm -R /hw2/hw2_5/tgt
    
    # run map reduce job
    !hadoop jar /usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.6.0.jar \
    -Dmapreduce.job.maps=10 \
    -Dmapreduce.job.reduces=1 \
    -files mapper.py,reducer.py \
    -mapper 'mapper.py {word}' \
    -reducer 'reducer.py {word}' \
    -input /hw2/hw2_2/src/enronemail_1h.txt \
    -output /hw2/hw2_5/tgt

    print "\nOUTPUT"
    # display accuracy on the console
    print "Accuracy of the Naive Bayes classifier with all words ('{}')\n".format(word)
    !hdfs dfs -cat /hw2/hw2_5/tgt/part-00000
        
hw2_5("*")

15/09/15 01:31:33 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/09/15 01:31:34 INFO fs.TrashPolicyDefault: Namenode trash configuration: Deletion interval = 0 minutes, Emptier interval = 0 minutes.
Deleted /hw2/hw2_5/tgt
15/09/15 01:31:35 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/09/15 01:31:36 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
15/09/15 01:31:36 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
15/09/15 01:31:36 INFO jvm.JvmMetrics: Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
15/09/15 01:31:37 INFO mapred.FileInputFormat: Total input paths to process : 1
15/09/15 01:31:37 INFO mapreduce.JobSubmitter: number of splits:1
15/09/15 01:31:37 INFO mapreduce.JobSubmitter: Subm

<span style="color:firebrick">** -- END OF ASSIGNMENT 2 -- **</span>