From f1683dd0cbc4a472397bc47de5b2ffb9cdeecd6d Mon Sep 17 00:00:00 2001 From: Pat Whelan Date: Wed, 24 Sep 2025 15:47:58 -0400 Subject: [PATCH] [Transform] Fix Missing Config Errors (#134963) When there are more Transforms than the current page size, GET _transform will mistakenly display an error message `Found task for transform [...], but no configuration for it. To delete this transform use DELETE with force=true.`. When the page size is smaller than the reported total count of transforms, we will now get all Transform Ids and compare that to the running transforms to generate the dangling tasks error message. Resolve #134263 --- docs/changelog/134963.yaml | 6 + .../transform_check_for_dangling_tasks.csv | 1 + .../resources/transport/upper_bounds/8.18.csv | 2 +- .../resources/transport/upper_bounds/8.19.csv | 2 +- .../resources/transport/upper_bounds/9.0.csv | 1 + .../resources/transport/upper_bounds/9.1.csv | 2 +- .../resources/transport/upper_bounds/9.2.csv | 2 +- .../xpack/core/transform/TransformField.java | 1 + .../transform/action/GetTransformAction.java | 49 ++++- .../GetTransformActionRequestTests.java | 17 +- .../TransformGetAndGetStatsIT.java | 134 ++++++++++---- .../integration/TransformRestTestCase.java | 11 ++ .../action/TransportGetTransformAction.java | 170 ++++++++++++++++-- .../rest/action/RestGetTransformAction.java | 8 +- 14 files changed, 334 insertions(+), 72 deletions(-) create mode 100644 docs/changelog/134963.yaml create mode 100644 server/src/main/resources/transport/definitions/referable/transform_check_for_dangling_tasks.csv create mode 100644 server/src/main/resources/transport/upper_bounds/9.0.csv diff --git a/docs/changelog/134963.yaml b/docs/changelog/134963.yaml new file mode 100644 index 0000000000000..9dc36675fab77 --- /dev/null +++ b/docs/changelog/134963.yaml @@ -0,0 +1,6 @@ +pr: 134963 +summary: Fix a bug in the GET _transform API that incorrectly claims some Transform configurations are missing +area: Transform +type: bug +issues: + - 134263 diff --git a/server/src/main/resources/transport/definitions/referable/transform_check_for_dangling_tasks.csv b/server/src/main/resources/transport/definitions/referable/transform_check_for_dangling_tasks.csv new file mode 100644 index 0000000000000..4e2559c7e9b0a --- /dev/null +++ b/server/src/main/resources/transport/definitions/referable/transform_check_for_dangling_tasks.csv @@ -0,0 +1 @@ +9170000,9112009,9000018,8841070,8840011 diff --git a/server/src/main/resources/transport/upper_bounds/8.18.csv b/server/src/main/resources/transport/upper_bounds/8.18.csv index ffc592e1809ee..266bfbbd3bf78 100644 --- a/server/src/main/resources/transport/upper_bounds/8.18.csv +++ b/server/src/main/resources/transport/upper_bounds/8.18.csv @@ -1 +1 @@ -initial_elasticsearch_8_18_8,8840010 +transform_check_for_dangling_tasks,8840011 diff --git a/server/src/main/resources/transport/upper_bounds/8.19.csv b/server/src/main/resources/transport/upper_bounds/8.19.csv index 3cc6f439c5ea5..3600b3f8c633a 100644 --- a/server/src/main/resources/transport/upper_bounds/8.19.csv +++ b/server/src/main/resources/transport/upper_bounds/8.19.csv @@ -1 +1 @@ -initial_elasticsearch_8_19_5,8841069 +transform_check_for_dangling_tasks,8841070 diff --git a/server/src/main/resources/transport/upper_bounds/9.0.csv b/server/src/main/resources/transport/upper_bounds/9.0.csv new file mode 100644 index 0000000000000..c11e6837bb813 --- /dev/null +++ b/server/src/main/resources/transport/upper_bounds/9.0.csv @@ -0,0 +1 @@ +transform_check_for_dangling_tasks,9000018 diff --git a/server/src/main/resources/transport/upper_bounds/9.1.csv b/server/src/main/resources/transport/upper_bounds/9.1.csv index 96438ef697051..80b97d85f7511 100644 --- a/server/src/main/resources/transport/upper_bounds/9.1.csv +++ b/server/src/main/resources/transport/upper_bounds/9.1.csv @@ -1 +1 @@ -esql_fixed_index_like,9112002 +transform_check_for_dangling_tasks,9112009 diff --git a/server/src/main/resources/transport/upper_bounds/9.2.csv b/server/src/main/resources/transport/upper_bounds/9.2.csv index 50744eb9b3662..2c15e0254cbe8 100644 --- a/server/src/main/resources/transport/upper_bounds/9.2.csv +++ b/server/src/main/resources/transport/upper_bounds/9.2.csv @@ -1 +1 @@ -esql_fixed_index_like,9119000 +transform_check_for_dangling_tasks,9170000 diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformField.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformField.java index a00acdb3897f0..2254c6a7938f5 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformField.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformField.java @@ -59,6 +59,7 @@ public final class TransformField { public static final ParseField MAX_AGE = new ParseField("max_age"); public static final ParseField ALLOW_NO_MATCH = new ParseField("allow_no_match"); + public static final ParseField CHECK_FOR_DANGLING_TASKS = new ParseField("check_dangling_tasks"); /** * Fields for checkpointing */ diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/GetTransformAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/GetTransformAction.java index d1b9578dec0bb..56faa2f80128f 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/GetTransformAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/GetTransformAction.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.core.transform.action; +import org.elasticsearch.TransportVersion; import org.elasticsearch.TransportVersions; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionType; @@ -16,6 +17,7 @@ import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.logging.DeprecationCategory; import org.elasticsearch.common.logging.DeprecationLogger; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.xcontent.ParseField; import org.elasticsearch.xcontent.ToXContentObject; import org.elasticsearch.xcontent.XContentBuilder; @@ -39,6 +41,8 @@ public class GetTransformAction extends ActionType public static final GetTransformAction INSTANCE = new GetTransformAction(); public static final String NAME = "cluster:monitor/transform/get"; + static final TransportVersion DANGLING_TASKS = TransportVersion.fromName("transform_check_for_dangling_tasks"); + private static final DeprecationLogger deprecationLogger = DeprecationLogger.getLogger(GetTransformAction.class); private GetTransformAction() { @@ -47,24 +51,49 @@ private GetTransformAction() { public static class Request extends AbstractGetResourcesRequest { + // for legacy purposes, this transport action previously had no timeout + private static final TimeValue LEGACY_TIMEOUT_VALUE = TimeValue.MAX_VALUE; private static final int MAX_SIZE_RETURN = 1000; + private final boolean checkForDanglingTasks; + private final TimeValue timeout; public Request(String id) { - super(id, PageParams.defaultParams(), true); + this(id, false, LEGACY_TIMEOUT_VALUE); } - public Request() { - super(null, PageParams.defaultParams(), true); + public Request(String id, boolean checkForDanglingTasks, TimeValue timeout) { + super(id, PageParams.defaultParams(), true); + this.checkForDanglingTasks = checkForDanglingTasks; + this.timeout = timeout; } public Request(StreamInput in) throws IOException { super(in); + this.checkForDanglingTasks = in.getTransportVersion().onOrAfter(DANGLING_TASKS) ? in.readBoolean() : true; + this.timeout = in.getTransportVersion().onOrAfter(DANGLING_TASKS) ? in.readTimeValue() : LEGACY_TIMEOUT_VALUE; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + if (out.getTransportVersion().onOrAfter(DANGLING_TASKS)) { + out.writeBoolean(checkForDanglingTasks); + out.writeTimeValue(timeout); + } } public String getId() { return getResourceId(); } + public boolean checkForDanglingTasks() { + return checkForDanglingTasks; + } + + public TimeValue timeout() { + return timeout; + } + @Override public ActionRequestValidationException validate() { ActionRequestValidationException exception = null; @@ -86,6 +115,20 @@ public String getCancelableTaskDescription() { public String getResourceIdField() { return TransformField.ID.getPreferredName(); } + + @Override + public boolean equals(Object obj) { + return this == obj + || (obj instanceof Request other + && super.equals(obj) + && (checkForDanglingTasks == other.checkForDanglingTasks) + && Objects.equals(timeout, other.timeout)); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), checkForDanglingTasks, timeout); + } } public static class Response extends AbstractGetResourcesResponse implements ToXContentObject { diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/GetTransformActionRequestTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/GetTransformActionRequestTests.java index 3d564f0cf2fa7..ce81cf940e4a6 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/GetTransformActionRequestTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/GetTransformActionRequestTests.java @@ -7,28 +7,37 @@ package org.elasticsearch.xpack.core.transform.action; +import org.elasticsearch.TransportVersion; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.common.io.stream.Writeable; -import org.elasticsearch.test.AbstractWireSerializingTestCase; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.xpack.core.ml.AbstractBWCWireSerializationTestCase; import org.elasticsearch.xpack.core.transform.action.GetTransformAction.Request; -public class GetTransformActionRequestTests extends AbstractWireSerializingTestCase { +import static org.elasticsearch.xpack.core.transform.action.GetTransformAction.DANGLING_TASKS; + +public class GetTransformActionRequestTests extends AbstractBWCWireSerializationTestCase { @Override protected Request createTestInstance() { if (randomBoolean()) { return new Request(Metadata.ALL); } - return new Request(randomAlphaOfLengthBetween(1, 20)); + return new Request(randomAlphaOfLengthBetween(1, 20), randomBoolean(), randomPositiveTimeValue()); } @Override protected Request mutateInstance(Request instance) { - return null;// TODO implement https://github.com/elastic/elasticsearch/issues/25929 + return randomValueOtherThan(instance, this::createTestInstance); } @Override protected Writeable.Reader instanceReader() { return Request::new; } + + @Override + protected Request mutateInstanceForVersion(Request instance, TransportVersion version) { + return version.onOrAfter(DANGLING_TASKS) ? instance : new Request(instance.getId(), true, TimeValue.MAX_VALUE); + } } diff --git a/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformGetAndGetStatsIT.java b/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformGetAndGetStatsIT.java index 570feaf28cc58..4c7ab877cbc65 100644 --- a/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformGetAndGetStatsIT.java +++ b/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformGetAndGetStatsIT.java @@ -27,6 +27,7 @@ import static java.util.Collections.singletonList; import static org.elasticsearch.xpack.core.transform.TransformField.BASIC_STATS; +import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.hasEntry; @@ -460,45 +461,7 @@ public void testGetStatsWithContinuous() throws Exception { String transformDest = transformId + "_idx"; String transformSrc = "reviews_cont_pivot_test"; createReviewsIndex(transformSrc); - final Request createTransformRequest = createRequestWithAuth("PUT", getTransformEndpoint() + transformId, null); - String config = Strings.format(""" - { - "dest": { - "index": "%s" - }, - "source": { - "index": "%s" - }, - "frequency": "1s", - "sync": { - "time": { - "field": "timestamp", - "delay": "1s" - } - }, - "pivot": { - "group_by": { - "reviewer": { - "terms": { - "field": "user_id" - } - } - }, - "aggregations": { - "avg_rating": { - "avg": { - "field": "stars" - } - } - } - } - }""", transformDest, transformSrc); - - createTransformRequest.setJsonEntity(config); - - Map createTransformResponse = entityAsMap(client().performRequest(createTransformRequest)); - assertThat(createTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE)); - startAndWaitForContinuousTransform(transformId, transformDest, null); + createAndStartTransform(transformId, transformSrc, transformDest); Request getRequest = createRequestWithAuthAndTimeout( "GET", @@ -577,6 +540,99 @@ public void testGetStatsWithContinuous() throws Exception { }, 120, TimeUnit.SECONDS); } + private void createAndStartTransform(String transformId, String transformSrc, String transformDest) throws Exception { + var createTransformRequest = createRequestWithAuth("PUT", getTransformEndpoint() + transformId, null); + var config = Strings.format(""" + { + "dest": { + "index": "%s" + }, + "source": { + "index": "%s" + }, + "frequency": "1s", + "sync": { + "time": { + "field": "timestamp", + "delay": "1s" + } + }, + "pivot": { + "group_by": { + "reviewer": { + "terms": { + "field": "user_id" + } + } + }, + "aggregations": { + "avg_rating": { + "avg": { + "field": "stars" + } + } + } + } + }""", transformDest, transformSrc); + + createTransformRequest.setJsonEntity(config); + + Map createTransformResponse = entityAsMap(client().performRequest(createTransformRequest)); + assertThat(createTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE)); + startAndWaitForContinuousTransform(transformId, transformDest, null); + } + + /** + * For Github Issue #134263 + * https://github.com/elastic/elasticsearch/issues/134263 + */ + public void testGetTransformsDoesNotErrorOnPageSize() throws Exception { + var transformId1 = "multiple-transforms-1"; + var transformSrc = "reviews_multiple_transforms_test"; + createReviewsIndex(transformSrc); + createAndStartTransform(transformId1, transformSrc, transformId1 + "_idx"); + + var transformId2 = "multiple-transforms-2"; + createAndStartTransform(transformId2, transformSrc, transformId2 + "_idx"); + + // getting transform 1 on page 1 will not have any errors for transform 2 + assertThat(getTransformIdFromAll(0, 1), equalTo(transformId1)); + // getting transform 2 on page 2 will not have any errors for transform 1 + assertThat(getTransformIdFromAll(1, 1), equalTo(transformId2)); + + // getting transform 1 by id will not have any errors for transform 2 + getTransformConfig(transformId1, null, null); + // getting transform 2 by id will not have any errors for transform 1 + getTransformConfig(transformId2, null, null); + + // getting all transform will not have any errors + assertThat(getAllTransformIds(), containsInAnyOrder(transformId1, transformId2)); + + // getting a stopped transform 1 will not have any errors for transform 2 + stopTransform(transformId1, false); + assertThat(getTransformIdFromAll(0, 1), equalTo(transformId1)); + } + + @SuppressWarnings("unchecked") + private String getTransformIdFromAll(int from, int size) throws IOException { + var params = Strings.format("?from=%d&size=%d", from, size); + var request = new Request("GET", getTransformEndpoint() + "_all" + params); + var response = adminClient().performRequest(request); + var transforms = entityAsMap(response); + var transformConfigs = (List>) XContentMapValues.extractValue("transforms", transforms); + var errors = (List>) XContentMapValues.extractValue("errors", transforms); + assertThat(errors, is(nullValue())); + assertThat(transformConfigs, hasSize(1)); + return (String) transformConfigs.get(0).get("id"); + } + + @SuppressWarnings("unchecked") + private List getAllTransformIds() throws IOException { + var transforms = getTransforms(0, 1_000); + var configs = (List>) transforms.get("transforms"); + return configs.stream().map(transform -> transform.get("id")).map(Object::toString).toList(); + } + @SuppressWarnings("unchecked") public void testManyTransforms() throws IOException { String config = transformConfig(); diff --git a/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java b/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java index 20ec649f74811..14438d59626ce 100644 --- a/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java +++ b/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java @@ -590,6 +590,17 @@ protected Map getTransformConfig(String transformId, String auth return transformConfig; } + @SuppressWarnings("unchecked") + protected Map getTransformConfig(String transformId, String authHeader, List> expectedErrors) + throws IOException { + Request getRequest = createRequestWithAuth("GET", getTransformEndpoint() + transformId, authHeader); + Map transforms = entityAsMap(client().performRequest(getRequest)); + assertEquals(1, XContentMapValues.extractValue("count", transforms)); + List> errors = (List>) XContentMapValues.extractValue("errors", transforms); + assertThat(errors, is(equalTo(expectedErrors))); + return ((List>) transforms.get("transforms")).get(0); + } + protected static String getTransformState(String transformId) throws IOException { Map transformStatsAsMap = getTransformStateAndStats(transformId); return transformStatsAsMap == null ? null : (String) XContentMapValues.extractValue("state", transformStatsAsMap); diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetTransformAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetTransformAction.java index cb2985b5b1b3a..871098bebd354 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetTransformAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetTransformAction.java @@ -9,11 +9,15 @@ import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.regex.Regex; import org.elasticsearch.core.Strings; +import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.injection.guice.Inject; @@ -26,9 +30,10 @@ import org.elasticsearch.xcontent.NamedXContentRegistry; import org.elasticsearch.xcontent.ParseField; import org.elasticsearch.xcontent.XContentParser; -import org.elasticsearch.xpack.core.ClientHelper; import org.elasticsearch.xpack.core.action.AbstractTransportGetResourcesAction; +import org.elasticsearch.xpack.core.action.util.ExpandedIdsMatcher; import org.elasticsearch.xpack.core.action.util.QueryPage; +import org.elasticsearch.xpack.core.common.time.RemainingTime; import org.elasticsearch.xpack.core.transform.TransformField; import org.elasticsearch.xpack.core.transform.TransformMessages; import org.elasticsearch.xpack.core.transform.action.GetTransformAction; @@ -39,13 +44,17 @@ import org.elasticsearch.xpack.transform.transforms.TransformNodes; import org.elasticsearch.xpack.transform.transforms.TransformTask; -import java.util.Collection; -import java.util.List; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Set; +import java.util.function.Predicate; +import java.util.stream.Stream; import static java.util.function.Predicate.not; -import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toSet; +import static org.elasticsearch.xpack.core.ClientHelper.TRANSFORM_ORIGIN; +import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; import static org.elasticsearch.xpack.core.transform.TransformField.INDEX_DOC_TYPE; public class TransportGetTransformAction extends AbstractTransportGetResourcesAction { @@ -54,6 +63,7 @@ public class TransportGetTransformAction extends AbstractTransportGetResourcesAc "Found task for transform [%s], but no configuration for it. To delete this transform use DELETE with force=true."; private final ClusterService clusterService; + private final Client client; @Inject public TransportGetTransformAction( @@ -65,6 +75,7 @@ public TransportGetTransformAction( ) { super(GetTransformAction.NAME, transportService, actionFilters, Request::new, client, xContentRegistry); this.clusterService = clusterService; + this.client = client; } @Override @@ -73,20 +84,29 @@ protected void doExecute(Task task, Request request, ActionListener li final ClusterState clusterState = clusterService.state(); TransformNodes.warnIfNoTransformNodes(clusterState); + RemainingTime remainingTime = RemainingTime.from(Instant::now, request.timeout()); + // Step 2: Search for all the transform tasks (matching the request) that *do not* have corresponding transform config. - ActionListener> searchTransformConfigsListener = ActionListener.wrap(r -> { - Set transformConfigIds = r.results().stream().map(TransformConfig::getId).collect(toSet()); - Collection> transformTasks = TransformTask.findTransformTasks( - request.getId(), - clusterState - ); - List errors = transformTasks.stream() - .map(PersistentTasksCustomMetadata.PersistentTask::getId) - .filter(not(transformConfigIds::contains)) - .map(transformId -> new Response.Error("dangling_task", Strings.format(DANGLING_TASK_ERROR_MESSAGE_FORMAT, transformId))) - .collect(toList()); - listener.onResponse(new Response(r.results(), r.count(), errors.isEmpty() ? null : errors)); - }, listener::onFailure); + ActionListener> searchTransformConfigsListener = listener.delegateFailureAndWrap((l, r) -> { + if (request.checkForDanglingTasks()) { + getAllTransformIds(request, r, remainingTime, l.delegateFailureAndWrap((ll, transformConfigIds) -> { + var errors = TransformTask.findTransformTasks(request.getId(), clusterState) + .stream() + .map(PersistentTasksCustomMetadata.PersistentTask::getId) + .filter(not(transformConfigIds::contains)) + .map( + transformId -> new Response.Error( + "dangling_task", + Strings.format(DANGLING_TASK_ERROR_MESSAGE_FORMAT, transformId) + ) + ) + .toList(); + ll.onResponse(new Response(r.results(), r.count(), errors.isEmpty() ? null : errors)); + })); + } else { + l.onResponse(new Response(r.results(), r.count(), null)); + } + }); // Step 1: Search for all the transform configs matching the request. searchResources(request, parentTaskId, searchTransformConfigsListener); @@ -116,7 +136,7 @@ protected ResourceNotFoundException notFoundException(String resourceId) { @Override protected String executionOrigin() { - return ClientHelper.TRANSFORM_ORIGIN; + return TRANSFORM_ORIGIN; } @Override @@ -131,7 +151,119 @@ protected QueryBuilder additionalQuery() { @Override protected SearchSourceBuilder customSearchOptions(SearchSourceBuilder searchSourceBuilder) { - return searchSourceBuilder.sort("_index", SortOrder.DESC); + return searchSourceBuilder.sort("_index", SortOrder.DESC).sort(TransformField.ID.getPreferredName(), SortOrder.ASC); + } + + private void getAllTransformIds( + Request request, + QueryPage initialResults, + RemainingTime remainingTime, + ActionListener> listener + ) { + ActionListener> transformIdListener = listener.map(stream -> stream.collect(toSet())); + var requestedPage = initialResults.results().stream().map(TransformConfig::getId); + + if (initialResults.count() == initialResults.results().size()) { + transformIdListener.onResponse(requestedPage); + } else { + // if we do not have all of our transform ids already, we have to go get them + // we'll read everything after our current page, then we'll reverse and read everything before our current page + var from = request.getPageParams().getFrom(); + var size = request.getPageParams().getSize(); + var idTokens = ExpandedIdsMatcher.tokenizeExpression(request.getResourceId()); + + getAllTransformIds(idTokens, false, from, size, remainingTime, transformIdListener.delegateFailureAndWrap((l, nextPages) -> { + var currentPages = Stream.concat(requestedPage, nextPages); + getAllTransformIds(idTokens, true, from, size, remainingTime, l.map(firstPages -> Stream.concat(firstPages, currentPages))); + })); + } + } + + private void getAllTransformIds( + String[] idTokens, + boolean reverse, + int from, + int size, + RemainingTime remainingTime, + ActionListener> listener + ) { + if (reverse && from <= 0) { + listener.onResponse(Stream.empty()); + return; + } + + var thisPage = reverse ? from - size : from + size; + var thisPageFrom = Math.max(0, thisPage); + var thisPageSize = thisPage < 0 ? from : size; + + SearchRequest request = client.prepareSearch( + TransformInternalIndexConstants.INDEX_NAME_PATTERN, + TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED + ) + .addSort(TransformField.ID.getPreferredName(), SortOrder.ASC) + .addSort("_index", SortOrder.DESC) + .setFrom(thisPageFrom) + .setSize(thisPageSize) + .setTimeout(remainingTime.get()) + .setFetchSource(false) + .setTrackTotalHits(true) + .addDocValueField(TransformField.ID.getPreferredName()) + .setQuery(query(idTokens)) + .request(); + + executeAsyncWithOrigin( + client.threadPool().getThreadContext(), + TRANSFORM_ORIGIN, + request, + listener.delegateFailureAndWrap((l, searchResponse) -> { + var transformIds = Arrays.stream(searchResponse.getHits().getHits()) + .map(hit -> (String) hit.field(TransformField.ID.getPreferredName()).getValue()) + .filter(Predicate.not(org.elasticsearch.common.Strings::isNullOrEmpty)) + .toList() + .stream(); + + if (searchResponse.getHits().getHits().length == size) { + getAllTransformIds( + idTokens, + reverse, + thisPageFrom, + thisPageSize, + remainingTime, + l.map(nextTransformIds -> Stream.concat(transformIds, nextTransformIds)) + ); + } else { + l.onResponse(transformIds); + } + }), + client::search + ); + } + + private static QueryBuilder query(String[] idTokens) { + var queryBuilder = QueryBuilders.boolQuery() + .filter(QueryBuilders.termQuery(TransformField.INDEX_DOC_TYPE.getPreferredName(), TransformConfig.NAME)); + + if (org.elasticsearch.common.Strings.isAllOrWildcard(idTokens) == false) { + var shouldQueries = new BoolQueryBuilder(); + var terms = new ArrayList(); + for (String token : idTokens) { + if (Regex.isSimpleMatchPattern(token)) { + shouldQueries.should(QueryBuilders.wildcardQuery(TransformField.ID.getPreferredName(), token)); + } else { + terms.add(token); + } + } + + if (terms.isEmpty() == false) { + shouldQueries.should(QueryBuilders.termsQuery(TransformField.ID.getPreferredName(), terms)); + } + + if (shouldQueries.should().isEmpty() == false) { + queryBuilder.filter(shouldQueries); + } + } + + return QueryBuilders.constantScoreQuery(queryBuilder); } } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/rest/action/RestGetTransformAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/rest/action/RestGetTransformAction.java index 1eeafc54098ae..cb11e8e05f1dc 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/rest/action/RestGetTransformAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/rest/action/RestGetTransformAction.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.transform.rest.action; +import org.elasticsearch.action.support.master.AcknowledgedRequest; import org.elasticsearch.client.internal.node.NodeClient; import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestRequest; @@ -39,10 +40,11 @@ public List routes() { @Override protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) { - GetTransformAction.Request request = new GetTransformAction.Request(); + var id = restRequest.param(TransformField.ID.getPreferredName()); + var checkForDanglingTasks = restRequest.paramAsBoolean(TransformField.CHECK_FOR_DANGLING_TASKS.getPreferredName(), true); + var timeout = restRequest.paramAsTime(TransformField.TIMEOUT.getPreferredName(), AcknowledgedRequest.DEFAULT_ACK_TIMEOUT); - String id = restRequest.param(TransformField.ID.getPreferredName()); - request.setResourceId(id); + var request = new GetTransformAction.Request(id, checkForDanglingTasks, timeout); request.setAllowNoResources(restRequest.paramAsBoolean(ALLOW_NO_MATCH.getPreferredName(), true)); if (restRequest.hasParam(PageParams.FROM.getPreferredName()) || restRequest.hasParam(PageParams.SIZE.getPreferredName())) { request.setPageParams(