From a4847ea276bcc71433ed421d8eb40548894d5a84 Mon Sep 17 00:00:00 2001 From: Ufuk Celebi Date: Wed, 10 Jun 2015 10:42:42 +0200 Subject: [PATCH] [FLINK-2195] Configure Configurable Hadoop InputFormats --- .../hadoop/mapreduce/HadoopInputFormatBase.java | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java index 2a6c0f4120fb1..1236884d6e48a 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java @@ -16,7 +16,6 @@ * limitations under the License. */ - package org.apache.flink.api.java.hadoop.mapreduce; import org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics; @@ -30,6 +29,7 @@ import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; import org.apache.flink.core.io.InputSplitAssigner; +import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.JobID; @@ -46,6 +46,8 @@ import java.util.ArrayList; import java.util.List; +import static com.google.common.base.Preconditions.checkNotNull; + public abstract class HadoopInputFormatBase implements InputFormat { private static final long serialVersionUID = 1L; @@ -63,10 +65,10 @@ public abstract class HadoopInputFormatBase implements InputFormat mapreduceInputFormat, Class key, Class value, Job job) { super(); - this.mapreduceInputFormat = mapreduceInputFormat; - this.keyClass = key; - this.valueClass = value; - this.configuration = job.getConfiguration(); + this.mapreduceInputFormat = checkNotNull(mapreduceInputFormat); + this.keyClass = checkNotNull(key); + this.valueClass = checkNotNull(value); + this.configuration = checkNotNull(job).getConfiguration(); HadoopUtils.mergeHadoopConf(configuration); } @@ -80,7 +82,9 @@ public org.apache.hadoop.conf.Configuration getConfiguration() { @Override public void configure(Configuration parameters) { - // nothing to do + if (mapreduceInputFormat instanceof Configurable) { + ((Configurable) mapreduceInputFormat).setConf(configuration); + } } @Override