# MapReduce Using `MRJob`

## A Job Posting Dataset

The sample dataset we will use (`data/job-data/job-data-2018-09-08-00-00-37.txt`) contains job postings from one of the US job search websites. Each row of the file is a JSON document representing one job posting record.

The example below shows a sample job posting from the data file. The sample record has been formatted with 4-space indentation; in the real file, each record is stored as a JSON document on a single row.

## 1. Protocols For Input & Output

mrjob assumes that all data is newline-delimited bytes. Each job has an *input protocol*, an *output protocol*, and an *internal protocol*.

The default *input* protocol is `RawValueProtocol`, which just reads in a line as a `str`.
The default *output* and *internal* protocols are both `JSONProtocol`, which reads and writes each line as a JSON-encoded key and a JSON-encoded value separated by a tab character.
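
To make these line formats concrete, the sketch below imitates the default behaviours using only the standard library. The helper names `read_raw_value`, `write_json`, and `read_json` are hypothetical stand-ins written for illustration; they are not mrjob's own classes, which should be used in real jobs.

```python
import json

def read_raw_value(line: bytes):
    """Imitate the default input protocol: no key, the line itself as a str."""
    return None, line.decode('utf-8')

def write_json(key, value) -> bytes:
    """Imitate the default output protocol: JSON key, a tab, then JSON value."""
    return (json.dumps(key) + '\t' + json.dumps(value)).encode('utf-8')

def read_json(line: bytes):
    """Reverse of write_json: split on the first tab and decode both halves."""
    raw_key, raw_value = line.decode('utf-8').split('\t', 1)
    return json.loads(raw_key), json.loads(raw_value)

# Made-up values for illustration only.
key, value = read_raw_value(b'{"jobId": 1}')
encoded = write_json(1, {'city': 'Austin'})
print(key, repr(value))
print(encoded)
print(read_json(encoded))
```

Splitting on the first tab is safe here because JSON escapes tab characters inside strings, so an encoded key never contains a literal tab.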

The protocols can be changed by overriding the corresponding class attributes: `INPUT_PROTOCOL`, `INTERNAL_PROTOCOL`, and `OUTPUT_PROTOCOL`.

For more information, see [Protocols](https://pythonhosted.org/mrjob/guides/writing-mrjobs.html#job-protocols).

`JSONValueProtocol` encodes the value as JSON and discards the key (the key is read in as `None`). To load the job posting dataset, we can set `INPUT_PROTOCOL = JSONValueProtocol`, which automatically loads each input line as a Python `dict` object.

The example below loads the data into `mapper` and generates key-value pairs as output, where the keys are *jobId* (`int`) and the values are *jobLocation* (`json`). Note that no `reducer` is provided; jobs of this type are sometimes called *map-only* jobs.

In [176]:
%%file mr-jobs/1_protocols.py
from mrjob.job import MRJob
from mrjob.protocol import JSONValueProtocol

class MRTest(MRJob):
    
    INPUT_PROTOCOL = JSONValueProtocol
    
    def mapper(self, _, value):
        yield value.get('jobId', None), value.get('jobLocation', None)

        
if __name__ == '__main__':
    MRTest.run()

Overwriting mr-jobs/1_protocols.py


In [177]:
!python3 mr-jobs/1_protocols.py ../data/job-data/job-data-2018-09-08-00-00-37.txt --output-dir mr-output

No configs found; falling back on auto-configuration
No configs specified for inline runner
Running step 1 of 1...
Creating temp directory /tmp/1_protocols.hadoop.20180913.212535.243539
job output is in mr-output
Removing temp directory /tmp/1_protocols.hadoop.20180913.212535.243539...
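
Because the default output protocol is `JSONProtocol`, each line the job writes should hold a JSON key and a JSON value separated by a tab. A minimal sketch for reading such output back into Python follows. It assumes the output directory holds files named `part-*`, and the helper name `read_job_output` is hypothetical.

```python
import glob
import json
import os

def read_job_output(output_dir):
    """Yield (key, value) pairs from tab-separated JSON lines in part-* files."""
    # Assumption: the runner writes its results to files named part-*.
    for path in sorted(glob.glob(os.path.join(output_dir, 'part-*'))):
        with open(path, encoding='utf-8') as fh:
            for line in fh:
                line = line.rstrip('\n')
                if not line:
                    continue  # skip blank lines
                raw_key, raw_value = line.split('\t', 1)
                yield json.loads(raw_key), json.loads(raw_value)

# Yields nothing if the directory does not exist or is empty.
for job_id, location in read_job_output('mr-output'):
    print(job_id, location)
```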


## 2. Filtering

Key points:

- The filtering pattern aims to find a subset of the data, usually without changing the actual records. We can set `OUTPUT_PROTOCOL = JSONValueProtocol` to drop the key field from each record in the output.
- Filtering patterns usually don't need a reducer if each record is filtered individually and the evaluation does not depend on other records.
- Filtering often serves as a building block for other patterns.

Applications:

- Data cleaning
- Event tracking
- Record matching
- Random sampling
- Top K records

### 2.1 Simple Filtering

Simple filtering is often used for tasks such as data cleaning, event tracking, and outlier removal.

*Example*: Find all jobs with titles relevant to *Data Scientist*.

In [170]:
%%file mr-jobs/2.1_simple_filtering.py
from mrjob.job import MRJob
from mrjob.protocol import JSONValueProtocol

class MRSimpleFiltering(MRJob):
    
    INPUT_PROTOCOL = JSONValueProtocol
    OUTPUT_PROTOCOL = JSONValueProtocol
    
    def mapper(self, _, value):
        title = value.get('title', '').lower()
        if 'data scientist' in title:
            yield _, value
        
if __name__ == '__main__':
    MRSimpleFiltering.run()

Overwriting mr-jobs/2.1_simple_filtering.py


In [171]:
!python3 mr-jobs/2.1_simple_filtering.py ../data/job-data/job-data-2018-09-08-00-00-37.txt --output-dir mr-output

No configs found; falling back on auto-configuration
No configs specified for inline runner
Running step 1 of 1...
Creating temp directory /tmp/2.hadoop.20180913.212433.152130
job output is in mr-output
Removing temp directory /tmp/2.hadoop.20180913.212433.152130...
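
The filter condition can also be checked on its own before running a job. The sketch below pulls it into a hypothetical helper, `is_data_scientist`, and tries it on a few made-up records. It additionally guards against a record whose `title` is `null`, which is an assumption about the data rather than something known about this dataset.

```python
def is_data_scientist(record):
    """Return True when the record's title mentions 'data scientist'."""
    # `or ''` covers both a missing title and an explicit null (None).
    title = (record.get('title') or '').lower()
    return 'data scientist' in title

# Made-up records for illustration only; they are not taken from the dataset.
samples = [
    {'title': 'Senior Data Scientist'},
    {'title': 'DATA SCIENTIST II'},
    {'title': 'Data Engineer'},
    {'title': None},
    {},
]
print([is_data_scientist(r) for r in samples])
```

Keeping the predicate in a plain function makes it easy to unit test and to reuse inside a `mapper`.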


### 2.2 Random Sampling

- The random sampling pattern creates a subset (usually much smaller) of a larger dataset for quick exploration. Each record should therefore have an equal probability of being selected.

- If the goal is to split the dataset into *training* and *test* sets for machine learning modeling and evaluation, then we need to make sure:
  1. each record can only be selected into one of the two datasets
  2. sampling is reproducible

The `sample` function below returns either `True` or `False` based on the key and fraction:
1. Split the fraction into a *numerator* and a *denominator*, e.g. 0.125 -> 125/1000.
2. Calculate the hash value of the key. Here we use MD5, a widely used hash function that produces a 128-bit hash value.
3. Calculate the hash value modulo the *denominator*; if the result is less than the *numerator*, return `True`, otherwise return `False`.

Note: if you just want to randomly sample the dataset, then a simple random number generator will work.

In [125]:
import decimal
import hashlib

def sample(key, fraction):
    if fraction > 1 or fraction < 0:
        raise ValueError('Invalid fraction value')
    # calculate numerator and denominator
    frac = decimal.Decimal(str(fraction)).as_tuple()
    numer = sum([v*10**i for i, v in enumerate(frac.digits[::-1])])
    denom = 10**(-frac.exponent)
    # calculate hash value using md5
    hash_val = hashlib.md5(str(key).encode()).hexdigest()
    return (int(hash_val, 16) % denom) < numer

In [126]:
# test the function with the code below
print(sum([sample(i, fraction=0.3) for i in range(1000)]))

291
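
For the simpler case mentioned in the note above, where the sample does not need to be tied to record keys, a random number generator is enough. The sketch below is one way to do that. The helper name `random_sample` and its `seed` parameter are hypothetical, and seeding only makes the result repeatable when the records arrive in the same order.

```python
import random

def random_sample(records, fraction, seed=None):
    """Keep each record independently with probability `fraction`."""
    if not 0 <= fraction <= 1:
        raise ValueError('Invalid fraction value')
    rng = random.Random(seed)  # private generator, global state is untouched
    return [r for r in records if rng.random() < fraction]

subset = random_sample(range(1000), fraction=0.3, seed=42)
print(len(subset))  # close to 300; the exact count depends on the seed
```

Unlike the hash-based `sample` function, this approach does not give the same record the same decision when the input is partitioned or ordered differently, so it is not suitable for a reproducible train/test split.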


Example: Create a subset with 10% of the full dataset.

In [168]:
%%file mr-jobs/2.1_random_sampling.py
from mrjob.job import MRJob
from mrjob.protocol import JSONValueProtocol

import decimal
import hashlib

class MRRandomSampling(MRJob):
    
    INPUT_PROTOCOL = JSONValueProtocol
    OUTPUT_PROTOCOL = JSONValueProtocol
    
    def mapper(self, _, value):
        key = value.get('jobId', 0)
        if MRRandomSampling._sample(key=key, fraction=.1):
            yield _, value
    
    @staticmethod
    def _sample(key, fraction=1):
        if fraction > 1 or fraction < 0:
            raise ValueError('Invalid fraction value')
        frac = decimal.Decimal(str(fraction)).as_tuple()
        numer = sum([v*10**i for i, v in enumerate(frac.digits[::-1])])
        denom = 10**(-frac.exponent)
        hash_val = hashlib.md5(str(key).encode()).hexdigest()
        return (int(hash_val, 16) % denom) < numer
    
        
if __name__ == '__main__':
    MRRandomSampling.run()

Overwriting mr-jobs/2.1_random_sampling.py


In [169]:
!python3 mr-jobs/2.1_random_sampling.py ../data/job-data/job-data-2018-09-08-00-00-37.txt --output-dir mr-output

No configs found; falling back on auto-configuration
No configs specified for inline runner
Running step 1 of 1...
Creating temp directory /tmp/2.hadoop.20180913.212423.435049
job output is in mr-output
Removing temp directory /tmp/2.hadoop.20180913.212423.435049...


Example: Create a reproducible train/test split.

In [166]:
%%file mr-jobs/2.1_train_test_splitting.py
from mrjob.job import MRJob
from mrjob.protocol import JSONValueProtocol

import decimal
import hashlib

class MRTrainTestSplit(MRJob):
    
    INPUT_PROTOCOL = JSONValueProtocol
    OUTPUT_PROTOCOL = JSONValueProtocol
    
    SUBSET = 'train'
    TEST_SIZE = .3
    
    def mapper(self, _, value):
        key = value.get('jobId', 0)
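        # flag is True when the record hashes into the test set
        flag = MRTrainTestSplit._sample(key=key, fraction=self.TEST_SIZE)
        # the training set is the complement of the test set
        if self.SUBSET == 'train':
            flag = not flag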
        if flag:
            yield _, value
    
    @staticmethod
    def _sample(key, fraction=1):
        if fraction > 1 or fraction < 0:
            raise ValueError('Invalid fraction value')
        frac = decimal.Decimal(str(fraction)).as_tuple()
        numer = sum([v*10**i for i, v in enumerate(frac.digits[::-1])])
        denom = 10**(-frac.exponent)
        hash_val = hashlib.md5(str(key).encode()).hexdigest()
        return (int(hash_val, 16) % denom) < numer
    
        
if __name__ == '__main__':
    MRTrainTestSplit.run()

Overwriting mr-jobs/2.1_train_test_splitting.py


In [167]:
!python3 mr-jobs/2.1_train_test_splitting.py ../data/job-data/job-data-2018-09-08-00-00-37.txt --output-dir mr-output

No configs found; falling back on auto-configuration
No configs specified for inline runner
Running step 1 of 1...
Creating temp directory /tmp/2.hadoop.20180913.212418.006807
job output is in mr-output
Removing temp directory /tmp/2.hadoop.20180913.212418.006807...
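
The two requirements listed at the start of this section can be checked locally. The sketch below repeats the hash-based `_sample` logic as a standalone function (renamed `in_test_set` here, a hypothetical name) and confirms, on integer keys standing in for `jobId` values, that the split is disjoint, covers every key, and is identical on a second pass.

```python
import decimal
import hashlib

def in_test_set(key, fraction):
    """Hash-based membership test, same logic as the `_sample` method above."""
    frac = decimal.Decimal(str(fraction)).as_tuple()
    numer = sum(v * 10**i for i, v in enumerate(frac.digits[::-1]))
    denom = 10 ** (-frac.exponent)
    hash_val = hashlib.md5(str(key).encode()).hexdigest()
    return (int(hash_val, 16) % denom) < numer

# Integer keys are stand-ins for jobId values, used for illustration only.
keys = range(1000)
test = {k for k in keys if in_test_set(k, 0.3)}
train = {k for k in keys if not in_test_set(k, 0.3)}

assert test.isdisjoint(train)        # requirement 1: no record in both sets
assert test | train == set(keys)     # every record lands in exactly one set
assert test == {k for k in keys if in_test_set(k, 0.3)}  # requirement 2
print(len(train), len(test))
```

Disjointness follows by construction, since the training set is defined as the complement of the test set. Reproducibility follows because the decision depends only on the key and the fraction.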
