### A simple example of using counters in Hadoop Streaming

By: Vahid Mostofi

In [1]:
!mkdir data
!echo "dog cat cat apple ninja" > data/text1.txt
!echo "ninja apple cat keyboard horse" > data/another_file.txt
!echo "water cat dog spider spider" > data/yet_another_file.txt

mkdir: cannot create directory ‘data’: File exists


In [2]:
!hadoop fs -mkdir -p /outputs
!hadoop fs -mkdir -p /inputs
!hadoop fs -put data /inputs

put: `/inputs/data/another_file.txt': File exists
put: `/inputs/data/yet_another_file.txt': File exists
put: `/inputs/data/text1.txt': File exists


In [18]:
!hadoop fs -ls /inputs/data


Found 3 items
-rw-r--r--   3 root supergroup         31 2020-09-30 21:18 /inputs/data/another_file.txt
-rw-r--r--   3 root supergroup         24 2020-09-30 21:18 /inputs/data/text1.txt
-rw-r--r--   3 root supergroup         28 2020-09-30 21:18 /inputs/data/yet_another_file.txt


In [19]:
%%writefile mapper.py
#!/opt/bitnami/python/bin/python
# -*- coding: utf-8 -*-
import sys

for line in sys.stdin:  # Hadoop Streaming feeds the input split on stdin
    line = line.strip()
    words = line.split()
    # increment the 'line_count' counter once per input line;
    # counter updates go to stderr as reporter:counter:<group>,<counter>,<amount>
    print("reporter:counter:%s,%s,%d" % ('example-info','line_count',1), file=sys.stderr)
    for word in words:
        # add this word's length to the 'len' counter (sum of all word lengths)
        print("reporter:counter:%s,%s,%d" % ('example-info','len',len(word)), file=sys.stderr)
        # emit the key-value pair on stdout
        print("%s\t%d" % (word, 1))

Overwriting mapper.py
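
The mapper updates a counter by writing a line of the form `reporter:counter:<group>,<counter>,<amount>` to stderr. A minimal sketch of that step as a reusable helper follows. The function name `increment_counter` and its `stream` parameter are hypothetical and not part of Hadoop; `stream` exists only so the output can be captured for inspection. Because the fields are comma-separated, this sketch rejects names that contain commas.

```python
import io
import sys


def increment_counter(group, counter, amount=1, stream=None):
    """Write one Hadoop Streaming counter update to `stream` (stderr by default)."""
    if stream is None:
        stream = sys.stderr
    # The fields are comma-separated, so a comma inside a name would break parsing.
    if "," in group or "," in counter:
        raise ValueError("group and counter names must not contain commas")
    stream.write("reporter:counter:%s,%s,%d\n" % (group, counter, amount))


# Usage: capture the updates in a buffer instead of stderr to inspect them.
buffer = io.StringIO()
increment_counter("example-info", "line_count", 1, stream=buffer)
increment_counter("example-info", "len", len("keyboard"), stream=buffer)
captured = buffer.getvalue()
print(captured, end="")
```

Inside a real mapper the `stream` argument would be left out, so the update goes to stderr where Hadoop Streaming picks it up.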


In [20]:
%%writefile reducer.py
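#!/opt/bitnami/python/bin/python
# -*- coding: utf-8 -*-

import sys

total = 0
lastword = None

for line in sys.stdin:
    # count the records this reducer reads, under a name distinct from the
    # mapper's 'line_count' so the two totals are not mixed together
    print("reporter:counter:%s,%s,%d" % ('example-info','reducer_records',1), file=sys.stderr)
    line = line.strip()

    # each record is "word<TAB>count", as written by mapper.py
    word, count = line.split('\t', 1)
    count = int(count)

    if lastword is None:
        lastword = word
    if word == lastword:
        total += count
    else:
        # keys arrive sorted, so a new word means the previous one is complete
        print("%s\t%d occurrences" % (lastword, total))
        total = count
        lastword = word

# flush the final word
if lastword is not None:
    print("%s\t%d occurrences" % (lastword, total))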

Overwriting reducer.py
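
The reducer depends on its input arriving sorted by key, which the shuffle phase guarantees. The word-count logic can be checked without a cluster by imitating the map, sort, and reduce steps in plain Python. This is only a sketch: `SAMPLE_LINES`, `map_words`, and `reduce_counts` are hypothetical names, the three lines mirror the sample files created earlier, and a single in-memory sort stands in for the shuffle, so there is one result set rather than one per reducer.

```python
from itertools import groupby

# The contents of the three sample input files, one line each.
SAMPLE_LINES = [
    "dog cat cat apple ninja",
    "ninja apple cat keyboard horse",
    "water cat dog spider spider",
]


def map_words(lines):
    """Yield (word, 1) pairs, like the pairs mapper.py writes to stdout."""
    for line in lines:
        for word in line.strip().split():
            yield word, 1


def reduce_counts(sorted_pairs):
    """Sum the counts of each run of identical keys, like reducer.py does."""
    for word, group in groupby(sorted_pairs, key=lambda pair: pair[0]):
        yield word, sum(count for _, count in group)


# Sorting stands in for the shuffle, which hands each reducer its keys in order.
word_counts = dict(reduce_counts(sorted(map_words(SAMPLE_LINES))))
for word, total in sorted(word_counts.items()):
    print("%s\t%d" % (word, total))
```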


In [21]:
!hdfs dfs -rm -R /outputs/out
!hadoop jar /opt/hadoop-3.2.1/share/hadoop/tools/lib/hadoop-streaming-3.2.1.jar \
    -D mapreduce.job.reduces=2 \
    -file $PWD/mapper.py \
    -file $PWD/reducer.py \
    -mapper mapper.py \
    -reducer reducer.py \
    -input /inputs/data \
    -output /outputs/out

Deleted /outputs/out
2020-10-21 17:54:33,873 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
packageJobJar: [/training/counters/mapper.py, /training/counters/reducer.py, /tmp/hadoop-unjar1789546134438123751/] [] /tmp/streamjob3943389933522019500.jar tmpDir=null
2020-10-21 17:54:34,668 INFO client.RMProxy: Connecting to ResourceManager at resourcemanager/172.18.0.6:8032
2020-10-21 17:54:34,826 INFO client.AHSProxy: Connecting to Application History server at historyserver/172.18.0.4:10200
2020-10-21 17:54:34,848 INFO client.RMProxy: Connecting to ResourceManager at resourcemanager/172.18.0.6:8032
2020-10-21 17:54:34,849 INFO client.AHSProxy: Connecting to Application History server at historyserver/172.18.0.4:10200
2020-10-21 17:54:34,983 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/root/.staging/job_1603141762614_0004
2020-10-21 17:54:35,087 INFO sasl.SaslDataTransferClient: SASL encrypti

When the MapReduce job finishes, the values of the custom counters appear in the job's console output, listed under the `example-info` group:

example-info ....
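
To anticipate what the mapper contributes to these counters, the `reporter:counter:` lines can be summed locally. The sketch below regenerates the mapper's stderr lines for the three sample files and adds up the amounts per group and counter. `mapper_stderr` and `tally` are hypothetical helper names, and the aggregation only imitates what the framework does. Any updates the reducer emits under the same counter name are added on top, so the job's reported value for that counter can be higher.

```python
from collections import Counter

PREFIX = "reporter:counter:"

# The contents of the three sample input files, one line each.
SAMPLE_LINES = [
    "dog cat cat apple ninja",
    "ninja apple cat keyboard horse",
    "water cat dog spider spider",
]


def mapper_stderr(lines):
    """Yield the counter-update lines mapper.py would write to stderr."""
    for line in lines:
        yield "%s%s,%s,%d" % (PREFIX, "example-info", "line_count", 1)
        for word in line.strip().split():
            yield "%s%s,%s,%d" % (PREFIX, "example-info", "len", len(word))


def tally(stderr_lines):
    """Sum the amounts per (group, counter), ignoring unrelated stderr lines."""
    totals = Counter()
    for line in stderr_lines:
        if not line.startswith(PREFIX):
            continue
        group, name, amount = line[len(PREFIX):].split(",")
        totals[(group, name)] += int(amount)
    return totals


totals = tally(mapper_stderr(SAMPLE_LINES))
for (group, name), value in sorted(totals.items()):
    print(group, name, value)
```

For the sample data the mapper's share is 3 for `line_count` (one line per file) and 68 for `len`. The 68 matches the sizes of the three files once spaces and newlines are subtracted.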