From 0435ab0c33315c38b83c2cb3eebe4e324a68318e Mon Sep 17 00:00:00 2001
From: Evgeny_Kincharov
Date: Mon, 26 Sep 2016 11:10:27 +0400
Subject: [PATCH] [FLINK-4315] Hadoop dependencies were moved from flink-java and flink-scala

---
 .../flink-hadoop-compatibility/pom.xml | 98 +++++++++++
 .../common/HadoopInputFormatCommonBase.java | 0
 .../common/HadoopOutputFormatCommonBase.java | 0
 .../java/hadoop/mapred/HadoopInputFormat.java | 0
 .../hadoop/mapred/HadoopInputFormatBase.java | 0
 .../hadoop/mapred/HadoopOutputFormat.java | 0
 .../hadoop/mapred/HadoopOutputFormatBase.java | 0
 .../java/hadoop/mapred/utils/HadoopUtils.java | 0
 .../wrapper/HadoopDummyProgressable.java | 0
 .../mapred/wrapper/HadoopDummyReporter.java | 0
 .../mapred/wrapper/HadoopInputSplit.java | 0
 .../hadoop/mapreduce/HadoopInputFormat.java | 0
 .../mapreduce/HadoopInputFormatBase.java | 0
 .../hadoop/mapreduce/HadoopOutputFormat.java | 0
 .../mapreduce/HadoopOutputFormatBase.java | 0
 .../hadoop/mapreduce/utils/HadoopUtils.java | 0
 .../mapreduce/wrapper/HadoopInputSplit.java | 0
 .../FlinkHadoopEnvironment.java | 143 +++++++++++++++
 .../scala/hadoop/FlinkHadoopEnvironment.scala | 165 ++++++++++++++++++
 .../hadoop/mapred/HadoopInputFormat.scala | 0
 .../hadoop/mapred/HadoopOutputFormat.scala | 0
 .../hadoop/mapreduce/HadoopInputFormat.scala | 0
 .../hadoop/mapreduce/HadoopOutputFormat.scala | 0
 .../hadoop/mapred/HadoopInputFormatTest.java | 0
 .../hadoop/mapred/HadoopOutputFormatTest.java | 0
 .../mapreduce/HadoopInputFormatTest.java | 0
 .../mapreduce/HadoopOutputFormatTest.java | 0
 .../FlinkHadoopEnvironmentTest.java | 34 ++++
 flink-java/pom.xml | 5 +-
 .../flink/api/java/ExecutionEnvironment.java | 80 ---------
 .../flink/api/java/utils/ParameterTool.java | 22 ---
 .../api/java/operator/JoinOperatorTest.java | 3 -
 .../java/utils/AbstractParameterToolTest.java | 71 ++++++++
 .../api/java/utils/ParameterToolTest.java | 52 +-----
 flink-scala/pom.xml | 3 +
 .../api/scala/ExecutionEnvironment.scala | 113 +----------
 .../hadoop/mapred/WordCountMapredITCase.java | 3 +-
 .../mapreduce/WordCountMapreduceITCase.java | 3 +-
 .../hadoop/mapred/WordCountMapredITCase.scala | 6 +-
 .../mapreduce/WordCountMapreduceITCase.scala | 4 +-
 40 files changed, 530 insertions(+), 275 deletions(-)
 rename {flink-java => flink-batch-connectors/flink-hadoop-compatibility}/src/main/java/org/apache/flink/api/java/hadoop/common/HadoopInputFormatCommonBase.java (100%)
 rename {flink-java => flink-batch-connectors/flink-hadoop-compatibility}/src/main/java/org/apache/flink/api/java/hadoop/common/HadoopOutputFormatCommonBase.java (100%)
 rename {flink-java => flink-batch-connectors/flink-hadoop-compatibility}/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormat.java (100%)
 rename {flink-java => flink-batch-connectors/flink-hadoop-compatibility}/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatBase.java (100%)
 rename {flink-java => flink-batch-connectors/flink-hadoop-compatibility}/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormat.java (100%)
 rename {flink-java => flink-batch-connectors/flink-hadoop-compatibility}/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java (100%)
 rename {flink-java => flink-batch-connectors/flink-hadoop-compatibility}/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java (100%)
 rename {flink-java => flink-batch-connectors/flink-hadoop-compatibility}/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopDummyProgressable.java (100%)
 rename {flink-java => flink-batch-connectors/flink-hadoop-compatibility}/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopDummyReporter.java (100%)
 rename {flink-java => flink-batch-connectors/flink-hadoop-compatibility}/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopInputSplit.java (100%)
 rename {flink-java => flink-batch-connectors/flink-hadoop-compatibility}/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormat.java (100%)
 rename {flink-java => flink-batch-connectors/flink-hadoop-compatibility}/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java (100%)
 rename {flink-java => flink-batch-connectors/flink-hadoop-compatibility}/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormat.java (100%)
 rename {flink-java => flink-batch-connectors/flink-hadoop-compatibility}/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java (100%)
 rename {flink-java => flink-batch-connectors/flink-hadoop-compatibility}/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/utils/HadoopUtils.java (100%)
 rename {flink-java => flink-batch-connectors/flink-hadoop-compatibility}/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/wrapper/HadoopInputSplit.java (100%)
 create mode 100644 flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/FlinkHadoopEnvironment.java
 create mode 100644 flink-batch-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/api/scala/hadoop/FlinkHadoopEnvironment.scala
 rename {flink-scala => flink-batch-connectors/flink-hadoop-compatibility}/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopInputFormat.scala (100%)
 rename {flink-scala => flink-batch-connectors/flink-hadoop-compatibility}/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopOutputFormat.scala (100%)
 rename {flink-scala => flink-batch-connectors/flink-hadoop-compatibility}/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopInputFormat.scala (100%)
 rename {flink-scala => flink-batch-connectors/flink-hadoop-compatibility}/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopOutputFormat.scala (100%)
 rename {flink-java => flink-batch-connectors/flink-hadoop-compatibility}/src/test/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatTest.java (100%)
 rename {flink-java => flink-batch-connectors/flink-hadoop-compatibility}/src/test/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatTest.java (100%)
 rename {flink-java => flink-batch-connectors/flink-hadoop-compatibility}/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatTest.java (100%)
 rename {flink-java => flink-batch-connectors/flink-hadoop-compatibility}/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatTest.java (100%)
 create mode 100644 flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/hadoopcompatibility/FlinkHadoopEnvironmentTest.java
 create mode 100644 flink-java/src/test/java/org/apache/flink/api/java/utils/AbstractParameterToolTest.java

diff --git a/flink-batch-connectors/flink-hadoop-compatibility/pom.xml b/flink-batch-connectors/flink-hadoop-compatibility/pom.xml
index aa818f61b06a7..8143a03cefc4e 100644
--- a/flink-batch-connectors/flink-hadoop-compatibility/pom.xml
+++ b/flink-batch-connectors/flink-hadoop-compatibility/pom.xml
@@ -45,6 +45,21 @@ under the License.
provided + + org.apache.flink + flink-scala_2.10 + ${project.version} + provided + + + + org.apache.flink + flink-java + ${project.version} + test-jar + test + + org.apache.flink ${shading-artifact.name} @@ -78,6 +93,89 @@ under the License. com.github.siom79.japicmp japicmp-maven-plugin + + + net.alchim31.maven + scala-maven-plugin + 3.1.4 + + + + scala-compile-first + process-resources + + compile + + + + + + -Xms128m + -Xmx512m + + + + + + + org.apache.maven.plugins + maven-eclipse-plugin + 2.8 + + true + + org.scala-ide.sdt.core.scalanature + org.eclipse.jdt.core.javanature + + + org.scala-ide.sdt.core.scalabuilder + + + org.scala-ide.sdt.launching.SCALA_CONTAINER + org.eclipse.jdt.launching.JRE_CONTAINER + + + org.scala-lang:scala-library + org.scala-lang:scala-compiler + + + **/*.scala + **/*.java + + + + + + + org.codehaus.mojo + build-helper-maven-plugin + 1.7 + + + + add-source + generate-sources + + add-source + + + + src/main/scala + + + + + + + + + org.scalastyle + scalastyle-maven-plugin + + ${project.basedir}/../../tools/maven/scalastyle-config.xml + + diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/common/HadoopInputFormatCommonBase.java b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/common/HadoopInputFormatCommonBase.java similarity index 100% rename from flink-java/src/main/java/org/apache/flink/api/java/hadoop/common/HadoopInputFormatCommonBase.java rename to flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/common/HadoopInputFormatCommonBase.java diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/common/HadoopOutputFormatCommonBase.java b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/common/HadoopOutputFormatCommonBase.java similarity index 100% rename from flink-java/src/main/java/org/apache/flink/api/java/hadoop/common/HadoopOutputFormatCommonBase.java rename to flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/common/HadoopOutputFormatCommonBase.java diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormat.java b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormat.java similarity index 100% rename from flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormat.java rename to flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormat.java diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatBase.java b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatBase.java similarity index 100% rename from flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatBase.java rename to flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatBase.java diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormat.java b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormat.java similarity index 100% rename from flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormat.java rename to 
flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormat.java diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java similarity index 100% rename from flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java rename to flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java similarity index 100% rename from flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java rename to flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopDummyProgressable.java b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopDummyProgressable.java similarity index 100% rename from flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopDummyProgressable.java rename to flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopDummyProgressable.java diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopDummyReporter.java b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopDummyReporter.java similarity index 100% rename from flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopDummyReporter.java rename to flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopDummyReporter.java diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopInputSplit.java b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopInputSplit.java similarity index 100% rename from flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopInputSplit.java rename to flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopInputSplit.java diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormat.java b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormat.java similarity index 100% rename from flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormat.java rename to flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormat.java diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java similarity index 100% rename from 
flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java rename to flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatBase.java diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormat.java b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormat.java similarity index 100% rename from flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormat.java rename to flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormat.java diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java similarity index 100% rename from flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java rename to flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatBase.java diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/utils/HadoopUtils.java b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/utils/HadoopUtils.java similarity index 100% rename from flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/utils/HadoopUtils.java rename to flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/utils/HadoopUtils.java diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/wrapper/HadoopInputSplit.java b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/wrapper/HadoopInputSplit.java similarity index 100% rename from flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/wrapper/HadoopInputSplit.java rename to flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/wrapper/HadoopInputSplit.java diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/FlinkHadoopEnvironment.java b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/FlinkHadoopEnvironment.java new file mode 100644 index 0000000000000..4b176fe0052a4 --- /dev/null +++ b/flink-batch-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/FlinkHadoopEnvironment.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.hadoopcompatibility;
+
+import org.apache.commons.cli.Option;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
+import org.apache.flink.api.java.operators.DataSource;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.GenericOptionsParser;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * The FlinkHadoopEnvironment is the context in which a program is executed, connected with Hadoop.
+ *
+ * The environment provides methods to interact with the Hadoop cluster (data access).
+ */
+public final class FlinkHadoopEnvironment {
+
+	// ----------------------------------- Hadoop Input Format ---------------------------------------
+
+	private ExecutionEnvironment environment;
+
+	public FlinkHadoopEnvironment(ExecutionEnvironment environment) {
+		this.environment = environment;
+	}
+
+	/**
+	 * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapred.FileInputFormat}. The
+	 * given inputName is set on the given job.
+	 */
+	@PublicEvolving
+	public <K, V> DataSource<Tuple2<K, V>> readHadoopFile(org.apache.hadoop.mapred.FileInputFormat<K, V> mapredInputFormat, Class<K> key, Class<V> value, String inputPath, JobConf job) {
+		DataSource<Tuple2<K, V>> result = createHadoopInput(mapredInputFormat, key, value, job);
+
+		org.apache.hadoop.mapred.FileInputFormat.addInputPath(job, new org.apache.hadoop.fs.Path(inputPath));
+
+		return result;
+	}
+
+	/**
+	 * Creates a {@link DataSet} from {@link org.apache.hadoop.mapred.SequenceFileInputFormat}.
+	 * A {@link org.apache.hadoop.mapred.JobConf} with the given inputPath is created.
+	 */
+	@PublicEvolving
+	public <K, V> DataSource<Tuple2<K, V>> readSequenceFile(Class<K> key, Class<V> value, String inputPath) throws IOException {
+		return readHadoopFile(new org.apache.hadoop.mapred.SequenceFileInputFormat<K, V>(), key, value, inputPath);
+	}
+
+	/**
+	 * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapred.FileInputFormat}. A
+	 * {@link org.apache.hadoop.mapred.JobConf} with the given inputPath is created.
+	 */
+	@PublicEvolving
+	public <K, V> DataSource<Tuple2<K, V>> readHadoopFile(org.apache.hadoop.mapred.FileInputFormat<K, V> mapredInputFormat, Class<K> key, Class<V> value, String inputPath) {
+		return readHadoopFile(mapredInputFormat, key, value, inputPath, new JobConf());
+	}
+
+	/**
+	 * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapred.InputFormat}.
+	 */
+	@PublicEvolving
+	public <K, V> DataSource<Tuple2<K, V>> createHadoopInput(org.apache.hadoop.mapred.InputFormat<K, V> mapredInputFormat, Class<K> key, Class<V> value, JobConf job) {
+		HadoopInputFormat<K, V> hadoopInputFormat = new HadoopInputFormat<>(mapredInputFormat, key, value, job);
+
+		return environment.createInput(hadoopInputFormat);
+	}
+
+	/**
+	 * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}. The
+	 * given inputName is set on the given job.
+	 */
+	@PublicEvolving
+	public <K, V> DataSource<Tuple2<K, V>> readHadoopFile(org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K, V> mapreduceInputFormat, Class<K> key, Class<V> value, String inputPath, Job job) throws IOException {
+		DataSource<Tuple2<K, V>> result = createHadoopInput(mapreduceInputFormat, key, value, job);
+
+		org.apache.hadoop.mapreduce.lib.input.FileInputFormat.addInputPath(job, new org.apache.hadoop.fs.Path(inputPath));
+
+		return result;
+	}
+
+	/**
+	 * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}. A
+	 * {@link org.apache.hadoop.mapreduce.Job} with the given inputPath is created.
+	 */
+	@PublicEvolving
+	public <K, V> DataSource<Tuple2<K, V>> readHadoopFile(org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K, V> mapreduceInputFormat, Class<K> key, Class<V> value, String inputPath) throws IOException {
+		return readHadoopFile(mapreduceInputFormat, key, value, inputPath, Job.getInstance());
+	}
+
+	/**
+	 * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapreduce.InputFormat}.
+	 */
+	@PublicEvolving
+	public <K, V> DataSource<Tuple2<K, V>> createHadoopInput(org.apache.hadoop.mapreduce.InputFormat<K, V> mapreduceInputFormat, Class<K> key, Class<V> value, Job job) {
+		org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> hadoopInputFormat = new org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<>(mapreduceInputFormat, key, value, job);
+
+		return environment.createInput(hadoopInputFormat);
+	}
+
+	/**
+	 * Returns {@link ParameterTool} for the arguments parsed by {@link GenericOptionsParser}.
+	 *
+	 * @param args Input array arguments. It should be parsable by {@link GenericOptionsParser}
+	 * @return A {@link ParameterTool}
+	 * @throws IOException If arguments cannot be parsed by {@link GenericOptionsParser}
+	 * @see GenericOptionsParser
+	 */
+	@PublicEvolving
+	public static ParameterTool paramsFromGenericOptionsParser(String[] args) throws IOException {
+		Option[] options = new GenericOptionsParser(args).getCommandLine().getOptions();
+		Map<String, String> map = new HashMap<String, String>();
+		for (Option option : options) {
+			String[] split = option.getValue().split("=");
+			map.put(split[0], split[1]);
+		}
+		return ParameterTool.fromMap(map);
+	}
+}
diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/api/scala/hadoop/FlinkHadoopEnvironment.scala b/flink-batch-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/api/scala/hadoop/FlinkHadoopEnvironment.scala
new file mode 100644
index 0000000000000..590a3fc46a3e1
--- /dev/null
+++ b/flink-batch-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/api/scala/hadoop/FlinkHadoopEnvironment.scala
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
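(Editor's sketch, not part of the patch: a minimal example of how a user program would call the Java API added above. The class and method names come from the new FlinkHadoopEnvironment; the generic parameters are assumed, since the patch text lost its angle brackets, and the argument key "input" is hypothetical.)

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.hadoopcompatibility.FlinkHadoopEnvironment;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.TextInputFormat;

public class HadoopReadSketch {
	public static void main(String[] args) throws Exception {
		// Hadoop-style "-D key=value" arguments; replaces the removed ParameterTool.fromGenericOptionsParser(...)
		ParameterTool params = FlinkHadoopEnvironment.paramsFromGenericOptionsParser(args);

		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		FlinkHadoopEnvironment hadoopEnv = new FlinkHadoopEnvironment(env);

		// replaces the removed ExecutionEnvironment#readHadoopFile(...) convenience method
		DataSet<Tuple2<LongWritable, Text>> lines = hadoopEnv.readHadoopFile(
				new TextInputFormat(), LongWritable.class, Text.class, params.getRequired("input"));

		lines.print();
	}
}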
+ */ +package org.apache.flink.api.scala.hadoop + +import org.apache.flink.annotation.{Public, PublicEvolving} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.scala.hadoop.mapred.{HadoopInputFormat => MapredHadoopInputFormat} +import org.apache.flink.api.scala.hadoop.mapreduce.{HadoopInputFormat => MapreduceHadoopInputFormat} +import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment} +import org.apache.hadoop.fs.{Path => HadoopPath} +import org.apache.hadoop.mapred.{JobConf, FileInputFormat => MapredFileInputFormat, InputFormat => MapredInputFormat} +import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => MapreduceFileInputFormat} +import org.apache.hadoop.mapreduce.{Job, InputFormat => MapreduceInputFormat} + +/** + * The FlinkHadoopEnvironment is the context in which a program is executed, connected with hadoop. + * + * The environment provides methods to interact with the hadoop cluster (data access). + * + * Use [[FlinkHadoopEnvironment#getHadoopEnvironment]] to get the correct environment. + */ +@Public +class FlinkHadoopEnvironment(parentEnv: ExecutionEnvironment) { + + /** + * @return the Execution environment. + */ + def getParentEnv: ExecutionEnvironment = parentEnv + + /** + * Creates a [[DataSet]] from the given [[org.apache.hadoop.mapred.FileInputFormat]]. The + * given inputName is set on the given job. + */ + @PublicEvolving + def readHadoopFile[K, V]( + mapredInputFormat: MapredFileInputFormat[K, V], + key: Class[K], + value: Class[V], + inputPath: String, + job: JobConf) + (implicit tpe: TypeInformation[(K, V)]): DataSet[(K, V)] = { + val result = createHadoopInput(mapredInputFormat, key, value, job) + MapredFileInputFormat.addInputPath(job, new HadoopPath(inputPath)) + result + } + + /** + * Creates a [[DataSet]] from the given [[org.apache.hadoop.mapred.FileInputFormat]]. A + * [[org.apache.hadoop.mapred.JobConf]] with the given inputPath is created. + */ + @PublicEvolving + def readHadoopFile[K, V]( + mapredInputFormat: MapredFileInputFormat[K, V], + key: Class[K], + value: Class[V], + inputPath: String) + (implicit tpe: TypeInformation[(K, V)]): DataSet[(K, V)] = { + readHadoopFile(mapredInputFormat, key, value, inputPath, new JobConf) + } + + /** + * Creates a [[DataSet]] from [[org.apache.hadoop.mapred.SequenceFileInputFormat]] + * A [[org.apache.hadoop.mapred.JobConf]] with the given inputPath is created. + */ + @PublicEvolving + def readSequenceFile[K, V]( + key: Class[K], + value: Class[V], + inputPath: String) + (implicit tpe: TypeInformation[(K, V)]): DataSet[(K, V)] = { + readHadoopFile(new org.apache.hadoop.mapred.SequenceFileInputFormat[K, V], + key, value, inputPath) + } + + /** + * Creates a [[DataSet]] from the given [[org.apache.hadoop.mapred.InputFormat]]. + */ + @PublicEvolving + def createHadoopInput[K, V]( + mapredInputFormat: MapredInputFormat[K, V], + key: Class[K], + value: Class[V], + job: JobConf) + (implicit tpe: TypeInformation[(K, V)]): DataSet[(K, V)] = { + val hadoopInputFormat = new mapred.HadoopInputFormat[K, V](mapredInputFormat, key, value, job) + getParentEnv.createInput(hadoopInputFormat) + } + + /** + * Creates a [[DataSet]] from the given + * [[org.apache.hadoop.mapreduce.lib.input.FileInputFormat]]. + * The given inputName is set on the given job. 
+ */ + @PublicEvolving + def readHadoopFile[K, V]( + mapreduceInputFormat: MapreduceFileInputFormat[K, V], + key: Class[K], + value: Class[V], + inputPath: String, + job: Job) + (implicit tpe: TypeInformation[(K, V)]): DataSet[(K, V)] = { + val result = createHadoopInput(mapreduceInputFormat, key, value, job) + MapreduceFileInputFormat.addInputPath(job, new HadoopPath(inputPath)) + result + } + + /** + * Creates a [[DataSet]] from the given + * [[org.apache.hadoop.mapreduce.lib.input.FileInputFormat]]. A + * [[org.apache.hadoop.mapreduce.Job]] with the given inputPath will be created. + */ + @PublicEvolving + def readHadoopFile[K, V]( + mapreduceInputFormat: MapreduceFileInputFormat[K, V], + key: Class[K], + value: Class[V], + inputPath: String) + (implicit tpe: TypeInformation[(K, V)]): DataSet[Tuple2[K, V]] = { + readHadoopFile(mapreduceInputFormat, key, value, inputPath, Job.getInstance) + } + + /** + * Creates a [[DataSet]] from the given [[org.apache.hadoop.mapreduce.InputFormat]]. + */ + @PublicEvolving + def createHadoopInput[K, V]( + mapreduceInputFormat: MapreduceInputFormat[K, V], + key: Class[K], + value: Class[V], + job: Job) + (implicit tpe: TypeInformation[(K, V)]): DataSet[Tuple2[K, V]] = { + val hadoopInputFormat = + new mapreduce.HadoopInputFormat[K, V](mapreduceInputFormat, key, value, job) + getParentEnv.createInput(hadoopInputFormat) + } +} + +@Public +object FlinkHadoopEnvironment { + + /** + * Creates a hadoop environment that represents the context in which the program is + * currently executed. + */ + def getHadoopEnvironment(env: ExecutionEnvironment): FlinkHadoopEnvironment = { + new FlinkHadoopEnvironment(env) + } + +} + diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopInputFormat.scala b/flink-batch-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopInputFormat.scala similarity index 100% rename from flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopInputFormat.scala rename to flink-batch-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopInputFormat.scala diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopOutputFormat.scala b/flink-batch-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopOutputFormat.scala similarity index 100% rename from flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopOutputFormat.scala rename to flink-batch-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/api/scala/hadoop/mapred/HadoopOutputFormat.scala diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopInputFormat.scala b/flink-batch-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopInputFormat.scala similarity index 100% rename from flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopInputFormat.scala rename to flink-batch-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopInputFormat.scala diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopOutputFormat.scala b/flink-batch-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopOutputFormat.scala similarity index 100% rename from 
flink-scala/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopOutputFormat.scala rename to flink-batch-connectors/flink-hadoop-compatibility/src/main/scala/org/apache/flink/api/scala/hadoop/mapreduce/HadoopOutputFormat.scala diff --git a/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatTest.java b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatTest.java similarity index 100% rename from flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatTest.java rename to flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/hadoop/mapred/HadoopInputFormatTest.java diff --git a/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatTest.java b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatTest.java similarity index 100% rename from flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatTest.java rename to flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatTest.java diff --git a/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatTest.java b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatTest.java similarity index 100% rename from flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatTest.java rename to flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopInputFormatTest.java diff --git a/flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatTest.java b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatTest.java similarity index 100% rename from flink-java/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatTest.java rename to flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/api/java/hadoop/mapreduce/HadoopOutputFormatTest.java diff --git a/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/hadoopcompatibility/FlinkHadoopEnvironmentTest.java b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/hadoopcompatibility/FlinkHadoopEnvironmentTest.java new file mode 100644 index 0000000000000..4bc04ab3db2c9 --- /dev/null +++ b/flink-batch-connectors/flink-hadoop-compatibility/src/test/java/org/apache/flink/hadoopcompatibility/FlinkHadoopEnvironmentTest.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.hadoopcompatibility; + +import org.apache.flink.api.java.utils.AbstractParameterToolTest; +import org.apache.flink.api.java.utils.ParameterTool; +import org.junit.Test; + +import java.io.IOException; + +public class FlinkHadoopEnvironmentTest extends AbstractParameterToolTest { + + @Test + public void testParamsFromGenericOptionsParser() throws IOException { + ParameterTool parameter = FlinkHadoopEnvironment.paramsFromGenericOptionsParser(new String[]{"-D", "input=myInput", "-DexpectedCount=15"}); + validate(parameter); + } +} diff --git a/flink-java/pom.xml b/flink-java/pom.xml index 6924da8bbedca..d8b0ad8d9be69 100644 --- a/flink-java/pom.xml +++ b/flink-java/pom.xml @@ -82,9 +82,12 @@ under the License. com.github.siom79.japicmp japicmp-maven-plugin + + true + - + org.apache.maven.plugins maven-jar-plugin diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java index add85316f6abf..db8dd6be14770 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java @@ -35,7 +35,6 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat; import org.apache.flink.api.java.io.CollectionInputFormat; import org.apache.flink.api.java.io.CsvReader; import org.apache.flink.api.java.io.IteratorInputFormat; @@ -61,9 +60,6 @@ import org.apache.flink.util.SplittableIterator; import org.apache.flink.util.Visitor; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapreduce.Job; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -582,82 +578,6 @@ public DataSource createInput(InputFormat inputFormat, TypeInformat return new DataSource<>(this, inputFormat, producedType, Utils.getCallLocationName()); } - // ----------------------------------- Hadoop Input Format --------------------------------------- - - /** - * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapred.FileInputFormat}. The - * given inputName is set on the given job. - */ - @PublicEvolving - public DataSource> readHadoopFile(org.apache.hadoop.mapred.FileInputFormat mapredInputFormat, Class key, Class value, String inputPath, JobConf job) { - DataSource> result = createHadoopInput(mapredInputFormat, key, value, job); - - org.apache.hadoop.mapred.FileInputFormat.addInputPath(job, new org.apache.hadoop.fs.Path(inputPath)); - - return result; - } - - /** - * Creates a {@link DataSet} from {@link org.apache.hadoop.mapred.SequenceFileInputFormat} - * A {@link org.apache.hadoop.mapred.JobConf} with the given inputPath is created. - */ - @PublicEvolving - public DataSource> readSequenceFile(Class key, Class value, String inputPath) throws IOException { - return readHadoopFile(new org.apache.hadoop.mapred.SequenceFileInputFormat(), key, value, inputPath); - } - - /** - * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapred.FileInputFormat}. A - * {@link org.apache.hadoop.mapred.JobConf} with the given inputPath is created. 
- */ - @PublicEvolving - public DataSource> readHadoopFile(org.apache.hadoop.mapred.FileInputFormat mapredInputFormat, Class key, Class value, String inputPath) { - return readHadoopFile(mapredInputFormat, key, value, inputPath, new JobConf()); - } - - /** - * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapred.InputFormat}. - */ - @PublicEvolving - public DataSource> createHadoopInput(org.apache.hadoop.mapred.InputFormat mapredInputFormat, Class key, Class value, JobConf job) { - HadoopInputFormat hadoopInputFormat = new HadoopInputFormat<>(mapredInputFormat, key, value, job); - - return this.createInput(hadoopInputFormat); - } - - /** - * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}. The - * given inputName is set on the given job. - */ - @PublicEvolving - public DataSource> readHadoopFile(org.apache.hadoop.mapreduce.lib.input.FileInputFormat mapreduceInputFormat, Class key, Class value, String inputPath, Job job) throws IOException { - DataSource> result = createHadoopInput(mapreduceInputFormat, key, value, job); - - org.apache.hadoop.mapreduce.lib.input.FileInputFormat.addInputPath(job, new org.apache - .hadoop.fs.Path(inputPath)); - - return result; - } - - /** - * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}. A - * {@link org.apache.hadoop.mapreduce.Job} with the given inputPath is created. - */ - @PublicEvolving - public DataSource> readHadoopFile(org.apache.hadoop.mapreduce.lib.input.FileInputFormat mapreduceInputFormat, Class key, Class value, String inputPath) throws IOException { - return readHadoopFile(mapreduceInputFormat, key, value, inputPath, Job.getInstance()); - } - - /** - * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapreduce.InputFormat}. - */ - @PublicEvolving - public DataSource> createHadoopInput(org.apache.hadoop.mapreduce.InputFormat mapreduceInputFormat, Class key, Class value, Job job) { - org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat hadoopInputFormat = new org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<>(mapreduceInputFormat, key, value, job); - - return this.createInput(hadoopInputFormat); - } - // ----------------------------------- Collection --------------------------------------- /** diff --git a/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java b/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java index 8f504e4117e43..d83e1905b5535 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java @@ -17,14 +17,11 @@ */ package org.apache.flink.api.java.utils; -import org.apache.commons.cli.Option; import org.apache.commons.lang3.math.NumberUtils; import org.apache.flink.annotation.Public; -import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.configuration.Configuration; import org.apache.flink.util.Preconditions; -import org.apache.hadoop.util.GenericOptionsParser; import java.io.File; import java.io.FileInputStream; @@ -184,25 +181,6 @@ public static ParameterTool fromSystemProperties() { return fromMap((Map) System.getProperties()); } - /** - * Returns {@link ParameterTool} for the arguments parsed by {@link GenericOptionsParser} - * - * @param args Input array arguments. 
It should be parsable by {@link GenericOptionsParser} - * @return A {@link ParameterTool} - * @throws IOException If arguments cannot be parsed by {@link GenericOptionsParser} - * @see GenericOptionsParser - */ - @PublicEvolving - public static ParameterTool fromGenericOptionsParser(String[] args) throws IOException { - Option[] options = new GenericOptionsParser(args).getCommandLine().getOptions(); - Map map = new HashMap(); - for (Option option : options) { - String[] split = option.getValue().split("="); - map.put(split[0], split[1]); - } - return fromMap(map); - } - // ------------------ ParameterUtil ------------------------ protected final Map data; protected final HashMap defaultData; diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/JoinOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/JoinOperatorTest.java index ae233822e9bad..132ab9626bc11 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/operator/JoinOperatorTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/JoinOperatorTest.java @@ -39,7 +39,6 @@ import org.apache.flink.api.java.tuple.Tuple5; import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.hadoop.io.Writable; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; @@ -1212,7 +1211,6 @@ public static class CustomType implements Serializable { public String myString; public Object nothing; public List countries; - public Writable interfaceTest; public CustomType() {}; @@ -1221,7 +1219,6 @@ public CustomType(int i, long l, String s) { myLong = l; myString = s; countries = null; - interfaceTest = null; nested = new NestedCustomType(i, l, s); } diff --git a/flink-java/src/test/java/org/apache/flink/api/java/utils/AbstractParameterToolTest.java b/flink-java/src/test/java/org/apache/flink/api/java/utils/AbstractParameterToolTest.java new file mode 100644 index 0000000000000..9aa9a95714ac2 --- /dev/null +++ b/flink-java/src/test/java/org/apache/flink/api/java/utils/AbstractParameterToolTest.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.utils; + +import org.apache.flink.api.java.ClosureCleaner; +import org.apache.flink.configuration.Configuration; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.rules.TemporaryFolder; + +import java.io.FileInputStream; +import java.io.IOException; +import java.util.Properties; + +public abstract class AbstractParameterToolTest { + + @Rule + public TemporaryFolder tmp = new TemporaryFolder(); + + protected void validate(ParameterTool parameter) { + ClosureCleaner.ensureSerializable(parameter); + Assert.assertEquals("myInput", parameter.getRequired("input")); + Assert.assertEquals("myDefaultValue", parameter.get("output", "myDefaultValue")); + Assert.assertEquals(null, parameter.get("whatever")); + Assert.assertEquals(15L, parameter.getLong("expectedCount", -1L)); + Assert.assertTrue(parameter.getBoolean("thisIsUseful", true)); + Assert.assertEquals(42, parameter.getByte("myDefaultByte", (byte) 42)); + Assert.assertEquals(42, parameter.getShort("myDefaultShort", (short) 42)); + + Configuration config = parameter.getConfiguration(); + Assert.assertEquals(15L, config.getLong("expectedCount", -1L)); + + Properties props = parameter.getProperties(); + Assert.assertEquals("myInput", props.getProperty("input")); + props = null; + + // -------- test the default file creation ------------ + try { + String pathToFile = tmp.newFile().getAbsolutePath(); + parameter.createPropertiesFile(pathToFile); + Properties defaultProps = new Properties(); + try (FileInputStream fis = new FileInputStream(pathToFile)) { + defaultProps.load(fis); + } + + Assert.assertEquals("myDefaultValue", defaultProps.get("output")); + Assert.assertEquals("-1", defaultProps.get("expectedCount")); + Assert.assertTrue(defaultProps.containsKey("input")); + + } catch (IOException e) { + Assert.fail(e.getMessage()); + e.printStackTrace(); + } + } +} diff --git a/flink-java/src/test/java/org/apache/flink/api/java/utils/ParameterToolTest.java b/flink-java/src/test/java/org/apache/flink/api/java/utils/ParameterToolTest.java index 605f0333b13a8..11a0daf92ae40 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/utils/ParameterToolTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/utils/ParameterToolTest.java @@ -18,25 +18,17 @@ package org.apache.flink.api.java.utils; -import org.apache.flink.api.java.ClosureCleaner; -import org.apache.flink.configuration.Configuration; import org.junit.Assert; -import org.junit.Rule; import org.junit.Test; -import org.junit.rules.TemporaryFolder; import java.io.File; -import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStream; import java.util.Map; import java.util.Properties; -public class ParameterToolTest { - - @Rule - public TemporaryFolder tmp = new TemporaryFolder(); +public class ParameterToolTest extends AbstractParameterToolTest { // ----- Parser tests ----------------- @@ -156,46 +148,4 @@ public void testMerged() { ParameterTool parameter = parameter1.mergeWith(parameter2); validate(parameter); } - - @Test - public void testFromGenericOptionsParser() throws IOException { - ParameterTool parameter = ParameterTool.fromGenericOptionsParser(new String[]{"-D", "input=myInput", "-DexpectedCount=15"}); - validate(parameter); - } - - private void validate(ParameterTool parameter) { - ClosureCleaner.ensureSerializable(parameter); - Assert.assertEquals("myInput", parameter.getRequired("input")); - Assert.assertEquals("myDefaultValue", 
parameter.get("output", "myDefaultValue")); - Assert.assertEquals(null, parameter.get("whatever")); - Assert.assertEquals(15L, parameter.getLong("expectedCount", -1L)); - Assert.assertTrue(parameter.getBoolean("thisIsUseful", true)); - Assert.assertEquals(42, parameter.getByte("myDefaultByte", (byte) 42)); - Assert.assertEquals(42, parameter.getShort("myDefaultShort", (short) 42)); - - Configuration config = parameter.getConfiguration(); - Assert.assertEquals(15L, config.getLong("expectedCount", -1L)); - - Properties props = parameter.getProperties(); - Assert.assertEquals("myInput", props.getProperty("input")); - props = null; - - // -------- test the default file creation ------------ - try { - String pathToFile = tmp.newFile().getAbsolutePath(); - parameter.createPropertiesFile(pathToFile); - Properties defaultProps = new Properties(); - try (FileInputStream fis = new FileInputStream(pathToFile)) { - defaultProps.load(fis); - } - - Assert.assertEquals("myDefaultValue", defaultProps.get("output")); - Assert.assertEquals("-1", defaultProps.get("expectedCount")); - Assert.assertTrue(defaultProps.containsKey("input")); - - } catch (IOException e) { - Assert.fail(e.getMessage()); - e.printStackTrace(); - } - } } diff --git a/flink-scala/pom.xml b/flink-scala/pom.xml index 88f49e59514f2..8b1659afb242c 100644 --- a/flink-scala/pom.xml +++ b/flink-scala/pom.xml @@ -111,6 +111,9 @@ under the License. com.github.siom79.japicmp japicmp-maven-plugin + + true + diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala index f03cb840ac6ef..7c78af3ace22a 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala @@ -29,21 +29,16 @@ import org.apache.flink.api.java.operators.DataSource import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfoBase, ValueTypeInfo} import org.apache.flink.api.java.{CollectionEnvironment, ExecutionEnvironment => JavaEnv} -import org.apache.flink.api.scala.hadoop.{mapred, mapreduce} import org.apache.flink.configuration.Configuration import org.apache.flink.core.fs.Path import org.apache.flink.types.StringValue import org.apache.flink.util.{NumberSequenceIterator, Preconditions, SplittableIterator} -import org.apache.hadoop.fs.{Path => HadoopPath} -import org.apache.hadoop.mapred.{FileInputFormat => MapredFileInputFormat, InputFormat => MapredInputFormat, JobConf} -import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => MapreduceFileInputFormat} -import org.apache.hadoop.mapreduce.{InputFormat => MapreduceInputFormat, Job} import scala.collection.JavaConverters._ import scala.reflect.ClassTag /** - * The ExecutionEnviroment is the context in which a program is executed. A local environment will + * The ExecutionEnvironment is the context in which a program is executed. A local environment will * cause execution in the current JVM, a remote environment will cause execution on a remote * cluster installation. * @@ -409,112 +404,6 @@ class ExecutionEnvironment(javaEnv: JavaEnv) { wrap(new DataSource[T](javaEnv, inputFormat, producedType, getCallLocationName())) } - /** - * Creates a [[DataSet]] from the given [[org.apache.hadoop.mapred.FileInputFormat]]. The - * given inputName is set on the given job. 
- */ - @PublicEvolving - def readHadoopFile[K, V]( - mapredInputFormat: MapredFileInputFormat[K, V], - key: Class[K], - value: Class[V], - inputPath: String, - job: JobConf) - (implicit tpe: TypeInformation[(K, V)]): DataSet[(K, V)] = { - val result = createHadoopInput(mapredInputFormat, key, value, job) - MapredFileInputFormat.addInputPath(job, new HadoopPath(inputPath)) - result - } - - /** - * Creates a [[DataSet]] from the given [[org.apache.hadoop.mapred.FileInputFormat]]. A - * [[org.apache.hadoop.mapred.JobConf]] with the given inputPath is created. - */ - @PublicEvolving - def readHadoopFile[K, V]( - mapredInputFormat: MapredFileInputFormat[K, V], - key: Class[K], - value: Class[V], - inputPath: String) - (implicit tpe: TypeInformation[(K, V)]): DataSet[(K, V)] = { - readHadoopFile(mapredInputFormat, key, value, inputPath, new JobConf) - } - - /** - * Creates a [[DataSet]] from [[org.apache.hadoop.mapred.SequenceFileInputFormat]] - * A [[org.apache.hadoop.mapred.JobConf]] with the given inputPath is created. - */ - @PublicEvolving - def readSequenceFile[K, V]( - key: Class[K], - value: Class[V], - inputPath: String) - (implicit tpe: TypeInformation[(K, V)]): DataSet[(K, V)] = { - readHadoopFile(new org.apache.hadoop.mapred.SequenceFileInputFormat[K, V], - key, value, inputPath) - } - - /** - * Creates a [[DataSet]] from the given [[org.apache.hadoop.mapred.InputFormat]]. - */ - @PublicEvolving - def createHadoopInput[K, V]( - mapredInputFormat: MapredInputFormat[K, V], - key: Class[K], - value: Class[V], - job: JobConf) - (implicit tpe: TypeInformation[(K, V)]): DataSet[(K, V)] = { - val hadoopInputFormat = new mapred.HadoopInputFormat[K, V](mapredInputFormat, key, value, job) - createInput(hadoopInputFormat) - } - - /** - * Creates a [[DataSet]] from the given [[org.apache.hadoop.mapreduce.lib.input.FileInputFormat]]. - * The given inputName is set on the given job. - */ - @PublicEvolving - def readHadoopFile[K, V]( - mapreduceInputFormat: MapreduceFileInputFormat[K, V], - key: Class[K], - value: Class[V], - inputPath: String, - job: Job) - (implicit tpe: TypeInformation[(K, V)]): DataSet[(K, V)] = { - val result = createHadoopInput(mapreduceInputFormat, key, value, job) - MapreduceFileInputFormat.addInputPath(job, new HadoopPath(inputPath)) - result - } - - /** - * Creates a [[DataSet]] from the given - * [[org.apache.hadoop.mapreduce.lib.input.FileInputFormat]]. A - * [[org.apache.hadoop.mapreduce.Job]] with the given inputPath will be created. - */ - @PublicEvolving - def readHadoopFile[K, V]( - mapreduceInputFormat: MapreduceFileInputFormat[K, V], - key: Class[K], - value: Class[V], - inputPath: String) - (implicit tpe: TypeInformation[(K, V)]): DataSet[Tuple2[K, V]] = { - readHadoopFile(mapreduceInputFormat, key, value, inputPath, Job.getInstance) - } - - /** - * Creates a [[DataSet]] from the given [[org.apache.hadoop.mapreduce.InputFormat]]. - */ - @PublicEvolving - def createHadoopInput[K, V]( - mapreduceInputFormat: MapreduceInputFormat[K, V], - key: Class[K], - value: Class[V], - job: Job) - (implicit tpe: TypeInformation[(K, V)]): DataSet[Tuple2[K, V]] = { - val hadoopInputFormat = - new mapreduce.HadoopInputFormat[K, V](mapreduceInputFormat, key, value, job) - createInput(hadoopInputFormat) - } - /** * Creates a DataSet from the given non-empty [[Iterable]]. 
* diff --git a/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java index a0e3468762771..31e635a24a77a 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapred/WordCountMapredITCase.java @@ -24,6 +24,7 @@ import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.hadoopcompatibility.FlinkHadoopEnvironment; import org.apache.flink.test.testdata.WordCountData; import org.apache.flink.test.util.JavaProgramTestBase; import org.apache.flink.util.Collector; @@ -55,7 +56,7 @@ protected void testProgram() throws Exception { final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - DataSet> input = env.readHadoopFile(new TextInputFormat(), + DataSet> input = new FlinkHadoopEnvironment(env).readHadoopFile(new TextInputFormat(), LongWritable.class, Text.class, textPath); DataSet text = input.map(new MapFunction, String>() { diff --git a/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java index fee49bfa28020..82458bf0a4ef7 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/hadoop/mapreduce/WordCountMapreduceITCase.java @@ -24,6 +24,7 @@ import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.hadoopcompatibility.FlinkHadoopEnvironment; import org.apache.flink.test.testdata.WordCountData; import org.apache.flink.test.util.JavaProgramTestBase; import org.apache.flink.util.Collector; @@ -55,7 +56,7 @@ protected void testProgram() throws Exception { final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - DataSet> input = env.readHadoopFile(new TextInputFormat(), + DataSet> input = new FlinkHadoopEnvironment(env).readHadoopFile(new TextInputFormat(), LongWritable.class, Text.class, textPath); DataSet text = input.map(new MapFunction, String>() { diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapred/WordCountMapredITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapred/WordCountMapredITCase.scala index b09ecc4c90c40..fee13c00a7282 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapred/WordCountMapredITCase.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapred/WordCountMapredITCase.scala @@ -18,7 +18,7 @@ package org.apache.flink.api.scala.hadoop.mapred import org.apache.flink.api.scala._ - +import org.apache.flink.api.scala.hadoop.FlinkHadoopEnvironment import org.apache.flink.test.testdata.WordCountData import org.apache.flink.test.util.{TestBaseUtils, JavaProgramTestBase} import org.apache.hadoop.fs.Path @@ -42,8 +42,8 @@ class WordCountMapredITCase extends JavaProgramTestBase { protected def testProgram() { val env = ExecutionEnvironment.getExecutionEnvironment - val input = - env.readHadoopFile(new TextInputFormat, classOf[LongWritable], classOf[Text], textPath) + val input = 
FlinkHadoopEnvironment.getHadoopEnvironment(env). + readHadoopFile(new TextInputFormat, classOf[LongWritable], classOf[Text], textPath) val text = input map { _._2.toString } val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } } diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapreduce/WordCountMapreduceITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapreduce/WordCountMapreduceITCase.scala index de2d37629c862..97687ee195817 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapreduce/WordCountMapreduceITCase.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/hadoop/mapreduce/WordCountMapreduceITCase.scala @@ -19,6 +19,7 @@ package org.apache.flink.api.scala.hadoop.mapreduce import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.hadoop.FlinkHadoopEnvironment import org.apache.flink.test.testdata.WordCountData import org.apache.flink.test.util.{TestBaseUtils, JavaProgramTestBase} import org.apache.hadoop.fs.Path @@ -45,7 +46,8 @@ class WordCountMapreduceITCase extends JavaProgramTestBase { val env = ExecutionEnvironment.getExecutionEnvironment val input = - env.readHadoopFile(new TextInputFormat, classOf[LongWritable], classOf[Text], textPath) + FlinkHadoopEnvironment.getHadoopEnvironment(env). + readHadoopFile(new TextInputFormat, classOf[LongWritable], classOf[Text], textPath) val text = input map { _._2.toString } val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
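(Editor's sketch, not part of the patch: the integration-test changes above boil down to a single call-site migration for user programs. The same pattern on the mapreduce side, shown in Java with an assumed generic signature and a hypothetical input path:)

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.FlinkHadoopEnvironment;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class MapreduceReadSketch {
	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// before this patch: env.readHadoopFile(new TextInputFormat(), LongWritable.class, Text.class, inputPath)
		// after: the helper lives in the flink-hadoop-compatibility module
		DataSet<Tuple2<LongWritable, Text>> input = new FlinkHadoopEnvironment(env)
				.readHadoopFile(new TextInputFormat(), LongWritable.class, Text.class,
						"hdfs:///tmp/words" /* hypothetical path */);

		input.print();
	}
}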