From 60cade9febc05efeda579e9ae0fc42cf74306628 Mon Sep 17 00:00:00 2001 From: Mikhail Berezovskiy Date: Wed, 11 Sep 2024 14:23:12 -0700 Subject: [PATCH 1/8] add tracing http chunk handlers --- .../Netty4IncrementalRequestHandlingIT.java | 69 +++++++++++++++++++ .../netty4/Netty4HttpRequestBodyStream.java | 13 ++++ .../java/org/elasticsearch/http/HttpBody.java | 7 ++ .../http/HttpClientStatsTracker.java | 2 + .../org/elasticsearch/http/HttpTracer.java | 36 ++++++++-- 5 files changed, 123 insertions(+), 4 deletions(-) diff --git a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java index a401d84b59aca..8a8d4d7fd7c36 100644 --- a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java +++ b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java @@ -36,6 +36,7 @@ import io.netty.handler.stream.ChunkedStream; import io.netty.handler.stream.ChunkedWriteHandler; +import org.apache.logging.log4j.Level; import org.elasticsearch.ESNetty4IntegTestCase; import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.client.internal.node.NodeClient; @@ -51,6 +52,7 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.CollectionUtils; import org.elasticsearch.features.NodeFeature; +import org.elasticsearch.http.HttpBodyTracer; import org.elasticsearch.http.HttpHandlingSettings; import org.elasticsearch.http.HttpServerTransport; import org.elasticsearch.http.HttpTransportSettings; @@ -65,6 +67,8 @@ import org.elasticsearch.rest.RestStatus; import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.MockLog; +import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.transport.netty4.Netty4Utils; import java.util.Collection; @@ -338,6 +342,67 @@ public void testBadRequestReleaseQueuedChunks() throws Exception { } } + /** + * ensures that {@link org.elasticsearch.http.HttpClientStatsTracker} counts streamed content bytes + */ + public void testHttpClientStats() throws Exception { + try (var ctx = setupClientCtx()) { + var totalBytesSent = 0L; + var httpTransport = internalCluster().getInstance(HttpServerTransport.class, ctx.nodeName); + for (var reqNo = 0; reqNo < randomIntBetween(2, 10); reqNo++) { + var id = opaqueId(reqNo); + var contentSize = randomIntBetween(0, maxContentLength()); + totalBytesSent += contentSize; + ctx.clientChannel.writeAndFlush(httpRequest(id, contentSize)); + ctx.clientChannel.writeAndFlush(randomContent(contentSize, true)); + var handler = ctx.awaitRestChannelAccepted(id); + handler.readAllBytes(); + handler.sendResponse(new RestResponse(RestStatus.OK, "")); + + var stats = httpTransport.stats().clientStats(); + var totalBytesRecv = 0L; + for (var s : stats) { + totalBytesRecv += s.requestSizeBytes(); + } + assertEquals(totalBytesSent, totalBytesRecv); + } + } + } + + @TestLogging( + reason = "testing TRACE logging", + value = "org.elasticsearch.http.HttpTracer:TRACE,org.elasticsearch.http.HttpBodyTracer:TRACE" + ) + public void testHttpBodyLogging() throws Exception { + try (var ctx = setupClientCtx()) { + MockLog.assertThatLogger(() -> { + try { + var req = fullHttpRequest(opaqueId(0), randomByteBuf(8 * 1024)); + ctx.clientChannel.writeAndFlush(req); + var handler = ctx.awaitRestChannelAccepted(opaqueId(0)); + handler.readAllBytes(); + } catch (Exception e) { + fail(e); + } + }, + HttpBodyTracer.class, + new MockLog.SeenEventExpectation( + "request part", + HttpBodyTracer.class.getCanonicalName(), + Level.TRACE, + "* request body [part *]*" + ), + new MockLog.SeenEventExpectation( + "request end", + HttpBodyTracer.class.getCanonicalName(), + Level.TRACE, + "* request body (gzip compressed, base64-encoded, and split into * parts on preceding log lines; for details see " + + "https://www.elastic.co/guide/en/elasticsearch/reference/master/modules-network.html#http-rest-request-tracer)" + ) + ); + } + } + private int maxContentLength() { return HttpHandlingSettings.fromSettings(internalCluster().getInstance(Settings.class)).maxContentLength(); } @@ -390,6 +455,10 @@ static HttpContent randomContent(int size, boolean isLast) { } } + static ByteBuf randomByteBuf(int size) { + return Unpooled.wrappedBuffer(randomByteArrayOfLength(size)); + } + Ctx setupClientCtx() throws Exception { var nodeName = internalCluster().getRandomNodeName(); var clientRespQueue = new LinkedBlockingDeque<>(16); diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java index 288a46c638dbb..6bbc011502ab9 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java @@ -18,6 +18,9 @@ import org.elasticsearch.http.HttpBody; import org.elasticsearch.transport.netty4.Netty4Utils; +import java.util.ArrayList; +import java.util.List; + /** * Netty based implementation of {@link HttpBody.Stream}. * This implementation utilize {@link io.netty.channel.ChannelConfig#setAutoRead(boolean)} @@ -28,6 +31,7 @@ public class Netty4HttpRequestBodyStream implements HttpBody.Stream { private final Channel channel; private final FutureListener closeListener = future -> doClose(); + private final List tracingHandlers = new ArrayList<>(4); private ByteBuf buf; private boolean hasLast = false; private boolean requested = false; @@ -50,6 +54,12 @@ public void setHandler(ChunkHandler chunkHandler) { this.handler = chunkHandler; } + @Override + public void addTracingHandler(ChunkHandler chunkHandler) { + assert tracingHandlers.contains(chunkHandler) == false; + tracingHandlers.add(chunkHandler); + } + @Override public void next() { assert closing == false : "cannot request next chunk on closing stream"; @@ -117,6 +127,9 @@ private void send() { var bytesRef = Netty4Utils.toReleasableBytesReference(buf); requested = false; buf = null; + for (var tracing : tracingHandlers) { + tracing.onNext(bytesRef, hasLast); + } handler.onNext(bytesRef, hasLast); } diff --git a/server/src/main/java/org/elasticsearch/http/HttpBody.java b/server/src/main/java/org/elasticsearch/http/HttpBody.java index 9da1bc85b2a29..7f21451dabbcf 100644 --- a/server/src/main/java/org/elasticsearch/http/HttpBody.java +++ b/server/src/main/java/org/elasticsearch/http/HttpBody.java @@ -72,6 +72,13 @@ non-sealed interface Stream extends HttpBody { @Nullable ChunkHandler handler(); + /** + * Adds tracing chunk handler. Tracing handler will be invoked before main handler, and + * should never release or call for next chunk. It should be used for monitoring and + * logging purposes. + */ + void addTracingHandler(ChunkHandler chunkHandler); + /** * Sets handler that can handle next chunk */ diff --git a/server/src/main/java/org/elasticsearch/http/HttpClientStatsTracker.java b/server/src/main/java/org/elasticsearch/http/HttpClientStatsTracker.java index ef05af8bb9ade..1f9698e627f16 100644 --- a/server/src/main/java/org/elasticsearch/http/HttpClientStatsTracker.java +++ b/server/src/main/java/org/elasticsearch/http/HttpClientStatsTracker.java @@ -228,6 +228,8 @@ synchronized void update(HttpRequest httpRequest, HttpChannel httpChannel, long requestCount += 1; if (httpRequest.body().isFull()) { requestSizeBytes += httpRequest.body().asFull().bytes().length(); + } else { + httpRequest.body().asStream().addTracingHandler((chunk, last) -> requestSizeBytes += chunk.length()); } } diff --git a/server/src/main/java/org/elasticsearch/http/HttpTracer.java b/server/src/main/java/org/elasticsearch/http/HttpTracer.java index 2f3d376e39086..4adf57e6b6097 100644 --- a/server/src/main/java/org/elasticsearch/http/HttpTracer.java +++ b/server/src/main/java/org/elasticsearch/http/HttpTracer.java @@ -20,6 +20,7 @@ import org.elasticsearch.tasks.Task; import org.elasticsearch.transport.TransportService; +import java.io.IOException; import java.io.OutputStream; import java.util.List; @@ -77,10 +78,10 @@ HttpTracer maybeLogRequest(RestRequest restRequest, @Nullable Exception e) { e ); if (isBodyTracerEnabled()) { - try (var stream = HttpBodyTracer.getBodyOutputStream(restRequest.getRequestId(), HttpBodyTracer.Type.REQUEST)) { - restRequest.content().writeTo(stream); - } catch (Exception e2) { - assert false : e2; // no real IO here + if (restRequest.isFullContent()) { + logFullContent(restRequest); + } else { + logStreamContent(restRequest); } } @@ -89,6 +90,33 @@ HttpTracer maybeLogRequest(RestRequest restRequest, @Nullable Exception e) { return null; } + private void logFullContent(RestRequest restRequest) { + try (var stream = HttpBodyTracer.getBodyOutputStream(restRequest.getRequestId(), HttpBodyTracer.Type.REQUEST)) { + restRequest.content().writeTo(stream); + } catch (Exception e2) { + assert false : e2; // no real IO here + } + } + + private void logStreamContent(RestRequest restRequest) { + var stream = HttpBodyTracer.getBodyOutputStream(restRequest.getRequestId(), HttpBodyTracer.Type.REQUEST); + restRequest.contentStream().addTracingHandler((chunk, isLast) -> { + try { + chunk.writeTo(stream); + } catch (IOException e) { + assert false : e; // no real IO + } finally { + if (isLast) { + try { + stream.close(); + } catch (IOException e) { + assert false : e; + } + } + } + }); + } + boolean isBodyTracerEnabled() { return HttpBodyTracer.isEnabled(); } From e9418c44cd85c316b126e8a20b00532b29100b3e Mon Sep 17 00:00:00 2001 From: Mikhail Berezovskiy Date: Wed, 11 Sep 2024 14:37:55 -0700 Subject: [PATCH 2/8] add recv bytes offset to stats test --- .../Netty4IncrementalRequestHandlingIT.java | 23 ++++++++++++------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java index 8a8d4d7fd7c36..5baf0a7c40bd1 100644 --- a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java +++ b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java @@ -78,6 +78,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; +import java.util.function.Function; import java.util.function.Predicate; import java.util.function.Supplier; @@ -347,8 +348,20 @@ public void testBadRequestReleaseQueuedChunks() throws Exception { */ public void testHttpClientStats() throws Exception { try (var ctx = setupClientCtx()) { - var totalBytesSent = 0L; + Function totalRecvBytes = (HttpServerTransport httpTransport) -> { + var stats = httpTransport.stats().clientStats(); + var bytes = 0L; + for (var s : stats) { + bytes += s.requestSizeBytes(); + } + return bytes; + }; + var httpTransport = internalCluster().getInstance(HttpServerTransport.class, ctx.nodeName); + + // need to offset starting point, since we reuse cluster and other tests already sent some data + var totalBytesSent = totalRecvBytes.apply(httpTransport); + for (var reqNo = 0; reqNo < randomIntBetween(2, 10); reqNo++) { var id = opaqueId(reqNo); var contentSize = randomIntBetween(0, maxContentLength()); @@ -358,13 +371,7 @@ public void testHttpClientStats() throws Exception { var handler = ctx.awaitRestChannelAccepted(id); handler.readAllBytes(); handler.sendResponse(new RestResponse(RestStatus.OK, "")); - - var stats = httpTransport.stats().clientStats(); - var totalBytesRecv = 0L; - for (var s : stats) { - totalBytesRecv += s.requestSizeBytes(); - } - assertEquals(totalBytesSent, totalBytesRecv); + assertEquals(totalBytesSent, totalRecvBytes.apply(httpTransport)); } } } From aa1e91b116c9409ae301833a9bcf7718f842e1eb Mon Sep 17 00:00:00 2001 From: Mikhail Berezovskiy Date: Wed, 11 Sep 2024 15:20:05 -0700 Subject: [PATCH 3/8] fix --- .../rest/action/document/RestBulkActionTests.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java b/server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java index a9f1c3becb099..bcba6e947e157 100644 --- a/server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java +++ b/server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java @@ -210,6 +210,9 @@ public ChunkHandler handler() { return null; } + @Override + public void addTracingHandler(ChunkHandler chunkHandler) {} + @Override public void setHandler(ChunkHandler chunkHandler) {} From 1c989f5d930a7e4b644056057338d890b7d4f4a4 Mon Sep 17 00:00:00 2001 From: Mikhail Berezovskiy Date: Fri, 13 Sep 2024 11:26:13 -0700 Subject: [PATCH 4/8] close tracing handlers --- .../netty4/Netty4HttpRequestBodyStream.java | 7 +++-- .../org/elasticsearch/http/HttpTracer.java | 31 ++++++++++++------- 2 files changed, 25 insertions(+), 13 deletions(-) diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java index 2f056e0676e97..215dc44d3a648 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java @@ -127,8 +127,8 @@ private void send() { var bytesRef = Netty4Utils.toReleasableBytesReference(buf); requested = false; buf = null; - for (var tracing : tracingHandlers) { - tracing.onNext(bytesRef, hasLast); + for (var tracer : tracingHandlers) { + tracer.onNext(bytesRef, hasLast); } handler.onNext(bytesRef, hasLast); } @@ -144,6 +144,9 @@ public void close() { private void doClose() { closing = true; + for (var tracer : tracingHandlers) { + tracer.close(); + } if (handler != null) { handler.close(); } diff --git a/server/src/main/java/org/elasticsearch/http/HttpTracer.java b/server/src/main/java/org/elasticsearch/http/HttpTracer.java index 4adf57e6b6097..53fe2b251f25b 100644 --- a/server/src/main/java/org/elasticsearch/http/HttpTracer.java +++ b/server/src/main/java/org/elasticsearch/http/HttpTracer.java @@ -11,6 +11,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.bytes.ReleasableBytesReference; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.Nullable; @@ -100,20 +101,28 @@ private void logFullContent(RestRequest restRequest) { private void logStreamContent(RestRequest restRequest) { var stream = HttpBodyTracer.getBodyOutputStream(restRequest.getRequestId(), HttpBodyTracer.Type.REQUEST); - restRequest.contentStream().addTracingHandler((chunk, isLast) -> { - try { - chunk.writeTo(stream); - } catch (IOException e) { - assert false : e; // no real IO - } finally { - if (isLast) { - try { - stream.close(); - } catch (IOException e) { - assert false : e; + restRequest.contentStream().addTracingHandler(new HttpBody.ChunkHandler() { + @Override + public void onNext(ReleasableBytesReference chunk, boolean isLast) { + try { + chunk.writeTo(stream); + } catch (IOException e) { + assert false : e; // no real IO + } finally { + if (isLast) { + this.close(); } } } + + @Override + public void close() { + try { + stream.close(); + } catch (IOException e) { + assert false : e; // no real IO + } + } }); } From 631339100d162f73cffc5460ff43aa30027e479f Mon Sep 17 00:00:00 2001 From: Mikhail Berezovskiy Date: Fri, 13 Sep 2024 11:46:52 -0700 Subject: [PATCH 5/8] re-arrange test code --- .../Netty4IncrementalRequestHandlingIT.java | 26 +++++++++---------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java index 6b6f7d40c8b59..28a249ddcd30f 100644 --- a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java +++ b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java @@ -78,7 +78,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; -import java.util.function.Function; import java.util.function.Predicate; import java.util.function.Supplier; @@ -355,24 +354,23 @@ public void testBadRequestReleaseQueuedChunks() throws Exception { } } + private static long transportStatsRequestBytesSize(Ctx ctx) { + var httpTransport = internalCluster().getInstance(HttpServerTransport.class, ctx.nodeName); + var stats = httpTransport.stats().clientStats(); + var bytes = 0L; + for (var s : stats) { + bytes += s.requestSizeBytes(); + } + return bytes; + } + /** * ensures that {@link org.elasticsearch.http.HttpClientStatsTracker} counts streamed content bytes */ public void testHttpClientStats() throws Exception { try (var ctx = setupClientCtx()) { - Function totalRecvBytes = (HttpServerTransport httpTransport) -> { - var stats = httpTransport.stats().clientStats(); - var bytes = 0L; - for (var s : stats) { - bytes += s.requestSizeBytes(); - } - return bytes; - }; - - var httpTransport = internalCluster().getInstance(HttpServerTransport.class, ctx.nodeName); - // need to offset starting point, since we reuse cluster and other tests already sent some data - var totalBytesSent = totalRecvBytes.apply(httpTransport); + var totalBytesSent = transportStatsRequestBytesSize(ctx); for (var reqNo = 0; reqNo < randomIntBetween(2, 10); reqNo++) { var id = opaqueId(reqNo); @@ -383,7 +381,7 @@ public void testHttpClientStats() throws Exception { var handler = ctx.awaitRestChannelAccepted(id); handler.readAllBytes(); handler.sendResponse(new RestResponse(RestStatus.OK, "")); - assertEquals(totalBytesSent, totalRecvBytes.apply(httpTransport)); + assertEquals(totalBytesSent, transportStatsRequestBytesSize(ctx)); } } } From c35617b82329a3fbc1be1387876dd4e9097b75cb Mon Sep 17 00:00:00 2001 From: Mikhail Berezovskiy Date: Fri, 13 Sep 2024 14:51:20 -0700 Subject: [PATCH 6/8] add test for http-body-logging-close --- .../Netty4IncrementalRequestHandlingIT.java | 85 +++++++++++++------ .../netty4/Netty4HttpRequestBodyStream.java | 3 +- .../org/elasticsearch/http/HttpTracer.java | 52 +++++++----- 3 files changed, 95 insertions(+), 45 deletions(-) diff --git a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java index 28a249ddcd30f..7f729c009215c 100644 --- a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java +++ b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java @@ -78,6 +78,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; +import java.util.function.Function; import java.util.function.Predicate; import java.util.function.Supplier; @@ -93,7 +94,7 @@ public class Netty4IncrementalRequestHandlingIT extends ESNetty4IntegTestCase { @Override protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { Settings.Builder builder = Settings.builder().put(super.nodeSettings(nodeOrdinal, otherSettings)); - builder.put(HttpTransportSettings.SETTING_HTTP_MAX_CONTENT_LENGTH.getKey(), new ByteSizeValue(50, ByteSizeUnit.MB)); + builder.put(HttpTransportSettings.SETTING_HTTP_MAX_CONTENT_LENGTH.getKey(), new ByteSizeValue(10, ByteSizeUnit.MB)); return builder.build(); } @@ -213,10 +214,12 @@ public void testServerCloseConnectionMidStream() throws Exception { public void testClientBackpressure() throws Exception { try (var ctx = setupClientCtx()) { var opaqueId = opaqueId(0); - var payloadSize = MBytes(50); + var payloadSize = maxContentLength(); + var totalParts = 10; + var partSize = payloadSize / totalParts; ctx.clientChannel.writeAndFlush(httpRequest(opaqueId, payloadSize)); - for (int i = 0; i < 5; i++) { - ctx.clientChannel.writeAndFlush(randomContent(MBytes(10), false)); + for (int i = 0; i < totalParts; i++) { + ctx.clientChannel.writeAndFlush(randomContent(partSize, false)); } assertFalse( "should not flush last content immediately", @@ -225,16 +228,15 @@ public void testClientBackpressure() throws Exception { var handler = ctx.awaitRestChannelAccepted(opaqueId); - // Read buffers for socket and channel usually within few MBytes range all together. - // This test assumes that buffers will not exceed 10 MBytes, in other words there should - // be less than 10 MBytes in fly between http client's socket and rest handler. This - // loop ensures that reading 10 MBytes of content on server side should free almost - // same size in client's channel write buffer. - for (int mb = 0; mb <= 50; mb += 10) { - var minBufSize = payloadSize - MBytes(10 + mb); - var maxBufSize = payloadSize - MBytes(mb); + // some data flushes from channel into OS buffer and won't be visible here + var osBufferOffset = MBytes(4); + + // incrementally read data on server side and ensure client side buffer drains accordingly + for (int readBytes = 0; readBytes <= payloadSize; readBytes += partSize) { + var minBufSize = Math.max(payloadSize - readBytes - osBufferOffset, 0); + var maxBufSize = Math.max(payloadSize - readBytes, 0); // it is hard to tell that client's channel is no logger flushing data - // it might take a few busy-iterations before channel buffer flush to kernel + // it might take a few busy-iterations before channel buffer flush to OS // and bytesBeforeWritable will stop changing assertBusy(() -> { var bufSize = ctx.clientChannel.bytesBeforeWritable(); @@ -243,7 +245,7 @@ public void testClientBackpressure() throws Exception { bufSize >= minBufSize && bufSize <= maxBufSize ); }); - handler.readBytes(MBytes(10)); + handler.readBytes(partSize); } assertTrue(handler.stream.hasLast()); } @@ -386,22 +388,57 @@ public void testHttpClientStats() throws Exception { } } + /** + * ensures that we log parts of http body and final line + */ @TestLogging( reason = "testing TRACE logging", value = "org.elasticsearch.http.HttpTracer:TRACE,org.elasticsearch.http.HttpBodyTracer:TRACE" ) public void testHttpBodyLogging() throws Exception { + assertHttpBodyLogging((ctx) -> () -> { + try { + var req = fullHttpRequest(opaqueId(0), randomByteBuf(8 * 1024)); + ctx.clientChannel.writeAndFlush(req); + var handler = ctx.awaitRestChannelAccepted(opaqueId(0)); + handler.readAllBytes(); + } catch (Exception e) { + fail(e); + } + }); + } + + /** + * ensures that we log some parts of body and final line when connection is closed in the middle + */ + @TestLogging( + reason = "testing TRACE logging", + value = "org.elasticsearch.http.HttpTracer:TRACE,org.elasticsearch.http.HttpBodyTracer:TRACE" + ) + public void testHttpBodyLoggingChannelClose() throws Exception { + assertHttpBodyLogging((ctx) -> () -> { + try { + var req = httpRequest(opaqueId(0), 2 * 8192); + var halfContent = randomContent(8192, false); + ctx.clientChannel.writeAndFlush(req); + ctx.clientChannel.writeAndFlush(halfContent); + var handler = ctx.awaitRestChannelAccepted(opaqueId(0)); + handler.readBytes(8192); + ctx.clientChannel.close(); + handler.stream.next(); + assertBusy(() -> assertTrue(handler.streamClosed)); + } catch (Exception e) { + fail(e); + } + }); + } + + // asserts that we emit at least one logging event for a part and last line + // http body should be large enough to split across multiple lines, > 4kb + private void assertHttpBodyLogging(Function test) throws Exception { try (var ctx = setupClientCtx()) { - MockLog.assertThatLogger(() -> { - try { - var req = fullHttpRequest(opaqueId(0), randomByteBuf(8 * 1024)); - ctx.clientChannel.writeAndFlush(req); - var handler = ctx.awaitRestChannelAccepted(opaqueId(0)); - handler.readAllBytes(); - } catch (Exception e) { - fail(e); - } - }, + MockLog.assertThatLogger( + test.apply(ctx), HttpBodyTracer.class, new MockLog.SeenEventExpectation( "request part", diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java index 215dc44d3a648..4551305ab39df 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java @@ -15,6 +15,7 @@ import io.netty.handler.codec.http.LastHttpContent; import io.netty.util.concurrent.FutureListener; +import org.elasticsearch.core.Releasables; import org.elasticsearch.http.HttpBody; import org.elasticsearch.transport.netty4.Netty4Utils; @@ -145,7 +146,7 @@ public void close() { private void doClose() { closing = true; for (var tracer : tracingHandlers) { - tracer.close(); + Releasables.closeExpectNoException(tracer); } if (handler != null) { handler.close(); diff --git a/server/src/main/java/org/elasticsearch/http/HttpTracer.java b/server/src/main/java/org/elasticsearch/http/HttpTracer.java index 53fe2b251f25b..aba6cc394e9ee 100644 --- a/server/src/main/java/org/elasticsearch/http/HttpTracer.java +++ b/server/src/main/java/org/elasticsearch/http/HttpTracer.java @@ -100,30 +100,42 @@ private void logFullContent(RestRequest restRequest) { } private void logStreamContent(RestRequest restRequest) { - var stream = HttpBodyTracer.getBodyOutputStream(restRequest.getRequestId(), HttpBodyTracer.Type.REQUEST); - restRequest.contentStream().addTracingHandler(new HttpBody.ChunkHandler() { - @Override - public void onNext(ReleasableBytesReference chunk, boolean isLast) { - try { - chunk.writeTo(stream); - } catch (IOException e) { - assert false : e; // no real IO - } finally { - if (isLast) { - this.close(); - } + restRequest.contentStream().addTracingHandler(new LoggingChunkHandler(restRequest)); + } + + private static class LoggingChunkHandler implements HttpBody.ChunkHandler { + private final OutputStream stream; + private volatile boolean closed = false; + + LoggingChunkHandler(RestRequest request) { + stream = HttpBodyTracer.getBodyOutputStream(request.getRequestId(), HttpBodyTracer.Type.REQUEST); + } + + @Override + public void onNext(ReleasableBytesReference chunk, boolean isLast) { + try { + chunk.writeTo(stream); + } catch (IOException e) { + assert false : e; // no real IO + } finally { + if (isLast) { + this.close(); } } + } - @Override - public void close() { - try { - stream.close(); - } catch (IOException e) { - assert false : e; // no real IO - } + @Override + public void close() { + if (closed) { + return; } - }); + try { + closed = true; + stream.close(); + } catch (IOException e) { + assert false : e; // no real IO + } + } } boolean isBodyTracerEnabled() { From 3d08ef65c51122db2e7668d276bf8dddc926f93a Mon Sep 17 00:00:00 2001 From: Mikhail Berezovskiy Date: Fri, 13 Sep 2024 15:40:44 -0700 Subject: [PATCH 7/8] fix --- .../http/netty4/Netty4IncrementalRequestHandlingIT.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java index 7f729c009215c..81941656b564d 100644 --- a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java +++ b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java @@ -94,7 +94,7 @@ public class Netty4IncrementalRequestHandlingIT extends ESNetty4IntegTestCase { @Override protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { Settings.Builder builder = Settings.builder().put(super.nodeSettings(nodeOrdinal, otherSettings)); - builder.put(HttpTransportSettings.SETTING_HTTP_MAX_CONTENT_LENGTH.getKey(), new ByteSizeValue(10, ByteSizeUnit.MB)); + builder.put(HttpTransportSettings.SETTING_HTTP_MAX_CONTENT_LENGTH.getKey(), new ByteSizeValue(20, ByteSizeUnit.MB)); return builder.build(); } @@ -229,7 +229,7 @@ public void testClientBackpressure() throws Exception { var handler = ctx.awaitRestChannelAccepted(opaqueId); // some data flushes from channel into OS buffer and won't be visible here - var osBufferOffset = MBytes(4); + var osBufferOffset = MBytes(8); // incrementally read data on server side and ensure client side buffer drains accordingly for (int readBytes = 0; readBytes <= payloadSize; readBytes += partSize) { From b8425bc82f6a2292ec71c2ab253d908e2852d491 Mon Sep 17 00:00:00 2001 From: Mikhail Berezovskiy Date: Fri, 13 Sep 2024 16:34:47 -0700 Subject: [PATCH 8/8] fix --- .../http/netty4/Netty4IncrementalRequestHandlingIT.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java index 81941656b564d..a02c133027575 100644 --- a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java +++ b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java @@ -94,7 +94,7 @@ public class Netty4IncrementalRequestHandlingIT extends ESNetty4IntegTestCase { @Override protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { Settings.Builder builder = Settings.builder().put(super.nodeSettings(nodeOrdinal, otherSettings)); - builder.put(HttpTransportSettings.SETTING_HTTP_MAX_CONTENT_LENGTH.getKey(), new ByteSizeValue(20, ByteSizeUnit.MB)); + builder.put(HttpTransportSettings.SETTING_HTTP_MAX_CONTENT_LENGTH.getKey(), new ByteSizeValue(50, ByteSizeUnit.MB)); return builder.build(); } @@ -228,8 +228,8 @@ public void testClientBackpressure() throws Exception { var handler = ctx.awaitRestChannelAccepted(opaqueId); - // some data flushes from channel into OS buffer and won't be visible here - var osBufferOffset = MBytes(8); + // some data flushes from channel into OS buffer and won't be visible here, usually 4-8Mb + var osBufferOffset = MBytes(10); // incrementally read data on server side and ensure client side buffer drains accordingly for (int readBytes = 0; readBytes <= payloadSize; readBytes += partSize) {