Add a setting to force read-through cache for merges #60308

Merged
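In short: this PR adds a Bool query setting force_read_through_cache_for_merges (default 0, declared in Settings.h below), carries it into ReadSettings as force_read_through_cache_merges via Context::getReadSettings(), and uses it in MergeTreeSequentialSource so that merges read through the filesystem cache and populate it, instead of only reusing entries that already exist. The old avoid_readthrough_cache_outside_query_context guard and CachedObjectStorage::canUseReadThroughCache() are removed along the way. A minimal enabling sketch follows (an illustration, not part of the PR); since merges run in a background, non-query context, the setting is normally enabled through a user profile, as the users.d override at the end of this diff does, rather than per session:

-- Sketch: enable for the current session (affects query-issued reads);
-- background merges pick it up only when it is set in the active profile,
-- e.g. the force_read_through_cache_on_merge.xml override added by this PR.
SET force_read_through_cache_for_merges = 1;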
1 change: 1 addition & 0 deletions src/Core/Settings.h
@@ -769,6 +769,7 @@ class IColumn;
M(Bool, enable_filesystem_cache_on_write_operations, false, "Write into cache on write operations. For this setting to take effect, it must also be added to the disk config", 0) \
M(Bool, enable_filesystem_cache_log, false, "Allows recording the filesystem caching log for each query", 0) \
M(Bool, read_from_filesystem_cache_if_exists_otherwise_bypass_cache, false, "Allow using the filesystem cache in passive mode - benefit from the existing cache entries, but don't put more entries into the cache. If you set this setting for heavy ad-hoc queries and leave it disabled for short real-time queries, this allows avoiding cache thrashing by overly heavy queries and improves the overall system efficiency.", 0) \
M(Bool, force_read_through_cache_for_merges, false, "Force read-through cache for merges", 0) \
M(Bool, skip_download_if_exceeds_query_cache, true, "Skip download from remote filesystem if exceeds query cache size", 0) \
M(UInt64, filesystem_cache_max_download_size, (128UL * 1024 * 1024 * 1024), "Max remote filesystem cache size that can be downloaded by a single query", 0) \
M(Bool, throw_on_error_from_cache_on_write_operations, false, "Ignore error from cache when caching on write operations (INSERT, merges)", 0) \
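Once a server is built with this change, the new setting becomes visible alongside the others; a quick sanity check, a sketch that relies only on the standard system.settings table:

SELECT name, value, description
FROM system.settings
WHERE name = 'force_read_through_cache_for_merges';
-- Expected: a single row with value 0 (the default declared above),
-- unless a profile such as the users.d override below sets it to 1.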
10 changes: 4 additions & 6 deletions src/Disks/IO/ReadBufferFromRemoteFSGather.cpp
@@ -16,12 +16,10 @@ using namespace DB;

namespace
{
bool withCache(const ReadSettings & settings)
{
return settings.remote_fs_cache && settings.enable_filesystem_cache
&& (!CurrentThread::getQueryId().empty() || settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache
|| !settings.avoid_readthrough_cache_outside_query_context);
}
bool withCache(const ReadSettings & settings)
{
return settings.remote_fs_cache && settings.enable_filesystem_cache;
}
}

namespace DB
14 changes: 0 additions & 14 deletions src/Disks/ObjectStorages/Cached/CachedObjectStorage.cpp
@@ -43,10 +43,6 @@ ReadSettings CachedObjectStorage::patchSettings(const ReadSettings & read_settin
{
ReadSettings modified_settings{read_settings};
modified_settings.remote_fs_cache = cache;

if (!canUseReadThroughCache(read_settings))
modified_settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache = true;

return object_storage->patchSettings(modified_settings);
}

@@ -206,14 +202,4 @@ String CachedObjectStorage::getObjectsNamespace() const
return object_storage->getObjectsNamespace();
}

bool CachedObjectStorage::canUseReadThroughCache(const ReadSettings & settings)
{
if (!settings.avoid_readthrough_cache_outside_query_context)
return true;

return CurrentThread::isInitialized()
&& CurrentThread::get().getQueryContext()
&& !CurrentThread::getQueryId().empty();
}

}
2 changes: 0 additions & 2 deletions src/Disks/ObjectStorages/Cached/CachedObjectStorage.h
@@ -119,8 +119,6 @@ class CachedObjectStorage final : public IObjectStorage

const FileCacheSettings & getCacheSettings() const { return cache_settings; }

static bool canUseReadThroughCache(const ReadSettings & settings);

#if USE_AZURE_BLOB_STORAGE
std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient> getAzureBlobStorageClient() override
{
2 changes: 1 addition & 1 deletion src/IO/ReadSettings.h
@@ -99,7 +99,7 @@ struct ReadSettings
bool read_from_filesystem_cache_if_exists_otherwise_bypass_cache = false;
bool enable_filesystem_cache_log = false;
/// Don't populate cache when the read is not part of query execution (e.g. background thread).
bool avoid_readthrough_cache_outside_query_context = true;
bool force_read_through_cache_merges = false;
size_t filesystem_cache_segments_batch_size = 20;

size_t filesystem_cache_max_download_size = (128UL * 1024 * 1024 * 1024);
3 changes: 2 additions & 1 deletion src/Interpreters/Cache/FileSegment.cpp
@@ -10,6 +10,7 @@
#include <Common/logger_useful.h>
#include <Common/scope_guard_safe.h>
#include <Common/ElapsedTimeProfileEventIncrement.h>
#include <Common/setThreadName.h>

#include <magic_enum.hpp>

@@ -194,7 +195,7 @@ bool FileSegment::isDownloaded() const
String FileSegment::getCallerId()
{
if (!CurrentThread::isInitialized() || CurrentThread::getQueryId().empty())
return "None:" + toString(getThreadId());
return fmt::format("None:{}:{}", getThreadName(), toString(getThreadId()));

return std::string(CurrentThread::getQueryId()) + ":" + toString(getThreadId());
}
1 change: 1 addition & 0 deletions src/Interpreters/Context.cpp
@@ -5079,6 +5079,7 @@ ReadSettings Context::getReadSettings() const
res.read_from_filesystem_cache_if_exists_otherwise_bypass_cache = settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache;
res.enable_filesystem_cache_log = settings.enable_filesystem_cache_log;
res.filesystem_cache_segments_batch_size = settings.filesystem_cache_segments_batch_size;
res.force_read_through_cache_merges = settings.force_read_through_cache_for_merges;

res.filesystem_cache_max_download_size = settings.filesystem_cache_max_download_size;
res.skip_download_if_exceeds_query_cache = settings.skip_download_if_exceeds_query_cache;
2 changes: 1 addition & 1 deletion src/Storages/MergeTree/MergeTreeSequentialSource.cpp
@@ -151,7 +151,7 @@ MergeTreeSequentialSource::MergeTreeSequentialSource(

const auto & context = storage.getContext();
ReadSettings read_settings = context->getReadSettings();
read_settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache = true;
read_settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache = !read_settings.force_read_through_cache_merges;
/// It does not make sense to use pthread_threadpool for background merges/mutations
/// And also to preserve backward compatibility
read_settings.local_fs_method = LocalFSReadMethod::pread;
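The one-line change above is the behavioural core of the PR: merges used to set read_from_filesystem_cache_if_exists_otherwise_bypass_cache unconditionally (reuse existing cache entries, never add new ones); with force_read_through_cache_merges enabled they now read through the cache and fill it. One way to observe this, mirroring the integration test below (a sketch; it assumes the table named test on a cache-backed disk exactly as created in that test):

SYSTEM DROP FILESYSTEM CACHE;
OPTIMIZE TABLE test FINAL;  -- forces a merge, which re-reads the parts
-- With force_read_through_cache_for_merges = 1 the merge reads are written into the
-- cache, so this counter grows (the test asserts it exceeds 100000); with the
-- default 0 the event is never incremented and the query returns no rows.
SELECT value FROM system.events WHERE name = 'CachedReadBufferCacheWriteBytes';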
79 changes: 79 additions & 0 deletions tests/integration/test_filesystem_cache/test.py
@@ -19,6 +19,9 @@ def cluster():
main_configs=[
"config.d/storage_conf.xml",
],
user_configs=[
"users.d/cache_on_write_operations.xml",
],
stay_alive=True,
)
cluster.add_instance(
@@ -35,6 +38,17 @@ def cluster():
],
stay_alive=True,
)
cluster.add_instance(
"node_force_read_through_cache_on_merge",
main_configs=[
"config.d/storage_conf.xml",
],
user_configs=[
"users.d/force_read_through_cache_on_merge.xml",
"users.d/cache_on_write_operations.xml",
],
stay_alive=True,
)

logging.info("Starting cluster...")
cluster.start()
@@ -323,3 +337,68 @@ def test_custom_cached_disk(cluster):
"SELECT cache_path FROM system.disks WHERE name = 'custom_cached4'"
).strip()
)


def test_force_filesystem_cache_on_merges(cluster):
def test(node, forced_read_through_cache_on_merge):
node.query(
"""
DROP TABLE IF EXISTS test SYNC;

CREATE TABLE test (key UInt32, value String)
Engine=MergeTree()
ORDER BY value
SETTINGS disk = disk(
type = cache,
path = 'force_cache_on_merges',
disk = 'hdd_blob',
max_file_segment_size = '1Ki',
cache_on_write_operations = 1,
boundary_alignment = '1Ki',
max_size = '10Gi',
max_elements = 10000000,
load_metadata_threads = 30);

SYSTEM DROP FILESYSTEM CACHE;
INSERT INTO test SELECT * FROM generateRandom('a Int32, b String') LIMIT 1000000;
INSERT INTO test SELECT * FROM generateRandom('a Int32, b String') LIMIT 1000000;
"""
)
assert int(node.query("SELECT count() FROM system.filesystem_cache")) > 0
assert int(node.query("SELECT max(size) FROM system.filesystem_cache")) == 1024

write_count = int(
node.query(
"SELECT value FROM system.events WHERE name = 'CachedWriteBufferCacheWriteBytes'"
)
)
assert write_count > 100000
assert "" == node.query(
"SELECT value FROM system.events WHERE name = 'CachedReadBufferCacheWriteBytes'"
)

node.query("SYSTEM DROP FILESYSTEM CACHE")
node.query("OPTIMIZE TABLE test FINAL")

new_write_count = int(
node.query(
"SELECT value FROM system.events WHERE name = 'CachedWriteBufferCacheWriteBytes'"
)
)
assert new_write_count >= write_count

if forced_read_through_cache_on_merge:
assert 100000 < int(
node.query(
"SELECT value FROM system.events WHERE name = 'CachedReadBufferCacheWriteBytes'"
)
)
else:
assert "" == node.query(
"SELECT value FROM system.events WHERE name = 'CachedReadBufferCacheWriteBytes'"
)

node = cluster.instances["node_force_read_through_cache_on_merge"]
test(node, True)
node = cluster.instances["node"]
test(node, False)
7 changes: 7 additions & 0 deletions tests/integration/test_filesystem_cache/users.d/cache_on_write_operations.xml
@@ -0,0 +1,7 @@
<clickhouse>
<profiles>
<default>
<enable_filesystem_cache_on_write_operations>1</enable_filesystem_cache_on_write_operations>
</default>
</profiles>
</clickhouse>
7 changes: 7 additions & 0 deletions tests/integration/test_filesystem_cache/users.d/force_read_through_cache_on_merge.xml
@@ -0,0 +1,7 @@
<clickhouse>
<profiles>
<default>
<force_read_through_cache_for_merges>1</force_read_through_cache_for_merges>
</default>
</profiles>
</clickhouse>