Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Both the B64Line and Block input formats extend MultiInputFormat.

Whether input is in block or base64 format is checked at runtime.
The main cost is that the input file is opened twice in the mapper;
a later patch will avoid this cost.
  • Loading branch information...
commit 6644575be18025e215b7871e1de9e9516a20777d 1 parent dfee495
@rangadi rangadi authored
View
35 src/java/com/twitter/elephantbird/mapreduce/input/LzoProtobufB64LineInputFormat.java
@@ -1,18 +1,10 @@
package com.twitter.elephantbird.mapreduce.input;
-import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
import com.google.protobuf.Message;
-import com.twitter.elephantbird.mapreduce.io.ProtobufWritable;
-import com.twitter.elephantbird.util.Protobufs;
import com.twitter.elephantbird.util.TypeRef;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
/**
* This is the base class for all base64 encoded, line-oriented protocol buffer based input formats.
* Data is expected to be one base64 encoded serialized protocol buffer per line.
@@ -26,45 +18,30 @@
* for more information on error handling.
*/
-public class LzoProtobufB64LineInputFormat<M extends Message> extends LzoInputFormat<LongWritable, ProtobufWritable<M>> {
-
- private TypeRef<M> typeRef_;
+public class LzoProtobufB64LineInputFormat<M extends Message> extends MultiInputFormat<M> {
public LzoProtobufB64LineInputFormat() {
}
public LzoProtobufB64LineInputFormat(TypeRef<M> typeRef) {
- super();
- this.typeRef_ = typeRef;
- }
-
- // should remove this.
- protected void setTypeRef(TypeRef<M> typeRef) {
- typeRef_ = typeRef;
+ super(typeRef);
}
/**
* Returns {@link LzoProtobufB64LineInputFormat} class.
* Sets an internal configuration in jobConf so that remote Tasks
* instantiate appropriate object based on protoClass.
+ *
+ * @deprecated Use {@link MultiInputFormat#setInputFormatClass(Class, org.apache.hadoop.mapreduce.Job)}
*/
@SuppressWarnings("rawtypes")
public static <M extends Message> Class<LzoProtobufB64LineInputFormat>
getInputFormatClass(Class<M> protoClass, Configuration jobConf) {
- Protobufs.setClassConf(jobConf, LzoProtobufB64LineInputFormat.class, protoClass);
+ setClassConf(protoClass, jobConf);
return LzoProtobufB64LineInputFormat.class;
}
public static<M extends Message> LzoProtobufB64LineInputFormat<M> newInstance(TypeRef<M> typeRef) {
return new LzoProtobufB64LineInputFormat<M>(typeRef);
}
-
- @Override
- public RecordReader<LongWritable, ProtobufWritable<M>> createRecordReader(InputSplit split,
- TaskAttemptContext taskAttempt) throws IOException, InterruptedException {
- if (typeRef_ == null) {
- typeRef_ = Protobufs.getTypeRef(taskAttempt.getConfiguration(), LzoProtobufB64LineInputFormat.class);
- }
- return new LzoProtobufB64LineRecordReader<M>(typeRef_);
- }
}
View
33 src/java/com/twitter/elephantbird/mapreduce/input/LzoProtobufBlockInputFormat.java
@@ -1,16 +1,9 @@
package com.twitter.elephantbird.mapreduce.input;
-import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
import com.google.protobuf.Message;
-import com.twitter.elephantbird.mapreduce.io.ProtobufWritable;
-import com.twitter.elephantbird.util.Protobufs;
import com.twitter.elephantbird.util.TypeRef;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
/**
* This is the base class for all blocked protocol buffer based input formats. That is, if you use
@@ -25,44 +18,30 @@
* for more information on error handling.
*/
-public class LzoProtobufBlockInputFormat<M extends Message> extends LzoInputFormat<LongWritable, ProtobufWritable<M>> {
-
- private TypeRef<M> typeRef_;
+public class LzoProtobufBlockInputFormat<M extends Message> extends MultiInputFormat<M> {
public LzoProtobufBlockInputFormat() {
}
public LzoProtobufBlockInputFormat(TypeRef<M> typeRef) {
- super();
- this.typeRef_ = typeRef;
- }
-
- protected void setTypeRef(TypeRef<M> typeRef) {
- typeRef_ = typeRef;
+ super(typeRef);
}
/**
* Returns {@link LzoProtobufBlockInputFormat} class.
* Sets an internal configuration in jobConf so that remote Tasks
* instantiate appropriate object based on protoClass.
+ *
+ * @deprecated Use {@link MultiInputFormat#setInputFormatClass(Class, org.apache.hadoop.mapreduce.Job)}
*/
@SuppressWarnings("unchecked")
public static <M extends Message> Class<LzoProtobufBlockInputFormat>
getInputFormatClass(Class<M> protoClass, Configuration jobConf) {
- Protobufs.setClassConf(jobConf, LzoProtobufBlockInputFormat.class, protoClass);
+ setClassConf(protoClass, jobConf);
return LzoProtobufBlockInputFormat.class;
}
public static<M extends Message> LzoProtobufBlockInputFormat<M> newInstance(TypeRef<M> typeRef) {
return new LzoProtobufBlockInputFormat<M>(typeRef);
}
-
- @Override
- public RecordReader<LongWritable, ProtobufWritable<M>> createRecordReader(InputSplit split,
- TaskAttemptContext taskAttempt) throws IOException, InterruptedException {
- if (typeRef_ == null) {
- typeRef_ = Protobufs.getTypeRef(taskAttempt.getConfiguration(), LzoProtobufBlockInputFormat.class);
- }
- return new LzoProtobufBlockRecordReader<M>(typeRef_);
- }
}
View
29 src/java/com/twitter/elephantbird/mapreduce/input/LzoThriftB64LineInputFormat.java
@@ -1,16 +1,8 @@
package com.twitter.elephantbird.mapreduce.input;
-import java.io.IOException;
-
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.thrift.TBase;
-import com.twitter.elephantbird.mapreduce.io.ThriftWritable;
-import com.twitter.elephantbird.util.ThriftUtils;
import com.twitter.elephantbird.util.TypeRef;
/**
@@ -25,35 +17,24 @@
* A small fraction of bad records are tolerated. See {@link LzoRecordReader}
* for more information on error handling.
*/
-public class LzoThriftB64LineInputFormat<M extends TBase<?, ?>>
- extends LzoInputFormat<LongWritable, ThriftWritable<M>> {
-
- private TypeRef<M> typeRef_;
+public class LzoThriftB64LineInputFormat<M extends TBase<?, ?>> extends MultiInputFormat<M> {
public LzoThriftB64LineInputFormat() {}
public LzoThriftB64LineInputFormat(TypeRef<M> typeRef) {
- typeRef_ = typeRef;
+ super(typeRef);
}
/**
* Returns {@link LzoThriftB64LineInputFormat} class for setting up a job.
* Sets an internal configuration in jobConf so that Task instantiates
* appropriate object for this generic class based on thriftClass
+ *
+ * @deprecated Use {@link MultiInputFormat#setInputFormatClass(Class, org.apache.hadoop.mapreduce.Job)}
*/
public static <M extends TBase<?, ?>> Class<LzoThriftB64LineInputFormat>
getInputFormatClass(Class<M> thriftClass, Configuration jobConf) {
- ThriftUtils.setClassConf(jobConf, LzoThriftB64LineInputFormat.class, thriftClass);
+ setClassConf(thriftClass, jobConf);
return LzoThriftB64LineInputFormat.class;
}
-
- @Override
- public RecordReader<LongWritable, ThriftWritable<M>> createRecordReader(InputSplit split,
- TaskAttemptContext taskAttempt) throws IOException, InterruptedException {
- if (typeRef_ == null) {
- typeRef_ = ThriftUtils.getTypeRef(taskAttempt.getConfiguration(), LzoThriftB64LineInputFormat.class);
- }
- return new LzoThriftB64LineRecordReader<M>(typeRef_);
- }
-
}
View
30 src/java/com/twitter/elephantbird/mapreduce/input/LzoThriftBlockInputFormat.java
@@ -1,16 +1,8 @@
package com.twitter.elephantbird.mapreduce.input;
-import java.io.IOException;
-
-import com.twitter.elephantbird.mapreduce.io.ThriftWritable;
-import com.twitter.elephantbird.util.ThriftUtils;
import com.twitter.elephantbird.util.TypeRef;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.thrift.TBase;
/**
@@ -24,37 +16,25 @@
* A small fraction of bad records are tolerated. See {@link LzoRecordReader}
* for more information on error handling.
*/
-public class LzoThriftBlockInputFormat<M extends TBase<?, ?>>
- extends LzoInputFormat<LongWritable, ThriftWritable<M>> {
- // implementation is exactly same as LzoThriftB64LineINputFormat
-
- private TypeRef<M> typeRef_;
+public class LzoThriftBlockInputFormat<M extends TBase<?, ?>> extends MultiInputFormat<M> {
public LzoThriftBlockInputFormat() {}
public LzoThriftBlockInputFormat(TypeRef<M> typeRef) {
- typeRef_ = typeRef;
+ super(typeRef);
}
/**
* Returns {@link LzoThriftBlockInputFormat} class for setting up a job.
* Sets an internal configuration in jobConf so that Task instantiates
* appropriate object for this generic class based on thriftClass
+ *
+ * @deprecated Use {@link MultiInputFormat#setInputFormatClass(Class, org.apache.hadoop.mapreduce.Job)}
*/
@SuppressWarnings("unchecked")
public static <M extends TBase<?, ?>> Class<LzoThriftBlockInputFormat>
getInputFormatClass(Class<M> thriftClass, Configuration jobConf) {
- ThriftUtils.setClassConf(jobConf, LzoThriftBlockInputFormat.class, thriftClass);
+ setClassConf(thriftClass, jobConf);
return LzoThriftBlockInputFormat.class;
}
-
- @Override
- public RecordReader<LongWritable, ThriftWritable<M>> createRecordReader(InputSplit split,
- TaskAttemptContext taskAttempt) throws IOException, InterruptedException {
- if (typeRef_ == null) {
- typeRef_ = ThriftUtils.getTypeRef(taskAttempt.getConfiguration(), LzoThriftBlockInputFormat.class);
- }
- return new LzoThriftBlockRecordReader<M>(typeRef_);
- }
-
}
View
10 src/java/com/twitter/elephantbird/mapreduce/input/MultiInputFormat.java
@@ -62,7 +62,15 @@ public MultiInputFormat(TypeRef<M> typeRef) {
*/
public static void setInputFormatClass(Class<?> clazz, Job job) {
job.setInputFormatClass(MultiInputFormat.class);
- HadoopUtils.setInputFormatClass(job.getConfiguration(), CLASS_CONF_KEY, clazz);
+ setClassConf(clazz, job.getConfiguration());
+ }
+
+ /**
+ * Stores supplied clazz's name in configuration. This configuration is
+ * read on the remote tasks to initialize the input format correctly.
+ */
+ protected static void setClassConf(Class<?> clazz, Configuration conf) {
+ HadoopUtils.setInputFormatClass(conf, CLASS_CONF_KEY, clazz);
}
@SuppressWarnings("unchecked") // return type is runtime dependent
Please sign in to comment.
Something went wrong with that request. Please try again.