New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix worker thread pool exhaustion bug #3760
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, just some minor comments.
@@ -255,6 +257,12 @@ public boolean apply(TaskRunnerWorkItem taskRunnerWorkItem) | |||
} | |||
}; | |||
|
|||
this.futureTimeout = Math.max( | |||
120, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Might as well make this a constant too, along with the other stuff you made constants.
@@ -174,6 +175,7 @@ public TaskGroup(ImmutableMap<Integer, Long> partitionOffsets, Optional<DateTime | |||
private final KafkaTuningConfig taskTuningConfig; | |||
private final String supervisorId; | |||
private final TaskInfoProvider taskInfoProvider; | |||
private final long futureTimeout; // how long to wait for async operations to complete |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
include "in seconds"; millis is more common so people will assume that if you leave the units off.
@@ -1420,7 +1432,7 @@ public Void apply(@Nullable List<Void> input) | |||
{ | |||
return null; | |||
} | |||
}, workerExec | |||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor comment: this code would be more readable if this pattern was expressed in a function like ListenableFuture<Void> asVoidFuture(ListenableFuture<?> future)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It may also be fine in a lot of situations to just cast it, since presumably nobody is actually trying to cast the returned Void value to a Void, so there should never actually be a bad cast at runtime. But, eh.
} | ||
}, workerExec | ||
); | ||
return asVoidFuture(Futures.successfulAsList(futures)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe stopTaskInGroup()
could return ListenableFuture<?>
and this wrapping asVoidFuture
is not needed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@leventov I like this refactor, thank you
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
👍 |
* fix worker thread pool exhaustion bug * code review changes * code review changes
Fixed a bug where the supervisor can hang waiting for a thread from the worker thread pool which never becomes available. Also add a timeout for async operations as a safety for these kind of issues, and fix the unit test which should have caught this issue but was broken.