#### Common warnings:

1. __Back up your solution to the 'work' directory inside the home directory ('/home/jovyan'). It is the only directory whose state is saved between sessions.__

1. Please ensure that you call the right interpreter (python2 or python3). Do not write just "python" without the major version. There is no guarantee that any particular version of Python is the default in the Grading system.

1. Each cell must contain only one programming language.
E.g. if a cell contains Python code and you also want to call a bash command (using “!”) in it, move the bash command to a separate cell.

1. Our IPython converter is an improved version of the standard converter Nbconvert and can process most of Jupyter's magic commands correctly (e.g. it understands "%%bash" and executes the cell as a bash script). However, we highly recommend avoiding magics wherever possible.

#### Hints for the YARN tasks:

1. Please use relative HDFS paths, i.e. dir1/file1 instead of /user/jovyan/dir1/file1. When you submit the code, it will be executed on a real Hadoop cluster, where, for instance, the user ‘jovyan’ may not exist.

1. Hadoop counter names should contain only lowercase Latin letters, with one exception: the first letter of the name may be uppercase.

1. In the Hadoop logs, the stop-words counter should appear before the total-words counter. To achieve this, keep in mind that counters are printed in lexicographical order.
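
The ordering rule in the last hint can be checked locally before submitting. The sketch below is only an illustration: it assumes that the lexicographical order used in the Hadoop logs matches Python's default string ordering for plain ASCII names. The counter name `Stop words` is hypothetical; `Wiki stats` and `Total words` are the names used by the mapper in this notebook.

```python
# Hypothetical counter names; only "Total words" appears in this notebook's mapper.
GROUP = "Wiki stats"
counter_names = ["Total words", "Stop words"]

# Assumption: for plain ASCII names, lexicographical order in the logs
# matches Python's default string ordering.
printed_order = sorted(counter_names)


def counter_line(group, name, amount=1):
    """Build the line a streaming task writes to stderr to increment a counter."""
    return "reporter:counter:%s,%s,%d" % (group, name, amount)


for name in printed_order:
    print(counter_line(GROUP, name))
```

If the stop-words counter sorted after the total-words counter, renaming one of them would be the simplest way to change the printed order.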

# Hadoop Streaming assignment 1: Words Rating

The purpose of this task is to create your own WordCount program for processing a Wikipedia dump and to learn the basic concepts of MapReduce.

In this task, you have to find the 7th most popular word and its count in the Wikipedia data (`/data/wiki/en_articles_part`), with words ranked in descending order of popularity (most popular first).

There are several points for this task:

1) The output must be the 7th word and its count, separated by a tab character.

2) You must use a second job to obtain a totally ordered result.

3) Do not forget to redirect all junk output to /dev/null.
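
The points above can be prototyped locally on a few lines before running anything on the cluster. The following is a minimal in-memory sketch, not the Hadoop solution itself: the function names and the sample data are hypothetical, and breaking ties alphabetically is an assumption that the task does not specify.

```python
import re
from collections import Counter


def count_words(lines):
    """Stage 1 (word count): tally lower-cased words over 'id<TAB>text' lines."""
    counts = Counter()
    for line in lines:
        parts = line.rstrip("\n").split("\t", 1)
        if len(parts) != 2:
            continue  # skip malformed lines
        for word in re.split(r"\W+", parts[1]):
            if word:
                counts[word.lower()] += 1
    return counts


def nth_most_popular(counts, n):
    """Stage 2 (total order): sort by count, descending, and take the n-th row."""
    # Assumption: ties are broken alphabetically.
    ranked = sorted(counts.items(), key=lambda kv: (-kv[1], kv[0]))
    return ranked[n - 1]


# Hypothetical sample input in the 'article_id<TAB>text' layout.
sample = ["1\ta a a b b c", "2\tb a c d"]
word, count = nth_most_popular(count_words(sample), 2)
print("%s\t%d" % (word, count))
```

On the cluster, the two functions correspond to the two consecutive jobs: the first counts, the second sorts.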

Below is a draft of the main steps of the task. You may use other methods to obtain the solution.

## Step 1. Create mapper and reducer.

<b>Hint:</b> The demo task contains almost all the pieces necessary to complete this assignment. You may use the demo to implement the first MapReduce job.

In [1]:
%%writefile mapper_wiki_parser.py

# Your code for mapper here.

import sys
import re


def eprint(*args, **kwargs):
    print(*args, file=sys.stderr, **kwargs)

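for line in sys.stdin:
    try:
        article_id, text = line.strip().split('\t', 1)
    except ValueError:
        # Skip lines that do not have the "id<TAB>text" layout.
        continue
    # A raw string avoids the invalid-escape warning for "\W".
    words = re.split(r"\W+", text, flags=re.UNICODE)
    for word in words:
        if not word:
            # re.split yields empty strings at the text boundaries; do not count them.
            continue
        eprint("reporter:counter:Wiki stats,Total words,%d" % 1)
        print("%s\t%d" % (word.lower(), 1))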

Overwriting mapper_wiki_parser.py
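
The tokenization step of the mapper can be tried in isolation. This is a small sketch with a hypothetical helper name, `tokenize`; it shows that `re.split` returns an empty string when the text ends (or starts) with a separator, which is why empty tokens are worth filtering out before counting.

```python
import re


def tokenize(text):
    """Split on runs of non-word characters and drop the empty strings
    that re.split yields when the text starts or ends with a separator."""
    return [w.lower() for w in re.split(r"\W+", text) if w]


# The trailing "!" leaves an empty string as the last item of the raw split.
raw = re.split(r"\W+", "Hello, world!")
clean = tokenize("Hello, world!")
print(raw)
print(clean)
```

In Python 3, `\W` is already Unicode-aware for `str` patterns, so the `re.UNICODE` flag is optional there.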


In [2]:
%%writefile reducer_wiki_parser.py

# Your code for reducer here.

import sys

current_key = None
word_sum = 0

for line in sys.stdin:
    try:
        key, count = line.strip().split('\t', 1)
        count = int(count)
    except ValueError:
        # Skip malformed lines (missing tab or non-integer count).
        continue

    # Input arrives sorted by key, so a key change closes the previous group.
    if current_key != key:
        if current_key:
            print("%s\t%d" % (current_key, word_sum))
        word_sum = 0
        current_key = key
    word_sum += count

# Flush the last group.
if current_key:
    print("%s\t%d" % (current_key, word_sum))

Overwriting reducer_wiki_parser.py
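
The reducer relies on its input being grouped by key, which is what the shuffle phase provides. The same idea can be expressed with `itertools.groupby`; the sketch below is an alternative illustration, not the code the grader expects. The function name `reduce_counts` is hypothetical, and it assumes every line is well formed (`key<TAB>integer`).

```python
from itertools import groupby


def reduce_counts(sorted_lines):
    """Sum counts per key, assuming lines arrive grouped by key (as after the shuffle)."""
    pairs = (line.rstrip("\n").split("\t", 1) for line in sorted_lines)
    for key, group in groupby(pairs, key=lambda kv: kv[0]):
        yield key, sum(int(count) for _, count in group)


# Sorting the mapper output locally stands in for the shuffle phase.
shuffled = sorted(["b\t1", "a\t1", "b\t1", "a\t1", "a\t1"])
result = list(reduce_counts(shuffled))
print(result)
```

If the input is not grouped by key, the same key is emitted several times with partial sums, so the ordering of the reducer input matters.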


In [4]:
# You can use this cell for other experiments: for example, for combiner.
#/work/week03_task01

## Step 2. Create sort job.

<b>Hint:</b> You may use a MapReduce comparator to solve this step. Make sure that the keys are sorted in ascending order.
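
When sorting by count, it matters whether the comparator treats the field as text or as a number. The sketch below illustrates the difference in plain Python with hypothetical sample rows. It assumes that the comparator options follow the conventions of the UNIX `sort` command (`r` for reverse, `n` for numeric), as the Hadoop Streaming documentation describes for `KeyFieldBasedComparator`.

```python
# Hypothetical (word, count) rows, with counts kept as strings as in streaming output.
rows = [("the", "120"), ("of", "95"), ("a", "1000")]

# Comparing the count as text (in the spirit of "-k2,2r"): "95" > "120" > "1000".
as_text = sorted(rows, key=lambda r: r[1], reverse=True)

# Comparing the count as a number (in the spirit of "-k2,2nr"): 1000 > 120 > 95.
as_number = sorted(rows, key=lambda r: int(r[1]), reverse=True)

print(as_text)
print(as_number)
```

Only the numeric comparison ranks the rows by actual popularity.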

In [5]:
# Your code for the sort job here. Don't forget to use the %%writefile magic.

## Step 3. Bash commands

<b>Hint:</b> To print the exact row, you may use basic UNIX commands such as sed, head, or tail (if you know other commands, you can use them).
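
For local experiments, the same row selection can be sketched in Python. The helper name `nth_line` and the sample rows are hypothetical; the function reads no further than the requested row.

```python
from itertools import islice


def nth_line(lines, n):
    """Return the n-th line (1-based) without reading past it, or None if absent."""
    return next(islice(lines, n - 1, n), None)


# Hypothetical sorted output: ten rows in 'word<TAB>count' layout.
rows = ("word%d\t%d" % (i, 100 - i) for i in range(1, 11))
seventh = nth_line(rows, 7)
print(seventh)
```

Stopping right after the requested row avoids reading the rest of a large output file.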

To run both jobs, you must use two consecutive yarn commands. Remember that the input for the second job is the output of the first job.

__NB__: Please use an explicit Python major version (e.g. `python3 mapper.py` instead of `python mapper.py`)!

Only the answer to your task should be printed to the output stream (__stdout__) in the last cell; nothing else should appear in this stream. To get rid of junk lines (e.g. those produced by `hdfs dfs -rm` or `yarn` commands), redirect their output to /dev/null.

#### Final notice:

1. Please take into account that you must __not__ redirect __stderr__ anywhere. Hadoop, Hive, and Spark print their logs to stderr, and the Grading system also reads and analyses it.

1. When checking the code from the notebook, the system runs all of the notebook's cells and reads the output of only the last non-empty cell. Consequently, no cell may throw an exception when run. If you decide to write some text in a cell, you should change the style of the cell to Markdown (Cell -> Cell type -> Markdown).

1. The Grader takes into account the output from the sample dataset you have in the notebook. Therefore, you have to "Run All" cells in the notebook before you send the ipynb solution.

1. The name of the notebook must contain only Latin letters, digits, and the characters “-” or “_”. For example, Windows appends something like " (2)" (with a leading space) to a filename if you download a file whose name already exists. This is a problem, because the name then contains a space character and the parentheses "(" and ")".
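
The naming rule in the last item can be checked with a short regular expression. This is a sketch under assumptions: the function name and the sample filenames are hypothetical, and the pattern assumes that the rule applies to the part of the name before the `.ipynb` extension.

```python
import re

# Latin letters, digits, "-" and "_" only, followed by the .ipynb extension.
NAME_PATTERN = re.compile(r"[A-Za-z0-9_-]+\.ipynb")


def is_valid_notebook_name(filename):
    """Return True if the whole filename matches the allowed character set."""
    return NAME_PATTERN.fullmatch(filename) is not None


# Hypothetical filenames: the second mimics a re-downloaded copy.
print(is_valid_notebook_name("wordcount_task1.ipynb"))
print(is_valid_notebook_name("wordcount_task1 (2).ipynb"))
```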

In [6]:
%%bash

OUT_DIR="coursera_mr_task1"$(date +"%s%6N")
NUM_REDUCERS=8

hdfs dfs -rm -r -skipTrash ${OUT_DIR} > /dev/null

# Code for your first job
yarn jar /opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/hadoop-streaming.jar \
    -D mapreduce.job.name="Streaming Word Count 1" \
    -D mapreduce.job.reduces=${NUM_REDUCERS} \
    -D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapreduce.lib.partition.KeyFieldBasedComparator \
    -D mapreduce.map.output.key.field.separator='\t' \
    -D mapreduce.partition.keycomparator.options="-k2,2r" \
    -files mapper_wiki_parser.py,reducer_wiki_parser.py \
    -mapper "python3 mapper_wiki_parser.py" \
    -reducer "python3 reducer_wiki_parser.py" \
    -input /data/wiki/en_articles_part \
    -output ${OUT_DIR} > /dev/null

# Code for obtaining the results
hdfs dfs -cat ${OUT_DIR}/part-00000 | sed -n '7p;8q'

the	1


rm: `coursera_mr_task11602415318608390': No such file or directory
20/10/11 11:22:00 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
20/10/11 11:22:01 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
20/10/11 11:22:01 INFO mapred.FileInputFormat: Total input files to process : 1
20/10/11 11:22:01 INFO mapreduce.JobSubmitter: number of splits:2
20/10/11 11:22:01 INFO Configuration.deprecation: yarn.resourcemanager.system-metrics-publisher.enabled is deprecated. Instead, use yarn.system-metrics-publisher.enabled
20/10/11 11:22:01 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1602402163270_0039
20/10/11 11:22:01 INFO conf.Configuration: resource-types.xml not found
20/10/11 11:22:01 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'.
20/10/11 11:22:01 INFO resource.ResourceUtils: Adding resource type - name = memory-mb, units = Mi, type = COUNTABLE
20/10/11 11:22:01 INFO resource.ResourceUtils: Adding resource type - name 