Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix memory leak when percolating with nested documents #6578

Merged
merged 1 commit into from Jul 1, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 0 additions & 6 deletions src/main/java/org/elasticsearch/index/cache/IndexCache.java
Expand Up @@ -19,7 +19,6 @@

package org.elasticsearch.index.cache;

import org.apache.lucene.index.IndexReader;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService;
Expand Down Expand Up @@ -84,11 +83,6 @@ public void close() throws ElasticsearchException {
}
}

public void clear(IndexReader reader) {
filterCache.clear(reader);
docSetCache.clear(reader);
}

public void clear(String reason) {
filterCache.clear(reason);
queryParserCache.clear();
Expand Down
Expand Up @@ -23,6 +23,7 @@
import com.google.common.cache.RemovalListener;
import com.google.common.cache.Weigher;
import org.apache.lucene.index.AtomicReaderContext;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.SegmentReader;
import org.apache.lucene.search.DocIdSet;
import org.apache.lucene.search.Filter;
Expand All @@ -32,6 +33,7 @@
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lucene.SegmentReaderUtils;
import org.elasticsearch.common.lucene.docset.DocIdSets;
import org.elasticsearch.common.lucene.search.CachedFilter;
import org.elasticsearch.common.lucene.search.NoCacheFilter;
Expand All @@ -51,7 +53,7 @@
import java.io.IOException;
import java.util.concurrent.ConcurrentMap;

public class WeightedFilterCache extends AbstractIndexComponent implements FilterCache, SegmentReader.CoreClosedListener {
public class WeightedFilterCache extends AbstractIndexComponent implements FilterCache, SegmentReader.CoreClosedListener, IndexReader.ReaderClosedListener {

final IndicesFilterCache indicesFilterCache;
IndexService indexService;
Expand Down Expand Up @@ -79,6 +81,12 @@ public void close() throws ElasticsearchException {
clear("close");
}

@Override
public void onClose(IndexReader reader) {
clear(reader.getCoreCacheKey());
}


@Override
public void clear(String reason) {
logger.debug("full cache clear, reason [{}]", reason);
Expand Down Expand Up @@ -160,9 +168,7 @@ public DocIdSet getDocIdSet(AtomicReaderContext context, Bits acceptDocs) throws
Boolean previous = cache.seenReaders.putIfAbsent(context.reader().getCoreCacheKey(), Boolean.TRUE);
if (previous == null) {
// we add a core closed listener only, for non core IndexReaders we rely on clear being called (percolator for example)
if (context.reader() instanceof SegmentReader) {
((SegmentReader) context.reader()).addCoreClosedListener(cache);
}
SegmentReaderUtils.registerCoreListener(context.reader(), cache);
}
}
// we can't pass down acceptedDocs provided, because we are caching the result, and acceptedDocs
Expand Down Expand Up @@ -247,4 +253,4 @@ public int hashCode() {
return readerKey().hashCode() + 31 * filterKey.hashCode();
}
}
}
}
Expand Up @@ -69,6 +69,33 @@ interface Listener {
void onUnload(FieldMapper.Names fieldNames, FieldDataType fieldDataType, boolean wasEvicted, long sizeInBytes);
}

class None implements IndexFieldDataCache {

@Override
public <FD extends AtomicFieldData, IFD extends IndexFieldData<FD>> FD load(AtomicReaderContext context, IFD indexFieldData) throws Exception {
return indexFieldData.loadDirect(context);
}

@Override
@SuppressWarnings("unchecked")
public <IFD extends IndexFieldData.WithOrdinals<?>> IFD load(IndexReader indexReader, IFD indexFieldData) throws Exception {
return (IFD) indexFieldData.localGlobalDirect(indexReader);
}

@Override
public void clear() {
}

@Override
public void clear(String fieldName) {
}

@Override
public void clear(Object coreCacheKey) {

}
}

/**
* The resident field data cache is a *per field* cache that keeps all the values in memory.
*/
Expand Down
Expand Up @@ -21,7 +21,6 @@

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import org.apache.lucene.index.IndexReader;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.collect.Tuple;
Expand All @@ -40,6 +39,7 @@
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService;
import org.elasticsearch.indices.fielddata.breaker.NoneCircuitBreakerService;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCacheListener;

Expand Down Expand Up @@ -187,17 +187,6 @@ public void clearField(String fieldName) {
}
}

public void clear(IndexReader reader) {
synchronized (loadedFieldData) {
for (IndexFieldData<?> indexFieldData : loadedFieldData.values()) {
indexFieldData.clear(reader);
}
for (IndexFieldDataCache cache : fieldDataCaches.values()) {
cache.clear(reader);
}
}
}

public void onMappingUpdate() {
// synchronize to make sure to not miss field data instances that are being loaded
synchronized (loadedFieldData) {
Expand All @@ -206,6 +195,7 @@ public void onMappingUpdate() {
}
}

@SuppressWarnings("unchecked")
public <IFD extends IndexFieldData<?>> IFD getForField(FieldMapper<?> mapper) {
final FieldMapper.Names fieldNames = mapper.names();
final FieldDataType type = mapper.fieldDataType();
Expand Down Expand Up @@ -251,6 +241,8 @@ public <IFD extends IndexFieldData<?>> IFD getForField(FieldMapper<?> mapper) {
cache = new IndexFieldDataCache.Soft(logger, indexService, fieldNames, type, indicesFieldDataCacheListener);
} else if ("node".equals(cacheType)) {
cache = indicesFieldDataCache.buildIndexFieldDataCache(indexService, index, fieldNames, type);
} else if ("none".equals(cacheType)){
cache = new IndexFieldDataCache.None();
} else {
throw new ElasticsearchIllegalArgumentException("cache type not supported [" + cacheType + "] for field [" + fieldNames.fullName() + "]");
}
Expand All @@ -266,4 +258,41 @@ public <IFD extends IndexFieldData<?>> IFD getForField(FieldMapper<?> mapper) {
return (IFD) fieldData;
}

public <IFD extends IndexFieldData<?>> IFD getForFieldDirect(FieldMapper<?> mapper) {
final FieldMapper.Names fieldNames = mapper.names();
final FieldDataType type = mapper.fieldDataType();
if (type == null) {
throw new ElasticsearchIllegalArgumentException("found no fielddata type for field [" + fieldNames.fullName() + "]");
}
final boolean docValues = mapper.hasDocValues();

IndexFieldData.Builder builder = null;
String format = type.getFormat(indexSettings);
if (format != null && FieldDataType.DOC_VALUES_FORMAT_VALUE.equals(format) && !docValues) {
logger.warn("field [" + fieldNames.fullName() + "] has no doc values, will use default field data format");
format = null;
}
if (format != null) {
builder = buildersByTypeAndFormat.get(Tuple.tuple(type.getType(), format));
if (builder == null) {
logger.warn("failed to find format [" + format + "] for field [" + fieldNames.fullName() + "], will use default");
}
}
if (builder == null && docValues) {
builder = docValuesBuildersByType.get(type.getType());
}
if (builder == null) {
builder = buildersByType.get(type.getType());
}
if (builder == null) {
throw new ElasticsearchIllegalArgumentException("failed to find field data builder for field " + fieldNames.fullName() + ", and type " + type.getType());
}

CircuitBreakerService circuitBreakerService = new NoneCircuitBreakerService();
GlobalOrdinalsBuilder globalOrdinalBuilder = new InternalGlobalOrdinalsBuilder(index(), indexSettings);
@SuppressWarnings("unchecked")
IFD ifd = (IFD) builder.build(index, indexSettings, mapper, new IndexFieldDataCache.None(), circuitBreakerService, indexService.mapperService(), globalOrdinalBuilder);
return ifd;
}

}
Expand Up @@ -32,7 +32,6 @@
import org.elasticsearch.index.codec.docvaluesformat.DocValuesFormatProvider;
import org.elasticsearch.index.codec.postingsformat.PostingsFormatProvider;
import org.elasticsearch.index.fielddata.FieldDataType;
import org.elasticsearch.index.fielddata.IndexFieldDataService;
import org.elasticsearch.index.mapper.core.AbstractFieldMapper;
import org.elasticsearch.index.query.QueryParseContext;
import org.elasticsearch.index.similarity.SimilarityProvider;
Expand Down Expand Up @@ -249,7 +248,7 @@ public static Loading parse(String loading, Loading defaultValue) {

Filter termsFilter(List values, @Nullable QueryParseContext context);

Filter termsFilter(IndexFieldDataService fieldData, List values, @Nullable QueryParseContext context);
Filter termsFilter(QueryParseContext parseContext, List values, @Nullable QueryParseContext context);

Query rangeQuery(Object lowerTerm, Object upperTerm, boolean includeLower, boolean includeUpper, @Nullable QueryParseContext context);

Expand Down
Expand Up @@ -50,7 +50,6 @@
import org.elasticsearch.index.codec.postingsformat.PostingsFormatProvider;
import org.elasticsearch.index.codec.postingsformat.PostingsFormatService;
import org.elasticsearch.index.fielddata.FieldDataType;
import org.elasticsearch.index.fielddata.IndexFieldDataService;
import org.elasticsearch.index.mapper.*;
import org.elasticsearch.index.mapper.internal.AllFieldMapper;
import org.elasticsearch.index.mapper.object.ObjectMapper;
Expand Down Expand Up @@ -491,7 +490,7 @@ public Filter termsFilter(List values, @Nullable QueryParseContext context) {
* A terms filter based on the field data cache
*/
@Override
public Filter termsFilter(IndexFieldDataService fieldDataService, List values, @Nullable QueryParseContext context) {
public Filter termsFilter(QueryParseContext fieldDataService, List values, @Nullable QueryParseContext context) {
// create with initial size large enough to avoid rehashing
ObjectOpenHashSet<BytesRef> terms =
new ObjectOpenHashSet<>((int) (values.size() * (1 + ObjectOpenHashSet.DEFAULT_LOAD_FACTOR)));
Expand Down
Expand Up @@ -41,7 +41,6 @@
import org.elasticsearch.index.codec.docvaluesformat.DocValuesFormatProvider;
import org.elasticsearch.index.codec.postingsformat.PostingsFormatProvider;
import org.elasticsearch.index.fielddata.FieldDataType;
import org.elasticsearch.index.fielddata.IndexFieldDataService;
import org.elasticsearch.index.fielddata.IndexNumericFieldData;
import org.elasticsearch.index.mapper.*;
import org.elasticsearch.index.query.QueryParseContext;
Expand Down Expand Up @@ -223,8 +222,8 @@ public Filter rangeFilter(Object lowerTerm, Object upperTerm, boolean includeLow
}

@Override
public Filter rangeFilter(IndexFieldDataService fieldData, Object lowerTerm, Object upperTerm, boolean includeLower, boolean includeUpper, @Nullable QueryParseContext context) {
return NumericRangeFieldDataFilter.newByteRange((IndexNumericFieldData) fieldData.getForField(this),
public Filter rangeFilter(QueryParseContext parseContext, Object lowerTerm, Object upperTerm, boolean includeLower, boolean includeUpper, @Nullable QueryParseContext context) {
return NumericRangeFieldDataFilter.newByteRange((IndexNumericFieldData) parseContext.getForField(this),
lowerTerm == null ? null : parseValue(lowerTerm),
upperTerm == null ? null : parseValue(upperTerm),
includeLower, includeUpper);
Expand Down
Expand Up @@ -46,7 +46,6 @@
import org.elasticsearch.index.codec.docvaluesformat.DocValuesFormatProvider;
import org.elasticsearch.index.codec.postingsformat.PostingsFormatProvider;
import org.elasticsearch.index.fielddata.FieldDataType;
import org.elasticsearch.index.fielddata.IndexFieldDataService;
import org.elasticsearch.index.fielddata.IndexNumericFieldData;
import org.elasticsearch.index.mapper.*;
import org.elasticsearch.index.mapper.core.LongFieldMapper.CustomLongNumericField;
Expand Down Expand Up @@ -369,11 +368,11 @@ public Filter rangeFilter(Object lowerTerm, Object upperTerm, boolean includeLow
}

@Override
public Filter rangeFilter(IndexFieldDataService fieldData, Object lowerTerm, Object upperTerm, boolean includeLower, boolean includeUpper, @Nullable QueryParseContext context) {
return rangeFilter(fieldData, lowerTerm, upperTerm, includeLower, includeUpper, context, false);
public Filter rangeFilter(QueryParseContext parseContext, Object lowerTerm, Object upperTerm, boolean includeLower, boolean includeUpper, @Nullable QueryParseContext context) {
return rangeFilter(parseContext, lowerTerm, upperTerm, includeLower, includeUpper, context, false);
}

public Filter rangeFilter(IndexFieldDataService fieldData, Object lowerTerm, Object upperTerm, boolean includeLower, boolean includeUpper, @Nullable QueryParseContext context, boolean explicitCaching) {
public Filter rangeFilter(QueryParseContext parseContext, Object lowerTerm, Object upperTerm, boolean includeLower, boolean includeUpper, @Nullable QueryParseContext context, boolean explicitCaching) {
boolean cache = explicitCaching;
Long lowerVal = null;
Long upperVal = null;
Expand All @@ -397,7 +396,7 @@ public Filter rangeFilter(IndexFieldDataService fieldData, Object lowerTerm, Obj
}

Filter filter = NumericRangeFieldDataFilter.newLongRange(
(IndexNumericFieldData<?>) fieldData.getForField(this), lowerVal,upperVal, includeLower, includeUpper
(IndexNumericFieldData<?>) parseContext.getForField(this), lowerVal,upperVal, includeLower, includeUpper
);
if (!cache) {
// We don't cache range filter if `now` date expression is used and also when a compound filter wraps
Expand Down
Expand Up @@ -45,7 +45,6 @@
import org.elasticsearch.index.codec.docvaluesformat.DocValuesFormatProvider;
import org.elasticsearch.index.codec.postingsformat.PostingsFormatProvider;
import org.elasticsearch.index.fielddata.FieldDataType;
import org.elasticsearch.index.fielddata.IndexFieldDataService;
import org.elasticsearch.index.fielddata.IndexNumericFieldData;
import org.elasticsearch.index.mapper.*;
import org.elasticsearch.index.query.QueryParseContext;
Expand Down Expand Up @@ -218,8 +217,8 @@ public Filter rangeFilter(Double lowerTerm, Double upperTerm, boolean includeLow
}

@Override
public Filter rangeFilter(IndexFieldDataService fieldData, Object lowerTerm, Object upperTerm, boolean includeLower, boolean includeUpper, @Nullable QueryParseContext context) {
return NumericRangeFieldDataFilter.newDoubleRange((IndexNumericFieldData) fieldData.getForField(this),
public Filter rangeFilter(QueryParseContext parseContext, Object lowerTerm, Object upperTerm, boolean includeLower, boolean includeUpper, @Nullable QueryParseContext context) {
return NumericRangeFieldDataFilter.newDoubleRange((IndexNumericFieldData) parseContext.getForField(this),
lowerTerm == null ? null : parseDoubleValue(lowerTerm),
upperTerm == null ? null : parseDoubleValue(upperTerm),
includeLower, includeUpper);
Expand Down
Expand Up @@ -46,7 +46,6 @@
import org.elasticsearch.index.codec.docvaluesformat.DocValuesFormatProvider;
import org.elasticsearch.index.codec.postingsformat.PostingsFormatProvider;
import org.elasticsearch.index.fielddata.FieldDataType;
import org.elasticsearch.index.fielddata.IndexFieldDataService;
import org.elasticsearch.index.fielddata.IndexNumericFieldData;
import org.elasticsearch.index.mapper.*;
import org.elasticsearch.index.query.QueryParseContext;
Expand Down Expand Up @@ -223,8 +222,8 @@ public Filter rangeFilter(Object lowerTerm, Object upperTerm, boolean includeLow
}

@Override
public Filter rangeFilter(IndexFieldDataService fieldData, Object lowerTerm, Object upperTerm, boolean includeLower, boolean includeUpper, @Nullable QueryParseContext context) {
return NumericRangeFieldDataFilter.newFloatRange((IndexNumericFieldData) fieldData.getForField(this),
public Filter rangeFilter(QueryParseContext parseContext, Object lowerTerm, Object upperTerm, boolean includeLower, boolean includeUpper, @Nullable QueryParseContext context) {
return NumericRangeFieldDataFilter.newFloatRange((IndexNumericFieldData) parseContext.getForField(this),
lowerTerm == null ? null : parseValue(lowerTerm),
upperTerm == null ? null : parseValue(upperTerm),
includeLower, includeUpper);
Expand Down
Expand Up @@ -42,7 +42,6 @@
import org.elasticsearch.index.codec.docvaluesformat.DocValuesFormatProvider;
import org.elasticsearch.index.codec.postingsformat.PostingsFormatProvider;
import org.elasticsearch.index.fielddata.FieldDataType;
import org.elasticsearch.index.fielddata.IndexFieldDataService;
import org.elasticsearch.index.fielddata.IndexNumericFieldData;
import org.elasticsearch.index.mapper.*;
import org.elasticsearch.index.query.QueryParseContext;
Expand Down Expand Up @@ -218,8 +217,8 @@ public Filter rangeFilter(Object lowerTerm, Object upperTerm, boolean includeLow
}

@Override
public Filter rangeFilter(IndexFieldDataService fieldData, Object lowerTerm, Object upperTerm, boolean includeLower, boolean includeUpper, @Nullable QueryParseContext context) {
return NumericRangeFieldDataFilter.newIntRange((IndexNumericFieldData) fieldData.getForField(this),
public Filter rangeFilter(QueryParseContext parseContext, Object lowerTerm, Object upperTerm, boolean includeLower, boolean includeUpper, @Nullable QueryParseContext context) {
return NumericRangeFieldDataFilter.newIntRange((IndexNumericFieldData) parseContext.getForField(this),
lowerTerm == null ? null : parseValue(lowerTerm),
upperTerm == null ? null : parseValue(upperTerm),
includeLower, includeUpper);
Expand Down
Expand Up @@ -42,7 +42,6 @@
import org.elasticsearch.index.codec.docvaluesformat.DocValuesFormatProvider;
import org.elasticsearch.index.codec.postingsformat.PostingsFormatProvider;
import org.elasticsearch.index.fielddata.FieldDataType;
import org.elasticsearch.index.fielddata.IndexFieldDataService;
import org.elasticsearch.index.fielddata.IndexNumericFieldData;
import org.elasticsearch.index.mapper.*;
import org.elasticsearch.index.query.QueryParseContext;
Expand Down Expand Up @@ -208,8 +207,8 @@ public Filter rangeFilter(Object lowerTerm, Object upperTerm, boolean includeLow
}

@Override
public Filter rangeFilter(IndexFieldDataService fieldData, Object lowerTerm, Object upperTerm, boolean includeLower, boolean includeUpper, @Nullable QueryParseContext context) {
return NumericRangeFieldDataFilter.newLongRange((IndexNumericFieldData) fieldData.getForField(this),
public Filter rangeFilter(QueryParseContext parseContext, Object lowerTerm, Object upperTerm, boolean includeLower, boolean includeUpper, @Nullable QueryParseContext context) {
return NumericRangeFieldDataFilter.newLongRange((IndexNumericFieldData) parseContext.getForField(this),
lowerTerm == null ? null : parseLongValue(lowerTerm),
upperTerm == null ? null : parseLongValue(upperTerm),
includeLower, includeUpper);
Expand Down