From b0456382e5ff770cbd4829e8349d57ffbed7768b Mon Sep 17 00:00:00 2001
From: Mikhail Berezovskiy
Date: Fri, 23 Aug 2024 22:05:46 -0700
Subject: [PATCH 1/3] handle 100-continue and oversized streaming request
---
.../Netty4IncrementalRequestHandlingIT.java | 93 ++++++++++++++++--
.../http/netty4/Netty4HttpAggregator.java | 94 +++++++++++++++++--
2 files changed, 175 insertions(+), 12 deletions(-)
diff --git a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java
index 4de7ca97ed51b..c5b156f107fb0 100644
--- a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java
+++ b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java
@@ -10,6 +10,7 @@
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
@@ -24,11 +25,16 @@
import io.netty.handler.codec.http.DefaultLastHttpContent;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
+import io.netty.handler.codec.http.HttpChunkedInput;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequest;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.LastHttpContent;
+import io.netty.handler.stream.ChunkedStream;
+import io.netty.handler.stream.ChunkedWriteHandler;
import org.elasticsearch.ESNetty4IntegTestCase;
import org.elasticsearch.action.support.SubscribableListener;
@@ -43,6 +49,7 @@
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.features.NodeFeature;
+import org.elasticsearch.http.HttpHandlingSettings;
import org.elasticsearch.http.HttpServerTransport;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.Plugin;
@@ -61,9 +68,7 @@
import java.util.List;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
@@ -218,6 +223,86 @@ public void testClientBackpressure() throws Exception {
}
}
+ // ensures that the server replies with 100-continue when the request size is acceptable
+ public void test100Continue() throws Exception {
+ try (var ctx = setupClientCtx()) {
+ for (int reqNo = 0; reqNo < randomIntBetween(2, 10); reqNo++) {
+ var id = opaqueId(reqNo);
+ var acceptableContentLength = randomIntBetween(0, maxContentLength());
+
+ // send request header and await 100-continue
+ var req = httpRequest(id, acceptableContentLength);
+ HttpUtil.set100ContinueExpected(req, true);
+ ctx.clientChannel.writeAndFlush(req);
+ var resp = (FullHttpResponse) safePoll(ctx.clientRespQueue);
+ assertEquals(HttpResponseStatus.CONTINUE, resp.status());
+ resp.release();
+
+ // send content
+ var content = randomContent(acceptableContentLength, true);
+ ctx.clientChannel.writeAndFlush(content);
+
+ // consume content and reply 200
+ var handler = ctx.awaitRestChannelAccepted(id);
+ handler.consumeBytes(acceptableContentLength);
+ handler.sendResponse(new RestResponse(RestStatus.OK, ""));
+
+ resp = (FullHttpResponse) safePoll(ctx.clientRespQueue);
+ assertEquals(HttpResponseStatus.OK, resp.status());
+ resp.release();
+ }
+ }
+ }
+
+ // ensures that the server replies with 413 Request Entity Too Large to an oversized request carrying Expect: 100-continue
+ public void test413TooLargeOnExpect100Continue() throws Exception {
+ try (var ctx = setupClientCtx()) {
+ for (int reqNo = 0; reqNo < randomIntBetween(2, 10); reqNo++) {
+ var id = opaqueId(reqNo);
+ var oversized = maxContentLength() + 1;
+
+ // send request header and await 413 too large
+ var req = httpRequest(id, oversized);
+ HttpUtil.set100ContinueExpected(req, true);
+ ctx.clientChannel.writeAndFlush(req);
+ var resp = (FullHttpResponse) safePoll(ctx.clientRespQueue);
+ assertEquals(HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE, resp.status());
+ resp.release();
+
+ // terminate request
+ ctx.clientChannel.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
+ }
+ }
+ }
+
+ // ensures that server reply 413-too-large on oversized chunked encoding request and closes connection
+ public void test413TooLargeOnChunkedEncoding() throws Exception {
+ try (var ctx = setupClientCtx()) {
+ var contentSize = maxContentLength() + 1;
+ var content = randomByteArrayOfLength(contentSize);
+ var is = new ByteBufInputStream(Unpooled.wrappedBuffer(content));
+ var chunkedIs = new ChunkedStream(is);
+ var httpChunkedIs = new HttpChunkedInput(chunkedIs, LastHttpContent.EMPTY_LAST_CONTENT);
+ var req = httpRequest(opaqueId(0), 0);
+ HttpUtil.setTransferEncodingChunked(req, true);
+
+ ctx.clientChannel.pipeline().addLast(new ChunkedWriteHandler());
+ ctx.clientChannel.writeAndFlush(req);
+ ctx.clientChannel.writeAndFlush(httpChunkedIs);
+ var handler = ctx.awaitRestChannelAccepted(opaqueId(0));
+ handler.stream.channel().config().setAutoRead(true);
+
+ var resp = (FullHttpResponse) safePoll(ctx.clientRespQueue);
+ assertEquals(HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE, resp.status());
+ resp.release();
+ ctx.clientChannel.closeFuture().get(10, TimeUnit.SECONDS);
+ }
+ }
+
+ private int maxContentLength() {
+ return HttpHandlingSettings.fromSettings(internalCluster().getInstance(Settings.class)).maxContentLength();
+ }
+
private String opaqueId(int reqNo) {
return getTestName() + "-" + reqNo;
}
@@ -384,10 +469,6 @@ void consumeBytes(int bytes) {
}
}
- Future> onChannelThread(Callable> task) {
- return this.stream.channel().eventLoop().submit(task);
- }
-
record Chunk(ReleasableBytesReference chunk, boolean isLast) {}
}
diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpAggregator.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpAggregator.java
index 16f1c2bbd2e37..76db975a7128f 100644
--- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpAggregator.java
+++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpAggregator.java
@@ -8,19 +8,50 @@
package org.elasticsearch.http.netty4;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPipeline;
+import io.netty.handler.codec.http.DefaultFullHttpRequest;
+import io.netty.handler.codec.http.EmptyHttpHeaders;
+import io.netty.handler.codec.http.FullHttpRequest;
+import io.netty.handler.codec.http.FullHttpResponse;
+import io.netty.handler.codec.http.HttpContent;
+import io.netty.handler.codec.http.HttpMessage;
+import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequest;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpUtil;
import org.elasticsearch.http.HttpPreRequest;
import org.elasticsearch.http.netty4.internal.HttpHeadersAuthenticatorUtils;
import java.util.function.Predicate;
+/**
+ *
+ * A wrapper around {@link HttpObjectAggregator}. Provides optional content aggregation based on
+ * predicate. {@link HttpObjectAggregator} also handles Expect: 100-continue and oversized content.
+ * Unfortunately, Netty does not provide handlers for oversized messages beyond HttpObjectAggregator.
+ * These details are tight to {@link HttpObjectAggregator} and {@link io.netty.handler.codec.MessageAggregator}.
+ *
+ *
+ * This wrapper cherry-pick methods from underlying handlers.
+ * {@link HttpObjectAggregator#newContinueResponse(HttpMessage, int, ChannelPipeline)} provides
+ * handling for Expect: 100-continue. {@link HttpObjectAggregator#handleOversizedMessage(ChannelHandlerContext, HttpMessage)}
+ * provides handling for requests that already in fly and reached limit, for example chunked encoding.
+ *
+ *
+ */
public class Netty4HttpAggregator extends HttpObjectAggregator {
private static final Predicate IGNORE_TEST = (req) -> req.uri().startsWith("/_test/request-stream") == false;
private final Predicate decider;
- private boolean shouldAggregate;
+ private boolean aggregating = true;
+ private long currentContentLength = 0;
+ private HttpRequest currentRequest;
+ private boolean handlingOversized = false;
+ private boolean ignoreContentAfterContinueResponse = false;
public Netty4HttpAggregator(int maxContentLength) {
this(maxContentLength, IGNORE_TEST);
@@ -32,15 +63,66 @@ public Netty4HttpAggregator(int maxContentLength, Predicate deci
}
@Override
- public boolean acceptInboundMessage(Object msg) throws Exception {
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ assert msg instanceof HttpObject;
if (msg instanceof HttpRequest request) {
var preReq = HttpHeadersAuthenticatorUtils.asHttpPreRequest(request);
- shouldAggregate = decider.test(preReq);
+ aggregating = decider.test(preReq);
+ }
+ if (aggregating || msg instanceof FullHttpRequest) {
+ super.channelRead(ctx, msg);
+ } else {
+ handle(ctx, (HttpObject) msg);
}
- if (shouldAggregate) {
- return super.acceptInboundMessage(msg);
+ }
+
+ private void handle(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
+ if (msg instanceof HttpRequest request) {
+ var continueResponse = newContinueResponse(request, maxContentLength(), ctx.pipeline());
+ if (continueResponse != null) {
+ // there are 3 possible responses: 100, 413, 417
+ // on 100 we forward the request onward and tell the client to continue sending content
+ // on 413/417 we discard any content that follows
+ ctx.writeAndFlush(continueResponse);
+ var resp = (FullHttpResponse) continueResponse;
+ if (resp.status() != HttpResponseStatus.CONTINUE) {
+ ignoreContentAfterContinueResponse = true;
+ return;
+ }
+ HttpUtil.set100ContinueExpected(request, false);
+ }
+ currentRequest = request;
+ currentContentLength = 0;
+ ignoreContentAfterContinueResponse = false;
+ ctx.fireChannelRead(msg);
+
} else {
- return false;
+ var httpContent = (HttpContent) msg;
+ if (ignoreContentAfterContinueResponse) {
+ httpContent.release();
+ return;
+ }
+ currentContentLength += httpContent.content().readableBytes();
+ if (currentContentLength > maxContentLength()) {
+ if (handlingOversized == false) {
+ handlingOversized = true;
+ // magic: passing full request into handleOversizedMessage will close connection
+ var fullReq = new DefaultFullHttpRequest(
+ currentRequest.protocolVersion(),
+ currentRequest.method(),
+ currentRequest.uri(),
+ Unpooled.EMPTY_BUFFER,
+ currentRequest.headers(),
+ EmptyHttpHeaders.INSTANCE
+ );
+ handleOversizedMessage(ctx, fullReq);
+ }
+ // if we're already handling oversized message connection will be closed soon
+ // we can discard following content
+ httpContent.release();
+ } else {
+ ctx.fireChannelRead(msg);
+ }
}
}
}
From b9c88f36c63bcd55997187403091bfd2cbc9851f Mon Sep 17 00:00:00 2001
From: Mikhail Berezovskiy
Date: Fri, 23 Aug 2024 22:24:30 -0700
Subject: [PATCH 2/3] docs
---
.../java/org/elasticsearch/http/netty4/Netty4HttpAggregator.java | 1 -
1 file changed, 1 deletion(-)
diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpAggregator.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpAggregator.java
index 76db975a7128f..eb2b723de32e7 100644
--- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpAggregator.java
+++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpAggregator.java
@@ -33,7 +33,6 @@
* A wrapper around {@link HttpObjectAggregator}. Provides optional content aggregation based on
* predicate. {@link HttpObjectAggregator} also handles Expect: 100-continue and oversized content.
* Unfortunately, Netty does not provide handlers for oversized messages beyond HttpObjectAggregator.
- * These details are tight to {@link HttpObjectAggregator} and {@link io.netty.handler.codec.MessageAggregator}.
*
*
* This wrapper cherry-pick methods from underlying handlers.
From 4ec78a3ef52f143ca6d990cfbc777c11c47cc17f Mon Sep 17 00:00:00 2001
From: Mikhail Berezovskiy
Date: Mon, 26 Aug 2024 12:00:10 -0700
Subject: [PATCH 3/3] remove size limit for chunked encoding
---
.../Netty4IncrementalRequestHandlingIT.java | 89 ++++++++++++-------
.../http/netty4/Netty4HttpAggregator.java | 42 +--------
2 files changed, 56 insertions(+), 75 deletions(-)
diff --git a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java
index c5b156f107fb0..b3139fd336a70 100644
--- a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java
+++ b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java
@@ -47,10 +47,13 @@
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.features.NodeFeature;
import org.elasticsearch.http.HttpHandlingSettings;
import org.elasticsearch.http.HttpServerTransport;
+import org.elasticsearch.http.HttpTransportSettings;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.BaseRestHandler;
@@ -83,6 +86,13 @@
@ESIntegTestCase.ClusterScope(numDataNodes = 1)
public class Netty4IncrementalRequestHandlingIT extends ESNetty4IntegTestCase {
+ @Override
+ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
+ Settings.Builder builder = Settings.builder().put(super.nodeSettings(nodeOrdinal, otherSettings));
+ builder.put(HttpTransportSettings.SETTING_HTTP_MAX_CONTENT_LENGTH.getKey(), new ByteSizeValue(50, ByteSizeUnit.MB));
+ return builder.build();
+ }
+
// ensure empty http content has single 0 size chunk
public void testEmptyContent() throws Exception {
try (var ctx = setupClientCtx()) {
@@ -116,7 +126,7 @@ public void testReceiveAllChunks() throws Exception {
var opaqueId = opaqueId(reqNo);
// this dataset will be compared with one on server side
- var dataSize = randomIntBetween(1024, 10 * 1024 * 1024);
+ var dataSize = randomIntBetween(1024, maxContentLength());
var sendData = Unpooled.wrappedBuffer(randomByteArrayOfLength(dataSize));
sendData.retain();
ctx.clientChannel.writeAndFlush(fullHttpRequest(opaqueId, sendData));
@@ -217,7 +227,7 @@ public void testClientBackpressure() throws Exception {
bufSize >= minBufSize && bufSize <= maxBufSize
);
});
- handler.consumeBytes(MBytes(10));
+ handler.readBytes(MBytes(10));
}
assertTrue(handler.stream.hasLast());
}
@@ -244,7 +254,8 @@ public void test100Continue() throws Exception {
// consume content and reply 200
var handler = ctx.awaitRestChannelAccepted(id);
- handler.consumeBytes(acceptableContentLength);
+ var consumed = handler.readAllBytes();
+ assertEquals(acceptableContentLength, consumed);
handler.sendResponse(new RestResponse(RestStatus.OK, ""));
resp = (FullHttpResponse) safePoll(ctx.clientRespQueue);
@@ -275,27 +286,32 @@ public void test413TooLargeOnExpect100Continue() throws Exception {
}
}
- // ensures that server reply 413-too-large on oversized chunked encoding request and closes connection
- public void test413TooLargeOnChunkedEncoding() throws Exception {
+ // ensures that an oversized chunked-encoded request is not limited at the http layer;
+ // the rest handler is responsible for rejecting oversized requests
+ public void testOversizedChunkedEncodingNoLimits() throws Exception {
try (var ctx = setupClientCtx()) {
- var contentSize = maxContentLength() + 1;
- var content = randomByteArrayOfLength(contentSize);
- var is = new ByteBufInputStream(Unpooled.wrappedBuffer(content));
- var chunkedIs = new ChunkedStream(is);
- var httpChunkedIs = new HttpChunkedInput(chunkedIs, LastHttpContent.EMPTY_LAST_CONTENT);
- var req = httpRequest(opaqueId(0), 0);
- HttpUtil.setTransferEncodingChunked(req, true);
-
- ctx.clientChannel.pipeline().addLast(new ChunkedWriteHandler());
- ctx.clientChannel.writeAndFlush(req);
- ctx.clientChannel.writeAndFlush(httpChunkedIs);
- var handler = ctx.awaitRestChannelAccepted(opaqueId(0));
- handler.stream.channel().config().setAutoRead(true);
+ for (var reqNo = 0; reqNo < randomIntBetween(2, 10); reqNo++) {
+ var id = opaqueId(reqNo);
+ var contentSize = maxContentLength() + 1;
+ var content = randomByteArrayOfLength(contentSize);
+ var is = new ByteBufInputStream(Unpooled.wrappedBuffer(content));
+ var chunkedIs = new ChunkedStream(is);
+ var httpChunkedIs = new HttpChunkedInput(chunkedIs, LastHttpContent.EMPTY_LAST_CONTENT);
+ var req = httpRequest(id, 0);
+ HttpUtil.setTransferEncodingChunked(req, true);
+
+ ctx.clientChannel.pipeline().addLast(new ChunkedWriteHandler());
+ ctx.clientChannel.writeAndFlush(req);
+ ctx.clientChannel.writeAndFlush(httpChunkedIs);
+ var handler = ctx.awaitRestChannelAccepted(id);
+ var consumed = handler.readAllBytes();
+ assertEquals(contentSize, consumed);
+ handler.sendResponse(new RestResponse(RestStatus.OK, ""));
- var resp = (FullHttpResponse) safePoll(ctx.clientRespQueue);
- assertEquals(HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE, resp.status());
- resp.release();
- ctx.clientChannel.closeFuture().get(10, TimeUnit.SECONDS);
+ var resp = (FullHttpResponse) safePoll(ctx.clientRespQueue);
+ assertEquals(HttpResponseStatus.OK, resp.status());
+ resp.release();
+ }
}
}
@@ -453,20 +469,25 @@ void sendResponse(RestResponse response) {
channel.sendResponse(response);
}
- void consumeBytes(int bytes) {
- if (recvLast) {
- return;
- }
- while (bytes > 0) {
- stream.next();
- var recvChunk = safePoll(recvChunks);
- bytes -= recvChunk.chunk.length();
- recvChunk.chunk.close();
- if (recvChunk.isLast) {
- recvLast = true;
- break;
+ int readBytes(int bytes) {
+ var consumed = 0;
+ if (recvLast == false) {
+ while (consumed < bytes) {
+ stream.next();
+ var recvChunk = safePoll(recvChunks);
+ consumed += recvChunk.chunk.length();
+ recvChunk.chunk.close();
+ if (recvChunk.isLast) {
+ recvLast = true;
+ break;
+ }
}
}
+ return consumed;
+ }
+
+ int readAllBytes() {
+ return readBytes(Integer.MAX_VALUE);
}
record Chunk(ReleasableBytesReference chunk, boolean isLast) {}
diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpAggregator.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpAggregator.java
index eb2b723de32e7..031e803737ee8 100644
--- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpAggregator.java
+++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpAggregator.java
@@ -8,15 +8,10 @@
package org.elasticsearch.http.netty4;
-import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelPipeline;
-import io.netty.handler.codec.http.DefaultFullHttpRequest;
-import io.netty.handler.codec.http.EmptyHttpHeaders;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpContent;
-import io.netty.handler.codec.http.HttpMessage;
import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequest;
@@ -29,27 +24,15 @@
import java.util.function.Predicate;
/**
- *
* A wrapper around {@link HttpObjectAggregator}. Provides optional content aggregation based on
* predicate. {@link HttpObjectAggregator} also handles Expect: 100-continue and oversized content.
* Unfortunately, Netty does not provide handlers for oversized messages beyond HttpObjectAggregator.
- *
- *
- * This wrapper cherry-pick methods from underlying handlers.
- * {@link HttpObjectAggregator#newContinueResponse(HttpMessage, int, ChannelPipeline)} provides
- * handling for Expect: 100-continue. {@link HttpObjectAggregator#handleOversizedMessage(ChannelHandlerContext, HttpMessage)}
- * provides handling for requests that already in fly and reached limit, for example chunked encoding.
- *
- *
*/
public class Netty4HttpAggregator extends HttpObjectAggregator {
private static final Predicate IGNORE_TEST = (req) -> req.uri().startsWith("/_test/request-stream") == false;
private final Predicate decider;
private boolean aggregating = true;
- private long currentContentLength = 0;
- private HttpRequest currentRequest;
- private boolean handlingOversized = false;
private boolean ignoreContentAfterContinueResponse = false;
public Netty4HttpAggregator(int maxContentLength) {
@@ -75,7 +58,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
}
}
- private void handle(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
+ private void handle(ChannelHandlerContext ctx, HttpObject msg) {
if (msg instanceof HttpRequest request) {
var continueResponse = newContinueResponse(request, maxContentLength(), ctx.pipeline());
if (continueResponse != null) {
@@ -90,35 +73,12 @@ private void handle(ChannelHandlerContext ctx, HttpObject msg) throws Exception
}
HttpUtil.set100ContinueExpected(request, false);
}
- currentRequest = request;
- currentContentLength = 0;
ignoreContentAfterContinueResponse = false;
ctx.fireChannelRead(msg);
-
} else {
var httpContent = (HttpContent) msg;
if (ignoreContentAfterContinueResponse) {
httpContent.release();
- return;
- }
- currentContentLength += httpContent.content().readableBytes();
- if (currentContentLength > maxContentLength()) {
- if (handlingOversized == false) {
- handlingOversized = true;
- // magic: passing full request into handleOversizedMessage will close connection
- var fullReq = new DefaultFullHttpRequest(
- currentRequest.protocolVersion(),
- currentRequest.method(),
- currentRequest.uri(),
- Unpooled.EMPTY_BUFFER,
- currentRequest.headers(),
- EmptyHttpHeaders.INSTANCE
- );
- handleOversizedMessage(ctx, fullReq);
- }
- // if we're already handling oversized message connection will be closed soon
- // we can discard following content
- httpContent.release();
} else {
ctx.fireChannelRead(msg);
}