diff --git a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java index ffb5781aae795..a02c133027575 100644 --- a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java +++ b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java @@ -36,6 +36,7 @@ import io.netty.handler.stream.ChunkedStream; import io.netty.handler.stream.ChunkedWriteHandler; +import org.apache.logging.log4j.Level; import org.elasticsearch.ESNetty4IntegTestCase; import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.client.internal.node.NodeClient; @@ -51,6 +52,7 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.CollectionUtils; import org.elasticsearch.features.NodeFeature; +import org.elasticsearch.http.HttpBodyTracer; import org.elasticsearch.http.HttpHandlingSettings; import org.elasticsearch.http.HttpServerTransport; import org.elasticsearch.http.HttpTransportSettings; @@ -65,6 +67,8 @@ import org.elasticsearch.rest.RestStatus; import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.MockLog; +import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.transport.netty4.Netty4Utils; import java.util.Collection; @@ -74,6 +78,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; +import java.util.function.Function; import java.util.function.Predicate; import java.util.function.Supplier; @@ -209,10 +214,12 @@ public void testServerCloseConnectionMidStream() throws Exception { public void testClientBackpressure() throws Exception { try (var ctx = setupClientCtx()) { var opaqueId = opaqueId(0); - var payloadSize = MBytes(50); + var payloadSize = maxContentLength(); + var totalParts = 10; + var partSize = payloadSize / totalParts; ctx.clientChannel.writeAndFlush(httpRequest(opaqueId, payloadSize)); - for (int i = 0; i < 5; i++) { - ctx.clientChannel.writeAndFlush(randomContent(MBytes(10), false)); + for (int i = 0; i < totalParts; i++) { + ctx.clientChannel.writeAndFlush(randomContent(partSize, false)); } assertFalse( "should not flush last content immediately", @@ -221,16 +228,15 @@ public void testClientBackpressure() throws Exception { var handler = ctx.awaitRestChannelAccepted(opaqueId); - // Read buffers for socket and channel usually within few MBytes range all together. - // This test assumes that buffers will not exceed 10 MBytes, in other words there should - // be less than 10 MBytes in fly between http client's socket and rest handler. This - // loop ensures that reading 10 MBytes of content on server side should free almost - // same size in client's channel write buffer. - for (int mb = 0; mb <= 50; mb += 10) { - var minBufSize = payloadSize - MBytes(10 + mb); - var maxBufSize = payloadSize - MBytes(mb); + // some data flushes from channel into OS buffer and won't be visible here, usually 4-8Mb + var osBufferOffset = MBytes(10); + + // incrementally read data on server side and ensure client side buffer drains accordingly + for (int readBytes = 0; readBytes <= payloadSize; readBytes += partSize) { + var minBufSize = Math.max(payloadSize - readBytes - osBufferOffset, 0); + var maxBufSize = Math.max(payloadSize - readBytes, 0); // it is hard to tell that client's channel is no logger flushing data - // it might take a few busy-iterations before channel buffer flush to kernel + // it might take a few busy-iterations before channel buffer flush to OS // and bytesBeforeWritable will stop changing assertBusy(() -> { var bufSize = ctx.clientChannel.bytesBeforeWritable(); @@ -239,7 +245,7 @@ public void testClientBackpressure() throws Exception { bufSize >= minBufSize && bufSize <= maxBufSize ); }); - handler.readBytes(MBytes(10)); + handler.readBytes(partSize); } assertTrue(handler.stream.hasLast()); } @@ -350,6 +356,107 @@ public void testBadRequestReleaseQueuedChunks() throws Exception { } } + private static long transportStatsRequestBytesSize(Ctx ctx) { + var httpTransport = internalCluster().getInstance(HttpServerTransport.class, ctx.nodeName); + var stats = httpTransport.stats().clientStats(); + var bytes = 0L; + for (var s : stats) { + bytes += s.requestSizeBytes(); + } + return bytes; + } + + /** + * ensures that {@link org.elasticsearch.http.HttpClientStatsTracker} counts streamed content bytes + */ + public void testHttpClientStats() throws Exception { + try (var ctx = setupClientCtx()) { + // need to offset starting point, since we reuse cluster and other tests already sent some data + var totalBytesSent = transportStatsRequestBytesSize(ctx); + + for (var reqNo = 0; reqNo < randomIntBetween(2, 10); reqNo++) { + var id = opaqueId(reqNo); + var contentSize = randomIntBetween(0, maxContentLength()); + totalBytesSent += contentSize; + ctx.clientChannel.writeAndFlush(httpRequest(id, contentSize)); + ctx.clientChannel.writeAndFlush(randomContent(contentSize, true)); + var handler = ctx.awaitRestChannelAccepted(id); + handler.readAllBytes(); + handler.sendResponse(new RestResponse(RestStatus.OK, "")); + assertEquals(totalBytesSent, transportStatsRequestBytesSize(ctx)); + } + } + } + + /** + * ensures that we log parts of http body and final line + */ + @TestLogging( + reason = "testing TRACE logging", + value = "org.elasticsearch.http.HttpTracer:TRACE,org.elasticsearch.http.HttpBodyTracer:TRACE" + ) + public void testHttpBodyLogging() throws Exception { + assertHttpBodyLogging((ctx) -> () -> { + try { + var req = fullHttpRequest(opaqueId(0), randomByteBuf(8 * 1024)); + ctx.clientChannel.writeAndFlush(req); + var handler = ctx.awaitRestChannelAccepted(opaqueId(0)); + handler.readAllBytes(); + } catch (Exception e) { + fail(e); + } + }); + } + + /** + * ensures that we log some parts of body and final line when connection is closed in the middle + */ + @TestLogging( + reason = "testing TRACE logging", + value = "org.elasticsearch.http.HttpTracer:TRACE,org.elasticsearch.http.HttpBodyTracer:TRACE" + ) + public void testHttpBodyLoggingChannelClose() throws Exception { + assertHttpBodyLogging((ctx) -> () -> { + try { + var req = httpRequest(opaqueId(0), 2 * 8192); + var halfContent = randomContent(8192, false); + ctx.clientChannel.writeAndFlush(req); + ctx.clientChannel.writeAndFlush(halfContent); + var handler = ctx.awaitRestChannelAccepted(opaqueId(0)); + handler.readBytes(8192); + ctx.clientChannel.close(); + handler.stream.next(); + assertBusy(() -> assertTrue(handler.streamClosed)); + } catch (Exception e) { + fail(e); + } + }); + } + + // asserts that we emit at least one logging event for a part and last line + // http body should be large enough to split across multiple lines, > 4kb + private void assertHttpBodyLogging(Function test) throws Exception { + try (var ctx = setupClientCtx()) { + MockLog.assertThatLogger( + test.apply(ctx), + HttpBodyTracer.class, + new MockLog.SeenEventExpectation( + "request part", + HttpBodyTracer.class.getCanonicalName(), + Level.TRACE, + "* request body [part *]*" + ), + new MockLog.SeenEventExpectation( + "request end", + HttpBodyTracer.class.getCanonicalName(), + Level.TRACE, + "* request body (gzip compressed, base64-encoded, and split into * parts on preceding log lines; for details see " + + "https://www.elastic.co/guide/en/elasticsearch/reference/master/modules-network.html#http-rest-request-tracer)" + ) + ); + } + } + private int maxContentLength() { return HttpHandlingSettings.fromSettings(internalCluster().getInstance(Settings.class)).maxContentLength(); } @@ -402,6 +509,10 @@ static HttpContent randomContent(int size, boolean isLast) { } } + static ByteBuf randomByteBuf(int size) { + return Unpooled.wrappedBuffer(randomByteArrayOfLength(size)); + } + Ctx setupClientCtx() throws Exception { var nodeName = internalCluster().getRandomNodeName(); var clientRespQueue = new LinkedBlockingDeque<>(16); diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java index 31abf93557574..4551305ab39df 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java @@ -15,9 +15,13 @@ import io.netty.handler.codec.http.LastHttpContent; import io.netty.util.concurrent.FutureListener; +import org.elasticsearch.core.Releasables; import org.elasticsearch.http.HttpBody; import org.elasticsearch.transport.netty4.Netty4Utils; +import java.util.ArrayList; +import java.util.List; + /** * Netty based implementation of {@link HttpBody.Stream}. * This implementation utilize {@link io.netty.channel.ChannelConfig#setAutoRead(boolean)} @@ -28,6 +32,7 @@ public class Netty4HttpRequestBodyStream implements HttpBody.Stream { private final Channel channel; private final FutureListener closeListener = future -> doClose(); + private final List tracingHandlers = new ArrayList<>(4); private ByteBuf buf; private boolean hasLast = false; private boolean requested = false; @@ -50,6 +55,12 @@ public void setHandler(ChunkHandler chunkHandler) { this.handler = chunkHandler; } + @Override + public void addTracingHandler(ChunkHandler chunkHandler) { + assert tracingHandlers.contains(chunkHandler) == false; + tracingHandlers.add(chunkHandler); + } + @Override public void next() { assert closing == false : "cannot request next chunk on closing stream"; @@ -117,6 +128,9 @@ private void send() { var bytesRef = Netty4Utils.toReleasableBytesReference(buf); requested = false; buf = null; + for (var tracer : tracingHandlers) { + tracer.onNext(bytesRef, hasLast); + } handler.onNext(bytesRef, hasLast); } @@ -131,6 +145,9 @@ public void close() { private void doClose() { closing = true; + for (var tracer : tracingHandlers) { + Releasables.closeExpectNoException(tracer); + } if (handler != null) { handler.close(); } diff --git a/server/src/main/java/org/elasticsearch/http/HttpBody.java b/server/src/main/java/org/elasticsearch/http/HttpBody.java index 689119e63cafb..10ccf52396d6b 100644 --- a/server/src/main/java/org/elasticsearch/http/HttpBody.java +++ b/server/src/main/java/org/elasticsearch/http/HttpBody.java @@ -72,6 +72,13 @@ non-sealed interface Stream extends HttpBody { @Nullable ChunkHandler handler(); + /** + * Adds tracing chunk handler. Tracing handler will be invoked before main handler, and + * should never release or call for next chunk. It should be used for monitoring and + * logging purposes. + */ + void addTracingHandler(ChunkHandler chunkHandler); + /** * Sets handler that can handle next chunk */ diff --git a/server/src/main/java/org/elasticsearch/http/HttpClientStatsTracker.java b/server/src/main/java/org/elasticsearch/http/HttpClientStatsTracker.java index ef05af8bb9ade..1f9698e627f16 100644 --- a/server/src/main/java/org/elasticsearch/http/HttpClientStatsTracker.java +++ b/server/src/main/java/org/elasticsearch/http/HttpClientStatsTracker.java @@ -228,6 +228,8 @@ synchronized void update(HttpRequest httpRequest, HttpChannel httpChannel, long requestCount += 1; if (httpRequest.body().isFull()) { requestSizeBytes += httpRequest.body().asFull().bytes().length(); + } else { + httpRequest.body().asStream().addTracingHandler((chunk, last) -> requestSizeBytes += chunk.length()); } } diff --git a/server/src/main/java/org/elasticsearch/http/HttpTracer.java b/server/src/main/java/org/elasticsearch/http/HttpTracer.java index 2f3d376e39086..aba6cc394e9ee 100644 --- a/server/src/main/java/org/elasticsearch/http/HttpTracer.java +++ b/server/src/main/java/org/elasticsearch/http/HttpTracer.java @@ -11,6 +11,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.bytes.ReleasableBytesReference; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.Nullable; @@ -20,6 +21,7 @@ import org.elasticsearch.tasks.Task; import org.elasticsearch.transport.TransportService; +import java.io.IOException; import java.io.OutputStream; import java.util.List; @@ -77,10 +79,10 @@ HttpTracer maybeLogRequest(RestRequest restRequest, @Nullable Exception e) { e ); if (isBodyTracerEnabled()) { - try (var stream = HttpBodyTracer.getBodyOutputStream(restRequest.getRequestId(), HttpBodyTracer.Type.REQUEST)) { - restRequest.content().writeTo(stream); - } catch (Exception e2) { - assert false : e2; // no real IO here + if (restRequest.isFullContent()) { + logFullContent(restRequest); + } else { + logStreamContent(restRequest); } } @@ -89,6 +91,53 @@ HttpTracer maybeLogRequest(RestRequest restRequest, @Nullable Exception e) { return null; } + private void logFullContent(RestRequest restRequest) { + try (var stream = HttpBodyTracer.getBodyOutputStream(restRequest.getRequestId(), HttpBodyTracer.Type.REQUEST)) { + restRequest.content().writeTo(stream); + } catch (Exception e2) { + assert false : e2; // no real IO here + } + } + + private void logStreamContent(RestRequest restRequest) { + restRequest.contentStream().addTracingHandler(new LoggingChunkHandler(restRequest)); + } + + private static class LoggingChunkHandler implements HttpBody.ChunkHandler { + private final OutputStream stream; + private volatile boolean closed = false; + + LoggingChunkHandler(RestRequest request) { + stream = HttpBodyTracer.getBodyOutputStream(request.getRequestId(), HttpBodyTracer.Type.REQUEST); + } + + @Override + public void onNext(ReleasableBytesReference chunk, boolean isLast) { + try { + chunk.writeTo(stream); + } catch (IOException e) { + assert false : e; // no real IO + } finally { + if (isLast) { + this.close(); + } + } + } + + @Override + public void close() { + if (closed) { + return; + } + try { + closed = true; + stream.close(); + } catch (IOException e) { + assert false : e; // no real IO + } + } + } + boolean isBodyTracerEnabled() { return HttpBodyTracer.isEnabled(); } diff --git a/server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java b/server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java index a9f1c3becb099..bcba6e947e157 100644 --- a/server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java +++ b/server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java @@ -210,6 +210,9 @@ public ChunkHandler handler() { return null; } + @Override + public void addTracingHandler(ChunkHandler chunkHandler) {} + @Override public void setHandler(ChunkHandler chunkHandler) {}