Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/119564.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 119564
summary: Http stream activity tracker and exceptions handling
area: Network
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -594,7 +594,7 @@ record Ctx(String testName, String nodeName, Bootstrap clientBootstrap, Channel
@Override
public void close() throws Exception {
safeGet(clientChannel.close());
safeGet(clientBootstrap.config().group().shutdownGracefully());
safeGet(clientBootstrap.config().group().shutdownGracefully(0, 0, TimeUnit.SECONDS));
clientRespQueue.forEach(o -> { if (o instanceof FullHttpResponse resp) resp.release(); });
for (var opaqueId : ControlServerRequestPlugin.handlers.keySet()) {
if (opaqueId.startsWith(testName)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,8 @@ public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
} else {
var contentStream = new Netty4HttpRequestBodyStream(
ctx.channel(),
serverTransport.getThreadPool().getThreadContext()
serverTransport.getThreadPool().getThreadContext(),
activityTracker
);
currentRequestStream = contentStream;
netty4HttpRequest = new Netty4HttpRequest(readSequence++, request, contentStream);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.LastHttpContent;

import org.elasticsearch.common.network.ThreadWatchdog;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.http.HttpBody;
Expand All @@ -36,6 +37,7 @@ public class Netty4HttpRequestBodyStream implements HttpBody.Stream {
private final ChannelFutureListener closeListener = future -> doClose();
private final List<ChunkHandler> tracingHandlers = new ArrayList<>(4);
private final ThreadContext threadContext;
private final ThreadWatchdog.ActivityTracker activityTracker;
private ByteBuf buf;
private boolean requested = false;
private boolean closing = false;
Expand All @@ -46,10 +48,11 @@ public class Netty4HttpRequestBodyStream implements HttpBody.Stream {
private volatile int bufSize = 0;
private volatile boolean hasLast = false;

public Netty4HttpRequestBodyStream(Channel channel, ThreadContext threadContext) {
public Netty4HttpRequestBodyStream(Channel channel, ThreadContext threadContext, ThreadWatchdog.ActivityTracker activityTracker) {
this.channel = channel;
this.threadContext = threadContext;
this.requestContext = threadContext.newStoredContext();
this.activityTracker = activityTracker;
Netty4Utils.addListener(channel.closeFuture(), closeListener);
channel.config().setAutoRead(false);
}
Expand All @@ -76,15 +79,18 @@ public void next() {
assert handler != null : "handler must be set before requesting next chunk";
requestContext = threadContext.newStoredContext();
channel.eventLoop().submit(() -> {
activityTracker.startActivity();
requested = true;
if (buf == null) {
channel.read();
} else {
try {
try {
if (buf == null) {
channel.read();
} else {
send();
} catch (Exception e) {
channel.pipeline().fireExceptionCaught(e);
}
} catch (Throwable e) {
channel.pipeline().fireExceptionCaught(e);
} finally {
activityTracker.stopActivity();
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@

import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.DefaultEventLoop;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.http.DefaultHttpContent;
Expand All @@ -19,6 +21,7 @@
import io.netty.handler.flow.FlowControlHandler;

import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.common.network.ThreadWatchdog;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.http.HttpBody;
Expand All @@ -27,6 +30,8 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
Expand All @@ -35,17 +40,18 @@

public class Netty4HttpRequestBodyStreamTests extends ESTestCase {

static HttpBody.ChunkHandler discardHandler = (chunk, isLast) -> chunk.close();
private final ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
private EmbeddedChannel channel;
private Netty4HttpRequestBodyStream stream;
static HttpBody.ChunkHandler discardHandler = (chunk, isLast) -> chunk.close();
private ThreadWatchdog.ActivityTracker activityTracker;

@Override
public void setUp() throws Exception {
super.setUp();
channel = new EmbeddedChannel();
threadContext.putHeader("header1", "value1");
stream = new Netty4HttpRequestBodyStream(channel, threadContext);
activityTracker = new ThreadWatchdog.ActivityTracker();
stream = new Netty4HttpRequestBodyStream(channel, threadContext, activityTracker);
stream.setHandler(discardHandler); // set default handler, each test might override one
channel.pipeline().addLast(new SimpleChannelInboundHandler<HttpContent>(false) {
@Override
Expand Down Expand Up @@ -128,57 +134,112 @@ public void testReadFromChannel() {
}

public void testReadFromHasCorrectThreadContext() throws InterruptedException {
var gotLast = new AtomicBoolean(false);
AtomicReference<Map<String, String>> headers = new AtomicReference<>();
stream.setHandler(new HttpBody.ChunkHandler() {
@Override
public void onNext(ReleasableBytesReference chunk, boolean isLast) {
headers.set(threadContext.getHeaders());
gotLast.set(isLast);
chunk.close();
}

@Override
public void close() {
headers.set(threadContext.getHeaders());
}
});
channel.pipeline().addFirst(new FlowControlHandler()); // block all incoming messages, need explicit channel.read()
var eventLoop = new DefaultEventLoop();
var gotLast = new AtomicBoolean(false);
var chunkSize = 1024;
threadContext.putHeader("header1", "value1");
try {
// activity tracker requires stream execution in the same thread, setting up stream inside event-loop
eventLoop.submit(() -> {
channel = new EmbeddedChannel();
stream = new Netty4HttpRequestBodyStream(channel, threadContext, new ThreadWatchdog.ActivityTracker());
channel.pipeline().addLast(new SimpleChannelInboundHandler<HttpContent>(false) {
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpContent msg) {
stream.handleNettyContent(msg);
}
});
stream.setHandler(new HttpBody.ChunkHandler() {
@Override
public void onNext(ReleasableBytesReference chunk, boolean isLast) {
headers.set(threadContext.getHeaders());
gotLast.set(isLast);
chunk.close();
}

@Override
public void close() {
headers.set(threadContext.getHeaders());
}
});
channel.pipeline().addFirst(new FlowControlHandler()); // block all incoming messages, need explicit channel.read()
}).await();

channel.writeInbound(randomContent(chunkSize));
channel.writeInbound(randomLastContent(chunkSize));
channel.writeInbound(randomContent(chunkSize));
channel.writeInbound(randomLastContent(chunkSize));

threadContext.putHeader("header2", "value2");
stream.next();
threadContext.putHeader("header2", "value2");
stream.next();

Thread thread = new Thread(() -> channel.runPendingTasks());
thread.start();
thread.join();
eventLoop.submit(() -> channel.runPendingTasks()).await();
assertThat(headers.get(), hasEntry("header1", "value1"));
assertThat(headers.get(), hasEntry("header2", "value2"));

assertThat(headers.get(), hasEntry("header1", "value1"));
assertThat(headers.get(), hasEntry("header2", "value2"));
threadContext.putHeader("header3", "value3");
stream.next();

threadContext.putHeader("header3", "value3");
stream.next();
eventLoop.submit(() -> channel.runPendingTasks()).await();
assertThat(headers.get(), hasEntry("header1", "value1"));
assertThat(headers.get(), hasEntry("header2", "value2"));
assertThat(headers.get(), hasEntry("header3", "value3"));

thread = new Thread(() -> channel.runPendingTasks());
thread.start();
thread.join();
assertTrue("should receive last content", gotLast.get());

assertThat(headers.get(), hasEntry("header1", "value1"));
assertThat(headers.get(), hasEntry("header2", "value2"));
assertThat(headers.get(), hasEntry("header3", "value3"));
headers.set(new HashMap<>());

assertTrue("should receive last content", gotLast.get());
stream.close();

assertThat(headers.get(), hasEntry("header1", "value1"));
assertThat(headers.get(), hasEntry("header2", "value2"));
assertThat(headers.get(), hasEntry("header3", "value3"));
} finally {
eventLoop.shutdownGracefully(0, 0, TimeUnit.SECONDS);
}
}

headers.set(new HashMap<>());
public void testStreamNextActivityTracker() {
var t0 = activityTracker.get();
var N = between(1, 10);
for (int i = 0; i < N; i++) {
channel.writeInbound(randomContent(1024));
stream.next();
channel.runPendingTasks();
}
var t1 = activityTracker.get();
assertEquals("stream#next() must trigger activity tracker: N*step=" + N + "*2=" + N * 2L + " times", t1, t0 + N * 2L);
}

stream.close();
// ensure that we catch all exceptions and throw them into channel pipeline
public void testCatchExceptions() {
var gotExceptions = new CountDownLatch(3); // number of tests below

channel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
gotExceptions.countDown();
}
});

// catch exception for not buffered chunk, will be thrown on channel.fireChannelRead()
stream.setHandler((a, b) -> { throw new RuntimeException(); });
stream.next();
channel.runPendingTasks();
channel.writeInbound(randomContent(1));

// catch exception for buffered chunk, will be thrown from eventLoop.submit()
channel.writeInbound(randomContent(1));
stream.next();
channel.runPendingTasks();

// should catch OOM exceptions too, see DieWithDignity
// swallowing exceptions can result in dangling streams, hanging channels, and delayed shutdowns
stream.setHandler((a, b) -> { throw new OutOfMemoryError(); });
channel.writeInbound(randomContent(1));
stream.next();
channel.runPendingTasks();

assertThat(headers.get(), hasEntry("header1", "value1"));
assertThat(headers.get(), hasEntry("header2", "value2"));
assertThat(headers.get(), hasEntry("header3", "value3"));
safeAwait(gotExceptions);
}

HttpContent randomContent(int size, boolean isLast) {
Expand Down