Commit 3f7a7ba

For batch hadoop indexing, make hadoop input format configurable. Given input format must extend from org.apache.hadoop.mapreduce.InputFormat.

himanshug committed Mar 18, 2015
1 parent 4467d1c commit 3f7a7ba
Showing 15 changed files with 560 additions and 97 deletions.
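The hunks below replace the hard-coded TextInputFormat / CombineTextInputFormat wiring with a call to JobHelper.setInputFormat(job, config), and HadoopDruidIndexerConfig gains getInputFormatClass(), which reads the input format configured on the path spec. The JobHelper change itself is in one of the changed files not shown on this page; the following is only a sketch of what such a helper might look like, assuming it prefers a user-configured InputFormat and otherwise falls back to the previous text-based behaviour (class name and fallback logic are assumptions, not taken from the commit).

// Hypothetical sketch, not the committed JobHelper code: assumes the helper uses
// the path spec's configured InputFormat when present and otherwise keeps the old
// TextInputFormat / CombineTextInputFormat behaviour.
package io.druid.indexer;

import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class JobHelperSketch
{
  public static void setInputFormat(Job job, HadoopDruidIndexerConfig indexerConfig)
  {
    final Class<? extends InputFormat> inputFormatClass = indexerConfig.getInputFormatClass();
    if (inputFormatClass != null) {
      // Any class extending org.apache.hadoop.mapreduce.InputFormat, as required by the commit message.
      job.setInputFormatClass(inputFormatClass);
    } else if (indexerConfig.isCombineText()) {
      job.setInputFormatClass(CombineTextInputFormat.class);
    } else {
      job.setInputFormatClass(TextInputFormat.class);
    }
  }
}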
DetermineHashedPartitionsJob.java
@@ -42,7 +42,7 @@
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
@@ -90,11 +90,7 @@ public boolean run()
);

JobHelper.injectSystemProperties(groupByJob);
-if (config.isCombineText()) {
-groupByJob.setInputFormatClass(CombineTextInputFormat.class);
-} else {
-groupByJob.setInputFormatClass(TextInputFormat.class);
-}
+JobHelper.setInputFormat(groupByJob, config);
groupByJob.setMapperClass(DetermineCardinalityMapper.class);
groupByJob.setMapOutputKeyClass(LongWritable.class);
groupByJob.setMapOutputValueClass(BytesWritable.class);
@@ -241,7 +237,7 @@ protected void setup(Context context)
@Override
protected void innerMap(
InputRow inputRow,
-Text text,
+Writable value,
Context context
) throws IOException, InterruptedException
{
DeterminePartitionsJob.java
@@ -127,7 +127,7 @@ public boolean run()
);

JobHelper.injectSystemProperties(groupByJob);
-groupByJob.setInputFormatClass(TextInputFormat.class);
+JobHelper.setInputFormat(groupByJob, config);
groupByJob.setMapperClass(DeterminePartitionsGroupByMapper.class);
groupByJob.setMapOutputKeyClass(BytesWritable.class);
groupByJob.setMapOutputValueClass(NullWritable.class);
@@ -174,7 +174,7 @@ public boolean run()
} else {
// Directly read the source data, since we assume it's already grouped.
dimSelectionJob.setMapperClass(DeterminePartitionsDimSelectionAssumeGroupedMapper.class);
-dimSelectionJob.setInputFormatClass(TextInputFormat.class);
+JobHelper.setInputFormat(dimSelectionJob, config);
config.addInputPaths(dimSelectionJob);
}

@@ -260,7 +260,7 @@ protected void setup(Context context)
@Override
protected void innerMap(
InputRow inputRow,
-Text text,
+Writable value,
Context context
) throws IOException, InterruptedException
{
@@ -341,7 +341,7 @@ protected void setup(Context context)
@Override
protected void innerMap(
InputRow inputRow,
-Text text,
+Writable value,
Context context
) throws IOException, InterruptedException
{
HadoopDruidIndexerConfig.java
@@ -17,28 +17,9 @@

package io.druid.indexer;

-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Function;
-import com.google.common.base.Joiner;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Splitter;
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.inject.Binder;
-import com.google.inject.Injector;
-import com.google.inject.Key;
-import com.google.inject.Module;
-import com.metamx.common.guava.FunctionalIterable;
-import com.metamx.common.logger.Logger;
import io.druid.common.utils.JodaUtils;
import io.druid.data.input.InputRow;
-import io.druid.data.input.impl.StringInputRowParser;
+import io.druid.data.input.impl.InputRowParser;
import io.druid.granularity.QueryGranularity;
import io.druid.guice.GuiceInjectors;
import io.druid.guice.JsonConfigProvider;
@@ -51,14 +32,6 @@
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.ShardSpec;
import io.druid.timeline.partition.ShardSpecLookup;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.mapreduce.Job;
-import org.joda.time.DateTime;
-import org.joda.time.Interval;
-import org.joda.time.format.ISODateTimeFormat;

import java.io.File;
import java.io.IOException;
@@ -69,6 +42,36 @@
import java.util.Set;
import java.util.SortedSet;

+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+import org.joda.time.format.ISODateTimeFormat;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.inject.Binder;
+import com.google.inject.Injector;
+import com.google.inject.Key;
+import com.google.inject.Module;
+import com.metamx.common.guava.FunctionalIterable;
+import com.metamx.common.logger.Logger;

/**
*/
public class HadoopDruidIndexerConfig
@@ -301,9 +304,9 @@ public boolean isCombineText()
return schema.getTuningConfig().isCombineText();
}

-public StringInputRowParser getParser()
+public InputRowParser getParser()
{
-return (StringInputRowParser) schema.getDataSchema().getParser();
+return schema.getDataSchema().getParser();
}

public HadoopyShardSpec getShardSpec(Bucket bucket)
@@ -316,6 +319,11 @@ public Job addInputPaths(Job job) throws IOException
return pathSpec.addInputPaths(this, job);
}

+public Class<? extends InputFormat> getInputFormatClass()
+{
+return pathSpec.getInputFormat();
+}

/********************************************
Granularity/Bucket Helper Methods
********************************************/
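getInputFormatClass() above delegates to pathSpec.getInputFormat(); the path spec side of this commit is in files not shown on this page. As a rough illustration only, a static-style path spec could expose the configured class along the lines of the following sketch (class name, package, and property name are assumptions, with Jackson binding a fully qualified class name from the spec JSON).

// Hypothetical sketch, not the committed path spec code: illustrates how a path
// spec could carry a user-configured InputFormat class for getInputFormatClass().
package io.druid.indexer.path;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.hadoop.mapreduce.InputFormat;

public class StaticPathSpecSketch
{
  private final String paths;
  private final Class<? extends InputFormat> inputFormat; // null means "use the text defaults"

  @JsonCreator
  public StaticPathSpecSketch(
      @JsonProperty("paths") String paths,
      @JsonProperty("inputFormat") Class<? extends InputFormat> inputFormat
  )
  {
    this.paths = paths;
    this.inputFormat = inputFormat;
  }

  @JsonProperty
  public Class<? extends InputFormat> getInputFormat()
  {
    return inputFormat;
  }

  @JsonProperty
  public String getPaths()
  {
    return paths;
  }
}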
HadoopDruidIndexerMapper.java
@@ -17,21 +17,33 @@

package io.druid.indexer;

-import com.metamx.common.RE;
import io.druid.data.input.InputRow;
+import io.druid.data.input.impl.InputRowParser;
+import io.druid.data.input.impl.MapInputRowParser;
import io.druid.data.input.impl.StringInputRowParser;
import io.druid.segment.indexing.granularity.GranularitySpec;

+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;

+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Mapper;
import org.joda.time.DateTime;

-import java.io.IOException;
+import com.metamx.common.RE;

-public abstract class HadoopDruidIndexerMapper<KEYOUT, VALUEOUT> extends Mapper<LongWritable, Text, KEYOUT, VALUEOUT>
+public abstract class HadoopDruidIndexerMapper<KEYOUT, VALUEOUT> extends Mapper<Writable, Writable, KEYOUT, VALUEOUT>
{
private HadoopDruidIndexerConfig config;
-private StringInputRowParser parser;
+private InputRowParser parser;
protected GranularitySpec granularitySpec;

@Override
@@ -48,20 +60,20 @@ public HadoopDruidIndexerConfig getConfig()
return config;
}

-public StringInputRowParser getParser()
+public InputRowParser getParser()
{
return parser;
}

@Override
protected void map(
-LongWritable key, Text value, Context context
+Writable key, Writable value, Context context
) throws IOException, InterruptedException
{
try {
final InputRow inputRow;
try {
-inputRow = parser.parse(value.toString());
+inputRow = parseInputRow(value, parser);
}
catch (Exception e) {
if (config.isIgnoreInvalidRows()) {
@@ -83,6 +95,17 @@ protected void map(
}
}

-abstract protected void innerMap(InputRow inputRow, Text text, Context context)
+public final static InputRow parseInputRow(Writable value, InputRowParser parser)
+{
+if(parser instanceof StringInputRowParser && value instanceof Text) {
+//Note: This is to ensure backward compatibility with 0.7.0 and before
+return ((StringInputRowParser)parser).parse(value.toString());
+} else {
+return parser.parse(value);
+}
+}
+
+abstract protected void innerMap(InputRow inputRow, Writable value, Context context)
throws IOException, InterruptedException;

}
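With the base class now extending Mapper<Writable, Writable, KEYOUT, VALUEOUT>, subclasses implement innerMap against a generic Writable instead of Text, and the row itself comes from parseInputRow, which keeps the old toString-based path for StringInputRowParser over Text and otherwise hands the Writable straight to the configured parser. A minimal sketch of what an innerMap implementation looks like after this change (the class and its output types are illustrative, not part of the commit):

// Illustrative subclass only, not part of the commit: shows the new innerMap
// signature that receives the raw value as a generic Writable.
package io.druid.indexer;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

import io.druid.data.input.InputRow;

public class ExampleIndexerMapper extends HadoopDruidIndexerMapper<Text, NullWritable>
{
  @Override
  protected void innerMap(InputRow inputRow, Writable value, Context context)
      throws IOException, InterruptedException
  {
    // inputRow was produced by parseInputRow(value, getParser()); the raw Writable
    // is still available for mappers that need the original record.
    context.write(new Text(String.valueOf(inputRow.getTimestampFromEpoch())), NullWritable.get());
  }
}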
(Diffs for the remaining changed files are not shown on this page.)
