Skip to content

Commit

Permalink
Fix wrong key-hash selector used for new consumers after all the prev…
Browse files Browse the repository at this point in the history
…ious consumers disconnected (apache#12035)

We will encounter the issue after all the previous consumers disconnected and the new consumers connect
to the topic with different key_shared policy.

The root cause is we are using the previous dispatcher after the key_shared policy changed, so the fix
is to use a new dispatcher after a new consumer with a different key-shared policy

(cherry picked from commit 3a4755f)
  • Loading branch information
codelipenghui authored and eolivelli committed Sep 17, 2021
1 parent 98ef811 commit 49be8b8
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,66 @@
*/
package org.apache.pulsar.broker.service.nonpersistent;

import com.google.common.annotations.VisibleForTesting;
import io.netty.util.concurrent.FastThreadLocal;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.ConsistentHashingStickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.EntryBatchSizes;
import org.apache.pulsar.broker.service.HashRangeAutoSplitStickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.HashRangeExclusiveStickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.SendMessageInfo;
import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.api.proto.KeySharedMode;

public class NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersistentDispatcherMultipleConsumers {

private final StickyKeyConsumerSelector selector;
private final KeySharedMode keySharedMode;

public NonPersistentStickyKeyDispatcherMultipleConsumers(NonPersistentTopic topic, Subscription subscription,
KeySharedMeta ksm) {
super(topic, subscription);
this.keySharedMode = ksm.getKeySharedMode();
switch (this.keySharedMode) {
case STICKY:
this.selector = new HashRangeExclusiveStickyKeyConsumerSelector();
break;

case AUTO_SPLIT:
default:
ServiceConfiguration conf = topic.getBrokerService().getPulsar().getConfiguration();
if (conf.isSubscriptionKeySharedUseConsistentHashing()) {
this.selector = new ConsistentHashingStickyKeyConsumerSelector(
conf.getSubscriptionKeySharedConsistentHashingReplicaPoints());
} else {
this.selector = new HashRangeAutoSplitStickyKeyConsumerSelector();
}
break;
}
}

@VisibleForTesting
NonPersistentStickyKeyDispatcherMultipleConsumers(NonPersistentTopic topic, Subscription subscription,
StickyKeyConsumerSelector selector) {
super(topic, subscription);
if (selector instanceof HashRangeExclusiveStickyKeyConsumerSelector) {
keySharedMode = KeySharedMode.STICKY;
} else if (selector instanceof ConsistentHashingStickyKeyConsumerSelector
|| selector instanceof HashRangeAutoSplitStickyKeyConsumerSelector) {
keySharedMode = KeySharedMode.AUTO_SPLIT;
} else {
keySharedMode = null;
}
this.selector = selector;
}

Expand Down Expand Up @@ -104,4 +143,8 @@ public void sendMessages(List<Entry> entries) {
TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this, -sendMessageInfo.getTotalMessages());
}
}

public KeySharedMode getKeySharedMode() {
return keySharedMode;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,22 +28,19 @@
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionFencedException;
import org.apache.pulsar.broker.service.ConsistentHashingStickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.broker.service.HashRangeAutoSplitStickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.HashRangeExclusiveStickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.api.proto.KeySharedMode;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
import org.apache.pulsar.common.policies.data.stats.NonPersistentSubscriptionStatsImpl;
Expand Down Expand Up @@ -139,29 +136,13 @@ public synchronized CompletableFuture<Void> addConsumer(Consumer consumer) {
}
break;
case Key_Shared:
if (dispatcher == null || dispatcher.getType() != SubType.Key_Shared) {
KeySharedMeta ksm = consumer.getKeySharedMeta();
KeySharedMode keySharedMode = ksm.getKeySharedMode();
if (dispatcher == null || dispatcher.getType() != SubType.Key_Shared
|| ((NonPersistentStickyKeyDispatcherMultipleConsumers) dispatcher).getKeySharedMode()
!= keySharedMode) {
previousDispatcher = dispatcher;

switch (consumer.getKeySharedMeta().getKeySharedMode()) {
case STICKY:
dispatcher = new NonPersistentStickyKeyDispatcherMultipleConsumers(topic, this,
new HashRangeExclusiveStickyKeyConsumerSelector());
break;

case AUTO_SPLIT:
default:
StickyKeyConsumerSelector selector;
ServiceConfiguration conf = topic.getBrokerService().getPulsar().getConfiguration();
if (conf.isSubscriptionKeySharedUseConsistentHashing()) {
selector = new ConsistentHashingStickyKeyConsumerSelector(
conf.getSubscriptionKeySharedConsistentHashingReplicaPoints());
} else {
selector = new HashRangeAutoSplitStickyKeyConsumerSelector();
}

dispatcher = new NonPersistentStickyKeyDispatcherMultipleConsumers(topic, this, selector);
break;
}
this.dispatcher = new NonPersistentStickyKeyDispatcherMultipleConsumers(topic, this, ksm);
}
break;
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.api.proto.KeySharedMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -57,6 +58,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
private final StickyKeyConsumerSelector selector;

private boolean isDispatcherStuckOnReplays = false;
private final KeySharedMode keySharedMode;

/**
* When a consumer joins, it will be added to this map with the current read position.
Expand All @@ -76,8 +78,8 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
this.recentlyJoinedConsumers = allowOutOfOrderDelivery ? null : new LinkedHashMap<>();
this.stuckConsumers = new HashSet<>();
this.nextStuckConsumers = new HashSet<>();

switch (ksm.getKeySharedMode()) {
this.keySharedMode = ksm.getKeySharedMode();
switch (this.keySharedMode) {
case AUTO_SPLIT:
if (conf.isSubscriptionKeySharedUseConsistentHashing()) {
selector = new ConsistentHashingStickyKeyConsumerSelector(
Expand Down Expand Up @@ -408,6 +410,10 @@ protected Set<? extends Position> asyncReplayEntries(Set<? extends Position> pos
return cursor.asyncReplayEntries(positions, this, ReadType.Replay, true);
}

public KeySharedMode getKeySharedMode() {
return this.keySharedMode;
}

public LinkedHashMap<Consumer, PositionImpl> getRecentlyJoinedConsumers() {
return recentlyJoinedConsumers;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.api.proto.KeySharedMode;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshot;
import org.apache.pulsar.common.api.proto.TxnAction;
Expand Down Expand Up @@ -247,9 +248,12 @@ public CompletableFuture<Void> addConsumer(Consumer consumer) {
}
break;
case Key_Shared:
if (dispatcher == null || dispatcher.getType() != SubType.Key_Shared) {
KeySharedMeta ksm = consumer.getKeySharedMeta();
KeySharedMode keySharedMode = ksm.getKeySharedMode();
if (dispatcher == null || dispatcher.getType() != SubType.Key_Shared
|| ((PersistentStickyKeyDispatcherMultipleConsumers) dispatcher).getKeySharedMode()
!= keySharedMode) {
previousDispatcher = dispatcher;
KeySharedMeta ksm = consumer.getKeySharedMeta();
dispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(topic, cursor, this,
topic.getBrokerService().getPulsar().getConfiguration(), ksm);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.client.api;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
Expand All @@ -39,6 +40,7 @@
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

Expand All @@ -47,8 +49,12 @@
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.curator.shaded.com.google.common.collect.Lists;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentStickyKeyDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.common.api.proto.KeySharedMode;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.util.Murmur3_32Hash;
import org.awaitility.Awaitility;
Expand Down Expand Up @@ -93,6 +99,14 @@ public Object[][] dataProvider() {
};
}

@DataProvider(name = "topicDomain")
public Object[][] topicDomainProvider() {
return new Object[][] {
{ "persistent" },
{ "non-persistent" }
};
}

@BeforeMethod(alwaysRun = true)
@Override
protected void setup() throws Exception {
Expand Down Expand Up @@ -1012,6 +1026,63 @@ public void testKeySharedConsumerWithEncrypted() throws Exception {
});
}

@Test(dataProvider = "topicDomain")
public void testSelectorChangedAfterAllConsumerDisconnected(String topicDomain) throws PulsarClientException,
ExecutionException, InterruptedException {
final String topicName = TopicName.get(topicDomain, "public", "default",
"testSelectorChangedAfterAllConsumerDisconnected" + UUID.randomUUID()).toString();

final String subName = "my-sub";

Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
.topic(topicName)
.subscriptionName(subName)
.consumerName("first-consumer")
.subscriptionType(SubscriptionType.Key_Shared)
.keySharedPolicy(KeySharedPolicy.autoSplitHashRange())
.cryptoKeyReader(new EncKeyReader())
.subscribe();

CompletableFuture<Optional<Topic>> future = pulsar.getBrokerService().getTopicIfExists(topicName);
assertTrue(future.isDone());
assertTrue(future.get().isPresent());
Topic topic = future.get().get();
KeySharedMode keySharedMode = getKeySharedModeOfSubscription(topic, subName);
assertNotNull(keySharedMode);
assertEquals(keySharedMode, KeySharedMode.AUTO_SPLIT);

consumer1.close();

consumer1 = pulsarClient.newConsumer()
.topic(topicName)
.subscriptionName(subName)
.consumerName("second-consumer")
.subscriptionType(SubscriptionType.Key_Shared)
.keySharedPolicy(KeySharedPolicy.stickyHashRange().ranges(Range.of(0, 65535)))
.cryptoKeyReader(new EncKeyReader())
.subscribe();

future = pulsar.getBrokerService().getTopicIfExists(topicName);
assertTrue(future.isDone());
assertTrue(future.get().isPresent());
topic = future.get().get();
keySharedMode = getKeySharedModeOfSubscription(topic, subName);
assertNotNull(keySharedMode);
assertEquals(keySharedMode, KeySharedMode.STICKY);
consumer1.close();
}

private KeySharedMode getKeySharedModeOfSubscription(Topic topic, String subscription) {
if (TopicName.get(topic.getName()).getDomain().equals(TopicDomain.persistent)) {
return ((PersistentStickyKeyDispatcherMultipleConsumers) topic.getSubscription(subscription)
.getDispatcher()).getKeySharedMode();
} else if (TopicName.get(topic.getName()).getDomain().equals(TopicDomain.non_persistent)) {
return ((NonPersistentStickyKeyDispatcherMultipleConsumers) topic.getSubscription(subscription)
.getDispatcher()).getKeySharedMode();
}
return null;
}

private Consumer<String> createFixedHashRangesConsumer(String topic, String subscription, Range... ranges) throws PulsarClientException {
return pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
Expand Down

0 comments on commit 49be8b8

Please sign in to comment.