Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions server/src/main/java/org/elasticsearch/rest/RestRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -329,16 +329,20 @@ public HttpBody.Stream contentStream() {
return httpRequest.body().asStream();
}

/**
* Returns reference to the network buffer of HTTP content or throw an exception if the body or content type is missing.
* See {@link #content()}.
*/
public ReleasableBytesReference requiredContent() {
public void ensureContent() {
if (hasContent() == false) {
throw new ElasticsearchParseException("request body is required");
} else if (xContentType.get() == null) {
throwValidationException("unknown content type");
}
}

/**
* Returns reference to the network buffer of HTTP content or throw an exception if the body or content type is missing.
* See {@link #content()}.
*/
public ReleasableBytesReference requiredContent() {
ensureContent();
return content();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
client.bulk(bulkRequest, ActionListener.releaseAfter(new RestRefCountedChunkedToXContentListener<>(channel), content));
};
} else {
request.ensureContent();
String waitForActiveShards = request.param("wait_for_active_shards");
TimeValue timeout = request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT);
String refresh = request.param("refresh");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
package org.elasticsearch.rest.action.document;

import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.BulkRequest;
Expand All @@ -18,19 +19,20 @@
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.http.HttpBody;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.index.IndexingPressure;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.telemetry.metric.MeterRegistry;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.client.NoOpNodeClient;
import org.elasticsearch.test.rest.FakeHttpBodyStream;
import org.elasticsearch.test.rest.FakeRestChannel;
import org.elasticsearch.test.rest.FakeRestRequest;
import org.elasticsearch.xcontent.XContentType;
Expand Down Expand Up @@ -201,28 +203,32 @@ public void bulk(BulkRequest request, ActionListener<BulkResponse> listener) {
}
}

public void testIncrementalBulkMissingContent() {
assertThrows(
ElasticsearchParseException.class,
() -> new RestBulkAction(
Settings.EMPTY,
ClusterSettings.createBuiltInClusterSettings(),
new IncrementalBulkService(mock(Client.class), mock(IndexingPressure.class), MeterRegistry.NOOP)
).handleRequest(
new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk")
.withContentLength(0)
.withBody(new FakeHttpBodyStream())
.build(),
mock(RestChannel.class),
mock(NodeClient.class)
)
);
}

public void testIncrementalParsing() {
ArrayList<DocWriteRequest<?>> docs = new ArrayList<>();
AtomicBoolean isLast = new AtomicBoolean(false);
AtomicBoolean next = new AtomicBoolean(false);

FakeRestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk")
.withMethod(RestRequest.Method.POST)
.withBody(new HttpBody.Stream() {
@Override
public void close() {}

@Override
public ChunkHandler handler() {
return null;
}

@Override
public void addTracingHandler(ChunkHandler chunkHandler) {}

@Override
public void setHandler(ChunkHandler chunkHandler) {}

.withBody(new FakeHttpBodyStream() {
@Override
public void next() {
next.set(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,19 @@ public HttpRequest removeHeader(String header) {
return new FakeHttpRequest(method, uri, body, filteredHeaders, inboundException);
}

public int contentLength() {
return switch (body) {
case HttpBody.Full f -> f.bytes().length();
case HttpBody.Stream s -> {
var len = header("Content-Length");
yield len == null ? 0 : Integer.parseInt(len);
}
};
}

@Override
public boolean hasContent() {
return body.isEmpty() == false;
return contentLength() > 0;
}

@Override
Expand Down Expand Up @@ -237,6 +247,11 @@ public Builder withBody(HttpBody body) {
return this;
}

public Builder withContentLength(int length) {
headers.put("Content-Length", List.of(String.valueOf(length)));
return this;
}

public Builder withPath(String path) {
this.path = path;
return this;
Expand Down