KafkaTS is an Apache Kafka client library for Node.js. It provides both a low-level API for communicating directly with the Apache Kafka cluster and high-level APIs for publishing and subscribing to Kafka topics.
Supported Kafka versions: 3.6 and later
```sh
npm install kafka-ts
```

Create a Kafka client:

```ts
import { createKafkaClient } from 'kafka-ts';

export const kafka = createKafkaClient({
    clientId: 'my-app',
    bootstrapServers: [{ host: 'localhost', port: 9092 }],
});
```

Start a consumer:

```ts
const consumer = await kafka.startConsumer({
    groupId: 'my-consumer-group',
    topics: ['my-topic'],
    onMessage: (message) => {
        console.log(message);
    },
});
```

Create a producer and send a message:

```ts
export const producer = kafka.createProducer();

await producer.send([{ topic: 'my-topic', key: 'key', value: 'value' }]);
```

Use the low-level API directly, for example to create a topic:

```ts
import { API } from 'kafka-ts';

const cluster = kafka.createCluster();
await cluster.connect();

const { controllerId } = await cluster.sendRequest(API.METADATA, {
    allowTopicAutoCreation: false,
    includeTopicAuthorizedOperations: false,
    topics: [],
});

await cluster.sendRequestToNode(controllerId)(API.CREATE_TOPICS, {
    validateOnly: false,
    timeoutMs: 10_000,
    topics: [
        {
            name: 'my-topic',
            numPartitions: 10,
            replicationFactor: 3,
            assignments: [],
            configs: [],
        },
    ],
});
await cluster.disconnect();
```

Gracefully shut down on SIGTERM:

```ts
process.once('SIGTERM', async () => {
    await consumer.close(); // waits for the consumer to finish processing the last batch, then disconnects
    await producer.close();
});
```

See the examples for more detailed examples.
By default, KafkaTS logs using a JSON logger. This can be replaced globally by calling the `setLogger` method (see src/utils/logger.ts).
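For instance, a minimal sketch that routes logs to the console. The exact `Logger` interface is defined in src/utils/logger.ts; the level methods used here (debug/info/warn/error) are an assumption based on typical JSON loggers:

```ts
import { setLogger } from 'kafka-ts';

setLogger({
    // Assumption: the Logger interface exposes per-level methods taking a
    // message and optional metadata; see src/utils/logger.ts for the exact shape.
    debug: (message, metadata) => console.debug(message, metadata),
    info: (message, metadata) => console.info(message, metadata),
    warn: (message, metadata) => console.warn(message, metadata),
    error: (message, metadata) => console.error(message, metadata),
});
```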
By default, KafkaTS retries `onBatch` and `onMessage` using an exponential backoff strategy (see src/utils/retrier.ts). On failure, the offsets of the messages processed so far are committed, and the consumer is restarted.
If you want to skip failed messages or implement a DLQ-like mechanism, you can override `retrier` in `startConsumer()` and execute your own logic in `onFailure`.
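A hedged sketch of a DLQ-like `onFailure` follows; the `onFailure` signature and the dead-letter topic name are assumptions (check src/utils/retrier.ts for the exact callback arguments):

```ts
import { createExponentialBackoffRetrier } from 'kafka-ts';

await kafka.startConsumer({
    // ...
    retrier: createExponentialBackoffRetrier({
        // Assumption: onFailure receives the error that exhausted the retries.
        onFailure: async (error) => {
            // Hypothetical dead-letter topic; adjust to your own naming scheme.
            await producer.send([{ topic: 'my-topic.dlq', value: Buffer.from(String(error)) }]);
        },
    }),
});
```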
For example, if you simply want to skip failing messages:
```ts
import { createExponentialBackoffRetrier } from 'kafka-ts';

await kafka.startConsumer({
    // ...
    retrier: createExponentialBackoffRetrier({ onFailure: () => {} }),
});
```

Depending on the use case, you might want to control `concurrency` and `batchGranularity`.
When subscribing to a topic, the consumer group leader distributes all subscribed topic partitions among the consumers in the group. Each consumer then fetches messages only from the partitions assigned to it.
`batchGranularity` controls how messages from a fetch response are split into batches:

- `broker` - (default) all messages received from a single Kafka broker are included in a single batch.
- `topic` - all messages received from a single broker and topic are included in a single batch.
- `partition` - a batch only includes messages from a single partition.

After each batch is processed, the consumer commits the offsets of the processed messages. The more granular the batch, the more often offsets are committed.

`concurrency` controls how many of these batches are processed concurrently.
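For example, a consumer tuned for per-partition batches processed in parallel might look like this (a sketch; `handleBatch` is a hypothetical handler, and the string values for `batchGranularity` follow the list above):

```ts
await kafka.startConsumer({
    groupId: 'my-consumer-group',
    topics: ['my-topic'],
    batchGranularity: 'partition', // offsets are committed after every per-partition batch
    concurrency: 10, // up to 10 batches are processed in parallel
    onBatch: async (batch) => {
        await handleBatch(batch); // hypothetical batch handler
    },
});
```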
By default, messages are partitioned by message key, or round-robin when the key is null or undefined. The partition can be overridden with the `partition` property on the message. You can also override the default partitioner per producer instance: `kafka.createProducer({ partitioner: customPartitioner })`.

A simple example of partitioning messages by the value of the `x-partition-key` message header:
```ts
import type { Partitioner } from 'kafka-ts';
import { defaultPartitioner } from 'kafka-ts';

const myPartitioner: Partitioner = (context) => {
    const partition = defaultPartitioner(context);
    return (message) => partition({ ...message, key: message.headers?.['x-partition-key'] });
};

const producer = kafka.createProducer({ partitioner: myPartitioner });
await producer.send([{ topic: 'my-topic', value: 'value', headers: { 'x-partition-key': '123' } }]);
```

The existing low-level libraries (e.g. node-rdkafka) are bindings to librdkafka, which doesn't give enough control over the consumer logic. The existing high-level libraries (e.g. kafkajs) are missing a few crucial features:
- Static consumer membership - Rebalancing during rolling deployments causes delays. Using `groupInstanceId` in addition to `groupId` can avoid rebalancing and continue consuming partitions from the existing assignment.
- Consuming messages without consumer groups - When you don't need the consumer to track partition offsets, you can simply create a consumer without a `groupId` and always start consuming either from the beginning or from the latest partition offset (see the sketch after this list).
- Low-level API requests - It's possible to communicate directly with the Kafka cluster using the Kafka API protocol.
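For example, a minimal sketch of a group-less consumer that always reads from the beginning of the topic (it reuses the `kafka` client from the quickstart; no offsets are committed):

```ts
const statelessConsumer = await kafka.startConsumer({
    topics: ['my-topic'],
    fromBeginning: true, // without a groupId, always start from the earliest offset
    onMessage: (message) => {
        console.log(message);
    },
});
```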
`createKafkaClient()` options:

| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| clientId | string | false | null | The client id used for all requests. |
| bootstrapServers | TcpSocketConnectOpts[] | true | | List of Kafka brokers for initial cluster discovery |
| sasl | SASLProvider | false | | SASL provider (see below) |
| ssl | TLSSocketOptions | false | | SSL configuration |
Supported SASL mechanisms:

- PLAIN: `saslPlain({ username, password })`
- SCRAM-SHA-256: `saslScramSha256({ username, password })`
- SCRAM-SHA-512: `saslScramSha512({ username, password })`
Custom SASL mechanisms can be implemented following the SASLProvider interface. See src/auth for examples.
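For example, connecting with SCRAM-SHA-512 over TLS might look like the following sketch (the credentials and host are placeholders, and it assumes the SASL providers are exported from the package root):

```ts
import { createKafkaClient, saslScramSha512 } from 'kafka-ts';

export const kafka = createKafkaClient({
    clientId: 'my-app',
    bootstrapServers: [{ host: 'broker.example.com', port: 9093 }],
    sasl: saslScramSha512({ username: 'my-user', password: 'my-password' }),
    ssl: {}, // TLSSocketOptions; assumption: passing an options object enables TLS with defaults
});
```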
`startConsumer()` options:

| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| topics | string[] | true | | List of topics to subscribe to |
| groupId | string | false | null | Consumer group id |
| groupInstanceId | string | false | null | Consumer group instance id |
| rackId | string | false | null | Rack id |
| isolationLevel | IsolationLevel | false | IsolationLevel.READ_UNCOMMITTED | Isolation level |
| sessionTimeoutMs | number | false | 30000 | Session timeout in milliseconds |
| rebalanceTimeoutMs | number | false | 60000 | Rebalance timeout in milliseconds |
| maxWaitMs | number | false | 5000 | Fetch long poll timeout in milliseconds |
| minBytes | number | false | 1 | Minimum number of bytes to wait for before returning a fetch response |
| maxBytes | number | false | 1_048_576 | Maximum number of bytes to return in the fetch response |
| partitionMaxBytes | number | false | 1_048_576 | Maximum number of bytes to return per partition in the fetch response |
| allowTopicAutoCreation | boolean | false | false | Allow kafka to auto-create topic when it doesn't exist |
| fromBeginning | boolean | false | false | Start consuming from the beginning of the topic |
| batchGranularity | BatchGranularity | false | broker | Controls how messages from a fetch response are split into batches, and therefore how often offsets are committed. onBatch will include messages: partition - from a single partition; topic - from all partitions of a topic on a broker; broker - from all assigned topics and partitions on a broker |
| concurrency | number | false | 1 | How many batches to process concurrently |
| onMessage | (message: Message) => Promise | true | | Callback executed on every message |
| onBatch | (batch: Message[]) => Promise | true | | Callback executed on every batch of messages (based on batchGranularity) |
`createProducer()` options:

| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| allowTopicAutoCreation | boolean | false | false | Allow kafka to auto-create topic when it doesn't exist |
| partitioner | Partitioner | false | defaultPartitioner | Custom partitioner function. The default partitioner is compatible with the Java client's default partitioner. |
Message fields:

| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| topic | string | true | | Topic to send the message to |
| partition | number | false | null | Partition to send the message to. By default partitioned by key. If key is also missing, partition is assigned round-robin |
| timestamp | bigint | false | null | Message timestamp in milliseconds |
| key | Buffer \| null | false | null | Message key |
| value | Buffer \| null | true | | Message value |
| headers | Record<string, string> | false | null | Message headers |
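Putting the fields together, a message using a key, headers, and an explicit timestamp might look like this (a sketch based on the table above):

```ts
await producer.send([
    {
        topic: 'my-topic',
        key: Buffer.from('user-42'), // messages with the same key land on the same partition
        value: Buffer.from(JSON.stringify({ hello: 'world' })),
        headers: { 'x-request-id': 'abc-123' },
        timestamp: BigInt(Date.now()), // milliseconds since epoch
    },
]);
```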