From b7451533c893214caf754737036330d3dea892cd Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Tue, 1 Apr 2014 16:18:56 +0700 Subject: [PATCH] Added an indices level field data cache listener that always gets invoked and updates indices statistics and services about field data loading and unloading. Moved the circuit breaker memory reducing logic to the IndicesFieldDataCacheListener, so it always reduces the memory used when field data gets unloaded, this fixes a issue where the circuit breaker didn't get reduced when segments where no shardId could be resolved get cleared up. Also made sure that exceptions in the percolator service are bubbled up properly. Closes #5588 --- .../index/fielddata/IndexFieldDataCache.java | 36 ++++++------ .../fielddata/IndexFieldDataService.java | 11 ++-- .../index/fielddata/ShardFieldData.java | 10 +--- .../elasticsearch/indices/IndicesModule.java | 2 + .../cache/IndicesFieldDataCache.java | 28 ++++++---- .../cache/IndicesFieldDataCacheListener.java | 55 +++++++++++++++++++ .../percolator/PercolatorService.java | 13 ++--- .../percolator/PercolatorTests.java | 24 +++++++- 8 files changed, 130 insertions(+), 49 deletions(-) create mode 100644 src/main/java/org/elasticsearch/indices/fielddata/cache/IndicesFieldDataCacheListener.java diff --git a/src/main/java/org/elasticsearch/index/fielddata/IndexFieldDataCache.java b/src/main/java/org/elasticsearch/index/fielddata/IndexFieldDataCache.java index 9d0af0b5df27f..c48652f854e17 100644 --- a/src/main/java/org/elasticsearch/index/fielddata/IndexFieldDataCache.java +++ b/src/main/java/org/elasticsearch/index/fielddata/IndexFieldDataCache.java @@ -32,7 +32,10 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardUtils; import org.elasticsearch.index.shard.service.IndexShard; +import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCacheListener; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.Callable; /** @@ -70,11 +73,13 @@ static abstract class FieldBased implements IndexFieldDataCache, SegmentReader.C private final FieldMapper.Names fieldNames; private final FieldDataType fieldDataType; private final Cache cache; + private final IndicesFieldDataCacheListener indicesFieldDataCacheListener; - protected FieldBased(@Nullable IndexService indexService, FieldMapper.Names fieldNames, FieldDataType fieldDataType, CacheBuilder cache) { + protected FieldBased(@Nullable IndexService indexService, FieldMapper.Names fieldNames, FieldDataType fieldDataType, CacheBuilder cache, IndicesFieldDataCacheListener indicesFieldDataCacheListener) { this.indexService = indexService; this.fieldNames = fieldNames; this.fieldDataType = fieldDataType; + this.indicesFieldDataCacheListener = indicesFieldDataCacheListener; cache.removalListener(this); //noinspection unchecked this.cache = cache.build(); @@ -83,15 +88,16 @@ protected FieldBased(@Nullable IndexService indexService, FieldMapper.Names fiel @Override public void onRemoval(RemovalNotification notification) { Key key = notification.getKey(); - if (key == null || key.listener == null) { - return; // we can't do anything here... - } + assert key != null && key.listeners != null; + AtomicFieldData value = notification.getValue(); long sizeInBytes = key.sizeInBytes; if (sizeInBytes == -1 && value != null) { sizeInBytes = value.getMemorySizeInBytes(); } - key.listener.onUnload(fieldNames, fieldDataType, notification.wasEvicted(), sizeInBytes, value); + for (Listener listener : key.listeners) { + listener.onUnload(fieldNames, fieldDataType, notification.wasEvicted(), sizeInBytes, value); + } } @Override @@ -104,21 +110,20 @@ public AtomicFieldData call() throws Exception { SegmentReaderUtils.registerCoreListener(context.reader(), FieldBased.this); AtomicFieldData fieldData = indexFieldData.loadDirect(context); key.sizeInBytes = fieldData.getMemorySizeInBytes(); + key.listeners.add(indicesFieldDataCacheListener); if (indexService != null) { ShardId shardId = ShardUtils.extractShardId(context.reader()); if (shardId != null) { IndexShard shard = indexService.shard(shardId.id()); if (shard != null) { - key.listener = shard.fieldData(); + key.listeners.add(shard.fieldData()); } } } - - if (key.listener != null) { - key.listener.onLoad(fieldNames, fieldDataType, fieldData); + for (Listener listener : key.listeners) { + listener.onLoad(fieldNames, fieldDataType, fieldData); } - return fieldData; } }); @@ -146,8 +151,7 @@ public void onClose(Object coreCacheKey) { static class Key { final Object readerKey; - @Nullable - Listener listener; // optional stats listener + final List listeners = new ArrayList<>(); // optional stats listener long sizeInBytes = -1; // optional size in bytes (we keep it here in case the values are soft references) Key(Object readerKey) { @@ -171,15 +175,15 @@ public int hashCode() { static class Resident extends FieldBased { - public Resident(@Nullable IndexService indexService, FieldMapper.Names fieldNames, FieldDataType fieldDataType) { - super(indexService, fieldNames, fieldDataType, CacheBuilder.newBuilder()); + public Resident(@Nullable IndexService indexService, FieldMapper.Names fieldNames, FieldDataType fieldDataType, IndicesFieldDataCacheListener indicesFieldDataCacheListener) { + super(indexService, fieldNames, fieldDataType, CacheBuilder.newBuilder(), indicesFieldDataCacheListener); } } static class Soft extends FieldBased { - public Soft(@Nullable IndexService indexService, FieldMapper.Names fieldNames, FieldDataType fieldDataType) { - super(indexService, fieldNames, fieldDataType, CacheBuilder.newBuilder().softValues()); + public Soft(@Nullable IndexService indexService, FieldMapper.Names fieldNames, FieldDataType fieldDataType, IndicesFieldDataCacheListener indicesFieldDataCacheListener) { + super(indexService, fieldNames, fieldDataType, CacheBuilder.newBuilder().softValues(), indicesFieldDataCacheListener); } } } diff --git a/src/main/java/org/elasticsearch/index/fielddata/IndexFieldDataService.java b/src/main/java/org/elasticsearch/index/fielddata/IndexFieldDataService.java index 0b5623e11c120..5eb6eaa23ab67 100644 --- a/src/main/java/org/elasticsearch/index/fielddata/IndexFieldDataService.java +++ b/src/main/java/org/elasticsearch/index/fielddata/IndexFieldDataService.java @@ -37,6 +37,7 @@ import org.elasticsearch.index.mapper.internal.ParentFieldMapper; import org.elasticsearch.index.service.IndexService; import org.elasticsearch.index.settings.IndexSettings; +import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCacheListener; import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache; import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; @@ -58,6 +59,7 @@ public class IndexFieldDataService extends AbstractIndexComponent { private final static ImmutableMap docValuesBuildersByType; private final static ImmutableMap, IndexFieldData.Builder> buildersByTypeAndFormat; private final CircuitBreakerService circuitBreakerService; + private final IndicesFieldDataCacheListener indicesFieldDataCacheListener; static { buildersByType = MapBuilder.newMapBuilder() @@ -129,15 +131,16 @@ public class IndexFieldDataService extends AbstractIndexComponent { // public for testing public IndexFieldDataService(Index index, CircuitBreakerService circuitBreakerService) { - this(index, ImmutableSettings.Builder.EMPTY_SETTINGS, new IndicesFieldDataCache(ImmutableSettings.Builder.EMPTY_SETTINGS), circuitBreakerService); + this(index, ImmutableSettings.Builder.EMPTY_SETTINGS, new IndicesFieldDataCache(ImmutableSettings.Builder.EMPTY_SETTINGS, new IndicesFieldDataCacheListener(circuitBreakerService)), circuitBreakerService, new IndicesFieldDataCacheListener(circuitBreakerService)); } @Inject public IndexFieldDataService(Index index, @IndexSettings Settings indexSettings, IndicesFieldDataCache indicesFieldDataCache, - CircuitBreakerService circuitBreakerService) { + CircuitBreakerService circuitBreakerService, IndicesFieldDataCacheListener indicesFieldDataCacheListener) { super(index, indexSettings); this.indicesFieldDataCache = indicesFieldDataCache; this.circuitBreakerService = circuitBreakerService; + this.indicesFieldDataCacheListener = indicesFieldDataCacheListener; } // we need to "inject" the index service to not create cyclic dep @@ -227,9 +230,9 @@ public > IFD getForField(FieldMapper mapper) { // this means changing the node level settings is simple, just set the bounds there String cacheType = type.getSettings().get("cache", indexSettings.get("index.fielddata.cache", "node")); if ("resident".equals(cacheType)) { - cache = new IndexFieldDataCache.Resident(indexService, fieldNames, type); + cache = new IndexFieldDataCache.Resident(indexService, fieldNames, type, indicesFieldDataCacheListener); } else if ("soft".equals(cacheType)) { - cache = new IndexFieldDataCache.Soft(indexService, fieldNames, type); + cache = new IndexFieldDataCache.Soft(indexService, fieldNames, type, indicesFieldDataCacheListener); } else if ("node".equals(cacheType)) { cache = indicesFieldDataCache.buildIndexFieldDataCache(indexService, index, fieldNames, type); } else { diff --git a/src/main/java/org/elasticsearch/index/fielddata/ShardFieldData.java b/src/main/java/org/elasticsearch/index/fielddata/ShardFieldData.java index 3c86d9c40794e..198d8762d84a7 100644 --- a/src/main/java/org/elasticsearch/index/fielddata/ShardFieldData.java +++ b/src/main/java/org/elasticsearch/index/fielddata/ShardFieldData.java @@ -31,7 +31,6 @@ import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.shard.AbstractIndexShardComponent; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; import java.util.Map; import java.util.concurrent.ConcurrentMap; @@ -45,12 +44,9 @@ public class ShardFieldData extends AbstractIndexShardComponent implements Index final ConcurrentMap perFieldTotals = ConcurrentCollections.newConcurrentMap(); - private final CircuitBreakerService breakerService; - @Inject - public ShardFieldData(ShardId shardId, @IndexSettings Settings indexSettings, CircuitBreakerService breakerService) { + public ShardFieldData(ShardId shardId, @IndexSettings Settings indexSettings) { super(shardId, indexSettings); - this.breakerService = breakerService; } public FieldDataStats stats(String... fields) { @@ -101,10 +97,6 @@ public void onUnload(FieldMapper.Names fieldNames, FieldDataType fieldDataType, evictionsMetric.inc(); } if (sizeInBytes != -1) { - // Since field data is being unloaded (due to expiration or manual - // clearing), we also need to decrement the used bytes in the breaker - breakerService.getBreaker().addWithoutBreaking(-sizeInBytes); - totalMetric.dec(sizeInBytes); String keyFieldName = fieldNames.indexName(); diff --git a/src/main/java/org/elasticsearch/indices/IndicesModule.java b/src/main/java/org/elasticsearch/indices/IndicesModule.java index 16106a4f996ad..76ac7e165d70e 100644 --- a/src/main/java/org/elasticsearch/indices/IndicesModule.java +++ b/src/main/java/org/elasticsearch/indices/IndicesModule.java @@ -31,6 +31,7 @@ import org.elasticsearch.indices.cluster.IndicesClusterStateService; import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; import org.elasticsearch.indices.fielddata.breaker.InternalCircuitBreakerService; +import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCacheListener; import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache; import org.elasticsearch.indices.memory.IndexingMemoryController; import org.elasticsearch.indices.query.IndicesQueriesModule; @@ -81,5 +82,6 @@ protected void configure() { bind(UpdateHelper.class).asEagerSingleton(); bind(CircuitBreakerService.class).to(InternalCircuitBreakerService.class).asEagerSingleton(); + bind(IndicesFieldDataCacheListener.class).asEagerSingleton(); } } diff --git a/src/main/java/org/elasticsearch/indices/fielddata/cache/IndicesFieldDataCache.java b/src/main/java/org/elasticsearch/indices/fielddata/cache/IndicesFieldDataCache.java index 1c5cdfabe0bcf..36969833ab707 100644 --- a/src/main/java/org/elasticsearch/indices/fielddata/cache/IndicesFieldDataCache.java +++ b/src/main/java/org/elasticsearch/indices/fielddata/cache/IndicesFieldDataCache.java @@ -40,6 +40,8 @@ import org.elasticsearch.index.shard.ShardUtils; import org.elasticsearch.index.shard.service.IndexShard; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; @@ -47,6 +49,8 @@ */ public class IndicesFieldDataCache extends AbstractComponent implements RemovalListener { + private final IndicesFieldDataCacheListener indicesFieldDataCacheListener; + Cache cache; private volatile String size; @@ -55,8 +59,9 @@ public class IndicesFieldDataCache extends AbstractComponent implements RemovalL @Inject - public IndicesFieldDataCache(Settings settings) { + public IndicesFieldDataCache(Settings settings, IndicesFieldDataCacheListener indicesFieldDataCacheListener) { super(settings); + this.indicesFieldDataCacheListener = indicesFieldDataCacheListener; this.size = componentSettings.get("size", "-1"); this.sizeInBytes = componentSettings.getAsMemory("size", "-1").bytes(); this.expire = componentSettings.getAsTime("expire", null); @@ -89,16 +94,17 @@ public IndexFieldDataCache buildIndexFieldDataCache(@Nullable IndexService index @Override public void onRemoval(RemovalNotification notification) { Key key = notification.getKey(); - if (key == null || key.listener == null) { - return; // nothing to do here really... - } + assert key != null && key.listeners != null; + IndexFieldCache indexCache = key.indexCache; long sizeInBytes = key.sizeInBytes; AtomicFieldData value = notification.getValue(); if (sizeInBytes == -1 && value != null) { sizeInBytes = value.getMemorySizeInBytes(); } - key.listener.onUnload(indexCache.fieldNames, indexCache.fieldDataType, notification.wasEvicted(), sizeInBytes, value); + for (IndexFieldDataCache.Listener listener : key.listeners) { + listener.onUnload(indexCache.fieldNames, indexCache.fieldDataType, notification.wasEvicted(), sizeInBytes, value); + } } public static class FieldDataWeigher implements Weigher { @@ -137,21 +143,20 @@ public > FD load(fina public AtomicFieldData call() throws Exception { SegmentReaderUtils.registerCoreListener(context.reader(), IndexFieldCache.this); AtomicFieldData fieldData = indexFieldData.loadDirect(context); + key.listeners.add(indicesFieldDataCacheListener); if (indexService != null) { ShardId shardId = ShardUtils.extractShardId(context.reader()); if (shardId != null) { IndexShard shard = indexService.shard(shardId.id()); if (shard != null) { - key.listener = shard.fieldData(); + key.listeners.add(shard.fieldData()); } } } - - if (key.listener != null) { - key.listener.onLoad(fieldNames, fieldDataType, fieldData); + for (Listener listener : key.listeners) { + listener.onLoad(fieldNames, fieldDataType, fieldData); } - return fieldData; } }); @@ -192,8 +197,7 @@ public static class Key { public final IndexFieldCache indexCache; public final Object readerKey; - @Nullable - public IndexFieldDataCache.Listener listener; // optional stats listener + public final List listeners = new ArrayList<>(); // optional stats listener long sizeInBytes = -1; // optional size in bytes (we keep it here in case the values are soft references) diff --git a/src/main/java/org/elasticsearch/indices/fielddata/cache/IndicesFieldDataCacheListener.java b/src/main/java/org/elasticsearch/indices/fielddata/cache/IndicesFieldDataCacheListener.java new file mode 100644 index 0000000000000..09b7e81e74faf --- /dev/null +++ b/src/main/java/org/elasticsearch/indices/fielddata/cache/IndicesFieldDataCacheListener.java @@ -0,0 +1,55 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.indices.fielddata.cache; + +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.index.fielddata.AtomicFieldData; +import org.elasticsearch.index.fielddata.FieldDataType; +import org.elasticsearch.index.fielddata.IndexFieldDataCache; +import org.elasticsearch.index.mapper.FieldMapper; +import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService; + +/** + * A {@link IndexFieldDataCache.Listener} implementation that updates indices (node) level statistics / service about + * field data entries being loaded and unloaded. + * + * Currently it only decrements the memory used in the {@link CircuitBreakerService}. + */ +public class IndicesFieldDataCacheListener implements IndexFieldDataCache.Listener { + + private final CircuitBreakerService circuitBreakerService; + + @Inject + public IndicesFieldDataCacheListener(CircuitBreakerService circuitBreakerService) { + this.circuitBreakerService = circuitBreakerService; + } + + @Override + public void onLoad(FieldMapper.Names fieldNames, FieldDataType fieldDataType, AtomicFieldData fieldData) { + } + + @Override + public void onUnload(FieldMapper.Names fieldNames, FieldDataType fieldDataType, boolean wasEvicted, long sizeInBytes, @Nullable AtomicFieldData fieldData) { + assert sizeInBytes > 0 : "When reducing circuit breaker, it should be adjusted with a positive number and not [" + sizeInBytes + "]"; + circuitBreakerService.getBreaker().addWithoutBreaking(-sizeInBytes); + } + +} diff --git a/src/main/java/org/elasticsearch/percolator/PercolatorService.java b/src/main/java/org/elasticsearch/percolator/PercolatorService.java index 014c36e911de5..3c9b6274a2787 100644 --- a/src/main/java/org/elasticsearch/percolator/PercolatorService.java +++ b/src/main/java/org/elasticsearch/percolator/PercolatorService.java @@ -70,10 +70,7 @@ import org.elasticsearch.index.service.IndexService; import org.elasticsearch.index.shard.service.IndexShard; import org.elasticsearch.indices.IndicesService; -import org.elasticsearch.percolator.QueryCollector.Count; -import org.elasticsearch.percolator.QueryCollector.Match; -import org.elasticsearch.percolator.QueryCollector.MatchAndScore; -import org.elasticsearch.percolator.QueryCollector.MatchAndSort; +import org.elasticsearch.percolator.QueryCollector.*; import org.elasticsearch.script.ScriptService; import org.elasticsearch.search.SearchParseElement; import org.elasticsearch.search.SearchShardTarget; @@ -441,8 +438,9 @@ public PercolateShardResponse doPercolate(PercolateShardRequest request, Percola collector.reset(); try { context.docSearcher().search(entry.getValue(), collector); - } catch (IOException e) { - logger.warn("[" + entry.getKey() + "] failed to execute query", e); + } catch (Throwable e) { + logger.debug("[" + entry.getKey() + "] failed to execute query", e); + throw new PercolateException(context.indexShard().shardId(), "failed to execute", e); } if (collector.exists()) { @@ -539,7 +537,8 @@ public PercolateShardResponse doPercolate(PercolateShardRequest request, Percola try { context.docSearcher().search(entry.getValue(), collector); } catch (Throwable e) { - logger.warn("[" + entry.getKey() + "] failed to execute query", e); + logger.debug("[" + entry.getKey() + "] failed to execute query", e); + throw new PercolateException(context.indexShard().shardId(), "failed to execute", e); } if (collector.exists()) { diff --git a/src/test/java/org/elasticsearch/percolator/PercolatorTests.java b/src/test/java/org/elasticsearch/percolator/PercolatorTests.java index e930a1d373cf0..bd8c1d45d8330 100644 --- a/src/test/java/org/elasticsearch/percolator/PercolatorTests.java +++ b/src/test/java/org/elasticsearch/percolator/PercolatorTests.java @@ -217,7 +217,7 @@ public void testSimple2() throws Exception { client().prepareIndex("test1", PercolatorService.TYPE_NAME) .setSource( XContentFactory.jsonBuilder().startObject().field("query", - constantScoreQuery(FilterBuilders.rangeFilter("field2").from("value").includeLower(true)) + constantScoreQuery(FilterBuilders.rangeFilter("field2").from(1).to(5).includeLower(true).setExecution("fielddata")) ).endObject() ) .execute().actionGet(); @@ -230,6 +230,28 @@ public void testSimple2() throws Exception { assertThat(convertFromTextArray(response.getMatches(), "test"), arrayContaining("test1")); } + @Test + public void testRangeFilterThatUsesFD() throws Exception { + client().admin().indices().prepareCreate("test") + .addMapping("type1", "field1", "type=long") + .get(); + + + client().prepareIndex("test", PercolatorService.TYPE_NAME, "1") + .setSource( + XContentFactory.jsonBuilder().startObject().field("query", + constantScoreQuery(FilterBuilders.rangeFilter("field1").from(1).to(5).setExecution("fielddata")) + ).endObject() + ).get(); + + PercolateResponse response = client().preparePercolate() + .setIndices("test").setDocumentType("type1") + .setPercolateDoc(PercolateSourceBuilder.docBuilder().setDoc("field1", 3)).get(); + assertMatchCount(response, 1l); + assertThat(response.getMatches(), arrayWithSize(1)); + assertThat(convertFromTextArray(response.getMatches(), "test"), arrayContaining("1")); + } + @Test public void testPercolateQueriesWithRouting() throws Exception { client().admin().indices().prepareCreate("test")