Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Unary Callables Deadline values respect the TotalTimeout in RetrySettings #1603

Merged
merged 143 commits into from
May 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
143 commits
Select commit Hold shift + click to select a range
dca83d7
chore: Add retry test
lqiu96 Mar 31, 2023
45c0765
chore: Check the timeout for unary callables
lqiu96 Apr 3, 2023
7862762
chore: Fix tests
lqiu96 Apr 4, 2023
7f5622f
chore: Address code smell
lqiu96 Apr 4, 2023
94b5017
chore: Add tests for DEADLINE_EXCEEDED
lqiu96 Apr 4, 2023
608d44b
chore: Add tests for Server-side streaming
lqiu96 Apr 4, 2023
0c1782f
chore: Start the timeout after http request invocation
lqiu96 Apr 4, 2023
f75f67e
chore: Fix format issues
lqiu96 Apr 4, 2023
7c4c841
chore: Remove Instant calculation with System clock
lqiu96 Apr 5, 2023
3668093
Merge branch 'main' into main-showcase_retries
lqiu96 Apr 5, 2023
d440b8e
chore: Add ITRetry test cases
lqiu96 Apr 5, 2023
bbf7f4f
chore: Fix the Retry showcase test
lqiu96 Apr 6, 2023
bee8c0e
chore: Fix the Retry showcase test
lqiu96 Apr 6, 2023
73882a7
chore: Convert duration to nanos
lqiu96 Apr 6, 2023
b0d83b9
chore: Set the readTimeout min to 20s
lqiu96 Apr 6, 2023
fc5cf13
chore: Add sucessful retry tests cases
lqiu96 Apr 6, 2023
2f1bc8a
chore: Add comments to timeout
lqiu96 Apr 6, 2023
ead0eb9
chore: Update the connect timeout
lqiu96 Apr 6, 2023
ca0770b
chore: Refactor timeout logic
lqiu96 Apr 7, 2023
21ed291
chore: Fix format issues
lqiu96 Apr 10, 2023
bb39513
chore: Add logic for deadlineScheduler
lqiu96 Apr 13, 2023
fab86be
chore: Fix format issues
lqiu96 Apr 13, 2023
95ef1a1
chore: Update logic
lqiu96 Apr 14, 2023
25190ec
chore: Update comment
lqiu96 Apr 14, 2023
f59de69
Merge branch 'main' into main-showcase_retries
lqiu96 Apr 14, 2023
133af94
chore: Update showcase test
lqiu96 Apr 14, 2023
ff5108f
chore: Fix format issues
lqiu96 Apr 14, 2023
e800373
chore: Fix logic
lqiu96 Apr 14, 2023
d757c05
Merge branch 'main' into main-showcase_retries
lqiu96 Apr 14, 2023
f139797
chore: Do not disconnect the connection
lqiu96 Apr 14, 2023
e1d12c5
chore: Disconnect after end
lqiu96 Apr 14, 2023
8ee7513
chore: Resolve steam close error
lqiu96 Apr 14, 2023
bed960e
chore: Fix disconnect logic
lqiu96 Apr 14, 2023
5c5eaec
chore: Fix disconnect logic
lqiu96 Apr 14, 2023
ab93117
chore: Update CI
lqiu96 Apr 14, 2023
f3d19b6
chore: Fix native test
lqiu96 Apr 14, 2023
1e38e5f
chore: Revert changes
lqiu96 Apr 17, 2023
5a10f4c
chore: try with rpc timeout 100ms
lqiu96 Apr 17, 2023
9b877ac
chore: Fix format issues
lqiu96 Apr 17, 2023
eff3513
chore: Re-run delivery loop with deadlineschedule priority
lqiu96 Apr 17, 2023
1d1dffa
chore: Check for timeoutExceeded
lqiu96 Apr 17, 2023
887f3bb
chore: Do not send message is time exceeded
lqiu96 Apr 17, 2023
102ab99
chore: Fix format issues
lqiu96 Apr 17, 2023
3f4b9a5
chore: Add timeout for tests
lqiu96 Apr 17, 2023
dffe194
chore: Fix format issues
lqiu96 Apr 17, 2023
47c8da6
chore: Refactor trailer logic
lqiu96 Apr 17, 2023
e5ccddd
chore: Refactor trailer logic
lqiu96 Apr 17, 2023
fdf6c63
chore: Rename variables
lqiu96 Apr 17, 2023
64b2c77
chore: Increase the wait to 1s
lqiu96 Apr 17, 2023
c5c2f54
chore: Fix format issues
lqiu96 Apr 17, 2023
d0893ea
chore: Set closed var as volatile
lqiu96 Apr 17, 2023
1281ab5
chore: Update logic for onClose
lqiu96 Apr 17, 2023
1ae5d3d
chore: Attempt with longer timeout
lqiu96 Apr 17, 2023
36e3788
chore: Empty commit
lqiu96 Apr 17, 2023
452fc97
chore: Fix format issues
lqiu96 Apr 17, 2023
0ad9442
chore: Trigger deliver loop instead of notifyListeners
lqiu96 Apr 17, 2023
0ad7a4d
chore: Remove variable
lqiu96 Apr 17, 2023
a769ee1
chore: Remove variable
lqiu96 Apr 17, 2023
65e9e67
chore: Fix close logic
lqiu96 Apr 17, 2023
0b926d5
chore: Revert graalvm ci
lqiu96 Apr 18, 2023
6b70a50
chore: Use 2s as delay
lqiu96 Apr 18, 2023
9af022a
chore: Update to 5s delay
lqiu96 Apr 18, 2023
66c757c
Merge branch 'main' into main-showcase_retries
lqiu96 Apr 18, 2023
fc59f98
chore: Add comments for timeout method
lqiu96 Apr 18, 2023
b773f89
chore: Use deliver loop in timeout
lqiu96 Apr 18, 2023
b571b6e
chore: Run matrix jobs sequentially
lqiu96 Apr 18, 2023
f97e7c0
chore: Fix format issues
lqiu96 Apr 18, 2023
1aec63c
chore: Fix format issues
lqiu96 Apr 18, 2023
de5927d
chore: Increase the wait to 10s
lqiu96 Apr 18, 2023
6a337c5
chore: Use 110ms delay
lqiu96 Apr 19, 2023
8be44c8
chore: Set delay to be 30s
lqiu96 Apr 19, 2023
42dd7ed
chore: Fix format issues
lqiu96 Apr 19, 2023
1de756c
chore: Log the onClose message
lqiu96 Apr 19, 2023
8756e27
chore: Remove localRunnable
lqiu96 Apr 19, 2023
51df7ea
chore: Fix format issues
lqiu96 Apr 19, 2023
a9ff512
chore: Lower the retry amounts
lqiu96 Apr 19, 2023
8f1627e
chore: Lower the retry amounts
lqiu96 Apr 19, 2023
becbc25
chore: Fix shouldRetry logic
lqiu96 Apr 19, 2023
c1f609f
chore: Log results of shouldRetry
lqiu96 Apr 19, 2023
be1f9fb
chore: Ignore other retry tests
lqiu96 Apr 19, 2023
490ccc1
chore: Add more logging
lqiu96 Apr 19, 2023
3a9bdfd
chore: Fix shouldRetry logic
lqiu96 Apr 19, 2023
87a41dc
chore: Remove small optimization
lqiu96 Apr 19, 2023
da26a56
chore: Temp ignore tests
lqiu96 Apr 19, 2023
0978549
chore: Temp ignore tests
lqiu96 Apr 19, 2023
93c8fdc
chore: Add more logging
lqiu96 Apr 19, 2023
2a9d2df
chore: revert back to checking for negative duration
lqiu96 Apr 19, 2023
a517d95
chore: Revert ignored test
lqiu96 Apr 19, 2023
0f8bbc8
chore: Fix logging
lqiu96 Apr 19, 2023
c1f914f
chore: Log timeout
lqiu96 Apr 19, 2023
6bb2d04
chore: Set min RPC timeout to be 1ms
lqiu96 Apr 19, 2023
3da7937
chore: Update the retry algorithms
lqiu96 Apr 19, 2023
d28b87f
chore: Clean up the algoritms
lqiu96 Apr 19, 2023
7c316a1
chore: Uncomment out ITRetry tests
lqiu96 Apr 19, 2023
ae3c2ec
chore: Refactor the retryAlgorithms
lqiu96 Apr 19, 2023
3fb78f7
chore: Add more comments
lqiu96 Apr 19, 2023
8cd3e3b
chore: Add in the parallel execution for ITs
lqiu96 Apr 19, 2023
b8704ff
chore: Add LRO showcase tests
lqiu96 Apr 20, 2023
eb6635a
chore: Fix format
lqiu96 Apr 20, 2023
0b76faf
chore: Remove deadline getters
lqiu96 Apr 20, 2023
acc3ee1
chore: Remove sonar changes
lqiu96 Apr 20, 2023
8b6445e
chore: Fix algorithm test
lqiu96 Apr 20, 2023
d458167
chore: Log the flaky test
lqiu96 Apr 20, 2023
09e7ff2
chore: Fix format
lqiu96 Apr 20, 2023
c20ee95
chore: Check for rpcTimeout being zero or negative
lqiu96 Apr 20, 2023
6552dbe
chore: Fix tests
lqiu96 Apr 20, 2023
0454e02
chore: Fix format issues
lqiu96 Apr 20, 2023
a1dcfdd
chore: Remove unused code
lqiu96 Apr 20, 2023
f9afed4
chore: Update comment for RetryAlgorithm
lqiu96 Apr 20, 2023
1d5c00a
chore: Fix format issues
lqiu96 Apr 20, 2023
a7983d3
chore: Use millis for timeout
lqiu96 Apr 21, 2023
15448d2
chore: Await termination for clients
lqiu96 Apr 21, 2023
924f7a0
chore: Fix format issues
lqiu96 Apr 21, 2023
f4ba8af
chore: Update LRO first call timeout value
lqiu96 Apr 21, 2023
a305123
chore: Update LRO test names
lqiu96 Apr 21, 2023
6e35172
chore: Remove the parallel showcase tests
lqiu96 Apr 21, 2023
d4bc792
chore: Add showcase sequence tests for retries
lqiu96 Apr 21, 2023
ccdf1e2
chore: Add showcase sequence tests for retries
lqiu96 Apr 21, 2023
561ffa3
chore: Update retry test name
lqiu96 Apr 24, 2023
3607a30
chore: Fix typos
lqiu96 Apr 24, 2023
cb34160
chore: Fix server streaming callable test
lqiu96 Apr 25, 2023
cf5ba07
chore: Clean up tests
lqiu96 Apr 25, 2023
07ecd7a
chore: Remove retry tests
lqiu96 Apr 25, 2023
b4812e3
chore: Fix format issues
lqiu96 Apr 25, 2023
ed6b53c
chore: Address PR comments
lqiu96 May 1, 2023
2095745
chore: Update java.time.Duration import
lqiu96 May 1, 2023
a8e3153
chore: Update values for LRO showcase test
lqiu96 May 1, 2023
760f5fe
chore: Update values for LRO showcase test
lqiu96 May 1, 2023
d890dee
chore: Fix format issues
lqiu96 May 1, 2023
3086821
chore: Update variable name
lqiu96 May 3, 2023
084b592
chore: Convert shouldRetry logic to use milliseconds
lqiu96 May 3, 2023
145d084
chore: Remove jitter from tests
lqiu96 May 3, 2023
e9c1645
chore: Fix showcase test
lqiu96 May 3, 2023
600a2dc
chore: Log the attempt callable timeout
lqiu96 May 3, 2023
5d81e85
chore: Update LRO test case
lqiu96 May 3, 2023
c429d96
chore: Add logging
lqiu96 May 3, 2023
5527700
chore: Use millis for timeout
lqiu96 May 4, 2023
13881bb
chore: Address PR comments
lqiu96 May 5, 2023
c052a87
chore: Update to use TestClientInitializer class
lqiu96 May 5, 2023
069acc6
Merge branch 'main' into main-showcase_retries
lqiu96 May 5, 2023
6cc15fb
chore: Fix client initialize method names
lqiu96 May 5, 2023
e099ac1
chore: Add back public method
lqiu96 May 8, 2023
0a3cadf
chore: Add back public methods
lqiu96 May 8, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 1 addition & 2 deletions .github/workflows/sonar.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -79,5 +79,4 @@ jobs:
-Dsonar.projectKey=googleapis_gapic-generator-java_unit_tests \
-Dsonar.organization=googleapis \
-Dsonar.host.url=https://sonarcloud.io \
-Dsonar.projectName=java_showcase_unit_tests

-Dsonar.projectName=java_showcase_unit_tests
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.google.auth.Credentials;
import com.google.auto.value.AutoValue;
import com.google.protobuf.TypeRegistry;
import java.time.Duration;
import javax.annotation.Nullable;
import org.threeten.bp.Instant;

Expand All @@ -42,6 +43,9 @@
public abstract class HttpJsonCallOptions {
public static final HttpJsonCallOptions DEFAULT = newBuilder().build();

@Nullable
public abstract Duration getTimeout();

@Nullable
public abstract Instant getDeadline();

Expand Down Expand Up @@ -69,6 +73,13 @@ public HttpJsonCallOptions merge(HttpJsonCallOptions inputOptions) {
builder.setDeadline(newDeadline);
}

if (inputOptions.getTimeout() != null) {
Duration newTimeout = java.time.Duration.ofMillis(inputOptions.getTimeout().toMillis());
if (newTimeout != null) {
builder.setTimeout(newTimeout);
}
}

Credentials newCredentials = inputOptions.getCredentials();
if (newCredentials != null) {
builder.setCredentials(newCredentials);
Expand All @@ -84,6 +95,8 @@ public HttpJsonCallOptions merge(HttpJsonCallOptions inputOptions) {

@AutoValue.Builder
public abstract static class Builder {
public abstract Builder setTimeout(java.time.Duration value);

public abstract Builder setDeadline(Instant value);

public abstract Builder setCredentials(Credentials value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,19 @@
import com.google.api.gax.httpjson.ApiMethodDescriptor.MethodType;
import com.google.api.gax.httpjson.HttpRequestRunnable.ResultListener;
import com.google.api.gax.httpjson.HttpRequestRunnable.RunnableResult;
import com.google.api.gax.rpc.StatusCode;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;

Expand Down Expand Up @@ -88,6 +92,7 @@ final class HttpJsonClientCallImpl<RequestT, ResponseT>
private final ApiMethodDescriptor<RequestT, ResponseT> methodDescriptor;
private final HttpTransport httpTransport;
private final Executor executor;
private final ScheduledExecutorService deadlineCancellationExecutor;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we try to reuse the executor above? I know it's not a ScheduledExecutorService at this point, but the what's being passed in is from here, which is indeed a ScheduledExecutorService, so we should be able to change the type and reuse it. I did a quick search and I don't think most of the change would be breaking, there is one public builder method that could be considered breaking.
The reason we may want to reuse the existing executor is that we can leverage the existing client lifecycles for shutting down the executor, otherwise we need to explicitly shut down this new deadlineCancellationExecutor somewhere to prevent from resource leaking, which is missing from this PR currently. Reusing the existing executor may also have other issues like the ThreadPool is not large enough so new requests may have to wait for new thread. The bottom line is that we need to be careful with anything related to executors, so we have enough resource for large number of concurrent requests and we don't leak resource accidentally.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gotcha, makes sense. I'll take another look into how grpc handles the multiple executors. I'd imagine they would also possibly run into this issue.


//
// Request-specific data (provided by client code) before we get a response.
Expand All @@ -114,19 +119,21 @@ final class HttpJsonClientCallImpl<RequestT, ResponseT>
private ProtoMessageJsonStreamIterator responseStreamIterator;

@GuardedBy("lock")
private boolean closed;
private volatile boolean closed;
lqiu96 marked this conversation as resolved.
Show resolved Hide resolved

HttpJsonClientCallImpl(
ApiMethodDescriptor<RequestT, ResponseT> methodDescriptor,
String endpoint,
HttpJsonCallOptions callOptions,
HttpTransport httpTransport,
Executor executor) {
Executor executor,
ScheduledExecutorService deadlineCancellationExecutor) {
this.methodDescriptor = methodDescriptor;
this.endpoint = endpoint;
this.callOptions = callOptions;
this.httpTransport = httpTransport;
this.executor = executor;
this.deadlineCancellationExecutor = deadlineCancellationExecutor;
this.closed = false;
}

Expand Down Expand Up @@ -161,6 +168,38 @@ public void start(Listener<ResponseT> responseListener, HttpJsonMetadata request
this.listener = responseListener;
this.requestHeaders = requestHeaders;
}

// Use the timeout duration value instead of calculating the future Instant
// Only schedule the deadline if the RPC timeout has been set in the RetrySettings
Duration timeout = callOptions.getTimeout();
if (timeout != null) {
// The future timeout value is guaranteed to not be a negative value as the
// RetryAlgorithm will not retry
long timeoutMs = timeout.toMillis();
this.deadlineCancellationExecutor.schedule(this::timeout, timeoutMs, TimeUnit.MILLISECONDS);
}
}

// Notify the FutureListener that the there is a timeout exception from this RPC
// call (DEADLINE_EXCEEDED). For retrying RPCs, this code is returned for every attempt
// that exceeds the timeout. The RetryAlgorithm will check both the timing and code to
// ensure another attempt is made.
private void timeout() {
// There is a race between the deadline scheduler and response being returned from
// the server. The deadline scheduler has priority as it will clear out the pending
// notifications queue and add the DEADLINE_EXCEEDED event once it is able to obtain
// the lock.
synchronized (lock) {
close(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment in close method makes me worried that we may still have to wait for the server to fully send the response.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah the cancellation in the runnable doesn't seem like it's actually cancel's the runnable. I'll look into the possibility of being able to actually disconnect the connection instead of waiting for the socket timeout.

StatusCode.Code.DEADLINE_EXCEEDED.getHttpStatusCode(),
"Deadline exceeded",
new HttpJsonStatusRuntimeException(
StatusCode.Code.DEADLINE_EXCEEDED.getHttpStatusCode(), "Deadline exceeded", null),
true);
}

// trigger delivery loop if not already running
deliver();
}

@Override
Expand Down Expand Up @@ -260,9 +299,10 @@ private void deliver() {
throw new InterruptedException("Message delivery has been interrupted");
}

// All listeners must be called under delivery loop (but outside the lock) to ensure that no
// two notifications come simultaneously from two different threads and that we do not go
// indefinitely deep in the stack if delivery logic is called recursively via listeners.
// All listeners must be called under delivery loop (but outside the lock) to ensure that
// no two notifications come simultaneously from two different threads and that we do not
// go indefinitely deep in the stack if delivery logic is called recursively via
// listeners.
notifyListeners();

// The synchronized block around message reading and cancellation notification processing
Expand Down Expand Up @@ -302,7 +342,7 @@ private void deliver() {
inDelivery = false;
break;
} else {
// We still have some stuff in notiticationTasksQueue so continue the loop, most
// We still have some stuff in notificationTasksQueue so continue the loop, most
// likely we will finally terminate on the next cycle.
continue;
}
Expand All @@ -319,8 +359,8 @@ private void deliver() {
// can do in such an unlikely situation (otherwise we would stay forever in the delivery
// loop).
synchronized (lock) {
// Close the call immediately marking it cancelled. If already closed close() will have no
// effect.
// Close the call immediately marking it cancelled. If already closed, close() will have
// no effect.
close(ex.getStatusCode(), ex.getMessage(), ex, true);
}
}
Expand Down Expand Up @@ -352,7 +392,7 @@ private boolean consumeMessageFromStream() throws IOException {
boolean allMessagesConsumed;
Reader responseReader;
if (methodDescriptor.getType() == MethodType.SERVER_STREAMING) {
// Lazily initialize responseStreamIterator in case if it is a server steraming response
// Lazily initialize responseStreamIterator in case if it is a server streaming response
if (responseStreamIterator == null) {
responseStreamIterator =
new ProtoMessageJsonStreamIterator(
Expand Down Expand Up @@ -384,7 +424,7 @@ private boolean consumeMessageFromStream() throws IOException {

@GuardedBy("lock")
private void close(
int statusCode, String message, Throwable cause, boolean terminateImmediatelly) {
int statusCode, String message, Throwable cause, boolean terminateImmediately) {
try {
if (closed) {
return;
Expand All @@ -399,12 +439,12 @@ private void close(
requestRunnable = null;
}

HttpJsonMetadata.Builder meatadaBuilder = HttpJsonMetadata.newBuilder();
HttpJsonMetadata.Builder metadataBuilder = HttpJsonMetadata.newBuilder();
if (runnableResult != null && runnableResult.getTrailers() != null) {
meatadaBuilder = runnableResult.getTrailers().toBuilder();
metadataBuilder = runnableResult.getTrailers().toBuilder();
}
meatadaBuilder.setException(cause);
meatadaBuilder.setStatusMessage(message);
metadataBuilder.setException(cause);
metadataBuilder.setStatusMessage(message);
if (responseStreamIterator != null) {
responseStreamIterator.close();
}
Expand All @@ -415,19 +455,19 @@ private void close(
// onClose() suppresses all other pending notifications.
// there should be no place in the code which inserts something in this queue before checking
// the `closed` flag under the lock and refusing to insert anything if `closed == true`.
if (terminateImmediatelly) {
if (terminateImmediately) {
// This usually means we are cancelling the call before processing the response in full.
// It may happen if a user explicitly cancels the call or in response to an unexpected
// exception either from server or a call listener execution.
pendingNotifications.clear();
}

pendingNotifications.offer(
new OnCloseNotificationTask<>(listener, statusCode, meatadaBuilder.build()));
new OnCloseNotificationTask<>(listener, statusCode, metadataBuilder.build()));

} catch (Throwable e) {
// suppress stream closing exceptions in favor of the actual call closing cause. This method
// should not throw, otherwise we may stuck in an infinite loop of exception processing.
// should not throw, otherwise we may be stuck in an infinite loop of exception processing.
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@
import com.google.api.gax.rpc.ApiCallContext;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nonnull;
import org.threeten.bp.Instant;
import org.threeten.bp.Duration;

/**
* {@code HttpJsonClientCalls} creates a new {@code HttpJsonClientCAll} from the given call context.
Expand All @@ -50,12 +49,25 @@ public static <RequestT, ResponseT> HttpJsonClientCall<RequestT, ResponseT> newC

HttpJsonCallContext httpJsonContext = HttpJsonCallContext.createDefault().nullToSelf(context);

// Try to convert the timeout into a deadline and use it if it occurs before the actual deadline
// Use the context's timeout instead of calculating a future deadline with the System clock.
// The timeout value is calculated from TimedAttemptSettings which accounts for the
// TotalTimeout value set in the RetrySettings.
if (httpJsonContext.getTimeout() != null) {
@Nonnull Instant newDeadline = Instant.now().plus(httpJsonContext.getTimeout());
HttpJsonCallOptions callOptions = httpJsonContext.getCallOptions();
if (callOptions.getDeadline() == null || newDeadline.isBefore(callOptions.getDeadline())) {
callOptions = callOptions.toBuilder().setDeadline(newDeadline).build();
// HttpJsonChannel expects the HttpJsonCallOptions and we store the timeout duration
// inside the HttpJsonCallOptions
// Note: There is manual conversion between threetenbp's Duration and java.util.Duration
// This is temporary here as we plan to migrate to java.util.Duration
if (callOptions.getTimeout() == null
|| httpJsonContext
.getTimeout()
.compareTo(Duration.ofMillis(callOptions.getTimeout().toMillis()))
< 0) {
callOptions =
callOptions
.toBuilder()
.setTimeout(java.time.Duration.ofMillis(httpJsonContext.getTimeout().toMillis()))
blakeli0 marked this conversation as resolved.
Show resolved Hide resolved
.build();
httpJsonContext = httpJsonContext.withCallOptions(callOptions);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,11 @@
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import javax.annotation.Nullable;
import org.threeten.bp.Duration;
import org.threeten.bp.Instant;

/** A runnable object that creates and executes an HTTP request. */
class HttpRequestRunnable<RequestT, ResponseT> implements Runnable {
Expand Down Expand Up @@ -100,24 +99,22 @@ void cancel() {

@Override
public void run() {
HttpResponse httpResponse = null;
RunnableResult.Builder result = RunnableResult.builder();
HttpJsonMetadata.Builder trailers = HttpJsonMetadata.newBuilder();
HttpRequest httpRequest = null;
HttpResponse httpResponse = null;
try {
// Check if already cancelled before even creating a request
if (cancelled) {
return;
}
httpRequest = createHttpRequest();
HttpRequest httpRequest = createHttpRequest();
// Check if already cancelled before sending the request;
if (cancelled) {
return;
}

httpResponse = httpRequest.execute();

// Check if already cancelled before sending the request;
// Check if already cancelled before trying to construct and read the response
if (cancelled) {
httpResponse.disconnect();
return;
Expand Down Expand Up @@ -145,6 +142,9 @@ public void run() {
}
trailers.setException(e);
} finally {
// If cancelled, `close()` in HttpJsonClientCallImpl has already been invoked
// and returned a DEADLINE_EXCEEDED error back so there is no need to set
// a result back.
if (!cancelled) {
resultListener.setResult(result.setTrailers(trailers.build()).build());
}
Expand Down Expand Up @@ -191,16 +191,6 @@ HttpRequest createHttpRequest() throws IOException {

HttpRequest httpRequest = buildRequest(requestFactory, url, jsonHttpContent);

Instant deadline = httpJsonCallOptions.getDeadline();
if (deadline != null) {
long readTimeout = Duration.between(Instant.now(), deadline).toMillis();
if (httpRequest.getReadTimeout() > 0
&& httpRequest.getReadTimeout() < readTimeout
&& readTimeout < Integer.MAX_VALUE) {
httpRequest.setReadTimeout((int) readTimeout);
}
}

for (Map.Entry<String, Object> entry : headers.getHeaders().entrySet()) {
HttpHeadersUtils.setHeader(
httpRequest.getHeaders(), entry.getKey(), (String) entry.getValue());
Expand Down Expand Up @@ -243,9 +233,35 @@ private HttpRequest buildRequest(
HttpHeadersUtils.setHeader(
httpRequest.getHeaders(), "X-HTTP-Method-Override", originalHttpMethod);
}

Duration timeout = httpJsonCallOptions.getTimeout();
if (timeout != null) {
long timeoutMs = timeout.toMillis();

// Read timeout is the timeout between reading two data packets and not total timeout
// HttpJsonClientCallsImpl implements a deadlineCancellationExecutor to cancel the
// RPC when it exceeds the RPC timeout
if (shouldUpdateTimeout(httpRequest.getReadTimeout(), timeoutMs)) {
httpRequest.setReadTimeout((int) timeoutMs);
}

// Connect timeout is the time allowed for establishing the connection.
// This is updated to match the RPC timeout as we do not want a shorter
// connect timeout to preemptively throw a ConnectExcepetion before
// we've reached the RPC timeout
if (shouldUpdateTimeout(httpRequest.getConnectTimeout(), timeoutMs)) {
httpRequest.setConnectTimeout((int) timeoutMs);
}
}
return httpRequest;
}

private boolean shouldUpdateTimeout(int currentTimeoutMs, long newTimeoutMs) {
return currentTimeoutMs > 0
&& currentTimeoutMs < newTimeoutMs
&& newTimeoutMs < Integer.MAX_VALUE;
}

// This will be frequently executed, so avoiding using regexps if not necessary.
private String normalizeEndpoint(String rawEndpoint) {
String normalized = rawEndpoint;
Expand Down