Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ export interface ProducerConfig {
accessMode?: ProducerAccessMode;
batchingType?: ProducerBatchType;
messageRouter?: MessageRouter;
batchingMaxAllowedSizeInBytes?: number;
}

export class Producer {
Expand Down
11 changes: 11 additions & 0 deletions src/ProducerConfig.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ static const std::string CFG_COMPRESS_TYPE = "compressionType";
// Property names looked up on the JS producer-config object; these mirror
// the optional fields declared on ProducerConfig in index.d.ts.
static const std::string CFG_BATCH_ENABLED = "batchingEnabled";
static const std::string CFG_BATCH_MAX_DELAY = "batchingMaxPublishDelayMs";
static const std::string CFG_BATCH_MAX_MSG = "batchingMaxMessages";
static const std::string CFG_BATCH_MAX_ALLOWED_SIZE_IN_BYTES = "batchingMaxAllowedSizeInBytes";
static const std::string CFG_SCHEMA = "schema";
static const std::string CFG_PROPS = "properties";
static const std::string CFG_PUBLIC_KEY_PATH = "publicKeyPath";
Expand Down Expand Up @@ -201,6 +202,16 @@ ProducerConfig::ProducerConfig(const Napi::Object& producerConfig) : topic("") {
}
}

// Optional batch size cap in bytes. Only applied when the JS value is
// present, numeric, and strictly positive; any other value (missing,
// non-number, zero, or negative) is silently ignored and the underlying
// C-client default remains in effect.
if (producerConfig.Has(CFG_BATCH_MAX_ALLOWED_SIZE_IN_BYTES) &&
    producerConfig.Get(CFG_BATCH_MAX_ALLOWED_SIZE_IN_BYTES).IsNumber()) {
  int64_t batchingMaxAllowedSizeInBytes =
      producerConfig.Get(CFG_BATCH_MAX_ALLOWED_SIZE_IN_BYTES).ToNumber().Int64Value();
  if (batchingMaxAllowedSizeInBytes > 0) {
    // NOTE(review): 'unsigned long' is 32-bit on LLP64 platforms (Windows),
    // so values > 4 GiB would truncate there — confirm the pulsar C API
    // parameter type and consider a fixed-width cast if it is 64-bit.
    pulsar_producer_configuration_set_batching_max_allowed_size_in_bytes(
        this->cProducerConfig.get(), (unsigned long)batchingMaxAllowedSizeInBytes);
  }
}

if (producerConfig.Has(CFG_SCHEMA) && producerConfig.Get(CFG_SCHEMA).IsObject()) {
SchemaInfo* schemaInfo = new SchemaInfo(producerConfig.Get(CFG_SCHEMA).ToObject());
schemaInfo->SetProducerSchema(this->cProducerConfig);
Expand Down
74 changes: 74 additions & 0 deletions tests/producer.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@
expect(response.statusCode).toBe(204);
const producer = await client.createProducer({
topic,
messageRouter: (message, topicMetadata) => {

Check warning on line 209 in tests/producer.test.js

View workflow job for this annotation

GitHub Actions / Run unit tests (3.10)

'topicMetadata' is defined but never used

Check warning on line 209 in tests/producer.test.js

View workflow job for this annotation

GitHub Actions / Run unit tests (3.10)

'message' is defined but never used
throw new Error('Custom error in message router');
},
messageRoutingMode: 'CustomPartition',
Expand Down Expand Up @@ -239,5 +239,79 @@
expect(partitions.size).toBe(1);
}, 30000);
});
describe('Batching', () => {
  // Pull the batch index out of a MessageId's string form
  // ("ledger:entry:partition:batchIndex"); -1 when the id carries no
  // batch component (i.e. the message was not batched).
  const getBatchIndex = (msgId) => {
    const fields = msgId.toString().split(':');
    return fields.length > 3 ? Number(fields[3]) : -1;
  };

  test('should batch messages based on max allowed size in bytes', async () => {
    const topicName = `persistent://public/default/test-batch-size-in-bytes-${Date.now()}`;
    const subName = 'subscription-name';
    const numOfMessages = 30;
    const prefix = '12345678'; // 8 bytes message prefix

    let producer;
    let consumer;

    try {
      // Producer with batching enabled and a byte cap small enough that
      // each batch can hold roughly two of our ~9-byte payloads.
      producer = await client.createProducer({
        topic: topicName,
        compressionType: 'LZ4',
        batchingEnabled: true,
        batchingMaxMessages: 10000,
        batchingMaxAllowedSizeInBytes: 20,
      });

      consumer = await client.subscribe({
        topic: topicName,
        subscription: subName,
      });

      // Kick off every send without awaiting each one, so the client is
      // free to group them into batches; then flush and settle them all.
      const pending = [];
      for (let i = 0; i < numOfMessages; i += 1) {
        pending.push(producer.send({
          data: Buffer.from(prefix + i),
          properties: { msgIndex: String(i) },
        }));
      }
      await producer.flush();
      await Promise.all(pending);

      // Drain the topic in order, verifying batch placement, properties,
      // and payload for every message.
      let receivedCount = 0;
      for (let i = 0; i < numOfMessages; i += 1) {
        const receivedMsg = await consumer.receive(5000);

        // With a 20-byte cap each batch holds at most 2 entries, so any
        // batch index must be 0 or 1 (and -1 would also pass, meaning
        // the message was delivered unbatched).
        expect(getBatchIndex(receivedMsg.getMessageId())).toBeLessThan(2);

        expect(receivedMsg.getProperties().msgIndex).toBe(String(i));
        expect(receivedMsg.getData().toString()).toBe(prefix + i);

        await consumer.acknowledge(receivedMsg);
        receivedCount += 1;
      }

      // Every message sent must have been consumed exactly once.
      expect(receivedCount).toBe(numOfMessages);
    } finally {
      if (producer) await producer.close();
      if (consumer) await consumer.close();
    }
  }, 30000);
});
});
})();
Loading