-
Notifications
You must be signed in to change notification settings - Fork 13.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-6320: move ZK metrics in KafkaHealthCheck to ZookeeperClient #4351
Conversation
Ping @ijuma @onurkaraman for review. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR, looks good overall, just a few minor questions/comments.
now = System.currentTimeMillis() | ||
} | ||
case e: Exception => | ||
debug("Error when recreating ZooKeeper", e) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would we want this to be info
? It seems like it should be rare, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
* This is added to preserve the original metric name in JMX | ||
*/ | ||
override def metricName(name: String, metricTags: scala.collection.Map[String, String]): MetricName = { | ||
explicitMetricName("kafka.server", "KafkaHealthcheck", name, metricTags) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: maybe we can take a metricGroup
and metricTypeName
as parameters in the constructor to keep ZooKeeperClient
generic.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. Added.
error("Auth failed.") | ||
stateChangeHandlers.foreach {case (name, handler) => handler.onAuthFailure()} | ||
} else if (event.getState == KeeperState.Expired) { | ||
} else if (state == KeeperState.Expired) { | ||
inWriteLock(initializationLock) { | ||
info("Session expired.") | ||
stateChangeHandlers.foreach {case (name, handler) => handler.beforeInitializingSession()} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: we can use values.foreach
to simplify this. There are a few other cases where we do something similar in this method and initialize
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
assertEquals(s"Unexpected ZK state ${server.zkUtils.zkConnection.getZookeeperState}", | ||
"CONNECTED", yammerMetricValue("SessionState")) | ||
// Latency is rounded to milliseconds, so check the count instead. | ||
val initialCount = yammerHistogramCount("kafka.server:type=ZooKeeperClientMetrics,name=ZooKeeperRequestLatencyMs").asInstanceOf[Long] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So, we don't actually need to wait until true here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right, since the ZK write in between is blocking and is guaranteed to update the count in the histogram.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, I had missed that you're using the count now. Makes sense.
@@ -20,7 +20,7 @@ import java.nio.charset.StandardCharsets | |||
|
|||
import kafka.common.KafkaException | |||
import kafka.utils.{Json, Logging, ZkUtils} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unused ZKUtils import
} | ||
|
||
/** | ||
* Generate a borker id by updating the broker sequence id path in ZK and return the version of the path. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: typo "borker"
@@ -31,6 +32,7 @@ import org.apache.kafka.common.serialization.{IntegerSerializer, StringSerialize | |||
import org.I0Itec.zkclient.exception.ZkException |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unused import
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR. LGTM. Will submit a separate PR for the comments below.
@@ -455,9 +439,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP | |||
|
|||
// Get the current controller info. This is to ensure we use the most recent info to issue the | |||
// controlled shutdown request | |||
val controllerId = zkUtils.getController() | |||
val controllerId = zkClient.getControllerId.getOrElse(throw new KafkaException("Controller doesn't exist")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we ignore the missing controller and retry (like we ignore the missing broker info)?
TopicsZNode.path, | ||
ConfigEntityChangeNotificationZNode.path, | ||
ConfigEntityTypeZNode.path(ConfigType.Topic), | ||
ConfigEntityTypeZNode.path(ConfigType.Client), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't we do this for Broker
and User
as well?
time: Time) extends Logging { | ||
time: Time, | ||
metricGroup: String = "kafka.server", | ||
metricType: String = "KafkaHealthcheck") extends Logging with KafkaMetricsGroup { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it would be better not to have the default at all.
state -> newMeter(name, eventType.toLowerCase(Locale.ROOT), TimeUnit.SECONDS) | ||
} | ||
} | ||
|
||
info(s"Initializing a new session to $connectString.") | ||
@volatile private var zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZooKeeperClientWatcher) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Worth adding a comment stating that we intend to fail-fast during construction, but that we retry forever if this is reinitialised.
waitUntilConnected(connectionTimeoutMs, TimeUnit.MILLISECONDS) | ||
|
||
|
||
/** | ||
* This is added to preserve the original metric name in JMX |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since we're receiving this as a parameter now, we should tweak this comment.
} | ||
} | ||
info(s"Timed out waiting for connection during session initialization while in state: ${zooKeeper.getState}") | ||
stateChangeHandlers.foreach {case (name, handler) => handler.onReconnectionTimeout()} | ||
stateChangeHandlers.values.foreach(_.onReconnectionTimeout()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess this code will never run now?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. We can just remove the onReconnectionTimeout() callback.
assertEquals(s"Unexpected ZK state ${server.zkUtils.zkConnection.getZookeeperState}", | ||
"CONNECTED", yammerMetricValue("SessionState")) | ||
// Latency is rounded to milliseconds, so check the count instead. | ||
val initialCount = yammerHistogramCount("kafka.server:type=ZooKeeperClientMetrics,name=ZooKeeperRequestLatencyMs").asInstanceOf[Long] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, I had missed that you're using the count now. Makes sense.
@@ -276,6 +274,16 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup { | |||
} | |||
} | |||
|
|||
private def yammerHistogramCount(name: String): Any = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can return a Long
here.
val clusterId = CoreUtils.generateUuidAsBase64 | ||
|
||
zkClient.createOrGetClusterId(clusterId) | ||
assertEquals(clusterId, zkClient.getClusterId.getOrElse(fail("No cluster id found"))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: we can assert that that the cluster id is Some(clusterId)
.
zkClient.createTopLevelPaths() | ||
|
||
ZkData.PersistentZkPaths.foreach { | ||
path => assertTrue(zkClient.pathExists(path)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: path is usually in the previous line.
Committer Checklist (excluded from commit message)