# <center> MapReduce Design Patterns </center>

### <center>Based on "MapReduce Design Patterns" by Donald Miner and Adam Shook</center>

## <center> Linh B. Ngo </center>

### Motivation

- MapReduce is designed as a framework
    - The solution has to fit into the framework
    - There are clear boundaries on what can and cannot be done
    - To create a solution that fits within existing boundaries is a challenge
- MapReduce Design Pattern
    - A template for solving common and general data manipulation problems with MapReduce
    - Design patterns are not specific to a domain, but describe a general approach to solving a problem

### Basic Patterns

- Summarization
- Filtering
- Data Organization
- Join
- Metapatterns

### Summarization
- Group similar data together and perform an operation on each group, such as calculating a statistic, building an index, or simply counting
- Examples:
    - Numerical summarizations
    - Inverted Index
    - Counting with Counters
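
As a minimal sketch of a numerical summarization, the snippet below simulates the map, shuffle, and reduce steps in plain Python on a handful of made-up `key,value` records. The function names and the sample records are illustrative assumptions; they are not taken from the book or from Hadoop itself.

```python
from itertools import groupby
from operator import itemgetter


def map_record(record):
    """Map step: turn one 'key,value' record into a (key, value) pair."""
    key, value = record.split(",")
    return key, float(value)


def reduce_group(key, values):
    """Reduce step: summarize all values that share the same key."""
    values = list(values)
    return key, min(values), max(values), len(values)


def summarize(records):
    # Sorting stands in for the shuffle phase: it brings equal keys together.
    pairs = sorted(map_record(r) for r in records)
    return [
        reduce_group(key, (value for _, value in group))
        for key, group in groupby(pairs, key=itemgetter(0))
    ]


# Hypothetical input records, for illustration only.
sample = ["a,3.0", "b,1.5", "a,4.0", "b,2.5", "a,1.0"]
for row in summarize(sample):
    print(row)  # (key, minimum, maximum, count)
```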

In [None]:
!hdfs dfs

### Challenge

Create a directory named **intro-to-hadoop** inside your user directory on HDFS

In [None]:
!hdfs dfs -mkdir intro-to-hadoop

### Challenge

Upload the three data directories, **airlines**, **movielens**, and **text** into the newly created **intro-to-hadoop** directory. 

In [None]:
!hdfs dfs -put \
    /home/lngo/intro-to-hadoop/airlines/ \
    intro-to-hadoop/

In [None]:
!hdfs dfs -put \
    /home/lngo/intro-to-hadoop/movielens/ \
    intro-to-hadoop/

In [None]:
!hdfs dfs -put \
    /home/lngo/intro-to-hadoop/text/ \
    intro-to-hadoop/

### Challenge 

Check the health status of the directories above in HDFS using fsck:
```
hdfs fsck <path-to-directory> -files -blocks -locations
```

In [None]:
!hdfs fsck intro-to-hadoop/text -files -blocks -locations

## 1. The Hello World of Hadoop: Word Count

In [None]:
!hdfs dfs -cat intro-to-hadoop/text/gutenberg-shakespeare.txt \
    2>/dev/null | head -n 100

In [None]:
!hdfs dfs -cat intro-to-hadoop/text/gutenberg-shakespeare.txt \
    2>/dev/null \
    | head -n 100 \
    | python /home/lngo/intro-to-hadoop/wordcountMapper.py

In [None]:
!hdfs dfs -cat intro-to-hadoop/text/gutenberg-shakespeare.txt \
    2>/dev/null \
    | head -n 100 \
    | python /home/lngo/intro-to-hadoop/wordcountMapper.py \
    | sort

In [None]:
!hdfs dfs -cat intro-to-hadoop/text/gutenberg-shakespeare.txt \
    2>/dev/null \
    | head -n 100 \
    | python /home/lngo/intro-to-hadoop/wordcountMapper.py \
    | sort \
    | python /home/lngo/intro-to-hadoop/wordcountReducer.py
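
The pipeline above (mapper, `sort`, reducer) mirrors what Hadoop Streaming does at scale. The contents of *wordcountMapper.py* and *wordcountReducer.py* are not shown in this notebook, so the following is only a sketch of what such a pair could look like, written as functions over lines rather than as scripts reading standard input. All names and the sample text are assumptions.

```python
from itertools import groupby


def map_words(lines):
    """Mapper: emit one 'word<TAB>1' line per whitespace-separated token."""
    for line in lines:
        for word in line.split():
            yield f"{word}\t1"


def reduce_counts(sorted_lines):
    """Reducer: sum the counts of consecutive lines that share a word.

    The input must already be sorted by word, as `sort` (or the Hadoop
    shuffle) guarantees.
    """
    pairs = (line.split("\t") for line in sorted_lines)
    for word, group in groupby(pairs, key=lambda pair: pair[0]):
        yield f"{word}\t{sum(int(count) for _, count in group)}"


# Hypothetical input text, for illustration only.
text = ["to be or not", "to be"]
for line in reduce_counts(sorted(map_words(text))):
    print(line)
```

In a streaming script, the same functions would be fed `sys.stdin` and their output printed line by line.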

In [None]:
!yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar \
    -input intro-to-hadoop/text/gutenberg-shakespeare.txt \
    -output intro-to-hadoop/output-wordcount \
    -file /home/lngo/intro-to-hadoop/wordcountMapper.py \
    -mapper wordcountMapper.py \
    -file /home/lngo/intro-to-hadoop/wordcountReducer.py \
    -reducer wordcountReducer.py

In [None]:
!hdfs dfs -ls intro-to-hadoop/output-wordcount

In [None]:
!hdfs dfs -cat intro-to-hadoop/output-wordcount/part-00000 \
    2>/dev/null | head -n 100

### Challenge

Modify *wordcountMapper.py* so that punctuation and capitalization are no longer factors in determining unique words
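
One possible approach, sketched under the assumption that the mapper splits each line on whitespace, is to strip ASCII punctuation and lowercase each token before emitting it. The helper names below are hypothetical, and this is not necessarily the intended solution.

```python
import string

# Translation table that deletes every ASCII punctuation character.
_STRIP_PUNCTUATION = str.maketrans("", "", string.punctuation)


def normalize(token):
    """Remove punctuation and fold case so 'Hello,' and 'hello' match."""
    return token.translate(_STRIP_PUNCTUATION).lower()


def map_words_normalized(lines):
    """Mapper variant: emit 'word<TAB>1' for each non-empty normalized token."""
    for line in lines:
        for token in line.split():
            word = normalize(token)
            if word:  # skip tokens that were punctuation only
                yield f"{word}\t1"


for line in map_words_normalized(["Hello, hello! -- World."]):
    print(line)
```

Note that this also removes apostrophes and hyphens inside words, so a token such as `don't` becomes `dont`; whether that is acceptable depends on the analysis.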

## 2. Movie Ratings and Recommendation

An independent movie company is looking to invest in a new movie project. With limited finances, the company wants to 
analyze audience reactions, particularly toward various movie genres, in order to identify a beneficial 
movie project to focus on. The company relies on data collected from a publicly available recommendation service 
by [MovieLens](http://dl.acm.org/citation.cfm?id=2827872). This 
[dataset](http://files.grouplens.org/datasets/movielens/ml-10m-README.html) contains **22884377** ratings and **586994**
 tag applications across **34208** movies. These data were created by **247753** users between January 09, 1995 and January 29, 2016. The dataset was generated on January 29, 2016. 

From this dataset, several analyses are possible, including the following:
1.   Find movies which have the highest average ratings over the years and identify the corresponding genre.
2.   Find genres which have the highest average ratings over the years.
3.   Find users who rate movies most frequently in order to contact them for in-depth marketing analysis.

These types of analyses, which are somewhat ambiguous, demand the ability to process large amounts of data in a 
relatively short amount of time for decision support purposes. In these situations, the size of the data typically 
makes analysis on a single machine impossible and analysis using a remote storage system impractical. For the 
remainder of the lessons, we will learn how HDFS provides the basis for storing massive amounts of data and enables 
the programming approach used to analyze these data.

In [None]:
!hdfs dfs -ls -h intro-to-hadoop/movielens

### 2.1 Find movies which have the highest average ratings over the years and identify the corresponding genre

- Find the average ratings of all movies over the years
- Identify the corresponding genres for each movie

In [None]:
!hdfs dfs -ls intro-to-hadoop/movielens

In [None]:
!hdfs dfs -cat intro-to-hadoop/movielens/README.txt

In [None]:
!hdfs dfs -cat intro-to-hadoop/movielens/links.csv \
    2>/dev/null | head -n 5

In [None]:
!hdfs dfs -cat intro-to-hadoop/movielens/movies.csv \
    2>/dev/null | head -n 5

In [None]:
!hdfs dfs -cat intro-to-hadoop/movielens/ratings.csv \
    2>/dev/null | head -n 5

In [None]:
!hdfs dfs -cat intro-to-hadoop/movielens/tags.csv \
    2>/dev/null | head -n 5

### Note:

To write a MapReduce program, you first have to identify the (Key,Value) pairs needed to produce the required results. These pairs are the input to the reducing phase. Working backward from this (Key,Value) pair format, you can then develop the mapping phase. 
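
To make this concrete for average ratings, a natural choice is Key = movie identifier and Value = rating. The sketch below assumes that *ratings.csv* rows have the form `userId,movieId,rating,timestamp` with a header line; it is not the content of the *avgRatingMapper* or *avgRatingReducer* scripts used in the cells that follow, and the sample rows are made up.

```python
from itertools import groupby


def map_ratings(lines):
    """Mapper: emit (movieId, rating) for each well-formed ratings row."""
    for line in lines:
        fields = line.strip().split(",")
        try:
            rating = float(fields[2])
        except (IndexError, ValueError):
            continue  # header line or malformed row
        yield fields[1], rating


def reduce_average(sorted_pairs):
    """Reducer: average the ratings of consecutive pairs with the same key."""
    for movie_id, group in groupby(sorted_pairs, key=lambda pair: pair[0]):
        ratings = [rating for _, rating in group]
        yield movie_id, sum(ratings) / len(ratings)


# Hypothetical rows in the assumed ratings.csv layout.
sample = [
    "userId,movieId,rating,timestamp",
    "1,10,4.0,100",
    "2,10,3.0,101",
    "2,20,5.0,102",
]
result = list(reduce_average(sorted(map_ratings(sample))))
print(result)
```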

In [None]:
!hdfs dfs -cat intro-to-hadoop/movielens/ratings.csv \
    2>/dev/null | head -n 5 | python /home/lngo/intro-to-hadoop/avgRatingMapper01.py

#### *Do we really need the headers?*

In [None]:
!hdfs dfs -cat intro-to-hadoop/movielens/ratings.csv \
    2>/dev/null | head -n 5 | python /home/lngo/intro-to-hadoop/avgRatingMapper02.py

#### *The outcome is correct. Is it useful?*

In [None]:
!hdfs dfs -cat intro-to-hadoop/movielens/ratings.csv \
    2>/dev/null | head -n 5 | python /home/lngo/intro-to-hadoop/avgRatingMapper03.py

In [None]:
!hdfs dfs -cat intro-to-hadoop/movielens/ratings.csv 2>/dev/null \
    | head -n 5 \
    | python /home/lngo/intro-to-hadoop/avgRatingMapper03.py \
    | sort \
    | python /home/lngo/intro-to-hadoop/avgRatingReducer01.py

#### Non-HDFS correctness test

In [6]:
!hdfs dfs -cat intro-to-hadoop/movielens/ratings.csv 2>/dev/null \
    | head -n 1000 \
    | python /home/lngo/intro-to-hadoop/avgRatingMapper03.py \
    | grep Matrix

Matrix, The (1999)	3.5
Matrix, The (1999)	4.0
Matrix, The (1999)	2.5
Matrix, The (1999)	4.5
Matrix, The (1999)	2.0


In [7]:
!hdfs dfs -cat intro-to-hadoop/movielens/ratings.csv 2>/dev/null \
    | head -n 1000 \
    | python /home/lngo/intro-to-hadoop/avgRatingMapper03.py \
    | grep Matrix \
    | sort \
    | python /home/lngo/intro-to-hadoop/avgRatingReducer01.py

Matrix, The (1999)	3.3


In [8]:
# Manual calculation check via python
(3.5+4.0+2.5+4.5+2.0)/5

3.3

#### Full execution on HDFS

In [None]:
!yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar \
    -input intro-to-hadoop/movielens/ratings.csv \
    -output intro-to-hadoop/output-movielens-01 \
    -file /home/lngo/intro-to-hadoop/avgRatingMapper03.py \
    -mapper avgRatingMapper03.py \
    -file /home/lngo/intro-to-hadoop/avgRatingReducer01.py \
    -reducer avgRatingReducer01.py

#### 2.1.1 First Error!!!

Go back to the first few lines of the previous output and look for the INFO line **Submitted application application_xxxx_xxxx**. Running the logs command of yarn with the provided application ID is a straightforward way to access all available log information for that application. The syntax to view the yarn log is:

```
!yarn logs -applicationId APPLICATION_ID
```

In [None]:
# Run the yarn view log command here
!yarn logs -applicationId application_1476193845089_0123

However, this output is often massive, as it contains the aggregated logs from all tasks (map and reduce) of the job, which can number in the hundreds. The example below demonstrates this problem by displaying all the available information for a single-task MapReduce job.
In this example, the log of a container has three log types (LogType): 
- stderr: Error messages from the actual task execution
- stdout: Print out messages if the task includes them
- syslog: Logging messages from the Hadoop MapReduce operation

One approach to reduce the amount of output is to filter out all non-essential lines (lines containing **INFO**)

In [2]:
!yarn logs -applicationId application_1476193845089_0123 | grep -v INFO

17/01/18 09:43:22 INFO impl.TimelineClientImpl: Timeline service address: http://dscim003.palmetto.clemson.edu:8188/ws/v1/timeline/
Unable to get ApplicationState. Attempting to fetch logs directly from the filesystem.
17/01/18 09:43:24 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library
17/01/18 09:43:24 INFO compress.CodecPool: Got brand-new decompressor [.deflate]


Container: container_e176_1476193845089_0123_01_000004 on dsci001.palmetto.clemson.edu_45454
LogType:directory.info
Log Upload Time:Fri Oct 21 08:33:56 -0400 2016
LogLength:15155
Log Contents:
ls -l:
total 32
lrwxrwxrwx 1 lngo hadoop   75 Oct 21 08:33 avgRatingMapper03.py -> /data05/hadoop/yarn/local/usercache/lngo/filecache/138/avgRatingMapper03.py
lrwxrwxrwx 1 lngo hadoop   76 Oct 21 08:33 avgRatingReducer01.py -> /data06/hadoop/yarn/local/usercache/lngo/filecache/139/avgRatingReducer01.py
-rw------- 1 lngo hadoop  367 Oct 21 08:33 container_tokens
lrwxrwxrwx 1 lngo hadoop  101 Oct 21 08:33 job

Can we refine the information further?
- In a MapReduce setting, containers (often) execute the same task.

In [None]:
!yarn logs -applicationId APPLICATION_ID | 

In [3]:
!yarn logs -applicationId application_1476193845089_0123 | grep '^Container:'

17/01/18 09:43:53 INFO impl.TimelineClientImpl: Timeline service address: http://dscim003.palmetto.clemson.edu:8188/ws/v1/timeline/
Unable to get ApplicationState. Attempting to fetch logs directly from the filesystem.
17/01/18 09:43:55 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library
17/01/18 09:43:55 INFO compress.CodecPool: Got brand-new decompressor [.deflate]
Container: container_e176_1476193845089_0123_01_000004 on dsci001.palmetto.clemson.edu_45454
Container: container_e176_1476193845089_0123_01_000028 on dsci001.palmetto.clemson.edu_45454
Container: container_e176_1476193845089_0123_01_000009 on dsci005.palmetto.clemson.edu_45454
Container: container_e176_1476193845089_0123_01_000025 on dsci014.palmetto.clemson.edu_45454
Container: container_e176_1476193845089_0123_01_000023 on dsci014.palmetto.clemson.edu_45454
Container: container_e176_1476193845089_0123_01_000001 on dsci015.palmetto.clemson.edu_45454
Container: container_e176_1476193845089_0123_01

Looking at the previous report, we can further identify container information:

```
Container: container_XXXXXX on  YYYY.palmetto.clemson.edu_ZZZZZ
```

- Container ID: container_XXXXXX
- Address of node where container is placed: YYYY.palmetto.clemson.edu
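
The same decomposition can be done programmatically. The following sketch parses a `Container:` line into its parts with a regular expression; treating the trailing number after the underscore as a port is an assumption, and the function name is hypothetical.

```python
import re

# Matches lines such as:
#   Container: container_XXXXXX on YYYY.palmetto.clemson.edu_ZZZZZ
CONTAINER_LINE = re.compile(r"^Container:\s+(\S+)\s+on\s+(\S+)_(\d+)$")


def parse_container_line(line):
    """Return (container_id, node_host, port), or None if the line does not match."""
    match = CONTAINER_LINE.match(line.strip())
    if match is None:
        return None
    container_id, host, port = match.groups()
    return container_id, host, int(port)


line = (
    "Container: container_e176_1476193845089_0123_01_000004 "
    "on dsci001.palmetto.clemson.edu_45454"
)
print(parse_container_line(line))
```

Lines that do not follow the pattern, for example a truncated line with no node part, yield `None` rather than raising an error.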

To request yarn to provide a more detailed log at container level, we run:
```
!\
    yarn logs -applicationId APPLICATION_ID -containerId CONTAINER_ID --nodeAddress NODE_ADDRESS \
    | grep -v INFO
```

In [5]:
!\
    yarn logs -applicationId application_1476193845089_0123 -containerId container_e176_1476193845089_0123_01_000028 --nodeAddress dsci001.palmetto.clemson.edu \
    | grep -v INFO

17/01/18 09:45:31 INFO impl.TimelineClientImpl: Timeline service address: http://dscim003.palmetto.clemson.edu:8188/ws/v1/timeline/
Unable to get ApplicationState. Attempting to fetch logs directly from the filesystem.
17/01/18 09:45:33 INFO impl.TimelineClientImpl: Timeline service address: http://dscim003.palmetto.clemson.edu:8188/ws/v1/timeline/
Unable to get logs for this container:container_e176_1476193845089_0123_01_000028for the application:application_1476193845089_0123
Please enable the application history service. Or 
Using yarn logs -applicationId <appId> -containerId <containerId> --nodeAddress <nodeHttpAddress> to get the container logs


This error message gives us some insights into the mechanism of Hadoop MapReduce. 
- Where are the map and reduce python scripts located?
- Where would the *movies.csv* file be, if the *-file* flag is used to upload this file?

In [None]:
!yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar \
    -input intro-to-hadoop/movielens/ratings.csv \
    -output intro-to-hadoop/output-movielens-01 \
    -file /home/lngo/intro-to-hadoop/avgRatingMapper04.py \
    -mapper avgRatingMapper04.py \
    -file /home/lngo/intro-to-hadoop/avgRatingReducer01.py \
    -reducer avgRatingReducer01.py \
    -file /home/lngo/intro-to-hadoop/movielens/movies.csv

#### 2.1.2 Second Error!!!

- HDFS files are write-once, and MapReduce refuses to overwrite existing output. Therefore, the output directory must not exist prior to job submission
- This can be resolved either by specifying a new output directory or by deleting the existing output directory

In [None]:
!yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar \
    -input intro-to-hadoop/movielens/ratings.csv \
    -output intro-to-hadoop/output-movielens-02 \
    -file /home/lngo/intro-to-hadoop/avgRatingMapper04.py \
    -mapper avgRatingMapper04.py \
    -file /home/lngo/intro-to-hadoop/avgRatingReducer01.py \
    -reducer avgRatingReducer01.py \
    -file /home/lngo/intro-to-hadoop/movielens/movies.csv

In [None]:
!hdfs dfs -ls intro-to-hadoop/output-movielens-02

In [None]:
!hdfs dfs -cat intro-to-hadoop/output-movielens-02/part-00000 \
    2>/dev/null | head -n 10

*What about the movies' genres?*
- What is being passed from Map to Reduce?
- Can the reducer do the same thing as the mapper, that is, load external data?
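
Files shipped with the *-file* flag appear in each task's working directory (see the `ls -l` listing in the container log earlier), so either a mapper or a reducer can open them by name. The sketch below shows one way to build a lookup table from *movies.csv*, assuming rows of the form `movieId,title,genres` with a header line, quoted titles where needed, and `|` between genres. The function name and sample rows are hypothetical.

```python
import csv


def load_movies(lines):
    """Build {movieId: (title, [genres])} from movies.csv-style lines."""
    lookup = {}
    reader = csv.reader(lines)  # handles quoted titles that contain commas
    next(reader, None)          # skip the header row
    for row in reader:
        if len(row) < 3:
            continue            # ignore malformed rows
        movie_id, title, genres = row[0], row[1], row[2]
        lookup[movie_id] = (title, genres.split("|"))
    return lookup


# Hypothetical rows in the assumed movies.csv layout.
sample = [
    "movieId,title,genres",
    "1,Toy Story (1995),Adventure|Animation",
    '11,"American President, The (1995)",Comedy|Drama',
]
movies = load_movies(sample)
print(movies["11"])

# Inside a streaming task, the shipped file could be read with:
#     with open("movies.csv", newline="") as handle:
#         movies = load_movies(handle)
```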

In [None]:
!yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar \
    -input intro-to-hadoop/movielens/ratings.csv \
    -output intro-to-hadoop/output-movielens-03 \
    -file /home/lngo/intro-to-hadoop/avgRatingMapper02.py \
    -mapper avgRatingMapper02.py \
    -file /home/lngo/intro-to-hadoop/avgRatingReducer02.py \
    -reducer avgRatingReducer02.py \
    -file /home/lngo/intro-to-hadoop/movielens/movies.csv

In [None]:
!hdfs dfs -ls intro-to-hadoop/output-movielens-03

In [None]:
!hdfs dfs -cat intro-to-hadoop/output-movielens-03/part-00000 \
    2>/dev/null | head -n 10

#### How does the number of shuffled bytes in this example compare to the previous example?

### Challenge:

1. Modify *avgRatingReducer02.py* so that only movies with average ratings higher than 3.75 are collected
2. Further enhance your modification so that the collected movies not only have average ratings higher than 3.75 but have also been rated at least 5000 times. 

### 2.2 Find genres which have the highest average ratings over the years

- Identify the genres associated with a movie and its rating: Where to load *movies.csv* - Map side or Reduce side?
- Each movie can have multiple genres. This increases the number of Key/Value pairs being shuffled. What can we do to optimize?

In [None]:
!yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar \
    -input intro-to-hadoop/movielens/ratings.csv \
    -output intro-to-hadoop/output-movielens-04 \
    -file /home/lngo/intro-to-hadoop/avgGenreMapper01.py \
    -mapper avgGenreMapper01.py \
    -file /home/lngo/intro-to-hadoop/avgRatingReducer01.py \
    -reducer avgRatingReducer01.py \
    -file /home/lngo/intro-to-hadoop/movielens/movies.csv

In [None]:
!hdfs dfs -ls intro-to-hadoop/output-movielens-04

In [None]:
!hdfs dfs -cat intro-to-hadoop/output-movielens-04/part-00000

**Principle of Big Data Computation: Reduce data movement**

#### 2.2.1 Optimization through in-mapper reduction of Key/Value pairs
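The idea of in-mapper reduction is to aggregate inside the mapper and emit one pair per genre instead of one pair per rating per genre. The sketch below illustrates the technique only; it is not the content of *avgGenreMapper02.py*. It assumes `userId,movieId,rating,timestamp` rows and a hypothetical `genres_by_movie` dictionary that would be built from *movies.csv*.

```python
from collections import defaultdict


def map_genres_combined(rating_lines, genres_by_movie):
    """Emit one 'genre<TAB>sum<TAB>count' line per genre seen by this mapper."""
    totals = defaultdict(lambda: [0.0, 0])  # genre -> [rating sum, rating count]
    for line in rating_lines:
        fields = line.strip().split(",")
        try:
            rating = float(fields[2])
        except (IndexError, ValueError):
            continue  # header line or malformed row
        for genre in genres_by_movie.get(fields[1], []):
            totals[genre][0] += rating
            totals[genre][1] += 1
    # Emit only after all input has been consumed.
    for genre in sorted(totals):
        rating_sum, count = totals[genre]
        yield f"{genre}\t{rating_sum}\t{count}"


# Hypothetical inputs, for illustration only.
genres = {"10": ["Comedy", "Drama"], "20": ["Drama"]}
rows = [
    "userId,movieId,rating,timestamp",
    "1,10,4.0,100",
    "2,20,3.0,101",
    "3,10,2.0,102",
]
for line in map_genres_combined(rows, genres):
    print(line)
```

The reducer then adds up the sums and counts per genre and divides once at the end. The trade-off is that the mapper must hold one entry per distinct key in memory, which is cheap for genres but may not be for keys with many distinct values.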

In [None]:
!hdfs dfs -cat intro-to-hadoop/movielens/ratings.csv 2>/dev/null \
    | head -n 10 \
    | python /home/lngo/intro-to-hadoop/avgGenreMapper02.py

In [None]:
!hdfs dfs -cat intro-to-hadoop/movielens/ratings.csv 2>/dev/null \
    | head -n 10 \
    | python /home/lngo/intro-to-hadoop/avgGenreMapper02.py \
    | sort \
    | python /home/lngo/intro-to-hadoop/avgGenreReducer01.py

In [None]:
# make sure that the path to movies.csv is correct inside avgGenreMapper02.py
!yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar \
    -input intro-to-hadoop/movielens/ratings.csv \
    -output intro-to-hadoop/output-movielens-05 \
    -file /home/lngo/intro-to-hadoop/avgGenreMapper02.py \
    -mapper avgGenreMapper02.py \
    -file /home/lngo/intro-to-hadoop/avgGenreReducer01.py \
    -reducer avgGenreReducer01.py \
    -file /home/lngo/intro-to-hadoop/movielens/movies.csv

In [None]:
!hdfs dfs -cat intro-to-hadoop/output-movielens-05/part-00000

**How does the number of shuffled bytes differ between the two jobs?**

#### 2.2.2 Optimization through combiner function
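A combiner pre-aggregates map output before it is shuffled. Because Hadoop may run a combiner zero, one, or several times, its output must have the same form as its input and the operation must be safe to repeat. A sum satisfies this, which is why a word count reducer can double as a combiner; an average does not, so partial (sum, count) pairs are combined instead. The sketch below illustrates the difference with made-up numbers; the function names are hypothetical and this is not the content of *avgGenreCombiner.py*.

```python
def combine(partials):
    """Merge a list of (sum, count) partials for one key into one (sum, count)."""
    total = sum(partial_sum for partial_sum, _ in partials)
    count = sum(partial_count for _, partial_count in partials)
    return total, count


def finalize(partial):
    """Turn a (sum, count) pair into an average; done once, in the reducer."""
    total, count = partial
    return total / count


# Two hypothetical map tasks saw ratings for the same genre.
task_a = [4.0, 4.0, 4.0]
task_b = [1.0]
partial_a = (sum(task_a), len(task_a))
partial_b = (sum(task_b), len(task_b))

correct = finalize(combine([partial_a, partial_b]))      # average over all 4 ratings
naive = (finalize(partial_a) + finalize(partial_b)) / 2  # average of averages
print(correct, naive)
```

The two values differ because the naive version gives the single rating from the second task the same weight as the three ratings from the first.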

In [None]:
!yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar \
    -input intro-to-hadoop/text/gutenberg-shakespeare.txt \
    -output intro-to-hadoop/output-wordcount-01 \
    -file /home/lngo/intro-to-hadoop/wordcountMapper.py \
    -mapper wordcountMapper.py \
    -file /home/lngo/intro-to-hadoop/wordcountReducer.py \
    -reducer wordcountReducer.py

In [None]:
!yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar \
    -input intro-to-hadoop/text/gutenberg-shakespeare.txt \
    -output intro-to-hadoop/output-wordcount-02 \
    -file /home/lngo/intro-to-hadoop/wordcountMapper.py \
    -mapper wordcountMapper.py \
    -file /home/lngo/intro-to-hadoop/wordcountReducer.py \
    -reducer wordcountReducer.py \
    -combiner wordcountReducer.py

In [None]:
# make sure that the path to movies.csv is correct inside avgGenreMapper02.py
!yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar \
    -input intro-to-hadoop/movielens/ratings.csv \
    -output intro-to-hadoop/output-movielens-06 \
    -file /home/lngo/intro-to-hadoop/avgGenreMapper02.py \
    -mapper avgGenreMapper02.py \
    -file /home/lngo/intro-to-hadoop/avgGenreReducer01.py \
    -reducer avgGenreReducer01.py \
    -file /home/lngo/intro-to-hadoop/avgGenreCombiner.py \
    -combiner avgGenreCombiner.py \
    -file /home/lngo/intro-to-hadoop/movielens/movies.csv

**How does the number of shuffled bytes differ between the two jobs?**

### 2.3 Find users who rate movies most frequently in order to contact them for in-depth marketing analysis

- How do you define "frequently"?
    - At least once per week?

In [None]:
!yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar \
    -input intro-to-hadoop/movielens/ratings.csv \
    -output intro-to-hadoop/output-movielens-07 \
    -file /home/lngo/intro-to-hadoop/userMapper01.py \
    -mapper userMapper01.py \
    -file /home/lngo/intro-to-hadoop/userReducer01.py \
    -reducer userReducer01.py

#### Yay, error!!!

- You can see Yarn attempts to retry containers
- Since these are logical errors and not physical errors, resistance is futile!
- Yarn finally gets bored and shuts the job down ...

#### Challenge

Get the list of containers from the failed application

In [None]:
!yarn logs -applicationId application_1476193845089_0282 | grep '^Container:'

**Where is the error message?**

In [None]:
!\
    yarn logs -applicationId application_1476193845089_0282 -containerId container_e176_1476193845089_0282_01_000010 --nodeAddress dsci014.palmetto.clemson.edu \
    | grep -v INFO

**Now that you know what the error is, how can you fix it?**
- What is the cause?

In [None]:
!hdfs dfs -rm -r intro-to-hadoop/output-movielens-07-debug
!yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar \
    -input intro-to-hadoop/movielens/ratings.csv \
    -output intro-to-hadoop/output-movielens-07-debug \
    -file /home/lngo/intro-to-hadoop/userMapper01.py \
    -mapper userMapper01.py \
    -file /home/lngo/intro-to-hadoop/userDebugReducer01.py \
    -reducer userDebugReducer01.py

In [None]:
!hdfs dfs -ls intro-to-hadoop/output-movielens-07-debug

In [None]:
!hdfs dfs -cat intro-to-hadoop/output-movielens-07-debug/part-00000 2>/dev/null \
    | head -n 100

In [None]:
!hdfs dfs -cat intro-to-hadoop/output-movielens-07-debug/part-00000 2>/dev/null \
    | wc -l

#### Challenge

Modify the reducer to correct the above error

#### Challenge

From 2.2, we know which genre has the highest rating. Can we enhance the user study so that only users who provide **frequent** reviews of movies belonging to the genre with the highest rating are selected?

## <center> Final Cleanup </center>

Executing the cell below will clean up all HDFS output directories created as a result of previous MapReduce programs. 

In [None]:
!hdfs dfs -ls intro-to-hadoop

In [None]:
!hdfs dfs -rm -r intro-to-hadoop/output-wordcount
!hdfs dfs -rm -r intro-to-hadoop/output-wordcount-01
!hdfs dfs -rm -r intro-to-hadoop/output-wordcount-02
!hdfs dfs -rm -r intro-to-hadoop/output-movielens-01
!hdfs dfs -rm -r intro-to-hadoop/output-movielens-02
!hdfs dfs -rm -r intro-to-hadoop/output-movielens-03
!hdfs dfs -rm -r intro-to-hadoop/output-movielens-04
!hdfs dfs -rm -r intro-to-hadoop/output-movielens-05
!hdfs dfs -rm -r intro-to-hadoop/output-movielens-06
!hdfs dfs -rm -r intro-to-hadoop/output-movielens-07
!hdfs dfs -rm -r intro-to-hadoop/output-movielens-07-debug