# <center> Introduction to Hadoop MapReduce </center>

## 3. Optimization

The first principle of optimizing a Hadoop workflow: **reduce data movement in the shuffle phase**

In [11]:
!ssh dsciutil hdfs dfs -rm -r intro-to-hadoop/output-movielens-02
!ssh dsciutil yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar \
    -input /repository/movielens/ratings.csv \
    -output intro-to-hadoop/output-movielens-02 \
    -file /home/lngo/git/intro-to-hadoop-python/avgRatingMapper04.py \
    -mapper avgRatingMapper04.py \
    -file /home/lngo/git/intro-to-hadoop-python/avgRatingReducer01.py \
    -reducer avgRatingReducer01.py \
    -file /home/lngo/git/intro-to-hadoop-python/movielens/movies.csv

17/09/11 15:15:00 INFO fs.TrashPolicyDefault: Moved: 'hdfs://dsci/user/lngo/intro-to-hadoop/output-movielens-02' to trash at: hdfs://dsci/user/lngo/.Trash/Current/user/lngo/intro-to-hadoop/output-movielens-021505157300121
17/09/11 15:15:01 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
packageJobJar: [/home/lngo/git/intro-to-hadoop-python/avgRatingMapper04.py, /home/lngo/git/intro-to-hadoop-python/avgRatingReducer01.py, /home/lngo/git/intro-to-hadoop-python/movielens/movies.csv] [/usr/hdp/2.6.0.3-8/hadoop-mapreduce/hadoop-streaming-2.7.3.2.6.0.3-8.jar] /hadoop_java_io_tmpdir/streamjob1763328031961229584.jar tmpDir=null
17/09/11 15:15:03 INFO client.AHSProxy: Connecting to Application History server at dscim003.palmetto.clemson.edu/10.125.8.215:10200
17/09/11 15:15:03 INFO client.AHSProxy: Connecting to Application History server at dscim003.palmetto.clemson.edu/10.125.8.215:10200
17/09/11 15:15:03 INFO hdfs.DFSClient: Created HDFS_DELEGA

- What is being passed from Map to Reduce?
- Can the reducer do the same thing as the mapper, that is, load external data?
- If we load external data on the reduce side, do we still need to load it on the map side?
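To make the first question concrete, the sketch below compares the size of the map output when the movie lookup happens on the map side versus the reduce side. It is only a rough local illustration: the lookup table, titles, and ratings are hypothetical, the record layouts are assumptions about what a mapper might emit, and the byte count ignores Hadoop's own serialization overhead and compression.

```python
# Hypothetical lookup table standing in for movies.csv (titles are made up).
movies = {
    "122": ("Movie A (1992)", "Comedy|Romance"),
    "172": ("Movie B (1995)", "Action|Sci-Fi|Thriller"),
}

# Hypothetical (movieId, rating) pairs in the style of ratings.csv.
ratings = [("122", 2.0), ("172", 1.0), ("122", 4.5), ("172", 3.0)]


def map_side_join(ratings, movies):
    """Mapper attaches title and genres, so every shuffled record carries them."""
    return ["%s\t%s\t%s" % (movies[m][0], r, movies[m][1]) for m, r in ratings]


def reduce_side_join(ratings):
    """Mapper emits only movieId and rating; the reducer looks up the rest."""
    return ["%s\t%s" % (m, r) for m, r in ratings]


def shuffle_bytes(records):
    """Approximate map output size: one newline-terminated record per line."""
    return sum(len(rec.encode("utf-8")) + 1 for rec in records)


wide = shuffle_bytes(map_side_join(ratings, movies))
narrow = shuffle_bytes(reduce_side_join(ratings))
print("map-side join:    %d bytes" % wide)
print("reduce-side join: %d bytes" % narrow)
```

In this sketch the title and genre strings are repeated once per rating in the map-side variant, while the reduce-side variant needs them only once per movie, after the averages have been computed.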

In [13]:
%%writefile codes/avgRatingReducer02.py
#!/usr/bin/env python
import sys
import csv

movieFile = "./movies.csv"
movieList = {}

with open(movieFile, mode = 'r') as infile:
    reader = csv.reader(infile)
    for row in reader:
        movieList[row[0]] = {}
        movieList[row[0]]["title"] = row[1]
        movieList[row[0]]["genre"] = row[2]

current_movie = None
current_rating_sum = 0
current_rating_count = 0

for line in sys.stdin:
    line = line.strip()
    movie, rating = line.split("\t", 1)
    try:
        rating = float(rating)
    except ValueError:
        continue

    if current_movie == movie:
        current_rating_sum += rating
        current_rating_count += 1
    else:
        if current_movie:
            rating_average = current_rating_sum / current_rating_count
            movieTitle = movieList[current_movie]["title"]
            movieGenres = movieList[current_movie]["genre"]
            print ("%s\t%s\t%s" % (movieTitle, rating_average, movieGenres))    
        current_movie = movie
        current_rating_sum = rating
        current_rating_count = 1

if current_movie:
    rating_average = current_rating_sum / current_rating_count
    movieTitle = movieList[current_movie]["title"]
    movieGenres = movieList[current_movie]["genre"]
    print ("%s\t%s\t%s" % (movieTitle, rating_average, movieGenres))

Overwriting codes/avgRatingReducer02.py


In [None]:
!ssh dsciutil hdfs dfs -rm -r intro-to-hadoop/output-movielens-03
!ssh dsciutil yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar \
    -input intro-to-hadoop/movielens/ratings.csv \
    -output intro-to-hadoop/output-movielens-03 \
    -file /home/lngo/intro-to-hadoop/codes/avgRatingMapper02.py \
    -mapper avgRatingMapper02.py \
    -file /home/lngo/intro-to-hadoop/codes/avgRatingReducer02.py \
    -reducer avgRatingReducer02.py \
    -file /home/lngo/intro-to-hadoop/movielens/movies.csv

In [None]:
!ssh dsciutil hdfs dfs -ls intro-to-hadoop/output-movielens-03

In [None]:
!ssh dsciutil hdfs dfs -cat intro-to-hadoop/output-movielens-03/part-00000 \
    2>/dev/null | head -n 10

How does the number of shuffle bytes in this example compare to the previous example?
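One way to answer this is to read the `Reduce shuffle bytes` counter that MapReduce prints in the job summary at the end of each run. The sketch below pulls that counter out of saved job output. The helper name `reduce_shuffle_bytes` and the two log excerpts are hypothetical; the numbers are made up and will differ on a real cluster.

```python
import re


def reduce_shuffle_bytes(log_text):
    """Return the 'Reduce shuffle bytes' counter from job output, or None."""
    match = re.search(r"Reduce shuffle bytes=(\d+)", log_text)
    return int(match.group(1)) if match else None


# Made-up log excerpts standing in for the saved output of two jobs.
log_job_02 = "\t\tReduce input groups=100\n\t\tReduce shuffle bytes=123456\n"
log_job_03 = "\t\tReduce shuffle bytes=45678\n"

before = reduce_shuffle_bytes(log_job_02)
after = reduce_shuffle_bytes(log_job_03)
print("job 02: %d bytes, job 03: %d bytes, ratio: %.2f"
      % (before, after, float(after) / before))
```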

#### Find genres which have the highest average ratings over the years

Common optimization approaches:

1. In-mapper reduction of key/value pairs
2. Additional combiner function

In [14]:
%%writefile codes/avgGenreMapper01.py
#!/usr/bin/env python
import sys
import csv

# for nonHDFS run
# movieFile = "./movielens/movies.csv"

# for HDFS run
movieFile = "./movies.csv"
movieList = {}

with open(movieFile, mode = 'r') as infile:
    reader = csv.reader(infile)
    for row in reader:
        movieList[row[0]] = {}
        movieList[row[0]]["title"] = row[1]
        movieList[row[0]]["genre"] = row[2]

for oneMovie in sys.stdin:
    oneMovie = oneMovie.strip()
    ratingInfo = oneMovie.split(",")
    try:
        genreList = movieList[ratingInfo[1]]["genre"]
        rating = float(ratingInfo[2])
        for genre in genreList.split("|"):
            print ("%s\t%s" % (genre, rating))
    except (ValueError, KeyError, IndexError):
        continue

Writing codes/avgGenreMapper01.py


In [15]:
%%writefile codes/avgGenreReducer01.py
#!/usr/bin/env python
import sys
import json

current_genre = None
current_rating_sum = 0
current_rating_count = 0

for line in sys.stdin:
    line = line.strip()
    try:
        genre, ratingString = line.split("\t", 1)
        ratingInfo = json.loads(ratingString)
        if isinstance(ratingInfo, dict):
            # partial totals, as emitted by avgGenreMapper02.py or a combiner
            rating_sum = ratingInfo["total_rating"]
            rating_count = ratingInfo["total_count"]
        else:
            # a single bare rating, as emitted by avgGenreMapper01.py
            rating_sum = float(ratingInfo)
            rating_count = 1
    except (ValueError, KeyError, TypeError):
        # skip malformed records
        continue

    if current_genre == genre:
        current_rating_sum += rating_sum
        current_rating_count += rating_count
    else:
        # the key changed: emit the average for the previous genre
        if current_genre is not None:
            rating_average = current_rating_sum / current_rating_count
            print ("%s\t%s" % (current_genre, rating_average))
        current_genre = genre
        current_rating_sum = rating_sum
        current_rating_count = rating_count

# emit the last genre, if any input was read
if current_genre is not None:
    rating_average = current_rating_sum / current_rating_count
    print ("%s\t%s" % (current_genre, rating_average))

Writing codes/avgGenreReducer01.py


In [None]:
!ssh dsciutil hdfs dfs -rm -r intro-to-hadoop/output-movielens-04
!ssh dsciutil yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar \
    -input intro-to-hadoop/movielens/ratings.csv \
    -output intro-to-hadoop/output-movielens-04 \
    -file /home/lngo/intro-to-hadoop/codes/avgGenreMapper01.py \
    -mapper avgGenreMapper01.py \
    -file /home/lngo/intro-to-hadoop/codes/avgGenreReducer01.py \
    -reducer avgGenreReducer01.py \
    -file /home/lngo/intro-to-hadoop/movielens/movies.csv

In [None]:
!ssh dsciutil hdfs dfs -ls intro-to-hadoop/output-movielens-04

In [None]:
!ssh dsciutil hdfs dfs -cat intro-to-hadoop/output-movielens-04/part-00000

#### 2.2.1 Optimization through in-mapper reduction of Key/Value pairs

In [1]:
!ssh dsciutil hdfs dfs -cat intro-to-hadoop/movielens/ratings.csv 2>/dev/null \
    | head -n 10

userId,movieId,rating,timestamp
1,122,2.0,945544824
1,172,1.0,945544871
1,1221,5.0,945544788
1,1441,4.0,945544871
1,1609,3.0,945544824
1,1961,3.0,945544871
1,1972,1.0,945544871
2,441,2.0,1008942733
2,494,2.0,1008942733


In [4]:
!ssh dsciutil hdfs dfs -cat intro-to-hadoop/movielens/ratings.csv 2>/dev/null \
    | head -n 10 \
    | python /home/lngo/intro-to-hadoop/avgGenreMapper01.py

Comedy	2.0
Romance	2.0
Action	1.0
Sci-Fi	1.0
Thriller	1.0
Crime	5.0
Drama	5.0
Comedy	4.0
Romance	4.0
Drama	3.0
Thriller	3.0
Drama	3.0
Horror	1.0
Comedy	2.0
Action	2.0
Adventure	2.0
Thriller	2.0


In [22]:
%%writefile codes/avgGenreMapper02.py
#!/usr/bin/env python

import sys
import csv
import json

# for nonHDFS run
# movieFile = "./movielens/movies.csv"

# for HDFS run
movieFile = "./movies.csv"

movieList = {}
genreList = {}

with open(movieFile, mode = 'r') as infile:
    reader = csv.reader(infile)
    for row in reader:
        movieList[row[0]] = {}
        movieList[row[0]]["title"] = row[1]
        movieList[row[0]]["genre"] = row[2]

for oneMovie in sys.stdin:
    oneMovie = oneMovie.strip()
    ratingInfo = oneMovie.split(",")
    try:
        genres = movieList[ratingInfo[1]]["genre"]
        rating = float(ratingInfo[2])
        for genre in genres.split("|"):
            if genre in genreList:
                genreList[genre]["total_rating"] += rating
                genreList[genre]["total_count"] += 1
            else:
                genreList[genre] = {}
                genreList[genre]["total_rating"] = rating
                genreList[genre]["total_count"] = 1
    except (ValueError, KeyError, IndexError):
        continue
        
for genre in genreList:
    print ("%s\t%s" % (genre, json.dumps(genreList[genre])))

Overwriting codes/avgGenreMapper02.py


In [18]:
!ssh dsciutil hdfs dfs -cat intro-to-hadoop/movielens/ratings.csv 2>/dev/null \
    | head -n 10 \
    | python /home/lngo/git/intro-to-hadoop-python/avgGenreMapper02.py

Sci-Fi	{"total_rating": 1.0, "total_count": 1}
Romance	{"total_rating": 6.0, "total_count": 2}
Thriller	{"total_rating": 6.0, "total_count": 3}
Drama	{"total_rating": 11.0, "total_count": 3}
Comedy	{"total_rating": 8.0, "total_count": 3}
Adventure	{"total_rating": 2.0, "total_count": 1}
Action	{"total_rating": 3.0, "total_count": 2}
Horror	{"total_rating": 1.0, "total_count": 1}
Crime	{"total_rating": 5.0, "total_count": 1}


In [19]:
!ssh dsciutil hdfs dfs -cat intro-to-hadoop/movielens/ratings.csv 2>/dev/null \
    | head -n 10 \
    | python /home/lngo/git/intro-to-hadoop-python/avgGenreMapper02.py \
    | sort \
    | python /home/lngo/git/intro-to-hadoop-python/avgGenreReducer01.py

Action	1.5
Adventure	2.0
Comedy	2.6666666666666665
Crime	5.0
Drama	3.6666666666666665
Horror	1.0
Romance	3.0
Sci-Fi	1.0
Thriller	2.0


In [25]:
# make sure that the path to movies.csv is correct inside avgGenreMapper02.py
!ssh dsciutil hdfs dfs -rm -R intro-to-hadoop/output-movielens-05
!ssh dsciutil yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar \
    -input intro-to-hadoop/movielens/ratings.csv \
    -output intro-to-hadoop/output-movielens-05 \
    -file /home/lngo/git/intro-to-hadoop-python/codes/avgGenreMapper02.py \
    -mapper avgGenreMapper02.py \
    -file /home/lngo/git/intro-to-hadoop-python/codes/avgGenreReducer01.py \
    -reducer avgGenreReducer01.py \
    -file /home/lngo/git/intro-to-hadoop-python/movielens/movies.csv

17/09/11 15:33:39 INFO fs.TrashPolicyDefault: Moved: 'hdfs://dsci/user/lngo/intro-to-hadoop/output-movielens-05' to trash at: hdfs://dsci/user/lngo/.Trash/Current/user/lngo/intro-to-hadoop/output-movielens-051505158419435
17/09/11 15:33:41 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
packageJobJar: [/home/lngo/git/intro-to-hadoop-python/codes/avgGenreMapper02.py, /home/lngo/git/intro-to-hadoop-python/codes/avgGenreReducer01.py, /home/lngo/git/intro-to-hadoop-python/movielens/movies.csv] [/usr/hdp/2.6.0.3-8/hadoop-mapreduce/hadoop-streaming-2.7.3.2.6.0.3-8.jar] /hadoop_java_io_tmpdir/streamjob2129448351179621896.jar tmpDir=null
17/09/11 15:33:42 INFO client.AHSProxy: Connecting to Application History server at dscim003.palmetto.clemson.edu/10.125.8.215:10200
17/09/11 15:33:42 INFO client.AHSProxy: Connecting to Application History server at dscim003.palmetto.clemson.edu/10.125.8.215:10200
17/09/11 15:33:42 INFO hdfs.DFSClient: Created H

In [26]:
!ssh dsciutil hdfs dfs -cat intro-to-hadoop/output-movielens-05/part-00000

(no genres listed)	3.20801494311
Action	3.45445315141
Adventure	3.50709193718
Animation	3.61049896425
Children	3.41664063098
Comedy	3.41746035479
Crime	3.67850196299
Documentary	3.72277231013
Drama	3.67427737349
Fantasy	3.50299123143
Film-Noir	3.9408055354
Horror	3.27526021431
IMAX	3.63709765837
Musical	3.54391359231
Mystery	3.66150976856
Romance	3.54246199954
Sci-Fi	3.45517283887
Thriller	3.51269029345
War	3.80326678256
Western	3.57161948559


**How does the number of shuffle bytes differ between job 04 (avgGenreMapper01.py) and job 05 (avgGenreMapper02.py)?**
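The effect of in-mapper reduction can also be seen locally by counting the records each mapper style would send to the shuffle. The sketch below uses the nine sample ratings from earlier in this section, with each movie's genres reconstructed from the sample mapper output; the function names `per_record` and `in_mapper` are hypothetical stand-ins for the two mapper scripts.

```python
import json

# (genres, rating) for the nine sample ratings, reconstructed from the
# sample mapper output shown earlier in this section.
sample = [
    ("Comedy|Romance", 2.0),
    ("Action|Sci-Fi|Thriller", 1.0),
    ("Crime|Drama", 5.0),
    ("Comedy|Romance", 4.0),
    ("Drama|Thriller", 3.0),
    ("Drama", 3.0),
    ("Horror", 1.0),
    ("Comedy", 2.0),
    ("Action|Adventure|Thriller", 2.0),
]


def per_record(sample):
    """One output record per (genre, rating) pair, with no aggregation."""
    return ["%s\t%s" % (g, r) for genres, r in sample for g in genres.split("|")]


def in_mapper(sample):
    """Accumulate totals per genre and emit one JSON record per genre."""
    totals = {}
    for genres, r in sample:
        for g in genres.split("|"):
            entry = totals.setdefault(g, {"total_rating": 0.0, "total_count": 0})
            entry["total_rating"] += r
            entry["total_count"] += 1
    return ["%s\t%s" % (g, json.dumps(totals[g])) for g in totals], totals


plain = per_record(sample)
combined, totals = in_mapper(sample)
print("per-record mapper: %d records" % len(plain))
print("in-mapper reduction: %d records" % len(combined))
```

On a sample this small the JSON records are wider than the plain ones, so the byte count does not shrink yet. The saving shows up at scale: an aggregating mapper emits at most one record per genre per map task, no matter how many ratings that task reads.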

#### 2.2.2 Optimization through combiner function

In [None]:
!ssh dsciutil yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar \
    -input intro-to-hadoop/text/gutenberg-shakespeare.txt \
    -output intro-to-hadoop/output-wordcount-01 \
    -file /home/lngo/git/intro-to-hadoop-python/codes/wordcountMapper.py \
    -mapper wordcountMapper.py \
    -file /home/lngo/git/intro-to-hadoop-python/codes/wordcountReducer.py \
    -reducer wordcountReducer.py

In [None]:
!ssh dsciutil yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar \
    -input intro-to-hadoop/text/gutenberg-shakespeare.txt \
    -output intro-to-hadoop/output-wordcount-02 \
    -file /home/lngo/git/intro-to-hadoop-python/codes/wordcountMapper.py \
    -mapper wordcountMapper.py \
    -file /home/lngo/git/intro-to-hadoop-python/codes/wordcountReducer.py \
    -reducer wordcountReducer.py \
    -combiner wordcountReducer.py
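The second job reuses the reducer as the combiner. That works for word counting because summing is associative and commutative, and because the reducer's output has the same format as its input. The sketch below illustrates this with plain Python, assuming that wordcountReducer.py (not shown in this section) simply sums the counts for each word; the function `count_pairs` and the two map task outputs are hypothetical.

```python
from collections import Counter


def count_pairs(pairs):
    """Sum counts per word; the same logic serves as combiner and reducer."""
    totals = Counter()
    for word, n in pairs:
        totals[word] += n
    return sorted(totals.items())


# Hypothetical output of two map tasks: one (word, 1) pair per occurrence.
map_task_1 = [("to", 1), ("be", 1), ("or", 1), ("not", 1), ("to", 1), ("be", 1)]
map_task_2 = [("to", 1), ("thine", 1), ("own", 1), ("self", 1), ("be", 1)]

# Without a combiner, every pair crosses the network to the reducer.
without_combiner = count_pairs(map_task_1 + map_task_2)
shuffled_before = len(map_task_1) + len(map_task_2)

# With a combiner, each map task pre-sums its own pairs first.
combined_1 = count_pairs(map_task_1)
combined_2 = count_pairs(map_task_2)
with_combiner = count_pairs(combined_1 + combined_2)
shuffled_after = len(combined_1) + len(combined_2)

print("same result: %s" % (without_combiner == with_combiner))
print("records shuffled: %d without, %d with" % (shuffled_before, shuffled_after))
```

Hadoop may run a combiner zero, one, or several times on a map task's output, so a job has to produce the same answer in every case.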

In [27]:
%%writefile codes/avgGenreCombiner.py
#!/usr/bin/env python

import sys
import csv
import json

genreList = {}

for line in sys.stdin:
    line = line.strip()
    genre, ratingString = line.split("\t", 1)
    ratingInfo = json.loads(ratingString)

    if genre in genreList:
        genreList[genre]["total_rating"] += ratingInfo["total_rating"]
        genreList[genre]["total_count"] += ratingInfo["total_count"]
    else:
        genreList[genre] = {}
        genreList[genre]["total_rating"] = ratingInfo["total_rating"]
        genreList[genre]["total_count"] = ratingInfo["total_count"]

for genre in genreList:
    print ("%s\t%s" % (genre, json.dumps(genreList[genre])))

Writing codes/avgGenreCombiner.py
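Unlike word counting, an average cannot be combined by averaging again: a combiner for this job has to carry both the running sum and the running count forward. The sketch below shows why, using hypothetical ratings for one genre split across two map tasks; the helper `merge` is illustrative and not part of the scripts above.

```python
def merge(partials):
    """Combine (total_rating, total_count) pairs by adding both fields."""
    total = sum(p[0] for p in partials)
    count = sum(p[1] for p in partials)
    return (total, count)


# Hypothetical ratings for a single genre, seen by two different map tasks.
task_a = [5.0, 4.0, 3.0]
task_b = [1.0]

partial_a = (sum(task_a), len(task_a))
partial_b = (sum(task_b), len(task_b))

# Correct: add sums and counts, divide once at the very end.
total, count = merge([partial_a, partial_b])
exact_mean = total / count

# Incorrect: averaging per-task averages ignores how many ratings each had.
mean_of_means = (partial_a[0] / partial_a[1] + partial_b[0] / partial_b[1]) / 2

print("exact mean: %s" % exact_mean)
print("mean of means: %s" % mean_of_means)
```

This is why the mapper and combiner exchange `total_rating` and `total_count` rather than a precomputed average, and why the count must be carried over unchanged when a partial record is merged.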


In [28]:
!ssh dsciutil hdfs dfs -rm -r intro-to-hadoop/output-movielens-06
!ssh dsciutil yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar \
    -input intro-to-hadoop/movielens/ratings.csv \
    -output intro-to-hadoop/output-movielens-06 \
    -file /home/lngo/git/intro-to-hadoop-python/codes/avgGenreMapper02.py \
    -mapper avgGenreMapper02.py \
    -file /home/lngo/git/intro-to-hadoop-python/codes/avgGenreReducer01.py \
    -reducer avgGenreReducer01.py \
    -file /home/lngo/git/intro-to-hadoop-python/codes/avgGenreCombiner.py \
    -combiner avgGenreCombiner.py \
    -file /home/lngo/git/intro-to-hadoop-python/movielens/movies.csv

rm: `intro-to-hadoop/output-movielens-06': No such file or directory
17/09/11 15:36:14 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
packageJobJar: [/home/lngo/git/intro-to-hadoop-python/codes/avgGenreMapper02.py, /home/lngo/git/intro-to-hadoop-python/codes/avgGenreReducer01.py, /home/lngo/git/intro-to-hadoop-python/codes/avgGenreCombiner.py, /home/lngo/git/intro-to-hadoop-python/movielens/movies.csv] [/usr/hdp/2.6.0.3-8/hadoop-mapreduce/hadoop-streaming-2.7.3.2.6.0.3-8.jar] /hadoop_java_io_tmpdir/streamjob5344462887892835959.jar tmpDir=null
17/09/11 15:36:15 INFO client.AHSProxy: Connecting to Application History server at dscim003.palmetto.clemson.edu/10.125.8.215:10200
17/09/11 15:36:15 INFO client.AHSProxy: Connecting to Application History server at dscim003.palmetto.clemson.edu/10.125.8.215:10200
17/09/11 15:36:16 INFO hdfs.DFSClient: Created HDFS_DELEGATION_TOKEN token 14253 for lngo on ha-hdfs:dsci
17/09/11 15:36:16 INFO securit

**How does the number of shuffle bytes differ between job 05 (in-mapper reduction only) and job 06 (in-mapper reduction plus combiner)?**

## <center> Final Cleanup </center>

The cells below first list the contents of intro-to-hadoop on HDFS, then remove every output directory created by the MapReduce jobs above. The last command also deletes the local codes/ directory.

In [2]:
!ssh dsciutil hdfs dfs -ls intro-to-hadoop

Found 10 items
drwxr-xr-x   - lngo hdfs          0 2017-04-13 14:53 intro-to-hadoop/movielens
drwxr-xr-x   - lngo hdfs          0 2017-04-13 15:07 intro-to-hadoop/output-movielens-01
drwxr-xr-x   - lngo hdfs          0 2017-04-13 15:25 intro-to-hadoop/output-movielens-02
drwxr-xr-x   - lngo hdfs          0 2017-04-13 15:29 intro-to-hadoop/output-movielens-03
drwxr-xr-x   - lngo hdfs          0 2017-04-13 15:36 intro-to-hadoop/output-movielens-04
drwxr-xr-x   - lngo hdfs          0 2017-04-13 15:46 intro-to-hadoop/output-movielens-05
drwxr-xr-x   - lngo hdfs          0 2017-04-13 15:54 intro-to-hadoop/output-movielens-06
drwxr-xr-x   - lngo hdfs          0 2017-04-13 15:49 intro-to-hadoop/output-wordcount-01
drwxr-xr-x   - lngo hdfs          0 2017-04-13 15:51 intro-to-hadoop/output-wordcount-02
drwxr-xr-x   - lngo hdfs          0 2016-10-17 14:01 intro-to-hadoop/text


In [29]:
!ssh dsciutil hdfs dfs -rm -r intro-to-hadoop/output-wordcount
!ssh dsciutil hdfs dfs -rm -r intro-to-hadoop/output-wordcount-01
!ssh dsciutil hdfs dfs -rm -r intro-to-hadoop/output-wordcount-02
!ssh dsciutil hdfs dfs -rm -r intro-to-hadoop/output-movielens-01
!ssh dsciutil hdfs dfs -rm -r intro-to-hadoop/output-movielens-02
!ssh dsciutil hdfs dfs -rm -r intro-to-hadoop/output-movielens-03
!ssh dsciutil hdfs dfs -rm -r intro-to-hadoop/output-movielens-04
!ssh dsciutil hdfs dfs -rm -r intro-to-hadoop/output-movielens-05
!ssh dsciutil hdfs dfs -rm -r intro-to-hadoop/output-movielens-06
!rm -Rf codes/

rm: `intro-to-hadoop/output-wordcount': No such file or directory
rm: `intro-to-hadoop/output-wordcount-01': No such file or directory
rm: `intro-to-hadoop/output-wordcount-02': No such file or directory
rm: `intro-to-hadoop/output-movielens-01': No such file or directory
17/09/11 15:38:33 INFO fs.TrashPolicyDefault: Moved: 'hdfs://dsci/user/lngo/intro-to-hadoop/output-movielens-02' to trash at: hdfs://dsci/user/lngo/.Trash/Current/user/lngo/intro-to-hadoop/output-movielens-021505158713772
rm: `intro-to-hadoop/output-movielens-03': No such file or directory
rm: `intro-to-hadoop/output-movielens-04': No such file or directory
17/09/11 15:38:42 INFO fs.TrashPolicyDefault: Moved: 'hdfs://dsci/user/lngo/intro-to-hadoop/output-movielens-05' to trash at: hdfs://dsci/user/lngo/.Trash/Current/user/lngo/intro-to-hadoop/output-movielens-051505158722193
17/09/11 15:38:45 INFO fs.TrashPolicyDefault: Moved: 'hdfs://dsci/user/lngo/intro-to-hadoop/output-movielens-06' to trash at: hdfs://dsci/user/ln