Skip to content

Commit

Permalink
Add executed pipelines to bulk api response (#100031)
Browse files Browse the repository at this point in the history
This change allows users to pass a new list_executed_pipelines parameter
to the bulk API, which results in an executed_pipelines list being returned.
  • Loading branch information
masseyke committed Oct 17, 2023
1 parent 3418c6a commit 92ec9d6
Show file tree
Hide file tree
Showing 24 changed files with 476 additions and 29 deletions.
Expand Up @@ -54,6 +54,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
String defaultRouting = request.param("routing");
String defaultPipeline = request.param("pipeline");
Boolean defaultRequireAlias = request.paramAsBoolean("require_alias", null);
Boolean defaultListExecutedPipelines = request.paramAsBoolean("list_executed_pipelines", null);

String waitForActiveShards = request.param("wait_for_active_shards");
if (waitForActiveShards != null) {
Expand All @@ -68,6 +69,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
null,
defaultPipeline,
defaultRequireAlias,
defaultListExecutedPipelines,
true,
request.getXContentType(),
request.getRestApiVersion()
Expand Down
5 changes: 5 additions & 0 deletions docs/changelog/100031.yaml
@@ -0,0 +1,5 @@
pr: 100031
summary: Add executed pipelines to bulk api response
area: Indices APIs
type: enhancement
issues: []
9 changes: 9 additions & 0 deletions docs/reference/docs/bulk.asciidoc
Expand Up @@ -244,6 +244,11 @@ on.
[[docs-bulk-api-query-params]]
==== {api-query-parms-title}

`list_executed_pipelines`::
(Optional, Boolean) If `true`, the response will include the ingest pipelines that
were executed for each `index` or `create`.
Defaults to `false`.

include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=pipeline]

include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=refresh]
Expand Down Expand Up @@ -292,6 +297,8 @@ include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=bulk-index-ds]

include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=bulk-id]

include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=bulk-list-executed-pipelines]

include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=bulk-require-alias]

include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=bulk-dynamic-templates]
Expand Down Expand Up @@ -321,6 +328,8 @@ include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=bulk-index]

include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=bulk-id]

include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=bulk-list-executed-pipelines]

include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=bulk-require-alias]

include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=bulk-dynamic-templates]
Expand Down
6 changes: 6 additions & 0 deletions docs/reference/rest-api/common-parms.asciidoc
Expand Up @@ -689,6 +689,12 @@ tag::require-alias[]
Defaults to `false`.
end::require-alias[]

tag::bulk-list-executed-pipelines[]
`list_executed_pipelines`::
(Optional, Boolean) If `true`, the response will include the ingest pipelines that
were executed. Defaults to `false`.
end::bulk-list-executed-pipelines[]

tag::bulk-dynamic-templates[]
`dynamic_templates`::
(Optional, map)
Expand Down
Expand Up @@ -220,3 +220,29 @@ teardown:
index: test_index
id: 1
- match: { _source: { my_field: "upper" } }

---
"Test bulk request with list_executed_pipelines":

- do:
bulk:
list_executed_pipelines: true
body:
- index:
_index: test_index
_id: test_id1
pipeline: pipeline1
- f1: v1
- index:
_index: test_index
_id: test_id2
- f1: v2
- index:
_index: test_index
_id: test_id2
pipeline: fake_pipeline
- f1: v2

- match: { items.0.index.executed_pipelines: ["pipeline1"] }
- match: { items.1.index.executed_pipelines: [] }
- match: { items.2.index.executed_pipelines: null }
4 changes: 4 additions & 0 deletions rest-api-spec/src/main/resources/rest-api-spec/api/bulk.json
Expand Up @@ -79,6 +79,10 @@
"require_alias": {
"type": "boolean",
"description": "Sets require_alias for all incoming documents. Defaults to unset (false)"
},
"list_executed_pipelines": {
"type": "boolean",
"description": "Sets list_executed_pipelines for all incoming documents. Defaults to unset (false)"
}
},
"body":{
Expand Down
2 changes: 2 additions & 0 deletions server/src/main/java/org/elasticsearch/TransportVersions.java
Expand Up @@ -141,6 +141,8 @@ static TransportVersion def(int id) {
public static final TransportVersion NODE_STATS_HTTP_ROUTE_STATS_ADDED = def(8_516_00_0);
public static final TransportVersion INCLUDE_SHARDS_STATS_ADDED = def(8_517_00_0);
public static final TransportVersion BUILD_QUALIFIER_SEPARATED = def(8_518_00_0);
public static final TransportVersion PIPELINES_IN_BULK_RESPONSE_ADDED = def(8_519_00_0);

/*
* STOP! READ THIS FIRST! No, really,
* ____ _____ ___ ____ _ ____ _____ _ ____ _____ _ _ ___ ____ _____ ___ ____ ____ _____ _
Expand Down
Expand Up @@ -39,6 +39,9 @@ public interface DocWriteRequest<T> extends IndicesRequest, Accountable {
// Flag set for disallowing index auto creation for an individual write request.
String REQUIRE_ALIAS = "require_alias";

// Flag indicating that the list of executed pipelines should be returned in the response
String LIST_EXECUTED_PIPELINES = "list_executed_pipelines";

/**
* Set the index for this request
* @return the Request
Expand Down
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.support.replication.TransportWriteAction;
Expand All @@ -19,6 +20,7 @@
import org.elasticsearch.index.translog.Translog;

import java.util.Arrays;
import java.util.List;

/**
* This is a utility class that holds the per request state needed to perform bulk operations on the primary.
Expand Down Expand Up @@ -231,13 +233,20 @@ public void markOperationAsExecuted(Engine.Result result) {
final DocWriteResponse response;
if (result.getOperationType() == Engine.Operation.TYPE.INDEX) {
Engine.IndexResult indexResult = (Engine.IndexResult) result;
List<String> executedPipelines;
if (docWriteRequest instanceof IndexRequest indexRequest) {
executedPipelines = indexRequest.getExecutedPipelines();
} else {
executedPipelines = null;
}
response = new IndexResponse(
primary.shardId(),
indexResult.getId(),
result.getSeqNo(),
result.getTerm(),
indexResult.getVersion(),
indexResult.isCreated()
indexResult.isCreated(),
executedPipelines
);
} else if (result.getOperationType() == Engine.Operation.TYPE.DELETE) {
Engine.DeleteResult deleteResult = (Engine.DeleteResult) result;
Expand Down
Expand Up @@ -438,7 +438,7 @@ public BulkProcessor add(
lock.lock();
try {
ensureOpen();
bulkRequest.add(data, defaultIndex, null, null, defaultPipeline, null, true, xContentType, RestApiVersion.current());
bulkRequest.add(data, defaultIndex, null, null, defaultPipeline, null, null, true, xContentType, RestApiVersion.current());
bulkRequestToExecute = newBulkRequestIfNeeded();
} finally {
lock.unlock();
Expand Down
Expand Up @@ -232,15 +232,15 @@ public BulkRequest add(byte[] data, int from, int length, @Nullable String defau
* Adds a framed data in binary format
*/
public BulkRequest add(BytesReference data, @Nullable String defaultIndex, XContentType xContentType) throws IOException {
return add(data, defaultIndex, null, null, null, null, true, xContentType, RestApiVersion.current());
return add(data, defaultIndex, null, null, null, null, null, true, xContentType, RestApiVersion.current());
}

/**
* Adds a framed data in binary format
*/
public BulkRequest add(BytesReference data, @Nullable String defaultIndex, boolean allowExplicitIndex, XContentType xContentType)
throws IOException {
return add(data, defaultIndex, null, null, null, null, allowExplicitIndex, xContentType, RestApiVersion.current());
return add(data, defaultIndex, null, null, null, null, null, allowExplicitIndex, xContentType, RestApiVersion.current());

}

Expand All @@ -251,6 +251,7 @@ public BulkRequest add(
@Nullable FetchSourceContext defaultFetchSourceContext,
@Nullable String defaultPipeline,
@Nullable Boolean defaultRequireAlias,
@Nullable Boolean defaultListExecutedPipelines,
boolean allowExplicitIndex,
XContentType xContentType,
RestApiVersion restApiVersion
Expand All @@ -265,6 +266,7 @@ public BulkRequest add(
defaultFetchSourceContext,
pipeline,
requireAlias,
defaultListExecutedPipelines,
allowExplicitIndex,
xContentType,
(indexRequest, type) -> internalAdd(indexRequest),
Expand Down
Expand Up @@ -61,6 +61,7 @@ public final class BulkRequestParser {
private static final ParseField IF_SEQ_NO = new ParseField("if_seq_no");
private static final ParseField IF_PRIMARY_TERM = new ParseField("if_primary_term");
private static final ParseField REQUIRE_ALIAS = new ParseField(DocWriteRequest.REQUIRE_ALIAS);
private static final ParseField LIST_EXECUTED_PIPELINES = new ParseField(DocWriteRequest.LIST_EXECUTED_PIPELINES);
private static final ParseField DYNAMIC_TEMPLATES = new ParseField("dynamic_templates");

// TODO: Remove this parameter once the BulkMonitoring endpoint has been removed
Expand Down Expand Up @@ -126,6 +127,7 @@ public void parse(
@Nullable FetchSourceContext defaultFetchSourceContext,
@Nullable String defaultPipeline,
@Nullable Boolean defaultRequireAlias,
@Nullable Boolean defaultListExecutedPipelines,
boolean allowExplicitIndex,
XContentType xContentType,
BiConsumer<IndexRequest, String> indexRequestConsumer,
Expand Down Expand Up @@ -207,6 +209,7 @@ public void parse(
int retryOnConflict = 0;
String pipeline = defaultPipeline;
boolean requireAlias = defaultRequireAlias != null && defaultRequireAlias;
boolean listExecutedPipelines = defaultListExecutedPipelines != null && defaultListExecutedPipelines;
Map<String, String> dynamicTemplates = Map.of();

// at this stage, next token can either be END_OBJECT (and use default index and type, with auto generated id)
Expand Down Expand Up @@ -260,6 +263,8 @@ public void parse(
fetchSourceContext = FetchSourceContext.fromXContent(parser);
} else if (REQUIRE_ALIAS.match(currentFieldName, parser.getDeprecationHandler())) {
requireAlias = parser.booleanValue();
} else if (LIST_EXECUTED_PIPELINES.match(currentFieldName, parser.getDeprecationHandler())) {
listExecutedPipelines = parser.booleanValue();
} else {
throw new IllegalArgumentException(
"Action/metadata line [" + line + "] contains an unknown parameter [" + currentFieldName + "]"
Expand Down Expand Up @@ -343,7 +348,8 @@ public void parse(
.setIfPrimaryTerm(ifPrimaryTerm)
.source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType)
.setDynamicTemplates(dynamicTemplates)
.setRequireAlias(requireAlias),
.setRequireAlias(requireAlias)
.setListExecutedPipelines(listExecutedPipelines),
type
);
} else {
Expand All @@ -358,7 +364,8 @@ public void parse(
.setIfPrimaryTerm(ifPrimaryTerm)
.source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType)
.setDynamicTemplates(dynamicTemplates)
.setRequireAlias(requireAlias),
.setRequireAlias(requireAlias)
.setListExecutedPipelines(listExecutedPipelines),
type
);
}
Expand All @@ -374,7 +381,8 @@ public void parse(
.setIfPrimaryTerm(ifPrimaryTerm)
.source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType)
.setDynamicTemplates(dynamicTemplates)
.setRequireAlias(requireAlias),
.setRequireAlias(requireAlias)
.setListExecutedPipelines(listExecutedPipelines),
type
);
} else if ("update".equals(action)) {
Expand Down Expand Up @@ -410,7 +418,7 @@ public void parse(
}
IndexRequest upsertRequest = updateRequest.upsertRequest();
if (upsertRequest != null) {
upsertRequest.setPipeline(pipeline);
upsertRequest.setPipeline(pipeline).setListExecutedPipelines(listExecutedPipelines);
}

updateRequestConsumer.accept(updateRequest);
Expand Down
Expand Up @@ -42,6 +42,9 @@

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -103,6 +106,16 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
private boolean isPipelineResolved;

private boolean requireAlias;
/**
* This indicates whether the response to this request ought to list the ingest pipelines that were executed on the document
*/
private boolean listExecutedPipelines;
/**
* This holds the names of the ingest pipelines that have been executed on the document for this request. This is not meant to be set by
* the creator of the request -- pipelines are added here at runtime as they are executed.
*/
@Nullable
private List<String> executedPipelines = null;

/**
* Value for {@link #getAutoGeneratedTimestamp()} if the document has an external
Expand Down Expand Up @@ -167,6 +180,15 @@ public IndexRequest(@Nullable ShardId shardId, StreamInput in) throws IOExceptio
if (in.getTransportVersion().onOrAfter(PIPELINES_HAVE_RUN_FIELD_ADDED)) {
pipelinesHaveRun = in.readBoolean();
}
if (in.getTransportVersion().onOrAfter(TransportVersions.PIPELINES_IN_BULK_RESPONSE_ADDED)) {
this.listExecutedPipelines = in.readBoolean();
if (listExecutedPipelines) {
List<String> possiblyImmutableExecutedPipelines = in.readOptionalCollectionAsList(StreamInput::readString);
this.executedPipelines = possiblyImmutableExecutedPipelines == null
? null
: new ArrayList<>(possiblyImmutableExecutedPipelines);
}
}
}

public IndexRequest() {
Expand Down Expand Up @@ -727,6 +749,12 @@ private void writeBody(StreamOutput out) throws IOException {
if (out.getTransportVersion().onOrAfter(PIPELINES_HAVE_RUN_FIELD_ADDED)) {
out.writeBoolean(pipelinesHaveRun);
}
if (out.getTransportVersion().onOrAfter(TransportVersions.PIPELINES_IN_BULK_RESPONSE_ADDED)) {
out.writeBoolean(listExecutedPipelines);
if (listExecutedPipelines) {
out.writeOptionalCollection(executedPipelines, StreamOutput::writeString);
}
}
}

@Override
Expand Down Expand Up @@ -797,6 +825,15 @@ public IndexRequest setRequireAlias(boolean requireAlias) {
return this;
}

/**
 * Sets whether the names of the ingest pipelines executed for this request should be recorded.
 * When the flag is {@code false}, {@link #addPipeline(String)} records nothing and
 * {@link #getExecutedPipelines()} returns {@code null}.
 *
 * @param listExecutedPipelines {@code true} to record executed pipelines, {@code false} otherwise
 * @return this request, to allow call chaining
 */
public IndexRequest setListExecutedPipelines(boolean listExecutedPipelines) {
    this.listExecutedPipelines = listExecutedPipelines;
    return this;
}

/**
 * Reports whether this request has been flagged to list the ingest pipelines that were executed for it.
 *
 * @return the current value of the {@code listExecutedPipelines} flag
 */
public boolean getListExecutedPipelines() {
    return this.listExecutedPipelines;
}

/**
* Specifies a map from the full path of field names to the name of dynamic mapping templates
*/
Expand Down Expand Up @@ -830,4 +867,33 @@ public void setPipelinesHaveRun() {
public boolean pipelinesHaveRun() {
return pipelinesHaveRun;
}

/**
 * Records the given pipeline as executed for this request. Nothing is recorded unless
 * {@code listExecutedPipelines} is {@code true}; the backing list is created on first use.
 *
 * @param pipeline the name of the pipeline to append to the list of executed pipelines
 */
public void addPipeline(String pipeline) {
    if (listExecutedPipelines == false) {
        // Listing was not requested, so nothing is tracked.
        return;
    }
    if (executedPipelines == null) {
        executedPipelines = new ArrayList<>();
    }
    executedPipelines.add(pipeline);
}

/**
 * Returns the names of the pipelines recorded as executed for this request.
 *
 * @return {@code null} if {@code listExecutedPipelines} is {@code false} (even if pipelines were recorded
 *         earlier); an empty list if listing was requested but no pipeline has been recorded; otherwise an
 *         unmodifiable view of the recorded pipeline names, in the order they were added
 */
@Nullable
public List<String> getExecutedPipelines() {
    if (listExecutedPipelines) {
        // A null backing list means listing was requested but no pipeline has been recorded yet.
        return executedPipelines == null ? List.of() : Collections.unmodifiableList(executedPipelines);
    }
    return null;
}
}

0 comments on commit 92ec9d6

Please sign in to comment.