<DIV ALIGN=CENTER>

# Introduction to Map/Reduce
## Professor Robert J. Brunner
  
</DIV>  
-----
-----

## Introduction

In this IPython Notebook, we introduce the map/reduce programming
paradigm. Simply put, this approach to computing breaks tasks down into
a map phase (where an algorithm is mapped onto data) and a reduce phase,
where the outputs of the map phase are aggregated into a concise output.
The map phase is designed to run in parallel and to move the computation
to the data, which, when using HDFS, can be widely distributed. As a
result, a map phase can process a large quantity of data very quickly.
The map phase identifies keys and associates a value with each of them.
The reduce phase collects keys and aggregates their values. The standard
example used to demonstrate this programming approach is a word count
problem, where words (or tokens) are the keys and the number of
occurrences of each word (or token) is the value.
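To make the key/value flow concrete, the following minimal sketch runs all three stages in a single Python process. A map step emits `(word, 1)` pairs, a sort stands in for Hadoop's shuffle, and a reduce step sums the values for each key. The function names are hypothetical and chosen for illustration. This is not how Hadoop itself is implemented.

```python
from itertools import groupby
from operator import itemgetter


def map_phase(lines):
    """Emit a (word, 1) pair for every whitespace-separated token."""
    for line in lines:
        for word in line.split():
            yield (word, 1)


def shuffle_sort(pairs):
    """Stand-in for Hadoop's shuffle: sort pairs so equal keys are adjacent."""
    return sorted(pairs, key=itemgetter(0))


def reduce_phase(sorted_pairs):
    """Sum the values for each run of identical keys."""
    for word, group in groupby(sorted_pairs, key=itemgetter(0)):
        yield (word, sum(count for _, count in group))


text = ["the cat sat", "the cat ran"]
counts = dict(reduce_phase(shuffle_sort(map_phase(text))))
print(counts)  # {'cat': 2, 'ran': 1, 'sat': 1, 'the': 2}
```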

This technique was popularized by large web search companies like
Google and Yahoo, which were processing large quantities of unstructured
text data. It then quickly became popular for a wide range of other
problems. Of course, not every problem can be transformed into a
map/reduce approach, which is why we will explore Spark in several
weeks. The standard MapReduce approach uses Hadoop, which was built
using Java. Rather than switching to a new language, however, we will
use Hadoop Streaming to execute Python code. In the rest of this
notebook, we introduce a simple Python WordCount example. We first
demonstrate this code running at the Unix command line, before switching
to running it with Hadoop Streaming.

-----
### Mapper: Word Count

The first Python program we write is the map program. This program
simply reads data from STDIN and splits each line into words. It then
outputs each word on a separate line, followed by a tab and a count of
one. Thus our map program generates word tokens as the keys, and the
value is always one.

-----

In [1]:
%%writefile /home/data_scientist/hadoop/mapper.py
#!/usr/bin/env python3

# These examples are based on the blog post by Michael Noll:
# 
# http://www.michael-noll.com/tutorials/writing-an-hadoop-mapreduce-program-in-python/
#

import sys

# We explicitly define the word/count separator token.
sep = '\t'

# We open STDIN and STDOUT
with sys.stdin as fin:
    with sys.stdout as fout:
    
        # For every line in STDIN
        for line in fin:
        
            # Strip off leading and trailing whitespace
            line = line.strip()
            
            # We split the line into word tokens. Use whitespace to split.
            # Note we don't deal with punctuation.
            
            words = line.split()
            
            # Now loop through all words in the line and output

            for word in words:
                fout.write("{0}{1}1\n".format(word, sep))

Writing /home/data_scientist/hadoop/mapper.py
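Because `mapper.py` only reads STDIN and writes STDOUT, its logic is easy to check without a shell or Hadoop. The sketch below assumes we factor the loop into a hypothetical `run_mapper(fin, fout)` function and feed it in-memory text through `io.StringIO`. The output format (word, tab, `1`) matches the script above.

```python
import io

SEP = '\t'


def run_mapper(fin, fout, sep=SEP):
    """Hypothetical refactoring of mapper.py: write 'word<sep>1' for every token."""
    for line in fin:
        for word in line.strip().split():
            fout.write("{0}{1}1\n".format(word, sep))


fin = io.StringIO("The quick fox\n  jumps over the fox  \n")
fout = io.StringIO()
run_mapper(fin, fout)
print(fout.getvalue().splitlines())
# ['The\t1', 'quick\t1', 'fox\t1', 'jumps\t1', 'over\t1', 'the\t1', 'fox\t1']
```

Note that `The` and `the` remain distinct keys, because the mapper does no case folding.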


-----

### Reducer: Word Count

The second Python program we write is our reduce program. In this code,
we read key-value pairs from STDIN. Hadoop sorts all key-value pairs by
key before sending the map output to the reduce process. As a result,
every occurrence of a given word arrives consecutively, so we can
accumulate the count of each word in a single pass. The following code
could easily be made more sophisticated by using `yield` statements and
iterators. For clarity, however, we use a simple approach: we track when
the current word differs from the previous word, and at that point we
output the previous word and its cumulative count.

-----

In [2]:
%%writefile /home/data_scientist/hadoop/reducer.py
#!/usr/bin/env python3

import sys

# We explicitly define the word/count separator token.
sep = '\t'

# We open STDIN and STDOUT
with sys.stdin as fin:
    with sys.stdout as fout:
    
        # Keep track of current word and count
        cword = None
        ccount = 0
        word = None
   
        # For every line in STDIN
        for line in fin:
        
            # Strip off leading and trailing whitespace
            # Note by construction, we should have no leading white space
            line = line.strip()
            
            # We split the line into a word and count, based on the
            # predefined separator token.
            #
            # Note that the mapper did not remove punctuation.
            
            word, scount = line.split(sep, 1)
            
            # We will assume count is always an integer value
            
            count = int(scount)
            
            # The word is either a repeat of the current word or a new word
            
            if cword == word:
                ccount += count
            else:
                # Write out the previous word, except before the very first word
                if cword is not None:
                    fout.write("{0:s}{1:s}{2:d}\n".format(cword, sep, ccount))
                
                # New word, so reset variables
                cword = word
                ccount = count

        # Output the final word count (nothing to write if the input was empty)
        if cword is not None:
            fout.write("{0:s}{1:s}{2:d}\n".format(cword, sep, ccount))

Writing /home/data_scientist/hadoop/reducer.py
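The `yield`-and-iterator alternative mentioned earlier might look like the following sketch. It is not the version used in this notebook. It uses `itertools.groupby` to group consecutive pairs that share a word and, like `reducer.py`, relies on the input already being sorted by key. The names `parse_pairs` and `reduce_counts` are hypothetical.

```python
import io
from itertools import groupby
from operator import itemgetter

SEP = '\t'


def parse_pairs(lines, sep=SEP):
    """Yield (word, count) tuples from 'word<sep>count' lines, skipping blank lines."""
    for line in lines:
        line = line.strip()
        if line:
            word, count = line.split(sep, 1)
            yield word, int(count)


def reduce_counts(pairs):
    """Sum counts over runs of identical words; the input must be sorted by word."""
    for word, group in groupby(pairs, key=itemgetter(0)):
        yield word, sum(count for _, count in group)


sorted_input = io.StringIO("a\t1\na\t1\nb\t1\nc\t1\nc\t1\nc\t1\n")
for word, total in reduce_counts(parse_pairs(sorted_input)):
    print("{0}{1}{2}".format(word, SEP, total))
```

`groupby` only merges *adjacent* equal keys. Unsorted input would therefore produce repeated words with partial counts, which is exactly why the sort step between map and reduce matters.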


-----
### Testing Python Map-Reduce

Before we begin using Hadoop, we should first test our Python programs
to ensure they work as expected. First, we make the two programs
executable by their owner, which we can do with the Unix `chmod u+x`
command.

-----

In [3]:
%%bash

chmod u+x /home/data_scientist/hadoop/mapper.py
chmod u+x /home/data_scientist/hadoop/reducer.py

ls -la /home/data_scientist/hadoop

total 1560
drwxr-xr-x 2 data_scientist users    4096 Apr  1 21:06 .
drwxr-xr-x 1 data_scientist users    4096 Apr  1 21:05 ..
-rw-r--r-- 1 data_scientist users 1580927 Nov 21 11:09 book.txt
-rwxr--r-- 1 data_scientist users     849 Apr  1 21:06 mapper.py
-rwxr--r-- 1 data_scientist users    1496 Apr  1 21:06 reducer.py


-----

#### Testing Mapper.py

To test the map Python code, we run `mapper.py` with its STDIN
redirected from the book text file. In the first code cell, we pipe the
output into the Unix `wc -l` command, which counts lines. Because the
mapper writes one line per word, this is the total number of words in
the book. In the second code cell, we pipe the output of `mapper.py`
into the Unix `sort` command, a step that Hadoop performs automatically
between the map and reduce phases. We then pipe the sorted result into
the Unix `uniq -c -d` command, which counts repeated lines and omits
words that appear only once. Next, we sort this result numerically by
the number of occurrences of each word. Finally, we display the last
few lines with the Unix `tail` command to verify that the program is
operating correctly.

With this sequence of Unix commands, we have replicated, on a single
node, the steps performed by Hadoop MapReduce: Map, Sort, and Reduce.

-----

In [4]:
%%bash

cd /home/data_scientist/hadoop

./mapper.py <  book.txt | wc -l

267949


In [5]:
%%bash

cd /home/data_scientist/hadoop

./mapper.py <  book.txt | sort -n -k 1 | \
 uniq -c -d | sort -n -k 1 | tail -10

   2391 with	1
   2430 I	1
   2712 he	1
   3035 his	1
   4619 in	1
   4790 to	1
   5841 a	1
   6551 and	1
   8134 of	1
  13608 the	1
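For comparison, the `sort | uniq -c | sort -n | tail -10` stage of the previous cell can be approximated in pure Python with `collections.Counter`, whose `most_common` method returns the highest counts first. This is a sketch that assumes the mapper's `word<TAB>1` lines as input; the helper name `top_words` is hypothetical. Ties may be ordered differently than with Unix `sort`, and, unlike `uniq -d`, words that occur only once are kept.

```python
from collections import Counter


def top_words(mapper_lines, n=10, sep='\t'):
    """Count the keys in 'word<sep>1' lines and return the n most frequent.

    Every mapper value is 1, so counting keys is the same as summing values.
    """
    counts = Counter(line.split(sep, 1)[0] for line in mapper_lines if line.strip())
    return counts.most_common(n)


sample = ["the\t1", "of\t1", "the\t1", "and\t1", "the\t1", "of\t1"]
print(top_words(sample, n=2))  # [('the', 3), ('of', 2)]
```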


-----

#### Testing Reducer.py

To test the reduce Python code, we reuse the previous pipeline. Instead
of piping the sorted mapper output through `uniq`, however, we pipe it
into our `reducer.py` program. This simulates the Hadoop model, where
the map output is sorted by key before being passed to the reduce
process. First, we count the number of lines written by the reduce
process, which equals the number of unique _word tokens_ in the book.
Next, we sort the reducer output by the number of times each word token
appears and display the last few lines to compare with the previous
results.

-----

In [6]:
%%bash

cd /home/data_scientist/hadoop

./mapper.py <  book.txt | sort -n -k 1 | \
./reducer.py | wc -l

49316


In [7]:
%%bash

cd /home/data_scientist/hadoop

./mapper.py <  book.txt | sort -n -k 1 | \
./reducer.py | sort -n -k 2 | tail -10

with	2391
I	2430
he	2712
his	3035
in	4619
to	4790
a	5841
and	6551
of	8134
the	13608


-----

## Set Up the Local Hadoop Environment


-----

In [8]:
# Make sure we stop the namenode and datanodes if any are still running from a previous run
!$HADOOP_PREFIX/sbin/stop-dfs.sh
!$HADOOP_PREFIX/sbin/stop-yarn.sh

# Clean up any temp files created during a previous Hadoop operation.
!rm -rf /tmp/*

# Format the namenode and delete all files in our HDFS.
!echo "Y" | $HADOOP_PREFIX/bin/hdfs namenode -format 2> /dev/null

Stopping namenodes on [eeacbe10081d]
eeacbe10081d: no namenode to stop
localhost: no datanode to stop
Stopping secondary namenodes [0.0.0.0]
0.0.0.0: no secondarynamenode to stop
stopping yarn daemons
no resourcemanager to stop
localhost: no nodemanager to stop
no proxyserver to stop
rm: cannot remove ‘/tmp/hsperfdata_root’: Operation not permitted
Formatting using clusterid: CID-a0918add-0e43-4739-a0e0-e574759bd7ed


In [9]:
# Restart namenode and datanodes
!$HADOOP_PREFIX/etc/hadoop/hadoop-env.sh
!$HADOOP_PREFIX/sbin/start-dfs.sh
!$HADOOP_PREFIX/sbin/start-yarn.sh

Starting namenodes on [eeacbe10081d]
eeacbe10081d: starting namenode, logging to /usr/local/hadoop/logs/hadoop-data_scientist-namenode-eeacbe10081d.out
localhost: starting datanode, logging to /usr/local/hadoop/logs/hadoop-data_scientist-datanode-eeacbe10081d.out
Starting secondary namenodes [0.0.0.0]
0.0.0.0: starting secondarynamenode, logging to /usr/local/hadoop/logs/hadoop-data_scientist-secondarynamenode-eeacbe10081d.out
starting yarn daemons
starting resourcemanager, logging to /usr/local/hadoop/logs/yarn--resourcemanager-eeacbe10081d.out
localhost: starting nodemanager, logging to /usr/local/hadoop/logs/yarn-data_scientist-nodemanager-eeacbe10081d.out


In [10]:
# Sometimes when the namenode is restarted, it enters Safe Mode,
# which does not allow any changes to the file system.
# We do want to make changes, so we manually leave Safe Mode.

!$HADOOP_PREFIX/bin/hdfs dfsadmin -safemode leave

!$HADOOP_PREFIX/bin/hdfs dfs -mkdir -p /user/$NB_USER

Safe mode is OFF


-----

## Python Hadoop Streaming

We are now ready to run our Python programs via Hadoop Streaming. The
main command to perform this task is
`$HADOOP_PREFIX/bin/hadoop jar hadoop-streaming-X.X.X.jar`, where
`X.X.X` is the Hadoop version. In this installation, the streaming jar
file is `hadoop-streaming-2.7.2.jar`, as shown in the following code
cell.

-----

In [11]:
!ls $HADOOP_PREFIX/share/hadoop/tools/lib/hadoop-streaming*

/usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.7.2.jar


-----


Running the `hadoop` command with the `--help` flag provides a useful
summary of its different commands, including `jar`, which we use to run
Hadoop Streaming. Note that `jar` is short for Java Archive, a
compressed archive of compiled Java code that can be executed to
perform different operations. In this case, we run the Hadoop Streaming
jar file so that our Python code can work within Hadoop.

-----

In [12]:
# Display a summary of the hadoop command's options
!$HADOOP_PREFIX/bin/hadoop --help

Usage: hadoop [--config confdir] [COMMAND | CLASSNAME]
  CLASSNAME            run the class named CLASSNAME
 or
  where COMMAND is one of:
  fs                   run a generic filesystem user client
  version              print the version
  jar <jar>            run a jar file
                       note: please use "yarn jar" to launch
                             YARN applications, not this command.
  checknative [-a|-h]  check native hadoop and compression libraries availability
  distcp <srcurl> <desturl> copy file or directories recursively
  archive -archiveName NAME -p <parent path> <src>* <dest> create a hadoop archive
  classpath            prints the class path needed to get the
  credential           interact with credential providers
                       Hadoop jar and the required libraries
  daemonlog            get/set the log level for each daemon
  trace                view and modify Hadoop tracing settings

Most commands print help when invoked w/

-----


For our map/reduce Python example to run successfully, we need to
specify five flags:

1. `-files`: a comma-separated list of local files (here, our two Python programs) to be copied to the Hadoop cluster so the tasks can run them.
2. `-input`: the HDFS input file(s) to be used for the map task.
3. `-output`: the HDFS output directory written by the reduce task. This directory must not already exist.
4. `-mapper`: the command to run for the map task.
5. `-reducer`: the command to run for the reduce task.

Given our previous setup, we run the full command as follows, where
`hs.jar` stands for the full path to the Hadoop Streaming jar file:

    $HADOOP_PREFIX/bin/hadoop jar hs.jar -files mapper.py,reducer.py -input wc/in \
        -output wc/out -mapper mapper.py -reducer reducer.py

When this command runs, a series of messages is written to the screen
(via STDERR) showing the progress of our Hadoop Streaming task. At the
end of this stream of informational messages is a statement indicating
the location of the output directory. To suppress the Hadoop messages,
we can append the Bash redirection `2> /dev/null` to any Hadoop
command. This sends all STDERR messages to `/dev/null`, a special Unix
device that discards everything written to it.

For example, to ignore any messages from the `hdfs dfs -rm -r -f wc/out`
command, we would use the following syntax:

```bash
$HADOOP_PREFIX/bin/hdfs dfs -rm -r -f wc/out 2> /dev/null
```

Doing this, however, hides all messages, including error messages,
which can make debugging problems more difficult. As a result, you
should only do this once your commands work correctly and you want to
improve the appearance of your Notebook.
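The same Hadoop Streaming invocation can be assembled from Python, which makes the five flags explicit. The sketch below is hypothetical: `find_streaming_jar`, `streaming_command`, and `run_job` are illustrative helpers. It assumes `HADOOP_PREFIX` is set and that the jar lives under `share/hadoop/tools/lib`, as in this installation. Passing `stderr=subprocess.DEVNULL` to `subprocess.run` is the Python counterpart of appending `2> /dev/null`.

```python
import glob
import os
import subprocess


def find_streaming_jar(hadoop_prefix):
    """Return the first hadoop-streaming jar under the installation's tools/lib directory."""
    pattern = os.path.join(hadoop_prefix, "share", "hadoop", "tools", "lib",
                           "hadoop-streaming*.jar")
    jars = sorted(glob.glob(pattern))
    if not jars:
        raise FileNotFoundError("no streaming jar matches " + pattern)
    return jars[0]


def streaming_command(hadoop_prefix, jar, mapper, reducer, input_dir, output_dir):
    """Build the argument list for a Hadoop Streaming word-count job."""
    return [
        os.path.join(hadoop_prefix, "bin", "hadoop"), "jar", jar,
        "-files", "{0},{1}".format(mapper, reducer),  # ship both scripts to the cluster
        "-input", input_dir,                           # HDFS input for the map task
        "-output", output_dir,                         # HDFS output; must not exist yet
        "-mapper", mapper,
        "-reducer", reducer,
    ]


def run_job(cmd, workdir, quiet=True):
    """Run the job from workdir, where mapper.py and reducer.py live.

    quiet=True discards STDERR, like appending 2> /dev/null in Bash.
    """
    stderr = subprocess.DEVNULL if quiet else None
    return subprocess.run(cmd, cwd=workdir, stderr=stderr, check=True)


# Example usage (assumes HADOOP_PREFIX is set and wc/out does not yet exist):
# prefix = os.environ["HADOOP_PREFIX"]
# cmd = streaming_command(prefix, find_streaming_jar(prefix),
#                         "mapper.py", "reducer.py", "wc/in", "wc/out")
# run_job(cmd, os.path.expanduser("~/hadoop"))
```

Building an argument list, rather than a single shell string, avoids quoting problems. It also makes each flag easy to inspect before the job is submitted.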


-----

In [13]:
!$HADOOP_PREFIX/bin/hdfs dfs -rm -r -f wc
!$HADOOP_PREFIX/bin/hdfs dfs -mkdir -p wc/in
!$HADOOP_PREFIX/bin/hdfs dfs -put $HOME/hadoop/book.txt wc/in/book.txt

In [14]:
%%bash

# Change into correct working directory
cd $HOME/hadoop

# Delete output directory (if it exists)
$HADOOP_PREFIX/bin/hdfs dfs -rm -r -f wc/out

# Grab current streaming lib jar filename
streaming_file=$(ls $HADOOP_PREFIX/share/hadoop/tools/lib/hadoop-streaming*)

# Run the Map Reduce task within Hadoop
$HADOOP_PREFIX/bin/hadoop jar $streaming_file \
    -files mapper.py,reducer.py -input wc/in \
    -output wc/out -mapper mapper.py -reducer reducer.py

packageJobJar: [/tmp/hadoop-unjar5345246585859287398/] [] /tmp/streamjob7104143533387876812.jar tmpDir=null


17/04/01 21:07:38 INFO client.RMProxy: Connecting to ResourceManager at eeacbe10081d/172.17.0.2:8032
17/04/01 21:07:38 INFO client.RMProxy: Connecting to ResourceManager at eeacbe10081d/172.17.0.2:8032
17/04/01 21:07:39 INFO mapred.FileInputFormat: Total input paths to process : 1
17/04/01 21:07:39 INFO mapreduce.JobSubmitter: number of splits:2
17/04/01 21:07:39 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1491080842837_0001
17/04/01 21:07:39 INFO impl.YarnClientImpl: Submitted application application_1491080842837_0001
17/04/01 21:07:39 INFO mapreduce.Job: The url to track the job: http://eeacbe10081d:8088/proxy/application_1491080842837_0001/
17/04/01 21:07:39 INFO mapreduce.Job: Running job: job_1491080842837_0001
17/04/01 21:07:46 INFO mapreduce.Job: Job job_1491080842837_0001 running in uber mode : false
17/04/01 21:07:46 INFO mapreduce.Job:  map 0% reduce 0%
17/04/01 21:07:51 INFO mapreduce.Job:  map 100% reduce 0%
17/04/01 21:07:57 INFO mapreduce.Job:  map 100% r

-----

### Hadoop Results

To view the results of our Hadoop Streaming task, we must use HDFS
`dfs` commands to examine the directory and files generated by our
Python map/reduce programs. The following DFS commands may prove useful
for viewing the results of this map/reduce job.

```bash
$HADOOP_PREFIX/bin/hdfs dfs -ls wc

$HADOOP_PREFIX/bin/hdfs dfs -ls wc/out

$HADOOP_PREFIX/bin/hdfs dfs -count -h wc/out/part-00000

$HADOOP_PREFIX/bin/hdfs dfs -tail wc/out/part-00000
```

We demonstrate several of these commands below. Note that these HDFS
commands can be combined with Unix commands, for example by piping
their output, to perform additional text processing. The important
point is that files stored in HDFS are not accessible through the local
file system, so any direct file I/O on HDFS must go through HDFS
commands.

-----

In [15]:
!$HADOOP_PREFIX/bin/hdfs dfs -ls wc/out

Found 2 items
-rw-r--r--   1 data_scientist supergroup          0 2017-04-01 21:07 wc/out/_SUCCESS
-rw-r--r--   1 data_scientist supergroup     522427 2017-04-01 21:07 wc/out/part-00000


In [16]:
!$HADOOP_PREFIX/bin/hdfs dfs -count -h wc/out/part-00000

           0            1            510.2 K wc/out/part-00000


In [17]:
!$HADOOP_PREFIX/bin/hdfs dfs -tail wc/out/part-00000 | tail -10

“Come	1
“Defects,”	1
“I	1
“Information	1
“J”	1
“Plain	2
“Project	5
“Right	1
“Viator”	1
•	1


-----

To compare this Hadoop Streaming output to our previous Python-only
output, we apply several Unix commands as follows:

```bash
$HADOOP_PREFIX/bin/hdfs dfs -cat wc/out/part-00000 | sort -n -k 2 | tail -10
```

This is demonstrated below, where the output should match that of the
Python-only map-reduce approach.

-----

In [18]:
!$HADOOP_PREFIX/bin/hdfs dfs -cat wc/out/part-00000 | sort -n -k 2 | tail -10

with	2391
I	2430
he	2712
his	3035
in	4619
to	4790
a	5841
and	6551
of	8134
the	13608
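The `sort -n -k 2 | tail -10` step in the previous cell can also be done in Python once the reducer output is available as text. The following sketch parses `word<TAB>count` lines and returns the `n` largest counts in ascending order, mirroring what `tail` shows. The helper name `top_counts` is hypothetical, and the sample values are taken from the output above.

```python
def top_counts(reducer_lines, n=10, sep='\t'):
    """Parse 'word<sep>count' lines and return the n largest counts, smallest first."""
    pairs = []
    for line in reducer_lines:
        line = line.rstrip('\n')
        if line:
            word, count = line.rsplit(sep, 1)
            pairs.append((word, int(count)))
    pairs.sort(key=lambda pair: pair[1])  # like sort -n -k 2
    return pairs[-n:]                     # like tail -n


sample = ["of\t8134", "the\t13608", "a\t5841", "and\t6551"]
print(top_counts(sample, n=3))  # [('and', 6551), ('of', 8134), ('the', 13608)]
```

To apply it to the job output, pass it the lines printed by `hdfs dfs -cat wc/out/part-00000`, captured for example with `subprocess.run(..., stdout=subprocess.PIPE)`.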


### Hadoop Cleanup

Following the successful run of our map/reduce Python programs, we have
created a new directory, `wc/out`, which contains two files. To rerun
this Hadoop Streaming map/reduce task, we must either specify a
different output directory or clean up the results of the previous run.
To remove the output directory, we can use the
`hdfs dfs -rm -r -f wc/out` command, which immediately deletes the
`wc/out` directory. Hadoop reports when the deletion succeeds, and this
can also be verified by listing the contents of the `wc` directory.


-----

In [19]:
# To delete the output directory before a rerun, uncomment the following line:
# !$HADOOP_PREFIX/bin/hdfs dfs -rm -r -f wc/out

# The namenode, datanodes, and YARN daemons consume quite a bit of memory while
# running in the background, so we shut them down at the end of the notebook:

!$HADOOP_PREFIX/sbin/stop-dfs.sh
!$HADOOP_PREFIX/sbin/stop-yarn.sh

Stopping namenodes on [eeacbe10081d]
eeacbe10081d: stopping namenode
localhost: stopping datanode
Stopping secondary namenodes [0.0.0.0]
0.0.0.0: stopping secondarynamenode
stopping yarn daemons
stopping resourcemanager
localhost: stopping nodemanager
localhost: nodemanager did not stop gracefully after 5 seconds: killing with kill -9
no proxyserver to stop


-----

### Student Activity

In the preceding cells, we introduced Hadoop map/reduce by using a
simple word count task. Now that you have run the Notebook, go back and
make the following changes to see how the results change.

1. We ignored punctuation. Modify the original mapper Python code to
tokenize on whitespace or punctuation. How does this change the Python
map-reduce output?
2. Try downloading a different text from Project Gutenberg. Can you make
your map-reduce application run across multiple texts?
3. Can you make your map-reduce code compute bi-grams instead of
unigrams?

-----