Commit ba7a19c
[FLINK-1271] [hadoop] Remove Writable limitation from Hadoop format and function wrappers

This closes #287
FelixNeutatz authored and fhueske committed Jan 15, 2015
1 parent d62ab47 commit ba7a19c
Showing 11 changed files with 242 additions and 96 deletions.
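For context on why the bound can go (illustration only, not part of the commit): Flink's TypeExtractor recognizes Writable types at runtime and falls back to a Writable-based TypeInformation for them, while ordinary Java types resolve through the regular extraction path, so the wrappers no longer need a compile-time `extends Writable` constraint. A minimal sketch of that behavior, assuming the Flink 0.8-era type-extraction API:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.hadoop.io.Text;

public class TypeExtractionDemo {
	public static void main(String[] args) {
		// A Hadoop Writable is recognized by the extractor at runtime ...
		TypeInformation<Text> writableInfo = TypeExtractor.createTypeInfo(Text.class);
		// ... and an ordinary Java type resolves to its standard type info,
		// which is why the compile-time Writable bound can be dropped.
		TypeInformation<String> basicInfo = TypeExtractor.createTypeInfo(String.class);
		System.out.println(writableInfo + " / " + basicInfo);
	}
}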
HadoopInputFormat.java

@@ -25,6 +25,7 @@
 import java.util.ArrayList;
 
 import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.io.InputFormat;
@@ -34,7 +35,6 @@
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.WritableTypeInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
@@ -44,13 +44,12 @@
 import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyReporter;
 import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopInputSplit;
 import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.util.ReflectionUtils;
 
-public class HadoopInputFormat<K extends Writable, V extends Writable> implements InputFormat<Tuple2<K,V>, HadoopInputSplit>, ResultTypeQueryable<Tuple2<K,V>> {
+public class HadoopInputFormat<K, V> implements InputFormat<Tuple2<K,V>, HadoopInputSplit>, ResultTypeQueryable<Tuple2<K,V>> {
 
 	private static final long serialVersionUID = 1L;
 
@@ -293,6 +292,6 @@ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
 
 	@Override
 	public TypeInformation<Tuple2<K,V>> getProducedType() {
-		return new TupleTypeInfo<Tuple2<K,V>>(new WritableTypeInfo<K>((Class<K>) keyClass), new WritableTypeInfo<V>((Class<V>) valueClass));
+		return new TupleTypeInfo<Tuple2<K,V>>(TypeExtractor.createTypeInfo(keyClass), TypeExtractor.createTypeInfo(valueClass));
 	}
 }
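With the bound removed, the input wrapper is used exactly as before. A minimal read sketch (illustration, not part of the commit; assumes the Flink 0.8-era DataSet API, and the HDFS path is a placeholder):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.mapred.HadoopInputFormat;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;

public class ReadHadoopInput {
	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// Wrap a mapred TextInputFormat; the key/value classes drive getProducedType().
		JobConf jobConf = new JobConf();
		HadoopInputFormat<LongWritable, Text> hadoopIF = new HadoopInputFormat<LongWritable, Text>(
				new TextInputFormat(), LongWritable.class, Text.class, jobConf);
		TextInputFormat.addInputPath(jobConf, new Path("hdfs:///tmp/input"));  // placeholder path

		DataSet<Tuple2<LongWritable, Text>> lines = env.createInput(hadoopIF);
		lines.print();

		env.execute("read via HadoopInputFormat");
	}
}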
HadoopMapFunction.java

@@ -29,14 +29,11 @@
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.api.java.typeutils.WritableTypeInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyReporter;
 import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.InstantiationUtil;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.Reporter;
@@ -45,8 +42,7 @@
  * This wrapper maps a Hadoop Mapper (mapred API) to a Flink FlatMapFunction.
  */
 @SuppressWarnings("rawtypes")
-public final class HadoopMapFunction<KEYIN extends WritableComparable, VALUEIN extends Writable,
-		KEYOUT extends WritableComparable, VALUEOUT extends Writable>
+public final class HadoopMapFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
 	extends RichFlatMapFunction<Tuple2<KEYIN,VALUEIN>, Tuple2<KEYOUT,VALUEOUT>>
 	implements ResultTypeQueryable<Tuple2<KEYOUT,VALUEOUT>>, Serializable {
 
@@ -108,8 +104,8 @@ public TypeInformation<Tuple2<KEYOUT,VALUEOUT>> getProducedType() {
 		Class<KEYOUT> outKeyClass = (Class<KEYOUT>) TypeExtractor.getParameterType(Mapper.class, mapper.getClass(), 2);
 		Class<VALUEOUT> outValClass = (Class<VALUEOUT>)TypeExtractor.getParameterType(Mapper.class, mapper.getClass(), 3);
 
-		final WritableTypeInfo<KEYOUT> keyTypeInfo = new WritableTypeInfo<KEYOUT>(outKeyClass);
-		final WritableTypeInfo<VALUEOUT> valueTypleInfo = new WritableTypeInfo<VALUEOUT>(outValClass);
+		final TypeInformation<KEYOUT> keyTypeInfo = TypeExtractor.getForClass((Class<KEYOUT>) outKeyClass);
+		final TypeInformation<VALUEOUT> valueTypleInfo = TypeExtractor.getForClass((Class<VALUEOUT>) outValClass);
 		return new TupleTypeInfo<Tuple2<KEYOUT,VALUEOUT>>(keyTypeInfo, valueTypleInfo);
 	}
 
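Usage stays the same for the mapper wrapper; only the generic signature is looser. A sketch with a hypothetical word-count Mapper (illustration, not part of the commit; assumes the Flink 0.8-era DataSet API):

import java.io.IOException;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class TokenizeWithHadoopMapper {

	/** Hypothetical mapred Mapper: emits (token, 1) for each word in a line. */
	public static final class Tokenizer implements Mapper<LongWritable, Text, Text, LongWritable> {
		@Override
		public void map(LongWritable pos, Text line, OutputCollector<Text, LongWritable> out, Reporter rep)
				throws IOException {
			for (String token : line.toString().toLowerCase().split("\\W+")) {
				if (!token.isEmpty()) {
					out.collect(new Text(token), new LongWritable(1L));
				}
			}
		}
		@Override public void configure(JobConf conf) {}
		@Override public void close() {}
	}

	public static DataSet<Tuple2<Text, LongWritable>> tokenize(DataSet<Tuple2<LongWritable, Text>> input) {
		// The wrapper infers the produced Tuple2<Text, LongWritable> type via getProducedType().
		return input.flatMap(new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(new Tokenizer()));
	}
}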
HadoopOutputFormat.java

@@ -30,7 +30,6 @@
 import org.apache.flink.hadoopcompatibility.mapred.utils.HadoopUtils;
 import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyProgressable;
 import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyReporter;
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.FileOutputCommitter;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobContext;
@@ -41,7 +40,7 @@
 import org.apache.hadoop.util.ReflectionUtils;
 
 
-public class HadoopOutputFormat<K extends Writable,V extends Writable> implements OutputFormat<Tuple2<K, V>>, FinalizeOnMaster {
+public class HadoopOutputFormat<K,V> implements OutputFormat<Tuple2<K,V>>, FinalizeOnMaster {
 
 	private static final long serialVersionUID = 1L;
 
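A matching write-side sketch for the output wrapper (illustration, not part of the commit; TextOutputFormat and the output path are placeholder choices):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.mapred.HadoopOutputFormat;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextOutputFormat;

public class WriteHadoopOutput {
	public static void write(DataSet<Tuple2<Text, LongWritable>> counts) {
		JobConf jobConf = new JobConf();
		FileOutputFormat.setOutputPath(jobConf, new Path("hdfs:///tmp/output"));  // placeholder path

		// Wrap a mapred TextOutputFormat; K and V no longer need to extend Writable.
		HadoopOutputFormat<Text, LongWritable> hadoopOF =
				new HadoopOutputFormat<Text, LongWritable>(new TextOutputFormat<Text, LongWritable>(), jobConf);

		counts.output(hadoopOF);
	}
}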
HadoopReduceCombineFunction.java

@@ -29,15 +29,12 @@
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.api.java.typeutils.WritableTypeInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyReporter;
 import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector;
 import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopTupleUnwrappingIterator;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.InstantiationUtil;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
@@ -47,8 +44,7 @@
  */
 @SuppressWarnings("rawtypes")
 @org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable
-public final class HadoopReduceCombineFunction<KEYIN extends WritableComparable, VALUEIN extends Writable,
-		KEYOUT extends WritableComparable, VALUEOUT extends Writable>
+public final class HadoopReduceCombineFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
 	extends RichGroupReduceFunction<Tuple2<KEYIN,VALUEIN>,Tuple2<KEYOUT,VALUEOUT>>
 	implements ResultTypeQueryable<Tuple2<KEYOUT,VALUEOUT>>, Serializable {
 
@@ -132,9 +128,9 @@ public void combine(final Iterable<Tuple2<KEYIN,VALUEIN>> values, final Collector<Tuple2<KEYIN,VALUEIN>> out) {
 	public TypeInformation<Tuple2<KEYOUT,VALUEOUT>> getProducedType() {
 		Class<KEYOUT> outKeyClass = (Class<KEYOUT>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 2);
 		Class<VALUEOUT> outValClass = (Class<VALUEOUT>)TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 3);
 
-		final WritableTypeInfo<KEYOUT> keyTypeInfo = new WritableTypeInfo<KEYOUT>(outKeyClass);
-		final WritableTypeInfo<VALUEOUT> valueTypleInfo = new WritableTypeInfo<VALUEOUT>(outValClass);
+		final TypeInformation<KEYOUT> keyTypeInfo = TypeExtractor.getForClass((Class<KEYOUT>) outKeyClass);
+		final TypeInformation<VALUEOUT> valueTypleInfo = TypeExtractor.getForClass((Class<VALUEOUT>) outValClass);
 		return new TupleTypeInfo<Tuple2<KEYOUT,VALUEOUT>>(keyTypeInfo, valueTypleInfo);
 	}
 
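A sketch of the combinable reducer wrapper with a hypothetical summing Reducer used as both reducer and combiner (illustration, not part of the commit; the argument order — reducer first, then combiner — is assumed):

import java.io.IOException;
import java.util.Iterator;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceCombineFunction;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class CountWithHadoopReducer {

	/** Hypothetical mapred Reducer, usable as reducer and combiner: sums counts per key. */
	public static final class Counter implements Reducer<Text, LongWritable, Text, LongWritable> {
		@Override
		public void reduce(Text key, Iterator<LongWritable> values, OutputCollector<Text, LongWritable> out,
				Reporter rep) throws IOException {
			long sum = 0;
			while (values.hasNext()) {
				sum += values.next().get();
			}
			out.collect(key, new LongWritable(sum));
		}
		@Override public void configure(JobConf conf) {}
		@Override public void close() {}
	}

	public static DataSet<Tuple2<Text, LongWritable>> count(DataSet<Tuple2<Text, LongWritable>> tokens) {
		// Assumed constructor order: the reducer first, the combiner second (here the same class).
		return tokens
				.groupBy(0)
				.reduceGroup(new HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(
						new Counter(), new Counter()));
	}
}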
HadoopReduceFunction.java

@@ -29,15 +29,12 @@
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.api.java.typeutils.WritableTypeInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyReporter;
 import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector;
 import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopTupleUnwrappingIterator;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.InstantiationUtil;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
@@ -46,8 +43,7 @@
  * This wrapper maps a Hadoop Reducer (mapred API) to a non-combinable Flink GroupReduceFunction.
  */
 @SuppressWarnings("rawtypes")
-public final class HadoopReduceFunction<KEYIN extends WritableComparable, VALUEIN extends Writable,
-		KEYOUT extends WritableComparable, VALUEOUT extends Writable>
+public final class HadoopReduceFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
 	extends RichGroupReduceFunction<Tuple2<KEYIN,VALUEIN>,Tuple2<KEYOUT,VALUEOUT>>
 	implements ResultTypeQueryable<Tuple2<KEYOUT,VALUEOUT>>, Serializable {
 
@@ -113,9 +109,9 @@ public void reduce(final Iterable<Tuple2<KEYIN,VALUEIN>> values, final Collector<Tuple2<KEYOUT,VALUEOUT>> out) {
 	public TypeInformation<Tuple2<KEYOUT,VALUEOUT>> getProducedType() {
 		Class<KEYOUT> outKeyClass = (Class<KEYOUT>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 2);
 		Class<VALUEOUT> outValClass = (Class<VALUEOUT>)TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 3);
 
-		final WritableTypeInfo<KEYOUT> keyTypeInfo = new WritableTypeInfo<KEYOUT>(outKeyClass);
-		final WritableTypeInfo<VALUEOUT> valueTypleInfo = new WritableTypeInfo<VALUEOUT>(outValClass);
+		final TypeInformation<KEYOUT> keyTypeInfo = TypeExtractor.getForClass((Class<KEYOUT>) outKeyClass);
+		final TypeInformation<VALUEOUT> valueTypleInfo = TypeExtractor.getForClass((Class<VALUEOUT>) outValClass);
 		return new TupleTypeInfo<Tuple2<KEYOUT,VALUEOUT>>(keyTypeInfo, valueTypleInfo);
 	}
 
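The non-combinable wrapper takes a single Reducer. A sketch with a hypothetical max-per-key Reducer (illustration, not part of the commit):

import java.io.IOException;
import java.util.Iterator;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceFunction;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class MaxWithHadoopReducer {

	/** Hypothetical Reducer: emits the maximum value observed per key. */
	public static final class MaxReducer implements Reducer<Text, IntWritable, Text, IntWritable> {
		@Override
		public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> out,
				Reporter rep) throws IOException {
			int max = Integer.MIN_VALUE;
			while (values.hasNext()) {
				max = Math.max(max, values.next().get());
			}
			out.collect(key, new IntWritable(max));
		}
		@Override public void configure(JobConf conf) {}
		@Override public void close() {}
	}

	public static DataSet<Tuple2<Text, IntWritable>> maxPerKey(DataSet<Tuple2<Text, IntWritable>> pairs) {
		return pairs
				.groupBy(0)
				.reduceGroup(new HadoopReduceFunction<Text, IntWritable, Text, IntWritable>(new MaxReducer()));
	}
}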
HadoopOutputCollector.java

@@ -20,8 +20,6 @@
 
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.util.Collector;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.OutputCollector;
 
 import java.io.IOException;
@@ -32,7 +30,7 @@
  *
  */
 @SuppressWarnings("rawtypes")
-public final class HadoopOutputCollector<KEY extends WritableComparable, VALUE extends Writable>
+public final class HadoopOutputCollector<KEY,VALUE>
 	implements OutputCollector<KEY,VALUE> {
 
 	private Collector<Tuple2<KEY,VALUE>> flinkCollector;
@@ -63,4 +61,4 @@ public void collect(final KEY key, final VALUE val) throws IOException {
 		this.flinkCollector.collect(outTuple);
 	}
 
 }
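How the collector bridge behaves, sketched standalone (illustration, not part of the commit; the no-arg constructor and a setFlinkCollector(...) setter are assumed from the mutable flinkCollector field above):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector;
import org.apache.flink.util.Collector;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

public class CollectorBridgeDemo {
	public static void main(String[] args) throws Exception {
		// A trivial Flink Collector that just prints records.
		Collector<Tuple2<Text, LongWritable>> flinkCollector = new Collector<Tuple2<Text, LongWritable>>() {
			@Override public void collect(Tuple2<Text, LongWritable> record) { System.out.println(record); }
			@Override public void close() {}
		};

		// A Hadoop-side collect(key, value) is re-emitted as a Flink Tuple2.
		HadoopOutputCollector<Text, LongWritable> bridge = new HadoopOutputCollector<Text, LongWritable>();
		bridge.setFlinkCollector(flinkCollector);  // assumed setter
		bridge.collect(new Text("word"), new LongWritable(1L));  // prints (word,1)
	}
}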
HadoopTupleUnwrappingIterator.java

@@ -20,32 +20,30 @@
 
 import java.util.Iterator;
 
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.operators.translation.TupleUnwrappingIterator;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.runtime.WritableSerializer;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
 
 /**
  * Wraps a Flink Tuple2 (key-value-pair) iterator into an iterator over the second (value) field.
 */
 @SuppressWarnings("rawtypes")
-public class HadoopTupleUnwrappingIterator<KEY extends WritableComparable, VALUE extends Writable>
+public class HadoopTupleUnwrappingIterator<KEY,VALUE>
 	extends TupleUnwrappingIterator<VALUE, KEY> implements java.io.Serializable {
 
 	private static final long serialVersionUID = 1L;
 
 	private Iterator<Tuple2<KEY,VALUE>> iterator;
 
-	private final WritableSerializer<KEY> keySerializer;
+	private final TypeSerializer<KEY> keySerializer;
 
 	private boolean atFirst = false;
 	private KEY curKey = null;
 	private VALUE firstValue = null;
 
 	public HadoopTupleUnwrappingIterator(Class<KEY> keyClass) {
-		this.keySerializer = new WritableSerializer<KEY>(keyClass);
+		this.keySerializer = TypeExtractor.getForClass((Class<KEY>) keyClass).createSerializer();
 	}
 
 	/**
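The switch from WritableSerializer to a TypeExtractor-provided TypeSerializer is what lets the iterator copy keys of any supported type, not just Writables. A small sketch of obtaining and using such a serializer (illustration, not part of the commit; the argument-less createSerializer() matches this era of the API, as seen in the diff above):

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.hadoop.io.Text;

public class KeyCopyDemo {
	public static void main(String[] args) {
		// Works for Writables and plain Java types alike; no WritableSerializer needed.
		TypeSerializer<Text> keySerializer = TypeExtractor.getForClass(Text.class).createSerializer();

		Text original = new Text("key");
		Text copy = keySerializer.copy(original);  // deep copy, safe to cache across next() calls
		System.out.println(copy + " / " + (copy != original));  // key / true
	}
}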
