Skip to content

Conversation

imaffe
Copy link
Contributor

@imaffe imaffe commented Mar 15, 2022

What is the purpose of the change

Fix FLINK-26645: When users specify only consume from 1 partition but the source consumes from all partitions.

Brief change log

  • changed TopicListSubscriber implementation.
  • added a test case to validate it has been fixed.

Verifying this change

This change added tests and can be verified as follows:

  • run PulsarSubscriberTest#subscribeOnePartitionOfMultiplePartitionTopic()

Does this pull request potentially affect one of the following parts:

No

Documentation

No

@flinkbot
Copy link
Collaborator

flinkbot commented Mar 15, 2022

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@imaffe imaffe changed the title WIP: [FLINK-26645][Connector/Pulsar] Fix Pulsar source subscriber consume from all partitions when only subscribed to 1 partition [FLINK-26645][Connector/Pulsar] Fix Pulsar source subscriber consume from all partitions when only subscribed to 1 partition Mar 15, 2022
String name = topicName.getPartitionedTopicName();
int index = topicName.getPartitionIndex();

TopicMetadata metadata = queryTopicMetadata(pulsarAdmin, name);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For a partition, should the topicMetadata be created manually or query from the admin api ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Query from admin API is ok.

@syhily
Copy link
Contributor

syhily commented Mar 15, 2022

BK-1.14, BK-1.15 are required.

}

public static boolean isPartitioned(String topic) {
public static boolean isPartitionOfPartitionedTopic(String topic) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isPartitionOfTopic ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, this is a more concise naming~

@syhily syhily force-pushed the FLINK-26645 branch 4 times, most recently from ffc7781 to 0cffb7e Compare March 30, 2022 18:00
@syhily
Copy link
Contributor

syhily commented Mar 31, 2022

@fapaul @AHeise Can you review this hotfix?

Copy link
Contributor

@AHeise AHeise left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks quite good to me. The test need some fixes concerning used resources.

Comment on lines 71 to 72
operator().deleteTopic(topic1);
operator().deleteTopic(topic2);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these should be executed in a finally statement or else a failing test will leave the topic.
Alternatively, if topic creation is not taking too long, just create and delete all topics in @beforeeach and @AfterEach. Do we even need to create them for each test? Maybe BeforeAll is sufficient?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you are right, I have fixed this issue.

Copy link
Contributor

@AHeise AHeise left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Merging. Can you please create the backport PRs?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants