Remove terms filter lookup cache. #9056

Closed
7 changes: 7 additions & 0 deletions docs/reference/migration/migrate_2_0.asciidoc
@@ -102,4 +102,11 @@ Some query builders have been removed or renamed:
* `filtered(...)` removed. Use `filteredQuery(...)` instead.
* `inQuery(...)` removed.

=== Terms filter lookup caching

The terms filter lookup mechanism no longer supports the `cache` option and
relies on the filesystem cache instead. If the lookup index is not too large,
it is recommended to replicate it to all nodes by setting
`index.auto_expand_replicas: 0-all` in order to remove the network overhead as
well.
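As a sketch of the replication setting mentioned above (the index name
`lookup` is a hypothetical placeholder for your lookup index), the option can
be applied to an existing index through the update-settings API:

[source,js]
--------------------------------------------------
curl -XPUT 'localhost:9200/lookup/_settings' -d '{
  "index" : {
    "auto_expand_replicas" : "0-all"
  }
}'
--------------------------------------------------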

43 changes: 3 additions & 40 deletions docs/reference/query-dsl/filters/terms-filter.asciidoc
@@ -121,11 +121,6 @@ The terms lookup mechanism supports the following options:
A custom routing value to be used when retrieving the
external terms doc.

`cache`::
Whether to cache the filter built from the retrieved document
(`true` - default) or whether to fetch and rebuild the filter on every
request (`false`). See "<<query-dsl-terms-filter-lookup-caching,Terms lookup caching>>" below

The values for the `terms` filter will be fetched from a field in a
document with the specified id in the specified type and index.
Internally a get request is executed to fetch the values from the
@@ -137,28 +132,6 @@ across all nodes if the "reference" terms data is not large. The lookup
terms filter will prefer to execute the get request on a local node if
possible, reducing the need for networking.

["float",id="query-dsl-terms-filter-lookup-caching"]
==== Terms lookup caching

There is an additional cache involved, which caches the lookup of the
lookup document to the actual terms. This lookup cache is a LRU cache.
This cache has the following options:

`indices.cache.filter.terms.size`::
The size of the lookup cache. The default is `10mb`.

`indices.cache.filter.terms.expire_after_access`::
The time after the last read an entry should expire. Disabled by default.

`indices.cache.filter.terms.expire_after_write`::
The time after the last write an entry should expire. Disabled by default.

All options for the lookup of the documents cache can only be configured
via the `elasticsearch.yml` file.

When using the terms lookup the `execution` option isn't taken into
account and behaves as if the execution mode was set to `plain`.

[float]
==== Terms lookup twitter example

@@ -194,19 +167,9 @@ curl -XGET localhost:9200/tweets/_search -d '{
}'
--------------------------------------------------

The above is highly optimized, both in a sense that the list of
followers will not be fetched if the filter is already cached in the
filter cache, and with internal LRU cache for fetching external values
for the terms filter. Also, the entry in the filter cache will not hold
`all` the terms reducing the memory required for it.

`_cache_key` is recommended to be set, so its simple to clear the cache
associated with it using the clear cache API. For example:

[source,js]
--------------------------------------------------
curl -XPOST 'localhost:9200/tweets/_cache/clear?filter_keys=user_2_friends'
--------------------------------------------------
If there are many matching values, setting `_cache_key` is recommended so
that the filter cache does not hold a reference to the potentially heavy
terms filter.

The structure of the external terms document can also include array of
inner objects, for example:
@@ -33,11 +33,10 @@
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.mapper.internal.ParentFieldMapper;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.mapper.internal.ParentFieldMapper;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.cache.filter.terms.IndicesTermsFilterCache;
import org.elasticsearch.indices.cache.query.IndicesQueryCache;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@@ -53,16 +52,14 @@
public class TransportClearIndicesCacheAction extends TransportBroadcastOperationAction<ClearIndicesCacheRequest, ClearIndicesCacheResponse, ShardClearIndicesCacheRequest, ShardClearIndicesCacheResponse> {

private final IndicesService indicesService;
private final IndicesTermsFilterCache termsFilterCache;
private final IndicesQueryCache indicesQueryCache;

@Inject
public TransportClearIndicesCacheAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
TransportService transportService, IndicesService indicesService, IndicesTermsFilterCache termsFilterCache,
TransportService transportService, IndicesService indicesService,
IndicesQueryCache indicesQueryCache, ActionFilters actionFilters) {
super(settings, ClearIndicesCacheAction.NAME, threadPool, clusterService, transportService, actionFilters);
this.indicesService = indicesService;
this.termsFilterCache = termsFilterCache;
this.indicesQueryCache = indicesQueryCache;
}

@@ -124,12 +121,10 @@ protected ShardClearIndicesCacheResponse shardOperation(ShardClearIndicesCacheRe
if (request.filterCache()) {
clearedAtLeastOne = true;
service.cache().filter().clear("api");
termsFilterCache.clear("api");
}
if (request.filterKeys() != null && request.filterKeys().length > 0) {
clearedAtLeastOne = true;
service.cache().filter().clear("api", request.filterKeys());
termsFilterCache.clear("api", request.filterKeys());
}
if (request.fieldDataCache()) {
clearedAtLeastOne = true;
@@ -163,7 +158,6 @@ protected ShardClearIndicesCacheResponse shardOperation(ShardClearIndicesCacheRe
} else {
service.cache().clear("api");
service.fieldData().clear();
termsFilterCache.clear("api");
indicesQueryCache.clear(shard);
}
}
29 changes: 13 additions & 16 deletions src/main/java/org/elasticsearch/index/query/TermsFilterParser.java
@@ -28,6 +28,9 @@
import org.apache.lucene.search.Filter;
import org.apache.lucene.search.FilterCachingPolicy;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lucene.BytesRefs;
import org.elasticsearch.common.lucene.HashedBytesRef;
@@ -36,9 +39,9 @@
import org.elasticsearch.common.lucene.search.Queries;
import org.elasticsearch.common.lucene.search.XBooleanFilter;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.indices.cache.filter.terms.IndicesTermsFilterCache;
import org.elasticsearch.indices.cache.filter.terms.TermsLookup;

import java.io.IOException;
@@ -52,7 +55,7 @@
public class TermsFilterParser implements FilterParser {

public static final String NAME = "terms";
private IndicesTermsFilterCache termsFilterCache;
private Client client;

public static final String EXECUTION_KEY = "execution";
public static final String EXECUTION_VALUE_PLAIN = "plain";
@@ -74,8 +77,8 @@ public String[] names() {
}

@Inject(optional = true)
public void setIndicesTermsFilterCache(IndicesTermsFilterCache termsFilterCache) {
this.termsFilterCache = termsFilterCache;
public void setClient(Client client) {
this.client = client;
}

@Override
@@ -92,7 +95,6 @@ public Filter parse(QueryParseContext parseContext) throws IOException, QueryPar
String lookupId = null;
String lookupPath = null;
String lookupRouting = null;
boolean lookupCache = true;

HashedBytesRef cacheKey = null;
XContentParser.Token token;
@@ -131,8 +133,6 @@ public Filter parse(QueryParseContext parseContext) throws IOException, QueryPar
lookupPath = parser.text();
} else if ("routing".equals(currentFieldName)) {
lookupRouting = parser.textOrNull();
} else if ("cache".equals(currentFieldName)) {
lookupCache = parser.booleanValue();
} else {
throw new QueryParsingException(parseContext.index(), "[terms] filter does not support [" + currentFieldName + "] within lookup element");
}
@@ -166,7 +166,7 @@ public Filter parse(QueryParseContext parseContext) throws IOException, QueryPar
throw new QueryParsingException(parseContext.index(), "terms filter requires a field name, followed by array of terms");
}

FieldMapper fieldMapper = null;
FieldMapper<?> fieldMapper = null;
smartNameFieldMappers = parseContext.smartFieldMappers(fieldName);
String[] previousTypes = null;
if (smartNameFieldMappers != null) {
@@ -181,15 +181,12 @@ public Filter parse(QueryParseContext parseContext) throws IOException, QueryPar
}

if (lookupId != null) {
// if there are no mappings, then nothing has been indexing yet against this shard, so we can return
// no match (but not cached!), since the Terms Lookup relies on the fact that there are mappings...
if (fieldMapper == null) {
return Queries.MATCH_NO_FILTER;
final TermsLookup lookup = new TermsLookup(lookupIndex, lookupType, lookupId, lookupRouting, lookupPath, parseContext);
final GetResponse getResponse = client.get(new GetRequest(lookup.getIndex(), lookup.getType(), lookup.getId()).preference("_local").routing(lookup.getRouting())).actionGet();
if (getResponse.isExists()) {
List<Object> values = XContentMapValues.extractRawValues(lookup.getPath(), getResponse.getSourceAsMap());
terms.addAll(values);
}
Contributor:

hmm what do we do if isExists() returns false?

Contributor:

I know it might be a preexisting issue but if we execute this on a network thread we might deadlock?

Contributor (author):

> what do we do if isExists() returns false?

If there is no such entry in the index, then we will call the terms filter parser on an empty list, and this generates a MatchNoDocsFilter (see AbstractFieldMapper.termsFilter).

> I know it might be a preexisting issue but if we execute this on a network thread we might deadlock?

I did not change the way that the request is performed, so if there is an issue, it is pre-existing indeed. Any suggestion on how to fix the issue (I don't know much about how we deal with thread pools and network threads)? If it's not easy to fix, I'd lean towards opening another issue for it if you don't mind?
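The lookup flow described here can be sketched roughly in Python (the function names and the response shape are simplified stand-ins for the Java `XContentMapValues.extractRawValues` call and get-response handling in the diff, not the actual implementation):

```python
def extract_raw_values(path, source):
    """Walk a dotted path through a nested source map, collecting every
    matching value; lists encountered along the way are flattened."""
    values = [source]
    for key in path.split("."):
        next_values = []
        for v in values:
            if isinstance(v, dict) and key in v:
                item = v[key]
                next_values.extend(item if isinstance(item, list) else [item])
        values = next_values
    return values

def lookup_terms(get_response, path):
    # A missing lookup document yields an empty terms list; downstream,
    # an empty terms filter becomes a match-no-docs filter.
    if not get_response.get("found"):
        return []
    return extract_raw_values(path, get_response["_source"])
```

With a non-existent document this returns an empty list, which matches the match-no-docs behaviour described above.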

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah it might be ok to do this since we are only parsing it on a search thread. I wonder if we can add an assertion for the threadpool that it is never a network thread? but opening a different issue is good


// external lookup, use it
TermsLookup termsLookup = new TermsLookup(lookupIndex, lookupType, lookupId, lookupRouting, lookupPath, parseContext);
terms.addAll(termsFilterCache.terms(termsLookup, lookupCache, cacheKey));
}

if (terms.isEmpty()) {
3 changes: 1 addition & 2 deletions src/main/java/org/elasticsearch/indices/IndicesModule.java
@@ -20,14 +20,14 @@
package org.elasticsearch.indices;

import com.google.common.collect.ImmutableList;

import org.elasticsearch.action.update.UpdateHelper;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.inject.SpawnModules;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.indices.analysis.IndicesAnalysisModule;
import org.elasticsearch.indices.cache.filter.IndicesFilterCache;
import org.elasticsearch.indices.cache.filter.terms.IndicesTermsFilterCache;
import org.elasticsearch.indices.cache.query.IndicesQueryCache;
import org.elasticsearch.indices.cluster.IndicesClusterStateService;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
@@ -75,7 +75,6 @@ protected void configure() {
bind(IndicesFilterCache.class).asEagerSingleton();
bind(IndicesQueryCache.class).asEagerSingleton();
bind(IndicesFieldDataCache.class).asEagerSingleton();
bind(IndicesTermsFilterCache.class).asEagerSingleton();
bind(TransportNodesListShardStoreMetaData.class).asEagerSingleton();
bind(IndicesTTLService.class).asEagerSingleton();
bind(IndicesWarmer.class).to(InternalIndicesWarmer.class).asEagerSingleton();