## Examples using the MapReduce programming model

1. Word count
2. Inverted index
3. Relational Join
4. Count number of friends
5. Assymetrical friendships
6. Trim DNA sequences
7. Matrix multiplication


In [1]:
import json

class MapReduce: 
    def __init__(self):
        self.intermediate = {}
        self.result = []

    def emit_intermediate(self, key, value):
        self.intermediate.setdefault(key, [])
        self.intermediate[key].append(value)

    def emit(self, value):
        self.result.append(value) 

    def execute(self, data, mapper, reducer):
        for line in data:
            record = json.loads(line)
            mapper(record)

        for key in self.intermediate:
            reducer(key, self.intermediate[key])

        #jenc = json.JSONEncoder(encoding='latin-1')
        jenc = json.JSONEncoder()
        for item in self.result:
            print jenc.encode(item)

### Ex. 1 : Word count

In [67]:
mr = MapReduce()

def mapper(record):
    # key: document identifier
    # value: document contents
    key = record[0] 
    value = record[1] 
    words = value.split()
    for w in words:
      mr.emit_intermediate(w, 1)

def reducer(key, list_of_values):
    # key: word
    # value: list of occurrence counts
    total = 0
    for v in list_of_values:
      total += v
    mr.emit((key, total))

inputdata = open('data/books.json')

In [85]:
!head -n 3 data/books.json

["milton-paradise.txt", "[ Paradise Lost by John Milton 1667 ] Book I Of Man ' s first disobedience , and the fruit Of that forbidden tree whose mortal taste Brought death into the World , and all our woe , With loss of Eden , till one greater Man Restore us , and regain the blissful seat , Sing , Heavenly Muse , that , on the secret top Of Oreb , or of Sinai , didst inspire That shepherd who first taught the chosen seed In the beginning how the heavens and earth Rose out of Chaos : or , if Sion hill Delight thee more , and Siloa ' s brook that flowed Fast by the oracle of God , I thence Invoke thy aid to my adventurous song , That with no middle flight intends to soar Above th ' Aonian mount , while it pursues Things unattempted yet in prose or rhyme ."]
["edgeworth-parents.txt", "[ The Parent ' s Assistant , by Maria Edgeworth ] THE ORPHANS . Near the ruins of the castle of Rossmore , in Ireland , is a small cabin , in which there once lived a widow and her four children . As long a

In [32]:
mr.execute(inputdata, mapper, reducer)

["all", 4]
["Rossmore", 1]
["Consumptive", 1]
["forbidden", 1]
["child", 2]
["eldest", 1]
["four", 1]
["Caesar", 1]
["winds", 1]
["Moses", 1]
["children", 1]
["seemed", 2]
["whose", 1]
["Parent", 1]
["father", 1]
["young", 1]
["vex", 1]
["to", 20]
["th", 1]
["smile", 1]
["Day", 1]
["flowed", 1]
["me", 2]
["woman", 1]
["Whitman", 2]
["song", 3]
["very", 4]
["void", 1]
["rhyme", 1]
["Oreb", 1]
["Pipe", 1]
["Where", 1]
["heaven", 1]
["Bentley", 1]
["--", 3]
["sang", 2]
["1667", 1]
["pleas", 1]
["queer", 1]
["Fast", 1]
["VOLUME", 1]
["Shakespeare", 1]
["utter", 1]
["Jane", 1]
["emptiness", 1]
["blue", 1]
["what", 3]
["darkness", 3]
["Cobler", 1]
["supplied", 1]
["Delight", 1]
["above", 2]
[";", 14]
["ever", 2]
["piped", 2]
["Apparrell", 1]
["body", 1]
["Emma", 2]
["Professor", 1]
["men", 2]
["here", 1]
["youngest", 1]
["let", 1]
["crippled", 1]
["sing", 1]
["[", 11]
["invented", 2]
["daughter", 1]
["laughter", 1]
["spheres", 1]
["(", 4]
["plume", 1]
["Blake", 1]
["heart", 1]
["Things", 1]


### Ex. 2 : Create an inverted index from a set of texts

In [69]:
mr = MapReduce() #Mapreduce object

def mapper(record):
    # key: word in document
    # value: document title
    key = record[1] 
    value = record[0] 
    keys = key.split()
    for k in keys:
        mr.emit_intermediate(k, value)

def reducer(key, list_of_values):
    # key: word in document
    # list_of_values: list of document titles
    list_of_values = list(set(list_of_values)) #remove duplicates
    #print(key)
    #print(list_of_values)
    
    mr.emit((key, list_of_values))

inputdata = open('data/books.json')

In [70]:
mr.execute(inputdata, mapper, reducer)

["all", ["milton-paradise.txt", "blake-poems.txt", "melville-moby_dick.txt"]]
["Rossmore", ["edgeworth-parents.txt"]]
["Consumptive", ["melville-moby_dick.txt"]]
["forbidden", ["milton-paradise.txt"]]
["child", ["blake-poems.txt"]]
["eldest", ["edgeworth-parents.txt"]]
["four", ["edgeworth-parents.txt"]]
["Caesar", ["shakespeare-caesar.txt"]]
["winds", ["whitman-leaves.txt"]]
["Moses", ["bible-kjv.txt"]]
["children", ["edgeworth-parents.txt"]]
["seemed", ["chesterton-ball.txt", "austen-emma.txt"]]
["whose", ["milton-paradise.txt"]]
["Parent", ["edgeworth-parents.txt"]]
["father", ["austen-emma.txt"]]
["young", ["chesterton-thursday.txt"]]
["vex", ["austen-emma.txt"]]
["to", ["milton-paradise.txt", "austen-emma.txt", "chesterton-ball.txt", "blake-poems.txt", "whitman-leaves.txt", "melville-moby_dick.txt", "edgeworth-parents.txt"]]
["th", ["milton-paradise.txt"]]
["smile", ["whitman-leaves.txt"]]
["Day", ["bible-kjv.txt"]]
["flowed", ["milton-paradise.txt"]]
["me", ["blake-poems.txt", "s

### Ex. 3 : Relational Join in MR

Implement equivalent of SQL statement:

    SELECT *
    FROM Orders, LineItem
    WHERE Order.order_id = LineItem.order_id

In [71]:
mr = MapReduce()

def mapper(record):
    # key: order_id
    # value: whole record, coming either from 'Order' or 'LineItem' table
    key = record[1]
    value = record
    #print(value)

    mr.emit_intermediate(key, value)

    
def reducer(key, list_of_values):
    # key: order_id
    # list_of_values: list of lists of records from 'Order' and 'LineItem' tables with same order_id
    
    item_list_pos = []
    for i in range(len(list_of_values)):
        if 'order' in list_of_values[i] :
            order_list_pos = i    #retrieve index of record from 'Order' table (this record is unique for ea. order_id)
        else: item_list_pos.append(i) #indices of all items for a given order_id
    
    for i in range(len(item_list_pos)):
        #order record followed by all corresponding items
        mr.emit(list_of_values[order_list_pos] + list_of_values[item_list_pos[i]])
        
inputdata = open('data/records.json')

In [83]:
!head -n 3 data/records.json #some records from the Order table

["order", "1", "36901", "O", "173665.47", "1996-01-02", "5-LOW", "Clerk#000000951", "0", "nstructions sleep furiously among "]
["order", "2", "78002", "O", "46929.18", "1996-12-01", "1-URGENT", "Clerk#000000880", "0", " foxes. pending accounts at the pending, silent asymptot"]
["order", "3", "123314", "F", "193846.25", "1993-10-14", "5-LOW", "Clerk#000000955", "0", "sly final accounts boost. carefully regular ideas cajole carefully. depos"]


In [84]:
!tail -n 3 data/records.json #some records from the LineItem table

["line_item", "32", "2743", "7744", "4", "4", "6582.96", "0.09", "0.03", "N", "O", "1995-08-04", "1995-10-01", "1995-09-03", "NONE", "REG AIR", "e slyly final pac"]
["line_item", "32", "85811", "8320", "5", "44", "79059.64", "0.05", "0.06", "N", "O", "1995-08-28", "1995-08-20", "1995-09-14", "DELIVER IN PERSON", "AIR", "symptotes nag according to the ironic depo"]
["line_item", "32", "11615", "4117", "6", "6", "9159.66", "0.04", "0.03", "N", "O", "1995-07-21", "1995-09-23", "1995-07-25", "COLLECT COD", "RAIL", " gifts cajole carefully."]


In [74]:
mr.execute(inputdata, mapper, reducer)

["order", "32", "130057", "O", "208660.75", "1995-07-16", "2-HIGH", "Clerk#000000616", "0", "ise blithely bold, regular requests. quickly unusual dep", "line_item", "32", "82704", "7721", "1", "28", "47227.60", "0.05", "0.08", "N", "O", "1995-10-23", "1995-08-27", "1995-10-26", "TAKE BACK RETURN", "TRUCK", "sleep quickly. req"]
["order", "32", "130057", "O", "208660.75", "1995-07-16", "2-HIGH", "Clerk#000000616", "0", "ise blithely bold, regular requests. quickly unusual dep", "line_item", "32", "197921", "441", "2", "32", "64605.44", "0.02", "0.00", "N", "O", "1995-08-14", "1995-10-07", "1995-08-27", "COLLECT COD", "AIR", "lithely regular deposits. fluffily "]
["order", "32", "130057", "O", "208660.75", "1995-07-16", "2-HIGH", "Clerk#000000616", "0", "ise blithely bold, regular requests. quickly unusual dep", "line_item", "32", "44161", "6666", "3", "2", "2210.32", "0.09", "0.02", "N", "O", "1995-08-07", "1995-10-07", "1995-08-23", "DELIVER IN PERSON", "AIR", " express accounts wake a

### Ex. 4 : Count the number of friends for each person in a social network dataset

Social network is described by a set of (person, friend) pairs.

=> identical to the word count example

In [75]:
mr = MapReduce()

def mapper(record):
    # key: person A
    # value: person A's friend (can be non-reciprocical)
    key = record[0]
    value = record[1]
    
    mr.emit_intermediate(key, 1)

def reducer(key, list_of_values):
    # key: person
    # value: list of friend occurrence counts
    total = 0
    for v in list_of_values:
        total += v
    mr.emit((key, total))


inputdata = open('data/friends.json')

In [86]:
!head -n 3 data/friends.json

["Myriel", "Geborand"]
["Myriel", "Champtercier"]
["Myriel", "Count"]


In [77]:
mr.execute(inputdata, mapper, reducer)

["MlleBaptistine", 3]
["Myriel", 5]
["Valjean", 16]
["MmeMagloire", 1]
["Champtercier", 1]
["Napoleon", 1]


### Ex. 5 : Assymetrical friendships

In [78]:
mr = MapReduce()

def mapper(record):

    person = record[0]
    friend = record[1]
    
    #emit both friend and reciprocical relationships
    mr.emit_intermediate((person, friend), 1)
    mr.emit_intermediate((friend, person), -1)
    
    #if reciprocical relationship exists, then values will sum to zero acording to:
    #    ((A,B),1)
    #    ((B,A),-1)
    #       ...
    #    ((B,A),1)
    #    ((A,B),-1)

def reducer(key, list_of_values):
    
    #if reciprocical relationship does not exist then sum(list_of_values) will not cancel out
    if (sum(list_of_values) != 0):
        mr.emit((key))

inputdata = open('data/friends.json')
mr.execute(inputdata, mapper, reducer)

["OldMan", "Myriel"]
["Valjean", "MmeDeR"]
["Valjean", "Montparnasse"]
["Myriel", "MmeMagloire"]
["MmeDeR", "Valjean"]
["Valjean", "Judge"]
["Myriel", "MlleBaptistine"]
["Napoleon", "Myriel"]
["Babet", "Valjean"]
["Myriel", "Napoleon"]
["Myriel", "OldMan"]
["Valjean", "Babet"]
["MlleBaptistine", "Valjean"]
["MmeMagloire", "Myriel"]
["Myriel", "Count"]
["Simplice", "Valjean"]
["Valjean", "Simplice"]
["Woman2", "Valjean"]
["Woman1", "Valjean"]
["MlleGillenormand", "Valjean"]
["Valjean", "Woman2"]
["Myriel", "Geborand"]
["MmeMagloire", "MlleBaptistine"]
["Count", "Myriel"]
["MlleBaptistine", "MmeMagloire"]
["Valjean", "Isabeau"]
["Valjean", "Labarre"]
["Cosette", "Valjean"]
["Valjean", "MlleBaptistine"]
["Fantine", "Valjean"]
["Valjean", "Gillenormand"]
["Valjean", "Marguerite"]
["Marguerite", "Valjean"]
["Valjean", "MlleGillenormand"]
["MmeMagloire", "Valjean"]
["Gillenormand", "Valjean"]
["Labarre", "Valjean"]
["Valjean", "MmeMagloire"]
["Valjean", "Fantine"]
["Valjean", "Woman1"]
["Isa

### Ex. 6 : Trim DNA sequences and remove duplicates

In [66]:
mr = MapReduce()

def mapper(record):
    seq_name = record[0]
    dna_seq = record[1]
    
    #trim last 10 char
    mr.emit_intermediate(dna_seq[:-10], 1)
    
def reducer(key, list_of_values):

    #redundancies are already grouped within one unique key
    mr.emit((key))
     
inputdata = open('data/dna.json')
mr.execute(inputdata, mapper, reducer)

"CTGCAGCCACCCCCTGCTGCCCCCACCTGAACCCTTGATCCCAGCTCGGCAGCCCCCGCAGTTTCCTGTTTGCCCACTCTCTTTGCCCAGCCTCAGGAACAGAGCTGATCCTTGAACTCTAAGTTCCACATCGCCAGCAAAAGTAAGCAGTGGCAGGGCCAGGCTGAGCTTATCAGTCTCCCAAGTCCCCAGCCCCTGCCCACACACATATATAGACCAGGGAAGAAGAGCTGGACACCCAGACTGTCGGAGAGCTCCGGGGAGGTAAGTGCTGCTACCTGCCTTCGGTGGCTCTGGCTCCATAGCGCCTCCCAGTTGATGCTCCACTGTCCAAAGTCCAAATCAATACCGTTGGATATCTCGCACCTTTAGCCATTCTAGCCAATGCTTCCATGGGCTTGAATTGTGTGTGGAGCCTTTCCATGACAATGCCTCCCTTCAGCCCAGCCCTCCTCCTCCTTCTTCTTCTTCAGGTCACCCACACCCTTCAGGATGAAAGCTGTGGTGCTGGCCGTGGCTCTGGTCTTCCTGACAGGTAGGTGCCTCTTGACCTGCGTGGGACTACCTTCCTGGGCACAGAGAACAGAATTCCCACTGTTCTCTTCCCTGACTCCGAGTCTAACCTAACATGGGTCTCCCCTCCATCCCCAGGGAGCCAGGCTTGGCACGTATGGCAGCAAGATGAACCCCAGTCCCAATGGGACAAAGTGAAGGATTTCGCTAATGTGTATGTGGATGCGGTCAAAGACAGCGGCAGAGACTATGTGTCCCAGTTTGAATCCTCCTCCTTGGGCCAACAGCTGAAGTAAGGAAAGACCTAGCGTGGGGCCTGGAGCAGGTCAAGGGCTGCCCATCCAGGGTGGGCAGAGAGACCAGTGAGAAGATGCTGGAACTGAGCTGGCTAGCCCTTCACGGGCTTTCCTACCAGCTGGGCACCATGGCAGGTTCCAGTGGAGGACTAGGGATGGGATTCATCTGGCTGTTGGGTAACCACAGCCC

### Ex. 7 : Matrix multiplication

In [82]:
mr = MapReduce()

def mapper(record):
    
    # Assume matrices dimensions are 5x5
    dim = 5
    matrix = record[0]
    
    if (matrix == "a") : 
        a_i = record[1]
        a_j = record[2]
        a_ij = record[3]
        #print 'matrix A : a_i = ', a_i, 'a_j = ', a_j, 'a_ij = ', a_ij
        # Emit one Aij element for each Cik element for which it will be used,
        # that is for every element of the corresponding column in B
        for d in range(dim):
            mr.emit_intermediate((a_i, d), (matrix, a_j, a_ij))

    if (matrix == "b") : 
        b_j = record[1]
        b_k = record[2]
        b_jk = record[3]
        #print 'matrix B : b_j = ', b_j, 'b_k = ', b_k, 'b_jk = ', b_jk
        for d in range(dim):
            mr.emit_intermediate((d, b_k), (matrix, b_j, b_jk))

    
def reducer(key, list_of_values):
    
    dim = 5
    #print 'key = ', key, '| list_of_values = ', list_of_values
    list_a = []
    list_b = []
    for list_idx in range(len(list_of_values)):
        if 'a' in list_of_values[list_idx] : list_a.append(list_of_values[list_idx])
        if 'b' in list_of_values[list_idx] : list_b.append(list_of_values[list_idx])       
       
    # A row and b column initialized at zero with the right dimension
    a_row = [0] * dim
    b_col = [0] * dim
    
    for i in range(len(list_a)):
        a_row[int(list_a[i][1])] =  list_a[i][2]
        # index of a_row is determined by int(list_a[i][1]), since list_a is sparse 
    for i in range(len(list_b)):
        b_col[int(list_b[i][1])] =  list_b[i][2]
    
    # dot product of column and row
    c_ik = sum([i*j for (i,j) in zip(a_row, b_col)])
    
    mr.emit((key[0], key[1], c_ik))

inputdata = open('data/matrix.json')

In [87]:
!head data/matrix.json

["a", 0, 0, 63]
["a", 0, 1, 45]
["a", 0, 2, 93]
["a", 0, 3, 32]
["a", 0, 4, 49]
["a", 1, 0, 33]
["a", 1, 3, 26]
["a", 1, 4, 95]
["a", 2, 0, 25]
["a", 2, 1, 11]


In [88]:
mr.execute(inputdata, mapper, reducer)

[1, 3, 7479]
[3, 0, 10512]
[2, 1, 9880]
[0, 3, 5964]
[4, 0, 11182]
[1, 2, 8282]
[3, 3, 2934]
[4, 4, 9981]
[2, 2, 10636]
[4, 1, 14591]
[1, 1, 6914]
[3, 2, 10587]
[0, 0, 11878]
[0, 4, 15874]
[1, 4, 9647]
[2, 3, 6973]
[4, 2, 10954]
[1, 0, 4081]
[0, 1, 14044]
[3, 1, 12037]
[2, 4, 8873]
[2, 0, 6844]
[4, 3, 1660]
[3, 4, 5274]
[0, 2, 16031]
