Skip to content

Commit

Permalink
feat(rdkafka): metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
llevkin committed Apr 17, 2024
1 parent 7713f3d commit abe0981
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 10 deletions.
4 changes: 2 additions & 2 deletions packages/@biorate/kafkajs/src/connectors/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ export class KafkaJSConsumerConnector extends Connector<
* @description Counter
*/
@counter({
name: 'kafka_consumer_seconds_count',
help: 'Kafka consumer seconds count',
name: 'kafka_consumer_count',
help: 'Kafka consumer count',
labelNames: ['topic', 'status', 'group', 'partition'],
})
protected counter: Counter;
Expand Down
14 changes: 9 additions & 5 deletions packages/@biorate/rdkafka/src/connections/consumer-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ export class RDKafkaConsumerStreamConnection
protected pool: Message[] = [];
protected started = false;
@counter({
name: 'kafka_consumer_seconds_count',
help: 'Kafka consumer seconds count',
name: 'kafka_consumer_count',
help: 'Kafka consumer count',
labelNames: ['topic', 'status', 'group', 'partition'],
})
protected counter: Counter;
Expand Down Expand Up @@ -127,7 +127,10 @@ export class RDKafkaConsumerStreamConnection
const prev = latest.get(message.partition);
const last = !prev || message.offset > prev.offset ? message : prev;
latest.set(message.partition, last);
counter.set(message.topic, (counter.get(message.topic) ?? 0) + 1);
counter.set(
`${message.topic}_${message.partition}`,
(counter.get(message.topic) ?? 0) + 1,
);
}
if (this.config.batch) tasks.push(this.handler!(messages));
await Promise.all(tasks);
Expand All @@ -147,9 +150,10 @@ export class RDKafkaConsumerStreamConnection
};

#setMetrics = (counter: Map<string, number>, status: number, time: number) => {
for (const [topic, count] of counter) {
for (const [key, count] of counter) {
const [topic, partition] = key.split('_');
for (const item of this.assignment) {
if (topic !== item.topic) continue;
if (topic !== item.topic || Number(partition) !== item.partition) continue;
const labels = {
topic,
status,
Expand Down
6 changes: 3 additions & 3 deletions packages/@biorate/rdkafka/tests/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ describe('@biorate/rdkafka', function () {
expect(res).to.be.a('number');
});

// it('metrics', async () => {
// console.log(await root.prometheus.registry.metrics());
// });
it('metrics', async () => {
console.log(await root.prometheus.registry.metrics());
});
});

0 comments on commit abe0981

Please sign in to comment.