# Ex 2.1 Hadoop MapReduce with Python
There are two prominent *Python* APIs for interfacing with *Hadoop MapReduce* clusters:

## *Snakebite* for *HDFS* access
The [Snakebite Lib](https://github.com/spotify/snakebite) allows easy access to *HDFS* file systems:  
```
>>> from snakebite.client import Client
>>> client = Client("localhost", 8020, use_trash=False)
>>> for x in client.ls(['/']):
...     print(x)
```

See [documentation](https://snakebite.readthedocs.io/en/latest/) for details.


## *MRJOB* for *MapReduce* job execution
The ``mrjob`` library ([documentation](https://mrjob.readthedocs.io/en/latest/index.html)) is a powerful *MapReduce* client for *Python*. Some of its key features are:

* local emulation (single- and multi-core) of a *Hadoop* cluster for development and debugging
* simple access, authentication and file transfer to *Hadoop* clusters
* powerful API for common cloud services, such as AWS EMR or Google Cloud Dataproc

### Preparing our environment

In [1]:
#install mrjob lib and boto3 for AWS S3 access
!conda install -c conda-forge -y mrjob boto3

#or !pip install mrjob boto3

Collecting package metadata (current_repodata.json): done
Solving environment: done

## Package Plan ##

  environment location: /home/andrea/anaconda3/envs/tf-gpu

  added / updated specs:
    - boto3
    - mrjob


The following packages will be downloaded:

    package                    |            build
    ---------------------------|-----------------
    boto3-1.17.86              |     pyhd8ed1ab_0          70 KB  conda-forge
    botocore-1.20.87           |     pyhd8ed1ab_0         4.7 MB  conda-forge
    ca-certificates-2021.5.30  |       ha878542_0         136 KB  conda-forge
    certifi-2021.5.30          |   py39hf3d152e_0         141 KB  conda-forge
    google-api-core-1.26.3     |     pyhd8ed1ab_0          59 KB  conda-forge
    google-api-python-client-2.7.0|     pyhd8ed1ab_0         3.9 MB  conda-forge
    google-auth-httplib2-0.1.0 |     pyhd8ed1ab_0          13 KB  conda-forge
    googleapis-common-protos-1.53.0|   py39hf3d152e_0         127 KB  conda-forge
    httpl

## A *MRJOB* Example: WordCount (again)
Since *Hadoop* works only on file inputs and outputs, we do not have the usual function-based API. Instead, we need to pass our code (the implementation of *Map* and *Reduce*) as executable *Python* scripts:

* use *Jupyter's* ``%%file`` magic command to write the cell to a file
* create an executable script with a ``__main__`` guard (``if __name__ == '__main__':``)
* inherit from the ``MRJob`` class
* implement the ``mapper()`` and ``reducer()`` methods
* call ``run()`` on the job class inside the ``__main__`` guard

In [2]:
%%file wordcount.py 
#this will save this cell as file

from mrjob.job import MRJob

class MRWordCount(MRJob):
    def mapper(self, _, line):
        for word in line.split():
            yield(word, 1)
 
    def reducer(self, word, counts):
        yield(word, sum(counts))
        
if __name__ == '__main__':
    MRWordCount.run()
            

Overwriting wordcount.py
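
To see what the two methods do without any cluster, the map, shuffle and reduce phases can be imitated in plain *Python*. This is only a sketch of the idea, not how ``mrjob`` or *Hadoop* work internally; ``run_wordcount`` and the two sample lines are made up for illustration:

```python
from itertools import groupby
from operator import itemgetter

def mapper(line):
    # emit (word, 1) for every whitespace-separated token, like MRWordCount.mapper
    for word in line.split():
        yield word, 1

def reducer(word, counts):
    # sum all counts collected for one word, like MRWordCount.reducer
    yield word, sum(counts)

def run_wordcount(lines):
    # map phase: apply the mapper to every input line
    pairs = [pair for line in lines for pair in mapper(line)]
    # shuffle phase: sort by key so that equal words become adjacent
    pairs.sort(key=itemgetter(0))
    # reduce phase: call the reducer once per distinct key
    result = {}
    for word, group in groupby(pairs, key=itemgetter(0)):
        for key, total in reducer(word, (count for _, count in group)):
            result[key] = total
    return result

counts = run_wordcount(["to be or not", "to be"])
print(counts)
```

The reducer receives an iterator of counts rather than a list, which mirrors the ``counts`` argument of ``MRWordCount.reducer``.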


### Execute the script from the command line
* ``-r local`` runs the job in a local multi-core emulation of a *Hadoop* cluster.
* Input files are passed as command-line arguments.
* Define an output file (see docs) or redirect the output stream: `` > out.txt``

In [3]:
! python wordcount.py -r local *.rst > out.txt

No configs found; falling back on auto-configuration
No configs specified for local runner
Creating temp directory /tmp/wordcount.andrea.20210604.123534.071240
Running step 1 of 1...
job output is in /tmp/wordcount.andrea.20210604.123534.071240/output
Streaming final output from /tmp/wordcount.andrea.20210604.123534.071240/output...
Removing temp directory /tmp/wordcount.andrea.20210604.123534.071240...


The results are written to **out.txt**.
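
With the default output protocol, ``mrjob`` writes one line per result, with the JSON-encoded key and value separated by a tab. A minimal sketch for reading such lines back into *Python* follows; the helper names and the sample lines are hypothetical and not taken from the real ``out.txt``:

```python
import json

def parse_mrjob_line(line):
    # split into the JSON-encoded key and value at the first tab
    key, value = line.rstrip("\n").split("\t", 1)
    return json.loads(key), json.loads(value)

def top_words(lines, n=3):
    # parse all non-empty lines and keep the n pairs with the largest counts
    pairs = [parse_mrjob_line(line) for line in lines if line.strip()]
    return sorted(pairs, key=lambda kv: kv[1], reverse=True)[:n]

# made-up sample in the same format as the job output
sample = ['"the"\t5\n', '"hadoop"\t2\n', '"a"\t3\n']
print(top_words(sample, n=2))
```

For the real file, pass the open file object instead of the sample list: ``with open("out.txt") as f: print(top_words(f))``.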

## Execution on AWS EMR
AWS EMR is a cloud service that lets you create *Hadoop*, *Spark* and other data analytics clusters with a few clicks.

**NOTE**: we are not endorsing AWS specifically; other cloud service providers have similar offerings.



### Connect to existing cluster

In [6]:
%%file mrjob_cluster.conf
runners:
  emr:
    # replace the placeholders with your own credentials; never share or commit real keys
    aws_access_key_id: YOUR_KEY_ID
    aws_secret_access_key: YOUR_KEY_SECRET
    region: eu-west-1

Writing mrjob_cluster.conf
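
Keys stored in a config file next to a notebook are easy to leak. As an alternative, and assuming ``mrjob`` falls back to the standard AWS credential lookup when the keys are absent from the config (as its documentation describes), the credentials can be kept in the environment variables ``AWS_ACCESS_KEY_ID`` and ``AWS_SECRET_ACCESS_KEY``. The helper below is a hypothetical sketch that only checks whether these variables are set:

```python
import os

# standard AWS credential variables
REQUIRED_VARS = ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY")

def missing_aws_env(environ=os.environ):
    # return the names of the credential variables that are unset or empty
    return [name for name in REQUIRED_VARS if not environ.get(name)]

missing = missing_aws_env()
if missing:
    # only a hint: credentials may also live in ~/.aws/credentials
    print("not set in the environment:", ", ".join(missing))
```

In that setup the config file would only need the ``region`` entry.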


We need the **ID** of the cluster we want to connect to. Here it is preset to the cluster used in today's session.

In [None]:
! python wordcount.py -r emr --cluster-id=j-L1BO0NYZIYY0 text1.rst text2.rst -c mrjob_cluster.conf  

In [7]:
! python topannual.py -r emr --cluster-id=j-L1BO0NYZIYY0 Baltimore_City_Employee_Salaries_FY2014.csv -c mrjob_cluster.conf

Using s3://mrjob-42e7145df80ebe94/tmp/ as our temp dir on S3
Creating temp directory /tmp/topannual.andrea.20210604.142304.971084
uploading working dir files to s3://mrjob-42e7145df80ebe94/tmp/topannual.andrea.20210604.142304.971084/files/wd...
Copying other local files to s3://mrjob-42e7145df80ebe94/tmp/topannual.andrea.20210604.142304.971084/files/
Adding our job to existing cluster j-L1BO0NYZIYY0
  master node is ec2-52-212-16-94.eu-west-1.compute.amazonaws.com
Waiting for Step 1 of 1 (s-1PLPFAK10P0MF) to complete...
  PENDING (cluster is RUNNING: Running step)
  RUNNING for 0:00:43
  COMPLETED
Attempting to fetch counters from logs...
Waiting 10 minutes for logs to transfer to S3... (ctrl-c to skip)

To fetch logs immediately next time, set up SSH. See:
https://pythonhosted.org/mrjob/guides/emr-quickstart.html#configuring-ssh-credentials

Looking for step log in s3://aws-logs-846657657993-eu-west-1/elasticmapreduce/j-L1BO0NYZIYY0/steps/s-1PLPFAK10P0MF...
  Parsing step log: s3://aw

## Exercise
Use *mrjob* to compute the **top annual salaries** and the top **gross pay** of employees in the *CSV* table ``Baltimore_City_Employee_Salaries_FY2014.csv``.

* use ``import csv`` to read the data (see the [API docs](https://docs.python.org/3/library/csv.html))
* use ``yield`` to emit key-value pairs from the *map* and *reduce* functions
* return the top entries in both categories
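
Because the mapper receives one line at a time, ``csv.reader`` can be applied to a single-element list. The sketch below shows why this is safer than ``line.split(',')`` when a quoted field contains a comma. The sample line and its column layout are invented for illustration and are not copied from the Baltimore file:

```python
import csv

def parse_csv_line(line):
    # csv.reader expects an iterable of lines, so wrap the single line in a list;
    # the default [] covers input for which the reader yields nothing
    return next(csv.reader([line]), [])

# hypothetical row: quoted name with a comma, salary and gross pay in the last two columns
line = '"Doe,Jane",AIDE BLUE CHIP,W02200,Youth Summer,06/10/2013,11310.00,873.63'
row = parse_csv_line(line)

print(row[0], row[-2], row[-1])
# compare the number of fields found by a naive split and by csv.reader
print(len(line.split(",")), len(row))
```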

In [4]:
%%file topannual.py 
#this will save this cell as file

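from mrjob.job import MRJob
import csv

class MRTopAnnual(MRJob):

    def mapper(self, _, line):
        # parse one CSV line; csv.reader handles quoted fields that contain commas
        row = next(csv.reader([line]), [])
        try:
            # the last two columns hold annual salary and gross pay
            salary = float(row[-2])
            grosspay = float(row[-1])
        except (ValueError, IndexError):
            # header line or missing/non-numeric values: count as 0
            salary, grosspay = 0.0, 0.0
        # a single key sends all values to the same reducer call
        yield("topentries", (salary, grosspay))

    def reducer(self, key, values):
        salary, grosspay = zip(*values)
        # ten largest values of each column, in descending order
        top_salary = sorted(salary, reverse=True)[:10]
        top_grosspay = sorted(grosspay, reverse=True)[:10]
        yield(top_salary, top_grosspay)


if __name__ == '__main__':
    MRTopAnnual.run()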

Writing topannual.py
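
The reducer above sorts the complete lists although only ten values are needed. One possible alternative, shown here as a plain *Python* sketch with made-up numbers, is ``heapq.nlargest``, which returns the largest values in descending order without sorting everything; ``top_n`` is a hypothetical helper name:

```python
import heapq

def top_n(values, n=10):
    # n largest values in descending order
    return heapq.nlargest(n, values)

# made-up (salary, grosspay) pairs as the mapper would emit them
pairs = [(52000.0, 48000.5), (0.0, 0.0), (91000.0, 95000.25), (73000.0, 70100.0)]
salaries, grosspays = zip(*pairs)

print(top_n(salaries, 2), top_n(grosspays, 2))
```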


In [5]:
! python topannual.py -r local *.csv > topannual.txt

No configs found; falling back on auto-configuration
No configs specified for local runner
Creating temp directory /tmp/topannual.andrea.20210604.141148.081371
Running step 1 of 1...
job output is in /tmp/topannual.andrea.20210604.141148.081371/output
Streaming final output from /tmp/topannual.andrea.20210604.141148.081371/output...
Removing temp directory /tmp/topannual.andrea.20210604.141148.081371...
