Validate enrich index before completing policy execution (#100106) (#100160)

This PR adds a validation step to the end of an enrich policy run to ensure the integrity of the
enrich index that is about to be promoted.
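
In short, before the freshly built enrich index is promoted (i.e. before its alias is swapped over), the runner now re-reads the index's mapping from cluster state and checks that its _meta section still identifies the policy that built it. The following is a rough standalone sketch of that check, not the actual EnrichPolicyRunner code; the literal "enrich_policy_name" key is an assumption here, since the production code reads it through the ENRICH_POLICY_NAME_FIELD_NAME constant, which is not shown in this diff.

import java.util.Map;

// Condensed sketch of the pre-promotion validation added by this PR; not the actual
// EnrichPolicyRunner implementation. The "_meta" key is standard mapping metadata, but the
// "enrich_policy_name" field name is assumed to match ENRICH_POLICY_NAME_FIELD_NAME.
final class EnrichIndexValidationSketch {
    static void validateBeforePromotion(String indexName, String policyName, Map<String, Object> mappingSource) {
        Object meta = mappingSource.get("_meta");
        if ((meta instanceof Map<?, ?>) == false) {
            // The index was likely deleted and recreated without the enrich mapping metadata.
            throw new IllegalStateException("enrich index [" + indexName + "] has no mapping _meta section");
        }
        Object actualPolicy = ((Map<?, ?>) meta).get("enrich_policy_name"); // assumed field name
        if (policyName.equals(actualPolicy) == false) {
            throw new IllegalStateException(
                "enrich index [" + indexName + "] was built for policy [" + actualPolicy + "], expected [" + policyName + "]"
            );
        }
    }
}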

(cherry picked from commit 225db31)

Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
jbaiera and elasticmachine committed Oct 3, 2023
1 parent 304781b commit eaef3a9
Showing 3 changed files with 152 additions and 3 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/100106.yaml
@@ -0,0 +1,5 @@
pr: 100106
summary: Validate enrich index before completing policy execution
area: Ingest Node
type: bug
issues: []
@@ -10,6 +10,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
@@ -34,6 +35,7 @@
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.AliasMetadata;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MappingMetadata;
import org.elasticsearch.cluster.service.ClusterService;
@@ -42,6 +44,7 @@
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.CheckedFunction;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
@@ -569,16 +572,73 @@ private void setIndexReadOnly(final String destinationIndexName) {

private void waitForIndexGreen(final String destinationIndexName) {
    ClusterHealthRequest request = new ClusterHealthRequest(destinationIndexName).waitForGreenStatus();
    enrichOriginClient().admin()
        .cluster()
        .health(request, listener.delegateFailure((l, r) -> updateEnrichPolicyAlias(destinationIndexName)));
    enrichOriginClient().admin().cluster().health(request, listener.delegateFailure((l, r) -> {
        try {
            updateEnrichPolicyAlias(destinationIndexName);
        } catch (Exception e) {
            l.onFailure(e);
        }
    }));
}

/**
* Ensures that the index we are about to promote at the end of a policy execution exists, is intact, and has not been damaged
* during the policy execution. In some cases, it is possible for the index being constructed to be deleted during the policy execution
* and recreated with invalid mappings/data. We validate that the mapping exists and that it contains the expected meta fields on it to
* guard against accidental removal and recreation during policy execution.
*/
private void validateIndexBeforePromotion(String destinationIndexName, ClusterState clusterState) {
    IndexMetadata destinationIndex = clusterState.metadata().index(destinationIndexName);
    if (destinationIndex == null) {
        throw new IndexNotFoundException(
            "was not able to promote it as part of executing enrich policy [" + policyName + "]",
            destinationIndexName
        );
    }
    MappingMetadata mapping = destinationIndex.mapping();
    if (mapping == null) {
        throw new ResourceNotFoundException(
            "Could not locate mapping for enrich index [{}] while completing [{}] policy run",
            destinationIndexName,
            policyName
        );
    }
    Map<String, Object> mappingSource = mapping.sourceAsMap();
    Object meta = mappingSource.get("_meta");
    if (meta instanceof Map<?, ?>) {
        Map<?, ?> metaMap = ((Map<?, ?>) meta);
        Object policyNameMetaField = metaMap.get(ENRICH_POLICY_NAME_FIELD_NAME);
        if (policyNameMetaField == null) {
            throw new ElasticsearchException(
                "Could not verify enrich index [{}] metadata before completing [{}] policy run: policy name meta field missing",
                destinationIndexName,
                policyName
            );
        } else if (policyName.equals(policyNameMetaField) == false) {
            throw new ElasticsearchException(
                "Could not verify enrich index [{}] metadata before completing [{}] policy run: policy name meta field does not "
                    + "match expected value of [{}], was [{}]",
                destinationIndexName,
                policyName,
                policyName,
                policyNameMetaField.toString()
            );
        }
    } else {
        throw new ElasticsearchException(
            "Could not verify enrich index [{}] metadata before completing [{}] policy run: mapping meta field missing",
            destinationIndexName,
            policyName
        );
    }
}

private void updateEnrichPolicyAlias(final String destinationIndexName) {
    String enrichIndexBase = EnrichPolicy.getBaseName(policyName);
    logger.debug("Policy [{}]: Promoting new enrich index [{}] to alias [{}]", policyName, destinationIndexName, enrichIndexBase);
    GetAliasesRequest aliasRequest = new GetAliasesRequest(enrichIndexBase);
    ClusterState clusterState = clusterService.state();
    validateIndexBeforePromotion(destinationIndexName, clusterState);
    String[] concreteIndices = indexNameExpressionResolver.concreteIndexNamesWithSystemIndexAccess(clusterState, aliasRequest);
    ImmutableOpenMap<String, List<AliasMetadata>> aliases = clusterState.metadata().findAliases(aliasRequest, concreteIndices);
    IndicesAliasesRequest aliasToggleRequest = new IndicesAliasesRequest();
@@ -16,6 +16,8 @@
import org.elasticsearch.action.admin.indices.create.CreateIndexAction;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexAction;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeAction;
import org.elasticsearch.action.admin.indices.get.GetIndexAction;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
@@ -2090,6 +2092,88 @@ protected <Request extends ActionRequest, Response extends ActionResponse> void
    assertThat(exception.get().getMessage(), containsString("cancelled policy execution [test1], status ["));
}

public void testRunnerValidatesIndexIntegrity() throws Exception {
    final String sourceIndex = "source-index";
    IndexResponse indexRequest = client().index(
        new IndexRequest().index(sourceIndex)
            .id("id")
            .source(
                "{"
                    + "\"field1\":\"value1\","
                    + "\"field2\":2,"
                    + "\"field3\":\"ignored\","
                    + "\"field4\":\"ignored\","
                    + "\"field5\":\"value5\""
                    + "}",
                XContentType.JSON
            )
            .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
    ).actionGet();
    assertEquals(RestStatus.CREATED, indexRequest.status());

    SearchResponse sourceSearchResponse = client().search(
        new SearchRequest(sourceIndex).source(SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery()))
    ).actionGet();
    assertThat(sourceSearchResponse.getHits().getTotalHits().value, equalTo(1L));
    Map<String, Object> sourceDocMap = sourceSearchResponse.getHits().getAt(0).getSourceAsMap();
    assertNotNull(sourceDocMap);
    assertThat(sourceDocMap.get("field1"), is(equalTo("value1")));
    assertThat(sourceDocMap.get("field2"), is(equalTo(2)));
    assertThat(sourceDocMap.get("field3"), is(equalTo("ignored")));
    assertThat(sourceDocMap.get("field4"), is(equalTo("ignored")));
    assertThat(sourceDocMap.get("field5"), is(equalTo("value5")));

    List<String> enrichFields = Arrays.asList("field2", "field5");
    EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, singletonList(sourceIndex), "field1", enrichFields);
    String policyName = "test1";

    final long createTime = randomNonNegativeLong();
    String createdEnrichIndex = ".enrich-test1-" + createTime;
    final AtomicReference<Exception> exception = new AtomicReference<>();
    final CountDownLatch latch = new CountDownLatch(1);
    ActionListener<ExecuteEnrichPolicyStatus> listener = createTestListener(latch, exception::set);

    // Wrap the client so that when we receive the reindex action, we delete the index then resume operation. This mimics an invalid
    // state for the resulting index.
    Client client = new FilterClient(client()) {
        @Override
        protected <Request extends ActionRequest, Response extends ActionResponse> void doExecute(
            ActionType<Response> action,
            Request request,
            ActionListener<Response> listener
        ) {
            if (action.equals(EnrichReindexAction.INSTANCE)) {
                super.doExecute(
                    DeleteIndexAction.INSTANCE,
                    new DeleteIndexRequest(createdEnrichIndex),
                    listener.delegateFailure((delegate, response) -> {
                        if (response.isAcknowledged()) {
                            super.doExecute(action, request, delegate);
                        } else {
                            fail("Enrich index should have been deleted but was not");
                            delegate.onFailure(new ElasticsearchException("Could not delete enrich index - cleaning up"));
                        }
                    })
                );
            } else {
                super.doExecute(action, request, listener);
            }
        }
    };
    EnrichPolicyRunner enrichPolicyRunner = createPolicyRunner(client, policyName, policy, listener, createdEnrichIndex);

    logger.info("Starting policy run");
    enrichPolicyRunner.run();
    latch.await();
    Exception runnerException = exception.get();
    if (runnerException == null) {
        fail("Expected the runner to fail when the underlying index was deleted during policy execution!");
    }
    assertThat(runnerException, is(instanceOf(ElasticsearchException.class)));
    assertThat(runnerException.getMessage(), containsString("Could not verify enrich index"));
    assertThat(runnerException.getMessage(), containsString("mapping meta field missing"));
}

private EnrichPolicyRunner createPolicyRunner(
    String policyName,
    EnrichPolicy policy,
