Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Transform] Improve force stop robustness in case of an error #51072

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ public class TransformMessages {
"Interrupted while waiting for transform [{0}] to stop";
public static final String REST_PUT_TRANSFORM_EXISTS = "Transform with id [{0}] already exists";
public static final String REST_UNKNOWN_TRANSFORM = "Transform with id [{0}] could not be found";
public static final String REST_STOP_TRANSFORM_WITHOUT_CONFIG =
"Detected transforms with no config [{0}]. Use force to stop/delete them.";
public static final String REST_PUT_TRANSFORM_FAILED_TO_VALIDATE_CONFIGURATION =
"Failed to validate configuration";
public static final String REST_PUT_FAILED_PERSIST_TRANSFORM_CONFIGURATION = "Failed to persist transform configuration";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -918,12 +918,4 @@ public void testContinuousStopWaitForCheckpoint() throws Exception {
assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_26", 3.918918918);
deleteIndex(indexName);
}

private void assertOnePivotValue(String query, double expected) throws IOException {
Map<String, Object> searchResult = getAsMap(query);

assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult));
double actual = (Double) ((List<?>) XContentMapValues.extractValue("hits.hits._source.avg_rating", searchResult)).get(0);
assertEquals(expected, actual, 0.000001);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ protected RestClient buildClient(Settings settings, HttpHost[] hosts) throws IOE
}

protected void createReviewsIndex(String indexName, int numDocs) throws IOException {
int[] distributionTable = {5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 4, 4, 4, 3, 3, 2, 1, 1, 1};
int[] distributionTable = { 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 4, 4, 4, 3, 3, 2, 1, 1, 1 };

// create mapping
try (XContentBuilder builder = jsonBuilder()) {
Expand Down Expand Up @@ -158,6 +158,7 @@ protected void createReviewsIndex(String indexName, int numDocs) throws IOExcept
bulkRequest.setJsonEntity(bulk.toString());
client().performRequest(bulkRequest);
}

/**
* Create a simple dataset for testing with reviewers, ratings and businesses
*/
Expand All @@ -182,9 +183,8 @@ protected void createContinuousPivotReviewsTransform(String transformId, String

final Request createDataframeTransformRequest = createRequestWithAuth("PUT", getTransformEndpoint() + transformId, authHeader);

String config = "{ \"dest\": {\"index\":\"" + dataFrameIndex + "\"},"
+ " \"source\": {\"index\":\"" + REVIEWS_INDEX_NAME + "\"},"
//Set frequency high for testing
String config = "{ \"dest\": {\"index\":\"" + dataFrameIndex + "\"}," + " \"source\": {\"index\":\"" + REVIEWS_INDEX_NAME + "\"},"
// Set frequency high for testing
+ " \"sync\": {\"time\":{\"field\": \"timestamp\", \"delay\": \"15m\"}},"
+ " \"frequency\": \"1s\","
+ " \"pivot\": {"
Expand All @@ -206,7 +206,6 @@ protected void createContinuousPivotReviewsTransform(String transformId, String
assertThat(createDataframeTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
}


protected void createPivotReviewsTransform(String transformId, String dataFrameIndex, String query, String pipeline, String authHeader)
throws IOException {
final Request createDataframeTransformRequest = createRequestWithAuth("PUT", getTransformEndpoint() + transformId, authHeader);
Expand All @@ -226,30 +225,30 @@ protected void createPivotReviewsTransform(String transformId, String dataFrameI
}

config += " \"pivot\": {"
+ " \"group_by\": {"
+ " \"reviewer\": {"
+ " \"terms\": {"
+ " \"field\": \"user_id\""
+ " } } },"
+ " \"aggregations\": {"
+ " \"avg_rating\": {"
+ " \"avg\": {"
+ " \"field\": \"stars\""
+ " } } } },"
+ "\"frequency\":\"1s\""
+ "}";
+ " \"group_by\": {"
+ " \"reviewer\": {"
+ " \"terms\": {"
+ " \"field\": \"user_id\""
+ " } } },"
+ " \"aggregations\": {"
+ " \"avg_rating\": {"
+ " \"avg\": {"
+ " \"field\": \"stars\""
+ " } } } },"
+ "\"frequency\":\"1s\""
+ "}";

createDataframeTransformRequest.setJsonEntity(config);

Map<String, Object> createDataframeTransformResponse = entityAsMap(client().performRequest(createDataframeTransformRequest));
assertThat(createDataframeTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
}

protected void startDataframeTransform(String transformId) throws IOException {
startDataframeTransform(transformId, null);
protected void startTransform(String transformId) throws IOException {
startTransform(transformId, null);
}

protected void startDataframeTransform(String transformId, String authHeader, String... warnings) throws IOException {
protected void startTransform(String transformId, String authHeader, String... warnings) throws IOException {
// start the transform
final Request startTransformRequest = createRequestWithAuth("POST", getTransformEndpoint() + transformId + "/_start", authHeader);
if (warnings.length > 0) {
Expand Down Expand Up @@ -280,10 +279,10 @@ protected void startAndWaitForTransform(String transformId, String dataFrameInde
startAndWaitForTransform(transformId, dataFrameIndex, authHeader, new String[0]);
}

protected void startAndWaitForTransform(String transformId, String dataFrameIndex,
String authHeader, String... warnings) throws Exception {
protected void startAndWaitForTransform(String transformId, String dataFrameIndex, String authHeader, String... warnings)
throws Exception {
// start the transform
startDataframeTransform(transformId, authHeader, warnings);
startTransform(transformId, authHeader, warnings);
assertTrue(indexExists(dataFrameIndex));
// wait until the dataframe has been created and all data is available
waitForDataFrameCheckpoint(transformId);
Expand All @@ -292,18 +291,14 @@ protected void startAndWaitForTransform(String transformId, String dataFrameInde
refreshIndex(dataFrameIndex);
}

protected void startAndWaitForContinuousTransform(String transformId,
String dataFrameIndex,
String authHeader) throws Exception {
protected void startAndWaitForContinuousTransform(String transformId, String dataFrameIndex, String authHeader) throws Exception {
startAndWaitForContinuousTransform(transformId, dataFrameIndex, authHeader, 1L);
}

protected void startAndWaitForContinuousTransform(String transformId,
String dataFrameIndex,
String authHeader,
long checkpoint) throws Exception {
protected void startAndWaitForContinuousTransform(String transformId, String dataFrameIndex, String authHeader, long checkpoint)
throws Exception {
// start the transform
startDataframeTransform(transformId, authHeader, new String[0]);
startTransform(transformId, authHeader, new String[0]);
assertTrue(indexExists(dataFrameIndex));
// wait until the dataframe has been created and all data is available
waitForTransformCheckpoint(transformId, checkpoint);
Expand All @@ -323,9 +318,7 @@ protected Request createRequestWithAuth(final String method, final String endpoi
}

void waitForDataFrameStopped(String transformId) throws Exception {
assertBusy(() -> {
assertEquals("stopped", getDataFrameTransformState(transformId));
}, 15, TimeUnit.SECONDS);
assertBusy(() -> { assertEquals("stopped", getTransformState(transformId)); }, 15, TimeUnit.SECONDS);
}

void waitForDataFrameCheckpoint(String transformId) throws Exception {
Expand All @@ -341,20 +334,20 @@ void refreshIndex(String index) throws IOException {
}

@SuppressWarnings("unchecked")
private static List<Map<String, Object>> getDataFrameTransforms() throws IOException {
protected static List<Map<String, Object>> getTransforms() throws IOException {
Response response = adminClient().performRequest(new Request("GET", getTransformEndpoint() + "_all"));
Map<String, Object> transforms = entityAsMap(response);
List<Map<String, Object>> transformConfigs = (List<Map<String, Object>>) XContentMapValues.extractValue("transforms", transforms);

return transformConfigs == null ? Collections.emptyList() : transformConfigs;
}

protected static String getDataFrameTransformState(String transformId) throws IOException {
Map<?, ?> transformStatsAsMap = getDataFrameState(transformId);
protected static String getTransformState(String transformId) throws IOException {
Map<?, ?> transformStatsAsMap = getTransformStateAndStats(transformId);
return transformStatsAsMap == null ? null : (String) XContentMapValues.extractValue("state", transformStatsAsMap);
}

protected static Map<?, ?> getDataFrameState(String transformId) throws IOException {
protected static Map<?, ?> getTransformStateAndStats(String transformId) throws IOException {
Response statsResponse = client().performRequest(new Request("GET", getTransformEndpoint() + transformId + "/_stats"));
List<?> transforms = ((List<?>) entityAsMap(statsResponse).get("transforms"));
if (transforms.isEmpty()) {
Expand Down Expand Up @@ -383,7 +376,7 @@ public static void removeIndices() throws Exception {
}

public void wipeTransforms() throws IOException {
List<Map<String, Object>> transformConfigs = getDataFrameTransforms();
List<Map<String, Object>> transformConfigs = getTransforms();
for (Map<String, Object> transformConfig : transformConfigs) {
String transformId = (String) transformConfig.get("id");
Request request = new Request("POST", getTransformEndpoint() + transformId + "/_stop");
Expand All @@ -395,7 +388,7 @@ public void wipeTransforms() throws IOException {

for (Map<String, Object> transformConfig : transformConfigs) {
String transformId = (String) transformConfig.get("id");
String state = getDataFrameTransformState(transformId);
String state = getTransformState(transformId);
assertEquals("Transform [" + transformId + "] is not in the stopped state", "stopped", state);
}

Expand All @@ -405,7 +398,7 @@ public void wipeTransforms() throws IOException {
}

// transforms should be all gone
transformConfigs = getDataFrameTransforms();
transformConfigs = getTransforms();
assertTrue(transformConfigs.isEmpty());

// the configuration index should be empty
Expand Down Expand Up @@ -437,11 +430,15 @@ static int getDataFrameCheckpoint(String transformId) throws IOException {
protected void setupDataAccessRole(String role, String... indices) throws IOException {
String indicesStr = Arrays.stream(indices).collect(Collectors.joining("\",\"", "\"", "\""));
Request request = new Request("PUT", "/_security/role/" + role);
request.setJsonEntity("{"
+ " \"indices\" : ["
+ " { \"names\": [" + indicesStr + "], \"privileges\": [\"create_index\", \"read\", \"write\", \"view_index_metadata\"] }"
+ " ]"
+ "}");
request.setJsonEntity(
"{"
+ " \"indices\" : ["
+ " { \"names\": ["
+ indicesStr
+ "], \"privileges\": [\"create_index\", \"read\", \"write\", \"view_index_metadata\"] }"
+ " ]"
+ "}"
);
client().performRequest(request);
}

Expand All @@ -450,13 +447,18 @@ protected void setupUser(String user, List<String> roles) throws IOException {

String rolesStr = roles.stream().collect(Collectors.joining("\",\"", "\"", "\""));
Request request = new Request("PUT", "/_security/user/" + user);
request.setJsonEntity("{"
+ " \"password\" : \"" + password + "\","
+ " \"roles\" : [ " + rolesStr + " ]"
+ "}");
request.setJsonEntity("{" + " \"password\" : \"" + password + "\"," + " \"roles\" : [ " + rolesStr + " ]" + "}");
client().performRequest(request);
}

protected void assertOnePivotValue(String query, double expected) throws IOException {
Map<String, Object> searchResult = getAsMap(query);

assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult));
double actual = (Double) ((List<?>) XContentMapValues.extractValue("hits.hits._source.avg_rating", searchResult)).get(0);
assertEquals(expected, actual, 0.000001);
}

protected static String getTransformEndpoint() {
return useDeprecatedEndpoints ? TransformField.REST_BASE_PATH_TRANSFORMS_DEPRECATED : TransformField.REST_BASE_PATH_TRANSFORMS;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.transform.integration;

import org.elasticsearch.client.Request;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.xpack.core.transform.TransformField;
import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants;

import java.io.IOException;
import java.util.Map;
import java.util.Map.Entry;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;

public class TransformRobustnessIT extends TransformRestTestCase {

public void testTaskRemovalAfterInternalIndexGotDeleted() throws Exception {
String indexName = "continuous_reviews";
createReviewsIndex(indexName);
String transformId = "simple_continuous_pivot";
String transformIndex = "pivot_reviews_continuous";
final Request createTransformRequest = new Request("PUT", TransformField.REST_BASE_PATH_TRANSFORMS + transformId);
String config = "{"
+ " \"source\": {\"index\":\""
+ indexName
+ "\"},"
+ " \"dest\": {\"index\":\""
+ transformIndex
+ "\"},"
+ " \"frequency\": \"1s\","
+ " \"sync\": {\"time\": {\"field\": \"timestamp\", \"delay\": \"1s\"}},"
+ " \"pivot\": {"
+ " \"group_by\": {"
+ " \"reviewer\": {"
+ " \"terms\": {"
+ " \"field\": \"user_id\""
+ " } } },"
+ " \"aggregations\": {"
+ " \"avg_rating\": {"
+ " \"avg\": {"
+ " \"field\": \"stars\""
+ " } } } }"
+ "}";
createTransformRequest.setJsonEntity(config);
Map<String, Object> createTransformResponse = entityAsMap(client().performRequest(createTransformRequest));
assertThat(createTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
assertEquals(1, getTransforms().size());
// there shouldn't be a task yet
assertEquals(0, getNumberOfTransformTasks());
startAndWaitForContinuousTransform(transformId, transformIndex, null);
assertTrue(indexExists(transformIndex));

// a task exists
assertEquals(1, getNumberOfTransformTasks());
// get and check some users
assertOnePivotValue(transformIndex + "/_search?q=reviewer:user_0", 3.776978417);
assertOnePivotValue(transformIndex + "/_search?q=reviewer:user_5", 3.72);
assertNotNull(getTransformState(transformId));

assertEquals(1, getTransforms().size());

// delete the transform index
beEvilAndDeleteTheTransformIndex();
// transform is gone
assertEquals(0, getTransforms().size());
// but the task is still there
assertEquals(1, getNumberOfTransformTasks());

Request stopTransformRequest = new Request("POST", TransformField.REST_BASE_PATH_TRANSFORMS + transformId + "/_stop");
ResponseException e = expectThrows(ResponseException.class, () -> client().performRequest(stopTransformRequest));

assertEquals(409, e.getResponse().getStatusLine().getStatusCode());
assertThat(
e.getMessage(),
containsString("Detected transforms with no config [" + transformId + "]. Use force to stop/delete them.")
);
stopTransformRequest.addParameter(TransformField.FORCE.getPreferredName(), Boolean.toString(true));
Map<String, Object> stopTransformResponse = entityAsMap(client().performRequest(stopTransformRequest));
assertThat(stopTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));

// the task is gone
assertEquals(1, getNumberOfTransformTasks());
}

@SuppressWarnings("unchecked")
private int getNumberOfTransformTasks() throws IOException {
final Request tasksRequest = new Request("GET", "/_tasks");
tasksRequest.addParameter("actions", TransformField.TASK_NAME + "*");
Map<String, Object> tasksResponse = entityAsMap(client().performRequest(tasksRequest));

Map<String, Object> nodes = (Map<String, Object>) tasksResponse.get("nodes");
if (nodes == null) {
return 0;
}

int foundTasks = 0;
for (Entry<String, Object> node : nodes.entrySet()) {
Map<String, Object> nodeInfo = (Map<String, Object>) node.getValue();
Map<String, Object> tasks = (Map<String, Object>) nodeInfo.get("tasks");
foundTasks += tasks != null ? tasks.size() : 0;
}

return foundTasks;
}

private void beEvilAndDeleteTheTransformIndex() throws IOException {
adminClient().performRequest(new Request("DELETE", TransformInternalIndexConstants.LATEST_INDEX_NAME));
}
}
Loading