# MapReduce with bash

In this notebook we're going to use `bash` to write a mapper and a reducer to count words in a file. This example will serve to illustrate the main features of Hadoop's MapReduce framework.

# Table of contents
- [What is MapReduce?](#mapreduce)
- [The mapper](#mapper)
    - [Test the mapper](#testmapper)
- [Hadoop it up](#hadoop)
    - [What is Hadoop Streaming?](#hadoopstreaming)
    - [List your Hadoop directory](#hdfs_ls)
    - [Test MapReduce with a dummy reducer](#dummyreducer)
    - [Shuffling and sorting](#shuffling&sorting)
- [The reducer](#reducer)
    - [Test and run](#run)
- [Run a mapreduce job with more data](#moredata)
    - [Sort the output with `sort`](#sortoutput)
    - [Sort the output with another MapReduce job](#sortoutputMR)
    - [Configure sort with `KeyFieldBasedComparator`](#KeyFieldBasedComparator)
    - [Specifying Configuration Variables with the -D Option](#configuration_variables)
- [What is word count useful for?](#wordcount)


## What is MapReduce? <a name="mapreduce"></a>

MapReduce is a computing paradigm designed to allow parallel distributed processing of massive amounts of data.

Data is split across several compute nodes, where it is processed by one or more mappers. The results emitted by the mappers are first sorted and then passed to one or more reducers, which process and combine the data to return the final result.

![Map & Reduce](https://github.com/hueptk0711/Big-Data/blob/main/MapReduce%20Tutorials/mapreduce.png?raw=1)
With [Hadoop Streaming](https://hadoop.apache.org/docs/current/hadoop-streaming/HadoopStreaming.html) it is possible to use any programming language to define a mapper and/or a reducer. Here we're going to use the Unix `bash` scripting language ([here](https://www.gnu.org/software/bash/manual/html_node/index.html) is the official documentation for the language).
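
To make the three phases described above concrete before moving to `bash`, here is a minimal single-process sketch in Python. The function names (`map_phase`, `reduce_phase`, `word_count`) are made up for this illustration, and the in-memory `sort` only stands in for the sorting that a real cluster performs between the map and the reduce step.

```python
from itertools import groupby
from operator import itemgetter


def map_phase(lines):
    """Emit a (word, 1) pair for every whitespace-separated word."""
    for line in lines:
        for word in line.split():
            yield (word, 1)


def reduce_phase(sorted_pairs):
    """Sum the values of consecutive pairs that share the same key."""
    for word, group in groupby(sorted_pairs, key=itemgetter(0)):
        yield (word, sum(count for _, count in group))


def word_count(lines):
    """Run map, sort and reduce one after the other in a single process."""
    mapped = list(map_phase(lines))
    mapped.sort(key=itemgetter(0))  # stands in for the framework's sort step
    return list(reduce_phase(mapped))


for word, count in word_count(["apple orange", "banana apple peach"]):
    print(f"{word}\t{count}")
```

Nothing here is distributed; the point is only that the reducer can work in a single pass because all pairs with the same key arrive next to each other.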

## The mapper <a name="mapper"></a>
Let's write a mapper script called `map.sh`. The mapper splits each input line into words and for each word it outputs a line containing the word and `1` separated by a tab.

Example: for the input
<html>
<pre>
apple orange
banana apple peach
</pre>
</html>

`map.sh` outputs:
<html>
<pre>
apple   1
orange  1
banana  1
apple   1
peach   1
</pre>
</html>


The <a href="https://ipython.readthedocs.io/en/stable/interactive/magics.html">_cell magic_</a> [`%%writefile`](https://ipython.readthedocs.io/en/stable/interactive/magics.html#cellmagic-writefile) allows us to write the contents of the cell to a file.

In [1]:
%%writefile map.sh
#!/bin/bash

while read line
do
 for word in $line
 do
  if [ -n "$word" ]
  then
     echo -e ${word}"\t1"
  fi
 done
done

Writing map.sh


After running the cell above, you should have a new file `map.sh` in your current directory.
The file can be seen in the left panel of JupyterLab or by using a list command on the bash command-line.

**Note:** you can execute a single bash command in a Jupyter notebook cell by prepending an exclamation point to the command.

In [2]:
!ls -hl map.sh

-rw-r--r-- 1 root root 124 May  6 14:12 map.sh


### Test the mapper <a name="testmapper"></a>
We're going to test the mapper on the command line with a small text file `fruits.txt`, which we create first.
In this file `apple`, for instance, appears two times. This is what we want our MapReduce job to compute.

In [3]:
%%writefile fruits.txt
apple banana
peach orange peach peach
pineapple peach apple

Writing fruits.txt


In [4]:
!cat fruits.txt

apple banana
peach orange peach peach
pineapple peach apple


Test the mapper

In [5]:
!chmod +x map.sh

In [6]:
!cat fruits.txt|./map.sh

apple	1
banana	1
peach	1
orange	1
peach	1
peach	1
pineapple	1
peach	1
apple	1


If the script `map.sh` does not have the executable bit set, you need to set the correct permissions.

In [7]:
!chmod 700 map.sh

## Hadoop it up <a name="hadoop"></a>
Let us now run a MapReduce job with Hadoop Streaming.

In [8]:
HADOOP_URL = "https://dlcdn.apache.org/hadoop/common/stable/hadoop-3.4.0.tar.gz"

import requests
import os
import tarfile

def download_and_extract_targz(url):
    filename = url.rsplit('/', 1)[-1]
    # strip the ".tar.gz" suffix to get the name of the extracted folder
    HADOOP_HOME = filename[:-7]
    # set HADOOP_HOME environment variable
    os.environ['HADOOP_HOME'] = HADOOP_HOME
    # check for an existing folder before downloading the (large) archive
    if os.path.isdir(HADOOP_HOME):
        print("Not downloading, Hadoop folder {} already exists".format(HADOOP_HOME))
        return
    response = requests.get(url)
    if response.status_code == 200:
        with open(filename, 'wb') as file:
            file.write(response.content)
        with tarfile.open(filename, 'r:gz') as tar_ref:
            tar_ref.extractall(path='.')
            # Get the names of all members (files and directories) in the archive
            all_members = tar_ref.getnames()
            # If there is a top-level directory, get its name
            if all_members:
                top_level_directory = all_members[0]
                print(f"Archive downloaded and extracted successfully. Contents saved at: {top_level_directory}")
    else:
        print(f"Failed to download archive. Status code: {response.status_code}")


download_and_extract_targz(HADOOP_URL)

Archive downloaded and extracted successfully. Contents saved at: hadoop-3.4.0


In [9]:

# HADOOP_HOME was set earlier when downloading Hadoop distribution
print("HADOOP_HOME is {}".format(os.environ['HADOOP_HOME']))

os.environ['PATH'] = ':'.join([os.path.join(os.environ['HADOOP_HOME'], 'bin'), os.environ['PATH']])
print("PATH is {}".format(os.environ['PATH']))

HADOOP_HOME is hadoop-3.4.0
PATH is hadoop-3.4.0/bin:/opt/bin:/usr/local/nvidia/bin:/usr/local/cuda/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/tools/node/bin:/tools/google-cloud-sdk/bin


In [10]:
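import shutil
import subprocess

# set variable JAVA_HOME (install Java if necessary)
def is_java_installed():
    java_path = shutil.which("java")
    if java_path is None:
        # no java executable found on the PATH
        return None
    os.environ['JAVA_HOME'] = os.path.realpath(java_path).split('/bin')[0]
    return os.environ['JAVA_HOME']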

def install_java():
    # Uncomment and modify the desired version
    # java_version= 'openjdk-11-jre-headless'
    # java_version= 'default-jre'
    # java_version= 'openjdk-17-jre-headless'
    # java_version= 'openjdk-18-jre-headless'
    java_version= 'openjdk-19-jre-headless'

    print(f"Java not found. Installing {java_version} ... (this might take a while)")
    try:
        cmd = f"apt install -y {java_version}"
        subprocess_output = subprocess.run(cmd, shell=True, check=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
        stdout_result = subprocess_output.stdout
        # Process the results as needed
        print("Done installing Java {}".format(java_version))
        os.environ['JAVA_HOME'] = os.path.realpath(shutil.which("java")).split('/bin')[0]
        print("JAVA_HOME is {}".format(os.environ['JAVA_HOME']))
    except subprocess.CalledProcessError as e:
        # Handle the error if the command returns a non-zero exit code
        print("Command failed with return code {}".format(e.returncode))
        print("stdout: {}".format(e.stdout))

# Install Java if not available
if is_java_installed():
    print("Java is already installed: {}".format(os.environ['JAVA_HOME']))
else:
    print("Installing Java")
    install_java()

Java is already installed: /usr/lib/jvm/java-11-openjdk-amd64


### What is Hadoop Streaming? <a name="hadoopstreaming"></a>

Hadoop Streaming is a library included in the Hadoop distribution that enables you to develop MapReduce executables in languages other than Java.

The mapper and the reducer can be any executables that read their input from stdin and write their output to stdout. By default, input is read line by line and the prefix of a line up to the first tab character is the key; the rest of the line (excluding the tab character) is the value.

If there is no tab character in the line, the entire line is considered the key and the value is null. The default input format is specified in the class `TextInputFormat` (see the [API documentation](https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/mapred/TextInputFormat.html)), but this can be customized, for instance by defining another field separator (see the [Hadoop Streaming documentation](https://hadoop.apache.org/docs/stable/hadoop-streaming/HadoopStreaming.html#Customizing_How_Lines_are_Split_into_KeyValue_Pairs)).
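
The splitting rule can be sketched in a few lines of Python. The helper `split_key_value` is hypothetical (it is not part of Hadoop), and returning `None` is just this sketch's way of representing the "null" value mentioned above.

```python
def split_key_value(line, separator="\t"):
    """Split a line at the first separator into (key, value).

    Returns (line, None) when the separator does not occur.
    """
    line = line.rstrip("\n")
    key, found, value = line.partition(separator)
    if not found:
        # no separator: the whole line is the key
        return (key, None)
    return (key, value)


print(split_key_value("apple\t1"))       # key and value
print(split_key_value("a\tb\tc"))        # only the first tab splits
print(split_key_value("no tab here"))    # whole line is the key
```

Only the first tab matters: anything after it, including further tabs, belongs to the value.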

This is an example of MapReduce streaming invocation syntax:
<html>
<pre>
mapred streaming \
  -input myInputDirs \
  -output myOutputDir \
  -mapper /bin/cat \
  -reducer /usr/bin/wc
</pre>
</html>

You can find the full official documentation for Hadoop Streaming from Apache Hadoop here: [https://hadoop.apache.org/docs/stable/hadoop-streaming/HadoopStreaming.html](https://hadoop.apache.org/docs/stable/hadoop-streaming/HadoopStreaming.html).

All options for the Hadoop Streaming command are described here: [Streaming Command Options](https://hadoop.apache.org/docs/current/hadoop-streaming/HadoopStreaming.html#Streaming_Command_Options) and can be listed with the command:

In [11]:
!mapred streaming --help

Usage: $HADOOP_HOME/bin/hadoop jar hadoop-streaming.jar [options]
Options:
  -input          <path> DFS input file(s) for the Map step.
  -output         <path> DFS output directory for the Reduce step.
  -mapper         <cmd|JavaClassName> Optional. Command to be run as mapper.
  -combiner       <cmd|JavaClassName> Optional. Command to be run as combiner.
  -reducer        <cmd|JavaClassName> Optional. Command to be run as reducer.
  -file           <file> Optional. File/dir to be shipped in the Job jar file.
                  Deprecated. Use generic option "-files" instead.
  -inputformat    <TextInputFormat(default)|SequenceFileAsTextInputFormat|JavaClassName>
                  Optional. The input format class.
  -outputformat   <TextOutputFormat(default)|JavaClassName>
                  Optional. The output format class.
  -partitioner    <JavaClassName>  Optional. The partitioner class.
  -numReduceTasks <num> Optional. Number of reduce tasks.
  -inputreader    <spec> Optional. In

Now, in order to run a MapReduce job, we need to "upload" the input file to the Hadoop file system.

### List your Hadoop directory <a name="hdfs_ls"></a>

With the command `hdfs dfs -ls` you can view the content of your HDFS home directory.

With `hdfs dfs` you can run a filesystem command on the Hadoop filesystem. The complete list of commands can be found in the [System Shell Guide](https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/HDFSCommands.html#dfs).

In [12]:
!hdfs dfs -ls

Found 6 items
drwxr-xr-x   - root root       4096 2024-05-02 13:24 .config
-rw-r--r--   1 root root         60 2024-05-06 14:12 fruits.txt
drwxr-xr-x   - root root       4096 2024-03-04 08:05 hadoop-3.4.0
-rw-r--r--   1 root root  965537117 2024-05-06 14:13 hadoop-3.4.0.tar.gz
-rwx------   1 root root        124 2024-05-06 14:12 map.sh
drwxr-xr-x   - root root       4096 2024-05-02 13:25 sample_data


Now create a directory `wordcount` on the Hadoop filesystem. Its subdirectory `input` is created in the next step.

In [13]:
%%bash
hdfs dfs -mkdir -p wordcount

Copy the file `fruits.txt` to Hadoop in the folder `wordcount/input`.

Why do we need this step? Because the file `fruits.txt` needs to reside on the Hadoop filesystem in order to benefit from all the features of Hadoop (data partitioning, distributed processing, fault tolerance).

In [14]:
%%bash
hdfs dfs -rm -r wordcount/input 2>/dev/null
hdfs dfs -mkdir wordcount/input
hdfs dfs -put fruits.txt wordcount/input

Let's check if the file is there now.

**Note:** it is convenient to use the option `-h` for `ls` to show file sizes in human-readable form (showing sizes in Kilobytes, Megabytes, Gigabytes, etc.)

In [15]:
!hdfs dfs -ls -h -R wordcount/input

-rw-r--r--   1 root root         60 2024-05-06 14:14 wordcount/input/fruits.txt


### Test MapReduce with a dummy reducer <a name="dummyreducer"></a>

Let's try to run the mapper using a dummy reducer (`/bin/cat` does nothing but echo the data it receives).

**Warning:** mapreduce tends to produce a verbose output, so be ready to see a long output. What you should look for is a message of the kind <html><pre>"INFO mapreduce.Job: Job ... completed successfully"</pre></html>

**Note:** at the beginning of the next cell you'll see the command `hdfs dfs -rm -r wordcount/output 2>/dev/null`. This is needed because MapReduce refuses to write to an output directory that already exists, so running the job a second time would otherwise fail.

In [16]:
%%bash
hdfs dfs -rm -r wordcount/output 2>/dev/null
mapred streaming \
  -files map.sh \
  -input wordcount/input \
  -output wordcount/output \
  -mapper map.sh \
  -reducer /bin/cat

2024-05-06 14:14:11,505 INFO impl.MetricsConfig: Loaded properties from hadoop-metrics2.properties
2024-05-06 14:14:11,778 INFO impl.MetricsSystemImpl: Scheduled Metric snapshot period at 10 second(s).
2024-05-06 14:14:11,778 INFO impl.MetricsSystemImpl: JobTracker metrics system started
2024-05-06 14:14:11,806 WARN impl.MetricsSystemImpl: JobTracker metrics system already initialized!
2024-05-06 14:14:12,225 INFO mapred.FileInputFormat: Total input files to process : 1
2024-05-06 14:14:12,251 INFO mapreduce.JobSubmitter: number of splits:1
2024-05-06 14:14:12,744 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local1979356913_0001
2024-05-06 14:14:12,744 INFO mapreduce.JobSubmitter: Executing with tokens: []
2024-05-06 14:14:13,190 INFO mapred.LocalDistributedCacheManager: Localized file:/content/map.sh as file:/tmp/hadoop-root/mapred/local/job_local1979356913_0001_778a14ab-a892-46c1-ab14-d63f33e4bcb2/map.sh
2024-05-06 14:14:13,366 INFO mapreduce.Job: The url to track the 

The output of the mapreduce job is in the folder `wordcount/output` that we passed with the option `-output`. Let's check what's inside it.

In [17]:
!hdfs dfs -ls wordcount/output

Found 2 items
-rw-r--r--   1 root root          0 2024-05-06 14:14 wordcount/output/_SUCCESS
-rw-r--r--   1 root root         78 2024-05-06 14:14 wordcount/output/part-00000


If `output` contains a file named `_SUCCESS` that means that the mapreduce job completed successfully.

**Note:** when dealing with Big Data it's always advisable to pipe the output of `cat` commands to `head` (or `tail`).

In [18]:
!hdfs dfs -cat wordcount/output/part*|head

apple	1
apple	1
banana	1
orange	1
peach	1
peach	1
peach	1
peach	1
pineapple	1


As expected, we have obtained all the output from the mapper. Something worth noticing is that the data emitted by the mapper _**has been sorted**_. We haven't asked for that, but this step is performed automatically by the framework as soon as the number of reducers is $\gt 0$.

### Shuffling and sorting <a name="shuffling&sorting"></a>
The following picture illustrates the concept of shuffling and sorting that is automatically performed by Hadoop after each map before passing the output to reduce. In the picture the outputs of the two mapper tasks are shown. The arrows represent shuffling and sorting done before delivering the data to one reducer (rightmost box).
![Shuffle & sort](https://github.com/hueptk0711/Big-Data/blob/main/MapReduce%20Tutorials/shuffle_sort.png?raw=1)
The shuffling and sorting phase is often one of the most costly in a MapReduce job.
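
The idea can be mimicked in plain Python as a rough sketch. Everything here is an assumption made for illustration: the function names are invented, and `zlib.crc32` is merely a deterministic stand-in for whatever partitioning function the framework actually applies. What the sketch does show is the principle: each key is routed to exactly one reducer, and each reducer receives a merge of already sorted runs.

```python
import heapq
import zlib
from operator import itemgetter


def partition(key, num_reducers):
    """Pick a reducer for a key (CRC32 is an assumption of this sketch)."""
    return zlib.crc32(key.encode("utf-8")) % num_reducers


def shuffle_and_sort(mapper_outputs, num_reducers=1):
    """Return, for each reducer, its (key, value) pairs sorted by key."""
    reducer_inputs = []
    for reducer in range(num_reducers):
        sorted_runs = []
        for output in mapper_outputs:
            # each mapper sorts the share of its output meant for this reducer
            mine = [pair for pair in output
                    if partition(pair[0], num_reducers) == reducer]
            sorted_runs.append(sorted(mine, key=itemgetter(0)))
        # the reducer merges the sorted runs coming from all mappers
        merged = heapq.merge(*sorted_runs, key=itemgetter(0))
        reducer_inputs.append(list(merged))
    return reducer_inputs


mapper_1 = [("apple", 1), ("banana", 1), ("peach", 1)]
mapper_2 = [("peach", 1), ("apple", 1)]
print(shuffle_and_sort([mapper_1, mapper_2], num_reducers=1))
```

With more than one reducer, every reducer still sees each of its keys in one contiguous block, which is what allows the reducer to be written as a simple single-pass loop.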


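<b>Note:</b> the picture shows two map tasks. The property `mapreduce.job.maps` defaults to $2$, but it is only a hint: the actual number of map tasks is determined by the number of input splits. For our small file the job log reports `number of splits:1`, so a single map task was used.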

## The reducer <a name="reducer"></a>
Let's now write a reducer script called `reduce.sh`.

In [19]:
%%writefile reduce.sh
#!/bin/bash

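currkey=""
currcount=0
# input lines look like "key<TAB>value" and arrive sorted by key
while IFS=$'\t' read -r key val
do
  # quote both sides so that the key is compared literally, not as a pattern
  if [[ "$key" == "$currkey" ]]
  then
      currcount=$(( currcount + val ))
  else
    # a new key starts: emit the total of the previous key (if any)
    if [ -n "$currkey" ]
    then
      echo -e ${currkey} "\t" ${currcount}
    fi
    currkey=$key
    # start from the value on this line rather than assuming it is 1
    currcount=$val
  fi
done
# last one (nothing to emit if the input was empty)
if [ -n "$currkey" ]
then
  echo -e ${currkey} "\t" ${currcount}
fi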

Writing reduce.sh
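
For readers who find the control flow easier to follow outside of `bash`, the same single-pass logic can be sketched in Python. The function `streaming_reduce` is a hypothetical helper written for this comparison and is not used by the jobs in this notebook.

```python
def streaming_reduce(lines):
    """Sum the counts of consecutive 'key<TAB>count' lines with equal keys."""
    results = []
    current_key, current_count = None, 0
    for line in lines:
        key, _, value = line.rstrip("\n").partition("\t")
        if key == current_key:
            current_count += int(value)
        else:
            # a new key starts: emit the total of the previous one
            if current_key is not None:
                results.append((current_key, current_count))
            current_key, current_count = key, int(value)
    if current_key is not None:
        # emit the last key
        results.append((current_key, current_count))
    return results


print(streaming_reduce(["apple\t1", "apple\t1", "banana\t1"]))
print(streaming_reduce(["apple\t1", "banana\t1", "apple\t1"]))
```

The second call shows why the input has to be sorted: when equal keys are not adjacent, the same key is emitted more than once with partial counts.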


Set permission for the reducer script.

In [20]:
!chmod 700 reduce.sh

### Test and run <a name="run"></a>

Test map and reduce on the shell

In [21]:
!cat fruits.txt|./map.sh|sort|./reduce.sh

apple 	 2
banana 	 1
orange 	 1
peach 	 4
pineapple 	 1


Once we've made sure that the reducer script runs correctly on the shell, we can run it on the cluster.

In [22]:
%%bash
hdfs dfs -rm -r wordcount/output 2>/dev/null
mapred streaming \
  -file map.sh \
  -file reduce.sh \
  -input wordcount/input \
  -output wordcount/output \
  -mapper map.sh \
  -reducer reduce.sh

Deleted wordcount/output
packageJobJar: [map.sh, reduce.sh] [] /tmp/streamjob1788553120462534689.jar tmpDir=null


2024-05-06 14:14:23,741 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
2024-05-06 14:14:24,726 INFO impl.MetricsConfig: Loaded properties from hadoop-metrics2.properties
2024-05-06 14:14:24,943 INFO impl.MetricsSystemImpl: Scheduled Metric snapshot period at 10 second(s).
2024-05-06 14:14:24,943 INFO impl.MetricsSystemImpl: JobTracker metrics system started
2024-05-06 14:14:24,974 WARN impl.MetricsSystemImpl: JobTracker metrics system already initialized!
2024-05-06 14:14:25,253 INFO mapred.FileInputFormat: Total input files to process : 1
2024-05-06 14:14:25,283 INFO mapreduce.JobSubmitter: number of splits:1
2024-05-06 14:14:25,654 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local66711868_0001
2024-05-06 14:14:25,654 INFO mapreduce.JobSubmitter: Executing with tokens: []
2024-05-06 14:14:26,143 INFO mapred.LocalDistributedCacheManager: Localized file:/content/map.sh as file:/tmp/hadoop-root/mapred/local/job_local6671186

Let's check the output on the HDFS filesystem

In [23]:
!hdfs dfs -cat wordcount/output/part*|head

apple 	 2
banana 	 1
orange 	 1
peach 	 4
pineapple 	 1


## Run a mapreduce job with more data <a name="moredata"></a>

Let's create a datafile by downloading some real data, for instance from a Web page. This example will be used to introduce some advanced configurations.

Next, we download a URL with `wget` and filter out HTML tags with a `sed` regular expression.

In [24]:
%%bash
URL=https://www.derstandard.at/story/2000110819049/und-wo-warst-du-beim-fall-der-mauer
wget -qO- $URL | sed -e 's/<[^>]*>//g;s/^ //g' >sample_article.txt

In [25]:
!cat sample_article.txt|./map.sh|head

	1
	1
	1
	1
window.DERSTANDARD.pageConfig.init({"edition":"at","environment":"Production","baseUrls":{"currentDocument":"https://www.derstandard.at","authorization":"https://apps.derstandard.at/autorisierung","userprofile":"https://apps.derstandard.at/userprofil","staticfiles":"https://at.staticfiles.at"},"settings":{"disableNotifications":false}})	1
	1
	1
	1
Und	1
wo	1


As usual, with real data there's some more work to do. Here we see that the mapper script emits a count for lines that look empty because they contain only whitespace. Let's modify it so that whitespace-only lines are skipped.

In [26]:
%%writefile map.sh
#!/bin/bash

while read line
do
 # skip lines that contain only whitespace
 if [[ "$line" =~ [^[:space:]] ]]
 then
  for word in $line
  do
   if [ -n "$word" ]
   then
    echo -e "${word}\t1"
   fi
  done
 fi
done

Overwriting map.sh


In [27]:
!cat sample_article.txt|./map.sh|head

window.DERSTANDARD.pageConfig.init({"edition":"at","environment":"Production","baseUrls":{"currentDocument":"https://www.derstandard.at","authorization":"https://apps.derstandard.at/autorisierung","userprofile":"https://apps.derstandard.at/userprofil","staticfiles":"https://at.staticfiles.at"},"settings":{"disableNotifications":false}})	1
Und	1
wo	1
warst	1
du	1
beim	1
Fall	1
der	1
Mauer?	1
-	1


Now the output of `map.sh` looks better!

<b>Note:</b> when working with real data we need in general some more preprocessing in order to remove control characters or invalid unicode.

Time to run MapReduce again with the new data, but first we need to "put" the data on HDFS.

In [28]:
%%bash
hdfs dfs -rm -r wordcount/input 2>/dev/null
hdfs dfs -put sample_article.txt wordcount/input

Deleted wordcount/input


In [29]:
# wordcount/input was removed above and not recreated, so -put stored sample_article.txt as a single file named wordcount/input
!hdfs dfs -ls -h wordcount/input

-rw-r--r--   1 root root     29.0 K 2024-05-06 14:14 wordcount/input


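Check the reducer. Note that this pipeline has no `sort` step between mapper and reducer, so identical words are not adjacent and are not merged; it only verifies that both scripts run on the new data.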

In [30]:
!cat sample_article.txt|./map.sh|./reduce.sh|head

window.DERSTANDARD.pageConfig.init({"edition":"at","environment":"Production","baseUrls":{"currentDocument":"https://www.derstandard.at","authorization":"https://apps.derstandard.at/autorisierung","userprofile":"https://apps.derstandard.at/userprofil","staticfiles":"https://at.staticfiles.at"},"settings":{"disableNotifications":false}}) 	 1
Und 	 1
wo 	 1
warst 	 1
du 	 1
beim 	 1
Fall 	 1
der 	 1
Mauer? 	 1
- 	 1


In [31]:
%%bash
hdfs dfs -rm -r wordcount/output 2>/dev/null
mapred streaming \
  -file map.sh \
  -file reduce.sh \
  -input wordcount/input \
  -output wordcount/output \
  -mapper map.sh \
  -reducer reduce.sh

Deleted wordcount/output
packageJobJar: [map.sh, reduce.sh] [] /tmp/streamjob5471666965070249423.jar tmpDir=null


2024-05-06 14:14:41,174 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
2024-05-06 14:14:42,425 INFO impl.MetricsConfig: Loaded properties from hadoop-metrics2.properties
2024-05-06 14:14:42,831 INFO impl.MetricsSystemImpl: Scheduled Metric snapshot period at 10 second(s).
2024-05-06 14:14:42,834 INFO impl.MetricsSystemImpl: JobTracker metrics system started
2024-05-06 14:14:42,896 WARN impl.MetricsSystemImpl: JobTracker metrics system already initialized!
2024-05-06 14:14:43,364 INFO mapred.FileInputFormat: Total input files to process : 1
2024-05-06 14:14:43,422 INFO mapreduce.JobSubmitter: number of splits:1
2024-05-06 14:14:44,041 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local42192596_0001
2024-05-06 14:14:44,041 INFO mapreduce.JobSubmitter: Executing with tokens: []
2024-05-06 14:14:44,757 INFO mapred.LocalDistributedCacheManager: Localized file:/content/map.sh as file:/tmp/hadoop-root/mapred/local/job_local4219259

Check the output on HDFS

In [32]:
!hdfs dfs -ls wordcount/output

Found 2 items
-rw-r--r--   1 root root          0 2024-05-06 14:14 wordcount/output/_SUCCESS
-rw-r--r--   1 root root      29352 2024-05-06 14:14 wordcount/output/part-00000


This job took a few seconds, which is quite some time for such a small file (29 KB). This is due to the overhead of distributing the data and running the Hadoop framework.
The advantage of Hadoop can be appreciated only for large datasets.

In [33]:
!hdfs dfs -cat wordcount/output/part-00000|head

!!n.frames[t]; 	 1
!0)); 	 1
!1)) 	 1
!1, 	 1
!= 	 1
!== 	 2
!function 	 2
!r 	 1
"'+n+'"',o)}return{key:r,value:e.substr(t+1)}},t._renewCache=function(){t._cache=t._getCacheFromString(t._document.cookie),t._cachedDocumentCookie=t._document.cookie},t._areEnabled=function(){var 	 1
"))}function 	 1
cat: Unable to write to output stream.


### Sort the output with `sort` <a name="sortoutput"></a>

We've obtained a list of tokens that appear in the file followed by their frequencies.

The output of the reducer is sorted by key (the word) because that's the order in which the reducer receives the data from the mapper. If we're interested in sorting the data by frequency, we can use the Unix `sort` command with the option `-k2nr` (`k2`, `n`, `r` stand respectively for "by field 2", "numeric", "reverse").

In [34]:
!hdfs dfs -cat wordcount/output/part-00000|sort -k2nr|head

= 	 40
{ 	 22
var 	 22
&& 	 19
strict";function 	 13
} 	 12
in 	 12
not 	 12
to 	 10
e&&e.__esModule?e:{"default":e}}function 	 9


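In this run the most common tokens are fragments of JavaScript such as `=`, `{` and `var`, because the `sed` filter removes only the HTML tags and not the contents of script elements. With a cleaner text extraction we would expect frequent German words such as "die" (the German for the definite article "the") to rise to the top.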
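
The effect of `sort -k2nr` followed by `head` can also be sketched in Python. The helper name `top_words` is invented for this example, and it assumes that every line has the form `word<TAB>count`.

```python
import heapq


def top_words(lines, n=10):
    """Return the n 'word<TAB>count' lines with the largest counts."""
    def count_of(line):
        # the count is the second tab-separated field
        return int(line.split("\t")[1])
    return heapq.nlargest(n, lines, key=count_of)


sample = ["apple\t2", "banana\t1", "orange\t1", "peach\t4", "pineapple\t1"]
for line in top_words(sample, n=3):
    print(line)
```

One difference worth keeping in mind: lines with equal counts keep their input order here, whereas Unix `sort` applies its own tie-breaking, so the order of ties may differ between the two.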

### Sort the output with another MapReduce job <a name="sortoutputMR"></a>

If we wanted to sort the output of the reducer using the mapreduce framework, we could employ a simple trick: create a mapper that interchanges words with their frequency values. Since the framework sorts the mapper output by key, we get the desired sorting as a side effect.

Call the new mapper `swap_keyval.sh`.

In [35]:
%%writefile swap_keyval.sh
#!/bin/bash
# This script will read one line at a time and swap key/value
# For instance, the line "word 100" will become "100 word"

while read key val
do
 printf "%s\t%s\n" "$val" "$key"
done

Writing swap_keyval.sh


We are going to run the swap mapper script on the output of the previous mapreduce job. Note that in the cell below we are not deleting the previous output; instead, we're saving the output of the current job in a new folder `output2`.

A nice thing about running a job on the output of a preceding job is that we do not need to upload files to HDFS because the data is already on HDFS. Not so nice: writing data to disk at each step of a data transformation pipeline takes time and this can be costly for longer data pipelines. This is one of the shortcomings of MapReduce that are addressed by [Apache Spark](https://spark.apache.org/).

In [36]:
%%bash
hdfs dfs -rm -r wordcount/output2 2>/dev/null
mapred streaming \
  -file swap_keyval.sh \
  -input wordcount/output \
  -output wordcount/output2 \
  -mapper swap_keyval.sh

packageJobJar: [swap_keyval.sh] [] /tmp/streamjob7483636840258024631.jar tmpDir=null


2024-05-06 14:14:58,204 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
2024-05-06 14:14:59,131 INFO impl.MetricsConfig: Loaded properties from hadoop-metrics2.properties
2024-05-06 14:14:59,335 INFO impl.MetricsSystemImpl: Scheduled Metric snapshot period at 10 second(s).
2024-05-06 14:14:59,335 INFO impl.MetricsSystemImpl: JobTracker metrics system started
2024-05-06 14:14:59,365 WARN impl.MetricsSystemImpl: JobTracker metrics system already initialized!
2024-05-06 14:14:59,692 INFO mapred.FileInputFormat: Total input files to process : 1
2024-05-06 14:14:59,726 INFO mapreduce.JobSubmitter: number of splits:1
2024-05-06 14:15:00,108 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local1256038990_0001
2024-05-06 14:15:00,108 INFO mapreduce.JobSubmitter: Executing with tokens: []
2024-05-06 14:15:00,618 INFO mapred.LocalDistributedCacheManager: Localized file:/content/swap_keyval.sh as file:/tmp/hadoop-root/mapred/local/job_lo

Check the output on HDFS

In [37]:
!hdfs dfs -ls wordcount/output2

Found 2 items
-rw-r--r--   1 root root          0 2024-05-06 14:15 wordcount/output2/_SUCCESS
-rw-r--r--   1 root root      27860 2024-05-06 14:15 wordcount/output2/part-00000


In [38]:
!hdfs dfs -cat wordcount/output2/part-00000|head

1	!!n.frames[t];
1	überraschen.
1	über
1	©
1	},
1	}();
1	}(),
1	{};
1	y(){E["default"].debug("User
1	y(),j(),void(ne=D());case
cat: Unable to write to output stream.


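By default keys are sorted in ascending order and compared as text rather than as numbers, so the largest counts do not necessarily end up at the end of the file. We could have changed that with an option, but for now let's look at the end of the file.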

In [39]:
!hdfs dfs -cat wordcount/output2/part-00000|tail

7	==
7	:
7	die
7	?
7	if
7	typeof
8	0
8	n(e){return
9	e&&e.__esModule?e:{"default":e}}function
9	r


### Configure sort with `KeyFieldBasedComparator` <a name="KeyFieldBasedComparator"></a>

In general, we can control how the mapper output is sorted by setting the comparator class to the special class [`KeyFieldBasedComparator`](https://hadoop.apache.org/docs/current/api/org/apache/hadoop/mapreduce/lib/partition/KeyFieldBasedComparator.html):
<html><pre>-D mapreduce.job.output.key.comparator.class=\
    org.apache.hadoop.mapred.lib.KeyFieldBasedComparator</pre></html>
    
This class has some options similar to the Unix `sort` (`-n` to sort numerically, `-r` for reverse sorting, `-k pos1[,pos2]` for specifying fields to sort by).
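
The difference between the default ordering and the `-nr` ordering is easy to reproduce in Python, where strings are also compared character by character. This is only an analogy (the helper names are made up and no Hadoop class is involved), but it explains why the plain job above listed `9` last although the largest count is `40`.

```python
def text_order(keys):
    """Ascending order with keys compared as text (like the default sort)."""
    return sorted(keys)


def numeric_reverse_order(keys):
    """Descending order with keys compared as numbers (like the options -nr)."""
    return sorted(keys, key=int, reverse=True)


counts = ["7", "40", "9", "12", "8"]
print(text_order(counts))
print(numeric_reverse_order(counts))
```

As text, `"40"` sorts before `"7"` because the character `4` comes before `7`; only a numeric comparison puts `40` where we expect it.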

Let us see the comparator in action on our data to get the desired result. Note that this time we are removing `output2` because we're running the second mapreduce job again.

In [40]:
%%bash
hdfs dfs -rm -r wordcount/output2 2>/dev/null
comparator_class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator
mapred streaming \
  -D mapreduce.job.output.key.comparator.class=$comparator_class \
  -D mapreduce.partition.keycomparator.options=-nr \
  -file swap_keyval.sh \
  -input wordcount/output \
  -output wordcount/output2 \
  -mapper swap_keyval.sh

Deleted wordcount/output2
packageJobJar: [swap_keyval.sh] [] /tmp/streamjob12970272091197247414.jar tmpDir=null


2024-05-06 14:15:12,970 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
2024-05-06 14:15:13,839 INFO impl.MetricsConfig: Loaded properties from hadoop-metrics2.properties
2024-05-06 14:15:14,033 INFO impl.MetricsSystemImpl: Scheduled Metric snapshot period at 10 second(s).
2024-05-06 14:15:14,033 INFO impl.MetricsSystemImpl: JobTracker metrics system started
2024-05-06 14:15:14,075 WARN impl.MetricsSystemImpl: JobTracker metrics system already initialized!
2024-05-06 14:15:14,306 INFO mapred.FileInputFormat: Total input files to process : 1
2024-05-06 14:15:14,340 INFO mapreduce.JobSubmitter: number of splits:1
2024-05-06 14:15:14,705 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local1260432321_0001
2024-05-06 14:15:14,705 INFO mapreduce.JobSubmitter: Executing with tokens: []
2024-05-06 14:15:15,144 INFO mapred.LocalDistributedCacheManager: Localized file:/content/swap_keyval.sh as file:/tmp/hadoop-root/mapred/local/job_lo

In [41]:
!hdfs dfs -ls wordcount/output2

Found 2 items
-rw-r--r--   1 root root          0 2024-05-06 14:15 wordcount/output2/_SUCCESS
-rw-r--r--   1 root root      27860 2024-05-06 14:15 wordcount/output2/part-00000


In [42]:
!hdfs dfs -cat wordcount/output2/part-00000|head

40	=
22	{
22	var
19	&&
13	strict";function
12	not
12	}
12	in
10	to
9	e&&e.__esModule?e:{"default":e}}function
cat: Unable to write to output stream.


Now we get the output in the desired order.

### Specifying Configuration Variables with the -D Option <a name="configuration_variables"></a>

With the `-D` option it is possible to override options set in the default configuration file [`mapred-default.xml`](https://hadoop.apache.org/docs/current/hadoop-mapreduce-client/hadoop-mapreduce-client-core/mapred-default.xml)
(see the [Apache Hadoop documentation](https://hadoop.apache.org/docs/current/hadoop-streaming/HadoopStreaming.html#Specifying_Configuration_Variables_with_the_-D_Option)).

One option that might come in handy when dealing with out-of-memory issues in the sorting phase is `mapreduce.task.io.sort.mb`, the size in MB of the memory reserved for sorting:
 <html>
    <pre>-D mapreduce.task.io.sort.mb=512
    </pre>
 </html>

 **Note:** the maximum value for `mapreduce.task.io.sort.mb` is 2047.   

## What is word count useful for? <a name="wordcount"></a>
Counting the frequencies of words is at the basis of _indexing_ and it facilitates the retrieval of relevant documents in search engines.
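
As a rough illustration of that connection, the sketch below builds a tiny inverted index from per-document word counts and uses it to rank documents for a one-word query. The document ids, the function names and the ranking by raw count are all assumptions chosen to keep the example short; real search engines use more elaborate scoring.

```python
from collections import Counter, defaultdict


def build_index(documents):
    """Map each word to {document id: number of occurrences}."""
    index = defaultdict(dict)
    for doc_id, text in documents.items():
        # word count per document, as computed by our MapReduce job
        for word, count in Counter(text.split()).items():
            index[word][doc_id] = count
    return index


def search(index, word):
    """Return the ids of documents containing the word, most occurrences first."""
    postings = index.get(word, {})
    return sorted(postings, key=lambda doc_id: (-postings[doc_id], doc_id))


docs = {
    "d1": "apple banana apple",
    "d2": "apple peach",
    "d3": "peach peach",
}
index = build_index(docs)
print(search(index, "apple"))
print(search(index, "peach"))
```

The per-document counts are exactly what a word count job produces, which is why word count is often the first step of an indexing pipeline.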