## TASK

Create a MapReduce job that calculates, for each user, the minimum and maximum date of their badge records together with the overall count of those records. If possible, leverage the combiner optimization technique.

NOTE: Before working on the MapReduce job, try to achieve the same result in pure Python, working only on the single file `0.xml` (available in the current directory).

In [3]:
from glob import glob
from datetime import datetime
from uuid import uuid4
from delorean import parse

import requests
import xmltodict

from job import Job


def get_stackoverflow_badges_uri(*args, **kwargs):
    """Build the URL of a StackOverflow badges file.

    All arguments are forwarded unchanged to ``str.format`` on the URL template,
    so ``get_stackoverflow_badges_uri('0.xml')`` returns the template with
    ``{}`` replaced by ``0.xml``.
    """
    template = 'https://s3.eu-central-1.amazonaws.com/learning.big.data/stackoverflow-badges/{}'
    return template.format(*args, **kwargs)

## Records Reader

In [4]:
def record_reader(line):
    """Turn one raw XML ``row`` line into a single ``(key, value)`` record.

    Args:
        line: UTF-8 encoded bytes holding one XML document whose root is ``row``.

    Yields:
        Exactly one pair: the row's ``@Id`` entry, and a dict of every entry of
        the row with ``@`` removed from its name and the name lower-cased.
    """
    text = line.decode('utf-8')
    row = dict(xmltodict.parse(text)['row'])
    row_id = row['@Id']

    normalized = {}
    for name, content in row.items():
        normalized[name.replace('@', '').lower()] = content

    yield row_id, normalized

In [5]:
# -- test `record_reader`
# Download the first badges file and parse every non-empty line into a record.
# The `with` block closes the streamed response when done, and
# raise_for_status() stops on an HTTP error instead of handing an error page
# to the XML parser inside `record_reader`.
with requests.get(get_stackoverflow_badges_uri('0.xml'), stream=True) as response:
    response.raise_for_status()
    records = [
        next(record_reader(line))
        for line in response.iter_lines()
        if line  # skip empty lines produced by iter_lines
    ]

print(records[:2])

[('26066242', {'id': '26066242', 'userid': '8125167', 'name': 'Supporter', 'date': '2017-11-28T19:34:25.047', 'class': '3', 'tagbased': 'False'}), ('26066243', {'id': '26066243', 'userid': '9006638', 'name': 'Supporter', 'date': '2017-11-28T19:34:25.047', 'class': '3', 'tagbased': 'False'})]


## Mapper

In [6]:
def mapper(key, value):
    """Emit one partial aggregate per badge record, keyed by user.

    Args:
        key: Record key (not used by the mapper).
        value: Record dict; its ``'userid'`` and ``'date'`` entries are read.

    Yields:
        ``(userid, (date, date, 1))`` -- the record's date serves as both the
        minimum and the maximum, with a count of one.
    """
    user_id = value['userid']
    badge_date = value['date']
    yield user_id, (badge_date, badge_date, 1)

In [7]:
# -- test mapper
# Run the mapper over the first ten parsed records and show what it emits.
[next(mapper(*record)) for record in records[:10]]

[('8125167', ('2017-11-28T19:34:25.047', '2017-11-28T19:34:25.047', 1)),
 ('9006638', ('2017-11-28T19:34:25.047', '2017-11-28T19:34:25.047', 1)),
 ('4892968', ('2017-11-28T19:34:25.047', '2017-11-28T19:34:25.047', 1)),
 ('3204673', ('2017-11-28T19:34:25.047', '2017-11-28T19:34:25.047', 1)),
 ('1108484', ('2017-11-28T19:34:25.047', '2017-11-28T19:34:25.047', 1)),
 ('3203282', ('2017-11-28T19:34:25.047', '2017-11-28T19:34:25.047', 1)),
 ('3926187', ('2017-11-28T19:34:25.047', '2017-11-28T19:34:25.047', 1)),
 ('4134228', ('2017-11-28T19:34:25.047', '2017-11-28T19:34:25.047', 1)),
 ('8474041', ('2017-11-28T19:34:25.047', '2017-11-28T19:34:25.047', 1)),
 ('9019981', ('2017-11-28T19:35:14.740', '2017-11-28T19:35:14.740', 1))]

## Reducer

In [8]:
def _parse_badge_date(text):
    """Parse an ISO-8601 badge timestamp, with or without fractional seconds.

    A fixed format is used so the day/month order is never guessed.

    Raises:
        ValueError: if ``text`` matches neither supported format.
    """
    for fmt in ('%Y-%m-%dT%H:%M:%S.%f', '%Y-%m-%dT%H:%M:%S'):
        try:
            return datetime.strptime(text, fmt)
        except ValueError:
            continue
    raise ValueError('unsupported badge date format: {!r}'.format(text))


def reducer(key, values):
    """Merge partial ``(min_date, max_date, count)`` aggregates for one key.

    The output has the same shape as each input value, so this function can be
    used both as the combiner and as the final reducer.

    Args:
        key: The grouping key (the user id emitted by the mapper).
        values: Iterable of ``(min_date, max_date, count)`` triples, where the
            dates are ISO-8601 strings such as ``'2017-11-28T19:34:25.047'``.

    Yields:
        A single ``(key, (min_date, max_date, count))`` pair. The dates are the
        original strings; for empty ``values`` they are ``None`` and the count
        is 0.

    Raises:
        ValueError: if a date string is not in a supported ISO-8601 format.
    """
    g_dt_min = g_dt_max = None
    # Parsed counterparts of the current extrema, kept so each incoming date
    # is parsed once instead of re-parsing the running min/max every iteration.
    parsed_min = parsed_max = None
    g_cnt = 0

    for dt_min, dt_max, cnt in values:
        candidate_min = _parse_badge_date(dt_min)
        if parsed_min is None or candidate_min < parsed_min:
            g_dt_min, parsed_min = dt_min, candidate_min

        candidate_max = _parse_badge_date(dt_max)
        if parsed_max is None or candidate_max > parsed_max:
            g_dt_max, parsed_max = dt_max, candidate_max

        g_cnt += cnt

    yield (key, (g_dt_min, g_dt_max, g_cnt))

In [9]:
# -- test reducer
# Three single-record aggregates for one user; the reducer should return the
# earliest date, the latest date and a count of 3.
next(reducer(
    '9019981',
    [(stamp, stamp, 1) for stamp in (
        '2017-11-28T19:34:25.047',
        '2017-11-27T19:34:25.047',
        '2017-11-30T19:34:25.047',
    )],
))

('9019981', ('2017-11-27T19:34:25.047', '2017-11-30T19:34:25.047', 3))

## Job

In [10]:
# Run the job over the first four badge files. The reducer is also passed as
# the combiner: what it yields has the same (key, (min, max, count)) shape as
# what the mapper yields.
Job(
    reducer=reducer,
    combiner=reducer,
    mapper=mapper,
    record_reader=record_reader,
    input_uris=[
        get_stackoverflow_badges_uri(name)
        for name in ('0.xml', '1.xml', '2.xml', '3.xml')
    ],
).run()




JOB ID: 430ba5d8-bb47-410d-86da-b40248f269b4

INPUT SIZE: 46239976

OUTPUT PATH: /home/jovyan/work/map_reduce/.outputs/430ba5d8-bb47-410d-86da-b40248f269b4

EXECUTION TIME: 47.088

MAX SHUFFLE SIZE: 29295640

FILES:
+----------------------------+--------------+
|          filename          | size (bytes) |
| mapper_0__partition_0.json | 1816392      |
+----------------------------+--------------+
| mapper_0__partition_1.json | 1803434      |
+----------------------------+--------------+
| mapper_0__partition_2.json | 1822672      |
+----------------------------+--------------+
| mapper_0__partition_3.json | 1830581      |
+----------------------------+--------------+
| mapper_1__partition_0.json | 1808954      |
+----------------------------+--------------+
| mapper_1__partition_1.json | 1806645      |
+----------------------------+--------------+
| mapper_1__partition_2.json | 1777535      |
+----------------------------+--------------+
| mapper_1__partition_3.json | 1803755      |

In [19]:
# Show the first ten lines of one reducer output file.
# NOTE(review): the job id in this path (4db61585-...) is not the JOB ID printed
# by the run above (430ba5d8-...), and every count in the listing below is 0, so
# this output appears to come from an earlier run -- confirm and re-run against
# the current job's output directory.
!head -n 10 ./.outputs/4db61585-39b9-46ef-bf38-22f0bdbc458a/reducer_1.json

{"key": "9006638", "value": ["2017-11-28T19:34:25.047", "2017-11-30T04:44:18.877", 0]}
{"key": "3204673", "value": ["2017-11-28T19:34:25.047", "2017-11-28T19:34:25.047", 0]}
{"key": "8156390", "value": ["2017-11-28T19:34:25.047", "2017-11-28T19:34:25.047", 0]}
{"key": "2188539", "value": ["2017-11-28T19:34:25.047", "2017-11-28T19:34:25.047", 0]}
{"key": "8913542", "value": ["2017-12-06T13:52:20.907", "2017-11-29T13:46:55.063", 0]}
{"key": "3957942", "value": ["2017-11-28T19:34:25.047", "2017-11-28T19:43:23.213", 0]}
{"key": "1069995", "value": ["2017-11-28T19:34:25.047", "2017-11-28T19:34:25.047", 0]}
{"key": "4569922", "value": ["2017-11-28T19:34:25.047", "2017-11-28T19:34:25.047", 0]}
{"key": "7221943", "value": ["2017-11-28T19:34:25.047", "2017-11-28T19:34:25.047", 0]}
{"key": "3992826", "value": ["2017-11-28T19:38:04.527", "2017-11-28T19:38:04.527", 0]}
