Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add consumer_member_id tag to partitions assign / revoked metrics #358

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

mwarkentin
Copy link
Member

@mwarkentin mwarkentin commented Apr 22, 2024

Attempted fix for #357

DACI

Starting with the metrics most important for understanding consumer balancing across a group, but it may make sense to include this tag on other consumer-related metrics as well. Can be done in a follow up.

Questions

  • Do we have consumer member ID available when we generate all metrics?
  • Can we enable this "globally" in that case?
  • If there are times when member ID is not available, could that just be recorded with a null or none tag?

@mwarkentin mwarkentin requested review from a team as code owners April 22, 2024 19:24
Copy link
Contributor

@fpacifici fpacifici left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you check what member_id represent? Is it a sequential number, a random string, others ?

@@ -235,7 +235,7 @@ def on_partitions_assigned(partitions: Mapping[Partition, int]) -> None:
logger.info("New partitions assigned: %r", partitions)
logger.info("Member id: %r", self.__consumer.member_id)
self.__metrics_buffer.metrics.increment(
"arroyo.consumer.partitions_assigned.count", len(partitions)
"arroyo.consumer.partitions_assigned.count", len(partitions), tags={"consumer_member_id": self.__consumer.member_id}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think member id is a method, not a property. (thus member_id() )

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was copying what was in the log line above, is that wrong?

logger.info("Member id: %r", self.__consumer.member_id)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What kind of string? :D

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or is the point that it's not an int?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i found it! it's uuid

In the current consumer protocol, the field member.idis assigned by broker to track group member status. A new consumer joins the group withmember.id field set as UNKNOWN_MEMBER_ID (empty string), since it needs to receive the identity assignment from broker first. For request with unknown member id, broker will blindly accept the new join group request, store the member metadata and return a UUID to consumer.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-394%3A+Require+member.id+for+initial+join+group+request

@evanh
Copy link
Member

evanh commented Apr 23, 2024

Wouldn't this need to be applied to the rust version as well?

@mwarkentin
Copy link
Member Author

Yes - my immediate concern was with a Python consumer so this is where I started.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants