# Simple WordCount Example for MapReduce using Hadoop Streaming and Python

By: Vahid Mostofi

### move input files to HDFS

In [60]:
!hadoop fs -mkdir -p /outputs
!hadoop fs -mkdir -p /inputs
!hadoop fs -put data /inputs

put: `/inputs/data/shakespeare.txt': File exists


### list the contents of the HDFS folder we just created
The input file, shakespeare.txt, is now stored on HDFS. The cell below also removes any leftover /outputs/result folder from a previous run.

In [61]:
!hdfs dfs -rm -r /outputs/result
!hdfs dfs -ls /inputs/data

rm: `/outputs/result': No such file or directory

Found 2 items
drwxr-xr-x   - root supergroup          0 2023-11-21 02:42 /inputs/data/.ipynb_checkpoints
-rw-r--r--   3 root supergroup    5450373 2023-11-21 02:42 /inputs/data/shakespeare.txt


### mapper
This is the word count example, so we need to create a mapper and a reducer.

 * ```%%writefile mapper.py``` tells Jupyter to save the content of the cell as a file named mapper.py in the current working directory, so the ```#!/opt/bitnami/python/bin/python``` line becomes the first line of the file.
 * ```#!/opt/bitnami/python/bin/python``` specifies the Python interpreter used to run the file.
 * The mapper reads from stdin, which the MapReduce framework feeds with the input data, and writes its key/value pairs to stdout (using print).
 * Each word is prefixed with a reducer number (0 to 4) chosen from its first character, so that the job can later partition the words by that number.

In [62]:
%%writefile mapper.py
#!/opt/bitnami/python/bin/python
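# -*- coding: utf-8 -*-
import sys
import string

for line in sys.stdin:  # each input line arrives on stdin
    # debug message: stderr ends up in the task logs, not in the job output
    print("your message B", file=sys.stderr)
    line = line.strip()
    # replace punctuation with spaces, so words never contain "-"
    for i in string.punctuation:
        line = line.replace(i, ' ')
    line = line.lower()
    words = line.split()

    for word in words:
        # choose a reducer number from the first character of the word
        if word[0] < "a":  # digits and other characters that sort before "a"
            reducer = 0
        elif "a" <= word[0] < "e":
            reducer = 1
        elif "e" <= word[0] < "l":
            reducer = 2
        elif "l" <= word[0] < "r":
            reducer = 3
        else:
            reducer = 4
        # key is "<reducer>-<word>", value is 1; written to stdout
        output = str(reducer) + "-" + word
        print("%s\t%d" % (output, 1))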

Overwriting mapper.py
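
Before submitting the job, the mapper logic can be tried locally. The following is a minimal sketch that restates the tokenizing and bucketing rules of mapper.py as plain functions; the names `tokenize`, `bucket` and `map_line` are hypothetical helpers introduced here for illustration and are not part of the notebook.

```python
import string


def tokenize(line):
    """Lowercase a line and split it into words, treating punctuation as spaces."""
    for ch in string.punctuation:
        line = line.replace(ch, " ")
    return line.lower().split()


def bucket(word):
    """Return the reducer number (0-4) chosen from the word's first character."""
    first = word[0]
    if first < "a":
        return 0
    if first < "e":
        return 1
    if first < "l":
        return 2
    if first < "r":
        return 3
    return 4


def map_line(line):
    """Return the '<reducer>-<word><TAB>1' records emitted for one input line."""
    return ["%d-%s\t1" % (bucket(w), w) for w in tokenize(line)]


# Show the records produced for one sample line.
for record in map_line("Hello, World 42"):
    print(record)
```

Words starting with a digit fall into bucket 0 because digits sort before "a" in ASCII.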


### reducer
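Similarly to the previous cell, here we create a file named ```reducer.py``` that holds the logic for our reducer. Hadoop sorts the map output by key before passing it to the reducer, so all the records for a given word arrive on consecutive lines.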

In [63]:
%%writefile reducer.py
#!/opt/bitnami/python/bin/python
# -*- coding: utf-8 -*-

import sys
total = 0
lastword = None

for line in sys.stdin:
    line = line.strip()
    # split only on the first "-" to separate the reducer number from the rest
    reducer, new_line = line.split("-", 1)
    # extract the key and the value, and convert the value to int
    word, count = new_line.split()
    count = int(count)

    # move on to the next word (a single reducer run may receive several keys)
    if lastword is None:
        lastword = word
    if word == lastword:
        total += count
    else:
        # the word changed: emit the total for the previous word and start over
        print("%s\t%d occurences" % (lastword, total))
        total = count
        lastword = word

# emit the total for the last word seen
if lastword is not None:
    print("%s\t%d occurences" % (lastword, total))

Overwriting reducer.py
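
The reducer only compares each word with the previous one, so it depends on its input being sorted by key. The following minimal sketch illustrates this outside Hadoop; `reduce_sorted` is a hypothetical function that mirrors the loop in reducer.py, and the sample records are made up for the illustration.

```python
def reduce_sorted(records):
    """Sum the counts of consecutive identical words, as reducer.py does on stdin."""
    results = []
    last_word, total = None, 0
    for record in records:
        key, count = record.rstrip("\n").split("\t")
        _reducer, word = key.split("-", 1)
        if last_word is not None and word != last_word:
            # the word changed: store the total of the previous word
            results.append((last_word, total))
            total = 0
        last_word = word
        total += int(count)
    if last_word is not None:
        results.append((last_word, total))
    return results


# Sample mapper output, in the order the mapper would emit it.
records = ["4-to\t1", "1-be\t1", "3-or\t1", "3-not\t1", "4-to\t1", "1-be\t1"]

print(reduce_sorted(sorted(records)))  # sorted input: one total per word
print(reduce_sorted(records))          # unsorted input: words are split up
```

With sorted input each word appears once with its full count; with unsorted input the same word is emitted several times with partial counts, which is why the sort step between map and reduce matters.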


### run MapReduce
Now we run the MapReduce job using our mapper.py and reducer.py.

 * ```!hadoop jar /opt/hadoop-3.2.1/share/hadoop/tools/lib/hadoop-streaming-3.2.1.jar ``` specifies the path to the streaming module of MapReduce.
 * ```-D map.output.key.field.separator=-``` and ```-D mapred.text.key.partitioner.options=-k1,1``` split each map output key on ```-``` and partition on the first field only, which is the reducer number written by mapper.py. ```-D mapred.reduce.tasks=5``` asks for five reducers.
 * ```-file``` tells MapReduce which files should be shipped to the worker nodes. You can also ship txt files and read them in your mapper.py or reducer.py.
 * ```-mapper``` and ```-reducer``` specify the commands for the mapper and the reducer. Because each script has the path to Python as its first line, the system knows how to execute it.
 * ```-input``` tells which folder should be scanned for input. All the files in this folder are fed to the mappers (mapper.py) on stdin.
 * ```-output``` specifies the folder in which the output of the MapReduce execution is stored. Remember that this folder must not already exist, so every execution of the following command needs either a new path or the previous output removed first. You don't need to create the folder beforehand.
 * ```-partitioner``` selects the class that assigns map output keys to reducers, here KeyFieldBasedPartitioner.

 * For more information about the MapReduce streaming API, see http://hadoop.apache.org/docs/r1.2.1/streaming.html
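
Since the output folder has to be new for each run, one option is to generate the path programmatically. The following is a minimal sketch, assuming a timestamp suffix is an acceptable naming scheme; `fresh_output_dir` and `streaming_command` are hypothetical helpers, and the sketch only builds and prints the argument list without launching the job.

```python
import time

# Path of the streaming jar, as used in this notebook.
STREAMING_JAR = "/opt/hadoop-3.2.1/share/hadoop/tools/lib/hadoop-streaming-3.2.1.jar"


def fresh_output_dir(base="/outputs/result"):
    """Return an output path with a timestamp suffix (assumed naming scheme)."""
    return "%s-%s" % (base, time.strftime("%Y%m%d-%H%M%S"))


def streaming_command(output_dir, input_dir="/inputs/data", reducers=5):
    """Build the argument list for the streaming job described above."""
    return [
        "hadoop", "jar", STREAMING_JAR,
        "-D", "map.output.key.field.separator=-",
        "-D", "mapred.text.key.partitioner.options=-k1,1",
        "-D", "mapred.reduce.tasks=%d" % reducers,
        "-file", "mapper.py",
        "-file", "reducer.py",
        "-mapper", "mapper.py",
        "-reducer", "reducer.py",
        "-input", input_dir,
        "-output", output_dir,
        "-partitioner", "org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner",
    ]


cmd = streaming_command(fresh_output_dir())
print(" ".join(cmd))
```

The resulting list could be passed to `subprocess.run(cmd, check=True)` on a machine where the `hadoop` command is available.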


In [64]:
!hadoop jar /opt/hadoop-3.2.1/share/hadoop/tools/lib/hadoop-streaming-3.2.1.jar \
    -D map.output.key.field.separator=- \
    -D mapred.text.key.partitioner.options=-k1,1 \
    -D mapred.reduce.tasks=5 \
    -file $PWD/mapper.py \
    -file $PWD/reducer.py \
    -mapper mapper.py \
    -reducer reducer.py \
    -input /inputs/data \
    -output /outputs/result \
    -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner

2023-11-21 03:23:48,378 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
packageJobJar: [/training/mapper.py, /training/reducer.py, /tmp/hadoop-unjar1623727409404518037/] [] /tmp/streamjob8606559555682019331.jar tmpDir=null
2023-11-21 03:23:48,921 INFO client.RMProxy: Connecting to ResourceManager at resourcemanager/172.18.0.3:8032
2023-11-21 03:23:49,025 INFO client.AHSProxy: Connecting to Application History server at historyserver/172.18.0.5:10200
2023-11-21 03:23:49,042 INFO client.RMProxy: Connecting to ResourceManager at resourcemanager/172.18.0.3:8032
2023-11-21 03:23:49,042 INFO client.AHSProxy: Connecting to Application History server at historyserver/172.18.0.5:10200
2023-11-21 03:23:49,147 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/root/.staging/job_1700533158708_0005
2023-11-21 03:23:49,218 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = fals

### let's look at the output of the MapReduce job

In [65]:
!hdfs dfs -ls /outputs/result

Found 6 items
-rw-r--r--   3 root supergroup          0 2023-11-21 03:24 /outputs/result/_SUCCESS
-rw-r--r--   3 root supergroup     105518 2023-11-21 03:24 /outputs/result/part-00000
-rw-r--r--   3 root supergroup     104584 2023-11-21 03:24 /outputs/result/part-00001
-rw-r--r--   3 root supergroup     151472 2023-11-21 03:24 /outputs/result/part-00002
-rw-r--r--   3 root supergroup       2877 2023-11-21 03:24 /outputs/result/part-00003
-rw-r--r--   3 root supergroup     140808 2023-11-21 03:24 /outputs/result/part-00004


In [66]:
!hdfs dfs -head /outputs/result/part-00000
!hdfs dfs -head /outputs/result/part-00001
!hdfs dfs -head /outputs/result/part-00002
!hdfs dfs -head /outputs/result/part-00003
!hdfs dfs -head /outputs/result/part-00004

2023-11-21 03:24:18,568 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
e	119 occurences
each	238 occurences
eager	9 occurences
eagerly	3 occurences
eagerness	1 occurences
eagle	27 occurences
eagles	9 occurences
eaning	1 occurences
eanlings	1 occurences
ear	199 occurences
earing	1 occurences
earl	155 occurences
earldom	5 occurences
earlier	1 occurences
earliest	3 occurences
earliness	1 occurences
earls	11 occurences
early	50 occurences
earn	13 occurences
earned	2 occurences
earnest	38 occurences
earnestly	9 occurences
earnestness	4 occurences
earns	1 occurences
ears	157 occurences
earth	317 occurences
earthen	1 occurences
earthlier	1 occurences
earthly	32 occurences
earthquake	6 occurences
earthquakes	1 occurences
earthy	6 occurences
eas	4 occurences
ease	54 occurences
eased	2 occurences
easeful	1 occurences
eases	1 occurences
easier	9 occurences
easiest	1 occurences
easiliest	1 occurences
easily	28 occurences
easiness	

### create a directory outside HDFS and copy the results into it

In [67]:
!rm -rf /outputs/res_out_of_hdfs
!mkdir -p /outputs/res_out_of_hdfs
!hdfs dfs -copyToLocal /outputs/result/* /outputs/res_out_of_hdfs

2023-11-21 03:24:26,225 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false


In [68]:
!ls /outputs/res_out_of_hdfs

_SUCCESS  part-00000  part-00001  part-00002  part-00003  part-00004
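
Once the part files are on the local file system, they can be loaded back into Python for further analysis. The following is a minimal sketch, assuming each line has the form `word<TAB>N occurences` as written by reducer.py; `parse_counts` is a hypothetical helper, and the sample lines are only in the same format as the job output.

```python
from collections import Counter


def parse_counts(lines):
    """Parse 'word<TAB>N occurences' lines into a Counter of word totals."""
    counts = Counter()
    for line in lines:
        line = line.rstrip("\n")
        if not line:
            continue  # skip empty lines
        word, rest = line.split("\t", 1)
        counts[word] += int(rest.split()[0])
    return counts


# Sample lines in the same format as the reducer output.
sample = ["a\t14725 occurences", "aaron\t97 occurences", "abate\t14 occurences"]
counts = parse_counts(sample)
print(counts.most_common(2))
```

To process the real results, the same function could be applied to each file matched by `glob.glob("/outputs/res_out_of_hdfs/part-*")`, merging the returned counters with `Counter.update`.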


In [69]:
!head /outputs/res_out_of_hdfs/part-00000
!head /outputs/res_out_of_hdfs/part-00001
!head /outputs/res_out_of_hdfs/part-00002
!head /outputs/res_out_of_hdfs/part-00003
!head /outputs/res_out_of_hdfs/part-00004

e	119 occurences
each	238 occurences
eager	9 occurences
eagerly	3 occurences
eagerness	1 occurences
eagle	27 occurences
eagles	9 occurences
eaning	1 occurences
eanlings	1 occurences
ear	199 occurences
l	23 occurences
la	78 occurences
laban	2 occurences
label	2 occurences
labell	1 occurences
labienus	1 occurences
labio	1 occurences
labor	10 occurences
laboring	2 occurences
labors	1 occurences
r	92 occurences
rabbit	4 occurences
rabble	13 occurences
rabblement	2 occurences
race	17 occurences
rack	21 occurences
rackers	1 occurences
racket	1 occurences
rackets	1 occurences
racking	1 occurences
1	90 occurences
10	5 occurences
100	1 occurences
101	1 occurences
102	1 occurences
103	1 occurences
104	1 occurences
105	1 occurences
106	1 occurences
107	1 occurences
a	14725 occurences
aaron	97 occurences
abaissiez	1 occurences
abandon	10 occurences
abandoned	2 occurences
abase	2 occurences
abash	1 occurences
abate	14 occurences
abated	3 occurences
abatement	3 occurences
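
Note that the part file numbers do not match the reducer numbers written by the mapper: the words from "a" to "d" carry reducer number 1 but end up in part-00004. A likely explanation is that the partitioner hashes the key field rather than using it as an index. The following minimal sketch rests on an assumption about the Hadoop implementation that the notebook does not state: that KeyFieldBasedPartitioner computes a 31-based polynomial hash over the bytes of the selected field and takes it modulo the number of reducers. `key_field_partition` is a hypothetical re-implementation, valid for ASCII fields only.

```python
def key_field_partition(field, num_reducers):
    """Assumed partition rule: 31-based hash of the field bytes, modulo reducers."""
    h = 0
    for byte in field.encode("ascii"):
        # emulate 32-bit integer wraparound
        h = (31 * h + byte) & 0xFFFFFFFF
    # clear the sign bit, then take the remainder
    return (h & 0x7FFFFFFF) % num_reducers


# Map each reducer number written by mapper.py to a part file.
for reducer in range(5):
    part = key_field_partition(str(reducer), 5)
    print("reducer number %d -> part-%05d" % (reducer, part))
```

Under this assumption the mapping is 0 to part-00003, 1 to part-00004, 2 to part-00000, 3 to part-00001 and 4 to part-00002, which is consistent with the heads of the files shown above.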


In [70]:
!hdfs dfs -rm -r /outputs/result

Deleted /outputs/result
