Skip to content

Commit

Permalink
Fix memory leak in DLS bitset cache (#50946)
Browse files Browse the repository at this point in the history
The Document Level Security BitSet cache stores a secondary "lookup
map" so that it can determine which cache entries to invalidate when
a Lucene index is closed (merged, etc).

There was a memory leak because this secondary map was not cleared
when entries were naturally evicted from the cache (due to size/ttl
limits).

This has been solved by adding a cache removal listener and processing
those removal events asyncronously.

Backport of: #50635
  • Loading branch information
tvernum committed Jan 14, 2020
1 parent 27c2eb7 commit 90ba779
Show file tree
Hide file tree
Showing 6 changed files with 386 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,22 +24,28 @@
import org.elasticsearch.common.cache.Cache;
import org.elasticsearch.common.cache.CacheBuilder;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.cache.RemovalNotification;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ReleasableLock;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.Closeable;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
* This is a cache for {@link BitSet} instances that are used with the {@link DocumentSubsetReader}.
Expand All @@ -66,17 +72,48 @@ public final class DocumentSubsetBitsetCache implements IndexReader.ClosedListen
private static final BitSet NULL_MARKER = new FixedBitSet(0);

private final Logger logger;

/**
* When a {@link BitSet} is evicted from {@link #bitsetCache}, we need to also remove it from {@link #keysByIndex}.
* We use a {@link ReentrantReadWriteLock} to control atomicity here - the "read" side represents potential insertions to the
* {@link #bitsetCache}, the "write" side represents removals from {@link #keysByIndex}.
* The risk (that {@link Cache} does not provide protection for) is that an entry is removed from the cache, and then immediately
* re-populated, before we process the removal event. To protect against that we need to check the state of the {@link #bitsetCache}
* but we need exclusive ("write") access while performing that check and updating the values in {@link #keysByIndex}.
*/
private final ReleasableLock cacheEvictionLock;
private final ReleasableLock cacheModificationLock;
private final ExecutorService cleanupExecutor;

private final Cache<BitsetCacheKey, BitSet> bitsetCache;
private final Map<IndexReader.CacheKey, Set<BitsetCacheKey>> keysByIndex;

public DocumentSubsetBitsetCache(Settings settings) {
public DocumentSubsetBitsetCache(Settings settings, ThreadPool threadPool) {
this(settings, threadPool.executor(ThreadPool.Names.GENERIC));
}

/**
* @param settings The global settings object for this node
* @param cleanupExecutor An executor on which the cache cleanup tasks can be run. Due to the way the cache is structured internally,
* it is sometimes necessary to run an asynchronous task to synchronize the internal state.
*/
protected DocumentSubsetBitsetCache(Settings settings, ExecutorService cleanupExecutor) {
this.logger = LogManager.getLogger(getClass());

final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
this.cacheEvictionLock = new ReleasableLock(readWriteLock.writeLock());
this.cacheModificationLock = new ReleasableLock(readWriteLock.readLock());
this.cleanupExecutor = cleanupExecutor;

final TimeValue ttl = CACHE_TTL_SETTING.get(settings);
final ByteSizeValue size = CACHE_SIZE_SETTING.get(settings);
this.bitsetCache = CacheBuilder.<BitsetCacheKey, BitSet>builder()
.setExpireAfterAccess(ttl)
.setMaximumWeight(size.getBytes())
.weigher((key, bitSet) -> bitSet == NULL_MARKER ? 0 : bitSet.ramBytesUsed()).build();
.weigher((key, bitSet) -> bitSet == NULL_MARKER ? 0 : bitSet.ramBytesUsed())
.removalListener(this::onCacheEviction)
.build();

this.keysByIndex = new ConcurrentHashMap<>();
}

Expand All @@ -90,6 +127,31 @@ public void onClose(IndexReader.CacheKey ownerCoreCacheKey) {
}
}

/**
* Cleanup (synchronize) the internal state when an object is removed from the primary cache
*/
private void onCacheEviction(RemovalNotification<BitsetCacheKey, BitSet> notification) {
final BitsetCacheKey bitsetKey = notification.getKey();
final IndexReader.CacheKey indexKey = bitsetKey.index;
if (keysByIndex.getOrDefault(indexKey, Collections.emptySet()).contains(bitsetKey) == false) {
// If the bitsetKey isn't in the lookup map, then there's nothing to synchronize
return;
}
// We push this to a background thread, so that it reduces the risk of blocking searches, but also so that the lock management is
// simpler - this callback is likely to take place on a thread that is actively adding something to the cache, and is therefore
// holding the read ("update") side of the lock. It is not possible to upgrade a read lock to a write ("eviction") lock, but we
// need to acquire that lock here.
cleanupExecutor.submit(() -> {
try (ReleasableLock ignored = cacheEvictionLock.acquire()) {
// it's possible for the key to be back in the cache if it was immediately repopulated after it was evicted, so check
if (bitsetCache.get(bitsetKey) == null) {
// key is no longer in the cache, make sure it is no longer in the lookup map either.
keysByIndex.getOrDefault(indexKey, Collections.emptySet()).remove(bitsetKey);
}
}
});
}

@Override
public void close() {
clear("close");
Expand All @@ -98,7 +160,8 @@ public void close() {
public void clear(String reason) {
logger.debug("clearing all DLS bitsets because [{}]", reason);
// Due to the order here, it is possible than a new entry could be added _after_ the keysByIndex map is cleared
// but _before_ the cache is cleared. This would mean it sits orphaned in keysByIndex, but this is not a issue.
// but _before_ the cache is cleared. This should get fixed up in the "onCacheEviction" callback, but if anything slips through
// and sits orphaned in keysByIndex, it will not be a significant issue.
// When the index is closed, the key will be removed from the map, and there will not be a corresponding item
// in the cache, which will make the cache-invalidate a no-op.
// Since the entry is not in the cache, if #getBitSet is called, it will be loaded, and the new key will be added
Expand Down Expand Up @@ -132,31 +195,33 @@ public BitSet getBitSet(final Query query, final LeafReaderContext context) thro
final IndexReader.CacheKey indexKey = coreCacheHelper.getKey();
final BitsetCacheKey cacheKey = new BitsetCacheKey(indexKey, query);

final BitSet bitSet = bitsetCache.computeIfAbsent(cacheKey, ignore1 -> {
// This ensures all insertions into the set are guarded by ConcurrentHashMap's atomicity guarantees.
keysByIndex.compute(indexKey, (ignore2, set) -> {
if (set == null) {
set = Sets.newConcurrentHashSet();
try (ReleasableLock ignored = cacheModificationLock.acquire()) {
final BitSet bitSet = bitsetCache.computeIfAbsent(cacheKey, ignore1 -> {
// This ensures all insertions into the set are guarded by ConcurrentHashMap's atomicity guarantees.
keysByIndex.compute(indexKey, (ignore2, set) -> {
if (set == null) {
set = Sets.newConcurrentHashSet();
}
set.add(cacheKey);
return set;
});
final IndexReaderContext topLevelContext = ReaderUtil.getTopLevelContext(context);
final IndexSearcher searcher = new IndexSearcher(topLevelContext);
searcher.setQueryCache(null);
final Weight weight = searcher.createWeight(searcher.rewrite(query), ScoreMode.COMPLETE_NO_SCORES, 1f);
Scorer s = weight.scorer(context);
if (s == null) {
// A cache loader is not allowed to return null, return a marker object instead.
return NULL_MARKER;
} else {
return BitSet.of(s.iterator(), context.reader().maxDoc());
}
set.add(cacheKey);
return set;
});
final IndexReaderContext topLevelContext = ReaderUtil.getTopLevelContext(context);
final IndexSearcher searcher = new IndexSearcher(topLevelContext);
searcher.setQueryCache(null);
final Weight weight = searcher.createWeight(searcher.rewrite(query), ScoreMode.COMPLETE_NO_SCORES, 1f);
Scorer s = weight.scorer(context);
if (s == null) {
// A cache loader is not allowed to return null, return a marker object instead.
return NULL_MARKER;
if (bitSet == NULL_MARKER) {
return null;
} else {
return BitSet.of(s.iterator(), context.reader().maxDoc());
return bitSet;
}
});
if (bitSet == NULL_MARKER) {
return null;
} else {
return bitSet;
}
}

Expand Down Expand Up @@ -205,4 +270,27 @@ public String toString() {
return getClass().getSimpleName() + "(" + index + "," + query + ")";
}
}

/**
* This method verifies that the two internal data structures ({@link #bitsetCache} and {@link #keysByIndex}) are consistent with one
* another. This method is only called by tests.
*/
void verifyInternalConsistency() {
this.bitsetCache.keys().forEach(bck -> {
final Set<BitsetCacheKey> set = this.keysByIndex.get(bck.index);
if (set == null) {
throw new IllegalStateException("Key [" + bck + "] is in the cache, but there is no entry for [" + bck.index +
"] in the lookup map");
}
if (set.contains(bck) == false) {
throw new IllegalStateException("Key [" + bck + "] is in the cache, but the lookup entry for [" + bck.index +
"] does not contain that key");
}
});
this.keysByIndex.values().stream().flatMap(Set::stream).forEach(bck -> {
if (this.bitsetCache.get(bck) == null) {
throw new IllegalStateException("Key [" + bck + "] is in the lookup map, but is not in the cache");
}
});
}
}

0 comments on commit 90ba779

Please sign in to comment.