5 changes: 5 additions & 0 deletions docs/changelog/133775.yaml
@@ -0,0 +1,5 @@
pr: 133775
summary: Remove Transfer-Encoding from HTTP request with no content
area: Network
type: bug
issues: []
@@ -85,6 +85,7 @@
import org.elasticsearch.transport.netty4.Netty4Utils;
import org.elasticsearch.xcontent.json.JsonXContent;

import java.io.InputStream;
import java.nio.channels.ClosedChannelException;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
@@ -392,6 +393,23 @@ public void testOversizedChunkedEncoding() throws Exception {
}
}

public void testEmptyChunkedEncoding() throws Exception {
try (var clientContext = newClientContext()) {
var opaqueId = clientContext.newOpaqueId();
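// HttpChunkedInput terminates the stream with an empty LastHttpContent, so this
// produces a chunked request whose body contains zero bytes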
final var emptyStream = new HttpChunkedInput(new ChunkedStream(InputStream.nullInputStream()));
final var request = httpRequest(opaqueId, 0);
HttpUtil.setTransferEncodingChunked(request, true);
clientContext.channel().pipeline().addLast(new ChunkedWriteHandler());
clientContext.channel().writeAndFlush(request);
clientContext.channel().writeAndFlush(emptyStream);

var handler = clientContext.awaitRestChannelAccepted(opaqueId);
var restRequest = handler.restRequest;
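// the empty chunked body should surface server-side as a plain no-content request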
assertFalse(restRequest.hasContent());
assertNull(restRequest.header("Transfer-Encoding"));
}
}

// ensures that we don't leak buffers in stream on 400-bad-request
// some bad requests are dispatched from rest-controller before reaching rest handler
// test relies on netty's buffer leak detection
@@ -733,15 +751,17 @@ Channel channel() {
static class ServerRequestHandler implements BaseRestHandler.RequestBodyChunkConsumer {
final SubscribableListener<Void> channelAccepted = new SubscribableListener<>();
final String opaqueId;
final RestRequest restRequest;
private final AtomicReference<ActionListener<Chunk>> nextChunkListenerRef = new AtomicReference<>();
final Netty4HttpRequestBodyStream stream;
RestChannel channel;
boolean receivedLastChunk = false;
final CountDownLatch closedLatch = new CountDownLatch(1);
volatile boolean shouldThrowInsideHandleChunk = false;

ServerRequestHandler(String opaqueId, Netty4HttpRequestBodyStream stream) {
ServerRequestHandler(String opaqueId, RestRequest restRequest, Netty4HttpRequestBodyStream stream) {
this.opaqueId = opaqueId;
this.restRequest = restRequest;
this.stream = stream;
}

@@ -934,7 +954,7 @@ public List<Route> routes() {
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) {
var stream = (Netty4HttpRequestBodyStream) request.contentStream();
var opaqueId = request.getHeaders().get(Task.X_OPAQUE_ID_HTTP_HEADER).get(0);
var handler = new ServerRequestHandler(opaqueId, stream);
var handler = new ServerRequestHandler(opaqueId, request, stream);
handlersByOpaqueId.getHandlerFor(opaqueId).onResponse(handler);
return handler;
}
@@ -0,0 +1,50 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.http.netty4;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.LastHttpContent;

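/**
 * Holds a successfully-decoded chunked {@link HttpRequest} until its first
 * {@link HttpContent} arrives. If that first chunk is an empty {@link LastHttpContent},
 * the request has no body, so the Transfer-Encoding header is removed before the
 * request and chunk are forwarded. All other messages pass through unchanged.
 */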
public class Netty4EmptyChunkHandler extends ChannelInboundHandlerAdapter {

private HttpRequest currentRequest;

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
switch (msg) {
case HttpRequest request -> {
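// hold well-formed chunked requests until their first chunk arrives;
// everything else is forwarded immediately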
if (request.decoderResult().isSuccess() && HttpUtil.isTransferEncodingChunked(request)) {
currentRequest = request;
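// nothing was forwarded downstream, so no handler there will ask for more data;
// explicitly request the first content chunk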
ctx.read();
} else {
currentRequest = null;
ctx.fireChannelRead(request);
}
}
case HttpContent content -> {
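// first chunk of a held request: if it is the empty last chunk, the body is
// empty and the chunked encoding can be dropped before releasing both messages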
if (currentRequest != null) {
if (content instanceof LastHttpContent && content.content().readableBytes() == 0) {
HttpUtil.setTransferEncodingChunked(currentRequest, false);
}
ctx.fireChannelRead(currentRequest);
ctx.fireChannelRead(content);
currentRequest = null;
} else {
ctx.fireChannelRead(content);
}
}
default -> ctx.fireChannelRead(msg);
}
}
}
@@ -414,6 +414,7 @@ protected Result beginEncode(HttpResponse httpResponse, String acceptEncoding) throws Exception {
if (ResourceLeakDetector.isEnabled()) {
ch.pipeline().addLast(new Netty4LeakDetectionHandler());
}
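// normalize chunked requests whose body turns out to be empty by stripping Transfer-Encoding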
ch.pipeline().addLast(new Netty4EmptyChunkHandler());
// See https://github.com/netty/netty/issues/15053: the combination of FlowControlHandler and HttpContentDecompressor above
// can emit multiple chunks per read, but HttpBody.Stream requires chunks to arrive one-at-a-time so until that issue is
// resolved we must add another flow controller here:
@@ -0,0 +1,96 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.http.netty4;

import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.DecoderResult;
import io.netty.handler.codec.http.DefaultHttpRequest;
import io.netty.handler.codec.http.DefaultLastHttpContent;
import io.netty.handler.codec.http.HttpMessage;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.HttpVersion;

import org.elasticsearch.test.ESTestCase;

public class Netty4EmptyChunkHandlerTests extends ESTestCase {

private EmbeddedChannel channel;

@Override
public void setUp() throws Exception {
super.setUp();
channel = new EmbeddedChannel(new Netty4EmptyChunkHandler());
channel.config().setAutoRead(false);
}

public void testNonChunkedPassthrough() {
var req = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "");
var content = new DefaultLastHttpContent(Unpooled.EMPTY_BUFFER);
channel.writeInbound(req, content);
assertEquals(req, channel.readInbound());
assertEquals(content, channel.readInbound());
}

public void testDecodingFailurePassthrough() {
var req = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "");
HttpUtil.setTransferEncodingChunked(req, true);
req.setDecoderResult(DecoderResult.failure(new Exception()));
channel.writeInbound(req);
var recvReq = (HttpRequest) channel.readInbound();
assertTrue(recvReq.decoderResult().isFailure());
assertTrue(HttpUtil.isTransferEncodingChunked(recvReq));
}

public void testHoldChunkedRequest() {
var req = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "");
HttpUtil.setTransferEncodingChunked(req, true);
var readSniffer = new ReadSniffer();
channel.pipeline().addFirst(readSniffer);
channel.writeInbound(req);
assertNull("should hold on HTTP request until first chunk arrives", channel.readInbound());
assertEquals("must read first chunk when holding request", 1, readSniffer.readCount);
}

public void testRemoveEncodingFromEmpty() {
var req = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "");
HttpUtil.setTransferEncodingChunked(req, true);
var content = new DefaultLastHttpContent(Unpooled.EMPTY_BUFFER);
channel.writeInbound(req, content);
var recvReq = channel.readInbound();
assertEquals(req, recvReq);
assertEquals(content, channel.readInbound());
assertFalse("should remove Transfer-Encoding from empty content", HttpUtil.isTransferEncodingChunked((HttpMessage) recvReq));
}

public void testKeepEncodingForNonEmpty() {
var req = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "");
HttpUtil.setTransferEncodingChunked(req, true);
var content = new DefaultLastHttpContent(Unpooled.wrappedBuffer(randomByteArrayOfLength(between(1, 1024))));
channel.writeInbound(req, content);
var recvReq = channel.readInbound();
assertEquals(req, recvReq);
assertEquals(content, channel.readInbound());
assertTrue("should keep Transfer-Encoding for non-empty content", HttpUtil.isTransferEncodingChunked((HttpMessage) recvReq));
}

public void testRandomizedChannelReuse() {
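// interleave all request shapes on one channel to verify per-request state is reset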
for (int i = 0; i < 1000; i++) {
switch (between(0, 3)) {
case 0 -> testNonChunkedPassthrough();
case 1 -> testKeepEncodingForNonEmpty();
case 2 -> testDecodingFailurePassthrough();
default -> testRemoveEncodingFromEmpty();
}
}
}
}