Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Increase async block cache deduplication timeout #57743

Merged
merged 4 commits into from Dec 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
76 changes: 31 additions & 45 deletions src/Storages/MergeTree/AsyncBlockIDsCache.cpp
Expand Up @@ -18,6 +18,7 @@ namespace CurrentMetrics
namespace DB
{

static constexpr int FAILURE_RETRY_MS = 3000;

template <typename TStorage>
struct AsyncBlockIDsCache<TStorage>::Cache : public std::unordered_set<String>
Expand All @@ -29,35 +30,12 @@ struct AsyncBlockIDsCache<TStorage>::Cache : public std::unordered_set<String>
{}
};

template <typename TStorage>
std::vector<String> AsyncBlockIDsCache<TStorage>::getChildren()
{
/// Fetch the current list of async block-id nodes from ZooKeeper under `path`,
/// installing a watch so the cache schedules its own refresh when the set of
/// children changes.
auto zookeeper = storage.getZooKeeper();

/// Throttled refresh: when the watch fires, re-run the update task immediately
/// only if at least `update_min_interval` has passed since the last successful
/// update; otherwise delay it by the remaining time. All state is captured by
/// value — including shared ownership of the task via shared_from_this() — so
/// the callback remains valid even if it outlives this call.
auto watch_callback = [last_time = this->last_updatetime.load()
, my_update_min_interval = this->update_min_interval
, my_task = task->shared_from_this()](const Coordination::WatchResponse &)
{
auto now = std::chrono::steady_clock::now();
if (now - last_time < my_update_min_interval)
{
/// Too soon since the last update: postpone by the remaining interval (in ms).
std::chrono::milliseconds sleep_time = std::chrono::duration_cast<std::chrono::milliseconds>(my_update_min_interval - (now - last_time));
my_task->scheduleAfter(sleep_time.count());
}
else
my_task->schedule();
};
std::vector<String> children;
Coordination::Stat stat;
/// NOTE(review): the "try" variant presumably reports failure via return code
/// rather than throwing, leaving `children` empty — confirm against zkutil API.
zookeeper->tryGetChildrenWatch(path, children, &stat, watch_callback);
return children;
}

template <typename TStorage>
void AsyncBlockIDsCache<TStorage>::update()
try
{
std::vector<String> paths = getChildren();
auto zookeeper = storage.getZooKeeper();
std::vector<String> paths = zookeeper->getChildren(path);
std::unordered_set<String> set;
for (String & p : paths)
{
Expand All @@ -69,21 +47,20 @@ try
++version;
}
cv.notify_all();
last_updatetime = std::chrono::steady_clock::now();
}
catch (...)
{
LOG_INFO(log, "Updating async block ids cache failed. Reason: {}", getCurrentExceptionMessage(false));
task->scheduleAfter(update_min_interval.count());
task->scheduleAfter(FAILURE_RETRY_MS);
}

template <typename TStorage>
AsyncBlockIDsCache<TStorage>::AsyncBlockIDsCache(TStorage & storage_)
: storage(storage_),
update_min_interval(storage.getSettings()->async_block_ids_cache_min_update_interval_ms),
path(storage.getZooKeeperPath() + "/async_blocks"),
log_name(storage.getStorageID().getFullTableName() + " (AsyncBlockIDsCache)"),
log(&Poco::Logger::get(log_name))
: storage(storage_)
, update_wait(storage.getSettings()->async_block_ids_cache_update_wait_ms)
, path(storage.getZooKeeperPath() + "/async_blocks")
, log_name(storage.getStorageID().getFullTableName() + " (AsyncBlockIDsCache)")
, log(&Poco::Logger::get(log_name))
{
task = storage.getContext()->getSchedulePool().createTask(log_name, [this]{ update(); });
}
Expand All @@ -95,28 +72,37 @@ void AsyncBlockIDsCache<TStorage>::start()
task->activateAndSchedule();
}

template <typename TStorage>
void AsyncBlockIDsCache<TStorage>::triggerCacheUpdate()
{
    /// Kick off an on-demand refresh of the cache. Watch-driven refreshes are
    /// deliberately not used here: with many replicas they generate a lot of
    /// redundant work, so the cache is refreshed manually when duplicates occur.
    const bool newly_scheduled = task->schedule();
    if (!newly_scheduled)
        LOG_TRACE(log, "Task is already scheduled, will wait for update for {}ms", update_wait.count());
}

/// The caller keeps the version from its last call. On the next call, it will wait until it gets a newer version.
template <typename TStorage>
Strings AsyncBlockIDsCache<TStorage>::detectConflicts(const Strings & paths, UInt64 & last_version)
{
if (!storage.getSettings()->use_async_block_ids_cache)
return {};

std::unique_lock lk(mu);
/// For first time access of this cache, the `last_version` is zero, so it will not block here.
/// For retrying request, We compare the request version and cache version, because zk only returns
/// incomplete information of duplication, we need to update the cache to find out more duplication.
/// The timeout here is to prevent deadlock, just in case.
cv.wait_for(lk, update_min_interval * 2, [&]{return version != last_version;});

if (version == last_version)
LOG_INFO(log, "Read cache with a old version {}", last_version);

CachePtr cur_cache;
cur_cache = cache_ptr;
last_version = version;
{
std::unique_lock lk(mu);
/// For first time access of this cache, the `last_version` is zero, so it will not block here.
/// For retrying request, We compare the request version and cache version, because zk only returns
/// incomplete information of duplication, we need to update the cache to find out more duplication.
cv.wait_for(lk, update_wait, [&]{return version != last_version;});

lk.unlock();
if (version == last_version)
LOG_INFO(log, "Read cache with a old version {}", last_version);

cur_cache = cache_ptr;
last_version = version;
}

if (cur_cache == nullptr)
return {};
Expand Down
7 changes: 3 additions & 4 deletions src/Storages/MergeTree/AsyncBlockIDsCache.h
Expand Up @@ -14,8 +14,6 @@ class AsyncBlockIDsCache
struct Cache;
using CachePtr = std::shared_ptr<Cache>;

std::vector<String> getChildren();

void update();

public:
Expand All @@ -27,12 +25,13 @@ class AsyncBlockIDsCache

Strings detectConflicts(const Strings & paths, UInt64 & last_version);

void triggerCacheUpdate();

private:

TStorage & storage;

std::atomic<std::chrono::steady_clock::time_point> last_updatetime;
const std::chrono::milliseconds update_min_interval;
const std::chrono::milliseconds update_wait;

std::mutex mu;
CachePtr cache_ptr;
Expand Down
3 changes: 2 additions & 1 deletion src/Storages/MergeTree/MergeTreeSettings.h
Expand Up @@ -95,7 +95,7 @@ struct Settings;
M(UInt64, replicated_deduplication_window_seconds, 7 * 24 * 60 * 60 /* one week */, "Similar to \"replicated_deduplication_window\", but determines old blocks by their lifetime. Hash of an inserted block will be deleted (and the block will not be deduplicated after) if it outside of one \"window\". You can set very big replicated_deduplication_window to avoid duplicating INSERTs during that period of time.", 0) \
M(UInt64, replicated_deduplication_window_for_async_inserts, 10000, "How many last hash values of async_insert blocks should be kept in ZooKeeper (old blocks will be deleted).", 0) \
M(UInt64, replicated_deduplication_window_seconds_for_async_inserts, 7 * 24 * 60 * 60 /* one week */, "Similar to \"replicated_deduplication_window_for_async_inserts\", but determines old blocks by their lifetime. Hash of an inserted block will be deleted (and the block will not be deduplicated after) if it outside of one \"window\". You can set very big replicated_deduplication_window to avoid duplicating INSERTs during that period of time.", 0) \
M(Milliseconds, async_block_ids_cache_min_update_interval_ms, 100, "Minimum interval between updates of async_block_ids_cache", 0) \
M(Milliseconds, async_block_ids_cache_update_wait_ms, 100, "How long each insert iteration will wait for async_block_ids_cache update", 0) \
M(Bool, use_async_block_ids_cache, true, "Use in-memory cache to filter duplicated async inserts based on block ids", 0) \
M(UInt64, max_replicated_logs_to_keep, 1000, "How many records may be in log, if there is inactive replica. Inactive replica becomes lost when when this number exceed.", 0) \
M(UInt64, min_replicated_logs_to_keep, 10, "Keep about this number of last records in ZooKeeper log, even if they are obsolete. It doesn't affect work of tables: used only to diagnose ZooKeeper log before cleaning.", 0) \
Expand Down Expand Up @@ -214,6 +214,7 @@ struct Settings;
MAKE_OBSOLETE_MERGE_TREE_SETTING(M, Bool, use_metadata_cache, false) \
MAKE_OBSOLETE_MERGE_TREE_SETTING(M, UInt64, merge_tree_enable_clear_old_broken_detached, 0) \
MAKE_OBSOLETE_MERGE_TREE_SETTING(M, UInt64, merge_tree_clear_old_broken_detached_parts_ttl_timeout_seconds, 1ULL * 3600 * 24 * 30) \
MAKE_OBSOLETE_MERGE_TREE_SETTING(M, UInt64, async_block_ids_cache_min_update_interval_ms, 1000) \

/// Settings that should not change after the creation of a table.
/// NOLINTNEXTLINE
Expand Down
7 changes: 5 additions & 2 deletions src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp
Expand Up @@ -307,7 +307,7 @@ void ReplicatedMergeTreeSinkImpl<async_insert>::consume(Chunk chunk)
auto profile_events_scope = std::make_unique<ProfileEventsScope>(&part_counters);

/// Some merging algorithms can modify the block which loses the information about the async insert offsets
/// when preprocessing or filtering data for asnyc inserts deduplication we want to use the initial, unmerged block
/// when preprocessing or filtering data for async inserts deduplication we want to use the initial, unmerged block
std::optional<BlockWithPartition> unmerged_block;

if constexpr (async_insert)
Expand Down Expand Up @@ -456,7 +456,7 @@ void ReplicatedMergeTreeSinkImpl<true>::finishDelayedChunk(const ZooKeeperWithFa
if (!delayed_chunk)
return;

for (auto & partition: delayed_chunk->partitions)
for (auto & partition : delayed_chunk->partitions)
{
int retry_times = 0;
/// users may have lots of same inserts. It will be helpful to deduplicate in advance.
Expand All @@ -469,13 +469,16 @@ void ReplicatedMergeTreeSinkImpl<true>::finishDelayedChunk(const ZooKeeperWithFa
}

/// reset the cache version to zero for every partition write.
/// Version zero allows to avoid wait on first iteration
cache_version = 0;
while (true)
{
partition.temp_part.finalize();
auto conflict_block_ids = commitPart(zookeeper, partition.temp_part.part, partition.block_id, delayed_chunk->replicas_num, false).first;
if (conflict_block_ids.empty())
break;

storage.async_block_ids_cache.triggerCacheUpdate();
++retry_times;
LOG_DEBUG(log, "Found duplicate block IDs: {}, retry times {}", toString(conflict_block_ids), retry_times);
/// partition clean conflict
Expand Down