Skip to content

Commit

Permalink
[percolator] Fix memory leak when percolator uses bitset or field dat…
Browse files Browse the repository at this point in the history
…a cache.

The percolator doesn't close the IndexReader of the memory index any more.
Prior to 2.x the percolator had its own SearchContext (PercolatorContext) that did this,
but that was removed when the percolator was refactored as part of the 5.0 release.

I think an alternative way to fix this is to let the percolator not use the bitset and fielddata caches at all;
that way we prevent the memory leak.

Closes #24108
  • Loading branch information
martijnvg committed Apr 26, 2017
1 parent 99e5fa0 commit d3e754b
Show file tree
Hide file tree
Showing 5 changed files with 255 additions and 8 deletions.
Expand Up @@ -117,8 +117,7 @@ public void clear(String reason) {
private BitSet getAndLoadIfNotPresent(final Query query, final LeafReaderContext context) throws IOException, ExecutionException {
final Object coreCacheReader = context.reader().getCoreCacheKey();
final ShardId shardId = ShardUtils.extractShardId(context.reader());
if (shardId != null // can't require it because of the percolator
&& indexSettings.getIndex().equals(shardId.getIndex()) == false) {
if (indexSettings.getIndex().equals(shardId.getIndex()) == false) {
// insanity
throw new IllegalStateException("Trying to load bit set for index " + shardId.getIndex()
+ " with cache of index " + indexSettings.getIndex());
Expand Down
Expand Up @@ -19,6 +19,7 @@
package org.elasticsearch.search.aggregations;

import org.apache.lucene.index.CompositeReaderContext;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReaderContext;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.Collector;
Expand All @@ -31,8 +32,10 @@
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.MockBigArrays;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.cache.bitset.BitsetFilterCache;
import org.elasticsearch.index.cache.bitset.BitsetFilterCache.Listener;
Expand All @@ -48,6 +51,7 @@
import org.elasticsearch.index.mapper.ObjectMapper.Nested;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.index.query.support.NestedScope;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
Expand Down Expand Up @@ -280,4 +284,8 @@ public String toString() {
return "ShardSearcher(" + ctx.get(0) + ")";
}
}

/**
 * Wraps the given reader in an {@link ElasticsearchDirectoryReader} that carries a fixed placeholder
 * {@link ShardId}, built from {@code new Index("_index", "_na_")} and shard number {@code 0}.
 *
 * @param directoryReader the reader to wrap
 * @return the wrapped reader
 * @throws IOException if wrapping the reader fails
 */
protected static DirectoryReader wrap(DirectoryReader directoryReader) throws IOException {
    final Index placeholderIndex = new Index("_index", "_na_");
    final ShardId placeholderShardId = new ShardId(placeholderIndex, 0);
    return ElasticsearchDirectoryReader.wrap(directoryReader, placeholderShardId);
}
}
Expand Up @@ -54,7 +54,7 @@ public void testNoDocs() throws IOException {
try (RandomIndexWriter iw = new RandomIndexWriter(random(), directory)) {
// intentionally not writing any docs
}
try (IndexReader indexReader = DirectoryReader.open(directory)) {
try (IndexReader indexReader = wrap(DirectoryReader.open(directory))) {
NestedAggregationBuilder nestedBuilder = new NestedAggregationBuilder(NESTED_AGG,
NESTED_OBJECT);
ReverseNestedAggregationBuilder reverseNestedBuilder
Expand Down Expand Up @@ -117,7 +117,7 @@ public void testMaxFromParentDocs() throws IOException {
}
iw.commit();
}
try (IndexReader indexReader = DirectoryReader.open(directory)) {
try (IndexReader indexReader = wrap(DirectoryReader.open(directory))) {
NestedAggregationBuilder nestedBuilder = new NestedAggregationBuilder(NESTED_AGG,
NESTED_OBJECT);
ReverseNestedAggregationBuilder reverseNestedBuilder
Expand Down
Expand Up @@ -24,20 +24,26 @@
import org.apache.lucene.index.BinaryDocValues;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.IndexReaderContext;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.StoredFieldVisitor;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.ReaderUtil;
import org.apache.lucene.index.memory.MemoryIndex;
import org.apache.lucene.search.BooleanClause;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.Weight;
import org.apache.lucene.search.join.BitSetProducer;
import org.apache.lucene.store.RAMDirectory;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BitDocIdSet;
import org.apache.lucene.util.BitSet;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ResourceNotFoundException;
Expand All @@ -59,6 +65,8 @@
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.analysis.FieldNameAnalyzer;
import org.elasticsearch.index.fielddata.IndexFieldData;
import org.elasticsearch.index.fielddata.IndexFieldDataCache;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.DocumentMapperForType;
import org.elasticsearch.index.mapper.MappedFieldType;
Expand All @@ -72,6 +80,8 @@
import org.elasticsearch.index.query.QueryRewriteContext;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.index.query.QueryShardException;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;

import java.io.IOException;
import java.util.Objects;
Expand Down Expand Up @@ -423,12 +433,9 @@ protected Analyzer getWrappedAnalyzer(String fieldName) {
docSearcher.setQueryCache(null);
}

Version indexVersionCreated = context.getIndexSettings().getIndexVersionCreated();
boolean mapUnmappedFieldsAsString = context.getIndexSettings()
.getValue(PercolatorFieldMapper.INDEX_MAP_UNMAPPED_FIELDS_AS_STRING_SETTING);
// We have to make a copy of the QueryShardContext here so we can have a unfrozen version for parsing the legacy
// percolator queries
QueryShardContext percolateShardContext = new QueryShardContext(context);
QueryShardContext percolateShardContext = wrap(context);
if (indexVersionCreated.onOrAfter(Version.V_5_0_0_alpha1)) {
MappedFieldType fieldType = context.fieldMapper(field);
if (fieldType == null) {
Expand Down Expand Up @@ -581,4 +588,36 @@ public Status needsField(FieldInfo fieldInfo) throws IOException {

}

/**
 * Creates a copy of the given context in which {@code bitsetFilter(...)} and {@code getForField(...)} are
 * overridden: bitsets are computed directly from the query by a searcher whose query cache is disabled, and
 * field data is built with {@link IndexFieldDataCache.None} and a {@link NoneCircuitBreakerService}.
 * <p>
 * NOTE(review): presumably this keeps data computed against the percolator's in-memory index out of the
 * index-level bitset and fielddata caches - confirm against the caller.
 *
 * @param shardContext the context to copy; its index settings and mapper service are used to build field data
 * @return the copy with the two overrides described above
 */
static QueryShardContext wrap(QueryShardContext shardContext) {
    return new QueryShardContext(shardContext) {

        @Override
        public BitSetProducer bitsetFilter(Query query) {
            return leafContext -> {
                final IndexReaderContext rootContext = ReaderUtil.getTopLevelContext(leafContext);
                final IndexSearcher uncachedSearcher = new IndexSearcher(rootContext);
                uncachedSearcher.setQueryCache(null);
                final Weight queryWeight = uncachedSearcher.createNormalizedWeight(query, false);
                final Scorer matches = queryWeight.scorer(leafContext);

                if (matches == null) {
                    // the weight produced no scorer for this leaf
                    return null;
                }
                return new BitDocIdSet(BitSet.of(matches.iterator(), leafContext.reader().maxDoc())).bits();
            };
        }

        @Override
        @SuppressWarnings("unchecked")
        public <IFD extends IndexFieldData<?>> IFD getForField(MappedFieldType fieldType) {
            final IndexFieldData.Builder fieldDataBuilder = fieldType.fielddataBuilder();
            // a fresh no-op cache and circuit breaker for every call
            final IndexFieldDataCache noCache = new IndexFieldDataCache.None();
            final CircuitBreakerService noBreaker = new NoneCircuitBreakerService();
            return (IFD) fieldDataBuilder.build(shardContext.getIndexSettings(), fieldType, noCache, noBreaker,
                shardContext.getMapperService());
        }
    };
}

}
Expand Up @@ -24,9 +24,12 @@
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.cache.bitset.BitsetFilterCache;
import org.elasticsearch.index.fielddata.ScriptDocValues;
import org.elasticsearch.index.mapper.MapperParsingException;
import org.elasticsearch.index.query.MatchPhraseQueryBuilder;
import org.elasticsearch.index.query.MultiMatchQueryBuilder;
Expand All @@ -37,6 +40,7 @@
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
import org.elasticsearch.search.lookup.LeafDocLookup;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.test.ESSingleNodeTestCase;

Expand Down Expand Up @@ -82,6 +86,11 @@ public static class CustomScriptPlugin extends MockScriptPlugin {
protected Map<String, Function<Map<String, Object>, Object>> pluginScripts() {
    // Mock scripts, keyed by script source.
    final Map<String, Function<Map<String, Object>, Object>> registry = new HashMap<>();

    // Script that is always true.
    registry.put("1==1", params -> Boolean.TRUE);

    // Script that reads the "employees.name" values through the "_doc" lookup and checks whether the
    // first value equals "virginia_potts".
    registry.put("use_fielddata_please", params -> {
        final LeafDocLookup docLookup = (LeafDocLookup) params.get("_doc");
        final ScriptDocValues names = docLookup.get("employees.name");
        final Object firstName = names.get(0);
        return "virginia_potts".equals(firstName);
    });

    return registry;
}
}
Expand Down Expand Up @@ -606,4 +615,196 @@ public void testPercolateQueryWithNestedDocuments() throws Exception {
.get();
assertHitCount(response, 0);
}

/**
 * Percolates a document containing nested objects against a stored nested query 32 times, then asserts that
 * the bitset memory reported by the cluster stats is zero.
 */
public void testPercolateQueryWithNestedDocuments_doNotLeakBitsetCacheEntries() throws Exception {
    XContentBuilder mapping = XContentFactory.jsonBuilder();
    mapping.startObject();
    {
        mapping.startObject("properties");
        {
            mapping.startObject("companyname");
            mapping.field("type", "text");
            mapping.endObject();
        }
        {
            mapping.startObject("employee");
            mapping.field("type", "nested");
            {
                mapping.startObject("properties");
                {
                    mapping.startObject("name");
                    mapping.field("type", "text");
                    mapping.endObject();
                }
                mapping.endObject();
            }
            mapping.endObject();
        }
        mapping.endObject();
    }
    mapping.endObject();

    // eager loading is switched off to avoid normal documents being cached by BitsetFilterCache
    Settings.Builder createSettings = Settings.builder()
        .put(BitsetFilterCache.INDEX_LOAD_RANDOM_ACCESS_FILTERS_EAGERLY_SETTING.getKey(), false);
    createIndex("test", client().admin().indices().prepareCreate("test")
        .setSettings(createSettings)
        .addMapping("employee", mapping)
        .addMapping("queries", "query", "type=percolator")
    );

    // the stored percolator query: a nested query on "employee"
    XContentBuilder storedQuery = jsonBuilder().startObject()
        .field("query", QueryBuilders.nestedQuery("employee",
            QueryBuilders.matchQuery("employee.name", "virginia potts").operator(Operator.AND), ScoreMode.Avg)
        ).endObject();
    client().prepareIndex("test", "queries", "q1").setSource(storedQuery).get();
    client().admin().indices().prepareRefresh().get();

    for (int round = 0; round < 32; round++) {
        // the document is rebuilt for every round
        BytesReference document = XContentFactory.jsonBuilder()
            .startObject().field("companyname", "stark")
                .startArray("employee")
                    .startObject().field("name", "virginia potts").endObject()
                    .startObject().field("name", "tony stark").endObject()
                .endArray()
            .endObject().bytes();
        SearchResponse response = client().prepareSearch()
            .setQuery(new PercolateQueryBuilder("query", "employee", document, XContentType.JSON))
            .addSort("_doc", SortOrder.ASC)
            // size 0, because otherwise bitsets are loaded for normal documents in
            // FetchPhase#findRootDocumentIfNested(...)
            .setSize(0)
            .get();
        assertHitCount(response, 1);
    }

    // Original author's note: we can't check via api, because BitsetCacheListener requires that it can
    // extract the shardId from the index reader and for the percolator it can't do that, which means memory
    // for the BitsetCache is not tracked in case of the percolator.
    long bitsetMemory = client().admin().cluster().prepareClusterStats().get()
        .getIndicesStats().getSegments().getBitsetMemoryInBytes();
    assertEquals("The percolator works with in-memory index and therefor shouldn't use bitset cache", 0L, bitsetMemory);
}

/**
 * Percolates a document containing nested objects against a stored nested script query 32 times, then asserts
 * that the fielddata memory reported by the cluster stats is zero.
 * <p>
 * Note that, despite the "doLeak" in the method name, the assertion checks that no fielddata memory is held.
 */
public void testPercolateQueryWithNestedDocuments_doLeakFieldDataCacheEntries() throws Exception {
    // "employees.name" is a text field with fielddata enabled
    XContentBuilder mapping = XContentFactory.jsonBuilder()
        .startObject()
            .startObject("properties")
                .startObject("companyname")
                    .field("type", "text")
                .endObject()
                .startObject("employees")
                    .field("type", "nested")
                    .startObject("properties")
                        .startObject("name")
                            .field("type", "text")
                            .field("fielddata", true)
                        .endObject()
                    .endObject()
                .endObject()
            .endObject()
        .endObject();
    createIndex("test", client().admin().indices().prepareCreate("test")
        .addMapping("employee", mapping)
        .addMapping("queries", "query", "type=percolator")
    );

    // the stored percolator query runs the "use_fielddata_please" mock script inside a nested query
    Script fieldDataScript = new Script(ScriptType.INLINE, MockScriptPlugin.NAME, "use_fielddata_please", Collections.emptyMap());
    XContentBuilder storedQuery = jsonBuilder().startObject()
        .field("query", QueryBuilders.nestedQuery("employees",
            QueryBuilders.scriptQuery(fieldDataScript), ScoreMode.Avg)
        ).endObject();
    client().prepareIndex("test", "queries", "q1").setSource(storedQuery).get();
    client().admin().indices().prepareRefresh().get();

    // the document to percolate, with two nested "employees" entries
    XContentBuilder doc = jsonBuilder()
        .startObject()
            .field("companyname", "stark")
            .startArray("employees")
                .startObject().field("name", "virginia_potts").endObject()
                .startObject().field("name", "tony_stark").endObject()
            .endArray()
        .endObject();

    for (int round = 0; round < 32; round++) {
        SearchResponse response = client().prepareSearch()
            .setQuery(new PercolateQueryBuilder("query", "employee", doc.bytes(), XContentType.JSON))
            .addSort("_doc", SortOrder.ASC)
            .get();
        assertHitCount(response, 1);
    }

    long fieldDataMemory = client().admin().cluster().prepareClusterStats().get()
        .getIndicesStats().getFieldData().getMemorySizeInBytes();
    assertEquals("The percolator works with in-memory index and therefor shouldn't use field-data cache", 0L, fieldDataMemory);
}

/**
 * Stores four percolator queries and one regular document, runs six percolate queries in a single
 * multi-search request and checks the hits (or the failure) of every item in the response.
 */
public void testPercolatorQueryViaMultiSearch() throws Exception {
    createIndex("test", client().admin().indices().prepareCreate("test")
        .addMapping("type", "field1", "type=text")
        .addMapping("queries", "query", "type=percolator")
    );

    // query 1: field1 matches "b" (the source also carries an extra field "a")
    XContentBuilder sourceOne = jsonBuilder().startObject()
        .field("query", matchQuery("field1", "b")).field("a", "b").endObject();
    client().prepareIndex("test", "queries", "1").setSource(sourceOne).execute().actionGet();

    // query 2: field1 matches "c"
    XContentBuilder sourceTwo = jsonBuilder().startObject()
        .field("query", matchQuery("field1", "c")).endObject();
    client().prepareIndex("test", "queries", "2").setSource(sourceTwo).execute().actionGet();

    // query 3: field1 must match both "b" and "c"
    XContentBuilder sourceThree = jsonBuilder().startObject().field("query", boolQuery()
        .must(matchQuery("field1", "b"))
        .must(matchQuery("field1", "c"))
    ).endObject();
    client().prepareIndex("test", "queries", "3").setSource(sourceThree).execute().actionGet();

    // query 4: match all
    XContentBuilder sourceFour = jsonBuilder().startObject().field("query", matchAllQuery()).endObject();
    client().prepareIndex("test", "queries", "4").setSource(sourceFour).execute().actionGet();

    // a regular document, percolated by reference further down
    XContentBuilder existingDoc = jsonBuilder().startObject().field("field1", "c").endObject();
    client().prepareIndex("test", "type", "1").setSource(existingDoc).execute().actionGet();
    client().admin().indices().prepareRefresh().get();

    // NOTE(review): the YAML- and SMILE-encoded documents below are passed with XContentType.JSON, exactly
    // as in the original code - confirm whether that is intended.
    BytesReference docB = jsonBuilder().startObject().field("field1", "b").endObject().bytes();
    BytesReference docC = yamlBuilder().startObject().field("field1", "c").endObject().bytes();
    BytesReference docBandC = smileBuilder().startObject().field("field1", "b c").endObject().bytes();
    BytesReference docD = jsonBuilder().startObject().field("field1", "d").endObject().bytes();

    MultiSearchResponse response = client().prepareMultiSearch()
        .add(client().prepareSearch("test")
            .setQuery(new PercolateQueryBuilder("query", "type", docB, XContentType.JSON)))
        .add(client().prepareSearch("test")
            .setQuery(new PercolateQueryBuilder("query", "type", docC, XContentType.JSON)))
        .add(client().prepareSearch("test")
            .setQuery(new PercolateQueryBuilder("query", "type", docBandC, XContentType.JSON)))
        .add(client().prepareSearch("test")
            .setQuery(new PercolateQueryBuilder("query", "type", docD, XContentType.JSON)))
        .add(client().prepareSearch("test")
            .setQuery(new PercolateQueryBuilder("query", "type", "test", "type", "1", null, null, null)))
        .add(client().prepareSearch("test") // non existing doc, so error element
            .setQuery(new PercolateQueryBuilder("query", "type", "test", "type", "2", null, null, null)))
        .get();

    // items come back in the order in which the searches were added
    final MultiSearchResponse.Item[] items = response.getResponses();

    // field1 = "b"
    final MultiSearchResponse.Item first = items[0];
    assertHitCount(first.getResponse(), 2L);
    assertSearchHits(first.getResponse(), "1", "4");
    assertThat(first.getFailureMessage(), nullValue());

    // field1 = "c"
    final MultiSearchResponse.Item second = items[1];
    assertHitCount(second.getResponse(), 2L);
    assertSearchHits(second.getResponse(), "2", "4");
    assertThat(second.getFailureMessage(), nullValue());

    // field1 = "b c"
    final MultiSearchResponse.Item third = items[2];
    assertHitCount(third.getResponse(), 4L);
    assertSearchHits(third.getResponse(), "1", "2", "3", "4");
    assertThat(third.getFailureMessage(), nullValue());

    // field1 = "d"
    final MultiSearchResponse.Item fourth = items[3];
    assertHitCount(fourth.getResponse(), 1L);
    assertSearchHits(fourth.getResponse(), "4");
    assertThat(fourth.getFailureMessage(), nullValue());

    // the indexed document test/type/1 (field1 = "c"), percolated by reference
    final MultiSearchResponse.Item fifth = items[4];
    assertHitCount(fifth.getResponse(), 2L);
    assertSearchHits(fifth.getResponse(), "2", "4");
    assertThat(fifth.getFailureMessage(), nullValue());

    // test/type/2 was never indexed, so this item carries a failure instead of a response
    final MultiSearchResponse.Item sixth = items[5];
    assertThat(sixth.getResponse(), nullValue());
    assertThat(sixth.getFailureMessage(), notNullValue());
    assertThat(sixth.getFailureMessage(), equalTo("all shards failed"));
    assertThat(ExceptionsHelper.unwrapCause(sixth.getFailure().getCause()).getMessage(),
        containsString("[test/type/2] couldn't be found"));
}

}

0 comments on commit d3e754b

Please sign in to comment.