(TWILL-61) Fix to allow higher attempts to relaunch the app after the first attempt failed #67
chtyim
commented
Mar 9, 2018
- Delete the Kafka root ZK node for the application if it already exists
- Delete the AM instance ZK node if it already exists
- For the runnables' parent ZK node, it is not an error if it already exists
- Enhance the KafkaClient publisher / consumer to deal with Kafka cluster changes
- When the AM is killed and restarted, the embedded Kafka will be running on a different host and port
d596d4d to 14c38d4
I have a few comments,
public void onFailure(final Throwable createFailure) {
  if (!(createFailure instanceof KeeperException.NodeExistsException)) {
    resultFuture.setException(createFailure);
  }
do you need to return here?
That's right. Missed it.
    resultFuture.setException(createFailure);
  }

  // If the node already exists, it is due to previous run attempt that left an ephemeral node.
how can it leave an ephemeral node? Doesn't that mean there must be a zombie process holding on to that node?
An ephemeral node won't go away immediately when the process dies. It stays there until the ZK session times out, which typically takes multiple seconds. In the meantime, YARN may already have started the next AM process, so the new AM process will see the ephemeral node.
got it
  }
}, Threads.SAME_THREAD_EXECUTOR);

return resultFuture;
while this code appears correct (after addressing my comment), the three nested levels of callback make it almost impossible to read. Is there some way to unwind this?
Well, we do need that many levels of callback (create -> delete -> create) for the operation. Any suggestions on how to simplify it?
I can pull the common code between this and the ApplicationMasterMain class into a util function. But still, inside the util function, there would be three callbacks.
I was thinking it does not have to be async. Create the ZK node and get the future. If it succeeds, done. If not, delete the ZK node and get the future. If that fails, throw. Otherwise, try the create again. But maybe that would be equally complex?
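For illustration, the synchronous flow suggested in the comment above could look roughly like the sketch below. Everything in it is an assumption made for the example: `NodeClient`, `InMemoryClient`, and the local `NodeExistsException` are hypothetical stand-ins (not the Twill `ZKClient` or ZooKeeper's `KeeperException`), and `CompletableFuture` is swapped in for the futures used in the PR so the snippet needs only the JDK.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class SyncNodeRecreate {

  /** Hypothetical stand-in for KeeperException.NodeExistsException. */
  static final class NodeExistsException extends Exception {
    NodeExistsException(String path) {
      super("Node already exists: " + path);
    }
  }

  /** Hypothetical minimal client; the real Twill ZKClient has a richer API. */
  interface NodeClient {
    CompletableFuture<String> create(String path);
    CompletableFuture<String> delete(String path);
  }

  /** In-memory implementation used only to exercise the control flow. */
  static final class InMemoryClient implements NodeClient {
    final Set<String> nodes = new HashSet<>();
    final List<String> calls = new ArrayList<>();

    @Override
    public CompletableFuture<String> create(String path) {
      calls.add("create " + path);
      CompletableFuture<String> result = new CompletableFuture<>();
      if (nodes.add(path)) {
        result.complete(path);
      } else {
        result.completeExceptionally(new NodeExistsException(path));
      }
      return result;
    }

    @Override
    public CompletableFuture<String> delete(String path) {
      calls.add("delete " + path);
      nodes.remove(path);
      return CompletableFuture.completedFuture(path);
    }
  }

  /** Blocks on each step: create, and on "node exists" delete and create again. */
  static String createOrRecreate(NodeClient client, String path) {
    try {
      return client.create(path).join();
    } catch (CompletionException e) {
      if (!(e.getCause() instanceof NodeExistsException)) {
        throw e; // unrelated failure: give up
      }
    }
    client.delete(path).join();        // throws if the deletion fails
    return client.create(path).join(); // second and last attempt
  }
}
```

Calling `SyncNodeRecreate.createOrRecreate(client, "/app/kafka")` on a client that already holds the node issues create, delete, create in that order. The trade-off is that each `join()` blocks the calling thread, so this shape only fits call sites that are allowed to block.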
if (oldProducer != null) {
  oldProducer.close();
}

LOG.info("Update Kafka producer broker list: {}", newBrokerList);
if (newBrokerList.isEmpty()) {
  LOG.warn("Empty Kafka producer broker list, publish will fail.");
So when will this happen? If the AM dies (and its broker with it)?
Yes
public void onFailure(final Throwable createFailure) {
  if (!(createFailure instanceof KeeperException.NodeExistsException)) {
    completion.setException(createFailure);
  }
return here?
I've addressed the comments and fixed one more issue (on deletion failure, if the node does not exist, we can just go ahead and recreate the node instead of failing). Also refactored the callback code a bit to try to make it cleaner.
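As a rough illustration of the behavior described in this comment (create, delete on "node exists", recreate, and treat "no node" on deletion as harmless), here is a sketch with each step pulled into its own method. It is not the code from this PR: `NodeClient`, `InMemoryClient`, and the two exception classes are hypothetical stand-ins, and `CompletableFuture` replaces the futures and callbacks used in the PR so the example depends only on the JDK.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class AsyncNodeRecreate {

  /** Hypothetical stand-ins for the corresponding KeeperException subclasses. */
  static final class NodeExistsException extends Exception { }
  static final class NoNodeException extends Exception { }

  /** Hypothetical minimal client, not the Twill ZKClient interface. */
  interface NodeClient {
    CompletableFuture<String> create(String path);
    CompletableFuture<String> delete(String path);
  }

  /** In-memory client used only to exercise the control flow. */
  static final class InMemoryClient implements NodeClient {
    final Set<String> nodes = new HashSet<>();
    /** Simulates stale nodes that vanish right after a create collides with them. */
    final Set<String> expiring = new HashSet<>();

    @Override
    public CompletableFuture<String> create(String path) {
      if (expiring.remove(path) || !nodes.add(path)) {
        return failed(new NodeExistsException());
      }
      return CompletableFuture.completedFuture(path);
    }

    @Override
    public CompletableFuture<String> delete(String path) {
      if (!nodes.remove(path)) {
        return failed(new NoNodeException());
      }
      return CompletableFuture.completedFuture(path);
    }
  }

  static CompletableFuture<String> failed(Throwable cause) {
    CompletableFuture<String> future = new CompletableFuture<>();
    future.completeExceptionally(cause);
    return future;
  }

  /** Dependent stages wrap failures in CompletionException; strip that wrapper. */
  static Throwable unwrap(Throwable t) {
    return (t instanceof CompletionException && t.getCause() != null) ? t.getCause() : t;
  }

  /** Step 1: try to create the node. */
  static CompletableFuture<String> createOrRecreate(NodeClient client, String path) {
    return client.create(path)
        .handle((created, failure) -> afterCreate(client, path, created, failure))
        .thenCompose(next -> next);
  }

  /** Step 2: on "node exists", delete the stale node; fail on anything else. */
  static CompletableFuture<String> afterCreate(NodeClient client, String path,
                                               String created, Throwable failure) {
    if (failure == null) {
      return CompletableFuture.completedFuture(created);
    }
    if (!(unwrap(failure) instanceof NodeExistsException)) {
      return failed(unwrap(failure));
    }
    return client.delete(path)
        .handle((deleted, deleteFailure) -> afterDelete(client, path, deleteFailure))
        .thenCompose(next -> next);
  }

  /** Step 3: recreate, also when the delete found no node to remove. */
  static CompletableFuture<String> afterDelete(NodeClient client, String path,
                                               Throwable deleteFailure) {
    if (deleteFailure != null && !(unwrap(deleteFailure) instanceof NoNodeException)) {
      return failed(unwrap(deleteFailure));
    }
    return client.create(path);
  }
}
```

The `expiring` set exists only to simulate the race discussed earlier in this thread, where a stale ephemeral node disappears between the failed create and the delete.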
one minor comment, otherwise LGTM
@@ -170,45 +167,7 @@ private ApplicationKafkaService(ZKClient zkClient, String kafkaZKConnect) {
protected void startUp() throws Exception {
  // Create the ZK node for Kafka to use. If the node already exists, delete it to make sure there is
  // no left over content from previous AM attempt.
  final SettableOperationFuture<String> completion = SettableOperationFuture.create(kafkaZKPath,
                                                                                     Threads.SAME_THREAD_EXECUTOR);
  LOG.info("Preparing Kafka ZK path {}{}", zkClient.getConnectString(), kafkaZKPath);
maybe keep the log message?
oh yeah, accidentally removed.
                       Iterable<ACL> acls, FutureCallback<String> callback) {
  Futures.addCallback(zkClient.create(path, data, createMode, createParent, acls),
                      callback, Threads.SAME_THREAD_EXECUTOR);
}
yes, easier to understand and extracted into separate methods. Like it much better now
07c418c to 27db864
… first attempt failed
- Delete the Kafka root zk node for the application if already exist
- Delete the AM instance zk node if already exist
- For runnables parent zk node, it is not an error if it already exist
- Enhance KafkaClient publisher / consumer to deal with Kafka cluster changes
- When AM killed and restarted, the embedded Kafka will be running in different host and port

This closes #67 on Github.

Signed-off-by: Terence Yim <chtyim@apache.org>