Skip to content

Commit

Permalink
Secondary index cache
Browse files Browse the repository at this point in the history
  • Loading branch information
al13n321 committed Sep 5, 2023
1 parent 7403ee6 commit 29d593b
Show file tree
Hide file tree
Showing 34 changed files with 315 additions and 52 deletions.
16 changes: 11 additions & 5 deletions programs/local/LocalServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -699,17 +699,23 @@ void LocalServer::processConfig()
if (index_mark_cache_size > max_cache_size)
{
index_mark_cache_size = max_cache_size;
LOG_INFO(log, "Lowered index mark cache size to {} because the system has limited RAM", formatReadableSizeWithBinarySuffix(uncompressed_cache_size));
LOG_INFO(log, "Lowered index mark cache size to {} because the system has limited RAM", formatReadableSizeWithBinarySuffix(index_mark_cache_size));
}
global_context->setIndexMarkCache(index_mark_cache_policy, index_mark_cache_size, index_mark_cache_size_ratio);

size_t mmap_cache_size = config().getUInt64("mmap_cache_size", DEFAULT_MMAP_CACHE_MAX_SIZE);
if (mmap_cache_size > max_cache_size)
global_context->setMMappedFileCache(mmap_cache_size);

String secondary_index_cache_policy = config().getString("secondary_index_cache_policy", DEFAULT_SECONDARY_INDEX_CACHE_POLICY);
size_t secondary_index_cache_size = config().getUInt64("secondary_index_cache_size", DEFAULT_SECONDARY_INDEX_CACHE_MAX_SIZE);
size_t secondary_index_cache_max_count = config().getUInt64("secondary_index_cache_max_count", DEFAULT_SECONDARY_INDEX_CACHE_MAX_COUNT);
double secondary_index_cache_size_ratio = config().getDouble("secondary_index_cache_size_ratio", DEFAULT_SECONDARY_INDEX_CACHE_SIZE_RATIO);
if (secondary_index_cache_size > max_cache_size)
{
mmap_cache_size = max_cache_size;
LOG_INFO(log, "Lowered mmap file cache size to {} because the system has limited RAM", formatReadableSizeWithBinarySuffix(uncompressed_cache_size));
secondary_index_cache_size = max_cache_size;
LOG_INFO(log, "Lowered secondary index cache size to {} because the system has limited RAM", formatReadableSizeWithBinarySuffix(secondary_index_cache_size));
}
global_context->setMMappedFileCache(mmap_cache_size);
global_context->setSecondaryIndexCache(secondary_index_cache_policy, secondary_index_cache_size, secondary_index_cache_max_count, secondary_index_cache_size_ratio);

/// Initialize a dummy query cache.
global_context->setQueryCache(0, 0, 0, 0);
Expand Down
21 changes: 14 additions & 7 deletions programs/server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1110,7 +1110,7 @@ try
if (index_uncompressed_cache_size > max_cache_size)
{
index_uncompressed_cache_size = max_cache_size;
LOG_INFO(log, "Lowered index uncompressed cache size to {} because the system has limited RAM", formatReadableSizeWithBinarySuffix(uncompressed_cache_size));
LOG_INFO(log, "Lowered index uncompressed cache size to {} because the system has limited RAM", formatReadableSizeWithBinarySuffix(index_uncompressed_cache_size));
}
global_context->setIndexUncompressedCache(index_uncompressed_cache_policy, index_uncompressed_cache_size, index_uncompressed_cache_size_ratio);

Expand All @@ -1120,17 +1120,23 @@ try
if (index_mark_cache_size > max_cache_size)
{
index_mark_cache_size = max_cache_size;
LOG_INFO(log, "Lowered index mark cache size to {} because the system has limited RAM", formatReadableSizeWithBinarySuffix(uncompressed_cache_size));
LOG_INFO(log, "Lowered index mark cache size to {} because the system has limited RAM", formatReadableSizeWithBinarySuffix(index_mark_cache_size));
}
global_context->setIndexMarkCache(index_mark_cache_policy, index_mark_cache_size, index_mark_cache_size_ratio);

size_t mmap_cache_size = server_settings.mmap_cache_size;
if (mmap_cache_size > max_cache_size)
global_context->setMMappedFileCache(mmap_cache_size);

String secondary_index_cache_policy = server_settings.secondary_index_cache_policy;
size_t secondary_index_cache_size = server_settings.secondary_index_cache_size;
size_t secondary_index_cache_max_count = server_settings.secondary_index_cache_max_count;
double secondary_index_cache_size_ratio = server_settings.secondary_index_cache_size_ratio;
if (secondary_index_cache_size > max_cache_size)
{
mmap_cache_size = max_cache_size;
LOG_INFO(log, "Lowered mmap file cache size to {} because the system has limited RAM", formatReadableSizeWithBinarySuffix(uncompressed_cache_size));
secondary_index_cache_size = max_cache_size;
LOG_INFO(log, "Lowered secondary index cache size to {} because the system has limited RAM", formatReadableSizeWithBinarySuffix(secondary_index_cache_size));
}
global_context->setMMappedFileCache(mmap_cache_size);
global_context->setSecondaryIndexCache(secondary_index_cache_policy, secondary_index_cache_size, secondary_index_cache_max_count, secondary_index_cache_size_ratio);

size_t query_cache_max_size_in_bytes = config().getUInt64("query_cache.max_size_in_bytes", DEFAULT_QUERY_CACHE_MAX_SIZE);
size_t query_cache_max_entries = config().getUInt64("query_cache.max_entries", DEFAULT_QUERY_CACHE_MAX_ENTRIES);
Expand All @@ -1139,7 +1145,7 @@ try
if (query_cache_max_size_in_bytes > max_cache_size)
{
query_cache_max_size_in_bytes = max_cache_size;
LOG_INFO(log, "Lowered query cache size to {} because the system has limited RAM", formatReadableSizeWithBinarySuffix(uncompressed_cache_size));
LOG_INFO(log, "Lowered query cache size to {} because the system has limited RAM", formatReadableSizeWithBinarySuffix(query_cache_max_size_in_bytes));
}
global_context->setQueryCache(query_cache_max_size_in_bytes, query_cache_max_entries, query_cache_query_cache_max_entry_size_in_bytes, query_cache_max_entry_size_in_rows);

Expand Down Expand Up @@ -1374,6 +1380,7 @@ try
global_context->updateIndexUncompressedCacheConfiguration(*config);
global_context->updateIndexMarkCacheConfiguration(*config);
global_context->updateMMappedFileCacheConfiguration(*config);
global_context->updateSecondaryIndexCacheConfiguration(*config);
global_context->updateQueryCacheConfiguration(*config);

CompressionCodecEncrypted::Configuration::instance().tryLoad(*config, "encryption_codecs");
Expand Down
1 change: 1 addition & 0 deletions src/Access/Common/AccessType.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ enum class AccessType
M(SYSTEM_DROP_MARK_CACHE, "SYSTEM DROP MARK, DROP MARK CACHE, DROP MARKS", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_UNCOMPRESSED_CACHE, "SYSTEM DROP UNCOMPRESSED, DROP UNCOMPRESSED CACHE, DROP UNCOMPRESSED", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_MMAP_CACHE, "SYSTEM DROP MMAP, DROP MMAP CACHE, DROP MMAP", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_SECONDARY_INDEX_CACHE, "SYSTEM DROP SECONDARY INDEX CACHE, DROP SECONDARY INDEX CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_QUERY_CACHE, "SYSTEM DROP QUERY, DROP QUERY CACHE, DROP QUERY", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_COMPILED_EXPRESSION_CACHE, "SYSTEM DROP COMPILED EXPRESSION, DROP COMPILED EXPRESSION CACHE, DROP COMPILED EXPRESSIONS", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_FILESYSTEM_CACHE, "SYSTEM DROP FILESYSTEM CACHE, DROP FILESYSTEM CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
Expand Down
1 change: 1 addition & 0 deletions src/Common/CurrentMetrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@
M(FilesystemCacheDownloadQueueElements, "Filesystem cache elements in download queue") \
M(FilesystemCacheDelayedCleanupElements, "Filesystem cache elements in background cleanup queue") \
M(AsyncInsertCacheSize, "Number of async insert hash id in cache") \
M(SecondaryIndexCacheSize, "Size (in bytes) of the secondary index cache") \
M(S3Requests, "S3 requests") \
M(KeeperAliveConnections, "Number of alive connections") \
M(KeeperOutstandingRequets, "Number of outstanding requests") \
Expand Down
3 changes: 3 additions & 0 deletions src/Common/ProfileEvents.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@
M(UncompressedCacheWeightLost, "Number of bytes evicted from the uncompressed cache.") \
M(MMappedFileCacheHits, "Number of times a file has been found in the MMap cache (for the 'mmap' read_method), so we didn't have to mmap it again.") \
M(MMappedFileCacheMisses, "Number of times a file has not been found in the MMap cache (for the 'mmap' read_method), so we had to mmap it again.") \
M(SecondaryIndexCacheHits, "Number of times an index granule has been found in the secondary index cache.") \
M(SecondaryIndexCacheMisses, "Number of times an index granule has not been found in the secondary index cache and had to be read from disk.") \
M(SecondaryIndexCacheBytesEvicted, "Approximate total size (memory usage) of index granules evicted from the secondary index cache.") \
M(OpenedFileCacheHits, "Number of times a file has been found in the opened file cache, so we didn't have to open it again.") \
M(OpenedFileCacheMisses, "Number of times a file has been found in the opened file cache, so we had to open it again.") \
M(OpenedFileCacheMicroseconds, "Amount of time spent executing OpenedFileCache methods.") \
Expand Down
6 changes: 5 additions & 1 deletion src/Core/Defines.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,17 @@ static constexpr auto DEFAULT_INDEX_UNCOMPRESSED_CACHE_SIZE_RATIO = 0.5l;
static constexpr auto DEFAULT_INDEX_MARK_CACHE_POLICY = "SLRU";
static constexpr auto DEFAULT_INDEX_MARK_CACHE_MAX_SIZE = 0_MiB;
static constexpr auto DEFAULT_INDEX_MARK_CACHE_SIZE_RATIO = 0.5l;
static constexpr auto DEFAULT_MMAP_CACHE_MAX_SIZE = 1_KiB; /// chosen by rolling dice
static constexpr auto DEFAULT_MMAP_CACHE_MAX_SIZE = 1024; /// chosen by rolling dice
static constexpr auto DEFAULT_COMPILED_EXPRESSION_CACHE_MAX_SIZE = 128_MiB;
static constexpr auto DEFAULT_COMPILED_EXPRESSION_CACHE_MAX_ENTRIES = 10'000;
static constexpr auto DEFAULT_QUERY_CACHE_MAX_SIZE = 1_GiB;
static constexpr auto DEFAULT_QUERY_CACHE_MAX_ENTRIES = 1024uz;
static constexpr auto DEFAULT_QUERY_CACHE_MAX_ENTRY_SIZE_IN_BYTES = 1_MiB;
static constexpr auto DEFAULT_QUERY_CACHE_MAX_ENTRY_SIZE_IN_ROWS = 30'000'000uz;
static constexpr auto DEFAULT_SECONDARY_INDEX_CACHE_POLICY = "SLRU";
static constexpr auto DEFAULT_SECONDARY_INDEX_CACHE_MAX_SIZE = 5_GiB;
static constexpr auto DEFAULT_SECONDARY_INDEX_CACHE_SIZE_RATIO = 0.5l;
static constexpr auto DEFAULT_SECONDARY_INDEX_CACHE_MAX_COUNT = 10000000;

/// Query profiler cannot work with sanitizers.
/// Sanitizers are using quick "frame walking" stack unwinding (this implies -fno-omit-frame-pointer)
Expand Down
8 changes: 6 additions & 2 deletions src/Core/ServerSettings.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ namespace DB
M(UInt64, max_concurrent_insert_queries, 0, "Maximum number of concurrently INSERT queries. Zero means unlimited.", 0) \
M(UInt64, max_concurrent_select_queries, 0, "Maximum number of concurrently SELECT queries. Zero means unlimited.", 0) \
\
M(Double, cache_size_to_ram_max_ratio, 0.5, "Set cache size ro RAM max ratio. Allows to lower cache size on low-memory systems.", 0) \
M(Double, cache_size_to_ram_max_ratio, 0.5, "Set cache size to RAM max ratio. Allows to lower cache size on low-memory systems.", 0) \
M(String, uncompressed_cache_policy, DEFAULT_UNCOMPRESSED_CACHE_POLICY, "Uncompressed cache policy name.", 0) \
M(UInt64, uncompressed_cache_size, DEFAULT_UNCOMPRESSED_CACHE_MAX_SIZE, "Size of cache for uncompressed blocks. Zero means disabled.", 0) \
M(Double, uncompressed_cache_size_ratio, DEFAULT_UNCOMPRESSED_CACHE_SIZE_RATIO, "The size of the protected queue in the uncompressed cache relative to the cache's total size.", 0) \
Expand All @@ -70,7 +70,11 @@ namespace DB
M(String, index_mark_cache_policy, DEFAULT_INDEX_MARK_CACHE_POLICY, "Index mark cache policy name.", 0) \
M(UInt64, index_mark_cache_size, DEFAULT_INDEX_MARK_CACHE_MAX_SIZE, "Size of cache for index marks. Zero means disabled.", 0) \
M(Double, index_mark_cache_size_ratio, DEFAULT_INDEX_MARK_CACHE_SIZE_RATIO, "The size of the protected queue in the index mark cache relative to the cache's total size.", 0) \
M(UInt64, mmap_cache_size, DEFAULT_MMAP_CACHE_MAX_SIZE, "A cache for mmapped files.", 0) \
M(UInt64, mmap_cache_size, DEFAULT_MMAP_CACHE_MAX_SIZE, "Maximum number of files to keep in the mmapped file cache.", 0) \
M(String, secondary_index_cache_policy, DEFAULT_SECONDARY_INDEX_CACHE_POLICY, "Index mark cache policy name.", 0) \
M(UInt64, secondary_index_cache_size, DEFAULT_SECONDARY_INDEX_CACHE_MAX_SIZE, "Size (in bytes) of the cache for secondary index granules.", 0) \
M(Double, secondary_index_cache_size_ratio, DEFAULT_SECONDARY_INDEX_CACHE_SIZE_RATIO, "The size of the protected queue in the secondary index cache relative to the cache's total size.", 0) \
M(UInt64, secondary_index_cache_max_count, DEFAULT_SECONDARY_INDEX_CACHE_MAX_COUNT, "Limit on the number of entries (index granules) in the secondary index cache. (secondary_index_cache_size is not sufficient because Needed because the entries may be very small.)", 0) \
\
M(Bool, disable_internal_dns_cache, false, "Disable internal DNS caching at all.", 0) \
M(Int32, dns_cache_update_period, 15, "Internal DNS cache update period in seconds.", 0) \
Expand Down
2 changes: 2 additions & 0 deletions src/Interpreters/BloomFilter.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ class BloomFilter
/// For debug.
UInt64 isEmpty() const;

size_t memoryUsageBytes() const { return filter.size() * sizeof(filter[0]); }

friend bool operator== (const BloomFilter & a, const BloomFilter & b);
private:

Expand Down
41 changes: 41 additions & 0 deletions src/Interpreters/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
#include <Interpreters/JIT/CompiledExpressionCache.h>
#include <Storages/MergeTree/BackgroundJobsAssignee.h>
#include <Storages/MergeTree/MergeTreeDataPartUUID.h>
#include <Storages/MergeTree/SecondaryIndexCache.h>
#include <Interpreters/SynonymsExtensions.h>
#include <Interpreters/Lemmatizers.h>
#include <Interpreters/ClusterDiscovery.h>
Expand Down Expand Up @@ -253,6 +254,7 @@ struct ContextSharedPart : boost::noncopyable
mutable std::unique_ptr<ThreadPool> load_marks_threadpool; /// Threadpool for loading marks cache.
mutable std::unique_ptr<ThreadPool> prefetch_threadpool; /// Threadpool for loading marks cache.
mutable UncompressedCachePtr index_uncompressed_cache; /// The cache of decompressed blocks for MergeTree indices.
mutable SecondaryIndexCachePtr secondary_index_cache; /// Cache of deserialized secondary index granules.
mutable QueryCachePtr query_cache; /// Cache of query results.
mutable MarkCachePtr index_mark_cache; /// Cache of marks in compressed files of MergeTree indices.
mutable MMappedFileCachePtr mmap_cache; /// Cache of mmapped files to avoid frequent open/map/unmap/close and to reuse from several threads.
Expand Down Expand Up @@ -2444,6 +2446,41 @@ void Context::clearMMappedFileCache() const
shared->mmap_cache->clear();
}

void Context::setSecondaryIndexCache(const String & cache_policy, size_t max_size_in_bytes, size_t max_count, double size_ratio)
{
auto lock = getLock();

if (shared->secondary_index_cache)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Secondary index cache has been already created.");

shared->secondary_index_cache = std::make_shared<SecondaryIndexCache>(cache_policy, max_size_in_bytes, max_count, size_ratio);
}

void Context::updateSecondaryIndexCacheConfiguration(const Poco::Util::AbstractConfiguration & config)
{
auto lock = getLock();

if (!shared->secondary_index_cache)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Secondary index cache was not created yet.");

size_t max_size_in_bytes = config.getUInt64("secondary_index_cache_size", DEFAULT_SECONDARY_INDEX_CACHE_MAX_SIZE);
shared->secondary_index_cache->setMaxSizeInBytes(max_size_in_bytes);
}

SecondaryIndexCachePtr Context::getSecondaryIndexCache() const
{
auto lock = getLock();
return shared->secondary_index_cache;
}

void Context::clearSecondaryIndexCache() const
{
auto lock = getLock();

if (shared->secondary_index_cache)
shared->secondary_index_cache->clear();
}

void Context::setQueryCache(size_t max_size_in_bytes, size_t max_entries, size_t max_entry_size_in_bytes, size_t max_entry_size_in_rows)
{
auto lock = getLock();
Expand Down Expand Up @@ -2506,6 +2543,10 @@ void Context::clearCaches() const
throw Exception(ErrorCodes::LOGICAL_ERROR, "Mmapped file cache was not created yet.");
shared->mmap_cache->clear();

if (!shared->secondary_index_cache)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Secondary index cache was not created yet.");
shared->secondary_index_cache->clear();

/// Intentionally not clearing the query cache which is transactionally inconsistent by design.
}

Expand Down
6 changes: 6 additions & 0 deletions src/Interpreters/Context.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ class Compiler;
class MarkCache;
class MMappedFileCache;
class UncompressedCache;
class SecondaryIndexCache;
class ProcessList;
class QueryStatus;
using QueryStatusPtr = std::shared_ptr<QueryStatus>;
Expand Down Expand Up @@ -940,6 +941,11 @@ class Context: public std::enable_shared_from_this<Context>
std::shared_ptr<MMappedFileCache> getMMappedFileCache() const;
void clearMMappedFileCache() const;

void setSecondaryIndexCache(const String & cache_policy, size_t max_size_in_bytes, size_t max_count, double size_ratio);
void updateSecondaryIndexCacheConfiguration(const Poco::Util::AbstractConfiguration & config);
std::shared_ptr<SecondaryIndexCache> getSecondaryIndexCache() const;
void clearSecondaryIndexCache() const;

void setQueryCache(size_t max_size_in_bytes, size_t max_entries, size_t max_entry_size_in_bytes, size_t max_entry_size_in_rows);
void updateQueryCacheConfiguration(const Poco::Util::AbstractConfiguration & config);
std::shared_ptr<QueryCache> getQueryCache() const;
Expand Down
2 changes: 2 additions & 0 deletions src/Interpreters/GinFilter.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ class GinFilter
const GinSegmentWithRowIdRangeVector & getFilter() const { return rowid_ranges; }
GinSegmentWithRowIdRangeVector & getFilter() { return rowid_ranges; }

size_t memoryUsageBytes() const { return rowid_ranges.size() * sizeof(rowid_ranges[0]); }

private:
/// Filter parameters
const GinFilterParameters & params;
Expand Down
5 changes: 5 additions & 0 deletions src/Interpreters/InterpreterSystemQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,10 @@ BlockIO InterpreterSystemQuery::execute()
getContext()->checkAccess(AccessType::SYSTEM_DROP_MMAP_CACHE);
system_context->clearMMappedFileCache();
break;
case Type::DROP_SECONDARY_INDEX_CACHE:
getContext()->checkAccess(AccessType::SYSTEM_DROP_SECONDARY_INDEX_CACHE);
system_context->clearSecondaryIndexCache();
break;
case Type::DROP_QUERY_CACHE:
getContext()->checkAccess(AccessType::SYSTEM_DROP_QUERY_CACHE);
getContext()->clearQueryCache();
Expand Down Expand Up @@ -1070,6 +1074,7 @@ AccessRightsElements InterpreterSystemQuery::getRequiredAccessForDDLOnCluster()
case Type::DROP_UNCOMPRESSED_CACHE:
case Type::DROP_INDEX_MARK_CACHE:
case Type::DROP_INDEX_UNCOMPRESSED_CACHE:
case Type::DROP_SECONDARY_INDEX_CACHE:
case Type::DROP_FILESYSTEM_CACHE:
case Type::SYNC_FILESYSTEM_CACHE:
case Type::DROP_SCHEMA_CACHE:
Expand Down
1 change: 1 addition & 0 deletions src/Parsers/ASTSystemQuery.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class ASTSystemQuery : public IAST, public ASTQueryWithOnCluster
DROP_INDEX_MARK_CACHE,
DROP_INDEX_UNCOMPRESSED_CACHE,
DROP_MMAP_CACHE,
DROP_SECONDARY_INDEX_CACHE,
DROP_QUERY_CACHE,
#if USE_EMBEDDED_COMPILER
DROP_COMPILED_EXPRESSION_CACHE,
Expand Down

0 comments on commit 29d593b

Please sign in to comment.