Permalink
Browse files

Merge pull request #116 from rangadi/multi_input_format_for_loaders

Multi input format for loaders.
  • Loading branch information...
2 parents 31a18b3 + 0e72d72 commit 0d0403238ea89400981f9baffcb64d4d743526ab @rangadi rangadi committed Jan 19, 2012
Showing with 245 additions and 344 deletions.
  1. +1 −1 ...twitter/elephantbird/examples/proto/mapreduce/input/LzoAddressBookProtobufB64LineInputFormat.java
  2. +1 −1 ...m/twitter/elephantbird/examples/proto/mapreduce/input/LzoAddressBookProtobufBlockInputFormat.java
  3. +1 −1 .../com/twitter/elephantbird/examples/proto/mapreduce/input/LzoPersonProtobufB64LineInputFormat.java
  4. +1 −1 ...va/com/twitter/elephantbird/examples/proto/mapreduce/input/LzoPersonProtobufBlockInputFormat.java
  5. +1 −1 ...itter/elephantbird/examples/proto/mapreduce/output/LzoAddressBookProtobufB64LineOutputFormat.java
  6. +1 −1 ...twitter/elephantbird/examples/proto/mapreduce/output/LzoAddressBookProtobufBlockOutputFormat.java
  7. +1 −1 ...om/twitter/elephantbird/examples/proto/mapreduce/output/LzoPersonProtobufB64LineOutputFormat.java
  8. +1 −1 .../com/twitter/elephantbird/examples/proto/mapreduce/output/LzoPersonProtobufBlockOutputFormat.java
  9. +5 −2 examples/src/gen-piglet/pig/com/twitter/elephantbird/examples/proto/address_books.piglet
  10. +5 −2 examples/src/gen-piglet/pig/com/twitter/elephantbird/examples/proto/people.piglet
  11. +3 −7 examples/src/java/com/twitter/elephantbird/examples/ProtobufMRExample.java
  12. +3 −7 examples/src/java/com/twitter/elephantbird/examples/ThriftMRExample.java
  13. +6 −29 src/java/com/twitter/elephantbird/mapreduce/input/LzoProtobufB64LineInputFormat.java
  14. +6 −27 src/java/com/twitter/elephantbird/mapreduce/input/LzoProtobufBlockInputFormat.java
  15. +5 −24 src/java/com/twitter/elephantbird/mapreduce/input/LzoThriftB64LineInputFormat.java
  16. +5 −25 src/java/com/twitter/elephantbird/mapreduce/input/LzoThriftBlockInputFormat.java
  17. +9 −1 src/java/com/twitter/elephantbird/mapreduce/input/MultiInputFormat.java
  18. +6 −86 src/java/com/twitter/elephantbird/pig/load/LzoProtobufB64LinePigLoader.java
  19. +4 −26 src/java/com/twitter/elephantbird/pig/load/LzoProtobufBlockPigLoader.java
  20. +7 −57 src/java/com/twitter/elephantbird/pig/load/LzoThriftB64LinePigLoader.java
  21. +7 −14 src/java/com/twitter/elephantbird/pig/load/LzoThriftBlockPigLoader.java
  22. +2 −2 src/java/com/twitter/elephantbird/pig/load/MultiFormatLoader.java
  23. +80 −0 src/java/com/twitter/elephantbird/pig/load/ProtobufPigLoader.java
  24. +68 −0 src/java/com/twitter/elephantbird/pig/load/ThriftPigLoader.java
  25. +2 −2 src/java/com/twitter/elephantbird/pig/util/ThriftToPig.java
  26. +1 −1 src/java/com/twitter/elephantbird/proto/codegen/LzoProtobufB64LineInputFormatGenerator.java
  27. +1 −1 src/java/com/twitter/elephantbird/proto/codegen/LzoProtobufB64LineOutputFormatGenerator.java
  28. +0 −5 src/java/com/twitter/elephantbird/proto/codegen/LzoProtobufB64LinePigLoadGenerator.java
  29. +4 −7 src/java/com/twitter/elephantbird/proto/codegen/LzoProtobufB64LinePigLoaderGenerator.java
  30. +2 −2 src/java/com/twitter/elephantbird/proto/codegen/LzoProtobufB64LinePigletGenerator.java
  31. +1 −1 src/java/com/twitter/elephantbird/proto/codegen/LzoProtobufBlockInputFormatGenerator.java
  32. +1 −1 src/java/com/twitter/elephantbird/proto/codegen/LzoProtobufBlockOutputFormatGenerator.java
  33. +4 −7 src/java/com/twitter/elephantbird/proto/codegen/LzoProtobufBlockPigLoaderGenerator.java
@@ -6,7 +6,7 @@
public class LzoAddressBookProtobufB64LineInputFormat extends LzoProtobufB64LineInputFormat<AddressBook> {
public LzoAddressBookProtobufB64LineInputFormat() {
- setTypeRef(new TypeRef<AddressBook>(){});
+ super(new TypeRef<AddressBook>(){});
}
}
@@ -6,7 +6,7 @@
public class LzoAddressBookProtobufBlockInputFormat extends LzoProtobufBlockInputFormat<AddressBook> {
public LzoAddressBookProtobufBlockInputFormat() {
- setTypeRef(new TypeRef<AddressBook>(){});
+ super(new TypeRef<AddressBook>(){});
}
}
@@ -6,7 +6,7 @@
public class LzoPersonProtobufB64LineInputFormat extends LzoProtobufB64LineInputFormat<Person> {
public LzoPersonProtobufB64LineInputFormat() {
- setTypeRef(new TypeRef<Person>(){});
+ super(new TypeRef<Person>(){});
}
}
@@ -6,7 +6,7 @@
public class LzoPersonProtobufBlockInputFormat extends LzoProtobufBlockInputFormat<Person> {
public LzoPersonProtobufBlockInputFormat() {
- setTypeRef(new TypeRef<Person>(){});
+ super(new TypeRef<Person>(){});
}
}
@@ -6,7 +6,7 @@
public class LzoAddressBookProtobufB64LineOutputFormat extends LzoProtobufB64LineOutputFormat<AddressBook> {
public LzoAddressBookProtobufB64LineOutputFormat() {
- setTypeRef(new TypeRef<AddressBook>(){});
+ super(new TypeRef<AddressBook>(){});
}
}
@@ -6,7 +6,7 @@
public class LzoAddressBookProtobufBlockOutputFormat extends LzoProtobufBlockOutputFormat<AddressBook> {
public LzoAddressBookProtobufBlockOutputFormat() {
- setTypeRef(new TypeRef<AddressBook>(){});
+ super(new TypeRef<AddressBook>(){});
}
}
@@ -6,7 +6,7 @@
public class LzoPersonProtobufB64LineOutputFormat extends LzoProtobufB64LineOutputFormat<Person> {
public LzoPersonProtobufB64LineOutputFormat() {
- setTypeRef(new TypeRef<Person>(){});
+ super(new TypeRef<Person>(){});
}
}
@@ -6,7 +6,7 @@
public class LzoPersonProtobufBlockOutputFormat extends LzoProtobufBlockOutputFormat<Person> {
public LzoPersonProtobufBlockOutputFormat() {
- setTypeRef(new TypeRef<Person>(){});
+ super(new TypeRef<Person>(){});
}
}
@@ -1,4 +1,5 @@
-raw_data = load '$INPUT_FILES' using com.twitter.elephantbird.examples.proto.pig.load.LzoAddressBookProtobufBlockPigLoader()
+raw_data = load '$INPUT_FILES' using com.twitter.elephantbird.pig.load.LzoProtobufBlockPigLoader('com.twitter.elephantbird.examples.proto.AddressBookProtos.AddressBook')
+/**
as (
person: bag {
person_tuple: tuple (
@@ -13,7 +14,9 @@ raw_data = load '$INPUT_FILES' using com.twitter.elephantbird.examples.proto.pig
}
)
}
- );
+ )
+**/
+;
@@ -1,4 +1,5 @@
-raw_data = load '$INPUT_FILES' using com.twitter.elephantbird.examples.proto.pig.load.LzoPersonProtobufBlockPigLoader()
+raw_data = load '$INPUT_FILES' using com.twitter.elephantbird.pig.load.LzoProtobufBlockPigLoader('com.twitter.elephantbird.examples.proto.AddressBookProtos.Person')
+/**
as (
name: chararray,
id: int,
@@ -9,7 +10,9 @@ raw_data = load '$INPUT_FILES' using com.twitter.elephantbird.examples.proto.pig
type: chararray
)
}
- );
+ )
+**/
+;
@@ -18,8 +18,7 @@
import org.apache.hadoop.util.GenericOptionsParser;
import com.twitter.elephantbird.examples.proto.Examples.Age;
-import com.twitter.elephantbird.mapreduce.input.LzoProtobufB64LineInputFormat;
-import com.twitter.elephantbird.mapreduce.input.LzoProtobufBlockInputFormat;
+import com.twitter.elephantbird.mapreduce.input.MultiInputFormat;
import com.twitter.elephantbird.mapreduce.io.ProtobufWritable;
import com.twitter.elephantbird.mapreduce.output.LzoProtobufB64LineOutputFormat;
import com.twitter.elephantbird.mapreduce.output.LzoProtobufBlockOutputFormat;
@@ -94,11 +93,8 @@ int runLzoToText(String[] args, Configuration conf) throws Exception {
job.setMapperClass(LzoMapper.class);
job.setNumReduceTasks(0);
- if (conf.get("proto.test.format", "B64Line").equals("Block")) {
- job.setInputFormatClass(LzoProtobufBlockInputFormat.getInputFormatClass(Age.class, job.getConfiguration()));
- } else {
- job.setInputFormatClass(LzoProtobufB64LineInputFormat.getInputFormatClass(Age.class, job.getConfiguration()));
- }
+ // input format is same for both B64Line or block:
+ MultiInputFormat.setInputFormatClass(Age.class, job);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
@@ -18,8 +18,7 @@
import org.apache.hadoop.util.GenericOptionsParser;
import com.twitter.elephantbird.examples.thrift.Age;
-import com.twitter.elephantbird.mapreduce.input.LzoThriftB64LineInputFormat;
-import com.twitter.elephantbird.mapreduce.input.LzoThriftBlockInputFormat;
+import com.twitter.elephantbird.mapreduce.input.MultiInputFormat;
import com.twitter.elephantbird.mapreduce.io.ThriftWritable;
import com.twitter.elephantbird.mapreduce.output.LzoThriftB64LineOutputFormat;
import com.twitter.elephantbird.mapreduce.output.LzoThriftBlockOutputFormat;
@@ -91,11 +90,8 @@ public int runLzoToText(String[] args, Configuration conf) throws Exception {
job.setMapperClass(LzoMapper.class);
job.setNumReduceTasks(0);
- if (conf.get("thrift.test.format", "B64Line").equals("Block")) {
- job.setInputFormatClass(LzoThriftBlockInputFormat.getInputFormatClass(Age.class, job.getConfiguration()));
- } else {
- job.setInputFormatClass(LzoThriftB64LineInputFormat.getInputFormatClass(Age.class, job.getConfiguration()));
- }
+ // input format is same for both B64Line and Block formats
+ MultiInputFormat.setInputFormatClass(Age.class, job);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
@@ -1,18 +1,10 @@
package com.twitter.elephantbird.mapreduce.input;
-import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
import com.google.protobuf.Message;
-import com.twitter.elephantbird.mapreduce.io.ProtobufWritable;
-import com.twitter.elephantbird.util.Protobufs;
import com.twitter.elephantbird.util.TypeRef;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
/**
* This is the base class for all base64 encoded, line-oriented protocol buffer based input formats.
* Data is expected to be one base64 encoded serialized protocol buffer per line.
@@ -26,45 +18,30 @@
* for more information on error handling.
*/
-public class LzoProtobufB64LineInputFormat<M extends Message> extends LzoInputFormat<LongWritable, ProtobufWritable<M>> {
-
- private TypeRef<M> typeRef_;
+public class LzoProtobufB64LineInputFormat<M extends Message> extends MultiInputFormat<M> {
public LzoProtobufB64LineInputFormat() {
}
public LzoProtobufB64LineInputFormat(TypeRef<M> typeRef) {
- super();
- this.typeRef_ = typeRef;
- }
-
- // should remove this.
- protected void setTypeRef(TypeRef<M> typeRef) {
- typeRef_ = typeRef;
+ super(typeRef);
}
/**
* Returns {@link LzoProtobufB64LineInputFormat} class.
* Sets an internal configuration in jobConf so that remote Tasks
* instantiate appropriate object based on protoClass.
+ *
+ * @Deprecated Use {@link MultiInputFormat#setInputFormatClass(Class, org.apache.hadoop.mapreduce.Job)
*/
@SuppressWarnings("rawtypes")
public static <M extends Message> Class<LzoProtobufB64LineInputFormat>
getInputFormatClass(Class<M> protoClass, Configuration jobConf) {
- Protobufs.setClassConf(jobConf, LzoProtobufB64LineInputFormat.class, protoClass);
+ setClassConf(protoClass, jobConf);
return LzoProtobufB64LineInputFormat.class;
}
public static<M extends Message> LzoProtobufB64LineInputFormat<M> newInstance(TypeRef<M> typeRef) {
return new LzoProtobufB64LineInputFormat<M>(typeRef);
}
-
- @Override
- public RecordReader<LongWritable, ProtobufWritable<M>> createRecordReader(InputSplit split,
- TaskAttemptContext taskAttempt) throws IOException, InterruptedException {
- if (typeRef_ == null) {
- typeRef_ = Protobufs.getTypeRef(taskAttempt.getConfiguration(), LzoProtobufB64LineInputFormat.class);
- }
- return new LzoProtobufB64LineRecordReader<M>(typeRef_);
- }
}
@@ -1,16 +1,9 @@
package com.twitter.elephantbird.mapreduce.input;
-import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
import com.google.protobuf.Message;
-import com.twitter.elephantbird.mapreduce.io.ProtobufWritable;
-import com.twitter.elephantbird.util.Protobufs;
import com.twitter.elephantbird.util.TypeRef;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
/**
* This is the base class for all blocked protocol buffer based input formats. That is, if you use
@@ -25,44 +18,30 @@
* for more information on error handling.
*/
-public class LzoProtobufBlockInputFormat<M extends Message> extends LzoInputFormat<LongWritable, ProtobufWritable<M>> {
-
- private TypeRef<M> typeRef_;
+public class LzoProtobufBlockInputFormat<M extends Message> extends MultiInputFormat<M> {
public LzoProtobufBlockInputFormat() {
}
public LzoProtobufBlockInputFormat(TypeRef<M> typeRef) {
- super();
- this.typeRef_ = typeRef;
- }
-
- protected void setTypeRef(TypeRef<M> typeRef) {
- typeRef_ = typeRef;
+ super(typeRef);
}
/**
* Returns {@link LzoProtobufBlockInputFormat} class.
* Sets an internal configuration in jobConf so that remote Tasks
* instantiate appropriate object based on protoClass.
+ *
+ * @Deprecated Use {@link MultiInputFormat#setInputFormatClass(Class, org.apache.hadoop.mapreduce.Job)
*/
@SuppressWarnings("unchecked")
public static <M extends Message> Class<LzoProtobufBlockInputFormat>
getInputFormatClass(Class<M> protoClass, Configuration jobConf) {
- Protobufs.setClassConf(jobConf, LzoProtobufBlockInputFormat.class, protoClass);
+ setClassConf(protoClass, jobConf);
return LzoProtobufBlockInputFormat.class;
}
public static<M extends Message> LzoProtobufBlockInputFormat<M> newInstance(TypeRef<M> typeRef) {
return new LzoProtobufBlockInputFormat<M>(typeRef);
}
-
- @Override
- public RecordReader<LongWritable, ProtobufWritable<M>> createRecordReader(InputSplit split,
- TaskAttemptContext taskAttempt) throws IOException, InterruptedException {
- if (typeRef_ == null) {
- typeRef_ = Protobufs.getTypeRef(taskAttempt.getConfiguration(), LzoProtobufBlockInputFormat.class);
- }
- return new LzoProtobufBlockRecordReader<M>(typeRef_);
- }
}
@@ -1,16 +1,8 @@
package com.twitter.elephantbird.mapreduce.input;
-import java.io.IOException;
-
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.thrift.TBase;
-import com.twitter.elephantbird.mapreduce.io.ThriftWritable;
-import com.twitter.elephantbird.util.ThriftUtils;
import com.twitter.elephantbird.util.TypeRef;
/**
@@ -25,35 +17,24 @@
* A small fraction of bad records are tolerated. See {@link LzoRecordReader}
* for more information on error handling.
*/
-public class LzoThriftB64LineInputFormat<M extends TBase<?, ?>>
- extends LzoInputFormat<LongWritable, ThriftWritable<M>> {
-
- private TypeRef<M> typeRef_;
+public class LzoThriftB64LineInputFormat<M extends TBase<?, ?>> extends MultiInputFormat<M> {
public LzoThriftB64LineInputFormat() {}
public LzoThriftB64LineInputFormat(TypeRef<M> typeRef) {
- typeRef_ = typeRef;
+ super(typeRef);
}
/**
* Returns {@link LzoThriftB64LineInputFormat} class for setting up a job.
* Sets an internal configuration in jobConf so that Task instantiates
* appropriate object for this generic class based on thriftClass
+ *
+ * @Deprecated Use {@link MultiInputFormat#setInputFormatClass(Class, org.apache.hadoop.mapreduce.Job)
*/
public static <M extends TBase<?, ?>> Class<LzoThriftB64LineInputFormat>
getInputFormatClass(Class<M> thriftClass, Configuration jobConf) {
- ThriftUtils.setClassConf(jobConf, LzoThriftB64LineInputFormat.class, thriftClass);
+ setClassConf(thriftClass, jobConf);
return LzoThriftB64LineInputFormat.class;
}
-
- @Override
- public RecordReader<LongWritable, ThriftWritable<M>> createRecordReader(InputSplit split,
- TaskAttemptContext taskAttempt) throws IOException, InterruptedException {
- if (typeRef_ == null) {
- typeRef_ = ThriftUtils.getTypeRef(taskAttempt.getConfiguration(), LzoThriftB64LineInputFormat.class);
- }
- return new LzoThriftB64LineRecordReader<M>(typeRef_);
- }
-
}
Oops, something went wrong.

0 comments on commit 0d04032

Please sign in to comment.