# 02a: First Steps with MapReduce

**Action:** test your Python transcriptions on the Declaration of Independence, using a pipeline such as:

cat 1.txt | ./mapper.py | sort -k 1 | ./reducer.py

and check that the output looks reasonable. What happens if you don’t have the sorting step (“shuffle”) in the pipeline? Can you explain the output in terms of the operation of the reducer code?

You may need to give your Python scripts the executable mode (permission) bit, by executing a command such as:

chmod +x mapper.py reducer.py

In [None]:
cat 1.txt | python ./mapper.py | sort -k 1 | python ./reducer.py
&	1
1776.	1
4,	1
a	15
A	1
abdicated	1
abolish	1
abolishing	3
absolute	3
Absolved	1
abuses	1
accommodation	1
accordingly	1
accustomed.	1
...

**Action:** using either method, check that the output file’s contents are the same as those from running the local word histogram job.

In [None]:
hadoop jar /usr/local/hadoop-2.6.0/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0.jar   wordcount 1.txt output
16/02/16 12:38:49 INFO client.RMProxy: Connecting to ResourceManager at dsm1/158.223.50.51:8032
16/02/16 12:38:50 INFO input.FileInputFormat: Total input paths to process : 1
16/02/16 12:38:50 INFO mapreduce.JobSubmitter: number of splits:1
16/02/16 12:38:50 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1452790691884_0299
16/02/16 12:38:51 INFO impl.YarnClientImpl: Submitted application application_1452790691884_0299
16/02/16 12:38:51 INFO mapreduce.Job: The url to track the job: http://dsm1:8088/proxy/application_1452790691884_0299/
16/02/16 12:38:51 INFO mapreduce.Job: Running job: job_1452790691884_0299
16/02/16 12:38:55 INFO mapreduce.Job: Job job_1452790691884_0299 running in uber mode : false
16/02/16 12:38:55 INFO mapreduce.Job:  map 0% reduce 0%
16/02/16 12:39:00 INFO mapreduce.Job:  map 100% reduce 0%
16/02/16 12:39:09 INFO mapreduce.Job:  map 100% reduce 100%
16/02/16 12:39:09 INFO mapreduce.Job: Job job_1452790691884_0299 completed successfully
16/02/16 12:39:09 INFO mapreduce.Job: Counters: 49
	File System Counters
		FILE: Number of bytes read=9069
		FILE: Number of bytes written=228957
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=8275
		HDFS: Number of bytes written=6512
		HDFS: Number of read operations=6
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=2
	Job Counters 
		Launched map tasks=1
		Launched reduce tasks=1
		Data-local map tasks=1
		Total time spent by all maps in occupied slots (ms)=2353
		Total time spent by all reduces in occupied slots (ms)=6464
		Total time spent by all map tasks (ms)=2353
		Total time spent by all reduce tasks (ms)=6464
		Total vcore-seconds taken by all map tasks=2353
		Total vcore-seconds taken by all reduce tasks=6464
		Total megabyte-seconds taken by all map tasks=2409472
		Total megabyte-seconds taken by all reduce tasks=6619136
	Map-Reduce Framework
		Map input records=8
		Map output records=1314
		Map output bytes=13430
		Map output materialized bytes=9069
		Input split bytes=101
		Combine input records=1314
		Combine output records=642
		Reduce input groups=642
		Reduce shuffle bytes=9069
		Reduce input records=642
		Reduce output records=642
		Spilled Records=1284
		Shuffled Maps =1
		Failed Shuffles=0
		Merged Map outputs=1
		GC time elapsed (ms)=52
		CPU time spent (ms)=1600
		Physical memory (bytes) snapshot=433057792
		Virtual memory (bytes) snapshot=2000158720
		Total committed heap usage (bytes)=402653184
	Shuffle Errors
		BAD_ID=0
		CONNECTION=0
		IO_ERROR=0
		WRONG_LENGTH=0
		WRONG_MAP=0
		WRONG_REDUCE=0
	File Input Format Counters 
		Bytes Read=8174
	File Output Format Counters 
		Bytes Written=6512
[cbutl002@dsm1 big_dat]$ 


In [None]:
# Output:
[cbutl002@dsm1 big_dat]$ hadoop fs -cat output/part-r-00000
&	1
--That	1
1776.	1
4,	1
A	1
Absolved	1
Acts	2
Administration	1
Allegiance	1
Alliances,	1
America,	2
And	1
Annihilation,	1
Appropriations	1
Arbitrary	1

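Without the sorting step, the counts are wrong. Almost every word is reported with a count of 1, and a word is listed once for each separate run of occurrences rather than once overall. The words are also printed in the order in which they appear in the Declaration of Independence. This follows from how the mapper and reducer work. The mapper splits each line of the text into words and writes each one as a two-column record: the word and a count of 1. The reducer reads these records in order and compares each word only with the word on the previous record. If the two match, it adds to the running count. Otherwise it prints the previous word's total and starts a new count. Sorting (the shuffle) brings all occurrences of a word together. Without it, the reducer can only merge occurrences that happen to be adjacent.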
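To see why the shuffle matters, the pipeline can be simulated in plain Python. This is a sketch, not the actual mapper.py and reducer.py (which are not shown here). It assumes two things: the mapper splits lines on whitespace, and the reducer uses the usual pattern of comparing each word only with the previous one, as described above.

```python
def mapper(lines):
    """Emit (word, 1) for every whitespace-separated word (assumed mapper.py logic)."""
    for line in lines:
        for word in line.split():
            yield word, 1


def reducer(pairs):
    """Sum counts over runs of identical consecutive keys (assumed reducer.py logic).

    Only the immediately preceding key is remembered, so the totals are
    correct only if all occurrences of a key arrive next to each other."""
    current_word, current_count = None, 0
    for word, count in pairs:
        if word == current_word:
            current_count += count
        else:
            if current_word is not None:
                yield current_word, current_count
            current_word, current_count = word, count
    if current_word is not None:
        yield current_word, current_count


text = ["we hold these truths", "these truths we hold"]

# with the shuffle: sort the mapper output by key before reducing
with_shuffle = list(reducer(sorted(mapper(text))))
# without the shuffle: feed the mapper output straight to the reducer
without_shuffle = list(reducer(mapper(text)))

print(with_shuffle)     # [('hold', 2), ('these', 2), ('truths', 2), ('we', 2)]
print(without_shuffle)  # every word appears twice, each time with a count of 1
```

Adjacent repeats are still merged without sorting. For example, `"a a b a"` reduces to `a 2`, `b 1`, `a 1`.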

**Action:** Add the second Gutenberg e-text to your dataset, and re-run the streaming MapReduce job with both files as input. (You can either pass multiple -input switches to hadoop, or you can put both e-texts in an HDFS directory of their own, and pass the directory name itself as the -input). What do you expect to happen?

In [None]:
[cbutl002@dsm1 big_dat]$ hadoop jar /usr/local/hadoop-2.6.0/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0.jar   wordcount 1.txt 2.txt output
16/02/16 12:42:29 INFO client.RMProxy: Connecting to ResourceManager at dsm1/158.223.50.51:8032
16/02/16 12:42:30 INFO input.FileInputFormat: Total input paths to process : 2
16/02/16 12:42:30 INFO mapreduce.JobSubmitter: number of splits:2
16/02/16 12:42:30 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1452790691884_0301
16/02/16 12:42:30 INFO impl.YarnClientImpl: Submitted application application_1452790691884_0301
16/02/16 12:42:30 INFO mapreduce.Job: The url to track the job: http://dsm1:8088/proxy/application_1452790691884_0301/
16/02/16 12:42:30 INFO mapreduce.Job: Running job: job_1452790691884_0301
16/02/16 12:42:39 INFO mapreduce.Job: Job job_1452790691884_0301 running in uber mode : false
16/02/16 12:42:39 INFO mapreduce.Job:  map 0% reduce 0%
16/02/16 12:42:44 INFO mapreduce.Job:  map 100% reduce 0%
16/02/16 12:42:49 INFO mapreduce.Job:  map 100% reduce 100%
16/02/16 12:42:50 INFO mapreduce.Job: Job job_1452790691884_0301 completed successfully
16/02/16 12:42:50 INFO mapreduce.Job: Counters: 50
	File System Counters
		FILE: Number of bytes read=19987
		FILE: Number of bytes written=356347
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=19211
		HDFS: Number of bytes written=13371
		HDFS: Number of read operations=9
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=2
	Job Counters 
		Launched map tasks=2
		Launched reduce tasks=1
		Data-local map tasks=1
		Rack-local map tasks=1
		Total time spent by all maps in occupied slots (ms)=5551
		Total time spent by all reduces in occupied slots (ms)=2757
		Total time spent by all map tasks (ms)=5551
		Total time spent by all reduce tasks (ms)=2757
		Total vcore-seconds taken by all map tasks=5551
		Total vcore-seconds taken by all reduce tasks=2757
		Total megabyte-seconds taken by all map tasks=5684224
		Total megabyte-seconds taken by all reduce tasks=2823168
	Map-Reduce Framework
		Map input records=271
		Map output records=3085
		Map output bytes=31033
		Map output materialized bytes=19993
		Input split bytes=202
		Combine input records=3085
		Combine output records=1456
		Reduce input groups=1343
		Reduce shuffle bytes=19993
		Reduce input records=1456
		Reduce output records=1343
		Spilled Records=2912
		Shuffled Maps =2
		Failed Shuffles=0
		Merged Map outputs=2
		GC time elapsed (ms)=58
		CPU time spent (ms)=2280
		Physical memory (bytes) snapshot=716533760
		Virtual memory (bytes) snapshot=2998452224
		Total committed heap usage (bytes)=603979776
	Shuffle Errors
		BAD_ID=0
		CONNECTION=0
		IO_ERROR=0
		WRONG_LENGTH=0
		WRONG_MAP=0
		WRONG_REDUCE=0
	File Input Format Counters 
		Bytes Read=19009
	File Output Format Counters 
		Bytes Written=13371
[cbutl002@dsm1 big_dat]$ 


In [None]:
[cbutl002@dsm1 big_dat]$ hadoop fs -cat output/part-r-00000
"AS-IS".	1
"Defects".	1
"PROJECT	2
"Project	2
"Project").	1
"Right	1
"Small	6
"public	1
"small	1
#2]	1
&	1
(*)	1
(212-254-5093)	1
(72600.2026@compuserve.com);	1
(_)	1
(and	2
(as	1
(if	2
(or	3


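I passed both 1.txt and 2.txt as input to the job. I expected the list of words to be longer, and words common to both texts to have higher counts. The counters agree: the job produced 1343 distinct words (Reduce output records), compared with 642 for 1.txt alone. One difference from the local pipeline is that Hadoop places all words beginning with a capital letter before the lowercase words. Hadoop compares Text keys byte by byte, so uppercase ASCII letters (65 to 90) sort before lowercase ones (97 to 122). The local sort command instead uses the locale's collation, which interleaves upper and lower case.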
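The two orderings can be reproduced on a few keys taken from the outputs above. This Python 3 sketch sorts by UTF-8 bytes to mimic Hadoop. It uses a case-insensitive key as a rough stand-in for locale collation, which is an approximation rather than the exact rules `sort` applies.

```python
# A few keys taken from the local and Hadoop outputs above.
words = ["a", "A", "abolish", "Absolved", "&", "1776."]

# Hadoop compares Text keys as raw UTF-8 bytes.
hadoop_order = sorted(words, key=lambda w: w.encode("utf-8"))

# Rough stand-in for a locale-aware sort: compare case-insensitively,
# keeping the input order for ties (Python's sort is stable).
locale_like_order = sorted(words, key=str.casefold)

print(hadoop_order)       # ['&', '1776.', 'A', 'Absolved', 'a', 'abolish']
print(locale_like_order)  # ['&', '1776.', 'a', 'A', 'abolish', 'Absolved']
```

On the local side, running the pipeline with `LC_ALL=C sort -k 1` makes `sort` compare bytes as well, which should reproduce Hadoop's ordering.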

## 04a: Typed Bytes and Sequence Files

Disclaimer: I had a lot of trouble with the his-and-hers typed bytes tasks. This is my best effort to show my progress and my understanding of the task. I decided it would be more valuable to try to understand the proper way to use Hadoop input than to work on the third lab's tasks, since they are unrealistically long.

**Action:** the following segments of code should all be saved in a single file named typedbytes.py:

In [None]:
# %load /Users/cmbutler/Documents/BIG_DATA_APPLICATIONS/typedbytes.py


# %load -r 1-3 ../code/typedbytes/typedbytes.py

#! /usr/bin/env python

import sys, struct

# %load -s encode_string ../code/typedbytes/typedbytes.py

def encode_string(string):
    # Typed bytes string: type code 7, a 4-byte big-endian length, then the
    # bytes (unicode is encoded as UTF-8 first).
    if isinstance(string, bytearray):
        b = string
    elif isinstance(string, str):
        b = bytearray(string)
    elif isinstance(string, unicode):
        b = bytearray(string, 'utf-8')
    else:
        raise TypeError('cannot encode %s as a typed bytes string' % type(string))
    return bytearray(0) + b'\x07' + struct.pack('>i', len(b)) + b



# %load -s encode_file ../code/typedbytes/typedbytes.py

def encode_file(path):
    # A (path, contents) pair, both encoded as typed bytes strings.
    with open(path, 'rb') as fh:
        contents = fh.read()
    return encode_string(path) + encode_string(contents)



# %load -s read_typedbytes ../code/typedbytes/typedbytes.py

def read_typedbytes(fh, byte):
    # Only type code 7 (string) is written by encode_string; any other code
    # would leave the stream out of step, so fail loudly instead.
    if byte == 7:
        n, = struct.unpack('>i', fh.read(4))
        return fh.read(n)
    raise ValueError('unsupported typed bytes type code: %d' % byte)



# %load -s typedbytes ../code/typedbytes/typedbytes.py

def typedbytes(fh):
    # Yield (key, value) pairs until the stream is exhausted.
    while True:
        x = fh.read(1)
        if x == '':
            break
        key = read_typedbytes(fh, ord(x))
        x = fh.read(1)
        if x == '':
            break
        value = read_typedbytes(fh, ord(x))
        yield (key, value)


**Action:** write a program which encodes the Project Gutenberg metadata as typedbytes, writing the result to standard output. Test it on a small subset of the metadata (you can look at the binary contents of the result with Unix utilities such as less and od).

This code is adapted directly from the 04a worksheet. It is run with a command line of this form:

find . -name '*.rdf' | xargs python encode.py

In essence, the program imports the typedbytes module (after adding the current directory to the module search path), takes the file paths passed as command-line arguments, and writes each encoded file to standard output.
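The encode.py program itself does not appear in this notebook. Here is a minimal sketch of what it might look like, based on the description above. It is a Python 3 re-implementation that inlines the typed bytes string encoding instead of importing typedbytes.py, an assumption made so the example is self-contained.

```python
#!/usr/bin/env python3
"""Sketch of encode.py: write each file named on the command line to
standard output as a typed bytes (path, contents) pair."""
import struct
import sys


def encode_bytes(data):
    # Typed bytes string: type code 7, 4-byte big-endian length, payload.
    return b"\x07" + struct.pack(">i", len(data)) + data


def encode_file(path):
    with open(path, "rb") as fh:
        contents = fh.read()
    return encode_bytes(path.encode("utf-8")) + encode_bytes(contents)


def main(paths, out):
    for path in paths:
        out.write(encode_file(path))
    out.flush()


if __name__ == "__main__":
    # typed bytes are binary, so write to the underlying byte stream
    main(sys.argv[1:], sys.stdout.buffer)
```

Used as `find . -name '*.rdf' | xargs python3 encode.py > meta.tb`, where the output name is a placeholder, it writes one key/value pair per RDF file.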

In [None]:
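# From Kate on forum

# %load /Users/cmbutler/Documents/BIG_DATA_APPLICATIONS/encoder2.py
import sys, os
sys.path += ['.']
import typedbytes

# os.walk does not expand shell globs, so pass the directory itself:
# a literal './cache/epub/**' matches no directory and yields nothing.
for d, dirs, files in os.walk('./cache/epub'):
    for f in files:
        # only encode the RDF metadata files
        if f.endswith('.rdf'):
            path = os.path.join(d, f)
            sys.stdout.write(str(typedbytes.encode_file(path)))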

This code does the same job, but it is simpler to run from Python. The first version is given the file paths on the command line and encodes each in turn. This version uses the os.walk function to visit every file under the specified directory itself, which makes it easier to modify further.
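os.walk expects a directory path, not a shell glob. A pattern such as './cache/epub/**' is taken as a literal directory name. Because os.walk ignores errors by default, a directory that does not exist simply yields nothing instead of raising an error. Before encoding, it is worth a quick check that the traversal actually finds the RDF files. The following is a Python 3 sketch, and find_rdf_files is a hypothetical helper name.

```python
import os


def find_rdf_files(root):
    """Return the paths of all .rdf files below root, sorted for stable output."""
    found = []
    for dirpath, dirnames, filenames in os.walk(root):
        for name in filenames:
            if name.endswith(".rdf"):
                found.append(os.path.join(dirpath, name))
    return sorted(found)
```

Printing `len(find_rdf_files('./cache/epub'))` before running the encoder shows at once whether any input will be produced.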

**Action:** generate a Sequence File containing the metadata for e-texts 1 to 9 of Project Gutenberg (for test purposes).

In [None]:
# The output:

SEQ/org.apache.hadoop.typedbytes.TypedBytesWritable/org.apache.hadoop.typedbytes.TypedBytesWritable *org.apache.hadoop.io.compress.DefaultCodec    ·ë6%0≥7∆˘HD‹
è

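The readable parts of this header can be decoded by hand. A SequenceFile begins with the magic bytes SEQ and a version byte, which is unprintable. Next come the key and value class names, each stored as a Hadoop Text field: a variable-length integer giving the length, then the UTF-8 bytes. The class name org.apache.hadoop.typedbytes.TypedBytesWritable is 47 bytes long, and byte 47 is the ASCII character '/', which is why '/' appears before each class name. Likewise, the codec name is 42 bytes long, and byte 42 is '*'. Two boolean bytes (compressed, block-compressed) follow and, if the file is compressed, the codec class name. The sketch below parses these fields. It is written in Python 3, handles only the single-byte length form, and stops before the metadata and sync marker.

```python
def read_text(buf, pos):
    """Read a Hadoop Text field (vint length + UTF-8 bytes) starting at pos.

    Assumption: only the single-byte vint form is handled, which covers
    lengths 0 to 127 and therefore these class names."""
    n = buf[pos]
    if n > 127:
        raise ValueError("multi-byte vint lengths are not handled in this sketch")
    start = pos + 1
    return buf[start:start + n].decode("utf-8"), start + n


def parse_seqfile_header(buf):
    """Parse the start of a SequenceFile header, up to the codec name."""
    if buf[:3] != b"SEQ":
        raise ValueError("not a SequenceFile: missing SEQ magic")
    version = buf[3]
    key_class, pos = read_text(buf, 4)
    value_class, pos = read_text(buf, pos)
    compressed = buf[pos] != 0
    block_compressed = buf[pos + 1] != 0
    pos += 2
    codec = None
    if compressed:
        codec, pos = read_text(buf, pos)
    return {
        "version": version,
        "key_class": key_class,
        "value_class": value_class,
        "compressed": compressed,
        "block_compressed": block_compressed,
        "codec": codec,
        "next_offset": pos,
    }
```

Reading the first few hundred bytes of the generated file in binary mode and passing them to `parse_seqfile_header` should list the two TypedBytesWritable class names and the DefaultCodec codec.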
**Action:** construct this mapper, and test it on a small typed bytes input like:

<generate typedbytes> | python mapper.py | od -t x1

to check that the output typedbytes are well-formed.

This is the mapper I created for the RDF catalogue data. It combines the code from worksheets 03b and 04a.

In [None]:
#! /usr/bin/env python

## this is a workaround for Python 2 encoding issues
import sys
reload(sys)
sys.setdefaultencoding('utf-8')

sys.path += ['.']

import os, re, codecs

from rdflib import Graph
from rdflib.term import bind
from rdflib.namespace import Namespace, RDF

from typedbytes import typedbytes, encode_string

## need to do this explicitly in this version of RDFLib
DCTERMS = Namespace(u'http://purl.org/dc/terms/')
PGTERMS = Namespace(u'http://www.gutenberg.org/2009/pgterms/')

bind(DCTERMS.RFC4646, str)

for (key, value) in typedbytes(sys.stdin):
    g = Graph()
    g.parse(data=value)
    for x in g.triples((None, RDF['type'], PGTERMS['book'])):
        book = x[0]
        lang = [x for x in g.triples((book, DCTERMS['language'], None))]
        if not lang:
            continue
        lang = lang[0][2]
        language = [x for x in g.triples((lang, RDF['value'], None))]
        if not language:
            continue
        language = language[0][2]
        agent = [x for x in g.triples((book, DCTERMS['creator'], None))]
        if not agent:
            continue
        agent = agent[0][2]
        author = [x for x in g.triples((agent, PGTERMS['name'], None))]
        if not author:
            continue
        author = author[0][2]
        okey = book.toPython().split("/")[-1]
        ovalue = str(author.toPython()) + '\t' + str(language.toPython())
        sys.stdout.write("%s%s" % (encode_string(okey), encode_string(ovalue)))
        sys.stdout.flush()

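This code reads typed bytes (key, value) pairs from standard input, where each value is the contents of one RDF file. It parses each file with rdflib. For every book that has a language and a creator's name, it writes a typed bytes pair: the e-text number as the key, and the author's name and language, separated by a tab, as the value.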
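As an alternative to reading `od -t x1` dumps, a small decoder can check that a typed bytes stream is well-formed. This Python 3 sketch accepts only type code 7 (string), the only type the mappers in this worksheet write. It raises an error on a truncated length or payload, or on a key with no value. read_pairs is a hypothetical helper name.

```python
import struct


def read_pairs(fh):
    """Decode a binary stream of typed bytes strings into (key, value) pairs.

    Raises ValueError if the stream is malformed."""
    items = []
    while True:
        code = fh.read(1)
        if not code:
            break
        if code != b"\x07":
            raise ValueError("unexpected type code %r" % code)
        header = fh.read(4)
        if len(header) < 4:
            raise ValueError("truncated length field")
        (n,) = struct.unpack(">i", header)
        data = fh.read(n)
        if len(data) < n:
            raise ValueError("truncated string payload")
        items.append(data)
    if len(items) % 2:
        raise ValueError("odd number of items: a key has no value")
    return list(zip(items[0::2], items[1::2]))
```

Saving the mapper's output to a file and passing the file, opened in binary mode, to `read_pairs` should give one (e-text number, "author<TAB>language") pair per book.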

**Action:** run the mapreduce job above on the small subset of the catalogue, and check that the output from the job is as expected. How many mappers are used in the job? (Compare with the one-mapper-per-file when running on the catalogue files directly).

**Action:** construct the sequence file for the whole of the metadata catalogue, and run the MapReduce job above on it. How long does it take? If you increase the number of mappers from the default (using e.g. -D mapred.map.tasks=20), does the job finish faster?

This produced an empty file. I do not understand why, because the code was taken from the worksheet. My best guess is that the encoder is not receiving its input correctly, or that there is a problem with the input itself. I am going to try to continue without the necessary data!

## 04b: Sequence Files for His and Hers Analysis

You will need to adapt the mapper and reducer from your previous attempts. Differences include:

1.    we now have the file path as the MapReduce key in the mapper, so we do not need to interrogate the environment for mapreduce_map_input_file;
2.    the output from the mapper and the reducer will need to be encoded as typed bytes, as with the mapper in the previous worksheet;
3.    both the mapper and the reducer will need to accept input as typed bytes, and iterate over all the given key, value pairs.

**Action:** adapt the mapper and reducer previously developed to handle typedbytes input/output. You should be able to test your mapper and reducer by producing some typedbytes data as input to a shell pipeline.

In [None]:
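#! /usr/bin/env python
# the mapper

import sys, re, codecs
sys.path += ['.']

from typedbytes import typedbytes, encode_string

# catalog.dat: one "id<TAB>author<TAB>language" line per e-text
# (the format the 04a mapper emits)
c = {}; catalog = codecs.open('catalog.dat', 'r', 'utf8')
for line in catalog:
    x = line.strip().split("\t")
    if len(x) >= 3:
        c[x[0]] = (x[1], x[2])
catalog.close()

he_re = re.compile(r'\b(he|him|himself|his)\b')
she_re = re.compile(r'\b(she|her|herself|hers)\b')

# The key is now the file path, so the e-text number comes from the key
# instead of os.environ['mapreduce_map_input_file'].
for (key, value) in typedbytes(sys.stdin):
    etext = key.split("/")[-1].split(".")[0]
    # skip e-texts missing from the catalogue or not in English
    if etext not in c or c[etext][1] != "en":
        continue
    try:
        text = value.decode('utf-8')
    except UnicodeDecodeError:
        # do not end the task on a bad file; move on to the next pair
        continue
    him = 0; her = 0
    for line in text.splitlines():
        him += len(he_re.findall(line))
        her += len(she_re.findall(line))
    # typed bytes pairs are written back to back, with no separator
    ovalue = str(him) + '\t' + str(her)
    sys.stdout.write("%s%s" % (encode_string(c[etext][0]), encode_string(ovalue)))
    sys.stdout.flush()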


Without the author catalogue data, I cannot test this code, so I will explain my reasoning instead.

I kept the same code that reads each line of the author catalogue into a dictionary, so the values of interest are in the same format as before. Then I changed the main loop so that it iterates over each (key, value) pair from typedbytes(sys.stdin). I kept the same approach of guarding the counting step against a Unicode error. If one occurs, I did not want to end the process, so the loop simply moves on to the next pair.

Then I write the counts to standard output with UTF-8 encoding, as before.

The output statement needed to be changed to write typed bytes. I reused the output code from the 04a mapper, modifying it slightly for this task.
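Even without the catalogue, the counting part of the mapper can be checked on its own. The sketch below is Python 3, and count_pronouns and etext_id are hypothetical helper names. It repeats the mapper's two regular expressions and its rule for turning a file path into an e-text number, so both can be tested on small strings.

```python
import re

# Same patterns as the mapper; note they are case-sensitive, so "He" and
# "She" at the start of a sentence are not counted.
HE_RE = re.compile(r"\b(he|him|himself|his)\b")
SHE_RE = re.compile(r"\b(she|her|herself|hers)\b")


def etext_id(path):
    """'texts/1.txt' -> '1', the rule previously applied to mapreduce_map_input_file."""
    return path.split("/")[-1].split(".")[0]


def count_pronouns(text):
    """Return (male, female) pronoun counts for a block of text."""
    him = her = 0
    for line in text.splitlines():
        him += len(HE_RE.findall(line))
        her += len(SHE_RE.findall(line))
    return him, her


print(count_pronouns("he gave her his book\nshe thanked him"))  # (3, 2)
```

The `\b` word boundaries are what keep "the" and "theme" from counting as "he", and "hers" from counting as "her".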

In [None]:
#! /usr/bin/env python
# The reducer

import sys
sys.path += ['.']

from typedbytes import typedbytes, encode_string

def emit(author, him, her):
    # write one typed bytes (author, "him<TAB>her") pair
    ovalue = str(him) + '\t' + str(her)
    sys.stdout.write("%s%s" % (encode_string(author), encode_string(ovalue)))

# Hadoop sorts the mapper output by key, so all pairs for one author
# arrive together; keep running totals until the author changes.
current_author = None; current_M_count = 0; current_F_count = 0
for key, value in typedbytes(sys.stdin):
    (M_count, F_count) = value.split('\t', 1)
    M_count = int(M_count); F_count = int(F_count)
    if key == current_author:
        current_M_count += M_count
        current_F_count += F_count
    else:
        if current_author is not None:
            emit(current_author, current_M_count, current_F_count)
        current_author = key
        current_M_count = M_count
        current_F_count = F_count

# write the totals for the last author
if current_author is not None:
    emit(current_author, current_M_count, current_F_count)
sys.stdout.flush()


I designed the reducer similarly to the wordcount reducer. Essentially the same process happens, except that there are two counts to keep: one for male pronouns and one for female pronouns.

So, I read the typed bytes data from standard input and split each value into its male and female columns. From there, I accumulate both totals for each author in the same way as the wordcount reducer accumulates the count for each word, writing out the totals whenever the key changes.
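The same accumulate-until-the-key-changes logic can be sketched with itertools.groupby. Like the streaming reducer, groupby only groups consecutive equal keys, so it relies on the sorted input that Hadoop's shuffle provides. This is a Python 3 sketch; reduce_counts is a hypothetical name, and the author names and counts in the usage line are made up.

```python
from itertools import groupby
from operator import itemgetter


def reduce_counts(pairs):
    """Sum (male, female) counts per author.

    pairs: iterable of (author, "him<TAB>her") tuples already sorted by
    author, as a streaming reducer receives them after the shuffle."""
    results = []
    for author, group in groupby(pairs, key=itemgetter(0)):
        him_total = her_total = 0
        for _, value in group:
            him, her = value.split("\t", 1)
            him_total += int(him)
            her_total += int(her)
        results.append((author, him_total, her_total))
    return results


print(reduce_counts([("Author A", "10\t40"), ("Author A", "5\t20"), ("Author B", "30\t8")]))
# [('Author A', 15, 60), ('Author B', 30, 8)]
```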

Armed with a catalogue file, typed bytes-aware mapper and reducer, and an encoded Sequence File of Project Gutenberg data, we should be in a position to run a MapReduce job. You will need to pass -file arguments not just for the mapper and reducer, but also for the typedbytes.py module and for the catalogue data file, as well as requesting -io typedbytes and -inputformat org.apache.hadoop.mapred.SequenceFileInputFormat. 

**Action:** run this on a subset of the data first, getting a sense of how long it takes, and then estimate an appropriate number of mappers to run for the full job. You may need to use -D mapred.child.java.opts=-Xmx1024M in order to complete the job on even the largest Project Gutenberg e-text.
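Putting the required options together, here is a sketch of the full streaming command, built as a Python list so the pieces are easy to check. The jar path is an assumption: it is the usual location of the streaming jar in a Hadoop 2.6.0 distribution, alongside the examples jar used earlier. The input and output paths are placeholders. Generic `-D` options must come before the streaming-specific options.

```python
import shlex

# Assumption: usual streaming jar location in a Hadoop 2.6.0 distribution.
STREAMING_JAR = "/usr/local/hadoop-2.6.0/share/hadoop/tools/lib/hadoop-streaming-2.6.0.jar"


def streaming_command(input_path, output_path, map_tasks=None, child_heap="-Xmx1024M"):
    """Build the hadoop streaming argument list for the his/hers job."""
    cmd = ["hadoop", "jar", STREAMING_JAR]
    # generic -D options go first, before any streaming option
    if map_tasks is not None:
        cmd += ["-D", "mapred.map.tasks=%d" % map_tasks]
    if child_heap:
        cmd += ["-D", "mapred.child.java.opts=%s" % child_heap]
    cmd += [
        "-input", input_path,
        "-output", output_path,
        "-mapper", "mapper.py",
        "-reducer", "reducer.py",
        # ship the scripts, the typedbytes module and the catalogue to each node
        "-file", "mapper.py",
        "-file", "reducer.py",
        "-file", "typedbytes.py",
        "-file", "catalog.dat",
        "-io", "typedbytes",
        "-inputformat", "org.apache.hadoop.mapred.SequenceFileInputFormat",
    ]
    return cmd


# placeholder paths; print a shell-ready command line
print(" ".join(shlex.quote(part) for part in streaming_command("texts.seq", "hishers", map_tasks=20)))
```

Timing a run on a subset with the default number of mappers, then rerunning with a larger `map_tasks`, gives the comparison the action asks for.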

I wish I had gotten to do this task! I'll keep trying!