Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add consumer_lag in Kafka consumergroup metricset #14822

Merged
merged 10 commits into from
Nov 28, 2019

Conversation

ChrsMark
Copy link
Member

@ChrsMark ChrsMark commented Nov 27, 2019

This PR adds consumer_lag field in consumergroup metricset. This is calculated by subtracting groupOffset from partitionOffset for a partition-topic pair.

partitionOffset is retrieved from the cluster directly using https://github.com/Shopify/sarama/blob/afedecade3c6d8e99ab6dfeeea7814bf800b90a4/client.go#L62

May resolve_ #3608.

Signed-off-by: chrismark chrismarkou92@gmail.com

Signed-off-by: chrismark <chrismarkou92@gmail.com>
Signed-off-by: chrismark <chrismarkou92@gmail.com>
Signed-off-by: chrismark <chrismarkou92@gmail.com>
Signed-off-by: chrismark <chrismarkou92@gmail.com>
Signed-off-by: chrismark <chrismarkou92@gmail.com>
Signed-off-by: chrismark <chrismarkou92@gmail.com>
Signed-off-by: chrismark <chrismarkou92@gmail.com>
Copy link
Member

@jsoriano jsoriano left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ChrsMark thanks for taking this! I think that this solves one of the main issues we had with this module. We can decide in future PRs if we could further refactor this to have a single client, but I think that we still need to connect to non-leaders to monitor certain things.

Could you also take a look to the dashboard? The consumer lag visualization can be surely simplified with this new field 🙂

@@ -270,6 +280,15 @@ func (b *Broker) FetchGroupOffsets(group string, partitions map[string][]int32)
return b.broker.FetchOffset(requ)
}

// GetPartitionOffsetFromTheLeader fetches the OffsetNewest from the leader.
func (b *Broker) GetPartitionOffsetFromTheLeader(topic string, partitionID int32) (int64, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit. Use Fetch for consistency with other methods here.

Suggested change
func (b *Broker) GetPartitionOffsetFromTheLeader(topic string, partitionID int32) (int64, error) {
func (b *Broker) FetchPartitionOffsetFromTheLeader(topic string, partitionID int32) (int64, error) {

@@ -68,35 +68,35 @@ func TestFetchGroupInfo(t *testing.T) {
expected: []common.MapStr{
testEvent("group1", "topic1", 0, common.MapStr{
"client": clientMeta(0),
"offset": int64(10),
"offset": int64(10), "consumer_lag": int64(42) - int64(10),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit.

Suggested change
"offset": int64(10), "consumer_lag": int64(42) - int64(10),
"offset": int64(10),
"consumer_lag": int64(42) - int64(10),

@@ -134,6 +137,13 @@ func (b *Broker) Connect() error {
debugf("found matching broker %v with id %v", other.Addr(), other.ID())
b.id = other.ID()
b.advertisedAddr = other.Addr()

c, err := getClusteWideClient(b.Addr(), b.cfg)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking that we could use this client for everything, but not, we may still need to fetch offsets from non-leader partition replicas for monitoring purpouses.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Though we can also use client.Leader(topic, partitionID) for that.

Copy link
Member Author

@ChrsMark ChrsMark Nov 28, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 on revisiting this in a followup PR with refactoring purpose

@@ -113,12 +114,22 @@ func fetchGroupInfo(

for topic, partitions := range ret.off.Blocks {
for partition, info := range partitions {
partitionOffset, err := getPartitionOffsetFromTheLeader(b, topic, partition)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something to explore here.

I guess that it may happen that the partition offset here is always going to be ahead of the group offset, because we get first the group offset, and then the partition offset. Between both operations the partition offset may have changed.

Starting on version 4 of ListOffsets (the API method used to get partition offsets), it is possible to indicate a current_leader_epoch to retrieve "old" metadata.

Starting on version 5 of OffsetFetch (the API method used to get consumer group offsets), its response contains a leader_epoch field.

I wonder if we can use the leader_epoch contained in the response of OffsetFetch when available to query for the offset of the partition in the same epoch. This way we could have a more accurate value.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What are the implications for the support matrix? I'm not very familiar with API version vs Kafka versioning.

Copy link
Member Author

@ChrsMark ChrsMark Nov 28, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jsoriano not sure if this can be achieved with the current implementation of GetOffset

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What are the implications for the support matrix? I'm not very familiar with API version vs Kafka versioning.

All messages in kafka protocol are versioned, each client and broker can support a different range of versions for each message. There is a message (ApiVersionsRequest) to query the versions supported by the broker, we could use this method to decide if we can use the methods aware of the epoch.

@jsoriano not sure if this can be achieved with the current implementation of GetOffset

No, we would need to forge our own request as we do to request partition offsets to the leader. Or we could contribute to Sarama the support for these versions. 🙂

Copy link
Member Author

@ChrsMark ChrsMark Nov 28, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

@@ -43,6 +43,10 @@
type: keyword
description: custom consumer meta data string

- name: consumer_lag
type: long
description: consumer lag for partition/topic
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably worth it explaining what this is, as an important metric: the difference between the partition offset and consumer offset

Signed-off-by: chrismark <chrismarkou92@gmail.com>
Signed-off-by: chrismark <chrismarkou92@gmail.com>
@ChrsMark
Copy link
Member Author

ChrsMark commented Nov 28, 2019

@jsoriano, @mtojek , @exekias thank you all for reviewing! Almost everything is now addressed.

@jsoriano regarding some special points:

  • about dashboard I can sure add it but I would propose in a different PR after this one is merged so as to keep PRs more compact ;)
  • about the general refactoring and the reuse of the new client I would propose to create a refactoring issue to keep track of that needs. I can take care of it by collecting all the issues we discussed here and all related issues we currently have.
  • about leader_epoch I checked it really quickly and answered inline in your comment. Let me know if I understood it correctly. However I'm full positive in investigating further as part of the refactoring story.

Let me know what you think!

@mtojek
Copy link
Contributor

mtojek commented Nov 28, 2019

Regarding @ChrsMark 's last comment - this PR is already medium sized one, I'm for pushing refactoring and dashboards to two next independent PRs.

Thanks for addressing comments. LGTM!

Copy link
Member

@jsoriano jsoriano left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, dashboard and further refactors can be left for future changes.

@ChrsMark
Copy link
Member Author

ChrsMark commented Nov 28, 2019

Failing tests are irrelevant and already addressed on #14849.
Merging this.

@ChrsMark ChrsMark merged commit 23aaf5c into elastic:master Nov 28, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants