
# Hadoop MapReduce with Python
There are two prominent *Python* APIs for working with *Hadoop* clusters:

## *Snakebite* for *HDFS* access
The [Snakebite Lib](https://github.com/spotify/snakebite) allows easy access to *HDFS* file systems:  
```
>>> from snakebite.client import Client
>>> client = Client("localhost", 8020, use_trash=False)
>>> for x in client.ls(['/']):
...     print(x)
```

See [documentation](https://snakebite.readthedocs.io/en/latest/) for details.


## *MRJOB* for *MapReduce* job execution
The ``mrjob`` library -> [see documentation](https://mrjob.readthedocs.io/en/latest/index.html) is a powerful *MapReduce* client for *Python*. Some of its key features are:

* local emulation (single- and multi-core) of a *Hadoop* cluster for development and debugging
* simple access, authentication and file transfer to *Hadoop* clusters
* runners for common cloud services, such as AWS Elastic MapReduce (EMR) and Google Cloud Dataproc

In [1]:
#in colab, we need to clone the data from the repo
!git clone https://github.com/keuperj/DATA.git

Cloning into 'DATA'...
remote: Enumerating objects: 126, done.
remote: Counting objects: 100% (39/39), done.
remote: Compressing objects: 100% (28/28), done.
remote: Total 126 (delta 11), reused 39 (delta 11), pack-reused 87 (from 1)
Receiving objects: 100% (126/126), 185.56 MiB | 10.68 MiB/s, done.
Resolving deltas: 100% (32/32), done.
Updating files: 100% (86/86), done.


### Preparing our environment

In [2]:
!pip install mrjob boto3

Collecting mrjob
  Downloading mrjob-0.7.4-py2.py3-none-any.whl.metadata (7.3 kB)
Collecting boto3
  Downloading boto3-1.37.32-py3-none-any.whl.metadata (6.7 kB)
Collecting botocore<1.38.0,>=1.37.32 (from boto3)
  Downloading botocore-1.37.32-py3-none-any.whl.metadata (5.7 kB)
Collecting jmespath<2.0.0,>=0.7.1 (from boto3)
  Downloading jmespath-1.0.1-py3-none-any.whl.metadata (7.6 kB)
Collecting s3transfer<0.12.0,>=0.11.0 (from boto3)
  Downloading s3transfer-0.11.4-py3-none-any.whl.metadata (1.7 kB)
Downloading mrjob-0.7.4-py2.py3-none-any.whl (439 kB)
Downloading boto3-1.37.32-py3-none-any.whl (139 kB)
Downloading botocore-1.37.32-py3-none-any.whl (13.5 MB)

## A *MRJOB* Example: WordCount (again)
Since *Hadoop* works only with file inputs and outputs, we do not have the usual function-based API. Instead, we pass our code (the implementation of *Map* and *Reduce*) as executable *Python* scripts:

* use *Jupyter's* ``%%file`` magic command to write the cell to a file
* create an executable script with a ``__main__`` block
* inherit from the ``MRJob`` class
* implement the ``mapper()`` and ``reducer()`` methods
* call ``run()`` on the job class when the script is started

In [3]:
%%file wordcount.py
# the %%file magic saves this cell as a file

from mrjob.job import MRJob

class MRWordCount(MRJob):
    def mapper(self, _, line):
        for word in line.split():
            yield(word, 1)

    def reducer(self, word, counts):
        yield(word, sum(counts))

if __name__ == '__main__':
    MRWordCount.run()


Writing wordcount.py
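Conceptually, the job runs in three phases: every input line goes through ``mapper()``, the emitted pairs are grouped (shuffled and sorted) by key, and each key with all of its values is passed to ``reducer()``. The following pure-*Python* sketch mimics this flow in memory for the logic of ``MRWordCount``; the helper names ``shuffle`` and ``run_wordcount`` are hypothetical, and this is only an illustration, not how *mrjob* or *Hadoop* are implemented internally:

```python
from collections import defaultdict
from typing import Dict, Iterable, Iterator, List, Tuple


def mapper(line: str) -> Iterator[Tuple[str, int]]:
    # same logic as MRWordCount.mapper: emit (word, 1) for every token
    for word in line.split():
        yield word, 1


def reducer(word: str, counts: Iterable[int]) -> Iterator[Tuple[str, int]]:
    # same logic as MRWordCount.reducer: sum all counts of one word
    yield word, sum(counts)


def shuffle(pairs: Iterable[Tuple[str, int]]) -> Dict[str, List[int]]:
    # group all values by key, like the shuffle/sort phase between map and reduce
    groups: Dict[str, List[int]] = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return groups


def run_wordcount(lines: Iterable[str]) -> Dict[str, int]:
    # map phase: apply the mapper to every input line
    mapped = (pair for line in lines for pair in mapper(line))
    # shuffle phase, then reduce phase with keys in sorted order
    grouped = shuffle(mapped)
    result: Dict[str, int] = {}
    for key in sorted(grouped):
        for out_key, out_value in reducer(key, grouped[key]):
            result[out_key] = out_value
    return result


print(run_wordcount(["the quick brown fox", "the lazy dog"]))
# {'brown': 1, 'dog': 1, 'fox': 1, 'lazy': 1, 'quick': 1, 'the': 2}
```

Because ``str.split()`` only splits on whitespace, punctuation stays attached to the words, which is why tokens such as ``"dog."`` and ``"country,"`` are counted separately from ``"dog"`` and ``"country"``.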


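### Execute the script from the command line
* ``-r local`` emulates a *Hadoop* cluster locally, running the tasks in multiple subprocesses (multi-core).
* Input files are passed as command-line arguments.
* Define an output directory with ``--output-dir`` (see docs) or redirect the output stream: ``> out.txt``.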
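By default, *mrjob* writes each result line as a JSON-encoded key and a JSON-encoded value separated by a tab, which is why words appear in double quotes and non-ASCII characters are escaped (e.g. ``"didn\u2019t"`` in the output below). A minimal sketch for reading a redirected result file (e.g. ``out.txt``) back into *Python*, assuming this default tab-separated JSON format; the function names ``parse_output_lines`` and ``load_mrjob_output`` are hypothetical:

```python
import json
from typing import Any, Iterable, List, Tuple


def parse_output_lines(lines: Iterable[str]) -> List[Tuple[Any, Any]]:
    # each line looks like: <JSON key>\t<JSON value>
    pairs = []
    for line in lines:
        line = line.rstrip("\n")
        if not line:
            continue  # ignore blank lines
        raw_key, raw_value = line.split("\t", 1)
        pairs.append((json.loads(raw_key), json.loads(raw_value)))
    return pairs


def load_mrjob_output(path: str) -> List[Tuple[Any, Any]]:
    # hypothetical helper for a file written via `python wordcount.py ... > out.txt`
    with open(path, encoding="utf-8") as f:
        return parse_output_lines(f)


# two lines copied from the word-count output
print(parse_output_lines(['"breath"\t3\n', '"didn\\u2019t"\t6\n']))
# [('breath', 3), ('didn’t', 6)]
```

JSON values are decoded as well, so a tuple emitted by a job comes back as a *Python* list.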

In [None]:
! python wordcount.py -r local DATA/text1.rst DATA/text2.rst DATA/text3.rst

No configs found; falling back on auto-configuration
No configs specified for local runner
Creating temp directory /tmp/wordcount.root.20250411.063000.731616
Running step 1 of 1...
job output is in /tmp/wordcount.root.20250411.063000.731616/output
Streaming final output from /tmp/wordcount.root.20250411.063000.731616/output...
"breath"	3
"brown"	7
"but"	9
"buzz"	3
"by"	28
"cable"	3
"cajole"	3
"came"	3
"charm"	4
"cheek,"	3
"chumps"	3
"close"	3
"coast"	4
"conceptions,"	3
"confound"	3
"continued"	3
"control"	3
"convince"	3
"copy"	6
"copy."	3
"could"	9
"countless"	3
"countries"	4
"country,"	3
"country."	3
"created"	4
"daft"	6
"darkness"	3
"day"	3
"dear"	3
"decided"	3
"describe"	3
"devils"	3
"devious"	3
"didn\u2019t"	6
"dim"	3
"discotheques"	3
"do"	3
"dog."	4
"dogs"	3
"down"	3
"dozen"	3
"dozy"	3
"dragged"	3
"drawing"	3
"driven"	3
"drunk"	3
"dwell"	3
"earth"	3
"earth,"	3
"earthquakes"	3
"enemy"	3
"enjoy"	4
"entire"	4
"eternity"	3
"everything"	3
"exchanged"	3
"existence"	4
"existence,"	3
"exp

## Execution on AWS EMR
AWS EMR (Elastic MapReduce) is a managed cloud service that lets you create *Hadoop*, *Spark* and other data analytics clusters with a few clicks.

**NOTE**: we are not endorsing AWS specifically; other cloud service providers have similar offerings.



### Case 1: create a cluster on the fly
We create a cluster just for a single job:
* a simple solution for large jobs that run only once (or only at sparse points in time)
* this approach causes a lot of overhead, because every job waits for a new cluster to be provisioned and bootstrapped: it is not suitable for small and frequent jobs

First, we need a config file for the connection to EMR:
**fill in YOUR AWS credentials**

In [None]:
%%file mrjob.conf
runners:
  emr:
    aws_access_key_id: YOUR_ACCESS_KEY_ID
    aws_secret_access_key: YOUR_SECRET_ACCESS_KEY
    instance_type: m5.xlarge
    num_core_instances: 2
    region: eu-west-1

Overwriting mrjob.conf
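Hard-coding keys in a notebook cell exposes them to anyone who can read the notebook. A minimal sketch, assuming the keys are provided in the standard AWS environment variables ``AWS_ACCESS_KEY_ID`` and ``AWS_SECRET_ACCESS_KEY``, that produces the same ``runners: emr:`` structure as the cell above without the secrets ever appearing in the notebook source; the function name ``render_emr_conf`` is hypothetical and the cluster settings simply repeat the values used above:

```python
from typing import Mapping


def render_emr_conf(env: Mapping[str, str],
                    instance_type: str = "m5.xlarge",
                    num_core_instances: int = 2,
                    region: str = "eu-west-1") -> str:
    # read the credentials from the environment instead of the notebook
    try:
        key_id = env["AWS_ACCESS_KEY_ID"]
        secret = env["AWS_SECRET_ACCESS_KEY"]
    except KeyError as missing:
        raise RuntimeError(f"environment variable {missing} is not set") from None
    # same YAML layout as the mrjob.conf cell
    return (
        "runners:\n"
        "  emr:\n"
        f"    aws_access_key_id: {key_id}\n"
        f"    aws_secret_access_key: {secret}\n"
        f"    instance_type: {instance_type}\n"
        f"    num_core_instances: {num_core_instances}\n"
        f"    region: {region}\n"
    )


# usage (writes the file without echoing the keys):
# import os
# with open("mrjob.conf", "w") as f:
#     f.write(render_emr_conf(os.environ))
```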


In [None]:
! python wordcount.py -r emr --bootstrap-mrjob DATA/text1.rst DATA/text2.rst -c mrjob.conf


Using s3://mrjob-757042689da50525/tmp/ as our temp dir on S3
Creating temp directory /tmp/wordcount.root.20250411.063227.120206
uploading working dir files to s3://mrjob-757042689da50525/tmp/wordcount.root.20250411.063227.120206/files/wd...
Copying other local files to s3://mrjob-757042689da50525/tmp/wordcount.root.20250411.063227.120206/files/
Can't access IAM API, trying default instance profile: EMR_EC2_DefaultRole
Can't access IAM API, trying default service role: EMR_DefaultRole
Created new cluster j-4U1XKBE6GAJ5
Added EMR tags to cluster j-4U1XKBE6GAJ5: __mrjob_label=wordcount, __mrjob_owner=root, __mrjob_version=0.7.4
Waiting for Step 1 of 1 (s-02416526OS0GFTLEHNQ) to complete...
  PENDING (cluster is STARTING: Provisioning Amazon EC2 capacity)
  PENDING (cluster is STARTING: Provisioning Amazon EC2 capacity)
  PENDING (cluster is STARTING: Provisioning Amazon EC2 capacity)
  PENDING (cluster is STARTING: Provisioning Amazon EC2 capacity)
  PENDING (cluster is STARTING: Provisio

### Case 2: connect to an existing cluster

In [None]:
%%file mrjob_cluster.conf
runners:
  emr:
    aws_access_key_id: YOUR_ACCESS_KEY_ID
    aws_secret_access_key: YOUR_SECRET_ACCESS_KEY
    region: eu-west-1

Overwriting mrjob_cluster.conf


We need the **ID** of the cluster we want to connect to.
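The ID (it starts with ``j-``) is shown in the EMR console. Alternatively, a minimal sketch, assuming *boto3* (installed above) and credentials that are allowed to call the EMR API, collects the IDs of clusters in the ``WAITING`` or ``RUNNING`` state via the EMR ``list_clusters`` call and follows its ``Marker`` pagination; the helper name ``active_cluster_ids`` is hypothetical, and the client is passed in so the function can be tried without AWS access:

```python
from typing import Any, Dict, Iterable, List


def active_cluster_ids(emr_client: Any,
                       states: Iterable[str] = ("WAITING", "RUNNING")) -> List[str]:
    # WAITING = idle and accepting steps, RUNNING = currently executing a step
    kwargs: Dict[str, Any] = {"ClusterStates": list(states)}
    ids: List[str] = []
    while True:
        response = emr_client.list_clusters(**kwargs)
        ids.extend(cluster["Id"] for cluster in response.get("Clusters", []))
        marker = response.get("Marker")
        if not marker:  # no further result pages
            return ids
        kwargs["Marker"] = marker


# usage with a real client (requires AWS credentials):
# import boto3
# emr = boto3.client("emr", region_name="eu-west-1")
# print(active_cluster_ids(emr))
```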

In [None]:
! python wordcount.py -r emr --cluster-id=j-23NVQRXYUPUWK DATA/text1.rst DATA/text2.rst -c mrjob_cluster.conf

Using s3://mrjob-757042689da50525/tmp/ as our temp dir on S3
Creating temp directory /tmp/wordcount.root.20250411.064246.381206
uploading working dir files to s3://mrjob-757042689da50525/tmp/wordcount.root.20250411.064246.381206/files/wd...
Copying other local files to s3://mrjob-757042689da50525/tmp/wordcount.root.20250411.064246.381206/files/
Adding our job to existing cluster j-23NVQRXYUPUWK
  master node is ec2-34-242-59-145.eu-west-1.compute.amazonaws.com
Waiting for Step 1 of 1 (s-05740712BDU5N1EKKH44) to complete...
  PENDING (cluster is RUNNING: Running step)
  RUNNING for 0:00:19
  COMPLETED
Attempting to fetch counters from logs...
Waiting 10 minutes for logs to transfer to S3... (ctrl-c to skip)

To fetch logs immediately next time, set up SSH. See:
https://pythonhosted.org/mrjob/guides/emr-quickstart.html#configuring-ssh-credentials

Looking for step log in s3://aws-logs-846657657993-eu-west-1/elasticmapreduce/j-23NVQRXYUPUWK/steps/s-05740712BDU5N1EKKH44...
  Parsing step l

## Exercise
Use *mrjob* to compute the employees' **top annual salaries** and **top gross pay** in the *CSV* table ``Baltimore_City_Employee_Salaries_FY2014.csv``.

* use the ``csv`` module (``import csv``) to parse the data -> [API docs](https://docs.python.org/3/library/csv.html)
* use ``yield`` to emit key-value pairs from the *map* and *reduce* functions (this makes them generators)
* return the top entries in both categories
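The employee names contain commas inside quoted fields (e.g. ``"Bernstein,Gregg L"`` in the results below), so a plain ``line.split(',')`` would shift all following columns. A minimal sketch with a made-up row in the same style, assuming (as the solution below does) that the name is in column 0 and the annual salary and gross pay are in columns 5 and 6; the row content and the helper ``parse_money`` are hypothetical:

```python
import csv


def parse_money(field: str) -> float:
    # hypothetical helper: turn "$50,000.00" or "50000.00" into 50000.0
    return float(field.strip().replace("$", "").replace(",", ""))


# made-up example row: the quoted name contains a comma
line = '"Doe,Jane A",Analyst,A01,Some Agency,01/01/2010,$50000.00,$48000.50'

naive = line.split(",")         # splits inside the quoted name
row = next(csv.reader([line]))  # respects the quotes

print(len(naive), len(row))  # 8 7
print(row[0], parse_money(row[5]), parse_money(row[6]))  # Doe,Jane A 50000.0 48000.5
```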

In [7]:
%%file MRTopSalariesfile.py
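from mrjob.job import MRJob
import csv
import heapq

class MRTopSalaries(MRJob):
    def mapper(self, _, line):
        try:
            # parse one CSV line; the csv module handles quoted names like "Last,First"
            row = next(csv.reader([line]))
            name = row[0].strip()
            if name == 'Name':  # skip the header row
                return
            # column 5: annual salary, column 6: gross pay (strip "$" and thousands separators)
            annual_salary = float(row[5].strip().replace('$', '').replace(',', ''))
            gross_pay = float(row[6].strip().replace('$', '').replace(',', ''))
        except (csv.Error, IndexError, ValueError):
            # skip malformed, blank or short rows and non-numeric amounts
            return
        yield 'annual', (name, annual_salary)
        yield 'gross', (name, gross_pay)

    def reducer(self, key, values):
        # same result as sorted(..., reverse=True)[:10] without sorting all values
        top_10 = heapq.nlargest(10, values, key=lambda x: x[1])
        for name, amount in top_10:
            yield key, (name, amount)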

if __name__ == '__main__':
    MRTopSalaries.run()

Writing MRTopSalariesfile.py
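In this solution every value for ``'annual'`` and for ``'gross'`` ends up in a single ``reducer()`` call, which has to process all employees of that category. Because the overall top *n* is always contained in the union of the top *n* of each partition, a local pre-selection (for example in an *mrjob* ``combiner()`` method) could shrink the data sent to the reducer. The following pure-*Python* sketch checks this property with ``heapq.nlargest`` on synthetic data; the partitioning and the function names ``top_n`` and ``top_n_two_stage`` are hypothetical:

```python
import heapq
from typing import Iterable, List, Sequence, Tuple

Record = Tuple[str, float]  # (name, amount), like the pairs emitted by the mapper


def top_n(records: Iterable[Record], n: int) -> List[Record]:
    # equivalent to sorted(records, key=amount, reverse=True)[:n]
    return heapq.nlargest(n, records, key=lambda r: r[1])


def top_n_two_stage(partitions: Sequence[Sequence[Record]], n: int) -> List[Record]:
    # stage 1 ("combiner"): keep only the local top n of each partition
    candidates = [rec for part in partitions for rec in top_n(part, n)]
    # stage 2 ("reducer"): global top n over the much smaller candidate list
    return top_n(candidates, n)


# synthetic data (not the Baltimore salaries): 1000 distinct amounts 0.0 .. 999.0
records = [(f"employee_{i}", float((i * 7919) % 1000)) for i in range(1000)]
partitions = [records[i::4] for i in range(4)]  # split into 4 partitions

assert top_n_two_stage(partitions, 10) == top_n(records, 10)
print([amount for _, amount in top_n(records, 3)])  # [999.0, 998.0, 997.0]
```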


In [8]:
! python MRTopSalariesfile.py -r local /content/DATA/Baltimore_City_Employee_Salaries_FY2014.csv

No configs found; falling back on auto-configuration
No configs specified for local runner
Creating temp directory /tmp/MRTopSalariesfile.root.20250411.081317.218283
Running step 1 of 1...
job output is in /tmp/MRTopSalariesfile.root.20250411.081317.218283/output
Streaming final output from /tmp/MRTopSalariesfile.root.20250411.081317.218283/output...
"gross"	["Bernstein,Gregg L", 238772.04]
"gross"	["Batts,Anthony W", 193653.69]
"gross"	["Black,Harry E", 188328.5]
"gross"	["Charles,Ronnie E", 185741.81]
"gross"	["Nalewajko Jr,Stephen C", 176141.33]
"gross"	["Marcus Sr,Albert M", 173876.84]
"gross"	["Stokes,Charline B", 166442.42]
"gross"	["Harris Jr,William", 165892.21]
"gross"	["Makanjuola,Rafiu T", 165270.01]
"gross"	["Cheelsman III,Charles H", 165108.5]
"annual"	["Bernstein,Gregg L", 238772.0]
"annual"	["Charles,Ronnie E", 200000.0]
"annual"	["Batts,Anthony W", 193800.0]
"annual"	["Black,Harry E", 190000.0]
"annual"	["Swift,Michael", 187200.0]
"annual"	["Parthemos,Kaliope", 172000.0