Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-10774] Rework lifecycle management of partitionDiscoverer in FlinkKafkaConsumerBase #7606

Closed
wants to merge 4 commits into from

Conversation

tillrohrmann
Copy link
Contributor

What is the purpose of the change

This PR reworks the lifecycle management of the FlinkKafkaConsumerBase#partitionDiscoverer such that it will always be closed in the close() method. This implies that all users of this class need to call close() in order to properly clean up resources.

cc @stevenzwu @tzulitai

Brief change log

  • Remove closing of partitionDiscoverer from #run method
  • separate FlinkKafkaConsumerBase#cancel and #close such that cancel is non blocking and close will release all required resources

Verifying this change

  • Added tests: FlinkKafkaConsumerBaseTest#testClosePartitionDiscovererWhenOpenThrowException, #testClosePartitionDiscovererWhenCreateKafkaFetcherFails, #testClosePartitionDiscovererWhenKafkaFetcherFails and #testClosePartitionDiscovererWithCancellation

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (not applicable)

Copy link
Contributor

@tzulitai tzulitai left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since I've already reviewed this as part of the efforts in #7020, and Travis is green, LGTM +1 👍
Thanks for fixing this @stevenzwu @tillrohrmann

@tillrohrmann
Copy link
Contributor Author

Thanks for the review @tzulitai. Merging.

tillrohrmann and others added 4 commits January 31, 2019 08:36
…LifeCycle

Split #testConsumerLifeCycle into two methods which represent the two if-else
branches.

This closes apache#7606.
tillrohrmann added a commit to tillrohrmann/flink that referenced this pull request Jan 31, 2019
…LifeCycle

Split #testConsumerLifeCycle into two methods which represent the two if-else
branches.

This closes apache#7606.
tillrohrmann added a commit to tillrohrmann/flink that referenced this pull request Jan 31, 2019
…LifeCycle

Split #testConsumerLifeCycle into two methods which represent the two if-else
branches.

This closes apache#7606.
tillrohrmann added a commit to tillrohrmann/flink that referenced this pull request Jan 31, 2019
…LifeCycle

Split #testConsumerLifeCycle into two methods which represent the two if-else
branches.

This closes apache#7606.
@tillrohrmann
Copy link
Contributor Author

Failing test case is unrelated.

@asfgit asfgit closed this in 3cbaabc Jan 31, 2019
@tillrohrmann tillrohrmann deleted the FLINK-10774 branch January 31, 2019 10:50
@stevenzwu
Copy link

@tillrohrmann sorry for a late question. you mentioned This implies that all users of this class need to call close() in order to properly clean up resources. I also noticed your test cases explicitly calls close method to clean up resources.

In production/distributed environment, when a job restart due to exception from open method, only cancel method was called. close method wasn't invoked in this case. that was my experience with 1.4. is that still true?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
4 participants