Skip to content

Commit

Permalink
[Transform] Improve force stop robustness in case of an error (elasti…
Browse files Browse the repository at this point in the history
…c#51072)

If a transform config got lost (e.g. because the internal index disappeared) tasks could not be
stopped using transform API. This change makes it possible to stop transforms without a config,
meaning to remove the background task. In order to do so force must be set to true.
  • Loading branch information
Hendrik Muhs authored and SivagurunathanV committed Jan 21, 2020
1 parent d0fefc5 commit 5689cea
Show file tree
Hide file tree
Showing 6 changed files with 238 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ public class TransformMessages {
"Interrupted while waiting for transform [{0}] to stop";
public static final String REST_PUT_TRANSFORM_EXISTS = "Transform with id [{0}] already exists";
public static final String REST_UNKNOWN_TRANSFORM = "Transform with id [{0}] could not be found";
public static final String REST_STOP_TRANSFORM_WITHOUT_CONFIG =
"Detected transforms with no config [{0}]. Use force to stop/delete them.";
public static final String REST_PUT_TRANSFORM_FAILED_TO_VALIDATE_CONFIGURATION =
"Failed to validate configuration";
public static final String REST_PUT_FAILED_PERSIST_TRANSFORM_CONFIGURATION = "Failed to persist transform configuration";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -918,12 +918,4 @@ public void testContinuousStopWaitForCheckpoint() throws Exception {
assertOnePivotValue(dataFrameIndex + "/_search?q=reviewer:user_26", 3.918918918);
deleteIndex(indexName);
}

private void assertOnePivotValue(String query, double expected) throws IOException {
Map<String, Object> searchResult = getAsMap(query);

assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult));
double actual = (Double) ((List<?>) XContentMapValues.extractValue("hits.hits._source.avg_rating", searchResult)).get(0);
assertEquals(expected, actual, 0.000001);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ protected RestClient buildClient(Settings settings, HttpHost[] hosts) throws IOE
}

protected void createReviewsIndex(String indexName, int numDocs) throws IOException {
int[] distributionTable = {5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 4, 4, 4, 3, 3, 2, 1, 1, 1};
int[] distributionTable = { 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 4, 4, 4, 3, 3, 2, 1, 1, 1 };

// create mapping
try (XContentBuilder builder = jsonBuilder()) {
Expand Down Expand Up @@ -158,6 +158,7 @@ protected void createReviewsIndex(String indexName, int numDocs) throws IOExcept
bulkRequest.setJsonEntity(bulk.toString());
client().performRequest(bulkRequest);
}

/**
* Create a simple dataset for testing with reviewers, ratings and businesses
*/
Expand All @@ -182,9 +183,8 @@ protected void createContinuousPivotReviewsTransform(String transformId, String

final Request createDataframeTransformRequest = createRequestWithAuth("PUT", getTransformEndpoint() + transformId, authHeader);

String config = "{ \"dest\": {\"index\":\"" + dataFrameIndex + "\"},"
+ " \"source\": {\"index\":\"" + REVIEWS_INDEX_NAME + "\"},"
//Set frequency high for testing
String config = "{ \"dest\": {\"index\":\"" + dataFrameIndex + "\"}," + " \"source\": {\"index\":\"" + REVIEWS_INDEX_NAME + "\"},"
// Set frequency high for testing
+ " \"sync\": {\"time\":{\"field\": \"timestamp\", \"delay\": \"15m\"}},"
+ " \"frequency\": \"1s\","
+ " \"pivot\": {"
Expand All @@ -206,7 +206,6 @@ protected void createContinuousPivotReviewsTransform(String transformId, String
assertThat(createDataframeTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
}


protected void createPivotReviewsTransform(String transformId, String dataFrameIndex, String query, String pipeline, String authHeader)
throws IOException {
final Request createDataframeTransformRequest = createRequestWithAuth("PUT", getTransformEndpoint() + transformId, authHeader);
Expand All @@ -226,30 +225,30 @@ protected void createPivotReviewsTransform(String transformId, String dataFrameI
}

config += " \"pivot\": {"
+ " \"group_by\": {"
+ " \"reviewer\": {"
+ " \"terms\": {"
+ " \"field\": \"user_id\""
+ " } } },"
+ " \"aggregations\": {"
+ " \"avg_rating\": {"
+ " \"avg\": {"
+ " \"field\": \"stars\""
+ " } } } },"
+ "\"frequency\":\"1s\""
+ "}";
+ " \"group_by\": {"
+ " \"reviewer\": {"
+ " \"terms\": {"
+ " \"field\": \"user_id\""
+ " } } },"
+ " \"aggregations\": {"
+ " \"avg_rating\": {"
+ " \"avg\": {"
+ " \"field\": \"stars\""
+ " } } } },"
+ "\"frequency\":\"1s\""
+ "}";

createDataframeTransformRequest.setJsonEntity(config);

Map<String, Object> createDataframeTransformResponse = entityAsMap(client().performRequest(createDataframeTransformRequest));
assertThat(createDataframeTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
}

protected void startDataframeTransform(String transformId) throws IOException {
startDataframeTransform(transformId, null);
protected void startTransform(String transformId) throws IOException {
startTransform(transformId, null);
}

protected void startDataframeTransform(String transformId, String authHeader, String... warnings) throws IOException {
protected void startTransform(String transformId, String authHeader, String... warnings) throws IOException {
// start the transform
final Request startTransformRequest = createRequestWithAuth("POST", getTransformEndpoint() + transformId + "/_start", authHeader);
if (warnings.length > 0) {
Expand Down Expand Up @@ -280,10 +279,10 @@ protected void startAndWaitForTransform(String transformId, String dataFrameInde
startAndWaitForTransform(transformId, dataFrameIndex, authHeader, new String[0]);
}

protected void startAndWaitForTransform(String transformId, String dataFrameIndex,
String authHeader, String... warnings) throws Exception {
protected void startAndWaitForTransform(String transformId, String dataFrameIndex, String authHeader, String... warnings)
throws Exception {
// start the transform
startDataframeTransform(transformId, authHeader, warnings);
startTransform(transformId, authHeader, warnings);
assertTrue(indexExists(dataFrameIndex));
// wait until the dataframe has been created and all data is available
waitForDataFrameCheckpoint(transformId);
Expand All @@ -292,18 +291,14 @@ protected void startAndWaitForTransform(String transformId, String dataFrameInde
refreshIndex(dataFrameIndex);
}

protected void startAndWaitForContinuousTransform(String transformId,
String dataFrameIndex,
String authHeader) throws Exception {
protected void startAndWaitForContinuousTransform(String transformId, String dataFrameIndex, String authHeader) throws Exception {
startAndWaitForContinuousTransform(transformId, dataFrameIndex, authHeader, 1L);
}

protected void startAndWaitForContinuousTransform(String transformId,
String dataFrameIndex,
String authHeader,
long checkpoint) throws Exception {
protected void startAndWaitForContinuousTransform(String transformId, String dataFrameIndex, String authHeader, long checkpoint)
throws Exception {
// start the transform
startDataframeTransform(transformId, authHeader, new String[0]);
startTransform(transformId, authHeader, new String[0]);
assertTrue(indexExists(dataFrameIndex));
// wait until the dataframe has been created and all data is available
waitForTransformCheckpoint(transformId, checkpoint);
Expand All @@ -323,9 +318,7 @@ protected Request createRequestWithAuth(final String method, final String endpoi
}

void waitForDataFrameStopped(String transformId) throws Exception {
assertBusy(() -> {
assertEquals("stopped", getDataFrameTransformState(transformId));
}, 15, TimeUnit.SECONDS);
assertBusy(() -> { assertEquals("stopped", getTransformState(transformId)); }, 15, TimeUnit.SECONDS);
}

void waitForDataFrameCheckpoint(String transformId) throws Exception {
Expand All @@ -341,20 +334,20 @@ void refreshIndex(String index) throws IOException {
}

@SuppressWarnings("unchecked")
private static List<Map<String, Object>> getDataFrameTransforms() throws IOException {
protected static List<Map<String, Object>> getTransforms() throws IOException {
Response response = adminClient().performRequest(new Request("GET", getTransformEndpoint() + "_all"));
Map<String, Object> transforms = entityAsMap(response);
List<Map<String, Object>> transformConfigs = (List<Map<String, Object>>) XContentMapValues.extractValue("transforms", transforms);

return transformConfigs == null ? Collections.emptyList() : transformConfigs;
}

protected static String getDataFrameTransformState(String transformId) throws IOException {
Map<?, ?> transformStatsAsMap = getDataFrameState(transformId);
protected static String getTransformState(String transformId) throws IOException {
Map<?, ?> transformStatsAsMap = getTransformStateAndStats(transformId);
return transformStatsAsMap == null ? null : (String) XContentMapValues.extractValue("state", transformStatsAsMap);
}

protected static Map<?, ?> getDataFrameState(String transformId) throws IOException {
protected static Map<?, ?> getTransformStateAndStats(String transformId) throws IOException {
Response statsResponse = client().performRequest(new Request("GET", getTransformEndpoint() + transformId + "/_stats"));
List<?> transforms = ((List<?>) entityAsMap(statsResponse).get("transforms"));
if (transforms.isEmpty()) {
Expand Down Expand Up @@ -383,7 +376,7 @@ public static void removeIndices() throws Exception {
}

public void wipeTransforms() throws IOException {
List<Map<String, Object>> transformConfigs = getDataFrameTransforms();
List<Map<String, Object>> transformConfigs = getTransforms();
for (Map<String, Object> transformConfig : transformConfigs) {
String transformId = (String) transformConfig.get("id");
Request request = new Request("POST", getTransformEndpoint() + transformId + "/_stop");
Expand All @@ -395,7 +388,7 @@ public void wipeTransforms() throws IOException {

for (Map<String, Object> transformConfig : transformConfigs) {
String transformId = (String) transformConfig.get("id");
String state = getDataFrameTransformState(transformId);
String state = getTransformState(transformId);
assertEquals("Transform [" + transformId + "] is not in the stopped state", "stopped", state);
}

Expand All @@ -405,7 +398,7 @@ public void wipeTransforms() throws IOException {
}

// transforms should be all gone
transformConfigs = getDataFrameTransforms();
transformConfigs = getTransforms();
assertTrue(transformConfigs.isEmpty());

// the configuration index should be empty
Expand Down Expand Up @@ -437,11 +430,15 @@ static int getDataFrameCheckpoint(String transformId) throws IOException {
protected void setupDataAccessRole(String role, String... indices) throws IOException {
String indicesStr = Arrays.stream(indices).collect(Collectors.joining("\",\"", "\"", "\""));
Request request = new Request("PUT", "/_security/role/" + role);
request.setJsonEntity("{"
+ " \"indices\" : ["
+ " { \"names\": [" + indicesStr + "], \"privileges\": [\"create_index\", \"read\", \"write\", \"view_index_metadata\"] }"
+ " ]"
+ "}");
request.setJsonEntity(
"{"
+ " \"indices\" : ["
+ " { \"names\": ["
+ indicesStr
+ "], \"privileges\": [\"create_index\", \"read\", \"write\", \"view_index_metadata\"] }"
+ " ]"
+ "}"
);
client().performRequest(request);
}

Expand All @@ -450,13 +447,18 @@ protected void setupUser(String user, List<String> roles) throws IOException {

String rolesStr = roles.stream().collect(Collectors.joining("\",\"", "\"", "\""));
Request request = new Request("PUT", "/_security/user/" + user);
request.setJsonEntity("{"
+ " \"password\" : \"" + password + "\","
+ " \"roles\" : [ " + rolesStr + " ]"
+ "}");
request.setJsonEntity("{" + " \"password\" : \"" + password + "\"," + " \"roles\" : [ " + rolesStr + " ]" + "}");
client().performRequest(request);
}

protected void assertOnePivotValue(String query, double expected) throws IOException {
Map<String, Object> searchResult = getAsMap(query);

assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult));
double actual = (Double) ((List<?>) XContentMapValues.extractValue("hits.hits._source.avg_rating", searchResult)).get(0);
assertEquals(expected, actual, 0.000001);
}

protected static String getTransformEndpoint() {
return useDeprecatedEndpoints ? TransformField.REST_BASE_PATH_TRANSFORMS_DEPRECATED : TransformField.REST_BASE_PATH_TRANSFORMS;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.transform.integration;

import org.elasticsearch.client.Request;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.xpack.core.transform.TransformField;
import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants;

import java.io.IOException;
import java.util.Map;
import java.util.Map.Entry;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;

public class TransformRobustnessIT extends TransformRestTestCase {

public void testTaskRemovalAfterInternalIndexGotDeleted() throws Exception {
String indexName = "continuous_reviews";
createReviewsIndex(indexName);
String transformId = "simple_continuous_pivot";
String transformIndex = "pivot_reviews_continuous";
final Request createTransformRequest = new Request("PUT", TransformField.REST_BASE_PATH_TRANSFORMS + transformId);
String config = "{"
+ " \"source\": {\"index\":\""
+ indexName
+ "\"},"
+ " \"dest\": {\"index\":\""
+ transformIndex
+ "\"},"
+ " \"frequency\": \"1s\","
+ " \"sync\": {\"time\": {\"field\": \"timestamp\", \"delay\": \"1s\"}},"
+ " \"pivot\": {"
+ " \"group_by\": {"
+ " \"reviewer\": {"
+ " \"terms\": {"
+ " \"field\": \"user_id\""
+ " } } },"
+ " \"aggregations\": {"
+ " \"avg_rating\": {"
+ " \"avg\": {"
+ " \"field\": \"stars\""
+ " } } } }"
+ "}";
createTransformRequest.setJsonEntity(config);
Map<String, Object> createTransformResponse = entityAsMap(client().performRequest(createTransformRequest));
assertThat(createTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
assertEquals(1, getTransforms().size());
// there shouldn't be a task yet
assertEquals(0, getNumberOfTransformTasks());
startAndWaitForContinuousTransform(transformId, transformIndex, null);
assertTrue(indexExists(transformIndex));

// a task exists
assertEquals(1, getNumberOfTransformTasks());
// get and check some users
assertOnePivotValue(transformIndex + "/_search?q=reviewer:user_0", 3.776978417);
assertOnePivotValue(transformIndex + "/_search?q=reviewer:user_5", 3.72);
assertNotNull(getTransformState(transformId));

assertEquals(1, getTransforms().size());

// delete the transform index
beEvilAndDeleteTheTransformIndex();
// transform is gone
assertEquals(0, getTransforms().size());
// but the task is still there
assertEquals(1, getNumberOfTransformTasks());

Request stopTransformRequest = new Request("POST", TransformField.REST_BASE_PATH_TRANSFORMS + transformId + "/_stop");
ResponseException e = expectThrows(ResponseException.class, () -> client().performRequest(stopTransformRequest));

assertEquals(409, e.getResponse().getStatusLine().getStatusCode());
assertThat(
e.getMessage(),
containsString("Detected transforms with no config [" + transformId + "]. Use force to stop/delete them.")
);
stopTransformRequest.addParameter(TransformField.FORCE.getPreferredName(), Boolean.toString(true));
Map<String, Object> stopTransformResponse = entityAsMap(client().performRequest(stopTransformRequest));
assertThat(stopTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));

// the task is gone
assertEquals(1, getNumberOfTransformTasks());
}

@SuppressWarnings("unchecked")
private int getNumberOfTransformTasks() throws IOException {
final Request tasksRequest = new Request("GET", "/_tasks");
tasksRequest.addParameter("actions", TransformField.TASK_NAME + "*");
Map<String, Object> tasksResponse = entityAsMap(client().performRequest(tasksRequest));

Map<String, Object> nodes = (Map<String, Object>) tasksResponse.get("nodes");
if (nodes == null) {
return 0;
}

int foundTasks = 0;
for (Entry<String, Object> node : nodes.entrySet()) {
Map<String, Object> nodeInfo = (Map<String, Object>) node.getValue();
Map<String, Object> tasks = (Map<String, Object>) nodeInfo.get("tasks");
foundTasks += tasks != null ? tasks.size() : 0;
}

return foundTasks;
}

private void beEvilAndDeleteTheTransformIndex() throws IOException {
adminClient().performRequest(new Request("DELETE", TransformInternalIndexConstants.LATEST_INDEX_NAME));
}
}
Loading

0 comments on commit 5689cea

Please sign in to comment.