Skip to content

Commit

Permalink
allow to start a transform if validation fails and let it start worki…
Browse files Browse the repository at this point in the history
…ng (#89759)

allow to start a transform if validation fails and let it start working once validation passes

relates #89212
  • Loading branch information
Hendrik Muhs committed Sep 13, 2022
1 parent eaf188b commit 1cc5b9e
Show file tree
Hide file tree
Showing 12 changed files with 244 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ protected AbstractAuditor(
this.putTemplateInProgress = new AtomicBoolean();
}

/**
 * Indexes an audit message for {@code resourceId} at the caller-supplied level.
 * The message is stamped with the current time and this auditor's node name.
 *
 * @param level      level recorded on the audit message
 * @param resourceId id of the resource the message refers to
 * @param message    text of the audit message
 */
public void audit(Level level, String resourceId, String message) {
    final Date timestamp = new Date();
    indexDoc(messageFactory.newMessage(resourceId, message, level, timestamp, nodeName));
}

/**
 * Indexes an audit message for {@code resourceId} at {@code Level.INFO}.
 * The message is stamped with the current time and this auditor's node name.
 *
 * @param resourceId id of the resource the message refers to
 * @param message    text of the audit message
 */
public void info(String resourceId, String message) {
    final Date timestamp = new Date();
    indexDoc(messageFactory.newMessage(resourceId, message, Level.INFO, timestamp, nodeName));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,27 @@ public void testError() throws IOException {
assertThat(auditMessage.getNodeName(), equalTo(TEST_NODE_NAME));
}

/**
 * Checks that {@code audit(level, id, message)} issues a single index request whose
 * document carries the given level, resource id, message, a timestamp taken during
 * the test, and the test node name.
 */
public void testAudit() throws IOException {
    final Level expectedLevel = randomFrom(Level.ERROR, Level.INFO, Level.WARNING);
    final String expectedResourceId = "r_id";
    final String expectedText = "Here is my audit";

    AbstractAuditor<AbstractAuditMessageTests.TestAuditMessage> auditorUnderTest = createTestAuditorWithTemplateInstalled();
    auditorUnderTest.audit(expectedLevel, expectedResourceId, expectedText);

    // capture the request the auditor handed to the client
    verify(client).execute(eq(IndexAction.INSTANCE), indexRequestCaptor.capture(), any());
    final IndexRequest captured = indexRequestCaptor.getValue();
    assertThat(captured.indices(), arrayContaining(TEST_INDEX));
    assertThat(captured.timeout(), equalTo(TimeValue.timeValueSeconds(5)));

    // the indexed document must round-trip to the values passed to audit()
    final AbstractAuditMessageTests.TestAuditMessage parsed = parseAuditMessage(captured.source());
    assertThat(parsed.getResourceId(), equalTo(expectedResourceId));
    assertThat(parsed.getMessage(), equalTo(expectedText));
    assertThat(parsed.getLevel(), equalTo(expectedLevel));

    // timestamp must lie between test start and "now"
    final long auditedAtMillis = parsed.getTimestamp().getTime();
    assertThat(auditedAtMillis, allOf(greaterThanOrEqualTo(startMillis), lessThanOrEqualTo(System.currentTimeMillis())));
    assertThat(parsed.getNodeName(), equalTo(TEST_NODE_NAME));
}

public void testAuditingBeforeTemplateInstalled() throws Exception {
CountDownLatch writeSomeDocsBeforeTemplateLatch = new CountDownLatch(1);
AbstractAuditor<AbstractAuditMessageTests.TestAuditMessage> auditor = createTestAuditorWithoutTemplate(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -601,3 +601,20 @@ setup:
}
}
}
---
# Calls the preview API for a pivot transform (group by "airline", average of
# "responsetime") whose settings contain "unattended": true.
# NOTE(review): no match/assert steps follow the request in this view, so the test
# appears to check only that the preview call succeeds - confirm.
"Test preview of unattended transform":
- do:
transform.preview_transform:
body: >
{
"source": { "index": "airline-data" },
"dest": { "index": "dest-airline-data-by-airline-unattended" },
"pivot": {
"group_by": { "airline": {"terms": {"field": "airline"}}},
"aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
},
"settings": {
"unattended": true
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
# Setup: sets the persistent logger level for
# org.elasticsearch.xpack.transform.action to DEBUG.
setup:
- do:
cluster.put_settings:
body: >
{
"persistent": {
"logger.org.elasticsearch.xpack.transform.action": "DEBUG"
}
}
---
# Teardown: stops and deletes "transform-unattended", then sets the
# transform action logger level to INFO.
teardown:
# stop without waiting for a checkpoint, but wait (up to 10m) for completion
- do:
transform.stop_transform:
wait_for_checkpoint: false
transform_id: "transform-unattended"
timeout: "10m"
wait_for_completion: true
- do:
transform.delete_transform:
transform_id: "transform-unattended"
# counterpart of the DEBUG logger setting applied in setup
- do:
cluster.put_settings:
body: >
{
"persistent": {
"logger.org.elasticsearch.xpack.transform.action": "INFO"
}
}
---
# Creates "transform-unattended" with defer_validation: true and
# "unattended": true in its settings, then starts it.
# NOTE(review): no index creation for "airline-data" is visible in this file's
# setup; presumably the source index is absent, which is why validation is
# deferred - confirm.
"Test unattended put and start":
- do:
transform.put_transform:
transform_id: "transform-unattended"
defer_validation: true
body: >
{
"source": { "index": "airline-data" },
"dest": { "index": "dest-airline-data-by-airline-start-stop" },
"pivot": {
"group_by": { "airline": {"terms": {"field": "airline"}}},
"aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
},
"settings": {
"unattended": true
}
}
- do:
transform.start_transform:
transform_id: "transform-unattended"


---
# Same flow as the non-wildcard test, but the source is the wildcard pattern
# "airline-data*" and the put request does not pass defer_validation.
"Test unattended put and start wildcard":
- do:
transform.put_transform:
transform_id: "transform-unattended"
body: >
{
"source": { "index": "airline-data*" },
"dest": { "index": "dest-airline-data-by-airline-start-stop" },
"pivot": {
"group_by": { "airline": {"terms": {"field": "airline"}}},
"aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
},
"settings": {
"unattended": true
}
}
- do:
transform.start_transform:
transform_id: "transform-unattended"
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import java.util.function.Consumer;
import java.util.function.Predicate;

import static org.elasticsearch.core.Strings.format;
import static org.elasticsearch.xpack.core.transform.TransformMessages.CANNOT_START_FAILED_TRANSFORM;

public class TransportStartTransformAction extends TransportMasterNodeAction<StartTransformAction.Request, StartTransformAction.Response> {
Expand Down Expand Up @@ -175,44 +176,30 @@ protected void masterOperation(
}
}, listener::onFailure);

// <2> If the destination index exists, start the task, otherwise deduce our mappings for the destination index and create it
ActionListener<ValidateTransformAction.Response> validationListener = ActionListener.wrap(validationResponse -> {
final String destinationIndex = transformConfigHolder.get().getDestination().getIndex();
String[] dest = indexNameExpressionResolver.concreteIndexNames(state, IndicesOptions.lenientExpandOpen(), destinationIndex);

if (dest.length == 0) {
createDestinationIndex(transformConfigHolder.get(), validationResponse.getDestIndexMappings(), ActionListener.wrap(r -> {
String message = Boolean.FALSE.equals(transformConfigHolder.get().getSettings().getDeduceMappings())
? "Created destination index [" + destinationIndex + "]."
: "Created destination index [" + destinationIndex + "] with deduced mappings.";
auditor.info(request.getId(), message);
createOrGetIndexListener.onResponse(r);
}, createOrGetIndexListener::onFailure));
} else {
auditor.info(request.getId(), "Using existing destination index [" + destinationIndex + "].");
ClientHelper.executeAsyncWithOrigin(
client.threadPool().getThreadContext(),
ClientHelper.TRANSFORM_ORIGIN,
client.admin().indices().prepareStats(dest).clear().setDocs(true).request(),
ActionListener.<IndicesStatsResponse>wrap(r -> {
long docTotal = r.getTotal().docs.getCount();
if (docTotal > 0L) {
auditor.warning(
request.getId(),
"Non-empty destination index [" + destinationIndex + "]. " + "Contains [" + docTotal + "] total documents."
);
}
createOrGetIndexListener.onResponse(true);
}, e -> {
String msg = "Unable to determine destination index stats, error: " + e.getMessage();
logger.warn(msg, e);
auditor.warning(request.getId(), msg);
createOrGetIndexListener.onResponse(true);
}),
client.admin().indices()::stats
// <3> If the destination index exists, start the task, otherwise deduce our mappings for the destination index and create it
ActionListener<ValidateTransformAction.Response> validationListener = ActionListener.wrap(
validationResponse -> {
createDestinationIndex(
state,
transformConfigHolder.get(),
validationResponse.getDestIndexMappings(),
createOrGetIndexListener
);
},
e -> {
if (Boolean.TRUE.equals(transformConfigHolder.get().getSettings().getUnattended())) {
logger.debug(
() -> format(
"[%s] Skip dest index creation as this is an unattended transform",
transformConfigHolder.get().getId()
)
);
createOrGetIndexListener.onResponse(true);
return;
}
listener.onFailure(e);
}
}, listener::onFailure);
);

// <2> run transform validations
ActionListener<TransformConfig> getTransformListener = ActionListener.wrap(config -> {
Expand Down Expand Up @@ -254,17 +241,57 @@ protected void masterOperation(
}

private void createDestinationIndex(
final TransformConfig config,
final Map<String, String> mappings,
final ActionListener<Boolean> listener
ClusterState state,
TransformConfig config,
Map<String, String> destIndexMappings,
ActionListener<Boolean> listener
) {
if (Boolean.TRUE.equals(config.getSettings().getUnattended())) {
logger.debug(() -> format("[%s] Skip dest index creation as this is an unattended transform", config.getId()));
listener.onResponse(true);
return;
}

TransformDestIndexSettings generatedDestIndexSettings = TransformIndex.createTransformDestIndexSettings(
mappings,
config.getId(),
Clock.systemUTC()
);
TransformIndex.createDestinationIndex(client, config, generatedDestIndexSettings, listener);
final String destinationIndex = config.getDestination().getIndex();
String[] dest = indexNameExpressionResolver.concreteIndexNames(state, IndicesOptions.lenientExpandOpen(), destinationIndex);

if (dest.length == 0) {
TransformDestIndexSettings generatedDestIndexSettings = TransformIndex.createTransformDestIndexSettings(
destIndexMappings,
config.getId(),
Clock.systemUTC()
);
TransformIndex.createDestinationIndex(client, config, generatedDestIndexSettings, ActionListener.wrap(r -> {
String message = Boolean.FALSE.equals(config.getSettings().getDeduceMappings())
? "Created destination index [" + destinationIndex + "]."
: "Created destination index [" + destinationIndex + "] with deduced mappings.";
auditor.info(config.getId(), message);
listener.onResponse(r);
}, listener::onFailure));
} else {
auditor.info(config.getId(), "Using existing destination index [" + destinationIndex + "].");
ClientHelper.executeAsyncWithOrigin(
client.threadPool().getThreadContext(),
ClientHelper.TRANSFORM_ORIGIN,
client.admin().indices().prepareStats(dest).clear().setDocs(true).request(),
ActionListener.<IndicesStatsResponse>wrap(r -> {
long docTotal = r.getTotal().docs.getCount();
if (docTotal > 0L) {
auditor.warning(
config.getId(),
"Non-empty destination index [" + destinationIndex + "]. " + "Contains [" + docTotal + "] total documents."
);
}
listener.onResponse(true);
}, e -> {
String msg = "Unable to determine destination index stats, error: " + e.getMessage();
logger.warn(msg, e);
auditor.warning(config.getId(), msg);
listener.onResponse(true);
}),
client.admin().indices()::stats
);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
Expand All @@ -25,6 +26,7 @@
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.common.logging.LoggerMessageFormat;
import org.elasticsearch.core.Nullable;
Expand All @@ -42,6 +44,7 @@
import org.elasticsearch.transport.ActionNotFoundTransportException;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.indexing.IndexerState;
import org.elasticsearch.xpack.core.transform.action.ValidateTransformAction;
import org.elasticsearch.xpack.core.transform.transforms.SettingsConfig;
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint;
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
Expand Down Expand Up @@ -271,6 +274,17 @@ void doGetFieldMappings(ActionListener<Map<String, String>> fieldMappingsListene
SchemaUtil.getDestinationFieldMappings(client, getConfig().getDestination().getIndex(), fieldMappingsListener);
}

/**
 * Executes {@link ValidateTransformAction} for this indexer's transform config,
 * using the headers stored in the config and the transform origin.
 *
 * @param listener notified with {@code null} when the action responds, or with the
 *                 failure if the action fails; the response content is discarded
 */
void validate(ActionListener<Void> listener) {
    final ValidateTransformAction.Request validateRequest = new ValidateTransformAction.Request(
        transformConfig,
        false,
        AcknowledgedRequest.DEFAULT_ACK_TIMEOUT
    );

    // only success/failure matters to the caller, so drop the response payload
    final ActionListener<ValidateTransformAction.Response> responseListener = ActionListener.wrap(
        ignoredResponse -> listener.onResponse(null),
        listener::onFailure
    );

    ClientHelper.executeWithHeadersAsync(
        transformConfig.getHeaders(),
        ClientHelper.TRANSFORM_ORIGIN,
        client,
        ValidateTransformAction.INSTANCE,
        validateRequest,
        responseListener
    );
}

/**
* Runs the persistence part of state storage
*/
Expand Down Expand Up @@ -455,6 +469,13 @@ private void injectPointInTimeIfNeeded(
listener.onResponse(namedSearchRequest);
}, e -> {
Throwable unwrappedException = ExceptionsHelper.findSearchExceptionRootCause(e);

// in case of a 404 forward the error, this isn't due to pit usage
if (unwrappedException instanceof ResourceNotFoundException) {
listener.onFailure(e);
return;
}

// if point in time is not supported, disable it but do not remember forever (stopping and starting will give it another
// try)
if (unwrappedException instanceof ActionNotFoundTransportException) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public interface Listener {
private final Listener taskListener;
private volatile int numFailureRetries = Transform.DEFAULT_FAILURE_RETRIES;
private final AtomicInteger failureCount;
// Keeps track of the last failure that occured, used for throttling logs and audit
// Keeps track of the last failure that occurred, used for throttling logs and audit
private final AtomicReference<String> lastFailure = new AtomicReference<>();
private final AtomicInteger statePersistenceFailureCount = new AtomicInteger();
private volatile Instant changesLastDetectedAt;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

package org.elasticsearch.xpack.transform.transforms;

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
Expand All @@ -21,6 +22,8 @@
import java.util.Optional;

import static org.elasticsearch.core.Strings.format;
import static org.elasticsearch.xpack.core.common.notifications.Level.INFO;
import static org.elasticsearch.xpack.core.common.notifications.Level.WARNING;

/**
* Handles all failures a transform can run into when searching, indexing as well as internal
Expand Down Expand Up @@ -240,8 +243,9 @@ private void retry(Throwable unwrappedException, String message, boolean unatten
failureCount,
numFailureRetries
);
logger.warn(() -> "[" + transformId + "] " + retryMessage);
auditor.warning(transformId, retryMessage);

logger.log(unattended ? Level.INFO : Level.WARN, () -> "[" + transformId + "] " + retryMessage);
auditor.audit(unattended ? INFO : WARNING, transformId, retryMessage);
}
}

Expand Down

0 comments on commit 1cc5b9e

Please sign in to comment.