# HW 2 - Naive Bayes in Hadoop MR
__`MIDS w261: Machine Learning at Scale | UC Berkeley School of Information | Fall 2018`__

In the live sessions for week 2 and week 3 you got some practice designing and debugging Hadoop Streaming jobs. In this homework we'll use Hadoop MapReduce to implement your first parallelized machine learning algorithm: Naive Bayes. As you develop your implementation you'll test it on a small dataset that matches the 'Chinese Example' in the _Manning, Raghavan and Schütze_ reading for Week 2. For the main task in this assignment you'll be working with a small subset of the Enron Spam/Ham Corpus. By the end of this assignment you should be able to:
* __... describe__ the Naive Bayes algorithm including both training and inference.
* __... perform__ EDA on a corpus using Hadoop MR.
* __... implement__ parallelized Naive Bayes.
* __... contrast__ partial, unordered and total order sort and their implementations in Hadoop Streaming.
* __... explain__ how smoothing affects the bias and variance of a Multinomial Naive Bayes model.

As always, your work will be graded both on the correctness of your output and on the clarity and design of your code. __Please refer to the `README` for homework submission instructions.__ 

## Notebook Setup
Before starting, run the following cells to confirm your setup.

In [1]:
# imports
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
%reload_ext autoreload
%autoreload 2

In [2]:
# global vars (paths) - ADJUST AS NEEDED
JAR_FILE = "/usr/lib/hadoop-mapreduce/hadoop-streaming.jar"
HDFS_DIR = "/user/root/HW2"
HOME_DIR = "/media/notebooks/Assignments/HW2" # FILL IN HERE eg. /media/notebooks/Assignments/HW2

In [3]:
# save path for use in Hadoop jobs (-cmdenv PATH={PATH})
from os import environ
PATH  = environ['PATH']

In [4]:
# data path
ENRON = "data/enronemail_1h.txt"

# Question 1: Hadoop MapReduce Key Takeaways.  

This assignment will be the only one in which you use Hadoop Streaming to implement a distributed algorithm. The key reason we continue to teach Hadoop streaming is because of the way it forces the programmer to think carefully about what is happening under the hood when you parallelize a calculation. This question will briefly highlight some of the most important concepts that you need to understand about Hadoop Streaming and MapReduce before we move on to Spark next week.   

### Q1 Tasks:

* __a) short response:__ What "programming paradigm" is Hadoop MapReduce based on? What are the main ideas of this programming paradigm and how does MapReduce exemplify these ideas?

* __b) short response:__ What is the Hadoop Shuffle? When does it happen? Why is it potentially costly? Describe one specific thing we can do to mitigate the cost associated with this stage of our Hadoop Streaming jobs.

* __c) short response:__ In Hadoop Streaming why do the input and output record format of a combiner script have to be the same? [__`HINT`__ _what level of combining does the framework guarantee? what is the relationship between the record format your mapper emits and the format your reducer expects to receive?_]

* __d) short response:__ To what extent can you control the level of parallelization of your Hadoop Streaming jobs? Please be specific.

* __e) short response:__ What change in the kind of computing resources available prompted the creation of parallel computation frameworks like Hadoop? 

### Q1 Student Answers:

> __a)__ Hadoop MapReduce is based on the functional programming paradigm, applied to processing large data sets with a parallel, distributed algorithm on a cluster. The paradigm consists of two stages: a map stage, which applies a transformation or filter to each element of a data set, followed by a reduce stage, in which the results are aggregated (folded) across elements that share a key. The key idea exemplified by MapReduce is distributed parallel processing: the input data is partitioned, each partition is processed simultaneously during the map phase, and the results are then aggregated in the reduce phase.
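
The two stages described in this answer correspond to the higher-order functions `map` and `fold` (called `reduce` in Python). The following is a minimal single-machine sketch of that shape, not a distributed implementation; the toy `docs` list and the `count_words` helper are assumptions made up for illustration.

```python
from functools import reduce

# Toy input: each string stands in for one input record (hypothetical data).
docs = ["chinese beijing chinese", "tokyo japan chinese"]


def count_words(doc):
    """Stateless per-record function: the number of whitespace-separated tokens."""
    return len(doc.split())


# "Map" stage: apply the same function to every record independently.
mapped = list(map(count_words, docs))

# "Reduce" stage: fold the per-record results with an associative operation.
total = reduce(lambda a, b: a + b, mapped, 0)

print(mapped, total)
```

Because `count_words` keeps no state between records and addition is associative, the records could be split across machines and the partial sums merged in any grouping, which is the property MapReduce relies on.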

> __b)__ The Hadoop shuffle is the process, between the map and reduce phases, in which the data from the mappers is sorted and transferred to the reducers so that key-value pairs with the same key end up in the same reducer. It consists of three steps. First, a partition step splits each mapper's output stream into separate files based on key, one file for each reducer. Next, the records (key-value pairs) within each partition are sorted. Finally, records with the same key within each partition are combined. On the reducer side, the multiple files intended for each reducer are merge-sorted so that aggregation by key can be performed efficiently.

> This step is costly because its cost grows as we scale up to more data. Sorting the records, communicating them between the map and reduce steps, and synchronizing the steps of the shuffle all take longer as the data set becomes larger, and can quickly diminish the gains that come from the divide-and-conquer strategy of the MapReduce framework.

> One specific thing we can do to mitigate this cost is to perform local intermediate aggregation, where values for the same key are combined either on the mapper side or prior to transfer to the reducer. When a combiner is used as an intermediate aggregation step, we decrease the number of records that are submitted to the shuffle and reduce the time spent on communication, thereby decreasing the cost.
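
As a hedged illustration of local aggregation, the sketch below combines counts inside a single mapper before anything is emitted. The function name `mapper_with_local_aggregation` and the sample lines are hypothetical, and the approach assumes the per-mapper dictionary fits in memory.

```python
from collections import defaultdict


def mapper_with_local_aggregation(lines):
    """Yield (word, partial_count) once per distinct word instead of once per token."""
    counts = defaultdict(int)
    for line in lines:
        for word in line.split():
            counts[word] += 1
    # Sorted only to make the demo output deterministic.
    for word in sorted(counts):
        yield word, counts[word]


# Hypothetical input: two records seen by one mapper (5 tokens in total).
sample = ["spam spam ham", "spam eggs"]
records = list(mapper_with_local_aggregation(sample))
for word, count in records:
    print(f"{word}\t{count}")
```

Here five tokens become three emitted records, so fewer records enter the shuffle. The reducer still has to sum the partial counts it receives, because other mappers may emit the same words.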

> __c)__ The input and output record formats of a combiner have to be the same because the combiner may run several times during a streaming job, or it may never run. If the combiner is a separate script, meaning that it is not part of the mapper or reducer script, the combiner's input is the output of the mapper script, and the combiner's output is then the input of the reducer script. In the event that the combiner does not run, the mapper output format and reducer input format must match exactly. Therefore, a proper combiner's input format must match the mapper output format and the combiner's output format must match the reducer's input format, and these two formats are the same.

> Another way to consider this is that if the input and output formats of the combiner are different, then the combiner becomes responsible for transforming the data format between mapper and reducer. However, if the combiner never runs and the mapper output format does not match the reducer input format, then the job cannot finish. Therefore, the combiner input and output record formats must match, so that they can match the mapper output and reducer input formats respectively.

> __d)__ We can control the level of parallelization in a Hadoop Streaming job by specifying the number of mappers, the method of partitioning, and the number of reducers. The number of mappers determines the number of partitions over which the map task runs in parallel across the data. It is driven by the number of DFS blocks in the input file, so we can adjust it by adjusting the DFS block size; however, we do not have guaranteed control over the number of mappers. The method of partitioning can be customized and specifies how data is distributed across the reducers. The number of reducers also sets a level of parallelization, in which values are aggregated for the same keys. The optimal level of parallelism is achieved by maps that take at least a minute to execute, which comes to a recommended 10-100 maps per node depending on the size of the task. The ideal number of reducers is the value that comes closest to a multiple of the block size and a task time of between 5 and 15 minutes, while creating the fewest files possible.

> __e)__ The change in the mid-2000s that prompted the creation of parallel computing frameworks came from the semiconductor industry and the growth of large-scale data problems. The semiconductor industry ran out of opportunities to improve single-processor machines, but was able to produce existing processing resources at steadily falling costs. To tackle large data problems that cannot be processed by a single core, computer scientists and organizations shifted to computing with multiple cores, investing in many cheaper processors rather than one very expensive high-performance processor. The use of multiple cores naturally led to the need for divide-and-conquer processing strategies and therefore to the advent of parallel computing frameworks.

# Question 2: MapReduce Design Patterns.  

In the last two live sessions and in your readings from Lin & Dyer you encountered a number of techniques for manipulating the logistics of a MapReduce implementation to ensure that the right information is available at the right time and location. In this question we'll review a few of the key techniques you learned.   

### Q2 Tasks:

* __a) short response:__ What are counters (in the context of Hadoop Streaming)? How are they useful? What kinds of counters does Hadoop provide for you? How do you create your own custom counter?

* __b) short response:__ What are composite keys? How are they useful? How are they related to the idea of custom partitioning?

* __c) short response:__ What is the order inversion pattern? What problem does it help solve? How do we implement it? 

### Q2 Student Answers:

> __a)__ Counters are objects, or variables that contain numbers, in the Hadoop streaming process that allow you to keep track of the progress in the map and reduce stages by counting the number of global events.  Hadoop provides for us counters such as the number and size of files read and written, the number of mapper and reducer jobs and the time they take, the number of records inputted and outputted by reducers, mappers and combiners, and the number and type of shuffle errors if they exist. These counters are a useful way for us to gauge the efficiency of our MapReduce streaming job and whether or not changes we make, such as writing a better combiner function, can improve the efficiency or not. 

> We can create our own custom counter by inserting this line - `sys.stderr.write("reporter:counter:counter-group,counter-name,#\n")` into the mapper, reducer, or combiner scripts with any accompanying logic. The "counter-name" is where we enter a name for the custom counter, and the "counter-group" is where we enter a custom name for the group under which the counter appears. The # is the amount by which the counter is incremented each time this line is executed.
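
A small helper can wrap the counter line quoted in this answer. This is a sketch only: the function name `increment_counter`, the group and counter names, and the toy input list are hypothetical; the `reporter:counter:` line format is the one given above.

```python
import sys


def increment_counter(group, name, amount=1, stream=sys.stderr):
    """Write one Hadoop Streaming counter update line to the given stream."""
    stream.write(f"reporter:counter:{group},{name},{amount}\n")


# Hypothetical mapper fragment: count empty input lines.
# A real mapper would iterate over sys.stdin instead of this toy list.
for line in ["a b", "", "c"]:
    if not line.strip():
        increment_counter("Mapper Counters", "Empty Lines")
```

The `stream` parameter exists only so the helper can be exercised outside Hadoop; inside a streaming job the default, `sys.stderr`, is what the framework reads.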

> __b)__ A composite key is a key that consists of two or more fields, or the combination of two or more values that together form one unique key. We actually use a composite key in #4 of this assignment, where the key is the combination of the word and the value 0 or 1 to indicate ham or spam respectively. Composite keys are useful in more complex operations where we need to group key-value pairs and then sort them in a meaningful way within these groups. For instance, the composite key can consist of a partition key and a sort key, where the partition key specifies which partition or reducer the key-value pairs go to and the sort key determines how values are aggregated by the same key and output by the reducer. In question #4, for instance, the composite key is essential for us to perform custom partitioning using the spam/ham indicator part of the composite key, and we then use the sort key, or word, for the actual reducer function.

> __c)__ The order inversion pattern allows us to solve the problem of computing relative frequencies of words in the MapReduce framework without having to store the total dictionary of words in memory in order to access a total count. Following the example from the async, one variation of this type of problem is one in which we calculate the relative frequencies of co-occurring terms in a corpus. To do this, we need to pass to the reducer a series of key-value pairs where the keys are composite keys with a primary key and a secondary key, and the values are the intermediate counts for each pair of words. The objective is then to calculate the frequency of the word that is the secondary key given the total count of the word that is the primary key. If we attempt to store all primary key words in a dictionary to access their total counts, this could lead to a memory storage problem for larger corpora. Instead we can implement the order inversion pattern. First, specific to this example, we would want to specify custom partitioning such that key-value pairs for the same primary key are passed to the same partition and therefore the same reducer. Next, to implement the order inversion pattern, we create a key of the form (primary-key-word, \*), such that the total count of the primary key is aggregated and comes out first in the reducer stream. By using an \*, we are essentially specifying that we want the total count aggregation of all pairs with the same primary key regardless of secondary key, and that this value should appear before the individual totals of all other composite key pairs using the same primary key. Because the reducer receives this marginal total first, it can then calculate the relative frequency of each composite key, or word pair, using this total without having to store the primary keys in memory.
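
The reducer side of this pattern can be sketched as follows. This is an illustration under stated assumptions, not the course's reference solution: the function name `relative_frequencies` and the sample records are hypothetical, the input is assumed to arrive already sorted so that every `*` record for a primary key precedes that key's pair records, and each pair is assumed to arrive with its count already combined.

```python
def relative_frequencies(sorted_records):
    """Yield (primary, secondary, count / marginal) from key-sorted records.

    Records are (primary, secondary, count) tuples. Records whose secondary
    key is '*' carry (possibly partial) marginal totals and come first.
    """
    current, total = None, 0
    for primary, secondary, count in sorted_records:
        if primary != current:
            # New primary key: reset the running marginal total.
            current, total = primary, 0
        if secondary == "*":
            total += count
        else:
            yield primary, secondary, count / total


# Hypothetical reducer input; the 'dog' marginal arrives as two partial counts.
records = [
    ("dog", "*", 3),
    ("dog", "*", 1),
    ("dog", "bark", 3),
    ("dog", "cat", 1),
    ("fish", "*", 2),
    ("fish", "swim", 2),
]
result = list(relative_frequencies(records))
print(result)
```

Only one running total is held in memory at a time, which is the point of the pattern: the reducer never needs a dictionary of all primary keys.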

# Question 3: Understanding Total Order Sort

The key challenge in distributed computing is to break a problem into a set of sub-problems that can be performed without communicating with each other. Ideally, we should be able to define an arbitrary number of splits and still get the right result, but that is not always possible. Parallelization becomes particularly challenging when we need to make comparisons between records, for example when sorting. Total Order Sort allows us to order large datasets in a way that enables efficient retrieval of results. Before beginning this assignment, make sure you have read and understood the [Total Order Sort Notebook](https://github.com/UCB-w261/main/tree/master/HelpfulResources/TotalSortGuide/_total-sort-guide-spark2.01-JAN27-2017.ipynb). You can skip the first two MRJob sections, but the rest of section III and all of section IV are **very** important (and apply to Hadoop Streaming) so make sure to read them closely. Feel free to read the Spark sections as well but you won't be responsible for that material until later in the course. To verify your understanding, answer the following questions.

### Q3 Tasks:

* __a) short response:__ What is the difference between a partial sort, an unordered total sort, and a total order sort? From the programmer's perspective, what does total order sort allow us to do that we can't with unordered total? Why is this important with large datasets?

* __b) short response:__ Which phase of a MapReduce job is leveraged to implement Total Order Sort? Which default behaviors must be changed? Why must they be changed?

* __c) short response:__ Describe in words how to configure a Hadoop Streaming job for the custom sorting and partitioning that is required for Total Order Sort.  

* __d) short response:__ Explain why we need to use an inverse hash code function.

* __e) short response:__ Where does this function need to be located so that a Total Order Sort can be performed?

### Q3 Student Answers:

> __a)__ Partial sort is the default behavior of Hadoop MapReduce output. The default reducer output is multiple partition files in which the key-value pairs are sorted by key within each partition. Unordered total sort differs in that there is a total ordering of keys across all partitions: keys in different partitions are sorted relative to each other, whereas in a partial sort only keys in the same partition are sorted relative to each other. However, in an unordered total sort the partition files themselves are not ordered, so reading them in sequence does not yield a total ordering of the keys. Total order sort differs from unordered total sort in that the partitions themselves are stacked in order, so that reading them in sequence produces a total sorting of all of the keys.

> Total order sort allows us to obtain a complete ordering of key-value pairs within one MapReduce job, which cannot be accomplished with an unordered total sort. With an unordered total sort, achieving the same complete ordering requires an additional post-processing step to restack the partitions. This post-processing step becomes increasingly costly as the data set grows and the number of partitions increases.

> __b)__ The map phase and the shuffle phase of the MapReduce job are leveraged to implement Total Order Sort. Within the map phase, the partition key and secondary sort key are defined so that mapper outputs contain both keys as separate fields. To accomplish Total Order Sort, three default behaviors must be changed.

> The first is the number of fields used as the key, since both a partition key and a sort key are needed. This requires adding a line to the Hadoop Streaming job that sets the `stream.num.map.output.key.fields` parameter to more than 1, so that the job deviates from the default behavior of using only the first field of the mapper output as the key.

> The second is how the mapper output records are partitioned among reducers. By default, Hadoop MapReduce only ensures that mapper records with the same key end up in the same reducer, so only partial sorting is achieved when there are multiple reducer outputs. This means the default behavior cannot ensure that, for instance, the results in the first partition are ordered ahead of the results in the second partition. Total Order Sort, however, relies on partitioning records correctly so that the output files of the reducer partitions naturally stack into a single ordering of all records. To achieve custom partitioning, we add a line to the Hadoop Streaming job that sets the `mapreduce.partition.keypartitioner.options` parameter to the field that holds the partition key emitted by the mapper.

> The third is the sorting of records within each reducer partition. By default, records are sorted lexicographically by the first field, which is assumed to be the key. Since Total Order Sort requires a composite key, we also need to specify the sort key for in-partition sorting via the `mapreduce.partition.keycomparator.options` parameter. This parameter can also specify other deviations in ordering, such as reverse-alphabetical or descending numeric order.

> __c)__ Please refer to my answer in part (b) for additional details. Total Order Sort in Hadoop Streaming is accomplished by first transforming the data in the mapper so that its output records are key-value pairs with a composite key. At a minimum, the composite key must consist of a partition key and a sort key. We then change the default behavior of the Hadoop shuffle through several lines added to our streaming job that specify parameters for the composite key, the partitioning of mapper outputs among reducers, and the order in which output records appear within a reducer.

> First, by specifying a key consisting of more than one field, we allow the Hadoop MapReduce job to use both a partition key and a sort key instead of defaulting to the first field of a mapper output record as the single key. Next, we change the partitioner parameter within the job by specifying the exact field that holds the partition key. To ensure that records are sent to partitions in an ordered way, meaning that records sent to the first partition come before records sent to the second partition, and so on for all subsequent partitions, we use an inverse hash function to assign the partition key within the mapper. If done correctly, this ensures that all of the records in each reducer output partition are ordered ahead of the records in subsequent partitions.

> Lastly, we add a line to the job to specify the key comparator options, which name the secondary sort key and any deviations from the default sorting of records within each partition. Since the default behavior sorts records in a partition by the first field, which may not be the sort key, the key comparator options specify how records within each partition should be sorted, including reverse order if necessary. The end result, if all steps are done correctly, is a series of reducer outputs in which records are sorted within each partition and the partitions themselves are also in order.
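
To make the configuration described in this answer concrete, the sketch below assembles the relevant Hadoop Streaming options as a Python list. The function name `total_order_sort_args` is hypothetical, and the field layout (field 1 is the partition key, field 2 is a numeric sort key in descending order) is an assumption chosen for illustration; a real job would also need its `-files`, `-mapper`, `-reducer`, `-input` and `-output` options.

```python
def total_order_sort_args(num_reducers):
    """Return Hadoop Streaming options for partitioning on field 1 and sorting on field 2.

    Assumed mapper output layout: partition_key <TAB> numeric_sort_key <TAB> value
    """
    return [
        # Treat the first two tab-separated fields as the composite key.
        "-D", "stream.num.map.output.key.fields=2",
        # Partition on the first key field only.
        "-D", "mapreduce.partition.keypartitioner.options=-k1,1",
        # Use the field-aware comparator so the options below take effect.
        "-D", "mapreduce.job.output.key.comparator.class="
              "org.apache.hadoop.mapred.lib.KeyFieldBasedComparator",
        # Within a partition: field 1 ascending, then field 2 numeric, reversed.
        "-D", "mapreduce.partition.keycomparator.options=-k1,1 -k2,2nr",
        "-partitioner", "org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner",
        "-numReduceTasks", str(num_reducers),
    ]


args = total_order_sort_args(3)
print(" ".join(args))
```

The `-D` options are listed before the streaming options because Hadoop expects generic options first. When typing the command in a shell, the comparator options need quoting since they contain a space.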

> __d)__ By default, Hadoop uses a HashPartitioner class to compute the partition index for keys produced by mappers. Its getPartition method derives the partition index as the hash of the key modulo the number of reducers specified in the streaming job. To achieve total order sort, we need to specify a partition key so that mapper outputs are custom partitioned among the reducers. The inverse hash function takes the partition index that getPartition would produce and generates a partition key for the mapper outputs that hashes to that index. In the examples we have seen, a common form for the partition key is a letter, which can be used for default alphabetical sorting; the inverse hash function essentially maps the partition index, an integer, to a letter. Once mapper outputs carry a proper partition key, and custom partitioning by that key is specified within the streaming job, mapper outputs are sent to the reducer partitions in such a way that all records sent to the first partition (index 0) are ordered ahead of all records sent to the second partition (index 1), which are ahead of records sent to the third partition (index 2), and so on. Once the records within each partition are sorted, the reducer output files achieve total order sort, since all records within a partition are in order and every partition is ordered properly relative to subsequent partitions.

> __e)__ The inverse hash function needs to be located in the mapper, so that all mapper output records have a partition key that is related to the partition index. Only with a partition key that is output in mapper records are we then able to specify custom partitioning of mapper outputs to reducer partitions.
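
A hedged sketch of the inverse hash idea follows. It assumes the partitioner hashes a key with a Java-style 31-multiplier string hash and takes the result modulo the number of reducers; that assumption, the function names, and the bucket boundaries are all illustrative and should be checked against the partitioner actually in use. It also assumes at most 26 reducers, since single uppercase letters serve as partition keys.

```python
import string


def make_key_hash(key, num_reducers):
    """Assumed stand-in for the partitioner: 31-multiplier hash modulo reducer count."""
    h = 0
    for ch in key:
        h = 31 * h + ord(ch)
    # Keep the low 31 bits to mimic a non-negative 32-bit Java int.
    return (h & 0x7FFFFFFF) % num_reducers


def partition_keys_by_index(num_reducers):
    """Return letters ordered so that the letter at position i hashes to partition i."""
    letters = string.ascii_uppercase[:num_reducers]
    return sorted(letters, key=lambda k: make_key_hash(k, num_reducers))


def assign_partition_key(value, boundaries, keys):
    """Return the key of the first bucket whose upper boundary exceeds value."""
    for i, upper in enumerate(boundaries):
        if value < upper:
            return keys[i]
    return keys[-1]


num_reducers = 3
keys = partition_keys_by_index(num_reducers)
boundaries = [10, 20]  # hypothetical cut points: <10, 10 to 19, >=20
for value in (5, 15, 25):
    key = assign_partition_key(value, boundaries, keys)
    print(value, key, make_key_hash(key, num_reducers))
```

Under these assumptions, values in the lowest bucket receive a key that lands in partition 0, the next bucket in partition 1, and so on, so the part files read in index order are globally sorted once each partition is sorted internally.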

# About the Data
For the main task in this portion of the homework you will train a classifier to determine whether an email represents spam or not. You will train your Naive Bayes model on a 100 record subset of the Enron Spam/Ham corpus available in the HW2 data directory (__`HW2/data/enronemail_1h.txt`__).

__Source:__   
The original data included about 93,000 emails which were made public after the company's collapse. There have been a number of raw and preprocessed versions of this corpus (including those available [here](http://www.aueb.gr/users/ion/data/enron-spam/index.html) and [here](http://www.aueb.gr/users/ion/publications.html)). The subset we will use is limited to emails from 6 Enron employees and a number of spam sources. It is part of [this data set](http://www.aueb.gr/users/ion/data/enron-spam/) which was created by researchers working on personalized Bayesian spam filters. Their original publication is [available here](http://www.aueb.gr/users/ion/docs/ceas2006_paper.pdf). __`IMPORTANT!`__ _For this homework please limit your analysis to the 100 email subset which we provide. No need to download or run your analysis on any of the original datasets, those links are merely provided as context._

__Preprocessing:__  
For their work, Metsis et al. (the authors) appeared to have pre-processed the data, not only collapsing all text to lower-case, but additionally separating "words" by spaces, where "words" unfortunately include punctuation. As a concrete example, the sentence:  
>  `Hey Jon, I hope you don't get lost out there this weekend!`  

... would have been reduced by Metsis et al. to the form:  
> `hey jon , i hope you don ' t get lost out there this weekend !` 

... so we have reverted the data back toward its original state, removing spaces so that our sample sentence would now look like:
> `hey jon, i hope you don't get lost out there this weekend!`  

Thus we have at least preserved contractions and other higher-order lexical forms. However, one must be aware that this reversion is not complete, that some objects (specifically web sites) will be ill-formatted, and that all text is still lower-cased.


__Format:__   
All messages are collated to a tab-delimited format:  

>    `ID \t SPAM \t SUBJECT \t CONTENT \n`  

where:  
>    `ID = string; unique message identifier`  
    `SPAM = binary; with 1 indicating a spam message`  
    `SUBJECT = string; title of the message`  
    `CONTENT = string; content of the message`   
    
Note that either of `SUBJECT` or `CONTENT` may be "NA", and that all tab (\t) and newline (\n) characters have been removed from both of the `SUBJECT` and `CONTENT` columns.  
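
As a minimal sketch of reading this format, the function below splits one line into its four fields. The name `parse_record` is hypothetical, joining subject and content with a space is one possible choice, and "NA" is deliberately left untouched because the text above does not say how it should be handled.

```python
def parse_record(line):
    """Split one tab-delimited line into (doc_id, is_spam, text)."""
    doc_id, spam, subject, content = line.rstrip("\n").split("\t")
    # Join subject and content into a single text field (an illustrative choice).
    text = f"{subject} {content}"
    return doc_id, int(spam), text


# First record of the provided file, as shown in the notebook output.
sample = "0001.1999-12-10.farmer\t0\t christmas tree farm pictures\tNA\n"
doc_id, is_spam, text = parse_record(sample)
print(doc_id, is_spam, text.split())
```

Unpacking into exactly four names relies on the guarantee above that tabs and newlines have been removed from `SUBJECT` and `CONTENT`; a malformed line would raise a `ValueError` rather than be silently misparsed.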

In [5]:
# take a look at the first 100 characters of the first 5 records (RUN THIS CELL AS IS)
!head -n 5 {ENRON} | cut -c-100

0001.1999-12-10.farmer	0	 christmas tree farm pictures	NA
0001.1999-12-10.kaminski	0	 re: rankings	 thank you.
0001.2000-01-17.beck	0	 leadership development pilot	" sally:  what timing, ask and you shall receiv
0001.2000-06-06.lokay	0	" key dates and impact of upcoming sap implementation over the next few week
0001.2001-02-07.kitchen	0	 key hr issues going forward	 a) year end reviews-report needs generating 


In [6]:
# see how many messages/lines are in the file 
#(this number may be off by 1 if the last line doesn't end with a newline)
!wc -l {ENRON}

100 data/enronemail_1h.txt


In [7]:
# make the HDFS directory if it doesn't already exist
!hdfs dfs -mkdir -p {HDFS_DIR}

In [8]:
# load the data into HDFS (RUN THIS CELL AS IS)
!hdfs dfs -copyFromLocal {ENRON} {HDFS_DIR}/enron.txt

# Question 4:  Enron Ham/Spam EDA.
Before building our classifier, let's get acquainted with our data. In particular, we're interested in which words occur more in spam emails than in real emails. In this question you'll implement two Hadoop MapReduce jobs to count and sort word occurrences by document class. You'll also learn about two new Hadoop streaming parameters that will allow you to control how the records output by your mappers are partitioned for reducing on separate nodes.

__`IMPORTANT NOTE:`__ For this question and all subsequent items, you should include both the subject and the body of the email in your analysis (i.e. concatenate them to get the 'text' of the document).

### Q4 Tasks:
* __a) code:__ Complete the missing components of the code in __`EnronEDA/mapper.py`__ and __`EnronEDA/reducer.py`__ to create a Hadoop MapReduce job that counts how many times each word in the corpus occurs in an email for each class. Pay close attention to the data format specified in the docstrings of these scripts _-- there are a number of ways to accomplish this task, we've chosen this format to help illustrate a technique in `part e`_. Run the provided unit tests to confirm that your code works as expected then run the provided Hadoop streaming command to apply your analysis to the Enron data.


* __b) code + short response:__ How many times does the word "__assistance__" occur in each class? (`HINT:` Use a `grep` command to read from the results file you generated in '`a`' and then report the answer in the space provided.)


* __c) short response:__ Would it have been possible to add some sorting parameters to the Hadoop streaming command that would cause our `part a` results to be sorted by count? Briefly explain why or why not.


* __d) code + short response:__ Write a second Hadoop MapReduce job to sort the output of `part a` first by class and then by count. Run your job and save the results to a local file. Then describe in words how you would go about printing the top 10 words in each class given this sorted output. (`HINT 1:` _remember that you can simply pass the `part a` output directory to the input field of this job; `HINT 2:` since this task is just reordering the records from `part a` we don't need to write a mapper or reducer, just use `/bin/cat` for both_)


* __e) code:__ A more efficient alternative to '`grep`-ing' for the top 10 words in each class would be to use the Hadoop framework to separate records from each class into its own partition so that we can just read the top lines in each. Edit your job from `part d` to specify 2 reduce tasks and to tell Hadoop to partition based on the second field (which indicates spam/ham in our data). Your code should maintain the secondary sort -- that is each partition should list words from most to least frequent.

### Q4 Student Answers:
> __b)__ Based on the results below, the word assistance appears twice for the ham class and 8 times in the spam class.  

> __c)__ It is not possible to sort the part a results by count, or at least not by the total count of each word within each class. The reason is that we can only sort reducer output records by values we get from the mapper. The total count of each word is something the mapper cannot output, since it is computed by the reducer. Because the total counts only exist after the reducer has run, we are not able to use them as a sort key for the reducer outputs.

> __d)__ In order to find the top occurring words within each class, we need a way to compare counts of words within each class. This is most easily achieved by ensuring that mapper output records with the same class value are sent to the same reducer partition. In this sense, the class value of 0 or 1 can be used as the partition key for custom partitioning. We can then print the top 10 occurring words within each class by sorting the reducer output records within each class by count, which can be used as a sort key since we are using the output from part (a) as the input for the job in this part.

> Since reducer outputs are now partitioned by class and sorted within each partition by count, we can print the top 10 words for each class as follows. For each partition, we run the unix `head` command on the partition file, which displays the first 10 rows of the file. Since the records within the partition are already sorted by count, this outputs the 10 most frequent words. We can handle the "for each partition" part of this logic with a loop over the partition indexes.
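
The partition-then-sort logic described in this answer can be emulated on a single machine as a sanity check. This is only a sketch of the idea, not the Hadoop job itself: the function name `partition_and_sort` is hypothetical, and apart from the two "assistance" counts reported in part b the sample records are made up.

```python
def partition_and_sort(records):
    """Group (word, cls, count) records by class; sort each group by count, descending."""
    partitions = {}
    for word, cls, count in records:
        partitions.setdefault(cls, []).append((word, cls, count))
    for cls in partitions:
        partitions[cls].sort(key=lambda r: r[2], reverse=True)
    return partitions


# 'assistance' counts come from part b; the other rows are hypothetical.
records = [
    ("assistance", 0, 2),
    ("assistance", 1, 8),
    ("title", 0, 5),
    ("body", 1, 3),
]
partitions = partition_and_sort(records)

# Equivalent of running `head` on each partition file.
for cls in sorted(partitions):
    for word, _, count in partitions[cls][:10]:
        print(f"{word}\t{cls}\t{count}")
```

In the Hadoop job the grouping is done by the partitioner and the ordering by the key comparator, so no single process ever holds all the records as this sketch does.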

In [9]:
# part a - do your work in the provided scripts then RUN THIS CELL AS IS
!chmod a+x EnronEDA/mapper.py
!chmod a+x EnronEDA/reducer.py

In [10]:
# part a - unit test EnronEDA/mapper.py (RUN THIS CELL AS IS)
!echo -e "d1	1	title	body\nd2	0	title	body" | EnronEDA/mapper.py

title	1	1
body	1	1
title	0	1
body	0	1


In [11]:
# part a - unit test EnronEDA/reducer.py (RUN THIS CELL AS IS)
!echo -e "one	1	1\none	0	1\none	0	1\ntwo	0	1" | EnronEDA/reducer.py

one	0	2
one	1	1
two	0	1


In [12]:
# part a - clear output directory in HDFS (RUN THIS CELL AS IS)
!hdfs dfs -rm -r {HDFS_DIR}/eda-output

rm: `/user/root/HW2/eda-output': No such file or directory


In [13]:
# part a - Hadoop streaming job (RUN THIS CELL AS IS)
!hadoop jar {JAR_FILE} \
  -files EnronEDA/reducer.py,EnronEDA/mapper.py \
  -mapper mapper.py \
  -reducer reducer.py \
  -input {HDFS_DIR}/enron.txt \
  -output {HDFS_DIR}/eda-output \
  -numReduceTasks 2 \
  -cmdenv PATH={PATH}

packageJobJar: [] [/usr/lib/hadoop-mapreduce/hadoop-streaming-2.6.0-cdh5.15.1.jar] /tmp/streamjob3626335361420952488.jar tmpDir=null
19/06/01 23:07:23 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
19/06/01 23:07:23 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
19/06/01 23:07:25 INFO mapred.FileInputFormat: Total input paths to process : 1
19/06/01 23:07:25 INFO mapreduce.JobSubmitter: number of splits:2
19/06/01 23:07:26 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1559430286868_0001
19/06/01 23:07:26 INFO impl.YarnClientImpl: Submitted application application_1559430286868_0001
19/06/01 23:07:27 INFO mapreduce.Job: The url to track the job: http://docker.w261:8088/proxy/application_1559430286868_0001/
19/06/01 23:07:27 INFO mapreduce.Job: Running job: job_1559430286868_0001
19/06/01 23:07:41 INFO mapreduce.Job: Job job_1559430286868_0001 running in uber mode : false
19/06/01 23:07:41 INFO mapreduce.Job:  map 0% reduce 0%
19/06

In [14]:
# part a - retrieve results from HDFS & copy them into a local file (RUN THIS CELL AS IS)
!hdfs dfs -cat {HDFS_DIR}/eda-output/part-0000* > EnronEDA/results.txt

In [17]:
# part b - write your grep command here
!grep assistance EnronEDA/results.txt

assistance	0	2
assistance	1	8


In [18]:
# part d/e - clear the output directory in HDFS (RUN THIS CELL AS IS)
!hdfs dfs -rm -r {HDFS_DIR}/eda-sort-output

rm: `/user/root/HW2/eda-sort-output': No such file or directory


In [19]:
# part d/e - write your Hadoop streaming job here

!hadoop jar {JAR_FILE} \
  -D stream.num.map.output.key.fields=3 \
  -D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
  -D mapreduce.partition.keycomparator.options="-k2,2n -k3,3nr" \
  -D mapreduce.partition.keypartitioner.options="-k2,2"  \
  -mapper /bin/cat \
  -reducer /bin/cat \
  -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
  -input {HDFS_DIR}/eda-output \
  -output {HDFS_DIR}/eda-sort-output \
  -numReduceTasks 2 \
  -cmdenv PATH={PATH}

packageJobJar: [] [/usr/lib/hadoop-mapreduce/hadoop-streaming-2.6.0-cdh5.15.1.jar] /tmp/streamjob8209606843618682925.jar tmpDir=null
19/06/01 23:10:02 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
19/06/01 23:10:02 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
19/06/01 23:10:04 INFO mapred.FileInputFormat: Total input paths to process : 2
19/06/01 23:10:05 INFO mapreduce.JobSubmitter: number of splits:2
19/06/01 23:10:05 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1559430286868_0002
19/06/01 23:10:06 INFO impl.YarnClientImpl: Submitted application application_1559430286868_0002
19/06/01 23:10:06 INFO mapreduce.Job: The url to track the job: http://docker.w261:8088/proxy/application_1559430286868_0002/
19/06/01 23:10:06 INFO mapreduce.Job: Running job: job_1559430286868_0002
19/06/01 23:10:23 INFO mapreduce.Job: Job job_1559430286868_0002 running in uber mode : false
19/06/01 23:10:23 INFO mapreduce.Job:  map 0% reduce 0%
19/06

In [20]:
# part e - view the top 10 records from each partition (RUN THIS CELL AS IS)
for idx in range(2):
    print(f"\n===== part-0000{idx}=====\n")
    !hdfs dfs -cat {HDFS_DIR}/eda-sort-output/part-0000{idx} | head


===== part-00000=====

the	0	549	
to	0	398	
ect	0	382	
and	0	278	
of	0	230	
hou	0	206	
a	0	196	
in	0	182	
for	0	170	
on	0	135	
cat: Unable to write to output stream.

===== part-00001=====

the	1	698	
to	1	566	
and	1	392	
your	1	357	
a	1	347	
you	1	345	
of	1	336	
in	1	236	
for	1	204	
com	1	153	


__Expected output:__
<table>
<th>part-00000:</th>
<th>part-00001:</th>
<tr><td><pre>
the	0	549	
to	0	398	
ect	0	382	
and	0	278	
of	0	230	
hou	0	206	
a	0	196	
in	0	182	
for	0	170	
on	0	135
</pre></td>
<td><pre>
the	1	698	
to	1	566	
and	1	392	
your	1	357	
a	1	347	
you	1	345	
of	1	336	
in	1	236	
for	1	204	
com	1	153
</pre></td></tr>
</table>

# Question 5: Counters and Combiners.
Tuning the number of mappers & reducers is helpful to optimize very large distributed computations. Doing so successfully requires a thorough understanding of the data size at each stage of the job. As you learned in the week3 live session, counters are an invaluable resource for understanding this kind of detail. In this question, we will take the EDA performed in Question 4 as an opportunity to illustrate some related concepts.

### Q5 Tasks:
* __a) short response:__ Read the Hadoop output from your job in Question 4a to report how many records are emitted by the mappers and how many records are received by the reducers. In the context of word counting what does this number represent practically?

* __b) code:__ Note that we wrote the reducer in question 4a such that the input and output record format is identical. This makes it easy to use the same reducer script as a combiner. In the space provided below, write the Hadoop Streaming command to re-run your job from question 4a with this combining added.

* __c) short response__: Report the number of records emitted by your mappers in part b and the number of records received by your reducers. Compare your results here to what you saw in part a. Explain.

* __d) short response__: Describe a scenario where using a combiner would _NOT_ improve the efficiency of the shuffle stage. Explain. [__`BONUS:`__ how does increasing the number of mappers affect the usefulness of a combiner?]

### Q5 Student Answers:
> __a)__ The number of records emitted by the mappers and received by the reducers is the same: 31490. Since my mapper outputs a 1 for every word in the corpus, this number represents the total number of words (where a word is a run of consecutive letters) in the entire corpus of 100 emails, where the text of each email consists of its subject and body. 

> __c)__ The number of records emitted by the mappers is still 31490, but the number received by the reducers is 7648, which is also the number of records output by the combiners. The mappers behave exactly as they did in 4(a), but fewer records are shuffled to the reducers because the combiner ran. Since the combiner has the same logic as the reducer, it performs intermediate aggregation of the key-value pairs with the same key emitted by each mapper, which reduces the number of records that need to be sent to the reducers.  

> __d)__ A combiner would not improve the efficiency of the shuffle stage if the mapper already performs the intermediate aggregation itself, so that the records each mapper emits are key-value pairs with unique keys. In that case the combiner has nothing left to aggregate: the number of records output by the mappers equals the number of records input to the reducers, and the combiner does nothing to reduce it.
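
A small local simulation may make the point in part (d) concrete. It is a sketch, not the Hadoop combiner itself: the `combine` function and both toy mapper outputs are hypothetical, and it models a single mapper only.

```python
from collections import Counter

def combine(mapper_output):
    """Sum counts per key within one mapper's output, as a combiner would."""
    totals = Counter()
    for key, count in mapper_output:
        totals[key] += count
    return sorted(totals.items())

# Mapper that emits one record per token: repeated keys, so combining helps.
per_token = [("the", 1), ("deal", 1), ("the", 1), ("the", 1)]
# Mapper that already aggregated in memory: keys are unique, so combining changes nothing.
pre_aggregated = [("deal", 1), ("the", 3)]

print(len(per_token), "->", len(combine(per_token)))
print(len(pre_aggregated), "->", len(combine(pre_aggregated)))
```

The first line shows fewer records after combining; the second shows the record count unchanged, which is the scenario described above.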

In [21]:
# part b - clear output directory in HDFS (RUN THIS CELL AS IS)
!hdfs dfs -rm -r {HDFS_DIR}/eda-output

Deleted /user/root/HW2/eda-output


In [22]:
# part b - write your Hadoop streaming job here
# same job as part a, with reducer.py also passed as the combiner
!hadoop jar {JAR_FILE} \
  -files EnronEDA/reducer.py,EnronEDA/mapper.py \
  -mapper mapper.py \
  -reducer reducer.py \
  -combiner reducer.py \
  -input {HDFS_DIR}/enron.txt \
  -output {HDFS_DIR}/eda-output \
  -numReduceTasks 2 \
  -cmdenv PATH={PATH}

packageJobJar: [] [/usr/lib/hadoop-mapreduce/hadoop-streaming-2.6.0-cdh5.15.1.jar] /tmp/streamjob8562657381361214069.jar tmpDir=null
19/06/01 23:15:12 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
19/06/01 23:15:12 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
19/06/01 23:15:14 INFO mapred.FileInputFormat: Total input paths to process : 1
19/06/01 23:15:14 INFO mapreduce.JobSubmitter: number of splits:2
19/06/01 23:15:15 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1559430286868_0003
19/06/01 23:15:15 INFO impl.YarnClientImpl: Submitted application application_1559430286868_0003
19/06/01 23:15:15 INFO mapreduce.Job: The url to track the job: http://docker.w261:8088/proxy/application_1559430286868_0003/
19/06/01 23:15:15 INFO mapreduce.Job: Running job: job_1559430286868_0003
19/06/01 23:15:28 INFO mapreduce.Job: Job job_1559430286868_0003 running in uber mode : false
19/06/01 23:15:28 INFO mapreduce.Job:  map 0% reduce 0%
19/06

# Question 6: Document Classification Task Overview.
The week 2 assigned reading from Chapter 13 of _Introduction to Information Retrieval_ by Manning, Raghavan and Schutze provides a thorough introduction to the document classification task and the math behind Naive Bayes. In this question we'll use the example from Table 13.1 (reproduced below) to 'train' an unsmoothed Multinomial Naive Bayes model and classify a test document by hand.

<table>
<th>DocID</th>
<th>Class</th>
<th>Subject</th>
<th>Body</th>
<tr><td>Doc1</td><td>1</td><td></td><td>Chinese Beijing Chinese</td></tr>
<tr><td>Doc2</td><td>1</td><td></td><td>Chinese Chinese Shanghai</td></tr>
<tr><td>Doc3</td><td>1</td><td></td><td>Chinese Macao</td></tr>
<tr><td>Doc4</td><td>0</td><td></td><td>Tokyo Japan Chinese</td></tr>
</table>

### Q6 Tasks:
* __a) short response:__ Equation 13.3 in Manning, Raghavan and Schutze shows how a Multinomial Naive Bayes model classifies a document. It predicts the class, $c$, for which the estimated conditional probability of the class given the document's contents,  $\hat{P}(c|d)$, is greatest. In this equation what two pieces of information are required to calculate  $\hat{P}(c|d)$? Your answer should include both mathematical notation and verbal explanation.


* __b) short response:__ The Enron data includes two classes of documents: `spam` and `ham` (they're actually labeled `1` and `0`). In plain English, explain what  $\hat{P}(c)$ and   $\hat{P}(t_{k} | c)$ mean in the context of this data. How would we estimate these values from a training corpus? How many passes over the data would we need to make to retrieve this information for all classes and all words?


* __c) hand calculations:__ Above we've reproduced the document classification example from the textbook (we added an empty subject field to mimic the Enron data format). Remember that the classes in this "Chinese Example" are `1` (about China) and `0` (not about China). Calculate the class priors and the conditional probabilities for an __unsmoothed__ Multinomial Naive Bayes model trained on this data. Show the calculations that lead to your result using markdown and $\LaTeX$ in the space provided or by embedding an image of your hand written work. [`NOTE:` _Your results should NOT match those in the text -- they are training a model with +1 smoothing; you are training a model without smoothing_]


* __d) hand calculations:__ Use the model you trained to classify the following test document: `Chinese Chinese Chinese Tokyo Japan`. Show the calculations that lead to your result using markdown and   $\LaTeX$ in the space provided or by embedding an image of your hand written work.


* __e) short response:__ Compare the classification you get from this unsmoothed model in `d`/`e` to the results in the textbook's "Example 1" which reflects a model with Laplace plus 1 smoothing. How does smoothing affect our inference?

### Q6 Student Answers:
> __a)__ Two pieces of information are needed to calculate $\hat{P}(c|d)$: $\hat{P}(c)$ and $\hat{P}(t_k|c)$ for all terms $t_k$ occurring in document $d$.  The probability $\hat{P}(c)$ is called the prior and is estimated as $\hat{P}(c) = \frac{N_c}{N}$, the relative frequency of documents of class $c$: the number of documents of class $c$ divided by the total number of documents.  $\hat{P}(t_k|c)$ is the conditional probability of term $t_k$ occurring in a document of class $c$.  The value $\hat{P}(c|d)$ is then proportional to the product of $\hat{P}(c)$ and the conditional probabilities $\hat{P}(t_k|c)$ for each word in the document whose class we are predicting.  This is equation 13.2: $\hat{P}(c|d)\propto\hat{P}(c)\prod_{k}\hat{P}(t_k|c)$

> __b)__ $\hat{P}(c)$ is the relative frequency of each class: the number of emails of that class divided by the total number of emails.  To estimate it from the training corpus, we need to count the number of lines (emails) of class 0 and of class 1. $\hat{P}(t_k|c)$ is the conditional probability of a word given a class.  For each word $t_k$, it is the total count of occurrences of the composite key ($t_k$, c) divided by the total number of words in class c.  For example, the conditional probability of the word "the" given class spam is found by counting all occurrences of the word "the" in spam emails and dividing by the total number of words in all spam emails.  

> We can calculate both values by passing through the corpus data only once.  To count the number of emails within each class and calculate $\hat{P}(c)$, we can leverage the structure of the mapper code, which reads each email as a line. In addition to emitting a count for each word, we can accumulate email counts in 3 counters: total lines (or emails), spam emails and ham emails. To calculate $\hat{P}(t_k|c)$, we need the counts of each unique ($t_k$, c) pair, which can be calculated by the reducer, provided we specify the composite key in the mapper.  However, to get the actual probability we also need the total number of words in the ham class and in the spam class, which we can accumulate using 2 more counter variables in the mapper's for loop.  When emitting key-value pairs from the mapper, we emit every word with its class and count, plus additional records that carry each of the 5 counters needed for the probability calculations, so that they reach the reducers.  To ensure the counters are accessible to the reducers before the probability calculations begin, we need to implement the order inversion pattern, where the keys for the counter values start with a special character so that they sort first.  If we do these probability calculations using multiple reducers, we also need to emit partition keys as part of our mapper output to ensure that all totals are sent to all reducers.  In summary, this method allows us to calculate these probabilities in a single pass through the data. 
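
The order inversion idea described above can be sketched as a local simulation on the Chinese example. This is a minimal sketch under stated assumptions, not the graded implementation: the `!docs` and `!words` keys, the function names and the record layout are all hypothetical, Python's `sorted` stands in for the Hadoop shuffle, and only a single reducer is modelled (so no partition keys are emitted). A real streaming reducer would print each record instead of collecting a dictionary.

```python
from itertools import groupby

docs = [  # (doc_id, class, text), mirroring the Chinese example
    ("D1", "1", "Chinese Beijing Chinese"),
    ("D2", "1", "Chinese Chinese Shanghai"),
    ("D3", "1", "Chinese Macao"),
    ("D4", "0", "Tokyo Japan Chinese"),
]

def mapper(docs):
    """Emit (key, class, count) records; keys starting with '!' carry the totals."""
    for _, label, text in docs:
        words = text.lower().split()
        yield ("!docs", label, 1)            # one document of this class
        yield ("!words", label, len(words))  # number of words it contributes
        for word in words:
            yield (word, label, 1)

def reducer(sorted_records):
    """Totals sort ahead of real words, so each word can be finished as soon as it is read."""
    doc_totals, word_totals, model = {}, {}, {}
    for key, group in groupby(sorted_records, key=lambda r: r[0]):
        per_class = {"0": 0, "1": 0}
        for _, label, count in group:
            per_class[label] += count
        if key == "!docs":
            doc_totals = per_class
        elif key == "!words":
            word_totals = per_class
        else:
            # Unsmoothed estimate: count of (word, class) / total words in class.
            model[key] = {c: per_class[c] / word_totals[c] for c in per_class}
    n_docs = sum(doc_totals.values())
    priors = {c: doc_totals[c] / n_docs for c in doc_totals}
    return priors, model

priors, model = reducer(sorted(mapper(docs)))
print(priors)
print(model["chinese"])
```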

> __c)__ Show your calculations here using markdown & $\LaTeX$ or embed them below!

**Priors**

$P(c=1)=3/4$

$P(c=0)=1/4$

**Conditional Probabilities**

$P(chinese|1)=\dfrac{2+2+1}{3+3+2}=5/8$

$P(beijing|1)=\dfrac{1}{3+3+2}=1/8$

$P(shanghai|1)=\dfrac{1}{3+3+2}=1/8$

$P(macao|1)=\dfrac{1}{3+3+2}=1/8$

$P(tokyo|0)=P(japan|0)=P(chinese|0)=1/3$

> __d)__ Show your calculations here using markdown & $\LaTeX$ or embed them below!

$P(0|d)=P(c=0)\prod P(t_{k}|c)$

$=P(c=0)\cdot\left[P(chinese|0)^{3}\cdot P(tokyo|0)\cdot P(japan|0)\right]$

$=\dfrac{1}{4}\cdot\left[(\dfrac{1}{3})^{3}\cdot\dfrac{1}{3}\cdot\dfrac{1}{3}\right]=\dfrac{1}{972}$

$P(1|d)=P(c=1)\prod P(t_{k}|c)$

$=P(c=1)\cdot\left[P(chinese|1)^{3}\cdot P(tokyo|1)\cdot P(japan|1)\right]$

$=\dfrac{3}{4}\cdot\left[(\dfrac{5}{8})^{3}\cdot0\cdot0\right]=0$

> We then classify the document by choosing the class that resulted in the higher probability of ${P}(c|d)$, which is $\boxed{class\ 0}$  in this case.  

> __e)__ With Laplace +1 smoothing, $P(1|d)$ is actually higher than $P(0|d)$. In the unsmoothed model, since the words 'Japan' and 'Tokyo' never appear for class 1 in the training set, $P(1|d)$ was calculated as 0 because these words have 0 conditional probability for class 1. Smoothing adds a small probability to events that are unseen in the training set, and thus with smoothing $P(1|d)$ becomes non-zero.  Smoothing therefore affects our inference greatly: with smoothing, we would have classified this document as class 1 instead.  
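
The comparison in part (e) can be checked with exact fractions. The sketch below is illustrative only: the `score` function and its `alpha` parameter are hypothetical names, with `alpha=0` giving the unsmoothed model and `alpha=1` giving Laplace +1 smoothing over the 6-word training vocabulary.

```python
from collections import Counter
from fractions import Fraction

train = [
    ("1", "chinese beijing chinese"),
    ("1", "chinese chinese shanghai"),
    ("1", "chinese macao"),
    ("0", "tokyo japan chinese"),
]
test_doc = "chinese chinese chinese tokyo japan".split()

def score(label, alpha):
    """Unnormalized P(label | test_doc) with add-alpha smoothing (alpha=0 means unsmoothed)."""
    class_docs = [text.split() for c, text in train if c == label]
    counts = Counter(word for doc in class_docs for word in doc)
    vocab = {word for _, text in train for word in text.split()}
    total = sum(counts.values())
    result = Fraction(len(class_docs), len(train))  # class prior
    for word in test_doc:
        result *= Fraction(counts[word] + alpha, total + alpha * len(vocab))
    return result

for alpha in (0, 1):
    print(alpha, {label: score(label, alpha) for label in ("0", "1")})
```

With `alpha=0` the class 1 score collapses to zero, as in the hand calculation; with `alpha=1` both scores are positive and class 1 has the larger one.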

In [None]:
# part d/e - if you didn't write out your calculations above, embed a picture of them here:
from IPython.display import Image
Image(filename="path-to-hand-calulations-image.png")

# Question 7: Naive Bayes Inference.
In the next two questions you'll write code to parallelize the Naive Bayes calculations that you performed above. We'll do this in two phases: one MapReduce job to perform training and a second MapReduce to perform inference. While in practice we'd need to train a model before we can use it to classify documents, for learning purposes we're going to develop our code in the opposite order. By first focusing on the pieces of information/format we need to perform the classification (inference) task you should find it easier to develop a solid implementation for training phase when you get to question 8 below. In both of these questions we'll continue to use the Chinese example corpus from the textbook to help us test our MapReduce code as we develop it. Below we've reproduced the corpus, test set and model in text format that matches the Enron data.

### Q7 Tasks:
* __a) short response:__ run the provided cells to create the example files and load them in to HDFS. Then take a closer look at __`NBmodel.txt`__. This text file represents a Naive Bayes model trained (with Laplace +1 smoothing) on the example corpus. What are the 'keys' and 'values' in this file? Which record means something slightly different than the rest? The value field of each record includes two numbers which will be helpful for debugging but which we don't actually need to perform inference -- what are they? [`HINT`: _This file represents the model from Example 13.1 in the textbook, if you're having trouble getting oriented try comparing our file to the numbers in that example._]


* __b) short response:__ When performing Naive Bayes in practice instead of multiplying the probabilities (as in equation 13.3) we add their logs (as in equation 13.4). Why do we choose to work with log probabilities? If we had an unsmoothed model, what potential error could arise from this transformation?


* __c) short response:__ Documents 6 and 8 in the test set include a word that did not appear in the training corpus (and as a result does not appear in the model). What should we do at inference time when we need a class conditional probability for this word?


* __d) short response:__ The goal of our MapReduce job is to stream over the test set and classify each document by performing the calculation from equation 13.4. To do this we'll load the model file (which contains the probabilities for equation 13.4) into memory on the nodes where we do our mapping. This is called an in-memory join. Does loading a model 'state' like this depart from the functional programming principles? Explain why or why not. From a scalability perspective when would this kind of memory use be justified? When would it be unwise?


* __e) code:__ Complete the code in __`NaiveBayes/classify_mapper.py`__. Read the docstring carefully to understand how this script should work and the format it should return. Run the provided unit tests to confirm that your script works as expected then write a Hadoop streaming job to classify the Chinese example test set. [`HINT 1:` _you shouldn't need a reducer for this one._ `HINT 2:` _Don't forget to add the model file to the_ `-files` _parameter in your Hadoop streaming job so that it gets shipped to the mapper nodes where it will be accessed by your script._]


* __f) short response:__ In our test example and in the Enron data set we have fairly short documents. Since these fit fine in memory on a mapper node we didn't need a reducer and could just do all of our calculations in the mapper. However with much longer documents (eg. books) we might want a higher level of parallelization -- for example we might want to process parts of a document on different nodes. In this hypothetical scenario how would our algorithm design change? What could the mappers still do? What key-value structure would they emit? What would the reducers have to do as a last step?

### Q7 Student Answers:
> __a)__ The keys are the words in the first column.  The value for each key is the list of 4 numbers that accompany each word, in the following order: the total count of the word in class 0 documents, the total count of the word in class 1 documents, the class 0 conditional probability of the word, and the class 1 conditional probability of the word.  The record that means something slightly different from the rest is the row with key = ClassPriors.  ClassPriors is not an actual word in the corpus; it holds the prior probability of each class, which is needed for inference. The values associated with ClassPriors are the count of documents with class 0, the count of documents with class 1, the relative frequency of class 0 documents and the relative frequency of class 1 documents.  The values not needed for inference are the first two: the total counts for each class.  

> __b)__ When performing Naive Bayes calculations, we find the conditional probability of each class by multiplying the prior and the conditional probabilities of each feature together.  The individual conditional probabilities can be very small, and a document with many words multiplies many of them together, so the product can lead to floating point underflow.  By taking the log of the probabilities, we can still assess which class has the highest conditional probability without causing the underflow issue in our computation.  However, if there is no smoothing and we encounter a feature-class pair that does not exist in the training data, this results in a term log(P(feature|class))=log(0), which is undefined and would raise an error.  
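
Both effects described in part (b) are easy to reproduce in plain Python. The 100 probabilities of `1e-5` below are made-up values chosen only to trigger underflow; they do not come from the Enron data.

```python
import math

probs = [1e-5] * 100  # 100 hypothetical word probabilities

# Multiplying: the true value, 1e-500, is far below the smallest positive float.
product = 1.0
for p in probs:
    product *= p

# Adding logs: the same quantity on a log scale stays comfortably representable.
log_sum = sum(math.log(p) for p in probs)

print(product)
print(log_sum)

# An unsmoothed zero probability has no logarithm.
try:
    math.log(0.0)
except ValueError as err:
    print("log(0) failed:", err)
```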

> __c)__ While Laplace smoothing lets us assign non-zero probability to words that appear in the training set but do not appear for a particular class, we cannot do the same for words that never appear in the training set.  If we applied smoothing to the word 'Trade' during our second MapReduce job for classification, we would be erroneously retraining our model in the process.  Instead, when performing classification, we should treat words that never appear in the training corpus as if they have no effect on our posterior calculations.  In other words, when we are calculating log(P(class|document)), we should set the value for log(P('Trade'|class)) = 0 for both classes. 
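
A minimal sketch of this rule follows, assuming the model is held in a dictionary. It is not the contents of `NaiveBayes/classify_mapper.py`; the names `MODEL`, `PRIORS` and `classify` are hypothetical, and the probabilities are copied from `NBmodel.txt`.

```python
import math

# Smoothed probabilities copied from NBmodel.txt: word -> (P(word|0), P(word|1)).
MODEL = {
    "beijing": (0.111111111111, 0.142857142857),
    "chinese": (0.222222222222, 0.428571428571),
    "tokyo": (0.222222222222, 0.0714285714286),
    "shanghai": (0.111111111111, 0.142857142857),
    "japan": (0.222222222222, 0.0714285714286),
    "macao": (0.111111111111, 0.142857142857),
}
PRIORS = (0.25, 0.75)  # the ClassPriors record

def classify(text):
    """Return (log score for class 0, log score for class 1, predicted class)."""
    log_p = [math.log(p) for p in PRIORS]
    for word in text.lower().split():
        if word not in MODEL:
            continue  # unseen in training: adds 0 to both classes
        for c in (0, 1):
            log_p[c] += math.log(MODEL[word][c])
    return log_p[0], log_p[1], int(log_p[1] > log_p[0])

print(classify("Beijing Shanghai Trade"))  # document D6
print(classify("Tokyo Japan Trade"))       # document D8
```

Skipping 'Trade' leaves both class scores untouched by that word, so the comparison between classes depends only on the words the model knows.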

> __d)__ No, I do not think that loading the model state into memory departs from functional programming principles.  Loading the model into memory does not violate the concept of statelessness that is essential to functional programming. While it is true that we are storing a "state", this state is never mutated during the MapReduce job that performs the classification.  From a scalability perspective, loading the model into memory is justified if the model is relatively small compared to the amount of memory we have access to, and if we do not expect the feature set to change much with future iterations of retraining.  It would be unwise to store the model in memory if the feature set is too large, say an extremely large corpus whose vocabulary is beyond our means to store, or if the feature set continues to grow with future iterations of retraining.

> __e)__ Complete the coding portion of this question before answering 'f'.

> __f)__ If we were processing the classification task on a very large corpus that is partitioned across multiple nodes, then we would not perform our predictions within the mapper, but rather in the reducer.  Since the mapper outputs will be partitioned, the mapper should emit partial log probabilities that can then be aggregated by key on the reducer side.  The key-value pairs emitted by the mappers should then have the format: document id, class, log(P(0|d)), log(P(1|d)).  The reducer then needs to aggregate the partial values of log(P(0|d)) and log(P(1|d)) by key, and in this case we can use the composite of document id and class as the key.  Lastly, the reducer calculates the prediction based on the larger of the two totals. 
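
The design in part (f) might look roughly like the following local sketch. Everything here is an assumption made for illustration: the function names, the choice to key on the document id alone, and the decision to add the log prior once in the reducer rather than in every mapper. The three probabilities are the smoothed values from the Chinese example, written as fractions.

```python
import math
from collections import defaultdict

# Smoothed conditional probabilities for three words: word -> (P(word|0), P(word|1)).
MODEL = {"chinese": (2 / 9, 3 / 7), "tokyo": (2 / 9, 1 / 14), "japan": (2 / 9, 1 / 14)}
PRIORS = (0.25, 0.75)

def mapper(doc_id, chunk):
    """Emit the partial log likelihoods for one chunk of a document."""
    partial = [0.0, 0.0]
    for word in chunk.lower().split():
        if word in MODEL:
            for c in (0, 1):
                partial[c] += math.log(MODEL[word][c])
    return doc_id, partial[0], partial[1]

def reducer(records):
    """Sum the partials per document, add the log prior once, and pick the larger score."""
    totals = defaultdict(lambda: [math.log(PRIORS[0]), math.log(PRIORS[1])])
    for doc_id, p0, p1 in records:
        totals[doc_id][0] += p0
        totals[doc_id][1] += p1
    return {d: (s[0], s[1], int(s[1] > s[0])) for d, s in totals.items()}

# Document D5 split into two chunks, as if they were processed on different nodes.
chunks = [("d5", "Chinese Chinese"), ("d5", "Chinese Tokyo Japan")]
results = reducer(mapper(doc_id, chunk) for doc_id, chunk in chunks)
print(results["d5"])
```

Because log probabilities add, the split into chunks does not change the final scores for the document.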


Run these cells to create the example corpus and model.

In [23]:
%%writefile NaiveBayes/chineseTrain.txt
D1	1		Chinese Beijing Chinese
D2	1		Chinese Chinese Shanghai
D3	1		Chinese Macao
D4	0		Tokyo Japan Chinese

Overwriting NaiveBayes/chineseTrain.txt


In [24]:
%%writefile NaiveBayes/chineseTest.txt
D5	1		Chinese Chinese Chinese Tokyo Japan
D6	1		Beijing Shanghai Trade
D7	0		Japan Macao Tokyo
D8	0		Tokyo Japan Trade

Overwriting NaiveBayes/chineseTest.txt


In [25]:
%%writefile NBmodel.txt
beijing	0.0,1.0,0.111111111111,0.142857142857
chinese	1.0,5.0,0.222222222222,0.428571428571
tokyo	1.0,0.0,0.222222222222,0.0714285714286
shanghai	0.0,1.0,0.111111111111,0.142857142857
ClassPriors	1.0,3.0,0.25,0.75
japan	1.0,0.0,0.222222222222,0.0714285714286
macao	0.0,1.0,0.111111111111,0.142857142857

Overwriting NBmodel.txt


In [26]:
# load the data files into HDFS
!hdfs dfs -copyFromLocal NaiveBayes/chineseTrain.txt {HDFS_DIR}
!hdfs dfs -copyFromLocal NaiveBayes/chineseTest.txt {HDFS_DIR}

Your work for `part e` starts here:

In [27]:
# part e - do your work in NaiveBayes/classify_mapper.py first, then run this cell.
!chmod a+x NaiveBayes/classify_mapper.py

In [28]:
# part e - unit test NaiveBayes/classify_mapper.py (RUN THIS CELL AS IS)
!cat NaiveBayes/chineseTest.txt | NaiveBayes/classify_mapper.py | column -t

d5  1  -8.90668134500626   -8.10769031284611   1
d6  1  -5.780743515794329  -4.179502370564408  1
d7  0  -6.591673732011658  -7.511706880737812  0
d8  0  -4.394449154674438  -5.565796731681498  0


In [29]:
# part e - clear the output directory in HDFS (RUN THIS CELL AS IS)
!hdfs dfs -rm -r {HDFS_DIR}/chinese-output

rm: `/user/root/HW2/chinese-output': No such file or directory


In [30]:
# part e - write your Hadoop streaming job here
!hadoop jar {JAR_FILE} \
  -files NaiveBayes/classify_mapper.py,NBmodel.txt \
  -mapper classify_mapper.py \
  -reducer /bin/cat \
  -input {HDFS_DIR}/chineseTest.txt \
  -output {HDFS_DIR}/chinese-output \
  -numReduceTasks 1 \
  -cmdenv PATH={PATH}

packageJobJar: [] [/usr/lib/hadoop-mapreduce/hadoop-streaming-2.6.0-cdh5.15.1.jar] /tmp/streamjob5142590688070079451.jar tmpDir=null
19/06/01 23:51:29 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
19/06/01 23:51:29 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
19/06/01 23:51:31 INFO mapred.FileInputFormat: Total input paths to process : 1
19/06/01 23:51:31 INFO mapreduce.JobSubmitter: number of splits:2
19/06/01 23:51:31 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1559430286868_0004
19/06/01 23:51:32 INFO impl.YarnClientImpl: Submitted application application_1559430286868_0004
19/06/01 23:51:32 INFO mapreduce.Job: The url to track the job: http://docker.w261:8088/proxy/application_1559430286868_0004/
19/06/01 23:51:32 INFO mapreduce.Job: Running job: job_1559430286868_0004
19/06/01 23:51:43 INFO mapreduce.Job: Job job_1559430286868_0004 running in uber mode : false
19/06/01 23:51:43 INFO mapreduce.Job:  map 0% reduce 0%
19/06

In [31]:
# part e - retrieve test set results from HDFS (RUN THIS CELL AS IS)
!hdfs dfs -cat {HDFS_DIR}/chinese-output/part-000* > NaiveBayes/chineseResults.txt

In [32]:
# part e - take a look (RUN THIS CELL AS IS)
!cat NaiveBayes/chineseResults.txt | column -t

d5  1  -8.90668134500626   -8.10769031284611   1
d6  1  -5.780743515794329  -4.179502370564408  1
d7  0  -6.591673732011658  -7.511706880737812  0
d8  0  -4.394449154674438  -5.565796731681498  0


<table>
<th> Expected output for the test set:</th>
<tr align=Left><td><pre>
d5	1	-8.90668134	-8.10769031	1
d6	1	-5.78074351	-4.17950237	1
d7	0	-6.59167373	-7.51170688	0
d8	0	-4.39444915	-5.56579673	0
</pre></td></tr>
</table>

# Question 8: Naive Bayes Training.
In Question 7 we used a model that we had trained by hand. Next we'll develop the code to do that same training in parallel, making it suitable for use with larger corpora (like the Enron emails). The end result of the MapReduce job you write in this question should be a model text file that looks just like the example (`NBmodel.txt`) that we created by hand above.

To refresh your memory about the training process take a look at  `6a` and `6b` where you described the pieces of information you'll need to collect in order to encode a Multinomial Naive Bayes model. We now want to retrieve those pieces of information while streaming over a corpus. The bulk of the task will be very similar to the word counting exercises you've already done but you may want to consider a slightly different key-value record structure to efficiently tally counts for each class. 

The most challenging (interesting?) design question will be how to retrieve the totals (# of documents and # of words in documents for each class). Of course, counting these numbers is easy. The hard part is the timing: you'll need to make sure you have the counts totalled up _before_ you start estimating the class conditional probabilities for each word. It would be best (i.e. most scalable) if we could find a way to do this tallying without storing the whole vocabulary in memory... Use an appropriate MapReduce design pattern to implement this efficiently! 

__IMPORTANT NOTE:__ For full credit on this question, your code must work with multiple reducers. [`HINT:` _You will need to implement Total Order Sort_ - [Total Order Sort Notebook](https://github.com/UCB-w261/main/tree/master/HelpfulResources/TotalSortGuide/_total-sort-guide-spark2.01-JAN27-2017.ipynb)]


### Q8 Tasks:
* __a) make a plan:__  Fill in the docstrings for __`NaiveBayes/train_mapper.py`__ and __`NaiveBayes/train_reducer.py`__ to appropriately reflect the format that each script will input/output. [`HINT:` _the input files_ (`enronemail_1h.txt` & `chineseTrain.txt`) _have a prespecified format and your output file should match_ `NBmodel.txt` _so you really only have to decide on an internal format for Hadoop_].


* __b) implement it:__ Complete the code in __`NaiveBayes/train_mapper.py`__ and __`NaiveBayes/train_reducer.py`__ so that together they train a Multinomial Naive Bayes model __with no smoothing__. Make sure your end result is formatted correctly (see note above). Test your scripts independently and together (using `chineseTrain.txt` or test input of your own devising). When you are satisfied with your Python code, design and run a Hadoop streaming command to run your job in parallel on __chineseTrain.txt__. Confirm that your trained model matches your hand calculations from Question 6.


* __c) short response:__ We saw in Question 6 that adding Laplace +1 smoothing makes our classifications less sensitive to rare words. However implementing this technique requires access to one additional piece of information that we had not previously used in our Naive Bayes training. What is that extra piece of information? [`HINT:` see equation 13.7 in Manning, Raghavan and Schutze].


* __d) short response:__ There are three approaches that we could take to handle the extra piece of information you identified in `c`: 1) we could hard code it into our reducer (_where would we get it in the first place?_). Or 2) we could compute it inside the reducer which would require storing some information in memory (_what information?_). Or 3) we could compute it in the reducer without storing any bulky information in memory but then we'd need some postprocessing or a second MapReduce job to complete the calculation (_why?_). Briefly explain what is non-ideal about each of these options. BONUS: which of these options is incompatible with using multiple reducers?


* __e) code + short response:__ Choose one of the 3 options above. State your choice & reasoning in the space below then use that strategy to complete the code in __`NaiveBayes/train_reducer_smooth.py`__. Test this alternate reducer then write and run a Hadoop streaming job to train an MNB model with smoothing on the Chinese example. Your results should match the model that we provided for you above (and the calculations in the textbook example). [`HINT:` _don't start from scratch with this one -- you can just copy over your reducer code from part `b` and make the needed modifications_].

### Q8 Student Answers:
> __c)__ Laplace +1 smoothing requires that when we calculate the probability of each feature conditioned on class, $\hat{P}(t_k|c)$, we add 1 to the numerator (occurrences of the word in class c) and add the number of unique words in the corpus to the denominator (the total number of words in the class). The information that we are currently missing in our Naive Bayes training is the number of unique words in the corpus, i.e. the vocabulary size.
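
For reference, the add-one estimate (equation 13.7 in Manning, Raghavan and Schutze) can be written as below, with $T_{ct}$ the count of term $t$ in class $c$ and $|V|$ the vocabulary size. The worked numbers are read off the Chinese example counts used later in this notebook, assuming the class coded `1` has 8 tokens (5 of them "chinese") and the vocabulary has 6 words.

$$\hat{P}(t|c) = \frac{T_{ct} + 1}{\sum_{t' \in V} T_{ct'} + |V|}, \qquad \hat{P}(\text{chinese}|c{=}1) = \frac{5 + 1}{8 + 6} = \frac{3}{7} \approx 0.4286$$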

> __d)__ In option 1, hard coding is non-ideal whenever the training data changes in a way that changes the dictionary of unique words. We can obtain the unique word count externally by counting the lines in the output file of the unsmoothed NB training job, excluding the ClassPriors lines. If we hard code that count, we need to recalculate it every time we retrain on new data that could change the dictionary. This discourages frequent retraining, since each run requires the additional steps of recounting the unique words and updating the code, which does not scale.

> In option 2, we can get the number of unique words by committing information to memory, namely a dictionary that stores each unique word in the corpus. Holding this dictionary in memory is not ideal because it grows with the training data. If the dictionary grows too large as the training data scales up, the process will eventually crash for lack of memory.

> Option 3, where we use postprocessing or a second MapReduce job that takes the output of the first job as input, is non-ideal in that it becomes increasingly costly as the vocabulary grows. We would need a second job or postprocessing because, without holding the vocabulary in memory, a reducer only knows the number of unique words after it has streamed through every record, by which point the per-word records have already been written out; the smoothed probabilities therefore have to be computed in a second pass. The output of the initial job that generates the unsmoothed model will be very large if the corpus is large, and while we are not committing the dictionary to memory, a poorly written second MapReduce job could be costly in processing power, especially in its shuffle phase.

> __e)__ The option I have chosen to implement is the second MapReduce job. To make this job work with multiple reducers, I will also write a mapper for it. The input for the second MapReduce job will be the output file from the first job, which contains the unsmoothed model. In the mapper, I will loop through all rows to count the number of unique words. I will then emit records for the total unique words and ensure that these totals are passed to every reducer partition using a custom partition key. The mapper will also emit each input row as an output, along with one row for the Class Priors, so that the reducer can use the word counts to calculate the new probabilities. The order inversion pattern will again be used so that the unique counts appear first in each reducer partition. The reducer will then use these unique counts to calculate Laplace +1 smoothed probabilities for each word.
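
The reducer side of the plan in (e) can be sketched as follows. This is a minimal illustration rather than the contents of `train_reducer_smooth.py`: it assumes each input line is `key <tab> partition <tab> ham_count,spam_count,p_ham,p_spam`, and that the `!unique` record carries the ham word total, the spam word total and the vocabulary size in its first three fields (the layout that the mapper unit test output further down appears to use).

```python
def smooth_partition(sorted_lines):
    """Apply Laplace +1 smoothing to one reducer's sorted input lines."""
    ham_total = spam_total = vocab = None
    out = []
    for line in sorted_lines:
        key, _partition, payload = line.rstrip("\n").split("\t")
        values = [float(v) for v in payload.split(",")]
        if key == "!unique":
            # Order inversion: this record sorts first, so the totals are
            # known before any word record is processed.
            ham_total, spam_total, vocab = values[:3]
        elif key == "ClassPriors":
            out.append(f"{key}\t{payload}")  # priors are not smoothed
        else:
            ham, spam = values[0], values[1]
            p_ham = (ham + 1) / (ham_total + vocab)
            p_spam = (spam + 1) / (spam_total + vocab)
            out.append(f"{key}\t{ham},{spam},{p_ham},{p_spam}")
    return out
```

Nothing here depends on the size of the vocabulary: the function keeps three numbers in memory and streams everything else.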

In [113]:
# part a - do your work in train_mapper.py and train_reducer.py then RUN THIS CELL AS IS
!chmod a+x NaiveBayes/train_mapper.py
!chmod a+x NaiveBayes/train_reducer.py
!echo "=========== MAPPER DOCSTRING ============"
!head -n 9 NaiveBayes/train_mapper.py | tail -n 6
!echo "=========== REDUCER DOCSTRING ============"
!head -n 9 NaiveBayes/train_reducer.py | tail -n 6

Each line of document is tokenized into words, where words are continuous strings 
Order Inversion Pattern will be implemented to emit additional lines that count:
  - total number of lines or documents
  - total number of lines by class
  - total number of words by class
In addition, a custom partition key is added to ensure job works for multiple reducers    
Using assumption from implementing order inversion pattern, 
	the necessary total counts for frequency calculations are assumed to be at the top of each input file
5 Total counts outputed from he mapper and sent to every reducer are first extracted and stored
Then spam/ham counts and frequencies are calculated for subsequent words
An additional line is added to print the final word
Another additional line is added to print the ClassPrior row for class counts and prior probabilities    


__`part b starts here`:__ MNB _without_ Smoothing (training on Chinese Example Corpus).

In [114]:
# part b - write a unit test for your mapper here
!python NaiveBayes/train_mapper.py < NaiveBayes/chineseTrain.txt | sort -k2,2 -k1,1 > NaiveBayes/unitTestMapper.txt
!cat NaiveBayes/unitTestMapper.txt

!totaldocs	A	0	4
!totalhamdocs	A	0	1
!totalhamwords	A	1	3
!totalspamdocs	A	0	3
!totalspamwords	A	1	8
beijing	A	1	1
chinese	A	0	1
chinese	A	1	1
chinese	A	1	1
chinese	A	1	1
chinese	A	1	1
chinese	A	1	1
!totaldocs	B	0	4
!totalhamdocs	B	0	1
!totalhamwords	B	1	3
!totalspamdocs	B	0	3
!totalspamwords	B	1	8
japan	B	0	1
macao	B	1	1
!totaldocs	C	0	4
!totalhamdocs	C	0	1
!totalhamwords	C	1	3
!totalspamdocs	C	0	3
!totalspamwords	C	1	8
shanghai	C	1	1
tokyo	C	0	1
!totaldocs	D	0	4
!totalhamdocs	D	0	1
!totalhamwords	D	1	3
!totalspamdocs	D	0	3
!totalspamwords	D	1	8


In [155]:
# part b - write a unit test for your reducer here

# Please note: my unit test only runs for one partition, since I could not simulate multiple reducers.
# The keys used for total counts are repeated in the mapper output so that partial counts can be sent to each reducer and aggregated.
# However, since I cannot easily write partitions for multiple reducers in my unit test, I only run the unit test for one partition.
!head -n 12 NaiveBayes/unitTestMapper.txt | NaiveBayes/train_reducer.py

beijing	0.0,1.0,0.0,0.125
chinese	1.0,5.0,0.3333333333333333,0.625
ClassPriors	1.0,3.0,0.25,0.75
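
The unit test above covers a single partition. One possible way to approximate several reducers locally is to group the mapper output by its partition field and sort each group, as in this sketch. The function name and field positions are assumptions for illustration; Hadoop's own partitioner hashes the partition key, so the mapping of partitions to `part-0000N` files may differ from what this produces.

```python
from itertools import groupby


def simulate_shuffle(mapper_lines, partition_field=1, sort_field=0):
    """Group tab-separated mapper output by partition and sort within each group."""
    rows = [line.rstrip("\n").split("\t") for line in mapper_lines if line.strip()]
    # Sort by partition first, then by key, mimicking "-k2,2 -k1,1".
    rows.sort(key=lambda r: (r[partition_field], r[sort_field]))
    return {
        part: ["\t".join(r) for r in group]
        for part, group in groupby(rows, key=lambda r: r[partition_field])
    }
```

Each value in the returned dictionary is the list of lines that one reducer would read, so it can be written to a file and piped into the reducer script to test every partition separately.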


In [116]:
# part b - write a systems test for your mapper + reducer together here
!python NaiveBayes/train_mapper.py < NaiveBayes/chineseTrain.txt | sort -k2,2 -k1,1 | head -n 12 | NaiveBayes/train_reducer.py > NaiveBayes/unsmoothNBmodel.txt
!cat NaiveBayes/unsmoothNBmodel.txt

beijing	0.0,1.0,0.0,0.125
chinese	1.0,5.0,0.3333333333333333,0.625
ClassPriors	1.0,3.0,0.25,0.75


In [156]:
# part b - clear (and name) an output directory in HDFS for your unsmoothed chinese NB model
!hdfs dfs -rm -r {HDFS_DIR}/unsmoothNB-output

Deleted /user/root/HW2/unsmoothNB-output


In [157]:
# part b - write your hadoop streaming job
!hadoop jar {JAR_FILE} \
  -D stream.num.map.output.key.fields=2 \
  -D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
  -D mapreduce.partition.keycomparator.options="-k2,2 -k1,1" \
  -D mapreduce.partition.keypartitioner.options="-k2,2"  \
  -files NaiveBayes/train_mapper.py,NaiveBayes/train_reducer.py \
  -mapper train_mapper.py \
  -reducer train_reducer.py \
  -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
  -input {HDFS_DIR}/chineseTrain.txt \
  -output {HDFS_DIR}/unsmoothNB-output \
  -numReduceTasks 4 \
  -cmdenv PATH={PATH}

packageJobJar: [] [/usr/lib/hadoop-mapreduce/hadoop-streaming-2.6.0-cdh5.15.1.jar] /tmp/streamjob4737498598478850876.jar tmpDir=null
19/06/02 04:45:26 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
19/06/02 04:45:27 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
19/06/02 04:45:29 INFO mapred.FileInputFormat: Total input paths to process : 1
19/06/02 04:45:29 INFO mapreduce.JobSubmitter: number of splits:2
19/06/02 04:45:29 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1559430286868_0024
19/06/02 04:45:30 INFO impl.YarnClientImpl: Submitted application application_1559430286868_0024
19/06/02 04:45:30 INFO mapreduce.Job: The url to track the job: http://docker.w261:8088/proxy/application_1559430286868_0024/
19/06/02 04:45:30 INFO mapreduce.Job: Running job: job_1559430286868_0024
19/06/02 04:45:43 INFO mapreduce.Job: Job job_1559430286868_0024 running in uber mode : false
19/06/02 04:45:43 INFO mapreduce.Job:  map 0% reduce 0%
19/06

In [158]:
#print all reducer outputs for a final check:

for idx in range(4):
    print(f"\n===== part-0000{idx}=====\n")
    !hdfs dfs -cat {HDFS_DIR}/unsmoothNB-output/part-0000{idx}


===== part-00000=====

ClassPriors	1.0,3.0,0.25,0.75

===== part-00001=====

beijing	0.0,1.0,0.0,0.125
chinese	1.0,5.0,0.3333333333333333,0.625
ClassPriors	1.0,3.0,0.25,0.75

===== part-00002=====

japan	1.0,0.0,0.3333333333333333,0.0
macao	0.0,1.0,0.0,0.125
ClassPriors	1.0,3.0,0.25,0.75

===== part-00003=====

shanghai	0.0,1.0,0.0,0.125
tokyo	1.0,0.0,0.3333333333333333,0.0
ClassPriors	1.0,3.0,0.25,0.75


In [159]:
# part b - extract your results (i.e. model) to a local file
!hdfs dfs -cat {HDFS_DIR}/unsmoothNB-output/part-000* > NaiveBayes/unsmoothNBmodel.txt

In [160]:
# part b - print your model so that we can confirm that it matches expected results
!cat NaiveBayes/unsmoothNBmodel.txt

ClassPriors	1.0,3.0,0.25,0.75
beijing	0.0,1.0,0.0,0.125
chinese	1.0,5.0,0.3333333333333333,0.625
ClassPriors	1.0,3.0,0.25,0.75
japan	1.0,0.0,0.3333333333333333,0.0
macao	0.0,1.0,0.0,0.125
ClassPriors	1.0,3.0,0.25,0.75
shanghai	0.0,1.0,0.0,0.125
tokyo	1.0,0.0,0.3333333333333333,0.0
ClassPriors	1.0,3.0,0.25,0.75
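
Because every reducer writes its own `ClassPriors` row, the concatenated model file contains that row several times. A small loader like the sketch below (a hypothetical helper, not part of the provided code) collapses the duplicates and flags any that disagree.

```python
def load_model(lines):
    """Parse 'key<tab>v1,v2,v3,v4' lines into a dict, collapsing duplicate keys."""
    model = {}
    for line in lines:
        line = line.strip()
        if not line:
            continue
        key, payload = line.split("\t")
        values = tuple(float(v) for v in payload.split(","))
        if key in model and model[key] != values:
            # Duplicates are only acceptable when they carry identical values.
            raise ValueError(f"conflicting rows for key {key!r}")
        model[key] = values
    return model
```

In a notebook this could be called as `load_model(open("NaiveBayes/unsmoothNBmodel.txt"))`.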


__`part e starts here`:__ MNB _with_ Smoothing (training on Chinese Example Corpus).

In [122]:
# part e - unit test for my new mapper
!python NaiveBayes/train_mapper_smooth.py < NaiveBayes/unsmoothNBmodel.txt | sort -k2,2 -k1,1 > NaiveBayes/unitTestMapperSmooth.txt
!cat NaiveBayes/unitTestMapperSmooth.txt

!unique	A	3.0,8.0,6.0,0
ClassPriors	A	1.0,3.0,0.25,0.75
beijing	A	0.0,1.0,0.0,0.125
chinese	A	1.0,5.0,0.3333333333333333,0.625
!unique	B	3.0,8.0,6.0,0
japan	B	1.0,0.0,0.3333333333333333,0.0
macao	B	0.0,1.0,0.0,0.125
!unique	C	3.0,8.0,6.0,0
shanghai	C	0.0,1.0,0.0,0.125
tokyo	C	1.0,0.0,0.3333333333333333,0.0
!unique	D	3.0,8.0,6.0,0


In [123]:
!chmod a+x NaiveBayes/train_mapper_smooth.py
!chmod a+x NaiveBayes/train_reducer_smooth.py

In [124]:
# part e - write a unit test for your NEW reducer here

# Please note: my unit test only runs for one partition, since I could not simulate multiple reducers.
# The keys used for total counts are repeated in the mapper output so that partial counts can be sent to each reducer and aggregated.
# However, since I cannot easily write partitions for multiple reducers in my unit test, I only run the unit test for one partition.
!head -n 4 NaiveBayes/unitTestMapperSmooth.txt | NaiveBayes/train_reducer_smooth.py

ClassPriors	1.0,3.0,0.25,0.75
beijing	0.0,1.0,0.1111111111111111,0.14285714285714285
chinese	1.0,5.0,0.2222222222222222,0.42857142857142855


In [125]:
# part e - write a systems test for your mapper + reducer together here
!python NaiveBayes/train_mapper_smooth.py < NaiveBayes/unsmoothNBmodel.txt | sort -k2,2 -k1,1 | head -n 4 | NaiveBayes/train_reducer_smooth.py

ClassPriors	1.0,3.0,0.25,0.75
beijing	0.0,1.0,0.1111111111111111,0.14285714285714285
chinese	1.0,5.0,0.2222222222222222,0.42857142857142855


In [144]:
# part e - clear (and name) an output directory in HDFS for your SMOOTHED chinese NB model
!hdfs dfs -rm -r {HDFS_DIR}/smoothNB-output

Deleted /user/root/HW2/smoothNB-output


In [145]:
# part e - write your hadoop streaming job

!hadoop jar {JAR_FILE} \
  -D stream.num.map.output.key.fields=2 \
  -D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
  -D mapreduce.partition.keycomparator.options="-k2,2 -k1,1" \
  -D mapreduce.partition.keypartitioner.options="-k2,2"  \
  -files NaiveBayes/train_mapper_smooth.py,NaiveBayes/train_reducer_smooth.py \
  -mapper train_mapper_smooth.py \
  -reducer train_reducer_smooth.py \
  -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
  -input {HDFS_DIR}/unsmoothNB-output \
  -output {HDFS_DIR}/smoothNB-output \
  -numReduceTasks 4 \
  -cmdenv PATH={PATH}

packageJobJar: [] [/usr/lib/hadoop-mapreduce/hadoop-streaming-2.6.0-cdh5.15.1.jar] /tmp/streamjob8689789598324640198.jar tmpDir=null
19/06/02 04:28:59 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
19/06/02 04:28:59 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
19/06/02 04:29:01 INFO mapred.FileInputFormat: Total input paths to process : 4
19/06/02 04:29:01 INFO mapreduce.JobSubmitter: number of splits:4
19/06/02 04:29:01 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1559430286868_0020
19/06/02 04:29:02 INFO impl.YarnClientImpl: Submitted application application_1559430286868_0020
19/06/02 04:29:02 INFO mapreduce.Job: The url to track the job: http://docker.w261:8088/proxy/application_1559430286868_0020/
19/06/02 04:29:02 INFO mapreduce.Job: Running job: job_1559430286868_0020
19/06/02 04:29:19 INFO mapreduce.Job: Job job_1559430286868_0020 running in uber mode : false
19/06/02 04:29:19 INFO mapreduce.Job:  map 0% reduce 0%
19/06

In [146]:
#print all reducer outputs for a final check:

for idx in range(4):
    print(f"\n===== part-0000{idx}=====\n")
    !hdfs dfs -cat {HDFS_DIR}/smoothNB-output/part-0000{idx}


===== part-00000=====


===== part-00001=====

ClassPriors	1.0,3.0,0.25,0.75
ClassPriors	1.0,3.0,0.25,0.75
ClassPriors	1.0,3.0,0.25,0.75
ClassPriors	1.0,3.0,0.25,0.75
beijing	0.0,1.0,0.1111111111111111,0.14285714285714285
chinese	1.0,5.0,0.2222222222222222,0.42857142857142855

===== part-00002=====

japan	1.0,0.0,0.2222222222222222,0.07142857142857142
macao	0.0,1.0,0.1111111111111111,0.14285714285714285

===== part-00003=====

shanghai	0.0,1.0,0.1111111111111111,0.14285714285714285
tokyo	1.0,0.0,0.2222222222222222,0.07142857142857142


In [147]:
# part e - extract your results (i.e. model) to a local file
!hdfs dfs -cat {HDFS_DIR}/smoothNB-output/part-000* > NaiveBayes/smoothNBmodel.txt
!cat NaiveBayes/smoothNBmodel.txt

ClassPriors	1.0,3.0,0.25,0.75
ClassPriors	1.0,3.0,0.25,0.75
ClassPriors	1.0,3.0,0.25,0.75
ClassPriors	1.0,3.0,0.25,0.75
beijing	0.0,1.0,0.1111111111111111,0.14285714285714285
chinese	1.0,5.0,0.2222222222222222,0.42857142857142855
japan	1.0,0.0,0.2222222222222222,0.07142857142857142
macao	0.0,1.0,0.1111111111111111,0.14285714285714285
shanghai	0.0,1.0,0.1111111111111111,0.14285714285714285
tokyo	1.0,0.0,0.2222222222222222,0.07142857142857142


# Question 9: Enron Ham/Spam NB Classifier & Results.

Fantastic work. We're finally ready to perform Spam Classification on the Enron Corpus. In this question you'll run the analysis you've developed, report its performance, and draw some conclusions.

### Q9 Tasks:
* __a) train/test split:__ Run the provided code to split our Enron file into a training set and testing set then load them into HDFS. 

[`NOTE:` _If you hard coded the vocab size in question 8d make sure you recalculate the vocab size for just the training set!_]

* __b) train 2 models:__ Write Hadoop Streaming jobs to train MNB Models on the training set with and without smoothing. Save your models to local files at __`NaiveBayes/Unsmoothed/NBmodel.txt`__ and __`NaiveBayes/Smoothed/NBmodel.txt`__. [`NOTE:` _This naming is important because we wrote our classification task so that it expects a file of that name... if this inelegance frustrates you there is an alternative that would involve a few adjustments to your code [read more about it here](http://www.tnoda.com/blog/2013-11-23)._] Finally run the checks that we provide to confirm that your results are correct.


* __c) code:__ Recall that we designed our classification job with just a mapper. An efficient way to report the performance of our models would be to simply add a reducer phase to this job and compute precision and recall right there. Complete the code in __`NaiveBayes/evaluation_reducer.py`__ and then write Hadoop jobs to evaluate your two models on the test set. Report their performance side by side. [`NOTE:` if you need a refresher on precision, recall and F1-score [Wikipedia](https://en.wikipedia.org/wiki/F1_score) is a good resource.]


* __d) short response:__ Compare the performance of your two models. What do you notice about the unsmoothed model's predictions? Can you guess why this is happening? Which evaluation measure do you think is most relevant in our use case? [`NOTE:` _Feel free to answer using your common sense but if you want more information on evaluating the classification task check out_ [this blogpost](https://tryolabs.com/blog/2013/03/25/why-accuracy-alone-bad-measure-classification-tasks-and-what-we-can-do-about-it/) or [this paper](http://www.flinders.edu.au/science_engineering/fms/School-CSEM/publications/tech_reps-research_artfcts/TRRA_2007.pdf)]


* __e) code + short response:__ Let's look at the top ten words with the highest conditional probability in `Spam` and in `Ham`. We'll do this by writing a Hadoop job that sorts the model file (`NaiveBayes/Smoothed/NBmodel.txt`). Normally we'd have to run two jobs -- one that sorts on $P(word|ham)$ and another that sorts on $P(word|spam)$. However if we slightly modify the data format in the model file then we can get the top words in each class with just one job. We've written a mapper that will do just this for you. Read through __`NaiveBayes/model_sort_mapper.py`__ and then briefly explain how this mapper will allow us to partition and sort our model file. Write a Hadoop job that uses our mapper and `/bin/cat` for a reducer to partition and sort. Print out the top 10 words in each class (where 'top' == highest conditional probability). [`HINT:` _this should remind you a lot of what we did in Question 6._]


* __f) short response:__ What do you notice about the 'top words' we printed in `e`? How would increasing the smoothing parameter 'k' affect the probabilities for the top words that you identified for 'e'? How would it affect the probabilities of words that occur much more in one class than another? In summary, how does the smoothing parameter 'k' affect the bias and the variance of our model? [`NOTE:` _you do not need to code anything for this task, but if you are struggling with it you could try changing 'k' and see what happens to the test set. We don't recommend doing this exploration with the Enron data because it will be harder to see the impact with such a big vocabulary_]

### Q9 Student Answers:
> __d)__ The unsmoothed model has lower accuracy (0.5 versus 0.85), lower recall, and a lower F-score than the smoothed model. Most noticeably, the smoothed model has far more true positives (11 versus 1), while the unsmoothed model has far more false negatives (10 versus 0). This may be happening because the train and test sets are both small and some of the emails are very short. Without smoothing, the NB model heavily penalizes test emails containing words that do not appear in training emails of the same class, since any such word has a conditional probability of zero for that class. As a result, the model has a hard time classifying documents whose words appear for a class in the test data but not for the same class in the training data. Since a lot of words are natural language fillers (the, to, of, etc.), smoothing allows us to assign small probabilities to words that appear in the test set for a class but not in the training set for the same class. This prevents test emails that contain additional fillers, or lack some fillers, relative to training emails of the same class from being assigned the wrong class. The evaluation score that is most relevant is the F1-score.
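
The zero-probability effect described in (d) can be reproduced on the unsmoothed Chinese model printed earlier. The sketch below is an illustration only; how `classify_mapper.py` actually handles zero probabilities is not shown in this notebook, so the `-inf` convention here is an assumption.

```python
import math


def log_score(prior, word_probs, words):
    """Log posterior (up to a constant); -inf if any word has zero probability."""
    score = math.log(prior)
    for word in words:
        p = word_probs.get(word, 0.0)
        if p == 0.0:
            # A single unseen word rules the class out entirely.
            return float("-inf")
        score += math.log(p)
    return score


# Unsmoothed conditional probabilities from the Chinese example model.
p_class0 = {"chinese": 1 / 3, "japan": 1 / 3, "tokyo": 1 / 3}
p_class1 = {"chinese": 0.625, "beijing": 0.125, "macao": 0.125, "shanghai": 0.125}

doc = ["chinese", "beijing", "tokyo"]
print(log_score(0.25, p_class0, doc), log_score(0.75, p_class1, doc))
```

For this document both classes score `-inf`, because "beijing" never occurs in class 0 and "tokyo" never occurs in class 1, so the unsmoothed model has no basis for choosing between them.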

> __e)__ `NaiveBayes/model_sort_mapper.py` reads rows from the NBmodel file, compares the spam and ham conditional probabilities, and outputs new key-value pairs in which the keys are unchanged but the values include the original input values plus 2 more fields: the maximum probability and the predicted class associated with that maximum. To find the top 10 words for each class, we can follow what we did in 4. In the MapReduce job, we use 2 reducers, and the predicted class emitted with each mapper record becomes the partition key. We then sort the records within each partition in reverse numeric order, using the newly emitted maximum probability as the sort key. Afterwards, we can use the Unix command `head` to print the top rows of each partition and see the words with the highest probabilities in each class.
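
The partition-then-sort idea in (e) can be mimicked locally. The sketch below does not reproduce the provided `model_sort_mapper.py`; the record layout, the `ham`/`spam` labels and both function names are assumptions based on the description above.

```python
def to_sortable(line):
    """Return (word, payload, winning_class, max_probability) for one model row."""
    word, payload = line.rstrip("\n").split("\t")
    _ham_n, _spam_n, p_ham, p_spam = payload.split(",")
    if float(p_ham) >= float(p_spam):
        return (word, payload, "ham", float(p_ham))
    return (word, payload, "spam", float(p_spam))


def top_words(model_lines, n=10):
    """Partition rows by winning class, then sort each partition by probability."""
    partitions = {"ham": [], "spam": []}
    for line in model_lines:
        if not line.strip() or line.startswith("ClassPriors"):
            continue
        word, _payload, label, prob = to_sortable(line)
        partitions[label].append((prob, word))
    # Reverse numeric sort on probability, ties broken alphabetically by word.
    return {
        label: [w for _p, w in sorted(rows, key=lambda r: (-r[0], r[1]))[:n]]
        for label, rows in partitions.items()
    }
```

In the Hadoop job the same two steps are carried out by the partitioner option (`-k3,3`) and the comparator option (`-k4,4nr -k1,1`).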

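> __f)__ Many of the words with the top probabilities are in the category of natural language fillers I alluded to in (d). Increasing the smoothing parameter k will increase the bias and decrease the variance, because a larger k pulls every conditional probability toward the same uniform value, shrinking the gap between frequent words and words that occur much more in one class than the other.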
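
To explore how the smoothing parameter changes the estimates, the sketch below generalizes add-one smoothing to add-k and evaluates it on two words from the Chinese example (counts of 5 and 0 in a class with 8 tokens and a 6-word vocabulary). The function name and the chosen values of `k` are illustrative assumptions.

```python
def smoothed(count, class_total, vocab_size, k=1.0):
    """Add-k estimate of P(word | class)."""
    return (count + k) / (class_total + k * vocab_size)


for k in (0, 1, 10, 1000):
    frequent = smoothed(5, 8, 6, k)  # e.g. "chinese" in class 1
    unseen = smoothed(0, 8, 6, k)    # e.g. "tokyo" in class 1
    print(k, round(frequent, 4), round(unseen, 4))
```

As `k` grows, both estimates approach 1/6, the uniform probability over the vocabulary, so the model relies less on the observed counts.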

__Test/Train split__

In [183]:
# part a - test/train split (RUN THIS CELL AS IS)
!head -n 80 data/enronemail_1h.txt > data/enron_train.txt
!tail -n 20 data/enronemail_1h.txt > data/enron_test.txt
!hdfs dfs -copyFromLocal data/enron_train.txt {HDFS_DIR}
!hdfs dfs -copyFromLocal data/enron_test.txt {HDFS_DIR}

copyFromLocal: `/user/root/HW2/enron_train.txt': File exists
copyFromLocal: `/user/root/HW2/enron_test.txt': File exists


In [None]:
!mkdir NaiveBayes/Unsmoothed

__Training__ (Enron MNB Model _without smoothing_ )

In [184]:
# part b -  Unsmoothed model (FILL IN THE MISSING CODE BELOW)

# clear the output directory
!hdfs dfs -rm -r {HDFS_DIR}/enron-model

# hadoop command
!hadoop jar {JAR_FILE} \
  -D stream.num.map.output.key.fields=4 \
  -D stream.map.output.field.separator="\t" \
  -D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
  -D mapreduce.partition.keypartitioner.options="-k2,2"  \
  -D mapreduce.partition.keycomparator.options="-k1,1" \
  -files NaiveBayes/train_mapper.py,NaiveBayes/train_reducer.py \
  -mapper train_mapper.py \
  -reducer train_reducer.py \
  -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
  -input {HDFS_DIR}/enron_train.txt \
  -output {HDFS_DIR}/enron-model \
  -numReduceTasks 4 \
  -cmdenv PATH={PATH}

# save the model locally
!hdfs dfs -cat {HDFS_DIR}/enron-model/part-000* > NaiveBayes/Unsmoothed/NBmodel.txt

Deleted /user/root/HW2/enron-model
packageJobJar: [] [/usr/lib/hadoop-mapreduce/hadoop-streaming-2.6.0-cdh5.15.1.jar] /tmp/streamjob8887388945230211309.jar tmpDir=null
19/06/02 05:46:07 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
19/06/02 05:46:08 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
19/06/02 05:46:09 INFO mapred.FileInputFormat: Total input paths to process : 1
19/06/02 05:46:09 INFO mapreduce.JobSubmitter: number of splits:2
19/06/02 05:46:10 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1559430286868_0031
19/06/02 05:46:10 INFO impl.YarnClientImpl: Submitted application application_1559430286868_0031
19/06/02 05:46:10 INFO mapreduce.Job: The url to track the job: http://docker.w261:8088/proxy/application_1559430286868_0031/
19/06/02 05:46:10 INFO mapreduce.Job: Running job: job_1559430286868_0031
19/06/02 05:46:21 INFO mapreduce.Job: Job job_1559430286868_0031 running in uber mode : false
19/06/02 05:46:21 INFO map

In [185]:
# part b - check your UNSMOOTHED model results (RUN THIS CELL AS IS)
!grep assistance NaiveBayes/Unsmoothed/NBmodel.txt
# EXPECTED OUTPUT: assistance	2,4,0.000172547666293,0.000296823983378

assistance	2.0,4.0,0.0001725476662928134,0.00029682398337785694


In [186]:
# part b - check your UNSMOOTHED model results (RUN THIS CELL AS IS)
!grep money NaiveBayes/Unsmoothed/NBmodel.txt
# EXPECTED OUTPUT: money	1,22,8.62738331464e-05,0.00163253190858

money	1.0,22.0,8.62738331464067e-05,0.001632531908578213


__Training__ (Enron MNB Model _with Laplace +1 smoothing_ )

In [170]:
!mkdir NaiveBayes/Smoothed

In [198]:
# part b -  Smoothed model (FILL IN THE MISSING CODE BELOW)

# clear the output directory
!hdfs dfs -rm -r {HDFS_DIR}/smooth-model

# hadoop command
!hadoop jar {JAR_FILE} \
  -D stream.num.map.output.key.fields=3 \
  -D stream.map.output.field.separator="\t" \
  -D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
  -D mapreduce.partition.keypartitioner.options="-k2,2"  \
  -D mapreduce.partition.keycomparator.options="-k1,1" \
  -files NaiveBayes/train_mapper_smooth.py,NaiveBayes/train_reducer_smooth.py \
  -mapper train_mapper_smooth.py \
  -reducer train_reducer_smooth.py \
  -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
  -input {HDFS_DIR}/enron-model \
  -output {HDFS_DIR}/smooth-model \
  -numReduceTasks 4 \
  -cmdenv PATH={PATH}

# save the model locally
!hdfs dfs -cat {HDFS_DIR}/smooth-model/part-000* > NaiveBayes/Smoothed/NBmodel.txt

Deleted /user/root/HW2/smooth-model
packageJobJar: [] [/usr/lib/hadoop-mapreduce/hadoop-streaming-2.6.0-cdh5.15.1.jar] /tmp/streamjob3638238434155065464.jar tmpDir=null
19/06/02 06:00:05 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
19/06/02 06:00:06 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
19/06/02 06:00:08 INFO mapred.FileInputFormat: Total input paths to process : 4
19/06/02 06:00:08 INFO mapreduce.JobSubmitter: number of splits:4
19/06/02 06:00:09 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1559430286868_0036
19/06/02 06:00:09 INFO impl.YarnClientImpl: Submitted application application_1559430286868_0036
19/06/02 06:00:09 INFO mapreduce.Job: The url to track the job: http://docker.w261:8088/proxy/application_1559430286868_0036/
19/06/02 06:00:09 INFO mapreduce.Job: Running job: job_1559430286868_0036
19/06/02 06:00:24 INFO mapreduce.Job: Job job_1559430286868_0036 running in uber mode : false
19/06/02 06:00:24 INFO ma

In [199]:
# part b - check your SMOOTHED model results (RUN THIS CELL AS IS)
!grep assistance NaiveBayes/Smoothed/NBmodel.txt
# EXPECTED OUTPUT: assistance	2,4,0.000185804533631,0.000277300205202

assistance	2.0,4.0,0.0001858045336306206,0.00027730020520215184


In [200]:
# part b - check your SMOOTHED model results (RUN THIS CELL AS IS)
!grep money NaiveBayes/Smoothed/NBmodel.txt
# EXPECTED OUTPUT: money	1,22,0.000123869689087,0.00127558094393

money	1.0,22.0,0.0001238696890870804,0.0012755809439298986


__Evaluation__

In [201]:
# part c - write your code in NaiveBayes/evaluation_reducer.py then RUN THIS
!chmod a+x NaiveBayes/evaluation_reducer.py

In [202]:
# part c - unit test your evaluation job on the chinese model (RUN THIS CELL AS IS)
!cat NaiveBayes/chineseTest.txt | NaiveBayes/classify_mapper.py 
!cat NaiveBayes/chineseTest.txt | NaiveBayes/classify_mapper.py | NaiveBayes/evaluation_reducer.py

d5	1	-8.90668134500626	-8.10769031284611	1
d6	1	-5.780743515794329	-4.179502370564408	1
d7	0	-6.591673732011658	-7.511706880737812	0
d8	0	-4.394449154674438	-5.565796731681498	0
d5	1	-8.90668134500626	-8.10769031284611	 True
d6	1	-5.780743515794329	-4.179502370564408	 True
d7	0	-6.591673732011658	-7.511706880737812	 True
d8	0	-4.394449154674438	-5.565796731681498	 True
# Documents 4.0
True Positives: 2.0
True Negatives: 2.0
False Positives: 0.0
False Negatives: 0.0
Accuracy 1.0
Precision 1.0
Recall 1.0
F-Score 1.0


In [203]:
# part c - Evaluate the UNSMOOTHED Model Here (FILL IN THE MISSING CODE)

# clear output directory
!hdfs dfs -rm -r {HDFS_DIR}/enron-model-results

# hadoop command
!hadoop jar {JAR_FILE} \
  -files NaiveBayes/classify_mapper.py,NaiveBayes/Unsmoothed/NBmodel.txt,NaiveBayes/evaluation_reducer.py \
  -mapper classify_mapper.py \
  -reducer evaluation_reducer.py \
  -input {HDFS_DIR}/enron_test.txt \
  -output {HDFS_DIR}/enron-model-results \
  -numReduceTasks 1 \
  -cmdenv PATH={PATH}

# save the evaluation results locally
!hdfs dfs -cat {HDFS_DIR}/enron-model-results/part-000* > NaiveBayes/Unsmoothed/results.txt

Deleted /user/root/HW2/enron-model-results
packageJobJar: [] [/usr/lib/hadoop-mapreduce/hadoop-streaming-2.6.0-cdh5.15.1.jar] /tmp/streamjob7161278097575508786.jar tmpDir=null
19/06/02 06:03:14 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
19/06/02 06:03:14 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
19/06/02 06:03:16 INFO mapred.FileInputFormat: Total input paths to process : 1
19/06/02 06:03:16 INFO mapreduce.JobSubmitter: number of splits:2
19/06/02 06:03:17 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1559430286868_0037
19/06/02 06:03:17 INFO impl.YarnClientImpl: Submitted application application_1559430286868_0037
19/06/02 06:03:17 INFO mapreduce.Job: The url to track the job: http://docker.w261:8088/proxy/application_1559430286868_0037/
19/06/02 06:03:17 INFO mapreduce.Job: Running job: job_1559430286868_0037
19/06/02 06:03:31 INFO mapreduce.Job: Job job_1559430286868_0037 running in uber mode : false
19/06/02 06:03:31 

In [204]:
# part c - Evaluate the SMOOTHED Model Here (FILL IN THE MISSING CODE)

# clear output directory
!hdfs dfs -rm -r {HDFS_DIR}/smooth-model-results

# hadoop command
!hadoop jar {JAR_FILE} \
  -files NaiveBayes/classify_mapper.py,NaiveBayes/Smoothed/NBmodel.txt,NaiveBayes/evaluation_reducer.py \
  -mapper classify_mapper.py \
  -reducer evaluation_reducer.py \
  -input {HDFS_DIR}/enron_test.txt \
  -output {HDFS_DIR}/smooth-model-results \
  -numReduceTasks 1 \
  -cmdenv PATH={PATH}

# save the evaluation results locally
!hdfs dfs -cat {HDFS_DIR}/smooth-model-results/part-000* > NaiveBayes/Smoothed/results.txt

Deleted /user/root/HW2/smooth-model-results
packageJobJar: [] [/usr/lib/hadoop-mapreduce/hadoop-streaming-2.6.0-cdh5.15.1.jar] /tmp/streamjob5262634055469441777.jar tmpDir=null
19/06/02 06:06:49 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
19/06/02 06:06:49 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
19/06/02 06:06:51 INFO mapred.FileInputFormat: Total input paths to process : 1
19/06/02 06:06:52 INFO mapreduce.JobSubmitter: number of splits:2
19/06/02 06:06:52 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1559430286868_0038
19/06/02 06:06:53 INFO impl.YarnClientImpl: Submitted application application_1559430286868_0038
19/06/02 06:06:53 INFO mapreduce.Job: The url to track the job: http://docker.w261:8088/proxy/application_1559430286868_0038/
19/06/02 06:06:53 INFO mapreduce.Job: Running job: job_1559430286868_0038
19/06/02 06:07:08 INFO mapreduce.Job: Job job_1559430286868_0038 running in uber mode : false
19/06/02 06:07:08

In [205]:
# part c - display results 
# NOTE: feel free to modify the tail commands to match the format of your results file
print('=========== UNSMOOTHED MODEL ============')
!tail -n 9 NaiveBayes/Unsmoothed/results.txt
print('=========== SMOOTHED MODEL ============')
!tail -n 9 NaiveBayes/Smoothed/results.txt

# Documents 20.0	
True Positives: 1.0	
True Negatives: 9.0	
False Positives: 0.0	
False Negatives: 10.0	
Accuracy 0.5	
Precision 1.0	
Recall 0.09090909090909091	
F-Score 0.16666666666666669	
# Documents 20.0	
True Positives: 11.0	
True Negatives: 6.0	
False Positives: 3.0	
False Negatives: 0.0	
Accuracy 0.85	
Precision 0.7857142857142857	
Recall 1.0	
F-Score 0.88	


__`EXPECTED RESULTS:`__ 
<table>
<tr>
<th>Unsmoothed Model</th>
<th>Smoothed Model</th>
</tr>
<tr>
<td><pre>
# Documents:	20
True Positives:	1
True Negatives:	9
False Positives:	0
False Negatives:	10
Accuracy	0.5
Precision	1.0
Recall	0.0909
F-Score	0.1666
</pre></td>
<td><pre>
# Documents:	20
True Positives:	11
True Negatives:	6
False Positives:	3
False Negatives:	0
Accuracy	0.85
Precision	0.7857
Recall	1.0
F-Score	0.88
</pre></td>
</tr>
</table>

__`NOTE:`__ _Don't be too disappointed if these scores seem low to you. We've trained and tested on a very small corpus... bigger datasets coming soon!_
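As a sanity check, the derived scores in the table can be recomputed by hand from the four confusion counts. The sketch below is only an illustration: the helper name `evaluation_metrics` is hypothetical and is not the `evaluation_reducer.py` used in the job. It assumes spam is the positive class and that the F-Score is the usual F1 (harmonic mean of precision and recall).

```python
def evaluation_metrics(tp, tn, fp, fn):
    """Return accuracy, precision, recall and F1 from confusion counts.

    Hypothetical helper for illustration; guards against empty denominators.
    """
    total = tp + tn + fp + fn
    accuracy = (tp + tn) / total if total else 0.0
    precision = tp / (tp + fp) if (tp + fp) else 0.0
    recall = tp / (tp + fn) if (tp + fn) else 0.0
    denom = precision + recall
    f_score = 2 * precision * recall / denom if denom else 0.0
    return {
        "accuracy": accuracy,
        "precision": precision,
        "recall": recall,
        "f_score": f_score,
    }


# Confusion counts copied from the expected results table above.
unsmoothed = evaluation_metrics(tp=1, tn=9, fp=0, fn=10)
smoothed = evaluation_metrics(tp=11, tn=6, fp=3, fn=0)

for name, scores in [("unsmoothed", unsmoothed), ("smoothed", smoothed)]:
    print(name, {k: round(v, 4) for k, v in scores.items()})
```

Working through the arithmetic this way makes the contrast concrete: the unsmoothed model is precise but misses 10 of the 11 spam documents, while the smoothed model catches all of them at the cost of 3 false positives.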

__`part e starts here:`__

In [206]:
# part e - write your Hadoop job here (sort smoothed model on P(word|class))

# clear output directory
!hdfs dfs -rm -r {HDFS_DIR}/smooth-sort

# hadoop job
!hadoop jar {JAR_FILE} \
  -D stream.num.map.output.key.fields=4 \
  -D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
  -D mapreduce.partition.keycomparator.options="-k4,4nr -k1,1" \
  -D mapreduce.partition.keypartitioner.options="-k3,3"  \
  -files NaiveBayes/model_sort_mapper.py \
  -mapper model_sort_mapper.py \
  -reducer /bin/cat \
  -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
  -input {HDFS_DIR}/smooth-model \
  -output {HDFS_DIR}/smooth-sort \
  -numReduceTasks 2 \
  -cmdenv PATH={PATH}


rm: `/user/root/HW2/smooth-sort': No such file or directory
packageJobJar: [] [/usr/lib/hadoop-mapreduce/hadoop-streaming-2.6.0-cdh5.15.1.jar] /tmp/streamjob7269015326975516087.jar tmpDir=null
19/06/02 06:14:31 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
19/06/02 06:14:31 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
19/06/02 06:14:32 INFO mapred.FileInputFormat: Total input paths to process : 4
19/06/02 06:14:32 INFO mapreduce.JobSubmitter: number of splits:4
19/06/02 06:14:33 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1559430286868_0039
19/06/02 06:14:33 INFO impl.YarnClientImpl: Submitted application application_1559430286868_0039
19/06/02 06:14:33 INFO mapreduce.Job: The url to track the job: http://docker.w261:8088/proxy/application_1559430286868_0039/
19/06/02 06:14:33 INFO mapreduce.Job: Running job: job_1559430286868_0039
19/06/02 06:14:43 INFO mapreduce.Job: Job job_1559430286868_0039 running in uber mode : false
1

In [208]:
# part e - print top words in each class

# print more than 10 lines per partition because the ClassPriors records sort to the top
for idx in range(2):
    print(f"\n===== part-0000{idx}=====\n")
    !hdfs dfs -cat {HDFS_DIR}/smooth-sort/part-0000{idx} | head -n 14


===== part-00000=====

ClassPriors	47.0,33.0,0.5875,0.4125	ham	0.5875	
ClassPriors	47.0,33.0,0.5875,0.4125	ham	0.5875	
ClassPriors	47.0,33.0,0.5875,0.4125	ham	0.5875	
ClassPriors	47.0,33.0,0.5875,0.4125	ham	0.5875	
ect	378.0,0.0,0.023473306082001735,5.546004104043037e-05	ham	0.023473306082001735	
and	258.0,277.0,0.01604112473677691,0.015417891409239643	ham	0.01604112473677691	
hou	203.0,0.0,0.0126347082868822,5.546004104043037e-05	ham	0.0126347082868822	
in	160.0,157.0,0.009971509971509971,0.008762686484387999	ham	0.009971509971509971	
for	148.0,153.0,0.00922829183698749,0.008540846320226277	ham	0.00922829183698749	
on	122.0,95.0,0.007617985878855444,0.005324163939881316	ham	0.007617985878855444	
enron	116.0,0.0,0.007246376811594203,5.546004104043037e-05	ham	0.007246376811594203	
i	113.0,106.0,0.007060572277963582,0.00593422439132605	ham	0.007060572277963582	
will	113.0,69.0,0.007060572277963582,0.003882202872830126	ham	0.007060572277963582	
this	99.0,90.0,0.00619348445435402,0.005046
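
To reason about what the job above is meant to produce, it can help to emulate the partition and sort steps locally on a handful of records. The sketch below is an idealized stand-in, not the Hadoop implementation: the function name `partition_and_sort` is hypothetical, it assumes each class ends up in its own partition (in Hadoop, `KeyFieldBasedPartitioner` hashes the class field, so that is not guaranteed), and it mimics the `-k4,4nr -k1,1` comparator as "probability descending, then word ascending". The probabilities are taken from the payload column of the output above.

```python
from collections import defaultdict


def partition_and_sort(records):
    """Group (word, class, probability) records by class, then sort each
    group by probability descending with ties broken by word ascending."""
    partitions = defaultdict(list)
    for word, cls, prob in records:
        partitions[cls].append((word, prob))
    for cls in partitions:
        partitions[cls].sort(key=lambda pair: (-pair[1], pair[0]))
    return dict(partitions)


# A few smoothed conditional probabilities copied from the output above.
records = [
    ("in", "ham", 0.009971509971509971),
    ("ect", "ham", 0.023473306082001735),
    ("hou", "ham", 0.0126347082868822),
    ("and", "ham", 0.01604112473677691),
    ("hou", "spam", 5.546004104043037e-05),
    ("in", "spam", 0.008762686484387999),
    ("ect", "spam", 5.546004104043037e-05),
    ("and", "spam", 0.015417891409239643),
]

result = partition_and_sort(records)
for cls, ranked in result.items():
    print(cls, [word for word, _ in ranked])
```

Note how `ect` and `hou` lead the ham list but fall to the bottom of the spam list, where both carry the same smoothed probability and are ordered alphabetically by the secondary sort key.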

# HW2 ends here, please refer to the `README.md` for submission instructions.