78 changes: 75 additions & 3 deletions MIGRATION.md

### Producer

* `sendBatch` is not yet supported. However, the actual batching semantics are handled by librdkafka.
* Changes to `send`:
  * `acks`, `compression` and `timeout` are not set on a per-send basis. Rather, they must be set in the producer configuration.
Before:
```javascript
const kafka = new Kafka({/* ... */});
Expand Down Expand Up @@ -99,8 +99,80 @@
});
```
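
The diff view truncates the rest of this example. As a hedged sketch of the after-state (the `rdKafka.topicConfig` nesting mirrors the consumer examples below, and the librdkafka property names used here are assumptions rather than something this diff confirms), the settings move into the producer configuration:
```javascript
const kafka = new Kafka({ /* ... */ });
const producer = kafka.producer({
  rdKafka: {
    topicConfig: {
      // Assumed librdkafka topic-level equivalents of the per-send options:
      'acks': '1',                   // was: send({ acks: 1, ... })
      'compression.codec': 'gzip',   // was: send({ compression: ..., ... })
      'message.timeout.ms': '30000', // was: send({ timeout: 30000, ... })
    },
  },
});
await producer.connect();
await producer.send({ topic: 'topic', messages: [{ value: 'message' }] });
```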

  * Error-handling for a failed `send` is stricter. While sending multiple messages, even if one of the messages fails, the method throws an error, as sketched below.
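
A minimal sketch of the stricter behaviour (the topic name and message values are illustrative, and `producer` is assumed to be a connected producer):
```javascript
try {
  // A single send() carrying several messages: if any one message fails,
  // the whole call throws instead of succeeding partially.
  await producer.send({
    topic: 'topic',
    messages: [
      { value: 'message 1' },
      { value: 'message 2' },
    ],
  });
} catch (e) {
  // Handle the failure for the batch as a whole here.
  console.error('send failed:', e);
}
```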

### Consumer

* When passing a list of topics to `subscribe`, the `fromBeginning` property is not supported. Use the `auto.offset.reset` property instead.
Before:
```javascript
const kafka = new Kafka({ /* ... */ });
const consumer = kafka.consumer({
groupId: 'test-group',
});
await consumer.connect();
await consumer.subscribe({ topics: ["topic"], fromBeginning: true});
```

After:
```javascript
const kafka = new Kafka({ /* ... */ });
const consumer = kafka.consumer({
groupId: 'test-group',
rdKafka: {
topicConfig: {
'auto.offset.reset': 'earliest',
},
}
});
await consumer.connect();
await consumer.subscribe({ topics: ["topic"] });
```

* For auto-committing while using a consumer, the properties on `run` are no longer used. Instead, the corresponding rdKafka properties must be set:
* `autoCommit` corresponds to `enable.auto.commit`.
* `autoCommitInterval` corresponds to `auto.commit.interval.ms`.
* `autoCommitThreshold` is no longer supported.

Before:
```javascript
const kafka = new Kafka({ /* ... */ });
const consumer = kafka.consumer({ /* ... */ });
await consumer.connect();
await consumer.subscribe({ topics: ["topic"] });
consumer.run({
eachMessage: someFunc,
autoCommit: true,
autoCommitInterval: 5000,
});
```

After:
```javascript
const kafka = new Kafka({ /* ... */ });
const consumer = kafka.consumer({
/* ... */
rdKafka: {
globalConfig: {
"enable.auto.commit": "true",
"auto.commit.interval.ms": "5000",
}
},
});
await consumer.connect();
await consumer.subscribe({ topics: ["topic"] });
consumer.run({
eachMessage: someFunc,
});
```

* For the `eachMessage` callback passed while running the consumer:
  * `heartbeat()` no longer needs to be called; heartbeats are managed automatically by librdkafka (see the sketch after this list).
  * The `partitionsConsumedConcurrently` property is not yet supported.
* The `eachBatch` method is not supported.
* `commitOffsets` does not yet support sending metadata for topic partitions being committed.
* `paused()` is not yet supported.
* Custom partition assignors are not supported.
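
A minimal sketch of an `eachMessage` handler that makes no `heartbeat()` call (the topic name and handler body are illustrative):
```javascript
const kafka = new Kafka({ /* ... */ });
const consumer = kafka.consumer({ /* ... */ });
await consumer.connect();
await consumer.subscribe({ topics: ["topic"] });
consumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    // No heartbeat() needed here; librdkafka keeps the session alive.
    console.log(`${topic}[${partition}]: ${message.value.toString()}`);
  },
});
```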


## node-rdkafka
20 changes: 20 additions & 0 deletions lib/kafka-consumer.js
KafkaConsumer.prototype.commitMessageSync = function(msg) {
  // ... (body unchanged; trailing lines shown for diff context)
return this;
};

/**
* Commits a list of offsets per topic partition, using provided callback.
*
* @param {TopicPartition[]} toppars - Topic partition list to commit
* offsets for. Defaults to the current assignment
* @param {Function} cb - Callback method to execute when finished
* @return {Client} - Returns itself
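 *
 * @example
 * // Illustrative usage only; the topic, partition and offset values are made up.
 * consumer.commitCb(
 *   [{ topic: 'my-topic', partition: 0, offset: 42 }],
 *   function(err) {
 *     if (err) console.error('Commit failed:', err);
 *   });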
*/
KafkaConsumer.prototype.commitCb = function(toppars, cb) {
this._client.commitCb(toppars, function(err) {
if (err) {
cb(LibrdKafkaError.create(err));
return;
}

cb(null);
});
return this;
};

/**
* Get last known offsets from the client.
*
 * ... (rest of file unchanged in this diff)
 */
6 changes: 6 additions & 0 deletions lib/kafkajs/_common.js
function convertToRdKafkaHeaders(kafkaJSHeaders) {
  // ... (body unchanged; trailing lines shown for diff context)
return headers;
}


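/**
 * Throws a KafkaJSError indicating that the requested feature is not implemented.
 *
 * @param {string} [msg] - Message for the thrown error.
 * @throws {error.KafkaJSError} Always thrown, with code ERR__NOT_IMPLEMENTED.
 */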
function notImplemented(msg = 'Not implemented') {
throw new error.KafkaJSError(msg, { code: error.ErrorCodes.ERR__NOT_IMPLEMENTED });
}

module.exports = {
kafkaJSToRdKafkaConfig,
topicPartitionOffsetToRdKafka,
createKafkaJsErrorFromLibRdKafkaError,
convertToRdKafkaHeaders,
notImplemented,
};