diff --git a/server/src/main/java/org/elasticsearch/indices/IndicesQueryCache.java b/server/src/main/java/org/elasticsearch/indices/IndicesQueryCache.java
index 9bca59e9e4d62..6c4694b4f8235 100644
--- a/server/src/main/java/org/elasticsearch/indices/IndicesQueryCache.java
+++ b/server/src/main/java/org/elasticsearch/indices/IndicesQueryCache.java
@@ -89,16 +89,19 @@ private static QueryCacheStats toQueryCacheStatsSafe(@Nullable Stats stats) {
         return stats == null ? new QueryCacheStats() : stats.toQueryCacheStats();
     }
 
-    private long getShareOfAdditionalRamBytesUsed(long cacheSize) {
+    private long getShareOfAdditionalRamBytesUsed(long itemsInCacheForShard) {
         if (sharedRamBytesUsed == 0L) {
             return 0L;
         }
 
-        // We also have some shared ram usage that we try to distribute proportionally to the cache footprint of each shard.
+        /*
+         * We have some shared ram usage that we try to distribute proportionally to the number of segment-requests in the cache for each
+         * shard.
+         */
         // TODO avoid looping over all local shards here - see https://github.com/elastic/elasticsearch/issues/97222
-        long totalSize = 0L;
+        long totalItemsInCache = 0L;
         int shardCount = 0;
-        if (cacheSize == 0L) {
+        if (itemsInCacheForShard == 0L) {
             for (final var stats : shardStats.values()) {
                 shardCount += 1;
                 if (stats.cacheSize > 0L) {
@@ -110,7 +113,7 @@ private long getShareOfAdditionalRamBytesUsed(long cacheSize) {
             // branchless loop for the common case
             for (final var stats : shardStats.values()) {
                 shardCount += 1;
-                totalSize += stats.cacheSize;
+                totalItemsInCache += stats.cacheSize;
             }
         }
 
@@ -121,12 +124,20 @@ private long getShareOfAdditionalRamBytesUsed(long cacheSize) {
         }
 
         final long additionalRamBytesUsed;
-        if (totalSize == 0) {
+        if (totalItemsInCache == 0) {
             // all shards have zero cache footprint, so we apportion the size of the shared bytes equally across all shards
             additionalRamBytesUsed = Math.round((double) sharedRamBytesUsed / shardCount);
         } else {
-            // some shards have nonzero cache footprint, so we apportion the size of the shared bytes proportionally to cache footprint
-            additionalRamBytesUsed = Math.round((double) sharedRamBytesUsed * cacheSize / totalSize);
+            /*
+             * Some shards have nonzero cache footprint, so we apportion the size of the shared bytes proportionally to the number of
+             * segment-requests in the cache for this shard (the number and size of documents associated with those requests is irrelevant
+             * for this calculation).
+             * Note that this was a somewhat arbitrary decision. Calculating it by number of documents might have been better. Calculating
+             * it by number of documents weighted by size would also be good, but possibly more expensive. But the decision to attribute
+             * memory proportionally to the number of segment-requests was made a long time ago, and we're sticking with that here for the
+             * sake of consistency and backwards compatibility.
+             */
+            additionalRamBytesUsed = Math.round((double) sharedRamBytesUsed * itemsInCacheForShard / totalItemsInCache);
         }
         assert additionalRamBytesUsed >= 0L : additionalRamBytesUsed;
         return additionalRamBytesUsed;
diff --git a/server/src/test/java/org/elasticsearch/indices/IndicesQueryCacheTests.java b/server/src/test/java/org/elasticsearch/indices/IndicesQueryCacheTests.java
index 4f73672471942..5629fe81511c6 100644
--- a/server/src/test/java/org/elasticsearch/indices/IndicesQueryCacheTests.java
+++ b/server/src/test/java/org/elasticsearch/indices/IndicesQueryCacheTests.java
@@ -26,6 +26,7 @@
 import org.apache.lucene.search.ScorerSupplier;
 import org.apache.lucene.search.Weight;
 import org.apache.lucene.store.Directory;
+import org.apache.lucene.util.Accountable;
 import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.core.IOUtils;
@@ -34,26 +35,44 @@
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.test.ESTestCase;
 
+import java.io.Closeable;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.hamcrest.Matchers.closeTo;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.lessThan;
 
 public class IndicesQueryCacheTests extends ESTestCase {
 
-    private static class DummyQuery extends Query {
+    private static class DummyQuery extends Query implements Accountable {
 
-        private final int id;
+        private final String id;
+        private final long sizeInCache;
 
         DummyQuery(int id) {
+            this(Integer.toString(id), 10);
+        }
+
+        DummyQuery(String id) {
+            this(id, 10);
+        }
+
+        DummyQuery(String id, long sizeInCache) {
             this.id = id;
+            this.sizeInCache = sizeInCache;
         }
 
         @Override
         public boolean equals(Object obj) {
-            return sameClassAs(obj) && id == ((DummyQuery) obj).id;
+            return sameClassAs(obj) && id.equals(((DummyQuery) obj).id);
         }
 
         @Override
         public int hashCode() {
-            return 31 * classHash() + id;
+            return 31 * classHash() + id.hashCode();
         }
 
         @Override
@@ -82,6 +101,10 @@ public boolean isCacheable(LeafReaderContext ctx) {
             };
         }
 
+        @Override
+        public long ramBytesUsed() {
+            return sizeInCache;
+        }
     }
 
     public void testBasics() throws IOException {
@@ -404,4 +427,155 @@ public void testDelegatesScorerSupplier() throws Exception {
         cache.onClose(shard);
         cache.close();
     }
+
+    public void testGetStatsMemory() throws Exception {
+        /*
+         * This test creates 2 shards, one with two segments and one with one. It makes unique queries against all 3 segments (so that each
+         * query will be cached, up to the max cache size), and then asserts various things about the cache memory. Most importantly, it
+         * asserts that the memory the cache attributes to each shard is proportional to the number of segment-queries for the shard in the
+         * cache (and not to the number of documents in the query).
+         */
+        String indexName = randomIdentifier();
+        String uuid = randomUUID();
+        ShardId shard1 = new ShardId(indexName, uuid, 0);
+        ShardId shard2 = new ShardId(indexName, uuid, 1);
+        List<Closeable> closeableList = new ArrayList<>();
+        // We're going to create 2 segments for shard1, and 1 segment for shard2:
+        int shard1Segment1Docs = randomIntBetween(11, 1000);
+        int shard1Segment2Docs = randomIntBetween(1, 10);
+        int shard2Segment1Docs = randomIntBetween(1, 10);
+        IndexSearcher shard1Segment1Searcher = initializeSegment(shard1, shard1Segment1Docs, closeableList);
+        IndexSearcher shard1Segment2Searcher = initializeSegment(shard1, shard1Segment2Docs, closeableList);
+        IndexSearcher shard2Searcher = initializeSegment(shard2, shard2Segment1Docs, closeableList);
+
+        final int maxCacheSize = 200;
+        Settings settings = Settings.builder()
+            .put(IndicesQueryCache.INDICES_CACHE_QUERY_COUNT_SETTING.getKey(), maxCacheSize)
+            .put(IndicesQueryCache.INDICES_QUERIES_CACHE_ALL_SEGMENTS_SETTING.getKey(), true)
+            .build();
+        IndicesQueryCache cache = new IndicesQueryCache(settings);
+        shard1Segment1Searcher.setQueryCache(cache);
+        shard1Segment2Searcher.setQueryCache(cache);
+        shard2Searcher.setQueryCache(cache);
+
+        assertEquals(0L, cache.getStats(shard1).getMemorySizeInBytes());
+
+        final long largeQuerySize = randomIntBetween(100, 1000);
+        final long smallQuerySize = randomIntBetween(10, 50);
+
+        final int shard1Queries = randomIntBetween(20, 50);
+        final int shard2Queries = randomIntBetween(5, 10);
+
+        for (int i = 0; i < shard1Queries; ++i) {
+            shard1Segment1Searcher.count(new DummyQuery("ingest1-" + i, largeQuerySize));
+        }
+        long shard1Segment1CacheMemory = calculateActualCacheMemoryForSegment(shard1Queries, largeQuerySize, shard1Segment1Docs);
+        assertThat(cache.getStats(shard1).getMemorySizeInBytes(), equalTo(shard1Segment1CacheMemory));
+        assertThat(cache.getStats(shard2).getMemorySizeInBytes(), equalTo(0L));
+        for (int i = 0; i < shard2Queries; ++i) {
+            shard2Searcher.count(new DummyQuery("ingest2-" + i, smallQuerySize));
+        }
+        /*
+         * Now that we have cached some smaller things for shard2, the cache memory for shard1 has gone down. This is expected because we
+         * report cache memory proportional to the number of segment-queries in the cache for each shard, ignoring the number of documents
+         * or the actual document sizes. Since the shard2 requests were smaller, the average cache memory per cached segment-query has now
+         * gone down.
+         */
+        assertThat(cache.getStats(shard1).getMemorySizeInBytes(), lessThan(shard1Segment1CacheMemory));
+        long shard1CacheBytes = cache.getStats(shard1).getMemorySizeInBytes();
+        long shard2CacheBytes = cache.getStats(shard2).getMemorySizeInBytes();
+        long shard2Segment1CacheMemory = calculateActualCacheMemoryForSegment(shard2Queries, smallQuerySize, shard2Segment1Docs);
+
+        long totalMemory = shard1Segment1CacheMemory + shard2Segment1CacheMemory;
+        // Each shard has some fixed overhead that we need to account for:
+        long shard1Overhead = calculateOverheadForSegment(shard1Queries, shard1Segment1Docs);
+        long shard2Overhead = calculateOverheadForSegment(shard2Queries, shard2Segment1Docs);
+        long totalMemoryMinusOverhead = totalMemory - (shard1Overhead + shard2Overhead);
+        /*
+         * Note that the expected amount of memory we're calculating is based on the proportion of the number of queries to each shard
+         * (since each shard currently only has queries to one segment).
+         */
+        double shard1Segment1CacheMemoryShare = ((double) shard1Queries / (shard1Queries + shard2Queries)) * (totalMemoryMinusOverhead)
+            + shard1Overhead;
+        double shard2Segment1CacheMemoryShare = ((double) shard2Queries / (shard1Queries + shard2Queries)) * (totalMemoryMinusOverhead)
+            + shard2Overhead;
+        assertThat((double) shard1CacheBytes, closeTo(shard1Segment1CacheMemoryShare, 1)); // accounting for rounding
+        assertThat((double) shard2CacheBytes, closeTo(shard2Segment1CacheMemoryShare, 1)); // accounting for rounding
+
+        // Now we just cache more "big" searches on shard1, but on a different segment:
+        for (int i = 0; i < shard1Queries; ++i) {
+            shard1Segment2Searcher.count(new DummyQuery("ingest3-" + i, largeQuerySize));
+        }
+        long shard1Segment2CacheMemory = calculateActualCacheMemoryForSegment(shard1Queries, largeQuerySize, shard1Segment2Docs);
+        totalMemory = shard1Segment1CacheMemory + shard2Segment1CacheMemory + shard1Segment2CacheMemory;
+        // Each shard has some fixed overhead that we need to account for:
+        shard1Overhead = shard1Overhead + calculateOverheadForSegment(shard1Queries, shard1Segment2Docs);
+        totalMemoryMinusOverhead = totalMemory - (shard1Overhead + shard2Overhead);
+        /*
+         * Note that the expected amount of memory we're calculating is based on the proportion of the number of queries to each segment.
+         * The number of documents and the size of documents are irrelevant (aside from computing the fixed overhead).
+         */
+        shard1Segment1CacheMemoryShare = ((double) (2 * shard1Queries) / ((2 * shard1Queries) + shard2Queries)) * (totalMemoryMinusOverhead)
+            + shard1Overhead;
+        shard1CacheBytes = cache.getStats(shard1).getMemorySizeInBytes();
+        assertThat((double) shard1CacheBytes, closeTo(shard1Segment1CacheMemoryShare, 1)); // accounting for rounding
+
+        // Now make sure the cache only has items for shard2:
+        for (int i = 0; i < (maxCacheSize * 2); ++i) {
+            shard2Searcher.count(new DummyQuery("ingest4-" + i, smallQuerySize));
+        }
+        assertThat(cache.getStats(shard1).getMemorySizeInBytes(), equalTo(0L));
+        assertThat(
+            cache.getStats(shard2).getMemorySizeInBytes(),
+            equalTo(calculateActualCacheMemoryForSegment(maxCacheSize, smallQuerySize, shard2Segment1Docs))
+        );
+
+        IOUtils.close(closeableList);
+        cache.onClose(shard1);
+        cache.onClose(shard2);
+        cache.close();
+    }
+
+    /*
+     * This calculates the memory that is actually used by a segment in the IndicesQueryCache. It assumes queryCount queries are made to
+     * the segment, and that each query is querySize bytes in size. It assumes that the segment contains numDocs documents.
+     */
+    private long calculateActualCacheMemoryForSegment(long queryCount, long querySize, long numDocs) {
+        return (queryCount * (querySize + 24)) + calculateOverheadForSegment(queryCount, numDocs);
+    }
+
+    /*
+     * This computes the part of the recorded IndicesQueryCache memory that is assigned to a segment and *not* divided up proportionally
+     * when the cache reports the memory usage of each shard.
+     */
+    private long calculateOverheadForSegment(long queryCount, long numDocs) {
+        return queryCount * (112 + (8 * ((numDocs - 1) / 64)));
+    }
+
+    /*
+     * This returns an IndexSearcher for a single new segment in the given shard.
+     */
+    private IndexSearcher initializeSegment(ShardId shard, int numDocs, List<Closeable> closeableList) throws Exception {
+        AtomicReference<IndexSearcher> indexSearcherReference = new AtomicReference<>();
+        /*
+         * Usually creating an IndexWriter like this results in a single segment getting created, but sometimes it results in more. For
+         * the sake of keeping the calculations in this test simple, we want just a single segment. So we do this in an assertBusy.
+         */
+        assertBusy(() -> {
+            Directory dir = newDirectory();
+            IndexWriter indexWriter = new IndexWriter(dir, newIndexWriterConfig());
+            for (int i = 0; i < numDocs; i++) {
+                indexWriter.addDocument(new Document());
+            }
+            DirectoryReader directoryReader = DirectoryReader.open(indexWriter);
+            indexWriter.close();
+            directoryReader = ElasticsearchDirectoryReader.wrap(directoryReader, shard);
+            IndexSearcher indexSearcher = new IndexSearcher(directoryReader);
+            indexSearcherReference.set(indexSearcher);
+            indexSearcher.setQueryCachingPolicy(TrivialQueryCachingPolicy.ALWAYS);
+            closeableList.add(directoryReader);
+            closeableList.add(dir);
+            assertThat(indexSearcher.getLeafContexts().size(), equalTo(1));
+        });
+        return indexSearcherReference.get();
+    }
 }
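
For anyone who wants to sanity-check the apportionment rule outside the test harness, here is a minimal standalone sketch of the same arithmetic. This is not Elasticsearch code: the class and method names are hypothetical, and it only mirrors the Math.round((double) sharedRamBytesUsed * itemsInCacheForShard / totalItemsInCache) line above, plus the equal-split fallback used when no shard has any cached segment-requests.

// Illustrative sketch only, not the IndicesQueryCache implementation.
// Class and method names are made up for this example.
public class SharedRamApportionmentSketch {

    /**
     * Splits sharedRamBytes across shards in proportion to each shard's number of cached
     * segment-queries; falls back to an equal split when nothing is cached for any shard.
     */
    static long shareForShard(long sharedRamBytes, long itemsInCacheForShard, long totalItemsInCache, int shardCount) {
        if (sharedRamBytes == 0L || shardCount == 0) {
            return 0L;
        }
        if (totalItemsInCache == 0L) {
            // equal split across all shards
            return Math.round((double) sharedRamBytes / shardCount);
        }
        // proportional to the number of cached segment-queries for this shard
        return Math.round((double) sharedRamBytes * itemsInCacheForShard / totalItemsInCache);
    }

    public static void main(String[] args) {
        long sharedRamBytes = 1_000L;
        // Shard A holds 30 cached segment-queries, shard B holds 10; document counts and sizes play no role.
        System.out.println(shareForShard(sharedRamBytes, 30, 40, 2)); // 750
        System.out.println(shareForShard(sharedRamBytes, 10, 40, 2)); // 250
        // Nothing cached at all: the shared bytes are split equally across the 2 shards.
        System.out.println(shareForShard(sharedRamBytes, 0, 0, 2));   // 500
    }
}

With 1000 shared bytes and shards holding 30 and 10 cached segment-queries, the shares come out to 750 and 250 regardless of how many documents those queries touched, which is the property testGetStatsMemory asserts.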