Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import io.netty.handler.stream.ChunkedStream;
import io.netty.handler.stream.ChunkedWriteHandler;

import org.apache.logging.log4j.Level;
import org.elasticsearch.ESNetty4IntegTestCase;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.client.internal.node.NodeClient;
Expand All @@ -51,6 +52,7 @@
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.features.NodeFeature;
import org.elasticsearch.http.HttpBodyTracer;
import org.elasticsearch.http.HttpHandlingSettings;
import org.elasticsearch.http.HttpServerTransport;
import org.elasticsearch.http.HttpTransportSettings;
Expand All @@ -65,6 +67,8 @@
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.MockLog;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.transport.netty4.Netty4Utils;

import java.util.Collection;
Expand All @@ -74,6 +78,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;

Expand Down Expand Up @@ -209,10 +214,12 @@ public void testServerCloseConnectionMidStream() throws Exception {
public void testClientBackpressure() throws Exception {
try (var ctx = setupClientCtx()) {
var opaqueId = opaqueId(0);
var payloadSize = MBytes(50);
var payloadSize = maxContentLength();
var totalParts = 10;
var partSize = payloadSize / totalParts;
ctx.clientChannel.writeAndFlush(httpRequest(opaqueId, payloadSize));
for (int i = 0; i < 5; i++) {
ctx.clientChannel.writeAndFlush(randomContent(MBytes(10), false));
for (int i = 0; i < totalParts; i++) {
ctx.clientChannel.writeAndFlush(randomContent(partSize, false));
}
assertFalse(
"should not flush last content immediately",
Expand All @@ -221,16 +228,15 @@ public void testClientBackpressure() throws Exception {

var handler = ctx.awaitRestChannelAccepted(opaqueId);

// Read buffers for socket and channel usually within few MBytes range all together.
// This test assumes that buffers will not exceed 10 MBytes, in other words there should
// be less than 10 MBytes in fly between http client's socket and rest handler. This
// loop ensures that reading 10 MBytes of content on server side should free almost
// same size in client's channel write buffer.
for (int mb = 0; mb <= 50; mb += 10) {
var minBufSize = payloadSize - MBytes(10 + mb);
var maxBufSize = payloadSize - MBytes(mb);
// some data flushes from channel into OS buffer and won't be visible here, usually 4-8Mb
var osBufferOffset = MBytes(10);

// incrementally read data on server side and ensure client side buffer drains accordingly
for (int readBytes = 0; readBytes <= payloadSize; readBytes += partSize) {
var minBufSize = Math.max(payloadSize - readBytes - osBufferOffset, 0);
var maxBufSize = Math.max(payloadSize - readBytes, 0);
// it is hard to tell that client's channel is no logger flushing data
// it might take a few busy-iterations before channel buffer flush to kernel
// it might take a few busy-iterations before channel buffer flush to OS
// and bytesBeforeWritable will stop changing
assertBusy(() -> {
var bufSize = ctx.clientChannel.bytesBeforeWritable();
Expand All @@ -239,7 +245,7 @@ public void testClientBackpressure() throws Exception {
bufSize >= minBufSize && bufSize <= maxBufSize
);
});
handler.readBytes(MBytes(10));
handler.readBytes(partSize);
}
assertTrue(handler.stream.hasLast());
}
Expand Down Expand Up @@ -350,6 +356,107 @@ public void testBadRequestReleaseQueuedChunks() throws Exception {
}
}

private static long transportStatsRequestBytesSize(Ctx ctx) {
var httpTransport = internalCluster().getInstance(HttpServerTransport.class, ctx.nodeName);
var stats = httpTransport.stats().clientStats();
var bytes = 0L;
for (var s : stats) {
bytes += s.requestSizeBytes();
}
return bytes;
}

/**
* ensures that {@link org.elasticsearch.http.HttpClientStatsTracker} counts streamed content bytes
*/
public void testHttpClientStats() throws Exception {
try (var ctx = setupClientCtx()) {
// need to offset starting point, since we reuse cluster and other tests already sent some data
var totalBytesSent = transportStatsRequestBytesSize(ctx);

for (var reqNo = 0; reqNo < randomIntBetween(2, 10); reqNo++) {
var id = opaqueId(reqNo);
var contentSize = randomIntBetween(0, maxContentLength());
totalBytesSent += contentSize;
ctx.clientChannel.writeAndFlush(httpRequest(id, contentSize));
ctx.clientChannel.writeAndFlush(randomContent(contentSize, true));
var handler = ctx.awaitRestChannelAccepted(id);
handler.readAllBytes();
handler.sendResponse(new RestResponse(RestStatus.OK, ""));
assertEquals(totalBytesSent, transportStatsRequestBytesSize(ctx));
}
}
}

/**
* ensures that we log parts of http body and final line
*/
@TestLogging(
reason = "testing TRACE logging",
value = "org.elasticsearch.http.HttpTracer:TRACE,org.elasticsearch.http.HttpBodyTracer:TRACE"
)
public void testHttpBodyLogging() throws Exception {
assertHttpBodyLogging((ctx) -> () -> {
try {
var req = fullHttpRequest(opaqueId(0), randomByteBuf(8 * 1024));
ctx.clientChannel.writeAndFlush(req);
var handler = ctx.awaitRestChannelAccepted(opaqueId(0));
handler.readAllBytes();
} catch (Exception e) {
fail(e);
}
});
}

/**
* ensures that we log some parts of body and final line when connection is closed in the middle
*/
@TestLogging(
reason = "testing TRACE logging",
value = "org.elasticsearch.http.HttpTracer:TRACE,org.elasticsearch.http.HttpBodyTracer:TRACE"
)
public void testHttpBodyLoggingChannelClose() throws Exception {
assertHttpBodyLogging((ctx) -> () -> {
try {
var req = httpRequest(opaqueId(0), 2 * 8192);
var halfContent = randomContent(8192, false);
ctx.clientChannel.writeAndFlush(req);
ctx.clientChannel.writeAndFlush(halfContent);
var handler = ctx.awaitRestChannelAccepted(opaqueId(0));
handler.readBytes(8192);
ctx.clientChannel.close();
handler.stream.next();
assertBusy(() -> assertTrue(handler.streamClosed));
} catch (Exception e) {
fail(e);
}
});
}

// asserts that we emit at least one logging event for a part and last line
// http body should be large enough to split across multiple lines, > 4kb
private void assertHttpBodyLogging(Function<Ctx, Runnable> test) throws Exception {
try (var ctx = setupClientCtx()) {
MockLog.assertThatLogger(
test.apply(ctx),
HttpBodyTracer.class,
new MockLog.SeenEventExpectation(
"request part",
HttpBodyTracer.class.getCanonicalName(),
Level.TRACE,
"* request body [part *]*"
),
new MockLog.SeenEventExpectation(
"request end",
HttpBodyTracer.class.getCanonicalName(),
Level.TRACE,
"* request body (gzip compressed, base64-encoded, and split into * parts on preceding log lines; for details see "
+ "https://www.elastic.co/guide/en/elasticsearch/reference/master/modules-network.html#http-rest-request-tracer)"
)
);
}
}

private int maxContentLength() {
return HttpHandlingSettings.fromSettings(internalCluster().getInstance(Settings.class)).maxContentLength();
}
Expand Down Expand Up @@ -402,6 +509,10 @@ static HttpContent randomContent(int size, boolean isLast) {
}
}

static ByteBuf randomByteBuf(int size) {
return Unpooled.wrappedBuffer(randomByteArrayOfLength(size));
}

Ctx setupClientCtx() throws Exception {
var nodeName = internalCluster().getRandomNodeName();
var clientRespQueue = new LinkedBlockingDeque<>(16);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,13 @@
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.util.concurrent.FutureListener;

import org.elasticsearch.core.Releasables;
import org.elasticsearch.http.HttpBody;
import org.elasticsearch.transport.netty4.Netty4Utils;

import java.util.ArrayList;
import java.util.List;

/**
* Netty based implementation of {@link HttpBody.Stream}.
* This implementation utilize {@link io.netty.channel.ChannelConfig#setAutoRead(boolean)}
Expand All @@ -28,6 +32,7 @@ public class Netty4HttpRequestBodyStream implements HttpBody.Stream {

private final Channel channel;
private final FutureListener<Void> closeListener = future -> doClose();
private final List<ChunkHandler> tracingHandlers = new ArrayList<>(4);
private ByteBuf buf;
private boolean hasLast = false;
private boolean requested = false;
Expand All @@ -50,6 +55,12 @@ public void setHandler(ChunkHandler chunkHandler) {
this.handler = chunkHandler;
}

@Override
public void addTracingHandler(ChunkHandler chunkHandler) {
assert tracingHandlers.contains(chunkHandler) == false;
tracingHandlers.add(chunkHandler);
}

@Override
public void next() {
assert closing == false : "cannot request next chunk on closing stream";
Expand Down Expand Up @@ -117,6 +128,9 @@ private void send() {
var bytesRef = Netty4Utils.toReleasableBytesReference(buf);
requested = false;
buf = null;
for (var tracer : tracingHandlers) {
tracer.onNext(bytesRef, hasLast);
}
handler.onNext(bytesRef, hasLast);
}

Expand All @@ -131,6 +145,9 @@ public void close() {

private void doClose() {
closing = true;
for (var tracer : tracingHandlers) {
Releasables.closeExpectNoException(tracer);
}
if (handler != null) {
handler.close();
}
Expand Down
7 changes: 7 additions & 0 deletions server/src/main/java/org/elasticsearch/http/HttpBody.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,13 @@ non-sealed interface Stream extends HttpBody {
@Nullable
ChunkHandler handler();

/**
* Adds tracing chunk handler. Tracing handler will be invoked before main handler, and
* should never release or call for next chunk. It should be used for monitoring and
* logging purposes.
*/
void addTracingHandler(ChunkHandler chunkHandler);

/**
* Sets handler that can handle next chunk
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,8 @@ synchronized void update(HttpRequest httpRequest, HttpChannel httpChannel, long
requestCount += 1;
if (httpRequest.body().isFull()) {
requestSizeBytes += httpRequest.body().asFull().bytes().length();
} else {
httpRequest.body().asStream().addTracingHandler((chunk, last) -> requestSizeBytes += chunk.length());
}
}

Expand Down
57 changes: 53 additions & 4 deletions server/src/main/java/org/elasticsearch/http/HttpTracer.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Nullable;
Expand All @@ -20,6 +21,7 @@
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.io.OutputStream;
import java.util.List;

Expand Down Expand Up @@ -77,10 +79,10 @@ HttpTracer maybeLogRequest(RestRequest restRequest, @Nullable Exception e) {
e
);
if (isBodyTracerEnabled()) {
try (var stream = HttpBodyTracer.getBodyOutputStream(restRequest.getRequestId(), HttpBodyTracer.Type.REQUEST)) {
restRequest.content().writeTo(stream);
} catch (Exception e2) {
assert false : e2; // no real IO here
if (restRequest.isFullContent()) {
logFullContent(restRequest);
} else {
logStreamContent(restRequest);
}
}

Expand All @@ -89,6 +91,53 @@ HttpTracer maybeLogRequest(RestRequest restRequest, @Nullable Exception e) {
return null;
}

private void logFullContent(RestRequest restRequest) {
try (var stream = HttpBodyTracer.getBodyOutputStream(restRequest.getRequestId(), HttpBodyTracer.Type.REQUEST)) {
restRequest.content().writeTo(stream);
} catch (Exception e2) {
assert false : e2; // no real IO here
}
}

private void logStreamContent(RestRequest restRequest) {
restRequest.contentStream().addTracingHandler(new LoggingChunkHandler(restRequest));
}

private static class LoggingChunkHandler implements HttpBody.ChunkHandler {
private final OutputStream stream;
private volatile boolean closed = false;

LoggingChunkHandler(RestRequest request) {
stream = HttpBodyTracer.getBodyOutputStream(request.getRequestId(), HttpBodyTracer.Type.REQUEST);
}

@Override
public void onNext(ReleasableBytesReference chunk, boolean isLast) {
try {
chunk.writeTo(stream);
} catch (IOException e) {
assert false : e; // no real IO
} finally {
if (isLast) {
this.close();
}
}
}

@Override
public void close() {
if (closed) {
return;
}
try {
closed = true;
stream.close();
} catch (IOException e) {
assert false : e; // no real IO
}
}
}

boolean isBodyTracerEnabled() {
return HttpBodyTracer.isEnabled();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,9 @@ public ChunkHandler handler() {
return null;
}

@Override
public void addTracingHandler(ChunkHandler chunkHandler) {}

@Override
public void setHandler(ChunkHandler chunkHandler) {}

Expand Down