In [1]:
%load_ext dockermagic

# MapReduce

## Java Map Reduce API - WordCount

In [2]:
%mkdir wc_java

mkdir: wc_java: File exists


In [3]:
%%writefile wc_java/WordCountDriver.java
// WordCountDriver.java
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordCountDriver extends Configured implements Tool {
  public int run(String[] args) throws Exception {
   if (args.length != 2) {
    System.out.printf("Usage: %s [generic options] <inputdir> <outputdir>\n", getClass().getSimpleName());
    return -1;
   }

   Job job = Job.getInstance(getConf(), "Word Count");
   job.setJarByClass(WordCountDriver.class);
   FileInputFormat.setInputPaths(job, new Path(args[0]));
   FileOutputFormat.setOutputPath(job, new Path(args[1]));
   job.setMapperClass(WordCountMapper.class);
   job.setReducerClass(WordCountReducer.class);
   job.setMapOutputKeyClass(Text.class);
   job.setMapOutputValueClass(IntWritable.class);
   job.setOutputKeyClass(Text.class);
   job.setOutputValueClass(IntWritable.class);
   boolean success = job.waitForCompletion(true);
   return success ? 0 : 1;
 }

 public static void main(String[] args) throws Exception {
   int exitCode = ToolRunner.run(new Configuration(), new WordCountDriver(), args);
   System.exit(exitCode);
 }
}

Overwriting wc_java/WordCountDriver.java


In [4]:
%%writefile wc_java/WordCountMapper.java
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>
{
  private final static IntWritable one = new IntWritable(1);
  private Text wordObject = new Text();

  @Override
  public void map(LongWritable key, Text value, Context context)
    throws IOException, InterruptedException {

   String line = value.toString();
   for (String word : line.split("\\W+")) {
    if (word.length() > 0) {
     wordObject.set(word);
     context.write(wordObject, one);
    }
   }
  }
}

Overwriting wc_java/WordCountMapper.java


In [5]:
%%writefile wc_java/WordCountReducer.java
// WordCountReducer.java
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>
{
  private IntWritable wordCountWritable = new IntWritable();
  @Override
   public void reduce(Text key, Iterable<IntWritable> values, Context context)
             throws IOException, InterruptedException {
         int wordCount = 0;
         for (IntWritable value : values) {
             wordCount += value.get();
         }
         wordCountWritable.set(wordCount);
         context.write(key, wordCountWritable);
    }
}

Overwriting wc_java/WordCountReducer.java


In [6]:
%%bash

docker cp wc_java hadoop:/opt
docker exec hadoop chown -R hadoop:hadoop /opt/wc_java

In [7]:
%%dockerexec -u hadoop hadoop
source /opt/envvars.sh

# compile source code and create jar file
cd /opt/wc_java
javac -classpath `hadoop classpath` *.java
jar cvf wc.jar *.class
ls

added manifest
adding: WordCountDriver.class(in = 2277) (out= 1104)(deflated 51%)
adding: WordCountMapper.class(in = 1921) (out= 813)(deflated 57%)
adding: WordCountReducer.class(in = 1711) (out= 719)(deflated 57%)
WordCountDriver.class
WordCountDriver.java
WordCountMapper.class
WordCountMapper.java
WordCountReducer.class
WordCountReducer.java
wc.jar


In [9]:
%%dockerexec -u hadoop hadoop
source /opt/envvars.sh

[ ! -d "/opt/datasets" ] && mkdir /opt/datasets
cd /opt/datasets
# download book "The Complete Works of William Shakespeare, by William Shakespeare" from Gutenberg Project
wget -q -c http://www.gutenberg.org/files/100/100-0.txt -O shakespeare.txt
    
# create directory in HDFS and put file
hdfs dfs -mkdir shakespeare
hdfs dfs -put shakespeare.txt shakespeare
hdfs dfs -ls -h shakespeare

mkdir: `shakespeare': File exists
2021-01-11 00:44:02,619 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
Found 1 items
-rw-r--r--   2 hadoop hadoop      5.5 M 2021-01-11 00:44 shakespeare/shakespeare.txt


In [10]:
%%dockerexec -u hadoop hadoop
source /opt/envvars.sh

# run wordcount using 2 reducers
cd /opt/wc_java
hadoop jar wc.jar WordCountDriver -D mapreduce.job.reduces=2 shakespeare shakespeare-output

2021-01-11 00:44:22,952 INFO client.RMProxy: Connecting to ResourceManager at hadoop/172.17.0.2:8032
2021-01-11 00:44:23,405 INFO client.AHSProxy: Connecting to Application History server at hadoop/172.17.0.2:10200
2021-01-11 00:44:23,901 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/hadoop/.staging/job_1610315082616_0002
2021-01-11 00:44:24,217 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2021-01-11 00:44:24,684 INFO input.FileInputFormat: Total input files to process : 1
2021-01-11 00:44:24,784 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2021-01-11 00:44:24,920 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2021-01-11 00:44:24,958 INFO mapreduce.JobSubmitter: number of splits:1
2021-01-11 00:44:25,265 INFO sasl.SaslDataTransferCl

In [11]:
%%dockerexec -u hadoop -w /opt/wc_java hadoop
source /opt/envvars.sh

# check output files
hdfs dfs -ls shakespeare-output

# get output from HDFS
hdfs dfs -getmerge shakespeare-output shakespeare-output.txt

# head shakespeare-output.txt
head shakespeare-output.txt

Found 3 items
-rw-r--r--   2 hadoop hadoop          0 2021-01-11 00:45 shakespeare-output/_SUCCESS
-rw-r--r--   2 hadoop hadoop     168219 2021-01-11 00:45 shakespeare-output/part-r-00000
-rw-r--r--   2 hadoop hadoop     166517 2021-01-11 00:45 shakespeare-output/part-r-00001
2021-01-11 00:45:29,373 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2021-01-11 00:45:29,625 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
1	117
10	3
100	6
1000	1
1004	1
102	1
1020	1
1024	1
1028	1
1033	1


## Hadoop Streaming

In [12]:
%mkdir wc_streaming

mkdir: wc_streaming: File exists


In [13]:
%%writefile wc_streaming/wordmapper.py
#!/usr/bin/env python3
# wordmapper.py

import sys
for line in sys.stdin:
  line = line.strip()
  words = line.split()
  for word in words:
   print('%s\t%s' % (word, 1))

Overwriting wc_streaming/wordmapper.py


In [14]:
%%writefile wc_streaming/wordreducer.py
#!/usr/bin/env python3
# wordreducer.py

import sys

thisword = None
wordcount = 0
word = None

for line in sys.stdin:
    line = line.strip()
    word, count = line.split('\t', 1)
    count = int(count)

    if thisword == word:
        wordcount += count
    else:
        if thisword:
            print('%s\t%d' % (thisword, wordcount))
        wordcount = count
        thisword = word

if thisword == word:
    print('%s\t%d' % (thisword, wordcount))

Overwriting wc_streaming/wordreducer.py


In [15]:
%%bash

docker cp wc_streaming hadoop:/opt
docker exec hadoop chown -R hadoop:hadoop /opt/wc_streaming

In [16]:
%%dockerexec -u hadoop hadoop
source /opt/envvars.sh

cd /opt/datasets

# download book "Ulysses, by James Joyce" from Gutenberg Project
wget -q -c http://www.gutenberg.org/files/4300/4300-0.txt -O ulysses.txt
    
# create directory in HDFS and put file
hdfs dfs -mkdir ulysses
hdfs dfs -put ulysses.txt ulysses
hdfs dfs -ls -h ulysses

2021-01-11 00:46:18,584 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
Found 1 items
-rw-r--r--   2 hadoop hadoop      1.5 M 2021-01-11 00:46 ulysses/ulysses.txt


In [19]:
%%dockerexec -u hadoop hadoop
source /opt/envvars.sh

cd /opt/wc_streaming

# local test
head -n 20 /opt/datasets/ulysses.txt | python3 ./wordmapper.py | sort | python3 ./wordreducer.py

#4300]	1
1,	1
2008	1
2019	1
27,	1
August	1
Author:	1
Character	1
Date:	1
December	1
EBook	1
English	1
Gutenberg	2
James	2
Joyce	2
Language:	1
Last	1
License	1
Project	2
Release	1
The	1
This	1
Title:	1
UTF-8	1
Ulysses	1
Ulysses,	1
Updated:	1
You	1
[EBook	1
almost	1
and	1
anyone	1
anywhere	1
at	2
away	1
by	1
copy	1
cost	1
eBook	2
encoding:	1
for	1
give	1
included	1
is	1
it	2
it,	1
may	1
no	2
of	3
online	1
or	2
re-use	1
restrictions	1
set	1
terms	1
the	3
this	1
under	1
use	1
whatsoever.	1
with	2
www.gutenberg.org	1
﻿	1


In [20]:
%%dockerexec -u hadoop hadoop
source /opt/envvars.sh

cd /opt/wc_streaming

# execute using Hadoop Streaming
hadoop jar \
$HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-3.2.1.jar \
-input ulysses \
-output ulysses-output \
-mapper wordmapper.py \
-reducer wordreducer.py \
-file wordmapper.py \
-file wordreducer.py

2021-01-11 00:47:07,621 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
packageJobJar: [wordmapper.py, wordreducer.py, /tmp/hadoop-unjar8766782340396153472/] [] /tmp/streamjob5137315082503567748.jar tmpDir=null
2021-01-11 00:47:10,556 INFO client.RMProxy: Connecting to ResourceManager at hadoop/172.17.0.2:8032
2021-01-11 00:47:10,941 INFO client.AHSProxy: Connecting to Application History server at hadoop/172.17.0.2:10200
2021-01-11 00:47:10,993 INFO client.RMProxy: Connecting to ResourceManager at hadoop/172.17.0.2:8032
2021-01-11 00:47:10,994 INFO client.AHSProxy: Connecting to Application History server at hadoop/172.17.0.2:10200
2021-01-11 00:47:11,635 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/hadoop/.staging/job_1610315082616_0003
2021-01-11 00:47:11,879 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2021-01-11 00

In [21]:
%%dockerexec -u hadoop -w /opt/wc_java hadoop
source /opt/envvars.sh

# check output files
hdfs dfs -ls ulysses-output

# get output from HDFS
hdfs dfs -getmerge ulysses-output ulysses-output.txt

# head output
head ulysses-output.txt

Found 2 items
-rw-r--r--   2 hadoop hadoop          0 2021-01-11 00:47 ulysses-output/_SUCCESS
-rw-r--r--   2 hadoop hadoop     530683 2021-01-11 00:47 ulysses-output/part-00000
2021-01-11 00:48:03,097 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
"Defects,"	1
"Information	1
"Plain	2
"Project	5
"Right	1
#4300]	1
$5,000)	1
%	4
&c,	2
&c.	1
