Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updated checking of SYSTEM SYNC REPLICA #45648

Merged
merged 18 commits into from Feb 9, 2023
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
8c885ff
Updated checking of SYSTEM SYNC REPLICA
SmitaRKulkarni Jan 26, 2023
a95609e
Updated to validate & remove each element instead of just last elemen…
SmitaRKulkarni Jan 27, 2023
3996645
Updated to remove waiting ids till the removed log entry - Updated ch…
SmitaRKulkarni Jan 27, 2023
b22fd40
Merge branch 'master' into 45508_Update_strategy_for_system_sync_replica
SmitaRKulkarni Jan 27, 2023
2bbe08c
Updated destructor to notify even when queue empty - Updated checking…
SmitaRKulkarni Jan 27, 2023
9b457a5
Fixed clang-tidy build - Updated checking of SYSTEM SYNC REPLICA
SmitaRKulkarni Jan 28, 2023
809354d
Merge branch 'master' into 45508_Update_strategy_for_system_sync_replica
SmitaRKulkarni Jan 29, 2023
6cf2498
Fixed clang-tidy build for comparing strings and updated to notify af…
SmitaRKulkarni Jan 30, 2023
123f3cf
Used unordered_set for log entries and fixed the callback function - …
SmitaRKulkarni Feb 3, 2023
b94f9f8
Merge branch 'master' into 45508_Update_strategy_for_system_sync_replica
alesapin Feb 6, 2023
f34ef86
Moved background task trigger before getting log entries to wait - Up…
SmitaRKulkarni Feb 7, 2023
ef29b61
Updated callback to return log entry ids - Updated checking of SYSTE…
SmitaRKulkarni Feb 7, 2023
135615c
Removed unwanted check from before callback - Updated checking of SY…
SmitaRKulkarni Feb 7, 2023
fb76569
Removed state_mutex lock from addSubscriber - Updated checking of SY…
SmitaRKulkarni Feb 7, 2023
34341dd
Updated locks and removed getLogEntryIds function - Updated checking…
SmitaRKulkarni Feb 7, 2023
ecea28a
Fixed style check - Updated checking of SYSTEM SYNC REPLICA
SmitaRKulkarni Feb 7, 2023
8c9d994
Extended scope of state_mutex while adding subscriber - Updated chec…
SmitaRKulkarni Feb 7, 2023
87ec32f
Updated to use znode_name instead of log_entry_id - Updated checking …
SmitaRKulkarni Feb 8, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/Interpreters/InterpreterSystemQuery.cpp
Expand Up @@ -886,8 +886,8 @@ void InterpreterSystemQuery::syncReplica()

if (auto * storage_replicated = dynamic_cast<StorageReplicatedMergeTree *>(table.get()))
{
LOG_TRACE(log, "Synchronizing entries in replica's queue with table's log and waiting for it to become empty");
if (!storage_replicated->waitForShrinkingQueueSize(0, getContext()->getSettingsRef().receive_timeout.totalMilliseconds()))
LOG_TRACE(log, "Synchronizing entries in replica's queue with table's log and waiting for current last entry to be processed");
if (!storage_replicated->waitForProcessingQueue(getContext()->getSettingsRef().receive_timeout.totalMilliseconds()))
{
LOG_ERROR(log, "SYNC REPLICA {}: Timed out!", table_id.getNameForLogs());
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "SYNC REPLICA {}: command timed out. " \
Expand Down
24 changes: 18 additions & 6 deletions src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp
Expand Up @@ -545,7 +545,7 @@ void ReplicatedMergeTreeQueue::removeProcessedEntry(zkutil::ZooKeeperPtr zookeep
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find {} in the memory queue. It is a bug. Entry: {}",
entry->znode_name, entry->toString());

notifySubscribers(queue_size);
notifySubscribers(queue_size, entry->log_entry_id);

if (!need_remove_from_zk)
return;
Expand Down Expand Up @@ -1977,6 +1977,17 @@ void ReplicatedMergeTreeQueue::getEntries(LogEntriesData & res) const
res.emplace_back(*entry);
}

std::unordered_set<String> ReplicatedMergeTreeQueue::getLogEntryIds() const
{
std::unordered_set<String> result;
std::lock_guard lock(state_mutex);

result.reserve(queue.size());
for (const auto & entry : queue)
result.insert(entry->log_entry_id);

return result;
}

void ReplicatedMergeTreeQueue::getInsertTimes(time_t & out_min_unprocessed_insert_time, time_t & out_max_processed_insert_time) const
{
Expand Down Expand Up @@ -2470,8 +2481,9 @@ ReplicatedMergeTreeQueue::addSubscriber(ReplicatedMergeTreeQueue::SubscriberCall

auto it = subscribers.emplace(subscribers.end(), std::move(callback));

/// Atomically notify about current size
(*it)(queue.size());
/// Notify if queue is empty
if (queue.empty())
(*it)(0, std::nullopt);

return SubscriberHandler(it, *this);
}
Expand All @@ -2482,16 +2494,16 @@ ReplicatedMergeTreeQueue::SubscriberHandler::~SubscriberHandler()
queue.subscribers.erase(it);
}

void ReplicatedMergeTreeQueue::notifySubscribers(size_t new_queue_size)
void ReplicatedMergeTreeQueue::notifySubscribers(size_t new_queue_size, std::optional<String> removed_log_entry_id)
{
std::lock_guard lock_subscribers(subscribers_mutex);
for (auto & subscriber_callback : subscribers)
subscriber_callback(new_queue_size);
subscriber_callback(new_queue_size, removed_log_entry_id);
}

ReplicatedMergeTreeQueue::~ReplicatedMergeTreeQueue()
{
notifySubscribers(0);
notifySubscribers(0, std::nullopt);
}

String padIndex(Int64 index)
Expand Down
8 changes: 5 additions & 3 deletions src/Storages/MergeTree/ReplicatedMergeTreeQueue.h
Expand Up @@ -163,7 +163,7 @@ class ReplicatedMergeTreeQueue
/// A subscriber callback is called when a queue entry is deleted
mutable std::mutex subscribers_mutex;

using SubscriberCallBack = std::function<void(size_t /* queue_size */)>;
using SubscriberCallBack = std::function<void(size_t /* queue_size */, std::optional<String> /* removed_log_entry_id */)>;
using Subscribers = std::list<SubscriberCallBack>;
using SubscriberIterator = Subscribers::iterator;

Expand All @@ -180,8 +180,8 @@ class ReplicatedMergeTreeQueue

Subscribers subscribers;

/// Notify subscribers about queue change
void notifySubscribers(size_t new_queue_size);
/// Notify subscribers about queue change (new queue size and entry that was removed)
void notifySubscribers(size_t new_queue_size, std::optional<String> removed_log_entry_id);

/// Check that entry_ptr is REPLACE_RANGE entry and can be removed from queue because current entry covers it
bool checkReplaceRangeCanBeRemoved(
Expand Down Expand Up @@ -451,6 +451,8 @@ class ReplicatedMergeTreeQueue
using LogEntriesData = std::vector<ReplicatedMergeTreeLogEntryData>;
void getEntries(LogEntriesData & res) const;

std::unordered_set<String> getLogEntryIds() const;

/// Get information about the insertion times.
void getInsertTimes(time_t & out_min_unprocessed_insert_time, time_t & out_max_processed_insert_time) const;

Expand Down
41 changes: 24 additions & 17 deletions src/Storages/StorageReplicatedMergeTree.cpp
Expand Up @@ -7549,32 +7549,39 @@ void StorageReplicatedMergeTree::onActionLockRemove(StorageActionBlockType actio
background_moves_assignee.trigger();
}

bool StorageReplicatedMergeTree::waitForShrinkingQueueSize(size_t queue_size, UInt64 max_wait_milliseconds)
bool StorageReplicatedMergeTree::waitForProcessingQueue(UInt64 max_wait_milliseconds)
{
Stopwatch watch;

/// Let's fetch new log entries first
queue.pullLogsToQueue(getZooKeeperAndAssertNotReadonly(), {}, ReplicatedMergeTreeQueue::SYNC);
std::unordered_set<String> wait_for_ids = queue.getLogEntryIds();

/// This is significant, because the execution of this task could be delayed at BackgroundPool.
/// And we force it to be executed.
background_operations_assignee.trigger();

Poco::Event target_size_event;
auto callback = [&target_size_event, queue_size] (size_t new_queue_size)
if (!wait_for_ids.empty())
{
if (new_queue_size <= queue_size)
target_size_event.set();
};
const auto handler = queue.addSubscriber(std::move(callback));
/// This is significant, because the execution of this task could be delayed at BackgroundPool.
/// And we force it to be executed.
background_operations_assignee.trigger();

while (!target_size_event.tryWait(50))
{
if (max_wait_milliseconds && watch.elapsedMilliseconds() > max_wait_milliseconds)
return false;
Poco::Event target_entry_event;
auto callback = [&target_entry_event, &wait_for_ids](size_t new_queue_size, std::optional<String> removed_log_entry_id)
{
if (removed_log_entry_id.has_value())
wait_for_ids.erase(removed_log_entry_id.value());

if (partial_shutdown_called)
throw Exception(ErrorCodes::ABORTED, "Shutdown is called for table");
if (wait_for_ids.empty() || new_queue_size == 0)
target_entry_event.set();
};
const auto handler = queue.addSubscriber(std::move(callback));

while (!target_entry_event.tryWait(50))
{
if (max_wait_milliseconds && watch.elapsedMilliseconds() > max_wait_milliseconds)
return false;

if (partial_shutdown_called)
throw Exception(ErrorCodes::ABORTED, "Shutdown is called for table");
}
}

return true;
Expand Down
4 changes: 2 additions & 2 deletions src/Storages/StorageReplicatedMergeTree.h
Expand Up @@ -178,9 +178,9 @@ class StorageReplicatedMergeTree final : public MergeTreeData

void onActionLockRemove(StorageActionBlockType action_type) override;

/// Wait when replication queue size becomes less or equal than queue_size
/// Wait till replication queue's current last entry is processed or till size becomes 0
/// If timeout is exceeded returns false
bool waitForShrinkingQueueSize(size_t queue_size = 0, UInt64 max_wait_milliseconds = 0);
bool waitForProcessingQueue(UInt64 max_wait_milliseconds = 0);

/// Get the status of the table. If with_zk_fields = false - do not fill in the fields that require queries to ZK.
void getStatus(ReplicatedTableStatus & res, bool with_zk_fields = true);
Expand Down