Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix missing metrics for topics by registration of existing collector #600

Merged
merged 1 commit into from
Aug 25, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
213 changes: 180 additions & 33 deletions pulsar/internal/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,39 +282,186 @@ func NewMetricsProvider(userDefinedLabels map[string]string) *Metrics {
}),
}

prometheus.DefaultRegisterer.Register(metrics.messagesPublished)
prometheus.DefaultRegisterer.Register(metrics.bytesPublished)
prometheus.DefaultRegisterer.Register(metrics.messagesPending)
prometheus.DefaultRegisterer.Register(metrics.bytesPending)
prometheus.DefaultRegisterer.Register(metrics.publishErrors)
prometheus.DefaultRegisterer.Register(metrics.publishLatency)
prometheus.DefaultRegisterer.Register(metrics.publishRPCLatency)

prometheus.DefaultRegisterer.Register(metrics.messagesReceived)
prometheus.DefaultRegisterer.Register(metrics.bytesReceived)
prometheus.DefaultRegisterer.Register(metrics.prefetchedMessages)
prometheus.DefaultRegisterer.Register(metrics.prefetchedBytes)
prometheus.DefaultRegisterer.Register(metrics.acksCounter)
prometheus.DefaultRegisterer.Register(metrics.nacksCounter)
prometheus.DefaultRegisterer.Register(metrics.dlqCounter)
prometheus.DefaultRegisterer.Register(metrics.processingTime)

prometheus.DefaultRegisterer.Register(metrics.producersOpened)
prometheus.DefaultRegisterer.Register(metrics.producersClosed)
prometheus.DefaultRegisterer.Register(metrics.producersPartitions)
prometheus.DefaultRegisterer.Register(metrics.consumersOpened)
prometheus.DefaultRegisterer.Register(metrics.consumersClosed)
prometheus.DefaultRegisterer.Register(metrics.consumersPartitions)
prometheus.DefaultRegisterer.Register(metrics.readersOpened)
prometheus.DefaultRegisterer.Register(metrics.readersClosed)

prometheus.DefaultRegisterer.Register(metrics.ConnectionsOpened)
prometheus.DefaultRegisterer.Register(metrics.ConnectionsClosed)
prometheus.DefaultRegisterer.Register(metrics.ConnectionsEstablishmentErrors)
prometheus.DefaultRegisterer.Register(metrics.ConnectionsHandshakeErrors)
prometheus.DefaultRegisterer.Register(metrics.LookupRequestsCount)
prometheus.DefaultRegisterer.Register(metrics.PartitionedTopicMetadataRequestsCount)
prometheus.DefaultRegisterer.Register(metrics.RPCRequestCount)
err := prometheus.DefaultRegisterer.Register(metrics.messagesPublished)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.messagesPublished = are.ExistingCollector.(*prometheus.CounterVec)
}
}
err = prometheus.DefaultRegisterer.Register(metrics.bytesPublished)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.bytesPublished = are.ExistingCollector.(*prometheus.CounterVec)
}
}
err = prometheus.DefaultRegisterer.Register(metrics.messagesPending)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.messagesPending = are.ExistingCollector.(*prometheus.GaugeVec)
}
}
err = prometheus.DefaultRegisterer.Register(metrics.bytesPending)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.bytesPending = are.ExistingCollector.(*prometheus.GaugeVec)
}
}
err = prometheus.DefaultRegisterer.Register(metrics.publishErrors)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.publishErrors = are.ExistingCollector.(*prometheus.CounterVec)
}
}
err = prometheus.DefaultRegisterer.Register(metrics.publishLatency)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.publishLatency = are.ExistingCollector.(*prometheus.HistogramVec)
}
}
err = prometheus.DefaultRegisterer.Register(metrics.publishRPCLatency)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.publishRPCLatency = are.ExistingCollector.(*prometheus.HistogramVec)
}
}
err = prometheus.DefaultRegisterer.Register(metrics.messagesReceived)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.messagesReceived = are.ExistingCollector.(*prometheus.CounterVec)
}
}
err = prometheus.DefaultRegisterer.Register(metrics.bytesReceived)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.bytesReceived = are.ExistingCollector.(*prometheus.CounterVec)
}
}
err = prometheus.DefaultRegisterer.Register(metrics.prefetchedMessages)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.prefetchedMessages = are.ExistingCollector.(*prometheus.GaugeVec)
}
}
err = prometheus.DefaultRegisterer.Register(metrics.prefetchedBytes)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.prefetchedBytes = are.ExistingCollector.(*prometheus.GaugeVec)
}
}
err = prometheus.DefaultRegisterer.Register(metrics.acksCounter)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.acksCounter = are.ExistingCollector.(*prometheus.CounterVec)
}
}
err = prometheus.DefaultRegisterer.Register(metrics.nacksCounter)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.nacksCounter = are.ExistingCollector.(*prometheus.CounterVec)
}
}
err = prometheus.DefaultRegisterer.Register(metrics.dlqCounter)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.dlqCounter = are.ExistingCollector.(*prometheus.CounterVec)
}
}
err = prometheus.DefaultRegisterer.Register(metrics.processingTime)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.processingTime = are.ExistingCollector.(*prometheus.HistogramVec)
}
}
err = prometheus.DefaultRegisterer.Register(metrics.producersOpened)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.producersOpened = are.ExistingCollector.(*prometheus.CounterVec)
}
}
err = prometheus.DefaultRegisterer.Register(metrics.producersClosed)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.producersClosed = are.ExistingCollector.(*prometheus.CounterVec)
}
}
err = prometheus.DefaultRegisterer.Register(metrics.producersPartitions)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.producersPartitions = are.ExistingCollector.(*prometheus.GaugeVec)
}
}
err = prometheus.DefaultRegisterer.Register(metrics.consumersOpened)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.consumersOpened = are.ExistingCollector.(*prometheus.CounterVec)
}
}
err = prometheus.DefaultRegisterer.Register(metrics.consumersClosed)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.consumersClosed = are.ExistingCollector.(*prometheus.CounterVec)
}
}
err = prometheus.DefaultRegisterer.Register(metrics.consumersPartitions)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.consumersPartitions = are.ExistingCollector.(*prometheus.GaugeVec)
}
}
err = prometheus.DefaultRegisterer.Register(metrics.readersOpened)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.readersOpened = are.ExistingCollector.(*prometheus.CounterVec)
}
}
err = prometheus.DefaultRegisterer.Register(metrics.readersClosed)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.readersClosed = are.ExistingCollector.(*prometheus.CounterVec)
}
}
err = prometheus.DefaultRegisterer.Register(metrics.ConnectionsOpened)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.ConnectionsOpened = are.ExistingCollector.(prometheus.Counter)
}
}
err = prometheus.DefaultRegisterer.Register(metrics.ConnectionsClosed)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.ConnectionsClosed = are.ExistingCollector.(prometheus.Counter)
}
}
err = prometheus.DefaultRegisterer.Register(metrics.ConnectionsEstablishmentErrors)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.ConnectionsEstablishmentErrors = are.ExistingCollector.(prometheus.Counter)
}
}
err = prometheus.DefaultRegisterer.Register(metrics.ConnectionsHandshakeErrors)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.ConnectionsHandshakeErrors = are.ExistingCollector.(prometheus.Counter)
}
}
err = prometheus.DefaultRegisterer.Register(metrics.LookupRequestsCount)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.LookupRequestsCount = are.ExistingCollector.(prometheus.Counter)
}
}
err = prometheus.DefaultRegisterer.Register(metrics.PartitionedTopicMetadataRequestsCount)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.PartitionedTopicMetadataRequestsCount = are.ExistingCollector.(prometheus.Counter)
}
}
err = prometheus.DefaultRegisterer.Register(metrics.RPCRequestCount)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.RPCRequestCount = are.ExistingCollector.(prometheus.Counter)
}
}
return metrics
}

Expand Down