KAFKA-7790: Fix Bugs in Trogdor Task Expiration #6103
This commit changes a Trogdor agent/coordinator's behavior to not run tasks that have expired. We define an expired task as one whose sum of `startedMs` and `durationMs` is less than the current time in milliseconds.
void tryCreate() {
    try {
        client.createWorker(new CreateWorkerRequest(workerId, taskId, spec));
        if (startedMs == -1)
            startedMs = time.milliseconds();
Here the Coordinator keeps track of when it first ran the task, which is useful in cases where `spec.startMs == 0`.
if (!worker.hasExpired()) {
    worker.tryCreate();
} else {
    log.info("{}: Will not create worker state {} as it has expired. ", node.name(), worker.state);
This covers the case where the Coordinator re-schedules a task when an Agent is detected via heartbeats.
@@ -336,6 +336,9 @@ public Void call() throws Exception {
} catch (Throwable t) {
    failure = "Failed to create TaskController: " + t.getMessage();
}
if (spec.hasExpired(time, -1))
This is where the Coordinator is given a brand new task that has already expired.
…nt NPathComplexity
Retest this please
public boolean hasExpired(Time time, long startedMs) {
    long startMs = this.startMs > 0 ? this.startMs : startedMs;
    if (startMs <= 0) // task doesn't have a start time yet
        return false;
We shouldn't be special-casing 0 here. In general, I think the only thing we need here is an accessor function like `endMs` which returns `startMs + durationMs`.
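A minimal sketch of the accessor suggested above, assuming a simplified spec that holds only a start time and a duration. `SketchTaskSpec` is a hypothetical stand-in for illustration, not Trogdor's actual `TaskSpec`, and the fixed `nowMs` value replaces a real clock so the example is deterministic.

```java
// Hypothetical stand-in for a Trogdor task spec; NOT the real TaskSpec class.
final class SketchTaskSpec {
    private final long startMs;
    private final long durationMs;

    SketchTaskSpec(long startMs, long durationMs) {
        this.startMs = startMs;
        this.durationMs = durationMs;
    }

    long startMs() {
        return startMs;
    }

    long durationMs() {
        return durationMs;
    }

    // End of the task's time window; startMs == 0 gets no special treatment.
    long endMs() {
        return startMs + durationMs;
    }

    public static void main(String[] args) {
        SketchTaskSpec spec = new SketchTaskSpec(1_000L, 500L);
        long nowMs = 2_000L; // assumed "current time" for the example
        // The task counts as expired once its end time lies before the current time.
        boolean expired = spec.endMs() < nowMs;
        System.out.println("endMs=" + spec.endMs() + " expired=" + expired);
    }
}
```

With no special case, a spec whose `startMs` is 0 has an `endMs` equal to its `durationMs`, which lies far in the past relative to wall-clock time. Callers therefore need a meaningful start time before comparing against the clock.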
@@ -250,6 +250,10 @@ synchronized void waitForQuiescence() throws InterruptedException {
    this.reference = shutdownManager.takeReference();
}

boolean hasExpired() {
I don't know if it makes sense to add this function just for a single caller. We could just check `if (spec.endMs() >= time.milliseconds())` below...
Also reverts back changes to NodeManager
…t NPathComplexity
retest this please
JDK11 failure seems unrelated
LGTM
The Trogdor Coordinator now overwrites a task's startMs to the time it received it if startMs is in the past. The Trogdor Agent now correctly expires a task after the expiry time (startMs + durationMs) passes. Previously, it would ignore startMs and expire after durationMs milliseconds of local start of the task. Reviewed-by: Colin P. McCabe <cmccabe@apache.org>
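The two behaviors in this commit message can be sketched as follows. This is an illustration under stated assumptions, not the merged code: `ExpirySketch`, `effectiveStartMs`, and `remainingMs` are hypothetical names, and all times are passed in as plain `long` millisecond values instead of being read from a clock.

```java
// Hypothetical helpers illustrating the described behavior; NOT Trogdor's actual code.
final class ExpirySketch {
    // Coordinator side: a start time in the past is replaced by the time
    // the task was received; a future start time is kept as-is.
    static long effectiveStartMs(long specStartMs, long receivedMs) {
        return Math.max(specStartMs, receivedMs);
    }

    // Agent side: time left until the expiry point (startMs + durationMs),
    // measured from the current time and never negative.
    static long remainingMs(long startMs, long durationMs, long nowMs) {
        return Math.max(0L, startMs + durationMs - nowMs);
    }

    public static void main(String[] args) {
        long receivedMs = 10_000L; // assumed receive time on the Coordinator
        long startMs = effectiveStartMs(0L, receivedMs);
        long durationMs = 5_000L;

        // An Agent that starts the task 2 seconds late runs it for the
        // remaining 3 seconds instead of the full duration.
        System.out.println("startMs=" + startMs);
        System.out.println("remainingMs=" + remainingMs(startMs, durationMs, 12_000L));
    }
}
```

Under the earlier behavior described above, the Agent in this example would have run the task for the full `durationMs` from its local start, finishing 2 seconds after the intended expiry time.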
https://issues.apache.org/jira/browse/KAFKA-7790
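Changes:
- The Trogdor Coordinator now overwrites a task's `startMs` to the time it received it if `startMs` is in the past.
- The Trogdor Agent now correctly expires a task after the expiry time (`startMs + durationMs`) passes. Previously, it would ignore `startMs` and expire after `durationMs` milliseconds of local start of the task.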