<a href="https://colab.research.google.com/github/Ananassio/Big-Data-Analytics/blob/main/Week_3/Assignment_1_MRJOBLIB.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Hadoop MapReduce with Python
There are two prominent *Python* APIs for interfacing with *Hadoop MapReduce* clusters:

## *Snakebite* for *HDFS* access
The [Snakebite library](https://github.com/spotify/snakebite) allows easy access to *HDFS* file systems:  
```
>>> from snakebite.client import Client
>>> client = Client("localhost", 8020, use_trash=False)
>>> for x in client.ls(['/']):
...     print(x)
```

See [documentation](https://snakebite.readthedocs.io/en/latest/) for details.


## *MRJOB* for *MapReduce* job execution
The ``mrjob`` library ([documentation](https://mrjob.readthedocs.io/en/latest/index.html)) is a powerful *MapReduce* client for *Python*. Some of its key features are:

* local emulation (single- and multi-core) of a *Hadoop* cluster for development and debugging
* simple access, authentication and file transfer to *Hadoop* clusters
* support for cloud services such as Amazon EMR and Google Cloud Dataproc

In [1]:
# in Colab, we need to clone the data from the repo
!git clone https://github.com/keuperj/DATA.git

Cloning into 'DATA'...
remote: Enumerating objects: 101, done.
remote: Counting objects: 100% (14/14), done.
remote: Compressing objects: 100% (12/12), done.
remote: Total 101 (delta 2), reused 14 (delta 2), pack-reused 87
Receiving objects: 100% (101/101), 146.44 MiB | 14.37 MiB/s, done.
Resolving deltas: 100% (23/23), done.
Checking out files: 100% (69/69), done.


### Preparing our environment

In [2]:
!pip install mrjob boto3

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting mrjob
  Downloading mrjob-0.7.4-py2.py3-none-any.whl (439 kB)
     |████████████████████████████████| 439 kB 5.2 MB/s 
Collecting boto3
  Downloading boto3-1.24.10-py3-none-any.whl (132 kB)
     |████████████████████████████████| 132 kB 51.3 MB/s 
Collecting s3transfer<0.7.0,>=0.6.0
  Downloading s3transfer-0.6.0-py3-none-any.whl (79 kB)
     |████████████████████████████████| 79 kB 7.5 MB/s 
Collecting jmespath<2.0.0,>=0.7.1
  Downloading jmespath-1.0.0-py3-none-any.whl (23 kB)
Collecting botocore<1.28.0,>=1.27.10
  Downloading botocore-1.27.10-py3-none-any.whl (8.9 MB)
     |████████████████████████████████| 8.9 MB 45.4 MB/s 
Collecting urllib3<1.27,>=1.25.4
  Downloading urllib3-1.26.9-py2.py3-none-any.whl (138 kB)
     |████████████████████████████████| 138 kB 47.0 MB/s 
Installing collected packages: urllib3, jmespath, botocore, s3transfer,

## A *MRJOB* Example: WordCount (again)
Since *Hadoop* works only with file inputs and outputs, we do not have the usual function-based API. Instead, we pass our code (the implementation of *Map* and *Reduce*) as executable *Python* scripts:

* use *Jupyter's* ``%%file`` magic command to write the cell to file
* create an executable script with an ``if __name__ == '__main__':`` block
* inherit from the ``MRJob`` class
* implement the ``mapper()`` and ``reducer()`` methods
* call ``run()`` inside the ``__main__`` block

In [3]:
%%file wordcount.py 
#this will save this cell as file

from mrjob.job import MRJob

class MRWordCount(MRJob):
    def mapper(self, _, line):
        for word in line.split():
            yield(word, 1)
 
    def reducer(self, word, counts):
        yield(word, sum(counts))
        
if __name__ == '__main__':
    MRWordCount.run()
            

Writing wordcount.py
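To make explicit what a runner does with these two methods, here is a minimal pure-Python sketch (no *Hadoop* or *mrjob* involved) of the map, shuffle/sort and reduce data flow. The functions `mapper`, `reducer` and `run_mapreduce` are hypothetical stand-ins that mirror `MRWordCount`; a real runner additionally splits the input, runs tasks in parallel and serializes the pairs between the stages.

```python
from itertools import groupby
from operator import itemgetter


def mapper(_, line):
    # same logic as MRWordCount.mapper: emit (word, 1) for every token
    for word in line.split():
        yield word, 1


def reducer(word, counts):
    # same logic as MRWordCount.reducer: sum all counts of one word
    yield word, sum(counts)


def run_mapreduce(lines):
    # map phase: apply the mapper to every input line (the key is unused, as in mrjob)
    pairs = [pair for line in lines for pair in mapper(None, line)]
    # shuffle/sort phase: bring all pairs with the same key next to each other
    pairs.sort(key=itemgetter(0))
    # reduce phase: call the reducer once per key with an iterator over its values
    results = []
    for word, group in groupby(pairs, key=itemgetter(0)):
        results.extend(reducer(word, (count for _, count in group)))
    return results


print(run_mapreduce(["the quick brown fox", "the lazy dog"]))
# [('brown', 1), ('dog', 1), ('fox', 1), ('lazy', 1), ('quick', 1), ('the', 2)]
```

The key point is that the reducer receives an *iterator* over all values of one key, never the whole data set.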


### Execute the script from the command line
* ``-r local`` runs a local multi-core emulation of a *Hadoop* cluster
* input files are passed as command-line arguments
* define an output directory with ``--output-dir`` (see docs) or redirect the output stream: `` > out.txt``
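By default, *mrjob* writes each output pair as the JSON-encoded key and value separated by a tab, which is why the words in the output below appear in quotes and non-ASCII characters appear as escapes such as `\u2019`. A minimal sketch for reading a redirected `out.txt` back into a Python dict, assuming that default output protocol and string or number keys (the helper name `parse_mrjob_output` is hypothetical):

```python
import json


def parse_mrjob_output(lines):
    # assumes mrjob's default JSON output: <JSON key> TAB <JSON value> per line
    result = {}
    for line in lines:
        line = line.rstrip("\n")
        if not line:
            continue  # skip blank lines
        # JSON escapes tabs inside strings, so the first raw tab is the separator
        key_json, value_json = line.split("\t", 1)
        result[json.loads(key_json)] = json.loads(value_json)
    return result


sample = ['"brown"\t7\n', '"didn\\u2019t"\t6\n']
print(parse_mrjob_output(sample))  # {'brown': 7, 'didn’t': 6}
```

With the output redirected as described above, `parse_mrjob_output(open("out.txt"))` would load the whole result.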

In [4]:
! python wordcount.py -r local DATA/text1.rst DATA/text2.rst DATA/text3.rst

No configs found; falling back on auto-configuration
No configs specified for local runner
Creating temp directory /tmp/wordcount.root.20220616.100637.511509
Running step 1 of 1...
job output is in /tmp/wordcount.root.20220616.100637.511509/output
Streaming final output from /tmp/wordcount.root.20220616.100637.511509/output...
"breath"	3
"brown"	7
"but"	9
"buzz"	3
"by"	28
"cable"	3
"cajole"	3
"came"	3
"charm"	4
"cheek,"	3
"chumps"	3
"close"	3
"coast"	4
"conceptions,"	3
"confound"	3
"continued"	3
"control"	3
"convince"	3
"copy"	6
"copy."	3
"could"	9
"countless"	3
"countries"	4
"country,"	3
"country."	3
"created"	4
"daft"	6
"darkness"	3
"day"	3
"dear"	3
"decided"	3
"describe"	3
"devils"	3
"devious"	3
"didn\u2019t"	6
"dim"	3
"discotheques"	3
"do"	3
"dog."	4
"dogs"	3
"down"	3
"dozen"	3
"dozy"	3
"dragged"	3
"drawing"	3
"driven"	3
"drunk"	3
"dwell"	3
"earth"	3
"earth,"	3
"earthquakes"	3
"enemy"	3
"enjoy"	4
"entire"	4
"eternity"	3
"everything"	3
"exchanged"	3
"existence"	4
"existence,"	3
"exp

## Execution on AWS EMR
AWS EMR (Elastic MapReduce) is a managed cloud service that lets you create *Hadoop*, *Spark* and other data analytics clusters with a few clicks.

**NOTE**: we are not endorsing AWS specifically; other cloud service providers have similar offerings.



### Case 1: create cluster on the fly 
We create a cluster just for a single job:
* simple solution for large jobs that run only once (or only at sparse points in time)
* this approach causes a lot of overhead (the cluster is started and shut down for every job), so it is not suitable for small and frequent jobs

First, we need a config file for the connection to EMR:
**fill in YOUR AWS credentials**

In [5]:
%%file mrjob.conf
runners:
  emr:
    aws_access_key_id: key_id
    aws_secret_access_key: KEY
    instance_type: m5.xlarge
    num_core_instances: 2
    region: eu-west-1

Writing mrjob.conf


In [6]:
! python wordcount.py -r emr --bootstrap-mrjob DATA/text1.rst DATA/text2.rst -c mrjob.conf 


Traceback (most recent call last):
  File "wordcount.py", line 14, in <module>
    MRWordCount.run()
  File "/usr/local/lib/python3.7/dist-packages/mrjob/job.py", line 616, in run
    cls().execute()
  File "/usr/local/lib/python3.7/dist-packages/mrjob/job.py", line 687, in execute
    self.run_job()
  File "/usr/local/lib/python3.7/dist-packages/mrjob/job.py", line 634, in run_job
    with self.make_runner() as runner:
  File "/usr/local/lib/python3.7/dist-packages/mrjob/job.py", line 713, in make_runner
    return self._runner_class()(**self._runner_kwargs())
  File "/usr/local/lib/python3.7/dist-packages/mrjob/emr.py", line 358, in __init__
    self._fix_s3_tmp_and_log_uri_opts()
  File "/usr/local/lib/python3.7/dist-packages/mrjob/emr.py", line 599, in _fix_s3_tmp_and_log_uri_opts
    self._set_cloud_tmp_dir()
  File "/usr/local/lib/python3.7/dist-packages/mrjob/emr.py", line 617, in _set_cloud_tmp_dir
    for bucket_name in self.fs.s3.get_all_bucket_names():
  File "/usr/local/lib

### Case 3: connect to existing cluster

In [7]:
%%file mrjob_cluster.conf
runners:
  emr:
    aws_access_key_id: KEY_id
    aws_secret_access_key: KEY
    region: eu-west-1

Writing mrjob_cluster.conf


We need the **ID** of the cluster we want to connect to.
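EMR cluster IDs have the form `j-` followed by an alphanumeric string. One way to look them up is the EMR `list_clusters` call of *boto3* (installed above). A minimal sketch, assuming a client created with `boto3.client("emr", region_name="eu-west-1")` and valid credentials; the helper name `active_cluster_ids` is hypothetical, and the client is passed in as a parameter so the paging logic can be tried without AWS:

```python
def active_cluster_ids(emr_client, states=("STARTING", "BOOTSTRAPPING", "RUNNING", "WAITING")):
    """Return (cluster_id, name) pairs for EMR clusters in the given states."""
    clusters = []
    request = {"ClusterStates": list(states)}
    while True:
        # emr_client is assumed to be a boto3 EMR client (or a test double)
        response = emr_client.list_clusters(**request)
        for summary in response.get("Clusters", []):
            clusters.append((summary["Id"], summary.get("Name", "")))
        marker = response.get("Marker")
        if not marker:
            return clusters
        request["Marker"] = marker  # ask for the next page of results
```

The returned ID is what goes after `--cluster-id=` in the command below.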

In [8]:
! python wordcount.py -r emr --cluster-id=j- DATA/text1.rst DATA/text2.rst -c mrjob_cluster.conf  

Traceback (most recent call last):
  File "wordcount.py", line 14, in <module>
    MRWordCount.run()
  File "/usr/local/lib/python3.7/dist-packages/mrjob/job.py", line 616, in run
    cls().execute()
  File "/usr/local/lib/python3.7/dist-packages/mrjob/job.py", line 687, in execute
    self.run_job()
  File "/usr/local/lib/python3.7/dist-packages/mrjob/job.py", line 634, in run_job
    with self.make_runner() as runner:
  File "/usr/local/lib/python3.7/dist-packages/mrjob/job.py", line 713, in make_runner
    return self._runner_class()(**self._runner_kwargs())
  File "/usr/local/lib/python3.7/dist-packages/mrjob/emr.py", line 358, in __init__
    self._fix_s3_tmp_and_log_uri_opts()
  File "/usr/local/lib/python3.7/dist-packages/mrjob/emr.py", line 599, in _fix_s3_tmp_and_log_uri_opts
    self._set_cloud_tmp_dir()
  File "/usr/local/lib/python3.7/dist-packages/mrjob/emr.py", line 617, in _set_cloud_tmp_dir
    for bucket_name in self.fs.s3.get_all_bucket_names():
  File "/usr/local/lib

## Exercise
Use *mrjob* to compute the top employee **annual salaries** and **gross pay** in the *CSV* table ``Baltimore_City_Employee_Salaries_FY2014.csv``.

* use ``import csv`` to read the data -> [API docs](https://docs.python.org/3/library/csv.html)
* use ``yield`` to emit key-value pairs from the *map* and *reduce* functions (which makes them *generators*)
* return top entries in both categories 
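The core of the exercise can be sketched without *Hadoop*: parse each line with `csv.reader` (names such as `"Aaron,Keontae E"` contain commas, so a plain `split(',')` would break), emit one `(category, (amount, name))` pair per numeric field, and keep the largest `n` values per category with `heapq.nlargest`. This assumes, as in the table displayed below, that the last seven columns are `Name, JobTitle, AgencyID, Agency, HireDate, AnnualSalary, GrossPay` with plain numbers for the amounts; all function names are hypothetical. Inside an `MRJob`, `mapper()` could `yield from map_salary_line(line)` and `reducer()` could `yield category, top_n(values)`; since *mrjob* JSON-encodes the pairs between stages, the tuples arrive as lists, which `heapq.nlargest` compares the same way.

```python
import csv
import heapq


def map_salary_line(line):
    """Emit (category, (amount, name)) pairs for one CSV line."""
    # csv.reader handles quoted fields such as "Aaron,Keontae E"
    row = next(csv.reader([line]), [])
    if len(row) < 7:
        return  # blank or malformed line: emit nothing
    # assumption: the last seven columns are Name, ..., AnnualSalary, GrossPay
    name, annual, gross = row[-7].strip(), row[-2], row[-1]
    for category, raw in (("AnnualSalary", annual), ("GrossPay", gross)):
        try:
            amount = float(raw)
        except ValueError:
            continue  # header line or empty GrossPay: nothing to emit
        yield category, (amount, name)


def top_n(values, n=3):
    """Keep the n largest (amount, name) pairs without sorting everything."""
    return heapq.nlargest(n, values)


def run_local(lines, n=3):
    """Tiny stand-in for shuffle + reduce: group pairs by category, keep the top n."""
    grouped = {}
    for line in lines:
        for category, value in map_salary_line(line):
            grouped.setdefault(category, []).append(value)
    return {category: top_n(values, n) for category, values in grouped.items()}


rows = [
    '0,"Aaron,Keontae E",AIDE BLUE CHIP,W02200,Youth Summer,06/10/2013,11310.0,873.63',
    '1,"Aaron,Patricia G",Facilities/Office Services II,A03031,OED-Employment Dev,10/24/1979,53428.0,52868.38',
]
print(run_local(rows, n=1))
# {'AnnualSalary': [(53428.0, 'Aaron,Patricia G')], 'GrossPay': [(52868.38, 'Aaron,Patricia G')]}
```

Using `heapq.nlargest` keeps only `n` items in memory per category, which matters more in the reducer than a full `sorted()` once the data grows.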

In [9]:
import pandas as pd

In [10]:
table = pd.read_csv('/content/DATA/Baltimore_City_Employee_Salaries_FY2014.csv')
table

| Unnamed: 0 | Name | JobTitle | AgencyID | Agency | HireDate | AnnualSalary | GrossPay |
|---|---|---|---|---|---|---|---|
| 0 | Aaron,Keontae E | AIDE BLUE CHIP | W02200 | Youth Summer | 06/10/2013 | 11310.0 | 873.63 |
| 1 | Aaron,Patricia G | Facilities/Office Services II | A03031 | OED-Employment Dev | 10/24/1979 | 53428.0 | 52868.38 |
| 2 | Aaron,Petra L | ASSISTANT STATE'S ATTORNEY | A29005 | States Attorneys Office | 09/25/2006 | 68300.0 | 67439.19 |
| 3 | Abaineh,Yohannes T | EPIDEMIOLOGIST | A65026 | HLTH-Health Department | 07/23/2009 | 62000.0 | 58654.74 |
| 4 | Abbene,Anthony M | POLICE OFFICER TRAINEE | A99416 | Police Department | 07/24/2013 | 43999.0 | 39686.95 |
| ... | ... | ... | ... | ... | ... | ... | ... |
| 18976 | Zotamou,Jean Marie D | AIDE BLUE CHIP | W02235 | Youth Summer | 05/21/2014 | 11310.0 | |
| 18977 | Zotamou,Pivot D | AIDE BLUE CHIP | W02629 | Youth Summer | 05/21/2014 | 11310.0 | |
| 18978 | Zovistoski,Zachary D | POLICE OFFICER TRAINEE | A99416 | Police Department | 12/17/2013 | 43999.0 | 21070.03 |
| 18979 | Zubyk,Stanislav T | POLICE OFFICER | A99262 | Police Department | 01/23/2013 | 44104.0 | 48608.12 |


In [11]:
{table[' Name'][0],table['AnnualSalary'][0]}

{11310.0, 'Aaron,Keontae E'}

In [59]:
di = {table[' Name'][0]:table['AnnualSalary'][0],
      table[' Name'][1]:table['AnnualSalary'][1]}

In [60]:
sort_orders = sorted(di.items(), key=lambda x: x[1], reverse=True)
sort_orders

[('Aaron,Patricia G', 53428.0), ('Aaron,Keontae E', 11310.0)]

In [14]:
cols = 'Name,JobTitle,AgencyID,Agency,HireDate,AnnualSalary,GrossPay'.split(',')
cols

['Name',
 'JobTitle',
 'AgencyID',
 'Agency',
 'HireDate',
 'AnnualSalary',
 'GrossPay']

In [64]:
cols[0]

'Name'

In [67]:
%%file salary.py 
#this will save this cell as file
import pandas as pd
from mrjob.job import MRJob
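table = pd.read_csv('/content/DATA/Baltimore_City_Employee_Salaries_FY2014.csv')  # re-read by every map/reduce task


class salary(MRJob):
    def mapper(self, _, line):
      # NOTE: `line` is never used, so every input line emits the same pair built from row 0
      yield(('Salary','Grosspay') ,({table[' Name'][0]:table['AnnualSalary'][0]},{table[' Name'][0]:table['GrossPay'][0]}))
 
    def reducer(self, word, val):
      sala,gross = val  # breaks: `val` iterates over ALL values for this key, not exactly two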
      Salary = sorted(sala.items(), key=lambda x: x[1], reverse=True)
      GrossPay = sorted(gross.items(), key=lambda x: x[1], reverse=True)
      yield(word, (Salary,GrossPay))
        
if __name__ == '__main__':
    salary.run()


Overwriting salary.py


In [68]:
! python salary.py -r local /content/DATA/Baltimore_City_Employee_Salaries_FY2014.csv

No configs found; falling back on auto-configuration
No configs specified for local runner
Creating temp directory /tmp/salary.root.20220616.103618.331025
Running step 1 of 1...

Probable cause of failure:

+ /usr/bin/python3 salary.py --step-num=0 --reducer
Traceback (most recent call last):
  File "salary.py", line 19, in <module>
    salary.run()
  File "/tmp/salary.root.20220616.103618.331025/step/000/reducer/00000/wd/mrjob.zip/mrjob/job.py", line 616, in run
  File "/tmp/salary.root.20220616.103618.331025/step/000/reducer/00000/wd/mrjob.zip/mrjob/job.py", line 681, in execute
  File "/tmp/salary.root.20220616.103618.331025/step/000/reducer/00000/wd/mrjob.zip/mrjob/job.py", line 795, in run_reducer
  File "/tmp/salary.root.20220616.103618.331025/step/000/reducer/00000/wd/mrjob.zip/mrjob/job.py", line 866, in reduce_pairs
  File "/tmp/salary.root.20220616.103618.331025/step/000/reducer/00000/wd/mrjob.zip/mrjob/job.py", line 889, in _combine_or_reduce_pairs
  File "salary.py", line 1
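The run above fails in the reducer step. The traceback is cut off, so this is an inference, but a likely cause is `sala,gross = val`: *mrjob* passes `reducer()` an iterator over **every** value emitted for the key, and because the mapper ignores `line`, each input line emits the same key, so unpacking the stream into two names raises `ValueError`. The plain-Python sketch below reproduces this with a hypothetical stand-in generator `values_for_key` and shows the usual pattern of looping over the values instead. For a meaningful ranking, the mapper must also parse `line` (for example with `csv`) rather than reading row 0 of the pandas table.

```python
from operator import itemgetter


def values_for_key(n_lines):
    # hypothetical stand-in for the iterator mrjob hands to reducer():
    # one (salary_dict, gross_dict) value per input line, all identical here
    for _ in range(n_lines):
        yield {"Aaron,Keontae E": 11310.0}, {"Aaron,Keontae E": 873.63}


def broken_reducer(values):
    sala, gross = values  # unpacks the whole stream: only works for exactly 2 values
    return sala, gross


def looping_reducer(values):
    # merge every value instead of unpacking the stream itself
    salaries, grosses = {}, {}
    for sala, gross in values:
        salaries.update(sala)
        grosses.update(gross)
    by_amount = itemgetter(1)
    return (sorted(salaries.items(), key=by_amount, reverse=True),
            sorted(grosses.items(), key=by_amount, reverse=True))


try:
    broken_reducer(values_for_key(5))
except ValueError as err:
    print("reducer failed:", err)

print(looping_reducer(values_for_key(5)))
# ([('Aaron,Keontae E', 11310.0)], [('Aaron,Keontae E', 873.63)])
```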