overlord helpers framework and tasklog auto cleanup #3677
/**
 */
public interface OverlordHelper
While you are in this code, you should also consider merging overlord and coordinator :P
Hmm, actually we can. I believe we have wanted this for quite some time; it was discussed in the dev sync meeting and agreed upon.
I would imagine introducing a CliMaster (a "master" node) that binds/starts everything that the coordinator and overlord do, with only one leader election for the "leader master" instead of two.
@himanshug Yeah, that would be pretty great. We don't have to do it as part of this PR, but I think it would be really useful to have.
Yeah, will do that in a separate PR. Let us keep this one as is.
if (exec == null) {
  exec = execFactory.create(1, "Overlord-Helper-Manager-Exec--%d");
}
helper.schedule(exec);
This logic is basically just repeating what the coordinator is doing.
High-level comment: can we have this as a coordinator helper?
Yeah, it is similar, but I did it on the overlord because tasks and task logs are owned by the overlord, and their retention should also be handled at the overlord.
If the executor is having issues and rejecting execution, what is the intended behavior here?
That would be a fatal situation: it will fail the lifecycle start and consequently prevent the Druid process from starting up, which is the intended behavior in this case.
cool
public interface OverlordHelper
{
  boolean isEnabled();
  void schedule(ScheduledExecutorService exec);
I think the method here should be run()
The way I structured things, helpers schedule themselves on the executor supplied by the manager. This is done so that all helpers can have their own independent schedules and can potentially run in parallel.
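To make the self-scheduling design concrete, here is a minimal sketch, not the PR's actual code. The `OverlordHelper` interface matches the snippet under review; `HelperManagerSketch` and its `demo()` method are hypothetical names, and `Executors.newScheduledThreadPool(1)` stands in for the `execFactory.create(...)` call shown in the diff.

```java
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Same shape as the interface under review.
interface OverlordHelper
{
  boolean isEnabled();
  void schedule(ScheduledExecutorService exec);
}

// Hypothetical manager: creates the executor lazily and lets each enabled
// helper register its own schedule on it.
class HelperManagerSketch
{
  private final Set<OverlordHelper> helpers;
  private final Object startStopLock = new Object();
  private volatile ScheduledExecutorService exec;

  HelperManagerSketch(Set<OverlordHelper> helpers)
  {
    this.helpers = helpers;
  }

  void start()
  {
    synchronized (startStopLock) {
      for (OverlordHelper helper : helpers) {
        if (helper.isEnabled()) {
          if (exec == null) {
            // Stand-in (assumption) for execFactory.create(1, "...") in the diff.
            exec = Executors.newScheduledThreadPool(1);
          }
          helper.schedule(exec);
        }
      }
    }
  }

  void stop()
  {
    synchronized (startStopLock) {
      if (exec != null) {
        exec.shutdownNow();
        exec = null;
      }
    }
  }

  // Returns true if the enabled helper ran twice and the disabled one was never scheduled.
  static boolean demo()
  {
    final CountDownLatch ran = new CountDownLatch(2);
    final AtomicBoolean disabledWasScheduled = new AtomicBoolean(false);

    OverlordHelper enabled = new OverlordHelper()
    {
      @Override
      public boolean isEnabled() { return true; }

      @Override
      public void schedule(ScheduledExecutorService e)
      {
        // The helper picks its own initial delay and period.
        e.scheduleWithFixedDelay(ran::countDown, 10, 10, TimeUnit.MILLISECONDS);
      }
    };
    OverlordHelper disabled = new OverlordHelper()
    {
      @Override
      public boolean isEnabled() { return false; }

      @Override
      public void schedule(ScheduledExecutorService e) { disabledWasScheduled.set(true); }
    };

    Set<OverlordHelper> helpers = new LinkedHashSet<>();
    helpers.add(enabled);
    helpers.add(disabled);
    HelperManagerSketch manager = new HelperManagerSketch(helpers);
    manager.start();
    try {
      return ran.await(5, TimeUnit.SECONDS) && !disabledWasScheduled.get();
    }
    catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
    finally {
      manager.stop();
    }
  }
}
```

With `schedule(exec)` rather than `run()`, the manager does not need to know each helper's timing; it only hands over the executor.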
|Property|Description|Default|
|--------|-----------|-------|
|`druid.indexer.logs.kill.enabled`|Boolean value for whether to enable deletion of old task logs. |false|
|`druid.indexer.logs.kill.durationToRetain`| Required if kill is enabled. In seconds, task logs to be retained created in last x seconds. |None|
Default value: Long.MAX_VALUE?
The default does not matter because the user is forced to provide a value when kill is enabled. Long.MAX_VALUE is there just because some value needs to be stored when it is not enabled.
|--------|-----------|-------|
|`druid.indexer.logs.kill.enabled`|Boolean value for whether to enable deletion of old task logs. |false|
|`druid.indexer.logs.kill.durationToRetain`| Required if kill is enabled. In seconds, task logs to be retained created in last x seconds. |None|
|`druid.indexer.logs.kill.initialDelay`| Optional. Number of seconds after overlord start when first auto kill is run. |random value less than 300 (5 mins)|
Random value between 1 and 5 mins?
Didn't want to make the document too confusing by declaring the exact randomness behavior; I think knowing that it's some random value less than 5 mins is enough.
Quasi-related #401
If there is clock skew between this service and the HDFS server's timestamps, how is that expected to be handled or detected?
Do you expect it to be a problem?
Can such a scenario be documented in the property docs?
@@ -48,4 +48,10 @@ public void killAll() throws IOException
{
  log.info("Noop: No task logs are deleted.");
}

@Override
public void kill(long beforeTimestamp) throws IOException
Can this be renamed killBefore?
Changed to void killOlderThan(long timestamp).
|Property|Description|Default|
|--------|-----------|-------|
|`druid.indexer.logs.kill.enabled`|Boolean value for whether to enable deletion of old task logs. |false|
|`druid.indexer.logs.kill.durationToRetain`| Required if kill is enabled. In seconds, task logs to be retained created in last x seconds. |None|
Most places use either milliseconds or a duration string; can you pick one of those and use it here?
changed to milliseconds
|--------|-----------|-------|
|`druid.indexer.logs.kill.enabled`|Boolean value for whether to enable deletion of old task logs. |false|
|`druid.indexer.logs.kill.durationToRetain`| Required if kill is enabled. In seconds, task logs to be retained created in last x seconds. |None|
|`druid.indexer.logs.kill.initialDelay`| Optional. Number of seconds after overlord start when first auto kill is run. |random value less than 300 (5 mins)|
Same thing about either milliseconds or a Duration
milliseconds
|`druid.indexer.logs.kill.enabled`|Boolean value for whether to enable deletion of old task logs. |false|
|`druid.indexer.logs.kill.durationToRetain`| Required if kill is enabled. In seconds, task logs to be retained created in last x seconds. |None|
|`druid.indexer.logs.kill.initialDelay`| Optional. Number of seconds after overlord start when first auto kill is run. |random value less than 300 (5 mins)|
|`druid.indexer.logs.kill.delay`|Optional. Number of seconds of delay between successive executions of auto kill run. |21600 (6 hours)|
Same thing about milliseconds or a string Duration
milliseconds
FileSystem fs = taskLogDir.getFileSystem(hadoopConfig);
if (fs.exists(taskLogDir)) {
  RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(taskLogDir);
  while (iter.hasNext()) {
There's no failure recovery in this loop, and I'm forgetting how HDFS handles various issues. Is this intended to just fail this kill run if there are any sort of issues?
If HDFS has an issue during iteration, it will throw some exception, which will be caught by the caller. The current caller, i.e. the anonymous Runnable in TaskLogAutoCleaner.schedule(..), catches the exception and logs it as an error. So the behavior would be that not all older logs are killed in that particular run, and the deletion will be tried again later.
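The catch-and-log behavior described above matters because a task submitted through `ScheduledExecutorService.scheduleWithFixedDelay` stops being rescheduled once one execution throws. A minimal sketch of that pattern follows; `RetryingCleanupSketch`, `LogKiller`, and `demoAttempts()` are hypothetical names for illustration, and `System.err` stands in for the real logger.

```java
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class RetryingCleanupSketch
{
  // Hypothetical stand-in for the task-log killer discussed in the review.
  interface LogKiller
  {
    void killOlderThan(long timestamp) throws IOException;
  }

  static void schedule(
      ScheduledExecutorService exec,
      LogKiller killer,
      long initialDelayMs,
      long delayMs,
      long durationToRetainMs
  )
  {
    exec.scheduleWithFixedDelay(
        () -> {
          try {
            killer.killOlderThan(System.currentTimeMillis() - durationToRetainMs);
          }
          catch (Exception e) {
            // Log and swallow: letting the exception escape would cancel all
            // future runs, so a failed run is simply retried at the next delay.
            System.err.println("Failed to clean up task logs: " + e);
          }
        },
        initialDelayMs,
        delayMs,
        TimeUnit.MILLISECONDS
    );
  }

  // Simulates a failure on the first run and reports how many runs were attempted.
  static int demoAttempts()
  {
    final AtomicInteger attempts = new AtomicInteger();
    final CountDownLatch twoRuns = new CountDownLatch(2);
    ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
    try {
      schedule(
          exec,
          timestamp -> {
            int n = attempts.incrementAndGet();
            twoRuns.countDown();
            if (n == 1) {
              throw new IOException("simulated HDFS failure");
            }
          },
          10, 10, 0
      );
      twoRuns.await(5, TimeUnit.SECONDS);
    }
    catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    finally {
      exec.shutdownNow();
    }
    return attempts.get();
  }
}
```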
}

if (Thread.interrupted()) {
  throw new IOException("Thread interrupted. Couldn't delete all tasklogs.");
How about wrapping an InterruptedException in an IOException?
will do
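One way the suggested wrapping could look is sketched below. The class and method names are hypothetical; the only library facts relied on are that `Thread.interrupted()` clears the interrupt flag and that `IOException` has a constructor taking a cause.

```java
import java.io.IOException;

class InterruptCheckSketch
{
  // Throws an IOException whose cause is an InterruptedException, so callers
  // can still tell an interruption apart from an ordinary I/O failure.
  static void checkInterrupted() throws IOException
  {
    if (Thread.interrupted()) {
      throw new IOException(
          new InterruptedException("Thread interrupted. Couldn't delete all tasklogs.")
      );
    }
  }

  // Interrupts the current thread, then verifies the wrapped cause.
  static boolean demo()
  {
    Thread.currentThread().interrupt();
    try {
      checkInterrupted();
      return false;
    }
    catch (IOException e) {
      return e.getCause() instanceof InterruptedException;
    }
  }
}
```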
taskLogs.pushTaskLog("log2", logFile);
Assert.assertEquals("log2content", readLog(taskLogs, "log2", 0));

taskLogs.kill(time);
Before here, can you check and make sure the modified time for the old file is before time and the new file is after time?
done
Also, as a general comment, before the overlord and coordinator are merged, I'd like to see the Helper stuff go into an extensible form.
@drcrallen regarding the overlord and coordinator merge, the helper stuff introduced here is already in an extensible form.
8bec961 to ff703d3
@fjy @drcrallen @pjain1 resolved all comments.
ff703d3 to 4c2588e
👍
@drcrallen all comments were addressed, please take another look.
Overall this is looking really good. I'm excited to see how the Overlord/Coordinator merge stuff comes along.
Minor changes suggested that should help future maintainers.
public void killOlderThan(final long timestamp) throws IOException
{
  File taskLogDir = config.getDirectory();
  if (taskLogDir.exists()) {
suggest && taskLogDir.isDirectory()
Actually, throwing an IOException if it is not a directory would be a reasonable alternative.
done
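For illustration, the local-filesystem behavior discussed in this thread (delete logs older than a timestamp, and throw an `IOException` if the configured path is not a directory) might look roughly like the sketch below. `LocalKillSketch` and its helper methods are hypothetical and are not the PR's implementation; the demo also follows the reviewer's request to pin the modified times on either side of the cutoff.

```java
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;

class LocalKillSketch
{
  static void killOlderThan(File taskLogDir, long timestamp) throws IOException
  {
    if (!taskLogDir.exists()) {
      return; // nothing to clean up
    }
    if (!taskLogDir.isDirectory()) {
      throw new IOException(String.format("taskLogDir [%s] must be a directory.", taskLogDir));
    }
    File[] files = taskLogDir.listFiles();
    if (files == null) {
      throw new IOException(String.format("Could not list files in [%s].", taskLogDir));
    }
    for (File file : files) {
      // Only plain files modified strictly before the cutoff are deleted.
      if (file.isFile() && file.lastModified() < timestamp && !file.delete()) {
        throw new IOException(String.format("Could not delete [%s].", file));
      }
    }
  }

  // Writes one old and one new log, runs the cleanup, and checks which file survives.
  static boolean demo()
  {
    try {
      File dir = Files.createTempDirectory("tasklogs").toFile();
      File oldLog = new File(dir, "log1");
      File newLog = new File(dir, "log2");
      Files.write(oldLog.toPath(), "log1content".getBytes(StandardCharsets.UTF_8));
      Files.write(newLog.toPath(), "log2content".getBytes(StandardCharsets.UTF_8));

      long cutoff = System.currentTimeMillis();
      // Force the modified times to sit clearly on either side of the cutoff.
      if (!oldLog.setLastModified(cutoff - 60_000) || !newLog.setLastModified(cutoff + 60_000)) {
        return false;
      }
      killOlderThan(dir, cutoff);
      boolean ok = !oldLog.exists() && newLog.exists();
      newLog.delete();
      dir.delete();
      return ok;
    }
    catch (IOException e) {
      return false;
    }
  }

  // Returns true if a regular file passed as the log directory is rejected.
  static boolean rejectsNonDirectory()
  {
    try {
      File notADir = Files.createTempFile("tasklogs", ".txt").toFile();
      try {
        killOlderThan(notADir, System.currentTimeMillis());
        return false;
      }
      catch (IOException expected) {
        return true;
      }
      finally {
        notADir.delete();
      }
    }
    catch (IOException e) {
      return false;
    }
  }
}
```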
private final Set<OverlordHelper> helpers;

private volatile ScheduledExecutorService exec;
private final Object lock = new Object();
Suggest renaming to startStopLock or similar to make it clearer what it's used for.
done
this.enabled = enabled;
this.initialDelay = initialDelay == null ? 60000 + new Random().nextInt(4*60000) : initialDelay.longValue();
this.delay = delay == null ? 6*60*60*1000 : delay.longValue();
Suggest sanity check for positive number.
done
this.enabled = enabled;
this.initialDelay = initialDelay == null ? 60000 + new Random().nextInt(4*60000) : initialDelay.longValue();
this.delay = delay == null ? 6*60*60*1000 : delay.longValue();
this.durationToRetain = durationToRetain == null ? Long.MAX_VALUE : durationToRetain.longValue();
Suggest sanity check for positive number.
done
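Putting the defaults from the diff together with the requested positivity checks, a config class could be sketched as follows. `CleanerConfigSketch`, `requirePositive`, and `isValid` are hypothetical names, and using `IllegalArgumentException` for validation is an assumption; the PR may validate differently. The default expressions are copied from the snippet above: the initial delay falls in the range 60000 (inclusive) to 300000 (exclusive) milliseconds, and the delay defaults to 6 hours.

```java
import java.util.Random;

class CleanerConfigSketch
{
  final boolean enabled;
  final long initialDelay;     // milliseconds
  final long delay;            // milliseconds
  final long durationToRetain; // milliseconds

  CleanerConfigSketch(boolean enabled, Long initialDelay, Long delay, Long durationToRetain)
  {
    if (enabled && durationToRetain == null) {
      // The retention period has no usable default, so it is required when enabled.
      throw new IllegalArgumentException("durationToRetain must be provided when kill is enabled.");
    }
    this.enabled = enabled;
    this.initialDelay = initialDelay == null
                        ? 60000 + new Random().nextInt(4 * 60000)
                        : initialDelay.longValue();
    this.delay = delay == null ? 6 * 60 * 60 * 1000 : delay.longValue();
    // Long.MAX_VALUE is only a placeholder for the disabled case.
    this.durationToRetain = durationToRetain == null ? Long.MAX_VALUE : durationToRetain.longValue();

    requirePositive(this.initialDelay, "initialDelay");
    requirePositive(this.delay, "delay");
    requirePositive(this.durationToRetain, "durationToRetain");
  }

  private static void requirePositive(long value, String name)
  {
    if (value <= 0) {
      throw new IllegalArgumentException(name + " must be > 0, got " + value);
    }
  }

  // Convenience for checking a combination of settings without a try/catch at the call site.
  static boolean isValid(boolean enabled, Long initialDelay, Long delay, Long durationToRetain)
  {
    try {
      new CleanerConfigSketch(enabled, initialDelay, delay, durationToRetain);
      return true;
    }
    catch (IllegalArgumentException e) {
      return false;
    }
  }
}
```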
}

this.enabled = enabled;
this.initialDelay = initialDelay == null ? 60000 + new Random().nextInt(4*60000) : initialDelay.longValue();
Suggest sanity check for positive number.
done
/**
 */
public class TaskLogAutoCleanerConfigTest
Suggest adding test for defaults.
done
4c2588e to 1c06c7e
1c06c7e to 8293b42
@drcrallen any more comments?
👍
if (taskLogDir.exists()) {

  if (!taskLogDir.isDirectory()) {
    throw new IOException(String.format("taskLogDir [%s] must be a directory.", taskLogDir));
For future reference, IOE would work best here; not worth blocking on though.
* overlord helpers framework and tasklog auto cleanup
* review comment changes
* further review comments addressed
We push task logs to HDFS, where they fill up the scarce namespace, and their deletion is currently managed outside of Druid. This is a major inconvenience for most teams using Druid, and most of them delete the logs manually.
This patch introduces a feature to configure the Overlord to automatically delete older task logs. See the updates to docs/content/configuration/indexing-service.md for the user documentation.
Currently, an implementation is provided only for local and hdfs type task logs.
The framework introduced here can also be used to manage retention of other task-related persisted state, but that is for the future.