6 changes: 6 additions & 0 deletions flink-core/pom.xml
@@ -64,6 +64,12 @@ under the License.
<version>${guava.version}</version>
</dependency>

<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<scope>test</scope>
</dependency>

</dependencies>

<build>
1 change: 0 additions & 1 deletion flink-dist/pom.xml
@@ -305,7 +305,6 @@ under the License.
<!--Build uber jar-->
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.3</version>
<executions>
<execution>
<phase>package</phase>
3 changes: 1 addition & 2 deletions flink-java/pom.xml
@@ -40,7 +40,6 @@ under the License.
<artifactId>flink-core</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>${shading-artifact.name}</artifactId>
@@ -79,7 +78,7 @@ under the License.
<dependency>
<groupId>de.javakaffee</groupId>
<artifactId>kryo-serializers</artifactId>
<version>0.27</version>
<version>0.36</version>
</dependency>

<dependency>
@@ -569,8 +569,8 @@ public <K,V> DataSource<Tuple2<K, V>> createHadoopInput(org.apache.hadoop.mapred
* Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}. The
* given inputPath is set on the given job.
*/
public <K,V> DataSource<Tuple2<K, V>> readHadoopFile(org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, String inputPath, Job job) throws IOException {
DataSource<Tuple2<K, V>> result = createHadoopInput(mapredInputFormat, key, value, job);
public <K,V> DataSource<Tuple2<K, V>> readHadoopFile(org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K,V> mapreduceInputFormat, Class<K> key, Class<V> value, String inputPath, Job job) throws IOException {
DataSource<Tuple2<K, V>> result = createHadoopInput(mapreduceInputFormat, key, value, job);

org.apache.hadoop.mapreduce.lib.input.FileInputFormat.addInputPath(job, new org.apache
.hadoop.fs.Path(inputPath));
@@ -582,15 +582,15 @@ public <K,V> DataSource<Tuple2<K, V>> readHadoopFile(org.apache.hadoop.mapreduce
* Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}. A
* {@link org.apache.hadoop.mapreduce.Job} with the given inputPath is created.
*/
public <K,V> DataSource<Tuple2<K, V>> readHadoopFile(org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, String inputPath) throws IOException {
return readHadoopFile(mapredInputFormat, key, value, inputPath, Job.getInstance());
public <K,V> DataSource<Tuple2<K, V>> readHadoopFile(org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K,V> mapreduceInputFormat, Class<K> key, Class<V> value, String inputPath) throws IOException {
return readHadoopFile(mapreduceInputFormat, key, value, inputPath, Job.getInstance());
}

/**
* Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapreduce.InputFormat}.
*/
public <K,V> DataSource<Tuple2<K, V>> createHadoopInput(org.apache.hadoop.mapreduce.InputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, Job job) {
org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> hadoopInputFormat = new org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V>(mapredInputFormat, key, value, job);
public <K,V> DataSource<Tuple2<K, V>> createHadoopInput(org.apache.hadoop.mapreduce.InputFormat<K,V> mapreduceInputFormat, Class<K> key, Class<V> value, Job job) {
org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> hadoopInputFormat = new org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V>(mapreduceInputFormat, key, value, job);

return this.createInput(hadoopInputFormat);
}
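For orientation, a minimal usage sketch (not part of this PR) of the mapreduce-variant readHadoopFile whose parameter is renamed above; the input path and key/value types are illustrative assumptions.

// Sketch only: assumes a plain DataSet-API program; path and types are illustrative.
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class ReadHadoopFileSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // readHadoopFile creates a Job internally and registers the input path on it.
        DataSource<Tuple2<LongWritable, Text>> input = env.readHadoopFile(
                new TextInputFormat(), LongWritable.class, Text.class, "hdfs:///tmp/input");
        input.print(); // older DataSet-API versions may additionally require env.execute()
    }
}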
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.java.hadoop.common;

import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.core.io.InputSplit;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

/**
* A common base for both "mapred" and "mapreduce" Hadoop input formats.
*/
public abstract class HadoopInputFormatCommonBase<T, SPITTYPE extends InputSplit> extends RichInputFormat<T, SPITTYPE> {
protected transient Credentials credentials;

protected HadoopInputFormatCommonBase(Credentials creds) {
this.credentials = creds;
}

protected void write(ObjectOutputStream out) throws IOException {
this.credentials.write(out);
}

public void read(ObjectInputStream in) throws IOException {
this.credentials = new Credentials();
credentials.readFields(in);
}

(Contributor review comment: whitespace)
/**
* This method only exists because there is no UserGroupInformation.getCredentials() method
* in Hadoop 1.x
*
* Note that this method returns "null" in Hadoop 1.x environments.
*
* @param ugi The user information
* @return new credentials object from the user information. MAY RETURN NULL!
*/
public static Credentials getCredentialsFromUGI(UserGroupInformation ugi) {
Method getCredentialsMethod = null;
for(Method m: ugi.getClass().getMethods()) {
if(m.getName().equals("getCredentials")) {
getCredentialsMethod = m;
break;
}
}
if(getCredentialsMethod == null) {
return null;
} else {
try {
return (Credentials) getCredentialsMethod.invoke(ugi);
} catch (InvocationTargetException | IllegalAccessException e) {
throw new RuntimeException("Unable to get credentials from UserGroupInformation. This is only supported by Hadoop 2.2.0+");
}
}
}
}
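To illustrate how getCredentialsFromUGI is meant to be used, here is a hedged sketch mirroring the pattern the wrappers below apply during deserialization; the surrounding helper method is hypothetical.

// Hypothetical helper, for illustration: merge the current user's credentials into a JobConf.
// getCredentialsFromUGI returns null on Hadoop 1.x, so the null check is required.
import org.apache.flink.api.java.hadoop.common.HadoopInputFormatCommonBase;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;

import java.io.IOException;

public class CredentialsMergeSketch {
    public static void mergeCurrentUserCredentials(JobConf jobConf) throws IOException {
        Credentials currentUserCreds =
                HadoopInputFormatCommonBase.getCredentialsFromUGI(UserGroupInformation.getCurrentUser());
        if (currentUserCreds != null) { // Hadoop 1.x: no credentials available
            jobConf.getCredentials().addAll(currentUserCreds);
        }
    }
}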
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.java.hadoop.common;

import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.hadoop.security.Credentials;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

/**
* A common base for both "mapred" and "mapreduce" Hadoop output formats.
*
* The base class takes care of handling (serializing) security credentials.
*/
public abstract class HadoopOutputFormatCommonBase<T> extends RichOutputFormat<T> {
protected transient Credentials credentials;

protected HadoopOutputFormatCommonBase(Credentials creds) {
this.credentials = creds;
}

protected void write(ObjectOutputStream out) throws IOException {
this.credentials.write(out);
}

public void read(ObjectInputStream in) throws IOException {
this.credentials = new Credentials();
credentials.readFields(in);
}
}
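As on the input side, the write/read hooks are intended to be chained into a subclass's Java serialization methods; a minimal hypothetical sketch of that pattern (the concrete wrappers later in this PR do exactly this).

// Hypothetical subclass, for illustration only: chaining credentials (de-)serialization
// into the standard writeObject/readObject hooks.
import org.apache.flink.api.java.hadoop.common.HadoopOutputFormatCommonBase;
import org.apache.hadoop.security.Credentials;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

public abstract class MyOutputFormat<T> extends HadoopOutputFormatCommonBase<T> {
    protected MyOutputFormat(Credentials creds) {
        super(creds);
    }

    private void writeObject(ObjectOutputStream out) throws IOException {
        super.write(out); // serialize the Hadoop credentials first
        // ... write format-specific fields here ...
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        super.read(in); // restore the Hadoop credentials
        // ... read format-specific fields here ...
    }
}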
@@ -27,6 +27,14 @@
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.hadoop.mapred.JobConf;

/**
* Wrapper for using HadoopInputFormats (mapred-variant) with Flink.
*
* The InputFormat returns a Tuple2<K,V>.
*
* @param <K> Type of the key
* @param <V> Type of the value.
*/
public class HadoopInputFormat<K, V> extends HadoopInputFormatBase<K, V, Tuple2<K,V>> implements ResultTypeQueryable<Tuple2<K,V>> {

private static final long serialVersionUID = 1L;
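A hedged usage sketch for this mapred-variant input wrapper (not part of this PR); the constructor signature follows HadoopInputFormatBase below, and the path and types are illustrative.

// Sketch only: wiring the mapred-variant HadoopInputFormat into a DataSet program.
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;

public class MapredInputSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        JobConf jobConf = new JobConf();
        FileInputFormat.addInputPath(jobConf, new Path("hdfs:///tmp/input")); // illustrative path
        HadoopInputFormat<LongWritable, Text> hadoopIF = new HadoopInputFormat<LongWritable, Text>(
                new TextInputFormat(), LongWritable.class, Text.class, jobConf);
        DataSet<Tuple2<LongWritable, Text>> input = env.createInput(hadoopIF);
        input.print(); // older DataSet-API versions may additionally require env.execute()
    }
}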
@@ -16,13 +16,12 @@
* limitations under the License.
*/


package org.apache.flink.api.java.hadoop.mapred;

import org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics;
import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.api.java.hadoop.common.HadoopInputFormatCommonBase;
import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopInputSplit;
@@ -36,6 +35,8 @@
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,7 +46,14 @@
import java.io.ObjectOutputStream;
import java.util.ArrayList;

public abstract class HadoopInputFormatBase<K, V, T> extends RichInputFormat<T, HadoopInputSplit> {
/**
* Common base for the Java and Scala APIs for using Hadoop input formats with Flink.
*
* @param <K> Type of key
* @param <V> Type of value
* @param <T> The type itself
*/
public abstract class HadoopInputFormatBase<K, V, T> extends HadoopInputFormatCommonBase<T, HadoopInputSplit> {

private static final long serialVersionUID = 1L;

@@ -64,7 +72,7 @@ public abstract class HadoopInputFormatBase<K, V, T> extends RichInputFormat<T,
protected transient boolean hasNext;

public HadoopInputFormatBase(org.apache.hadoop.mapred.InputFormat<K, V> mapredInputFormat, Class<K> key, Class<V> value, JobConf job) {
super();
super(job.getCredentials());
this.mapredInputFormat = mapredInputFormat;
this.keyClass = key;
this.valueClass = value;
@@ -225,6 +233,7 @@ private FileBaseStatistics getFileStats(FileBaseStatistics cachedStats, org.apac
// --------------------------------------------------------------------------------------------

private void writeObject(ObjectOutputStream out) throws IOException {
super.write(out);
out.writeUTF(mapredInputFormat.getClass().getName());
out.writeUTF(keyClass.getName());
out.writeUTF(valueClass.getName());
@@ -233,6 +242,8 @@ private void writeObject(ObjectOutputStream out) throws IOException {

@SuppressWarnings("unchecked")
private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
super.read(in);

String hadoopInputFormatClassName = in.readUTF();
String keyClassName = in.readUTF();
String valueClassName = in.readUTF();
@@ -256,5 +267,12 @@ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundE
throw new RuntimeException("Unable to find value class.", e);
}
ReflectionUtils.setConf(mapredInputFormat, jobConf);

jobConf.getCredentials().addAll(this.credentials);
Credentials currentUserCreds = getCredentialsFromUGI(UserGroupInformation.getCurrentUser());
if(currentUserCreds != null) {
jobConf.getCredentials().addAll(currentUserCreds);
}
}

}
@@ -23,6 +23,14 @@
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.mapred.JobConf;

/**
* Wrapper for using HadoopOutputFormats (mapred-variant) with Flink.
*
* The OutputFormat consumes a Tuple2<K,V>.
*
* @param <K> Type of the key
* @param <V> Type of the value.
*/
public class HadoopOutputFormat<K,V> extends HadoopOutputFormatBase<K, V, Tuple2<K, V>> {

private static final long serialVersionUID = 1L;
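A matching hedged sketch for the output side (again illustrative and not part of this PR); the constructor signature follows HadoopOutputFormatBase below.

// Sketch only: emitting Tuple2<Text, IntWritable> records through the mapred-variant wrapper.
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextOutputFormat;

public class MapredOutputSketch {
    public static void write(DataSet<Tuple2<Text, IntWritable>> result) throws Exception {
        JobConf jobConf = new JobConf();
        FileOutputFormat.setOutputPath(jobConf, new Path("hdfs:///tmp/output")); // illustrative path
        HadoopOutputFormat<Text, IntWritable> hadoopOF = new HadoopOutputFormat<Text, IntWritable>(
                new TextOutputFormat<Text, IntWritable>(), jobConf);
        result.output(hadoopOF);
        result.getExecutionEnvironment().execute("write via mapred OutputFormat");
    }
}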
@@ -20,7 +20,7 @@
package org.apache.flink.api.java.hadoop.mapred;

import org.apache.flink.api.common.io.FinalizeOnMaster;
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.api.java.hadoop.common.HadoopOutputFormatCommonBase;
import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyProgressable;
import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
@@ -34,14 +34,24 @@
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.TaskAttemptContext;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ReflectionUtils;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

import static org.apache.flink.api.java.hadoop.common.HadoopInputFormatCommonBase.getCredentialsFromUGI;

public abstract class HadoopOutputFormatBase<K, V, T> extends RichOutputFormat<T> implements FinalizeOnMaster {
/**
* Common base for the mapred HadoopOutputFormat wrappers. There are implementations for Java and Scala.
*
* @param <K> Type of Key
* @param <V> Type of Value
* @param <T> Record type.
*/
public abstract class HadoopOutputFormatBase<K, V, T> extends HadoopOutputFormatCommonBase<T> implements FinalizeOnMaster {

private static final long serialVersionUID = 1L;

@@ -50,9 +60,9 @@ public abstract class HadoopOutputFormatBase<K, V, T> extends RichOutputFormat<T
protected transient RecordWriter<K,V> recordWriter;
private transient FileOutputCommitter fileOutputCommitter;
private transient TaskAttemptContext context;
private transient JobContext jobContext;

public HadoopOutputFormatBase(org.apache.hadoop.mapred.OutputFormat<K, V> mapredOutputFormat, JobConf job) {
super(job.getCredentials());
this.mapredOutputFormat = mapredOutputFormat;
HadoopUtils.mergeHadoopConf(job);
this.jobConf = job;
@@ -108,8 +118,9 @@ public void open(int taskNumber, int numTasks) throws IOException {

this.fileOutputCommitter = new FileOutputCommitter();

JobContext jobContext;
try {
this.jobContext = HadoopUtils.instantiateJobContext(this.jobConf, new JobID());
jobContext = HadoopUtils.instantiateJobContext(this.jobConf, new JobID());
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -151,12 +162,14 @@ public void finalizeGlobal(int parallelism) throws IOException {
// --------------------------------------------------------------------------------------------

private void writeObject(ObjectOutputStream out) throws IOException {
super.write(out);
out.writeUTF(mapredOutputFormat.getClass().getName());
jobConf.write(out);
}

@SuppressWarnings("unchecked")
private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
super.read(in);
String hadoopOutputFormatName = in.readUTF();
if(jobConf == null) {
jobConf = new JobConf();
@@ -168,5 +181,11 @@ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundE
throw new RuntimeException("Unable to instantiate the hadoop output format", e);
}
ReflectionUtils.setConf(mapredOutputFormat, jobConf);

jobConf.getCredentials().addAll(this.credentials);
Credentials currentUserCreds = getCredentialsFromUGI(UserGroupInformation.getCurrentUser());
if(currentUserCreds != null) {
jobConf.getCredentials().addAll(currentUserCreds);
}
}
}