Restorable indexing tasks #1881
Conversation
Force-pushed from b21cc5b to a9d5f25.
public void run()
{
  persistLatch.countDown();
  committer.run();
Is the contract for persist that it will run in a non-daemon thread? Also, if the committer errors out, is there any special "oh crap" message that needs to fly?
I believe it runs in a daemon thread, but that's okay because there's a non-daemon thread (the shutdown hook) which is waiting for it to finish.
If the committer errors out then that would get logged by the plumber.
Do we need to interchange persistLatch.countDown() and committer.run()? I believe persistLatch.await() should wait for the task to really finish.
Yes, this is a good point. I was thinking that the persistLatch needs to count down even if committer.run() throws an exception, but we can address this with a try/catch.
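A minimal sketch of that fix, assuming the committer and persistLatch from the snippet above (the wrapper class itself is hypothetical). Running the committer first and counting down in a finally block gives both properties: await() only returns once the commit has actually finished, and a committer failure cannot leave the latch hanging.

import java.util.concurrent.CountDownLatch;

final class CountingCommitter implements Runnable
{
  private final Runnable committer;
  private final CountDownLatch persistLatch;

  CountingCommitter(Runnable committer, CountDownLatch persistLatch)
  {
    this.committer = committer;
    this.persistLatch = persistLatch;
  }

  @Override
  public void run()
  {
    try {
      // Commit first, so anyone blocked on persistLatch.await() sees a
      // finished commit rather than one that is still in flight.
      committer.run();
    }
    finally {
      // Count down even if the committer throws, so shutdown cannot hang.
      persistLatch.countDown();
    }
  }
}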
  plumber.finishJob();
} else {
  log.info("Persisting pending data without handoff, in preparation for restart.");
  final Committer committer = committerSupplier.get();
Here the firehose is already closed while doing shutdown. Is it safe to call commit on a closed firehose?
It seems this might not work for KafkaEightFirehoseFactory too since the underlying connector is already closed?
@nishantmonu51 that's a good point; this does not really work very well with the kafka firehose. But then again, RealtimeIndexTasks never did work well with kafka…
Do you think it's worth reworking things so that this works well for both kafka and the event receiver? I think to do that we would want the following behavior:
- if you're using EventReceiverFirehose, stopGracefully causes the servlet to stop accepting new data, and the task will drain existing data, then stop.
- if you're using the Kafka firehose, stopGracefully causes the task to simply stop reading data, then persist/commit, then stop.
This could probably be accomplished somehow…
I agree things don't work very well with the kafka firehose at present. Also, until now we didn't have a concept of graceful restart; if we are going to provide that functionality, I think we should also look into how we can make it work with our current firehoses.
Also, for others in the community who might have written their own custom firehoses, a call to commit after a firehose has been shut down may be unexpected and might result in weird errors.
Both the behaviours for EventReceiverFirehose and KafkaFirehose seem good, and they point to needing a new API on the firehose where, instead of completely shutting it down, we ask the firehose to stop reading any further events, ingest whatever events might be in its buffers, persist and call commit on the firehose, then shut it down and release any resources it is holding.
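As a rough illustration only, the lifecycle being proposed here could look something like the interface below. The stopReading() method and its semantics are assumptions for the sketch, not part of the existing Firehose interface.

import java.io.Closeable;

// Hypothetical firehose lifecycle sketch: stop pulling new events, drain
// what is already buffered via hasMore()/nextRow(), commit, then close.
interface StoppableFirehose extends Closeable
{
  boolean hasMore();

  Object nextRow(); // stands in for InputRow to keep the sketch self-contained

  Runnable commit();

  // New in this sketch: stop reading further events, but keep already-buffered
  // events available until they are drained.
  void stopReading();
}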
I also think some people do use the kafka firehose (with either partitioning or replication) and it works in those specific cases; with this change that will break.
@nishantmonu51 @himanshug I'm thinking of doing this by having special behavior triggered by an instanceof check for EventReceiverFirehose. There doesn't seem to be a nicer way to do it with the current firehose interface. Basically, the ERF would get closed and drained; all other firehoses would simply stop reading immediately, and for those we would rely on commit() being an effective way to get back undrained data.
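Roughly, that special-casing might look like the sketch below. The nested interfaces, the closeAndDrain() name, and the stopReading flag are stand-ins for the real Druid types, not the exact code in this PR.

import java.io.IOException;

class GracefulStopSketch
{
  interface Firehose
  {
    Runnable commit();
    void close() throws IOException;
  }

  interface EventReceiverFirehose extends Firehose
  {
    void closeAndDrain(); // hypothetical: stop accepting HTTP events, keep draining
  }

  private volatile Firehose firehose;
  private volatile boolean stopReading = false;

  void stopGracefully()
  {
    if (firehose instanceof EventReceiverFirehose) {
      // Push-based firehose: stop accepting new events, then let the task
      // drain whatever is already queued and finish normally.
      ((EventReceiverFirehose) firehose).closeAndDrain();
    } else {
      // Pull-based firehoses (e.g. Kafka): just stop reading and rely on
      // commit() to hand back any undrained data on the next run.
      stopReading = true;
    }
  }
}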
After these changes, what is the behaviour of task logs?
@nishantmonu51 they get uploaded to S3 on each restart, with new uploads overriding previous uploads. The FTR opens the log in "append" mode, so each upload will contain logs from all previous runs of the same task.
@nishantmonu51 I do think that in the future we should make the uploading happen in chunks rather than all at once, but this would be a different PR.
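For readers unfamiliar with the append-mode detail, opening the local task log for appending is the key bit, roughly as below; the method and path here are illustrative, not the actual ForkingTaskRunner code.

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;

class TaskLogSketch
{
  // With append = true a restarted task keeps writing to the same file, so
  // each upload contains the output of every prior run of the task.
  static OutputStream openTaskLog(File taskDir) throws IOException
  {
    return new FileOutputStream(new File(taskDir, "log"), true);
  }
}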
  taskRestoreInfo = jsonMapper.readValue(restoreFile, TaskRestoreInfo.class);
}
catch (Exception e) {
  log.warn(e, "Failed to restore tasks from file[%s]. Skipping restore.", restoreFile);
This should be an error: the existence of restoreFile should mean that there are valid tasks to restore, and for whatever reason reading the file failed.
sounds good to me.
Force-pushed from a9d5f25 to 0f6d71b.
@himanshug @nishantmonu51 @drcrallen pushed a new commit; the main change is in firehose-closing behavior. Also looking at adding some tests.
Hmm, I think there's a further problem. If you don't close the kafka firehose, and no new data is forthcoming, I believe it will block forever on hasMore. I don't think there's anything we can do about that right now, but it points to potentially wanting a timeout on hasMore or a poll-style Firehose interface.
What that means is that if you do try to stopGracefully a realtime task reading from a dry kafka firehose, it will likely time out and be killed after the gracefulShutdownTimeout (default 5 minutes).
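For illustration, a poll-style variant of hasMore could look like this; the interface is hypothetical and not something this PR adds.

import java.util.concurrent.TimeUnit;

// Hypothetical poll-style firehose: instead of hasMore() blocking forever on
// an idle topic, each wait is bounded so a shutdown loop can re-check its
// stop flag between polls.
interface PollableFirehose
{
  // Returns true if a row is available, false if none arrived within the timeout.
  boolean hasMore(long timeout, TimeUnit unit);

  Object nextRow(); // stands in for InputRow
}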
@@ -62,6 +63,8 @@
import java.io.IOException;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
unused?
thanks, yes, removed.
log.info("Acquired lock file[%s] in %,dms.", taskLockFile, System.currentTimeMillis() - startLocking); | ||
} | ||
} else { | ||
throw new ISE("Already started!"); |
Can this ever happen, or is it just to catch start() being called twice (or more)?
Just to catch if start() is called more than once, which it should not be.
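The pattern being described is essentially a start-once guard, roughly like the generic sketch below. ISE in the snippet above is Druid's IllegalStateException variant; this sketch is not the actual ExecutorLifecycle code.

import java.util.concurrent.atomic.AtomicBoolean;

class StartOnce
{
  private final AtomicBoolean started = new AtomicBoolean(false);

  void start()
  {
    // Defensive check: start() is only expected to run once, so a second call
    // indicates a wiring bug and should fail fast.
    if (!started.compareAndSet(false, true)) {
      throw new IllegalStateException("Already started!");
    }
    // ... acquire the task lock file, bootstrap the task, etc.
  }
}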
Force-pushed from 0f6d71b to f44ccb8.
@himanshug @nishantmonu51 @drcrallen @guobingkun pushed updates for the outstanding comments, plus some unit tests for restoring realtime tasks.
Force-pushed from f44ccb8 to 9ce5222.
log.info("Starting graceful shutdown of task[%s].", task.getId()); | ||
|
||
try { | ||
task.stopGracefully(); |
Can we emit a metric about this?
It would be nice to have a way to keep track of how many graceful stops we do
Sure
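As a sketch of the bookkeeping only (the metric name and emitter wiring here are assumptions, not the metric that was actually added):

import java.util.concurrent.atomic.AtomicLong;

class GracefulStopMetrics
{
  private final AtomicLong gracefulStops = new AtomicLong();

  void onGracefulStop(String taskId)
  {
    long total = gracefulStops.incrementAndGet();
    // In the real runner this is where a metric (e.g. a hypothetical
    // "task/graceful/count") would be emitted via the service emitter.
    System.out.printf("Graceful stop of task[%s], total so far: %d%n", taskId, total);
  }
}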
Force-pushed from e899be4 to d1dfe8e.
Force-pushed from d1dfe8e to 501dcb4.
@drcrallen @himanshug updated the branch with changes from code review.
We've been using this internally for a while; it seems to work.
👍 with the changes too
It would be great to add some integration tests for this feature.
Some changes that make it possible to restart tasks on the same hardware.
This is done by killing and respawning the jvms rather than reconnecting to existing
jvms, for a couple reasons. One is that it lets you restore tasks after server reboots
too, and another is that it lets you upgrade all the software on a box at once by just
restarting everything.
The main changes are:

1) Add "canRestore" and "stopGracefully" methods to Tasks that say if a task can
stop gracefully, and actually do a graceful stop. RealtimeIndexTask is the only
one that currently implements this.

2) Add "stop" method to TaskRunners that attempts to do an orderly shutdown.
ThreadPoolTaskRunner- call stopGracefully on restorable tasks, wait for exit
ForkingTaskRunner- close output stream to restorable tasks, wait for exit
RemoteTaskRunner- do nothing special, we actually don't want to shutdown

3) Add "restore" method to TaskRunners that attempts to bootstrap tasks from the last run.
Only ForkingTaskRunner does anything here. It maintains a "restore.json" file with
a list of restorable tasks.

4) Have the CliPeon's ExecutorLifecycle lock the task base directory to avoid a restored
task and a zombie old task from stomping on each other.
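For quick reference, change (1) gives the Task interface roughly the shape below; the method names come from the description above, while the javadoc wording and exact signatures are paraphrased rather than copied from the PR.

interface RestorableTask
{
  // True if this task can be stopped now and resumed after a JVM restart.
  // In this PR only RealtimeIndexTask returns true.
  boolean canRestore();

  // Ask the task to persist/commit its in-flight data and exit cleanly, so a
  // respawned JVM can pick it up again from the restore file.
  void stopGracefully();
}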