Skip to content

Commit

Permalink
[Transform] Add delete_dest_index parameter to the `Delete Transfor…
Browse files Browse the repository at this point in the history
…m API` (#94162)
  • Loading branch information
przemekwitek committed Mar 10, 2023
1 parent ffc8713 commit a3f34a3
Show file tree
Hide file tree
Showing 14 changed files with 584 additions and 90 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/94162.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 94162
summary: Add `delete_destination_index` parameter to the `Delete Transform API`
area: Transform
type: enhancement
issues: []
5 changes: 5 additions & 0 deletions docs/reference/transform/apis/delete-transform.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=transform-id]
current state. The default value is `false`, meaning that the {transform} must be
`stopped` before it can be deleted.

`delete_dest_index`::
(Optional, Boolean) When `true`, the destination index is deleted together with
the {transform}. The default value is `false`, meaning that the destination
index will not be deleted.

`timeout`::
(Optional, time)
Period to wait for a response. If no response is received before the timeout
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@
"required":false,
"description":"When `true`, the transform is deleted regardless of its current state. The default value is `false`, meaning that the transform must be `stopped` before it can be deleted."
},
"delete_dest_index":{
"type":"boolean",
"required":false,
"description":"When `true`, the destination index is deleted together with the transform. The default value is `false`, meaning that the destination index will not be deleted."
},
"timeout":{
"type":"time",
"required":false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public final class TransformField {
public static final ParseField METADATA = new ParseField("_meta");
public static final ParseField FREQUENCY = new ParseField("frequency");
public static final ParseField FORCE = new ParseField("force");
public static final ParseField DELETE_DEST_INDEX = new ParseField("delete_dest_index");
public static final ParseField MAX_PAGE_SEARCH_SIZE = new ParseField("max_page_search_size");
public static final ParseField DOCS_PER_SECOND = new ParseField("docs_per_second");
public static final ParseField DATES_AS_EPOCH_MILLIS = new ParseField("dates_as_epoch_millis");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
*/
package org.elasticsearch.xpack.core.transform.action;

import org.elasticsearch.TransportVersion;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
Expand All @@ -31,17 +32,24 @@ private DeleteTransformAction() {
public static class Request extends AcknowledgedRequest<Request> {
private final String id;
private final boolean force;
private final boolean deleteDestIndex;

public Request(String id, boolean force, TimeValue timeout) {
public Request(String id, boolean force, boolean deleteDestIndex, TimeValue timeout) {
super(timeout);
this.id = ExceptionsHelper.requireNonNull(id, TransformField.ID.getPreferredName());
this.force = force;
this.deleteDestIndex = deleteDestIndex;
}

public Request(StreamInput in) throws IOException {
super(in);
id = in.readString();
force = in.readBoolean();
if (in.getTransportVersion().onOrAfter(TransportVersion.V_8_8_0)) {
deleteDestIndex = in.readBoolean();
} else {
deleteDestIndex = false;
}
}

public String getId() {
Expand All @@ -52,11 +60,18 @@ public boolean isForce() {
return force;
}

public boolean isDeleteDestIndex() {
return deleteDestIndex;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(id);
out.writeBoolean(force);
if (out.getTransportVersion().onOrAfter(TransportVersion.V_8_8_0)) {
out.writeBoolean(deleteDestIndex);
}
}

@Override
Expand All @@ -67,7 +82,7 @@ public ActionRequestValidationException validate() {
@Override
public int hashCode() {
// the base class does not implement hashCode, therefore we need to hash timeout ourselves
return Objects.hash(timeout(), id, force);
return Objects.hash(timeout(), id, force, deleteDestIndex);
}

@Override
Expand All @@ -81,7 +96,10 @@ public boolean equals(Object obj) {
}
Request other = (Request) obj;
// the base class does not implement equals, therefore we need to check timeout ourselves
return Objects.equals(id, other.id) && force == other.force && timeout().equals(other.timeout());
return Objects.equals(id, other.id)
&& force == other.force
&& deleteDestIndex == other.deleteDestIndex
&& timeout().equals(other.timeout());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,12 @@
public class DeleteTransformActionRequestTests extends AbstractWireSerializingTestCase<Request> {
@Override
protected Request createTestInstance() {
return new Request(randomAlphaOfLengthBetween(1, 20), randomBoolean(), TimeValue.parseTimeValue(randomTimeValue(), "timeout"));
return new Request(
randomAlphaOfLengthBetween(1, 20),
randomBoolean(),
randomBoolean(),
TimeValue.parseTimeValue(randomTimeValue(), "timeout")
);
}

@Override
Expand All @@ -27,15 +32,17 @@ protected Writeable.Reader<Request> instanceReader() {
protected Request mutateInstance(Request instance) {
String id = instance.getId();
boolean force = instance.isForce();
boolean deleteDestIndex = instance.isDeleteDestIndex();
TimeValue timeout = instance.timeout();

switch (between(0, 2)) {
switch (between(0, 3)) {
case 0 -> id += randomAlphaOfLengthBetween(1, 5);
case 1 -> force ^= true;
case 2 -> timeout = new TimeValue(timeout.duration() + randomLongBetween(1, 5), timeout.timeUnit());
case 2 -> deleteDestIndex ^= true;
case 3 -> timeout = new TimeValue(timeout.duration() + randomLongBetween(1, 5), timeout.timeUnit());
default -> throw new AssertionError("Illegal randomization branch");
}

return new Request(id, force, timeout);
return new Request(id, force, deleteDestIndex, timeout);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.transform.integration;

import org.apache.http.HttpHost;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.common.settings.Settings;
import org.junit.Before;

import java.io.IOException;
import java.util.Arrays;
import java.util.Locale;
import java.util.Map;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;

public class TransformDeleteIT extends TransformRestTestCase {

private static final String TEST_USER_NAME = "transform_user";
private static final String TEST_ADMIN_USER_NAME_1 = "transform_admin_1";
private static final String BASIC_AUTH_VALUE_TRANSFORM_ADMIN_1 = basicAuthHeaderValue(
TEST_ADMIN_USER_NAME_1,
TEST_PASSWORD_SECURE_STRING
);
private static final String DATA_ACCESS_ROLE = "test_data_access";

private static boolean indicesCreated = false;

// preserve indices in order to reuse source indices in several test cases
@Override
protected boolean preserveIndicesUponCompletion() {
return true;
}

@Override
protected boolean enableWarningsCheck() {
return false;
}

@Override
protected RestClient buildClient(Settings settings, HttpHost[] hosts) throws IOException {
RestClientBuilder builder = RestClient.builder(hosts);
configureClient(builder, settings);
builder.setStrictDeprecationMode(false);
return builder.build();
}

@Before
public void createIndexes() throws IOException {
setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME);
setupUser(TEST_USER_NAME, Arrays.asList("transform_user", DATA_ACCESS_ROLE));
setupUser(TEST_ADMIN_USER_NAME_1, Arrays.asList("transform_admin", DATA_ACCESS_ROLE));

// it's not possible to run it as @BeforeClass as clients aren't initialized then, so we need this little hack
if (indicesCreated) {
return;
}

createReviewsIndex();
indicesCreated = true;
}

public void testDeleteDoesNotDeleteDestinationIndexByDefault() throws Exception {
String transformId = "transform-1";
String transformDest = transformId + "_idx";
setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, transformDest);

createTransform(transformId, transformDest);
assertFalse(indexExists(transformDest));

startTransform(transformId);
waitForTransformCheckpoint(transformId, 1);
stopTransform(transformId, false);
assertTrue(indexExists(transformDest));

deleteTransform(transformId);
assertTrue(indexExists(transformDest));
}

public void testDeleteWithParamDeletesAutoCreatedDestinationIndex() throws Exception {
String transformId = "transform-2";
String transformDest = transformId + "_idx";
setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, transformDest);

createTransform(transformId, transformDest);
assertFalse(indexExists(transformDest));

startTransform(transformId);
waitForTransformCheckpoint(transformId, 1);

stopTransform(transformId, false);
assertTrue(indexExists(transformDest));

deleteTransform(transformId, true);
assertFalse(indexExists(transformDest));
}

public void testDeleteWithParamDeletesManuallyCreatedDestinationIndex() throws Exception {
String transformId = "transform-3";
String transformDest = transformId + "_idx";
setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, transformDest);

createIndex(transformDest);
assertTrue(indexExists(transformDest));

createTransform(transformId, transformDest);

startTransform(transformId);
waitForTransformCheckpoint(transformId, 1);

stopTransform(transformId, false);
assertTrue(indexExists(transformDest));

deleteTransform(transformId, true);
assertFalse(indexExists(transformDest));
}

public void testDeleteWithParamDoesNotDeleteAlias() throws Exception {
String transformId = "transform-4";
String transformDest = transformId + "_idx";
String transformDestAlias = transformId + "_alias";
setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, transformDest, transformDestAlias);

createIndex(transformDest, null, null, "\"" + transformDestAlias + "\": { \"is_write_index\": true }");
assertTrue(indexExists(transformDest));
assertTrue(indexExists(transformDestAlias));

createTransform(transformId, transformDestAlias);

startTransform(transformId);
waitForTransformCheckpoint(transformId, 1);

stopTransform(transformId, false);
assertTrue(indexExists(transformDest));

ResponseException e = expectThrows(ResponseException.class, () -> deleteTransform(transformId, true));
assertThat(
e.getMessage(),
containsString(
String.format(
Locale.ROOT,
"The provided expression [%s] matches an alias, specify the corresponding concrete indices instead.",
transformDestAlias
)
)
);
}

private void createTransform(String transformId, String destIndex) throws IOException {
final Request createTransformRequest = createRequestWithAuth(
"PUT",
getTransformEndpoint() + transformId,
BASIC_AUTH_VALUE_TRANSFORM_ADMIN_1
);
String config = String.format(Locale.ROOT, """
{
"dest": {
"index": "%s"
},
"source": {
"index": "%s"
},
"pivot": {
"group_by": {
"reviewer": {
"terms": {
"field": "user_id"
}
}
},
"aggregations": {
"avg_rating": {
"avg": {
"field": "stars"
}
}
}
}
}""", destIndex, REVIEWS_INDEX_NAME);
createTransformRequest.setJsonEntity(config);
Map<String, Object> createTransformResponse = entityAsMap(client().performRequest(createTransformRequest));
assertThat(createTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
}
}

0 comments on commit a3f34a3

Please sign in to comment.