Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*wip* Query cache persistence #63091

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 0 additions & 3 deletions base/base/cgroupsv2.cpp
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
#include <base/cgroupsv2.h>

#include <base/defines.h>

#include <fstream>
#include <sstream>


bool cgroupsV2Enabled()
Expand Down
2 changes: 1 addition & 1 deletion programs/local/LocalServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -678,7 +678,7 @@ void LocalServer::processConfig()
global_context->setMMappedFileCache(mmap_cache_size);

/// Initialize a dummy query cache.
global_context->setQueryCache(0, 0, 0, 0);
global_context->setQueryCache(0, 0, 0, 0, {});

#if USE_EMBEDDED_COMPILER
size_t compiled_expression_cache_max_size_in_bytes = config().getUInt64("compiled_expression_cache_size", DEFAULT_COMPILED_EXPRESSION_CACHE_MAX_SIZE);
Expand Down
8 changes: 7 additions & 1 deletion programs/server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1314,12 +1314,13 @@ try
size_t query_cache_max_entries = config().getUInt64("query_cache.max_entries", DEFAULT_QUERY_CACHE_MAX_ENTRIES);
size_t query_cache_query_cache_max_entry_size_in_bytes = config().getUInt64("query_cache.max_entry_size_in_bytes", DEFAULT_QUERY_CACHE_MAX_ENTRY_SIZE_IN_BYTES);
size_t query_cache_max_entry_size_in_rows = config().getUInt64("query_cache.max_entry_rows_in_rows", DEFAULT_QUERY_CACHE_MAX_ENTRY_SIZE_IN_ROWS);
bool query_cache_persist_cache = config().getBool("query_cache.persist_cache", true);
if (query_cache_max_size_in_bytes > max_cache_size)
{
query_cache_max_size_in_bytes = max_cache_size;
LOG_INFO(log, "Lowered query cache size to {} because the system has limited RAM", formatReadableSizeWithBinarySuffix(uncompressed_cache_size));
}
global_context->setQueryCache(query_cache_max_size_in_bytes, query_cache_max_entries, query_cache_query_cache_max_entry_size_in_bytes, query_cache_max_entry_size_in_rows);
global_context->setQueryCache(query_cache_max_size_in_bytes, query_cache_max_entries, query_cache_query_cache_max_entry_size_in_bytes, query_cache_max_entry_size_in_rows, query_cache_persist_cache);

#if USE_EMBEDDED_COMPILER
size_t compiled_expression_cache_max_size_in_bytes = config().getUInt64("compiled_expression_cache_size", DEFAULT_COMPILED_EXPRESSION_CACHE_MAX_SIZE);
Expand Down Expand Up @@ -2161,6 +2162,11 @@ try
if (!server_settings.shutdown_wait_unfinished_queries)
global_context->getProcessList().killAllQueries();

/// Persist query cache entries
const auto & query_cache = global_context->getQueryCache();
if (query_cache)
query_cache->shutdown();

if (current_connections)
current_connections = waitServersToFinish(servers, servers_lock, server_settings.shutdown_wait_unfinished);

Expand Down
289 changes: 249 additions & 40 deletions src/Interpreters/Cache/QueryCache.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,16 @@
#include "Interpreters/Cache/QueryCache.h"

#include <Common/ProfileEvents.h>
#include <Common/SipHash.h>
#include <Common/TTLCachePolicy.h>
#include <Common/formatReadable.h>
#include <Common/quoteString.h>
#include <Core/Settings.h>
#include <Functions/FunctionFactory.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/WriteHelpers.h>
#include <Interpreters/Context.h>
#include <Interpreters/DatabaseCatalog.h>
#include <Interpreters/InDepthNodeVisitor.h>
Expand All @@ -13,14 +23,10 @@
#include <Parsers/TokenIterator.h>
#include <Parsers/formatAST.h>
#include <Parsers/parseDatabaseAndTableName.h>
#include <Common/ProfileEvents.h>
#include <Common/SipHash.h>
#include <Common/TTLCachePolicy.h>
#include <Common/formatReadable.h>
#include <Common/quoteString.h>
#include <Core/Settings.h>
#include <base/defines.h> /// chassert

#include <filesystem>


namespace ProfileEvents
{
Expand Down Expand Up @@ -313,6 +319,50 @@ void QueryCache::Writer::buffer(Chunk && chunk, ChunkType chunk_type)
}
}

namespace
{

/// Combines N (usually small) chunks into M chunks with max_chunk_size rows each (the last output chunk may hold fewer rows).
/// Input chunks without rows are skipped.
///
/// The input chunks are non-const to save unnecessary copies for convertToFullIfSparse and convertToFullIfConst.
///
/// A max_chunk_size of 0 is handled like 1. Without that, the inner loop could never append a row and would keep pushing
/// empty chunks forever.
Chunks squashChunks(Chunks & chunks, size_t max_chunk_size)
{
    /// Guarantee progress: every freshly started output chunk must be able to take at least one row.
    const size_t chunk_capacity = std::max<size_t>(max_chunk_size, 1);

    Chunks squashed_chunks;
    size_t rows_remaining_in_squashed = 0; /// how many further rows can the last squashed chunk consume until it reaches chunk_capacity

    for (auto & chunk : chunks)
    {
        convertToFullIfSparse(chunk);
        convertToFullIfConst(chunk);

        const size_t rows_chunk = chunk.getNumRows();
        if (rows_chunk == 0)
            continue;

        size_t rows_chunk_processed = 0;
        while (rows_chunk_processed < rows_chunk)
        {
            if (rows_remaining_in_squashed == 0)
            {
                /// The last output chunk is full (or there is none yet): start a new one with the same column structure.
                squashed_chunks.push_back(Chunk(chunk.cloneEmptyColumns(), 0));
                rows_remaining_in_squashed = chunk_capacity;
            }

            const size_t rows_to_append = std::min(rows_chunk - rows_chunk_processed, rows_remaining_in_squashed);
            squashed_chunks.back().append(chunk, rows_chunk_processed, rows_to_append);
            rows_chunk_processed += rows_to_append;
            rows_remaining_in_squashed -= rows_to_append;
        }
    }

    return squashed_chunks;
}

}

void QueryCache::Writer::finalizeWrite()
{
if (skip_insert)
Expand Down Expand Up @@ -343,39 +393,7 @@ void QueryCache::Writer::finalizeWrite()
/// Squash partial result chunks to chunks of size 'max_block_size' each. This costs some performance but provides a more natural
/// compression of neither too small nor big blocks. Also, it will look like 'max_block_size' is respected when the query result is
/// served later on from the query cache.

Chunks squashed_chunks;
size_t rows_remaining_in_squashed = 0; /// how many further rows can the last squashed chunk consume until it reaches max_block_size

for (auto & chunk : query_result->chunks)
{
convertToFullIfSparse(chunk);
convertToFullIfConst(chunk);

const size_t rows_chunk = chunk.getNumRows();
if (rows_chunk == 0)
continue;

size_t rows_chunk_processed = 0;
while (true)
{
if (rows_remaining_in_squashed == 0)
{
Chunk empty_chunk = Chunk(chunk.cloneEmptyColumns(), 0);
squashed_chunks.push_back(std::move(empty_chunk));
rows_remaining_in_squashed = max_block_size;
}

const size_t rows_to_append = std::min(rows_chunk - rows_chunk_processed, rows_remaining_in_squashed);
squashed_chunks.back().append(chunk, rows_chunk_processed, rows_to_append);
rows_chunk_processed += rows_to_append;
rows_remaining_in_squashed -= rows_to_append;

if (rows_chunk_processed == rows_chunk)
break;
}
}

Chunks squashed_chunks = squashChunks(query_result->chunks, max_block_size);
query_result->chunks = std::move(squashed_chunks);
}

Expand Down Expand Up @@ -540,10 +558,201 @@ std::unique_ptr<SourceFromChunks> QueryCache::Reader::getSourceExtremes()
return std::move(source_from_chunks_extremes);
}

QueryCache::QueryCache(size_t max_size_in_bytes, size_t max_entries, size_t max_entry_size_in_bytes_, size_t max_entry_size_in_rows_)
/// Creates the query cache on top of a TTLCachePolicy (with a PerUserTTLCachePolicyUserQuota), applies the given limits via
/// updateConfiguration() and finally calls readCacheEntriesFromPersistence().
/// `path_` is the directory used for persistence. It is stored as-is; when it holds no value, the load and store routines
/// return immediately.
QueryCache::QueryCache(size_t max_size_in_bytes, size_t max_entries, size_t max_entry_size_in_bytes_, size_t max_entry_size_in_rows_, const std::optional<std::filesystem::path> & path_)
: cache(std::make_unique<TTLCachePolicy<Key, Entry, KeyHasher, QueryCacheEntryWeight, IsStale>>(std::make_unique<PerUserTTLCachePolicyUserQuota>()))
, path(path_)
{
updateConfiguration(max_size_in_bytes, max_entries, max_entry_size_in_bytes_, max_entry_size_in_rows_);

/// Must run after updateConfiguration() has set the limits. NOTE(review): entries are only parsed, not inserted, so far.
readCacheEntriesFromPersistence();
}

/// Stores the current cache entries on disk by delegating to writeCacheEntriesToPersistence().
void QueryCache::shutdown()
{
writeCacheEntriesToPersistence();
}

/// Name of the file inside the persistence directory which holds the on-disk format version.
static constexpr std::string_view format_version_txt = "format_version.txt";
/// Version written when persisting; when loading, any other version makes the loader return without reading entries.
static constexpr uint32_t current_version = 1;

/// Line prefixes of the fields in a persisted entry file. The writer emits them and the reader asserts them, in this order.
static constexpr auto * token_user_id = "user_id: ";
static constexpr auto * token_current_user_roles = "current_user_roles: ";
static constexpr auto * token_is_shared = "is_shared: ";
static constexpr auto * token_is_compressed = "is_compressed: ";
static constexpr auto * token_query_string = "query_string: ";

/// TODO remove weird logging (can't really debug)
void QueryCache::readCacheEntriesFromPersistence()
try
{
if (!path)
return; /// loading of persisted query cache entries is disabled by config

/// TODO document the data organization on disk

LoggerPtr logger = getLogger("XXX");

namespace fs = std::filesystem;

fs::path query_cache_path = *path;

fs::path format_version_path = query_cache_path / format_version_txt;
ReadBufferFromFile format_version_file(format_version_path); /// throws if file can't be opened
uint32_t version;
readIntText(version, format_version_file);
if (version != current_version)
return;

for (const auto & entry_file_it : fs::directory_iterator(query_cache_path))
{
const fs::path & entry_path = entry_file_it.path();

if (entry_path == format_version_path)
continue; /// ignore format_version.txt

String ast_hash_str = entry_path.filename();
size_t separator_pos = ast_hash_str.find('_');
chassert(separator_pos != String::npos);
String low64_str = ast_hash_str.substr(0, separator_pos);
String high64_str = ast_hash_str.substr(separator_pos + 1, ast_hash_str.size());
IAST::Hash ast_hash(std::stoull(low64_str), std::stoull(high64_str));

/// TODO construct a key, add it to the cache (needs a new ctor which accepts the hash directly)

ReadBufferFromFile entry_file(entry_path);

assertString(token_user_id, entry_file);
UUID user_id;
readUUIDText(user_id, entry_file);
/// can be UUIDHelpers::Nil

assertString(token_current_user_roles, entry_file);
std::vector<UUID> current_user_roles;
while (!checkChar('\n', entry_file))
{
UUID user_role;
readUUIDText(user_role, entry_file);
current_user_roles.push_back(user_role);
assertChar(',', entry_file);
}
assertChar('\n', entry_file);

assertString(token_is_shared, entry_file);
bool is_shared;
readBoolText(is_shared, entry_file);
assertChar('\n', entry_file);

/// expires_at is not read

assertString(token_is_compressed, entry_file);
bool is_compressed;
readBoolText(is_compressed, entry_file);
assertChar('\n', entry_file);

assertString(token_query_string, entry_file);
String query_string;
readStringUntilNewlineInto(query_string, entry_file);

LOG_TRACE(logger, "entry read {} {} {}", is_shared, is_compressed, query_string);
}
}
catch (...)
{
/// TODO log exception
/// throw; /// TODO don't throw, silently swallow exception
}

/// TODO wrap in try-catch
/// TODO remove weird logging (can't really debug)
void QueryCache::writeCacheEntriesToPersistence()
try
{
LoggerPtr logger = getLogger("XXX");

/// Store query cache entries to persistence:

if (!path)
return; /// storing of query cache entries to persistence disabled by config

namespace fs = std::filesystem;

fs::path query_cache_path = *path;

fs::remove_all(query_cache_path);
fs::create_directory(query_cache_path);

fs::path format_version_path = query_cache_path / format_version_txt;
WriteBufferFromFile format_version_file(format_version_path);
writeIntText(current_version, format_version_file);

const auto & entries = dump();
for (const auto & entry : entries)
{
const auto & entry_key = entry->key;
const auto & entry_mapped = entry->mapped;

IAST::Hash ast_hash = entry_key.ast_hash;
String ast_hash_str = std::to_string(ast_hash.low64) + '_' + std::to_string(ast_hash.high64);

fs::path entry_file_path = query_cache_path / ast_hash_str;
WriteBufferFromFile entry_file(entry_file_path.string());

/// TODO
/// Serializations header_serializations = entry_key.header.getSerializations();
/// for (const auto & serialization : header_serializations)
/// {
/// serialization.serializeBinary()
/// }

writeText(token_user_id, entry_file);
UUID user_id = entry_key.user_id ? *entry_key.user_id : UUIDHelpers::Nil;
writeUUIDText(user_id, entry_file);
writeText("\n", entry_file);

writeText(token_current_user_roles, entry_file);
/// writeVectorBinary(entry_key.current_user_roles, entry_file);
for (size_t i = 0; i < entry_key.current_user_roles.size(); ++i)
{
writeUUIDText(entry_key.current_user_roles[i], entry_file);
if (i != entry_key.current_user_roles.size() - 1)
writeText(",", entry_file);
}
writeText("\n", entry_file);

writeText(token_is_shared, entry_file);
writeBoolText(entry_key.is_shared, entry_file);
writeText("\n", entry_file);

/// expires_at is not written

writeText(token_is_compressed, entry_file);
writeBoolText(entry_key.is_compressed, entry_file);
writeText("\n", entry_file);

writeText(token_query_string, entry_file);
writeText(entry_key.query_string, entry_file);
writeText("\n", entry_file);

/// TODO write chunks, totals, extremes (in separate files?)

Chunks & chunks = entry_mapped->chunks;
const std::optional<Chunk> & totals = entry_mapped->totals;
const std::optional<Chunk> & extremes = entry_mapped->extremes;

/// To keep the file format simple, squash the result chunks to a single chunk.
chunks = squashChunks(chunks, std::numeric_limits<size_t>::max());

/// TODO write num_rows
/// TODO get TypeIndex from column, get DataType from TypeIndex (how??), get Serialization from DataType
/// TODO write into separate file
/// LOL ... see above header_serializations

LOG_TRACE(logger, "entry written");
}
}
catch (...)
{
/// TODO log exception
}

void QueryCache::updateConfiguration(size_t max_size_in_bytes, size_t max_entries, size_t max_entry_size_in_bytes_, size_t max_entry_size_in_rows_)
Expand Down