Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Validate enrich index before completing policy execution #100106

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/100106.yaml
@@ -0,0 +1,5 @@
pr: 100106
summary: Validate enrich index before completing policy execution
area: Ingest Node
type: bug
issues: []
Expand Up @@ -9,6 +9,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
Expand Down Expand Up @@ -43,6 +44,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.common.util.iterable.Iterables;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.index.mapper.Mapper;
import org.elasticsearch.index.mapper.MapperService;
Expand Down Expand Up @@ -132,13 +134,9 @@ public void run() {
logger.debug("Policy [{}]: Checking source indices [{}]", policyName, sourceIndices);
GetIndexRequest getIndexRequest = new GetIndexRequest().indices(sourceIndices);
// Deliberately do not set the origin on this call: it must run as the user executing the policy so that their permission to
// access the source index is checked
client.admin().indices().getIndex(getIndexRequest, listener.delegateFailure((l, getIndexResponse) -> {
try {
validateMappings(getIndexResponse);
prepareAndCreateEnrichIndex(toMappings(getIndexResponse));
} catch (Exception e) {
l.onFailure(e);
}
client.admin().indices().getIndex(getIndexRequest, listener.delegateFailureAndWrap((l, getIndexResponse) -> {
validateMappings(getIndexResponse);
prepareAndCreateEnrichIndex(toMappings(getIndexResponse));
}));
} catch (Exception e) {
listener.onFailure(e);
Expand Down Expand Up @@ -624,14 +622,66 @@ private void waitForIndexGreen(final String destinationIndexName) {
ClusterHealthRequest request = new ClusterHealthRequest(destinationIndexName).waitForGreenStatus();
enrichOriginClient().admin()
.cluster()
.health(request, listener.delegateFailure((l, r) -> updateEnrichPolicyAlias(destinationIndexName)));
.health(request, listener.delegateFailureAndWrap((l, r) -> updateEnrichPolicyAlias(destinationIndexName)));
}

/**
 * Sanity-checks the freshly built enrich index immediately before it is promoted at the end of a policy run. The index could have been
 * deleted and recreated (with the wrong mappings/data) while the policy was executing, so the checks below are applied in order and
 * the first one that fails aborts the promotion:
 * <ol>
 * <li>the index must be present in the supplied cluster state, otherwise an {@link IndexNotFoundException} is thrown;</li>
 * <li>the index must have a mapping, otherwise a {@link ResourceNotFoundException} is thrown;</li>
 * <li>the mapping must carry a {@code _meta} object, otherwise an {@link ElasticsearchException} is thrown;</li>
 * <li>that {@code _meta} object must contain the policy name field, and its value must equal the name of the policy being executed,
 * otherwise an {@link ElasticsearchException} is thrown.</li>
 * </ol>
 *
 * @param destinationIndexName name of the enrich index that is about to be promoted
 * @param clusterState cluster state in which the index metadata is looked up
 */
private void validateIndexBeforePromotion(String destinationIndexName, ClusterState clusterState) {
    final IndexMetadata indexMetadata = clusterState.metadata().index(destinationIndexName);
    if (indexMetadata == null) {
        throw new IndexNotFoundException(
            "was not able to promote it as part of executing enrich policy [" + policyName + "]",
            destinationIndexName
        );
    }

    final MappingMetadata mappingMetadata = indexMetadata.mapping();
    if (mappingMetadata == null) {
        throw new ResourceNotFoundException(
            "Could not locate mapping for enrich index [{}] while completing [{}] policy run",
            destinationIndexName,
            policyName
        );
    }

    // A missing (null) or non-object _meta entry is treated the same way: the index cannot be verified.
    final Object metaSection = mappingMetadata.sourceAsMap().get("_meta");
    if (metaSection instanceof Map<?, ?> == false) {
        throw new ElasticsearchException(
            "Could not verify enrich index [{}] metadata before completing [{}] policy run: mapping meta field missing",
            destinationIndexName,
            policyName
        );
    }

    final Object recordedPolicyName = ((Map<?, ?>) metaSection).get(ENRICH_POLICY_NAME_FIELD_NAME);
    if (recordedPolicyName == null) {
        throw new ElasticsearchException(
            "Could not verify enrich index [{}] metadata before completing [{}] policy run: policy name meta field missing",
            destinationIndexName,
            policyName
        );
    }
    if (policyName.equals(recordedPolicyName) == false) {
        throw new ElasticsearchException(
            "Could not verify enrich index [{}] metadata before completing [{}] policy run: policy name meta field does not "
                + "match expected value of [{}], was [{}]",
            destinationIndexName,
            policyName,
            policyName,
            recordedPolicyName.toString()
        );
    }
}

private void updateEnrichPolicyAlias(final String destinationIndexName) {
String enrichIndexBase = EnrichPolicy.getBaseName(policyName);
logger.debug("Policy [{}]: Promoting new enrich index [{}] to alias [{}]", policyName, destinationIndexName, enrichIndexBase);
GetAliasesRequest aliasRequest = new GetAliasesRequest(enrichIndexBase);
ClusterState clusterState = clusterService.state();
validateIndexBeforePromotion(destinationIndexName, clusterState);
String[] concreteIndices = indexNameExpressionResolver.concreteIndexNamesWithSystemIndexAccess(clusterState, aliasRequest);
String[] aliases = aliasRequest.aliases();
IndicesAliasesRequest aliasToggleRequest = new IndicesAliasesRequest();
Expand Down
Expand Up @@ -16,6 +16,8 @@
import org.elasticsearch.action.admin.indices.create.CreateIndexAction;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexAction;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeAction;
import org.elasticsearch.action.admin.indices.get.GetIndexAction;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
Expand Down Expand Up @@ -2253,6 +2255,79 @@ public void testEnrichNestedField() throws Exception {
""");
}

/**
 * Verifies that a policy run fails when the enrich index being built is deleted part-way through execution.
 * The test indexes a single source document, then runs the policy through a client wrapper that deletes the
 * target enrich index just before the enrich reindex action is forwarded. The run is expected to finish with
 * an {@link ElasticsearchException} reporting that the enrich index could not be verified.
 */
public void testRunnerValidatesIndexIntegrity() throws Exception {
// Index one source document and refresh immediately so that it is visible to the search below.
final String sourceIndex = "source-index";
IndexResponse indexRequest = client().index(new IndexRequest().index(sourceIndex).id("id").source("""
{
"field1": "value1",
"field2": 2,
"field3": "ignored",
"field4": "ignored",
"field5": "value5"
}""", XContentType.JSON).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)).actionGet();
assertEquals(RestStatus.CREATED, indexRequest.status());

// Sanity check: the source index holds exactly the document that was just written, with all five fields intact.
SearchResponse sourceSearchResponse = client().search(
new SearchRequest(sourceIndex).source(SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery()))
).actionGet();
assertThat(sourceSearchResponse.getHits().getTotalHits().value, equalTo(1L));
Map<String, Object> sourceDocMap = sourceSearchResponse.getHits().getAt(0).getSourceAsMap();
assertNotNull(sourceDocMap);
assertThat(sourceDocMap.get("field1"), is(equalTo("value1")));
assertThat(sourceDocMap.get("field2"), is(equalTo(2)));
assertThat(sourceDocMap.get("field3"), is(equalTo("ignored")));
assertThat(sourceDocMap.get("field4"), is(equalTo("ignored")));
assertThat(sourceDocMap.get("field5"), is(equalTo("value5")));

// Match-type policy over the source index: "field1" is the match field, "field2" and "field5" are the enrich fields.
List<String> enrichFields = List.of("field2", "field5");
EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of(sourceIndex), "field1", enrichFields);
String policyName = "test1";

// The enrich index name is fixed up front so that the client wrapper below knows which index to delete.
final long createTime = randomNonNegativeLong();
String createdEnrichIndex = ".enrich-test1-" + createTime;
// The exception reference captures any failure reported to the listener.
// NOTE(review): createTestListener is not visible here; presumably it counts down the latch on completion - confirm.
final AtomicReference<Exception> exception = new AtomicReference<>();
final CountDownLatch latch = new CountDownLatch(1);
ActionListener<ExecuteEnrichPolicyStatus> listener = createTestListener(latch, exception::set);

// Wrap the client so that when we receive the reindex action, we delete the index then resume operation. This mimics an invalid
// state for the resulting index.
Client client = new FilterClient(client()) {
@Override
protected <Request extends ActionRequest, Response extends ActionResponse> void doExecute(
ActionType<Response> action,
Request request,
ActionListener<Response> listener
) {
if (action.equals(EnrichReindexAction.INSTANCE)) {
// Delete the enrich index first; only once the delete has been acknowledged is the intercepted reindex request forwarded.
super.doExecute(
DeleteIndexAction.INSTANCE,
new DeleteIndexRequest(createdEnrichIndex),
listener.delegateFailureAndWrap((delegate, response) -> {
if (response.isAcknowledged() == false) {
fail("Enrich index should have been deleted but was not");
}
super.doExecute(action, request, delegate);
})
);
} else {
// Every other action passes through untouched.
super.doExecute(action, request, listener);
}
}
};
EnrichPolicyRunner enrichPolicyRunner = createPolicyRunner(client, policyName, policy, listener, createdEnrichIndex);

logger.info("Starting policy run");
enrichPolicyRunner.run();
// Block until the latch is released, then inspect the captured exception.
latch.await();
Exception runnerException = exception.get();
if (runnerException == null) {
fail("Expected the runner to fail when the underlying index was deleted during policy execution!");
}
// The failure must be the "could not verify" error for a missing mapping _meta field.
// NOTE(review): presumably the resumed reindex recreates the index without the enrich _meta mapping - confirm.
assertThat(runnerException, is(instanceOf(ElasticsearchException.class)));
assertThat(runnerException.getMessage(), containsString("Could not verify enrich index"));
assertThat(runnerException.getMessage(), containsString("mapping meta field missing"));
}

private EnrichPolicyRunner createPolicyRunner(
String policyName,
EnrichPolicy policy,
Expand Down