Skip to content

Commit

Permalink
Merge pull request #69 from xiaoziv/fix_input_kafka_cluster_label_overwrite
Browse files Browse the repository at this point in the history

fix input kafka cluster label overwrite bug (#68)
  • Loading branch information
ysyneu committed Jul 6, 2022
2 parents ed6bab2 + 86f8a94 commit a449428
Showing 1 changed file with 81 additions and 58 deletions.
139 changes: 81 additions & 58 deletions inputs/kafka/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ const (
clientID = "kafka_exporter"
)

var (
type PromDesc struct {
clusterBrokers *prometheus.Desc
topicPartitions *prometheus.Desc
topicCurrentOffset *prometheus.Desc
Expand All @@ -44,7 +44,7 @@ var (
topicPartitionLagMillis *prometheus.Desc
lagDatapointUsedInterpolation *prometheus.Desc
lagDatapointUsedExtrapolation *prometheus.Desc
)
}

// Exporter collects Kafka stats from the given server and exports them using
// the prometheus metrics package.
Expand All @@ -66,6 +66,7 @@ type Exporter struct {
kafkaOpts Options
saramaConfig *sarama.Config
logger log.Logger
promDesc *PromDesc
}

type Options struct {
Expand Down Expand Up @@ -235,6 +236,7 @@ func New(logger log.Logger, opts Options, topicFilter, groupFilter string) (*Exp
kafkaOpts: opts,
saramaConfig: config,
logger: logger,
promDesc: nil, // initialized in func initializeMetrics
}

level.Debug(logger).Log("msg", "Initializing metrics")
Expand All @@ -257,23 +259,23 @@ func (e *Exporter) fetchOffsetVersion() int16 {
// Describe describes all the metrics ever exported by the Kafka exporter. It
// implements prometheus.Collector.
//
// Every descriptor is sent on ch; nothing is returned.
//
// NOTE(review): this listing appears to be a rendered diff that shows both
// sides of the change. The sends of bare identifiers look like the removed
// package-level descriptors, and the sends of e.promDesc.* fields look like
// their per-Exporter replacements. Confirm against the repository before
// treating both runs as live code.
//
// NOTE(review): neither run sends the consumergroupMembers descriptor, although
// metricsForConsumerGroup emits a metric built from it. Presumably a registry
// that checks collected metrics against described ones would reject that
// metric; verify, and consider describing it here.
func (e *Exporter) Describe(ch chan<- *prometheus.Desc) {
// Sends using the bare descriptor names.
ch <- clusterBrokers
ch <- topicCurrentOffset
ch <- topicOldestOffset
ch <- topicPartitions
ch <- topicPartitionLeader
ch <- topicPartitionReplicas
ch <- topicPartitionInSyncReplicas
ch <- topicPartitionUsesPreferredReplica
ch <- topicUnderReplicatedPartition
ch <- consumergroupCurrentOffset
ch <- consumergroupCurrentOffsetSum
ch <- consumergroupUncomittedOffsets
ch <- consumergroupUncommittedOffsetsZookeeper
ch <- consumergroupUncommittedOffsetsSum
ch <- topicPartitionLagMillis
ch <- lagDatapointUsedInterpolation
ch <- lagDatapointUsedExtrapolation
// Sends using the descriptors stored on this Exporter. e.promDesc is set to
// nil in the Exporter literal built by New and assigned in initializeMetrics,
// so these dereferences assume initializeMetrics has already run.
// Presumably New calls it before returning; TODO confirm.
ch <- e.promDesc.clusterBrokers
ch <- e.promDesc.topicCurrentOffset
ch <- e.promDesc.topicOldestOffset
ch <- e.promDesc.topicPartitions
ch <- e.promDesc.topicPartitionLeader
ch <- e.promDesc.topicPartitionReplicas
ch <- e.promDesc.topicPartitionInSyncReplicas
ch <- e.promDesc.topicPartitionUsesPreferredReplica
ch <- e.promDesc.topicUnderReplicatedPartition
ch <- e.promDesc.consumergroupCurrentOffset
ch <- e.promDesc.consumergroupCurrentOffsetSum
ch <- e.promDesc.consumergroupUncomittedOffsets
ch <- e.promDesc.consumergroupUncommittedOffsetsZookeeper
ch <- e.promDesc.consumergroupUncommittedOffsetsSum
ch <- e.promDesc.topicPartitionLagMillis
ch <- e.promDesc.lagDatapointUsedInterpolation
ch <- e.promDesc.lagDatapointUsedExtrapolation
}

func (e *Exporter) Collect(ch chan<- prometheus.Metric) {
Expand Down Expand Up @@ -329,7 +331,7 @@ func (e *Exporter) collectChans(quit chan struct{}) {
func (e *Exporter) collect(ch chan<- prometheus.Metric) {
var wg = sync.WaitGroup{}
ch <- prometheus.MustNewConstMetric(
clusterBrokers, prometheus.GaugeValue, float64(len(e.client.Brokers())),
e.promDesc.clusterBrokers, prometheus.GaugeValue, float64(len(e.client.Brokers())),
)

now := time.Now()
Expand Down Expand Up @@ -398,7 +400,7 @@ func (e *Exporter) metricsForTopic(topic string, ch chan<- prometheus.Metric) {
return
}
ch <- prometheus.MustNewConstMetric(
topicPartitions, prometheus.GaugeValue, float64(len(partitions)), topic,
e.promDesc.topicPartitions, prometheus.GaugeValue, float64(len(partitions)), topic,
)
e.mu.Lock()
offset := make(map[int32]int64, len(partitions))
Expand All @@ -409,7 +411,7 @@ func (e *Exporter) metricsForTopic(topic string, ch chan<- prometheus.Metric) {
level.Error(e.logger).Log("msg", "Error getting leader for topic/partition", "topic", topic, "partition", partition, "err", err.Error())
} else {
ch <- prometheus.MustNewConstMetric(
topicPartitionLeader, prometheus.GaugeValue, float64(broker.ID()), topic, strconv.FormatInt(int64(partition), 10),
e.promDesc.topicPartitionLeader, prometheus.GaugeValue, float64(broker.ID()), topic, strconv.FormatInt(int64(partition), 10),
)
}

Expand All @@ -421,7 +423,7 @@ func (e *Exporter) metricsForTopic(topic string, ch chan<- prometheus.Metric) {
offset[partition] = currentOffset
e.mu.Unlock()
ch <- prometheus.MustNewConstMetric(
topicCurrentOffset, prometheus.GaugeValue, float64(currentOffset), topic, strconv.FormatInt(int64(partition), 10),
e.promDesc.topicCurrentOffset, prometheus.GaugeValue, float64(currentOffset), topic, strconv.FormatInt(int64(partition), 10),
)
}

Expand All @@ -430,7 +432,7 @@ func (e *Exporter) metricsForTopic(topic string, ch chan<- prometheus.Metric) {
level.Error(e.logger).Log("msg", "Error getting oldest offset for topic/partition", "topic", topic, "partition", partition, "err", err.Error())
} else {
ch <- prometheus.MustNewConstMetric(
topicOldestOffset, prometheus.GaugeValue, float64(oldestOffset), topic, strconv.FormatInt(int64(partition), 10),
e.promDesc.topicOldestOffset, prometheus.GaugeValue, float64(oldestOffset), topic, strconv.FormatInt(int64(partition), 10),
)
}

Expand All @@ -439,7 +441,7 @@ func (e *Exporter) metricsForTopic(topic string, ch chan<- prometheus.Metric) {
level.Error(e.logger).Log("msg", "Error getting replicas for topic/partition", "topic", topic, "partition", partition, "err", err.Error())
} else {
ch <- prometheus.MustNewConstMetric(
topicPartitionReplicas, prometheus.GaugeValue, float64(len(replicas)), topic, strconv.FormatInt(int64(partition), 10),
e.promDesc.topicPartitionReplicas, prometheus.GaugeValue, float64(len(replicas)), topic, strconv.FormatInt(int64(partition), 10),
)
}

Expand All @@ -448,27 +450,27 @@ func (e *Exporter) metricsForTopic(topic string, ch chan<- prometheus.Metric) {
level.Error(e.logger).Log("msg", "Error getting in-sync replicas for topic/partition", "topic", topic, "partition", partition, "err", err.Error())
} else {
ch <- prometheus.MustNewConstMetric(
topicPartitionInSyncReplicas, prometheus.GaugeValue, float64(len(inSyncReplicas)), topic, strconv.FormatInt(int64(partition), 10),
e.promDesc.topicPartitionInSyncReplicas, prometheus.GaugeValue, float64(len(inSyncReplicas)), topic, strconv.FormatInt(int64(partition), 10),
)
}

if broker != nil && replicas != nil && len(replicas) > 0 && broker.ID() == replicas[0] {
ch <- prometheus.MustNewConstMetric(
topicPartitionUsesPreferredReplica, prometheus.GaugeValue, float64(1), topic, strconv.FormatInt(int64(partition), 10),
e.promDesc.topicPartitionUsesPreferredReplica, prometheus.GaugeValue, float64(1), topic, strconv.FormatInt(int64(partition), 10),
)
} else {
ch <- prometheus.MustNewConstMetric(
topicPartitionUsesPreferredReplica, prometheus.GaugeValue, float64(0), topic, strconv.FormatInt(int64(partition), 10),
e.promDesc.topicPartitionUsesPreferredReplica, prometheus.GaugeValue, float64(0), topic, strconv.FormatInt(int64(partition), 10),
)
}

if replicas != nil && inSyncReplicas != nil && len(inSyncReplicas) < len(replicas) {
ch <- prometheus.MustNewConstMetric(
topicUnderReplicatedPartition, prometheus.GaugeValue, float64(1), topic, strconv.FormatInt(int64(partition), 10),
e.promDesc.topicUnderReplicatedPartition, prometheus.GaugeValue, float64(1), topic, strconv.FormatInt(int64(partition), 10),
)
} else {
ch <- prometheus.MustNewConstMetric(
topicUnderReplicatedPartition, prometheus.GaugeValue, float64(0), topic, strconv.FormatInt(int64(partition), 10),
e.promDesc.topicUnderReplicatedPartition, prometheus.GaugeValue, float64(0), topic, strconv.FormatInt(int64(partition), 10),
)
}

Expand All @@ -485,7 +487,7 @@ func (e *Exporter) metricsForTopic(topic string, ch chan<- prometheus.Metric) {

consumerGroupLag := currentOffset - offset
ch <- prometheus.MustNewConstMetric(
consumergroupUncommittedOffsetsZookeeper, prometheus.GaugeValue, float64(consumerGroupLag), group.Name, topic, strconv.FormatInt(int64(partition), 10),
e.promDesc.consumergroupUncommittedOffsetsZookeeper, prometheus.GaugeValue, float64(consumerGroupLag), group.Name, topic, strconv.FormatInt(int64(partition), 10),
)
}
}
Expand Down Expand Up @@ -531,7 +533,7 @@ func (e *Exporter) metricsForConsumerGroup(broker *sarama.Broker, ch chan<- prom
}
}
ch <- prometheus.MustNewConstMetric(
consumergroupMembers, prometheus.GaugeValue, float64(len(group.Members)), group.GroupId,
e.promDesc.consumergroupMembers, prometheus.GaugeValue, float64(len(group.Members)), group.GroupId,
)
level.Debug(e.logger).Log("msg", "fetching offsets for broker/group", "broker", broker.ID(), "group", group.GroupId)
if offsetFetchResponse, err := broker.FetchOffset(&offsetFetchRequest); err != nil {
Expand Down Expand Up @@ -563,7 +565,7 @@ func (e *Exporter) metricsForConsumerGroup(broker *sarama.Broker, ch chan<- prom
currentOffsetSum += currentOffset

ch <- prometheus.MustNewConstMetric(
consumergroupCurrentOffset, prometheus.GaugeValue, float64(currentOffset), group.GroupId, topic, strconv.FormatInt(int64(partition), 10),
e.promDesc.consumergroupCurrentOffset, prometheus.GaugeValue, float64(currentOffset), group.GroupId, topic, strconv.FormatInt(int64(partition), 10),
)
e.mu.Lock()
// Get and insert the next offset to be produced into the interpolation map
Expand All @@ -584,14 +586,14 @@ func (e *Exporter) metricsForConsumerGroup(broker *sarama.Broker, ch chan<- prom
}
e.mu.Unlock()
ch <- prometheus.MustNewConstMetric(
consumergroupUncomittedOffsets, prometheus.GaugeValue, float64(lag), group.GroupId, topic, strconv.FormatInt(int64(partition), 10),
e.promDesc.consumergroupUncomittedOffsets, prometheus.GaugeValue, float64(lag), group.GroupId, topic, strconv.FormatInt(int64(partition), 10),
)
}
ch <- prometheus.MustNewConstMetric(
consumergroupCurrentOffsetSum, prometheus.GaugeValue, float64(currentOffsetSum), group.GroupId, topic,
e.promDesc.consumergroupCurrentOffsetSum, prometheus.GaugeValue, float64(currentOffsetSum), group.GroupId, topic,
)
ch <- prometheus.MustNewConstMetric(
consumergroupUncommittedOffsetsSum, prometheus.GaugeValue, float64(lagSum), group.GroupId, topic,
e.promDesc.consumergroupUncommittedOffsetsSum, prometheus.GaugeValue, float64(lagSum), group.GroupId, topic,
)
}
}
Expand Down Expand Up @@ -661,8 +663,8 @@ func (e *Exporter) metricsForLag(ch chan<- prometheus.Metric) {
lagMillis := float64(time.Now().UnixNano()/1000000) - px
level.Debug(e.logger).Log("msg", "estimated lag for group/topic/partition (in ms)", "group", group, "topic", topic, "partition", partition, "lag", lagMillis)

ch <- prometheus.MustNewConstMetric(lagDatapointUsedExtrapolation, prometheus.CounterValue, 1, group, topic, strconv.FormatInt(int64(partition), 10))
ch <- prometheus.MustNewConstMetric(topicPartitionLagMillis, prometheus.GaugeValue, lagMillis, group, topic, strconv.FormatInt(int64(partition), 10))
ch <- prometheus.MustNewConstMetric(e.promDesc.lagDatapointUsedExtrapolation, prometheus.CounterValue, 1, group, topic, strconv.FormatInt(int64(partition), 10))
ch <- prometheus.MustNewConstMetric(e.promDesc.topicPartitionLagMillis, prometheus.GaugeValue, lagMillis, group, topic, strconv.FormatInt(int64(partition), 10))

} else {
level.Debug(e.logger).Log("msg", "estimating lag for group/topic/partition", "group", group, "topic", topic, "partition", partition, "method", "interpolation")
Expand All @@ -673,8 +675,8 @@ func (e *Exporter) metricsForLag(ch chan<- prometheus.Metric) {
float64((offsets[nextHigherOffset].Sub(offsets[nextLowerOffset])).Milliseconds())/float64(nextHigherOffset-nextLowerOffset)
lagMillis := float64(time.Now().UnixNano()/1000000) - px
level.Debug(e.logger).Log("msg", "estimated lag for group/topic/partition (in ms)", "group", group, "topic", topic, "partition", partition, "lag", lagMillis)
ch <- prometheus.MustNewConstMetric(lagDatapointUsedInterpolation, prometheus.CounterValue, 1, group, topic, strconv.FormatInt(int64(partition), 10))
ch <- prometheus.MustNewConstMetric(topicPartitionLagMillis, prometheus.GaugeValue, lagMillis, group, topic, strconv.FormatInt(int64(partition), 10))
ch <- prometheus.MustNewConstMetric(e.promDesc.lagDatapointUsedInterpolation, prometheus.CounterValue, 1, group, topic, strconv.FormatInt(int64(partition), 10))
ch <- prometheus.MustNewConstMetric(e.promDesc.topicPartitionLagMillis, prometheus.GaugeValue, lagMillis, group, topic, strconv.FormatInt(int64(partition), 10))
}
} else {
level.Error(e.logger).Log("msg", "Could not get latest latest consumed offset", "group", group, "topic", topic, "partition", partition)
Expand Down Expand Up @@ -752,109 +754,130 @@ func (e *Exporter) initializeMetrics() {
}
}

clusterBrokers = prometheus.NewDesc(
clusterBrokers := prometheus.NewDesc(
prometheus.BuildFQName(namespace, "", "brokers"),
"Number of Brokers in the Kafka Cluster.",
nil, labels,
)
topicPartitions = prometheus.NewDesc(
topicPartitions := prometheus.NewDesc(
prometheus.BuildFQName(namespace, "topic", "partitions"),
"Number of partitions for this Topic",
[]string{"topic"}, labels,
)
topicCurrentOffset = prometheus.NewDesc(
topicCurrentOffset := prometheus.NewDesc(
prometheus.BuildFQName(namespace, "topic", "partition_current_offset"),
"Current Offset of a Broker at Topic/Partition",
[]string{"topic", "partition"}, labels,
)
topicOldestOffset = prometheus.NewDesc(
topicOldestOffset := prometheus.NewDesc(
prometheus.BuildFQName(namespace, "topic", "partition_oldest_offset"),
"Oldest Offset of a Broker at Topic/Partition",
[]string{"topic", "partition"}, labels,
)

topicPartitionLeader = prometheus.NewDesc(
topicPartitionLeader := prometheus.NewDesc(
prometheus.BuildFQName(namespace, "topic", "partition_leader"),
"Leader Broker ID of this Topic/Partition",
[]string{"topic", "partition"}, labels,
)

topicPartitionReplicas = prometheus.NewDesc(
topicPartitionReplicas := prometheus.NewDesc(
prometheus.BuildFQName(namespace, "topic", "partition_replicas"),
"Number of Replicas for this Topic/Partition",
[]string{"topic", "partition"}, labels,
)

topicPartitionInSyncReplicas = prometheus.NewDesc(
topicPartitionInSyncReplicas := prometheus.NewDesc(
prometheus.BuildFQName(namespace, "topic", "partition_in_sync_replica"),
"Number of In-Sync Replicas for this Topic/Partition",
[]string{"topic", "partition"}, labels,
)

topicPartitionUsesPreferredReplica = prometheus.NewDesc(
topicPartitionUsesPreferredReplica := prometheus.NewDesc(
prometheus.BuildFQName(namespace, "topic", "partition_leader_is_preferred"),
"1 if Topic/Partition is using the Preferred Broker",
[]string{"topic", "partition"}, labels,
)

topicUnderReplicatedPartition = prometheus.NewDesc(
topicUnderReplicatedPartition := prometheus.NewDesc(
prometheus.BuildFQName(namespace, "topic", "partition_under_replicated_partition"),
"1 if Topic/Partition is under Replicated",
[]string{"topic", "partition"}, labels,
)

consumergroupCurrentOffset = prometheus.NewDesc(
consumergroupCurrentOffset := prometheus.NewDesc(
prometheus.BuildFQName(namespace, "consumergroup", "current_offset"),
"Current Offset of a ConsumerGroup at Topic/Partition",
[]string{"consumergroup", "topic", "partition"}, labels,
)

consumergroupCurrentOffsetSum = prometheus.NewDesc(
consumergroupCurrentOffsetSum := prometheus.NewDesc(
prometheus.BuildFQName(namespace, "consumergroup", "current_offset_sum"),
"Current Offset of a ConsumerGroup at Topic for all partitions",
[]string{"consumergroup", "topic"}, labels,
)

consumergroupUncomittedOffsets = prometheus.NewDesc(
consumergroupUncomittedOffsets := prometheus.NewDesc(
prometheus.BuildFQName(namespace, "consumergroup", "uncommitted_offsets"),
"Current Approximate count of uncommitted offsets for a ConsumerGroup at Topic/Partition",
[]string{"consumergroup", "topic", "partition"}, labels,
)

consumergroupUncommittedOffsetsZookeeper = prometheus.NewDesc(
consumergroupUncommittedOffsetsZookeeper := prometheus.NewDesc(
prometheus.BuildFQName(namespace, "consumergroupzookeeper", "uncommitted_offsets_zookeeper"),
"Current Approximate count of uncommitted offsets(zookeeper) for a ConsumerGroup at Topic/Partition",
[]string{"consumergroup", "topic", "partition"}, nil,
)

consumergroupUncommittedOffsetsSum = prometheus.NewDesc(
consumergroupUncommittedOffsetsSum := prometheus.NewDesc(
prometheus.BuildFQName(namespace, "consumergroup", "uncommitted_offsets_sum"),
"Current Approximate count of uncommitted offsets for a ConsumerGroup at Topic for all partitions",
[]string{"consumergroup", "topic"}, labels,
)

consumergroupMembers = prometheus.NewDesc(
consumergroupMembers := prometheus.NewDesc(
prometheus.BuildFQName(namespace, "consumergroup", "members"),
"Amount of members in a consumer group",
[]string{"consumergroup"}, labels,
)

topicPartitionLagMillis = prometheus.NewDesc(
topicPartitionLagMillis := prometheus.NewDesc(
prometheus.BuildFQName(namespace, "consumer_lag", "millis"),
"Current approximation of consumer lag for a ConsumerGroup at Topic/Partition",
[]string{"consumergroup", "topic", "partition"},
labels,
)

lagDatapointUsedInterpolation = prometheus.NewDesc(prometheus.BuildFQName(namespace, "consumer_lag", "interpolation"),
lagDatapointUsedInterpolation := prometheus.NewDesc(prometheus.BuildFQName(namespace, "consumer_lag", "interpolation"),
"Indicates that a consumer group lag estimation used interpolation",
[]string{"consumergroup", "topic", "partition"},
labels,
)

lagDatapointUsedExtrapolation = prometheus.NewDesc(prometheus.BuildFQName(namespace, "consumer_lag", "extrapolation"),
lagDatapointUsedExtrapolation := prometheus.NewDesc(prometheus.BuildFQName(namespace, "consumer_lag", "extrapolation"),
"Indicates that a consumer group lag estimation used extrapolation",
[]string{"consumergroup", "topic", "partition"},
labels,
)

e.promDesc = &PromDesc{
clusterBrokers: clusterBrokers,
topicPartitions: topicPartitions,
topicCurrentOffset: topicCurrentOffset,
topicOldestOffset: topicOldestOffset,
topicPartitionLeader: topicPartitionLeader,
topicPartitionReplicas: topicPartitionReplicas,
topicPartitionInSyncReplicas: topicPartitionInSyncReplicas,
topicPartitionUsesPreferredReplica: topicPartitionUsesPreferredReplica,
topicUnderReplicatedPartition: topicUnderReplicatedPartition,
consumergroupCurrentOffset: consumergroupCurrentOffset,
consumergroupCurrentOffsetSum: consumergroupCurrentOffsetSum,
consumergroupUncomittedOffsets: consumergroupUncomittedOffsets,
consumergroupUncommittedOffsetsSum: consumergroupUncommittedOffsetsSum,
consumergroupUncommittedOffsetsZookeeper: consumergroupUncommittedOffsetsZookeeper,
consumergroupMembers: consumergroupMembers,
topicPartitionLagMillis: topicPartitionLagMillis,
lagDatapointUsedInterpolation: lagDatapointUsedInterpolation,
lagDatapointUsedExtrapolation: lagDatapointUsedExtrapolation,
}
}

0 comments on commit a449428

Please sign in to comment.