<a href="https://colab.research.google.com/github/H-Petes/MapReduce/blob/master/MapReduce_book_1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**MapReduce**

## Objective

In this practice session, we will:

+ Understand and become familiar with MapReduce.
+ Apply MapReduce to solve different problems.
+ Learn how to frame a problem in MapReduce.
+ Use Elastic MapReduce to work with large datasets.

Take time to think about and understand the MapReduce framework, and experiment with the code to see what happens.

## mrjob

[mrjob](https://github.com/Yelp/mrjob) is a Python library that allows us to write MapReduce jobs in Python and run them on different platforms.

For example, we can run the job:
+ On a local machine (to test)
+ On a Hadoop cluster
+ In the cloud using Amazon Elastic MapReduce (EMR)

In [None]:
# Install mrjob package
!pip install mrjob



## First example 
Counting the number of words in a document:

In [None]:
%%writefile eg1.py
from mrjob.job import MRJob

class WordCount1(MRJob):
    def mapper(self, _, line):
        yield "words", len(line.split())

    def reducer(self, key, values):
        yield key, sum(values)


if __name__ == '__main__':
    WordCount1.run()

Overwriting eg1.py


In [None]:
!python eg1.py /content/sample_data/README.md

No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory /tmp/eg1.root.20200804.084038.681921
Running step 1 of 1...
job output is in /tmp/eg1.root.20200804.084038.681921/output
Streaming final output from /tmp/eg1.root.20200804.084038.681921/output...
"words"	80
Removing temp directory /tmp/eg1.root.20200804.084038.681921...


**What's happening here?**

**MapReduce job**

We import the class `MRJob` from `mrjob.job`. Then we define our MapReduce class with `MRJob` as the parent class.

In our new class `WordCount1`, we define two methods, `mapper` and `reducer`, which implement the map task and the reduce task. In a single-step job like this one (one map task and one reduce task), the methods must be named exactly `mapper` and `reducer`, because that is how mrjob finds them.

The `mapper` receives a key and a value as input. In this example the key is ignored (hence the `_`), and the value is one line of the document. For each line, the `mapper` outputs one key-value pair. Here we choose the key `"words"`, which is the same for every line, so there is a single unique key. The value is the number of words in the line: `line.split()` gives a list of the 'words' in the line (split on whitespace), so `len(line.split())` is the number of words in that line.

The `reducer` receives as input a key and an iterator over all the values that share that key. Since our only key is `"words"`, the values are the word counts of the individual lines. To count the words in the whole document, we output the key `"words"` again (our choice) together with the sum of these numbers.

Because mrjob expects the `mapper` and the `reducer` to produce an iterable of key-value pairs, we use `yield` (which turns the method into a generator), not `return`.
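To see what the framework does between `mapper` and `reducer`, here is a minimal pure-Python sketch of a single-step job running on one machine. It does not use mrjob: the helper `run_local` is hypothetical and only mimics the three phases (map, group values by key, reduce) for the `WordCount1` logic above, on three made-up lines.

```python
from collections import defaultdict

def mapper(_, line):
    # same logic as WordCount1.mapper: one ("words", n) pair per line
    yield "words", len(line.split())

def reducer(key, values):
    # same logic as WordCount1.reducer: add up all values of one key
    yield key, sum(values)

def run_local(mapper, reducer, lines):
    """Hypothetical helper: map every line, group values by key, reduce each group."""
    groups = defaultdict(list)
    for line in lines:
        for key, value in mapper(None, line):
            groups[key].append(value)  # the "shuffle": collect all values per key
    results = []
    for key in sorted(groups):  # sorted only to make the output deterministic
        results.extend(reducer(key, iter(groups[key])))
    return results

lines = ["Hello world", "Good bye", "MapReduce counts words"]
print(run_local(mapper, reducer, lines))  # [('words', 7)]
```

The reducer is called once per distinct key, and here there is only one key, so a single reducer call sees the counts of all lines.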

**Run the code**

We write our code into a file called `eg1.py` (the `%%writefile eg1.py` line at the top of the cell does this). We also add
```
if __name__ == '__main__':
    WordCount1.run()
```
at the end so that we can run the job from the command line.

To run the code we execute:
```
!python eg1.py /content/sample_data/README.md
```
which specifies the job file `eg1.py` and the input of the MapReduce job, `/content/sample_data/README.md`.

**Result**

We get some log messages and the result of the job, `"words"	80`.

## Ex1

1. Based on the code above, write a file called `mr1.py` that counts the number of characters in the document above. Run the job.

1. A `mapper` may `yield` as many key-value pairs as we want; in other words, the mapper code may contain several `yield` statements. Say you used the key `"characters"` for the previous task. Modify your code to also count the number of lines by adding another `yield` to the mapper, with the key `"lines"` and an appropriate value. Run the job.

## Running the job differently


Pass input via stdin (note: mrjob will just dump it to a file first): 

In [None]:
%%writefile mr1.py
from mrjob.job import MRJob

class WordCount1(MRJob):
    def mapper(self, _, line):
        yield "characters", len(line)
        yield "lines", 1

    def reducer(self, key, values):
        yield key, sum(values)


if __name__ == '__main__':
    WordCount1.run()

Overwriting mr1.py


In [None]:
!python mr1.py < /content/sample_data/README.md

No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory /tmp/mr1.root.20200804.085231.505584
Running step 1 of 1...
reading from STDIN
job output is in /tmp/mr1.root.20200804.085231.505584/output
Streaming final output from /tmp/mr1.root.20200804.085231.505584/output...
"lines"	19
"characters"	911
Removing temp directory /tmp/mr1.root.20200804.085231.505584...


Multiple input files:

In [None]:
%%writefile mytext1.txt
Hello world

Writing mytext1.txt


In [None]:
%%writefile mytext2.txt
Good bye

Writing mytext2.txt


In [None]:
# 3 input files
!python mr1.py /content/sample_data/README.md mytext1.txt - < mytext2.txt

No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory /tmp/mr1.root.20200804.085450.851622
Running step 1 of 1...
reading from STDIN
job output is in /tmp/mr1.root.20200804.085450.851622/output
Streaming final output from /tmp/mr1.root.20200804.085450.851622/output...
"lines"	21
"characters"	930
Removing temp directory /tmp/mr1.root.20200804.085450.851622...


Write the result to a file:

In [None]:
# write the result to result1.txt
!python mr1.py /content/sample_data/README.md mytext1.txt - < mytext2.txt > result1.txt

No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory /tmp/mr1.root.20200804.085524.328427
Running step 1 of 1...
reading from STDIN
job output is in /tmp/mr1.root.20200804.085524.328427/output
Streaming final output from /tmp/mr1.root.20200804.085524.328427/output...
Removing temp directory /tmp/mr1.root.20200804.085524.328427...


## Ex2

We've counted the number of words in a document. Now we want to know how many times each word occurs.

To do so, we change the mapper so that, for each line, it yields many key-value pairs. Each word (not necessarily distinct) is a key, and the corresponding value is 1 (meaning the word is counted once).

In the reducer, for each input key (a distinct word), we take the sum of all values to count the number of occurrences.

Write code in a file called `mr2.py` that counts how many times each word occurs, and run it with the text file `/content/sample_data/README.md`.

In [None]:
%%writefile mr2.py
from mrjob.job import MRJob

class WordCount1(MRJob):
    def mapper(self, _, line):
        for word in line.split():
            yield word, 1

    def reducer(self, key, values):
        yield key, sum(values)


if __name__ == '__main__':
    WordCount1.run()

Overwriting mr2.py


In [None]:
!python mr2.py < /content/sample_data/README.md

No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory /tmp/mr2.root.20200804.090502.066463
Running step 1 of 1...
reading from STDIN
job output is in /tmp/mr2.root.20200804.090502.066463/output
Streaming final output from /tmp/mr2.root.20200804.090502.066463/output...
"more"	1
"of"	2
"originally"	1
"our"	1
"prepared"	1
"quartet](https://en.wikipedia.org/wiki/Anscombe%27s_quartet);"	1
"sample"	2
"small"	1
"started."	1
"the"	3
"to"	1
"was"	2
"which"	1
"you"	1
"http://yann.lecun.com/exdb/mnist/"	1
"https://developers.google.com/machine-learning/crash-course/california-housing-data-description"	1
"in"	2
"includes"	1
"information"	1
"is"	4
"it"	1
"library](https://github.com/altair-viz/vega_datasets/blob/4f67bdaad10f45e3549984e17e1b3088c731503d/vega_datasets/_data/anscombe.json)."	1
"`california_housing_data*.csv`"	1
"`mnist_*.csv`"	1
"a"	3
"and"	1
"at:"	2
"available"	1
"by"	1
"contains"	1
"copy"	2
"data"	1
"database](https://en.

## Ex3

If you did not use a regular expression in the previous task, you can see in the result that the code treats punctuation as part of the words.

+ Use this to define a regular expression pattern:

```
import re

WORD_RE = re.compile(r"[\w']+")
```

+ In your mapper code, use this to get a list of words (all non-overlapping matches of the pattern):

```
WORD_RE.findall(line)
```

+ Also use the string method `.lower()` in the mapper to convert each word to lower case.


Write your code in a file called `mr3.py` and run it with the text file `/content/sample_data/README.md`.
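A quick comparison of `str.split()` and `WORD_RE.findall()` on a made-up line shows the difference. Note that `\w` also matches the underscore, which is why tokens such as `mnist_` survive, and that the apostrophe in the character class keeps contractions like `it's` whole.

```python
import re

WORD_RE = re.compile(r"[\w']+")

line = "Hello, world! It's the `mnist_*.csv` file."

# str.split() only breaks on whitespace, so punctuation stays attached to words
print(line.split())
# ['Hello,', 'world!', "It's", 'the', '`mnist_*.csv`', 'file.']

# findall() returns every non-overlapping run of word characters or apostrophes
print([word.lower() for word in WORD_RE.findall(line)])
# ['hello', 'world', "it's", 'the', 'mnist_', 'csv', 'file']
```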

In [None]:
%%writefile mr3.py
from mrjob.job import MRJob
import re

WORD_RE = re.compile(r"[\w']+")

class WordFreq(MRJob):
  def mapper(self, _, line):
    for word in WORD_RE.findall(line):
      yield word.lower(), 1

  def reducer(self, key, values):
    yield key, sum(values)

if __name__ == '__main__':
  WordFreq.run()

Writing mr3.py


In [None]:
!python mr3.py < /content/sample_data/README.md

No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory /tmp/mr3.root.20200804.112315.866695
Running step 1 of 1...
reading from STDIN
job output is in /tmp/mr3.root.20200804.112315.866695/output
Streaming final output from /tmp/mr3.root.20200804.112315.866695/output...
"quartet"	1
"sample"	2
"small"	1
"started"	1
"statistical"	1
"statistician"	1
"the"	3
"this"	1
"to"	1
"us"	1
"vega_datasets"	3
"viz"	1
"was"	2
"which"	1
"wiki"	2
"wikipedia"	2
"yann"	1
"you"	1
"http"	1
"https"	4
"in"	2
"includes"	1
"information"	1
"is"	4
"it"	1
"j"	1
"json"	2
"jstor"	1
"learning"	1
"lecun"	1
"library"	1
"machine"	1
"mnist"	2
"mnist_"	1
"mnist_database"	1
"more"	1
"of"	2
"org"	2
"originally"	1
"our"	1
"prepared"	1
"california_housing_data"	1
"census"	1
"com"	3
"contains"	1
"copy"	2
"course"	1
"crash"	1
"csv"	2
"data"	2
"database"	1
"datasets"	1
"described"	2
"description"	1
"developers"	1
"directory"	1
"en"	2
"exdb"	1
"f"	1
"few"	1
"from"	1
"get

The result is still not perfect, but it is better. Now that we have many word-frequency pairs, we want to sort them by frequency.

## Multiple mapper and reducer steps

In MapReduce jobs, you often need several map and reduce steps. In mrjob, we define each step with the class `MRStep` (from `mrjob.step`) and return a list of these steps from the method `steps` of the MapReduce class.

Let's see how we use multiple mapper and reducer steps to obtain sorted word frequencies:

In [None]:
%%writefile eg4.py
from mrjob.job import MRJob
from mrjob.step import MRStep
import re

WORD_RE = re.compile(r"[\w']+")

class SortedWordFreq(MRJob):
  def steps(self):
    return [
        MRStep(mapper=self.mapper1, reducer=self.reducer1),
        MRStep(reducer=self.reducer2),
    ]
  def mapper1(self, _, line):
    for word in WORD_RE.findall(line):
      yield word.lower(), 1
    
  def reducer1(self, key, values):
    yield 'OneForAll', (sum(values), key)
    
  def reducer2(self, _, value_key_pairs):
    for value, key in sorted(value_key_pairs, reverse=True):
      yield key, value
   

if __name__ == '__main__':
  SortedWordFreq.run()

Writing eg4.py


In [None]:
!python eg4.py /content/sample_data/README.md 

No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory /tmp/eg4.root.20200804.112347.672958
Running step 1 of 2...
Running step 2 of 2...
job output is in /tmp/eg4.root.20200804.112347.672958/output
Streaming final output from /tmp/eg4.root.20200804.112347.672958/output...
"is"	4
"https"	4
"anscombe"	4
"vega_datasets"	3
"the"	3
"com"	3
"a"	3
"wikipedia"	2
"wiki"	2
"was"	2
"sample"	2
"org"	2
"of"	2
"mnist"	2
"json"	2
"in"	2
"housing"	2
"en"	2
"described"	2
"data"	2
"csv"	2
"copy"	2
"california"	2
"at"	2
"you"	1
"yann"	1
"which"	1
"viz"	1
"us"	1
"to"	1
"this"	1
"statistician"	1
"statistical"	1
"started"	1
"small"	1
"quartet"	1
"prepared"	1
"our"	1
"originally"	1
"more"	1
"mnist_database"	1
"mnist_"	1
"machine"	1
"library"	1
"lecun"	1
"learning"	1
"jstor"	1
"j"	1
"it"	1
"information"	1
"includes"	1
"http"	1
"google"	1
"github"	1
"get"	1
"from"	1
"few"	1
"f"	1
"exdb"	1
"directory"	1
"developers"	1
"description"	1
"datasets"	1
"da

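In the example above, we use a trick to get a global sort. The first `reducer` outputs one unique key, `'OneForAll'`, with the (frequency, word) pair as the value. The second step has no `mapper`, so mrjob passes the pairs through unchanged. Because there is only one key, a single call of `reducer2` receives all the pairs and can sort them as we want. The price is that this last step does not run in parallel, which is fine as long as all the distinct words fit in the memory of one machine.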
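Sorting `(count, word)` pairs with `reverse=True` also reverses the alphabetical order of ties, which is why `"is"` comes before `"https"` and `"anscombe"` in the output above. The plain-Python sketch below (with made-up pairs) reproduces what `reducer2` does and shows one way to order ties alphabetically instead: sort on the key `(-count, word)`. In a real mrjob run the pairs arrive as JSON lists such as `[4, "is"]` rather than tuples; the same indexing works for both.

```python
# hypothetical (count, word) pairs, as reducer2 receives them under the key 'OneForAll'
pairs = [(4, "anscombe"), (2, "data"), (4, "is"), (4, "https"), (1, "yann")]

# what reducer2 does: descending count, ties in reverse alphabetical order
by_reverse = [word for count, word in sorted(pairs, reverse=True)]
print(by_reverse)  # ['is', 'https', 'anscombe', 'data', 'yann']

# alternative: descending count, ties in alphabetical order
by_alpha = [word for count, word in sorted(pairs, key=lambda p: (-p[0], p[1]))]
print(by_alpha)  # ['anscombe', 'https', 'is', 'data', 'yann']
```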

## Ex4
Based on the example above, write a MapReduce job `mr4.py` that finds all the characters in a document and their frequencies, and outputs them in sorted order. Run the code with the text file `/content/sample_data/README.md`.

In [None]:
%%writefile mr4.py
from mrjob.job import MRJob
from mrjob.step import MRStep

class SortedCharFreq(MRJob):
  def steps(self):
    return [
        MRStep(mapper=self.mapper1, reducer=self.reducer1),
        MRStep(reducer=self.reducer2),
    ]
  def mapper1(self, _, line):
    for character in line:
      yield character.lower(), 1
    
  def reducer1(self, character, freqs):
    yield 'OneForAll', (sum(freqs), character)
    
  def reducer2(self, _, value_key_pairs):
    for freq, character in sorted(value_key_pairs, reverse=True):
      yield character, freq
   

if __name__ == '__main__':
  SortedCharFreq.run()

Writing mr4.py


In [None]:
!python mr4.py < /content/sample_data/README.md

No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory /tmp/mr4.root.20200804.112758.440403
Running step 1 of 2...
reading from STDIN
Running step 2 of 2...
job output is in /tmp/mr4.root.20200804.112758.440403/output
Streaming final output from /tmp/mr4.root.20200804.112758.440403/output...
" "	112
"a"	77
"s"	62
"i"	57
"t"	56
"e"	53
"o"	39
"n"	39
"c"	31
"r"	30
"d"	27
"/"	27
"."	22
"l"	21
"m"	20
"h"	18
"b"	18
"p"	16
"g"	15
"u"	13
"f"	11
"y"	9
"_"	9
"1"	9
"w"	8
"v"	8
":"	8
"9"	7
"7"	7
"-"	7
"`"	6
"8"	5
"3"	5
"2"	5
"*"	5
")"	5
"("	5
"k"	4
"j"	4
"4"	4
"0"	4
"]"	3
"["	3
"5"	3
"'"	3
"q"	2
";"	2
"6"	2
","	2
"z"	1
"x"	1
"%"	1
Removing temp directory /tmp/mr4.root.20200804.112758.440403...


The **California Housing Data Set** contains California housing data from the 1990 US Census.

More information about the data set is available at [this link](https://developers.google.com/machine-learning/crash-course/california-housing-data-description).

## Ex5
+ Have a look at the first few lines of the data set `/content/sample_data/california_housing_train.csv`.
+ Write and run a MapReduce job called `mr5.py` that finds the maximum median income (the `median_income` column).
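Rather than counting columns by hand, you can read the column positions from the header line. The sketch below assumes the header of `california_housing_train.csv` is the line shown in `header_line` (check it against the first lines of the file). The `csv` module removes the surrounding double quotes. The quoted header is also why a mapper must skip the first line: `float('"median_income"')` raises `ValueError`.

```python
import csv

# assumed header line of california_housing_train.csv
header_line = ('"longitude","latitude","housing_median_age","total_rooms",'
               '"total_bedrooms","population","households","median_income",'
               '"median_house_value"')

# csv.reader accepts any iterable of lines and strips the quotes around fields
header = next(csv.reader([header_line]))

print(header.index("median_income"))  # 7
print(header.index("population"))     # 5
```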


In [None]:
%%writefile mr5.py
from mrjob.job import MRJob

class MaxMedIncome(MRJob):
  def mapper(self, _, line):
    fields = line.split(',')
    try:
      # median_income is the 8th column (index 7)
      yield 'income', float(fields[7])
    except (ValueError, IndexError):
      # skip the header line and any malformed lines
      pass

  def reducer(self, _, incomes):
    yield 'Maximum medianIncome', max(incomes)

if __name__ == '__main__':
  MaxMedIncome.run()

Overwriting mr5.py


In [None]:
!python mr5.py < /content/sample_data/california_housing_train.csv

No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory /tmp/mr5.root.20200804.112925.156045
Running step 1 of 1...
reading from STDIN
job output is in /tmp/mr5.root.20200804.112925.156045/output
Streaming final output from /tmp/mr5.root.20200804.112925.156045/output...
Removing temp directory /tmp/mr5.root.20200804.112925.156045...


## Combiner
Imagine that you are counting the number of occurrences of each word. If the mapper only outputs word-count pairs where the count is 1, there is a lot of network traffic between the mapper nodes and the reducer nodes.

To reduce this traffic, we use a combiner. The idea is to move part of the reduce work onto the mapper nodes: after the map task, each mapper node processes its own output with the combiner before sending it to the reducer nodes.
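A small plain-Python simulation makes the saving concrete. It assumes two hypothetical mapper nodes, each holding a made-up chunk of the input, and counts how many key-value pairs would leave the mapper nodes with and without a combiner. The same summing function serves as combiner and reducer, just as in the word-frequency job.

```python
from collections import Counter

# input split between two hypothetical mapper nodes
chunks = [["the cat sat", "the cat ran"], ["the dog sat"]]

def map_words(lines):
    # mapper: one (word, 1) pair per word occurrence
    return [(word, 1) for line in lines for word in line.split()]

def sum_counts(pairs):
    # used both as the combiner (per node) and as the reducer (overall)
    totals = Counter()
    for word, count in pairs:
        totals[word] += count
    return sorted(totals.items())

# number of pairs sent over the network to the reducers
sent_without = sum(len(map_words(chunk)) for chunk in chunks)
sent_with = sum(len(sum_counts(map_words(chunk))) for chunk in chunks)
print(sent_without, sent_with)  # 9 7

# the final counts are the same either way
final_without = sum_counts(p for chunk in chunks for p in map_words(chunk))
final_with = sum_counts(p for chunk in chunks for p in sum_counts(map_words(chunk)))
print(final_without == final_with)  # True
```

On real data the saving is much larger, because frequent words like "the" appear thousands of times per node but are sent only once per node after combining.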

Let's see an example of finding word frequencies:

In [None]:
%%writefile eg6.py

from mrjob.job import MRJob
import re

WORD_RE = re.compile(r"[\w']+")

class WordFreq(MRJob):
  def mapper(self, _, line):
    for word in WORD_RE.findall(line):
      yield word.lower(), 1
  
  def combiner(self, key, values):
    yield key, sum(values)
    
  def reducer(self, key, values):
    yield key, sum(values)

if __name__ == '__main__':
  WordFreq.run()

Writing eg6.py


In [None]:
!python eg6.py /content/sample_data/README.md 

No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory /tmp/eg6.root.20200804.112948.623929
Running step 1 of 1...
job output is in /tmp/eg6.root.20200804.112948.623929/output
Streaming final output from /tmp/eg6.root.20200804.112948.623929/output...
"org"	2
"originally"	1
"our"	1
"prepared"	1
"quartet"	1
"sample"	2
"small"	1
"started"	1
"statistical"	1
"statistician"	1
"the"	3
"this"	1
"to"	1
"us"	1
"vega_datasets"	3
"viz"	1
"was"	2
"which"	1
"wiki"	2
"wikipedia"	2
"yann"	1
"you"	1
"from"	1
"get"	1
"github"	1
"google"	1
"housing"	2
"http"	1
"https"	4
"in"	2
"includes"	1
"information"	1
"is"	4
"it"	1
"j"	1
"json"	2
"jstor"	1
"learning"	1
"lecun"	1
"library"	1
"machine"	1
"mnist"	2
"mnist_"	1
"mnist_database"	1
"more"	1
"of"	2
"california"	2
"california_housing_data"	1
"census"	1
"com"	3
"contains"	1
"copy"	2
"course"	1
"crash"	1
"csv"	2
"data"	2
"database"	1
"datasets"	1
"described"	2
"description"	1
"developers"	1
"directory

Remark: The result is exactly the same as without a combiner. The difference lies in what happens behind the scenes, and the benefit of a combiner shows up when we process a large amount of data.
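The result is the same here because addition gives the same answer however the counts are grouped. Hadoop treats the combiner as an optional optimization that may run zero, one, or several times, so a combiner is only safe for operations like this, and its output should have the same form as the mapper's output. Averaging is a common counter-example, as this sketch with made-up numbers on two hypothetical mapper nodes shows: the mean of per-node means is not the overall mean, while combining `(count, sum)` pairs is safe.

```python
# values seen by two hypothetical mapper nodes
node_a = [1.0, 2.0, 3.0]
node_b = [10.0]

true_mean = sum(node_a + node_b) / len(node_a + node_b)  # 16 / 4

# unsafe combiner: each node sends its local mean, the reducer averages the means
mean_of_means = (sum(node_a) / len(node_a) + sum(node_b) / len(node_b)) / 2  # (2 + 10) / 2

# safe combiner: each node sends (count, sum); these add up in any grouping
partials = [(len(node_a), sum(node_a)), (len(node_b), sum(node_b))]
n = sum(count for count, _ in partials)
total = sum(s for _, s in partials)

print(true_mean, mean_of_means, total / n)  # 4.0 6.0 4.0
```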

## Ex6

Write and run a MapReduce job called `mr6.py` that finds the mean and standard deviation of the population in the data set `/content/sample_data/california_housing_train.csv`. Use one mapper, one combiner and one reducer.
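One way to make mean and standard deviation combinable is to carry `(count, sum, sum of squares)` triples, which can be added in any grouping, and to compute the (population) standard deviation at the very end with Var(X) = E[X²] − (E[X])². The sketch below is plain Python with made-up values split across two hypothetical mapper nodes; it checks the result against `statistics.pstdev`. Note that `sqrt(sum_sq / n)` on its own is the root mean square, not the standard deviation, and that dividing by `n` (not `n - 1`) gives the population rather than the sample standard deviation.

```python
import math
import statistics

def partial(values):
    # what one combiner could emit: (count, sum, sum of squares)
    return (len(values), sum(values), sum(v * v for v in values))

def mean_std(partials):
    # reducer: add up the triples, then use Var(X) = E[X^2] - (E[X])^2
    n = sum(p[0] for p in partials)
    total = sum(p[1] for p in partials)
    total_sq = sum(p[2] for p in partials)
    mean = total / n
    return mean, math.sqrt(total_sq / n - mean ** 2)

# made-up values split across two hypothetical mapper nodes
chunks = [[2.0, 4.0, 4.0, 4.0], [5.0, 5.0, 7.0, 9.0]]
result = mean_std([partial(chunk) for chunk in chunks])
print(result)  # (5.0, 2.0)

all_values = [v for chunk in chunks for v in chunk]
print(statistics.mean(all_values), statistics.pstdev(all_values))  # 5.0 2.0
```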

In [None]:
%%writefile mr6.py
from mrjob.job import MRJob

class MeanStdPop(MRJob):
  # the mapper emits (count, sum, sum of squares) so that the combiner's
  # input and output have the same form and it may run any number of times
  def mapper(self, _, line):
    fields = line.split(',')
    try:
      # population is the 6th column (index 5)
      population = float(fields[5])
    except (ValueError, IndexError):
      # skip the header line and any malformed lines
      return
    yield 'population', (1, population, population ** 2)

  def combiner(self, key, partials):
    # partial sums computed on the mapper node
    n, Sum, Sum_squared = 0, 0.0, 0.0
    for c, s, sq in partials:
      n += c
      Sum += s
      Sum_squared += sq
    yield key, (n, Sum, Sum_squared)

  def reducer(self, _, partials):
    n, Sum, Sum_squared = 0, 0.0, 0.0
    for c, s, sq in partials:
      n += c
      Sum += s
      Sum_squared += sq
    mean = Sum / n
    # population standard deviation: sqrt(E[x^2] - mean^2)
    std = (Sum_squared / n - mean ** 2) ** 0.5
    yield 'Mean and Std of population:', (mean, std)

if __name__ == '__main__':
  MeanStdPop.run()

Writing mr6.py


In [None]:
!python mr6.py /content/sample_data/california_housing_train.csv

No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory /tmp/mr6.root.20200804.114353.064060
Running step 1 of 1...
job output is in /tmp/mr6.root.20200804.114353.064060/output
Streaming final output from /tmp/mr6.root.20200804.114353.064060/output...
"Mean and Std of population:"	[1429.5739411764705, 1833.3495480227698]
Removing temp directory /tmp/mr6.root.20200804.114353.064060...


## EMR

Amazon Elastic MapReduce (EMR) is a web service for processing vast amounts of data. By using this cloud service, we don't need to invest in machines, maintain the system, or pay the electricity bill to run our jobs.

1. To use EMR we need an [AWS](https://aws.amazon.com/) account and a credit card.

2. Next we will create a key to use with mrjob. Sign in and go to [security credentials](https://console.aws.amazon.com/iam/home?#/security_credentials).

3. Here you will see a warning: "The account credentials provide unlimited access to your AWS resources." That's why you should keep your credentials safe, or delete them after use, so that no one else can use them and spend your money on the cloud. Here we choose "Continue to Security Credentials".

4. Click the tab "Access keys (access key ID and secret access key)". Here you can create, inactivate/reactivate, or delete your keys.

5. Choose "Create New Access Key". AWS then states that the key has been created successfully, and you can see it by clicking "Show Access Key". Download it (a CSV file containing the key), because AWS won't show you this key again. After you click "Close" or the exit button, your key is ready to be used.

The code below shows how to run mrjob with Elastic MapReduce:




In [None]:
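# Set environment variables
import os
from getpass import getpass
# never hard-code or share real keys; getpass prompts for them without echoing
os.environ['AWS_ACCESS_KEY_ID'] = getpass('AWS access key ID: ')
os.environ['AWS_SECRET_ACCESS_KEY'] = getpass('AWS secret access key: ')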

In [None]:
# run with emr by adding `-r emr`
!python mr6.py -r emr /content/sample_data/california_housing_train.csv

No configs found; falling back on auto-configuration
No configs specified for emr runner
Traceback (most recent call last):
  File "mr6.py", line 32, in <module>
    MeanStdPop.run()
  File "/usr/local/lib/python3.6/dist-packages/mrjob/job.py", line 616, in run
    cls().execute()
  File "/usr/local/lib/python3.6/dist-packages/mrjob/job.py", line 687, in execute
    self.run_job()
  File "/usr/local/lib/python3.6/dist-packages/mrjob/job.py", line 634, in run_job
    with self.make_runner() as runner:
  File "/usr/local/lib/python3.6/dist-packages/mrjob/job.py", line 713, in make_runner
    return self._runner_class()(**self._runner_kwargs())
  File "/usr/local/lib/python3.6/dist-packages/mrjob/emr.py", line 314, in __init__
    self._fix_s3_tmp_and_log_uri_opts()
  File "/usr/local/lib/python3.6/dist-packages/mrjob/emr.py", line 527, in _fix_s3_tmp_and_log_uri_opts
    self._set_cloud_tmp_dir()
  File "/usr/local/lib/python3.6/dist-packages/mrjob/emr.py", line 545, in _set_cloud_tmp_di

When the job runs successfully, its log shows that the program:
+ Sets up an S3 directory (Amazon Simple Storage Service).
+ Uploads our data and our code to this directory.
+ Creates a cluster and runs the job.
+ Streams the final output (sends it to us).
+ Removes the S3 directory and log files, and terminates the cluster.

By default, mrjob runs EMR clusters in the region 'us-west-2' (Oregon). You can change the region with, for example, `--region 'us-east-1'`. You can find the list of regions [here](https://docs.aws.amazon.com/general/latest/gr/rande.html).

1. Go to your AWS console, choose the region of your cluster (by default Oregon), then go to EMR service. If it's the correct region, you will see your clusters there. 

2. If you click the black arrow to the left of your cluster's row (to expand it), you can see more information, such as the hardware or the steps of the MapReduce job.

3. Now, for more details you can click "View cluster details". Here we have several tabs from "summary" to "Bootstrap actions". 

4. In the tab "Hardware" you can see the hardware of your cluster. By default, mrjob uses only one machine.



## Running several core instances

To increase the number of machines, add `--num-core-instances 3`. Here 3 is the number of core (worker) instances. Together with the 1 master node, that makes 4 machines in total.

Note that using 3 worker instances does not make the job 3 times faster. The processing time will probably decrease, but by less than a factor of 3 (the ideal case), because coordinating many machines takes time. Since you pay for every machine while it runs, the total cost will probably go up.
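A back-of-the-envelope sketch of why the bill goes up. It uses a simplified cost model (worker machines only, all billed at the same hourly rate, master node ignored) and made-up timings, not measurements; `machine_hours` is a hypothetical helper.

```python
def machine_hours(n_workers, minutes):
    # simplified cost model: every worker is billed for the whole run at the same rate
    return n_workers * minutes / 60

# hypothetical timings, not measurements
one = machine_hours(1, 60)    # 1 worker: 60 minutes
three = machine_hours(3, 25)  # 3 workers: 25 minutes, a 2.4x speedup rather than 3x
print(one, three)  # 1.0 1.25
```

The job finishes sooner, but it consumes 25% more machine time, so it costs more under this model.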

By default, the machine type for EMR is `m5.xlarge` (more information: [machine types](https://aws.amazon.com/ec2/instance-types/), [pricing](https://aws.amazon.com/emr/pricing/)).

You can choose the type of machines by using `--instance-type 'm5.xlarge'`.

In [None]:
# 3 worker instances
!python mr6.py -r emr --num-core-instances 3 /content/sample_data/california_housing_train.csv

## Ex7 (Debug EMR)

Before running a MapReduce job on the cloud, we should debug it locally first. Sometimes a job that runs well locally still fails on the cloud, for example because of differences in operating systems or installed packages.

Run the code below, read the EMR error messages, and try to debug the code.

In [None]:
%%writefile eg7.py
# example
from mrjob.job import MRJob

class MaxMedIncome(MRJob):
  def mapper(self, _, line):
    income = line.split(',')[7]
    yield 'income', float(income)
      
  def reducer(self, _, incomes):
    yield 'Maximum medianIncome', (max(incomes))

if __name__ == '__main__':
  MaxMedIncome.run()

In [None]:
# run with emr
!python eg7.py -r emr /content/sample_data/california_housing_train.csv

## Ex8

Download (and extract) the [Book-Crossing Dataset](http://www2.informatik.uni-freiburg.de/~cziegler/BX/) collected by Cai-Nicolas Ziegler in 2004.

Write a MapReduce job `mr8.py` that finds the top 10 most popular books (the 10 ISBNs with the largest number of ratings). Run the code and save the result to `top10.txt`.
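The single-key trick from the sorted word-frequency example also works for a top 10, but the final reducer does not need to sort every ISBN: `heapq.nlargest` keeps only the k best items while scanning. A minimal sketch of that reducer logic in plain Python, with made-up `(count, ISBN)` pairs; the function name `top_k` is hypothetical.

```python
import heapq

def top_k(count_isbn_pairs, k=10):
    # keeps at most k items in a heap instead of sorting the whole list
    return heapq.nlargest(k, count_isbn_pairs)

# hypothetical (number of ratings, ISBN) pairs
pairs = [(5, "A"), (12, "B"), (7, "C"), (12, "D"), (1, "E")]
print(top_k(pairs, k=3))  # [(12, 'D'), (12, 'B'), (7, 'C')]
```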

## Ex9 (Ancillary data)

The idea is that the ancillary data is small enough to fit in memory on every node. We use it to extract some information, which we store in an attribute of the MapReduce class.
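This pattern is commonly called a map-side join: the large file is streamed through the mappers, while the small file is loaded once per task into a dictionary for fast lookups. The plain-Python sketch below imitates the two phases with hypothetical data: `top10_lines` mimics lines of `top10.txt` in the format the code below reads (a count, then the JSON-quoted ISBN), and `books` stands in for already-parsed `(ISBN, title)` records; real `BX-Books.csv` rows must be parsed first.

```python
# hypothetical top10.txt lines: a count, then the JSON-quoted ISBN
top10_lines = ['250\t"1111111111"\n', '120\t"2222222222"\n']

# mapper_init-style step: load the small file into a dict once per task
top10 = {}
for entry in top10_lines:
    count, isbn = entry.split()
    top10[isbn.strip('"')] = int(count)

# mapper-style step: look each record up in the dict and keep only the matches
books = [("1111111111", "Book A"), ("3333333333", "Book B"), ("2222222222", "Book C")]
matches = [(isbn, title, top10[isbn]) for isbn, title in books if isbn in top10]
print(matches)
# [('1111111111', 'Book A', 250), ('2222222222', 'Book C', 120)]
```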

Complete the code of the mapper below to find the names of the top 10 popular books.

In [None]:
%%writefile mr9.py
## Top 10 ISBN
from mrjob.job import MRJob
from mrjob.step import MRStep

class Top10book(MRJob):

  # define how to pass a file as an argument;
  # mrjob uploads it to the job's working directory
  def configure_args(self):
    super(Top10book, self).configure_args()
    self.add_file_arg('--top10', help='Path to top10.txt')

  # process that file to create a new attribute called top10ISBN
  # (the method has a different name, so the attribute does not shadow it)
  def load_top10(self):
    self.top10ISBN = {}
    with open(self.options.top10, encoding='ascii', errors='ignore') as f:
      for line in f:
        count, ISBN = line.split()
        self.top10ISBN[ISBN.strip('"')] = count

  # mapper_init runs load_top10 once per mapper task, before mapper1
  def steps(self):
    return [MRStep(mapper_init=self.load_top10, mapper=self.mapper1)]

  # we can then use the attribute top10ISBN in the mapper below
  def mapper1(self, _, line):
    ###

    # Some code here

    ###
    pass

if __name__ == '__main__':
  Top10book.run()


In [None]:
!python mr9.py --top10 /content/top10.txt /content/BX-Books.csv 

## Ex10 (extra)
Find the names of the top 10 rated books (the 10 books with the highest average rating).