-
Notifications
You must be signed in to change notification settings - Fork 13.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-7165: Retry the BrokerInfo registration into ZooKeeper #5575
Conversation
@jonathansantilli : Thanks for the patch. About the approach. I am not exactly sure about the impact of explicitly deleting an ephemeral node whose session is about to expire. For example, I am not sure if the following can happen: (1) ZK session 1 expires and the ZK leader changes; (2) new ZK leader retains the ephemeral node from session 1 and is about to expire session 1; (3) ZK session 2 deletes and recreates the ephemeral node; (4) ZK leader expires session 1 and deletes the ephemeral node created by session 2. If this can happen, it will create a weird problem in Kafka. An alternative approach is to retry the creation of the ephemeral node up to sth like twice the session timeout. It may take a bit long for the broker to be re-registered. However, it seems it's a bit safer and simpler, until ZOOKEEPER-2985 is fixed. |
Hello @junrao thanks a lot for your comment. Do you mind If I move the conversation to the JIRA issue and there we can elaborate your alternative approach? I would like to continue working on this. |
Hello @junrao, I would like to continue working on this bug, hope you can have some time to elaborate a little bit more your proposal here: https://issues.apache.org/jira/browse/KAFKA-7165?focusedCommentId=16661931&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16661931 Cheers! |
22fc1a0
to
5ffc8a7
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jonathansantilli : On second thought, your current patch could also work as a temporary fix. Added a few more comments below.
* is created and the subsequent updates of the session id, it is possible that the session id change | ||
* over the time for 'Session expired', for instance. | ||
* */ | ||
private var previousZooKeeperSessionId = zooKeeperClient.sessionId |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The following is the comment from ZooKeeper.getSessionId(). Since ZK connects to the server asynchronously, the sessionId is not guaranteed to be available until the session is established.
/**
* The session id for this ZooKeeper client instance. The value returned is
* not valid until the client connects to a server and may change after a
* re-connect.
*
* This method is NOT thread safe
*
* @return current session id
*/
public long getSessionId() {
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, this could be a problem at the moment the session Id is collected for the first time. I will check where is the best place to assign it.
Thanks for the heads up.
getDataResponse.resultCode match { | ||
case Code.OK if getDataResponse.stat.getEphemeralOwner != zooKeeperClient.sessionId => | ||
case Code.OK if shouldReCreateEphemeralZNode(ephemeralOwnerId) => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we add some comments why we need this and that we want to remove this once ZOOKEEPER-2985 is fixed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, is definitely needed, I will add a comment with the appropriated references.
Once again, thanks for the heads up.
* */ | ||
private var previousZooKeeperSessionId = zooKeeperClient.sessionId | ||
// Only for testing, to simulate the ZooKeeper session id has changed | ||
private var currentZooKeeperSessionId: Long = -1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we really need this? Could we just create a subclass of KafkaZkClient and overwrite the return value of shouldReCreateEphemeralZNode() in the test?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As you pointed out, this is a momentaneous workaround to tackle the issue. I have thought on a more elegant way to do it, as you propose. Probably am opting for the simples but not elegant solution for this. The rationale behind my proposal is:
- Keeping everything in the same place/file (now with the comments you suggested is better), in this way, the workaround is framed to a single place.
- Once the Zookeeper issue is addressed properly, we just have to come to this file/class and delete this code.
Having said that, am more than happy to change it if you think is better :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I am mostly after simplicity. The patch as it is seems a bit hard for me to read. Part of that is to have to understand the meaning of 2 separate ZK session ids and when they are updated. I thought maintaining just one session id var could be simpler. With regard to keeping things in one place, I guess in either case, we have to change the test case when this logic is eventually removed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it makes sense, am changing the code to achieve that. 👍
5ffc8a7
to
15c6795
Compare
Hello @junrao I have done the suggested changes, please take a look when suitable for you 😄 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jonathansantilli : Thanks for the updated patch. A few more comments below.
@@ -67,6 +67,12 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean | |||
// Only for testing | |||
private[kafka] def currentZooKeeper: ZooKeeper = zooKeeperClient.currentZooKeeper | |||
|
|||
// This variable holds the Zookeeper session id at the moment a Broker gets registered in Zookeeper and the subsequent | |||
// updates of the session id. It is possible that the session id change over the time for 'Session expired'. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
change -> changes
// This variable holds the Zookeeper session id at the moment a Broker gets registered in Zookeeper and the subsequent | ||
// updates of the session id. It is possible that the session id change over the time for 'Session expired'. | ||
// This code is part of the work around done in the KAFKA-7165, once ZOOKEEPER-2985 is complete, this code must | ||
// be deleted, if by the time you are reading this the code is still present and the bug solve, please delete it. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We probably don't need the remaining part starting with "if by the time". Ditto below.
case Code.OK if getDataResponse.stat.getEphemeralOwner != zooKeeperClient.sessionId => | ||
// At this point, the Zookeeper session could be different (due a 'Session expired') from the one that initially | ||
// registered the Broker into the Zookeeper ephemeral node, but the znode is still present in ZooKeeper. | ||
// The expected behaviour is that Zookeeper removes the ephemeral node associated with the expired session but |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Zookeeper removes => Zookeeper server removes
// That is, the fist attempt to register the Broker, we save a reference of the used Zookeeper session id. | ||
// This is done here since the Zookeeper session id may not be available at the Object creation time, | ||
// hence, this is assuming the 'retryRequestUntilConnected' method got connected and a valid session id is present. | ||
setCurrentZookeeperSessionId() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, do we need both setCurrentZookeeperSessionId() and updateCurrentZKSessionId()? To me, it seems that we can just initialize currentZooKeeperSessionId to -1 and set currentZooKeeperSessionId
in checkedEphemeralCreate(path: String, data: Array[Byte]) if the return code is ok.
val deleteResponse = retryRequestUntilConnected(deleteRequest) | ||
deleteResponse.resultCode match { | ||
case code@ Code.OK => code | ||
case code => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems that if code is NoNode, we should just let it go.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@junrao sorry, but I do not get this comment.
Is true (after you point this out) that at the moment we try to delete the ephemeral znode, it could not be present anymore an a NoNode
response will be received, and because we are just checking if (codeAfterDelete == Code.OK)
the created()
call will not be performed, hence, the Broker will be not registered. Is that what you meant?
If so, will be adding if (codeAfterDelete == Code.OK || codeAfterDelete == Code.NONODE)
?
Hope you can help me to understand your point.
Cheers!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Or this?
deleteResponse.resultCode match {
case code@ Code.OK => code
> case code@ Code.NONODE => code
case code =>
error(s"Error while deleting ephemeral node at $path with return code: $code")
code
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it seems that if the node doesn't exist (ZK server could have deleted), we should still do the create() instead of returning an error code.
private def reCreate(): Code = { | ||
val codeAfterDelete = delete() | ||
var codeAfterReCreate = codeAfterDelete | ||
info(s"Result of znode ephemeral deletion at $path is: $codeAfterDelete") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems this should be debug level logging?
info(s"Result of znode ephemeral deletion at $path is: $codeAfterDelete") | ||
if (codeAfterDelete == Code.OK) { | ||
codeAfterReCreate = create() | ||
info(s"Result of znode ephemeral re-creation at $path is: $codeAfterReCreate") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems this should be debug level logging?
// This code is part of the work around done in the KAFKA-7165, once ZOOKEEPER-2985 is complete, this code must | ||
// be deleted, if by the time you are reading this the code is still present and the bug solve, please delete it. | ||
case Code.OK if shouldReCreateEphemeralZNode(ephemeralOwnerId) => | ||
warn(s"Error while creating ephemeral at $path, node already exists and owner " + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be info instead of warn? We probably want to void the word Error in an info logging.
15c6795
to
2b30a71
Compare
Hello @junrao I have done the suggested changes, please take a look. Cheers! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jonathansantilli : Thanks for the updated patch. Just a couple of more comments below.
val deleteRequest = DeleteRequest(path, ZkVersion.MatchAnyVersion) | ||
val deleteResponse = retryRequestUntilConnected(deleteRequest) | ||
deleteResponse.resultCode match { | ||
case code@ Code.OK => code |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we just move the call to updateCurrentZKSessionId() here? This way, there will be a single place where updateCurrentZKSessionId() will be called.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmmmm, where exactly @junrao?
We are setting the currentZooKeeperSessionId
at the end of private def checkedEphemeralCreate(path: String, data: Array[Byte])
method, this ensures that the first time the broker tries to registered we have the session Id used.
If we do not do that, the method isZKSessionTheEphemeralOwner
will return false
(since -1 != zooKeeperClient.sessionId
) and will not try to recreate the ephemeral node.
Then we are setting currentZooKeeperSessionId
again if the condition case Code.OK if shouldReCreateEphemeralZNode(ephemeralOwnerId) =>
applies, becase this will update the currentZooKeeperSessionId
with the newest session, useful when trying to recreate the ephemeral node.
Am pretty sure am missing something since you point this out, but I can not see it, hope you can give me a clue :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, I didn't make it clear. Basically, we want to set the session id after a successful creation of the ephemeral path. I was thinking that we can call updateCurrentZKSessionId() after line 1667 in CheckedEphemeral.create() when the return code is ok. Then, we don't need to call updateCurrentZKSessionId() anywhere else.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
expiredSessionZkClient.registerBroker(expiredBrokerInfo) | ||
|
||
// The broker info should be the same, no error should be raised | ||
assertEquals(Some(expiredBrokerInfo.broker), expiredSessionZkClient.getBroker(brokerId)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we further verify that the czxid of the ephemeral node is different?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, thanks, good idea, doing it.
2b30a71
to
f7f0cb1
Compare
Hello @junrao, I have done the suggested changes, please take a look. Cheers! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jonathansantilli : Thanks for the new update. A couple of more comments below.
// This is assuming the 'retryRequestUntilConnected' method got connected and a valid session id is present. | ||
// This code is part of the work around done in the KAFKA-7165, once ZOOKEEPER-2985 is complete, this code | ||
// must be deleted. | ||
updateCurrentZKSessionId(zooKeeperClient.sessionId) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, we need to update the ZK session id in the case below when getAfterNodeExists(0 returns true. So, perhaps we can save the code for all cases and call updateCurrentZKSessionId() if the saved code is OK.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you mean when getAfterNodeExists()
returns Code.OK
.
If I understood correctly, will be like:
var createResultCode = Code.OK
createResponse.resultCode match {
case code@ Code.OK =>
createResultCode = code
case Code.NODEEXISTS =>
createResultCode = getAfterNodeExists()
case code =>
error(s"Error while creating ephemeral at $path with return code: $code")
createResultCode = code
}
if (createResultCode == Code.OK) {
// At this point, we need to save a reference to the zookeeper session id.
// This is done here since the Zookeeper session id may not be available at the Object creation time.
// This is assuming the 'retryRequestUntilConnected' method got connected and a valid session id is present.
// This code is part of the workaround done in the KAFKA-7165, once ZOOKEEPER-2985 is complete, this code
// must be deleted.
updateCurrentZKSessionId(zooKeeperClient.sessionId)
}
createResultCode
What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, that's exactly what I was thinking.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cool @junrao, I have updated the code as suggested.
@@ -1585,7 +1591,7 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean | |||
|
|||
private def acls(path: String): Seq[ACL] = ZkData.defaultAcls(isSecure, path) | |||
|
|||
private def retryRequestUntilConnected[Req <: AsyncRequest](request: Req): Req#Response = { | |||
private[zk] def retryRequestUntilConnected[Req <: AsyncRequest](request: Req): Req#Response = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to expose package level visibility for this method?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is to take advantage of that method to perform the call to Zookeeper.
I can move the method getPathCzxid
present in the class ExpiredKafkaZkClient
from the test, to the class KafkaZkClient
(but not useful in that class KafkaZkClient
).
Or I can add more code just to make the call to Zookeeper to get the czxid
in the class ExpiredKafkaZkClient
What do you think?
f7f0cb1
to
191d5ae
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jonathansantilli : Thanks for the new patch. LGTM. Just a few more minor comments below.
case code => | ||
error(s"Error while creating ephemeral at $path with return code: $code") | ||
createResultCode = code | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps the code can be simplified a bit as the following?
val createResultCode =
createResponse.resultCode match {
case code@ Code.OK =>
code
case Code.NODEEXISTS =>
getAfterNodeExists()
case code =>
error(s"Error while creating ephemeral at $path with return code: $code")
code
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it definitely is simpler and allow use to use val
instead of var
.
Doing it.
@@ -58,21 +58,26 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { | |||
val topicPartitions10_11 = Seq(topicPartition10, topicPartition11) | |||
val controllerEpochZkVersion = 0 | |||
|
|||
var otherZkClient: KafkaZkClient = _ | |||
var otherZkClient : KafkaZkClient = _ | |||
var expiredSessionZkClient : ExpiredKafkaZkClient = _ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The convention seems to be no space before ":" in both line 61 and 62.
override def shouldReCreateEphemeralZNode(ephemeralOwnerId: Long): Boolean = { | ||
true | ||
} | ||
def getPathCzxid(path: String): Long = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we add a new line above?
In case the ZooKeeper session has been regenerated and the broker tries to register the BrokerInfo into Zookeeper, this code deletes the current BrokerInfo from Zookeeper and creates it again, just if the znode ephemeral owner belongs to the Broker which tries to register himself again into ZooKeeper
191d5ae
to
b0e2df5
Compare
Hello @junrao, I have added the changes as suggested. |
@junrao please let me know what's the process to merge this PR, please :) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jonathansantilli : Thanks for the latest patch. LGTM
Good day all, This is not yet included in any release, right? How to work around it? |
Hello @pabloa502 although this has been included into the trunk branch, is not yet released. |
…per (apache#5575) * Add logic to retry the BrokerInfo registration into ZooKeeper In case the ZooKeeper session has been regenerated and the broker tries to register the BrokerInfo into Zookeeper, this code deletes the current BrokerInfo from Zookeeper and creates it again, just if the znode ephemeral owner belongs to the Broker which tries to register himself again into ZooKeeper * Add test to validate the BrokerInfo re-registration into ZooKeeper
…5575) * Add logic to retry the BrokerInfo registration into ZooKeeper In case the ZooKeeper session has been regenerated and the broker tries to register the BrokerInfo into Zookeeper, this code deletes the current BrokerInfo from Zookeeper and creates it again, just if the znode ephemeral owner belongs to the Broker which tries to register himself again into ZooKeeper * Add test to validate the BrokerInfo re-registration into ZooKeeper
The following is a proposal when the ZooKeeper session has changed and the Broker tries to register himself into ZooKeeper.
Currently, this throws a
NodeExistsException
since a Broker with the same id got registered previously into Zookeeper.Assuming Broker id = 1
NodeExistsException
will be generated.NodeExistsException
will be generated.Committer Checklist (excluded from commit message)