From abe09811b302ffdcdcfe11920ae0b8c32974b27a Mon Sep 17 00:00:00 2001 From: Leonid Levkin Date: Wed, 17 Apr 2024 15:49:06 +0300 Subject: [PATCH] feat(rdkafka): metrics --- .../@biorate/kafkajs/src/connectors/consumer.ts | 4 ++-- .../rdkafka/src/connections/consumer-stream.ts | 14 +++++++++----- packages/@biorate/rdkafka/tests/index.spec.ts | 6 +++--- 3 files changed, 14 insertions(+), 10 deletions(-) diff --git a/packages/@biorate/kafkajs/src/connectors/consumer.ts b/packages/@biorate/kafkajs/src/connectors/consumer.ts index 69117fde..b7446c9b 100644 --- a/packages/@biorate/kafkajs/src/connectors/consumer.ts +++ b/packages/@biorate/kafkajs/src/connectors/consumer.ts @@ -33,8 +33,8 @@ export class KafkaJSConsumerConnector extends Connector< * @description Counter */ @counter({ - name: 'kafka_consumer_seconds_count', - help: 'Kafka consumer seconds count', + name: 'kafka_consumer_count', + help: 'Kafka consumer count', labelNames: ['topic', 'status', 'group', 'partition'], }) protected counter: Counter; diff --git a/packages/@biorate/rdkafka/src/connections/consumer-stream.ts b/packages/@biorate/rdkafka/src/connections/consumer-stream.ts index 45ddc48c..0f52afde 100644 --- a/packages/@biorate/rdkafka/src/connections/consumer-stream.ts +++ b/packages/@biorate/rdkafka/src/connections/consumer-stream.ts @@ -25,8 +25,8 @@ export class RDKafkaConsumerStreamConnection protected pool: Message[] = []; protected started = false; @counter({ - name: 'kafka_consumer_seconds_count', - help: 'Kafka consumer seconds count', + name: 'kafka_consumer_count', + help: 'Kafka consumer count', labelNames: ['topic', 'status', 'group', 'partition'], }) protected counter: Counter; @@ -127,7 +127,10 @@ export class RDKafkaConsumerStreamConnection const prev = latest.get(message.partition); const last = !prev || message.offset > prev.offset ? message : prev; latest.set(message.partition, last); - counter.set(message.topic, (counter.get(message.topic) ?? 0) + 1); + counter.set( + `${message.topic}_${message.partition}`, + (counter.get(message.topic) ?? 0) + 1, + ); } if (this.config.batch) tasks.push(this.handler!(messages)); await Promise.all(tasks); @@ -147,9 +150,10 @@ export class RDKafkaConsumerStreamConnection }; #setMetrics = (counter: Map, status: number, time: number) => { - for (const [topic, count] of counter) { + for (const [key, count] of counter) { + const [topic, partition] = key.split('_'); for (const item of this.assignment) { - if (topic !== item.topic) continue; + if (topic !== item.topic || Number(partition) !== item.partition) continue; const labels = { topic, status, diff --git a/packages/@biorate/rdkafka/tests/index.spec.ts b/packages/@biorate/rdkafka/tests/index.spec.ts index 946c9a7e..210045e0 100644 --- a/packages/@biorate/rdkafka/tests/index.spec.ts +++ b/packages/@biorate/rdkafka/tests/index.spec.ts @@ -77,7 +77,7 @@ describe('@biorate/rdkafka', function () { expect(res).to.be.a('number'); }); - // it('metrics', async () => { - // console.log(await root.prometheus.registry.metrics()); - // }); + it('metrics', async () => { + console.log(await root.prometheus.registry.metrics()); + }); });