[FLINK-4315] Deprecate Hadoop dependent methods in flink-java #2637
Conversation
…-scala are marked as deprecated. This change prepares for moving these methods into flink-hadoop-compatibility in the future.
In addition to deprecating the ExecutionEnvironment methods, should we also now provide the replacement utility structure? This will give users time to migrate to the future API.
@@ -192,6 +192,7 @@ public static ParameterTool fromSystemProperties() {
 * @throws IOException If arguments cannot be parsed by {@link GenericOptionsParser}
 * @see GenericOptionsParser
 */
+@Deprecated
Why is this deprecated?
Adding the org.apache.hadoop dependency to flink-java for only one method of this class does not look good. The method fromGenericOptionsParser will be moved, together with the other Hadoop-dependent methods, into flink-hadoop-compatibility. This method isn't used inside Flink. Anybody who uses it in their own code should rewrite it before migrating to the next major version of Flink.
You can look to the full version flink-java without Hadoop here https://github.com/kenmy/flink/tree/FLINK-4048
@greghogan The GenericOptionsParser is a Hadoop utility, so it should not be directly referenced in flink-java.
Hi @kenmy, thanks for your PR. Can you actually merge this PR with your work in PR #2576? We also want to add the alternatives to which users should switch. The docs of the deprecated methods should point to these alternatives. Later (when hopefully everybody has migrated their code) we would remove the deprecated methods. Thanks, Fabian
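For illustration, a minimal sketch of what such a deprecation note could look like; the class, method, and wording below are assumptions for illustration, not code from this PR:

// Hypothetical sketch of the deprecation pattern being discussed: the
// deprecated method keeps working, but its Javadoc points users to the
// replacement in flink-hadoop-compatibility.
public class LegacyApiSketch {

    /**
     * @deprecated This method relies on Hadoop and will move to
     * flink-hadoop-compatibility. Use the HadoopInputs utility together
     * with ExecutionEnvironment.createInput(...) instead.
     */
    @Deprecated
    public String readHadoopFile(String inputPath) {
        return readFileInternal(inputPath);
    }

    private String readFileInternal(String inputPath) {
        return "contents of " + inputPath; // placeholder implementation
    }
}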
…to the flink-hadoop-compatibility
Hi @kenmy, thanks for the update.
I made one suggestion to have static methods to build input formats rather than a new environment. What do you think?
Otherwise the PR looks good.
 *
 * The environment provides methods to interact with the hadoop cluster (data access).
 */
public final class FlinkHadoopEnvironment {
Instead of adding a FlinkHadoopEnvironment, I would add a class HadoopInputs with static methods to create HadoopInputFormats.
This could be used like this:

import static org.apache.flink.hadoopcompatibility.HadoopInputs.createHadoopFileInput;
// ---
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple2<LongWritable, Text>> input = env.createInput(
    createHadoopFileInput(new TextInputFormat(), LongWritable.class, Text.class, textPath));

We might be able to remove the flink-scala dependency if we go for this approach.
Thanks @fhueske for the review. I agree with you, the name HadoopInputs is more suitable here. I renamed it. Moreover, I extracted paramsFromGenericOptionsParser into a new class, HadoopUtils; HadoopInputs was not a suitable place for it.
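As a hedged sketch, the resulting migration for user code would look roughly like this, assuming paramsFromGenericOptionsParser keeps the shape discussed in this thread (taking the program's String[] args and returning a ParameterTool):

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.hadoopcompatibility.HadoopUtils;

public class ParameterMigrationSketch {
    public static void main(String[] args) throws Exception {
        // Deprecated: pulls the Hadoop GenericOptionsParser dependency into flink-java.
        ParameterTool oldStyle = ParameterTool.fromGenericOptionsParser(args);

        // Replacement: the same functionality, provided by flink-hadoop-compatibility.
        ParameterTool newStyle = HadoopUtils.paramsFromGenericOptionsParser(args);

        System.out.println(newStyle.getNumberOfParameters());
    }
}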
@@ -1211,7 +1210,6 @@ public String toString() {
 public String myString;
 public Object nothing;
 public List<String> countries;
-public Writable interfaceTest;
Writable is just an interface in this test. Can you replace it with another interface to maintain the coverage of this test?
Coverage decreased by only one line, but I reverted to the previous version.
I did not expect that an unused variable could affect the coverage.
The interface here checks Flink's type extraction and handling. For that, this variable does not need to be used, just declared.
However, this functionality is tested in other places as well.
So, +1 for removing this line.
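For context, a minimal sketch of the kind of POJO this test uses; the field typed with an interface only needs to be declared for Flink's type extraction to have to handle it (the class name is illustrative, the fields follow the diff above):

import java.io.Serializable;
import java.util.List;

// Illustrative POJO: the interface-typed field exercises Flink's type
// extraction (an interface field is analyzed as a generic type), even
// though no test code ever reads or writes it.
public class PojoWithInterfaceField {
    public String myString;
    public Object nothing;
    public List<String> countries;
    public Serializable interfaceTest; // any interface works; Writable was Hadoop-specific
}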
 * Use [[FlinkHadoopEnvironment#getHadoopEnvironment]] to get the correct environment.
 */
@Public
class FlinkHadoopEnvironment(parentEnv: ExecutionEnvironment) {
Can you convert the Scala code into a HadoopInputs as well?
Basically the same changes as for the Java HadoopInputs.
 *
 * Use [[FlinkHadoopEnvironment#getHadoopEnvironment]] to get the correct environment.
 */
@Public
The connector modules do not have API annotations (@Public, @PublicEvolving) yet. Please remove all annotations from the code in flink-hadoop-compatibility.
 * The HadoopInputs is the utility class for create {@link HadoopInputFormat}.
 *
 * Methods:
 * createHadoopInput - create {@link org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat} or {@link org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Both classnames are the same.
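Presumably the second link was meant to reference the mapreduce variant, along these lines (an assumed fix, not code from the PR):

 * createHadoopInput - create {@link org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat} or {@link org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat}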
import java.io.IOException;

/**
 * The HadoopInputs is the utility class for create {@link HadoopInputFormat}.
I would make the purpose of the class a bit more explicit:

HadoopInputs is a utility class to use Apache Hadoop InputFormats with Apache Flink. It provides methods to create Flink InputFormat wrappers for Hadoop org.apache.hadoop.mapred.InputFormat and org.apache.hadoop.mapreduce.InputFormat. Key-value pairs produced by the Hadoop InputFormats are converted into Flink Tuple2 objects where the first field (Tuple2.f0) is the key and the second field (Tuple2.f1) is the value.
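Putting the pieces together, a hedged sketch of how the resulting HadoopInputs utility would be used from a Flink job; it assumes a static readHadoopFile factory mirroring the deprecated ExecutionEnvironment.readHadoopFile shown in the test diff below, so treat the exact signature as an assumption:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.TextInputFormat;

public class HadoopInputsUsageSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Wrap a Hadoop mapred TextInputFormat; key/value pairs arrive as
        // Tuple2 fields f0 (key) and f1 (value), as described above.
        DataSet<Tuple2<LongWritable, Text>> input = env.createInput(
                HadoopInputs.readHadoopFile(
                        new TextInputFormat(), LongWritable.class, Text.class,
                        "hdfs:///path/to/input")); // hypothetical input path

        input.print();
    }
}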
@@ -42,8 +42,8 @@ class WordCountMapredITCase extends JavaProgramTestBase {
 protected def testProgram() {
   val env = ExecutionEnvironment.getExecutionEnvironment

-  val input =
-    env.readHadoopFile(new TextInputFormat, classOf[LongWritable], classOf[Text], textPath)
+  val input = FlinkHadoopEnvironment.getHadoopEnvironment(env).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please include the deprecated method in the test as done with the Java tests.
Thanks for the update @kenmy. We are trying to keep the Java and Scala APIs as close as possible. Could you convert the Scala code into a HadoopInputs as well? I also noticed that there are quite a few Hadoop-related tests that could be moved to flink-hadoop-compatibility as well. In fact, there might be a bit of overlap with the tests that are already there. Thanks for your work, Fabian
…he hadoopcompatibility project
Thanks @fhueske for a detailed review.
Thanks for the update @kenmy! Regarding moving the Hadoop tests from … Thanks, Fabian
… in ExecutionEnvironment as @deprecated.
- Preparation to remove the Hadoop dependency from flink-java
- Alternatives for the deprecated functionality are provided in flink-hadoop-compatibility via HadoopInputs
This closes apache#2637
Merging