Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-10342] Filter restored partitions with discovered partitions b… #7726

Closed
wants to merge 10 commits into from

Conversation

fengli
Copy link
Contributor

@fengli fengli commented Feb 17, 2019

What is the purpose of the change

Filter restored partitions with current discovered partitions. It's enabled by default since it'll result unexpected behaviors - e.g. When changing the topic name, or remove some topics, The removed/renamed partitions will be still consumed.

Brief change log

  • Filter restored partitions with current discovered partitions
  • Add disableFilterRestoredPartitionsWithDiscovered to disable the default behavior if necessary

Verifying this change

This change added tests and can be verified as follows:

  • Added UTest

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (yes)
  • If yes, how is the feature documented? (JavaDocs )

@flinkbot
Copy link
Collaborator

flinkbot commented Feb 17, 2019

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Review Progress

  • ✅ 1. The [description] looks good.
  • ✅ 2. There is [consensus] that the contribution should go into to Flink.
  • ❓ 3. Needs [attention] from.
  • ✅ 4. The change fits into the overall [architecture].
  • ✅ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot approve description to approve the 1st aspect (similarly, it also supports the consensus, architecture and quality keywords)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval

@fengli
Copy link
Contributor Author

fengli commented Feb 17, 2019

cc @tzulitai

@fengli
Copy link
Contributor Author

fengli commented Feb 21, 2019

@tzulitai Do you think you can review this small patch? (Do let me know if there are still prior steps before the review process).

@dawidwys
Copy link
Contributor

@flinkbot approve description
@flinkbot approve consensus
@flinkbot approve architecture

Copy link
Contributor

@tzulitai tzulitai left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @fengli, thank you for the contribution.
I sincerely apologize for the bumpy contribution experience you had so far. I've been hard blocked on some Flink 1.8.0 development work, and didn't find time to look at this until now.

Please let me know what you think about the comment, most importantly whether or not we should really be filtering based on external system returned results.

*/
@Test
public void testSetFilterRestoredParitionsNoChange() throws Exception {
checkFilterRestoredPartitionsWithDisovered(Arrays.asList(new String[]{"kafka_topic_1", "kafka_topic_2"}),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I would also let the first argument be a new line, just for consistency and readability.
Same for the other tests.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@tzulitai
Copy link
Contributor

@flinkbot approve quality

@tzulitai
Copy link
Contributor

@fengli one comment about addressing reviews:
Unless necessary, I suggest to only push follow-up commits to address review comments to retain the history of the PR, and do not force-push / squash commits yet.
If you always squash and force-push for every review pass, it may be hard for the reviewer to follow the changes. It might be ok for smaller PRs like this one, but once the PR gets bigger and reviewers only revisits the PR after a few days, it will get tough to recall what happened in past reviews.

@tzulitai
Copy link
Contributor

There seems to be failing tests that are related:

17:52:59.921 [ERROR] Failures: 
17:52:59.921 [ERROR]   FlinkKafkaConsumerBaseMigrationTest.testRestoreFromEmptyStateWithPartitions:263
17:52:59.921 [ERROR]   FlinkKafkaConsumerBaseMigrationTest.testRestoreFromEmptyStateWithPartitions:263
17:52:59.921 [ERROR]   FlinkKafkaConsumerBaseMigrationTest.testRestoreFromEmptyStateWithPartitions:263
17:52:59.921 [ERROR]   FlinkKafkaConsumerBaseMigrationTest.testRestoreFromEmptyStateWithPartitions:263
17:52:59.921 [ERROR]   FlinkKafkaConsumerBaseMigrationTest.testRestoreFromEmptyStateWithPartitions:263
17:52:59.921 [ERROR]   FlinkKafkaConsumerBaseMigrationTest.testRestoreFromEmptyStateWithPartitions:263
17:52:59.921 [ERROR]   FlinkKafkaConsumerBaseMigrationTest.testRestore:307
17:52:59.921 [ERROR]   FlinkKafkaConsumerBaseMigrationTest.testRestore:307
17:52:59.923 [ERROR]   FlinkKafkaConsumerBaseMigrationTest.testRestore:307
17:52:59.923 [ERROR]   FlinkKafkaConsumerBaseMigrationTest.testRestore:307
17:52:59.923 [ERROR]   FlinkKafkaConsumerBaseMigrationTest.testRestore:307
17:52:59.923 [ERROR]   FlinkKafkaConsumerBaseMigrationTest.testRestore:307

@tzulitai
Copy link
Contributor

This looks good to merge now!
Thanks a lot for the contribution @fengli.

I'll proceed to clean this PR up and merge this for 1.8.0 ..

tzulitai added a commit to tzulitai/flink that referenced this pull request Feb 28, 2019
…titionsWithSubscribedTopics method in FlinkKafkaConsumerBase

This closes apache#7726.
tzulitai added a commit to tzulitai/flink that referenced this pull request Feb 28, 2019
…titionsWithSubscribedTopics method in FlinkKafkaConsumerBase

This closes apache#7726.
tzulitai added a commit to tzulitai/flink that referenced this pull request Feb 28, 2019
…titionsWithSubscribedTopics method in FlinkKafkaConsumerBase

This closes apache#7726.
@asfgit asfgit closed this in 6a4f012 Feb 28, 2019
asfgit pushed a commit that referenced this pull request Feb 28, 2019
…titionsWithSubscribedTopics method in FlinkKafkaConsumerBase

This closes #7726.
@fengli fengli deleted the FLINK-10342 branch February 28, 2019 13:08
HuangZhenQiu pushed a commit to HuangZhenQiu/flink that referenced this pull request Apr 22, 2019
…titionsWithSubscribedTopics method in FlinkKafkaConsumerBase

This closes apache#7726.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
6 participants