From 24a9157cf15a057373aff04732c6ef1ce3c71a01 Mon Sep 17 00:00:00 2001 From: Mikhail Berezovskiy Date: Tue, 7 Jan 2025 10:11:53 -0800 Subject: [PATCH] Http stream activity tracker and exceptions handling (#119564) --- docs/changelog/119564.yaml | 5 + .../Netty4IncrementalRequestHandlingIT.java | 2 +- .../netty4/Netty4HttpPipeliningHandler.java | 3 +- .../netty4/Netty4HttpRequestBodyStream.java | 20 ++- .../Netty4HttpRequestBodyStreamTests.java | 143 +++++++++++++----- 5 files changed, 123 insertions(+), 50 deletions(-) create mode 100644 docs/changelog/119564.yaml diff --git a/docs/changelog/119564.yaml b/docs/changelog/119564.yaml new file mode 100644 index 0000000000000..175eff75c8218 --- /dev/null +++ b/docs/changelog/119564.yaml @@ -0,0 +1,5 @@ +pr: 119564 +summary: Http stream activity tracker and exceptions handling +area: Network +type: enhancement +issues: [] diff --git a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java index 4bb27af4bd0f5..ab2fb41d5a22b 100644 --- a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java +++ b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java @@ -594,7 +594,7 @@ record Ctx(String testName, String nodeName, Bootstrap clientBootstrap, Channel @Override public void close() throws Exception { safeGet(clientChannel.close()); - safeGet(clientBootstrap.config().group().shutdownGracefully()); + safeGet(clientBootstrap.config().group().shutdownGracefully(0, 0, TimeUnit.SECONDS)); clientRespQueue.forEach(o -> { if (o instanceof FullHttpResponse resp) resp.release(); }); for (var opaqueId : ControlServerRequestPlugin.handlers.keySet()) { if (opaqueId.startsWith(testName)) { diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java index 1a391a05add58..4809f1a1a275b 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java @@ -139,7 +139,8 @@ public void channelRead(final ChannelHandlerContext ctx, final Object msg) { } else { var contentStream = new Netty4HttpRequestBodyStream( ctx.channel(), - serverTransport.getThreadPool().getThreadContext() + serverTransport.getThreadPool().getThreadContext(), + activityTracker ); currentRequestStream = contentStream; netty4HttpRequest = new Netty4HttpRequest(readSequence++, request, contentStream); diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java index ac3e3aecf97b9..0902e707b706e 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStream.java @@ -16,6 +16,7 @@ import io.netty.handler.codec.http.HttpContent; import io.netty.handler.codec.http.LastHttpContent; +import org.elasticsearch.common.network.ThreadWatchdog; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.core.Releasables; import org.elasticsearch.http.HttpBody; @@ -36,6 +37,7 @@ public class Netty4HttpRequestBodyStream implements HttpBody.Stream { private final ChannelFutureListener closeListener = future -> doClose(); private final List tracingHandlers = new ArrayList<>(4); private final ThreadContext threadContext; + private final ThreadWatchdog.ActivityTracker activityTracker; private ByteBuf buf; private boolean requested = false; private boolean closing = false; @@ -46,10 +48,11 @@ public class Netty4HttpRequestBodyStream implements HttpBody.Stream { private volatile int bufSize = 0; private volatile boolean hasLast = false; - public Netty4HttpRequestBodyStream(Channel channel, ThreadContext threadContext) { + public Netty4HttpRequestBodyStream(Channel channel, ThreadContext threadContext, ThreadWatchdog.ActivityTracker activityTracker) { this.channel = channel; this.threadContext = threadContext; this.requestContext = threadContext.newStoredContext(); + this.activityTracker = activityTracker; Netty4Utils.addListener(channel.closeFuture(), closeListener); channel.config().setAutoRead(false); } @@ -76,15 +79,18 @@ public void next() { assert handler != null : "handler must be set before requesting next chunk"; requestContext = threadContext.newStoredContext(); channel.eventLoop().submit(() -> { + activityTracker.startActivity(); requested = true; - if (buf == null) { - channel.read(); - } else { - try { + try { + if (buf == null) { + channel.read(); + } else { send(); - } catch (Exception e) { - channel.pipeline().fireExceptionCaught(e); } + } catch (Throwable e) { + channel.pipeline().fireExceptionCaught(e); + } finally { + activityTracker.stopActivity(); } }); } diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStreamTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStreamTests.java index d456bbecfbd20..7492737d4f877 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStreamTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpRequestBodyStreamTests.java @@ -11,6 +11,8 @@ import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.DefaultEventLoop; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.embedded.EmbeddedChannel; import io.netty.handler.codec.http.DefaultHttpContent; @@ -19,6 +21,7 @@ import io.netty.handler.flow.FlowControlHandler; import org.elasticsearch.common.bytes.ReleasableBytesReference; +import org.elasticsearch.common.network.ThreadWatchdog; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.http.HttpBody; @@ -27,6 +30,8 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -35,17 +40,18 @@ public class Netty4HttpRequestBodyStreamTests extends ESTestCase { + static HttpBody.ChunkHandler discardHandler = (chunk, isLast) -> chunk.close(); private final ThreadContext threadContext = new ThreadContext(Settings.EMPTY); private EmbeddedChannel channel; private Netty4HttpRequestBodyStream stream; - static HttpBody.ChunkHandler discardHandler = (chunk, isLast) -> chunk.close(); + private ThreadWatchdog.ActivityTracker activityTracker; @Override public void setUp() throws Exception { super.setUp(); channel = new EmbeddedChannel(); - threadContext.putHeader("header1", "value1"); - stream = new Netty4HttpRequestBodyStream(channel, threadContext); + activityTracker = new ThreadWatchdog.ActivityTracker(); + stream = new Netty4HttpRequestBodyStream(channel, threadContext, activityTracker); stream.setHandler(discardHandler); // set default handler, each test might override one channel.pipeline().addLast(new SimpleChannelInboundHandler(false) { @Override @@ -128,57 +134,112 @@ public void testReadFromChannel() { } public void testReadFromHasCorrectThreadContext() throws InterruptedException { - var gotLast = new AtomicBoolean(false); AtomicReference> headers = new AtomicReference<>(); - stream.setHandler(new HttpBody.ChunkHandler() { - @Override - public void onNext(ReleasableBytesReference chunk, boolean isLast) { - headers.set(threadContext.getHeaders()); - gotLast.set(isLast); - chunk.close(); - } - - @Override - public void close() { - headers.set(threadContext.getHeaders()); - } - }); - channel.pipeline().addFirst(new FlowControlHandler()); // block all incoming messages, need explicit channel.read() + var eventLoop = new DefaultEventLoop(); + var gotLast = new AtomicBoolean(false); var chunkSize = 1024; + threadContext.putHeader("header1", "value1"); + try { + // activity tracker requires stream execution in the same thread, setting up stream inside event-loop + eventLoop.submit(() -> { + channel = new EmbeddedChannel(); + stream = new Netty4HttpRequestBodyStream(channel, threadContext, new ThreadWatchdog.ActivityTracker()); + channel.pipeline().addLast(new SimpleChannelInboundHandler(false) { + @Override + protected void channelRead0(ChannelHandlerContext ctx, HttpContent msg) { + stream.handleNettyContent(msg); + } + }); + stream.setHandler(new HttpBody.ChunkHandler() { + @Override + public void onNext(ReleasableBytesReference chunk, boolean isLast) { + headers.set(threadContext.getHeaders()); + gotLast.set(isLast); + chunk.close(); + } + + @Override + public void close() { + headers.set(threadContext.getHeaders()); + } + }); + channel.pipeline().addFirst(new FlowControlHandler()); // block all incoming messages, need explicit channel.read() + }).await(); - channel.writeInbound(randomContent(chunkSize)); - channel.writeInbound(randomLastContent(chunkSize)); + channel.writeInbound(randomContent(chunkSize)); + channel.writeInbound(randomLastContent(chunkSize)); - threadContext.putHeader("header2", "value2"); - stream.next(); + threadContext.putHeader("header2", "value2"); + stream.next(); - Thread thread = new Thread(() -> channel.runPendingTasks()); - thread.start(); - thread.join(); + eventLoop.submit(() -> channel.runPendingTasks()).await(); + assertThat(headers.get(), hasEntry("header1", "value1")); + assertThat(headers.get(), hasEntry("header2", "value2")); - assertThat(headers.get(), hasEntry("header1", "value1")); - assertThat(headers.get(), hasEntry("header2", "value2")); + threadContext.putHeader("header3", "value3"); + stream.next(); - threadContext.putHeader("header3", "value3"); - stream.next(); + eventLoop.submit(() -> channel.runPendingTasks()).await(); + assertThat(headers.get(), hasEntry("header1", "value1")); + assertThat(headers.get(), hasEntry("header2", "value2")); + assertThat(headers.get(), hasEntry("header3", "value3")); - thread = new Thread(() -> channel.runPendingTasks()); - thread.start(); - thread.join(); + assertTrue("should receive last content", gotLast.get()); - assertThat(headers.get(), hasEntry("header1", "value1")); - assertThat(headers.get(), hasEntry("header2", "value2")); - assertThat(headers.get(), hasEntry("header3", "value3")); + headers.set(new HashMap<>()); - assertTrue("should receive last content", gotLast.get()); + stream.close(); + + assertThat(headers.get(), hasEntry("header1", "value1")); + assertThat(headers.get(), hasEntry("header2", "value2")); + assertThat(headers.get(), hasEntry("header3", "value3")); + } finally { + eventLoop.shutdownGracefully(0, 0, TimeUnit.SECONDS); + } + } - headers.set(new HashMap<>()); + public void testStreamNextActivityTracker() { + var t0 = activityTracker.get(); + var N = between(1, 10); + for (int i = 0; i < N; i++) { + channel.writeInbound(randomContent(1024)); + stream.next(); + channel.runPendingTasks(); + } + var t1 = activityTracker.get(); + assertEquals("stream#next() must trigger activity tracker: N*step=" + N + "*2=" + N * 2L + " times", t1, t0 + N * 2L); + } - stream.close(); + // ensure that we catch all exceptions and throw them into channel pipeline + public void testCatchExceptions() { + var gotExceptions = new CountDownLatch(3); // number of tests below + + channel.pipeline().addLast(new ChannelInboundHandlerAdapter() { + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + gotExceptions.countDown(); + } + }); + + // catch exception for not buffered chunk, will be thrown on channel.fireChannelRead() + stream.setHandler((a, b) -> { throw new RuntimeException(); }); + stream.next(); + channel.runPendingTasks(); + channel.writeInbound(randomContent(1)); + + // catch exception for buffered chunk, will be thrown from eventLoop.submit() + channel.writeInbound(randomContent(1)); + stream.next(); + channel.runPendingTasks(); + + // should catch OOM exceptions too, see DieWithDignity + // swallowing exceptions can result in dangling streams, hanging channels, and delayed shutdowns + stream.setHandler((a, b) -> { throw new OutOfMemoryError(); }); + channel.writeInbound(randomContent(1)); + stream.next(); + channel.runPendingTasks(); - assertThat(headers.get(), hasEntry("header1", "value1")); - assertThat(headers.get(), hasEntry("header2", "value2")); - assertThat(headers.get(), hasEntry("header3", "value3")); + safeAwait(gotExceptions); } HttpContent randomContent(int size, boolean isLast) {