Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,29 @@ public void testOversizedChunkedEncodingNoLimits() throws Exception {
}
}

// ensures that we dont leak buffers in stream on 400-bad-request
// some bad requests are dispatched from rest-controller before reaching rest handler
// test relies on netty's buffer leak detection
public void testBadRequestReleaseQueuedChunks() throws Exception {
try (var ctx = setupClientCtx()) {
for (var reqNo = 0; reqNo < randomIntBetween(2, 10); reqNo++) {
var id = opaqueId(reqNo);
var contentSize = randomIntBetween(0, maxContentLength());
var req = httpRequest(id, contentSize);
var content = randomContent(contentSize, true);

// set unacceptable content-type
req.headers().set(CONTENT_TYPE, "unknown");
ctx.clientChannel.writeAndFlush(req);
ctx.clientChannel.writeAndFlush(content);

var resp = (FullHttpResponse) safePoll(ctx.clientRespQueue);
assertEquals(HttpResponseStatus.BAD_REQUEST, resp.status());
resp.release();
}
}
}

private int maxContentLength() {
return HttpHandlingSettings.fromSettings(internalCluster().getInstance(Settings.class)).maxContentLength();
}
Expand Down Expand Up @@ -513,6 +536,11 @@ public Collection<RestHandler> getRestHandlers(
Predicate<NodeFeature> clusterSupportsFeature
) {
return List.of(new BaseRestHandler() {
@Override
public boolean allowsUnsafeBuffers() {
return true;
}

@Override
public String getName() {
return ROUTE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public class Netty4HttpRequest implements HttpRequest {
EmptyHttpHeaders.INSTANCE
),
new AtomicBoolean(false),
false,
true,
contentStream,
null
);
Expand Down Expand Up @@ -115,6 +115,7 @@ public HttpBody body() {
public void release() {
if (pooled && released.compareAndSet(false, true)) {
request.release();
content.close();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,12 @@ public class Netty4HttpRequestBodyStream implements HttpBody.Stream {
private final Queue<HttpContent> chunkQueue = new ArrayDeque<>();
private boolean requested = false;
private boolean hasLast = false;
private boolean closing = false;
private HttpBody.ChunkHandler handler;

public Netty4HttpRequestBodyStream(Channel channel) {
this.channel = channel;
channel.closeFuture().addListener((f) -> releaseQueuedChunks());
channel.closeFuture().addListener((f) -> doClose());
channel.config().setAutoRead(false);
}

Expand Down Expand Up @@ -70,6 +71,10 @@ public void next() {
}

public void handleNettyContent(HttpContent httpContent) {
if (closing) {
httpContent.release();
return;
}
assert handler != null : "handler must be set before processing http content";
if (requested && chunkQueue.isEmpty()) {
sendChunk(httpContent);
Expand Down Expand Up @@ -111,4 +116,18 @@ private void releaseQueuedChunks() {
}
}

@Override
public void close() {
if (channel.eventLoop().inEventLoop()) {
doClose();
} else {
channel.eventLoop().submit(this::doClose);
}
}

private void doClose() {
closing = true;
releaseQueuedChunks();
channel.config().setAutoRead(true);
}
}
6 changes: 5 additions & 1 deletion server/src/main/java/org/elasticsearch/http/HttpBody.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;

/**
* A super-interface for different HTTP content implementations
*/
public sealed interface HttpBody permits HttpBody.Full, HttpBody.Stream {
public sealed interface HttpBody extends Releasable permits HttpBody.Full, HttpBody.Stream {

static Full fromBytesReference(BytesReference bytesRef) {
return new ByteRefHttpBody(bytesRef);
Expand Down Expand Up @@ -55,6 +56,9 @@ default Stream asStream() {
*/
non-sealed interface Full extends HttpBody {
BytesReference bytes();

@Override
default void close() {}
}

/**
Expand Down