Permalink
Browse files

long pending update to how a Thrift/Protobuf

class is set in conf.

previous getInputFormatClass() which sets the conf
is not intuitive. setClassConf() makes is clearer.

before :
  job.setInputFormat(
       LzoFormat.getInformat(job, ThriftType.class)
   );
now:
  LzoFormat.setClassConf(conf, ThriftType.class);
  job.setInputFormat(LzoFormat.class);
  • Loading branch information...
1 parent fe8fa0c commit b92bea064aa4b07699bde13fad21961a7794dcfc Raghu Angadi committed Apr 3, 2012
Showing with 98 additions and 176 deletions.
  1. +4 −2 examples/src/java/com/twitter/elephantbird/examples/ProtobufMRExample.java
  2. +4 −2 examples/src/java/com/twitter/elephantbird/examples/ThriftMRExample.java
  3. +2 −3 src/java/com/twitter/elephantbird/cascading2/scheme/LzoProtobufScheme.java
  4. +2 −3 src/java/com/twitter/elephantbird/cascading2/scheme/LzoThriftScheme.java
  5. +1 −1 src/java/com/twitter/elephantbird/mapred/input/DeprecatedInputFormatWrapper.java
  6. +6 −8 src/java/com/twitter/elephantbird/mapred/input/DeprecatedLzoProtobufBlockInputFormat.java
  7. +4 −20 src/java/com/twitter/elephantbird/mapred/input/DeprecatedLzoThriftB64LineInputFormat.java
  8. +6 −6 src/java/com/twitter/elephantbird/mapred/output/DeprecatedLzoProtobufBlockOutputFormat.java
  9. +6 −5 src/java/com/twitter/elephantbird/mapred/output/DeprecatedLzoThriftB64LineOutputFormat.java
  10. +6 −5 src/java/com/twitter/elephantbird/mapred/output/DeprecatedLzoThriftBlockOutputFormat.java
  11. +2 −21 src/java/com/twitter/elephantbird/mapreduce/input/LzoProtobufB64LineInputFormat.java
  12. +2 −21 src/java/com/twitter/elephantbird/mapreduce/input/LzoProtobufBlockInputFormat.java
  13. +2 −18 src/java/com/twitter/elephantbird/mapreduce/input/LzoThriftB64LineInputFormat.java
  14. +0 −18 src/java/com/twitter/elephantbird/mapreduce/input/LzoThriftBlockInputFormat.java
  15. +1 −1 src/java/com/twitter/elephantbird/mapreduce/input/MultiInputFormat.java
  16. +6 −9 src/java/com/twitter/elephantbird/mapreduce/output/LzoProtobufB64LineOutputFormat.java
  17. +6 −9 src/java/com/twitter/elephantbird/mapreduce/output/LzoProtobufBlockOutputFormat.java
  18. +10 −8 src/java/com/twitter/elephantbird/mapreduce/output/LzoThriftB64LineOutputFormat.java
  19. +10 −8 src/java/com/twitter/elephantbird/mapreduce/output/LzoThriftBlockOutputFormat.java
  20. +16 −6 src/java/com/twitter/elephantbird/util/HadoopUtils.java
  21. +1 −1 src/java/com/twitter/elephantbird/util/Protobufs.java
  22. +1 −1 src/java/com/twitter/elephantbird/util/ThriftUtils.java
@@ -66,9 +66,11 @@ public int runTextToLzo(String[] args, Configuration conf) throws Exception {
job.setInputFormatClass(TextInputFormat.class);
if (conf.get("proto.test.format", "B64Line").equals("Block")) {
- job.setOutputFormatClass(LzoProtobufBlockOutputFormat.getOutputFormatClass(Age.class, job.getConfiguration()));
+ LzoProtobufBlockOutputFormat.setClassConf(Age.class, job.getConfiguration());
+ job.setOutputFormatClass(LzoProtobufBlockOutputFormat.class);
} else { // assume B64Line
- job.setOutputFormatClass(LzoProtobufB64LineOutputFormat.getOutputFormatClass(Age.class, job.getConfiguration()));
+ LzoProtobufB64LineOutputFormat.setClassConf(Age.class, job.getConfiguration());
+ job.setOutputFormatClass(LzoProtobufB64LineOutputFormat.class);
}
FileInputFormat.setInputPaths(job, new Path(args[0]));
@@ -62,9 +62,11 @@ public int runTextToLzo(String[] args, Configuration conf) throws Exception {
job.setInputFormatClass(TextInputFormat.class);
if (conf.get("thrift.test.format", "B64Line").equals("Block")) {
- job.setOutputFormatClass(LzoThriftBlockOutputFormat.getOutputFormatClass(Age.class, job.getConfiguration()));
+ LzoThriftBlockOutputFormat.setClassConf(Age.class, job.getConfiguration());
+ job.setOutputFormatClass(LzoThriftBlockOutputFormat.class);
} else { // assume B64Line
- job.setOutputFormatClass(LzoThriftB64LineOutputFormat.getOutputFormatClass(Age.class, job.getConfiguration()));
+ LzoThriftB64LineOutputFormat.setClassConf(Age.class, job.getConfiguration());
+ job.setOutputFormatClass(LzoThriftB64LineOutputFormat.class);
}
FileInputFormat.setInputPaths(job, new Path(args[0]));
@@ -41,9 +41,8 @@ public LzoProtobufScheme(Class protoClass) {
@Override
public void sinkConfInit(HadoopFlowProcess hfp, Tap tap, JobConf conf) {
- conf.setOutputFormat(
- DeprecatedLzoProtobufBlockOutputFormat.getOutputFormatClass(protoClass, conf)
- );
+ DeprecatedLzoProtobufBlockOutputFormat.setClassConf(protoClass, conf);
+ conf.setOutputFormat(DeprecatedLzoProtobufBlockOutputFormat.class);
}
@Override
@@ -36,9 +36,8 @@ public LzoThriftScheme(Class thriftClass) {
@Override
public void sinkConfInit(HadoopFlowProcess hfp, Tap tap, JobConf conf) {
- conf.setOutputFormat(
- DeprecatedLzoThriftBlockOutputFormat.getOutputFormatClass(thriftClass, conf)
- );
+ DeprecatedLzoThriftBlockOutputFormat.setClassConf(thriftClass, conf);
+ conf.setOutputFormat(DeprecatedLzoThriftBlockOutputFormat.class);
}
protected ThriftWritable<M> prepareBinaryWritable() {
@@ -59,7 +59,7 @@
*/
public static void setInputFormat(Class<?> realInputFormatClass, JobConf jobConf) {
jobConf.setInputFormat(DeprecatedInputFormatWrapper.class);
- HadoopUtils.setInputFormatClass(jobConf, CLASS_CONF_KEY, realInputFormatClass);
+ HadoopUtils.setClassConf(jobConf, CLASS_CONF_KEY, realInputFormatClass);
}
@SuppressWarnings("unchecked")
@@ -4,6 +4,8 @@
import com.twitter.elephantbird.mapreduce.io.ProtobufWritable;
import com.twitter.elephantbird.util.TypeRef;
import com.twitter.elephantbird.util.Protobufs;
+
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
@@ -45,15 +47,11 @@ protected void setProtobufWritable(W protobufWritable) {
}
/**
- * Returns {@link DeprecatedLzoProtobufBlockInputFormat} class.
- * Sets an internal configuration in jobConf so that remote Tasks
- * instantiate appropriate object based on protoClass.
+ * Stores supplied class name in configuration. This configuration is
+ * read on the remote tasks to initialize the input format correctly.
*/
- @SuppressWarnings("unchecked")
- public static <M extends Message> Class<DeprecatedLzoProtobufBlockInputFormat>
- getInputFormatClass(Class<M> protoClass, JobConf jobConf) {
- Protobufs.setClassConf(jobConf, DeprecatedLzoProtobufBlockInputFormat.class, protoClass);
- return DeprecatedLzoProtobufBlockInputFormat.class;
+ public static void setClassConf(Class<? extends Message> protoClass, Configuration conf) {
+ Protobufs.setClassConf(conf, DeprecatedLzoProtobufBlockInputFormat.class, protoClass);
}
@Override
@@ -23,30 +23,14 @@
extends DeprecatedLzoInputFormat<LongWritable, ThriftWritable<M>> {
/**
- * Returns DeprecatedLzoThriftB64LineInputFormat class for setting up a job.
- * Sets an internal configuration in jobConf so that Task instantiates
- * appropriate object for this generic class based on thriftClass
+ * Stores supplied class name in configuration. This configuration is
+ * read on the remote tasks to initialize the input format correctly.
*/
- //@SuppressWarnings("unchecked")
- public static <M extends TBase<?, ?>> Class<DeprecatedLzoThriftB64LineInputFormat>
- getInputFormatClass(Class<M> thriftClass, Configuration jobConf) {
- return getInputFormatClass(
- DeprecatedLzoThriftB64LineInputFormat.class, thriftClass, jobConf);
+ public static void setClassConf(Class<? extends TBase<?, ?>> thriftClass, Configuration conf) {
+ ThriftUtils.setClassConf(conf, DeprecatedLzoThriftB64LineInputFormat.class, thriftClass);
}
/**
- * Sets an internal configuration in jobConf so that Task instantiates
- * appropriate object for this generic class based on thriftClass.
- * Returns formatClass.
- */
- public static <T extends InputFormat, M extends TBase<?, ?>> Class<T> getInputFormatClass(
- Class<T> formatClass, Class<M> thriftClass, Configuration jobConf) {
- ThriftUtils.setClassConf(jobConf, formatClass, thriftClass);
- return formatClass;
- }
-
-
- /**
* Return a DeprecatedLzoThriftB64LineRecordReader to handle the work.
* @throws IOException
*/
@@ -1,7 +1,6 @@
package com.twitter.elephantbird.mapred.output;
import com.twitter.elephantbird.mapreduce.io.ProtobufBlockWriter;
-import com.twitter.elephantbird.mapreduce.io.ProtobufConverter;
import com.twitter.elephantbird.mapreduce.io.ProtobufWritable;
import com.twitter.elephantbird.util.TypeRef;
import com.twitter.elephantbird.util.Protobufs;
@@ -22,12 +21,13 @@
*/
public class DeprecatedLzoProtobufBlockOutputFormat<M extends Message>
extends DeprecatedLzoOutputFormat<NullWritable, ProtobufWritable<M>> {
- @SuppressWarnings("unchecked")
- public static <M extends Message> Class<DeprecatedLzoProtobufBlockOutputFormat>
- getOutputFormatClass(Class<M> protoClass, Configuration jobConf) {
- Protobufs.setClassConf(jobConf, DeprecatedLzoProtobufBlockOutputFormat.class, protoClass);
- return DeprecatedLzoProtobufBlockOutputFormat.class;
+ /**
+ * Stores supplied class name in configuration. This configuration is
+ * read on the remote tasks to initialize the output format correctly.
+ */
+ public static void setClassConf(Class<? extends Message> protoClass, Configuration conf) {
+ Protobufs.setClassConf(conf, DeprecatedLzoProtobufBlockOutputFormat.class, protoClass);
}
@Override
@@ -21,12 +21,13 @@
*/
public class DeprecatedLzoThriftB64LineOutputFormat<M extends TBase<?, ?>>
extends DeprecatedLzoOutputFormat<NullWritable, ThriftWritable<M>> {
- @SuppressWarnings("unchecked")
- public static <M extends TBase<?, ?>> Class<DeprecatedLzoThriftB64LineOutputFormat>
- getOutputFormatClass(Class<M> thriftClass, Configuration jobConf) {
- ThriftUtils.setClassConf(jobConf, DeprecatedLzoThriftB64LineOutputFormat.class, thriftClass);
- return DeprecatedLzoThriftB64LineOutputFormat.class;
+ /**
+ * Stores supplied class name in configuration. This configuration is
+ * read on the remote tasks to initialize the output format correctly.
+ */
+ public static void setClassConf(Class<? extends TBase<?, ?>> thriftClass, Configuration conf) {
+ ThriftUtils.setClassConf(conf, DeprecatedLzoThriftB64LineOutputFormat.class, thriftClass);
}
@Override
@@ -20,12 +20,13 @@
*/
public class DeprecatedLzoThriftBlockOutputFormat<M extends TBase<?, ?>>
extends DeprecatedLzoOutputFormat<NullWritable, ThriftWritable<M>> {
- @SuppressWarnings("unchecked")
- public static <M extends TBase<?, ?>> Class<DeprecatedLzoThriftBlockOutputFormat>
- getOutputFormatClass(Class<M> thriftClass, Configuration jobConf) {
- ThriftUtils.setClassConf(jobConf, DeprecatedLzoThriftBlockOutputFormat.class, thriftClass);
- return DeprecatedLzoThriftBlockOutputFormat.class;
+ /**
+ * Stores supplied class name in configuration. This configuration is
+ * read on the remote tasks to initialize the output format correctly.
+ */
+ public static void setClassConf(Class<? extends TBase<?, ?>> thriftClass, Configuration conf) {
+ ThriftUtils.setClassConf(conf, DeprecatedLzoThriftBlockOutputFormat.class, thriftClass);
}
@Override
@@ -1,7 +1,5 @@
package com.twitter.elephantbird.mapreduce.input;
-import org.apache.hadoop.conf.Configuration;
-
import com.google.protobuf.Message;
import com.twitter.elephantbird.util.TypeRef;
@@ -10,14 +8,11 @@
* Data is expected to be one base64 encoded serialized protocol buffer per line.
* <br><br>
*
- * Do not use LzoProtobufB64LineInputFormat.class directly for setting
- * InputFormat class for a job. Use getInputFormatClass() or newInstance(typeRef) instead.
- *
- * <p>
* A small fraction of bad records are tolerated. See {@link LzoRecordReader}
* for more information on error handling.
+ *
+ * @Deprecated use {@link MultiInputFormat}
*/
-
public class LzoProtobufB64LineInputFormat<M extends Message> extends MultiInputFormat<M> {
public LzoProtobufB64LineInputFormat() {
@@ -27,20 +22,6 @@ public LzoProtobufB64LineInputFormat(TypeRef<M> typeRef) {
super(typeRef);
}
- /**
- * Returns {@link LzoProtobufB64LineInputFormat} class.
- * Sets an internal configuration in jobConf so that remote Tasks
- * instantiate appropriate object based on protoClass.
- *
- * @Deprecated Use {@link MultiInputFormat#setInputFormatClass(Class, org.apache.hadoop.mapreduce.Job)
- */
- @SuppressWarnings("rawtypes")
- public static <M extends Message> Class<LzoProtobufB64LineInputFormat>
- getInputFormatClass(Class<M> protoClass, Configuration jobConf) {
- setClassConf(protoClass, jobConf);
- return LzoProtobufB64LineInputFormat.class;
- }
-
public static<M extends Message> LzoProtobufB64LineInputFormat<M> newInstance(TypeRef<M> typeRef) {
return new LzoProtobufB64LineInputFormat<M>(typeRef);
}
@@ -1,7 +1,5 @@
package com.twitter.elephantbird.mapreduce.input;
-import org.apache.hadoop.conf.Configuration;
-
import com.google.protobuf.Message;
import com.twitter.elephantbird.util.TypeRef;
@@ -10,14 +8,11 @@
* the ProtobufBlockWriter to write your data, this input format can read it.
* <br> <br>
*
- * Do not use LzoProtobufBlockInputFormat.class directly for setting
- * InputFormat class for a job. Use getInputFormatClass() instead.<p>
- *
- * <p>
* A small fraction of bad records are tolerated. See {@link LzoRecordReader}
* for more information on error handling.
+ *
+ * @Deprecated use {@link MultiInputFormat}
*/
-
public class LzoProtobufBlockInputFormat<M extends Message> extends MultiInputFormat<M> {
public LzoProtobufBlockInputFormat() {
@@ -27,20 +22,6 @@ public LzoProtobufBlockInputFormat(TypeRef<M> typeRef) {
super(typeRef);
}
- /**
- * Returns {@link LzoProtobufBlockInputFormat} class.
- * Sets an internal configuration in jobConf so that remote Tasks
- * instantiate appropriate object based on protoClass.
- *
- * @Deprecated Use {@link MultiInputFormat#setInputFormatClass(Class, org.apache.hadoop.mapreduce.Job)
- */
- @SuppressWarnings("unchecked")
- public static <M extends Message> Class<LzoProtobufBlockInputFormat>
- getInputFormatClass(Class<M> protoClass, Configuration jobConf) {
- setClassConf(protoClass, jobConf);
- return LzoProtobufBlockInputFormat.class;
- }
-
public static<M extends Message> LzoProtobufBlockInputFormat<M> newInstance(TypeRef<M> typeRef) {
return new LzoProtobufBlockInputFormat<M>(typeRef);
}
@@ -1,6 +1,5 @@
package com.twitter.elephantbird.mapreduce.input;
-import org.apache.hadoop.conf.Configuration;
import org.apache.thrift.TBase;
import com.twitter.elephantbird.util.TypeRef;
@@ -10,12 +9,10 @@
* deserializes that into the Thrift object.
* Returns <position, thriftObject> pairs. <br><br>
*
- * Do not use LzoThriftB64LineInputFormat.class directly for setting
- * InputFormat class for a job. Use getInputFormatClass() instead.<p>
- *
- * <p>
* A small fraction of bad records are tolerated. See {@link LzoRecordReader}
* for more information on error handling.
+ *
+ * @Deprecated use {@link MultiInputFormat}
*/
public class LzoThriftB64LineInputFormat<M extends TBase<?, ?>> extends MultiInputFormat<M> {
@@ -24,17 +21,4 @@ public LzoThriftB64LineInputFormat() {}
public LzoThriftB64LineInputFormat(TypeRef<M> typeRef) {
super(typeRef);
}
-
- /**
- * Returns {@link LzoThriftB64LineInputFormat} class for setting up a job.
- * Sets an internal configuration in jobConf so that Task instantiates
- * appropriate object for this generic class based on thriftClass
- *
- * @Deprecated Use {@link MultiInputFormat#setInputFormatClass(Class, org.apache.hadoop.mapreduce.Job)
- */
- public static <M extends TBase<?, ?>> Class<LzoThriftB64LineInputFormat>
- getInputFormatClass(Class<M> thriftClass, Configuration jobConf) {
- setClassConf(thriftClass, jobConf);
- return LzoThriftB64LineInputFormat.class;
- }
}
@@ -9,10 +9,6 @@
* Reads Thrift objects written in blocks using LzoThriftBlockOutputFormat
* <br><br>
*
- * Do not use LzoThriftBlockInputFormat.class directly for setting
- * InputFormat class for a job. Use getInputFormatClass() instead.
- *
- * <p>
* A small fraction of bad records are tolerated. See {@link LzoRecordReader}
* for more information on error handling.
*/
@@ -23,18 +19,4 @@ public LzoThriftBlockInputFormat() {}
public LzoThriftBlockInputFormat(TypeRef<M> typeRef) {
super(typeRef);
}
-
- /**
- * Returns {@link LzoThriftBlockInputFormat} class for setting up a job.
- * Sets an internal configuration in jobConf so that Task instantiates
- * appropriate object for this generic class based on thriftClass
- *
- * @Deprecated Use {@link MultiInputFormat#setInputFormatClass(Class, org.apache.hadoop.mapreduce.Job)
- */
- @SuppressWarnings("unchecked")
- public static <M extends TBase<?, ?>> Class<LzoThriftBlockInputFormat>
- getInputFormatClass(Class<M> thriftClass, Configuration jobConf) {
- setClassConf(thriftClass, jobConf);
- return LzoThriftBlockInputFormat.class;
- }
}
@@ -69,7 +69,7 @@ public static void setInputFormatClass(Class<?> clazz, Job job) {
* read on the remote tasks to initialize the input format correctly.
*/
public static void setClassConf(Class<?> clazz, Configuration conf) {
- HadoopUtils.setInputFormatClass(conf, CLASS_CONF_KEY, clazz);
+ HadoopUtils.setClassConf(conf, CLASS_CONF_KEY, clazz);
}
@SuppressWarnings("unchecked") // return type is runtime dependent
@@ -16,8 +16,7 @@
* This is the class for all base64 encoded, line-oriented protocol buffer based output formats.
* Data is written as one base64 encoded serialized protocol buffer per line.<br><br>
*
- * Do not use LzoProtobufB64LineOutputFormat.class directly for setting
- * OutputFormat class for a job. Use getOutputFormatClass() or getInstance() instead.
+ * Do not forget to set Protobuf class using setClassConf().
*/
public class LzoProtobufB64LineOutputFormat<M extends Message> extends LzoOutputFormat<M, ProtobufWritable<M>> {
@@ -34,16 +33,14 @@ public LzoProtobufB64LineOutputFormat(TypeRef<M> typeRef) {
}
/**
- * Returns {@link LzoProtobufBlockOutputFormat} class.
* Sets an internal configuration in jobConf so that remote Tasks
* instantiate appropriate object for this generic class based on protoClass
*/
- @SuppressWarnings("unchecked")
- public static <M extends Message> Class<LzoProtobufB64LineOutputFormat>
- getOutputFormatClass(Class<M> protoClass, Configuration jobConf) {
-
- Protobufs.setClassConf(jobConf, LzoProtobufB64LineOutputFormat.class, protoClass);
- return LzoProtobufB64LineOutputFormat.class;
+ public static <M extends Message>
+ void setClassConf(Class<M> protoClass, Configuration jobConf) {
+ Protobufs.setClassConf(jobConf,
+ LzoProtobufB64LineOutputFormat.class,
+ protoClass);
}
@Override
Oops, something went wrong.

0 comments on commit b92bea0

Please sign in to comment.