Skip to content

Commit

Permalink
Avoid potential unsupported operation exception in doc bitset cache (#…
Browse files Browse the repository at this point in the history
…91490) (#91535)

This PR replaces Set.of() with explicit null handling so that remove
does not get called against an immutableSet.
  • Loading branch information
ywangd committed Nov 14, 2022
1 parent b8d8227 commit 523b3d9
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 35 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/91490.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 91490
summary: Avoid potential unsupported operation exception in doc bitset cache
area: Authorization
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -179,7 +180,7 @@ private void onCacheEviction(RemovalNotification<BitsetCacheKey, BitSet> notific
// it's possible for the key to be back in the cache if it was immediately repopulated after it was evicted, so check
if (bitsetCache.get(bitsetKey) == null) {
// key is no longer in the cache, make sure it is no longer in the lookup map either.
keysByIndex.getOrDefault(indexKey, Set.of()).remove(bitsetKey);
Optional.ofNullable(keysByIndex.get(indexKey)).ifPresent(set -> set.remove(bitsetKey));
}
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.LeafReaderContext;
Expand Down Expand Up @@ -83,6 +84,9 @@
public class DocumentSubsetBitsetCacheTests extends ESTestCase {

private static final int FIELD_COUNT = 10;
// This value is based on the internal implementation details of lucene's FixedBitSet
// If the implementation changes, this can be safely updated to match the new ram usage for a single bitset
private static final long EXPECTED_BYTES_PER_BIT_SET = 56;
private ExecutorService singleThreadExecutor;

@Before
Expand Down Expand Up @@ -137,12 +141,8 @@ public void testNullEntriesAreNotCountedInMemoryUsage() throws Exception {
}

public void testCacheRespectsMemoryLimit() throws Exception {
// This value is based on the internal implementation details of lucene's FixedBitSet
// If the implementation changes, this can be safely updated to match the new ram usage for a single bitset
final long expectedBytesPerBitSet = 56;

// Enough to hold exactly 2 bit-sets in the cache
final long maxCacheBytes = expectedBytesPerBitSet * 2;
final long maxCacheBytes = EXPECTED_BYTES_PER_BIT_SET * 2;
final Settings settings = Settings.builder()
.put(DocumentSubsetBitsetCache.CACHE_SIZE_SETTING.getKey(), maxCacheBytes + "b")
.build();
Expand All @@ -158,29 +158,29 @@ public void testCacheRespectsMemoryLimit() throws Exception {
final Query query = queryBuilder.toQuery(searchExecutionContext);
final BitSet bitSet = cache.getBitSet(query, leafContext);
assertThat(bitSet, notNullValue());
assertThat(bitSet.ramBytesUsed(), equalTo(expectedBytesPerBitSet));
assertThat(bitSet.ramBytesUsed(), equalTo(EXPECTED_BYTES_PER_BIT_SET));

// The first time through we have 1 entry, after that we have 2
final int expectedCount = i == 1 ? 1 : 2;
assertThat(cache.entryCount(), equalTo(expectedCount));
assertThat(cache.ramBytesUsed(), equalTo(expectedCount * expectedBytesPerBitSet));
assertThat(cache.ramBytesUsed(), equalTo(expectedCount * EXPECTED_BYTES_PER_BIT_SET));

// Older queries should get evicted, but the query from last iteration should still be cached
if (previousQuery != null) {
assertThat(cache.getBitSet(previousQuery, leafContext), sameInstance(previousBitSet));
assertThat(cache.entryCount(), equalTo(expectedCount));
assertThat(cache.ramBytesUsed(), equalTo(expectedCount * expectedBytesPerBitSet));
assertThat(cache.ramBytesUsed(), equalTo(expectedCount * EXPECTED_BYTES_PER_BIT_SET));
}
previousQuery = query;
previousBitSet = bitSet;

assertThat(cache.getBitSet(queryBuilder.toQuery(searchExecutionContext), leafContext), sameInstance(bitSet));
assertThat(cache.entryCount(), equalTo(expectedCount));
assertThat(cache.ramBytesUsed(), equalTo(expectedCount * expectedBytesPerBitSet));
assertThat(cache.ramBytesUsed(), equalTo(expectedCount * EXPECTED_BYTES_PER_BIT_SET));
}

assertThat(cache.entryCount(), equalTo(2));
assertThat(cache.ramBytesUsed(), equalTo(2 * expectedBytesPerBitSet));
assertThat(cache.ramBytesUsed(), equalTo(2 * EXPECTED_BYTES_PER_BIT_SET));

cache.clear("testing");

Expand All @@ -190,12 +190,8 @@ public void testCacheRespectsMemoryLimit() throws Exception {
}

public void testLogWarningIfBitSetExceedsCacheSize() throws Exception {
// This value is based on the internal implementation details of lucene's FixedBitSet
// If the implementation changes, this can be safely updated to match the new ram usage for a single bitset
final long expectedBytesPerBitSet = 56;

// Enough to hold less than 1 bit-sets in the cache
final long maxCacheBytes = expectedBytesPerBitSet - expectedBytesPerBitSet / 3;
final long maxCacheBytes = EXPECTED_BYTES_PER_BIT_SET - EXPECTED_BYTES_PER_BIT_SET / 3;
final Settings settings = Settings.builder()
.put(DocumentSubsetBitsetCache.CACHE_SIZE_SETTING.getKey(), maxCacheBytes + "b")
.build();
Expand All @@ -214,7 +210,7 @@ public void testLogWarningIfBitSetExceedsCacheSize() throws Exception {
cache.getClass().getName(),
Level.WARN,
"built a DLS BitSet that uses ["
+ expectedBytesPerBitSet
+ EXPECTED_BYTES_PER_BIT_SET
+ "] bytes; the DLS BitSet cache has a maximum size of ["
+ maxCacheBytes
+ "] bytes; this object cannot be cached and will need to be rebuilt for each use;"
Expand All @@ -227,7 +223,7 @@ public void testLogWarningIfBitSetExceedsCacheSize() throws Exception {
final Query query = queryBuilder.toQuery(searchExecutionContext);
final BitSet bitSet = cache.getBitSet(query, leafContext);
assertThat(bitSet, notNullValue());
assertThat(bitSet.ramBytesUsed(), equalTo(expectedBytesPerBitSet));
assertThat(bitSet.ramBytesUsed(), equalTo(EXPECTED_BYTES_PER_BIT_SET));
});

mockAppender.assertAllExpectationsMatched();
Expand All @@ -238,12 +234,8 @@ public void testLogWarningIfBitSetExceedsCacheSize() throws Exception {
}

public void testLogMessageIfCacheFull() throws Exception {
// This value is based on the internal implementation details of lucene's FixedBitSet
// If the implementation changes, this can be safely updated to match the new ram usage for a single bitset
final long expectedBytesPerBitSet = 56;

// Enough to hold slightly more than 1 bit-sets in the cache
final long maxCacheBytes = expectedBytesPerBitSet + expectedBytesPerBitSet / 3;
final long maxCacheBytes = EXPECTED_BYTES_PER_BIT_SET + EXPECTED_BYTES_PER_BIT_SET / 3;
final Settings settings = Settings.builder()
.put(DocumentSubsetBitsetCache.CACHE_SIZE_SETTING.getKey(), maxCacheBytes + "b")
.build();
Expand Down Expand Up @@ -272,7 +264,7 @@ public void testLogMessageIfCacheFull() throws Exception {
final Query query = queryBuilder.toQuery(searchExecutionContext);
final BitSet bitSet = cache.getBitSet(query, leafContext);
assertThat(bitSet, notNullValue());
assertThat(bitSet.ramBytesUsed(), equalTo(expectedBytesPerBitSet));
assertThat(bitSet.ramBytesUsed(), equalTo(EXPECTED_BYTES_PER_BIT_SET));
}
});

Expand Down Expand Up @@ -311,12 +303,8 @@ public void testCacheRespectsAccessTimeExpiry() throws Exception {
}

public void testIndexLookupIsClearedWhenBitSetIsEvicted() throws Exception {
// This value is based on the internal implementation details of lucene's FixedBitSet
// If the implementation changes, this can be safely updated to match the new ram usage for a single bitset
final long expectedBytesPerBitSet = 56;

// Enough to hold slightly more than 1 bit-set in the cache
final long maxCacheBytes = expectedBytesPerBitSet + expectedBytesPerBitSet / 2;
final long maxCacheBytes = EXPECTED_BYTES_PER_BIT_SET + EXPECTED_BYTES_PER_BIT_SET / 2;
final Settings settings = Settings.builder()
.put(DocumentSubsetBitsetCache.CACHE_SIZE_SETTING.getKey(), maxCacheBytes + "b")
.build();
Expand Down Expand Up @@ -360,16 +348,12 @@ public void testIndexLookupIsClearedWhenBitSetIsEvicted() throws Exception {
}

public void testCacheUnderConcurrentAccess() throws Exception {
// This value is based on the internal implementation details of lucene's FixedBitSet
// If the implementation changes, this can be safely updated to match the new ram usage for a single bitset
final long expectedBytesPerBitSet = 56;

final int concurrentThreads = randomIntBetween(5, 8);
final int numberOfIndices = randomIntBetween(3, 8);

// Force cache evictions by setting the size to be less than the number of distinct queries we search on.
final int maxCacheCount = randomIntBetween(FIELD_COUNT / 2, FIELD_COUNT * 3 / 4);
final long maxCacheBytes = expectedBytesPerBitSet * maxCacheCount;
final long maxCacheBytes = EXPECTED_BYTES_PER_BIT_SET * maxCacheCount;
final Settings settings = Settings.builder()
.put(DocumentSubsetBitsetCache.CACHE_SIZE_SETTING.getKey(), maxCacheBytes + "b")
.build();
Expand Down Expand Up @@ -412,7 +396,7 @@ public void testCacheUnderConcurrentAccess() throws Exception {
final Query query = queryBuilder.toQuery(randomContext.searchExecutionContext);
final BitSet bitSet = cache.getBitSet(query, randomContext.leafReaderContext);
assertThat(bitSet, notNullValue());
assertThat(bitSet.ramBytesUsed(), equalTo(expectedBytesPerBitSet));
assertThat(bitSet.ramBytesUsed(), equalTo(EXPECTED_BYTES_PER_BIT_SET));
uniqueBitSets.add(bitSet);
}
}
Expand Down Expand Up @@ -446,6 +430,62 @@ public void testCacheUnderConcurrentAccess() throws Exception {
}
}

public void testCleanupWorksWhenIndexIsClosing() throws Exception {
// Enough to hold slightly more than 1 bit-set in the cache
final long maxCacheBytes = EXPECTED_BYTES_PER_BIT_SET + EXPECTED_BYTES_PER_BIT_SET / 2;
final Settings settings = Settings.builder()
.put(DocumentSubsetBitsetCache.CACHE_SIZE_SETTING.getKey(), maxCacheBytes + "b")
.build();
final ExecutorService threads = Executors.newFixedThreadPool(1);
final ExecutorService cleanupExecutor = Mockito.mock(ExecutorService.class);
final CountDownLatch cleanupReadyLatch = new CountDownLatch(1);
final CountDownLatch cleanupCompleteLatch = new CountDownLatch(1);
final CountDownLatch indexCloseLatch = new CountDownLatch(1);
final AtomicReference<Throwable> cleanupException = new AtomicReference<>();
when(cleanupExecutor.submit(any(Runnable.class))).thenAnswer(inv -> {
final Runnable runnable = (Runnable) inv.getArguments()[0];
return threads.submit(() -> {
try {
cleanupReadyLatch.countDown();
assertTrue("index close did not completed in expected time", indexCloseLatch.await(1, TimeUnit.SECONDS));
runnable.run();
} catch (Throwable e) {
logger.warn("caught error in cleanup thread", e);
cleanupException.compareAndSet(null, e);
} finally {
cleanupCompleteLatch.countDown();
}
return null;
});
});

final DocumentSubsetBitsetCache cache = new DocumentSubsetBitsetCache(settings, cleanupExecutor);
assertThat(cache.entryCount(), equalTo(0));
assertThat(cache.ramBytesUsed(), equalTo(0L));

try {
runTestOnIndex((searchExecutionContext, leafContext) -> {
final Query query1 = QueryBuilders.termQuery("field-1", "value-1").toQuery(searchExecutionContext);
final BitSet bitSet1 = cache.getBitSet(query1, leafContext);
assertThat(bitSet1, notNullValue());

// Second query should trigger a cache eviction
final Query query2 = QueryBuilders.termQuery("field-2", "value-2").toQuery(searchExecutionContext);
final BitSet bitSet2 = cache.getBitSet(query2, leafContext);
assertThat(bitSet2, notNullValue());

final IndexReader.CacheKey indexKey = leafContext.reader().getCoreCacheHelper().getKey();
assertTrue("cleanup did not trigger in expected time", cleanupReadyLatch.await(1, TimeUnit.SECONDS));
cache.onClose(indexKey);
indexCloseLatch.countDown();
assertTrue("cleanup did not complete in expected time", cleanupCompleteLatch.await(1, TimeUnit.SECONDS));
assertThat("caught error in cleanup thread: " + cleanupException.get(), cleanupException.get(), nullValue());
});
} finally {
threads.shutdown();
}
}

public void testCacheIsPerIndex() throws Exception {
final DocumentSubsetBitsetCache cache = newCache(Settings.EMPTY);
assertThat(cache.entryCount(), equalTo(0));
Expand Down

0 comments on commit 523b3d9

Please sign in to comment.