diff --git a/docs/changelog/120256.yaml b/docs/changelog/120256.yaml new file mode 100644 index 0000000000000..c4ee5ab1705c5 --- /dev/null +++ b/docs/changelog/120256.yaml @@ -0,0 +1,7 @@ +pr: 120256 +summary: Improve memory aspects of enrich cache +area: Ingest Node +type: enhancement +issues: + - 96050 + - 120021 diff --git a/x-pack/plugin/enrich/src/internalClusterTest/java/org/elasticsearch/xpack/enrich/EnrichProcessorMaxMatchesIT.java b/x-pack/plugin/enrich/src/internalClusterTest/java/org/elasticsearch/xpack/enrich/EnrichProcessorMaxMatchesIT.java new file mode 100644 index 0000000000000..85a0f910d30fc --- /dev/null +++ b/x-pack/plugin/enrich/src/internalClusterTest/java/org/elasticsearch/xpack/enrich/EnrichProcessorMaxMatchesIT.java @@ -0,0 +1,192 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.enrich; + +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.ingest.SimulateDocumentBaseResult; +import org.elasticsearch.action.ingest.SimulatePipelineRequest; +import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.ingest.common.IngestCommonPlugin; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.reindex.ReindexPlugin; +import org.elasticsearch.test.ESSingleNodeTestCase; +import org.elasticsearch.xcontent.XContentType; +import org.elasticsearch.xpack.core.XPackSettings; +import org.elasticsearch.xpack.core.enrich.EnrichPolicy; +import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction; +import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction; +import org.elasticsearch.xpack.core.enrich.action.PutEnrichPolicyAction; + +import java.util.Collection; +import java.util.List; +import java.util.Map; + +import static org.elasticsearch.xpack.enrich.AbstractEnrichTestCase.createSourceIndices; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.nullValue; + +public class EnrichProcessorMaxMatchesIT extends ESSingleNodeTestCase { + + @Override + protected Collection> getPlugins() { + return List.of(LocalStateEnrich.class, ReindexPlugin.class, IngestCommonPlugin.class); + } + + @Override + protected Settings nodeSettings() { + return Settings.builder() + // TODO Change this to run with security enabled + // https://github.com/elastic/elasticsearch/issues/75940 + .put(XPackSettings.SECURITY_ENABLED.getKey(), false) + .build(); + } + + public void testEnrichCacheValuesAndMaxMatches() { + // this test is meant to be much less ignorable than a mere comment in the code, since the behavior here is tricky. + + // there's an interesting edge case where two processors could be using the same policy and search, etc, + // but that they have a different number of max_matches -- if we're not careful about how we implement caching, + // then we could miss that edge case and return the wrong results from the cache. + + // Ensure enrich cache is empty + var statsRequest = new EnrichStatsAction.Request(TEST_REQUEST_TIMEOUT); + var statsResponse = client().execute(EnrichStatsAction.INSTANCE, statsRequest).actionGet(); + assertThat(statsResponse.getCacheStats().size(), equalTo(1)); + assertThat(statsResponse.getCacheStats().get(0).count(), equalTo(0L)); + assertThat(statsResponse.getCacheStats().get(0).misses(), equalTo(0L)); + assertThat(statsResponse.getCacheStats().get(0).hits(), equalTo(0L)); + + String policyName = "kv"; + String sourceIndexName = "kv"; + + var enrichPolicy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of(sourceIndexName), "key", List.of("value")); + + // Create source index and add two documents: + createSourceIndices(client(), enrichPolicy); + { + IndexRequest indexRequest = new IndexRequest(sourceIndexName); + indexRequest.create(true); + indexRequest.source(""" + { + "key": "k1", + "value": "v1" + } + """, XContentType.JSON); + indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + client().index(indexRequest).actionGet(); + } + { + IndexRequest indexRequest = new IndexRequest(sourceIndexName); + indexRequest.create(true); + indexRequest.source(""" + { + "key": "k1", + "value": "v2" + } + """, XContentType.JSON); + indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + client().index(indexRequest).actionGet(); + } + + // Store policy and execute it: + var putPolicyRequest = new PutEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, policyName, enrichPolicy); + client().execute(PutEnrichPolicyAction.INSTANCE, putPolicyRequest).actionGet(); + var executePolicyRequest = new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, policyName); + client().execute(ExecuteEnrichPolicyAction.INSTANCE, executePolicyRequest).actionGet(); + + { + // run a single enrich processor to fill the cache, note that the default max_matches is 1 (so it's not given explicitly here) + var simulatePipelineRequest = new SimulatePipelineRequest(new BytesArray(""" + { + "pipeline": { + "processors" : [ + { + "enrich": { + "policy_name": "kv", + "field": "key", + "target_field": "result" + } + } + ] + }, + "docs": [ + { + "_source": { + "key": "k1" + } + } + ] + } + """), XContentType.JSON); + var response = clusterAdmin().simulatePipeline(simulatePipelineRequest).actionGet(); + var result = (SimulateDocumentBaseResult) response.getResults().get(0); + assertThat(result.getFailure(), nullValue()); + // it's not actually important in this specific test whether the result is v1 or v2 + assertThat(result.getIngestDocument().getFieldValue("result.value", String.class), containsString("v")); + } + + { + // run two enrich processors with different max_matches, and see if we still get the right behavior + var simulatePipelineRequest = new SimulatePipelineRequest(new BytesArray(""" + { + "pipeline": { + "processors" : [ + { + "enrich": { + "policy_name": "kv", + "field": "key", + "target_field": "result" + } + }, + { + "enrich": { + "policy_name": "kv", + "field": "key", + "target_field": "results", + "max_matches": 8 + } + } + ] + }, + "docs": [ + { + "_source": { + "key": "k1" + } + } + ] + } + """), XContentType.JSON); + var response = clusterAdmin().simulatePipeline(simulatePipelineRequest).actionGet(); + var result = (SimulateDocumentBaseResult) response.getResults().get(0); + assertThat(result.getFailure(), nullValue()); + // it's not actually important in this specific test whether the result is v1 or v2 + assertThat(result.getIngestDocument().getFieldValue("result.value", String.class), containsString("v")); + + // this is the important part of the test -- did the max_matches=1 case pollute the cache for the max_matches=8 case? + @SuppressWarnings("unchecked") + List> results = (List>) result.getIngestDocument().getSource().get("results"); + List values = results.stream().map(m -> m.get("value")).toList(); + // if these assertions fail, it probably means you were fussing about with the EnrichCache.CacheKey and tried removing + // the max_matches accounting from it + assertThat(values, containsInAnyOrder("v1", "v2")); + assertThat(values, hasSize(2)); + } + + statsResponse = client().execute(EnrichStatsAction.INSTANCE, statsRequest).actionGet(); + assertThat(statsResponse.getCacheStats().size(), equalTo(1)); + // there are two items in the cache, the single result from max_matches 1 (implied), and the multi-result from max_matches 8 + assertThat(statsResponse.getCacheStats().get(0).count(), equalTo(2L)); + } + +} diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/AbstractEnrichProcessor.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/AbstractEnrichProcessor.java index ddcad949b6a79..c2bcc67184958 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/AbstractEnrichProcessor.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/AbstractEnrichProcessor.java @@ -20,22 +20,24 @@ import java.util.List; import java.util.Map; import java.util.function.BiConsumer; +import java.util.function.Supplier; public abstract class AbstractEnrichProcessor extends AbstractProcessor { private final String policyName; - private final BiConsumer>, Exception>> searchRunner; + private final EnrichProcessorFactory.SearchRunner searchRunner; private final TemplateScript.Factory field; private final TemplateScript.Factory targetField; private final boolean ignoreMissing; private final boolean overrideEnabled; protected final String matchField; protected final int maxMatches; + private final String indexAlias; protected AbstractEnrichProcessor( String tag, String description, - BiConsumer>, Exception>> searchRunner, + EnrichProcessorFactory.SearchRunner searchRunner, String policyName, TemplateScript.Factory field, TemplateScript.Factory targetField, @@ -53,6 +55,8 @@ protected AbstractEnrichProcessor( this.overrideEnabled = overrideEnabled; this.matchField = matchField; this.maxMatches = maxMatches; + // note: since the policyName determines the indexAlias, we can calculate this once + this.indexAlias = EnrichPolicy.getBaseName(policyName); } public abstract QueryBuilder getQueryBuilder(Object fieldValue); @@ -68,20 +72,23 @@ public void execute(IngestDocument ingestDocument, BiConsumer { + Supplier searchRequestSupplier = () -> { + QueryBuilder queryBuilder = getQueryBuilder(value); + ConstantScoreQueryBuilder constantScore = new ConstantScoreQueryBuilder(queryBuilder); + SearchSourceBuilder searchBuilder = new SearchSourceBuilder(); + searchBuilder.from(0); + searchBuilder.size(maxMatches); + searchBuilder.trackScores(false); + searchBuilder.fetchSource(true); + searchBuilder.query(constantScore); + SearchRequest req = new SearchRequest(); + req.indices(indexAlias); + req.preference(Preference.LOCAL.type()); + req.source(searchBuilder); + return req; + }; + + searchRunner.accept(value, maxMatches, searchRequestSupplier, (searchHits, e) -> { if (e != null) { handler.accept(null, e); return; diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichCache.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichCache.java index 400d9f0cc84b7..e248fe4dd58c5 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichCache.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichCache.java @@ -8,16 +8,12 @@ package org.elasticsearch.xpack.enrich; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.cluster.metadata.IndexAbstraction; -import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.common.cache.Cache; import org.elasticsearch.common.cache.CacheBuilder; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.Maps; import org.elasticsearch.core.TimeValue; -import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.search.SearchHit; import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction; @@ -26,9 +22,8 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.concurrent.atomic.AtomicLong; -import java.util.function.BiConsumer; +import java.util.function.Consumer; import java.util.function.LongSupplier; import java.util.function.ToLongBiFunction; @@ -36,28 +31,24 @@ * A simple cache for enrich that uses {@link Cache}. There is one instance of this cache and * multiple enrich processors with different policies will use this cache. *

- * The key of the cache is based on the search request and the enrich index that will be used. - * Search requests that enrich generates target the alias for an enrich policy, this class - * resolves the alias to the actual enrich index and uses that for the cache key. This way - * no stale entries will be returned if a policy execution happens and a new enrich index is created. - *

* There is no cleanup mechanism of stale entries in case a new enrich index is created * as part of a policy execution. This shouldn't be needed as cache entries for prior enrich * indices will be eventually evicted, because these entries will not end up being used. The * latest enrich index name will be used as cache key after an enrich policy execution. - * (Also a cleanup mechanism also wouldn't be straightforward to implement, + * (Also a cleanup mechanism wouldn't be straightforward to implement, * since there is no easy check to see that an enrich index used as cache key no longer is the - * current enrich index the enrich alias of an policy refers to. It would require checking + * current enrich index that the enrich alias of a policy refers to. It would require checking * all cached entries on each cluster state update) */ public final class EnrichCache { + private static final CacheValue EMPTY_CACHE_VALUE = new CacheValue(List.of(), CacheKey.CACHE_KEY_SIZE); + private final Cache cache; private final LongSupplier relativeNanoTimeProvider; private final AtomicLong hitsTimeInNanos = new AtomicLong(0); private final AtomicLong missesTimeInNanos = new AtomicLong(0); private final AtomicLong sizeInBytes = new AtomicLong(0); - private volatile Metadata metadata; EnrichCache(long maxSize) { this(maxSize, System::nanoTime); @@ -89,30 +80,36 @@ private Cache createCache(long maxWeight, ToLongBiFunction } /** - * This method notifies the given listener of the value in this cache for the given searchRequest. If there is no value in the cache - * for the searchRequest, then the new cache value is computed using searchResponseFetcher. - * @param searchRequest The key for the cache request + * This method notifies the given listener of the value in this cache for the given search parameters. If there is no value in the cache + * for these search parameters, then the new cache value is computed using searchResponseFetcher. + * + * @param enrichIndex The enrich index from which the results will be retrieved + * @param lookupValue The value that will be used in the search + * @param maxMatches The max number of matches that the search will return * @param searchResponseFetcher The function used to compute the value to be put in the cache, if there is no value in the cache already * @param listener A listener to be notified of the value in the cache */ public void computeIfAbsent( - SearchRequest searchRequest, - BiConsumer> searchResponseFetcher, + String enrichIndex, + Object lookupValue, + int maxMatches, + Consumer> searchResponseFetcher, ActionListener>> listener ) { // intentionally non-locking for simplicity...it's OK if we re-put the same key/value in the cache during a race condition. long cacheStart = relativeNanoTimeProvider.getAsLong(); - List> response = get(searchRequest); + var cacheKey = new CacheKey(enrichIndex, lookupValue, maxMatches); + List> response = get(cacheKey); long cacheRequestTime = relativeNanoTimeProvider.getAsLong() - cacheStart; if (response != null) { hitsTimeInNanos.addAndGet(cacheRequestTime); listener.onResponse(response); } else { final long retrieveStart = relativeNanoTimeProvider.getAsLong(); - searchResponseFetcher.accept(searchRequest, ActionListener.wrap(resp -> { - CacheValue value = toCacheValue(resp); - put(searchRequest, value); - List> copy = deepCopy(value.hits, false); + searchResponseFetcher.accept(ActionListener.wrap(resp -> { + CacheValue cacheValue = toCacheValue(resp); + put(cacheKey, cacheValue); + List> copy = deepCopy(cacheValue.hits, false); long databaseQueryAndCachePutTime = relativeNanoTimeProvider.getAsLong() - retrieveStart; missesTimeInNanos.addAndGet(cacheRequestTime + databaseQueryAndCachePutTime); listener.onResponse(copy); @@ -121,10 +118,7 @@ public void computeIfAbsent( } // non-private for unit testing only - List> get(SearchRequest searchRequest) { - String enrichIndex = getEnrichIndexKey(searchRequest); - CacheKey cacheKey = new CacheKey(enrichIndex, searchRequest); - + List> get(CacheKey cacheKey) { CacheValue response = cache.get(cacheKey); if (response != null) { return deepCopy(response.hits, false); @@ -134,18 +128,11 @@ public void computeIfAbsent( } // non-private for unit testing only - void put(SearchRequest searchRequest, CacheValue cacheValue) { - String enrichIndex = getEnrichIndexKey(searchRequest); - CacheKey cacheKey = new CacheKey(enrichIndex, searchRequest); - + void put(CacheKey cacheKey, CacheValue cacheValue) { cache.put(cacheKey, cacheValue); sizeInBytes.addAndGet(cacheValue.sizeInBytes); } - void setMetadata(Metadata metadata) { - this.metadata = metadata; - } - public EnrichStatsAction.Response.CacheStats getStats(String localNodeId) { Cache.CacheStats cacheStats = cache.stats(); return new EnrichStatsAction.Response.CacheStats( @@ -160,18 +147,13 @@ public EnrichStatsAction.Response.CacheStats getStats(String localNodeId) { ); } - private String getEnrichIndexKey(SearchRequest searchRequest) { - String alias = searchRequest.indices()[0]; - IndexAbstraction ia = metadata.getIndicesLookup().get(alias); - if (ia == null) { - throw new IndexNotFoundException("no generated enrich index [" + alias + "]"); - } - return ia.getIndices().get(0).getName(); - } - static CacheValue toCacheValue(SearchResponse response) { + if (response.getHits().getHits().length == 0) { + return EMPTY_CACHE_VALUE; + } List> result = new ArrayList<>(response.getHits().getHits().length); - long size = 0; + // Include the size of the cache key. + long size = CacheKey.CACHE_KEY_SIZE; for (SearchHit hit : response.getHits()) { result.add(deepCopy(hit.getSourceAsMap(), true)); size += hit.getSourceRef() != null ? hit.getSourceRef().ramBytesUsed() : 0; @@ -206,28 +188,26 @@ private static Object innerDeepCopy(Object value, boolean unmodifiable) { } } - private static class CacheKey { - - final String enrichIndex; - final SearchRequest searchRequest; - - private CacheKey(String enrichIndex, SearchRequest searchRequest) { - this.enrichIndex = enrichIndex; - this.searchRequest = searchRequest; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - CacheKey cacheKey = (CacheKey) o; - return enrichIndex.equals(cacheKey.enrichIndex) && searchRequest.equals(cacheKey.searchRequest); - } - - @Override - public int hashCode() { - return Objects.hash(enrichIndex, searchRequest); - } + /** + * The cache key consists of the (variable) parameters that are used to construct a search request for the enrich lookup. We define a + * custom record to group these fields to avoid constructing and storing the much larger + * {@link org.elasticsearch.action.search.SearchRequest}. + * + * @param enrichIndex The enrich index (i.e. not the alias, but the concrete index that the alias points to) + * @param lookupValue The value that is used to find matches in the enrich index + * @param maxMatches The max number of matches that the enrich lookup should return. This changes the size of the search response and + * should thus be included in the cache key + */ + // Visibility for testing + record CacheKey(String enrichIndex, Object lookupValue, int maxMatches) { + /** + * In reality, the size in bytes of the cache key is a function of the {@link CacheKey#lookupValue} field plus some constant for + * the object itself, the string reference for the enrich index (but not the string itself because it's taken from the metadata), + * and the integer for the max number of matches. However, by defining a static cache key size, we can make the + * {@link EnrichCache#EMPTY_CACHE_VALUE} static as well, which allows us to avoid having to instantiate new cache values for + * empty results and thus save some heap space. + */ + private static final long CACHE_KEY_SIZE = 256L; } // Visibility for testing diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java index 9890a96aae820..0c1ad73c96c26 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java @@ -17,6 +17,7 @@ import org.elasticsearch.common.geo.Orientation; import org.elasticsearch.common.geo.ShapeRelation; import org.elasticsearch.common.xcontent.support.XContentMapValues; +import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.ingest.ConfigurationUtils; import org.elasticsearch.ingest.Processor; import org.elasticsearch.script.ScriptService; @@ -29,6 +30,7 @@ import java.util.Objects; import java.util.function.BiConsumer; import java.util.function.Consumer; +import java.util.function.Supplier; import static org.elasticsearch.xpack.core.ClientHelper.ENRICH_ORIGIN; @@ -50,12 +52,12 @@ final class EnrichProcessorFactory implements Processor.Factory, Consumer processorFactories, String tag, String description, Map config) throws Exception { - String policyName = ConfigurationUtils.readStringProperty(TYPE, tag, config, "policy_name"); - String policyAlias = EnrichPolicy.getBaseName(policyName); + final String policyName = ConfigurationUtils.readStringProperty(TYPE, tag, config, "policy_name"); + final String indexAlias = EnrichPolicy.getBaseName(policyName); if (metadata == null) { throw new IllegalStateException("enrich processor factory has not yet been initialized with cluster state"); } - IndexAbstraction indexAbstraction = metadata.getIndicesLookup().get(policyAlias); + IndexAbstraction indexAbstraction = metadata.getIndicesLookup().get(indexAlias); if (indexAbstraction == null) { throw new IllegalArgumentException("no enrich index exists for policy with name [" + policyName + "]"); } @@ -78,7 +80,7 @@ public Processor create(Map processorFactories, Strin if (maxMatches < 1 || maxMatches > 128) { throw ConfigurationUtils.newConfigurationException(TYPE, tag, "max_matches", "should be between 1 and 128"); } - BiConsumer>, Exception>> searchRunner = createSearchRunner(client, enrichCache); + var searchRunner = createSearchRunner(indexAlias, client, enrichCache); switch (policyType) { case EnrichPolicy.MATCH_TYPE: case EnrichPolicy.RANGE_TYPE: @@ -121,25 +123,40 @@ public Processor create(Map processorFactories, Strin @Override public void accept(ClusterState state) { metadata = state.getMetadata(); - enrichCache.setMetadata(metadata); } - private static BiConsumer>, Exception>> createSearchRunner( - Client client, - EnrichCache enrichCache - ) { + private SearchRunner createSearchRunner(String indexAlias, Client client, EnrichCache enrichCache) { Client originClient = new OriginSettingClient(client, ENRICH_ORIGIN); - return (req, handler) -> { + return (value, maxMatches, reqSupplier, handler) -> { // intentionally non-locking for simplicity...it's OK if we re-put the same key/value in the cache during a race condition. enrichCache.computeIfAbsent( - req, - (searchRequest, searchResponseActionListener) -> originClient.execute( + getEnrichIndexKey(indexAlias), + value, + maxMatches, + (searchResponseActionListener) -> originClient.execute( EnrichCoordinatorProxyAction.INSTANCE, - searchRequest, + reqSupplier.get(), searchResponseActionListener ), ActionListener.wrap(resp -> handler.accept(resp, null), e -> handler.accept(null, e)) ); }; } + + private String getEnrichIndexKey(String indexAlias) { + IndexAbstraction ia = metadata.getIndicesLookup().get(indexAlias); + if (ia == null) { + throw new IndexNotFoundException("no generated enrich index [" + indexAlias + "]"); + } + return ia.getIndices().get(0).getName(); + } + + public interface SearchRunner { + void accept( + Object value, + int maxMatches, + Supplier searchRequestSupplier, + BiConsumer>, Exception> handler + ); + } } diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/GeoMatchProcessor.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/GeoMatchProcessor.java index dd164c630495c..998b06e870b7f 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/GeoMatchProcessor.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/GeoMatchProcessor.java @@ -6,7 +6,6 @@ */ package org.elasticsearch.xpack.enrich; -import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.common.geo.GeometryParser; import org.elasticsearch.common.geo.Orientation; import org.elasticsearch.common.geo.ShapeRelation; @@ -15,10 +14,6 @@ import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.script.TemplateScript; -import java.util.List; -import java.util.Map; -import java.util.function.BiConsumer; - public final class GeoMatchProcessor extends AbstractEnrichProcessor { private final ShapeRelation shapeRelation; @@ -27,7 +22,7 @@ public final class GeoMatchProcessor extends AbstractEnrichProcessor { GeoMatchProcessor( String tag, String description, - BiConsumer>, Exception>> searchRunner, + EnrichProcessorFactory.SearchRunner searchRunner, String policyName, TemplateScript.Factory field, TemplateScript.Factory targetField, diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/MatchProcessor.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/MatchProcessor.java index 76156c84c22b2..b8b2f1b17fa85 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/MatchProcessor.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/MatchProcessor.java @@ -6,22 +6,19 @@ */ package org.elasticsearch.xpack.enrich; -import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.TermQueryBuilder; import org.elasticsearch.index.query.TermsQueryBuilder; import org.elasticsearch.script.TemplateScript; import java.util.List; -import java.util.Map; -import java.util.function.BiConsumer; public final class MatchProcessor extends AbstractEnrichProcessor { MatchProcessor( String tag, String description, - BiConsumer>, Exception>> searchRunner, + EnrichProcessorFactory.SearchRunner searchRunner, String policyName, TemplateScript.Factory field, TemplateScript.Factory targetField, diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichCacheTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichCacheTests.java index 19af929017a3b..7125dfd45eaff 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichCacheTests.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichCacheTests.java @@ -7,23 +7,14 @@ package org.elasticsearch.xpack.enrich; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.cluster.metadata.AliasMetadata; -import org.elasticsearch.cluster.metadata.IndexMetadata; -import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.core.TimeValue; -import org.elasticsearch.index.IndexNotFoundException; -import org.elasticsearch.index.IndexVersion; -import org.elasticsearch.index.query.MatchQueryBuilder; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; -import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.json.JsonXContent; -import org.elasticsearch.xpack.core.enrich.EnrichPolicy; import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction; import java.io.IOException; @@ -35,7 +26,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; -import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; @@ -45,47 +35,19 @@ public class EnrichCacheTests extends ESTestCase { public void testCaching() { - // Emulate cluster metadata: - // (two enrich indices with corresponding alias entries) - var metadata = Metadata.builder() - .put( - IndexMetadata.builder(EnrichPolicy.getBaseName("policy1") + "-1") - .settings(settings(IndexVersion.current())) - .numberOfShards(1) - .numberOfReplicas(0) - .putAlias(AliasMetadata.builder(EnrichPolicy.getBaseName("policy1")).build()) - ) - .put( - IndexMetadata.builder(EnrichPolicy.getBaseName("policy2") + "-1") - .settings(settings(IndexVersion.current())) - .numberOfShards(1) - .numberOfReplicas(0) - .putAlias(AliasMetadata.builder(EnrichPolicy.getBaseName("policy2")).build()) - ) - .build(); - // Emulated search requests that an enrich processor could generate: // (two unique searches for two enrich policies) - var searchRequest1 = new SearchRequest(EnrichPolicy.getBaseName("policy1")).source( - new SearchSourceBuilder().query(new MatchQueryBuilder("match_field", "1")) - ); - var searchRequest2 = new SearchRequest(EnrichPolicy.getBaseName("policy1")).source( - new SearchSourceBuilder().query(new MatchQueryBuilder("match_field", "2")) - ); - var searchRequest3 = new SearchRequest(EnrichPolicy.getBaseName("policy2")).source( - new SearchSourceBuilder().query(new MatchQueryBuilder("match_field", "1")) - ); - var searchRequest4 = new SearchRequest(EnrichPolicy.getBaseName("policy2")).source( - new SearchSourceBuilder().query(new MatchQueryBuilder("match_field", "2")) - ); + var cacheKey1 = new EnrichCache.CacheKey("policy1-1", "1", 1); + var cacheKey2 = new EnrichCache.CacheKey("policy1-1", "2", 1); + var cacheKey3 = new EnrichCache.CacheKey("policy2-1", "1", 1); + var cacheKey4 = new EnrichCache.CacheKey("policy2-1", "2", 1); // Emulated search response (content doesn't matter, since it isn't used, it just a cache entry) EnrichCache.CacheValue searchResponse = new EnrichCache.CacheValue(List.of(Map.of("test", "entry")), 1L); EnrichCache enrichCache = new EnrichCache(3); - enrichCache.setMetadata(metadata); - enrichCache.put(searchRequest1, searchResponse); - enrichCache.put(searchRequest2, searchResponse); - enrichCache.put(searchRequest3, searchResponse); + enrichCache.put(cacheKey1, searchResponse); + enrichCache.put(cacheKey2, searchResponse); + enrichCache.put(cacheKey3, searchResponse); var cacheStats = enrichCache.getStats("_id"); assertThat(cacheStats.count(), equalTo(3L)); assertThat(cacheStats.hits(), equalTo(0L)); @@ -93,10 +55,10 @@ public void testCaching() { assertThat(cacheStats.evictions(), equalTo(0L)); assertThat(cacheStats.cacheSizeInBytes(), equalTo(3L)); - assertThat(enrichCache.get(searchRequest1), notNullValue()); - assertThat(enrichCache.get(searchRequest2), notNullValue()); - assertThat(enrichCache.get(searchRequest3), notNullValue()); - assertThat(enrichCache.get(searchRequest4), nullValue()); + assertThat(enrichCache.get(cacheKey1), notNullValue()); + assertThat(enrichCache.get(cacheKey2), notNullValue()); + assertThat(enrichCache.get(cacheKey3), notNullValue()); + assertThat(enrichCache.get(cacheKey4), nullValue()); cacheStats = enrichCache.getStats("_id"); assertThat(cacheStats.count(), equalTo(3L)); assertThat(cacheStats.hits(), equalTo(3L)); @@ -104,7 +66,7 @@ public void testCaching() { assertThat(cacheStats.evictions(), equalTo(0L)); assertThat(cacheStats.cacheSizeInBytes(), equalTo(3L)); - enrichCache.put(searchRequest4, searchResponse); + enrichCache.put(cacheKey4, searchResponse); cacheStats = enrichCache.getStats("_id"); assertThat(cacheStats.count(), equalTo(3L)); assertThat(cacheStats.hits(), equalTo(3L)); @@ -112,41 +74,27 @@ public void testCaching() { assertThat(cacheStats.evictions(), equalTo(1L)); assertThat(cacheStats.cacheSizeInBytes(), equalTo(3L)); - // Simulate enrich policy execution, which should make current cache entries unused. - metadata = Metadata.builder() - .put( - IndexMetadata.builder(EnrichPolicy.getBaseName("policy1") + "-2") - .settings(settings(IndexVersion.current())) - .numberOfShards(1) - .numberOfReplicas(0) - .putAlias(AliasMetadata.builder(EnrichPolicy.getBaseName("policy1")).build()) - ) - .put( - IndexMetadata.builder(EnrichPolicy.getBaseName("policy2") + "-2") - .settings(settings(IndexVersion.current())) - .numberOfShards(1) - .numberOfReplicas(0) - .putAlias(AliasMetadata.builder(EnrichPolicy.getBaseName("policy2")).build()) - ) - .build(); - enrichCache.setMetadata(metadata); + cacheKey1 = new EnrichCache.CacheKey("policy1-2", "1", 1); + cacheKey2 = new EnrichCache.CacheKey("policy1-2", "2", 1); + cacheKey3 = new EnrichCache.CacheKey("policy2-2", "1", 1); + cacheKey4 = new EnrichCache.CacheKey("policy2-2", "2", 1); // Because enrich index has changed, cache can't serve cached entries - assertThat(enrichCache.get(searchRequest1), nullValue()); - assertThat(enrichCache.get(searchRequest2), nullValue()); - assertThat(enrichCache.get(searchRequest3), nullValue()); - assertThat(enrichCache.get(searchRequest4), nullValue()); + assertThat(enrichCache.get(cacheKey1), nullValue()); + assertThat(enrichCache.get(cacheKey2), nullValue()); + assertThat(enrichCache.get(cacheKey3), nullValue()); + assertThat(enrichCache.get(cacheKey4), nullValue()); // Add new entries using new enrich index name as key - enrichCache.put(searchRequest1, searchResponse); - enrichCache.put(searchRequest2, searchResponse); - enrichCache.put(searchRequest3, searchResponse); + enrichCache.put(cacheKey1, searchResponse); + enrichCache.put(cacheKey2, searchResponse); + enrichCache.put(cacheKey3, searchResponse); // Entries can now be served: - assertThat(enrichCache.get(searchRequest1), notNullValue()); - assertThat(enrichCache.get(searchRequest2), notNullValue()); - assertThat(enrichCache.get(searchRequest3), notNullValue()); - assertThat(enrichCache.get(searchRequest4), nullValue()); + assertThat(enrichCache.get(cacheKey1), notNullValue()); + assertThat(enrichCache.get(cacheKey2), notNullValue()); + assertThat(enrichCache.get(cacheKey3), notNullValue()); + assertThat(enrichCache.get(cacheKey4), nullValue()); cacheStats = enrichCache.getStats("_id"); assertThat(cacheStats.count(), equalTo(3L)); assertThat(cacheStats.hits(), equalTo(6L)); @@ -156,30 +104,8 @@ public void testCaching() { } public void testComputeIfAbsent() throws InterruptedException { - // Emulate cluster metadata: - // (two enrich indices with corresponding alias entries) - var metadata = Metadata.builder() - .put( - IndexMetadata.builder(EnrichPolicy.getBaseName("policy1") + "-1") - .settings(settings(IndexVersion.current())) - .numberOfShards(1) - .numberOfReplicas(0) - .putAlias(AliasMetadata.builder(EnrichPolicy.getBaseName("policy1")).build()) - ) - .put( - IndexMetadata.builder(EnrichPolicy.getBaseName("policy2") + "-1") - .settings(settings(IndexVersion.current())) - .numberOfShards(1) - .numberOfReplicas(0) - .putAlias(AliasMetadata.builder(EnrichPolicy.getBaseName("policy2")).build()) - ) - .build(); - // Emulated search requests that an enrich processor could generate: // (two unique searches for two enrich policies) - var searchRequest1 = new SearchRequest(EnrichPolicy.getBaseName("policy1")).source( - new SearchSourceBuilder().query(new MatchQueryBuilder("match_field", "1")) - ); final List> searchResponseMap = List.of( Map.of("key1", "value1", "key2", "value2"), Map.of("key3", "value3", "key4", "value4") @@ -187,12 +113,11 @@ public void testComputeIfAbsent() throws InterruptedException { final AtomicLong testNanoTime = new AtomicLong(0); // We use a relative time provider that increments 1ms every time it is called. So each operation appears to take 1ms EnrichCache enrichCache = new EnrichCache(3, () -> testNanoTime.addAndGet(TimeValue.timeValueMillis(1).getNanos())); - enrichCache.setMetadata(metadata); { CountDownLatch queriedDatabaseLatch = new CountDownLatch(1); CountDownLatch notifiedOfResultLatch = new CountDownLatch(1); - enrichCache.computeIfAbsent(searchRequest1, (searchRequest, searchResponseActionListener) -> { + enrichCache.computeIfAbsent("policy1-1", "1", 1, (searchResponseActionListener) -> { SearchResponse searchResponse = convertToSearchResponse(searchResponseMap); searchResponseActionListener.onResponse(searchResponse); searchResponse.decRef(); @@ -222,7 +147,7 @@ public void onFailure(Exception e) { { CountDownLatch notifiedOfResultLatch = new CountDownLatch(1); - enrichCache.computeIfAbsent(searchRequest1, (searchRequest, searchResponseActionListener) -> { + enrichCache.computeIfAbsent("policy1-1", "1", 1, (searchResponseActionListener) -> { fail("Expected no call to the database because item should have been in the cache"); }, new ActionListener<>() { @Override @@ -326,22 +251,4 @@ public void testDeepCopy() { assertArrayEquals(new byte[] { 1, 2, 3 }, (byte[]) result.get("embedded_object")); } - public void testEnrichIndexNotExist() { - // Emulate cluster metadata: - var metadata = Metadata.builder().build(); - - // Emulated search request on a non-exist enrich index that an enrich processor could generate - var searchRequest = new SearchRequest(EnrichPolicy.getBaseName("policy-enrich-index-not-generated")).source( - new SearchSourceBuilder().query(new MatchQueryBuilder("test", "query")) - ); - // Emulated search response (content doesn't matter, since it isn't used, it just a cache entry) - EnrichCache.CacheValue searchResponse = new EnrichCache.CacheValue(List.of(Map.of("test", "entry")), 1L); - - EnrichCache enrichCache = new EnrichCache(1); - enrichCache.setMetadata(metadata); - - IndexNotFoundException e = expectThrows(IndexNotFoundException.class, () -> enrichCache.put(searchRequest, searchResponse)); - assertThat(e.getMessage(), containsString("no generated enrich index [.enrich-policy-enrich-index-not-generated]")); - } - } diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/GeoMatchProcessorTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/GeoMatchProcessorTests.java index 5642e685a592d..fcf2bc3c14292 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/GeoMatchProcessorTests.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/GeoMatchProcessorTests.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Map; import java.util.function.BiConsumer; +import java.util.function.Supplier; import static org.elasticsearch.xpack.enrich.MatchProcessorTests.str; import static org.hamcrest.Matchers.emptyArray; @@ -139,7 +140,7 @@ private void testBasicsForFieldValue(Object fieldValue, Geometry expectedGeometr } - private static final class MockSearchFunction implements BiConsumer>, Exception>> { + private static final class MockSearchFunction implements EnrichProcessorFactory.SearchRunner { private final List> mockResponse; private final SetOnce capturedRequest; private final Exception exception; @@ -157,8 +158,13 @@ private static final class MockSearchFunction implements BiConsumer>, Exception> handler) { - capturedRequest.set(request); + public void accept( + Object value, + int maxMatches, + Supplier searchRequestSupplier, + BiConsumer>, Exception> handler + ) { + capturedRequest.set(searchRequestSupplier.get()); if (exception != null) { handler.accept(null, exception); } else { diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/MatchProcessorTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/MatchProcessorTests.java index 0d7f900188ba1..b4d3ec15d31d3 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/MatchProcessorTests.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/MatchProcessorTests.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Map; import java.util.function.BiConsumer; +import java.util.function.Supplier; import static org.hamcrest.Matchers.emptyArray; import static org.hamcrest.Matchers.equalTo; @@ -376,7 +377,7 @@ public void testArray() { assertThat(entry.get("tld"), equalTo("co")); } - private static final class MockSearchFunction implements BiConsumer>, Exception>> { + private static final class MockSearchFunction implements EnrichProcessorFactory.SearchRunner { private final List> mockResponse; private final SetOnce capturedRequest; private final Exception exception; @@ -394,8 +395,13 @@ private static final class MockSearchFunction implements BiConsumer>, Exception> handler) { - capturedRequest.set(request); + public void accept( + Object value, + int maxMatches, + Supplier searchRequestSupplier, + BiConsumer>, Exception> handler + ) { + capturedRequest.set(searchRequestSupplier.get()); if (exception != null) { handler.accept(null, exception); } else {