# Tutorial 4: MapReduce

__The goal of this assignment is to implement 5 functions related to MapReduce and create 5 queries with them.__

MapReduce is based on functional programming, an approach that encourages the use of functions to build programs.

While MapReduce can effectively process large amounts of data, functional programming is a more general framework.

The purpose of this tutorial is to introduce you to functional programming through the use of MapReduce functions.

In the following course, we will use other functions to build higher-level abstractions based on Spark and RDD.

__The MapReduce functions are explained in the example section of this notebook.__

__Grade scale__: 20 points
- __correct function/query__: 2 points
- __incorrect function/query__: 0 points

__Further documentation__:
* https://learnxinyminutes.com/docs/python/
* https://en.wikipedia.org/wiki/MapReduce
* https://en.wikipedia.org/wiki/Functional_programming
* https://www.dataquest.io/blog/introduction-functional-programming-python/

# Core

In [59]:
# define some helper functions with lambda
# a lambda is an anonymous function whose body is limited to a single expression

inc = lambda x: x + 1
dec = lambda x: x - 1
square = lambda x: x * x

add = lambda x, y: x + y
sub = lambda x, y: x - y

ispos = lambda x: x > 0
isneg = lambda x: x < 0 

isodd = lambda x: x % 2 == 1
iseven = lambda x: x % 2 == 0

ident = lambda x: (x, x)
sumall = lambda x: ('sum', x)
lenall = lambda x: ('len', len(x))
leneach = lambda x: (x, len(x))
countall = lambda x: ('count', 1)
counteach = lambda x: (x, 1)
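
The last group of helpers returns `(key, value)` tuples, which is the shape a mapper emits in this notebook. A minimal sketch of what each one returns for a sample word (the helpers are redefined identically so the cell stands alone; `word` and `pairs` are illustrative names, not part of the assignment):

```python
# Same definitions as above, repeated so this cell runs on its own.
ident = lambda x: (x, x)
lenall = lambda x: ('len', len(x))
leneach = lambda x: (x, len(x))
countall = lambda x: ('count', 1)
counteach = lambda x: (x, 1)

word = 'data'  # illustrative input

pairs = {
    'ident': ident(word),          # ('data', 'data')
    'lenall': lenall(word),        # ('len', 4)
    'leneach': leneach(word),      # ('data', 4)
    'countall': countall(word),    # ('count', 1)
    'counteach': counteach(word),  # ('data', 1)
}
```

The `*all` helpers send every element to one shared key, so a single aggregate comes out. The `*each` helpers use the element itself as the key, so there is one aggregate per distinct element.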

# Examples

In [2]:
# map(f: Callable, coll: Iterable) -> Iterable: apply f to every object of coll
# - range(x: int) -> range: generate the integers from 0 to x (excluded), lazily
# - Python's map returns a lazy iterator, so it must be converted to a list to be printed on the screen

list(map(inc, range(10)))

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
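
To see why the conversion to a list is needed, here is a small sketch of the lazy behaviour of Python 3's `map` (the names `lazy`, `first`, `rest` and `again` are illustrative only):

```python
inc = lambda x: x + 1

lazy = map(inc, range(3))  # nothing is computed yet
first = next(lazy)         # computes inc(0) only
rest = list(lazy)          # computes the remaining items
again = list(lazy)         # the iterator is exhausted by now

# first == 1, rest == [2, 3], again == []
```

A `map` object can be consumed only once, which is one reason the functions in this tutorial return plain lists.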

In [3]:
# filter(f: Callable[..., bool], coll: Iterable) -> Iterable: keep every object of coll that satisfies f
# - we call f a predicate because it must return either True or False (a boolean)
# - f is satisfied when its return value is True

list(filter(iseven, range(10)))

[0, 2, 4, 6, 8]
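
As a point of comparison, the same selection can be written as a list comprehension with a condition. This is only a sketch to show the equivalence; the variable names are illustrative:

```python
iseven = lambda x: x % 2 == 0

with_filter = list(filter(iseven, range(10)))
with_comprehension = [x for x in range(10) if iseven(x)]

# both are [0, 2, 4, 6, 8]
```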

In [4]:
# reduce(f: Callable, coll: Iterable, init: Any) -> Any: apply f cumulatively to pairs of objects, starting with init
# - example: summing a list of numbers is a reduce where f is the + operator
# - we must import reduce from functools because it is no longer a built-in function in Python 3

from functools import reduce

reduce(add, range(10), 0)

45
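
The word "cumulatively" means that `reduce` works from left to right and carries an accumulator along. A minimal sketch that records each step (the helper `traced_sub` and the list `steps` are hypothetical names introduced for this illustration):

```python
from functools import reduce

sub = lambda x, y: x - y

steps = []  # (accumulator, item, result) for every call

def traced_sub(acc, x):
    """Call sub and remember what happened."""
    result = sub(acc, x)
    steps.append((acc, x, result))
    return result

total = reduce(traced_sub, [1, 2, 3], 10)

# steps == [(10, 1, 9), (9, 2, 7), (7, 3, 4)]
# total == 4, i.e. ((10 - 1) - 2) - 3
```

With a non-commutative function such as `sub`, the order of the arguments matters: the accumulator is always the first argument and the current item the second.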

# Functions

__You must not use Python's built-in functions (i.e., map, filter, reduce)!__  

I used my secret teaching assistant powers to make the tests fail if you do.

Instead, you must use for loops to create and return a new collection.

In [5]:
def mymap(f, coll):
    """Apply f to every object of coll."""
    result = list()
    ### BEGIN SOLUTION
    for x in coll:
        y = f(x)
        result.append(y)
    ### END SOLUTION
    return result

In [6]:
### BEGIN HIDDEN TESTS
map = None
### END HIDDEN TESTS
assert mymap(inc, []) == []
assert mymap(square, []) == []
assert mymap(inc, [2]) == [3]
assert mymap(square, [2]) == [4]
assert mymap(inc, [1, 2, 3]) == [2, 3, 4]
assert mymap(square, [1, 2, 3]) == [1, 4, 9]
### BEGIN HIDDEN TESTS
first = lambda x: x[0]
assert mymap(first, ['abc', 'def']) == ['a', 'd']
assert mymap(first, [[0, 1], [2, 3], [4, 5]]) == [0, 2, 4]
### END HIDDEN TESTS

In [7]:
def myfilter(f, coll):
    """Keep every object x of coll that satisfies f(x) (f(x) returns True)."""
    res = list()
    ### BEGIN SOLUTION
    for x in coll:
        if f(x):
            res.append(x)
    ### END SOLUTION
    return res

In [8]:
### BEGIN HIDDEN TESTS
filter = None
### END HIDDEN TESTS
assert myfilter(ispos, []) == []
assert myfilter(iseven, []) == []
assert myfilter(ispos, [1]) == [1]
assert myfilter(iseven, [1]) == []
assert myfilter(ispos, [0, 1, 2]) == [1, 2]
assert myfilter(iseven, [0, 1, 2]) == [0, 2]
### BEGIN HIDDEN TESTS
islen1 = lambda x: len(x) == 1
assert myfilter(islen1, ['a', 'ab', 'abc']) == ['a']
assert myfilter(islen1, [[0], [1], [0, 1]]) == [[0], [1]]
### END HIDDEN TESTS

In [9]:
def myreduce(f, coll, init):
    """Apply f cumulatively to the objects of coll, two at a time, starting with init."""
    res = init
    ### BEGIN SOLUTION
    for x in coll:
        res = f(res, x)
    ### END SOLUTION
    return res

In [10]:
### BEGIN HIDDEN TESTS
import functools
reduce = None
functools.reduce = None
### END HIDDEN TESTS
assert myreduce(add, [], 0) == 0
assert myreduce(sub, [], 9) == 9
assert myreduce(add, [1, 2, 3, 4, 5], 0) == 15
assert myreduce(sub, [1, 2, 3, 4, 5], 0) == -15
assert myreduce(add, [1, 2, 3, 4, 5], 5) == 20
assert myreduce(sub, [1, 2, 3, 4, 5], 5) == -10
### BEGIN HIDDEN TESTS
assert myreduce(add, ['el', 'lo'], 'h') == 'hello'
assert myreduce(add, [[1], [3], [5]], [0]) == [0, 1, 3, 5]
### END HIDDEN TESTS

In [11]:
# you don't need to implement this function, only to understand it
# MapReduce needs this function to distribute the mapped values over the reducers
# myshuffle will be called by MapReduce between the mapper and reducer calls

def myshuffle(tuples):
    """Group tuple values (second entry) by their key (first entry)."""
    shuffled = dict()
    
    for key, value in tuples:
        if key not in shuffled:
            shuffled[key] = list()
            
        shuffled[key].append(value)
    
    return sorted(shuffled.items())

In [12]:
assert myshuffle([]) == []
assert myshuffle([(1, 'a')]) == [(1, ['a'])]
assert myshuffle([(1, 'a'), (1, 'b')]) == [(1, ['a', 'b'])]
assert myshuffle([(1, 'a'), (2, 'b')]) == [(1, ['a']), (2, ['b'])]
assert myshuffle([(2, 'a'), (1, 'b')]) == [(1, ['b']), (2, ['a'])]
assert myshuffle([(1, 'a'), (1, 'b'), (2, 'c')]) == [(1, ['a', 'b']), (2, ['c'])]
assert myshuffle([(1, 'a'), (2, 'b'), (1, 'c'), (2, 'd')]) == [(1, ['a', 'c']), (2, ['b', 'd'])]
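
For comparison, the same grouping can be sketched with `collections.defaultdict` from the standard library, which removes the explicit `if key not in shuffled` check. `shuffle_sketch` is a hypothetical name, not a function of the assignment:

```python
from collections import defaultdict

def shuffle_sketch(tuples):
    """Group values by key, then sort the groups by key."""
    grouped = defaultdict(list)  # a missing key starts as an empty list
    for key, value in tuples:
        grouped[key].append(value)
    return sorted(grouped.items())

pairs = [('big', 1), ('data', 1), ('big', 1)]
shuffled = shuffle_sketch(pairs)

# shuffled == [('big', [1, 1]), ('data', [1])]
```

In both versions the keys must be hashable (they are dictionary keys) and mutually comparable (they are sorted).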

In [34]:
# mapper(x: Any) -> Tuple[Key, Value] (where Key = Value = Any): one (key, value) pair per object

def mymapreduce(mapper, reducer, coll, init):
    """Perform the following steps on coll:
       1. apply mapper on every object of coll
       2. group the output of mapper with myshuffle function
       3. run reducer with init on the values of each shuffled group
       4. return the key and its reducer output for each shuffled group in a new coll
    """
    res = list()
    ### BEGIN SOLUTION
    mapped = mymap(mapper, coll)
    shuffled = myshuffle(mapped)
    for key, values in shuffled:
        res.append((key, myreduce(reducer, values, init)))
    ### END SOLUTION
    return res

In [35]:
assert mymapreduce(countall, add, [], 0) == []
assert mymapreduce(countall, add, [1], 0) == [('count', 1)]
assert mymapreduce(countall, add, [1, 2], 0) == [('count', 2)]
assert mymapreduce(countall, add, [1, 2, 3], 0) == [('count', 3)]
assert mymapreduce(countall, add, [1, 2, 3], 5) == [('count', 8)]
assert mymapreduce(countall, sub, [1, 2, 3], 0) == [('count', -3)]
assert mymapreduce(counteach, add, [], 0) == []
assert mymapreduce(counteach, add, [1], 0) == [(1, 1)]
assert mymapreduce(counteach, add, [1, 2], 0) == [(1, 1), (2, 1)]
assert mymapreduce(counteach, add, [1, 2, 1], 0) == [(1, 2), (2, 1)]
assert mymapreduce(counteach, add, [1, 2, 1], 5) == [(1, 7), (2, 6)]
assert mymapreduce(counteach, sub, [1, 2, 1], 0) == [(1, -2), (2, -1)]
assert mymapreduce(counteach, add, [1, 2, 3], 0) == [(1, 1), (2, 1), (3, 1)]
### BEGIN HIDDEN TESTS
squareall = lambda x: ['square',  x * x]
assert mymapreduce(squareall, add, [1], 0) == [('square', 1)]
assert mymapreduce(squareall, add, [1, 2], 0) == [('square', 5)]
assert mymapreduce(squareall, add, [1, 2, 3], 0) == [('square', 14)]
assert mymapreduce(squareall, add, [1, 2, 3], 2) == [('square', 16)]
assert mymapreduce(squareall, sub, [1, 2, 3], 0) == [('square', -14)]
### END HIDDEN TESTS
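
The three stages can be followed by hand on a tiny input. The sketch below spells out each intermediate result for a word count, using only loops; the names `words`, `mapped`, `groups`, `shuffled` and `result` are illustrative:

```python
words = ['big', 'data', 'big']

# 1. Map: one (key, value) pair per input object.
mapped = [(w, 1) for w in words]
# [('big', 1), ('data', 1), ('big', 1)]

# 2. Shuffle: group the values by key and sort by key.
groups = {}
for key, value in mapped:
    groups.setdefault(key, []).append(value)
shuffled = sorted(groups.items())
# [('big', [1, 1]), ('data', [1])]

# 3. Reduce: fold each group separately, starting from init (0 here).
result = []
for key, values in shuffled:
    total = 0
    for v in values:
        total = total + v
    result.append((key, total))
# [('big', 2), ('data', 1)]
```

Note that `init` is applied once per group, not once overall, which is why the test with `init = 5` above expects `[(1, 7), (2, 6)]`.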

In [45]:
# this function is hard to implement and is not required for the rest of the assignment!

def myparmapreduce(pool, mapper, reducer, coll, init):
    """Parallel implementation of mapreduce:
       1. mapper is run in parallel by pool.map()
       2. reducers are run in parallel with pool.map()
    """
    res = list()
    ### BEGIN SOLUTION
    if len(coll) == 0:
        return []
    
    def myreducer(values):
        return myreduce(reducer, values, init)
    
    mapped = pool.map(mapper, coll)
    keys, grouped = zip(*myshuffle(mapped))
    for key, reduced in zip(keys, pool.map(myreducer, grouped)):
        res.append((key, reduced))
    ### END SOLUTION
    return res

In [48]:
from multiprocessing.dummy import Pool
p = Pool()

assert myparmapreduce(p, countall, add, [], 0) == []
assert myparmapreduce(p, countall, add, [1], 0) == [('count', 1)]
assert myparmapreduce(p, countall, add, [1, 2], 0) == [('count', 2)]
assert myparmapreduce(p, countall, add, [1, 2, 3], 0) == [('count', 3)]
assert myparmapreduce(p, countall, add, [1, 2, 3], 5) == [('count', 8)]
assert myparmapreduce(p, countall, sub, [1, 2, 3], 0) == [('count', -3)]
assert myparmapreduce(p, counteach, add, [], 0) == []
assert myparmapreduce(p, counteach, add, [1], 0) == [(1, 1)]
assert myparmapreduce(p, counteach, add, [1, 2], 0) == [(1, 1), (2, 1)]
assert myparmapreduce(p, counteach, add, [1, 2, 1], 0) == [(1, 2), (2, 1)]
assert myparmapreduce(p, counteach, add, [1, 2, 1], 5) == [(1, 7), (2, 6)]
assert myparmapreduce(p, counteach, sub, [1, 2, 1], 0) == [(1, -2), (2, -1)]
assert myparmapreduce(p, counteach, add, [1, 2, 3], 0) == [(1, 1), (2, 1), (3, 1)]
### BEGIN HIDDEN TESTS
class MyPool():
    def __init__(self):
        self.calls = 0
        self.tasks = 0
        
    def map(self, f, coll, *args, **kwargs):
        res = list()
        
        for x in coll:
            res.append(f(x))
            
        self.calls += 1
        self.tasks += len(coll)
        
        return res

mp = MyPool()
squareall = lambda x: ['square',  x * x]
assert myparmapreduce(mp, squareall, add, [1], 0) == [('square', 1)]
assert myparmapreduce(mp, squareall, add, [1, 2], 0) == [('square', 5)]
assert myparmapreduce(mp, squareall, add, [1, 2, 3], 0) == [('square', 14)]
assert myparmapreduce(mp, squareall, add, [1, 2, 3], 2) == [('square', 16)]
assert myparmapreduce(mp, squareall, sub, [1, 2, 3], 0) == [('square', -14)]
### END HIDDEN TESTS
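
The `Pool` imported from `multiprocessing.dummy` is backed by threads rather than processes, while exposing the same `map()` method. A minimal usage sketch, assuming two worker threads are enough for the illustration:

```python
from multiprocessing.dummy import Pool  # thread-based pool

square = lambda x: x * x

# The with-block shuts the pool down when it ends.
with Pool(2) as pool:
    squares = pool.map(square, [1, 2, 3])

# pool.map returns the results in input order: [1, 4, 9]
```

Because threads share memory, lambdas and nested functions can be passed to `pool.map()` directly. A process-based `multiprocessing.Pool` has to pickle the function it receives, so lambdas and nested functions would not work there.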

# Queries

__You must use mymapreduce() for every query.__

Think carefully about the role and signature of the mapper and the reducer.

__1. Count word occurrences in the given sentence__
- _hint_: use `sentence.split(' ')` to convert a string to a list of words

In [55]:
sentence = "a problem is a big problem when data is big"

def Q1(s):
    ### BEGIN SOLUTION
    return mymapreduce(counteach, add, s.split(' '), 0)
    ### END SOLUTION
    
Q1(sentence)

[('a', 2), ('big', 2), ('data', 1), ('is', 2), ('problem', 2), ('when', 1)]

In [56]:
assert Q1(sentence) == [('a', 2), ('big', 2), ('data', 1), ('is', 2), ('problem', 2), ('when', 1)]
### BEGIN HIDDEN TESTS
assert Q1("a a a b b c test") == [("a", 3), ("b", 2), ("c", 1), ("test", 1)]
### END HIDDEN TESTS
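
One way to double-check the expected output, outside of the MapReduce exercise itself, is `collections.Counter` from the standard library. This is only a verification sketch; `text` and `expected` are illustrative names:

```python
from collections import Counter

text = "a problem is a big problem when data is big"

# Count the words, then sort the (word, count) pairs by word.
expected = sorted(Counter(text.split(' ')).items())

# [('a', 2), ('big', 2), ('data', 1), ('is', 2), ('problem', 2), ('when', 1)]
```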

__2. Count the number of non-blank characters in the given sentence__
- _hint_: aggregate all mapper values in a single key

In [60]:
sentence = "a problem is a big problem when data is big"

def Q2(s):
    ### BEGIN SOLUTION
    return mymapreduce(lenall, add, s.split(' '), 0)
    ### END SOLUTION
    
Q2(sentence)

[('len', 34)]

In [63]:
assert Q2(sentence)[0][1] == 34  # e.g. [('chars', 34)]
### BEGIN HIDDEN TESTS
assert Q2("a a a b b c test")[0][1] == 10
### END HIDDEN TESTS
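
The expected total can be checked independently by removing the blanks and measuring what is left. A short sketch with illustrative names:

```python
text = "a problem is a big problem when data is big"

non_blank = len(text.replace(' ', ''))           # drop the spaces, then count
per_word = [len(w) for w in text.split(' ')]     # what the mapper values look like

# non_blank == 34
# per_word == [1, 7, 2, 1, 3, 7, 4, 4, 2, 3]
```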

__3. Group the given list of records by their age__
- _hint_: you can concatenate Python lists with `+`

In [69]:
persons = [{'name': 'Alice', 'age': 22}, {'name': 'Bob',  'age': 22},
           {'name': 'Max',   'age': 24}, {'name': 'Lina', 'age': 24},
           {'name': 'Ethan', 'age': 30}, {'name': 'Eva',  'age': 20}]

def Q3(coll):
    ### BEGIN SOLUTION
    return mymapreduce(lambda x: [x['age'], [x['name']]],
                       add, coll, [])
    ### END SOLUTION

Q3(persons)

[(20, ['Eva']), (22, ['Alice', 'Bob']), (24, ['Max', 'Lina']), (30, ['Ethan'])]

In [70]:
assert Q3(persons) == [(20, ['Eva']), (22, ['Alice', 'Bob']), (24, ['Max', 'Lina']), (30, ['Ethan'])]
### BEGIN HIDDEN TESTS
assert Q3([{'name': 'A', 'age': 2}, {'name': 'B', 'age': 2},
           {'name': 'C', 'age': 2}, {'name': 'D', 'age': 3}]) == [
    (2, ['A', 'B', 'C']), (3, ['D'])
]
### END HIDDEN TESTS
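
The hint about `+` explains why a mapper for this query would emit each name wrapped in a one-element list. A sketch of the reduce step for a single group, compared with what happens when bare strings are emitted (`values`, `acc`, `bare` and `joined` are illustrative names):

```python
add = lambda x, y: x + y

# Values of one group when the mapper emits [name].
values = [['Alice'], ['Bob']]
acc = []                   # init is the empty list
for v in values:
    acc = add(acc, v)      # list + list concatenates
# acc == ['Alice', 'Bob']

# Values of the same group when the mapper emits the bare name.
bare = ['Alice', 'Bob']
joined = ''                # init is the empty string
for v in bare:
    joined = add(joined, v)  # str + str glues the names together
# joined == 'AliceBob'
```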

__4. Find the youngest person in the following list__
- _hint_: you can only compare items two by two

In [78]:
persons = [{'name': 'Alice', 'age': 22}, {'name': 'Bob',  'age': 22},
           {'name': 'Max',   'age': 24}, {'name': 'Lina', 'age': 24},
           {'name': 'Ethan', 'age': 30}, {'name': 'Eva',  'age': 20}]

def Q4(coll):
    ### BEGIN SOLUTION
    return mymapreduce(
        lambda x: ['min', x],
        lambda x, y: y if x is None or y['age'] < x['age'] else x,
        coll, None)
    ### END SOLUTION
    
Q4(persons)

[('min', {'name': 'Eva', 'age': 20})]

In [79]:
assert Q4(persons)[0][1] == {'age': 20, 'name': 'Eva'}
### BEGIN HIDDEN TESTS
assert Q4([{'name': 'A', 'age': 1}, {'name': 'B', 'age': 2},
           {'name': 'C', 'age': 3}, {'name': 'D', 'age': 4}])[0][1] \
       == {'name': 'A', 'age': 1}
### END HIDDEN TESTS
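
The hint says that items can only be compared two by two, so the reducer has to keep the younger of the accumulator and the current item. A sketch of that pairwise step in isolation, with `None` as the starting accumulator (`younger`, `people` and `acc` are hypothetical names; the check with `min()` is only there to confirm the result):

```python
# Keep y when there is no accumulator yet or when y is strictly younger.
younger = lambda x, y: y if x is None or y['age'] < x['age'] else x

people = [{'name': 'Alice', 'age': 22},
          {'name': 'Eva', 'age': 20},
          {'name': 'Max', 'age': 24}]

acc = None
for p in people:
    acc = younger(acc, p)
# acc == {'name': 'Eva', 'age': 20}

# Independent check with the built-in min().
check = min(people, key=lambda p: p['age'])
```

Because the comparison is strict (`<`), the first person encountered wins when two people have the same age.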

__5. Retrieve the persons who are younger than 23 years old__

In [84]:
persons = [{'name': 'Alice', 'age': 22}, {'name': 'Bob',  'age': 22},
           {'name': 'Max',   'age': 24}, {'name': 'Lina', 'age': 24},
           {'name': 'Ethan', 'age': 30}, {'name': 'Eva',  'age': 20}]

def Q5(coll):
    ### BEGIN SOLUTION
    return mymapreduce(
        lambda x: ['select', x],
        lambda x, y: x + [y] if y['age'] < 23 else x,
        coll, [])
    ### END SOLUTION
    
Q5(persons)

[('select',
  [{'name': 'Alice', 'age': 22},
   {'name': 'Bob', 'age': 22},
   {'name': 'Eva', 'age': 20}])]

In [85]:
assert Q5(persons)[0][1] == [
    {'name': 'Alice', 'age': 22},
    {'name': 'Bob', 'age': 22},
    {'name': 'Eva', 'age': 20}
]
### BEGIN HIDDEN TESTS
assert Q5([{'name': 'A', 'age': 20}, {'name': 'B', 'age': 21},
           {'name': 'C', 'age': 25}, {'name': 'D', 'age': 26}])[0][1] == [
    {'name': 'A', 'age': 20}, {'name': 'B', 'age': 21}
]
### END HIDDEN TESTS