Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/114869.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 114869
summary: Standardize error code when bulk body is invalid
area: CRUD
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@

package org.elasticsearch.http;

import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.entity.ContentType;
import org.elasticsearch.action.bulk.IncrementalBulkService;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
Expand All @@ -19,24 +21,30 @@
import org.elasticsearch.xcontent.json.JsonXContent;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

import static org.elasticsearch.rest.RestStatus.BAD_REQUEST;
import static org.elasticsearch.rest.RestStatus.OK;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.Matchers.equalTo;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, supportsDedicatedMasters = false, numDataNodes = 2, numClientNodes = 0)
public class IncrementalBulkRestIT extends HttpSmokeTestCase {
public class BulkRestIT extends HttpSmokeTestCase {

@Override
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal, otherSettings))
.put(IncrementalBulkService.INCREMENTAL_BULK.getKey(), true)
.put(IncrementalBulkService.INCREMENTAL_BULK.getKey(), seventyFivePercentOfTheTime())
.build();
}

private static boolean seventyFivePercentOfTheTime() {
return (randomBoolean() && randomBoolean()) == false;
}

public void testBulkUriMatchingDoesNotMatchBulkCapabilitiesApi() throws IOException {
Request request = new Request("GET", "/_capabilities?method=GET&path=%2F_bulk&capabilities=failure_store_status&pretty");
Response response = getRestClient().performRequest(request);
Expand All @@ -51,6 +59,26 @@ public void testBulkMissingBody() throws IOException {
assertThat(responseException.getMessage(), containsString("request body is required"));
}

public void testBulkInvalidIndexNameString() throws IOException {
Request request = new Request("POST", "/_bulk");

byte[] bytes1 = "{\"create\":{\"_index\":\"".getBytes(StandardCharsets.UTF_8);
byte[] bytes2 = new byte[] { (byte) 0xfe, (byte) 0xfe, (byte) 0xff, (byte) 0xff };
byte[] bytes3 = "\",\"_id\":\"1\"}}\n{\"field\":1}\n\r\n".getBytes(StandardCharsets.UTF_8);
byte[] bulkBody = new byte[bytes1.length + bytes2.length + bytes3.length];
System.arraycopy(bytes1, 0, bulkBody, 0, bytes1.length);
System.arraycopy(bytes2, 0, bulkBody, bytes1.length, bytes2.length);
System.arraycopy(bytes3, 0, bulkBody, bytes1.length + bytes2.length, bytes3.length);

request.setEntity(new ByteArrayEntity(bulkBody, ContentType.APPLICATION_JSON));

ResponseException responseException = expectThrows(ResponseException.class, () -> getRestClient().performRequest(request));
assertThat(responseException.getResponse().getStatusLine().getStatusCode(), equalTo(BAD_REQUEST.getStatus()));
assertThat(responseException.getMessage(), containsString("could not parse bulk request body"));
assertThat(responseException.getMessage(), containsString("json_parse_exception"));
assertThat(responseException.getMessage(), containsString("Invalid UTF-8"));
}

public void testBulkRequestBodyImproperlyTerminated() throws IOException {
Request request = new Request(randomBoolean() ? "POST" : "PUT", "/_bulk");
// missing final line of the bulk body. cannot process
Expand All @@ -61,10 +89,10 @@ public void testBulkRequestBodyImproperlyTerminated() throws IOException {
);
ResponseException responseException = expectThrows(ResponseException.class, () -> getRestClient().performRequest(request));
assertEquals(400, responseException.getResponse().getStatusLine().getStatusCode());
assertThat(responseException.getMessage(), containsString("could not parse bulk request body"));
assertThat(responseException.getMessage(), containsString("The bulk request must be terminated by a newline"));
}

public void testIncrementalBulk() throws IOException {
public void testBulkRequest() throws IOException {
Request createRequest = new Request("PUT", "/index_name");
createRequest.setJsonEntity("""
{
Expand All @@ -81,7 +109,6 @@ public void testIncrementalBulk() throws IOException {

Request firstBulkRequest = new Request("POST", "/index_name/_bulk");

// index documents for the rollup job
String bulkBody = "{\"index\":{\"_index\":\"index_name\",\"_id\":\"1\"}}\n"
+ "{\"field\":1}\n"
+ "{\"index\":{\"_index\":\"index_name\",\"_id\":\"2\"}}\n"
Expand Down Expand Up @@ -113,7 +140,6 @@ public void testBulkWithIncrementalDisabled() throws IOException {

Request firstBulkRequest = new Request("POST", "/index_name/_bulk");

// index documents for the rollup job
String bulkBody = "{\"index\":{\"_index\":\"index_name\",\"_id\":\"1\"}}\n"
+ "{\"field\":1}\n"
+ "{\"index\":{\"_index\":\"index_name\",\"_id\":\"2\"}}\n"
Expand All @@ -137,7 +163,7 @@ public void testBulkWithIncrementalDisabled() throws IOException {
}
}

public void testIncrementalMalformed() throws IOException {
public void testMalformedActionLineBulk() throws IOException {
Request createRequest = new Request("PUT", "/index_name");
createRequest.setJsonEntity("""
{
Expand All @@ -154,7 +180,6 @@ public void testIncrementalMalformed() throws IOException {

Request bulkRequest = new Request("POST", "/index_name/_bulk");

// index documents for the rollup job
final StringBuilder bulk = new StringBuilder();
bulk.append("{\"index\":{\"_index\":\"index_name\"}}\n");
bulk.append("{\"field\":1}\n");
Expand All @@ -170,7 +195,6 @@ public void testIncrementalMalformed() throws IOException {
private static void sendLargeBulk() throws IOException {
Request bulkRequest = new Request("POST", "/index_name/_bulk");

// index documents for the rollup job
final StringBuilder bulk = new StringBuilder();
bulk.append("{\"delete\":{\"_index\":\"index_name\",\"_id\":\"1\"}}\n");
int updates = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,19 +110,23 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
boolean defaultRequireDataStream = request.paramAsBoolean(DocWriteRequest.REQUIRE_DATA_STREAM, false);
bulkRequest.timeout(request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT));
bulkRequest.setRefreshPolicy(request.param("refresh"));
bulkRequest.add(
request.requiredContent(),
defaultIndex,
defaultRouting,
defaultFetchSourceContext,
defaultPipeline,
defaultRequireAlias,
defaultRequireDataStream,
defaultListExecutedPipelines,
allowExplicitIndex,
request.getXContentType(),
request.getRestApiVersion()
);
try {
bulkRequest.add(
request.requiredContent(),
defaultIndex,
defaultRouting,
defaultFetchSourceContext,
defaultPipeline,
defaultRequireAlias,
defaultRequireDataStream,
defaultListExecutedPipelines,
allowExplicitIndex,
request.getXContentType(),
request.getRestApiVersion()
);
} catch (Exception e) {
return channel -> new RestToXContentListener<>(channel).onFailure(parseFailureException(e));
}

return channel -> client.bulk(bulkRequest, new RestRefCountedChunkedToXContentListener<>(channel));
} else {
Expand All @@ -137,6 +141,15 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
}
}

private static Exception parseFailureException(Exception e) {
if (e instanceof IllegalArgumentException) {
return e;
} else {
// TODO: Maybe improve in follow-up to be XContentParseException and include line number and column
return new ElasticsearchParseException("could not parse bulk request body", e);
}
}

static class ChunkHandler implements BaseRestHandler.RequestBodyChunkConsumer {

private final boolean allowExplicitIndex;
Expand Down Expand Up @@ -229,9 +242,7 @@ public void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boo

} catch (Exception e) {
shortCircuit();
new RestToXContentListener<>(channel).onFailure(
new ElasticsearchParseException("could not parse bulk request body", e)
);
new RestToXContentListener<>(channel).onFailure(parseFailureException(e));
return;
}
}
Expand Down