Increase async block cache deduplication timeout #57743

Merged: 4 commits, Dec 12, 2023
36 changes: 18 additions & 18 deletions src/Storages/MergeTree/AsyncBlockIDsCache.cpp
@@ -79,11 +79,12 @@ catch (...)
 
 template <typename TStorage>
 AsyncBlockIDsCache<TStorage>::AsyncBlockIDsCache(TStorage & storage_)
-    : storage(storage_),
-    update_min_interval(storage.getSettings()->async_block_ids_cache_min_update_interval_ms),
-    path(storage.getZooKeeperPath() + "/async_blocks"),
-    log_name(storage.getStorageID().getFullTableName() + " (AsyncBlockIDsCache)"),
-    log(&Poco::Logger::get(log_name))
+    : storage(storage_)
+    , update_min_interval(storage.getSettings()->async_block_ids_cache_min_update_interval_ms)
+    , update_wait(storage.getSettings()->async_block_ids_cache_update_wait_ms)
+    , path(storage.getZooKeeperPath() + "/async_blocks")
+    , log_name(storage.getStorageID().getFullTableName() + " (AsyncBlockIDsCache)")
+    , log(&Poco::Logger::get(log_name))
 {
     task = storage.getContext()->getSchedulePool().createTask(log_name, [this]{ update(); });
 }
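
For context, the constructor now caches a second timeout, read from the new async_block_ids_cache_update_wait_ms setting, alongside the existing minimum update interval. A minimal sketch of that wiring, assuming a simplified settings struct (FakeSettings and BlockIDsCacheSketch are illustrative names, not ClickHouse code):

```cpp
#include <chrono>

// Illustrative stand-in for the storage settings object referenced above; the two
// fields correspond to the MergeTree settings changed in this PR.
struct FakeSettings
{
    std::chrono::milliseconds async_block_ids_cache_min_update_interval_ms{1000};
    std::chrono::milliseconds async_block_ids_cache_update_wait_ms{100};
};

class BlockIDsCacheSketch
{
public:
    explicit BlockIDsCacheSketch(const FakeSettings & settings)
        : update_min_interval(settings.async_block_ids_cache_min_update_interval_ms)
        , update_wait(settings.async_block_ids_cache_update_wait_ms)
    {
    }

private:
    const std::chrono::milliseconds update_min_interval;  /// throttles the background refresh task
    const std::chrono::milliseconds update_wait;          /// bounds how long one insert waits for a refresh
};
```

Keeping the two durations as separate members is the point of the change: the refresh throttle can be raised without also stretching the per-insert wait.
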
@@ -102,21 +103,20 @@ Strings AsyncBlockIDsCache<TStorage>::detectConflicts(const Strings & paths, UIn
     if (!storage.getSettings()->use_async_block_ids_cache)
         return {};
 
-    std::unique_lock lk(mu);
-    /// For first time access of this cache, the `last_version` is zero, so it will not block here.
-    /// For retrying request, We compare the request version and cache version, because zk only returns
-    /// incomplete information of duplication, we need to update the cache to find out more duplication.
-    /// The timeout here is to prevent deadlock, just in case.
-    cv.wait_for(lk, update_min_interval * 2, [&]{return version != last_version;});
-
-    if (version == last_version)
-        LOG_INFO(log, "Read cache with a old version {}", last_version);
-
     CachePtr cur_cache;
-    cur_cache = cache_ptr;
-    last_version = version;
+    {
+        std::unique_lock lk(mu);
+        /// For first time access of this cache, the `last_version` is zero, so it will not block here.
+        /// For retrying request, We compare the request version and cache version, because zk only returns
+        /// incomplete information of duplication, we need to update the cache to find out more duplication.
+        cv.wait_for(lk, update_wait, [&]{return version != last_version;});
 
-    lk.unlock();
+        if (version == last_version)
+            LOG_INFO(log, "Read cache with a old version {}", last_version);
+
+        cur_cache = cache_ptr;
+        last_version = version;
+    }
 
     if (cur_cache == nullptr)
        return {};
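
The waiting logic above is a standard condition-variable pattern: take the lock inside a scope, wait at most update_wait for the background task to publish a newer cache version, copy the shared pointer while still holding the lock, and let the scope release it (replacing the previous explicit lk.unlock()). A reduced, standalone sketch of that pattern under simplified types (VersionedCacheSketch, read and publish are illustrative, not the actual ClickHouse classes):

```cpp
#include <chrono>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <set>
#include <string>

// Toy stand-in for the cache's shared state: a versioned snapshot guarded by a mutex.
struct VersionedCacheSketch
{
    std::mutex mu;
    std::condition_variable cv;
    std::shared_ptr<std::set<std::string>> cache_ptr;
    size_t version = 0;

    /// Reader side, mirroring the shape of detectConflicts above: wait a bounded time
    /// for a newer version, then copy the snapshot under the lock.
    std::shared_ptr<std::set<std::string>> read(size_t & last_version, std::chrono::milliseconds update_wait)
    {
        std::shared_ptr<std::set<std::string>> cur_cache;
        {
            std::unique_lock lk(mu);
            cv.wait_for(lk, update_wait, [&] { return version != last_version; });
            cur_cache = cache_ptr;   // copy the snapshot while still holding the lock
            last_version = version;
        } // lock released here; no explicit lk.unlock() needed
        return cur_cache;
    }

    /// Writer side, in miniature: what the scheduled background update would do.
    void publish(std::shared_ptr<std::set<std::string>> fresh)
    {
        {
            std::lock_guard lk(mu);
            cache_ptr = std::move(fresh);
            ++version;
        }
        cv.notify_all();
    }
};
```

Because the wait uses a predicate, a first-time reader (last_version == 0) returns immediately once any version has been published, and a retrying reader blocks only until the next publish or until update_wait elapses, whichever comes first.
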
1 change: 1 addition & 0 deletions src/Storages/MergeTree/AsyncBlockIDsCache.h
@@ -33,6 +33,7 @@ class AsyncBlockIDsCache
 
     std::atomic<std::chrono::steady_clock::time_point> last_updatetime;
     const std::chrono::milliseconds update_min_interval;
+    const std::chrono::milliseconds update_wait;
 
     std::mutex mu;
     CachePtr cache_ptr;
3 changes: 2 additions & 1 deletion src/Storages/MergeTree/MergeTreeSettings.h
@@ -95,7 +95,8 @@ struct Settings;
     M(UInt64, replicated_deduplication_window_seconds, 7 * 24 * 60 * 60 /* one week */, "Similar to \"replicated_deduplication_window\", but determines old blocks by their lifetime. Hash of an inserted block will be deleted (and the block will not be deduplicated after) if it outside of one \"window\". You can set very big replicated_deduplication_window to avoid duplicating INSERTs during that period of time.", 0) \
     M(UInt64, replicated_deduplication_window_for_async_inserts, 10000, "How many last hash values of async_insert blocks should be kept in ZooKeeper (old blocks will be deleted).", 0) \
     M(UInt64, replicated_deduplication_window_seconds_for_async_inserts, 7 * 24 * 60 * 60 /* one week */, "Similar to \"replicated_deduplication_window_for_async_inserts\", but determines old blocks by their lifetime. Hash of an inserted block will be deleted (and the block will not be deduplicated after) if it outside of one \"window\". You can set very big replicated_deduplication_window to avoid duplicating INSERTs during that period of time.", 0) \
-    M(Milliseconds, async_block_ids_cache_min_update_interval_ms, 100, "Minimum interval between updates of async_block_ids_cache", 0) \
+    M(Milliseconds, async_block_ids_cache_min_update_interval_ms, 1000, "Minimum interval between updates of async_block_ids_cache", 0) \
+    M(Milliseconds, async_block_ids_cache_update_wait_ms, 100, "How long each insert iteration will wait for async_block_ids_cache update", 0) \
     M(Bool, use_async_block_ids_cache, true, "Use in-memory cache to filter duplicated async inserts based on block ids", 0) \
     M(UInt64, max_replicated_logs_to_keep, 1000, "How many records may be in log, if there is inactive replica. Inactive replica becomes lost when when this number exceed.", 0) \
     M(UInt64, min_replicated_logs_to_keep, 10, "Keep about this number of last records in ZooKeeper log, even if they are obsolete. It doesn't affect work of tables: used only to diagnose ZooKeeper log before cleaning.", 0) \
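
Taken together, the new defaults decouple the per-insert wait from the cache refresh rate: the cache is refreshed at most once per second, while a retried async insert still waits no more than 100 ms for fresher deduplication data. Previously the wait was tied to the refresh interval as update_min_interval * 2, so raising the interval alone would have stretched the wait as well. A small sketch of that arithmetic, using only the default values shown in the diff above:

```cpp
#include <chrono>
#include <iostream>

int main()
{
    using namespace std::chrono_literals;

    // Defaults after this PR (from MergeTreeSettings.h above).
    constexpr auto update_min_interval = 1000ms; // async_block_ids_cache_min_update_interval_ms (was 100 ms)
    constexpr auto update_wait = 100ms;          // async_block_ids_cache_update_wait_ms (new setting)

    // Old behaviour: the wait was update_min_interval * 2, i.e. 200 ms with the old 100 ms default,
    // and it would have become 2000 ms had only the interval been raised.
    constexpr auto old_worst_case = 100ms * 2;

    std::cout << "old worst-case wait per insert: " << old_worst_case.count() << " ms\n";
    std::cout << "new worst-case wait per insert: " << update_wait.count() << " ms\n";
    std::cout << "cache refresh throttle:         " << update_min_interval.count() << " ms\n";
}
```
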