Skip to content

Hadoop MapReduce

Hoa Nguyen edited this page Sep 1, 2016 · 27 revisions

#Requirements Hadoop installed with one NameNode and at least three DataNodes

#MapReduce Traditionally, most computations operated on a model where the data is brought from a external database to the machine that has the program on it. In addition to the parallelization that occurs from the MapReduce paradigm, Hadoop reverses the traditional model by bringing the program to the data.

For example, when a MapReduce job like grep or word count is performed, the program is taken from the NameNode (which is called the JobTracker in the MapReduce context) to the DataNodes (which are called the TaskTracker in the MapReduce context) where the relevant data blocks are located. Hadoop is relatively efficient about balancing the workload, but the details aren’t too important at this point.

Let’s dig a bit deeper into the various stages of a MapReduce job, by looking at the data engineering version of the “Hello World” program -- the word count. Even though it’s cliche, we’ll return to this example several times because it’s useful to see the same program repeated with different languages and tools. Additionally, it’s become a standard question for data engineering interviews.

Before we look at the code, let’s review the various stages of MapReduce to see how the data transforms at each stage. For this example, let’s assume that the following text:

So call a big meeting
Get everyone out out
Make every Who holler
Make every Who shout shout

is already copied into HDFS on a cluster with a NameNode and 12 DataNodes. For pedagogical reasons, let’s also assume that the data has been split into three blocks:

Block A Block B Block C
So call a big meeting Get everyone out out
Make every Who holler
Make every Who shout shout

with each block replicated three times and distributed throughout the DataNodes.

When the MapReduce job begins, the JobTracker brings the WordCount.java program to the DataNodes. Only three of the TaskTrackers will receive the program (one per block of data) but the remaining TaskTrackers will be ready to execute the program from the beginning if one of the original TaskTrackers fails in the middle of the job.

Once WordCount.java has been distributed, the following steps begin

##Map Phase

  1. Record Reader - This takes the files in the input directory and turns them into records. Specifically, the files are read and turned into key-value pairs where the key is the location of each record and the value is the record content. For example, the block that begins “Get everyone…” will be split into two key-value pairs (one per line by default). The first will be (0, Get everyone out out) since 0 bytes have been read from the block so far and the second will be (21, Make every Who holler) since the second line begins after byte 21.

  1. Mapper - This is the important “business logic” of the Map Phase where a user-defined Map function is applied to each record. Specifically, the Mapper takes the key-value pairs from the record reader, and applies a function to them to get an intermediate key-value pair. In the example, each line is parsed into words and converted into a pair with the key and value being the word and 1, respectively.

  1. Combiner - While the computing power and storage abilities of computers has increased exponentially over the years, network speeds only grow linearly. This means that the worst bottleneck in the MapReduce process comes from moving the various pieces of data within the cluster network. The combiner is an optional, but important step to solve this problem by combining the data before sending it. The combiner is like a local reducer for each machine and often uses the same code as the Reducer. In our example, any duplicate words on a given machine will be summed up, so (shout, 2) will be sent once instead of (shout, 1) being moved on the network twice. This may not seem worth it on such a small data set, but if you have multiple repeating words it can drastically improve performance.

  1. Partitioner - The partitioner is the hand-off between the Map and Reduce phases. Specifically the partitioner splits the intermediate key-value pairs into separate “shards” so they can be processed by the Reduce phase.

After the Map phase is fully completed, the three TaskTrackers that were handling the Map phase begin handling the Reduce phase. In order for the data to be correctly Reduced, any duplicate keys on the cluster must be Reduced on the same machine. For example, the (Make, 1) from the blue machine and the (Make, 1) from the orange machine must be put together on the same machine for the Reduce phase. At the same time, the most efficient way to process the data is to distribute the workload in a roughly even manner.

Both of these task are accomplished with a clever trick: the MD5 hash function is applied to each key to generate a 32 hexadecimal digit number, which is then modded by the number of active TaskTrackers. For example, the MD5 hash for the key ‘every’, which is

83ab982dd08483187289a75163dc50fe (or 175019836766693582402690402259205640446 in decimal)

becomes 1 when modded by 3 (for the 3 active TaskTrackers), so each key-value pairs with ‘every’ will be assigned to the TaskTracker marked as 1. Similarly, the modded hash for the key ‘Who’ is 0, so each ‘Who’ key-value pair will be assigned to the TaskTracker marked as 0. Since the MD5 algorithm generates an even distribution of hashes, the workload should be roughly even while still ensuring that the same keys are handled by the same machine. Finally, the partition writes the intermediate key-value pairs to HDFS before the reduce phase begins.

##Reduce Phase

  1. Shuffler - The Shuffler moves the intermediate key-value pairs to the machine where they will be reduced. Specifically, the pairs written to HDFS are moved to the actual TaskTracker that the Partitioner assigned them to. This is when the cluster’s network throughput peaks and can be a bottleneck in the MapReduce job.

2.Sort - The sort phase simply sorts the key-value pairs on each machine (alphabetically with upper-case preceding lower-case). This makes it easier for the Reducer to process since duplicate keys will be placed next to each other.

3.Reducer - This is the important “business logic” of the Reduce Phase where a user-defined Reduce function is applied to the key-value pairs. Specifically the reduce function groups, filters, or aggregates the key-value pairs. In our example, any duplicate words on a given machine will be summed up, so (Make, 1) and (Make, 1) will be reduced to just (Make, 2). Note that in this example, this is the exact same code as the Combiner, but now all of the same keys are guaranteed to be on the same machines from the Partition and Shuffle phases.

4.Output Format - Finally, the output format sorts all the key-value pairs from each machine and writes them to HDFS. While the Output Format can be customized, the default is to list all the keys on a newline with the values separated by a tab. Note that sorting all the keys is much simpler since the keys are already sorted on each machine.

We’re now ready to look at the WordCount MapReduce in Hadoop’s native language, Java. There are three components: the driver, the mapper, and the reducer; the rest will be taken care of by Hadoop.

Below is the standard MapReduce code for WordCount.java.

##Driver:

We’ll start with the simplest part, the driver, which is mostly generic, boiler-plate code that configures the various parts of the MapReduce job.

First is some standard Java and Hadoop libraries:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

Note the 2nd and 3rd to last lines are where the standard InputFormat (for the RecordReader) and OutputFormat are set. Next we’ll look at the beginning of the WordCount class:

public class WordCount {

  public static class TokenizerMapper 
       extends Mapper<Object, Text, Text, IntWritable>{
  }

  public static class IntSumReducer 
       extends Reducer<Text,IntWritable,Text,IntWritable> {
  }

These are place holders for the Mapper and Reducer functions, we’ll return to them later. The next part of the driver is beginning the main function, which will read in the command line arguments and give usage instructions.

 public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length < 2) {
      System.err.println("Usage: wordcount <in> [<in>...] <out>");
      System.exit(2);
 }

The rest of the driver creates a Job configuration where the jar, mapper, combiner, reducer, and output key-value classes are set.

 Job job = new Job(conf, "word count");
 job.setJarByClass(WordCount.class);
 job.setMapperClass(TokenizerMapper.class);
 job.setCombinerClass(IntSumReducer.class);
 job.setReducerClass(IntSumReducer.class);
 job.setOutputKeyClass(Text.class);
 job.setOutputValueClass(IntWritable.class);
 for (int i = 0; i < otherArgs.length - 1; ++i) {
   FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
 }
 FileOutputFormat.setOutputPath(job,
   new Path(otherArgs[otherArgs.length - 1]));
 System.exit(job.waitForCompletion(true) ? 0 : 1);
 }
}

Note that the Mapper is set to the TokenizerMapper class and both the Combiner and Reducer are set to the same IntSumReduce class. These are the names for the Mapper and Reducer classes that we’ll write below. This is also where we specify that the output key-value pairs will be a Text and IntWritable, corresponding to pairs like (Make, 1). The last few lines specify that the output will be written to the location specified at the end of the command line.

Note: Hadoop uses a special class known as Writables for some of its variables. The advantages of Writables are not important at this level, so it is fine to consider an IntWritable as the Hadoop version of an Int.

We’re now ready to look at the content of the Mapper class from above:

public static class TokenizerMapper 
   extends Mapper<Object, Text, Text, IntWritable>{
}

This class extends the more general Mapper class and has the following syntax:

Mapper<(input key class)(input value class)(output key class)(output value class)>

where the input is from the RecordReader and the output is from the Mapper (i.e. the intermediate key-value pairs). The input key is usually left as a generic Object, and the value is Text because it contains a line like “Get everyone out”. The output key is Text because it’s a word like “everyone” and the output value is an IntWritable, specifically 1.

The entire content of the Mapper class fits in a few lines:

private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
  
public void map(Object key, Text value, Context context
                ) throws IOException, InterruptedException {
  StringTokenizer itr = new StringTokenizer(value.toString());
  while (itr.hasMoreTokens()) {
    word.set(itr.nextToken());
    context.write(word, one);
  }
}

First, a static IntWritable with the value 1 is initialized (for the value of the pairs), then a word variable is created to hold each word. In the map function, each word is tokenized (i.e. segmented) until the all the words have been processed. The special context variable is used for writing to an actual file; the last line writes the intermediate key-value pair (word, one) to a file.

The last remaining part is the Reducer class:

public static class IntSumReducer 
   extends Reducer<Text,IntWritable,Text,IntWritable> {
}

which has a similar syntax as the Mapper except the first two are the classes for the intermediate key-value pair [e.g. (Get, 1)], and the last two are the classes for the output key-value pair to the OutputFormat [e.g. (Who, 2)]. The contents of the Reducer class are:

private IntWritable result = new IntWritable();

public void reduce(Text key, Iterable<IntWritable> values, 
                   Context context
                   ) throws IOException, InterruptedException {
  int sum = 0;
  for (IntWritable val : values) {
    sum += val.get();
  }
  result.set(sum);
  context.write(key, result);
}

It begins by initializing the result variable to hold the final word count for the various words. Then the reduce function simply sums up the values for each key and finishes by writing the corresponding (key, result) to a file with the context variable.

Note: The majority of the code is boiler-plate that repeats for most MapReduce programs; the “business logic” is contained in a couple lines. You can get around this by using higher-level technologies like Pig, Hive, and Scalding, but it’s nice to be familiar with a few  low-level examples. You can find many more examples that will help you learn Java MapReduce by looking at [MapReduce Design Patterns](http://shop.oreilly.com/product/0636920025122.do).

#Running the word count example

If you have not done so already, you should install and start Hadoop on your node, using Pegasus, a product created by Insight, or by hand using these instructions.

Next, create the Who.txt file:

namenode$ mkdir ~/hadoop-examples
namenode$ nano ~/hadoop-examples/Who.txt

and copy the following text into it:

So call a big meeting
Get everyone out out
Make every Who holler
Make every Who shout shout

Create an examples directory in HDFS and copy the Who.txt file into that directory

namenode$ hdfs dfs -mkdir /examples
namenode$ hdfs dfs -copyFromLocal ~/hadoop-examples/Who.txt /examples/Who.txt

You can run the word count example jar:

$namenode: hadoop jar $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-*.jar wordcount /examples/ /examples/word_count_example_output

You should see 93 bytes written at the end of the output. If so, view the resulting file with:

$namenode: hdfs dfs -cat /examples/word_count_example_output/part-r-00000

Should result in the following output:

#Job History Tracker for Troubleshooting

You can also see the JobHistory (assuming you started YARN and the JobHistory Server from the Hadoop Intro Dev) by going to port 8088 of your NameNode’s Public DNS on your browser

NameNode-Public-DNS:8088

This is really useful for troubleshooting jobs that don’t work.

#Alternatives to Hadoop MR Most data engineers never write low-level Java MapReduce code because, as you saw, there is so much boilerplate code to do simple task. Luckily there are many alternatives available, a few are listed below:

##Spark Spark is a general batch processing engine that can perform iterative computations at scale through the use of resilient distributed datasets.

##Pig Pig is a high-level language that resemble Python in many ways. It’s very useful for cleaning data and we’ll be learning more about it in the next Dev.

##Hive Hive is a high-level language that resembles SQL, albeit with a few less features in its Hive Query Language (HQL). It’s very useful for cleaning and processing structured, tabular data and we’ll be learning more about it in a future Dev.

##Hadoop Streaming (Piping) Hadoop Streaming (which is NOT real-time like Spark Streaming) is a tool that allows you to write the Map and Reduce functions in any language. The only constraint is that the input and output have to be simple text, much like piping in a Bash terminal.

##Cascalog and JCascalog Cascalog and JCascalog are higher-level libraries written in Clojure and Java respectively by Nathan Marz. They offer a simpler way to think about MapReduce jobs and are described in detail in Chapters 6 and 7 of Nathan Marz’s Big Data book.

##Cascading and Scalding Cascading is a high level tool for reducing boiler-plate code, written in Java. Scalding is an extension on top of Cascading that allows developers to use Scala rather than Java. You can learn more about them here.

Clone this wiki locally