Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Percolator doesn't reduce CircuitBreaker stats in every case. #5588

Closed
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -32,7 +32,10 @@
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardUtils;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCacheListener;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

/**
Expand Down Expand Up @@ -70,11 +73,13 @@ static abstract class FieldBased implements IndexFieldDataCache, SegmentReader.C
private final FieldMapper.Names fieldNames;
private final FieldDataType fieldDataType;
private final Cache<Key, AtomicFieldData> cache;
private final IndicesFieldDataCacheListener indicesFieldDataCacheListener;

protected FieldBased(@Nullable IndexService indexService, FieldMapper.Names fieldNames, FieldDataType fieldDataType, CacheBuilder cache) {
protected FieldBased(@Nullable IndexService indexService, FieldMapper.Names fieldNames, FieldDataType fieldDataType, CacheBuilder cache, IndicesFieldDataCacheListener indicesFieldDataCacheListener) {
this.indexService = indexService;
this.fieldNames = fieldNames;
this.fieldDataType = fieldDataType;
this.indicesFieldDataCacheListener = indicesFieldDataCacheListener;
cache.removalListener(this);
//noinspection unchecked
this.cache = cache.build();
Expand All @@ -83,15 +88,16 @@ protected FieldBased(@Nullable IndexService indexService, FieldMapper.Names fiel
@Override
public void onRemoval(RemovalNotification<Key, AtomicFieldData> notification) {
Key key = notification.getKey();
if (key == null || key.listener == null) {
return; // we can't do anything here...
}
assert key != null && key.listeners != null;

AtomicFieldData value = notification.getValue();
long sizeInBytes = key.sizeInBytes;
if (sizeInBytes == -1 && value != null) {
sizeInBytes = value.getMemorySizeInBytes();
}
key.listener.onUnload(fieldNames, fieldDataType, notification.wasEvicted(), sizeInBytes, value);
for (Listener listener : key.listeners) {
listener.onUnload(fieldNames, fieldDataType, notification.wasEvicted(), sizeInBytes, value);
}
}

@Override
Expand All @@ -104,21 +110,20 @@ public AtomicFieldData call() throws Exception {
SegmentReaderUtils.registerCoreListener(context.reader(), FieldBased.this);
AtomicFieldData fieldData = indexFieldData.loadDirect(context);
key.sizeInBytes = fieldData.getMemorySizeInBytes();
key.listeners.add(indicesFieldDataCacheListener);

if (indexService != null) {
ShardId shardId = ShardUtils.extractShardId(context.reader());
if (shardId != null) {
IndexShard shard = indexService.shard(shardId.id());
if (shard != null) {
key.listener = shard.fieldData();
key.listeners.add(shard.fieldData());
}
}
}

if (key.listener != null) {
key.listener.onLoad(fieldNames, fieldDataType, fieldData);
for (Listener listener : key.listeners) {
listener.onLoad(fieldNames, fieldDataType, fieldData);
}

return fieldData;
}
});
Expand Down Expand Up @@ -146,8 +151,7 @@ public void onClose(Object coreCacheKey) {

static class Key {
final Object readerKey;
@Nullable
Listener listener; // optional stats listener
final List<Listener> listeners = new ArrayList<>(); // optional stats listener
long sizeInBytes = -1; // optional size in bytes (we keep it here in case the values are soft references)

Key(Object readerKey) {
Expand All @@ -171,15 +175,15 @@ public int hashCode() {

static class Resident extends FieldBased {

public Resident(@Nullable IndexService indexService, FieldMapper.Names fieldNames, FieldDataType fieldDataType) {
super(indexService, fieldNames, fieldDataType, CacheBuilder.newBuilder());
public Resident(@Nullable IndexService indexService, FieldMapper.Names fieldNames, FieldDataType fieldDataType, IndicesFieldDataCacheListener indicesFieldDataCacheListener) {
super(indexService, fieldNames, fieldDataType, CacheBuilder.newBuilder(), indicesFieldDataCacheListener);
}
}

static class Soft extends FieldBased {

public Soft(@Nullable IndexService indexService, FieldMapper.Names fieldNames, FieldDataType fieldDataType) {
super(indexService, fieldNames, fieldDataType, CacheBuilder.newBuilder().softValues());
public Soft(@Nullable IndexService indexService, FieldMapper.Names fieldNames, FieldDataType fieldDataType, IndicesFieldDataCacheListener indicesFieldDataCacheListener) {
super(indexService, fieldNames, fieldDataType, CacheBuilder.newBuilder().softValues(), indicesFieldDataCacheListener);
}
}
}
Expand Up @@ -37,6 +37,7 @@
import org.elasticsearch.index.mapper.internal.ParentFieldMapper;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCacheListener;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService;

Expand All @@ -58,6 +59,7 @@ public class IndexFieldDataService extends AbstractIndexComponent {
private final static ImmutableMap<String, IndexFieldData.Builder> docValuesBuildersByType;
private final static ImmutableMap<Tuple<String, String>, IndexFieldData.Builder> buildersByTypeAndFormat;
private final CircuitBreakerService circuitBreakerService;
private final IndicesFieldDataCacheListener indicesFieldDataCacheListener;

static {
buildersByType = MapBuilder.<String, IndexFieldData.Builder>newMapBuilder()
Expand Down Expand Up @@ -129,15 +131,16 @@ public class IndexFieldDataService extends AbstractIndexComponent {

// public for testing
public IndexFieldDataService(Index index, CircuitBreakerService circuitBreakerService) {
this(index, ImmutableSettings.Builder.EMPTY_SETTINGS, new IndicesFieldDataCache(ImmutableSettings.Builder.EMPTY_SETTINGS), circuitBreakerService);
this(index, ImmutableSettings.Builder.EMPTY_SETTINGS, new IndicesFieldDataCache(ImmutableSettings.Builder.EMPTY_SETTINGS, new IndicesFieldDataCacheListener(circuitBreakerService)), circuitBreakerService, new IndicesFieldDataCacheListener(circuitBreakerService));
}

@Inject
public IndexFieldDataService(Index index, @IndexSettings Settings indexSettings, IndicesFieldDataCache indicesFieldDataCache,
CircuitBreakerService circuitBreakerService) {
CircuitBreakerService circuitBreakerService, IndicesFieldDataCacheListener indicesFieldDataCacheListener) {
super(index, indexSettings);
this.indicesFieldDataCache = indicesFieldDataCache;
this.circuitBreakerService = circuitBreakerService;
this.indicesFieldDataCacheListener = indicesFieldDataCacheListener;
}

// we need to "inject" the index service to not create cyclic dep
Expand Down Expand Up @@ -227,9 +230,9 @@ public <IFD extends IndexFieldData<?>> IFD getForField(FieldMapper<?> mapper) {
// this means changing the node level settings is simple, just set the bounds there
String cacheType = type.getSettings().get("cache", indexSettings.get("index.fielddata.cache", "node"));
if ("resident".equals(cacheType)) {
cache = new IndexFieldDataCache.Resident(indexService, fieldNames, type);
cache = new IndexFieldDataCache.Resident(indexService, fieldNames, type, indicesFieldDataCacheListener);
} else if ("soft".equals(cacheType)) {
cache = new IndexFieldDataCache.Soft(indexService, fieldNames, type);
cache = new IndexFieldDataCache.Soft(indexService, fieldNames, type, indicesFieldDataCacheListener);
} else if ("node".equals(cacheType)) {
cache = indicesFieldDataCache.buildIndexFieldDataCache(indexService, index, fieldNames, type);
} else {
Expand Down
Expand Up @@ -31,7 +31,6 @@
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService;

import java.util.Map;
import java.util.concurrent.ConcurrentMap;
Expand All @@ -45,12 +44,9 @@ public class ShardFieldData extends AbstractIndexShardComponent implements Index

final ConcurrentMap<String, CounterMetric> perFieldTotals = ConcurrentCollections.newConcurrentMap();

private final CircuitBreakerService breakerService;

@Inject
public ShardFieldData(ShardId shardId, @IndexSettings Settings indexSettings, CircuitBreakerService breakerService) {
public ShardFieldData(ShardId shardId, @IndexSettings Settings indexSettings) {
super(shardId, indexSettings);
this.breakerService = breakerService;
}

public FieldDataStats stats(String... fields) {
Expand Down Expand Up @@ -101,10 +97,6 @@ public void onUnload(FieldMapper.Names fieldNames, FieldDataType fieldDataType,
evictionsMetric.inc();
}
if (sizeInBytes != -1) {
// Since field data is being unloaded (due to expiration or manual
// clearing), we also need to decrement the used bytes in the breaker
breakerService.getBreaker().addWithoutBreaking(-sizeInBytes);

totalMetric.dec(sizeInBytes);

String keyFieldName = fieldNames.indexName();
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/org/elasticsearch/indices/IndicesModule.java
Expand Up @@ -31,6 +31,7 @@
import org.elasticsearch.indices.cluster.IndicesClusterStateService;
import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService;
import org.elasticsearch.indices.fielddata.breaker.InternalCircuitBreakerService;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCacheListener;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.elasticsearch.indices.memory.IndexingMemoryController;
import org.elasticsearch.indices.query.IndicesQueriesModule;
Expand Down Expand Up @@ -81,5 +82,6 @@ protected void configure() {
bind(UpdateHelper.class).asEagerSingleton();

bind(CircuitBreakerService.class).to(InternalCircuitBreakerService.class).asEagerSingleton();
bind(IndicesFieldDataCacheListener.class).asEagerSingleton();
}
}
Expand Up @@ -40,13 +40,17 @@
import org.elasticsearch.index.shard.ShardUtils;
import org.elasticsearch.index.shard.service.IndexShard;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

/**
*/
public class IndicesFieldDataCache extends AbstractComponent implements RemovalListener<IndicesFieldDataCache.Key, AtomicFieldData> {

private final IndicesFieldDataCacheListener indicesFieldDataCacheListener;

Cache<Key, AtomicFieldData> cache;

private volatile String size;
Expand All @@ -55,8 +59,9 @@ public class IndicesFieldDataCache extends AbstractComponent implements RemovalL


@Inject
public IndicesFieldDataCache(Settings settings) {
public IndicesFieldDataCache(Settings settings, IndicesFieldDataCacheListener indicesFieldDataCacheListener) {
super(settings);
this.indicesFieldDataCacheListener = indicesFieldDataCacheListener;
this.size = componentSettings.get("size", "-1");
this.sizeInBytes = componentSettings.getAsMemory("size", "-1").bytes();
this.expire = componentSettings.getAsTime("expire", null);
Expand Down Expand Up @@ -89,16 +94,17 @@ public IndexFieldDataCache buildIndexFieldDataCache(@Nullable IndexService index
@Override
public void onRemoval(RemovalNotification<Key, AtomicFieldData> notification) {
Key key = notification.getKey();
if (key == null || key.listener == null) {
return; // nothing to do here really...
}
assert key != null && key.listeners != null;

IndexFieldCache indexCache = key.indexCache;
long sizeInBytes = key.sizeInBytes;
AtomicFieldData value = notification.getValue();
if (sizeInBytes == -1 && value != null) {
sizeInBytes = value.getMemorySizeInBytes();
}
key.listener.onUnload(indexCache.fieldNames, indexCache.fieldDataType, notification.wasEvicted(), sizeInBytes, value);
for (IndexFieldDataCache.Listener listener : key.listeners) {
listener.onUnload(indexCache.fieldNames, indexCache.fieldDataType, notification.wasEvicted(), sizeInBytes, value);
}
}

public static class FieldDataWeigher implements Weigher<Key, AtomicFieldData> {
Expand Down Expand Up @@ -137,21 +143,20 @@ public <FD extends AtomicFieldData, IFD extends IndexFieldData<FD>> FD load(fina
public AtomicFieldData call() throws Exception {
SegmentReaderUtils.registerCoreListener(context.reader(), IndexFieldCache.this);
AtomicFieldData fieldData = indexFieldData.loadDirect(context);
key.listeners.add(indicesFieldDataCacheListener);

if (indexService != null) {
ShardId shardId = ShardUtils.extractShardId(context.reader());
if (shardId != null) {
IndexShard shard = indexService.shard(shardId.id());
if (shard != null) {
key.listener = shard.fieldData();
key.listeners.add(shard.fieldData());
}
}
}

if (key.listener != null) {
key.listener.onLoad(fieldNames, fieldDataType, fieldData);
for (Listener listener : key.listeners) {
listener.onLoad(fieldNames, fieldDataType, fieldData);
}

return fieldData;
}
});
Expand Down Expand Up @@ -192,8 +197,7 @@ public static class Key {
public final IndexFieldCache indexCache;
public final Object readerKey;

@Nullable
public IndexFieldDataCache.Listener listener; // optional stats listener
public final List<IndexFieldDataCache.Listener> listeners = new ArrayList<>(); // optional stats listener
long sizeInBytes = -1; // optional size in bytes (we keep it here in case the values are soft references)


Expand Down
@@ -0,0 +1,54 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.indices.fielddata.cache;

import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.index.fielddata.AtomicFieldData;
import org.elasticsearch.index.fielddata.FieldDataType;
import org.elasticsearch.index.fielddata.IndexFieldDataCache;
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService;

/**
* A {@link IndexFieldDataCache.Listener} implementation that updates indices (node) level statistics / service about
* field data entries being loaded and unloaded.
*
* Currently it only decrements the memory used in the {@link CircuitBreakerService}.
*/
public class IndicesFieldDataCacheListener implements IndexFieldDataCache.Listener {

private final CircuitBreakerService circuitBreakerService;

@Inject
public IndicesFieldDataCacheListener(CircuitBreakerService circuitBreakerService) {
this.circuitBreakerService = circuitBreakerService;
}

@Override
public void onLoad(FieldMapper.Names fieldNames, FieldDataType fieldDataType, AtomicFieldData fieldData) {
}

@Override
public void onUnload(FieldMapper.Names fieldNames, FieldDataType fieldDataType, boolean wasEvicted, long sizeInBytes, @Nullable AtomicFieldData fieldData) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this assert that the size is positive?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The assert makes sense, let me add it

circuitBreakerService.getBreaker().addWithoutBreaking(-sizeInBytes);
}

}
Expand Up @@ -70,10 +70,7 @@
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.percolator.QueryCollector.Count;
import org.elasticsearch.percolator.QueryCollector.Match;
import org.elasticsearch.percolator.QueryCollector.MatchAndScore;
import org.elasticsearch.percolator.QueryCollector.MatchAndSort;
import org.elasticsearch.percolator.QueryCollector.*;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.SearchParseElement;
import org.elasticsearch.search.SearchShardTarget;
Expand Down Expand Up @@ -441,8 +438,9 @@ public PercolateShardResponse doPercolate(PercolateShardRequest request, Percola
collector.reset();
try {
context.docSearcher().search(entry.getValue(), collector);
} catch (IOException e) {
logger.warn("[" + entry.getKey() + "] failed to execute query", e);
} catch (Throwable e) {
logger.debug("[" + entry.getKey() + "] failed to execute query", e);
throw new PercolateException(context.indexShard().shardId(), "failed to execute", e);
}

if (collector.exists()) {
Expand Down Expand Up @@ -539,7 +537,8 @@ public PercolateShardResponse doPercolate(PercolateShardRequest request, Percola
try {
context.docSearcher().search(entry.getValue(), collector);
} catch (Throwable e) {
logger.warn("[" + entry.getKey() + "] failed to execute query", e);
logger.debug("[" + entry.getKey() + "] failed to execute query", e);
throw new PercolateException(context.indexShard().shardId(), "failed to execute", e);
}

if (collector.exists()) {
Expand Down