Let's develop some functions we can reuse to read mapper input and reducer input.

First, a function that reads the mapper input file line by line and splits each line on whitespace.
Because it uses yield, calling this function creates and returns a generator.

In [None]:
def read_input(file):
    for line in file:
        # remove leading/trailing whitespace
        # split the line into words
        yield line.strip().split()
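
A minimal sketch of what "returns a generator" means in practice: nothing is read until a value is requested. The io.StringIO object and the sample text are assumptions that stand in for a real input file, and read_input is repeated only so the cell runs on its own.

```python
import io

def read_input(file):
    # Same generator as above, repeated so this cell is self-contained
    for line in file:
        yield line.strip().split()

# io.StringIO stands in for an open text file (the sample text is made up)
sample = io.StringIO("the quick brown fox\njumps over the lazy dog\n")
gen = read_input(sample)

first = next(gen)       # pulls only the first line through the generator
remaining = list(gen)   # consumes whatever lines are left

print(first)
print(remaining)
```

Once a generator has been consumed it yields nothing more, so a new one has to be created to read the input again.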

Testing the read_input function:
- Open a file, call read_input to get a generator, and print the results.

In [None]:
f1 = open("fox.txt", 'r')
gen1 = read_input(f1)

In [None]:
for words in gen1:
    # Print the results returned by the generator
    print(words)

In [None]:
f1.close()

What do we need to calculate word count?
For each word in the input, output the key/value pair (word, 1).

In [None]:
def count_words(words):
    # Output (word, 1) pairs in tab-delimited format
    for word in words:
        # For each word in the line, output a key/value pair
        # with the word as the key, and "1" as the value.
        print('%s\t%s' % (word, 1))

Test the count_words function.

In [None]:
"one two three three three two".split()

In [None]:
count_words("one two three three three two".split())

Test the count_words function using our function that reads mapper input.

In [None]:
f1 = open("fox.txt", 'r')
gen1 = read_input(f1)

for words in gen1:
    count_words(words)
    
f1.close()

Now a function that reads key/value reducer input.

In reducer input, the key is separated from the value by a tab.

In [None]:
def read_key_value(file):
    for line in file:
        # split the line into components, before and after the tab
        yield line.strip().split('\t', 1)
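
A small sketch of why the split uses a maximum of one split: only the first tab separates the key from the value, so any later tab stays inside the value. The sample lines are made up for illustration.

```python
# A typical mapper output line: key, tab, value
line = "fox\t1\n"
key, value = line.strip().split('\t', 1)

# With maxsplit=1, a second tab is kept as part of the value
line_with_extra_tab = "fox\t1\textra\n"
parts = line_with_extra_tab.strip().split('\t', 1)

print(key, value)
print(parts)
```

Note that a line with no tab at all produces a one-element list, so unpacking it into two names would raise a ValueError.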

Test the read_key_value function.

In [None]:
f2 = open("wcMapOutput.txt", 'r')
gen2 = read_key_value(f2)

In [None]:
for word, counts in gen2:
    print(word, counts)

In [None]:
f2.close()

Define the reduce function

In [None]:
def word_count_reduce(kv_pairs):
    # Initialize the dictionary
    word2count = {}

    # Accumulate the count for each word
    for word, count in kv_pairs:
        # convert count (currently a string) to int
        try:
            count = int(count)
        except ValueError:
            # Count was not a number; skip this pair
            continue

        # Add to the running total, starting from 0 for a new word
        word2count[word] = word2count.get(word, 0) + count

    # write the tuples to stdout
    # Note: they are unsorted
    for word in word2count:
        print('%s\t%s' % (word, word2count[word]))

Test the reduce function

In [None]:
f2 = open("wcMapOutput.txt", 'r')
gen2 = read_key_value(f2)

word_count_reduce(gen2)

f2.close()

Refine the reduce function (avoid the dictionary). This version relies on the input being sorted by key.

In [None]:
def word_count_reduce2(kv_pairs):
    current_word = None
    current_count = 0

    # kv_pairs yields [word, count] pairs, sorted by word
    for word, count in kv_pairs:
        # convert count (currently a string) to int
        try:
            count = int(count)
        except ValueError:
            # Count was not a number
            continue

        if current_word == word:
            current_count += count
        else:
            if current_word is not None:
                # Output the count for current_word
                # Hadoop has sorted key/value pairs by key
                print('{}\t{}'.format(current_word, current_count))
            current_count = count
            current_word = word

    # Output the last word, if there was any valid input
    if current_word is not None:
        print('{}\t{}'.format(current_word, current_count))
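
As an alternative sketch (not the approach used in this notebook), the same "sum consecutive equal keys" idea can be written with itertools.groupby from the standard library. The name word_count_groupby and the sample pairs are hypothetical, and the input is assumed to be sorted by key already.

```python
from itertools import groupby
from operator import itemgetter

def word_count_groupby(kv_pairs):
    # Hypothetical variant: groupby only merges *adjacent* equal keys,
    # so kv_pairs is assumed to be sorted by word.
    results = []
    for word, group in groupby(kv_pairs, key=itemgetter(0)):
        total = 0
        for _, count in group:
            try:
                total += int(count)
            except ValueError:
                # Count was not a number; skip it
                continue
        results.append((word, total))
    return results

# Made-up pairs, sorted here to mimic the ordering the reducer expects
pairs = sorted([["two", "1"], ["one", "1"], ["three", "1"],
                ["two", "1"], ["three", "1"], ["three", "1"]])
totals = word_count_groupby(pairs)

for word, total in totals:
    print('{}\t{}'.format(word, total))
```

If the input were not sorted, the same word could appear in several groups and be reported more than once.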


Test the refined reduce function

In [None]:
f2 = open("wcMapOutput.txt", 'r')
gen2 = read_key_value(f2)

word_count_reduce2(gen2)

f2.close()
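
To see how the pieces fit together without Hadoop, the sketch below chains a map step, a sort, and a reduce step in memory. The sort stands in for the ordering by key that the reducer relies on. The names map_words and reduce_sorted and the sample text are assumptions made for this illustration, not part of the notebook's code.

```python
import io

def map_words(file):
    # Map step: emit one "word<TAB>1" line per word
    for line in file:
        for word in line.strip().split():
            yield '{}\t{}'.format(word, 1)

def reduce_sorted(lines):
    # Reduce step: sum counts for consecutive identical words
    current_word, current_count = None, 0
    for line in lines:
        word, count = line.split('\t', 1)
        count = int(count)
        if word == current_word:
            current_count += count
        else:
            if current_word is not None:
                yield current_word, current_count
            current_word, current_count = word, count
    if current_word is not None:
        yield current_word, current_count

# io.StringIO stands in for an input file (the sample text is made up)
text = io.StringIO("the fox\nthe dog and the fox\n")
mapped = list(map_words(text))
shuffled = sorted(mapped)   # local stand-in for sorting by key
result = list(reduce_sorted(shuffled))

for word, count in result:
    print('{}\t{}'.format(word, count))
```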