-
Notifications
You must be signed in to change notification settings - Fork 13.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-6074 Use ZookeeperClient in ReplicaManager and Partition #4124
Conversation
In zkUtils.conditionalUpdatePersistentPath():
Currently there is no writeData in KafkaControllerZkUtils. See if what I have so far is on right track. |
More compilation errors to tackle: |
@@ -361,6 +365,7 @@ sealed trait AsyncResponse { | |||
} | |||
case class CreateResponse(resultCode: Code, path: String, ctx: Option[Any], name: String) extends AsyncResponse | |||
case class DeleteResponse(resultCode: Code, path: String, ctx: Option[Any]) extends AsyncResponse | |||
case class WriteDataReturnStatResponse(resultCode: Code, path: String, ctx: Option[Any], stat: Stat) extends AsyncResponse |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we can use SetDataResponse for writing right? Pls check #4126
Just a heads up: I consciously avoided copying existing util methods from ZkUtils over to KafkaControllerZkUtils. |
I will wait till #4126 gets merged (for code reuse) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@tedyu : Thanks for the patch. A couple of comments below.
def createSequentialPersistentPath(path: String, data: String = ""): String = { | ||
val createRequest = CreateRequest(path, data.getBytes("UTF-8"), acls(path), CreateMode.PERSISTENT_SEQUENTIAL) | ||
val createResponse = retryRequestUntilConnected(createRequest) | ||
createResponse.path |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We want to throw an exception if the rc in createResponse is not Code.OK.
@@ -98,6 +100,11 @@ class KafkaControllerZkUtils(zookeeperClient: ZookeeperClient, isSecure: Boolean | |||
retryRequestUntilConnected(createRequest) | |||
} | |||
|
|||
def leaderAndIsrZkData(leaderAndIsr: LeaderAndIsr, controllerEpoch: Int): String = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This probably should be in ZkData.scala.
Thanks for the review, Jun. Planning to rebase once #4126 goes in. |
This was superseded by #4254 |
No description provided.