## Fault tolerance check


In [None]:

%%writefile flaky_mapper.py

from random import random
import  sys

if random() < 0.5:
    raise SystemExit("I am lucky")

for line in sys.stdin:
    pass


In [None]:
# List the wiki dataset directory on HDFS with human-readable file sizes.
!hdfs dfs -ls -h /data/wiki

In [None]:
%%bash

# Fault-tolerance demo: run flaky_mapper.py (written by the cell above) as a
# map-only streaming job. The mapper aborts about half of the time.

OUT_DIR="demo_mapper"
NUM_REDUCERS=0

# Remove the output directory left by a previous run.
hdfs dfs -rm -r -skipTrash "${OUT_DIR}" > /dev/null

# -files ships the script to the task nodes; without it the mapper command
# cannot find the file. NUM_REDUCERS is now actually passed to the job.
yarn jar /opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/hadoop-streaming.jar \
    -files flaky_mapper.py \
    -mapper "python flaky_mapper.py" \
    -numReduceTasks "${NUM_REDUCERS}" \
    -input /data/test.txt \
    -output "${OUT_DIR}" > /dev/null



## Let's count only the number of lines in the wiki articles

### Without a reducer

In [None]:

%%bash

# Map-only line count: each map task pipes its input through `wc -l`,
# so the output holds one partial count per map task.

out_dir="wc_mr"

# Drop the output directory of an earlier run (stdout is discarded).
hdfs dfs -rm -r -skipTrash "$out_dir" 1>/dev/null

yarn jar /opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/hadoop-streaming.jar \
    -mapper "wc -l" \
    -numReduceTasks 0 \
    -input /data/wiki/en_articles_part \
    -output "$out_dir" 1>/dev/null


In [None]:
# Inspect the wc_mr directory (presumably the output of the map-only job
# above). Alternative views are kept commented out.
#!hdfs dfs -ls -h /user/jovyan/wc_mr

#!hdfs dfs -cat /user/jovyan/wc_mr/part-00000 

# Print every file in the directory as text.
!hdfs dfs -text /user/jovyan/wc_mr/*


## Bash script for the reducer


In [None]:
%%writefile reducer.sh
#!/usr/bin/env bash
# Reducer: sums the first field of every input line and prints the total.
# "+ 0" forces a numeric result, so empty input prints 0 instead of a blank line.
awk '{line_count += $1} END {print line_count + 0}'



In [None]:
%%bash

# Line count with a single reducer: mappers run `wc -l`, and the inline awk
# reducer adds the per-mapper counts into one total.

out_dir="wc_mr_withReducer"

# Drop the output directory of an earlier run (stdout is discarded).
hdfs dfs -rm -r -skipTrash "$out_dir" 1>/dev/null

# \$1 is escaped so that awk, not the shell, expands it.
yarn jar /opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/hadoop-streaming.jar \
    -mapper "wc -l" \
    -reducer "awk '{line_count += \$1} END {print line_count}'" \
    -numReduceTasks 1 \
    -input /data/wiki/en_articles_part \
    -output "$out_dir" 1>/dev/null

In [None]:
# Print every file in the wc_mr_withReducer directory as text.
!hdfs dfs -text /user/jovyan/wc_mr_withReducer/*

## Python streaming for line count

In [None]:
%%writefile mapper.py

from __future__ import print_function

import sys

line_count = sum(1 for _ in sys.stdin)

print(line_count)


In [None]:
%%writefile reducer.py

import sys

line_count = sum(
    int(value) for value in sys.stdin
)


print(line_count)


In [None]:
%%bash

# Line count with Python scripts: mapper.py counts lines per map task and
# reducer.py sums the counts in a single reduce task.

out_dir="Streaming_python_wc_mr_withReducer"

# Drop the output directory of an earlier run (stdout is discarded).
hdfs dfs -rm -r -skipTrash "$out_dir" 1>/dev/null

# -files ships both scripts along with the job.
yarn jar /opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/hadoop-streaming.jar \
    -files mapper.py,reducer.py \
    -mapper "python mapper.py" \
    -reducer "python reducer.py" \
    -numReduceTasks 1 \
    -input /data/wiki/en_articles_part \
    -output "$out_dir" 1>/dev/null

In [None]:
# Print every file in the Streaming_python_wc_mr_withReducer directory as text.
!hdfs dfs -text /user/jovyan/Streaming_python_wc_mr_withReducer/*

## Word count 

In [None]:
%%writefile mapper.py

from __future__ import print_function
import sys
import re

for line in sys.stdin:
    article_id, content = line.split("\t", 1)
    words = re.split("\W+", content)
    for word in words:
        print(word, 1, sep="\t")
        

In [None]:
%%writefile reducer.py

from __future__ import print_function
import sys

current_word = None
word_count = 0

for line in sys.stdin:
    word, counts = line.split("\t", 1)
    counts = int(counts)
    if word == current_word:
        word_count += counts
    else:
        if current_word:
            print(current_word, word_count, sep = "\t")
        current_word = word
        word_count = counts
    
if current_word:
    print(current_word, word_count, sep = "\t")

In [None]:
%%bash

# Word count: mapper.py emits "<word>\t1" pairs and reducer.py sums them
# per word in a single reduce task.

out_dir="Streaming_python_wc_mr"

# Drop the output directory of an earlier run (stdout is discarded).
hdfs dfs -rm -r -skipTrash "$out_dir" 1>/dev/null

# -files ships both scripts along with the job.
yarn jar /opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/hadoop-streaming.jar \
    -files mapper.py,reducer.py \
    -mapper "python mapper.py" \
    -reducer "python reducer.py" \
    -numReduceTasks 1 \
    -input /data/wiki/en_articles_part \
    -output "$out_dir" 1>/dev/null

In [None]:
# Scratch inspection commands, all disabled; remove the leading "#" to run one.
# They look at the word-count output directory and at the input data.
#!hdfs dfs -text /user/jovyan/Streaming_python_wc_mr/* | tail -c 80 

#!hdfs dfs -ls -h /user/jovyan/Streaming_python_wc_mr


#!hdfs dfs -text /data/test.txt

#!hdfs dfs -text /data/wiki/en_articles_part/* | head -c 80

#!hdfs dfs -ls -h /data

## Word count using the test file

In [1]:
%%writefile mapper.py
# Word-count mapper that also reports a Hadoop counter.
# Python 2 only: it uses reload(), unicode() and the print statement.
# Input:  one record per line, "<article_id>\t<text>".
# Output: "<lowercased word>\t1" on stdout for every token, plus one
#         counter-update line on stderr per token.

import sys
import re

# reload(sys) brings back setdefaultencoding, which is removed from the
# sys module at interpreter start-up.
reload(sys)
sys.setdefaultencoding('utf-8') # make utf-8 the implicit codec used by unicode() below

for line in sys.stdin:
    try:
        article_id, text = unicode(line.strip()).split('\t', 1)
    except ValueError as e:
        # Raised when the line has no tab; UnicodeDecodeError is a subclass
        # of ValueError, so undecodable lines are skipped here as well.
        continue
    # Split on whitespace together with any non-word characters around it.
    words = re.split("\W*\s+\W*", text, flags=re.UNICODE)
    for word in words:
        # A stderr line of the form "reporter:counter:<group>,<counter>,<amount>"
        # asks Hadoop streaming to increment that counter by <amount>.
        print >> sys.stderr, "reporter:counter:Wiki stats,Total words,%d" % 1
        print "%s\t%d" % (word.lower(), 1)


Overwriting mapper.py


In [2]:
%%writefile reducer.py

from __future__ import print_function
import sys

current_word = None
word_count = 0

for line in sys.stdin:
    word, counts = line.split("\t", 1)
    counts = int(counts)
    if word == current_word:
        word_count += counts
    else:
        if current_word:
            print(current_word, word_count, sep = "\t")
        current_word = word
        word_count = counts
    
if current_word:
    print(current_word, word_count, sep = "\t")

Overwriting reducer.py


## Python counter

In [None]:
%%writefile counter_python.py
import sys
import re

def bar():
    with open('stderr_logs.txt') as f:
        for line in f:
            if sys.argv[1] in line:
                line = line.split("=")[1]
                print(line)
        
if __name__ == '__main__':
    bar()

# write a counter_python.py script to count the total words


In [None]:
%%bash

# Word count with counter reporting: the job's stderr (the client log,
# which includes the final counter values) is saved to a file, the
# "Total words" counter is extracted from it, and the full log is then
# echoed to stderr.

OUT_DIR="Streaming_python_wc_mr"
# counter_python.py opens this exact file name, so keep the two in sync.
LOGS="stderr_logs.txt"

# Remove the output directory left by a previous run.
hdfs dfs -rm -r -skipTrash "${OUT_DIR}" > /dev/null

yarn jar /opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/hadoop-streaming.jar \
    -files mapper.py,reducer.py \
    -mapper "python mapper.py" \
    -reducer "python reducer.py" \
    -numReduceTasks 1 \
    -input /data/wiki/en_articles_part \
    -output "${OUT_DIR}" > /dev/null 2> "${LOGS}"

# Use the LOGS variable instead of repeating the file name; the redirect
# replaces the former `cat ... |` pipe.
python counter_python.py "Total words" < "${LOGS}"
cat "${LOGS}" >&2

11937375



In [None]:

import sys
import re


# Print the value of the "Total words" counter from the saved job log.
# To read from standard input instead, iterate over sys.stdin.
with open('stderr_logs.txt') as f:
    for line in f:
        if "Total words" in line:
            fields = line.split("=")
            # Skip matching lines that carry no "=<value>" part.
            if len(fields) > 1:
                # strip() drops the newline kept from the log line, so
                # print() does not produce an extra blank line.
                print(fields[1].strip())

In [None]:
# Scratch inspection commands, all disabled; remove the leading "#" to run one.
# They look at the word-count output directory and at the input data.
#!hdfs dfs -text /user/jovyan/Streaming_python_wc_mr/* 

#!hdfs dfs -ls -h /user/jovyan/Streaming_python_wc_mr/part-00000

#!hdfs dfs -text /data/test.txt

#!hdfs dfs -text /data/wiki/en_articles_part/* | head -c 80

#!hdfs dfs -ls -h /data/wiki



In [16]:
%%bash

# Re-run the counter extraction on the saved job log.
# NOTE(review): counter_python.py opens stderr_logs.txt itself, so the piped
# input appears to be unused - confirm before relying on the pipe.
cat 'stderr_logs.txt' | python counter_python.py "Total words" 


17



In [14]:
import sys
import re


# Dump the saved job log line by line. Each line read from the file already
# ends with a newline, so it is removed before print() adds its own;
# otherwise every log line is followed by a blank line.
with open('stderr_logs.txt') as f:
    for line in f:
        print(line.rstrip("\n"))

19/09/19 18:58:05 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032

19/09/19 18:58:05 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032

19/09/19 18:58:07 INFO mapred.FileInputFormat: Total input files to process : 1

19/09/19 18:58:07 INFO mapreduce.JobSubmitter: number of splits:2

19/09/19 18:58:08 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1568917062479_0002

19/09/19 18:58:10 INFO impl.YarnClientImpl: Submitted application application_1568917062479_0002

19/09/19 18:58:10 INFO mapreduce.Job: The url to track the job: http://23d8740cdaf7:8088/proxy/application_1568917062479_0002/

19/09/19 18:58:10 INFO mapreduce.Job: Running job: job_1568917062479_0002

19/09/19 18:58:32 INFO mapreduce.Job: Job job_1568917062479_0002 running in uber mode : false

19/09/19 18:58:32 INFO mapreduce.Job:  map 0% reduce 0%

19/09/19 18:59:00 INFO mapreduce.Job:  map 100% reduce 0%

19/09/19 18:59:19 INFO mapreduce.Job:  map 100% reduce 100%

19/09/1

In [None]:
# Name of the file that the following bash cell is meant to print.
filename = "test.txt"


In [None]:
%%bash -s "$filename"

# The Python variable `filename` does not exist inside the bash subprocess,
# so a bare $filename expands to nothing there. IPython expands it on the
# magic line instead, and `bash -s` hands it to the script as $1.
cat "$1"