```
---
title: MRJob lab
type: lab
duration: "1:25"
creator:
    name: Francesco Mosconi
    city: SF
updated (minor updates):
    name: David Yerrington
    city: SF
---
```

<img src="https://ga-dash.s3.amazonaws.com/production/assets/logo-9f88ae6c9c3871690e33280fcf557f33.png" style="float: left; margin: 15px;">
# MRJob Lab

## Introduction
In the previous lab we used a virtual machine to run MapReduce jobs on native Hadoop. As you may have noticed, that process is cumbersome and complicated.

Luckily we don't have to do that, because our friends at Yelp developed a great open source Python library that wraps around Hadoop Streaming: [MRJob](https://github.com/Yelp/mrjob).

This is already installed in your VM, but you can also install it locally if you prefer, using:

`pip install mrjob`.

## MRJob

While Hadoop Streaming is a straightforward way to run simple map-reduce tasks, it's cumbersome to use and unfriendly when things fail and we have to debug our code. Also, if we wanted to join two different data sets, it would be complicated to handle both with a single mapper, while it'd be much easier to have two separate mappers and one reducer.

_MRJob_ is a library written and maintained by Yelp that allows us to write map-reduce jobs in Python.


Here's the word count map-reduce, rewritten using MRJob.

> Notice the use of the `%%file` magic command. This writes a file called "wordcount.py" into the same directory as this notebook. You can then run this file from the shell with `python wordcount.py counts.tsv`, or transfer it to the VM and run it there. We may go over in class how to access the VM and how to transfer the code as we work on this together.

```python
%%file wordcount.py

"""The classic MapReduce job: count the frequency of words.
"""
from mrjob.job import MRJob
import re

WORD_RE = re.compile(r"[\w']+")

class MRWordFreqCount(MRJob):

    def mapper(self, _, line):
        for word in WORD_RE.findall(line):
            yield (word.lower(), 1)

    def reducer(self, word, counts):
        yield (word, sum(counts))


if __name__ == '__main__':
    MRWordFreqCount.run()
```


Notice that it's almost trivial: the mapper and the reducer become methods of our `MRWordFreqCount` class that inherits from the MRJob class.
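
To make the flow concrete, here is a minimal sketch of what happens to the pairs between the mapper and the reducer, written in plain Python without MRJob or Hadoop. The `run_local` driver is a hypothetical helper invented for this illustration; it is not how MRJob is implemented, it only mimics the map, group-by-key, and reduce sequence.

```python
import re
from collections import defaultdict

WORD_RE = re.compile(r"[\w']+")


def mapper(_, line):
    # Same logic as MRWordFreqCount.mapper: one (word, 1) pair per word.
    for word in WORD_RE.findall(line):
        yield word.lower(), 1


def reducer(word, counts):
    # Same logic as MRWordFreqCount.reducer: sum the counts for one word.
    yield word, sum(counts)


def run_local(lines):
    """Hypothetical driver: map, group values by key, then reduce."""
    grouped = defaultdict(list)
    for line in lines:
        for key, value in mapper(None, line):
            grouped[key].append(value)

    results = {}
    for key in sorted(grouped):
        for word, total in reducer(key, grouped[key]):
            results[word] = total
    return results


counts = run_local(["the cat sat", "The cat ran"])
print(counts)  # {'cat': 2, 'ran': 1, 'sat': 1, 'the': 2}
```

The grouping step in the middle is the part that MRJob (or Hadoop) does for us, which is why the class only needs a `mapper` and a `reducer`.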

The script can be run (on the VM) in local mode with the following command:

```bash
python scripts/mrjob_wc.py data/project_gutenberg/1184-0.txt
```

Notice that it executes immediately on our VM. This is because, when run in local mode, MRJob does not use Hadoop to do the map-reduce. This is very fast on small data, but becomes increasingly slow as the data size grows.

We can switch to Hadoop mode by simply using the flag `-r hadoop` like this:

```bash
python scripts/mrjob_wc.py data/project_gutenberg/1184-0.txt -r hadoop
```

As you can see, this wraps around Hadoop Streaming and automates all the steps we did manually in the previous lecture, including:

- copying data to HDFS
- running the data through Hadoop Streaming
- copying back the output
- removing temporary folders from HDFS

Nice!!

## Exercise 1

1. Use the code above to run the Word count on the whole project_gutenberg folder using MRJob
1. Compare the execution time for the local mode and the hadoop mode.

> Answer: 
>
    python scripts/mrjob_wc.py data/project_gutenberg
    python scripts/mrjob_wc.py data/project_gutenberg -r hadoop




## Exercise 2: add a combiner


A combiner, also known as a semi-reducer, is an optional step that accepts the output of the map step and passes its own key-value pairs on to the reducer.

The main function of a Combiner is to summarize the map output records with the same key. The output (key-value collection) of the combiner will be sent over the network to the actual Reducer task as input.
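
As a rough illustration of why this helps, the sketch below counts how many pairs would travel to the reducer with and without a combiner. It is plain Python, not MRJob; `map_words`, `combine`, and `reduce_all` are hypothetical helpers, and the two "map tasks" are simply two hard-coded strings.

```python
from collections import Counter
from itertools import chain


def map_words(line):
    # Mapper: one (word, 1) pair per word.
    return [(word.lower(), 1) for word in line.split()]


def combine(pairs):
    # Combiner: pre-sum the pairs produced by ONE map task.
    totals = Counter()
    for word, count in pairs:
        totals[word] += count
    return sorted(totals.items())


def reduce_all(pairs):
    # Reducer: sum everything that arrives, whatever its origin.
    totals = Counter()
    for word, count in pairs:
        totals[word] += count
    return dict(totals)


# Two hypothetical map tasks, each handling one chunk of the input.
task_outputs = [
    map_words("to be or not to be"),
    map_words("to do or not to do"),
]

without_combiner = list(chain.from_iterable(task_outputs))
with_combiner = list(chain.from_iterable(combine(p) for p in task_outputs))

print(len(without_combiner))  # 12 pairs sent to the reducer
print(len(with_combiner))     # 8 pairs sent to the reducer
print(reduce_all(without_combiner) == reduce_all(with_combiner))  # True
```

The final counts are identical either way. Hadoop may run a combiner zero, one, or several times, so a combiner should emit the same kind of key-value pairs as the mapper and must not change the final result.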

In MRJob a combiner can be added simply by defining a method called `combiner`. Go ahead and modify the `MRWordFreqCount` class by adding a combiner. Then run it on the `project_gutenberg` folder.

```python
from mrjob.job import MRJob
import re

WORD_RE = re.compile(r"[\w']+")


class MRWordFreqCount(MRJob):

    def mapper(self, _, line):
        for word in WORD_RE.findall(line):
            yield word.lower(), 1

    def combiner(self, word, counts):
        # pre-sum the counts emitted by a single map task
        yield word, sum(counts)

    def reducer(self, word, counts):
        yield word, sum(counts)


if __name__ == '__main__':
    MRWordFreqCount.run()
```

## Exercise 3: multi step jobs

So far we've always worked with one-step map-reduce jobs. These are very simple. What if we wanted to perform a calculation that involves multiple steps? For example, what if we wanted to count the words in our documents and then find the most common word? This would involve the following steps:

- pass our text to a mapper that outputs pairs of (word, 1)
- combine the pairs using the word as key (optional)
- reduce the pairs using the word as key
- find the word with the maximum count

The last step can be achieved by chaining a new map reduce where the map function is the identity and the reduce function is something like:

```python
def reducer_find_max_word(self, _, word_count_pairs):
    yield max(word_count_pairs)
```

Notice that we are aggregating over a blank key in order to get all possible word count pairs and then get the one with the maximum count.
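
The reason `max()` works here is that Python compares tuples element by element. The short sketch below assumes each pair is ordered as (count, word), as in the solution that follows; the sample values are made up for illustration.

```python
# Hypothetical output of the first step: every pair arrives under the key None.
word_count_pairs = [(3, "rabbit"), (385, "alice"), (128, "little")]

# Tuples compare element by element, so the count decides first
# and the word only breaks ties.
top = max(word_count_pairs)
print(top)  # (385, 'alice')

# With the order swapped, max() would compare the words alphabetically instead.
wrong = max((word, count) for count, word in word_count_pairs)
print(wrong)  # ('rabbit', 3)
```

This is why the first step should emit the count before the word.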

Go ahead and insert that into the MRJob class. In order to do that you'll need to override the `steps` method, which is documented [here](https://pythonhosted.org/mrjob/job.html#mrjob.job.MRJob.steps).

```python
from mrjob.job import MRJob
from mrjob.step import MRStep
import re

WORD_RE = re.compile(r"[\w']+")

class MRMostUsedWord(MRJob):

    def steps(self):
        return [
            MRStep(mapper=self.mapper_get_words,
                   combiner=self.combiner_count_words,
                   reducer=self.reducer_count_words),
            MRStep(reducer=self.reducer_find_max_word)
        ]

    def mapper_get_words(self, _, line):
        # yield each word in the line
        for word in WORD_RE.findall(line):
            yield (word.lower(), 1)

    def combiner_count_words(self, word, counts):
        # optimization: sum the words we've seen so far
        yield (word, sum(counts))

    def reducer_count_words(self, word, counts):
        # send all (num_occurrences, word) pairs to the same reducer.
        # num_occurrences is so we can easily use Python's max() function.
        yield None, (sum(counts), word)

    # discard the key; it is just None
    def reducer_find_max_word(self, _, word_count_pairs):
        # each item of word_count_pairs is (count, word),
        # so yielding one results in key=counts, value=word
        yield max(word_count_pairs)

if __name__ == '__main__':
    MRMostUsedWord.run()
```

## Exercise 4: Setup and teardown of tasks

MRJob allows you to write methods that run at the beginning and at the end of each mapper, combiner, and reducer task: the `*_init()` and `*_final()` methods:

- mapper_init()
- combiner_init()
- reducer_init()
- mapper_final()
- combiner_final()
- reducer_final()

These functions run only once, at the beginning or at the end of each mapper, combiner, or reducer task. They are useful, for example, when we want to have local variables available to the mapper.

For example, we could use `mapper_init` to load some kind of support file, like a sqlite3 database, or perhaps to create a temporary file or variable.
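
The call order can be sketched in plain Python as follows. This is only an illustration: `StopwordFilterTask` and `run_task` are hypothetical names, not MRJob classes, and the stopword set is hard-coded where a real job might load a support file.

```python
class StopwordFilterTask:
    """Hypothetical stand-in for one map task; not an MRJob class."""

    def mapper_init(self):
        # Runs once, before any line is processed: load the support data.
        self.stopwords = {"the", "a", "of"}  # illustrative, hard-coded
        self.calls = ["init"]

    def mapper(self, _, line):
        # Runs once per input line.
        self.calls.append("map")
        for word in line.lower().split():
            if word not in self.stopwords:
                yield word, 1

    def mapper_final(self):
        # Runs once, after the last line: emit anything accumulated.
        self.calls.append("final")
        yield "__lines__", self.calls.count("map")


def run_task(task, lines):
    """Hypothetical driver that mimics the init / map / final call order."""
    output = []
    task.mapper_init()
    for line in lines:
        output.extend(task.mapper(None, line))
    output.extend(task.mapper_final())
    return output


task = StopwordFilterTask()
pairs = run_task(task, ["The cat", "a hat of felt"])
print(task.calls)  # ['init', 'map', 'map', 'final']
print(pairs)       # [('cat', 1), ('hat', 1), ('felt', 1), ('__lines__', 2)]
```

Anything stored on `self` in `mapper_init` stays available to `mapper` and `mapper_final` for the lifetime of that task.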

Go ahead and rewrite the Word count using a `mapper_init` and `mapper_final` that do the following:

- mapper_init should initialize a dictionary for the words
- mapper should add the words to the dictionary as keys and increase the count each time the same word is encountered
- mapper_final should yield the (word, count) pairs contained in the dictionary
- the reducer is going to be the same as usual

```python
from mrjob.job import MRJob
from mrjob.step import MRStep
import re

WORD_RE = re.compile(r"[\w']+")


class MRWordFreqCount(MRJob):

    def init_get_words(self):
        # runs once per map task: start with an empty dictionary
        self.words = {}

    def get_words(self, _, line):
        # count words locally instead of yielding a pair per word
        for word in WORD_RE.findall(line):
            word = word.lower()
            self.words.setdefault(word, 0)
            self.words[word] = self.words[word] + 1

    def final_get_words(self):
        # runs once per map task: emit the accumulated counts
        for word, val in self.words.items():
            yield word, val

    def sum_words(self, word, counts):
        yield word, sum(counts)

    def steps(self):
        return [MRStep(mapper_init=self.init_get_words,
                       mapper=self.get_words,
                       mapper_final=self.final_get_words,
                       combiner=self.sum_words,
                       reducer=self.sum_words)]


if __name__ == '__main__':
    MRWordFreqCount.run()
```

## Exercise 5: Counters

When we run a longer MR task, we may want to check the status of the calculation. This is achieved in MRJob using counters.

Hadoop lets you track counters that are aggregated over a step. A counter has a group, a name, and an integer value. Hadoop itself tracks a few counters automatically. mrjob prints your job’s counters to the command line when your job finishes, and they are available to the runner object if you invoke it programmatically.

To increment a counter from anywhere in your job, use the `increment_counter()` method.
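
Conceptually, a counter is just an integer stored under a (group, name) pair and summed over the whole step. The sketch below mimics that with a small hypothetical `CounterBoard` class in plain Python; only the `increment_counter(group, name, amount)` call shape is borrowed from MRJob, everything else is invented for illustration.

```python
from collections import Counter


class CounterBoard:
    """Hypothetical stand-in for Hadoop's counter aggregation."""

    def __init__(self):
        self.values = Counter()

    def increment_counter(self, group, name, amount=1):
        # A counter is identified by (group, name) and holds an integer.
        self.values[(group, name)] += amount


board = CounterBoard()
for line in ["one two", "three"]:
    for word in line.split():
        board.increment_counter("my_group", "words_seen")
    board.increment_counter("my_group", "lines_seen")

print(dict(board.values))
# {('my_group', 'words_seen'): 3, ('my_group', 'lines_seen'): 2}
```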

Go ahead and add a custom counter to the mapper function of the word count so that we know how many words it has processed. In order to see the counts, you may want to redirect the output of the job to a file or to `/dev/null` if you don't care about it.

    python mrjob_counters.py data/project_gutenberg > /dev/null

```python
from mrjob.job import MRJob
import re

WORD_RE = re.compile(r"[\w']+")

class MRWordFreqCount(MRJob):

    def mapper(self, _, line):
        for word in WORD_RE.findall(line):
            # one increment per word processed by the mapper
            self.increment_counter('my_group', 'my_special_mapper_counter', 1)
            yield (word.lower(), 1)

    def reducer(self, word, counts):
        # one increment per distinct word seen by the reducer
        self.increment_counter('my_group', 'my_special_reducer_counter', 1)
        yield (word, sum(counts))


if __name__ == '__main__':
    MRWordFreqCount.run()
```

## Bonus: Putting it all together

You've seen how powerful MRJob can be. Let's put it all together and write a class that returns the top 15 most frequent words in our text. We can implement this in various ways; you're free to choose the one you think is most appropriate.

```python
#!/usr/bin/python
# Copyright 2009-2010 Yelp
# Copyright 2013 David Marin
# Copyright 2016 Francesco Mosconi
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Determine the 15 most used words in the input."""
from mrjob.job import MRJob
from mrjob.step import MRStep
import re
from heapq import nlargest

WORD_RE = re.compile(r"[\w']+")


class MRMostUsedWords(MRJob):

    def mapper_get_words(self, _, line):
        # yield each word in the line
        for word in WORD_RE.findall(line):
            yield (word.lower(), 1)

    def combiner_count_words(self, word, counts):
        # sum the words we've seen so far
        yield (word, sum(counts))

    def reducer_count_words(self, word, counts):
        # send all (num_occurrences, word) pairs to the same reducer.
        # num_occurrences comes first so nlargest() ranks by count.
        yield None, (sum(counts), word)

    # discard the key; it is just None
    def reducer_find_top_15_word(self, _, word_count_pairs):
        # each item of word_count_pairs is (count, word),
        # so yielding the top 15 results in key=counts, value=word
        for val in nlargest(15, word_count_pairs):
            yield val

    def steps(self):
        return [
            MRStep(mapper=self.mapper_get_words,
                   combiner=self.combiner_count_words,
                   reducer=self.reducer_count_words),
            MRStep(reducer=self.reducer_find_top_15_word)
        ]


if __name__ == '__main__':
    MRMostUsedWords.run()
```
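
On a file small enough to fit in memory, one way to sanity-check the job's answer is to compute the same ranking with the standard library. This is a sketch for cross-checking only, not a replacement for the MRJob class; `top_words` is a hypothetical helper, and ties may be ordered differently than in the job's output.

```python
import re
from collections import Counter

WORD_RE = re.compile(r"[\w']+")


def top_words(lines, n=15):
    # Count every lower-cased word, then keep the n most frequent.
    counts = Counter()
    for line in lines:
        counts.update(word.lower() for word in WORD_RE.findall(line))
    return counts.most_common(n)


sample = ["the cat and the hat", "the cat sat"]
print(top_words(sample, n=2))  # [('the', 3), ('cat', 2)]
```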

## Bonus 2: Use NLTK to recognize a book

Let's see if we can recognize a book from the most common words.

- run a local MRJob word count on a single file in the `project_gutenberg` folder
- save the result to a pandas DataFrame
- use NLTK to filter out the common English words
- print out the 40 most common words
- what book is it?

```python
import pandas as pd
from nltk.corpus import stopwords

s = set(stopwords.words('english'))

df = pd.read_csv('counts.tsv', sep='\t', names=['word', 'count'])
df[df['word'].apply(lambda x: x not in s)].sort_values('count', ascending=False).head(40)
```

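|      | word      | count |
|------|-----------|-------|
| 0    | `'`       | 1115  |
| 2531 | said      | 462   |
| 347  | alice     | 385   |
| 1802 | little    | 128   |
| 88   | `'i`      | 126   |
| 2081 | one       | 100   |
| 1433 | gutenberg | 91    |
| 1701 | know      | 88    |
| 2312 | project   | 87    |
| 1783 | like      | 85    |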
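
If you'd rather not depend on pandas or NLTK, the same filtering can be sketched with the standard library. The example assumes the job was run with MRJob's default output protocol, which writes each result as a JSON-encoded key, a tab, and a JSON-encoded value. The sample lines and the tiny stopword set are made up for illustration and are not NLTK's list.

```python
import json


def parse_mrjob_output(lines):
    # Each line is '<json key>\t<json value>', e.g. '"alice"\t385'.
    rows = []
    for line in lines:
        key, value = line.rstrip("\n").split("\t", 1)
        rows.append((json.loads(key), json.loads(value)))
    return rows


# Hypothetical sample of a counts file, as a list of lines.
sample = ['"alice"\t385\n', '"said"\t462\n', '"the"\t1642\n']
stopwords = {"the", "and", "of"}  # tiny illustrative set

rows = parse_mrjob_output(sample)
result = sorted(
    ((word, count) for word, count in rows if word not in stopwords),
    key=lambda pair: pair[1],
    reverse=True,
)
print(result)  # [('said', 462), ('alice', 385)]
```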


## Additional Resources

- [MRJob Documentation](https://pythonhosted.org/mrjob/)
- [MRJob Examples](https://github.com/Yelp/mrjob/tree/master/mrjob/examples)
