# <center> Introduction to Hadoop MapReduce </center>

Jupyter notebooks support running Linux commands inside notebook cells by adding **!** to the beginning of the command line. Note that each command beginning with **!** creates a new bash shell and closes that shell once execution is done, so:
- Full path is required
- Temporary results and environmental variables will be lost
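
The second point can be reproduced outside the notebook. The following is a minimal sketch, assuming a POSIX shell is available; each call to the hypothetical helper `run_in_fresh_shell` plays the role of one **!** line, and `DEMO_VAR` is a made-up variable name.

```python
import subprocess

def run_in_fresh_shell(command):
    """Run one command in a brand-new shell and return its stdout, like a single `!` line."""
    result = subprocess.run(command, shell=True, capture_output=True, text=True)
    return result.stdout.strip()

# Setting and reading the variable inside the same shell works.
same_shell = run_in_fresh_shell('DEMO_VAR=hello; echo "$DEMO_VAR"')

# The variable exported by the first call is gone when the second shell starts.
run_in_fresh_shell('export DEMO_VAR=hello')
new_shell = run_in_fresh_shell('echo "${DEMO_VAR:-unset}"')

print(same_shell)
print(new_shell)
```

Assuming `DEMO_VAR` is not already set in the environment that launches Python, the first value is `hello` and the second is `unset`.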

In [None]:
!module list

We need to initialize the Kerberos authentication mechanism:

In [None]:
!cypress-kinit

In [None]:
!klist

Interaction with the Hadoop Distributed File System (HDFS) is done through `hdfs` and its sub-commands:

In [None]:
!hdfs

In [None]:
!hdfs dfs

### Challenge

Create a directory named **intro-to-hadoop** inside your user directory on HDFS

In [None]:
!hdfs dfs -ls /

In [None]:
!ls /

In [None]:
!hdfs dfs -ls /user/lngo

In [None]:
!hdfs dfs -mkdir intro-to-hadoop

### Challenge

Upload the **text** directory into the newly created **intro-to-hadoop** directory. 

In [None]:
!hdfs dfs -put

In [None]:
!hdfs dfs -put \
    _______________________ \
    intro-to-hadoop/

### Challenge 

Check the health status of the directories above in HDFS using fsck:
```
hdfs fsck <path-to-directory> -files -blocks -locations
```

In [None]:
!hdfs fsck _________________ -files -blocks -locations

## MapReduce Programming Paradigm

**What is “map”?**
– A function/procedure that is applied to every individual
element of a collection/list/array/…

```
int square(x) { return x*x;}
map square [1,2,3,4] -> [1,4,9,16]
```
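
The same idea can be written in Python with the built-in `map`. This is only an illustrative sketch of the pseudocode above, not Hadoop code.

```python
def square(x):
    """Return x multiplied by itself."""
    return x * x

# map applies square to every element; list() collects the results.
squares = list(map(square, [1, 2, 3, 4]))
print(squares)  # [1, 4, 9, 16]
```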

**What is “reduce”?**
– A function/procedure that performs an operation on a list.
This operation will “fold/reduce” this list into a single value
(or a smaller subset)

```
reduce ([1,2,3,4]) using sum -> 10
reduce ([1,2,3,4]) using multiply -> 24
```
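
In Python, the two folds above can be sketched with `functools.reduce` from the standard library, using `operator.add` and `operator.mul` as the combining functions.

```python
from functools import reduce
import operator

numbers = [1, 2, 3, 4]

# reduce folds the list into a single value, two elements at a time.
total = reduce(operator.add, numbers)
product = reduce(operator.mul, numbers)

print(total)    # 10
print(product)  # 24
```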

MapReduce is an old concept from functional programming. It maps naturally onto HDFS:
- `map` tasks run on individual data blocks, mainly to filter and shrink the raw data while increasing its value.
- `reduce` tasks run on the intermediate results from the `map` tasks (which should now be significantly smaller) to calculate the final results.
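
The two bullets above can be sketched in plain Python. This is a toy illustration only: the `blocks` list is a made-up stand-in for HDFS data blocks, and `map_block` is a hypothetical name, not part of any Hadoop API.

```python
from functools import reduce

# Hypothetical stand-ins for HDFS data blocks: each block holds a few raw records.
blocks = [
    ["error disk full", "ok", "ok"],
    ["ok", "error timeout"],
    ["ok", "ok", "ok"],
]

def map_block(block):
    """Filter one block down to a small intermediate result: its number of error records."""
    return sum(1 for record in block if record.startswith("error"))

# Each block is mapped independently, which is what lets Hadoop run map tasks in parallel.
intermediate = [map_block(block) for block in blocks]

# The reduce step only sees the small intermediate values, never the raw records.
total_errors = reduce(lambda a, b: a + b, intermediate, 0)

print(intermediate)  # [1, 1, 0]
print(total_errors)  # 2
```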

## 1. The Hello World of Hadoop: Word Count

In [None]:
!mkdir codes

In [None]:
!hdfs dfs -cat intro-to-hadoop/text/gutenberg-shakespeare.txt \
    2>/dev/null | head -n 100

In [None]:
%%writefile codes/wordcountMapper.py
#!/usr/bin/env python
import sys

# Emit one "word<TAB>1" pair per word read from standard input.
for oneLine in sys.stdin:
    # split() with no argument splits on any whitespace and drops empty strings
    for word in oneLine.split():
        print('%s\t%s' % (word, 1))

In [None]:
!hdfs dfs -cat intro-to-hadoop/text/gutenberg-shakespeare.txt \
    2>/dev/null \
    | head -n 100 \
    | python ./codes/wordcountMapper.py

In [None]:
!hdfs dfs -cat intro-to-hadoop/text/gutenberg-shakespeare.txt \
    2>/dev/null \
    | head -n 100 \
    | python ./codes/wordcountMapper.py \
    | sort

In [None]:
%%writefile codes/wordcountReducer.py
#!/usr/bin/env python
import sys

current_word = None
total_word_count = 0

for line in sys.stdin:
    line = line.strip()
    try:
        # Each input line is "word<TAB>count"; skip lines that do not parse
        word, count = line.split("\t", 1)
        count = int(count)
    except ValueError:
        continue

    if current_word == word:
        total_word_count += count
    else:
        # Input is sorted by word, so a new word means the previous one is complete
        if current_word:
            print("%s\t%s" % (current_word, total_word_count))
        current_word = word
        total_word_count = count

# Emit the final word, if any input was read
if current_word:
    print("%s\t%s" % (current_word, total_word_count))

In [None]:
!hdfs dfs -cat intro-to-hadoop/text/gutenberg-shakespeare.txt \
    2>/dev/null \
    | head -n 100 \
    | python ./codes/wordcountMapper.py \
    | sort \
    | python ./codes/wordcountReducer.py
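
The shell pipeline above (map, then sort, then reduce) can also be sketched entirely in memory. This is an illustration under stated assumptions, not how Hadoop Streaming is implemented: `mapper`, `reducer`, and `sample_lines` are hypothetical names, and `sorted()` stands in for the `sort` step.

```python
from itertools import groupby

def mapper(lines):
    """Emit a (word, 1) pair for every whitespace-separated word."""
    for line in lines:
        for word in line.split():
            yield word, 1

def reducer(sorted_pairs):
    """Sum the counts of adjacent pairs that share the same word."""
    for word, group in groupby(sorted_pairs, key=lambda pair: pair[0]):
        yield word, sum(count for _, count in group)

sample_lines = ["to be or not to be", "that is the question"]  # made-up input

# Sorting groups identical words next to each other, which the reducer relies on.
counts = dict(reducer(sorted(mapper(sample_lines))))
print(counts)
```

Like the streaming reducer, `reducer` here only works because its input is sorted by word; without the sort, the same word could be emitted several times with partial counts.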

In [None]:
!hdfs dfs -rm -R intro-to-hadoop/output-wordcount
!yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar \
    -input intro-to-hadoop/text/gutenberg-shakespeare.txt \
    -output intro-to-hadoop/output-wordcount \
    -file ./codes/wordcountMapper.py \
    -mapper wordcountMapper.py \
    -file ./codes/wordcountReducer.py \
    -reducer wordcountReducer.py

In [None]:
!hdfs dfs -ls intro-to-hadoop/output-wordcount

In [None]:
!hdfs dfs -cat intro-to-hadoop/output-wordcount/part-00000 \
    2>/dev/null | head -n 100

### Challenge

Modify *wordcountMapper.py* so that punctuation and capitalization are no longer factors in determining unique words.

In [None]:
%%writefile codes/wordcountEnhancedMapper.py
#!/usr/bin/env python                                          
import sys                     
import string

translator = str.maketrans('', '', string.punctuation)

for oneLine in sys.stdin:
    oneLine = oneLine.strip()
    for word in oneLine.split(" "):
        if word != "":
            newWord = word.translate(translator).lower()
            print ('%s\t%s' % (_______, 1)) 
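
To see what the translation table does on its own, here is a small stand-alone sketch. The helper `normalize` and the sample tokens are made up for illustration and are not part of the mapper.

```python
import string

# Translation table that deletes every ASCII punctuation character.
translator = str.maketrans('', '', string.punctuation)

def normalize(word):
    """Strip ASCII punctuation and lowercase the word."""
    return word.translate(translator).lower()

samples = ["Hamlet,", "HAMLET", "hamlet!", "'tis"]  # made-up sample tokens
normalized = [normalize(w) for w in samples]
print(normalized)  # ['hamlet', 'hamlet', 'hamlet', 'tis']
```

A token made up only of punctuation, such as `--`, normalizes to an empty string, so a mapper may want to skip empty results before printing.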

In [None]:
!hdfs dfs -rm -R intro-to-hadoop/output-wordcount-enhanced
!ssh dsciutil yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar \
    -input intro-to-hadoop/text/gutenberg-shakespeare.txt \
    -output intro-to-hadoop/output-wordcount-enhanced \
    -file ____________________________________________________ \
    -mapper _____________________ \
    -file ____________________________________________________ \
    -reducer _____________________