-
Notifications
You must be signed in to change notification settings - Fork 13.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-6074 Use ZookeeperClient in ReplicaManager and Partition #4254
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@tedyu : Thanks for the patch. A few comments below.
def createSequentialPersistentPath(path: String, data: String = ""): String = { | ||
val createRequest = CreateRequest(path, data.getBytes("UTF-8"), acls(path), CreateMode.PERSISTENT_SEQUENTIAL) | ||
val createResponse = retryRequestUntilConnected(createRequest) | ||
if (createResponse.resultCode != Code.OK) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't really need the if statement.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see similar code in createConfigChangeNotification() on line 272.
Can you clarify how the if statement can be skipped ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We just need createResponse.resultException.foreach(e => throw e) since createResponse.resultException is None only when createResponse.resultCode is OK.
Mani will have a separate PR to make existing usage consistent.
createResponse.path | ||
} | ||
|
||
def leaderAndIsrZkData(leaderAndIsr: LeaderAndIsr, controllerEpoch: Int): String = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method doesn't access ZK. So, it should be in ZkData.scala.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I looked at ZkData.scala where there are bunch of objects.
Not sure whether this method fits there.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could just add a new LeaderAndIsr object and put the method there.
* Read the entity (topic, broker, client, user or <user, client>) config (if any) from zk | ||
* sanitizedEntityName is <topic>, <broker>, <client-id>, <user> or <user>/clients/<client-id>. | ||
*/ | ||
def fetchEntityConfigZkClient(zkClient: KafkaZkClient, rootEntityType: String, sanitizedEntityName: String): Properties = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This already exists in AdminZkClient.fetchEntityConfig().
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That method doesn't take KafkaZkClient
Let me take another look.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
AdminZkClient.fetchEntityConfig() take zkUtils.
I want to see if KafkaZkClient can be used.
Trying to figure out how to make ReplicaManagerTest pass.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
AdminZkClient
takes KafkaZkClient
in the constructor.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pardon.
AdminZkClient is not referenced by Partition or ReplicaManagerTest.
Do you mind explaining a bit more ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As Jun said, there's no need to add this method. AdminZkClient.fetchEntityConfig
should be used instead. You said that the method did not take a KafkaZkClient
, so I explained that the constructor of the class did. Can you clarify what is unclear?
try { | ||
val writtenLeaderAndIsrInfo = zkUtils.readDataMaybeNull(path) | ||
val writtenLeaderAndIsrInfo = zkClient.readDataMaybeNull(path) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can be val (writtenLeaderOpt, writtenStat) = zkClient.readDataMaybeNull(path)
import scala.collection.Map | ||
|
||
object LogDirUtils extends Logging { | ||
|
||
private val LogDirEventNotificationPrefix = "log_dir_event_" | ||
val LogDirFailureEvent = 1 | ||
|
||
def propagateLogDirEvent(zkUtils: ZkUtils, brokerId: Int) { | ||
val logDirEventNotificationPath: String = zkUtils.createSequentialPersistentPath( | ||
def propagateLogDirEvent(zkClient: KafkaZkClient, brokerId: Int) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps we could delete the other two methods in this class since they don't seem to be used.
@@ -49,6 +49,32 @@ import scala.collection.{Seq, mutable} | |||
class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends Logging { | |||
import KafkaZkClient._ | |||
|
|||
def createSequentialPersistentPath(path: String, data: String = ""): String = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we add a unit test for the new methods added in this class in KafkaZkClientTest?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems like this still needs to be addressed.
I am thinking of passing zkClient to TopicPartition ctor.
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR, a couple of comments.
createResponse.path | ||
} | ||
|
||
def readDataMaybeNull(path: String): (Option[String], Stat) = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks similar to getDataAndStat
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are you going to remove this? You can just use getDataAndStat
.
@@ -30,6 +30,13 @@ import org.apache.zookeeper.data.Stat | |||
|
|||
// This file contains objects for encoding/decoding data stored in ZooKeeper nodes (znodes). | |||
|
|||
object LeaderAndIsrZNode { | |||
def encode(leaderAndIsr: LeaderAndIsr, controllerEpoch: Int): String = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Other encode
methods return Array[Byte]
, why does this return a String
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method calls Json.encode().
The String return type was from zkUtils.leaderAndIsrZkData().
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The point is that we should aim to be consistent and return Array[Byte]
from all the encode
methods.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
String is expected for argument 2 of:
def conditionalUpdatePath(path: String, data: String, expectVersion: Int,
If return value is changed to Array[Byte], wouldn't that require converting the output from Json.encode() to Array[Byte] and back to String ?
Please confirm.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You should change data
in that method to be of type Array[Byte]
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The parameter meanders thru checker and lands as 1st parameter here:
private def parseLeaderAndIsr(leaderAndIsrStr: String, path: String, stat: Stat): Option[LeaderIsrAndControllerEpoch] = {
Json.parseFull(leaderAndIsrStr).flatMap { js =>
This would involve back and forth conversion which seems unnecessary.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That code can be changed to be Json.parseBytes
. You can take a look at the other code, it's all following the same pattern.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When making the change, I found I need to overcome the following:
Can you confirm this is desirable ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's desirable, but maybe we can leave it for another PR in that case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah. The scope is bigger than I thought.
@@ -55,7 +55,7 @@ class Partition(val topic: String, | |||
// Do not use replicaManager if this partition is ReplicaManager.OfflinePartition | |||
private val localBrokerId = if (!isOffline) replicaManager.config.brokerId else -1 | |||
private val logManager = if (!isOffline) replicaManager.logManager else null | |||
private val zkUtils = if (!isOffline) replicaManager.zkUtils else null | |||
private val zkUtils = if (!isOffline) replicaManager.zkClient else null | |||
// allReplicasMap includes both assigned replicas and the future replica if there is ongoing replica movement |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can create AdminZkClient instance and use here.
Similar usage for reference:
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/AdminManager.scala#L50
def propagateLogDirEvent(zkUtils: ZkUtils, brokerId: Int) { | ||
val logDirEventNotificationPath: String = zkUtils.createSequentialPersistentPath( | ||
def propagateLogDirEvent(zkClient: KafkaZkClient, brokerId: Int) { | ||
val logDirEventNotificationPath: String = zkClient.createSequentialPersistentPath( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we need to add this method to KafkaZkClient. This is similar imilar to KafkaZkClient.createAclChangeNotification method.
Related zk nodes/paths are : ZkData.LogDirEventNotificationZNode, ZkData.LogDirEventNotificationSequenceZNode
@@ -55,7 +56,7 @@ class Partition(val topic: String, | |||
// Do not use replicaManager if this partition is ReplicaManager.OfflinePartition | |||
private val localBrokerId = if (!isOffline) replicaManager.config.brokerId else -1 | |||
private val logManager = if (!isOffline) replicaManager.logManager else null | |||
private val zkUtils = if (!isOffline) replicaManager.zkUtils else null | |||
private val zkUtils = if (!isOffline) replicaManager.zkClient else null |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be renamed to zkClient
@@ -18,6 +18,7 @@ package kafka.admin | |||
|
|||
import java.util.Properties | |||
import kafka.utils.ZkUtils | |||
import kafka.zk.KafkaZkClient | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unused import?
@@ -35,6 +36,7 @@ class HighwatermarkPersistenceTest { | |||
val configs = TestUtils.createBrokerConfigs(2, TestUtils.MockZkConnect).map(KafkaConfig.fromProps) | |||
val topic = "foo" | |||
val zkUtils = EasyMock.createMock(classOf[ZkUtils]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need zkUtils now?
@@ -149,6 +150,7 @@ class ReplicaManagerQuotasTest { | |||
|
|||
def setUpMocks(fetchInfo: Seq[(TopicPartition, PartitionData)], record: SimpleRecord = this.record, bothReplicasInSync: Boolean = false) { | |||
val zkUtils = createNiceMock(classOf[ZkUtils]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need zkUtils now?
@@ -49,10 +50,14 @@ class ReplicaManagerTest { | |||
val metrics = new Metrics | |||
var zkClient: ZkClient = _ | |||
var zkUtils: ZkUtils = _ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need zkUtils now?
@@ -70,6 +71,7 @@ class SimpleFetchTest { | |||
def setUp() { | |||
// create nice mock since we don't particularly care about zkclient calls | |||
val zkUtils = EasyMock.createNiceMock(classOf[ZkUtils]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same here
Thanks for the reviews. Update coming soon. I would be out of town next week. So trying to get some work done. |
Failure of SaslSslAdminClientIntegrationTest.testLogStartOffsetCheckpoint was not related to the PR. |
@@ -21,6 +21,7 @@ import kafka.log.LogConfig | |||
import kafka.server.{ConfigEntityName, ConfigType, DynamicConfig} | |||
import kafka.utils._ | |||
import kafka.utils.ZkUtils._ | |||
import kafka.zk.KafkaZkClient |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unused import?
@@ -26,6 +26,7 @@ import kafka.cluster.{BrokerEndPoint, Replica} | |||
import kafka.log.LogConfig |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unused import?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LogConfig is used
Json.encode(Map("version" -> LogDirEventNotificationHandler.Version, "broker" -> brokerId, "event" -> LogDirFailureEvent)) | ||
} | ||
|
||
def propagateLogDirEvent(brokerId: Int) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hmm..This method should be like createConfigChangeNotification, createAclChangeNotification methods.
and should use existing ZkData.LogDirEventNotificationSequenceZNode object.
|
||
def propagateLogDirEvent(brokerId: Int) { | ||
val logDirEventNotificationPath: String = createSequentialPersistentPath( | ||
ZkUtils.LogDirEventNotificationPath + "/log_dir_event_", logDirFailureEventZkData(brokerId)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should use LogDirEventNotificationZNode.path, LogDirEventNotificationSequenceZNode.SequenceNumberPrefix constants here
@@ -933,6 +953,17 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends | |||
createResponse.resultException.foreach(e => throw e) | |||
} | |||
|
|||
private def logDirFailureEventZkData(brokerId: Int): String = { | |||
val LogDirFailureEvent = 1 | |||
Json.encode(Map("version" -> LogDirEventNotificationHandler.Version, "broker" -> brokerId, "event" -> LogDirFailureEvent)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is already available in ZkData.LogDirEventNotificationSequenceZNode.encode method
Java 8 build failed with:
Not related to my PR. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM from a correctness perspective, but it's missing a test and can be simplified. Will merge to trunk and submit a follow-up.
@@ -1159,7 +1174,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends | |||
} | |||
} | |||
|
|||
private[zk] def pathExists(path: String): Boolean = { | |||
def pathExists(path: String): Boolean = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can be reverted.
Replace ZkUtils with KafkaZkClient in ReplicaManager and Partition
Utilize existing unit tests
Committer Checklist (excluded from commit message)