Home

Patrick Jaromin edited this page May 3, 2015 · 35 revisions

Mara Annotations for MapReduce – A Design for Faster MapReduce Development


What is Mara?

Mara is a design and implementation for developing Hadoop MapReduce jobs in Java. Mara grew out of the need to simplify and standardize the MapReduce development process in use at Conversant.

What Mara is not

It is not an abstraction of MapReduce. Mara uses all of the standard low-level MapReduce components. It doesn't attempt to hide MapReduce from the developer or to modify the paradigm. Its aim is to make developing Java-based MapReduce jobs simpler and less tedious without abstracting away the power or requiring you to think in a different way.

Key Features

> Simplified configuration of jobs

Mara's primary goal was to remove much of the boilerplate involved in configuring a MapReduce job, while retaining the core concepts and framework components. It eliminates the long chains of job.setX(...) calls typical of standard Hadoop/Java MapReduce and replaces them with a combination of explicit Java annotations, reflection, and convention over configuration.

> Automatic detection and configuration of key-value types

Another annoyance of configuring MapReduce is the need to explicitly tell Hadoop about your key/value inputs and outputs, and to keep these in sync with the type parameters applied to the component classes and unit tests. In the majority of cases, Mara detects and configures these for you automatically, including for map-only jobs. If you change the types on your component class, the job configuration follows the change without further edits.

Although the framework provides annotations for specifying the key-value types input to and output from each stage of a job (via the @KeyValue annotation), in the majority of cases manual configuration isn't necessary. The container uses the generic type parameters applied to your component classes to configure the key-value types appropriately.
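To illustrate how generic type parameters can drive this configuration, here is a minimal sketch using only the JDK's reflection API. It assumes a hypothetical base class, StageSketch, standing in for Hadoop's four-parameter Mapper; it is not Mara's actual implementation.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for Hadoop's Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.
abstract class StageSketch<KIN, VIN, KOUT, VOUT> { }

// A component whose generic type arguments declare its key/value types.
class TokenCounter extends StageSketch<Long, String, String, Integer> { }

class KeyValueTypeDetector {
    /** Returns the type arguments a class binds on its direct generic superclass. */
    static List<Type> typeArguments(Class<?> component) {
        Type superType = component.getGenericSuperclass();
        if (!(superType instanceof ParameterizedType)) {
            throw new IllegalArgumentException(component + " does not bind type parameters");
        }
        return Arrays.asList(((ParameterizedType) superType).getActualTypeArguments());
    }

    /** The output key type is the third type argument. */
    static Class<?> outputKeyClass(Class<?> component) {
        return (Class<?>) typeArguments(component).get(2);
    }

    /** The output value type is the fourth type argument. */
    static Class<?> outputValueClass(Class<?> component) {
        return (Class<?>) typeArguments(component).get(3);
    }

    public static void main(String[] args) {
        System.out.println(outputKeyClass(TokenCounter.class).getSimpleName());   // String
        System.out.println(outputValueClass(TokenCounter.class).getSimpleName()); // Integer
    }
}
```

A container could pass the detected classes to calls such as job.setMapOutputKeyClass(...). This sketch only inspects the direct superclass; a component that inherits its type arguments through an intermediate generic class would need its type variables resolved along the hierarchy.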

> Easily share resources among components

The distributed cache is a very useful mechanism for MapReduce. However, it is also rather tedious to use and requires appropriate resource management. Mara introduces the @Distribute and @Resource annotations for 'sending' a serializable object or primitive to the MapReduce component where it's needed, without developer concern for the mechanics. This abstraction also makes unit tests easier to write, because the distribution mechanism can be replaced with one appropriate for a test environment.
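The sketch below shows one way the @Distribute/@Resource idea can work, under stated assumptions: the annotations are local stand-ins (not Mara's real types), values are moved with plain Java serialization, and fields are matched by name. The hypothetical ResourceChannel interface is the seam a test can fill with an in-memory implementation instead of the distributed cache.

```java
import java.io.*;
import java.lang.annotation.*;
import java.lang.reflect.Field;
import java.util.*;

// Hypothetical local stand-ins; Mara's real @Distribute/@Resource types may differ.
@Retention(RetentionPolicy.RUNTIME) @Target(ElementType.FIELD) @interface Distribute { }
@Retention(RetentionPolicy.RUNTIME) @Target(ElementType.FIELD) @interface Resource { }

/** Pluggable transport: distributed-cache backed in production, in-memory in tests. */
interface ResourceChannel {
    void put(String name, byte[] bytes);
    byte[] get(String name);
}

class InMemoryChannel implements ResourceChannel {
    private final Map<String, byte[]> store = new HashMap<>();
    public void put(String name, byte[] bytes) { store.put(name, bytes); }
    public byte[] get(String name) { return store.get(name); }
}

class ResourceSharing {
    /** Driver side: serialize every @Distribute field under its field name. */
    static void distribute(Object driver, ResourceChannel channel) throws Exception {
        for (Field f : driver.getClass().getDeclaredFields()) {
            if (!f.isAnnotationPresent(Distribute.class)) continue;
            f.setAccessible(true);
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(buf)) {
                out.writeObject(f.get(driver)); // primitives are boxed, then serialized
            }
            channel.put(f.getName(), buf.toByteArray());
        }
    }

    /** Component side: deserialize into every @Resource field with a matching name. */
    static void inject(Object component, ResourceChannel channel) throws Exception {
        for (Field f : component.getClass().getDeclaredFields()) {
            if (!f.isAnnotationPresent(Resource.class)) continue;
            byte[] bytes = channel.get(f.getName());
            if (bytes == null) continue; // nothing was distributed under this name
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
                f.setAccessible(true);
                f.set(component, in.readObject());
            }
        }
    }
}

class StopWordDriver {
    @Distribute private HashSet<String> stopWords = new HashSet<>(Arrays.asList("a", "the"));
}

class StopWordMapper {
    @Resource private HashSet<String> stopWords; // populated by inject(), not a constructor
    boolean isStopWord(String w) { return stopWords.contains(w); }
}
```

In a production version the channel would write the bytes to the distributed cache on the driver side and read them back during the component's setup; only the in-memory channel a test would use is shown here.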

> Easily test annotated jobs using MRUnit

The mara-test package includes base test cases that leverage your annotated driver. This makes it possible to test your driver along with your job, and far easier to keep your unit tests in sync with changes to the job. Furthermore, as mentioned above, the @Distribute and @Resource abstractions are supported, so you can exercise resource distribution without requiring that the test run on a Hadoop client with a functioning distributed cache.

> Command-line container for driver execution

The default com.conversantmedia.mapreduce.tool.RunJob container that ships with Conversant Mara bootstraps annotated driver classes and provides information on the available drivers, including id, version, description, and implementation class.
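At its core, a container of this kind reads a class-level annotation and indexes drivers by id. The following is a minimal sketch; the @Driver declared here is a local stand-in whose version and description attributes are assumptions, and candidate classes are passed in explicitly rather than discovered by a classpath scan as a real container would do.

```java
import java.lang.annotation.*;
import java.util.*;

// Hypothetical local stand-in for Mara's @Driver; the extra attributes are assumptions.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface Driver {
    String value();                   // driver id used on the command line
    String version() default "1.0";
    String description() default "";
}

@Driver(value = "annotated-wordcount", description = "Counts words")
class AnnotatedWordCountDriver { }

@Driver(value = "top-n", version = "2.1")
class TopNDriver { }

class DriverRegistry {
    private final Map<String, Class<?>> drivers = new TreeMap<>(); // sorted by id

    /** Registers each candidate that carries @Driver and ignores the rest. */
    DriverRegistry(Class<?>... candidates) {
        for (Class<?> c : candidates) {
            Driver d = c.getAnnotation(Driver.class);
            if (d != null) drivers.put(d.value(), c);
        }
    }

    /** Looks up a driver by id and instantiates it, as a container would before running it. */
    Object bootstrap(String id) throws ReflectiveOperationException {
        Class<?> c = drivers.get(id);
        if (c == null) throw new IllegalArgumentException("Unknown driver: " + id);
        return c.getDeclaredConstructor().newInstance();
    }

    /** One tab-separated line per driver: id, version, description, implementation class. */
    List<String> listing() {
        List<String> lines = new ArrayList<>();
        for (Map.Entry<String, Class<?>> e : drivers.entrySet()) {
            Driver d = e.getValue().getAnnotation(Driver.class);
            lines.add(String.join("\t", d.value(), d.version(), d.description(), e.getValue().getName()));
        }
        return lines;
    }
}
```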

> Command-line argument configuration and type conversion

Mara provides the @Option annotation to configure command-line options, validate required arguments, and provide a default help message specific to your driver. Automatic type conversion eliminates the need to manually convert string values into the types required by the job.
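As a rough illustration of annotation-driven option binding with type conversion, the sketch below maps --fieldName value pairs onto annotated fields. The @Option declared here, its required and description attributes, and the --name value syntax are all assumptions for illustration, not Mara's actual option handling.

```java
import java.lang.annotation.*;
import java.lang.reflect.Field;
import java.util.*;

// Hypothetical stand-in for Mara's @Option; attribute names are assumptions.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@interface Option {
    boolean required() default false;
    String description() default "";
}

class TopNOptions {
    @Option(required = true, description = "Input path") String input;
    @Option(description = "Number of results") int limit = 10; // kept if the option is absent
    @Option(description = "Print the configuration and skip the job") boolean dryRun;
}

class OptionBinder {
    /** Binds "--fieldName value" pairs onto @Option fields, converting strings to the field type. */
    static <T> T bind(T target, String... args) throws IllegalAccessException {
        if (args.length % 2 != 0) throw new IllegalArgumentException("Options must be --name value pairs");
        Map<String, String> given = new HashMap<>();
        for (int i = 0; i < args.length; i += 2) {
            if (!args[i].startsWith("--")) throw new IllegalArgumentException("Expected --name: " + args[i]);
            given.put(args[i].substring(2), args[i + 1]);
        }
        for (Field f : target.getClass().getDeclaredFields()) {
            Option opt = f.getAnnotation(Option.class);
            if (opt == null) continue;
            String raw = given.get(f.getName());
            if (raw == null) {
                if (opt.required()) throw new IllegalArgumentException("Missing required option --" + f.getName());
                continue; // leave the field's default value in place
            }
            f.setAccessible(true);
            f.set(target, convert(raw, f.getType())); // boxed values are unwrapped for primitive fields
        }
        return target;
    }

    /** A handful of conversions; a real binder would support many more types. */
    static Object convert(String raw, Class<?> type) {
        if (type == String.class) return raw;
        if (type == int.class || type == Integer.class) return Integer.parseInt(raw);
        if (type == long.class || type == Long.class) return Long.parseLong(raw);
        if (type == boolean.class || type == Boolean.class) return Boolean.parseBoolean(raw);
        throw new IllegalArgumentException("No converter for " + type);
    }
}
```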

> Support for HBase, Avro, and multiple inputs/outputs

In addition to the above, Mara already includes support for Avro input and output, multiple inputs and outputs, and HBase tables. Adding new annotations is as easy as implementing a simple handler interface.
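The handler pattern can be sketched as an interface that pairs an annotation type with the configuration it applies. Everything in this sketch is hypothetical: the @QueueName annotation, the AnnotationHandler interface, and the use of a Map in place of Hadoop's Configuration. Only the property key mapreduce.job.queuename is a real Hadoop setting.

```java
import java.lang.annotation.*;
import java.lang.reflect.Field;
import java.util.*;

// Hypothetical new annotation that a team might want to add.
@Retention(RetentionPolicy.RUNTIME) @Target(ElementType.FIELD)
@interface QueueName { String value(); }

/** Hypothetical handler contract: one handler per annotation type. */
interface AnnotationHandler<A extends Annotation> {
    Class<A> annotationType();
    void apply(A annotation, Map<String, String> jobConf);
}

class QueueNameHandler implements AnnotationHandler<QueueName> {
    public Class<QueueName> annotationType() { return QueueName.class; }
    public void apply(QueueName q, Map<String, String> jobConf) {
        jobConf.put("mapreduce.job.queuename", q.value());
    }
}

class HandlerDispatcher {
    private final List<AnnotationHandler<?>> handlers = new ArrayList<>();

    void register(AnnotationHandler<?> h) { handlers.add(h); }

    /** Runs every registered handler whose annotation is present on a field of the driver. */
    Map<String, String> configure(Object driver) {
        Map<String, String> jobConf = new TreeMap<>();
        for (Field f : driver.getClass().getDeclaredFields()) {
            for (AnnotationHandler<?> h : handlers) applyIfPresent(h, f, jobConf);
        }
        return jobConf;
    }

    // Captures the handler's type parameter so that apply() type-checks.
    private static <A extends Annotation> void applyIfPresent(
            AnnotationHandler<A> h, Field f, Map<String, String> jobConf) {
        A a = f.getAnnotation(h.annotationType());
        if (a != null) h.apply(a, jobConf);
    }
}

class NightlyDriver {
    @QueueName("nightly") Object job; // stands in for the annotated Job field
}
```

The dispatcher never needs to change when a new annotation is added; only a new handler is registered.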

Design Goals

The primary design goal is simple: eliminate as many of the boilerplate configuration method calls as possible, and replace them with a combination of explicit annotations and logical defaults. We should do as much by convention or through reflection as possible to keep things uncluttered and clear. Secondary goals include lessening or eliminating the need to explicitly subclass framework components, and removing the main method from drivers through the use of a standard container. It would also be useful to simplify use of the distributed cache mechanism, if possible leveraging it to pass objects into MapReduce components.

Replacing the Boilerplate

Most of the annotations replace the job configuration methods that nearly all MapReduce jobs call to specify components, inputs, outputs, etc. The canonical “WordCount” job includes several method calls on the job to configure the inputs, outputs, and framework components. It also calls static methods on the FileInputFormat and FileOutputFormat classes to configure the input and output paths.

These calls can be replaced with the job configuration annotations sampled below:

| Annotation | Target | Comments |
|---|---|---|
| @Driver | TYPE | Specifies a driver. The value is the driver's id, used by the container. |
| @JobInfo | FIELD | The Job to configure. Includes the job name. |
| @MapperInfo | FIELD | The Job's mapper. |
| @CombinerInfo | FIELD | The Job's combiner. |
| @ReducerInfo | FIELD | The Job's reducer. |
| @FileInput | FIELD | Sets the input path and format. The default input path is ${context.input}. |
| @FileOutput | FIELD | Sets the output path and format. The default output path is ${context.output}. |
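Defaults such as ${context.input} are resolved at runtime. As a simplified stand-in (Mara supports OGNL expressions for runtime parameter values, which are far richer), the sketch below resolves ${...} placeholders against a flat map; the map keys such as context.input are assumptions for illustration.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Simplified placeholder resolution; not OGNL, and not Mara's actual evaluator. */
class PathDefaults {
    // Matches ${name} and captures the name between the braces.
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)\\}");

    static String resolve(String template, Map<String, String> context) {
        Matcher m = PLACEHOLDER.matcher(template);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String value = context.get(m.group(1));
            if (value == null) throw new IllegalArgumentException("Unresolved: " + m.group());
            // quoteReplacement keeps '$' or '\' in a value from being read as group references.
            m.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        m.appendTail(out);
        return out.toString();
    }
}
```

For example, with context.input mapped to /data/in (say, from a hypothetical --input argument), the default template ${context.input} resolves to /data/in.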


Example: WordCount

Let's dive right into an example. The canonical WordCount is demonstrated below in 'traditional' MapReduce and re-implemented with Mara.

Here's the standard implementation:

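```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class WordCount {
	public static void main(String[] args) throws Exception {
		// Job.getInstance replaces the deprecated Job(Configuration, String) constructor
		Job job = Job.getInstance(new Configuration(), "WordCount");
		job.setJarByClass(WordCount.class);

		// FileInputFormat/FileOutputFormat are abstract; use the concrete text formats
		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);

		job.setMapperClass(WordCountMapper.class);
		job.setCombinerClass(WordCountReducer.class);
		job.setReducerClass(WordCountReducer.class);

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);

		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		// Report success or failure through the process exit code
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}
```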

Rewritten in Mara

The rewritten driver is shown below. In addition to requiring fewer lines of code, it yields a fully 'production-ready' job, including support for Hadoop configuration properties and overrides, friendly 'help' text, required-argument checking, and useful debugging features such as the ability to dump the configuration options or perform a 'dry run'.

```java
@Driver("annotated-wordcount")
public class AnnotatedWordCount {

	@JobInfo(value="Annotated Word Count")
	@MapperInfo(WordCountMapper.class)
	@CombinerInfo(WordCountReducer.class)
	@ReducerInfo(WordCountReducer.class)
	private Job job;
}
```


Unit Testing

Mara supports unit testing and TDD for MapReduce through the Apache MRUnit project. Although MRUnit makes unit testing MapReduce feasible, writing and maintaining these tests still involves numerous headaches. First, the driver class isn't exercised in standard MRUnit. This often means copying code and manually porting it to the MRUnit equivalent. It also leaves no coverage for your base configuration, which can result in problems that are only exposed upon deployment to a full Hadoop environment.

Because you must essentially translate the driver configuration into MRUnit, future changes to your driver configuration must also be duplicated in the unit tests. This can become rather tedious and error-prone. Furthermore, effective testing of certain jobs – such as those employing Avro or MultipleOutputs – requires the use of mocks to work properly.

Also, testing with MultipleOutputs typically requires creating a method in mappers or reducers for directly injecting the mock version. Not a huge problem, but it's always awkward to add code to a production class solely for the purpose of unit testing.

Mara eliminates these complexities and removes the need to separately write bootstrap code for annotated drivers in the majority of cases.


Additional Features

  • Reflection to detect output key/value types on mappers and reducers.
  • Runtime parameter value support using OGNL expressions
  • @NamedOutput annotation on Mapper/Reducer classes for bootstrapping MultipleOutputs.
  • MultipleInputs and MultipleTableInput support
  • @TableInput annotation for basic HBase table scans. Currently supports creating a default Scan or providing a bean property for runtime creation of a custom Scan bean.
  • @TableOutput configures output to HBase.