# Homework Week 2
## Brandon Shurick

### HW2.0
> What is a race condition in the context of parallel computation? Give an example.  
> What is MapReduce?  
> How does it differ from Hadoop?  
> Which programming paradigm is Hadoop based on? Explain and give a simple example in code and show the code running.  

A race condition occurs when parallel tasks from the same application access shared data without proper synchronization, so the final result depends on the order (timing) in which the tasks run and finish. For example, suppose two threads each read a shared variable x, compute x / (x + c), and write the result back to x. If the second thread reads x only after the first thread has written it, the division is applied twice in sequence; if both threads read x before either one writes, each divides the same original value and one of the updates is lost. The final value of x therefore depends on timing rather than on the program logic.
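
To make the timing dependence concrete, here is a small deterministic sketch. All names are hypothetical, and it uses an increment (x + 1) instead of the division so the numbers are easy to follow. It replays the same read, compute, write steps under two interleavings, then shows the usual fix: a `threading.Lock` around the read-modify-write.

```python
import threading

def run_schedule(x, schedule):
    """Replay two threads' read -> compute -> write steps on a shared x.

    schedule lists (thread_name, step) pairs in execution order, where step is
    'read' (copy x into a thread-local register) or 'write' (store register + 1 into x).
    """
    registers = {}
    for thread, step in schedule:
        if step == 'read':
            registers[thread] = x
        else:  # 'write'
            x = registers[thread] + 1
    return x

# One thread finishes before the other starts: both increments survive
one_after_other = [('A', 'read'), ('A', 'write'), ('B', 'read'), ('B', 'write')]
# Both threads read before either writes: B overwrites A's result
overlapping = [('A', 'read'), ('B', 'read'), ('A', 'write'), ('B', 'write')]

def locked_increments(n_threads=4, n_increments=10000):
    """Real threads incrementing a shared counter; the Lock makes each
    read-modify-write a single step, so no update can be lost."""
    count = 0
    lock = threading.Lock()

    def work():
        nonlocal count
        for _ in range(n_increments):
            with lock:
                count += 1

    threads = [threading.Thread(target=work) for _ in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return count

print(run_schedule(0, one_after_other))  # 2
print(run_schedule(0, overlapping))      # 1
print(locked_increments())               # 40000
```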

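MapReduce is a programming model, and a framework implementing it, for processing large datasets in parallel across many nodes: a map step turns input records into key-value pairs, the framework sorts and groups the pairs by key, and a reduce step aggregates each group.

MapReduce is the model; Hadoop is an open-source platform that implements it. The Hadoop core framework combines the MapReduce engine with a distributed file system, the Hadoop Distributed File System (HDFS), and since Hadoop 2 also includes the YARN resource manager.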

Hadoop is based on the Functional Programming paradigm, where a function (map) is first applied to each value in the dataset, the data is sorted by key, and then another function (reduce / fold) is applied to all of the values for each key. 

Code example below:

In [143]:
import re
from itertools import groupby

def mapper(line):
    ''' Map function: emit "word<tab>1" for every lowercase word in the line '''
    line = re.sub(r'[^\t\sa-z]+', ' ', line.lower())
    words = re.findall(r'[a-z]+', line)
    return '\n'.join('{}\t{}'.format(w, 1) for w in words)

def reducer(g):
    ''' Aggregate the grouped "word<tab>1" records for one word into a count '''
    sums = 0
    for kv in g:
        k, v = kv.split('\t')
        sums += int(v)
    return '{}\t{}'.format(k, sums)

def run():
    ''' Run the functional programming steps (mapper, sort, reducer)
        and print the word count of every word in the Enron file
    '''
    with open('enronemail_1h.txt', 'r') as f:
        mapped = [mapper(l) for l in f]   # map each line exactly once
    # Drop lines with no words, then split into one record per word and sort
    records = sorted('\n'.join(m for m in mapped if m).split('\n'))
    # Group the sorted records by word (the text before the tab) and reduce each group
    print('\n'.join(reducer(g) for k, g in groupby(records, key=lambda kv: kv.split('\t')[0])))

run()

a	543
ab	5
abidjan	2
ability	2
able	14
abn	1
about	52
above	11
absent	1
absenteeism	1
absolute	2
absolutely	1
absorb	1
abuse	2
abused	1
acce	1
accelerate	1
accelerated	1
accept	3
acceptable	1
accepted	1
accepting	2
accepts	1
access	12
accomodate	4
accomodates	1
accompanied	1
according	2
accordingly	1
account	36
accountability	1
accounting	5
accounts	1
accrual	2
accurate	1
aches	1
achieve	1
achieved	1
acid	1
acquire	1
acquisition	1
acrobaat	1
acrobat	1
across	10
act	7
action	1
activate	4
active	1
activists	1
activities	10
actor	1
actress	1
actual	4
actually	2
ad	31
adage	1
adams	1
adapted	1
add	12
added	2
adding	2
addition	5
additional	13
additionally	2
address	27
addressed	1
addresses	5
addressing	1
addtional	1
adequately	1
adhesion	1
adm	1
admin	1
adminder	2
administration	3
admitted	1
admixture	1
adobe	12
adobee	1
adolescent	1
adr	1
adrianbold	2
ads	5
adult	3
adv	1
advance	4
advanced	1
advantage	1
advantages	1
advertise	2
advertised	1
advertisement	4
advertisements	1
advertising	7
...
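
Because the notebook code above reads `enronemail_1h.txt`, here is a file-free sketch of the same map, sort, reduce (fold) pipeline on two hypothetical lines, using `functools.reduce` as the fold. It only illustrates the paradigm; `map_words` and `word_count` are illustrative names.

```python
import re
from functools import reduce
from itertools import groupby

def map_words(line):
    """Map: one (word, 1) pair per lowercase alphabetic token."""
    return [(w, 1) for w in re.findall(r'[a-z]+', line.lower())]

def word_count(lines):
    pairs = [kv for line in lines for kv in map_words(line)]    # map
    pairs.sort(key=lambda kv: kv[0])                             # sort by key
    return {word: reduce(lambda acc, kv: acc + kv[1], group, 0)  # fold each key's group
            for word, group in groupby(pairs, key=lambda kv: kv[0])}

print(word_count(["The cat sat.", "The cat ran!"]))
# {'cat': 2, 'ran': 1, 'sat': 1, 'the': 2}
```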

### HW2.1 Sort in Hadoop MapReduce
> Given as input: Records of the form (integer, “NA”), where integer is any integer, and “NA” is just the empty string.  
> Output: sorted key value pairs of the form (integer, “NA”) in decreasing order; what happens if you have multiple reducers? Do you need additional steps? Explain.  

> Write code to generate N random records of the form (integer, “NA”). Let N = 10,000.  
> Write the python Hadoop streaming map-reduce job to perform this sort. Display the top 10 biggest numbers. Display the 10 smallest numbers.
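
On the multiple-reducer question: Hadoop's default `HashPartitioner` sends each key to a reducer by hash, and each reducer's output is sorted only within itself, so concatenating the `part-*` files does not give a global order. Two common fixes are a final merge step, or range partitioning (Hadoop ships a `TotalOrderPartitioner`, which uses split points sampled from the input). The sketch below simulates both behaviours in plain Python; the function names, sample size, and seed are illustrative assumptions, not Hadoop's API.

```python
import random

def hash_partition_sort(keys, num_reducers):
    """Hash-style partitioning: key -> reducer by hash. Each reducer sorts its
    own keys in decreasing order; the reducer outputs are then concatenated."""
    parts = [[] for _ in range(num_reducers)]
    for k in keys:
        parts[hash(k) % num_reducers].append(k)
    return [k for part in parts for k in sorted(part, reverse=True)]

def range_partition_sort(keys, num_reducers, sample_size=100, seed=0):
    """Range partitioning: reducer 0 gets the largest keys, reducer 1 the next
    range, and so on, using cut points taken from a random sample of the keys."""
    if not keys:
        return []
    rng = random.Random(seed)
    sample = sorted(rng.sample(keys, min(sample_size, len(keys))), reverse=True)
    # num_reducers - 1 cut points, evenly spaced through the descending sample
    cuts = [sample[i * len(sample) // num_reducers] for i in range(1, num_reducers)]
    parts = [[] for _ in range(num_reducers)]
    for k in keys:
        # Reducer index = number of cut points above k, so larger keys go to lower reducers
        parts[sum(1 for c in cuts if k < c)].append(k)
    return [k for part in parts for k in sorted(part, reverse=True)]

rng = random.Random(7)
keys = [rng.randint(0, 9999) for _ in range(10000)]
print(range_partition_sort(keys, 4) == sorted(keys, reverse=True))  # True
print(hash_partition_sort(list(range(100)), 4)[:3])                # [96, 92, 88]
```

With hash partitioning each output file is internally sorted but the files interleave, which is why an extra merge (or a range partitioner) is needed once there is more than one reducer.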

In [40]:
!mkdir input
!rmdir output

mkdir: input: File exists


In [37]:
from __future__ import print_function
import random
def generate_file(N,fname='input/randomrecords.txt'):
    ''' Function to generate random integers '''
    gen_number = lambda n: random.randint(0,n-1)
    nums = '\n'.join('<{},"{}">'.format(gen_number(N),'NA') for n in range(N))
    w = open(fname,'w')
    print(nums,file=w)
    w.close()
generate_file(10000)

In [38]:
!head -n2 input/randomrecords.txt

<5453,"NA">
<8038,"NA">


In [161]:
%%writefile mapper.py
#!/usr/bin/env python
import re, sys
def mapper(line):
    ''' Mapper function for Hadoop '''
    line = re.sub(r'[<>\"]','',line.strip())
    num,word = line.split(',')
    print '{}\t{}'.format(word,num)

for line in sys.stdin:
    mapper(line)

Writing mapper.py


In [30]:
%%writefile reducer.py
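#!/usr/bin/env python
import sys
priorkey = None
values = []

def printkey(key, values):
    ''' Print the 10 smallest and 10 largest values seen for one key '''
    print('Key: ' + key)
    print('\nBottom 10:')
    print('\n'.join(str(s) for s in sorted(values)[:10]))
    print('\nTop 10:')
    print('\n'.join(str(s) for s in sorted(values, reverse=True)[:10]))

## Reducer: collect the values for each key (input arrives sorted by key) ##
for line in sys.stdin:
    key, val = line.strip().split('\t')

    if priorkey is not None and key != priorkey:
        # Key changed: report the finished key, then start the new key with this value
        printkey(priorkey, values)
        values = []
    values.append(int(val))
    priorkey = key

# Last key
if priorkey is not None:
    printkey(priorkey, values)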

Overwriting reducer.py
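
The reducer above sorts the full value list twice. A possible alternative, sketched here as a hypothetical helper rather than what was run, uses `heapq.nsmallest` and `heapq.nlargest`, which return the same ten values without two full sorts.

```python
import heapq

def bottom_and_top(values, n=10):
    """Return the n smallest values (ascending) and the n largest (descending),
    matching sorted(values)[:n] and sorted(values, reverse=True)[:n]."""
    return heapq.nsmallest(n, values), heapq.nlargest(n, values)

print(bottom_and_top([5, 1, 9, 3, 7, 3], n=2))  # ([1, 3], [9, 7])
```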


In [17]:
!chmod +x mapper.py && chmod +x reducer.py

In [41]:
!hadoop jar /usr/local/Cellar/hadoop/2.7.0/libexec/share/hadoop/tools/lib/hadoop-streaming-2.7.0.jar -input ./input/randomrecords.txt -mapper ./mapper.py -reducer ./reducer.py -output ./output 

16/01/25 18:56:53 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/01/25 18:56:54 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
16/01/25 18:56:54 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
16/01/25 18:56:54 INFO jvm.JvmMetrics: Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
16/01/25 18:56:54 INFO mapred.FileInputFormat: Total input paths to process : 1
16/01/25 18:56:54 INFO mapreduce.JobSubmitter: number of splits:1
16/01/25 18:56:54 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local1446511426_0001
16/01/25 18:56:54 INFO mapreduce.Job: The url to track the job: http://localhost:8080/
16/01/25 18:56:54 INFO mapred.LocalJobRunner: OutputCommitter set in config null
16/01/25 18:56:54 INFO mapreduce.Job: Running job: job_local1446511426_0001
...

In [42]:
!cat ./output/*

Key: NA	
	
Bottom 10:	
0	
2	
2	
3	
7	
8	
9	
10	
12	
12	
	
Top 10:	
9999	
9998	
9998	
9997	
9996	
9996	
9996	
9995	
9994	
9993	


### HW2.2.  WORDCOUNT
>Using the Enron data from HW1 and Hadoop MapReduce streaming, write the mapper/reducer job that will determine the word count (number of occurrences) of each white-space delimited token (assume spaces, full stops, and commas as delimiters). Examine the word “assistance” and report its word count results.
> 
>CROSSCHECK: `grep assistance enronemail_1h.txt | cut -d$'\t' -f4 | grep assistance | wc -l` returns `8`.
>
>NOTE: "assistance" occurs on 8 lines, but how many times does the token occur? 10 times! This is the number we are looking for!
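
The difference between "lines containing a token" (what `grep ... | wc -l` counts) and "occurrences of the token" (what wordcount needs) can be checked locally. This sketch uses a hypothetical `token_stats` helper and the same lowercase `[a-z]+` tokenization as the wordcount mapper. Note that `grep` is case-sensitive and matches substrings (for example "assistances"), so its line count can also disagree with a token-based count.

```python
import re

def token_stats(lines, token):
    """Return (number of lines containing token, total occurrences of token),
    tokenizing each line like the wordcount mapper (lowercase [a-z]+)."""
    lines_with_token = 0
    occurrences = 0
    for line in lines:
        n = re.findall(r'[a-z]+', line.lower()).count(token)
        occurrences += n
        if n:
            lines_with_token += 1
    return lines_with_token, occurrences

sample = ["Need assistance? We offer assistance.", "No help here", "ASSISTANCE now"]
print(token_stats(sample, "assistance"))  # (2, 3)
```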




In [45]:
%%writefile mapper.py
#!/usr/bin/env python
import re, sys
def mapper(line):
    ''' Map function for wordcount in Hadoop: emit "word<tab>1" per word '''
    line = re.sub(r'[^\t\sa-z]+',' ',line.lower())
    words = re.findall(r'[a-z]+',line)
    # Skip lines with no words so the reducer never receives an empty record
    if words:
        print('\n'.join('{}\t{}'.format(w,1) for w in words))

for line in sys.stdin:
    mapper(line)

Overwriting mapper.py


In [82]:
%%writefile reducer.py
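#!/usr/bin/env python
import sys 

sums = 0
prev_k = None
for line in sys.stdin:
    k,v = line.strip().split('\t')
    if prev_k is not None and k != prev_k:
        # Key changed: emit the finished key's total, then reset
        print('{}\t{}'.format(prev_k,sums))
        sums = 0
    # Always count the current record, including the first record of a new key
    sums += int(v)
    prev_k = k

# Last key
if prev_k is not None:
    print('{}\t{}'.format(prev_k,sums))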

Overwriting reducer.py
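
The carry-the-previous-key pattern in a streaming reducer is easy to get subtly wrong: resetting the total to 0 without counting the current record drops the first record of every new key, and printing the current key instead of the previous one shifts every count down by one row. Here is a minimal sketch of the pattern as a pure function over already-sorted lines, so it can be checked without Hadoop; `reduce_sorted` is a hypothetical name.

```python
def reduce_sorted(lines):
    """Sum 'key<TAB>count' records that arrive sorted by key, the way a Hadoop
    streaming reducer sees them, and return one 'key<TAB>total' line per key."""
    out = []
    prev_k, total = None, 0
    for line in lines:
        k, v = line.rstrip('\n').split('\t')
        if prev_k is not None and k != prev_k:
            out.append('{}\t{}'.format(prev_k, total))  # previous key is finished
            total = 0
        total += int(v)  # count every record, including the first of a new key
        prev_k = k
    if prev_k is not None:
        out.append('{}\t{}'.format(prev_k, total))  # flush the last key
    return out

print(reduce_sorted(['a\t1', 'a\t1', 'ab\t1', 'b\t1', 'b\t1']))
# ['a\t2', 'ab\t1', 'b\t2']
```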


In [63]:
!chmod +x mapper.py && chmod +x reducer.py
!rm -Rf ./output

In [83]:
!hadoop jar /usr/local/Cellar/hadoop/2.7.0/libexec/share/hadoop/tools/lib/hadoop-streaming-2.7.0.jar -input ./enronemail_1h.txt -mapper ./mapper.py -reducer ./reducer.py -output ./output 

16/01/25 19:22:26 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/01/25 19:22:26 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
16/01/25 19:22:26 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
16/01/25 19:22:26 INFO jvm.JvmMetrics: Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
16/01/25 19:22:26 INFO mapred.FileInputFormat: Total input paths to process : 1
16/01/25 19:22:26 INFO mapreduce.JobSubmitter: number of splits:1
16/01/25 19:22:26 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local46544892_0001
16/01/25 19:22:26 INFO mapreduce.Job: The url to track the job: http://localhost:8080/
16/01/25 19:22:26 INFO mapred.LocalJobRunner: OutputCommitter set in config null
16/01/25 19:22:26 INFO mapreduce.Job: Running job: job_local46544892_0001
...

In [84]:
!cat ./output/*

ab	543
abidjan	4
ability	1
able	1
abn	13
about	0
above	51
absent	10
absenteeism	0
absolute	0
absolutely	1
absorb	0
abuse	0
abused	1
acce	0
accelerate	0
accelerated	0
accept	0
acceptable	2
accepted	0
accepting	0
accepts	1
access	0
accomodate	11
accomodates	3
accompanied	0
according	0
accordingly	1
account	0
accountability	35
accounting	0
accounts	4
accrual	0
accurate	1
aches	0
achieve	0
achieved	0
acid	0
acquire	0
acquisition	0
acrobaat	0
acrobat	0
across	0
act	9
action	6
activate	0
active	3
activists	0
activities	0
actor	9
actress	0
actual	0
actually	3
ad	1
adage	30
adams	0
adapted	0
add	0
added	11
adding	1
addition	1
additional	4
additionally	12
address	1
addressed	26
addresses	0
addressing	4
addtional	0
adequately	0
adhesion	0
adm	0
admin	0
adminder	0
administration	1
admitted	2
admixture	0
adobe	0
adobee	11
adolescent	0
adr	0
adrianbold	0
ads	1
adult	4
adv	2
advance	0
advanced	3
advantage	0
...

#### HW2.2.1
>Using Hadoop MapReduce and your wordcount job (from HW2.2) determine the top-10 occurring tokens (most frequent tokens)

In [122]:
%%writefile reducer.py
#!/usr/bin/env python
import sys 

i = 0

for line in sys.stdin:
    if i<10:
        print line.strip()
        i+=1

Overwriting reducer.py


In [119]:
!chmod +x reducer.py

In [136]:
!rm -Rf output2 
!hadoop jar /usr/local/Cellar/hadoop/2.7.0/libexec/share/hadoop/tools/lib/hadoop-streaming-2.7.0.jar -D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapreduce.lib.partition.KeyFieldBasedComparator -D stream.map.output.field.separator='\t' -D stream.num.map.output.key.fields=2 -D mapreduce.partition.keycomparator.options=-k2,2nr -D mapreduce.job.reduces=1 -mapper /bin/cat -reducer reducer.py -input ./output -output ./output2 

16/01/25 19:52:37 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/01/25 19:52:37 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
16/01/25 19:52:37 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
16/01/25 19:52:37 INFO jvm.JvmMetrics: Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
16/01/25 19:52:37 INFO mapred.FileInputFormat: Total input paths to process : 1
16/01/25 19:52:37 INFO mapreduce.JobSubmitter: number of splits:1
16/01/25 19:52:37 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local1206287299_0001
16/01/25 19:52:37 INFO mapreduce.Job: The url to track the job: http://localhost:8080/
16/01/25 19:52:37 INFO mapred.LocalJobRunner: OutputCommitter set in config null
16/01/25 19:52:37 INFO mapreduce.Job: Running job: job_local1206287299_0001
...

In [137]:
!cat output2/*

theinvestment	1246
tobacco	963
andorra	685
off	565
ab	543
youcan	444
iname	417
yourmembership	394
ed	381
forbearance	373
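
The job above relies on `mapreduce.job.reduces=1`: every count reaches one reducer, already in descending order, and the reducer keeps the first ten lines. With several reducers each would emit its own local top ten, and a final step would have to merge those candidates (at most 10 per reducer). That merge is just a top-k selection; here is a local sketch of it with a hypothetical `top_k` helper.

```python
import heapq

def top_k(count_lines, k=10):
    """Return the k (word, count) pairs with the largest counts from
    'word<TAB>count' lines: the same result as sorting on field 2
    numerically in reverse and keeping the first k lines."""
    pairs = (line.rstrip('\n').split('\t') for line in count_lines)
    return heapq.nlargest(k, ((w, int(c)) for w, c in pairs), key=lambda wc: wc[1])

print(top_k(['a\t5', 'b\t9', 'c\t1', 'd\t7'], k=2))  # [('b', 9), ('d', 7)]
```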


### HW2.3. Multinomial NAIVE BAYES with NO Smoothing
>Using the Enron data from HW1 and Hadoop MapReduce, write mapper/reducer job(s) that
>   will both learn a Naive Bayes classifier and classify the Enron email messages using the learnt Naive Bayes classifier. Use all white-space delimited tokens as independent input variables (assume spaces, full stops, and commas as delimiters). Note: for multinomial Naive Bayes, Pr(X=“assistance”|Y=SPAM) is calculated as follows:
>
>   the number of times “assistance” occurs in SPAM labeled documents / the number of words in documents labeled SPAM 
>
>   E.g., “assistance” occurs 5 times in all of the documents labeled SPAM, and the length in terms of the number of words in all documents labeled as SPAM (when concatenated) is 1,000. Then Pr(X=“assistance”|Y=SPAM) = 5/1000. Note this is a multinomial estimation of the class conditional for a Naive Bayes Classifier. No smoothing is needed in this HW. Multiplying lots of probabilities, which are between 0 and 1, can result in floating-point underflow. Since log(xy) = log(x) + log(y), it is better to perform all computations by summing logs of probabilities rather than multiplying probabilities. Please pay attention to probabilities that are zero! They will need special attention. Count up how many times you need to process a zero probability for each class and report. 
>
>   Report the performance of your learnt classifier in terms of the misclassification error rate of your multinomial Naive Bayes Classifier. Plot a histogram of the posterior probabilities (i.e., Pr(Class|Doc)) for each class over the training set. Summarize what you see. 
>
>   Error Rate = misclassification rate with respect to a provided set (say training set in this case). It is more formally defined here:
>
>Let DF represent the evaluation set in the following:
>$$\mathrm{Err}(\mathrm{Model}, DF) = \frac{\left|\{(X, c(X)) \in DF : c(X) \neq \mathrm{Model}(X)\}\right|}{|DF|}$$
>
>where $|\cdot|$ denotes set cardinality, $c(X)$ denotes the class of the tuple $X$ in $DF$, and $\mathrm{Model}(X)$ denotes the class inferred by the model “Model”.
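
The estimate, the log-space scoring, and the error rate described above can be sketched in plain Python before distributing them. This is a minimal, unsmoothed multinomial Naive Bayes on hypothetical toy documents (labels 1 = spam, 0 = ham); all function names are illustrative. A word with zero count in a class makes that class's log score `-inf`, and `zero_hits` counts each such zero per class, once per distinct word per document (an assumption about what "how many times" means).

```python
import math
from collections import Counter

def train(docs):
    """docs: list of (label, tokens). Returns class priors, per-class word
    counts, and per-class total word counts (no smoothing)."""
    doc_counts = Counter(label for label, _ in docs)
    word_counts = {c: Counter() for c in doc_counts}
    for label, tokens in docs:
        word_counts[label].update(tokens)
    priors = {c: n / len(docs) for c, n in doc_counts.items()}
    totals = {c: sum(wc.values()) for c, wc in word_counts.items()}
    return priors, word_counts, totals

def log_scores(tokens, priors, word_counts, totals, zero_hits):
    """log P(c) + sum over distinct words of count * log P(w|c) for each class c,
    with P(w|c) = count of w in class c / total words in class c."""
    scores = {}
    for c in priors:
        score = math.log(priors[c])
        for w, n in Counter(tokens).items():
            if word_counts[c][w] == 0:
                zero_hits[c] += 1          # log(0): this class can no longer win
                score = float('-inf')
            else:
                score += n * math.log(word_counts[c][w] / totals[c])
        scores[c] = score
    return scores

def error_rate(true_labels, predicted):
    """Err(Model, DF) = misclassified examples / all examples."""
    wrong = sum(1 for t, p in zip(true_labels, predicted) if t != p)
    return wrong / len(true_labels)

docs = [(1, ['cheap', 'pills', 'cheap']), (0, ['meeting', 'notes']),
        (1, ['cheap', 'offer']), (0, ['project', 'meeting'])]
priors, word_counts, totals = train(docs)
zero_hits = {c: 0 for c in priors}
predicted = []
for label, tokens in docs:
    scores = log_scores(tokens, priors, word_counts, totals, zero_hits)
    predicted.append(max(scores, key=scores.get))  # class with the larger log score
print(error_rate([label for label, _ in docs], predicted))  # 0.0
print(zero_hits)  # {1: 4, 0: 4}
```

On the training set itself every word occurs in its own class, so only the other class ever hits a zero; without smoothing, that class's score becomes `-inf` whenever a document contains a word it has never seen.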

In [209]:
%%writefile mapper.py
#!/usr/bin/env python
import re, sys
from collections import Counter
WORDS = re.compile(r'\w+')
for line in sys.stdin:
    ## Read lines from data chunk ##
    # Replace non-word, non-whitespace characters with spaces
    line = re.sub(r'[^\w\s]+',' ',line.strip())
    components = line.split('\t')

    ## Only process valid records: doc id, spam label, subject, body ##
    try:
        spamdoc = int(components[1])
    except (IndexError, ValueError):
        continue

    ## Count each word in the subject and body ##
    words = ' '.join(components[2:])
    wordcounts = Counter(WORDS.findall(words))

    ## Send one record per distinct word to the reducer: doc id, word, count in doc, label ##
    for word, word_cnt in wordcounts.items():
        print('K{}\t{}\t{}\t{}'.format(components[0],word,word_cnt,spamdoc))

Overwriting mapper.py


In [175]:
%%writefile reducer.py
#!/usr/bin/env python
''' First pass: pass the mapper records through and build the model
    (document counts per class and word counts per class) '''
import sys
from collections import defaultdict

docs = 0
typecnt = defaultdict(int)                         # documents per class: '1' = spam, '0' = ham
findwords = defaultdict(lambda: defaultdict(int))  # class -> word -> total count

prev_cid = None
for line in sys.stdin:
    ## Read in lines from Mapper: doc id, word, count in doc, spam label ##
    line = line.strip()
    cid,word,word_cnt,spamdoc = line.split('\t')
    # A separate dict per class, so spam and ham counts never mix
    findwords[spamdoc][word] += int(word_cnt)

    # Input is sorted by doc id, so a new id marks a new document
    if prev_cid != cid:
        docs += 1
        typecnt[spamdoc] += 1

    prev_cid = cid
    # Pass the record through for the classification pass
    print(line)

def serialize(counts):
    ''' {w1: c1, w2: c2} -> "w1:c1~w2:c2" '''
    return '~'.join('{}:{}'.format(w,c) for w,c in counts.items())

# Emit the model as one record keyed '*', which sorts before every 'K...' doc id,
# so the next job's reducer reads the model first
out = '*\tdocs^{}'.format(docs)
out += '%spamdocs^{}'.format(typecnt['1'])
out += '%hamdocs^{}'.format(typecnt['0'])
out += '%findwords_spam^{}'.format(serialize(findwords['1']))
out += '%findwords_ham^{}'.format(serialize(findwords['0']))
print(out)

Overwriting reducer.py
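
The first reducer ships the whole model to the second job as one record keyed `*`. A small sketch of the two ideas this relies on, with hypothetical helper names: the `w:c~w:c` encoding round-trips because `\w+` tokens never contain `:` or `~`, and `*` (ASCII 42) sorts before `K` (ASCII 75), so after the shuffle the model record should reach the second reducer ahead of every document. Python compares strings by code point, which for ASCII keys agrees with a byte-wise ordering such as the one Hadoop applies to streaming's text keys.

```python
def serialize_counts(counts):
    """{'w1': 3, 'w2': 1} -> 'w1:3~w2:1'."""
    return '~'.join('{}:{}'.format(w, c) for w, c in counts.items())

def parse_counts(text):
    """Inverse of serialize_counts; an empty string gives an empty dict."""
    return {w: int(c) for w, c in (p.split(':') for p in text.split('~') if p)}

model = {'cheap': 3, 'meeting': 2}
keys = sorted(['K0002 1999 12 13 farmer', '*', 'K0001 1999 12 10 kaminski'])
print(keys[0])                                         # *
print(parse_counts(serialize_counts(model)) == model)  # True
```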


In [272]:
%%writefile reducer2.py
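#!/usr/bin/env python
''' Make a second pass through the data
    Read in lines from Mapper
    (i.e. 'cat' of the previous output; the '*' model record sorts first)
    Output: doc id, predicted class, true class, log score spam, log score ham
'''
import sys, math
from collections import defaultdict

def parse_counts(field):
    ''' "name^w1:c1~w2:c2" -> {w1: c1, w2: c2} '''
    pairs = field.split('^', 1)[1].split('~')
    return { p.split(':')[0]:int(p.split(':')[1]) for p in pairs if p }

prior_spam = prior_ham = 0.0
findwords_spam, findwords_ham = {}, {}
totalwords_spam = totalwords_ham = 0
zeros = {'spam': 0, 'ham': 0}     # zero-probability terms processed per class

def log_score(prior, findwords, totalwords, doccnts, cls):
    ''' log P(class) + sum over words of count * log P(word|class), no smoothing '''
    score = math.log(prior)
    for w, n in doccnts.items():
        if findwords.get(w, 0) == 0:
            # Zero probability: log(0) is -inf, so this class cannot win
            zeros[cls] += 1
            score = float('-inf')
        else:
            score += n * math.log(findwords[w] * 1.0 / totalwords)
    return score

def classify(cid, spam, doccnts):
    spam_prob = log_score(prior_spam, findwords_spam, totalwords_spam, doccnts, 'spam')
    ham_prob = log_score(prior_ham, findwords_ham, totalwords_ham, doccnts, 'ham')
    pred = 1 if spam_prob > ham_prob else 0
    print('{}\t{}\t{}\t{}\t{}'.format(cid, pred, spam, spam_prob, ham_prob))

prev_cid, prev_spam = None, None
doccnts = defaultdict(int)

for line in sys.stdin:
    components = line.strip().split('\t')
    if components[0] == '*':
        ## Model record from the first job ##
        d,s,h,fs,fh = components[1].split('%')
        docs = int(d.split('^')[1])
        prior_spam = int(s.split('^')[1]) * 1.0 / docs
        prior_ham = int(h.split('^')[1]) * 1.0 / docs
        findwords_spam = parse_counts(fs)
        findwords_ham = parse_counts(fh)
        totalwords_spam = sum(findwords_spam.values())
        totalwords_ham = sum(findwords_ham.values())
        continue

    cid, word, word_cnt, spam = components
    # A new doc id means the previous document is complete: classify it
    if prev_cid is not None and cid != prev_cid:
        classify(prev_cid, prev_spam, doccnts)
        doccnts = defaultdict(int)
    doccnts[word] += int(word_cnt)
    prev_cid, prev_spam = cid, spam

# Classify the last document
if prev_cid is not None:
    classify(prev_cid, prev_spam, doccnts)

# Report how many zero probabilities were processed per class (task stderr log)
sys.stderr.write('zero probabilities: spam={} ham={}\n'.format(zeros['spam'], zeros['ham']))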

Overwriting reducer2.py
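
The second reducer prints unnormalized log scores, log P(class) + log P(doc | class). To plot the histogram of Pr(class | doc) that the assignment asks for, those scores have to be normalized across classes. Exponentiating them directly would underflow to 0 for long emails, so a common approach, sketched here with a hypothetical `posteriors` helper, is the log-sum-exp trick: subtract the largest score before exponentiating.

```python
import math

def posteriors(log_scores):
    """Map {class: unnormalized log score} to {class: Pr(class|doc)}.
    Subtracting the max score keeps exp() from underflowing; a -inf score
    (zero likelihood) becomes probability 0."""
    m = max(log_scores.values())
    if m == float('-inf'):
        raise ValueError('every class has zero likelihood')
    weights = {c: math.exp(s - m) for c, s in log_scores.items()}
    z = sum(weights.values())
    return {c: w / z for c, w in weights.items()}

print(posteriors({'spam': -1000.0, 'ham': -1000.0}))        # {'spam': 0.5, 'ham': 0.5}
print(posteriors({'spam': -2500.0, 'ham': float('-inf')}))  # {'spam': 1.0, 'ham': 0.0}
```

Without smoothing, many training documents will land at exactly 0 or 1, since any unseen word sends the other class to `-inf`.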


In [150]:
!chmod +x mapper.py && chmod +x reducer.py && chmod +x reducer2.py

In [None]:
!rm -Rf ./output && rm -Rf ./output2
!hadoop jar /usr/local/Cellar/hadoop/2.7.0/libexec/share/hadoop/tools/lib/hadoop-streaming-2.7.0.jar -input ./enronemail_1h.txt -mapper ./mapper.py -reducer ./reducer.py -output ./output 
!hadoop jar /usr/local/Cellar/hadoop/2.7.0/libexec/share/hadoop/tools/lib/hadoop-streaming-2.7.0.jar -input ./output -mapper /bin/cat -reducer reducer2.py -output ./output2 

In [271]:
cat ./output2/* 


K0001 1999 12 10 kaminski	0	0
K0001 2000 01 17 beck	0	0
K0001 2000 06 06 lokay	0	0
K0001 2001 02 07 kitchen	0	0
K0001 2001 04 02 williams	0	0
K0002 1999 12 13 farmer	0	0
K0002 2001 02 07 kitchen	0	0
K0002 2001 05 25 SA_and_HP	0	1
K0002 2003 12 18 GP	0	1
K0002 2004 08 01 BG	0	1
K0003 1999 12 10 kaminski	0	0
K0003 1999 12 14 farmer	0	0
K0003 2000 01 17 beck	0	0
K0003 2001 02 08 kitchen	0	0
K0003 2003 12 18 GP	0	1
K0003 2004 08 01 BG	0	1
K0004 1999 12 10 kaminski	0	0
K0004 1999 12 14 farmer	0	0
K0004 2001 04 02 williams	0	0
K0004 2001 06 12 SA_and_HP	0	1
K0004 2004 08 01 BG	0	1
K0005 1999 12 12 kaminski	0	0
K0005 1999 12 14 farmer	0	0
K0005 2000 06 06 lokay	0	0
K0005 2001 02 08 kitchen	0	0
K0005 2001 06 23 SA_and_HP	0	1
K0005 2003 12 18 GP	0	1
K0006 1999 12 13 kaminski	0	0
K0006 2001 02 08 kitchen	0	0
K0006 2001 04 03 williams	0	0
K0006 2001 06 25 SA_and_HP	0	1
K0006 2003 12 18 GP	0	1
K0006 2004 08 01 BG	0	1
K0007 1999 12 13 kaminski	0	0
K0007 1999 12 14 