-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Async task client for SeekableStreamSupervisors. #13354
Conversation
In this patch |
Main changes: 1) Convert SeekableStreamIndexTaskClient to an interface, move old code to SeekableStreamIndexTaskClientSyncImpl, and add new implementation SeekableStreamIndexTaskClientAsyncImpl that uses ServiceClient. 2) Add "chatAsync" parameter to seekable stream supervisors that causes the supervisor to use an async task client. 3) In SeekableStreamSupervisor.discoverTasks, adjust logic to avoid making blocking RPC calls in workerExec threads. 4) In SeekableStreamSupervisor generally, switch from Futures.successfulAsList to FutureUtils.coalesce, so we can better capture the errors that occurred with contacting individual tasks. Other, related changes: 1) Add ServiceRetryPolicy.retryNotAvailable, which controls whether ServiceClient retries unavailable services. Useful since we do not want to retry calls unavailable tasks within the service client. (The supervisor does its own higher-level retries.) 2) Add FutureUtils.transformAsync, a more lambda friendly version of Futures.transform(f, AsyncFunction). 3) Add FutureUtils.coalesce. Similar to Futures.successfulAsList, but returns Either instead of using null on error. 4) Add JacksonUtils.readValue overloads for JavaType and TypeReference.
20a0947
to
afd2bad
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the improvement @gianm , the changes look good.
I have added some minor suggestions.
The only thing I am not clear on is the CONNECT_EXEC_THREADS being hardcoded to 4. Since these threads are all meant to do network IO and would basically remain blocked until a response is received, wouldn't we benefit from a higher/configurable number to allow more concurrency when chatting to multiple tasks? Or maybe I have misunderstood how the CONNECT_EXEC_THREADS are used by the ServiceClient
.
* Like {@link Futures#transform(ListenableFuture, AsyncFunction)}, but works better with lambdas due to not having | ||
* overloads. | ||
* | ||
* One can write {@code FutureUtils.transform(future, v -> ...)} instead of |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
* One can write {@code FutureUtils.transform(future, v -> ...)} instead of | |
* One can write {@code FutureUtils.transformAsync(future, v -> ...)} instead of |
} | ||
|
||
/** | ||
* Like {@link Futures#successfulAsList}, but returns {@link Either} instead of using {@code} null in case of error. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
* Like {@link Futures#successfulAsList}, but returns {@link Either} instead of using {@code} null in case of error. | |
* Like {@link Futures#successfulAsList}, but returns {@link Either} instead of using {@code null} in case of error. |
|
||
@JsonProperty("chatAsync") | ||
@JsonInclude(JsonInclude.Include.NON_NULL) | ||
Boolean getChatAsyncConfigured() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we want to retain the boxed value in order to have desired behaviour when we switch the default value of chatAsync
to true
?
Style-wise, I think it might be simpler and similar to the other fields if we just name this method as getChatAsync()
and mark it as @JsonProperty
with the other one just being called chatAsync
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did it this way because I didn't want chatAsync
to appear in serialized tuningConfigs unless it was actually set by the user. And, I wanted us to be able to change the default in the future, and have everyone get the new default if they hadn't explicitly set this. Let me know what you think of that rationale.
{ | ||
return Futures.transform( | ||
taskClient.getStatusAsync(taskId), | ||
new AsyncFunction<SeekableStreamIndexTaskRunner.Status, Pair<SeekableStreamIndexTaskRunner.Status, Map<PartitionIdType, SequenceOffsetType>>>() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggestion: Maybe use the new FutureUtils.transform
here and create a new class for the Pair<Status, Map>
to make this more readable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm. It's only used in one place and a new class would be a bunch of boilerplate. I tried switching to FutureUtils.transformAsync
and adding javadocs. Let me know if you think it's clear enough.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense, thanks.
{ | ||
final ServiceRetryPolicy retryPolicy = makeRetryPolicy(taskId, retry); | ||
final SeekableStreamTaskLocator locator = new SeekableStreamTaskLocator(taskInfoProvider, taskId); | ||
return serviceClientFactory.makeClient(taskId, locator, retryPolicy); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would this create a new client on every call? Would it make sense to cache this to help with the preferred location in case of redirects?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did it this way because I didn't want to deal with figuring out how long to keep the clients for, and they're pretty cheap to create. The only state they have is the cache of redirects. But, tasks don't do redirects — only leadery things like Coordinators and Overlords do that. So I think it's OK. I added some comments:
// We're creating a new locator for each request and not closing it. This is OK, since SeekableStreamTaskLocator
// is stateless, cheap to create, and its close() method does nothing.
final SeekableStreamTaskLocator locator = new SeekableStreamTaskLocator(taskInfoProvider, taskId);
// We're creating a new client for each request. This is OK, clients are cheap to create and do not contain
// state that is important for us to retain across requests. (The main state they retain is preferred location
// from prior redirects; but tasks don't do redirects.)
return serviceClientFactory.makeClient(taskId, locator, retryPolicy);
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks!
The idea is that they aren't doing very much, just handling scheduled retries, so we don't need very many. I'm not sure if 4 threads is always going to be enough but I don't have a better idea about what to set it to. I did do some testing with MSQ shuffle (another user of this) and found that it was not limiting to set this to 4, even when sending a lot of data around. I figure that if we ever discover it isn't enough, we could adjust it at that time. |
Thanks for the clarification. Since this has been working fine on a cluster with ~1k tasks, I guess it'll hold up. |
This functionality was originally added in apache#13354.
This functionality was originally added in #13354.
Motivations:
chatThreads
. Shortens the time it takes to execute RunNotices when there are a lot of tasks. Also shortens the time for other operations that require contacting all tasks, like getting task reports.chatThreads
.workerThreads
, mainly due to the restructuring ofdiscoverTasks
.Main changes:
Convert SeekableStreamIndexTaskClient to an interface, move old code
to SeekableStreamIndexTaskClientSyncImpl, and add new implementation
SeekableStreamIndexTaskClientAsyncImpl that uses ServiceClient.
Add "chatAsync" parameter to seekable stream supervisors that causes
the supervisor to use an async task client.
In SeekableStreamSupervisor.discoverTasks, adjust logic to avoid making
blocking RPC calls in workerExec threads.
In SeekableStreamSupervisor generally, switch from Futures.successfulAsList
to FutureUtils.coalesce, so we can better capture the errors that occurred
with contacting individual tasks.
Other, related changes:
Add ServiceRetryPolicy.retryNotAvailable, which controls whether
ServiceClient retries unavailable services. Useful since we do not
want to retry calls unavailable tasks within the service client. (The
supervisor does its own higher-level retries.)
Add FutureUtils.transformAsync, a more lambda friendly version of
Futures.transform(f, AsyncFunction).
Add FutureUtils.coalesce. Similar to Futures.successfulAsList, but
returns Either instead of using null on error.
Add JacksonUtils.readValue overloads for JavaType and TypeReference.