Skip to content

Commit

Permalink
Fix enrich cache corruption bug (#82540)
Browse files Browse the repository at this point in the history
Backport #82441 to 7.17 branch.

The enrich cache currently uses `SearchResponse` as cache value,
which contains the hits used by the enrich processor for enrichment.
What is actually used by the enrich processor is `SearchHit.getSourceAsMap()`,
which is a map of maps representation of a search hit.

The problem is that this map is mutable and the map of maps is directly
passed into `IngestDocument` and at the same time this is cached by the
enrich cache via `SearchResponse` cache value. Any processor that modifies
the content added by the enrich processor, also changes the map of maps
representation of a search hit in the cache. This corrupts the cache,
because if the enrich cache then serves this entry for the same key,
a different snippet is added to the document being enriched.

The following changes have been made to fix this bug:
* Use `List<Map<?, ?>>` as cache value for the enrich cache.
* Upon caching lookup / search, make an immutable deep copy of `SearchHit.getSourceAsMap()`.
* Upon serving an entry from the cache, make a normal deep copy,
  so the enrichment can be safely modified by subsequent processors.

Closes #82340
  • Loading branch information
martijnvg committed Jan 13, 2022
1 parent d83c499 commit ad2d118
Show file tree
Hide file tree
Showing 9 changed files with 321 additions and 191 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.enrich;

import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.ingest.SimulateDocumentBaseResult;
import org.elasticsearch.action.ingest.SimulatePipelineRequest;
import org.elasticsearch.action.ingest.SimulatePipelineResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.ingest.common.IngestCommonPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.reindex.ReindexPlugin;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.xcontent.XContentType;
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction;
import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction;
import org.elasticsearch.xpack.core.enrich.action.PutEnrichPolicyAction;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;

import static org.elasticsearch.xpack.enrich.AbstractEnrichTestCase.createSourceIndices;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;

/**
 * Regression test for the enrich-cache corruption bug (#82340): values served
 * from the enrich cache must not be mutated by processors that run after the
 * enrich processor (e.g. a {@code rename} that moves a field out of the
 * enriched snippet).
 */
public class EnrichProcessorIT extends ESSingleNodeTestCase {

    @Override
    protected Collection<Class<? extends Plugin>> getPlugins() {
        return Arrays.asList(LocalStateEnrich.class, ReindexPlugin.class, IngestCommonPlugin.class);
    }

    @Override
    protected Settings nodeSettings() {
        // TODO Change this to run with security enabled
        // https://github.com/elastic/elasticsearch/issues/75940
        return Settings.builder().put(XPackSettings.SECURITY_ENABLED.getKey(), false).build();
    }

    public void testEnrichCacheValuesCannotBeCorrupted() {
        // The enrich cache must start out empty.
        assertCacheStats(0L, 0L, 0L);

        final String policyName = "device-enrich-policy";
        final String sourceIndex = "devices-idx";

        EnrichPolicy policy = new EnrichPolicy(
            EnrichPolicy.MATCH_TYPE,
            null,
            Collections.singletonList(sourceIndex),
            "host.ip",
            Arrays.asList("device.name", "host.ip")
        );

        // Create the source index and index a single device document:
        createSourceIndices(client(), policy);
        IndexRequest deviceDoc = new IndexRequest(sourceIndex);
        deviceDoc.create(true);
        deviceDoc.source("{\"host\": {\"ip\": \"10.151.80.8\"},\"device\": {\"name\": \"bla\"}}", XContentType.JSON);
        deviceDoc.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
        client().index(deviceDoc).actionGet();

        // Store the policy and execute it, so the enrich index exists:
        client().execute(PutEnrichPolicyAction.INSTANCE, new PutEnrichPolicyAction.Request(policyName, policy)).actionGet();
        client().execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(policyName)).actionGet();

        // First pipeline: enrich into _tmp.device, then rename a field OUT of the
        // enriched snippet. Before the fix, the rename also mutated the cached value.
        SimulateDocumentBaseResult firstResult = simulatePipeline(
            "{\n"
                + " \"pipeline\": {\n"
                + " \"processors\": [\n"
                + " {\n"
                + " \"enrich\": {\n"
                + " \"policy_name\": \"device-enrich-policy\",\n"
                + " \"field\": \"host.ip\",\n"
                + " \"target_field\": \"_tmp.device\"\n"
                + " }\n"
                + " },\n"
                + " {\n"
                + " \"rename\" : {\n"
                + " \"field\" : \"_tmp.device.device.name\",\n"
                + " \"target_field\" : \"device.name\"\n"
                + " }\n"
                + " }\n"
                + " ]\n"
                + " },\n"
                + " \"docs\": [\n"
                + " {\n"
                + " \"_source\": {\n"
                + " \"host\": {\n"
                + " \"ip\": \"10.151.80.8\"\n"
                + " }\n"
                + " }\n"
                + " }\n"
                + " ]\n"
                + " }"
        );
        assertThat(firstResult.getFailure(), nullValue());
        assertThat(firstResult.getIngestDocument().getFieldValue("device.name", String.class), equalTo("bla"));

        // Verify that there was a cache miss and a new entry was added to the enrich cache.
        assertCacheStats(1L, 1L, 0L);

        // Second pipeline: same lookup key, so the enrich lookup is served from the
        // cache. The cached snippet must still contain device.name.
        SimulateDocumentBaseResult secondResult = simulatePipeline(
            "{\n"
                + " \"pipeline\": {\n"
                + " \"processors\": [\n"
                + " {\n"
                + " \"enrich\": {\n"
                + " \"policy_name\": \"device-enrich-policy\",\n"
                + " \"field\": \"host.ip\",\n"
                + " \"target_field\": \"_tmp\"\n"
                + " }\n"
                + " }\n"
                + " ]\n"
                + " },\n"
                + " \"docs\": [\n"
                + " {\n"
                + " \"_source\": {\n"
                + " \"host\": {\n"
                + " \"ip\": \"10.151.80.8\"\n"
                + " }\n"
                + " }\n"
                + " }\n"
                + " ]\n"
                + " }"
        );
        assertThat(secondResult.getFailure(), nullValue());
        assertThat(secondResult.getIngestDocument().getFieldValue("_tmp.device.name", String.class), equalTo("bla"));

        // Verify that the enrich lookup was served from the cache (one hit, still one entry).
        assertCacheStats(1L, 1L, 1L);
    }

    /**
     * Runs a simulate-pipeline request with the given JSON body and returns the
     * result for the first (and only) document.
     */
    private SimulateDocumentBaseResult simulatePipeline(String requestBody) {
        SimulatePipelineRequest request = new SimulatePipelineRequest(new BytesArray(requestBody), XContentType.JSON);
        SimulatePipelineResponse response = client().admin().cluster().simulatePipeline(request).actionGet();
        return (SimulateDocumentBaseResult) response.getResults().get(0);
    }

    /**
     * Fetches enrich cache stats from the single node and asserts the entry
     * count, miss count and hit count.
     */
    private void assertCacheStats(long expectedCount, long expectedMisses, long expectedHits) {
        EnrichStatsAction.Response statsResponse = client().execute(EnrichStatsAction.INSTANCE, new EnrichStatsAction.Request())
            .actionGet();
        assertThat(statsResponse.getCacheStats().size(), equalTo(1));
        assertThat(statsResponse.getCacheStats().get(0).getCount(), equalTo(expectedCount));
        assertThat(statsResponse.getCacheStats().get(0).getMisses(), equalTo(expectedMisses));
        assertThat(statsResponse.getCacheStats().get(0).getHits(), equalTo(expectedHits));
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,12 @@
package org.elasticsearch.xpack.enrich;

import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.cluster.routing.Preference;
import org.elasticsearch.index.query.ConstantScoreQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.script.TemplateScript;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;

Expand All @@ -27,7 +25,7 @@
public abstract class AbstractEnrichProcessor extends AbstractProcessor {

private final String policyName;
private final BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> searchRunner;
private final BiConsumer<SearchRequest, BiConsumer<List<Map<?, ?>>, Exception>> searchRunner;
private final TemplateScript.Factory field;
private final TemplateScript.Factory targetField;
private final boolean ignoreMissing;
Expand All @@ -38,7 +36,7 @@ public abstract class AbstractEnrichProcessor extends AbstractProcessor {
protected AbstractEnrichProcessor(
String tag,
String description,
BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> searchRunner,
BiConsumer<SearchRequest, BiConsumer<List<Map<?, ?>>, Exception>> searchRunner,
String policyName,
TemplateScript.Factory field,
TemplateScript.Factory targetField,
Expand Down Expand Up @@ -84,7 +82,7 @@ public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Ex
req.preference(Preference.LOCAL.type());
req.source(searchBuilder);

searchRunner.accept(req, (searchResponse, e) -> {
searchRunner.accept(req, (searchHits, e) -> {
if (e != null) {
handler.accept(null, e);
return;
Expand All @@ -93,23 +91,19 @@ public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Ex
// If the index is empty, return the unchanged document
// If the enrich key does not exist in the index, throw an error
// If no documents match the key, return the unchanged document
SearchHit[] searchHits = searchResponse.getHits().getHits();
if (searchHits.length < 1) {
if (searchHits.size() < 1) {
handler.accept(ingestDocument, null);
return;
}

String renderedTargetField = ingestDocument.renderTemplate(this.targetField);
if (overrideEnabled || ingestDocument.hasField(renderedTargetField) == false) {
if (maxMatches == 1) {
Map<String, Object> firstDocument = searchHits[0].getSourceAsMap();
Map<?, ?> firstDocument = searchHits.get(0);
ingestDocument.setFieldValue(renderedTargetField, firstDocument);
} else {
List<Map<String, Object>> enrichDocuments = new ArrayList<>(searchHits.length);
for (SearchHit searchHit : searchHits) {
Map<String, Object> enrichDocument = searchHit.getSourceAsMap();
enrichDocuments.add(enrichDocument);
}
List<Map<?, ?>> enrichDocuments = new ArrayList<>(searchHits.size());
enrichDocuments.addAll(searchHits);
ingestDocument.setFieldValue(renderedTargetField, enrichDocuments);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,15 @@
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.common.cache.Cache;
import org.elasticsearch.common.cache.CacheBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -43,11 +50,11 @@
*/
public class EnrichCache {

protected final Cache<CacheKey, CompletableFuture<SearchResponse>> cache;
protected final Cache<CacheKey, CompletableFuture<List<Map<?, ?>>>> cache;
private volatile Metadata metadata;

EnrichCache(long maxSize) {
this.cache = CacheBuilder.<CacheKey, CompletableFuture<SearchResponse>>builder().setMaximumWeight(maxSize).build();
this.cache = CacheBuilder.<CacheKey, CompletableFuture<List<Map<?, ?>>>>builder().setMaximumWeight(maxSize).build();
}

/**
Expand All @@ -56,7 +63,7 @@ public class EnrichCache {
* @param searchRequest the key
* @return the cached value or null
*/
CompletableFuture<SearchResponse> get(SearchRequest searchRequest) {
CompletableFuture<List<Map<?, ?>>> get(SearchRequest searchRequest) {
CacheKey cacheKey = toKey(searchRequest);
return cache.get(cacheKey);
}
Expand Down Expand Up @@ -89,27 +96,30 @@ public EnrichStatsAction.Response.CacheStats getStats(String localNodeId) {
public void resolveOrDispatchSearch(
SearchRequest searchRequest,
BiConsumer<SearchRequest, ActionListener<SearchResponse>> searchDispatcher,
BiConsumer<SearchResponse, Exception> callBack
BiConsumer<List<Map<?, ?>>, Exception> callBack
) {
CacheKey cacheKey = toKey(searchRequest);
try {
CompletableFuture<SearchResponse> cacheEntry = cache.computeIfAbsent(cacheKey, request -> {
CompletableFuture<SearchResponse> completableFuture = new CompletableFuture<>();
searchDispatcher.accept(searchRequest, wrap(completableFuture::complete, completableFuture::completeExceptionally));
CompletableFuture<List<Map<?, ?>>> cacheEntry = cache.computeIfAbsent(cacheKey, request -> {
CompletableFuture<List<Map<?, ?>>> completableFuture = new CompletableFuture<>();
searchDispatcher.accept(
searchRequest,
wrap(response -> completableFuture.complete(toCacheValue(response)), completableFuture::completeExceptionally)
);
return completableFuture;
});
cacheEntry.whenComplete((response, throwable) -> {
if (throwable != null) {
// Don't cache failures
cache.invalidate(cacheKey, cacheEntry);
if (throwable instanceof Exception) {
callBack.accept(response, (Exception) throwable);
callBack.accept(null, (Exception) throwable);
return;
}
// Let ElasticsearchUncaughtExceptionHandler handle this, which should halt Elasticsearch
throw (Error) throwable;
}
callBack.accept(response, null);
callBack.accept(deepCopy(response, false), null);
});
} catch (ExecutionException e) {
callBack.accept(null, e);
Expand All @@ -127,6 +137,44 @@ private String getEnrichIndexKey(SearchRequest searchRequest) {
return ia.getIndices().get(0).getName();
}

private List<Map<?, ?>> toCacheValue(SearchResponse response) {
List<Map<?, ?>> result = new ArrayList<>(response.getHits().getHits().length);
for (SearchHit hit : response.getHits()) {
result.add(deepCopy(hit.getSourceAsMap(), true));
}
return Collections.unmodifiableList(result);
}

@SuppressWarnings("unchecked")
static <T> T deepCopy(T value, boolean unmodifiable) {
return (T) innerDeepCopy(value, unmodifiable);
}

private static Object innerDeepCopy(Object value, boolean unmodifiable) {
if (value instanceof Map<?, ?>) {
Map<?, ?> mapValue = (Map<?, ?>) value;
Map<Object, Object> copy = new HashMap<>(mapValue.size());
for (Map.Entry<?, ?> entry : mapValue.entrySet()) {
copy.put(entry.getKey(), innerDeepCopy(entry.getValue(), unmodifiable));
}
return unmodifiable ? Collections.unmodifiableMap(copy) : copy;
} else if (value instanceof List<?>) {
List<?> listValue = (List<?>) value;
List<Object> copy = new ArrayList<>(listValue.size());
for (Object itemValue : listValue) {
copy.add(innerDeepCopy(itemValue, unmodifiable));
}
return unmodifiable ? Collections.unmodifiableList(copy) : copy;
} else if (value instanceof byte[]) {
byte[] bytes = (byte[]) value;
return Arrays.copyOf(bytes, bytes.length);
} else if (value == null || value instanceof String || value instanceof Number || value instanceof Boolean) {
return value;
} else {
throw new IllegalArgumentException("unexpected value type [" + value.getClass() + "]");
}
}

private static class CacheKey {

final String enrichIndex;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
package org.elasticsearch.xpack.enrich;

import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.cluster.ClusterState;
Expand All @@ -24,6 +23,7 @@
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
import org.elasticsearch.xpack.enrich.action.EnrichCoordinatorProxyAction;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiConsumer;
Expand Down Expand Up @@ -77,7 +77,7 @@ public Processor create(Map<String, Processor.Factory> processorFactories, Strin
if (maxMatches < 1 || maxMatches > 128) {
throw ConfigurationUtils.newConfigurationException(TYPE, tag, "max_matches", "should be between 1 and 128");
}
BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> searchRunner = createSearchRunner(client, enrichCache);
BiConsumer<SearchRequest, BiConsumer<List<Map<?, ?>>, Exception>> searchRunner = createSearchRunner(client, enrichCache);
switch (policyType) {
case EnrichPolicy.MATCH_TYPE:
case EnrichPolicy.RANGE_TYPE:
Expand Down Expand Up @@ -123,7 +123,7 @@ public void accept(ClusterState state) {
enrichCache.setMetadata(metadata);
}

private static BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> createSearchRunner(
private static BiConsumer<SearchRequest, BiConsumer<List<Map<?, ?>>, Exception>> createSearchRunner(
Client client,
EnrichCache enrichCache
) {
Expand Down

0 comments on commit ad2d118

Please sign in to comment.