This is a Python 3 notebook that does not use any exotic libraries.

In [1]:
"""
This function behaves the same as the built-in map function (except that this one materializes the results into a list, while the built-in one returns a lazy iterator)
"""
def standardMap(f, seq):
    """
    Apply f to every element of seq, in order, and return the results as a list.

    f   -- function of one argument
    seq -- any iterable
    """
    results = []
    for item in seq:
        results.append(f(item))
    return results

In [2]:
from random import shuffle
"""
This function randomizes the order of the data passed in and then returns the list of results from applying the
passed in function to each element of the shuffled list.
"""
def randomMap(f, seq):
    """
    Apply f to every element of seq in a random order and return the results.

    The input is copied into a new list before shuffling, so the caller's
    sequence is never reordered.

    f   -- function of one argument
    seq -- any iterable
    """
    shuffled = list(seq)
    shuffle(shuffled)
    results = []
    for item in shuffled:
        results.append(f(item))
    return results

In [3]:
"""
Show results of the three different map functions at our disposal.
"""
def tryDifferentMaps(f, seq):
    """
    Print the result of mapping f over seq with the built-in map, standardMap
    and randomMap, one labelled line per variant.

    seq is traversed once per variant, so it must be re-iterable (for example
    a list or a range) rather than a one-shot iterator.
    """
    builtinResult = list(map(f, seq))
    print("builtin: " + str(builtinResult))

    standardResult = standardMap(f, seq)
    print("standard: " + str(standardResult))

    randomResult = randomMap(f, seq)
    print("random: " + str(randomResult))

In [4]:
# Sample mapper: add one to its argument. The three map variants are then
# compared on the integers 1 through 4.
def f(x):
    return x + 1


tryDifferentMaps(f, range(1, 5))

builtin: [2, 3, 4, 5]
standard: [2, 3, 4, 5]
random: [3, 5, 4, 2]


In [5]:
"""
This function is equivalent to the built in reduce function
"""
def standardReduce(f, seq):
    """
    Fold seq into a single value by applying f cumulatively, left to right.

    Behaves like functools.reduce called without an initial value:
    standardReduce(f, [a, b, c]) computes f(f(a, b), c).

    f   -- function of two arguments (accumulated value, next element)
    seq -- any iterable; it is consumed through an iterator, so generators and
           other non-indexable iterables are accepted and no copy is made

    Returns the accumulated value. A one-element input returns that element
    without calling f.

    Raises IndexError if seq is empty (functools.reduce raises TypeError in
    that situation).
    """
    elements = iter(seq)
    try:
        curval = next(elements)
    except StopIteration:
        # Keep the exception type that indexing an empty sequence produced.
        raise IndexError("standardReduce() of empty sequence") from None
    for newval in elements:
        curval = f(curval, newval)
    return curval

In [6]:

"""
This function is similar to the built in reduce function, but it shuffles the sequence before operating on it.
This is comparable to the problem faced when working on multiple processes/computers in parallel.
"""
def randomReduce(f, seq):
    """
    Fold seq into a single value after putting its elements in a random order.

    The elements are copied into a new list, shuffled, and then combined left
    to right with f. The result therefore only matches an in-order reduction
    when f does not care about the order of its operands.

    f   -- function of two arguments (accumulated value, next element)
    seq -- any non-empty iterable; an empty one raises IndexError
    """
    shuffled = list(seq)
    shuffle(shuffled)
    accumulated = shuffled[0]
    for position in range(1, len(shuffled)):
        accumulated = f(accumulated, shuffled[position])
    return accumulated

In [7]:
from functools import reduce

In [8]:
"""
Show results of the three different reduce functions at our disposal.
"""
def tryDifferentReductions(f, seq):
    """
    Print the result of reducing seq with f using the built-in reduce,
    standardReduce and randomReduce, one labelled line per variant.

    seq is traversed once per variant, so it must be re-iterable (for example
    a list or a range) rather than a one-shot iterator.
    """
    builtinResult = reduce(f, seq)
    print("builtin: " + str(builtinResult))

    standardResult = standardReduce(f, seq)
    print("standard: " + str(standardResult))

    randomResult = randomReduce(f, seq)
    print("random: " + str(randomResult))

In [9]:
# Multiplication is commutative (and associative), so all three reductions
# agree no matter what order the elements are combined in.
def f(x, y):
    return x * y


tryDifferentReductions(f, range(2, 6))

builtin: 120
standard: 120
random: 120


In [10]:
# Exponentiation is a non-commutative function, so the shuffled reduction can
# produce a different answer from the two in-order reductions.
def f(x, y):
    return pow(x, y)


tryDifferentReductions(f, range(2, 6))

builtin: 1152921504606846976
standard: 1152921504606846976
random: 59604644775390625


In [11]:
# Joining words with a space is another non-commutative function: the shuffled
# reduction keeps every word but may emit them in a different order.
def f(x, y):
    return x + " " + y


tryDifferentReductions(f, "this is a sentece".split())

builtin: this is a sentece
standard: this is a sentece
random: a this sentece is


In [12]:
"""
The simplest version of map reduce that I could show.  This version does not account for the shuffle step, which would be
necessary to do a hash join efficiently.
"""
def simpleMapReduce(mapFunc, reduceFunc, seq):
    """
    Map every element of seq with mapFunc, then fold the mapped values into a
    single result with reduceFunc.

    Both stages process their elements in a random order (randomMap followed
    by randomReduce), so reduceFunc should give the same answer whatever order
    it receives the values in.
    """
    mappedValues = randomMap(mapFunc, seq)
    return randomReduce(reduceFunc, mappedValues)
    

In [13]:
# Identity mapper plus an additive reducer: sums the integers 1 through 9.
def mapFunc(x):
    return x


def reduceFunc(x, y):
    return x + y


simpleMapReduce(mapFunc, reduceFunc, range(1, 10))

45

In [14]:
"""
This is similar to the simple map reduce algorithm above, but it breaks the data apart and does each subset on its own before
recombining everything together at the end.
"""
def randomMapReduce(mapFunc, reduceFunc, seq, sizePerChunk=4):
    """
    Map-reduce seq in independent chunks, then combine the per-chunk results.

    The input is cut into consecutive chunks of at most sizePerChunk elements.
    Each chunk is processed on its own with simpleMapReduce, as if the chunks
    were being handled in parallel, and the intermediate results are finally
    folded together with randomReduce.

    mapFunc      -- function of one argument applied to every element
    reduceFunc   -- function of two arguments used both inside each chunk and
                    to combine the per-chunk results
    seq          -- any iterable; it is copied into a list once so that
                    generators and other non-sliceable inputs can be chunked
    sizePerChunk -- maximum number of elements per chunk (default 4)

    Raises ValueError if sizePerChunk is smaller than 1, and IndexError if seq
    is empty (there is nothing for the final reduction to start from).
    """
    if sizePerChunk < 1:
        raise ValueError("sizePerChunk must be at least 1")

    items = list(seq)

    # Intermediate results: one reduced value per chunk.
    workingList = [
        simpleMapReduce(mapFunc, reduceFunc, items[start:start + sizePerChunk])
        for start in range(0, len(items), sizePerChunk)
    ]

    # Once the per-chunk work is done, do the last combination task.
    return randomReduce(reduceFunc, workingList)

In [15]:
# Same identity mapper and additive reducer as before, this time run through
# the chunked map reduce: sums the integers 1 through 9.
def mapFunc(x):
    return x


def reduceFunc(x, y):
    return x + y


randomMapReduce(mapFunc, reduceFunc, range(1, 10))

45

In [16]:
"""
This function accepts two sorted lists and the comparison operator used to sort them and efficiently combines them into a 
single sorted list
"""
def mergeLists(left, right, comparator):
    """
    Merge two already-sorted sequences into a single sorted list.

    left, right -- indexable sequences, each sorted according to comparator
    comparator  -- function of two arguments returning a true value when the
                   first argument should be placed before the second

    The element from left is taken only when comparator(leftItem, rightItem)
    is true; otherwise (including ties under a strict comparison) the element
    from right is taken first.
    """
    merged = []
    leftIndex, rightIndex = 0, 0
    leftSize, rightSize = len(left), len(right)

    # Interleave for as long as both inputs still have unconsumed elements.
    while leftIndex < leftSize and rightIndex < rightSize:
        if comparator(left[leftIndex], right[rightIndex]):
            merged.append(left[leftIndex])
            leftIndex += 1
        else:
            merged.append(right[rightIndex])
            rightIndex += 1

    # At most one input has elements left; copy them over unchanged.
    merged.extend(left[i] for i in range(leftIndex, leftSize))
    merged.extend(right[i] for i in range(rightIndex, rightSize))
    return merged

In [17]:
# Sort ascending with map reduce: every element becomes a one-element list
# (trivially sorted), and the reducer merges two sorted lists into one.
def mapFunc(x):
    return [x]


def reduceFunc(x, y):
    def ascending(a, b):
        return a < b

    return mergeLists(x, y, ascending)


simpleMapReduce(mapFunc, reduceFunc, range(1, 10))

[1, 2, 3, 4, 5, 6, 7, 8, 9]

In [18]:
# Same merge-based sort, but with the comparison flipped so the merged result
# comes out in descending order.
def mapFunc(x):
    return [x]


def reduceFunc(x, y):
    def descending(a, b):
        return a > b

    return mergeLists(x, y, descending)


simpleMapReduce(mapFunc, reduceFunc, range(1, 10))

[9, 8, 7, 6, 5, 4, 3, 2, 1]

In [19]:
#9. Write map reduce to count the letters in the sentence 'I used join to split this sentence.'
# Each word is mapped to its length and the lengths are added together.
# split() leaves the final period attached to the last word, so that
# character is included in the total.
seq = 'I used join to split this sentence.'.split()


def mapFunc(x):
    return len(x)


def reduceFunc(x, y):
    return x + y


simpleMapReduce(mapFunc, reduceFunc, seq)

29

In [20]:
#10. Write map reduce to get the count of words of each length in the above sentence.

class default0Dict(dict):
    """
    A dictionary that is identical to dict except that looking up a key it
    does not contain with d[key] returns 0 instead of raising an error.

    The missing key is not added to the dictionary by the lookup.
    """

    def __missing__(self, key):
        """Supply 0 as the value of any key that is not in the dictionary."""
        return 0
    
# Map each word to a one-entry count table: {length of the word: 1}.
def mapFunc(x):
    return default0Dict({len(x): 1})


# Merge two count tables by adding their counts key by key. The union of the
# keys of both tables decides which keys appear in the result; a key held by
# only one table contributes 0 from the other, because default0Dict returns 0
# for keys it does not contain.
def reduceFunc(x, y):
    allKeys = set(x.keys()) | set(y.keys())
    return default0Dict({key: x[key] + y[key] for key in allKeys})


seq = 'I used join to split this sentence.'.split()
randomMapReduce(mapFunc, reduceFunc, seq)

{1: 1, 2: 1, 4: 3, 5: 1, 9: 1}