## live session 2 - map reduce

a step-by-step approach to the map reduce framework.

we will start by examining a simple "word count" program implemented in python.

### follow along here
https://zettadatanet.wordpress.com/2015/04/04/a-hands-on-introduction-to-mapreduce-in-python/

### wordcount - the completely manual way

In [1]:
import sys
import re


In [2]:
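# wordcount implementation using a python dictionary
sums = {}

filename = 'pg2701.txt'

# "with" closes the file once the loop is done
with open(filename, 'r') as f:
    for line in f:
        # strip leading/trailing non-word characters, then split on the rest
        line = re.sub(r'^\W+|\W+$', '', line)
        words = re.split(r'\W+', line)

        for word in words:
            if not word:
                continue  # a blank line splits into [''], which is not a word
            word = word.lower()
            sums[word] = sums.get(word, 0) + 1

print(sums)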
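
as a cross-check on the manual loop, the same count can be sketched with `collections.Counter` from the standard library. the helper name `count_words` and the sample sentence are assumptions made for illustration; they stand in for the contents of the input file.

```python
import re
from collections import Counter


def count_words(text):
    """Count lowercase words in a string; findall never yields empty tokens."""
    words = re.findall(r'\w+', text.lower())
    return Counter(words)


# hypothetical sample text, standing in for the contents of the input file
sample = "Now is the time for all good men to come to the aid of the party"
counts = count_words(sample)

# the two most frequent words and their counts
print(counts.most_common(2))
```

`re.findall(r'\w+', ...)` returns only the matched words, so there are no empty strings to filter out.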



### wordcount - using map reduce

three steps
1. map
2. shuffle
3. reduce
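
the three steps can be sketched as three small functions chained together. this is a minimal in-memory illustration, not a distributed implementation; the function names and the input line are made up for the example.

```python
import re


def map_step(lines):
    """Emit a (word, 1) pair for every word in every line."""
    pairs = []
    for line in lines:
        for word in re.findall(r'\w+', line.lower()):
            pairs.append((word, 1))
    return pairs


def shuffle_step(pairs):
    """Bring pairs with the same key next to each other by sorting."""
    return sorted(pairs)


def reduce_step(sorted_pairs):
    """Add up the values of each run of identical keys."""
    totals = []
    for key, value in sorted_pairs:
        if totals and totals[-1][0] == key:
            # same key as the previous pair: extend its running total
            totals[-1] = (key, totals[-1][1] + value)
        else:
            # a new key starts a new total
            totals.append((key, value))
    return totals


lines = ["to be or not to be"]  # made-up input
result = reduce_step(shuffle_step(map_step(lines)))
print(result)
```

each step only needs the output of the one before it, which is what lets a framework run the steps on different machines.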

#### 1. map step

for each word you encounter, "emit" the word together with the number 1, to be added up later

In [4]:
filename = 'now.txt'
#filename = 'pg2701.txt'

map_output = list()
with open(filename, 'r') as f:
    for line in f:
        line = re.sub(r'^\W+|\W+$', '', line)
        words = re.split(r"\W+", line)

        for word in words:
            if not word:
                continue  # skip the empty string produced by a blank line
            # emit the key and a count of 1, tab-separated
            print(word.lower() + "\t1")
            map_output.append((word.lower(), 1))

now	1
is	1
the	1
time	1
for	1
all	1
good	1
men	1
to	1
come	1
to	1
the	1
aid	1
of	1
the	1
party	1


In [5]:
map_output

[('now', 1),
 ('is', 1),
 ('the', 1),
 ('time', 1),
 ('for', 1),
 ('all', 1),
 ('good', 1),
 ('men', 1),
 ('to', 1),
 ('come', 1),
 ('to', 1),
 ('the', 1),
 ('aid', 1),
 ('of', 1),
 ('the', 1),
 ('party', 1)]

#### 2. shuffle step

sort the pairs so that identical keys end up next to each other

In [6]:
# shuffle step 

map_sorted = sorted(map_output)
map_sorted

[('aid', 1),
 ('all', 1),
 ('come', 1),
 ('for', 1),
 ('good', 1),
 ('is', 1),
 ('men', 1),
 ('now', 1),
 ('of', 1),
 ('party', 1),
 ('the', 1),
 ('the', 1),
 ('the', 1),
 ('time', 1),
 ('to', 1),
 ('to', 1)]
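
sorting is one way to bring equal keys together. another way, sketched here as an alternative and not as the method used in this notebook, is to collect the values of each key into a list. the function name `shuffle_by_grouping` and the sample pairs are assumptions for illustration.

```python
from collections import defaultdict


def shuffle_by_grouping(pairs):
    """Group values by key: [(k, v), ...] becomes {k: [v, ...]}."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return dict(groups)


# a few made-up pairs in the same shape as the mapper output
pairs = [('to', 1), ('come', 1), ('to', 1), ('the', 1)]
grouped = shuffle_by_grouping(pairs)

# sort the items only for a stable printout
print(sorted(grouped.items()))
```

with this shape the reducer receives one key and all of its values at once, so it no longer has to detect where one key ends and the next begins.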

#### 3. reduce step

loop through the sorted list and add up the occurrences of each key

In [7]:
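# reduce step

previous = None
total = 0    # "total" rather than "sum", so the built-in sum() is not shadowed

for key, value in map_sorted:

    if key != previous:
        # the key changed: print the finished total and start a new one
        if previous is not None:
            print(str(total) + '\t' + previous)
        previous = key
        total = 0

    total = total + value

# print the last key; the guard covers an empty input
if previous is not None:
    print(str(total) + '\t' + previous)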


1	aid
1	all
1	come
1	for
1	good
1	is
1	men
1	now
1	of
1	party
3	the
1	time
2	to
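
the "watch for the key to change" loop can also be sketched with `itertools.groupby`, which yields one group per run of equal keys. the function name `reduce_sorted` and the sample pairs are assumptions for illustration.

```python
from itertools import groupby
from operator import itemgetter


def reduce_sorted(sorted_pairs):
    """Sum the values of each run of identical keys in a sorted list."""
    results = []
    for key, group in groupby(sorted_pairs, key=itemgetter(0)):
        # group yields every (key, value) pair in the current run
        results.append((key, sum(value for _, value in group)))
    return results


# a made-up, already sorted sample in the same shape as the shuffle output
sorted_pairs = [('aid', 1), ('the', 1), ('the', 1), ('the', 1), ('to', 1), ('to', 1)]
print(reduce_sorted(sorted_pairs))
```

`groupby` only merges keys that are adjacent, so it relies on the input being sorted first, exactly like the manual loop.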


### what else can you do with map-reduce?

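what if we want the distribution of the first letter of each word?
how would you change your mapper? your reducer?
only the key emitted by the mapper changes (the first letter instead of the whole word); the shuffle and reduce steps stay the same.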

In [17]:
filename = 'now.txt'
#filename = 'pg2701.txt'

map_output = list()
with open(filename, 'r') as f:
    for line in f:
        line = re.sub(r'^\W+|\W+$', '', line)
        words = re.split(r"\W+", line)

        for word in words:
            if not word:
                continue  # an empty string has no first letter to index
            # the key is now the first letter instead of the whole word
            print(word.lower() + "\t" + word.lower()[0] + "\t1")
            map_output.append((word.lower()[0], 1))

now	n	1
is	i	1
the	t	1
time	t	1
for	f	1
all	a	1
good	g	1
men	m	1
to	t	1
come	c	1
to	t	1
the	t	1
aid	a	1
of	o	1
the	t	1
party	p	1


In [18]:
map_output

[('n', 1),
 ('i', 1),
 ('t', 1),
 ('t', 1),
 ('f', 1),
 ('a', 1),
 ('g', 1),
 ('m', 1),
 ('t', 1),
 ('c', 1),
 ('t', 1),
 ('t', 1),
 ('a', 1),
 ('o', 1),
 ('t', 1),
 ('p', 1)]

In [19]:
# shuffle step 

map_sorted = sorted(map_output)
map_sorted

[('a', 1),
 ('a', 1),
 ('c', 1),
 ('f', 1),
 ('g', 1),
 ('i', 1),
 ('m', 1),
 ('n', 1),
 ('o', 1),
 ('p', 1),
 ('t', 1),
 ('t', 1),
 ('t', 1),
 ('t', 1),
 ('t', 1),
 ('t', 1)]

In [20]:
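# reduce step

previous = None
total = 0    # "total" rather than "sum", so the built-in sum() is not shadowed

for key, value in map_sorted:

    if key != previous:
        # the key changed: print the finished total and start a new one
        if previous is not None:
            print(str(total) + '\t' + previous)
        previous = key
        total = 0

    total = total + value

# print the last key; the guard covers an empty input
if previous is not None:
    print(str(total) + '\t' + previous)


2	a
1	c
1	f
1	g
1	i
1	m
1	n
1	o
1	p
6	t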
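
since only the key changes between these variations, the whole pipeline can be sketched as one function that takes the key as a parameter. the names `map_reduce` and `key_func` and the sample line are hypothetical, chosen for illustration.

```python
import re
from itertools import groupby
from operator import itemgetter


def map_reduce(lines, key_func):
    """Count words per key, where key_func maps a word to its key."""
    # map: emit (key, 1) for every word
    pairs = [(key_func(word), 1)
             for line in lines
             for word in re.findall(r'\w+', line.lower())]
    # shuffle: sort so that equal keys are adjacent
    pairs.sort(key=itemgetter(0))
    # reduce: sum the values of each run of equal keys
    return [(key, sum(value for _, value in group))
            for key, group in groupby(pairs, key=itemgetter(0))]


lines = ["now is the time for all good men"]  # made-up input

first_letters = map_reduce(lines, lambda word: word[0])  # key = first letter
word_lengths = map_reduce(lines, len)                    # key = number of letters

print(first_letters)
print(word_lengths)
```

passing `len` as the key gives the distribution of word lengths, and passing the identity function `lambda word: word` gives back the plain word count.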


### using python map and reduce functions

In [None]:
import mrjob

In [None]:
from mrjob.job import MRJob


class MRWordFrequencyCount(MRJob):

    def mapper(self, _, line):
        yield "chars", len(line)
        yield "words", len(line.split())
        yield "lines", 1

    def reducer(self, key, values):
        yield key, sum(values)



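# mrjob jobs are normally saved to a .py file and launched from the command line;
# the script then ends with:
# if __name__ == '__main__':
#     MRWordFrequencyCount.run()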

In [None]:
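def f(line):
    # mapper: turn one line into a list of (word, 1) pairs
    words = re.split(r'\W+', line)
    out = list()
    for word in words:
        if not word:
            continue  # re.split leaves empty strings at the line edges
        out.append((word.lower(), 1))
    return out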

In [None]:
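# "filename" is set in an earlier cell
with open(filename) as fp:
    lines = fp.readlines()

# map() is lazy in python 3, so wrap it in list() to allow indexing
map_output = list(map(f, lines))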

In [None]:
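# shuffle step: flatten the per-line lists, then sort all pairs together
sort_output = sorted(pair for line_pairs in map_output for pair in line_pairs)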
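
to finish the pipeline with python's own functions, the reduce step can be sketched with `functools.reduce`, which folds the pairs into a dictionary one at a time. the names `mapper` and `add_pair` and the sample lines are assumptions for illustration; in python 3, `reduce` has to be imported from `functools`.

```python
import re
from functools import reduce


def mapper(line):
    """Turn one line into a list of (word, 1) pairs."""
    return [(word, 1) for word in re.findall(r'\w+', line.lower())]


def add_pair(counts, pair):
    """Fold one (word, value) pair into the running dictionary of counts."""
    word, value = pair
    counts[word] = counts.get(word, 0) + value
    return counts


lines = ["to come to the aid", "of the party"]  # made-up input

# map: one list of pairs per line
mapped = map(mapper, lines)
# flatten the per-line lists into a single list of pairs
pairs = [pair for line_pairs in mapped for pair in line_pairs]
# reduce: start from an empty dict and fold every pair into it
counts = reduce(add_pair, pairs, {})

print(sorted(counts.items()))
```

because the dictionary looks keys up directly, this version does not need the sort step; the sort is only used here to print the result in a stable order.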