diff --git a/MIGRATION.md b/MIGRATION.md index 5794f5ad..76952c61 100644 --- a/MIGRATION.md +++ b/MIGRATION.md @@ -61,9 +61,9 @@ ### Producer -* `sendBatch` is currently unsupported - but will be supported. TODO. However, the actual batching semantics are handled by librdkafka. +* `sendBatch` is not supported (YET). However, the actual batching semantics are handled by librdkafka. * Changes to `send`: - 1. `acks`, `compression` and `timeout` are not set on a per-send basis. Rather, they must be configured in the configuration. + * `acks`, `compression` and `timeout` are not set on a per-send basis. Rather, they must be configured in the configuration. Before: ```javascript const kafka = new Kafka({/* ... */}); @@ -99,8 +99,80 @@ }); ``` - * Error-handling for a failed `send` is stricter. While sending multiple messages, even if one of the messages fails, the method throws an error. + * Error-handling for a failed `send` is stricter. While sending multiple messages, even if one of the messages fails, the method throws an error. ### Consumer + * While passing a list of topics to `subscribe`, the `fromBeginning` property is not supported. Instead, the property `auto.offset.reset` needs to be used. + Before: + ```javascript + const kafka = new Kafka({ /* ... */ }); + const consumer = kafka.consumer({ + groupId: 'test-group', + }); + await consumer.connect(); + await consumer.subscribe({ topics: ["topic"], fromBeginning: true}); + ``` + + After: + ```javascript + const kafka = new Kafka({ /* ... */ }); + const consumer = kafka.consumer({ + groupId: 'test-group', + rdKafka: { + topicConfig: { + 'auto.offset.reset': 'earliest', + }, + } + }); + await consumer.connect(); + await consumer.subscribe({ topics: ["topic"] }); + ``` + + * For auto-commiting using a consumer, the properties on `run` are no longer used. Instead, corresponding rdKafka properties must be set. + * `autoCommit` corresponds to `enable.auto.commit`. + * `autoCommitInterval` corresponds to `auto.commit.interval.ms`. + * `autoCommitThreshold` is no longer supported. + + Before: + ```javascript + const kafka = new Kafka({ /* ... */ }); + const consumer = kafka.consumer({ /* ... */ }); + await consumer.connect(); + await consumer.subscribe({ topics: ["topic"] }); + consumer.run({ + eachMessage: someFunc, + autoCommit: true, + autoCommitThreshold: 5000, + }); + ``` + + After: + ```javascript + const kafka = new Kafka({ /* ... */ }); + const consumer = kafka.consumer({ + /* ... */, + rdKafka: { + globalConfig: { + "enable.auto.commit": "true", + "auto.commit.interval.ms": "5000", + } + }, + }); + await consumer.connect(); + await consumer.subscribe({ topics: ["topic"] }); + consumer.run({ + eachMessage: someFunc, + }); + ``` + + * For the `eachMessage` method while running the consumer: + * The `heartbeat()` no longer needs to be called. Heartbeats are automatically managed by librdkafka. + * The `partitionsConsumedConcurrently` property is not supported (YET). + * The `eachBatch` method is not supported. + * `commitOffsets` does not (YET) support sending metadata for topic partitions being commited. + * `paused()` is not (YET) supported. + * Custom partition assignors are not supported. + + ## node-rdkafka diff --git a/lib/kafka-consumer.js b/lib/kafka-consumer.js index dd981ade..ae7452ce 100644 --- a/lib/kafka-consumer.js +++ b/lib/kafka-consumer.js @@ -582,6 +582,26 @@ KafkaConsumer.prototype.commitMessageSync = function(msg) { return this; }; +/** + * Commits a list of offsets per topic partition, using provided callback. + * + * @param {TopicPartition[]} toppars - Topic partition list to commit + * offsets for. Defaults to the current assignment + * @param {Function} cb - Callback method to execute when finished + * @return {Client} - Returns itself + */ +KafkaConsumer.prototype.commitCb = function(toppars, cb) { + this._client.commitCb(toppars, function(err) { + if (err) { + cb(LibrdKafkaError.create(err)); + return; + } + + cb(null); + }); + return this; +}; + /** * Get last known offsets from the client. * diff --git a/lib/kafkajs/_common.js b/lib/kafkajs/_common.js index 5c8423a3..2878c9fa 100644 --- a/lib/kafkajs/_common.js +++ b/lib/kafkajs/_common.js @@ -125,9 +125,15 @@ function convertToRdKafkaHeaders(kafkaJSHeaders) { return headers; } + +function notImplemented(msg = 'Not implemented') { + throw new error.KafkaJSError(msg, { code: error.ErrorCodes.ERR__NOT_IMPLEMENTED }); +} + module.exports = { kafkaJSToRdKafkaConfig, topicPartitionOffsetToRdKafka, createKafkaJsErrorFromLibRdKafkaError, convertToRdKafkaHeaders, + notImplemented, }; diff --git a/lib/kafkajs/_consumer.js b/lib/kafkajs/_consumer.js index 776d9d4b..928a7a3d 100644 --- a/lib/kafkajs/_consumer.js +++ b/lib/kafkajs/_consumer.js @@ -1,6 +1,13 @@ const LibrdKafkaError = require('../error'); +const error = require('./_error'); const RdKafka = require('../rdkafka'); -const { kafkaJSToRdKafkaConfig, topicPartitionOffsetToRdKafka } = require('./_common'); +const { + kafkaJSToRdKafkaConfig, + topicPartitionOffsetToRdKafka, + createKafkaJsErrorFromLibRdKafkaError, + notImplemented +} = require('./_common'); +const { Buffer } = require('buffer'); const ConsumerState = Object.freeze({ INIT: 0, @@ -11,10 +18,34 @@ const ConsumerState = Object.freeze({ }); class Consumer { - #kJSConfig = null + /** + * kJSConfig is the kafkaJS config object. + * @type {import("../../types/kafkajs").ConsumerConfig|null} + */ + #kJSConfig = null; + + /** + * rdKafkaConfig contains the config objects that will be passed to node-rdkafka. + * @type {{globalConfig: import("../../types/config").ConsumerGlobalConfig, topicConfig: import("../../types/config").ConsumerTopicConfig}|null} + */ #rdKafkaConfig = null; + + /** + * internalClient is the node-rdkafka client used by the API. + * @type {import("../rdkafka").Consumer|null} + */ #internalClient = null; + + /** + * connectPromiseFunc is the set of promise functions used to resolve/reject the connect() promise. + * @type {{resolve: Function, reject: Function}|{}} + */ #connectPromiseFunc = {}; + + /** + * state is the current state of the consumer. + * @type {ConsumerState} + */ #state = ConsumerState.INIT; /** @@ -31,6 +62,56 @@ class Consumer { return this.#rdKafkaConfig; } + /** + * Used as a trampoline to the user's rebalance listener, if any. + * @param {Error} err - error in rebalance + * @param {import("../../types").TopicPartition[]} assignment + */ + #rebalanceCallback(err, assignment) { + // Create the librdkafka error + err = LibrdKafkaError.create(err); + + let call; + switch (err.code) { + // TODO: is this the right way to handle this error? + // We might just be able to throw, because the error is something the user has caused. + case LibrdKafkaError.codes.ERR__ASSIGN_PARTITIONS: + call = (this.#kJSConfig.rebalanceListener.onPartitionsAssigned ? + this.#kJSConfig.rebalanceListener.onPartitionsAssigned(assignment) : + Promise.resolve()).catch(console.error); + break; + case LibrdKafkaError.codes.ERR__REVOKE_PARTITIONS: + call = (this.#kJSConfig.rebalanceListener.onPartitionsRevoked ? + this.#kJSConfig.rebalanceListener.onPartitionsRevoked(assignment) : + Promise.resolve()).catch(console.error); + break; + default: + call = Promise.reject(`Unexpected rebalanceListener error code ${err.code}`).catch((e) => { + console.error(e); + }); + break; + } + + call + .finally(() => { + // Emit the event + this.#internalClient.emit('rebalance', err, assignment); + + try { + if (err.code === LibrdKafkaError.codes.ERR__ASSIGN_PARTITIONS) { + this.#internalClient.assign(assignment); + } else { + this.#internalClient.unassign(); + } + } catch (e) { + // Ignore exceptions if we are not connected + if (this.#internalClient.isConnected()) { + this.#internalClient.emit('rebalance.error', e); + } + } + }); + } + async #finalizedConfig() { const { globalConfig, topicConfig } = await kafkaJSToRdKafkaConfig(this.#kJSConfig); if (this.#kJSConfig.groupId) { @@ -38,56 +119,15 @@ class Consumer { } globalConfig['offset_commit_cb'] = true; if (this.#kJSConfig.rebalanceListener) { - globalConfig['rebalance_cb'] = (err, assignment) => { - // Create the librdkafka error - err = LibrdKafkaError.create(err); - - let call; - switch (err.code) { - case LibrdKafkaError.codes.ERR__ASSIGN_PARTITIONS: - call = (this.#kJSConfig.rebalanceListener.onPartitionsAssigned ? - this.#kJSConfig.rebalanceListener.onPartitionsAssigned(assignment) : - Promise.resolve()).catch(console.error); - break; - case LibrdKafkaError.codes.ERR__REVOKE_PARTITIONS: - call = (this.#kJSConfig.rebalanceListener.onPartitionsRevoked ? - this.#kJSConfig.rebalanceListener.onPartitionsRevoked(assignment) : - Promise.resolve()).catch(console.error); - break; - default: - call = Promise.reject().catch(() => { - console.error(`Unexpected rebalanceListener error code ${err.code}`); - }); - break; - } - - call - .finally(() => { - // Emit the event - this.#internalClient.emit('rebalance', err, assignment); - - try { - if (err.code === LibrdKafkaError.codes.ERR__ASSIGN_PARTITIONS) { - this.#internalClient.assign(assignment); - } else { - this.#internalClient.unassign(); - } - } catch (e) { - // Ignore exceptions if we are not connected - if (this.#internalClient.isConnected()) { - this.#internalClient.emit('rebalance.error', e); - } - } - }); - }; + globalConfig['rebalance_cb'] = this.#rebalanceCallback.bind(this); } return { globalConfig, topicConfig }; } - #readyCb(arg) { + #readyCb() { if (this.#state !== ConsumerState.CONNECTING) { - // I really don't know how to handle this now. - return; + /* The connectPromiseFunc might not be set, so we throw such an error. It's a state error that we can't recover from. Probably a bug. */ + throw new error.KafkaJSError(`Ready callback called in invalid state ${this.#state}`, { code: error.ErrorCodes.ERR__STATE }); } this.#state = ConsumerState.CONNECTED; @@ -95,33 +135,40 @@ class Consumer { this.#connectPromiseFunc['resolve'](); } - #errorCb(args) { - console.log('error', args); + /** + * Callback for the event.error event, either fails the initial connect(), or logs the error. + * @param {Error} err + */ + #errorCb(err) { if (this.#state === ConsumerState.CONNECTING) { - this.#connectPromiseFunc['reject'](args); + this.#connectPromiseFunc['reject'](err); } else { - // do nothing for now. + /* TODO: we should log the error returned here, depending on the log level. + * Right now, we're just using console.err, but we should allow for a custom + * logger, or at least make a function in _common.js that handles consumer + * and producer. */ + console.error(err); } } - #notImplemented() { - throw new Error('Not implemented'); - } - + /** + * Converts a message returned by node-rdkafka into a message that can be used by the eachMessage callback. + * @param {import("../..").Message} message + * @returns {import("../../types/kafkajs").EachMessagePayload} + */ #createPayload(message) { - var key = message.key == null ? null : message.key; + let key = message.key; if (typeof key === 'string') { key = Buffer.from(key); } - let timestamp = message.timestamp ? new Date(message.timestamp).toISOString() - : ''; + let timestamp = message.timestamp ? String(message.timestamp) : ''; - var headers = undefined; + let headers; if (message.headers) { headers = {} for (const [key, value] of Object.entries(message.headers)) { - if (!headers[key]) { + if (!Object.hasOwn(headers, key)) { headers[key] = value; } else if (headers[key].constructor === Array) { headers[key].push(value); @@ -139,39 +186,47 @@ class Consumer { value: message.value, timestamp, attributes: 0, - offset: message.offset, + offset: String(message.offset), size: message.size, headers }, - heartbeat: async () => { }, - pause: () => { } - } + heartbeat: async () => { /* no op */ }, + pause: () => this.pause([{ topic: message.topic, partitions: [message.partition] }]), + }; } + /** + * Consumes a single message from the internal consumer. + * @returns {Promise} a promise that resolves to a single message. + */ async #consumeSingle() { return new Promise((resolve, reject) => { this.#internalClient.consume(1, function (err, messages) { if (err) { - reject(`Consume error code ${err.code}`); + reject(createKafkaJsErrorFromLibRdKafkaError(err)); return; } - const message = messages[0]; resolve(message); }); }); } + /** + * Flattens a list of topics with partitions into a list of topic, partition. + * @param {({topic: string, partitions: number[]}|{topic: string, partition: number})[]} topics + * @returns {import("../../types").TopicPartition[]} a list of (topic, partition). + */ #flattenTopicPartitions(topics) { const ret = []; - for (let topic of topics) { - if (topic.partition !== null) + for (const topic of topics) { + if (typeof topic.partition === 'number') ret.push({ topic: topic.topic, partition: topic.partition }); else { - for (let partition of topic.partitions) { + for (const partition of topic.partitions) { ret.push({ topic: topic.topic, partition }); } } @@ -179,17 +234,25 @@ class Consumer { return ret; } + /** + * @returns {import("../rdkafka").Consumer} the internal node-rdkafka client. + */ _getInternalConsumer() { return this.#internalClient; } + /** + * Set up the client and connect to the bootstrap brokers. + * @returns {Promise} a promise that resolves when the consumer is connected. + */ async connect() { if (this.#state !== ConsumerState.INIT) { - return Promise.reject('Connect has already been called elsewhere.'); + throw new error.KafkaJSError('Connect has already been called elsewhere.', { code: error.ErrorCodes.ERR__STATE }); } - this.#state = ConsumerState.CONNECTING; const { globalConfig, topicConfig } = await this.#config(); + + this.#state = ConsumerState.CONNECTING; this.#internalClient = new RdKafka.KafkaConsumer(globalConfig, topicConfig); this.#internalClient.on('ready', this.#readyCb.bind(this)); this.#internalClient.on('event.error', this.#errorCb.bind(this)); @@ -197,108 +260,236 @@ class Consumer { return new Promise((resolve, reject) => { this.#connectPromiseFunc = { resolve, reject }; - console.log('Connecting....'); this.#internalClient.connect(); - console.log('connect() called'); }); } + /** + * Subscribes the consumer to the given topics. + * @param {import("../../types/kafkajs").ConsumerSubscribeTopics} subscription + */ async subscribe(subscription) { + if (typeof subscription.fromBeginning == 'boolean') { + throw new error.KafkaJSError( + 'fromBeginning is not supported by subscribe(), but must be passed as rdKafka properties to the consumer', + { code: error.ErrorCodes.ERR__INVALID_ARG }); + } + this.#internalClient.subscribe(subscription.topics); } async stop() { - this.#notImplemented(); + notImplemented(); } + /** + * Starts consumer polling. + * @param {import("../../types/kafkajs").ConsumerRunConfig} config + */ async run(config) { if (this.#state !== ConsumerState.CONNECTED) { - throw new Error('Run must be called in state CONNECTED.'); + throw new error.KafkaJSError('Run must be called after a successful connect().', { code: error.ErrorCodes.ERR__STATE }); + } + + if (typeof config.autoCommit == 'boolean' || typeof config.autoCommitInterval == 'number' || typeof config.autoCommitThreshold == 'number') { + throw new error.KafkaJSError( + 'autoCommit related properties are not supported by run(), but must be passed as rdKafka properties to the consumer.', + { code: error.ErrorCodes.ERR__NOT_IMPLEMENTED }); } while (this.#state === ConsumerState.CONNECTED) { - let m = await this.#consumeSingle(); + const m = await this.#consumeSingle(); if (m) { + /* TODO: add partitionsConsumedConcurrently-based concurrency here. + * If we maintain a map of topic partitions to promises, and a counter, + * we can probably achieve it with the correct guarantees of ordering + * though to maximize performance, we need to consume only from partitions for which + * an eachMessage call is not already going. + * It's risky to consume, and then store the message in something like an + * array/list until it can be processed, because librdkafka marks it as + * 'stored'... but anyway - we can implement something like this. + */ await config.eachMessage( this.#createPayload(m) ) + /* TODO: another check we need to do here is to see how kafkaJS is handling + * commits. Are they commmitting after a message is _processed_? + * In that case we need to turn off librdkafka's auto-commit, and commit + * inside this function. + */ } } } + /** + * Commit offsets for the given topic partitions. If topic partitions are not specified, commits all offsets. + * @param {import("../../types/kafkajs").TopicPartitionOffsetAndMetadata[]?} topicPartitions + * @returns {Promise} a promise that resolves when the offsets have been committed. + */ async commitOffsets(topicPartitions = null) { - try { - if (topicPartitions == null) { - this.#internalClient.commitSync(); - } else { - const topicPartitions = topicPartitions.map( - topicPartitionOffsetToRdKafka); - this.#internalClient.commitSync(topicPartitions); - } - } catch (e) { - if (!e.code || e.code != LibrdKafkaError.codes.ERR__NO_OFFSET) { - throw e; - } + if (this.#state !== ConsumerState.CONNECTED) { + return Promise.reject(new error.KafkaJSError('Commit can only be called while connected.', { code: error.ErrorCodes.ERR__STATE })); } + + return new Promise((resolve, reject) => { + try { + let cb = (e) => { + if (e) + reject(createKafkaJsErrorFromLibRdKafkaError(e)); + else + resolve(); + }; + + if (topicPartitions) + topicPartitions = topicPartitions.map(topicPartitionOffsetToRdKafka); + else + topicPartitions = null; + this.#internalClient.commitCb(topicPartitions, cb); + } catch (e) { + if (!e.code || e.code !== error.ErrorCodes.ERR__NO_OFFSET) + reject(createKafkaJsErrorFromLibRdKafkaError(e)); + else + resolve(); + } + }); } + /** + * Seek to the given offset for the topic partition. + * @param {import("../../types/kafkajs").TopicPartitionOffset} topicPartitionOffset + * @returns {Promise} a promise that resolves when the consumer has seeked. + */ seek(topicPartitionOffset) { + if (this.#state !== ConsumerState.CONNECTED) { + throw new error.KafkaJSError('Seek can only be called while connected.', { code: error.ErrorCodes.ERR__STATE }); + } + return new Promise((resolve, reject) => { const rdKafkaTopicPartitionOffset = topicPartitionOffsetToRdKafka(topicPartitionOffset); this.#internalClient.seek(rdKafkaTopicPartitionOffset, 0, (err) => { if (err) { - reject(new Error(`Seek error code ${err.code}`)); + reject(createKafkaJsErrorFromLibRdKafkaError(err)); } else { resolve(); } }); - }).catch(console.error); // Default handler + }); } async describeGroup() { - this.#notImplemented(); + notImplemented(); } + /** + * Find the assigned topic partitions for the consumer. + * @returns {import("../../types").TopicPartition[]} the current assignment. + */ + assignment() { + if (this.#state !== ConsumerState.CONNECTED) { + throw new error.KafkaJSError('Assignment can only be called while connected.', { code: error.ErrorCodes.ERR__STATE }); + } + + return this.#flattenTopicPartitions(this.#internalClient.assignments()); + } + + /** + * Fetches all partitions of topic that are assigned to this consumer. + * @param {string} topic + * @returns {number[]} a list of partitions. + */ + #getAllAssignedPartition(topic) { + return this.#internalClient.assignments() + .filter((partition) => partition.topic === topic) + .map((tpo) => tpo.partition); + } + + /** + * Pauses the given topic partitions. If partitions are not specified, pauses + * all partitions for the given topic. If topic partition(s) are already paused + * this method has no effect. + * @param {{topic: string, partitions?: number[]}[]} topics + */ pause(topics) { + if (this.#state !== ConsumerState.CONNECTED) { + throw new error.KafkaJSError('Pause can only be called while connected.', { code: error.ErrorCodes.ERR__STATE }); + } + + for (let topic of topics) { + if (!topic.partitions) { + topic.partitions = this.#getAllAssignedPartition(topic.topic); + } + } + topics = this.#flattenTopicPartitions(topics); + if (topics.length === 0) { + return; + } + this.#internalClient.pause(topics); } paused() { - this.#notImplemented(); + notImplemented(); } - assignment() { - return this.#flattenTopicPartitions(this.#internalClient.assignments()); - } + /** + * Resumes the given topic partitions. If partitions are not specified, resumes + * all partitions for the given topic. If topic partition(s) are already resumed + * this method has no effect. + * @param {{topic: string, partitions?: number[]}[]} topics + */ resume(topics) { + if (this.#state !== ConsumerState.CONNECTED) { + throw new error.KafkaJSError('Resume can only be called while connected.', { code: error.ErrorCodes.ERR__STATE }); + } + + for (let topic of topics) { + if (!topic.partitions) { + topic.partitions = this.#getAllAssignedPartition(topic.topic); + } + } + topics = this.#flattenTopicPartitions(topics); this.#internalClient.resume(topics); } - on(eventName, listener) { - this.#notImplemented(); + on(/* eventName, listener */) { + notImplemented(); } logger() { - this.#notImplemented(); + notImplemented(); } get events() { - this.#notImplemented(); + notImplemented(); + return null; } + /** + * Disconnects and cleans up the consumer. + * @returns {Promise} a promise that resolves when the consumer has disconnected. + */ async disconnect() { + if (this.#state === ConsumerState.INIT) { + throw new error.KafkaJSError('Disconnect can only be called once consumer is connected.', { code: error.ErrorCodes.ERR__STATE }); + } + if (this.#state >= ConsumerState.DISCONNECTING) { return; } + this.#state = ConsumerState.DISCONNECTING; await new Promise((resolve, reject) => { const cb = (err) => { - err ? reject(err) : resolve(); + if (err) { + reject(createKafkaJsErrorFromLibRdKafkaError(err)); + return; + } this.#state = ConsumerState.DISCONNECTED; + resolve(); } this.#internalClient.disconnect(cb); }); diff --git a/lib/kafkajs/_producer.js b/lib/kafkajs/_producer.js index a6b68588..8181cbf7 100644 --- a/lib/kafkajs/_producer.js +++ b/lib/kafkajs/_producer.js @@ -102,7 +102,7 @@ class Producer { } this.#state = ProducerState.INITIALIZED_TRANSACTIONS; - this.#readyCb(null); + this.#readyCb(); } /** diff --git a/src/kafka-consumer.cc b/src/kafka-consumer.cc index eccab3e9..f3331a98 100644 --- a/src/kafka-consumer.cc +++ b/src/kafka-consumer.cc @@ -532,6 +532,7 @@ void KafkaConsumer::Init(v8::Local exports) { Nan::SetPrototypeMethod(tpl, "commit", NodeCommit); Nan::SetPrototypeMethod(tpl, "commitSync", NodeCommitSync); + Nan::SetPrototypeMethod(tpl, "commitCb", NodeCommitCb); Nan::SetPrototypeMethod(tpl, "offsetsStore", NodeOffsetsStore); constructor.Reset((tpl->GetFunction(Nan::GetCurrentContext())) @@ -875,6 +876,45 @@ NAN_METHOD(KafkaConsumer::NodeCommitSync) { info.GetReturnValue().Set(Nan::New(error_code)); } +NAN_METHOD(KafkaConsumer::NodeCommitCb) { + Nan::HandleScope scope; + int error_code; + std::optional> toppars = std::nullopt; + Nan::Callback *callback; + + KafkaConsumer* consumer = ObjectWrap::Unwrap(info.This()); + + if (!consumer->IsConnected()) { + Nan::ThrowError("KafkaConsumer is disconnected"); + return; + } + + if (info.Length() != 2) { + Nan::ThrowError("Two arguments are required"); + return; + } + + if (!( + (info[0]->IsArray() || info[0]->IsNull()) && + info[1]->IsFunction() + )) { + Nan::ThrowError("First argument should be an array or null and second one a callback"); + return; + } + + if (info[0]->IsArray()) { + toppars = + Conversion::TopicPartition::FromV8Array(info[0].As()); + } + callback = new Nan::Callback(info[1].As()); + + Nan::AsyncQueueWorker( + new Workers::KafkaConsumerCommitCb(callback, consumer, + toppars)); + + info.GetReturnValue().Set(Nan::Null()); +} + NAN_METHOD(KafkaConsumer::NodeSubscribe) { Nan::HandleScope scope; diff --git a/src/kafka-consumer.h b/src/kafka-consumer.h index d5991944..a25efd00 100644 --- a/src/kafka-consumer.h +++ b/src/kafka-consumer.h @@ -110,6 +110,7 @@ class KafkaConsumer : public Connection { static NAN_METHOD(NodeUnsubscribe); static NAN_METHOD(NodeCommit); static NAN_METHOD(NodeCommitSync); + static NAN_METHOD(NodeCommitCb); static NAN_METHOD(NodeOffsetsStore); static NAN_METHOD(NodeCommitted); static NAN_METHOD(NodePosition); diff --git a/src/workers.cc b/src/workers.cc index 749732d0..c6e5dca4 100644 --- a/src/workers.cc +++ b/src/workers.cc @@ -1036,6 +1036,58 @@ void KafkaConsumerCommitted::HandleErrorCallback() { callback->Call(argc, argv); } +/** + * @brief KafkaConsumer commit offsets with a callback function. + * + * The first callback argument is the commit error, or null on success. + * + * @see RdKafka::KafkaConsumer::commitSync + */ +KafkaConsumerCommitCb::KafkaConsumerCommitCb(Nan::Callback *callback, + KafkaConsumer* consumer, + std::optional> & t) : + ErrorAwareWorker(callback), + m_consumer(consumer), + m_topic_partitions(t) {} + +KafkaConsumerCommitCb::~KafkaConsumerCommitCb() { + // Delete the underlying topic partitions as they are ephemeral or cloned + if (m_topic_partitions.has_value()) + RdKafka::TopicPartition::destroy(m_topic_partitions.value()); +} + +void KafkaConsumerCommitCb::Execute() { + Baton b = Baton(NULL); + if (m_topic_partitions.has_value()) { + b = m_consumer->Commit(m_topic_partitions.value()); + } else { + b = m_consumer->Commit(); + } + if (b.err() != RdKafka::ERR_NO_ERROR) { + SetErrorBaton(b); + } +} + +void KafkaConsumerCommitCb::HandleOKCallback() { + Nan::HandleScope scope; + + const unsigned int argc = 1; + v8::Local argv[argc]; + + argv[0] = Nan::Null(); + + callback->Call(argc, argv); +} + +void KafkaConsumerCommitCb::HandleErrorCallback() { + Nan::HandleScope scope; + + const unsigned int argc = 1; + v8::Local argv[argc] = { GetErrorObject() }; + + callback->Call(argc, argv); +} + /** * @brief KafkaConsumer seek * diff --git a/src/workers.h b/src/workers.h index b290d253..36388c41 100644 --- a/src/workers.h +++ b/src/workers.h @@ -13,6 +13,7 @@ #include #include #include +#include #include #include "src/common.h" @@ -417,6 +418,20 @@ class KafkaConsumerCommitted : public ErrorAwareWorker { const int m_timeout_ms; }; +class KafkaConsumerCommitCb : public ErrorAwareWorker { + public: + KafkaConsumerCommitCb(Nan::Callback*, + NodeKafka::KafkaConsumer*, std::optional> &); + ~KafkaConsumerCommitCb(); + + void Execute(); + void HandleOKCallback(); + void HandleErrorCallback(); + private: + NodeKafka::KafkaConsumer * m_consumer; + std::optional> m_topic_partitions; +}; + class KafkaConsumerSeek : public ErrorAwareWorker { public: KafkaConsumerSeek(Nan::Callback*, NodeKafka::KafkaConsumer*,