From 6ba48bc2032b1c3c37c442c724c2d309b18511ac Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Wed, 22 Apr 2026 10:24:17 +0800 Subject: [PATCH] feat: add replicateSubscriptionState to ConsumerConfig Add support for replicateSubscriptionState option in ConsumerConfig, which enables geo-replication failover by synchronizing subscription cursor state across clusters. Fixes #478 --- index.d.ts | 1 + src/ConsumerConfig.cc | 8 ++++++++ 2 files changed, 9 insertions(+) diff --git a/index.d.ts b/index.d.ts index 0e4f070..607842f 100644 --- a/index.d.ts +++ b/index.d.ts @@ -110,6 +110,7 @@ export interface ConsumerConfig { deadLetterPolicy?: DeadLetterPolicy; batchReceivePolicy?: ConsumerBatchReceivePolicy; keySharedPolicy?: KeySharedPolicy; + replicateSubscriptionState?: boolean; } export class Consumer { diff --git a/src/ConsumerConfig.cc b/src/ConsumerConfig.cc index e7419c6..eff2024 100644 --- a/src/ConsumerConfig.cc +++ b/src/ConsumerConfig.cc @@ -62,6 +62,7 @@ static const std::string CFG_KEY_SHARED_POLICY_MODE = "keyShareMode"; static const std::string CFG_KEY_SHARED_POLICY_ALLOW_OUT_OF_ORDER = "allowOutOfOrderDelivery"; static const std::string CFG_KEY_SHARED_POLICY_STICKY_RANGES = "stickyRanges"; static const std::string CFG_CRYPTO_KEY_READER = "cryptoKeyReader"; +static const std::string CFG_REPLICATE_SUBSCRIPTION_STATE = "replicateSubscriptionState"; static const std::map SUBSCRIPTION_TYPE = { {"Exclusive", pulsar_ConsumerExclusive}, @@ -400,6 +401,13 @@ void ConsumerConfig::InitConfig(std::shared_ptr deferred, } this->cConsumerConfig.get()->consumerConfiguration.setKeySharedPolicy(cppKeySharedPolicy); } + + if (consumerConfig.Has(CFG_REPLICATE_SUBSCRIPTION_STATE) && + consumerConfig.Get(CFG_REPLICATE_SUBSCRIPTION_STATE).IsBoolean()) { + bool replicateSubscriptionState = consumerConfig.Get(CFG_REPLICATE_SUBSCRIPTION_STATE).ToBoolean(); + this->cConsumerConfig.get()->consumerConfiguration.setReplicateSubscriptionStateEnabled( + replicateSubscriptionState); + } } ConsumerConfig::~ConsumerConfig() {