# Ex 2.1 Hadoop MapReduce with Python
There are two prominent *Python* APIs for interfacing with *Hadoop MapReduce* clusters:

## *Snakebite* for *HDFS* access
The [Snakebite Lib](https://github.com/spotify/snakebite) allows easy access to *HDFS* file systems:  
```
>>> from snakebite.client import Client
>>> client = Client("localhost", 8020, use_trash=False)
>>> for x in client.ls(['/']):
...     print(x)
```

See the [documentation](https://snakebite.readthedocs.io/en/latest/) for details.


## *MRJOB* for *MapReduce* job execution
The ``mrjob`` lib (see the [docs](https://mrjob.readthedocs.io/en/latest/index.html)) is a powerful *MapReduce* client for *Python*. Some of the key features are:

* local emulation (single and multi-core) of a *Hadoop* cluster for development and debugging
* simple access, authentication and file transfer to *Hadoop* clusters
* powerful API for common cloud services, such as AWS EMR or Google Cloud Dataproc

### Preparing our environment

In [1]:
#install mrjob lib and boto3 for AWS S3 access
!conda install -c conda-forge -y mrjob boto3

#or !pip install mrjob boto3

Collecting package metadata (current_repodata.json): done
Solving environment: done

## Package Plan ##

  environment location: /home/student/anaconda3

  added / updated specs:
    - boto3
    - mrjob


The following packages will be downloaded:

    package                    |            build
    ---------------------------|-----------------
    boto3-1.9.66               |           py37_0         107 KB
    botocore-1.12.189          |             py_0         2.6 MB
    ca-certificates-2020.4.5.2 |       hecda079_0         147 KB  conda-forge
    cachetools-4.1.0           |             py_1          12 KB  conda-forge
    certifi-2020.4.5.2         |   py37hc8dfbb8_0         152 KB  conda-forge
    conda-4.8.3                |   py37hc8dfbb8_1         3.0 MB  conda-forge
    google-api-core-1.20.0     |   py37hc8dfbb8_0         2.9 MB  conda-forge
    google-api-python-client-1.9.3|     pyh9f0ad1d_0          46 KB  conda-forge
    google-auth-1.16.1         |     pyh9f0ad1d_0 

## An *MRJOB* Example: WordCount (again)
Since *Hadoop* works only on file inputs and outputs, there is no conventional function-based API. Instead, we pass our code (the implementation of *Map* and *Reduce*) as an executable *Python* script:

* use *Jupyter's* ``%%file`` magic command to write the cell to a file
* create an executable script with a ``__main__`` guard
* inherit from the ``MRJob`` class
* implement the ``mapper()`` and ``reducer()`` methods
* call ``run()`` when the script is executed

In [17]:
%%file wordcount.py 
#this will save this cell as file

from mrjob.job import MRJob

class MRWordCount(MRJob):
    def mapper(self, _, line):
        for word in line.split():
            yield(word, 1)
 
    def reducer(self, word, counts):
        yield(word, sum(counts))
        
if __name__ == '__main__':
    MRWordCount.run()

Overwriting wordcount.py
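
To make the data flow behind ``mapper()`` and ``reducer()`` easier to follow, here is a minimal plain-*Python* sketch of the three phases (map, shuffle, reduce). It does not use ``mrjob`` at all; ``run_local`` and the sample lines are made up for illustration, and a real cluster distributes these phases across machines.

```python
from collections import defaultdict


def mapper(line):
    # Emit one (word, 1) pair per whitespace-separated token.
    for word in line.split():
        yield word, 1


def reducer(word, counts):
    # Sum all counts that were grouped under the same key.
    yield word, sum(counts)


def run_local(lines):
    # Shuffle phase: group every mapped value by its key.
    groups = defaultdict(list)
    for line in lines:
        for key, value in mapper(line):
            groups[key].append(value)
    # Reduce phase: call the reducer once per key, in sorted key order.
    results = {}
    for key in sorted(groups):
        for out_key, out_value in reducer(key, groups[key]):
            results[out_key] = out_value
    return results


counts = run_local(["the quick fox", "the lazy dog", "the end"])
print(counts)
```

The reducer only ever sees the values of one key at a time, which is why the grouping step has to happen between the two functions.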


### Execute the script from the command line
* ``-r local`` runs the job in a local multi-core emulation of a *Hadoop* cluster.
* Input files are passed as command-line arguments
* define an output file (see docs) or redirect the output stream: `` > out.txt``

In [18]:
! python wordcount.py -r local text1.rst text2.rst text3.rst

No configs found; falling back on auto-configuration
No configs specified for local runner
Creating temp directory /tmp/wordcount.student.20200612.092011.697418
Running step 1 of 1...
job output is in /tmp/wordcount.student.20200612.092011.697418/output
Streaming final output from /tmp/wordcount.student.20200612.092011.697418/output...
"soul,"	7
"souls"	4
"sphinx"	3
"splendour"	3
"spot,"	4
"spring"	4
"stalks,"	3
"steal"	3
"still"	3
"stray"	3
"stream;"	3
"strength"	3
"strikes"	3
"stroke"	3
"subline"	3
"sun"	3
"supplies"	4
"surface"	3
"sustains"	3
"sweet"	4
"take"	3
"taken"	4
"talents."	3
"tall"	3
"teems"	3
"text"	3
"texts"	3
"texts."	4
"than"	3
"that"	21
"the"	182
"their"	10
"then"	12
"then,"	3
"there"	7
"these"	10
"they"	10
"think"	3
"this"	4
"thousand"	6
"thousands"	3
"throw"	3
"times"	3
"to"	24
"-"	3
"--"	6
"A"	16
"Adjusting"	3
"Alex"	3
"All"	3
"Almighty,"	3
"Alphabet"	3
"Amazingly"	3
"And"	3
"Bawds"	3
"Baz,"	3
"Big"	6
"Blind"	9
"Blowzy"	3
"Bookmarksgrove"	4
"Bookmarksgrove,"	3
"Braw

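The output above counts tokens such as `"soul,"` and `"souls"` with punctuation attached and treats upper and lower case as different words, because the mapper splits on whitespace only. One possible way to normalize tokens is sketched below; ``WORD_RE`` and ``normalized_words`` are hypothetical names, and the pattern (letters and apostrophes only) is an assumption about what should count as a word.

```python
import re

# Assumption: a "word" is a run of ASCII letters and apostrophes.
WORD_RE = re.compile(r"[a-z']+")


def normalized_words(line):
    # Lower-case the line, then keep only the runs matched by WORD_RE.
    return WORD_RE.findall(line.lower())


print(normalized_words("Soul, souls -- and the Soul."))
```

In ``wordcount.py`` the mapper could iterate over ``normalized_words(line)`` instead of ``line.split()``.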
## Execution on AWS EMR
AWS EMR is a cloud service that lets you create *Hadoop*, *Spark* and other data analytics clusters with a few clicks.

**NOTE**: we are not endorsing AWS specifically; other cloud service providers have similar offers.



### Case 1: create cluster on the fly 
We create a cluster just for a single job:
* simple solution for large jobs that run only once (or only occasionally)
* this approach causes a lot of overhead: not suitable for small and frequent jobs

First, we need a config file for the connection to EMR:
**fill in YOUR AWS credentials**

In [21]:
%%file mrjob.conf
runners:
  emr:
    aws_access_key_id: YOUR_AWS_ACCESS_KEY_ID
    aws_secret_access_key: YOUR_AWS_SECRET_ACCESS_KEY
    instance_type: m5.xlarge
    num_core_instances: 2
    region: eu-west-1

Overwriting mrjob.conf
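
Hard-coding credentials in ``mrjob.conf`` makes it easy to leak them when a notebook is shared. As an alternative sketch, the keys can be left out of the config and supplied through the environment variables ``AWS_ACCESS_KEY_ID`` and ``AWS_SECRET_ACCESS_KEY``, which the ``mrjob`` documentation recommends for these two options. The helper names ``missing_credentials`` and ``render_emr_conf`` below are made up for illustration; the snippet only builds the config text and checks the environment, it does not contact AWS.

```python
import os

REQUIRED_ENV = ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY")


def missing_credentials(env=os.environ):
    # Names of the credential variables that are unset or empty.
    return [name for name in REQUIRED_ENV if not env.get(name)]


def render_emr_conf(instance_type="m5.xlarge", num_core_instances=2, region="eu-west-1"):
    # Same runner options as the config above, minus the two credential keys.
    return (
        "runners:\n"
        "  emr:\n"
        f"    instance_type: {instance_type}\n"
        f"    num_core_instances: {num_core_instances}\n"
        f"    region: {region}\n"
    )


print("missing:", missing_credentials())
print(render_emr_conf())
```

The rendered text can be saved with ``%%file`` exactly like the config above.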


In [22]:
!python wordcount.py -r emr --bootstrap-mrjob text1.rst text2.rst -c mrjob.conf

Using s3://mrjob-42e7145df80ebe94/tmp/ as our temp dir on S3
Creating temp directory /tmp/wordcount.student.20200612.092356.116469
writing master bootstrap script to /tmp/wordcount.student.20200612.092356.116469/b.sh
uploading working dir files to s3://mrjob-42e7145df80ebe94/tmp/wordcount.student.20200612.092356.116469/files/wd...
Copying other local files to s3://mrjob-42e7145df80ebe94/tmp/wordcount.student.20200612.092356.116469/files/
Can't access IAM API, trying default instance profile: EMR_EC2_DefaultRole
Can't access IAM API, trying default service role: EMR_DefaultRole
Created new cluster j-23FTJRVZP9M1T
Added EMR tags to cluster j-23FTJRVZP9M1T: __mrjob_label=wordcount, __mrjob_owner=student, __mrjob_version=0.7.2
Waiting for Step 1 of 1 (s-2E4PEGJM3AJN8) to complete...
  PENDING (cluster is STARTING)
  PENDING (cluster is STARTING)
  PENDING (cluster is STARTING)
  PENDING (cluster is BOOTSTRAPPING: Running bootstrap actions)
  PENDING (cluster is BOOTSTRAPPING: Running boots

### Case 3: connect to existing cluster

In [23]:
%%file mrjob_cluster.conf
runners:
  emr:
    aws_access_key_id: YOUR_AWS_ACCESS_KEY_ID
    aws_secret_access_key: YOUR_AWS_SECRET_ACCESS_KEY
    region: eu-west-1

Writing mrjob_cluster.conf


We need the **ID** of the cluster we want to connect to.

In [24]:
! python wordcount.py -r emr --cluster-id=j-CLUSTERID text1.rst text2.rst -c mrjob_cluster.conf  

Using s3://mrjob-42e7145df80ebe94/tmp/ as our temp dir on S3
Creating temp directory /tmp/wordcount.student.20200612.093122.675790
uploading working dir files to s3://mrjob-42e7145df80ebe94/tmp/wordcount.student.20200612.093122.675790/files/wd...
Copying other local files to s3://mrjob-42e7145df80ebe94/tmp/wordcount.student.20200612.093122.675790/files/
Adding our job to existing cluster j-CLUSTERID
Traceback (most recent call last):
  File "wordcount.py", line 14, in <module>
    MRWordCount.run()
  File "/home/student/anaconda3/lib/python3.7/site-packages/mrjob/job.py", line 616, in run
    cls().execute()
  File "/home/student/anaconda3/lib/python3.7/site-packages/mrjob/job.py", line 687, in execute
    self.run_job()
  File "/home/student/anaconda3/lib/python3.7/site-packages/mrjob/job.py", line 636, in run_job
    runner.run()
  File "/home/student/anaconda3/lib/python3.7/site-packages/mrjob/runner.py", line 507, in run
    self._run()
  File "/home/student/anaconda3/lib/python3.7

## Exercise
Use *mrjob* to compute the employee **top annual salaries** and **gross pay** in the *CSV* table ``Baltimore_City_Employee_Salaries_FY2014.csv``.

* use the ``csv`` module (``import csv``) to read the data, see the [API docs](https://docs.python.org/3/library/csv.html)
* use ``yield`` to emit key-value pairs from the *map* and *reduce* functions
* return the top entries in both categories
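
An ``mrjob`` mapper receives the input one line at a time, so the ``csv`` module has to parse a single line rather than an open file. The sketch below shows one way to do that. It assumes the name is the first column, that ``AnnualSalary`` and ``GrossPay`` are the last two columns, and that amounts may carry a leading ``$``; the sample record and the header line are made up, so check them against the real file.

```python
import csv


def parse_pay(line):
    """Return (name, annual_salary, gross_pay) for a data line, else None."""
    # csv.reader accepts any iterable of lines, including a one-element list.
    row = next(csv.reader([line]), [])
    if len(row) < 3:
        return None
    name, annual, gross = row[0], row[-2], row[-1]
    try:
        return name, float(annual.lstrip("$")), float(gross.lstrip("$"))
    except ValueError:
        # Header line or a record with a missing amount.
        return None


# Made-up record; the quoted name contains a comma on purpose.
sample = '"Doe,Jane",CLERK,A01,Example Agency,01/02/2003,$50000.00,$48000.50'
print(parse_pay(sample))
```

Splitting on ``","`` by hand would break the quoted name into two fields, which is the main reason to go through ``csv.reader`` here.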

In [33]:
import csv
with open('Baltimore_City_Employee_Salaries_FY2014.csv', newline='') as csvfile:
    # the file is comma-separated, so the default delimiter and quote character apply
    baltimore_reader = csv.reader(csvfile)
    for row in baltimore_reader:
        length = len(row)  # number of columns in this record


In [40]:
def read_pay(path='Baltimore_City_Employee_Salaries_FY2014.csv'):
    # yield is only valid inside a function, so the loop is wrapped in a generator
    with open(path, newline='') as csvfile:
        reader = csv.DictReader(csvfile)
        for row in reader:
            yield row["AnnualSalary"], row["GrossPay"]

In [44]:
%%file topsalaries.py 
#this will save this cell as file
import csv
from mrjob.job import MRJob

class MRTopSalaries(MRJob):
    def mapper(self, _, line):
        # mrjob passes one CSV line at a time; assumed layout: name first, AnnualSalary and GrossPay last
        row = next(csv.reader([line]))
        for category, value in zip(("AnnualSalary", "GrossPay"), row[-2:]):
            if value.lstrip("$").replace(".", "", 1).isdigit():  # skip header and empty fields
                yield category, (float(value.lstrip("$")), row[0])

    def reducer(self, category, pays):
        yield category, max(pays)  # highest [amount, name] pair per category

if __name__ == '__main__':
    MRTopSalaries.run()

Overwriting topsalaries.py
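
The reducer in ``topsalaries.py`` uses ``max``, which returns a single value per key. If several top entries per category are wanted, ``heapq.nlargest`` is one option, since it consumes an iterator in a single pass, which is how reducer values arrive. The helper name ``top_entries`` and the sample pairs below are made up for illustration.

```python
import heapq


def top_entries(pairs, n=3):
    # pairs: iterable of (amount, name); returns the n largest, highest first.
    return heapq.nlargest(n, pairs)


made_up = [(50000.0, "A"), (72000.0, "B"), (61000.5, "C"), (43000.0, "D")]
print(top_entries(made_up, n=2))
```

Inside a reducer the result would be emitted with ``yield``, for example one pair per top entry.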


In [45]:
! python topsalaries.py Baltimore_City_Employee_Salaries_FY2014.csv