Skip to content

Commit

Permalink
feat: add support for re-registering metrics (#98)
Browse files Browse the repository at this point in the history
  • Loading branch information
odedino authored and christiangalsterer committed May 7, 2024
1 parent 6b6143a commit ed4d11f
Show file tree
Hide file tree
Showing 9 changed files with 449 additions and 233 deletions.
119 changes: 72 additions & 47 deletions src/kafkaJSAdminPrometheusExporter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,13 @@ export class KafkaJSAdminPrometheusExporter {
private readonly adminRequestDuration: Histogram
private readonly adminRequestSizeTotal: Counter
private readonly adminRequestQueueSize: Gauge
private readonly KAFKA_ADMIN_CONNECTION_COUNT = 'kafka_admin_connection_count'
private readonly KAFKA_ADMIN_CONNECTION_CREATION_TOTAL = 'kafka_admin_connection_creation_total'
private readonly KAFKA_ADMIN_CONNECTION_CLOSE_TOTAL = 'kafka_admin_connection_close_total'
private readonly KAFKA_ADMIN_REQUEST_TOTAL = 'kafka_admin_request_total'
private readonly KAFKA_ADMIN_REQUEST_DURATION_SECONDS = 'kafka_admin_request_duration_seconds'
private readonly KAFKA_ADMIN_REQUEST_SIZE_TOTAL = 'kafka_admin_request_size_total'
private readonly KAFKA_ADMIN_REQUEST_QUEUE_SIZE = 'kafka_admin_request_queue_size'
/**
* Creates a new exporter
* @param admin the admin client to monitor
Expand All @@ -33,55 +40,62 @@ export class KafkaJSAdminPrometheusExporter {
this.register = register
this.options = { ...this.defaultOptions, ...options }

this.adminActiveConnections = new Gauge({
name: 'kafka_admin_connection_count',
help: 'The current number of active connections established with a broker',
labelNames: mergeLabelNamesWithStandardLabels([], this.options.defaultLabels),
registers: [this.register]
})
this.adminActiveConnections = (this.register.getSingleMetric(this.KAFKA_ADMIN_CONNECTION_COUNT) ??
new Gauge({
name: this.KAFKA_ADMIN_CONNECTION_COUNT,
help: 'The current number of active connections established with a broker',
labelNames: mergeLabelNamesWithStandardLabels([], this.options.defaultLabels),
registers: [this.register]
})) as Gauge

this.adminConnectionsCreatedTotal = new Counter({
name: 'kafka_admin_connection_creation_total',
help: 'The total number of connections established with a broker',
labelNames: mergeLabelNamesWithStandardLabels([], this.options.defaultLabels),
registers: [this.register]
})
this.adminConnectionsCreatedTotal = (this.register.getSingleMetric(this.KAFKA_ADMIN_CONNECTION_CREATION_TOTAL) ??
new Counter({
name: this.KAFKA_ADMIN_CONNECTION_CREATION_TOTAL,
help: 'The total number of connections established with a broker',
labelNames: mergeLabelNamesWithStandardLabels([], this.options.defaultLabels),
registers: [this.register]
})) as Counter

this.adminConnectionsClosedTotal = new Counter({
name: 'kafka_admin_connection_close_total',
help: 'The total number of connections closed with a broker',
labelNames: mergeLabelNamesWithStandardLabels([], this.options.defaultLabels),
registers: [this.register]
})
this.adminConnectionsClosedTotal = (this.register.getSingleMetric(this.KAFKA_ADMIN_CONNECTION_CLOSE_TOTAL) ??
new Counter({
name: this.KAFKA_ADMIN_CONNECTION_CLOSE_TOTAL,
help: 'The total number of connections closed with a broker',
labelNames: mergeLabelNamesWithStandardLabels([], this.options.defaultLabels),
registers: [this.register]
})) as Counter

this.adminRequestTotal = new Counter({
name: 'kafka_admin_request_total',
help: 'The total number of requests sent.',
labelNames: mergeLabelNamesWithStandardLabels(['broker'], this.options.defaultLabels),
registers: [this.register]
})
this.adminRequestTotal = (this.register.getSingleMetric(this.KAFKA_ADMIN_REQUEST_TOTAL) ??
new Counter({
name: this.KAFKA_ADMIN_REQUEST_TOTAL,
help: 'The total number of requests sent.',
labelNames: mergeLabelNamesWithStandardLabels(['broker'], this.options.defaultLabels),
registers: [this.register]
})) as Counter

this.adminRequestDuration = new Histogram({
name: 'kafka_admin_request_duration_seconds',
help: 'The time taken for processing an admin request.',
labelNames: mergeLabelNamesWithStandardLabels(['broker'], this.options.defaultLabels),
buckets: this.options.adminRequestDurationHistogramBuckets,
registers: [this.register]
})
this.adminRequestDuration = (this.register.getSingleMetric(this.KAFKA_ADMIN_REQUEST_DURATION_SECONDS) ??
new Histogram({
name: this.KAFKA_ADMIN_REQUEST_DURATION_SECONDS,
help: 'The time taken for processing an admin request.',
labelNames: mergeLabelNamesWithStandardLabels(['broker'], this.options.defaultLabels),
buckets: this.options.adminRequestDurationHistogramBuckets,
registers: [this.register]
})) as Histogram

this.adminRequestSizeTotal = new Counter({
name: 'kafka_admin_request_size_total',
help: 'The size of any request sent.',
labelNames: mergeLabelNamesWithStandardLabels(['broker'], this.options.defaultLabels),
registers: [this.register]
})
this.adminRequestSizeTotal = (this.register.getSingleMetric(this.KAFKA_ADMIN_REQUEST_SIZE_TOTAL) ??
new Counter({
name: this.KAFKA_ADMIN_REQUEST_SIZE_TOTAL,
help: 'The size of any request sent.',
labelNames: mergeLabelNamesWithStandardLabels(['broker'], this.options.defaultLabels),
registers: [this.register]
})) as Counter

this.adminRequestQueueSize = new Gauge({
name: 'kafka_admin_request_queue_size',
help: 'Size of the request queue.',
labelNames: mergeLabelNamesWithStandardLabels(['broker'], this.options.defaultLabels),
registers: [this.register]
})
this.adminRequestQueueSize = (this.register.getSingleMetric(this.KAFKA_ADMIN_REQUEST_QUEUE_SIZE) ??
new Gauge({
name: this.KAFKA_ADMIN_REQUEST_QUEUE_SIZE,
help: 'Size of the request queue.',
labelNames: mergeLabelNamesWithStandardLabels(['broker'], this.options.defaultLabels),
registers: [this.register]
})) as Gauge
}

public enableMetrics (): void {
Expand All @@ -102,12 +116,23 @@ export class KafkaJSAdminPrometheusExporter {
}

onAdminRequest (event: RequestEvent): void {
this.adminRequestTotal.inc(mergeLabelsWithStandardLabels({ broker: event.payload.broker }, this.options.defaultLabels))
this.adminRequestSizeTotal.inc(mergeLabelsWithStandardLabels({ broker: event.payload.broker }, this.options.defaultLabels), event.payload.size)
this.adminRequestDuration.observe(mergeLabelsWithStandardLabels({ broker: event.payload.broker }, this.options.defaultLabels), event.payload.duration / 1000)
this.adminRequestTotal.inc(
mergeLabelsWithStandardLabels({ broker: event.payload.broker }, this.options.defaultLabels)
)
this.adminRequestSizeTotal.inc(
mergeLabelsWithStandardLabels({ broker: event.payload.broker }, this.options.defaultLabels),
event.payload.size
)
this.adminRequestDuration.observe(
mergeLabelsWithStandardLabels({ broker: event.payload.broker }, this.options.defaultLabels),
event.payload.duration / 1000
)
}

onAdminRequestQueueSize (event: RequestQueueSizeEvent): void {
this.adminRequestQueueSize.set(mergeLabelsWithStandardLabels({ broker: event.payload.broker }, this.options.defaultLabels), event.payload.queueSize)
this.adminRequestQueueSize.set(
mergeLabelsWithStandardLabels({ broker: event.payload.broker }, this.options.defaultLabels),
event.payload.queueSize
)
}
}

0 comments on commit ed4d11f

Please sign in to comment.