# MapReduce

Generic programming model for processing data on many machines
- parallel computation
- fault tolerance to machine failure

Basic algorithm
- **map** records to key, value pairs
- **collect** records by key
- **reduce** the values collected for each key

Very similar to the split-apply-combine many data scientists do in `pandas`

Designed to be parallelized
- process data in a way that it becomes independent
- mappers & reducers can run in parallel on different parts of the data
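
A minimal sketch of that independence, assuming a thread pool stands in for separate machines. Each chunk is mapped without looking at any other chunk, and each key is reduced without looking at any other key. The names `chunks`, `map_chunk` and `reduce_key` are hypothetical, and a real cluster would ship the work to other machines rather than to threads.

```python
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor

# Hypothetical input: each chunk could live on a different machine.
chunks = [
    ["a", "b", "a"],
    ["b", "c"],
    ["a", "c", "c"],
]

def map_chunk(chunk):
    """Map one chunk to (key, 1) pairs, independently of other chunks."""
    return [(word, 1) for word in chunk]

def reduce_key(item):
    """Reduce the values of one key, independently of other keys."""
    key, values = item
    return (key, sum(values))

with ThreadPoolExecutor(max_workers=3) as pool:
    # Map in parallel: one task per chunk.
    mapped_chunks = list(pool.map(map_chunk, chunks))

    # Collect: gather the values that share a key.
    groups = defaultdict(list)
    for pairs in mapped_chunks:
        for key, value in pairs:
            groups[key].append(value)

    # Reduce in parallel: one task per key.
    totals = list(pool.map(reduce_key, sorted(groups.items())))

print(totals)  # [('a', 3), ('b', 2), ('c', 3)]
```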


## Resources

[What is MapReduce? - talend](https://www.talend.com/resources/what-is-mapreduce/)

[Data Science from Scratch](https://www.oreilly.com/library/view/data-science-from/9781492041122/)


## History

Introduced by Google in 2004
- failure tolerance allowed Google to scale with commodity hardware
- Dean & Ghemawat (2004) MapReduce: Simplified Data Processing on Large Clusters - [paper](https://static.googleusercontent.com/media/research.google.com/en//archive/mapreduce-osdi04.pdf)
- watch [Building Software Systems At Google and Lessons Learned - Jeff Dean](https://www.youtube.com/watch?v=modXC5IWTJI) for the history of Google's early technical development
- read [The Friendship That Made Google Huge - Jeff Dean and Sanjay Ghemawat](https://www.newyorker.com/magazine/2018/12/10/the-friendship-that-made-google-huge) for more on Jeff Dean

In 2014 Google announced that it had largely stopped using MapReduce internally, replacing it with Cloud Dataflow

Hadoop MapReduce is an open source implementation of MapReduce


## Map / Collect / Reduce

Read files
- Files are lists of records
- tabular data (rows are records, columns are floats, strings etc)
- split files into records

Map
- extract k, v pairs (intermediate keys)

Collect 
- also called shuffle & sort, combine
- grouping by keys before sending to the reducer
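
On a cluster, collecting also means deciding which reducer receives each key. The original paper describes a default partitioning of `hash(key) mod R` for `R` reduce tasks. The sketch below assumes `zlib.crc32` as the hash so the assignment is repeatable between runs (Python's built-in `hash` of strings is randomized per process). `NUM_REDUCERS` and `partition` are hypothetical names.

```python
import zlib
from collections import defaultdict

NUM_REDUCERS = 2  # assumed number of reduce tasks (R)

def partition(key, num_reducers=NUM_REDUCERS):
    """Pick a reducer for a key; equal keys always go to the same reducer."""
    return zlib.crc32(key.encode("utf-8")) % num_reducers

pairs = [("data", 1), ("scientist", 1), ("data", 1), ("engineer", 1)]

# One bucket per reducer; inside a bucket, values are grouped by key.
buckets = [defaultdict(list) for _ in range(NUM_REDUCERS)]
for key, value in pairs:
    buckets[partition(key)][key].append(value)

for reducer_id, bucket in enumerate(buckets):
    print(reducer_id, dict(bucket))
```

Because every occurrence of a key lands in the same bucket, each reducer can aggregate its keys without talking to the others.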

Reduce
- aggregation / grouping step
- count, average, filter, sum etc

Output
- writing result to a file
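
The five stages above can be sketched as one small driver. This is a single-machine illustration using only the standard library, not how a cluster implementation works. `map_reduce`, `word_mapper` and `sum_reducer` are hypothetical names, and an in-memory buffer stands in for the input and output files.

```python
import io
from collections import defaultdict

def map_reduce(records, mapper, reducer):
    """Run map, collect and reduce over an iterable of records."""
    groups = defaultdict(list)
    for record in records:
        # Map: every record yields zero or more (key, value) pairs.
        for key, value in mapper(record):
            # Collect: gather all values that share a key.
            groups[key].append(value)
    # Reduce: each key is aggregated independently of the others.
    return [reducer(key, values) for key, values in sorted(groups.items())]

def word_mapper(line):
    return [(word, 1) for word in line.split()]

def sum_reducer(key, values):
    return (key, sum(values))

# Read: split the "file" into records, one per line.
text = "data scientist\ndata engineer\n"
lines = text.splitlines()

results = map_reduce(lines, word_mapper, sum_reducer)

# Output: write one "key<TAB>value" line per result.
out = io.StringIO()
for key, value in results:
    out.write(f"{key}\t{value}\n")

print(results)  # [('data', 2), ('engineer', 1), ('scientist', 1)]
```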


## Example - word counting

> There are few absolute rules of data science, but one of them is that your first MapReduce example has to involve counting words - Data Science from Scratch 

Input to the pipeline is a number of records (files):

In [1]:
records = [
    ['data', 'scientist', 'data', 'engineer'],
    ['data', 'engineer', 'data', 'engineer'],
    ['data', 'scientist', 'data', 'scientist', 'data', 'scientist']
]

The map step transforms the records into a list of key, value pairs.

Map on a single record, to produce a list of key, value pairs:

In [2]:
def mapper(record):
    return [(k, 1) for k in record]

mapped = mapper(records[0])

mapped

[('data', 1), ('scientist', 1), ('data', 1), ('engineer', 1)]

The reduce step combines the records into counts. Because this reducer sees the whole mapped record at once and counts the keys itself, there is no need for a separate shuffle step to organize by keys.

Reduce on a single mapped record:

In [3]:
from collections import Counter

def reduce(mapped):
    return list(Counter([m[0] for m in mapped]).items())

reduce(mapped)

[('data', 2), ('scientist', 1), ('engineer', 1)]

## Run the full map-reduce pipeline

Running the mapper on all the records:

In [4]:
mapped = []
for r in records:
    mapped.extend(mapper(r))

Sort by key, so that pairs sharing a key sit next to each other:

In [5]:
import itertools
import operator

sort = sorted(mapped, key=operator.itemgetter(0))

sort

[('data', 1),
 ('data', 1),
 ('data', 1),
 ('data', 1),
 ('data', 1),
 ('data', 1),
 ('data', 1),
 ('engineer', 1),
 ('engineer', 1),
 ('engineer', 1),
 ('scientist', 1),
 ('scientist', 1),
 ('scientist', 1),
 ('scientist', 1)]

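Group the sorted pairs by key and run the reduce step on each group:

In [6]:
reduced = []
# each group holds every (key, 1) pair for a single key
for key, group in itertools.groupby(sort, key=operator.itemgetter(0)):
    reduced.extend(reduce(list(group)))

reduced

[('data', 7), ('engineer', 3), ('scientist', 4)]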

## Question

Where are opportunities for parallelism in the word count pipeline?

## Practical

Implement a MapReduce operation on the dataset below:
- map to extract the data as a list of (key, value) tuples
- reduce to filter out london
- map
- reduce to find the average for each key

You do not need anything more than the Python standard library

In [7]:
dataset = [
    'berlin 20, london 25, auckland 30, melbourne 35',
    'berlin 5, london 5, auckland 15, melbourne 15',
    'berlin 10, london 5, auckland 10, melbourne 10',
]

In [17]:
import itertools
import operator

def map_rec(rec):
    # 'berlin 20, london 25' -> [('berlin', 20), ('london', 25)]
    pairs = [field.split(' ') for field in rec.split(', ')]
    return [(city, int(value)) for city, value in pairs]

# map: extract (key, value) tuples from every record
mapped = list(itertools.chain.from_iterable(map(map_rec, dataset)))

# reduce: filter out london
filtered = [(city, value) for city, value in mapped if city != 'london']

# collect: sort by key, then group the pairs that share a key
sort = sorted(filtered, key=operator.itemgetter(0))
groups = [list(group) for key, group in itertools.groupby(sort, operator.itemgetter(0))]

# reduce: average the values for each key
res = []
for key_records in groups:
    values = [value for _, value in key_records]
    res.append((key_records[0][0], sum(values) / len(values)))

res

[('auckland', 18.333333333333332),
 ('berlin', 11.666666666666666),
 ('melbourne', 20.0)]

## Practical

Convert the following dataset to the desired output using MapReduce (Python standard library only):

In [None]:
dataset = [
    ('panda', 0), ('pink', 3), ('pirate', 3), ('panda', 1), ('pink', 4)
]

# (key, (sum, count))
desired_output = [
    ('panda', (1, 2)), ('pink', (7, 2)), ('pirate', (3, 1))
]
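
One possible approach, offered as a sketch rather than the reference answer: map every value to a `(value, 1)` pair so that sums and counts travel together, collect by key, then reduce each group by adding up both positions. The helper names `to_sum_count` and `add_pairs` are hypothetical.

```python
from collections import defaultdict

dataset = [
    ('panda', 0), ('pink', 3), ('pirate', 3), ('panda', 1), ('pink', 4)
]

def to_sum_count(record):
    """Map: turn (key, value) into (key, (value, 1))."""
    key, value = record
    return (key, (value, 1))

def add_pairs(key, pairs):
    """Reduce: add up the values and the ones for a single key."""
    total = sum(value for value, _ in pairs)
    count = sum(one for _, one in pairs)
    return (key, (total, count))

# Collect: group the mapped pairs by key.
groups = defaultdict(list)
for key, pair in map(to_sum_count, dataset):
    groups[key].append(pair)

output = [add_pairs(key, pairs) for key, pairs in sorted(groups.items())]

print(output)  # [('panda', (1, 2)), ('pink', (7, 2)), ('pirate', (3, 1))]
```

Keeping the count next to the sum means partial results from different machines could be merged the same way, which a plain average would not allow.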