# Hadoop Streaming assignment 1: Words Rating

The purpose of this task is to create your own WordCount program for processing a Wikipedia dump and to learn the basic concepts of MapReduce.

In this task you have to find the 7th most popular word and its count in the Wikipedia data (`/data/wiki/en_articles_part`), with words ranked in descending order of frequency (most popular first).
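To make the target concrete, the sketch below shows what the two jobs compute together, done in memory on a small local sample. `nth_most_popular` is a hypothetical helper, not part of the assignment; it assumes the words are already tokenized (as mapper1.py does) and is only practical for small inputs, not the full dump. It does not define any particular order for words with equal counts.

```python
from collections import Counter


def nth_most_popular(words, n=7):
    """Return (word, count) of the n-th most frequent word, or None.

    Hypothetical in-memory equivalent of: count words (job 1),
    then order by count, most popular first (job 2), then take row n.
    """
    counts = Counter(w.lower() for w in words)
    # Most popular first; ties between equal counts are not meaningfully ordered.
    ranked = sorted(counts.items(), key=lambda kv: kv[1], reverse=True)
    return ranked[n - 1] if len(ranked) >= n else None


sample = "the cat and the dog and the bird".split()
result = nth_most_popular(sample, n=1)
if result is not None:
    # Same tab-separated format the assignment expects.
    print("{}\t{}".format(*result))
```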

There are several points for this task:

1) The output must be the 7th word and its count, separated by a tab character.

2) You must use a second job to obtain a totally ordered result.

3) Do not forget to redirect all extraneous output (job logs, error messages, intermediate results) to /dev/null.
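Point 2 exists because each reducer sorts only its own partition of the keys. The sketch below simulates the sort stage with a made-up deterministic hash (sum of code points) standing in for Hadoop's default hash partitioner; `partition` and `run_sort_stage` are hypothetical names. It shows that concatenating the outputs of several reducers (the first job uses `mapreduce.job.reduces=4`) is not globally ordered, while a single reducer is.

```python
def partition(key, num_reducers):
    # Hypothetical stand-in for Hadoop's default hash partitioner:
    # a deterministic hash (sum of code points) modulo the reducer count.
    return sum(ord(ch) for ch in key) % num_reducers


def run_sort_stage(records, num_reducers):
    """Route (count, word) records to reducers by their key (the count as
    text), then sort each reducer's share by count, descending."""
    outputs = [[] for _ in range(num_reducers)]
    for count, word in records:
        outputs[partition(str(count), num_reducers)].append((count, word))
    return [sorted(part, key=lambda r: r[0], reverse=True) for part in outputs]


records = [(8, "a"), (7, "b"), (6, "c"), (5, "d")]
for n in (2, 1):
    # Concatenate the part files in reducer order, as `hdfs dfs -cat dir/*` would.
    merged = [count for part in run_sort_stage(records, n) for count, _ in part]
    print(n, merged)
# 2 [8, 6, 7, 5]   <- each part is sorted, the concatenation is not
# 1 [8, 7, 6, 5]   <- a single reducer gives a total order
```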

Below is a draft of the main steps of the task. You may use other approaches to obtain the solution.

## Step 1. Create mapper and reducer.

<b>Hint:</b> The demo task contains almost all the pieces needed to complete this assignment. You may use the demo to implement the first MapReduce job.

In [20]:
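%%writefile mapper1.py
#!/usr/bin/python3

import sys
import re

# Emit "<word>\t1" for every word of every article.
# Each input line is "<article id><whitespace><article text>".
for line in sys.stdin:
    try:
        article_id, text = re.split(r'\s+', line.strip(), maxsplit=1)
    except ValueError:
        continue  # skip lines without any text after the id
    # Split on whitespace, dropping punctuation that touches it.
    for word in re.split(r'\W*\s+\W*', text):
        if word:  # skip empty tokens (e.g. a line ending in " .")
            print("{}\t{}".format(word.lower(), 1))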


Overwriting mapper1.py
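A quick local check of the tokenization used in mapper1.py can save a cluster run. The hypothetical `tokenize` helper below repeats the same two `re.split` calls on a single line. Note that punctuation at the very end of a line is kept, because the second pattern only strips punctuation next to whitespace.

```python
import re


def tokenize(line):
    """Split one input line the way mapper1.py does: id first, then words."""
    try:
        _article_id, text = re.split(r'\s+', line.strip(), maxsplit=1)
    except ValueError:
        return []  # the line has no text after the id
    return [w.lower() for w in re.split(r'\W*\s+\W*', text) if w]


print(tokenize("12\tHello, world! It's fine."))
# → ['hello', 'world', "it's", 'fine.']
```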


In [21]:
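%%writefile mapper2.py
#!/usr/bin/python3

import sys

# Swap the fields of "<word>\t<count>" so the count becomes the key
# that the framework sorts on in the second job.
for line in sys.stdin:
    try:
        word, count = line.strip().split('\t', 1)
    except ValueError:
        continue  # skip malformed lines
    print("{}\t{}".format(count, word))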


Overwriting mapper2.py


In [22]:
%%writefile reducer1.py
#!/usr/bin/python3

from __future__ import print_function

import sys

# Input arrives sorted by word, so all counts for one word are consecutive.
key = None
total = 0
for line in sys.stdin:
    try:
        word, count = line.strip().split('\t', 1)
        count = int(count)
    except ValueError:
        continue  # skip malformed lines
    if key != word:
        # A new word starts: flush the total of the previous one.
        if key is not None:
            print("{}\t{}".format(key, total))
        key = word
        total = 0
    total += count

# Flush the last word.
if key is not None:
    print("{}\t{}".format(key, total))

Overwriting reducer1.py
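reducer1.py relies on Hadoop delivering the mapper output sorted by key, so that all lines for one word arrive consecutively. The same logic can be sketched more compactly with `itertools.groupby`, which likewise groups only *adjacent* equal keys. This is an alternative formulation, not something the assignment requires; `parse` and `reduce_counts` are hypothetical names.

```python
from itertools import groupby


def parse(lines):
    """Yield (word, count) pairs from "word\tcount" lines."""
    for line in lines:
        try:
            word, count = line.strip().split('\t', 1)
            count = int(count)
        except ValueError:
            continue  # skip malformed lines, as reducer1.py does
        yield word, count


def reduce_counts(lines):
    """Sum counts per run of equal words; input must be sorted by word."""
    for word, group in groupby(parse(lines), key=lambda kv: kv[0]):
        yield word, sum(count for _, count in group)


for word, total in reduce_counts(["a\t1\n", "a\t1\n", "b\t1\n"]):
    print("{}\t{}".format(word, total))
```

In a real reducer script you would pass `sys.stdin` to `reduce_counts`. If the input were not sorted, the same word would be emitted more than once, which is exactly why the shuffle's sort matters.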


In [23]:
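%%writefile reducer2.py
#!/usr/bin/python3

import sys

# Lines arrive already ordered by count; swap the fields back
# to "<word>\t<count>" without changing their order.
for line in sys.stdin:
    try:
        count, word = line.strip().split('\t', 1)
    except ValueError:
        continue  # skip malformed lines
    print("{}\t{}".format(word, count))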



Overwriting reducer2.py


In [28]:
# %%bash

# # You can use this cell for other experiments: for example, for a combiner.
# # hdfs dfs -copyToLocal /data/wiki/en_articles_part dir_on_host/

# #cat dir_on_host/en_articles_part/articles-part | tail -n2 |  /usr/bin/python3.5 mapper1.py | sort | /usr/bin/python3.5 reducer1.py | python3.5 mapper2.py | sort -k1,1nr | python3.5 reducer2.py
# hdfs dfs -cat /data/wiki/en_articles_part/* | tail -n2 | python2 mapper1.py | sort | python2 reducer1.py | python2 mapper2.py | sort -k1,1nr | python2 reducer2.py


## Step 2. Create sort job.

<b>Hint:</b> You may use the MapReduce comparator to solve this step. Make sure that the keys (the counts) are sorted numerically in descending order, so that the most popular word comes first.
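The `n` in the comparator options used by the second job (`-k1,1nr`) matters because mapper2.py emits the count as text: compared as plain strings, "9" ranks above "100". The hypothetical `sort_like_comparator` below is only a rough local stand-in for `KeyFieldBasedComparator` with those options (numeric, reversed, first field only).

```python
def sort_like_comparator(lines):
    """Order "count\tword" lines by the first field, numerically, descending."""
    return sorted(lines, key=lambda line: int(line.split('\t', 1)[0]), reverse=True)


lines = ["9\tfoo", "10\tbar", "100\tbaz"]
print(sorted(lines, reverse=True))   # text order: 9, 100, 10
print(sort_like_comparator(lines))   # numeric order: 100, 10, 9
```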

In [25]:
# The sort job reuses mapper2.py and reducer2.py written above with %%writefile; the ordering itself comes from the comparator options in Step 3.

## Step 3. Bash commands

<b>Hint:</b> To print the exact row, you may use basic UNIX commands such as sed, head, or tail (or any other command you know).
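`sed -n '7p;8q'` prints line 7 and quits at line 8, so it does not read the whole file. If you prefer to post-process in Python instead, `itertools.islice` gives the same early stop; `nth_line` is a hypothetical helper, shown here as a sketch.

```python
from itertools import islice


def nth_line(lines, n):
    """Return the n-th line (1-based) without reading past it, or None."""
    return next(islice(lines, n - 1, n), None)


rows = ["word{}\t{}\n".format(i, 100 - i) for i in range(1, 11)]
print(nth_line(iter(rows), 7), end="")
```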

To run both jobs, you must use two consecutive yarn commands. Remember that the input of the second job is the output of the first job.

In [27]:
%%bash

#!/bin/bash


# OUT_DIR="assignment1_"$(date +"%s%6N")
OUT_DIR="/user/${USER}/job1/"
# OUT_DIR2="answer_"$(date +"%s%6N")
OUT_DIR2="/user/${USER}/job2/"

hdfs dfs -rm -r -skipTrash ${OUT_DIR} > /dev/null 2>&1
hdfs dfs -rm -r -skipTrash ${OUT_DIR2} > /dev/null 2>&1


# Code for your first job
yarn jar /opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/hadoop-streaming.jar \
        -D mapreduce.job.name="Streaming wordcount" \
        -D mapreduce.job.reduces=4 \
        -D mapreduce.job.maxtries=1 \
        -files mapper1.py,reducer1.py \
        -mapper "python mapper1.py" \
        -reducer "python reducer1.py" \
        -input /data/wiki/en_articles_part \
        -output ${OUT_DIR} > /dev/null 2>&1

# #         -combiner "python2 reducer1.py" \

# hdfs dfs -cat ${OUT_DIR}/* | head -n4


# echo "starting the second job."

yarn jar /opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/hadoop-streaming.jar \
        -D mapreduce.job.name="Streaming wordrating" \
        -D mapreduce.job.reduces=1 \
        -D mapreduce.job.maxtries=1 \
        -D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapreduce.lib.partition.KeyFieldBasedComparator \
        -D mapreduce.partition.keycomparator.options=-k1,1nr \
        -files reducer2.py,mapper2.py \
        -mapper "python mapper2.py" \
        -reducer "python reducer2.py" \
        -input ${OUT_DIR} \
        -output ${OUT_DIR2} > /dev/null 2>&1


# Code for obtaining the result: print line 7 and stop reading at line 8
hdfs dfs -cat ${OUT_DIR2}/part-00000 2> /dev/null | sed -n '7p;8q'



In [19]:
# %%bash

# /usr/bin/python3 --version

Python 3.5.2


In [None]:
# %%bash

# OUT_DIR="/user/jovyan/job1/"
# # OUT_DIR2="answer_"$(date +"%s%6N")
# OUT_DIR2="/user/jovyan/job2/"

# hdfs dfs -rm -r -skipTrash ${OUT_DIR} > /dev/null
# hdfs dfs -rm -r -skipTrash ${OUT_DIR2} > /dev/null

In [None]:
# %%bash

# which cat
# hdfs dfs -ls
# ls /usr/bin/python*
# hdfs dfs -cat answer/* | head -n10
# hdfs dfs -rm -r answer
# hdfs dfs -rm -r assignment1
# hdfs dfs -cat assignment1_1594194332272282/* | head -n7

In [None]:
# %%bash

# /usr/bin/python3.5 -c "a=1; print(f'test{a}')"