IndicesQueryCache.java
@@ -89,16 +89,19 @@ private static QueryCacheStats toQueryCacheStatsSafe(@Nullable Stats stats) {
return stats == null ? new QueryCacheStats() : stats.toQueryCacheStats();
}

private long getShareOfAdditionalRamBytesUsed(long cacheSize) {
private long getShareOfAdditionalRamBytesUsed(long itemsInCacheForShard) {
if (sharedRamBytesUsed == 0L) {
return 0L;
}

// We also have some shared ram usage that we try to distribute proportionally to the cache footprint of each shard.
/*
* We have some shared ram usage that we try to distribute proportionally to the number of segment-requests in the cache for each
* shard.
*/
// TODO avoid looping over all local shards here - see https://github.com/elastic/elasticsearch/issues/97222
long totalSize = 0L;
long totalItemsInCache = 0L;
int shardCount = 0;
if (cacheSize == 0L) {
if (itemsInCacheForShard == 0L) {
for (final var stats : shardStats.values()) {
shardCount += 1;
if (stats.cacheSize > 0L) {
@@ -110,7 +113,7 @@ private long getShareOfAdditionalRamBytesUsed(long cacheSize) {
// branchless loop for the common case
for (final var stats : shardStats.values()) {
shardCount += 1;
totalSize += stats.cacheSize;
totalItemsInCache += stats.cacheSize;
}
}

@@ -121,12 +124,20 @@ private long getShareOfAdditionalRamBytesUsed(long cacheSize) {
}

final long additionalRamBytesUsed;
if (totalSize == 0) {
if (totalItemsInCache == 0) {
// all shards have zero cache footprint, so we apportion the size of the shared bytes equally across all shards
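// (For example: sharedRamBytesUsed == 100 with shardCount == 3 attributes Math.round(100.0 / 3) == 33 bytes to each shard.)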
additionalRamBytesUsed = Math.round((double) sharedRamBytesUsed / shardCount);
} else {
// some shards have nonzero cache footprint, so we apportion the size of the shared bytes proportionally to cache footprint
additionalRamBytesUsed = Math.round((double) sharedRamBytesUsed * cacheSize / totalSize);
/*
* Some shards have nonzero cache footprint, so we apportion the size of the shared bytes proportionally to the number of
* segment-requests in the cache for this shard (the number and size of documents associated with those requests is irrelevant
* for this calculation).
* Note that this was a somewhat arbitrary decision. Calculating it by number of documents might have been better. Calculating
* it by number of documents weighted by size would also be good, but possibly more expensive. But the decision to attribute
* memory proportionally to the number of segment-requests was made a long time ago, and we're sticking with that here for the
* sake of consistency and backwards compatibility.
*/
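// (For example: sharedRamBytesUsed == 100, itemsInCacheForShard == 3, and totalItemsInCache == 10 attributes
// Math.round(100.0 * 3 / 10) == 30 additional bytes to this shard.)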
additionalRamBytesUsed = Math.round((double) sharedRamBytesUsed * itemsInCacheForShard / totalItemsInCache);
}
assert additionalRamBytesUsed >= 0L : additionalRamBytesUsed;
return additionalRamBytesUsed;

IndicesQueryCacheTests.java
@@ -26,6 +26,7 @@
import org.apache.lucene.search.ScorerSupplier;
import org.apache.lucene.search.Weight;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.Accountable;
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.IOUtils;
@@ -34,26 +35,44 @@
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESTestCase;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

import static org.hamcrest.Matchers.closeTo;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.lessThan;

public class IndicesQueryCacheTests extends ESTestCase {

private static class DummyQuery extends Query {
private static class DummyQuery extends Query implements Accountable {

private final int id;
private final String id;
private final long sizeInCache;

DummyQuery(int id) {
this(Integer.toString(id), 10);
}

DummyQuery(String id) {
this(id, 10);
}

DummyQuery(String id, long sizeInCache) {
this.id = id;
this.sizeInCache = sizeInCache;
}

@Override
public boolean equals(Object obj) {
return sameClassAs(obj) && id == ((DummyQuery) obj).id;
return sameClassAs(obj) && id.equals(((DummyQuery) obj).id);
}

@Override
public int hashCode() {
return 31 * classHash() + id;
return 31 * classHash() + id.hashCode();
}

@Override
@@ -82,6 +101,10 @@ public boolean isCacheable(LeafReaderContext ctx) {
};
}

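// Report the configured size to the query cache, so each test controls exactly how much memory a cached entry is charged for.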
@Override
public long ramBytesUsed() {
return sizeInCache;
}
}

public void testBasics() throws IOException {
@@ -404,4 +427,155 @@ public void testDelegatesScorerSupplier() throws Exception {
cache.onClose(shard);
cache.close();
}

public void testGetStatsMemory() throws Exception {
/*
* This test creates 2 shards, one with two segments and one with one. It makes unique queries against all 3 segments (so that each
* query will be cached, up to the max cache size), and then asserts various things about the cache memory. Most importantly, it
* asserts that the memory the cache attributes to each shard is proportional to the number of segment-queries in the cache for that
* shard (and not to the number or size of the documents matched by those queries).
*/
String indexName = randomIdentifier();
String uuid = randomUUID();
ShardId shard1 = new ShardId(indexName, uuid, 0);
ShardId shard2 = new ShardId(indexName, uuid, 1);
List<Closeable> closeableList = new ArrayList<>();
// We're going to create 2 segments for shard1, and 1 segment for shard2:
int shard1Segment1Docs = randomIntBetween(11, 1000);
int shard1Segment2Docs = randomIntBetween(1, 10);
int shard2Segment1Docs = randomIntBetween(1, 10);
IndexSearcher shard1Segment1Searcher = initializeSegment(shard1, shard1Segment1Docs, closeableList);
IndexSearcher shard1Segment2Searcher = initializeSegment(shard1, shard1Segment2Docs, closeableList);
IndexSearcher shard2Searcher = initializeSegment(shard2, shard2Segment1Docs, closeableList);

final int maxCacheSize = 200;
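// Cap the cache at maxCacheSize entries, and allow caching on all segments (by default only sufficiently large segments are cached).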
Settings settings = Settings.builder()
.put(IndicesQueryCache.INDICES_CACHE_QUERY_COUNT_SETTING.getKey(), maxCacheSize)
.put(IndicesQueryCache.INDICES_QUERIES_CACHE_ALL_SEGMENTS_SETTING.getKey(), true)
.build();
IndicesQueryCache cache = new IndicesQueryCache(settings);
shard1Segment1Searcher.setQueryCache(cache);
shard1Segment2Searcher.setQueryCache(cache);
shard2Searcher.setQueryCache(cache);

assertEquals(0L, cache.getStats(shard1).getMemorySizeInBytes());

final long largeQuerySize = randomIntBetween(100, 1000);
final long smallQuerySize = randomIntBetween(10, 50);

final int shard1Queries = randomIntBetween(20, 50);
final int shard2Queries = randomIntBetween(5, 10);

for (int i = 0; i < shard1Queries; ++i) {
shard1Segment1Searcher.count(new DummyQuery("ingest1-" + i, largeQuerySize));
}
long shard1Segment1CacheMemory = calculateActualCacheMemoryForSegment(shard1Queries, largeQuerySize, shard1Segment1Docs);
assertThat(cache.getStats(shard1).getMemorySizeInBytes(), equalTo(shard1Segment1CacheMemory));
assertThat(cache.getStats(shard2).getMemorySizeInBytes(), equalTo(0L));
for (int i = 0; i < shard2Queries; ++i) {
shard2Searcher.count(new DummyQuery("ingest2-" + i, smallQuerySize));
}
/*
* Now that we have cached some smaller entries for shard2, the reported cache memory for shard1 has gone down. This is expected
* because we report cache memory proportional to the number of segment-queries in the cache for each shard, ignoring the number
* of documents or the actual document sizes. Since the shard2 queries were smaller, the average cache memory per segment-query
* has now gone down.
*/
assertThat(cache.getStats(shard1).getMemorySizeInBytes(), lessThan(shard1Segment1CacheMemory));
long shard1CacheBytes = cache.getStats(shard1).getMemorySizeInBytes();
long shard2CacheBytes = cache.getStats(shard2).getMemorySizeInBytes();
long shard2Segment1CacheMemory = calculateActualCacheMemoryForSegment(shard2Queries, smallQuerySize, shard2Segment1Docs);

long totalMemory = shard1Segment1CacheMemory + shard2Segment1CacheMemory;
// Each shard has some fixed overhead that we need to account for:
long shard1Overhead = calculateOverheadForSegment(shard1Queries, shard1Segment1Docs);
long shard2Overhead = calculateOverheadForSegment(shard2Queries, shard2Segment1Docs);
long totalMemoryMinusOverhead = totalMemory - (shard1Overhead + shard2Overhead);
/*
* Note that the expected amount of memory we're calculating is based on each shard's share of the total number of cached
* segment-queries (since each shard currently only has queries against one segment).
*/
double shard1Segment1CacheMemoryShare = ((double) shard1Queries / (shard1Queries + shard2Queries)) * (totalMemoryMinusOverhead)
+ shard1Overhead;
double shard2Segment1CacheMemoryShare = ((double) shard2Queries / (shard1Queries + shard2Queries)) * (totalMemoryMinusOverhead)
+ shard2Overhead;
assertThat((double) shard1CacheBytes, closeTo(shard1Segment1CacheMemoryShare, 1)); // accounting for rounding
assertThat((double) shard2CacheBytes, closeTo(shard2Segment1CacheMemoryShare, 1)); // accounting for rounding

// Now we cache more "big" queries against shard1, but on a different segment:
for (int i = 0; i < shard1Queries; ++i) {
shard1Segment2Searcher.count(new DummyQuery("ingest3-" + i, largeQuerySize));
}
long shard1Segment2CacheMemory = calculateActualCacheMemoryForSegment(shard1Queries, largeQuerySize, shard1Segment2Docs);
totalMemory = shard1Segment1CacheMemory + shard2Segment1CacheMemory + shard1Segment2CacheMemory;
// Each shard has some fixed overhead that we need to account for:
shard1Overhead = shard1Overhead + calculateOverheadForSegment(shard1Queries, shard1Segment2Docs);
totalMemoryMinusOverhead = totalMemory - (shard1Overhead + shard2Overhead);
/*
* Note that the expected amount of memory we're calculating is based on each shard's share of the total number of cached
* segment-queries. The number and size of documents are irrelevant (aside from computing the fixed overhead).
*/
shard1Segment1CacheMemoryShare = ((double) (2 * shard1Queries) / ((2 * shard1Queries) + shard2Queries)) * (totalMemoryMinusOverhead)
+ shard1Overhead;
shard1CacheBytes = cache.getStats(shard1).getMemorySizeInBytes();
assertThat((double) shard1CacheBytes, closeTo(shard1Segment1CacheMemoryShare, 1)); // accounting for rounding

// Now make sure the cache only has items for shard2:
for (int i = 0; i < (maxCacheSize * 2); ++i) {
shard2Searcher.count(new DummyQuery("ingest4-" + i, smallQuerySize));
}
assertThat(cache.getStats(shard1).getMemorySizeInBytes(), equalTo(0L));
assertThat(
cache.getStats(shard2).getMemorySizeInBytes(),
equalTo(calculateActualCacheMemoryForSegment(maxCacheSize, smallQuerySize, shard2Segment1Docs))
);

IOUtils.close(closeableList);
cache.onClose(shard1);
cache.onClose(shard2);
cache.close();
}

/*
* This calculates the memory actually used by a segment's entries in the IndicesQueryCache. It assumes that queryCount queries are
* made against the segment, and that each query is querySize bytes in size. It assumes that the segment contains numDocs documents.
*/
private long calculateActualCacheMemoryForSegment(long queryCount, long querySize, long numDocs) {
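// Assumed model: each cached segment-query is charged its ramBytesUsed() (querySize) plus a fixed 24-byte per-entry
// constant, plus the per-segment overhead computed by calculateOverheadForSegment below.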
return (queryCount * (querySize + 24)) + calculateOverheadForSegment(queryCount, numDocs);
}

/*
* This computes the part of the recorded IndicesQueryCache memory that is assigned to a segment and *not* divided up proportionally
* when the cache reports the memory usage of each shard.
*/
private long calculateOverheadForSegment(long queryCount, long numDocs) {
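// Assumed breakdown: 112 bytes of fixed accounting per cached segment-query, plus 8 bytes per 64 documents, presumably
// the long[] words of a bit set over the segment's documents that backs the cached result.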
return queryCount * (112 + (8 * ((numDocs - 1) / 64)));
}

/*
* This returns an IndexSearcher for a single new segment in the given shard.
*/
private IndexSearcher initializeSegment(ShardId shard, int numDocs, List<Closeable> closeableList) throws Exception {
AtomicReference<IndexSearcher> indexSearcherReference = new AtomicReference<>();
/*
* Usually creating an IndexWriter like this results in a single segment getting created, but sometimes it results in more. For the
* sake of keeping the calculations in this test simple we want just a single segment, so we retry in an assertBusy until we get one.
*/
assertBusy(() -> {
Directory dir = newDirectory();
IndexWriter indexWriter = new IndexWriter(dir, newIndexWriterConfig());
for (int i = 0; i < numDocs; i++) {
indexWriter.addDocument(new Document());
}
DirectoryReader directoryReader = DirectoryReader.open(indexWriter);
indexWriter.close();
directoryReader = ElasticsearchDirectoryReader.wrap(directoryReader, shard);
IndexSearcher indexSearcher = new IndexSearcher(directoryReader);
indexSearcherReference.set(indexSearcher);
indexSearcher.setQueryCachingPolicy(TrivialQueryCachingPolicy.ALWAYS);
closeableList.add(directoryReader);
closeableList.add(dir);
assertThat(indexSearcher.getLeafContexts().size(), equalTo(1));
});
return indexSearcherReference.get();
}
}