[Test] Reduce amount of work and concurrency for test stability (#91489) (#91534)
The test fails from time to time because not every thread finishes its
work within the 1-second wait. It is possible that the work sometimes
simply takes too long to complete. The getBitSet method internally
acquires a read lock, performs concurrent read/write access to the
cache, a ConcurrentHashMap and an AtomicLong, and makes a log call that
may go to disk. Any of these can stall, and the delays can accumulate
to exceed 1 second.

This PR reduces the amount of work by decreasing the number of loops,
and reduces the concurrency by lowering the maximum number of
concurrent threads. It also catches any exception thrown in the worker
threads and reports it when the test fails (otherwise the exception is
swallowed and the final failure shows only a timeout).
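The capture-and-report pattern described above can be sketched in isolation. This is a minimal, hypothetical example, not the test itself: the class name, runWorkers, and the simulated failure are illustrative only.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class WorkerExceptionDemo {

    // Runs `workers` tasks; the task with index `failingWorker` (if any) throws.
    // Returns the first captured exception, or null if every worker finished.
    static Throwable runWorkers(int workers, int failingWorker) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        CountDownLatch end = new CountDownLatch(workers);
        // compareAndSet keeps only the first failure; later ones are dropped.
        AtomicReference<Throwable> exceptionInThread = new AtomicReference<>();
        for (int i = 0; i < workers; i++) {
            final int id = i;
            pool.submit(() -> {
                try {
                    if (id == failingWorker) {
                        throw new IllegalStateException("simulated worker failure");
                    }
                    // ... real work would go here ...
                    end.countDown();
                } catch (Throwable t) {
                    // Without this capture, the failure would be silently swallowed
                    // and the test would only ever report a timeout.
                    exceptionInThread.compareAndSet(null, t);
                }
            });
        }
        if (!end.await(1, TimeUnit.SECONDS)) {
            System.out.println("timed out; possible cause: " + exceptionInThread.get());
        }
        pool.shutdown();
        pool.awaitTermination(3, TimeUnit.SECONDS);
        return exceptionInThread.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("captured: " + runWorkers(4, 2));
    }
}
```

The key design choice is that the failing worker never counts down the end latch, so the await times out, and the captured Throwable explains why.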

The test took quite long to complete (1m20s locally) even when
concurrentThreads and numberOfIndices were randomized to their minimums
(5 and 3 respectively). With the reduction it now takes 28 seconds,
which is also a gain. Given how long the test takes, it is also viable
to bump the wait time in the future should it fail again.

Resolves: #91471
ywangd committed Nov 14, 2022
1 parent 69a6011 commit b8d8227
Showing 1 changed file with 26 additions and 15 deletions.
@@ -359,13 +359,12 @@ public void testIndexLookupIsClearedWhenBitSetIsEvicted() throws Exception {
         });
     }
 
-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/91471")
     public void testCacheUnderConcurrentAccess() throws Exception {
         // This value is based on the internal implementation details of lucene's FixedBitSet
         // If the implementation changes, this can be safely updated to match the new ram usage for a single bitset
         final long expectedBytesPerBitSet = 56;
 
-        final int concurrentThreads = randomIntBetween(5, 15);
+        final int concurrentThreads = randomIntBetween(5, 8);
         final int numberOfIndices = randomIntBetween(3, 8);
 
         // Force cache evictions by setting the size to be less than the number of distinct queries we search on.
@@ -396,27 +395,39 @@ public void testCacheUnderConcurrentAccess() throws Exception {
         final CountDownLatch start = new CountDownLatch(concurrentThreads);
         final CountDownLatch end = new CountDownLatch(concurrentThreads);
         final Set<BitSet> uniqueBitSets = Collections.synchronizedSet(Collections.newSetFromMap(new IdentityHashMap<>()));
+        final AtomicReference<Throwable> exceptionInThread = new AtomicReference<>();
         for (int thread = 0; thread < concurrentThreads; thread++) {
             threads.submit(() -> {
-                start.countDown();
-                start.await(100, TimeUnit.MILLISECONDS);
-                for (int loop = 0; loop < 15; loop++) {
-                    for (int field = 1; field <= FIELD_COUNT; field++) {
-                        final TermQueryBuilder queryBuilder = QueryBuilders.termQuery("field-" + field, "value-" + field);
-                        final TestIndexContext randomContext = randomFrom(contexts);
-                        final Query query = queryBuilder.toQuery(randomContext.searchExecutionContext);
-                        final BitSet bitSet = cache.getBitSet(query, randomContext.leafReaderContext);
-                        assertThat(bitSet, notNullValue());
-                        assertThat(bitSet.ramBytesUsed(), equalTo(expectedBytesPerBitSet));
-                        uniqueBitSets.add(bitSet);
+                try {
+                    start.countDown();
+                    if (false == start.await(100, TimeUnit.MILLISECONDS)) {
+                        // We still proceed even when some threads are not ready. All threads being ready increases the chance
+                        // of them running concurrently and competing for caching. But this is not guaranteed either way.
+                        logger.info("[{}] out of [{}] worker threads are ready", start.getCount(), concurrentThreads);
+                    }
+                    for (int loop = 0; loop < 5; loop++) {
+                        for (int field = 1; field <= FIELD_COUNT; field++) {
+                            final TermQueryBuilder queryBuilder = QueryBuilders.termQuery("field-" + field, "value-" + field);
+                            final TestIndexContext randomContext = randomFrom(contexts);
+                            final Query query = queryBuilder.toQuery(randomContext.searchExecutionContext);
+                            final BitSet bitSet = cache.getBitSet(query, randomContext.leafReaderContext);
+                            assertThat(bitSet, notNullValue());
+                            assertThat(bitSet.ramBytesUsed(), equalTo(expectedBytesPerBitSet));
+                            uniqueBitSets.add(bitSet);
                         }
                     }
+                    end.countDown();
+                } catch (Throwable e) {
+                    logger.warn("caught exception in worker thread", e);
+                    exceptionInThread.compareAndSet(null, e);
+                }
-                end.countDown();
                 return null;
             });
         }
 
-        assertTrue("Query threads did not complete in expected time", end.await(1, TimeUnit.SECONDS));
+        if (false == end.await(1, TimeUnit.SECONDS)) {
+            fail("Query threads did not complete in expected time. Possible exception [" + exceptionInThread.get() + "]");
+        }
 
         threads.shutdown();
         assertTrue("Cleanup thread did not complete in expected time", threads.awaitTermination(3, TimeUnit.SECONDS));
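The start latch in the test above acts as a best-effort barrier rather than a hard one: a worker that times out waiting for its peers simply proceeds, since a simultaneous start only increases the chance of cache contention and is not required for correctness. A minimal standalone sketch of that pattern follows; the class and method names here are hypothetical, not from the Elasticsearch code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SoftStartBarrierDemo {

    // Each worker announces readiness, then waits briefly for its peers.
    // A timed-out worker proceeds anyway; returns how many proceeded early.
    static int runWithSoftBarrier(int workers, long waitMillis) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        CountDownLatch start = new CountDownLatch(workers);
        CountDownLatch end = new CountDownLatch(workers);
        AtomicInteger proceededEarly = new AtomicInteger();
        for (int i = 0; i < workers; i++) {
            pool.submit(() -> {
                try {
                    start.countDown();
                    if (!start.await(waitMillis, TimeUnit.MILLISECONDS)) {
                        // Not all peers were ready in time; record it and go on.
                        proceededEarly.incrementAndGet();
                    }
                    // ... contended work would go here ...
                } catch (InterruptedException ignored) {
                    Thread.currentThread().interrupt();
                } finally {
                    end.countDown();
                }
            });
        }
        end.await();
        pool.shutdown();
        pool.awaitTermination(3, TimeUnit.SECONDS);
        return proceededEarly.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("workers that proceeded early: " + runWithSoftBarrier(4, 100));
    }
}
```

Because the pool is sized to hold every worker at once, all of them normally count the latch down well within the wait, so the early-proceed path is rare; it exists only so a slow thread cannot wedge the whole test.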
