Skip to content

Commit

Permalink
[FLINK-24283][connector/pulsar] Use stick key consumer in Key_Shared …
Browse files Browse the repository at this point in the history
…subscription. This would make sure Pulsar won't treat the flink reader as a shared consumer.

This fix apache/pulsar#12035
  • Loading branch information
syhily authored and dawidwys committed Sep 14, 2021
1 parent 8c1033d commit 5f2ec45
Showing 1 changed file with 20 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,21 @@
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
import org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange;
import org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator;
import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
import org.apache.flink.util.FlinkRuntimeException;

import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.KeySharedPolicy;
import org.apache.pulsar.client.api.KeySharedPolicy.KeySharedPolicySticky;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -173,8 +178,7 @@ private Set<TopicPartition> getSubscribedTopicPartitions() {
}

private void seekStartPosition(Set<TopicPartition> partitions) {
ConsumerBuilder<byte[]> consumerBuilder =
createConsumerBuilder(pulsarClient, Schema.BYTES, configuration);
ConsumerBuilder<byte[]> consumerBuilder = consumerBuilder();
Set<String> seekedTopics = new HashSet<>();

for (TopicPartition partition : partitions) {
Expand All @@ -200,6 +204,20 @@ private void seekStartPosition(Set<TopicPartition> partitions) {
}
}

private ConsumerBuilder<byte[]> consumerBuilder() {
ConsumerBuilder<byte[]> builder =
createConsumerBuilder(pulsarClient, Schema.BYTES, configuration);
if (sourceConfiguration.getSubscriptionType() == SubscriptionType.Key_Shared) {
Range range = TopicRange.createFullRange().toPulsarRange();
KeySharedPolicySticky keySharedPolicy = KeySharedPolicy.stickyHashRange().ranges(range);
// Force this consume use sticky hash range in Key_Shared subscription.
// Pulsar won't remove old message dispatcher before 2.8.2 release.
builder.keySharedPolicy(keySharedPolicy);
}

return builder;
}

/**
* Check if there's any partition changes within subscribed topic partitions fetched by worker
* thread, and convert them to splits the assign them to pulsar readers.
Expand Down

0 comments on commit 5f2ec45

Please sign in to comment.