Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/134963.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 134963
summary: Fix a bug in the GET _transform API that incorrectly claims some Transform configurations are missing
area: Transform
type: bug
issues:
- 134263
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
9170000,9112009,9000018,8841070,8840011
2 changes: 1 addition & 1 deletion server/src/main/resources/transport/upper_bounds/8.18.csv
Original file line number Diff line number Diff line change
@@ -1 +1 @@
initial_elasticsearch_8_18_8,8840010
transform_check_for_dangling_tasks,8840011
2 changes: 1 addition & 1 deletion server/src/main/resources/transport/upper_bounds/8.19.csv
Original file line number Diff line number Diff line change
@@ -1 +1 @@
initial_elasticsearch_8_19_5,8841069
transform_check_for_dangling_tasks,8841070
1 change: 1 addition & 0 deletions server/src/main/resources/transport/upper_bounds/9.0.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
transform_check_for_dangling_tasks,9000018
2 changes: 1 addition & 1 deletion server/src/main/resources/transport/upper_bounds/9.1.csv
Original file line number Diff line number Diff line change
@@ -1 +1 @@
esql_fixed_index_like,9112002
transform_check_for_dangling_tasks,9112009
2 changes: 1 addition & 1 deletion server/src/main/resources/transport/upper_bounds/9.2.csv
Original file line number Diff line number Diff line change
@@ -1 +1 @@
esql_fixed_index_like,9119000
transform_check_for_dangling_tasks,9170000
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public final class TransformField {
public static final ParseField MAX_AGE = new ParseField("max_age");

public static final ParseField ALLOW_NO_MATCH = new ParseField("allow_no_match");
public static final ParseField CHECK_FOR_DANGLING_TASKS = new ParseField("check_dangling_tasks");
/**
* Fields for checkpointing
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

package org.elasticsearch.xpack.core.transform.action;

import org.elasticsearch.TransportVersion;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionType;
Expand All @@ -16,6 +17,7 @@
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.logging.DeprecationCategory;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;
Expand All @@ -39,6 +41,8 @@ public class GetTransformAction extends ActionType<GetTransformAction.Response>
public static final GetTransformAction INSTANCE = new GetTransformAction();
public static final String NAME = "cluster:monitor/transform/get";

static final TransportVersion DANGLING_TASKS = TransportVersion.fromName("transform_check_for_dangling_tasks");

private static final DeprecationLogger deprecationLogger = DeprecationLogger.getLogger(GetTransformAction.class);

private GetTransformAction() {
Expand All @@ -47,24 +51,49 @@ private GetTransformAction() {

public static class Request extends AbstractGetResourcesRequest {

// for legacy purposes, this transport action previously had no timeout
private static final TimeValue LEGACY_TIMEOUT_VALUE = TimeValue.MAX_VALUE;
private static final int MAX_SIZE_RETURN = 1000;
private final boolean checkForDanglingTasks;
private final TimeValue timeout;

public Request(String id) {
super(id, PageParams.defaultParams(), true);
this(id, false, LEGACY_TIMEOUT_VALUE);
}

public Request() {
super(null, PageParams.defaultParams(), true);
public Request(String id, boolean checkForDanglingTasks, TimeValue timeout) {
super(id, PageParams.defaultParams(), true);
this.checkForDanglingTasks = checkForDanglingTasks;
this.timeout = timeout;
}

public Request(StreamInput in) throws IOException {
super(in);
this.checkForDanglingTasks = in.getTransportVersion().onOrAfter(DANGLING_TASKS) ? in.readBoolean() : true;
this.timeout = in.getTransportVersion().onOrAfter(DANGLING_TASKS) ? in.readTimeValue() : LEGACY_TIMEOUT_VALUE;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
if (out.getTransportVersion().onOrAfter(DANGLING_TASKS)) {
out.writeBoolean(checkForDanglingTasks);
out.writeTimeValue(timeout);
}
}

public String getId() {
return getResourceId();
}

public boolean checkForDanglingTasks() {
return checkForDanglingTasks;
}

public TimeValue timeout() {
return timeout;
}

@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException exception = null;
Expand All @@ -86,6 +115,20 @@ public String getCancelableTaskDescription() {
public String getResourceIdField() {
return TransformField.ID.getPreferredName();
}

@Override
public boolean equals(Object obj) {
return this == obj
|| (obj instanceof Request other
&& super.equals(obj)
&& (checkForDanglingTasks == other.checkForDanglingTasks)
&& Objects.equals(timeout, other.timeout));
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), checkForDanglingTasks, timeout);
}
}

public static class Response extends AbstractGetResourcesResponse<TransformConfig> implements ToXContentObject {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,28 +7,37 @@

package org.elasticsearch.xpack.core.transform.action;

import org.elasticsearch.TransportVersion;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.xpack.core.ml.AbstractBWCWireSerializationTestCase;
import org.elasticsearch.xpack.core.transform.action.GetTransformAction.Request;

public class GetTransformActionRequestTests extends AbstractWireSerializingTestCase<Request> {
import static org.elasticsearch.xpack.core.transform.action.GetTransformAction.DANGLING_TASKS;

public class GetTransformActionRequestTests extends AbstractBWCWireSerializationTestCase<Request> {

@Override
protected Request createTestInstance() {
if (randomBoolean()) {
return new Request(Metadata.ALL);
}
return new Request(randomAlphaOfLengthBetween(1, 20));
return new Request(randomAlphaOfLengthBetween(1, 20), randomBoolean(), randomPositiveTimeValue());
}

@Override
protected Request mutateInstance(Request instance) {
return null;// TODO implement https://github.com/elastic/elasticsearch/issues/25929
return randomValueOtherThan(instance, this::createTestInstance);
}

@Override
protected Writeable.Reader<Request> instanceReader() {
return Request::new;
}

@Override
protected Request mutateInstanceForVersion(Request instance, TransportVersion version) {
return version.onOrAfter(DANGLING_TASKS) ? instance : new Request(instance.getId(), true, TimeValue.MAX_VALUE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import static java.util.Collections.singletonList;
import static org.elasticsearch.xpack.core.transform.TransformField.BASIC_STATS;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasEntry;
Expand Down Expand Up @@ -460,45 +461,7 @@ public void testGetStatsWithContinuous() throws Exception {
String transformDest = transformId + "_idx";
String transformSrc = "reviews_cont_pivot_test";
createReviewsIndex(transformSrc);
final Request createTransformRequest = createRequestWithAuth("PUT", getTransformEndpoint() + transformId, null);
String config = Strings.format("""
{
"dest": {
"index": "%s"
},
"source": {
"index": "%s"
},
"frequency": "1s",
"sync": {
"time": {
"field": "timestamp",
"delay": "1s"
}
},
"pivot": {
"group_by": {
"reviewer": {
"terms": {
"field": "user_id"
}
}
},
"aggregations": {
"avg_rating": {
"avg": {
"field": "stars"
}
}
}
}
}""", transformDest, transformSrc);

createTransformRequest.setJsonEntity(config);

Map<String, Object> createTransformResponse = entityAsMap(client().performRequest(createTransformRequest));
assertThat(createTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
startAndWaitForContinuousTransform(transformId, transformDest, null);
createAndStartTransform(transformId, transformSrc, transformDest);

Request getRequest = createRequestWithAuthAndTimeout(
"GET",
Expand Down Expand Up @@ -577,6 +540,99 @@ public void testGetStatsWithContinuous() throws Exception {
}, 120, TimeUnit.SECONDS);
}

private void createAndStartTransform(String transformId, String transformSrc, String transformDest) throws Exception {
var createTransformRequest = createRequestWithAuth("PUT", getTransformEndpoint() + transformId, null);
var config = Strings.format("""
{
"dest": {
"index": "%s"
},
"source": {
"index": "%s"
},
"frequency": "1s",
"sync": {
"time": {
"field": "timestamp",
"delay": "1s"
}
},
"pivot": {
"group_by": {
"reviewer": {
"terms": {
"field": "user_id"
}
}
},
"aggregations": {
"avg_rating": {
"avg": {
"field": "stars"
}
}
}
}
}""", transformDest, transformSrc);

createTransformRequest.setJsonEntity(config);

Map<String, Object> createTransformResponse = entityAsMap(client().performRequest(createTransformRequest));
assertThat(createTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
startAndWaitForContinuousTransform(transformId, transformDest, null);
}

/**
* For Github Issue #134263
* https://github.com/elastic/elasticsearch/issues/134263
*/
public void testGetTransformsDoesNotErrorOnPageSize() throws Exception {
var transformId1 = "multiple-transforms-1";
var transformSrc = "reviews_multiple_transforms_test";
createReviewsIndex(transformSrc);
createAndStartTransform(transformId1, transformSrc, transformId1 + "_idx");

var transformId2 = "multiple-transforms-2";
createAndStartTransform(transformId2, transformSrc, transformId2 + "_idx");

// getting transform 1 on page 1 will not have any errors for transform 2
assertThat(getTransformIdFromAll(0, 1), equalTo(transformId1));
// getting transform 2 on page 2 will not have any errors for transform 1
assertThat(getTransformIdFromAll(1, 1), equalTo(transformId2));

// getting transform 1 by id will not have any errors for transform 2
getTransformConfig(transformId1, null, null);
// getting transform 2 by id will not have any errors for transform 1
getTransformConfig(transformId2, null, null);

// getting all transform will not have any errors
assertThat(getAllTransformIds(), containsInAnyOrder(transformId1, transformId2));

// getting a stopped transform 1 will not have any errors for transform 2
stopTransform(transformId1, false);
assertThat(getTransformIdFromAll(0, 1), equalTo(transformId1));
}

@SuppressWarnings("unchecked")
private String getTransformIdFromAll(int from, int size) throws IOException {
var params = Strings.format("?from=%d&size=%d", from, size);
var request = new Request("GET", getTransformEndpoint() + "_all" + params);
var response = adminClient().performRequest(request);
var transforms = entityAsMap(response);
var transformConfigs = (List<Map<String, Object>>) XContentMapValues.extractValue("transforms", transforms);
var errors = (List<Map<String, String>>) XContentMapValues.extractValue("errors", transforms);
assertThat(errors, is(nullValue()));
assertThat(transformConfigs, hasSize(1));
return (String) transformConfigs.get(0).get("id");
}

@SuppressWarnings("unchecked")
private List<String> getAllTransformIds() throws IOException {
var transforms = getTransforms(0, 1_000);
var configs = (List<Map<String, Object>>) transforms.get("transforms");
return configs.stream().map(transform -> transform.get("id")).map(Object::toString).toList();
}

@SuppressWarnings("unchecked")
public void testManyTransforms() throws IOException {
String config = transformConfig();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -590,6 +590,17 @@ protected Map<String, Object> getTransformConfig(String transformId, String auth
return transformConfig;
}

@SuppressWarnings("unchecked")
protected Map<String, Object> getTransformConfig(String transformId, String authHeader, List<Map<String, String>> expectedErrors)
throws IOException {
Request getRequest = createRequestWithAuth("GET", getTransformEndpoint() + transformId, authHeader);
Map<String, Object> transforms = entityAsMap(client().performRequest(getRequest));
assertEquals(1, XContentMapValues.extractValue("count", transforms));
List<Map<String, String>> errors = (List<Map<String, String>>) XContentMapValues.extractValue("errors", transforms);
assertThat(errors, is(equalTo(expectedErrors)));
return ((List<Map<String, Object>>) transforms.get("transforms")).get(0);
}

protected static String getTransformState(String transformId) throws IOException {
Map<?, ?> transformStatsAsMap = getTransformStateAndStats(transformId);
return transformStatsAsMap == null ? null : (String) XContentMapValues.extractValue("state", transformStatsAsMap);
Expand Down
Loading