Skip to content

Commit

Permalink
Added an indices level field data cache listener that always gets inv…
Browse files Browse the repository at this point in the history
…oked and updates indices statistics and services about field data loading and unloading.

Moved the circuit breaker memory reducing logic to the IndicesFieldDataCacheListener, so it always reduces the memory used when field data gets unloaded,
this fixes a issue where the circuit breaker didn't get reduced when segments where no shardId could be resolved get cleared up.

Also made sure that exceptions in the percolator service are bubbled up properly.

Closes #5588
  • Loading branch information
martijnvg committed Apr 2, 2014
1 parent 42b20d6 commit b745153
Show file tree
Hide file tree
Showing 8 changed files with 130 additions and 49 deletions.
Expand Up @@ -32,7 +32,10 @@
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardUtils;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCacheListener;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

/**
Expand Down Expand Up @@ -70,11 +73,13 @@ static abstract class FieldBased implements IndexFieldDataCache, SegmentReader.C
private final FieldMapper.Names fieldNames;
private final FieldDataType fieldDataType;
private final Cache<Key, AtomicFieldData> cache;
private final IndicesFieldDataCacheListener indicesFieldDataCacheListener;

protected FieldBased(@Nullable IndexService indexService, FieldMapper.Names fieldNames, FieldDataType fieldDataType, CacheBuilder cache) {
protected FieldBased(@Nullable IndexService indexService, FieldMapper.Names fieldNames, FieldDataType fieldDataType, CacheBuilder cache, IndicesFieldDataCacheListener indicesFieldDataCacheListener) {
this.indexService = indexService;
this.fieldNames = fieldNames;
this.fieldDataType = fieldDataType;
this.indicesFieldDataCacheListener = indicesFieldDataCacheListener;
cache.removalListener(this);
//noinspection unchecked
this.cache = cache.build();
Expand All @@ -83,15 +88,16 @@ protected FieldBased(@Nullable IndexService indexService, FieldMapper.Names fiel
@Override
public void onRemoval(RemovalNotification<Key, AtomicFieldData> notification) {
Key key = notification.getKey();
if (key == null || key.listener == null) {
return; // we can't do anything here...
}
assert key != null && key.listeners != null;

AtomicFieldData value = notification.getValue();
long sizeInBytes = key.sizeInBytes;
if (sizeInBytes == -1 && value != null) {
sizeInBytes = value.getMemorySizeInBytes();
}
key.listener.onUnload(fieldNames, fieldDataType, notification.wasEvicted(), sizeInBytes, value);
for (Listener listener : key.listeners) {
listener.onUnload(fieldNames, fieldDataType, notification.wasEvicted(), sizeInBytes, value);
}
}

@Override
Expand All @@ -104,21 +110,20 @@ public AtomicFieldData call() throws Exception {
SegmentReaderUtils.registerCoreListener(context.reader(), FieldBased.this);
AtomicFieldData fieldData = indexFieldData.loadDirect(context);
key.sizeInBytes = fieldData.getMemorySizeInBytes();
key.listeners.add(indicesFieldDataCacheListener);

if (indexService != null) {
ShardId shardId = ShardUtils.extractShardId(context.reader());
if (shardId != null) {
IndexShard shard = indexService.shard(shardId.id());
if (shard != null) {
key.listener = shard.fieldData();
key.listeners.add(shard.fieldData());
}
}
}

if (key.listener != null) {
key.listener.onLoad(fieldNames, fieldDataType, fieldData);
for (Listener listener : key.listeners) {
listener.onLoad(fieldNames, fieldDataType, fieldData);
}

return fieldData;
}
});
Expand Down Expand Up @@ -146,8 +151,7 @@ public void onClose(Object coreCacheKey) {

static class Key {
final Object readerKey;
@Nullable
Listener listener; // optional stats listener
final List<Listener> listeners = new ArrayList<>(); // optional stats listener
long sizeInBytes = -1; // optional size in bytes (we keep it here in case the values are soft references)

Key(Object readerKey) {
Expand All @@ -171,15 +175,15 @@ public int hashCode() {

static class Resident extends FieldBased {

public Resident(@Nullable IndexService indexService, FieldMapper.Names fieldNames, FieldDataType fieldDataType) {
super(indexService, fieldNames, fieldDataType, CacheBuilder.newBuilder());
public Resident(@Nullable IndexService indexService, FieldMapper.Names fieldNames, FieldDataType fieldDataType, IndicesFieldDataCacheListener indicesFieldDataCacheListener) {
super(indexService, fieldNames, fieldDataType, CacheBuilder.newBuilder(), indicesFieldDataCacheListener);
}
}

static class Soft extends FieldBased {

public Soft(@Nullable IndexService indexService, FieldMapper.Names fieldNames, FieldDataType fieldDataType) {
super(indexService, fieldNames, fieldDataType, CacheBuilder.newBuilder().softValues());
public Soft(@Nullable IndexService indexService, FieldMapper.Names fieldNames, FieldDataType fieldDataType, IndicesFieldDataCacheListener indicesFieldDataCacheListener) {
super(indexService, fieldNames, fieldDataType, CacheBuilder.newBuilder().softValues(), indicesFieldDataCacheListener);
}
}
}
Expand Up @@ -37,6 +37,7 @@
import org.elasticsearch.index.mapper.internal.ParentFieldMapper;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCacheListener;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService;

Expand All @@ -58,6 +59,7 @@ public class IndexFieldDataService extends AbstractIndexComponent {
private final static ImmutableMap<String, IndexFieldData.Builder> docValuesBuildersByType;
private final static ImmutableMap<Tuple<String, String>, IndexFieldData.Builder> buildersByTypeAndFormat;
private final CircuitBreakerService circuitBreakerService;
private final IndicesFieldDataCacheListener indicesFieldDataCacheListener;

static {
buildersByType = MapBuilder.<String, IndexFieldData.Builder>newMapBuilder()
Expand Down Expand Up @@ -129,15 +131,16 @@ public class IndexFieldDataService extends AbstractIndexComponent {

// public for testing
public IndexFieldDataService(Index index, CircuitBreakerService circuitBreakerService) {
this(index, ImmutableSettings.Builder.EMPTY_SETTINGS, new IndicesFieldDataCache(ImmutableSettings.Builder.EMPTY_SETTINGS), circuitBreakerService);
this(index, ImmutableSettings.Builder.EMPTY_SETTINGS, new IndicesFieldDataCache(ImmutableSettings.Builder.EMPTY_SETTINGS, new IndicesFieldDataCacheListener(circuitBreakerService)), circuitBreakerService, new IndicesFieldDataCacheListener(circuitBreakerService));
}

@Inject
public IndexFieldDataService(Index index, @IndexSettings Settings indexSettings, IndicesFieldDataCache indicesFieldDataCache,
CircuitBreakerService circuitBreakerService) {
CircuitBreakerService circuitBreakerService, IndicesFieldDataCacheListener indicesFieldDataCacheListener) {
super(index, indexSettings);
this.indicesFieldDataCache = indicesFieldDataCache;
this.circuitBreakerService = circuitBreakerService;
this.indicesFieldDataCacheListener = indicesFieldDataCacheListener;
}

// we need to "inject" the index service to not create cyclic dep
Expand Down Expand Up @@ -227,9 +230,9 @@ public <IFD extends IndexFieldData<?>> IFD getForField(FieldMapper<?> mapper) {
// this means changing the node level settings is simple, just set the bounds there
String cacheType = type.getSettings().get("cache", indexSettings.get("index.fielddata.cache", "node"));
if ("resident".equals(cacheType)) {
cache = new IndexFieldDataCache.Resident(indexService, fieldNames, type);
cache = new IndexFieldDataCache.Resident(indexService, fieldNames, type, indicesFieldDataCacheListener);
} else if ("soft".equals(cacheType)) {
cache = new IndexFieldDataCache.Soft(indexService, fieldNames, type);
cache = new IndexFieldDataCache.Soft(indexService, fieldNames, type, indicesFieldDataCacheListener);
} else if ("node".equals(cacheType)) {
cache = indicesFieldDataCache.buildIndexFieldDataCache(indexService, index, fieldNames, type);
} else {
Expand Down
Expand Up @@ -31,7 +31,6 @@
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService;

import java.util.Map;
import java.util.concurrent.ConcurrentMap;
Expand All @@ -45,12 +44,9 @@ public class ShardFieldData extends AbstractIndexShardComponent implements Index

final ConcurrentMap<String, CounterMetric> perFieldTotals = ConcurrentCollections.newConcurrentMap();

private final CircuitBreakerService breakerService;

@Inject
public ShardFieldData(ShardId shardId, @IndexSettings Settings indexSettings, CircuitBreakerService breakerService) {
public ShardFieldData(ShardId shardId, @IndexSettings Settings indexSettings) {
super(shardId, indexSettings);
this.breakerService = breakerService;
}

public FieldDataStats stats(String... fields) {
Expand Down Expand Up @@ -101,10 +97,6 @@ public void onUnload(FieldMapper.Names fieldNames, FieldDataType fieldDataType,
evictionsMetric.inc();
}
if (sizeInBytes != -1) {
// Since field data is being unloaded (due to expiration or manual
// clearing), we also need to decrement the used bytes in the breaker
breakerService.getBreaker().addWithoutBreaking(-sizeInBytes);

totalMetric.dec(sizeInBytes);

String keyFieldName = fieldNames.indexName();
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/org/elasticsearch/indices/IndicesModule.java
Expand Up @@ -31,6 +31,7 @@
import org.elasticsearch.indices.cluster.IndicesClusterStateService;
import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService;
import org.elasticsearch.indices.fielddata.breaker.InternalCircuitBreakerService;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCacheListener;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.elasticsearch.indices.memory.IndexingMemoryController;
import org.elasticsearch.indices.query.IndicesQueriesModule;
Expand Down Expand Up @@ -81,5 +82,6 @@ protected void configure() {
bind(UpdateHelper.class).asEagerSingleton();

bind(CircuitBreakerService.class).to(InternalCircuitBreakerService.class).asEagerSingleton();
bind(IndicesFieldDataCacheListener.class).asEagerSingleton();
}
}
Expand Up @@ -40,13 +40,17 @@
import org.elasticsearch.index.shard.ShardUtils;
import org.elasticsearch.index.shard.service.IndexShard;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

/**
*/
public class IndicesFieldDataCache extends AbstractComponent implements RemovalListener<IndicesFieldDataCache.Key, AtomicFieldData> {

private final IndicesFieldDataCacheListener indicesFieldDataCacheListener;

Cache<Key, AtomicFieldData> cache;

private volatile String size;
Expand All @@ -55,8 +59,9 @@ public class IndicesFieldDataCache extends AbstractComponent implements RemovalL


@Inject
public IndicesFieldDataCache(Settings settings) {
public IndicesFieldDataCache(Settings settings, IndicesFieldDataCacheListener indicesFieldDataCacheListener) {
super(settings);
this.indicesFieldDataCacheListener = indicesFieldDataCacheListener;
this.size = componentSettings.get("size", "-1");
this.sizeInBytes = componentSettings.getAsMemory("size", "-1").bytes();
this.expire = componentSettings.getAsTime("expire", null);
Expand Down Expand Up @@ -89,16 +94,17 @@ public IndexFieldDataCache buildIndexFieldDataCache(@Nullable IndexService index
@Override
public void onRemoval(RemovalNotification<Key, AtomicFieldData> notification) {
Key key = notification.getKey();
if (key == null || key.listener == null) {
return; // nothing to do here really...
}
assert key != null && key.listeners != null;

IndexFieldCache indexCache = key.indexCache;
long sizeInBytes = key.sizeInBytes;
AtomicFieldData value = notification.getValue();
if (sizeInBytes == -1 && value != null) {
sizeInBytes = value.getMemorySizeInBytes();
}
key.listener.onUnload(indexCache.fieldNames, indexCache.fieldDataType, notification.wasEvicted(), sizeInBytes, value);
for (IndexFieldDataCache.Listener listener : key.listeners) {
listener.onUnload(indexCache.fieldNames, indexCache.fieldDataType, notification.wasEvicted(), sizeInBytes, value);
}
}

public static class FieldDataWeigher implements Weigher<Key, AtomicFieldData> {
Expand Down Expand Up @@ -137,21 +143,20 @@ public <FD extends AtomicFieldData, IFD extends IndexFieldData<FD>> FD load(fina
public AtomicFieldData call() throws Exception {
SegmentReaderUtils.registerCoreListener(context.reader(), IndexFieldCache.this);
AtomicFieldData fieldData = indexFieldData.loadDirect(context);
key.listeners.add(indicesFieldDataCacheListener);

if (indexService != null) {
ShardId shardId = ShardUtils.extractShardId(context.reader());
if (shardId != null) {
IndexShard shard = indexService.shard(shardId.id());
if (shard != null) {
key.listener = shard.fieldData();
key.listeners.add(shard.fieldData());
}
}
}

if (key.listener != null) {
key.listener.onLoad(fieldNames, fieldDataType, fieldData);
for (Listener listener : key.listeners) {
listener.onLoad(fieldNames, fieldDataType, fieldData);
}

return fieldData;
}
});
Expand Down Expand Up @@ -192,8 +197,7 @@ public static class Key {
public final IndexFieldCache indexCache;
public final Object readerKey;

@Nullable
public IndexFieldDataCache.Listener listener; // optional stats listener
public final List<IndexFieldDataCache.Listener> listeners = new ArrayList<>(); // optional stats listener
long sizeInBytes = -1; // optional size in bytes (we keep it here in case the values are soft references)


Expand Down
@@ -0,0 +1,55 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.indices.fielddata.cache;

import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.index.fielddata.AtomicFieldData;
import org.elasticsearch.index.fielddata.FieldDataType;
import org.elasticsearch.index.fielddata.IndexFieldDataCache;
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService;

/**
* A {@link IndexFieldDataCache.Listener} implementation that updates indices (node) level statistics / service about
* field data entries being loaded and unloaded.
*
* Currently it only decrements the memory used in the {@link CircuitBreakerService}.
*/
public class IndicesFieldDataCacheListener implements IndexFieldDataCache.Listener {

private final CircuitBreakerService circuitBreakerService;

@Inject
public IndicesFieldDataCacheListener(CircuitBreakerService circuitBreakerService) {
this.circuitBreakerService = circuitBreakerService;
}

@Override
public void onLoad(FieldMapper.Names fieldNames, FieldDataType fieldDataType, AtomicFieldData fieldData) {
}

@Override
public void onUnload(FieldMapper.Names fieldNames, FieldDataType fieldDataType, boolean wasEvicted, long sizeInBytes, @Nullable AtomicFieldData fieldData) {
assert sizeInBytes > 0 : "When reducing circuit breaker, it should be adjusted with a positive number and not [" + sizeInBytes + "]";
circuitBreakerService.getBreaker().addWithoutBreaking(-sizeInBytes);
}

}
Expand Up @@ -70,10 +70,7 @@
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.percolator.QueryCollector.Count;
import org.elasticsearch.percolator.QueryCollector.Match;
import org.elasticsearch.percolator.QueryCollector.MatchAndScore;
import org.elasticsearch.percolator.QueryCollector.MatchAndSort;
import org.elasticsearch.percolator.QueryCollector.*;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.SearchParseElement;
import org.elasticsearch.search.SearchShardTarget;
Expand Down Expand Up @@ -441,8 +438,9 @@ public PercolateShardResponse doPercolate(PercolateShardRequest request, Percola
collector.reset();
try {
context.docSearcher().search(entry.getValue(), collector);
} catch (IOException e) {
logger.warn("[" + entry.getKey() + "] failed to execute query", e);
} catch (Throwable e) {
logger.debug("[" + entry.getKey() + "] failed to execute query", e);
throw new PercolateException(context.indexShard().shardId(), "failed to execute", e);
}

if (collector.exists()) {
Expand Down Expand Up @@ -539,7 +537,8 @@ public PercolateShardResponse doPercolate(PercolateShardRequest request, Percola
try {
context.docSearcher().search(entry.getValue(), collector);
} catch (Throwable e) {
logger.warn("[" + entry.getKey() + "] failed to execute query", e);
logger.debug("[" + entry.getKey() + "] failed to execute query", e);
throw new PercolateException(context.indexShard().shardId(), "failed to execute", e);
}

if (collector.exists()) {
Expand Down

0 comments on commit b745153

Please sign in to comment.