# Hadoop WordCount Example in Python

Slight adaptions to the code from last week to enable local processing

## new `mapper.py`

In [1]:
#!/usr/bin/env python
import sys 

def mapper(textinput):
    # input comes from STDIN (standard input)
    returnstring = ""
    for line in textinput.splitlines():
        # remove leading and trailing whitespace
        line = line.strip()
        # split the line into words
        words = line.split()
        # increase counters
        for word in words:
            # write the results to STDOUT (standard output);
            # what we output here will be the input for the
            # Reduce step, i.e. the input for reducer.py
            #
            # tab-delimited; the trivial word count is 1
            returnstring = returnstring + ('%s\t%s\n' % (word, 1))
    print (returnstring)
    return returnstring

mapper(sys.stdin.read())




''

## new `reducer.py`

In [2]:
#!/usr/bin/env python

import sys 
from operator import itemgetter

         
def reducer(textinput):
    current_word = None
    current_count = 0
    word = None
    returnstring = ""
    # input comes from STDIN
    for line in textinput.splitlines():
        # remove leading and trailing whitespace
        line = line.strip()
        
        #print("»" + line + "»")
        
        # parse the input we got from mapper.py
        try:
            word, count = line.split('\t', 1)
        except ValueError:
            # e.g. empty line
            continue

        # convert count (currently a string) to int
        try:
            count = int(count)
        except ValueError:
            # count was not a number, so silently
            # ignore/discard this line
            continue

        # this IF-switch only works because Hadoop sorts map output
        # by key (here: word) before it is passed to the reducer
        if current_word == word:
            current_count += count
        else:
            if current_word:
                # write result to STDOUT
                returnstring = returnstring + ('%s\t%s\n' % (current_word, current_count))
            current_count = count
            current_word = word

    # do not forget to output the last word if needed!
    if current_word == word:
        returnstring = returnstring + ('%s\t%s\n' % (current_word, current_count))
    print (returnstring)
    return returnstring
            


reducer(sys.stdin.read())

None	0



'None\t0\n'

In [3]:
output = mapper("Hallo hallo hallo ich bin ein Test\nHallo ich bin noch ein Test")
output = reducer(output)
print(output)

Hallo	1
hallo	1
hallo	1
ich	1
bin	1
ein	1
Test	1
Hallo	1
ich	1
bin	1
noch	1
ein	1
Test	1

Hallo	1
hallo	2
ich	1
bin	1
ein	1
Test	1
Hallo	1
ich	1
bin	1
noch	1
ein	1
Test	1

Hallo	1
hallo	2
ich	1
bin	1
ein	1
Test	1
Hallo	1
ich	1
bin	1
noch	1
ein	1
Test	1



In [4]:
output = mapper("Hallo hallo hallo ich bin ein Test\nHallo ich bin noch ein Test")
output=sorted(output.split('\n'))
output='\n'.join(output)
output = reducer(output)
print(output)
print(type(output))

Hallo	1
hallo	1
hallo	1
ich	1
bin	1
ein	1
Test	1
Hallo	1
ich	1
bin	1
noch	1
ein	1
Test	1

Hallo	2
Test	2
bin	2
ein	2
hallo	2
ich	2
noch	1

Hallo	2
Test	2
bin	2
ein	2
hallo	2
ich	2
noch	1

<class 'str'>


## Funktionalität im Cluster testen: 


`time hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar -files mapper.py,reducer.py -mapper mapper.py -reducer reducer.py -combiner reducer.py -input test_dir -output result_dir`