Skip to content

Commit

Permalink
feat: support pattern subscription non persistent topic.
Browse files Browse the repository at this point in the history
  • Loading branch information
shibd committed Jun 13, 2023
1 parent bc5182a commit 692ce62
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 0 deletions.
6 changes: 6 additions & 0 deletions index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ export interface ConsumerConfig {
maxPendingChunkedMessage?: number;
autoAckOldestChunkedMessageOnQueueFull?: number;
schema?: SchemaInfo;
regexSubscriptionMode?: RegexSubscriptionMode;
}

export class Consumer {
Expand Down Expand Up @@ -249,6 +250,11 @@ export type ConsumerCryptoFailureAction =
'DISCARD' |
'CONSUME';

export type RegexSubscriptionMode =
'PersistentOnly' |
'NonPersistentOnly' |
'AllTopics';

export type SchemaType =
'None' |
'String' |
Expand Down
16 changes: 16 additions & 0 deletions src/ConsumerConfig.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ static const std::string CFG_TOPICS_PATTERN = "topicsPattern";
static const std::string CFG_SUBSCRIPTION = "subscription";
static const std::string CFG_SUBSCRIPTION_TYPE = "subscriptionType";
static const std::string CFG_INIT_POSITION = "subscriptionInitialPosition";
static const std::string CFG_REGEX_SUBSCRIPTION_MODE = "regexSubscriptionMode";
static const std::string CFG_ACK_TIMEOUT = "ackTimeoutMs";
static const std::string CFG_NACK_REDELIVER_TIMEOUT = "nAckRedeliverTimeoutMs";
static const std::string CFG_RECV_QUEUE = "receiverQueueSize";
Expand All @@ -52,6 +53,11 @@ static const std::map<std::string, pulsar_consumer_type> SUBSCRIPTION_TYPE = {
{"KeyShared", pulsar_ConsumerKeyShared},
{"Failover", pulsar_ConsumerFailover}};

static const std::map<std::string, pulsar_consumer_regex_subscription_mode> REGEX_SUBSCRIPTION_MODE = {
{"PersistentOnly", pulsar_consumer_regex_sub_mode_PersistentOnly},
{"NonPersistentOnly", pulsar_consumer_regex_sub_mode_NonPersistentOnly},
{"AllTopics", pulsar_consumer_regex_sub_mode_AllTopics}};

static const std::map<std::string, initial_position> INIT_POSITION = {
{"Latest", initial_position_latest}, {"Earliest", initial_position_earliest}};

Expand Down Expand Up @@ -110,6 +116,16 @@ ConsumerConfig::ConsumerConfig(const Napi::Object &consumerConfig, pulsar_messag
}
}

if (consumerConfig.Has(CFG_REGEX_SUBSCRIPTION_MODE) &&
consumerConfig.Get(CFG_REGEX_SUBSCRIPTION_MODE).IsString()) {
std::string regexSubscriptionMode =
consumerConfig.Get(CFG_REGEX_SUBSCRIPTION_MODE).ToString().Utf8Value();
if (REGEX_SUBSCRIPTION_MODE.count(regexSubscriptionMode)) {
pulsar_consumer_configuration_set_regex_subscription_mode(
this->cConsumerConfig.get(), REGEX_SUBSCRIPTION_MODE.at(regexSubscriptionMode));
}
}

if (consumerConfig.Has(CFG_CONSUMER_NAME) && consumerConfig.Get(CFG_CONSUMER_NAME).IsString()) {
std::string consumerName = consumerConfig.Get(CFG_CONSUMER_NAME).ToString().Utf8Value();
if (!consumerName.empty())
Expand Down
52 changes: 52 additions & 0 deletions tests/consumer.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -140,5 +140,57 @@ const Pulsar = require('../index.js');
await expect(consumer.close()).rejects.toThrow('Failed to close consumer: AlreadyClosed');
});
});

describe('Regex subscription', () => {
test('Regex subscription', async () => {
const topicName1 = 'persistent://public/default/regex-sub-1';
const topicName2 = 'persistent://public/default/regex-sub-2';
const topicName3 = 'non-persistent://public/default/regex-sub-3';
const topicName4 = 'persistent://public/default/no-match-regex-sub-2';
const producer1 = await client.createProducer({
topic: topicName1,
});
const producer2 = await client.createProducer({
topic: topicName2,
});
const producer3 = await client.createProducer({
topic: topicName3,
});
const producer4 = await client.createProducer({
topic: topicName4,
});

const consumer = await client.subscribe({
topicsPattern: 'persistent://public/default/regex-sub.*',
subscription: 'sub1',
subscriptionType: 'Shared',
regexSubscriptionMode: 'AllTopics',
});

const num = 10;
for (let i = 0; i < num; i += 1) {
const msg = `my-message-${i}`;
await producer1.send({ data: Buffer.from(msg) });
await producer2.send({ data: Buffer.from(msg) });
await producer3.send({ data: Buffer.from(msg) });
await producer4.send({ data: Buffer.from(msg) });
}
const results = [];
for (let i = 0; i < 3 * num; i += 1) {
const msg = await consumer.receive();
results.push(msg.getData().toString());
}
expect(results.length).toEqual(3 * num);
// assert no more msgs.
await expect(consumer.receive(1000)).rejects.toThrow(
'Failed to receive message: TimeOut',
);
await producer1.close();
await producer2.close();
await producer3.close();
await producer4.close();
await consumer.close();
});
});
});
})();

0 comments on commit 692ce62

Please sign in to comment.