# MIDS W261 Machine Learning At Scale

Christopher Llop | christopher.llop@ischool.berkeley.edu <br>
Week 3 | Submission Date: 9/22/2015


<b>HW3.0.</b>

What is a merge sort? Where is it used in Hadoop?



How is  a combiner function in the context of Hadoop? 



Give an example where it can be used and justify why it should be used in the context of this problem.



What is the Hadoop shuffle?



What is the Apriori algorithm? Describe an example use in your domain of expertise. 



Define confidence and lift.

<b>HW3.1. </b>

Product Recommendations: The action or practice of selling additional products or services to existing customers is called cross-selling. Giving product recommendation is one of the examples of cross-selling that are frequently used by online retailers. One simple method to give product recommendations is to recommend products that are frequently browsed together by the customers.

Suppose we want to recommend new products to the customer based on the products they have already browsed on the online website. Write a program using the A-priori algorithm to find products which are frequently browsed together. Fix the support to s = 100  (i.e. product pairs need to occur together at least 100 times to be considered frequent) and find itemsets of size 2 and 3. (Note - Jake told us not to do this via the Google Group).

Use the online browsing behavior dataset at: 

https://www.dropbox.com/s/zlfyiwa70poqg74/ProductPurchaseData.txt?dl=0

Each line in this dataset represents a browsing session of a customer. On each line, each string <br>
of 8 characters represents the id of an item browsed during that session. The items are separated <br>
by spaces.

Do some exploratory data analysis of this dataset. 

Report your findings such as number of unique products; largest basket, etc. using Hadoop Map-Reduce.


In [None]:
# Number of unique products

In [None]:
# Largest basket

In [None]:
# Frequency of basket counts

In [None]:
# Most common product

<b>HW3.2.</b> (Computationally prohibitive but then again Hadoop can handle this)

Note: for this part the writeup will require a specific rule ordering but the program need not sort the output.

List the top 5 rules with corresponding confidence scores in decreasing order of confidence score 
for frequent (100>count) itemsets of size 2. 
A rule is of the form: 

(item1) ⇒ item2.

Fix the ordering of the rule lexicographically (left to right), 
and break ties in confidence (between rules, if any exist) 
by taking the first ones in lexicographically increasing order. 
Use Hadoop MapReduce to complete this part of the assignment; 
use a single mapper and single reducer; use a combiner if you think it will help and justify. 


<b>HW3.3</b>

Benchmark your results using the pyFIM implementation of the Apriori algorithm
(Apriori - Association Rule Induction / Frequent Item Set Mining implemented by Christian Borgelt). 
You can download pyFIM from here: 

http://www.borgelt.net/pyfim.html

Comment on the results from both implementations (your Hadoop MapReduce of apriori versus pyFIM) 
in terms of results and execution times.


<b>HW3.4</b> (Conceptual Exercise)

Suppose that you wished to perform the Apriori algorithm once again,
though this time now with the goal of listing the top 5 rules with corresponding confidence scores 
in decreasing order of confidence score for itemsets of size 3 using Hadoop MapReduce.
A rule is now of the form: 

(item1, item2) ⇒ item3 

Recall that the Apriori algorithm is iterative for increasing itemset size,
working off of the frequent itemsets of the previous size to explore 
ONLY the NECESSARY subset of a large combinatorial space. 
Describe how you might design a framework to perform this exercise.

In particular, focus on the following:
  — map-reduce steps required
  - enumeration of item sets and filtering for frequent candidates


<span style="color:silver"><b>HW2.0.</b> What is a race condition in the context of parallel computation? Give an example.
What is MapReduce?
How does it differ from Hadoop?
Which programming paradigm is Hadoop based on? Explain and give a simple example in code and show the code running.</span>

<span style="color:green"><b>Answer:</b></span>

A <b>race condition</b> is a condition where two threads must access the same data source. The programmer in this instance does not control which thread modifies the data first. As a result, it is possible that the ultimate end product of the code differs at random based on the order that the data source is accessed by threads.

As an example, say the number "3" is stored on disk. Thread 1 wants to double the number, while Thread 2 wants to add 5 to the number. If Thread 1 acts first, the result is $(3 * 2) + 5 = 11$. If Thread 2 acts first, the result is $(3 + 5) * 2 = 16$. This sort of condition can cause all sorts of difficulties.

<br>
<b>MapReduce</b> is a problem solving framework/concept for embaressingly parallel data analysis. At its core, a problem is chunked and first processed in parallel by a number of mappers. Reducers then "fold" together the results of the mappers into a final output. <b>Hadoop</b> is a technical environment that, when combined with Hadoop File System allows a programmer to execute MapReduce jobs with ease. Hadoop programming is based on the MapReduce paradigm, which stems back to Functional Programming, or the idea of a programming language that can accept entire functions as an arguement.

Hadoop is similar to functional programming because the "mapper" and "reducer" passed to Hadoop can be thought of as two functions being passed as arguements to the Hadoop program. Hadoop then executes these functions in the MapReduce framework. In fact, looking at the code examples throughout this assignment, we can see that the name of the mapper and reducer are passed as arguments to Hadoop Streaming.

Below is a short example leveraging functional programming in Python.


In [1]:
# Calculate Function
def calculate( fun, *args ):
    return fun( *args )

# Add Function
def add(a=0, b=0):
    return a + b

# Multiply Function
def multiply(a=0, b=0):
    return a * b

# Demonstrate passing a function as an arguement to another function
print calculate( add, 2, 3 )
print calculate( multiply, 5, 3 )

5
15


<span style="color:silver"><b>HW2.1. </b> Sort in Hadoop MapReduce
Given as input: Records of the form (integer, “NA”), where integer is any integer, and “NA” is just the empty string.
Output: sorted key value pairs of the form (integer, “NA”); what happens if you have multiple reducers? Do you need additional steps? Explain.</span>

<span style="color:silver">Write code to generate N  random records of the form (integer, “NA”). Let N = 10,000.
Write the python Hadoop streaming map-reduce job to perform this sort.</span>


<span style="color:green"><b>Answer:</b></span>

If we have multiple reducers, the results will be sorted within each reducer - however they will not be globally sorted. While all the results for a given key wind up at the same reducer, reducers are not guaranteed to be given consecutive keys in the sort order. To correct for this, we could either force our system to send keys to the reducers in sorted chunks, or we could post-process all the reducer outputs to re-sort.

In [1]:
%%writefile mapper.py
#!/usr/bin/python
from random import randint

for x in range(0,10000):
    # Generate random integers
    print "{}\t{}".format(randint(0,10000),"NA")

Overwriting mapper.py


In [2]:
%%writefile reducer.py
#!/usr/bin/python
import sys

# input comes from STDIN
for line in sys.stdin:
    # Because the reducer is given the keys in sorted order, we can simply print
    print line.rstrip('\n')

Overwriting reducer.py


In [3]:
# Use chmod for permissions
!chmod a+x mapper.py
!chmod a+x reducer.py

In [7]:
# We need a dummy text file to run streaming
!echo nothing > dummy.txt
!hadoop fs -mkdir ./W261/In/HW2
!hdfs dfs -put ./dummy.txt ./W261/In/HW2/

Exception in thread "main" java.lang.UnsupportedClassVersionError: org/apache/hadoop/fs/FsShell : Unsupported major.minor version 51.0
	at java.lang.ClassLoader.defineClass1(Native Method)
	at java.lang.ClassLoader.defineClassCond(ClassLoader.java:637)
	at java.lang.ClassLoader.defineClass(ClassLoader.java:621)
	at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:141)
	at java.net.URLClassLoader.defineClass(URLClassLoader.java:283)
	at java.net.URLClassLoader.access$000(URLClassLoader.java:58)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:197)
	at java.security.AccessController.doPrivileged(Native Method)
	at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
Exception in thread "main" java.lang.UnsupportedClassVersionError: org/apache/hadoop/fs/FsShell : Unsupported major.

In [5]:
# HW2.1: Execute a job using Hadoop Streaming to generate 10,000 random integers and sort them.
def HW2_1():
    !hadoop jar /usr/local/Cellar/hadoop/2.6.0/libexec/share/hadoop/tools/lib/hadoop-streaming-2.6.0.jar \
    -Dmapreduce.job.maps=10 \
    -Dmapreduce.job.reduces=1 \
    -files ./mapper.py,./reducer.py \
    -mapper ./mapper.py  \
    -reducer ./reducer.py \
    -input ./W261/In/HW2/dummy.txt -output ./W261/Out/HW2_1
    
    print
    print "Display head of file to prove run worked:"
    !hadoop fs -cat ./W261/Out/HW2_1/part-00000 | head -n15

HW2_1()

Exception in thread "main" java.lang.UnsupportedClassVersionError: org/apache/hadoop/util/RunJar : Unsupported major.minor version 51.0
	at java.lang.ClassLoader.defineClass1(Native Method)
	at java.lang.ClassLoader.defineClassCond(ClassLoader.java:637)
	at java.lang.ClassLoader.defineClass(ClassLoader.java:621)
	at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:141)
	at java.net.URLClassLoader.defineClass(URLClassLoader.java:283)
	at java.net.URLClassLoader.access$000(URLClassLoader.java:58)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:197)
	at java.security.AccessController.doPrivileged(Native Method)
	at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:247)

Display head of file to prove run worked:
Exception in thread "main" java.lang.UnsupportedClassVersionError: org/a

<span style="color:silver"><b>HW2.2.</b> Using the Enron data from HW1 and Hadoop MapReduce streaming, write mapper/reducer pair that  will determine the number of occurrences of a single, user-specified word. Examine the word “assistance” and report your results.</span>


   <span style="color:silver">To do so, make sure that</span>
   
   - <span style="color:silver">mapper.py counts all occurrences of a single word, and</span>
   - <span style="color:silver">reducer.py collates the counts of the single word.</span>

In [7]:
%%writefile mapper.py
#!/usr/bin/python
import sys
import re

findword = sys.argv[1]

# input comes from standard input
for full_email in sys.stdin:

    # Parse out email body for processing. Find body using "tab spam/ham tab"
    # use regex to strip out non alpha-numeric. "don't" will become "dont" which is fine.
    keyword = re.findall("\t[0-1]\t",full_email)[0]
    email_id, is_spam_tabbed, email_body = full_email.partition(keyword)
    email_body = re.sub('[^A-Za-z0-9\s]+', '', email_body)

    for word in email_body.split():
        if word == findword:
            print '%s\t%s' % (word, 1)

Overwriting mapper.py


In [8]:
%%writefile reducer.py
#!/usr/bin/python
import sys

current_word = None
current_count = 0
word = None

# input comes from standard input
for line in sys.stdin:
    # parse the input we got from mapper.py
    word, count = line.strip().split('\t', 1)
    count = int(count)

    # take advantage of sorted keys
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # print result when word changes
            print "{}\t{}".format(current_word, current_count)
        current_count = count
        current_word = word

# print final word
if current_word == word:
    print "{}\t{}".format(current_word, current_count)


Overwriting reducer.py


In [9]:
# Move input file to HDFS
!hdfs dfs -put ./enronemail_1h.txt ./W261/In/HW2/

15/09/13 00:16:40 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
put: `W261/In/HW2/enronemail_1h.txt': File exists


In [10]:
# HW2.2: Execute a job using Hadoop Streaming to search the input file for a user-specified word
def HW2_2(term="assistance"):    
    import re

    !hadoop jar /usr/local/Cellar/hadoop/2.6.0/libexec/share/hadoop/tools/lib/hadoop-streaming-2.6.0.jar \
    -Dmapreduce.job.maps=10 \
    -Dmapreduce.job.reduces=1 \
    -files ./mapper.py,./reducer.py \
    -mapper './mapper.py {term}' \
    -reducer ./reducer.py \
    -input ./W261/In/HW2/enronemail_1h.txt -output ./W261/Out/HW2_2
    
    print
    print "Display head of file to prove run worked:"
    !hadoop fs -cat ./W261/Out/HW2_2/part-00000 | head -n15

    # Crosscheck results (data is small enough to use RE in python)
    print "Running Crosscheck..."
    with open ("enronemail_1h.txt", "r") as myfile:
        print "Check Result:", len(re.findall(term,myfile.read()))
        
HW2_2(term="assistance")



15/09/13 00:16:42 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/09/13 00:16:43 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
15/09/13 00:16:43 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
15/09/13 00:16:43 INFO jvm.JvmMetrics: Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
15/09/13 00:16:44 INFO mapred.FileInputFormat: Total input paths to process : 1
15/09/13 00:16:44 INFO mapreduce.JobSubmitter: number of splits:1
15/09/13 00:16:44 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local826747041_0001
15/09/13 00:16:45 INFO mapred.LocalDistributedCacheManager: Localized file:/Users/cjllop/Code/MIDS/W261/HW/W2/mapper.py as file:/usr/local/Cellar/hadoop/hdfs/tmp/mapred/local/1442117804916/mapper.py
15/09/13 00:16:45 INFO mapred.LocalDistributedCacheManager: Lo

<span style="color:silver"><b>HW2.3.</b> Using the Enron data from HW1 and Hadoop MapReduce, write  a mapper/reducer pair that
   will classify the email messages by a single, user-specified word. Examine the word “assistance” and report your results. To do so, make sure that</span>
   
   - <span style="color:silver">mapper.py</span>
   - <span style="color:silver">reducer.py </span>

   <span style="color:silver">performs a single word multinomial Naive Bayes classification.</span>

In [11]:
%%writefile mapper.py
#!/usr/bin/python
import sys
import re

# Get search term(s)
findword = sys.argv[1]

for full_email in sys.stdin:
#with open (findfile, "r") as myfile:
#    for full_email in myfile:
    # Empty dictionary
    term_hits = {}

    # Spam classification
    is_spam = re.findall("\t([0-1])\t",full_email)[0]

    # Parse out email body for processing. Find body using "tab spam/ham tab"
    # use regex to strip out non alpha-numeric. "don't" will become "dont" which is fine for classifying.
    keyword = re.findall("\t[0-1]\t",full_email)[0]
    email_id, is_spam_tabbed, email_body = full_email.partition(keyword)
    email_body = re.sub('[^A-Za-z0-9\s]+', '', email_body)
    # Must process search query and email bodies the same
    findword = re.sub('[^A-Za-z0-9\s]+', '', findword)
    email_len = len(email_body.split())

    # Build counts of term words. 
    for word in list(set(email_body.split())):
        term_hits[word] = len(re.findall(word,email_body))

    # Print as tuple with unique splitter "|||"
    print "{}\t{} ||| {} ||| {}".format(email_id, is_spam, email_len, term_hits)


Overwriting mapper.py


In [12]:
%%writefile reducer.py
#!/usr/bin/python
import sys
import ast
import math

# Get search term(s)
findword = sys.argv[1]
search_terms = findword.split()

# Parse all mapper results into a list so we can loop through again to predict after 
# looping through to train the model
mapper_results = []
for line in sys.stdin:
    mapper_results.append(line)

spam_term_counts = {}
ham_term_counts = {}
word_prob_given_spam = {}
word_prob_given_ham = {}
spam_count = 0
ham_count = 0
spam_len = 0
ham_len = 0
distinct_term_list = []

# Open each file and build Multinomial Naive Bayes model
for processed_email in mapper_results:
    # Read in tuples created by mapper
    email_id, processed_email = processed_email.split("\t")
    processed_email = processed_email.split(" ||| ")
    is_spam = int(processed_email[0])
    email_len = int(processed_email[1])
    count_dict = ast.literal_eval(processed_email[2])

    # Build counts for spam and ham definitions.
    if is_spam:
        for key, value in count_dict.iteritems():
            spam_term_counts[key] = spam_term_counts.get(key, 0) + value
        spam_count += 1
        spam_len += email_len
    else:
        for key, value in count_dict.iteritems():
            ham_term_counts[key] = ham_term_counts.get(key, 0) + value
        ham_count += 1
        ham_len += email_len

    distinct_term_list = list(set(distinct_term_list + count_dict.keys()))

# Calculate our priors based on the overall ratio of spam to ham
spam_prior = float(spam_count) / (spam_count + ham_count)
ham_prior = 1 - spam_prior
spam_prior = math.log10(spam_prior)
ham_prior = math.log10(ham_prior)

# Calculate our conditional probabilites for the search term using MNB formula
#     term_given_spam = (spam_term_count + 1 for smoothing) / (total count of spam words + total distinct vocab size)
distinct_term_count = len(distinct_term_list)

for term in search_terms:
    word_prob_given_spam[term] = math.log10((spam_term_counts.get(term,0) + 1.0) / (float(spam_len) + distinct_term_count))
    word_prob_given_ham[term] = math.log10((ham_term_counts.get(term,0) + 1.0) / (float(ham_len) + distinct_term_count))

# Now let's predict!
accuracy = []
for processed_email in mapper_results:
    # Defaults
    pred_spam = 0
    spam_prediction = spam_prior
    ham_prediction = ham_prior

    # Read in tuples created by mapper
    email_id, processed_email = processed_email.split("\t")
    processed_email = processed_email.split(" ||| ")
    is_spam = int(processed_email[0])
    count_dict = ast.literal_eval(processed_email[2])

    # Read in counts to use in prediction
    for term in word_prob_given_spam.keys():
        # Calculate the probability for each class
        spam_prediction += (word_prob_given_spam[term] * count_dict.get(term, 0))
        ham_prediction += (word_prob_given_ham[term] * count_dict.get(term, 0))

    # Pick the higher probability
    if spam_prediction > ham_prediction: 
        pred_spam = 1

    # Store accuracy in a list
    accuracy.append(1*(pred_spam==is_spam))

    # Print predictions to results file
    print '{}\t{}\t{}'.format(email_id, is_spam, pred_spam)

# Print accuracy
sys.stderr.write("\nSpam Probs: {}\n".format(word_prob_given_spam))
sys.stderr.write("Ham Probs: {}\n".format(word_prob_given_ham))
sys.stderr.write("Accuracy = {:.2f}\n".format(float(sum(accuracy))/len(accuracy)))


Overwriting reducer.py


In [13]:
# HW2.3: Predict via MNBusing Hadoop Streaming for a user-specified word
def HW2_3(term="assistance"):
    !hadoop jar /usr/local/Cellar/hadoop/2.6.0/libexec/share/hadoop/tools/lib/hadoop-streaming-2.6.0.jar \
    -Dmapreduce.job.maps=10 \
    -Dmapreduce.job.reduces=1 \
    -files ./mapper.py,./reducer.py \
    -mapper './mapper.py {term}' \
    -reducer './reducer.py {term}' \
    -input ./W261/In/HW2/enronemail_1h.txt -output ./W261/Out/HW2_3
    
    print
    print "Display head of file to prove run worked:"
    !hadoop fs -cat ./W261/Out/HW2_3/part-00000 | head -n15
        
HW2_3(term="assistance")

# Note - output shows same accuracy of 60% that we saw in HW1


15/09/13 00:16:52 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/09/13 00:16:53 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
15/09/13 00:16:53 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
15/09/13 00:16:53 INFO jvm.JvmMetrics: Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
15/09/13 00:16:54 INFO mapred.FileInputFormat: Total input paths to process : 1
15/09/13 00:16:54 INFO mapreduce.JobSubmitter: number of splits:1
15/09/13 00:16:54 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local566410638_0001
15/09/13 00:16:55 INFO mapred.LocalDistributedCacheManager: Localized file:/Users/cjllop/Code/MIDS/W261/HW/W2/mapper.py as file:/usr/local/Cellar/hadoop/hdfs/tmp/mapred/local/1442117814842/mapper.py
15/09/13 00:16:55 INFO mapred.LocalDistributedCacheManager: Lo

<span style="color:silver"><b>HW2.4.</b> Using the Enron data from HW1 and in the Hadoop MapReduce framework, write  a mapper/reducer pair that
   will classify the email messages using multinomial Naive Bayes Classifier using a list of one or more user-specified words. Examine the words “assistance”, “valium”, and “enlargementWithATypo” and report your results
   To do so, make sure that</span>

   - <span style="color:silver">mapper.py </span>
   - <span style="color:silver">reducer.py </span>

   <span style="color:silver">performs the multiple-word multinomial Naive Bayes classification via the chosen list.</span>

In [14]:
# HW2.4: Predict via MNBusing Hadoop Streaming for multiple user-specified words
# Note - the solution program to HW2.3 can already do this. We just need to give it more terms.
def HW2_4(term="assistance valium enlargementWithATypo"):
    !hadoop jar /usr/local/Cellar/hadoop/2.6.0/libexec/share/hadoop/tools/lib/hadoop-streaming-2.6.0.jar \
    -Dmapreduce.job.maps=10 \
    -Dmapreduce.job.reduces=1 \
    -files ./mapper.py,./reducer.py \
    -mapper './mapper.py "{term}"' \
    -reducer './reducer.py "{term}"' \
    -input ./W261/In/HW2/enronemail_1h.txt -output ./W261/Out/HW2_4
    
    print
    print "Display head of file to prove run worked:"
    !hadoop fs -cat ./W261/Out/HW2_4/part-00000 | head -n15
        
HW2_4(term="assistance valium enlargementWithATypo")

# Note - output shows same accuracy of 63% that we saw in HW1


15/09/13 00:17:03 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/09/13 00:17:04 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
15/09/13 00:17:04 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
15/09/13 00:17:04 INFO jvm.JvmMetrics: Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
15/09/13 00:17:05 INFO mapred.FileInputFormat: Total input paths to process : 1
15/09/13 00:17:05 INFO mapreduce.JobSubmitter: number of splits:1
15/09/13 00:17:05 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local1288622694_0001
15/09/13 00:17:06 INFO mapred.LocalDistributedCacheManager: Localized file:/Users/cjllop/Code/MIDS/W261/HW/W2/mapper.py as file:/usr/local/Cellar/hadoop/hdfs/tmp/mapred/local/1442117825858/mapper.py
15/09/13 00:17:06 INFO mapred.LocalDistributedCacheManager: L

<span style="color:silver"><b>HW2.5.</b> Using the Enron data from HW1 an in the  Hadoop MapReduce framework, write  a mapper/reducer for a multinomial Naive Bayes Classifier that
   will classify the email messages using  words present. Also drop words with a frequency of less than three (3). How does it affect the misclassifcation error of learnt naive multinomial Bayesian Classifiers on the training dataset:</span>


<span style="color:green"><b>Answer:</b></span>
The full word (>=3) classifier drastically increases accuracy from 0.63 to 0.90. This is a misclassification error rate of 10%.

In [15]:
%%writefile reducer.py
#!/usr/bin/python
import sys
import ast
import math

# Get search term(s)
findword = sys.argv[1]
search_terms = findword.split()

# Parse all mapper results into a list so we can loop through again to predict after 
# looping through to train the model
mapper_results = []
for line in sys.stdin:
    mapper_results.append(line)

spam_term_counts = {}
ham_term_counts = {}
word_prob_given_spam = {}
word_prob_given_ham = {}
spam_count = 0
ham_count = 0
spam_len = 0
ham_len = 0
distinct_term_list = []

# Open each file and build Multinomial Naive Bayes model
for processed_email in mapper_results:
    # Read in tuples created by mapper
    email_id, processed_email = processed_email.split("\t")
    processed_email = processed_email.split(" ||| ")
    is_spam = int(processed_email[0])
    email_len = int(processed_email[1])
    count_dict = ast.literal_eval(processed_email[2])

    # Build counts for spam and ham definitions.
    if is_spam:
        for key, value in count_dict.iteritems():
            spam_term_counts[key] = spam_term_counts.get(key, 0) + value
        spam_count += 1
        spam_len += email_len
    else:
        for key, value in count_dict.iteritems():
            ham_term_counts[key] = ham_term_counts.get(key, 0) + value
        ham_count += 1
        ham_len += email_len

    distinct_term_list = list(set(distinct_term_list + count_dict.keys()))

# Calculate our priors based on the overall ratio of spam to ham
spam_prior = float(spam_count) / (spam_count + ham_count)
ham_prior = 1 - spam_prior
spam_prior = math.log10(spam_prior)
ham_prior = math.log10(ham_prior)

# Calculate our conditional probabilites for the search term using MNB formula
#     term_given_spam = (spam_term_count + 1 for smoothing) / (total count of spam words + total distinct vocab size)
distinct_term_count = len(distinct_term_list)

# Added logic for this problem - replace search_terms with all terms if we were given a "*"
if search_terms[0] == "*":
    search_terms = distinct_term_list

for term in search_terms:
    if (spam_term_counts.get(term,0) + ham_term_counts.get(term,0)) >= 3:
        word_prob_given_spam[term] = math.log10((spam_term_counts.get(term,0) + 1.0) / (float(spam_len) + distinct_term_count))
        word_prob_given_ham[term] = math.log10((ham_term_counts.get(term,0) + 1.0) / (float(ham_len) + distinct_term_count))

# Now let's predict!
accuracy = []
for processed_email in mapper_results:
    # Defaults
    pred_spam = 0
    spam_prediction = spam_prior
    ham_prediction = ham_prior

    # Read in tuples created by mapper
    email_id, processed_email = processed_email.split("\t")
    processed_email = processed_email.split(" ||| ")
    is_spam = int(processed_email[0])
    count_dict = ast.literal_eval(processed_email[2])

    # Read in counts to use in prediction
    for term in word_prob_given_spam.keys():
        # Calculate the probability for each class
        spam_prediction += (word_prob_given_spam[term] * count_dict.get(term, 0))
        ham_prediction += (word_prob_given_ham[term] * count_dict.get(term, 0))

    # Pick the higher probability
    if spam_prediction > ham_prediction: 
        pred_spam = 1

    # Store accuracy in a list
    accuracy.append(1*(pred_spam==is_spam))

    # Print predictions to results file
    print '{}\t{}\t{}'.format(email_id, is_spam, pred_spam)

# Print accuracy
sys.stderr.write("Accuracy = {:.2f}\n".format(float(sum(accuracy))/len(accuracy)))


Overwriting reducer.py


In [16]:
# HW2.5: Predict via MNBusing Hadoop Streaming for multiple user-specified words
# Note - the mapper program to HW2.3 can already do this, so we only created a new reducer
def HW2_5(term="*"):
    !hadoop jar /usr/local/Cellar/hadoop/2.6.0/libexec/share/hadoop/tools/lib/hadoop-streaming-2.6.0.jar \
    -Dmapreduce.job.maps=10 \
    -Dmapreduce.job.reduces=1 \
    -files ./mapper.py,./reducer.py \
    -mapper './mapper.py "{term}"' \
    -reducer './reducer.py "{term}"' \
    -input ./W261/In/HW2/enronemail_1h.txt -output ./W261/Out/HW2_5
    
    print
    print "Display head of file to prove run worked:"
    !hadoop fs -cat ./W261/Out/HW2_5/part-00000 | head -n15
        
HW2_5(term="*")

# Note - output shows 90% accuracy


15/09/13 00:17:15 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/09/13 00:17:16 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
15/09/13 00:17:16 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
15/09/13 00:17:16 INFO jvm.JvmMetrics: Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
15/09/13 00:17:16 INFO mapred.FileInputFormat: Total input paths to process : 1
15/09/13 00:17:17 INFO mapreduce.JobSubmitter: number of splits:1
15/09/13 00:17:17 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local1829190690_0001
15/09/13 00:17:18 INFO mapred.LocalDistributedCacheManager: Localized file:/Users/cjllop/Code/MIDS/W261/HW/W2/mapper.py as file:/usr/local/Cellar/hadoop/hdfs/tmp/mapred/local/1442117837403/mapper.py
15/09/13 00:17:18 INFO mapred.LocalDistributedCacheManager: L

In [17]:
# This cell can be used to delete old output to allow re-run of any Hadoop script.
#!hadoop fs -rm -r ./W261/Out/HW2_1
#!hadoop fs -rm -r ./W261/Out/HW2_2
#!hadoop fs -rm -r ./W261/Out/HW2_3
#!hadoop fs -rm -r ./W261/Out/HW2_4
#!hadoop fs -rm -r ./W261/Out/HW2_5

This concludes HW 2.0. Thanks for reading!