Skip to content

Commit

Permalink
Use a 1024 byte minimum weight for filter cache entries
Browse files Browse the repository at this point in the history
This changes the weighing function for the filter cache to use a
configurable minimum weight for each filter cached. This value defaults
to 1kb and can be configured with the
`indices.cache.filter.minimum_entry_weight` setting.

This also fixes an issue with the filter cache where the concurrency
level of the cache was exposed as a setting, but not used in cache
construction.

Relates to elastic#8268
  • Loading branch information
dakrone committed Oct 31, 2014
1 parent 4ac7b02 commit 42b6e01
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -209,12 +209,19 @@ public int hashCode() {
}


/** A weigher for the Guava filter cache that uses a minimum entry size */
public static class FilterCacheValueWeigher implements Weigher<WeightedFilterCache.FilterCacheKey, DocIdSet> {

private final int minimumEntrySize;

public FilterCacheValueWeigher(int minimumEntrySize) {
this.minimumEntrySize = minimumEntrySize;
}

@Override
public int weigh(FilterCacheKey key, DocIdSet value) {
int weight = (int) Math.min(DocIdSets.sizeInBytes(value), Integer.MAX_VALUE);
return weight == 0 ? 1 : weight;
return Math.max(weight, this.minimumEntrySize);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,29 +55,33 @@ public class IndicesFilterCache extends AbstractComponent implements RemovalList
private volatile int concurrencyLevel;

private final TimeValue cleanInterval;
private final int minimumEntryWeight;

private final Set<Object> readersKeysToClean = ConcurrentCollections.newConcurrentSet();

private volatile boolean closed;


public static final String INDICES_CACHE_FILTER_SIZE = "indices.cache.filter.size";
public static final String INDICES_CACHE_FILTER_EXPIRE = "indices.cache.filter.expire";
public static final String INDICES_CACHE_FILTER_CONCURRENCY_LEVEL = "indices.cache.filter.concurrency_level";
public static final String INDICES_CACHE_FILTER_CLEAN_INTERVAL = "indices.cache.filter.clean_interval";
public static final String INDICES_CACHE_FILTER_MINIMUM_ENTRY_WEIGHT = "indices.cache.filter.minimum_entry_weight";

class ApplySettings implements NodeSettingsService.Listener {
@Override
public void onRefreshSettings(Settings settings) {
boolean replace = false;
String size = settings.get(INDICES_CACHE_FILTER_SIZE, IndicesFilterCache.this.size);
if (!size.equals(IndicesFilterCache.this.size)) {
logger.info("updating [indices.cache.filter.size] from [{}] to [{}]", IndicesFilterCache.this.size, size);
logger.info("updating [{}] from [{}] to [{}]",
INDICES_CACHE_FILTER_SIZE, IndicesFilterCache.this.size, size);
IndicesFilterCache.this.size = size;
replace = true;
}
TimeValue expire = settings.getAsTime(INDICES_CACHE_FILTER_EXPIRE, IndicesFilterCache.this.expire);
if (!Objects.equal(expire, IndicesFilterCache.this.expire)) {
logger.info("updating [indices.cache.filter.expire] from [{}] to [{}]", IndicesFilterCache.this.expire, expire);
logger.info("updating [{}] from [{}] to [{}]",
INDICES_CACHE_FILTER_EXPIRE, IndicesFilterCache.this.expire, expire);
IndicesFilterCache.this.expire = expire;
replace = true;
}
Expand All @@ -86,7 +90,8 @@ public void onRefreshSettings(Settings settings) {
throw new ElasticsearchIllegalArgumentException("concurrency_level must be > 0 but was: " + concurrencyLevel);
}
if (!Objects.equal(concurrencyLevel, IndicesFilterCache.this.concurrencyLevel)) {
logger.info("updating [indices.cache.filter.concurrency_level] from [{}] to [{}]", IndicesFilterCache.this.concurrencyLevel, concurrencyLevel);
logger.info("updating [{}] from [{}] to [{}]",
INDICES_CACHE_FILTER_CONCURRENCY_LEVEL, IndicesFilterCache.this.concurrencyLevel, concurrencyLevel);
IndicesFilterCache.this.concurrencyLevel = concurrencyLevel;
replace = true;
}
Expand All @@ -103,9 +108,14 @@ public void onRefreshSettings(Settings settings) {
public IndicesFilterCache(Settings settings, ThreadPool threadPool, NodeSettingsService nodeSettingsService) {
super(settings);
this.threadPool = threadPool;
this.size = componentSettings.get("size", "10%");
this.expire = componentSettings.getAsTime("expire", null);
this.cleanInterval = componentSettings.getAsTime("clean_interval", TimeValue.timeValueSeconds(60));
this.size = settings.get(INDICES_CACHE_FILTER_SIZE, "10%");
this.expire = settings.getAsTime(INDICES_CACHE_FILTER_EXPIRE, null);
this.minimumEntryWeight = settings.getAsInt(INDICES_CACHE_FILTER_MINIMUM_ENTRY_WEIGHT, 1024); // 1k per entry minimum
if (minimumEntryWeight <= 0) {
throw new ElasticsearchIllegalArgumentException("minimum_entry_weight must be > 0 but was: " + minimumEntryWeight);
}
this.cleanInterval = settings.getAsTime(INDICES_CACHE_FILTER_CLEAN_INTERVAL, TimeValue.timeValueSeconds(60));
// defaults to 4, but this is a busy map for all indices, increase it a bit
this.concurrencyLevel = settings.getAsInt(INDICES_CACHE_FILTER_CONCURRENCY_LEVEL, 16);
if (concurrencyLevel <= 0) {
throw new ElasticsearchIllegalArgumentException("concurrency_level must be > 0 but was: " + concurrencyLevel);
Expand All @@ -122,10 +132,9 @@ public IndicesFilterCache(Settings settings, ThreadPool threadPool, NodeSettings
private void buildCache() {
CacheBuilder<WeightedFilterCache.FilterCacheKey, DocIdSet> cacheBuilder = CacheBuilder.newBuilder()
.removalListener(this)
.maximumWeight(sizeInBytes).weigher(new WeightedFilterCache.FilterCacheValueWeigher());
.maximumWeight(sizeInBytes).weigher(new WeightedFilterCache.FilterCacheValueWeigher(minimumEntryWeight));

// defaults to 4, but this is a busy map for all indices, increase it a bit
cacheBuilder.concurrencyLevel(16);
cacheBuilder.concurrencyLevel(this.concurrencyLevel);

if (expire != null) {
cacheBuilder.expireAfterAccess(expire.millis(), TimeUnit.MILLISECONDS);
Expand All @@ -145,6 +154,7 @@ public void addReaderKeyToClean(Object readerKey) {
public void close() {
closed = true;
cache.invalidateAll();
cache.cleanUp();
}

public Cache<WeightedFilterCache.FilterCacheKey, DocIdSet> cache() {
Expand Down

0 comments on commit 42b6e01

Please sign in to comment.