nestjs-kafka-module

Description

A NestJS module wrapper for node-rdkafka.

Installation

npm i nestjs-kafka-module

Requirements:

Package   Min   Max
Node.js   16    20
NestJS    8     10

Basic usage

Initialize KafkaModule with a configuration for a consumer, a producer, an adminClient, or any combination of them. The full list of configuration options for each client can be found in node-rdkafka's Configuration section.

app.module.ts

import { Module } from "@nestjs/common";
import { KafkaModule } from "nestjs-kafka-module";

@Module({
  imports: [
    KafkaModule.forRoot({
      consumer: {
        conf: {
          "group.id": "kafka_consumer",
          "metadata.broker.list": "127.0.0.1:9092",
        },
      },
      producer: {
        conf: {
          "client.id": "kafka_producer",
          "metadata.broker.list": "127.0.0.1:9092",
        },
      },
      adminClient: {
        conf: {
          "metadata.broker.list": "127.0.0.1:9092",
        },
      },
    }),
  ],
})
export class AppModule {}

cats.service.ts

import { Injectable, Inject } from "@nestjs/common";
import { KafkaConsumer, Producer, IAdminClient } from "node-rdkafka";
import { KAFKA_ADMIN_CLIENT_PROVIDER } from "nestjs-kafka-module";

@Injectable()
export class CatsService {
  constructor(
    private readonly kafkaConsumer: KafkaConsumer,
    private readonly kafkaProducer: Producer,
    @Inject(KAFKA_ADMIN_CLIENT_PROVIDER)
    private readonly kafkaAdminClient: IAdminClient
  ) {
    /* Trying to get an instance of a provider without defining a dedicated configuration will result in an error. */
  }
}

You do not have to configure all of consumer, producer and adminClient; define only what you need. The table below shows which provider becomes available in your context for each configuration key:

Configuration   Provider
consumer        KafkaConsumer
producer        Producer
adminClient     KAFKA_ADMIN_CLIENT_PROVIDER
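
As a rough illustration, the mapping in the table above can be restated as a small lookup. The helper `expectedProviders` and the type `KafkaSection` are hypothetical names used only for this sketch and are not part of nestjs-kafka-module; the sketch assumes the configuration keys are `consumer`, `producer` and `adminClient`, as in the `forRoot` example.

```typescript
// Hypothetical helper, not part of nestjs-kafka-module: it only restates
// the configuration/provider table as code.
type KafkaSection = "consumer" | "producer" | "adminClient";

// Configuration key -> provider (class or injection token, shown by name).
const PROVIDER_BY_SECTION: Record<KafkaSection, string> = {
  consumer: "KafkaConsumer",
  producer: "Producer",
  adminClient: "KAFKA_ADMIN_CLIENT_PROVIDER",
};

// Returns the providers expected to be injectable for the given options.
function expectedProviders(
  options: Partial<Record<KafkaSection, unknown>>
): string[] {
  return (Object.keys(PROVIDER_BY_SECTION) as KafkaSection[])
    .filter((section) => options[section] !== undefined)
    .map((section) => PROVIDER_BY_SECTION[section]);
}

// With a producer-only configuration, only the Producer provider is listed.
console.log(expectedProviders({ producer: { conf: {} } }));
```

Injecting a provider that is missing from this list is the situation the constructor comment in cats.service.ts warns about.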

Examples

The example folder contains sample Nest applications that use this library.

Async initialization

The module can be configured dynamically with the forRootAsync method, for instance by injecting a ConfigService as shown below:

import { Module } from "@nestjs/common";
import { ConfigService } from "@nestjs/config";
import { KafkaModule } from "nestjs-kafka-module";

@Module({
  imports: [
    KafkaModule.forRootAsync({
      useFactory: (configService: ConfigService) => {
        const groupId = configService.get("group_id");
        const brokerList = configService.get("metadata_broker_list");
        const clientId = configService.get("client_id");

        return {
          consumer: {
            conf: {
              "group.id": groupId,
              "metadata.broker.list": brokerList,
            },
          },
          producer: {
            conf: {
              "client.id": clientId,
              "metadata.broker.list": brokerList,
            },
          },
          adminClient: {
            conf: {
              "metadata.broker.list": brokerList,
            },
          },
        };
      },
      inject: [ConfigService],
    }),
  ],
})
export class ApplicationModule {}
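
One way to keep such a factory easy to test is to move the mapping into a plain function that takes only a key lookup. The sketch below is an assumption about how you might structure your own code, not something the library provides: `buildKafkaOptions`, `requireKey` and `Lookup` are hypothetical names, and the setting keys are illustrative.

```typescript
// Hypothetical helpers, not part of nestjs-kafka-module.
type Lookup = (key: string) => string | undefined;

// Fails fast when a required setting is absent or empty.
function requireKey(get: Lookup, key: string): string {
  const value = get(key);
  if (value === undefined || value === "") {
    throw new Error(`Missing Kafka setting: ${key}`);
  }
  return value;
}

// Builds an options object with the same shape as the factory result above.
function buildKafkaOptions(get: Lookup) {
  const brokerList = requireKey(get, "metadata_broker_list");
  return {
    consumer: {
      conf: {
        "group.id": requireKey(get, "group_id"),
        "metadata.broker.list": brokerList,
      },
    },
    producer: {
      conf: {
        "client.id": requireKey(get, "client_id"),
        "metadata.broker.list": brokerList,
      },
    },
    adminClient: { conf: { "metadata.broker.list": brokerList } },
  };
}

// Stand-alone demo with an in-memory lookup instead of ConfigService.
const settings: Record<string, string> = {
  group_id: "kafka_consumer",
  client_id: "kafka_producer",
  metadata_broker_list: "127.0.0.1:9092",
};
const options = buildKafkaOptions((key) => settings[key]);
console.log(options.consumer.conf["group.id"]);
```

Inside forRootAsync, the factory could then plausibly be reduced to `useFactory: (cs: ConfigService) => buildKafkaOptions((key) => cs.get<string>(key))`, so a missing setting surfaces as a clear error at startup.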

Auto connect

By default, KafkaModule attempts to connect automatically during initialization. This means that if the broker is unreachable at startup (for example, temporarily down or not accessible), the NestJS initialization may fail.

You can change this behavior with the autoConnect flag on the KafkaConsumer and Producer configuration, as shown below:

KafkaModule.forRoot({
  consumer: {
    autoConnect: false,
    conf: {
      "group.id": "nestjs-rdkafka-test",
      "metadata.broker.list": "127.0.0.1:9092",
    },
  },
  producer: {
    autoConnect: false,
    conf: {
      "metadata.broker.list": "127.0.0.1:9092",
    },
  },
});
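
With autoConnect disabled, connecting becomes your responsibility. The following is a minimal sketch of one way to do it, assuming the injected clients are plain node-rdkafka clients exposing `isConnected()` and the callback-style `connect(metadataOptions, cb)`. The helper `connectClient`, the `Connectable` interface and the `FakeClient` stand-in are hypothetical and exist only so the sketch runs without a broker.

```typescript
// Minimal structural type covering the two client methods used here
// (assumed to match node-rdkafka's client API).
interface Connectable {
  isConnected(): boolean;
  connect(metadataOptions?: object, cb?: (err: Error | null) => void): unknown;
}

// Hypothetical helper: wraps the callback-style connect() in a Promise.
function connectClient(client: Connectable): Promise<void> {
  if (client.isConnected()) return Promise.resolve();
  return new Promise<void>((resolve, reject) => {
    client.connect({}, (err) => (err ? reject(err) : resolve()));
  });
}

// Stand-in client so the sketch runs without a Kafka broker.
class FakeClient implements Connectable {
  private connected = false;

  isConnected(): boolean {
    return this.connected;
  }

  connect(_opts?: object, cb?: (err: Error | null) => void): this {
    this.connected = true;
    cb?.(null);
    return this;
  }
}

const fake = new FakeClient();
connectClient(fake).then(() => console.log("connected:", fake.isConnected()));
```

In a real service you might call such a helper from a lifecycle hook or a retry loop, passing the injected KafkaConsumer or Producer, so that a broker outage delays the connection instead of aborting application startup.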

Disconnect

All clients are automatically disconnected from Kafka when the module is destroyed (onModuleDestroy). You can also disconnect manually by calling:

await this.consumer?.disconnect();
await this.producer?.disconnect();
await this.adminClient?.disconnect();

License

nestjs-kafka-module is MIT licensed.