# Ex 2.1 Hadoop MapReduce with Python
There are two prominent *Python* APIs for interfacing with *Hadoop MapReduce* clusters:

## *Snakebite* for *HDFS* access
The [Snakebite library](https://github.com/spotify/snakebite) provides easy access to *HDFS* file systems:
```
>>> from snakebite.client import Client
>>> client = Client("localhost", 8020, use_trash=False)
>>> for x in client.ls(['/']):
...     print(x)
```

See [documentation](https://snakebite.readthedocs.io/en/latest/) for details.


## *MRJOB* for *MapReduce* job execution
The ``mrjob`` library ([documentation](https://mrjob.readthedocs.io/en/latest/index.html)) is a powerful *MapReduce* client for *Python*. Some of its key features are:

* local emulation (single and multi-core) of a *Hadoop* cluster for development and debugging
* simple access, authentication and file transfer to *Hadoop* clusters
* built-in support for common cloud services, such as AWS EMR or Google Cloud Dataproc

### Preparing our environment

In [None]:
#install mrjob lib and boto3 for AWS S3 access
#!conda install -c conda-forge -y mrjob boto3
!pip install mrjob boto3

Collecting mrjob
  Downloading https://files.pythonhosted.org/packages/8e/58/fc28ab743aba16e90736ad4e29694bd2adaf7b879376ff149306d50c4e90/mrjob-0.7.4-py2.py3-none-any.whl (439kB)
Collecting boto3
  Downloading https://files.pythonhosted.org/packages/0e/02/d0e07f3782cc054269ae0649ab1e3a0205fee1168545d4e502e62c27ba7e/boto3-1.17.87-py2.py3-none-any.whl (131kB)
Collecting s3transfer<0.5.0,>=0.4.0
  Downloading https://files.pythonhosted.org/packages/63/d0/693477c688348654ddc21dcdce0817653a294aa43f41771084c25e7ff9c7/s3transfer-0.4.2-py2.py3-none-any.whl (79kB)
Collecting botocore<1.21.0,>=1.20.87
  Downloading https://files.pythonhosted.org/packages/df/5c/23301b0c674ad31e48cfe778059b199e869397447ada7556fcddac290dfe/botocore-1.20.87-py2.py3-none-any.whl (7.6MB)

## A *MRJOB* Example: WordCount (again)
Since *Hadoop* only works on file inputs and outputs, there is no conventional function-based API. Instead, we pass our code (the implementation of *Map* and *Reduce*) as an executable *Python* script:

* use *Jupyter's* ``%%file`` magic command to write the cell to a file
* create an executable script with a ``__main__`` guard
* inherit from the ``MRJob`` class
* implement the ``mapper()`` and ``reducer()`` methods
* call ``run()`` from the ``__main__`` guard

In [None]:
%%file wordcount.py 
#this will save this cell as file

from mrjob.job import MRJob

class MRWordCount(MRJob):
    def mapper(self, _, line):
        for word in line.split():
            yield(word, 1)
 
    def reducer(self, word, counts):
        yield(word, sum(counts))
        
if __name__ == '__main__':
    MRWordCount.run()
            

Overwriting wordcount.py
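
To make the data flow concrete, the following plain-Python sketch imitates what a MapReduce runner does with a mapper and a reducer like the ones in ``wordcount.py``: map every line, group the emitted pairs by key, then reduce each group. It does not use *mrjob*; the helper name ``run_wordcount`` and the sample lines are made up for illustration, and a real runner distributes these phases across processes or nodes.

```python
from collections import defaultdict


def mapper(line):
    # Same logic as MRWordCount.mapper: emit (word, 1) for every word.
    for word in line.split():
        yield word, 1


def reducer(word, counts):
    # Same logic as MRWordCount.reducer: sum the counts of one word.
    yield word, sum(counts)


def run_wordcount(lines):
    # Hypothetical helper: map, shuffle (group by key), reduce, in one process.
    groups = defaultdict(list)
    for line in lines:
        for key, value in mapper(line):
            groups[key].append(value)
    result = {}
    for key in sorted(groups):
        for word, total in reducer(key, groups[key]):
            result[word] = total
    return result


print(run_wordcount(["a rose is a rose", "is it"]))
```

The grouping step in the middle is the part that *Hadoop* (or the local emulation) performs for us, which is why the script only has to define ``mapper()`` and ``reducer()``.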


### Execute the script from the command line
* ``-r local`` runs the job in a local multi-core emulation of a *Hadoop* cluster
* input files are passed as command-line arguments
* define an output file (see docs) or redirect the output stream: `` > out.txt``

In [None]:
! python wordcount.py -r local *.rst > out.txt

No configs found; falling back on auto-configuration
No configs specified for local runner
Creating temp directory /tmp/wordcount.root.20210604.124606.172395
Running step 1 of 1...
job output is in /tmp/wordcount.root.20210604.124606.172395/output
Streaming final output from /tmp/wordcount.root.20210604.124606.172395/output...
Removing temp directory /tmp/wordcount.root.20210604.124606.172395...


The results are written to **out.txt**.
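
By default *mrjob* writes one record per line, with the JSON-encoded key and value separated by a tab. A minimal sketch for loading such output back into *Python* follows; the helper ``parse_output`` and the sample text are illustrative, and the format assumption only holds if the job keeps the default output protocol.

```python
import json


def parse_output(text):
    # Assumes the default output format: JSON key, tab, JSON value per line.
    counts = {}
    for line in text.splitlines():
        if not line.strip():
            continue
        key, value = line.split("\t", 1)
        counts[json.loads(key)] = json.loads(value)
    return counts


# Made-up sample in the same shape as the job output.
sample = '"hadoop"\t3\n"python"\t5\n'
counts = parse_output(sample)
# Most frequent words first.
print(sorted(counts.items(), key=lambda kv: kv[1], reverse=True))
```

For the real file, pass the contents of ``out.txt`` instead of ``sample``.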

## Execution on AWS EMR
AWS EMR is a managed cloud service that lets you create *Hadoop*, *Spark* and other data analytics clusters with a few clicks.

**NOTE**: we are not endorsing AWS specifically; other cloud service providers have similar offers.



### Connect to existing cluster

In [None]:
%%file mrjob_cluster.conf
runners:
  emr:
    aws_access_key_id: <YOUR_AWS_ACCESS_KEY_ID>
    aws_secret_access_key: <YOUR_AWS_SECRET_ACCESS_KEY>
    region: eu-west-1

Writing mrjob_cluster.conf


We need the **ID** of the cluster we want to connect to; here it is pre-set to the cluster used in today's session.

In [None]:
! python wordcount.py -r emr --cluster-id=j-L1BO0NYZIYY0 text1.rst text2.rst -c mrjob_cluster.conf  

Using s3://mrjob-42e7145df80ebe94/tmp/ as our temp dir on S3
Creating temp directory /tmp/wordcount.root.20210604.124702.937603
uploading working dir files to s3://mrjob-42e7145df80ebe94/tmp/wordcount.root.20210604.124702.937603/files/wd...
Copying other local files to s3://mrjob-42e7145df80ebe94/tmp/wordcount.root.20210604.124702.937603/files/
Adding our job to existing cluster j-L1BO0NYZIYY0
  master node is ec2-52-212-16-94.eu-west-1.compute.amazonaws.com
Waiting for Step 1 of 1 (s-CJBA2WQYXGLS) to complete...
  PENDING (cluster is RUNNING: Running step)
  RUNNING for 0:00:32
  RUNNING for 0:01:02
  COMPLETED
Attempting to fetch counters from logs...
Waiting 10 minutes for logs to transfer to S3... (ctrl-c to skip)

To fetch logs immediately next time, set up SSH. See:
https://pythonhosted.org/mrjob/guides/emr-quickstart.html#configuring-ssh-credentials

Traceback (most recent call last):
  File "wordcount.py", line 14, in <module>
    MRWordCount.run()
  File "/usr/local/lib/python

## Exercise
Use *mrjob* to compute the employees' **top annual salaries** and **gross pay** in the *CSV* table ``Baltimore_City_Employee_Salaries_FY2014.csv``.

* use ``import csv`` to read the data (see the [API docs](https://docs.python.org/3/library/csv.html))
* use ``yield`` to emit key-value pairs from the *map* and *reduce* functions
* return the top entries in both categories
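
Because a mapper receives one line of text at a time, the ``csv`` module has to be applied per line rather than per file. A small sketch of that idea follows; the sample record and the helper name ``parse_line`` are invented for illustration, and whether the real file quotes fields or prefixes amounts with ``$`` is an assumption to check against the data.

```python
import csv


def parse_line(line):
    # csv.reader accepts any iterable of lines, so wrap the single line in a list.
    return next(csv.reader([line]))


# Hypothetical record with a quoted name that contains a comma.
line = '"Doe,Jane",CLERK,A01,Example Agency,01/02/2003,$1000.50,$900.25'
print(line.split(",")[0])    # naive split cuts the quoted name in two
row = parse_line(line)
print(row[0])                # the csv module keeps the quoted field intact
# Assumption: amounts may carry a leading "$" that float() cannot parse.
salary = float(row[-2].lstrip("$"))
print(salary)
```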

In [None]:
import csv
with open('/content/Baltimore_City_Employee_Salaries_FY2014.csv', newline='') as csvfile:
    # the file is comma-separated, so the default dialect is sufficient
    reader = csv.reader(csvfile)
    for row in reader:
        # print only the header row
        print(row)
        break



[' Name', 'JobTitle', 'AgencyID', 'Agency', 'HireDate', 'AnnualSalary', 'GrossPay']


In [None]:
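%%file topannual.py
# this will save this cell as a file

import csv

from mrjob.job import MRJob


def to_float(text):
    # Assumption: amounts may carry a leading "$"; strip it before parsing.
    return float(text.strip().lstrip('$'))


class MRTopSalaries(MRJob):
    def mapper(self, _, line):
        # Parse with csv so that quoted fields containing commas stay intact.
        row = next(csv.reader([line]), [])
        try:
            salary = to_float(row[-2])
            grosspay = to_float(row[-1])
        except (IndexError, ValueError):
            # Skip the header row and malformed or incomplete records.
            return
        # A single key sends every record to the same reducer.
        yield ("topentries", (salary, grosspay))

    def reducer(self, key, values):
        salary, grosspay = zip(*values)
        # Emit the ten largest values of each column under its own key.
        yield ("top_annual_salaries", sorted(salary, reverse=True)[:10])
        yield ("top_gross_pay", sorted(grosspay, reverse=True)[:10])


if __name__ == '__main__':
    MRTopSalaries.run()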
            

Overwriting topannual.py


In [None]:
! python topannual.py -r local *.csv > out2.txt

No configs found; falling back on auto-configuration
No configs specified for local runner
Creating temp directory /tmp/topannual.root.20210604.141446.839447
Running step 1 of 1...
job output is in /tmp/topannual.root.20210604.141446.839447/output
Streaming final output from /tmp/topannual.root.20210604.141446.839447/output...
Removing temp directory /tmp/topannual.root.20210604.141446.839447...
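
Sorting the full list is fine for a table of this size. For larger inputs, a bounded heap from the standard library's ``heapq`` module keeps only the best ``n`` items while iterating once over the values. The following is a sketch with made-up ``(salary, grosspay)`` pairs; ``top_n`` is a hypothetical helper, not part of *mrjob*.

```python
import heapq


def top_n(pairs, n=10):
    # Two min-heaps, each holding at most the n largest values seen so far.
    salaries, grosspays = [], []
    for salary, grosspay in pairs:
        for heap, value in ((salaries, salary), (grosspays, grosspay)):
            if len(heap) < n:
                heapq.heappush(heap, value)
            else:
                # Push the new value, then drop the smallest of the n + 1.
                heapq.heappushpop(heap, value)
    return sorted(salaries, reverse=True), sorted(grosspays, reverse=True)


# Made-up sample values, not taken from the data set.
values = [(50000.0, 48000.0), (72000.0, 75500.0), (31000.0, 29000.0)]
print(top_n(values, n=2))
```

Inside a reducer, the same function could be called on the ``values`` iterator directly, since it consumes the pairs in a single pass.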
