diff --git a/flink-core/pom.xml b/flink-core/pom.xml
index d59e755a1f706..7ff0fbc566f05 100644
--- a/flink-core/pom.xml
+++ b/flink-core/pom.xml
@@ -64,6 +64,12 @@ under the License.
<version>${guava.version}</version>
+
+ <dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ <scope>test</scope>
+ </dependency>
diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml
index 2264119b5c580..d0548b79854d9 100644
--- a/flink-dist/pom.xml
+++ b/flink-dist/pom.xml
@@ -305,7 +305,6 @@ under the License.
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
- <version>2.3</version>
<phase>package</phase>
diff --git a/flink-java/pom.xml b/flink-java/pom.xml
index d7770480533ef..6b818e0f1a442 100644
--- a/flink-java/pom.xml
+++ b/flink-java/pom.xml
@@ -40,7 +40,6 @@ under the License.
<artifactId>flink-core</artifactId>
<version>${project.version}</version>
-
<groupId>org.apache.flink</groupId>
<artifactId>${shading-artifact.name}</artifactId>
@@ -79,7 +78,7 @@ under the License.
<groupId>de.javakaffee</groupId>
<artifactId>kryo-serializers</artifactId>
- <version>0.27</version>
+ <version>0.36</version>
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index d50ddb4a8f0ac..d3d8192676fba 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -569,8 +569,8 @@ public <K,V> DataSource<Tuple2<K,V>> createHadoopInput(org.apache.hadoop.mapred
* Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}. The
* given inputPath is set on the given job.
*/
- public <K,V> DataSource<Tuple2<K,V>> readHadoopFile(org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, String inputPath, Job job) throws IOException {
- DataSource<Tuple2<K,V>> result = createHadoopInput(mapredInputFormat, key, value, job);
+ public <K,V> DataSource<Tuple2<K,V>> readHadoopFile(org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K,V> mapreduceInputFormat, Class<K> key, Class<V> value, String inputPath, Job job) throws IOException {
+ DataSource<Tuple2<K,V>> result = createHadoopInput(mapreduceInputFormat, key, value, job);
org.apache.hadoop.mapreduce.lib.input.FileInputFormat.addInputPath(job, new org.apache
.hadoop.fs.Path(inputPath));
@@ -582,15 +582,15 @@ public <K,V> DataSource<Tuple2<K,V>> readHadoopFile(org.apache.hadoop.mapreduce
* Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}. A
* {@link org.apache.hadoop.mapreduce.Job} with the given inputPath is created.
*/
- public <K,V> DataSource<Tuple2<K,V>> readHadoopFile(org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, String inputPath) throws IOException {
- return readHadoopFile(mapredInputFormat, key, value, inputPath, Job.getInstance());
+ public <K,V> DataSource<Tuple2<K,V>> readHadoopFile(org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K,V> mapreduceInputFormat, Class<K> key, Class<V> value, String inputPath) throws IOException {
+ return readHadoopFile(mapreduceInputFormat, key, value, inputPath, Job.getInstance());
}
/**
* Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapreduce.InputFormat}.
*/
- public <K,V> DataSource<Tuple2<K,V>> createHadoopInput(org.apache.hadoop.mapreduce.InputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, Job job) {
- org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> hadoopInputFormat = new org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V>(mapredInputFormat, key, value, job);
+ public <K,V> DataSource<Tuple2<K,V>> createHadoopInput(org.apache.hadoop.mapreduce.InputFormat<K,V> mapreduceInputFormat, Class<K> key, Class<V> value, Job job) {
+ org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> hadoopInputFormat = new org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V>(mapreduceInputFormat, key, value, job);
return this.createInput(hadoopInputFormat);
}
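A quick usage sketch of the renamed mapreduce-variant methods, for context: a hypothetical job reading (offset, line) pairs via the Hadoop TextInputFormat. The path is a placeholder, the snippet assumes the standard Flink and Hadoop imports (ExecutionEnvironment, DataSource, Tuple2, LongWritable, Text), and it must live in a method that may throw IOException:

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    // This overload creates the Job internally via Job.getInstance().
    DataSource<Tuple2<LongWritable, Text>> lines = env.readHadoopFile(
            new org.apache.hadoop.mapreduce.lib.input.TextInputFormat(),
            LongWritable.class,
            Text.class,
            "hdfs:///tmp/input");  // placeholder path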
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/common/HadoopInputFormatCommonBase.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/common/HadoopInputFormatCommonBase.java
new file mode 100644
index 0000000000000..301265f7e7756
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/common/HadoopInputFormatCommonBase.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.hadoop.common;
+
+import org.apache.flink.api.common.io.RichInputFormat;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+/**
+ * A common base for both "mapred" and "mapreduce" Hadoop input formats.
+ */
+public abstract class HadoopInputFormatCommonBase<T, SPLITTYPE extends InputSplit> extends RichInputFormat<T, SPLITTYPE> {
+ protected transient Credentials credentials;
+
+ protected HadoopInputFormatCommonBase(Credentials creds) {
+ this.credentials = creds;
+ }
+
+ protected void write(ObjectOutputStream out) throws IOException {
+ this.credentials.write(out);
+ }
+
+ public void read(ObjectInputStream in) throws IOException {
+ this.credentials = new Credentials();
+ credentials.readFields(in);
+ }
+
+ /**
+ * This method only exists because there is no UserGroupInformation.getCredentials() method
+ * in Hadoop 1.x
+ *
+ * Note that this method returns "null" in Hadoop 1.x environments.
+ *
+ * @param ugi The user information
+ * @return new credentials object from the user information. MAY RETURN NULL!
+ */
+ public static Credentials getCredentialsFromUGI(UserGroupInformation ugi) {
+ Method getCredentialsMethod = null;
+ for(Method m: ugi.getClass().getMethods()) {
+ if(m.getName().equals("getCredentials")) {
+ getCredentialsMethod = m;
+ break;
+ }
+ }
+ if(getCredentialsMethod == null) {
+ return null;
+ } else {
+ try {
+ return (Credentials) getCredentialsMethod.invoke(ugi);
+ } catch (InvocationTargetException | IllegalAccessException e) {
+ throw new RuntimeException("Unable to get credentials from UserGroupInformation. This is only supported by Hadoop 2.2.0+", e);
+ }
+ }
+ }
+}
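A usage sketch of the helper above. The wrapping method name forwardCurrentUserTokens is hypothetical, imports of org.apache.hadoop.security.Credentials and UserGroupInformation are assumed, and Hadoop 2.x semantics apply (on Hadoop 1.x the helper returns null and the step is skipped):

    // Forward the submitting user's tokens into a job configuration.
    static void forwardCurrentUserTokens(org.apache.hadoop.mapred.JobConf jobConf) throws java.io.IOException {
        Credentials currentUserCreds = HadoopInputFormatCommonBase.getCredentialsFromUGI(
                UserGroupInformation.getCurrentUser());
        if (currentUserCreds != null) { // null on Hadoop 1.x
            jobConf.getCredentials().addAll(currentUserCreds);
        }
    }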
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/common/HadoopOutputFormatCommonBase.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/common/HadoopOutputFormatCommonBase.java
new file mode 100644
index 0000000000000..de611ce1500a6
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/common/HadoopOutputFormatCommonBase.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.hadoop.common;
+
+import org.apache.flink.api.common.io.RichOutputFormat;
+import org.apache.hadoop.security.Credentials;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+/**
+ * A common base for both "mapred" and "mapreduce" Hadoop output formats.
+ *
+ * This base class takes care of serializing and deserializing the Hadoop security credentials.
+ */
+public abstract class HadoopOutputFormatCommonBase<T> extends RichOutputFormat<T> {
+ protected transient Credentials credentials;
+
+ protected HadoopOutputFormatCommonBase(Credentials creds) {
+ this.credentials = creds;
+ }
+
+ protected void write(ObjectOutputStream out) throws IOException {
+ this.credentials.write(out);
+ }
+
+ public void read(ObjectInputStream in) throws IOException {
+ this.credentials = new Credentials();
+ credentials.readFields(in);
+ }
+}
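The write()/read() pair works because Credentials is a Hadoop Writable, and ObjectOutputStream/ObjectInputStream implement DataOutput/DataInput. A self-contained round-trip sketch (assumes Hadoop 2.x, java.io imports, and a surrounding method that may throw IOException):

    Credentials creds = new Credentials(); // would normally carry delegation tokens
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
        creds.write(out); // Writable.write(DataOutput)
    }
    Credentials restored = new Credentials();
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
        restored.readFields(in); // Writable.readFields(DataInput)
    }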
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormat.java
index 64cf9da8f6b07..126b629b0dd19 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormat.java
@@ -27,6 +27,14 @@
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.hadoop.mapred.JobConf;
+/**
+ * Wrapper for using HadoopInputFormats (mapred-variant) with Flink.
+ *
+ * The input format returns records of type Tuple2<K, V>.
+ *
+ * @param <K> Type of the key
+ * @param <V> Type of the value
+ */
public class HadoopInputFormat<K, V> extends HadoopInputFormatBase<K, V, Tuple2<K, V>> implements ResultTypeQueryable<Tuple2<K, V>> {
private static final long serialVersionUID = 1L;
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatBase.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatBase.java
index b24e3517ddb40..932b7deeacc31 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatBase.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatBase.java
@@ -16,13 +16,12 @@
* limitations under the License.
*/
-
package org.apache.flink.api.java.hadoop.mapred;
import org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics;
-import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.java.hadoop.common.HadoopInputFormatCommonBase;
import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopInputSplit;
@@ -36,6 +35,8 @@
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,7 +46,14 @@
import java.io.ObjectOutputStream;
import java.util.ArrayList;
-public abstract class HadoopInputFormatBase<K, V, T> extends RichInputFormat<T, HadoopInputSplit> {
+/**
+ * Common base for the Java and Scala API wrappers around "mapred" Hadoop input formats.
+ *
+ * @param <K> Type of key
+ * @param <V> Type of value
+ * @param <T> The type itself
+ */
+public abstract class HadoopInputFormatBase<K, V, T> extends HadoopInputFormatCommonBase<T, HadoopInputSplit> {
private static final long serialVersionUID = 1L;
@@ -64,7 +72,7 @@ public abstract class HadoopInputFormatBase<K, V, T> extends RichInputFormat<T, HadoopInputSplit> {
public HadoopInputFormatBase(org.apache.hadoop.mapred.InputFormat<K, V> mapredInputFormat, Class<K> key, Class<V> value, JobConf job) {
- super();
+ super(job.getCredentials());
this.mapredInputFormat = mapredInputFormat;
this.keyClass = key;
this.valueClass = value;
@@ -225,6 +233,7 @@ private FileBaseStatistics getFileStats(FileBaseStatistics cachedStats, org.apac
// --------------------------------------------------------------------------------------------
private void writeObject(ObjectOutputStream out) throws IOException {
+ super.write(out);
out.writeUTF(mapredInputFormat.getClass().getName());
out.writeUTF(keyClass.getName());
out.writeUTF(valueClass.getName());
@@ -233,6 +242,8 @@ private void writeObject(ObjectOutputStream out) throws IOException {
@SuppressWarnings("unchecked")
private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+ super.read(in);
+
String hadoopInputFormatClassName = in.readUTF();
String keyClassName = in.readUTF();
String valueClassName = in.readUTF();
@@ -256,5 +267,12 @@ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundE
throw new RuntimeException("Unable to find value class.", e);
}
ReflectionUtils.setConf(mapredInputFormat, jobConf);
+
+ jobConf.getCredentials().addAll(this.credentials);
+ Credentials currentUserCreds = getCredentialsFromUGI(UserGroupInformation.getCurrentUser());
+ if(currentUserCreds != null) {
+ jobConf.getCredentials().addAll(currentUserCreds);
+ }
}
+
}
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormat.java
index 44cfb57871e46..a62fc67c76934 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormat.java
@@ -23,6 +23,14 @@
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.mapred.JobConf;
+/**
+ * Wrapper for using HadoopOutputFormats (mapred-variant) with Flink.
+ *
+ * The output format consumes records of type Tuple2<K, V>.
+ *
+ * @param <K> Type of the key
+ * @param <V> Type of the value
+ */
public class HadoopOutputFormat<K, V> extends HadoopOutputFormatBase<K, V, Tuple2<K, V>> {
private static final long serialVersionUID = 1L;
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java
index a5baa7e2ad71c..bbf7efb191389 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java
@@ -20,7 +20,7 @@
package org.apache.flink.api.java.hadoop.mapred;
import org.apache.flink.api.common.io.FinalizeOnMaster;
-import org.apache.flink.api.common.io.RichOutputFormat;
+import org.apache.flink.api.java.hadoop.common.HadoopOutputFormatCommonBase;
import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyProgressable;
import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
@@ -34,14 +34,24 @@
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.TaskAttemptContext;
import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ReflectionUtils;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
+import static org.apache.flink.api.java.hadoop.common.HadoopInputFormatCommonBase.getCredentialsFromUGI;
-public abstract class HadoopOutputFormatBase<K, V, T> extends RichOutputFormat<T> implements FinalizeOnMaster {
+/**
+ * Common base for the mapred HadoopOutputFormat wrappers. There are implementations for Java and Scala.
+ *
+ * @param <K> Type of key
+ * @param <V> Type of value
+ * @param <T> Record type
+ */
+public abstract class HadoopOutputFormatBase<K, V, T> extends HadoopOutputFormatCommonBase<T> implements FinalizeOnMaster {
private static final long serialVersionUID = 1L;
@@ -50,9 +60,9 @@ public abstract class HadoopOutputFormatBase<K, V, T> extends RichOutputFormat<T> implements FinalizeOnMaster {
private transient RecordWriter<K, V> recordWriter;
private transient FileOutputCommitter fileOutputCommitter;
private transient TaskAttemptContext context;
- private transient JobContext jobContext;
public HadoopOutputFormatBase(org.apache.hadoop.mapred.OutputFormat mapredOutputFormat, JobConf job) {
+ super(job.getCredentials());
this.mapredOutputFormat = mapredOutputFormat;
HadoopUtils.mergeHadoopConf(job);
this.jobConf = job;
@@ -108,8 +118,9 @@ public void open(int taskNumber, int numTasks) throws IOException {
this.fileOutputCommitter = new FileOutputCommitter();
+ JobContext jobContext;
try {
- this.jobContext = HadoopUtils.instantiateJobContext(this.jobConf, new JobID());
+ jobContext = HadoopUtils.instantiateJobContext(this.jobConf, new JobID());
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -151,12 +162,14 @@ public void finalizeGlobal(int parallelism) throws IOException {
// --------------------------------------------------------------------------------------------
private void writeObject(ObjectOutputStream out) throws IOException {
+ super.write(out);
out.writeUTF(mapredOutputFormat.getClass().getName());
jobConf.write(out);
}
@SuppressWarnings("unchecked")
private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+ super.read(in);
String hadoopOutputFormatName = in.readUTF();
if(jobConf == null) {
jobConf = new JobConf();
@@ -168,5 +181,11 @@ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundE
throw new RuntimeException("Unable to instantiate the hadoop output format", e);
}
ReflectionUtils.setConf(mapredOutputFormat, jobConf);
+
+ jobConf.getCredentials().addAll(this.credentials);
+ Credentials currentUserCreds = getCredentialsFromUGI(UserGroupInformation.getCurrentUser());
+ if(currentUserCreds != null) {
+ jobConf.getCredentials().addAll(currentUserCreds);
+ }
}
}
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormat.java
index 4d29d74df38d0..f0788c77f599e 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormat.java
@@ -27,6 +27,12 @@
import org.apache.hadoop.mapreduce.Job;
import org.apache.flink.api.java.typeutils.TypeExtractor;
+/**
+ * InputFormat implementation that lets Flink use Hadoop (mapreduce) InputFormats.
+ *
+ * @param <K> Key Type
+ * @param <V> Value Type
+ */
public class HadoopInputFormat<K, V> extends HadoopInputFormatBase<K, V, Tuple2<K, V>> implements ResultTypeQueryable<Tuple2<K, V>> {
private static final long serialVersionUID = 1L;
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java
index 3693176762944..ee601d0f48a35 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java
@@ -19,9 +19,9 @@
package org.apache.flink.api.java.hadoop.mapreduce;
import org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics;
-import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.java.hadoop.common.HadoopInputFormatCommonBase;
import org.apache.flink.api.java.hadoop.mapreduce.utils.HadoopUtils;
import org.apache.flink.api.java.hadoop.mapreduce.wrapper.HadoopInputSplit;
import org.apache.flink.configuration.Configuration;
@@ -37,6 +37,8 @@
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,12 +50,17 @@
import static com.google.common.base.Preconditions.checkNotNull;
-public abstract class HadoopInputFormatBase<K, V, T> extends RichInputFormat<T, HadoopInputSplit> {
+/**
+ * Base class shared between the Java and Scala API of Flink.
+ */
+public abstract class HadoopInputFormatBase<K, V, T> extends HadoopInputFormatCommonBase<T, HadoopInputSplit> {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(HadoopInputFormatBase.class);
+ // NOTE: this class uses custom serialization logic and never calls defaultWriteObject().
+ // Hence, all fields here are "transient".
private org.apache.hadoop.mapreduce.InputFormat<K, V> mapreduceInputFormat;
protected Class<K> keyClass;
protected Class<V> valueClass;
@@ -64,11 +71,11 @@ public abstract class HadoopInputFormatBase<K, V, T> extends RichInputFormat<T, HadoopInputSplit> {
public HadoopInputFormatBase(org.apache.hadoop.mapreduce.InputFormat<K, V> mapreduceInputFormat, Class<K> key, Class<V> value, Job job) {
- super();
+ super(checkNotNull(job, "Job can not be null").getCredentials());
this.mapreduceInputFormat = checkNotNull(mapreduceInputFormat);
this.keyClass = checkNotNull(key);
this.valueClass = checkNotNull(value);
- this.configuration = checkNotNull(job).getConfiguration();
+ this.configuration = job.getConfiguration();
HadoopUtils.mergeHadoopConf(configuration);
}
@@ -94,7 +101,7 @@ public BaseStatistics getStatistics(BaseStatistics cachedStats) throws IOExcepti
return null;
}
- JobContext jobContext = null;
+ JobContext jobContext;
try {
jobContext = HadoopUtils.instantiateJobContext(configuration, null);
} catch (Exception e) {
@@ -128,13 +135,19 @@ public HadoopInputSplit[] createInputSplits(int minNumSplits)
throws IOException {
configuration.setInt("mapreduce.input.fileinputformat.split.minsize", minNumSplits);
- JobContext jobContext = null;
+ JobContext jobContext;
try {
jobContext = HadoopUtils.instantiateJobContext(configuration, new JobID());
} catch (Exception e) {
throw new RuntimeException(e);
}
+ jobContext.getCredentials().addAll(this.credentials);
+ Credentials currentUserCreds = getCredentialsFromUGI(UserGroupInformation.getCurrentUser());
+ if(currentUserCreds != null) {
+ jobContext.getCredentials().addAll(currentUserCreds);
+ }
+
List<org.apache.hadoop.mapreduce.InputSplit> splits;
try {
splits = this.mapreduceInputFormat.getSplits(jobContext);
@@ -156,7 +169,7 @@ public InputSplitAssigner getInputSplitAssigner(HadoopInputSplit[] inputSplits)
@Override
public void open(HadoopInputSplit split) throws IOException {
- TaskAttemptContext context = null;
+ TaskAttemptContext context;
try {
context = HadoopUtils.instantiateTaskAttemptContext(configuration, new TaskAttemptID());
} catch(Exception e) {
@@ -255,6 +268,7 @@ private FileBaseStatistics getFileStats(FileBaseStatistics cachedStats, org.apac
// --------------------------------------------------------------------------------------------
private void writeObject(ObjectOutputStream out) throws IOException {
+ super.write(out);
out.writeUTF(this.mapreduceInputFormat.getClass().getName());
out.writeUTF(this.keyClass.getName());
out.writeUTF(this.valueClass.getName());
@@ -263,6 +277,7 @@ private void writeObject(ObjectOutputStream out) throws IOException {
@SuppressWarnings("unchecked")
private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+ super.read(in);
String hadoopInputFormatClassName = in.readUTF();
String keyClassName = in.readUTF();
String valueClassName = in.readUTF();
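The "transient fields + custom serialization" note above follows the classic by-name pattern: persist class references as names, restore them reflectively on the receiving side. A minimal standalone sketch (hypothetical ByNameHolder class, not Flink code):

    import java.io.*;

    class ByNameHolder<K> implements Serializable {
        private transient Class<K> keyClass; // not handled by default serialization

        private void writeObject(ObjectOutputStream out) throws IOException {
            out.writeUTF(keyClass.getName()); // persist by name...
        }

        @SuppressWarnings("unchecked")
        private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
            // ...and restore through the context classloader, so user code is visible
            keyClass = (Class<K>) Class.forName(in.readUTF(), true,
                    Thread.currentThread().getContextClassLoader());
        }
    }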
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormat.java
index 26a2c0d20f10d..36d8ad5c0846f 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormat.java
@@ -23,6 +23,12 @@
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.mapreduce.Job;
+/**
+ * OutputFormat implementation that lets Flink use Hadoop (mapreduce) OutputFormats.
+ *
+ * @param <K> Key Type
+ * @param <V> Value Type
+ */
public class HadoopOutputFormat<K, V> extends HadoopOutputFormatBase<K, V, Tuple2<K, V>> {
private static final long serialVersionUID = 1L;
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java
index 14d5c81549ce4..d8260b09a3460 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java
@@ -19,7 +19,7 @@
package org.apache.flink.api.java.hadoop.mapreduce;
import org.apache.flink.api.common.io.FinalizeOnMaster;
-import org.apache.flink.api.common.io.RichOutputFormat;
+import org.apache.flink.api.java.hadoop.common.HadoopOutputFormatCommonBase;
import org.apache.flink.api.java.hadoop.mapreduce.utils.HadoopUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.conf.Configurable;
@@ -32,13 +32,19 @@
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
+import static org.apache.flink.api.java.hadoop.common.HadoopInputFormatCommonBase.getCredentialsFromUGI;
-public abstract class HadoopOutputFormatBase<K, V, T> extends RichOutputFormat<T> implements FinalizeOnMaster {
+/**
+ * Base class shared between the Java and Scala API of Flink.
+ */
+public abstract class HadoopOutputFormatBase<K, V, T> extends HadoopOutputFormatCommonBase<T> implements FinalizeOnMaster {
private static final long serialVersionUID = 1L;
@@ -50,7 +56,7 @@ public abstract class HadoopOutputFormatBase<K, V, T> extends RichOutputFormat<T> implements FinalizeOnMaster {
public HadoopOutputFormatBase(org.apache.hadoop.mapreduce.OutputFormat<K, V> mapreduceOutputFormat, Job job) {
- super();
+ super(job.getCredentials());
this.mapreduceOutputFormat = mapreduceOutputFormat;
this.configuration = job.getConfiguration();
HadoopUtils.mergeHadoopConf(configuration);
@@ -105,6 +111,12 @@ public void open(int taskNumber, int numTasks) throws IOException {
throw new RuntimeException(e);
}
+ this.context.getCredentials().addAll(this.credentials);
+ Credentials currentUserCreds = getCredentialsFromUGI(UserGroupInformation.getCurrentUser());
+ if(currentUserCreds != null) {
+ this.context.getCredentials().addAll(currentUserCreds);
+ }
+
this.fileOutputCommitter = new FileOutputCommitter(new Path(this.configuration.get("mapred.output.dir")), context);
try {
@@ -171,7 +183,13 @@ public void finalizeGlobal(int parallelism) throws IOException {
throw new RuntimeException(e);
}
this.fileOutputCommitter = new FileOutputCommitter(new Path(this.configuration.get("mapred.output.dir")), taskContext);
-
+
+ jobContext.getCredentials().addAll(this.credentials);
+ Credentials currentUserCreds = getCredentialsFromUGI(UserGroupInformation.getCurrentUser());
+ if(currentUserCreds != null) {
+ jobContext.getCredentials().addAll(currentUserCreds);
+ }
+
// finalize HDFS output format
this.fileOutputCommitter.commitJob(jobContext);
}
@@ -181,12 +199,14 @@ public void finalizeGlobal(int parallelism) throws IOException {
// --------------------------------------------------------------------------------------------
private void writeObject(ObjectOutputStream out) throws IOException {
+ super.write(out);
out.writeUTF(this.mapreduceOutputFormat.getClass().getName());
this.configuration.write(out);
}
@SuppressWarnings("unchecked")
private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+ super.read(in);
String hadoopOutputFormatClassName = in.readUTF();
org.apache.hadoop.conf.Configuration configuration = new org.apache.hadoop.conf.Configuration();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
index f3ca404d81be3..152b2e829ecce 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
@@ -441,7 +441,7 @@ public boolean isDistributedFS() {
@Override
public Class<?> getHadoopWrapperClassNameForFileSystem(String scheme) {
Configuration hadoopConf = getHadoopConfiguration();
- Class<? extends org.apache.hadoop.fs.FileSystem> clazz = null;
+ Class<? extends org.apache.hadoop.fs.FileSystem> clazz;
// We can activate this block once we drop Hadoop1 support (only hd2 has the getFileSystemClass-method)
// try {
// clazz = org.apache.hadoop.fs.FileSystem.getFileSystemClass(scheme, hadoopConf);
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
index 17311e9f9253a..cdf7211caea3d 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
@@ -419,13 +419,13 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
* The given inputPath is set on the given job.
*/
def readHadoopFile[K, V](
- mapredInputFormat: MapreduceFileInputFormat[K, V],
+ mapreduceInputFormat: MapreduceFileInputFormat[K, V],
key: Class[K],
value: Class[V],
inputPath: String,
job: Job)
(implicit tpe: TypeInformation[(K, V)]): DataSet[(K, V)] = {
- val result = createHadoopInput(mapredInputFormat, key, value, job)
+ val result = createHadoopInput(mapreduceInputFormat, key, value, job)
MapreduceFileInputFormat.addInputPath(job, new HadoopPath(inputPath))
result
}
@@ -436,25 +436,25 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
* [[org.apache.hadoop.mapreduce.Job]] with the given inputPath will be created.
*/
def readHadoopFile[K, V](
- mapredInputFormat: MapreduceFileInputFormat[K, V],
+ mapreduceInputFormat: MapreduceFileInputFormat[K, V],
key: Class[K],
value: Class[V],
inputPath: String)
(implicit tpe: TypeInformation[(K, V)]): DataSet[Tuple2[K, V]] = {
- readHadoopFile(mapredInputFormat, key, value, inputPath, Job.getInstance)
+ readHadoopFile(mapreduceInputFormat, key, value, inputPath, Job.getInstance)
}
/**
* Creates a [[DataSet]] from the given [[org.apache.hadoop.mapreduce.InputFormat]].
*/
def createHadoopInput[K, V](
- mapredInputFormat: MapreduceInputFormat[K, V],
+ mapreduceInputFormat: MapreduceInputFormat[K, V],
key: Class[K],
value: Class[V],
job: Job)
(implicit tpe: TypeInformation[(K, V)]): DataSet[Tuple2[K, V]] = {
val hadoopInputFormat =
- new mapreduce.HadoopInputFormat[K, V](mapredInputFormat, key, value, job)
+ new mapreduce.HadoopInputFormat[K, V](mapreduceInputFormat, key, value, job)
createInput(hadoopInputFormat)
}
diff --git a/flink-shaded-hadoop/pom.xml b/flink-shaded-hadoop/pom.xml
index 8b4993eef47b7..d7e315828503d 100644
--- a/flink-shaded-hadoop/pom.xml
+++ b/flink-shaded-hadoop/pom.xml
@@ -61,7 +61,6 @@ under the License.
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
- <version>2.3</version>
<id>shade-hadoop</id>
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-flume/pom.xml b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-flume/pom.xml
index 6bba57f5f6a9a..b4f5ad20431b6 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-flume/pom.xml
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-flume/pom.xml
@@ -152,7 +152,6 @@ under the License.
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
- <version>2.3</version>
<id>shade-flink</id>
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/pom.xml b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/pom.xml
index 9499ea566b353..f1ef0f0e669b1 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/pom.xml
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/pom.xml
@@ -74,7 +74,6 @@ under the License.
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
- <version>2.3</version>
<id>shade-flink</id>
diff --git a/flink-yarn-tests/pom.xml b/flink-yarn-tests/pom.xml
index b912fb0ab039a..2444ee4bf86c5 100644
--- a/flink-yarn-tests/pom.xml
+++ b/flink-yarn-tests/pom.xml
@@ -130,7 +130,6 @@ under the License.
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
- <version>2.3</version>
<id>shade-hadoop</id>
diff --git a/pom.xml b/pom.xml
index 6af03550aed10..b08216254a667 100644
--- a/pom.xml
+++ b/pom.xml
@@ -906,7 +906,6 @@ under the License.
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
- <version>2.3</version>
<id>shade-flink</id>
@@ -962,6 +961,12 @@ under the License.
-->
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>2.4.1</version>
+ </plugin>