KAFKA-5565. Add a broker metric specifying the number of consumer gro… #3506
Refer to this link for build results (access rights to CI server needed):
Thanks for the PR. We need a KIP because metrics are a public API. Left a couple of minor comments as well.
def value(): Int = {
  groupMetadataCache.values.map(group => {
    group synchronized { group.currentState == AwaitingSync }
  }).count(_ == true)
How about:
groupMetadataCache.values.count { group =>
group synchronized { group.currentState == AwaitingSync }
}
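The two forms above should produce the same number; the difference is that `count` with a predicate skips the intermediate collection of Booleans. A minimal sketch, assuming hypothetical stand-ins (`CountSketch`, `Group`, and a reduced `GroupState` hierarchy) rather than the real `GroupMetadata` class:

```scala
object CountSketch {
  // Hypothetical stand-ins for the group states and GroupMetadata in the PR.
  sealed trait GroupState
  case object PreparingRebalance extends GroupState
  case object AwaitingSync extends GroupState
  case object Stable extends GroupState

  final class Group(initial: GroupState) {
    private var state: GroupState = initial
    def currentState: GroupState = state
  }

  // Form from the patch: map each group to a Boolean, then count the true values.
  def countWithMap(groups: Iterable[Group]): Int =
    groups.map(group => group synchronized { group.currentState == AwaitingSync }).count(_ == true)

  // Suggested form: pass the predicate straight to count.
  def countDirect(groups: Iterable[Group]): Int =
    groups.count { group =>
      group synchronized { group.currentState == AwaitingSync }
    }
}
```

Both functions still take each group's lock while reading its state, as the original snippet does.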
@@ -82,19 +82,38 @@ class GroupMetadataManager(brokerId: Int,

  this.logIdent = "[Group Metadata Manager on Broker " + brokerId + "]: "

  newGauge("NumOffsets",
  private def recreateGauge[T](name: String, gauge: Gauge[T]): Gauge[T] = {
Do we really need this? Can we not do the clean up in the test? If we think this is important, we should probably do it for all the gauges in KafkaMetricsGroup, but I am not sure if we need it.
The issue is basically:
1. GroupMetadataManager1 is created. It calls newGauge("NumOffsets"), which registers a metric with the global registry and returns it.
2. GroupMetadataManager2 is created. It calls newGauge("NumOffsets"). However, this function looks at the global registry and finds that there is already a metric there with that name, so it returns the gauge created in step 1.
3. The step 1 gauge is bound to the GroupMetadataManager object from step 1, not the one from step 2. So the tests fail because their modifications to GroupMetadataManager have no effect on the metrics.
The best way to solve this would probably be to explicitly pass the metric registry to the constructor of GroupMetadataManager, rather than relying on magical global variables. We could perhaps have it default to the global registry if no value was passed.
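A rough sketch of the constructor-injection idea, to make the failure mode and the proposed fix concrete. Everything here is an assumption for illustration: `Registry`, `Manager`, `globalRegistry`, and this `Gauge` trait are hypothetical stand-ins, not the Yammer metrics classes or the real GroupMetadataManager.

```scala
import scala.collection.mutable

object RegistrySketch {
  // Hypothetical minimal gauge; the real code uses the metrics library's Gauge.
  trait Gauge[T] { def value(): T }

  // Hypothetical registry with the get-or-create behaviour described above:
  // if a gauge with this name already exists, the existing one is returned.
  final class Registry {
    private val gauges = mutable.Map.empty[String, Gauge[_]]
    def newGauge[T](name: String, gauge: Gauge[T]): Gauge[T] = synchronized {
      gauges.getOrElseUpdate(name, gauge).asInstanceOf[Gauge[T]]
    }
  }

  val globalRegistry = new Registry

  // The registry is a constructor parameter that defaults to the global one,
  // so production callers stay unchanged while tests can pass a fresh registry.
  final class Manager(registry: Registry = globalRegistry) {
    private val offsets = mutable.Map.empty[String, Long]

    def put(key: String, offset: Long): Unit =
      offsets.synchronized { offsets(key) = offset }

    val numOffsets: Gauge[Int] = registry.newGauge("NumOffsets", new Gauge[Int] {
      def value(): Int = offsets.synchronized { offsets.size }
    })
  }
}
```

With this shape, two managers created against the default registry still share the first manager's gauge, while a test that constructs `new Manager(new Registry)` gets a gauge bound to its own instance.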
@hachikuji, maybe you can review this simple PR.
Discussed with @cmccabe offline. Seems like we may have lost some of the updates during rebase since this seems to reflect an old version of the KIP.
…up rebalances in progress
Sorry about the mixup. This should be the latest version, which reflects all KIP discussion.
Thanks for the patch. Just a couple comments.
val numGroupsPreparingRebalanceGauge = recreateGauge("NumGroupsPreparingRebalance",
  new Gauge[Int] {
    def value(): Int = groupMetadataCache.values.count(group => {
      group synchronized { group.currentState == PreparingRebalance }
You can replace this with group.is(PreparingRebalance). Similarly for the others.
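For illustration, the gauge body with that substitution might look like the sketch below. `Group` is a hypothetical stand-in for GroupMetadata, and its `is` method is assumed to be a plain state comparison; the real method's definition is not shown in this thread.

```scala
object StateCheckSketch {
  sealed trait GroupState
  case object PreparingRebalance extends GroupState
  case object Stable extends GroupState

  // Hypothetical stand-in for GroupMetadata with an assumed `is` helper.
  final class Group(private var state: GroupState) {
    def currentState: GroupState = state
    def is(expected: GroupState): Boolean = state == expected
  }

  // Same counting logic as the gauge in the patch, with the explicit
  // comparison against currentState replaced by group.is(...).
  def numPreparingRebalance(groups: Iterable[Group]): Int =
    groups.count { group =>
      group synchronized { group.is(PreparingRebalance) }
    }
}
```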
newGauge("NumGroups",
val numGroupsGauge = recreateGauge("NumGroups",
I'm not sure these need to be fields. It's a little more work, but we can pull the metric instances out of the metric registry in the test case.
Also, should we update the test case to cover all the metrics?
Yeah, other tests usually just retrieve the data from the metrics registry as you said.
OK...
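The approach agreed in this thread, reading gauges back out of the registry by name instead of keeping them as fields, could be sketched roughly as follows. `Registry`, `register`, `allGauges`, and `gaugeValue` are hypothetical names for illustration; the actual tests would go through the metrics library's own registry.

```scala
import scala.collection.mutable

object LookupSketch {
  trait Gauge[T] { def value(): T }

  // Hypothetical registry that exposes its registered gauges by name.
  final class Registry {
    private val gauges = mutable.Map.empty[String, Gauge[_]]
    def register[T](name: String, gauge: Gauge[T]): Unit =
      synchronized { gauges(name) = gauge }
    def allGauges: Map[String, Gauge[_]] = synchronized { gauges.toMap }
  }

  // Test helper: look the gauge up by name and read its current value,
  // failing loudly if no gauge with that name was registered.
  def gaugeValue[T](registry: Registry, name: String): T =
    registry.allGauges
      .getOrElse(name, throw new NoSuchElementException(s"no gauge named $name"))
      .asInstanceOf[Gauge[T]]
      .value()
}
```

A helper like this also makes it cheap to cover every metric in the test, since each assertion only needs the metric name.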
import org.apache.kafka.common.internals.Topic

import scala.collection.JavaConverters._
import scala.collection._

class GroupMetadataManagerTest {
class GroupMetadataManagerTest extends Logging {
Seems like this is not needed.
retest this please
Looks like Jenkins is having some issues
Retest this please
@cmccabe The one test failure seems legitimate.
retest this please
@@ -82,19 +82,57 @@ class GroupMetadataManager(brokerId: Int,

  this.logIdent = s"[GroupMetadataManager brokerId=$brokerId] "

  newGauge("NumOffsets",
  private def recreateGauge[T](name: String, gauge: Gauge[T]): Gauge[T] = {
Not sure why we want to remove and then recreate the gauge here. Isn't this a one-time call for the lifetime?
LGTM. Merged to trunk.
…up rebalances in progress
Author: Colin P. Mccabe <cmccabe@confluent.io>
Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
Closes apache#3506 from cmccabe/KAFKA-5565