Skip to content

Commit

Permalink
Do not consume orphaned message bodies, just warn. (#2733)
Browse files Browse the repository at this point in the history
Motivation
----------
In a previous changeset, the HttpMessageDiscardWatchdogServiceFilter
has been introduced which tries to proactively clean up orphaned/
discarded message body buffers: This works in some, but not all cases
in a reliable fashion.

Modifications
-------------
Instead of trying to clean up the buffers (which might not work all
the time), just WARN into the logs so users can take proactive
action to fix their code.

The log level has been dropped from ERROR to WARN since it is not
a fatal error but still needs to be taken seriously.
  • Loading branch information
daschl committed Oct 20, 2023
1 parent 428ee71 commit 294de53
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 43 deletions.
1 change: 1 addition & 0 deletions servicetalk-http-netty/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ dependencies {
testImplementation testFixtures(project(":servicetalk-concurrent-internal"))
testImplementation testFixtures(project(":servicetalk-concurrent-reactivestreams"))
testImplementation testFixtures(project(":servicetalk-http-api"))
testImplementation testFixtures(project(":servicetalk-log4j2-mdc-utils"))
testImplementation testFixtures(project(":servicetalk-transport-netty-internal"))
testImplementation project(":servicetalk-concurrent-api-test")
testImplementation project(":servicetalk-concurrent-test-internal")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,14 @@ public Single<StreamingHttpResponse> handle(final HttpServiceContext ctx,
final AtomicReference<Publisher<?>> reference = request.context()
.computeIfAbsent(MESSAGE_PUBLISHER_KEY, key -> new AtomicReference<>());
assert reference != null;
final Publisher<?> previous = reference.getAndSet(response.messageBody());
if (previous != null) {
if (reference.getAndSet(response.messageBody()) != null) {
// If a previous message exists, the Single<StreamingHttpResponse> got resubscribed to
// (i.e. during a retry) and so previous message body needs to be cleaned up.
LOGGER.warn("Automatically draining previous HTTP response message body that was " +
"not consumed. Users-defined retry logic must drain response payload before " +
"retrying.");
previous.ignoreElements().subscribe();
// (i.e. during a retry) and so previous message body needs to be cleaned up by the
// user.
LOGGER.warn("Discovered un-drained HTTP response message body which has " +
"been dropped by user code - this is a strong indication of a bug " +
"in a user-defined filter. Responses (or their message body) must " +
"be fully consumed before retrying.");
}

return response.transformMessageBody(msgPublisher -> msgPublisher.beforeSubscriber(() -> {
Expand Down Expand Up @@ -146,14 +146,6 @@ public void onComplete() {
*/
private static final class CleanerHttpLifecycleObserver implements HttpLifecycleObserver {

/**
* Helps to remember if we logged an error for user-defined filters already to not spam the logs.
* <p>
* NOTE: this variable is intentionally not volatile since thread visibility is not a concern, but repeated
* volatile accesses are.
*/
private static boolean loggedError;

private CleanerHttpLifecycleObserver() {
// Singleton
}
Expand Down Expand Up @@ -181,20 +173,13 @@ public HttpResponseObserver onResponse(final HttpResponseMetaData responseMetaDa
public void onExchangeFinally() {
if (requestContext != null) {
final AtomicReference<?> maybePublisher = requestContext.get(MESSAGE_PUBLISHER_KEY);
if (maybePublisher != null) {
Publisher<?> message = (Publisher<?>) maybePublisher.get();
if (message != null) {
// No-one subscribed to the message (or there is none), so if there is a message
// proactively clean it up.
if (!loggedError) {
LOGGER.error("Automatically draining HTTP response message body which has " +
"been dropped by user code - this is a strong indication of a bug " +
"in a user-defined filter. Responses (or their message body) must " +
"be fully consumed before discarding.");
loggedError = true;
}
message.ignoreElements().subscribe();
}
if (maybePublisher != null && maybePublisher.get() != null) {
// No-one subscribed to the message (or there is none), so if there is a message
// tell the user to clean it up.
LOGGER.warn("Discovered un-drained HTTP response message body which has " +
"been dropped by user code - this is a strong indication of a bug " +
"in a user-defined filter. Responses (or their message body) must " +
"be fully consumed before discarding.");
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,24 @@
import io.servicetalk.http.api.StreamingHttpResponse;
import io.servicetalk.http.api.StreamingHttpResponseFactory;
import io.servicetalk.http.api.StreamingHttpServiceFilter;
import io.servicetalk.log4j2.mdc.utils.LoggerStringWriter;
import io.servicetalk.transport.netty.internal.ExecutionContextExtension;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.stream.Stream;

import static io.servicetalk.http.netty.BuilderUtils.newClientBuilder;
import static io.servicetalk.http.netty.BuilderUtils.newServerBuilder;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

final class HttpMessageDiscardWatchdogServiceFilterTest {

Expand All @@ -54,12 +57,19 @@ final class HttpMessageDiscardWatchdogServiceFilterTest {
ExecutionContextExtension.cached("client-io", "client-executor")
.setClassLevel(true);

@ParameterizedTest(name = "{displayName} [{index}] transformer={0} expectedSubscriptions={1}")
@MethodSource("responseTransformers")
void cleansPayloadBodyIfDiscardedInFilter(final ResponseTransformer transformer,
final int expectedSubscriptions) throws Exception {
final CountDownLatch payloadSubscriptionCounter = new CountDownLatch(expectedSubscriptions);
@BeforeEach
public void setup() {
LoggerStringWriter.reset();
}

@AfterEach
public void tearDown() {
LoggerStringWriter.remove();
}

@ParameterizedTest(name = "{displayName} [{index}] transformer={0}")
@MethodSource("responseTransformers")
void cleansPayloadBodyIfDiscardedInFilter(final ResponseTransformer transformer) throws Exception {
try (HttpServerContext serverContext = newServerBuilder(SERVER_CTX)
.appendServiceFilter(service -> new StreamingHttpServiceFilter(service) {
@Override
Expand All @@ -71,8 +81,7 @@ public Single<StreamingHttpResponse> handle(final HttpServiceContext ctx,
})
.listenStreamingAndAwait((ctx, request, responseFactory) -> Single.fromSupplier(() -> {
final Publisher<Buffer> buffer = Publisher
.from(ctx.executionContext().bufferAllocator().fromUtf8("Hello, World!"))
.beforeOnSubscribe(subscription -> payloadSubscriptionCounter.countDown());
.from(ctx.executionContext().bufferAllocator().fromUtf8("Hello, World!"));
return responseFactory.ok().payloadBody(buffer);
}))) {

Expand All @@ -82,7 +91,9 @@ public Single<StreamingHttpResponse> handle(final HttpServiceContext ctx,
assertEquals(0, response.payloadBody().readableBytes());
}

payloadSubscriptionCounter.await();
String output = LoggerStringWriter.stableAccumulated(1000);
assertTrue(output.contains("Discovered un-drained HTTP response message body which " +
"has been dropped by user code"));
}
}

Expand All @@ -101,7 +112,7 @@ public Single<StreamingHttpResponse> apply(final Single<StreamingHttpResponse> r
public String toString() {
return "Throws Exception";
}
}, 1),
}),
Arguments.of(new ResponseTransformer() {
@Override
public Single<StreamingHttpResponse> apply(final Single<StreamingHttpResponse> response,
Expand All @@ -113,7 +124,7 @@ public Single<StreamingHttpResponse> apply(final Single<StreamingHttpResponse> r
public String toString() {
return "Drops payload body while transforming";
}
}, 1),
}),
Arguments.of(new ResponseTransformer() {
@Override
public Single<StreamingHttpResponse> apply(final Single<StreamingHttpResponse> response,
Expand All @@ -125,7 +136,7 @@ public Single<StreamingHttpResponse> apply(final Single<StreamingHttpResponse> r
public String toString() {
return "Drops message body while transforming";
}
}, 1),
}),
Arguments.of(new ResponseTransformer() {
@Override
public Single<StreamingHttpResponse> apply(final Single<StreamingHttpResponse> response,
Expand All @@ -137,7 +148,7 @@ public Single<StreamingHttpResponse> apply(final Single<StreamingHttpResponse> r
public String toString() {
return "Drops response and creates new one";
}
}, 1),
}),
Arguments.of(new ResponseTransformer() {

private final AtomicInteger retryCounter = new AtomicInteger();
Expand All @@ -158,7 +169,7 @@ public Single<StreamingHttpResponse> apply(final Single<StreamingHttpResponse> r
public String toString() {
return "Retries and drops again";
}
}, 2)
})
);
}

Expand Down

0 comments on commit 294de53

Please sign in to comment.