Skip to content

Commit

Permalink
Fix flaky [Http|Grpc]LifecycleObserverTest
Browse files Browse the repository at this point in the history
Motivation:

It's possible that requesting more data from the payload publisher can
race with completion. Subsequent `request(N)` are possible even after
both observers terminate.

Modifications:

- Await for client & server shutdown before starting verification of
invoked observer callbacks.

Result:

No more events are possible after server shutdown. Verification step
sees all observed events.

Resolves apple#2343.
Resolves apple#2298.
  • Loading branch information
idelpivnitskiy committed Sep 28, 2022
1 parent 83760f5 commit 3814fae
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 8 deletions.
Expand Up @@ -173,10 +173,12 @@ void tearDown() throws Exception {
try {
if (client != null) {
client.close();
client = null;
}
} finally {
if (server != null) {
server.close();
server = null;
}
}
}
Expand Down Expand Up @@ -215,7 +217,10 @@ private void runTest(Callable<String> executeRequest, boolean error, boolean agg
assertThat(executeRequest.call(), equalTo(CONTENT));
}

// Await full termination to make sure no more callbacks will be invoked.
bothTerminate.await();
tearDown();

verifyObservers(true, error, aggregated, clientLifecycleObserver, clientExchangeObserver,
clientRequestObserver, clientResponseObserver, clientInOrder, clientRequestInOrder);
verifyObservers(false, error, aggregated, serverLifecycleObserver, serverExchangeObserver,
Expand Down
Expand Up @@ -149,7 +149,7 @@ void testCompleteEmptyMessageBody(HttpProtocol protocol) throws Exception {
setUp(protocol);
makeRequestAndAssertResponse(SVC_NO_CONTENT, protocol, NO_CONTENT, 0);

bothTerminate.await();
awaitFullTermination();
verifyObservers(true, clientLifecycleObserver, clientExchangeObserver, clientRequestObserver,
clientResponseObserver, clientInOrder, clientRequestInOrder, false, false, 0);
verifyObservers(false, serverLifecycleObserver, serverExchangeObserver, serverRequestObserver,
Expand All @@ -162,7 +162,7 @@ void testCompleteWithPayloadBodyAndTrailers(HttpProtocol protocol) throws Except
setUp(protocol);
makeRequestAndAssertResponse(SVC_ECHO, protocol, OK, CONTENT.readableBytes());

bothTerminate.await();
awaitFullTermination();
verifyObservers(true, clientLifecycleObserver, clientExchangeObserver, clientRequestObserver,
clientResponseObserver, clientInOrder, clientRequestInOrder, true, true,
CONTENT.readableBytes());
Expand All @@ -177,7 +177,7 @@ void testServerThrows(HttpProtocol protocol) throws Exception {
setUp(protocol);
makeRequestAndAssertResponse(SVC_THROW_ERROR, protocol, INTERNAL_SERVER_ERROR, 0);

bothTerminate.await();
awaitFullTermination();
verifyObservers(true, clientLifecycleObserver, clientExchangeObserver, clientRequestObserver,
clientResponseObserver, clientInOrder, clientRequestInOrder, false, false, 0);
verifyObservers(false, serverLifecycleObserver, serverExchangeObserver, serverRequestObserver,
Expand All @@ -190,7 +190,7 @@ void testServerSingleFailed(HttpProtocol protocol) throws Exception {
setUp(protocol);
makeRequestAndAssertResponse(SVC_SINGLE_ERROR, protocol, INTERNAL_SERVER_ERROR, 0);

bothTerminate.await();
awaitFullTermination();
verifyObservers(true, clientLifecycleObserver, clientExchangeObserver, clientRequestObserver,
clientResponseObserver, clientInOrder, clientRequestInOrder, false, false, 0);
verifyObservers(false, serverLifecycleObserver, serverExchangeObserver, serverRequestObserver,
Expand All @@ -206,7 +206,7 @@ void testServerPayloadBodyFailure(HttpProtocol protocol) throws Exception {
assertThat(e.getCause(), instanceOf(protocol == HttpProtocol.HTTP_2 ?
Http2Exception.class : ClosedChannelException.class));

bothTerminate.await();
awaitFullTermination();
verifyError(true, clientLifecycleObserver, clientExchangeObserver, clientRequestObserver,
clientResponseObserver, clientInOrder, clientRequestInOrder, 0);
verifyError(false, serverLifecycleObserver, serverExchangeObserver, serverRequestObserver,
Expand All @@ -229,7 +229,7 @@ public Single<StreamingHttpResponse> request(StreamingHttpRequest request) {
assertThat(e.getCause(), sameInstance(DELIBERATE_EXCEPTION));

bothTerminate.countDown(); // server is not involved in this test, count down manually
bothTerminate.await();
awaitFullTermination();

clientInOrder.verify(clientLifecycleObserver).onNewExchange();
clientInOrder.verify(clientExchangeObserver).onRequest(any(StreamingHttpRequest.class));
Expand Down Expand Up @@ -264,7 +264,7 @@ public Single<StreamingHttpResponse> handle(HttpServiceContext ctx,
requestReceived.await();
responseFuture.cancel(true);

bothTerminate.await();
awaitFullTermination();

verify(serverExchangeObserver, await()).onExchangeFinally();

Expand Down Expand Up @@ -327,7 +327,7 @@ public Single<StreamingHttpResponse> handle(HttpServiceContext ctx,
serverResponsePayload.onNext(CONTENT.duplicate());
serverResponsePayload.onNext(CONTENT.duplicate());

bothTerminate.await();
awaitFullTermination();

clientInOrder.verify(clientLifecycleObserver).onNewExchange();
clientInOrder.verify(clientExchangeObserver).onConnectionSelected(any(ConnectionInfo.class));
Expand Down Expand Up @@ -419,4 +419,10 @@ private static void verifyError(boolean client, HttpLifecycleObserver lifecycle,
inOrder.verify(exchange).onExchangeFinally();
verifyNoMoreInteractions(request, response, exchange, lifecycle);
}

private void awaitFullTermination() throws Exception {
bothTerminate.await();
stopServer();
// No more callbacks will be invoked after this point.
}
}

0 comments on commit 3814fae

Please sign in to comment.