#### Common warnings:

1. __Back up your solution to the 'work' directory inside the home directory ('/home/jovyan'). It is the only directory whose state is saved across sessions.__

1. Please make sure that you call the right interpreter (python2 or python3). Do not write just "python" without the major version: there is no guarantee that any particular version of Python is the default in the Grading system.

1. One cell must contain only one programming language.
E.g. if a cell contains Python code and you also want to call a bash command (using “!”) in it, you should move the bash command to another cell.

1. Our IPython converter is an improved version of the standard converter Nbconvert and it can process most of Jupyter's magic commands correctly (e.g. it understands "%%bash" and executes the cell as a "bash" script). However, we highly recommend avoiding magics wherever possible.

#### Hints for the YARN tasks:

1. Please, use relative HDFS paths, i.e. dir1/file1 instead of /user/jovyan/dir1/file1. When you submit the code it will be executed on a real Hadoop cluster. For instance, user ‘jovyan’ may not exist there.

1. Hadoop counters’ names should contain only lowercase Latin letters. One exception: the first letter of the name may be in upper case.

1. In the Hadoop logs the counter of stop words should appear before the counter of total words. To achieve this, take into account that the counters are printed in lexicographical order.
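A quick local check of the ordering is to sort the candidate counter names. This is a minimal sketch, assuming plain string comparison matches the order in which the logs list the counters; the two names are only illustrative.

```python
# Hypothetical counter names; only their relative order matters here.
counter_names = ["Total words", "Stop words"]

# Plain string sorting compares character by character, so "S" sorts before "T".
ordered = sorted(counter_names)
print(ordered)
```

If the stop-word counter does not come first in the sorted list, rename one of the counters.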

# Hadoop Streaming assignment 4: Word Groups

Calculate statistics for groups of words which are equal up to permutations of letters. For example, ‘emit’, ‘item’ and ‘time’ are the same words up to a permutation of letters. Determine such groups of words and sum all their counts. Apply stop words filter. Filter out groups that consist of only one word.

Output: count of occurrences for the group of words, number of unique words in the group, comma-separated list of the words in the group in lexicographical order:

sum <tab> group size <tab> word1,word2,...

Example: assume ‘emit’ occurred 3 times, 'item' -- 2 times, 'time' -- 5 times; 3 + 2 + 5 = 10, group contains 3 words, so for this group result is:

10 3 emit,item,time

The result of the task is the output line with word ‘english’.

The result on the sample dataset:
7823  5   english,helsing,hesling,shengli,shingle

NB: Do not forget about the lexicographical order of words in the group: 'emit,item,time' is OK, 'emit,time,item' is not.
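The grouping rule can be tried out locally before writing any Hadoop code. The sketch below is an in-memory illustration, not the streaming solution: the function name `group_anagrams` and the sample counts are assumptions made for the example. It uses the sorted letters of a word as the group key, since that key is identical for all permutations of the same letters.

```python
from collections import defaultdict


def group_anagrams(word_counts):
    """Group words that are permutations of each other and format the output lines."""
    groups = defaultdict(lambda: {"total": 0, "words": set()})
    for word, count in word_counts.items():
        key = "".join(sorted(word))  # same key for every permutation of the letters
        groups[key]["total"] += count
        groups[key]["words"].add(word)

    lines = []
    for group in groups.values():
        if len(group["words"]) > 1:  # drop groups that consist of a single word
            words = sorted(group["words"])  # lexicographical order inside the group
            lines.append("%d\t%d\t%s" % (group["total"], len(words), ",".join(words)))
    return lines


# Hypothetical counts taken from the example above, plus one ungrouped word.
sample = {"emit": 3, "item": 2, "time": 5, "hadoop": 7}
print(group_anagrams(sample))
```

The single-word group for 'hadoop' is filtered out, and the remaining group matches the `10 3 emit,item,time` example.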

Important notice:

Sometimes it is tricky to set up all the flags correctly:

1. you have to correctly specify the condition for a complex key (several fields / columns)

1. you have to correctly type the names of partitioners and comparators

1. you have to make sure that the logic of the combiner is aligned with the mappers and reducers

1. it may not work in a distributed environment (Hadoop cluster) even though it works in the Docker Sandbox

We made a decision to allow solutions without combiners. Please try to submit your solution without a combiner if your solution with it is marked as incorrect.

## Step 1. Create the mapper.

<b>Hint:</b> Create the mapper, which calculates Total word and Stop word amounts. You may redirect this information to sys.stderr. This will make it possible to parse these data on the next steps.

Example of the redirections (see the "eprint" definition in the cell below):

`eprint("reporter:counter:Wiki stats,Total words,%d" % count)`

Remember about the Distributed cache. If we add the option `-files mapper.py,reducer.py,/datasets/stop_words_en.txt`, then `mapper.py`, `reducer.py` and `stop_words_en.txt` will be in the same directory on the datanodes. Hence the mapper must use the relative path `stop_words_en.txt` to access this file.
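One possible way to produce the two counters is sketched below. The helper name `report_counters`, its `out` parameter, and the tiny stop-word set are assumptions made for illustration; the counter group and names follow the `eprint` example above. In a real mapper nothing except the key-value pairs should go to stdout, so the final `print` is only there for the demonstration.

```python
import sys


def report_counters(words, stop_words, group="Wiki stats", out=sys.stderr):
    """Count total and stop words, then emit Hadoop Streaming counter updates."""
    total = len(words)
    stops = sum(1 for word in words if word.lower() in stop_words)
    # Hadoop Streaming reads lines of this form from the task's stderr.
    print("reporter:counter:%s,Total words,%d" % (group, total), file=out)
    print("reporter:counter:%s,Stop words,%d" % (group, stops), file=out)
    return total, stops


# Hypothetical stop-word set; in the job it would be read from stop_words_en.txt.
demo_stop_words = {"the", "a", "of"}
print(report_counters(["The", "history", "of", "Hadoop"], demo_stop_words))
```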

In [1]:
%%writefile mapper.py

import sys
import re


def eprint(*args, **kwargs):
    print(*args, file=sys.stderr, **kwargs)

path = 'stop_words_en.txt'

# Read the stop words into a set for constant-time membership checks.
with open(path, 'r') as stop_file:
    stop_words = {line.rstrip('\n') for line in stop_file}

for line in sys.stdin:
    try:
        article_id, text = line.strip().split('\t', 1)
    except ValueError:
        continue

    # Split on whitespace, dropping the non-word characters around it.
    words = re.split(r"\W*\s+\W*", text, flags=re.UNICODE)

    for word in words:
        word = word.lower()
        # Emit every word that is not a stop word with a count of 1.
        if word not in stop_words:
            print("%s\t%d" % (word, 1))

Overwriting mapper.py


## Step 2. Create the reducer.

Create the reducer, which will accumulate the information after the mapper step. You may implement a combiner if you want. It can be useful for optimizing and speeding up your computations (see the Week 2 lectures for more details).
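A combiner for this kind of job could pre-sum the counts of identical words on the map side. The sketch below shows one possible shape, assuming the mapper emits `word<tab>count` lines; the function name `combine` is hypothetical, and in an actual combiner script the input would come from `sys.stdin` rather than from a list.

```python
from collections import Counter


def combine(lines):
    """Sum the counts of identical words from 'word<TAB>count' lines."""
    totals = Counter()
    for line in lines:
        try:
            word, count = line.rstrip("\n").split("\t", 1)
            totals[word] += int(count)
        except ValueError:
            continue  # skip malformed lines
    return totals


# Hypothetical mapper output, including one malformed line.
demo_lines = ["time\t1\n", "item\t1\n", "time\t1\n", "broken\n"]
for word, count in combine(demo_lines).items():
    print("%s\t%d" % (word, count))
```

With a combiner in place, the reducer has to add the incoming count for each line instead of incrementing by 1, otherwise the pre-summed values are lost.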

In [2]:
%%writefile reducer.py

import sys

# Anagram key (sorted letters) -> running count and the distinct words seen.
word_dict = {}

for line in sys.stdin:
    try:
        word, count = line.strip().split('\t', 1)
        count = int(count)
    except ValueError:
        continue
    key = ''.join(sorted(word))
    if key not in word_dict:
        word_dict[key] = {'count': 0, 'words': set()}
    # Add the emitted count rather than 1 so the result stays correct behind a combiner.
    word_dict[key]['count'] += count
    word_dict[key]['words'].add(word)

for key, group in word_dict.items():
    # Words are joined with ';' here; the second job converts them to a comma-separated list.
    words = ';'.join(sorted(group['words']))
    print("%d\t%s\t%d\t%s" % (group['count'], key, len(group['words']), words))



Overwriting reducer.py


In [3]:
%%writefile mapper_1.py

import sys

for line in sys.stdin:
    # Identity mapper: the line already ends with a newline, so write it as-is.
    sys.stdout.write(line)

Overwriting mapper_1.py


In [6]:
%%writefile reducer_1.py

import sys

# Anagram key -> merged count and distinct words across all first-stage reducers.
word_dict = {}

for line in sys.stdin:
    try:
        count, key, size, words = line.strip().split('\t')
        count = int(count)
    except ValueError:
        continue
    if key not in word_dict:
        word_dict[key] = {'count': 0, 'words': set()}
    word_dict[key]['count'] += count
    word_dict[key]['words'].update(words.split(';'))

result = []
for group in word_dict.values():
    # Keep only groups with at least two distinct words.
    if len(group['words']) > 1:
        result.append((group['count'], sorted(group['words'])))

# Groups with the largest total count first.
result.sort(key=lambda item: item[0], reverse=True)
for count, words in result:
    print("%d\t%d\t%s" % (count, len(words), ','.join(words)))
        

Overwriting reducer_1.py


# Only the answer to your task should be printed in the output stream (__stdout__) in the last cell. There should be no more output in this stream. In order to get rid of garbage [junk lines] (e.g. created by `hdfs dfs -rm` or `yarn` commands) redirect the output to /dev/null.

#### Final notice:

1. Please take into account that you must __not__ redirect __stderr__ to anywhere. Hadoop, Hive, and Spark print their logs to stderr and the Grading system also reads and analyses it.

1. When checking the code from the notebook, the system runs all of the notebook's cells and reads the output of only the last filled cell. No cell may throw an exception while running. If you decide to write some text in a cell, change the cell type to Markdown (Cell -> Cell type -> Markdown).

1. The Grader takes into account the output from the sample dataset you have in the notebook. Therefore, you have to "Run All" cells in the notebook before you send the ipynb solution.

1. The name of the notebook must contain only Latin letters, numbers and the characters “-” or “_”. For example, Windows appends something like " (2)" (with a leading space) to a filename if you download a file whose name already exists. This is a problem, because the name then contains a space character and the parentheses "(" and ")".
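The naming rule can be checked with a short regular expression before uploading. This is a rough sketch: the helper `is_valid_notebook_name` is hypothetical, and it assumes the rule applies to the file name without its `.ipynb` extension.

```python
import re

# Allowed characters: Latin letters, digits, "-" and "_".
VALID_NAME = re.compile(r"[A-Za-z0-9_-]+")


def is_valid_notebook_name(filename):
    """Return True if the name (without a trailing .ipynb) uses only allowed characters."""
    stem = filename[:-len(".ipynb")] if filename.endswith(".ipynb") else filename
    return VALID_NAME.fullmatch(stem) is not None


print(is_valid_notebook_name("word_groups-4.ipynb"))
print(is_valid_notebook_name("word_groups (2).ipynb"))
```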

In [7]:
%%bash

OUT_DIR="coursera_mr_task4"
OUT_DIR1="coursera_mr_task4_b"
NUM_REDUCERS=4
NUM_REDUCERS1=1

hdfs dfs -rm -r -skipTrash ${OUT_DIR} > /dev/null
hdfs dfs -rm -r -skipTrash ${OUT_DIR1} > /dev/null

# Stub code for your job

yarn jar /opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/hadoop-streaming.jar \
    -D mapred.job.name="Streaming wordCount" \
    -D mapreduce.job.reduces=${NUM_REDUCERS} \
    -files mapper.py,reducer.py,/datasets/stop_words_en.txt \
    -mapper "python3 mapper.py" \
    -reducer "python3 reducer.py" \
    -input /data/wiki/en_articles_part \
    -output ${OUT_DIR} > /dev/null
    
yarn jar /opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/hadoop-streaming.jar \
    -D mapreduce.job.reduces=${NUM_REDUCERS1} \
    -D mapred.job.name="Sorting wordCount" \
    -D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapreduce.lib.partition.KeyFieldBasedComparator \
    -D stream.map.output.field.separator="\t" \
    -D stream.num.map.output.key.fields=4 \
    -D mapreduce.map.output.key.field.separator="\t" \
    -D mapreduce.partition.keycomparator.options=-k2,2nr \
    -files mapper_1.py,reducer_1.py \
    -mapper 'python3 mapper_1.py' \
    -reducer 'python3 reducer_1.py' \
    -input ${OUT_DIR} \
    -output ${OUT_DIR1} > /dev/null
    
hdfs dfs -cat ${OUT_DIR1}/part-00000 | grep -m 1 english

7820	5	english,helsing,hesling,shengli,shingle


20/10/05 02:34:37 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
20/10/05 02:34:37 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
20/10/05 02:34:37 INFO mapred.FileInputFormat: Total input files to process : 1
20/10/05 02:34:37 INFO mapreduce.JobSubmitter: number of splits:2
20/10/05 02:34:38 INFO Configuration.deprecation: yarn.resourcemanager.system-metrics-publisher.enabled is deprecated. Instead, use yarn.system-metrics-publisher.enabled
20/10/05 02:34:38 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1601856088211_0036
20/10/05 02:34:38 INFO conf.Configuration: resource-types.xml not found
20/10/05 02:34:38 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'.
20/10/05 02:34:38 INFO resource.ResourceUtils: Adding resource type - name = memory-mb, units = Mi, type = COUNTABLE
20/10/05 02:34:38 INFO resource.ResourceUtils: Adding resource type - name = vcores, units = , type = COUNTABLE
20/10/05 02:34:38 INFO impl.Ya