-
Notifications
You must be signed in to change notification settings - Fork 13.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-6074 Use ZookeeperClient in ReplicaManager and Partition #4166
Conversation
See if changes in ReassignPartitionsCommand are on right track. Alternative is to use ZkUtils and KafkaZkClient in ReassignPartitionsCommand. |
I suggest to change ReplicaManager.scala, LogDirUtils.scala, ReplicationUtils.scala classes as part of this PR. ReassignPartitionsCommand can be done in another JIRA/PR. I am doing few changes to AdminUtils as part of KAFKA-5646. We can reuse those changes for Admin Operations/KAFKA-5647. |
FAILURE |
2 similar comments
FAILURE |
FAILURE |
FAILURE |
2 similar comments
FAILURE |
FAILURE |
40 tests extend ZooKeeperTestHarness where {{var zkUtils: ZkUtils}} is declared. Suggestion on how to phase out ZkUtils is welcome. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@tedyu : Thanks for the patch. A few comments below. Also, your PR somehow shows "unknown repository".
def createSequentialPersistentPath(path: String, data: String = ""): String = { | ||
val createRequest = CreateRequest(path, data.getBytes("UTF-8"), acls(path), CreateMode.PERSISTENT_SEQUENTIAL) | ||
val createResponse = retryRequestUntilConnected(createRequest) | ||
createResponse.path |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should throw an exception if the result code is not OK.
def leaderAndIsrZkData(leaderAndIsr: LeaderAndIsr, controllerEpoch: Int): String = { | ||
Json.encode(Map("version" -> 1, "leader" -> leaderAndIsr.leader, "leader_epoch" -> leaderAndIsr.leaderEpoch, | ||
"controller_epoch" -> controllerEpoch, "isr" -> leaderAndIsr.isr)) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This doesn't access zookeeperClient and should be moved to ZkData.scala.
} else { | ||
val data = new String(getDataResponse.data, UTF_8) | ||
(Some(data), getDataResponse.stat) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should throw an exception in the result code is not OK.
"controller_epoch" -> controllerEpoch, "isr" -> leaderAndIsr.isr)) | ||
} | ||
|
||
def readDataMaybeNull(path: String): (Option[String], Stat) = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
With this, methods like getPreferredReplicaElection() and getControllerId, etc could potentially be simplified by reusing this method.
@@ -48,6 +48,29 @@ import scala.collection.mutable.ArrayBuffer | |||
class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends Logging { | |||
import KafkaZkClient._ | |||
|
|||
def createSequentialPersistentPath(path: String, data: String = ""): String = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we add a unit test for each new public method introduced in this class?
@junrao I was waiting for some advice on how the related tests should be handled so that compilation passes. |
@tedyu I am migrating few utility methods as part of KAFKA-5646 and 5647. |
@tedyu : KAFKA-5646 is now committed. You can rebase your patch now. |
No description provided.