# Word Count with MapReduce and mrjob

`mrjob` is a Python package that helps you write and run Hadoop Streaming jobs. It supports Amazon's Elastic MapReduce (EMR) and also works with your own Hadoop cluster. Yelp released it as an open-source framework, and we use it here to interface with Hadoop because it keeps MapReduce code readable and easy to write.

[mrjob docs](http://mrjob.readthedocs.org/en/latest/index.html) 
[mrjob tutorial](https://pythonhosted.org/mrjob/guides/quickstart.html) 

Some important features of `mrjob`:

* It can run jobs on EMR, your own Hadoop cluster, or locally (for testing).
* It can be used to write multi-step jobs, where one map-reduce step feeds into the next. 

## Exercise 1: Use MapReduce to execute code across documents in multiple sub-directories 

### Simple Count

Do a simple word count on the [20 Newsgroups dataset](http://qwone.com/~jason/20Newsgroups/). Click [here](http://qwone.com/~jason/20Newsgroups/20news-19997.tar.gz) to download it; if that link is unavailable, look [here](http://kdd.ics.uci.edu/databases/20newsgroups/20newsgroups.html). The beauty of MapReduce here is that, given a directory as input, the job runs over the files in every subfolder of that directory. Cool!


#### 1. Create a file `wordcounts.py` with the following code:

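```Python
from mrjob.job import MRJob
from string import punctuation


class MRWordFreqCount(MRJob):

    def mapper(self, _, line):
        # Emit (word, 1) for each token, lowercased and stripped of punctuation
        for token in line.split():
            word = token.strip(punctuation).lower()
            if word:  # skip tokens that were only punctuation
                yield (word, 1)

    def reducer(self, word, counts):
        # Sum the 1s emitted for each word
        yield (word, sum(counts))


if __name__ == '__main__':
    MRWordFreqCount.run()
```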
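
To see what the mapper and reducer do without Hadoop or `mrjob`, here is a minimal plain-Python sketch of the map, shuffle, and reduce phases running in a single process. The helper names `map_line` and `run_word_count` are hypothetical and not part of `mrjob`. As an assumption of the sketch, tokens made up only of punctuation are skipped.

```python
from collections import defaultdict
from string import punctuation


def map_line(line):
    """Mimic the mapper: emit (word, 1) for every whitespace-separated token."""
    for token in line.split():
        word = token.strip(punctuation).lower()
        if word:  # assumption: skip tokens that were only punctuation
            yield word, 1


def run_word_count(lines):
    """Simulate map -> shuffle (group by key) -> reduce in one process."""
    grouped = defaultdict(list)
    for line in lines:
        for word, count in map_line(line):
            grouped[word].append(count)  # shuffle: collect values per key
    # reduce: sum the counts collected for each word
    return {word: sum(counts) for word, counts in grouped.items()}


if __name__ == "__main__":
    sample = ["The cat sat.", "The dog sat --- quietly!"]
    print(run_word_count(sample))
```

A real run distributes these phases across many processes, but the per-key logic is the same.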
  

#### 2. Create a mini version (to test code) of the dataset with these terminal commands: 

```bash
mkdir mini_20_newsgroups
mkdir mini_20_newsgroups/comp.windows.x
mkdir mini_20_newsgroups/rec.motorcycles
mkdir mini_20_newsgroups/sci.med
cp 20_newsgroups/comp.windows.x/663* mini_20_newsgroups/comp.windows.x
cp 20_newsgroups/rec.motorcycles/10311* mini_20_newsgroups/rec.motorcycles
cp 20_newsgroups/sci.med/5889* mini_20_newsgroups/sci.med
```
    
#### 3. Execute the script from the terminal

From the terminal, run the script with the mini folder `mini_20_newsgroups` as its input, for example `python wordcounts.py mini_20_newsgroups > print_outs/mini20_results.txt`. The MapReduce job will go through each folder in the directory and perform the word count.

The results are stored in a text file called `mini20_results.txt` in the folder `print_outs`.
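
One way to sanity-check the mini run is to compute comparable counts in plain Python and compare a few of them with the job's output. The sketch below is not part of `mrjob`; `count_words_in_tree` is a hypothetical helper that walks every sub-directory with `os.walk`. It assumes the files can be read as text once undecodable bytes are dropped, and it skips punctuation-only tokens, so its totals should roughly agree with the job's rather than match byte for byte.

```python
import os
from collections import Counter
from string import punctuation


def count_words_in_tree(root):
    """Count words in every file under root, including sub-directories."""
    counts = Counter()
    for dirpath, _dirnames, filenames in os.walk(root):
        for name in filenames:
            path = os.path.join(dirpath, name)
            # Posts are not guaranteed to be valid UTF-8, so drop bad bytes.
            with open(path, encoding="utf-8", errors="ignore") as handle:
                for line in handle:
                    for token in line.split():
                        word = token.strip(punctuation).lower()
                        if word:  # skip punctuation-only tokens
                            counts[word] += 1
    return counts


if __name__ == "__main__":
    # Print the ten most frequent words in the mini dataset, if it exists.
    for word, n in count_words_in_tree("mini_20_newsgroups").most_common(10):
        print(word, n)
```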

## Exercise 2: JSON and tokenization

I want to take it a step further and use word tokenization to clean up my results. I have a JSON file of NYT articles, `articles.json`; the mapper expects one JSON-encoded article per line. I want to clean the text, tokenize it, and stem each token (the code uses NLTK's Snowball stemmer). I also want to count how many times each word appears in a given section of the newspaper. The section is what I will call my topic.

```Python
import json
from string import punctuation
from nltk import word_tokenize
from nltk.stem import SnowballStemmer
from mrjob.job import MRJob

sno = SnowballStemmer('english')
# Translation table that deletes every punctuation character
translator = str.maketrans('', '', punctuation)


class MRWordFreqCount(MRJob):

    def mapper(self, _, line):
        # Each input line is one JSON-encoded article
        d = json.loads(line)
        section = d['section_name']
        # Assumes d['content'] is a list of text chunks (e.g. paragraphs)
        for text in d['content']:
            text = text.strip().encode('ascii', 'ignore').decode('ascii').lower()
            new_line = text.translate(translator)
            for word in word_tokenize(new_line):
                stemmed_wrd = sno.stem(word)
                # Word count by section name
                yield ("{}_{}".format(section, stemmed_wrd), 1)

    def combiner(self, key, counts):
        # Pre-sum counts on the mapper side to reduce shuffled data
        yield (key, sum(counts))

    def reducer(self, key, counts):
        yield (key, sum(counts))


if __name__ == '__main__':
    MRWordFreqCount.run()
```
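
The cleaning steps in the mapper can be tried on a single string without NLTK or `mrjob`. This is a minimal sketch under stated assumptions: plain whitespace splitting stands in for `word_tokenize`, stemming is left out, and `clean_text` and `make_key` are hypothetical helper names.

```python
from string import punctuation

# Translation table that deletes every ASCII punctuation character.
STRIP_PUNCT = str.maketrans('', '', punctuation)


def clean_text(text):
    """Drop non-ASCII characters, lowercase, and remove punctuation."""
    ascii_only = text.strip().encode('ascii', 'ignore').decode('ascii')
    return ascii_only.lower().translate(STRIP_PUNCT)


def make_key(section, word):
    """Build the composite section_word key used for counting."""
    return "{}_{}".format(section, word)


if __name__ == "__main__":
    cleaned = clean_text("  The caf\u00e9's menu: soups, salads!  ")
    print(cleaned)
    # Whitespace split is a stand-in for nltk.word_tokenize (assumption).
    print([make_key("Food", w) for w in cleaned.split()])
```

Note that removing punctuation before tokenizing also removes apostrophes, so contractions and possessives are merged into a single token.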



The results are sent to a text file called `json_results.txt` in the `print_outs` folder.

## Exercise 3: What words are most associated with what topics?

We can implement multiple mapper, combiner, and reducer functions in one MapReduce job, then use `MRStep` to tell `MRJob` the order in which to run them.

__topics_by_wordcount.py__

```python
from mrjob.job import MRJob
from mrjob.step import MRStep
from nltk import word_tokenize
from nltk.stem import SnowballStemmer
import json
from string import punctuation

sno = SnowballStemmer('english')
# Translation table that deletes every punctuation character
translator = str.maketrans('', '', punctuation)


class WordCountByTopic(MRJob):

    # Step 1: word count by section
    def mapper1(self, _, line):
        d = json.loads(line)
        topic = d['section_name']
        # Assumes d['content'] is a list of text chunks (e.g. paragraphs)
        for art_text in d['content']:
            art_text = art_text.strip().encode("ascii", "ignore").decode('ascii').lower()
            clean_text = art_text.translate(translator)
            for word in word_tokenize(clean_text):
                wrd = sno.stem(word)
                yield (topic + "_" + wrd, 1)

    def combiner1(self, key, counts):
        yield (key, sum(counts))

    def reducer1(self, key, counts):
        yield (key, sum(counts))

    # Step 2: for each word, in which topic does it appear most frequently?
    def mapper2(self, key, count):
        # Split on the last underscore so a topic containing "_" stays intact;
        # the word itself cannot contain one because punctuation was removed.
        topic, word = key.rsplit('_', 1)
        yield word, (topic, count)

    def combiner2(self, word, values):
        # Taking a local max is safe because max is associative
        yield word, max(values, key=lambda x: x[1])

    def reducer2(self, word, values):
        word_ = "word: {}".format(word)
        topic_ = "Topic: {}".format(max(values, key=lambda x: x[1])[0])
        yield word_, topic_

    def steps(self):
        # Run step 1, then feed its output into step 2
        return [
            MRStep(mapper=self.mapper1,
                   combiner=self.combiner1,
                   reducer=self.reducer1),

            MRStep(mapper=self.mapper2,
                   combiner=self.combiner2,
                   reducer=self.reducer2)]


if __name__ == '__main__':
    WordCountByTopic.run()
```
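
To make the second step concrete, this plain-Python sketch applies the same regroup-and-take-the-max logic to a few hand-made `(topic_word, count)` pairs. The sample counts are invented for illustration only, and `top_topic_per_word` is a hypothetical helper, not part of `mrjob`.

```python
from collections import defaultdict


def top_topic_per_word(step1_output):
    """For each word, return the topic in which it has the highest count."""
    by_word = defaultdict(list)
    for key, count in step1_output:
        # Split on the last underscore so a topic containing "_" stays intact.
        topic, word = key.rsplit('_', 1)
        by_word[word].append((topic, count))  # re-key by word, then group
    # Keep the topic with the largest count for each word.
    return {word: max(pairs, key=lambda pair: pair[1])[0]
            for word, pairs in by_word.items()}


if __name__ == "__main__":
    # Made-up counts standing in for the output of the first step.
    sample = [("Sports_game", 12), ("Arts_game", 3),
              ("Arts_paint", 7), ("Sports_paint", 1)]
    print(top_topic_per_word(sample))
```

The key design point is the re-keying: the first step groups by topic and word together, while the second step groups by word alone so that all topics for a word meet in the same reducer.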

The results are sent to a text file called `common_topic_results.txt` in the `print_outs` folder.