# MapReduce

We will use Hadoop Streaming to run MapReduce code written in Python.

In [1]:
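import os
# Assertion helpers used in the checks below (assumed to come from nose).
from nose.tools import assert_equal, assert_true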

We will use the [airline on-time performance data](http://stat-computing.org/dataexpo/2009/), but before proceeding, recall that the data set is encoded in `latin-1`. However, the Python 3 interpreter expects the standard input and output to be in `utf-8` encoding. Thus, we have to explicitly state that the Python interpreter should use `latin-1` for all IO operations, which we can do by setting the Python environment variable `PYTHONIOENCODING` equal to `latin-1`. We can set the environment variables of the current IPython kernel by modifying the `os.environ` dictionary.

In [2]:
os.environ['PYTHONIOENCODING'] = 'latin-1'

Let's use the shell to check if the variable is set correctly. If you are not familiar with the following syntax (i.e., Python variable = ! shell command), [this notebook](https://github.com/UI-DataScience/info490-fa15/blob/master/Week4/assignment/unix_ipython.ipynb) from the previous semester might be useful.

In [3]:
python_io_encoding = ! echo $PYTHONIOENCODING
assert_equal(python_io_encoding.s, 'latin-1')
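
To see what this setting changes, the following sketch feeds a single `latin-1` byte to a child Python process under two settings. The helper name `run_child` and the test byte are illustrative and not part of the assignment; the `:strict` suffix is added so that the `utf-8` case fails regardless of locale-dependent defaults.

```python
import os
import subprocess
import sys

# Child program: read stdin as text and print the code points it decoded.
CHILD = "import sys; print([hex(ord(c)) for c in sys.stdin.read()])"

# 0xE9 is 'é' in latin-1, but a lone 0xE9 byte is not valid UTF-8.
RAW = b"\xe9"

def run_child(io_encoding):
    """Run CHILD with PYTHONIOENCODING set; return (exit code, stdout text)."""
    env = dict(os.environ, PYTHONIOENCODING=io_encoding)
    proc = subprocess.run(
        [sys.executable, "-c", CHILD],
        input=RAW, env=env,
        stdout=subprocess.PIPE, stderr=subprocess.PIPE,
    )
    return proc.returncode, proc.stdout.decode("ascii").strip()

latin_result = run_child("latin-1")       # the byte decodes cleanly
utf8_result = run_child("utf-8:strict")   # the child raises UnicodeDecodeError
print(latin_result, utf8_result)
```

The second call exits with a non-zero status, which is the kind of failure the mapper would hit on the airline data without the environment variable.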

## Mapper

Write a Python script that
  - Reads data from `STDIN`,
  - Skips the first line (the first line of `2001.csv` is the header containing the column titles),
  - Outputs to `STDOUT` the `Origin` and `DepDelay` columns separated with a tab.

In [4]:
%%writefile mapper.py
#!/usr/bin/env python3

import sys

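for line in sys.stdin:
    # Skip the header row, which starts with the first column title.
    if line.startswith('Year'):
        continue
    fields = line.split(',')
    # Column 16 is Origin and column 15 is DepDelay (zero-based).
    print(fields[16] + '\t' + fields[15])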

Overwriting mapper.py
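
As an aside, the column positions can also be looked up from the header instead of being hard-coded. The sketch below is one possible variant, not the assignment's solution: `map_lines` is a hypothetical helper, and the three-column sample is made up for the demonstration.

```python
import csv
import io

def map_lines(lines):
    """Yield 'Origin<TAB>DepDelay' for each data row, using the header for lookup."""
    reader = csv.reader(lines)
    header = next(reader)                  # consume the header row
    origin = header.index("Origin")
    dep_delay = header.index("DepDelay")
    for row in reader:
        yield row[origin] + "\t" + row[dep_delay]

# Tiny made-up sample with only the columns needed for the demo.
sample = io.StringIO("Year,DepDelay,Origin\n2001,-4,BWI\n2001,NA,PHL\n")
mapped = list(map_lines(sample))
print(mapped)
```

This only works when the script sees the whole file. Under Hadoop Streaming each mapper receives one input split, and only the first split contains the header, so the fixed indices in `mapper.py` are the more robust choice there.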


We need to make the file executable.

In [5]:
! chmod u+x mapper.py

Before testing the mapper code on the entire data set, let's first create a small file and test our code on this small data set.

In [6]:
! head -n 50 $HOME/data/2001.csv > 2001.csv.head
map_out_head = ! ./mapper.py < 2001.csv.head
print('\n'.join(map_out_head))

BWI	-4
BWI	-5
BWI	11
BWI	-3
BWI	0
BWI	-3
BWI	-8
BWI	-6
BWI	2
BWI	2
BWI	2
BWI	-6
BWI	-8
BWI	-3
BWI	-5
PHL	20
PHL	100
PHL	1
PHL	-2
PHL	-7
PHL	NA
PHL	4
PHL	3
PHL	-4
PHL	-5
PHL	-4
PHL	17
PHL	-5
PHL	0
PHL	-2
PHL	97
PHL	3
PHL	-4
PHL	NA
PHL	17
PHL	NA
PHL	2
PHL	27
PHL	3
PHL	-6
PHL	-3
PHL	-3
PHL	-5
PHL	-2
PHL	-3
PHL	1
CLT	32
CLT	18
CLT	38


## Reducer

Write a Python script that

  - Reads key-value pairs from `STDIN`,
  - Computes the minimum and maximum departure delays at each airport,
  - Outputs to `STDOUT` the airports and the minimum and maximum departure delays at each airport, separated with tabs.
  
For example,

```shell
$ ./mapper.py < 2001.csv.head | sort -n -k 1 | ./reducer.py
```

should give

```
BWI	-8	11
CLT	18	38
PHL	-7	100
```
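
The `sort` in the pipeline above stands in for Hadoop's shuffle phase: it brings all records with the same key next to each other before they reach the reducer. A minimal in-process sketch of that map, sort, reduce flow, assuming integer delays and using a hypothetical `reduce_sorted` helper, might look like this:

```python
from itertools import groupby

def reduce_sorted(pairs):
    """Given (airport, delay) pairs sorted by airport, yield (airport, min, max)."""
    for airport, group in groupby(pairs, key=lambda kv: kv[0]):
        delays = [delay for _, delay in group]
        yield airport, min(delays), max(delays)

# A few rows taken from the sample mapper output above.
mapped = [("PHL", 20), ("BWI", -4), ("CLT", 32),
          ("BWI", 11), ("PHL", -7), ("CLT", 18)]

# The shuffle step: sorting brings identical keys together.
shuffled = sorted(mapped, key=lambda kv: kv[0])
reduced = list(reduce_sorted(shuffled))
print(reduced)
```

Because `groupby` only merges adjacent items, skipping the sort would split one airport into several groups, which is why the reducer's input must be sorted by key.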

In [23]:
%%writefile reducer.py
#!/usr/bin/env python3

import sys

sep = '\t'

minmax = dict()

for line in sys.stdin:
    # Strip the trailing newline and any surrounding whitespace.
    line = line.strip()

    # Split the line into an airport code and a departure delay.
    air, depdel = line.split(sep, 1)

    # Missing delays are recorded as 'NA'; this script counts them as 0.
    if depdel == 'NA':
        dep = 0
    else:
        dep = int(depdel)

    # First time we see this airport: start with an empty [min, max] range.
    if air not in minmax:
        minmax[air] = [float("inf"), -float("inf")]

    # Widen the range if this delay falls outside it.
    if dep > minmax[air][1]:
        minmax[air][1] = dep

    if dep < minmax[air][0]:
        minmax[air][0] = dep

for i in sorted(minmax):
    print(i+'\t'+str(minmax[i][0])+'\t'+str(minmax[i][1]))

Overwriting reducer.py


In [24]:
! chmod u+x reducer.py

In [25]:
red_head_out = ! ./mapper.py < 2001.csv.head | sort -n -k 1 | ./reducer.py
print('\n'.join(red_head_out))

BWI	-8	11
CLT	18	38
PHL	-7	100


If the previous tests on the smaller data set were successful, we can run the same MapReduce pipeline on the entire data set.

In [27]:
mapred_out = ! ./mapper.py < $HOME/data/2001.csv | sort -n -k 1 | ./reducer.py
print('\n'.join(mapred_out[:10]))

ABE	-30	666
ABI	-19	285
ABQ	-30	576
ACT	-22	234
ACY	106	106
ADQ	-17	348
AKN	-23	322
ALB	-25	1037
AMA	-16	635
ANC	-61	1287


## HDFS: Reset

We will do some cleaning up before we run Hadoop Streaming. Let's first stop the [namenode and datanodes](https://hadoop.apache.org/docs/r1.2.1/hdfs_design.html).

In [29]:
! $HADOOP_PREFIX/sbin/stop-dfs.sh
! $HADOOP_PREFIX/sbin/stop-yarn.sh

Stopping namenodes on [982f5797ebdb]
982f5797ebdb: stopping namenode
localhost: stopping datanode
Stopping secondary namenodes [0.0.0.0]
0.0.0.0: stopping secondarynamenode
stopping yarn daemons
stopping resourcemanager
localhost: stopping nodemanager
no proxyserver to stop


If there are any temporary files created during the previous Hadoop operation, we want to clean them up.

In [30]:
! rm -rf /tmp/*

We will simply [format the namenode](https://wiki.apache.org/hadoop/GettingStartedWithHadoop#Formatting_the_Namenode), which deletes all files in our HDFS. Note that our HDFS is in an ephemeral Docker container, so all data will be lost anyway when the Docker container is shut down.

In [31]:
! echo "Y" | $HADOOP_PREFIX/bin/hdfs namenode -format 2> /dev/null

Formatting using clusterid: CID-f462fe0c-9281-4ff0-8b1d-02b163a16666


After formatting the namenode, we restart the namenode and datanodes.

In [32]:
!$HADOOP_PREFIX/etc/hadoop/hadoop-env.sh
!$HADOOP_PREFIX/sbin/start-dfs.sh
!$HADOOP_PREFIX/sbin/start-yarn.sh

Starting namenodes on [982f5797ebdb]
982f5797ebdb: starting namenode, logging to /usr/local/hadoop/logs/hadoop-data_scientist-namenode-982f5797ebdb.out
localhost: starting datanode, logging to /usr/local/hadoop/logs/hadoop-data_scientist-datanode-982f5797ebdb.out
Starting secondary namenodes [0.0.0.0]
0.0.0.0: starting secondarynamenode, logging to /usr/local/hadoop/logs/hadoop-data_scientist-secondarynamenode-982f5797ebdb.out
starting yarn daemons
starting resourcemanager, logging to /usr/local/hadoop/logs/yarn--resourcemanager-982f5797ebdb.out
localhost: starting nodemanager, logging to /usr/local/hadoop/logs/yarn-data_scientist-nodemanager-982f5797ebdb.out


Sometimes when the namenode is restarted, it enters Safe Mode, which does not allow any changes to the file system. We do want to make changes, so we manually leave Safe Mode.

In [33]:
! $HADOOP_PREFIX/bin/hdfs dfsadmin -safemode leave

Safe mode is OFF
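
If you would rather check the state before forcing it, `hdfs dfsadmin -safemode get` reports the current status. The sketch below is a hedged example: the helper names are hypothetical, it assumes `hdfs` is on the `PATH`, and it assumes the "ON" report mirrors the "OFF" message shown above.

```python
import subprocess

def parse_safemode(report):
    """Return True if a `dfsadmin -safemode` report says Safe Mode is ON."""
    return "Safe mode is ON" in report

def namenode_in_safemode(hdfs="hdfs"):
    """Ask the namenode for its Safe Mode status (requires a running HDFS)."""
    out = subprocess.run([hdfs, "dfsadmin", "-safemode", "get"],
                         stdout=subprocess.PIPE, universal_newlines=True)
    return parse_safemode(out.stdout)

# Only the parser is exercised here, using the message printed above.
print(parse_safemode("Safe mode is OFF"))
```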


## HDFS: Create directory

- Create a new directory in HDFS at `/user/data_scientist`.

In [39]:
# Create a new directory in HDFS at /user/data_scientist.

!$HADOOP_PREFIX/bin/hdfs dfs -mkdir /user
!$HADOOP_PREFIX/bin/hdfs dfs -mkdir /user/data_scientist

mkdir: `/user': File exists


In [40]:
ls_user = ! $HADOOP_PREFIX/bin/hdfs dfs -ls /user/
print('\n'.join(ls_user))

Found 1 items
drwxr-xr-x   - data_scientist supergroup          0 2016-04-17 22:22 /user/data_scientist


In [41]:
assert_true('/user/data_scientist' in ls_user.s)

- Create a new directory in HDFS at `/user/data_scientist/wc/in`.

In [42]:
# Create a new directory in HDFS at `/user/data_scientist/wc/in`

!$HADOOP_PREFIX/bin/hdfs dfs -mkdir /user/data_scientist/wc
!$HADOOP_PREFIX/bin/hdfs dfs -mkdir /user/data_scientist/wc/in

In [43]:
ls_wc = ! $HADOOP_PREFIX/bin/hdfs dfs -ls wc
print('\n'.join(ls_wc))

Found 1 items
drwxr-xr-x   - data_scientist supergroup          0 2016-04-17 22:26 wc/in
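
Note that the listing above uses the relative path `wc` even though the directory was created at `/user/data_scientist/wc`. By default, HDFS resolves relative paths against the current user's home directory, `/user/<username>`. The following sketch mimics that rule; `resolve_hdfs_path` is a hypothetical helper for illustration, not a Hadoop API.

```python
import posixpath

def resolve_hdfs_path(path, user):
    """Resolve an HDFS path the way the shell does for relative paths."""
    if path.startswith("/"):
        return posixpath.normpath(path)
    return posixpath.normpath(posixpath.join("/user", user, path))

resolved = resolve_hdfs_path("wc", "data_scientist")
print(resolved)
```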


## HDFS: Copy

- Copy `/home/data_scientist/data/2001.csv` from local file system into our new HDFS directory `wc/in`.

In [45]:
# Copy `/home/data_scientist/data/2001.csv` from local file system into our new HDFS directory `wc/in`.

!$HADOOP_PREFIX/bin/hdfs dfs -put /home/data_scientist/data/2001.csv /user/data_scientist/wc/in

In [46]:
ls_wc_in = ! $HADOOP_PREFIX/bin/hdfs dfs -ls wc/in
print('\n'.join(ls_wc_in))

Found 1 items
-rw-r--r--   1 data_scientist supergroup  600411462 2016-04-17 22:27 wc/in/2001.csv


## Python Hadoop Streaming

- Run `mapper.py` and `reducer.py` via Hadoop Streaming.
- Use `/usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.7.2.jar`.
- We need to pass the `PYTHONIOENCODING` environment variable to our Hadoop streaming task. To find out how to set `PYTHONIOENCODING` to `latin-1` in a Hadoop streaming task, use the `--help` and `-info` options.
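
The pieces listed above fit together as a single `hadoop jar` invocation. As a rough sketch, the argument list can be assembled in Python to make each option explicit; `streaming_command` is a hypothetical helper that only builds the list and does not run the job.

```python
import posixpath

def streaming_command(hadoop_prefix, in_dir, out_dir, env):
    """Build the argument list for a Hadoop Streaming job (not executed here)."""
    jar = posixpath.join(hadoop_prefix,
                         "share/hadoop/tools/lib/hadoop-streaming-2.7.2.jar")
    cmd = [posixpath.join(hadoop_prefix, "bin/hadoop"), "jar", jar,
           "-files", "mapper.py,reducer.py",   # ship both scripts with the job
           "-input", in_dir,
           "-output", out_dir,                 # must not exist yet
           "-mapper", "mapper.py",
           "-reducer", "reducer.py"]
    # Each -cmdenv NAME=VALUE pair is exported to the streaming tasks.
    for name, value in sorted(env.items()):
        cmd += ["-cmdenv", "{}={}".format(name, value)]
    return cmd

cmd = streaming_command("/usr/local/hadoop",
                        "/user/data_scientist/wc/in",
                        "/user/data_scientist/wc/out",
                        {"PYTHONIOENCODING": "latin-1"})
print(" ".join(cmd))
```

The generic `-files` option is placed before the streaming-specific options, which is the order Hadoop expects.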

In [75]:
# Run Python code via Hadoop streaming
!$HADOOP_PREFIX/bin/hdfs dfs -rm -r -f /user/data_scientist/wc/out
!$HADOOP_PREFIX/bin/hadoop jar $HADOOP_PREFIX/share/hadoop/tools/lib/hadoop-streaming-2.7.2.jar -files mapper.py,reducer.py -input /user/data_scientist/wc/in -output /user/data_scientist/wc/out -mapper mapper.py -reducer reducer.py -cmdenv PYTHONIOENCODING='latin-1'

packageJobJar: [/tmp/hadoop-unjar7322456167908406384/] [] /tmp/streamjob3140646852738829676.jar tmpDir=null
16/04/17 22:52:11 INFO client.RMProxy: Connecting to ResourceManager at localhost/127.0.0.1:8032
16/04/17 22:52:11 INFO client.RMProxy: Connecting to ResourceManager at localhost/127.0.0.1:8032
16/04/17 22:52:12 INFO mapred.FileInputFormat: Total input paths to process : 1
16/04/17 22:52:12 INFO mapreduce.JobSubmitter: number of splits:5
16/04/17 22:52:12 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1460930317330_0003
16/04/17 22:52:12 INFO impl.YarnClientImpl: Submitted application application_1460930317330_0003
16/04/17 22:52:12 INFO mapreduce.Job: The url to track the job: http://982f5797ebdb:8088/proxy/application_1460930317330_0003/
16/04/17 22:52:12 INFO mapreduce.Job: Running job: job_1460930317330_0003
16/04/17 22:52:18 INFO mapreduce.Job: Job job_1460930317330_0003 running in uber mode : false
16/04/17 22:52:18 INFO mapreduce.Job:  map 0% reduce 0%
16/04/1

In [76]:
ls_wc_out = ! $HADOOP_PREFIX/bin/hdfs dfs -ls wc/out
print('\n'.join(ls_wc_out))

Found 2 items
-rw-r--r--   1 data_scientist supergroup          0 2016-04-17 22:52 wc/out/_SUCCESS
-rw-r--r--   1 data_scientist supergroup       2821 2016-04-17 22:52 wc/out/part-00000


In [78]:
stream_out = ! $HADOOP_PREFIX/bin/hdfs dfs -cat wc/out/part-00000
print('\n'.join(stream_out[:10]))

ABE	-30	666
ABI	-19	285
ABQ	-30	576
ACT	-22	234
ACY	106	106
ADQ	-17	348
AKN	-23	322
ALB	-25	1037
AMA	-16	635
ANC	-61	1287
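
These rows match what the local pipeline produced earlier. One way to compare the two outputs without depending on line order is to parse them into dictionaries first. The sketch below uses a hypothetical `parse_minmax` helper on two rows copied from the outputs in this notebook; in the notebook itself, the same function could be applied to `mapred_out` and `stream_out`.

```python
def parse_minmax(lines):
    """Turn 'AIRPORT<TAB>min<TAB>max' lines into {airport: (min, max)}."""
    result = {}
    for line in lines:
        airport, low, high = line.split("\t")
        result[airport] = (int(low), int(high))
    return result

# Two rows copied from the outputs shown in this notebook.
local_rows = ["ABE\t-30\t666", "ABI\t-19\t285"]
hadoop_rows = ["ABI\t-19\t285", "ABE\t-30\t666"]   # order may differ
same = parse_minmax(local_rows) == parse_minmax(hadoop_rows)
print(same)
```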


## Cleanup

In [80]:
! $HADOOP_PREFIX/bin/hdfs dfs -rm -r -f -skipTrash wc/out

Deleted wc/out


# Apache Pig

We will run Pig to compute the average rating for each movie in the MovieLens data set.

Since each data file in the MovieLens data set includes a header row, we use the following Bash script to remove the first line from each file and make copies of the data files we will be using.

In [2]:
%%bash

sed '1d' $HOME/data/ml-latest-small/ratings.csv > ratings.csv
sed '1d' $HOME/data/ml-latest-small/movies.csv > movies.csv

echo
echo '***** Ratings File *****'
head ratings.csv

echo
echo '***** Movies File *****'
head movies.csv


***** Ratings File *****
1,16,4.0,1217897793
1,24,1.5,1217895807
1,32,4.0,1217896246
1,47,4.0,1217896556
1,50,4.0,1217896523
1,110,4.0,1217896150
1,150,3.0,1217895940
1,161,4.0,1217897864
1,165,3.0,1217897135
1,204,0.5,1217895786

***** Movies File *****
1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
2,Jumanji (1995),Adventure|Children|Fantasy
3,Grumpier Old Men (1995),Comedy|Romance
4,Waiting to Exhale (1995),Comedy|Drama|Romance
5,Father of the Bride Part II (1995),Comedy
6,Heat (1995),Action|Crime|Thriller
7,Sabrina (1995),Comedy|Romance
8,Tom and Huck (1995),Adventure|Children
9,Sudden Death (1995),Action
10,GoldenEye (1995),Action|Adventure|Thriller


## Pig: Average

Write a Pig script that

- Imports `ratings.csv` and `movies.csv`,
- Groups all reviews by movie ID and uses [AVG](https://pig.apache.org/docs/r0.7.0/piglatin_ref2.html#AVG) to compute the average rating for each movie,
- Joins the two data sets on the movie ID column, and
- Uses the DUMP command to display the first 10 rows.

The resulting schema should contain five columns:

```
(movie ID from ratings.csv, average rating, movie ID from movies.csv, title, genre)
```

For example, the first line should be

```
(1,3.9073275862068964,1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy)
```
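
For readers new to Pig, the group, average, and join steps described above correspond to the following plain-Python sketch. It is only meant to illustrate the semantics, not to replace the Pig script; the three ratings are made up, while the two movie rows are taken from `movies.csv`.

```python
from collections import defaultdict

# Made-up miniature ratings: (userID, movieID, rating).
ratings = [(1, 1, 4.0), (2, 1, 3.0), (1, 2, 3.5)]
# Movie rows: (movieID, title, genre).
movies = [(1, "Toy Story (1995)", "Adventure|Animation|Children|Comedy|Fantasy"),
          (2, "Jumanji (1995)", "Adventure|Children|Fantasy")]

# GROUP ratings BY movieID
groups = defaultdict(list)
for _user, movie_id, rating in ratings:
    groups[movie_id].append(rating)

# FOREACH group GENERATE group, AVG(rating)
averages = {movie_id: sum(rs) / len(rs) for movie_id, rs in groups.items()}

# JOIN averages BY group, movies BY movieID (inner join)
results = [(movie_id, averages[movie_id], movie_id, title, genre)
           for movie_id, title, genre in movies if movie_id in averages]
print(results[0])
```

Each result tuple has the same five-column layout as the schema above: the movie ID appears twice because the join keeps the key from both sides.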

In [3]:
%%writefile average.pig

ratings = LOAD 'ratings.csv' USING PigStorage(',') AS (userID:int, movieID:int, rating:double, timestamp:int) ;
movies = LOAD 'movies.csv' USING PigStorage(',') AS (movieID:int, title:chararray, genre:chararray) ;
groups = GROUP ratings BY movieID ;
avg = FOREACH groups GENERATE group, AVG (ratings.rating) ;
results = JOIN avg by group, movies by movieID ;
Top_Results = LIMIT results 10 ;
DUMP Top_Results ;

Writing average.pig


We run Pig in local mode and redirect the standard error output to a file.

In [4]:
average_ratings = !pig -x local -f average.pig 2> pig_stderr.log
print('\n'.join(average_ratings))

(1,3.9073275862068964,1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy)
(2,3.3532608695652173,2,Jumanji (1995),Adventure|Children|Fantasy)
(3,3.189655172413793,3,Grumpier Old Men (1995),Comedy|Romance)
(4,2.8181818181818183,4,Waiting to Exhale (1995),Comedy|Drama|Romance)
(5,3.25,5,Father of the Bride Part II (1995),Comedy)
(6,4.073913043478261,6,Heat (1995),Action|Crime|Thriller)
(7,3.381818181818182,7,Sabrina (1995),Comedy|Romance)
(8,3.6666666666666665,8,Tom and Huck (1995),Adventure|Children)
(9,2.869565217391304,9,Sudden Death (1995),Action)
(10,3.6,10,GoldenEye (1995),Action|Adventure|Thriller)


## Cleanup

In [7]:
%%bash
# Remove pig log files
rm -f pig*.log

# Remove our pig scripts
rm -f *.pig

# Remove csv files
rm movies.csv ratings.csv