Skip to content

Commit

Permalink
Complete DefaultClientRequestContext.whenIntialized() after fully i…
Browse files Browse the repository at this point in the history
…nitializing a context (line#3636)

Motivation:

`LazyDynamicEndpointGroupTest.emptyEndpoint()` sometimes failed in CI
builds. line#3381 It expects to fail the test with `EmptyEndpointException`,
however, it fails with `ClosedStreamException`.

After digging the issue, I find out that there is a race in `whenInitialized`.
If the `acquiredEventLoop.execute()` is executed immediately and completed earlier than
returning the method,
https://github.com/line/armeria/blob/d4880fe12690d2dafd2c5e7fa9f24c3b24837a00/core/src/main/java/com/linecorp/armeria/client/DefaultClientRequestContext.java#L307-L309
the callbacks of `ctx.whenInitialized()` will be invoked before a `RequestLog`
is completed.
https://github.com/line/armeria/blob/207c5e038f59802dca769936a50e219a5fe308ea/grpc/src/main/java/com/linecorp/armeria/internal/client/grpc/ArmeriaClientCall.java#L337-L348
As the `req` is closed already, the `req.write()` would be failed with
`ClosedStreamException`.

Modifications:

- Complete `DefaultClientRequestContext.whenIntialized()` after
  `initContextAndExecuteWithFallback` of `ClientUtil`

Result:

- You no longer see `ClosedStreamException` when an `EndpointGroup` is empty.
- Fixes line#3381
  • Loading branch information
ikhoon authored and heowc committed Jun 24, 2021
1 parent 81db6e1 commit 2735c96
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 27 deletions.
Expand Up @@ -281,32 +281,13 @@ private CompletableFuture<Boolean> initEndpointGroup(EndpointGroup endpointGroup
}).thenCompose(Function.identity());
}

private CompletableFuture<Boolean> initFuture(boolean success, @Nullable EventLoop acquiredEventLoop) {
CompletableFuture<Boolean> whenInitialized = this.whenInitialized;
if (whenInitialized == null) {
final CompletableFuture<Boolean> future;
if (acquiredEventLoop == null) {
future = UnmodifiableFuture.completedFuture(success);
} else {
future = CompletableFuture.supplyAsync(() -> success, acquiredEventLoop);
}
if (whenInitializedUpdater.compareAndSet(this, null, future)) {
return future;
}
whenInitialized = this.whenInitialized;
}

final CompletableFuture<Boolean> finalWhenInitialized = whenInitialized;
if (finalWhenInitialized.isDone()) {
return finalWhenInitialized;
}

private static CompletableFuture<Boolean> initFuture(boolean success,
@Nullable EventLoop acquiredEventLoop) {
if (acquiredEventLoop == null) {
finalWhenInitialized.complete(success);
return UnmodifiableFuture.completedFuture(success);
} else {
acquiredEventLoop.execute(() -> finalWhenInitialized.complete(success));
return CompletableFuture.supplyAsync(() -> success, acquiredEventLoop);
}
return finalWhenInitialized;
}

/**
Expand All @@ -329,6 +310,21 @@ public CompletableFuture<Boolean> whenInitialized() {
}
}

/**
* Completes the {@link #whenInitialized()} with the specified value.
*/
public void finishInitialization(boolean success) {
final CompletableFuture<Boolean> whenInitialized = this.whenInitialized;
if (whenInitialized != null) {
whenInitialized.complete(success);
} else {
if (!whenInitializedUpdater.compareAndSet(this, null,
UnmodifiableFuture.completedFuture(success))) {
this.whenInitialized.complete(success);
}
}
}

private void updateEndpoint(@Nullable Endpoint endpoint) {
this.endpoint = endpoint;
autoFillSchemeAndAuthority();
Expand Down
Expand Up @@ -52,12 +52,14 @@ O initContextAndExecuteWithFallback(
requireNonNull(futureConverter, "futureConverter");
requireNonNull(errorResponseFactory, "errorResponseFactory");

boolean initialized = false;
boolean success = false;
try {
endpointGroup = mapEndpoint(ctx, endpointGroup);
final CompletableFuture<Boolean> initFuture = ctx.init(endpointGroup);
if (initFuture.isDone()) {
initialized = initFuture.isDone();
if (initialized) {
// Initialization has been done immediately.
final boolean success;
try {
success = initFuture.get();
} catch (Exception e) {
Expand All @@ -66,22 +68,28 @@ O initContextAndExecuteWithFallback(

return initContextAndExecuteWithFallback(delegate, ctx, errorResponseFactory, success);
} else {
return futureConverter.apply(initFuture.handle((success, cause) -> {
return futureConverter.apply(initFuture.handle((success0, cause) -> {
try {
if (cause != null) {
throw UnprocessedRequestException.of(Exceptions.peel(cause));
}

return initContextAndExecuteWithFallback(delegate, ctx, errorResponseFactory, success);
return initContextAndExecuteWithFallback(delegate, ctx, errorResponseFactory, success0);
} catch (Throwable t) {
fail(ctx, t);
return errorResponseFactory.apply(ctx, t);
} finally {
ctx.finishInitialization(success0);
}
}));
}
} catch (Throwable cause) {
fail(ctx, cause);
return errorResponseFactory.apply(ctx, cause);
} finally {
if (initialized) {
ctx.finishInitialization(success);
}
}
}

Expand Down

0 comments on commit 2735c96

Please sign in to comment.