Fix a couple of bugs that may cause replicas to diverge #27808

Merged (2 commits, Aug 19, 2021)
src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp
@@ -145,6 +145,7 @@ ReplicatedMergeTreePartCheckThread::MissingPartSearchResult ReplicatedMergeTreeP

if (found_part_with_the_same_min_block && found_part_with_the_same_max_block)
{
+/// FIXME It may never appear
LOG_WARNING(log, "Found parts with the same min block and with the same max block as the missing part {}. Hoping that it will eventually appear as a result of a merge.", part_name);
return MissingPartSearchResult::FoundAndDontNeedFetch;
}
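The check above relies on block-number arithmetic: every part covers a contiguous [min_block, max_block] range, so if some present part starts at the missing part's min block and another present part ends at its max block, a future merge can reproduce exactly that range. Below is a minimal standalone sketch of the containment test; PartRange and coveredOnBothEnds are illustrative names, not ClickHouse types. As the FIXME warns, coverage on both ends does not guarantee the merged part will actually be scheduled, which is why this PR also enqueues such parts for re-check in executeFetch further down.

#include <cstdint>
#include <vector>

struct PartRange { int64_t min_block; int64_t max_block; };

/// True if some present part starts at the missing range's min block
/// and some present part ends at its max block.
static bool coveredOnBothEnds(const std::vector<PartRange> & present, PartRange missing)
{
    bool same_min = false;
    bool same_max = false;
    for (const auto & part : present)
    {
        same_min = same_min || part.min_block == missing.min_block;
        same_max = same_max || part.max_block == missing.max_block;
    }
    return same_min && same_max;
}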
27 changes: 23 additions & 4 deletions src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp
@@ -23,6 +23,7 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
extern const int UNEXPECTED_NODE_IN_ZOOKEEPER;
extern const int ABORTED;
+extern const int READONLY;
}


@@ -472,9 +473,15 @@ bool ReplicatedMergeTreeQueue::removeFailedQuorumPart(const MergeTreePartInfo &
return virtual_parts.remove(part_info);
}

-int32_t ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, Coordination::WatchCallback watch_callback)
+int32_t ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, Coordination::WatchCallback watch_callback, PullLogsReason reason)
{
std::lock_guard lock(pull_logs_to_queue_mutex);
+if (storage.is_readonly && reason == SYNC)
+{
+throw Exception(ErrorCodes::READONLY, "Cannot SYNC REPLICA, because replica is readonly");
+/// TODO throw logical error for other reasons (except LOAD)
+}
+
if (pull_log_blocker.isCancelled())
throw Exception("Log pulling is cancelled", ErrorCodes::ABORTED);

@@ -714,13 +721,22 @@ void ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper, C

std::vector<std::future<Coordination::GetResponse>> futures;
for (const String & entry : entries_to_load)
-futures.emplace_back(zookeeper->asyncGet(fs::path(zookeeper_path) / "mutations" / entry));
+futures.emplace_back(zookeeper->asyncTryGet(fs::path(zookeeper_path) / "mutations" / entry));

std::vector<ReplicatedMergeTreeMutationEntryPtr> new_mutations;
for (size_t i = 0; i < entries_to_load.size(); ++i)
{
+auto maybe_response = futures[i].get();
+if (maybe_response.error != Coordination::Error::ZOK)
+{
+assert(maybe_response.error == Coordination::Error::ZNONODE);
+/// It's ok if it happened on server startup or table creation and replica loads all mutation entries.
+/// It's also ok if mutation was killed.
+LOG_WARNING(log, "Cannot get mutation node {} ({}), probably it was concurrently removed", entries_to_load[i], maybe_response.error);
+continue;
+}
new_mutations.push_back(std::make_shared<ReplicatedMergeTreeMutationEntry>(
-ReplicatedMergeTreeMutationEntry::parse(futures[i].get().data, entries_to_load[i])));
+ReplicatedMergeTreeMutationEntry::parse(maybe_response.data, entries_to_load[i])));
}

bool some_mutations_are_probably_done = false;
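The switch from asyncGet to asyncTryGet above addresses a classic list-then-read race: a mutation znode listed a moment ago may already be gone (for example, after KILL MUTATION), and a plain get would throw and abort loading all the remaining entries. A rough sketch of the tolerant pattern, using hypothetical stand-in types rather than the zkutil API:

#include <future>
#include <iostream>
#include <string>
#include <utility>
#include <vector>

enum class Error { Ok, NoNode };
struct GetResponse { Error error; std::string data; };

/// Reads every listed entry, skipping nodes that vanished between the
/// directory listing and the read instead of failing the whole batch.
void loadEntries(std::vector<std::pair<std::string, std::future<GetResponse>>> & reads)
{
    for (auto & [name, future] : reads)
    {
        GetResponse response = future.get();
        if (response.error != Error::Ok)
        {
            std::cerr << "Cannot get node " << name << ", probably it was concurrently removed\n";
            continue;
        }
        /// ... parse response.data into a mutation entry ...
    }
}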
@@ -1504,6 +1520,9 @@ MutationCommands ReplicatedMergeTreeQueue::getMutationCommands(
/// to allow recovering from a mutation that cannot be executed. This way you can delete the mutation entry
/// from /mutations in ZK and the replicas will simply skip the mutation.

+/// NOTE: However, it's quite dangerous to skip MUTATE_PART. Replicas may diverge if one of them has executed the part mutation,
+/// and then the mutation was killed before MUTATE_PART was executed on the remaining replicas.
+
if (part->info.getDataVersion() > desired_mutation_version)
{
LOG_WARNING(log, "Data version of part {} is already greater than desired mutation version {}", part->name, desired_mutation_version);
@@ -1831,7 +1850,7 @@ ReplicatedMergeTreeMergePredicate::ReplicatedMergeTreeMergePredicate(
}
}

-merges_version = queue_.pullLogsToQueue(zookeeper);
+merges_version = queue_.pullLogsToQueue(zookeeper, {}, ReplicatedMergeTreeQueue::MERGE_PREDICATE);

{
/// We avoid returning here a version to be used in a lightweight transaction.
11 changes: 10 additions & 1 deletion src/Storages/MergeTree/ReplicatedMergeTreeQueue.h
@@ -294,13 +294,22 @@ class ReplicatedMergeTreeQueue

bool removeFailedQuorumPart(const MergeTreePartInfo & part_info);

+enum PullLogsReason
+{
+LOAD,
+UPDATE,
+MERGE_PREDICATE,
+SYNC,
+OTHER,
+};
+
/** Copy the new entries from the shared log to the queue of this replica. Set the log_pointer to the appropriate value.
* If watch_callback is not empty, will call it when new entries appear in the log.
* If there were new entries, notifies storage.queue_task_handle.
* Additionally loads mutations (so that the set of mutations is always more recent than the queue).
* Return the version of "logs" node (that is updated for every merge/mutation/... added to the log)
*/
-int32_t pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, Coordination::WatchCallback watch_callback = {});
+int32_t pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper, Coordination::WatchCallback watch_callback = {}, PullLogsReason reason = OTHER);

/// Load new mutation entries. If something new is loaded, schedule storage.merge_selecting_task.
/// If watch_callback is not empty, will call it when new mutations appear in ZK.
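As the call sites elsewhere in this diff show, tryStartup() tags its pull as LOAD, queueUpdatingTask() as UPDATE, the merge predicate constructor as MERGE_PREDICATE, and waitForShrinkingQueueSize() (SYSTEM SYNC REPLICA) as SYNC; everything else defaults to OTHER. Here is a minimal sketch of the idea, with hypothetical names, showing why the reason is threaded into a shared entry point: SYNC is user-initiated and should fail fast on a read-only replica rather than block until a timeout.

#include <stdexcept>

class LogPuller
{
public:
    enum PullLogsReason { LOAD, UPDATE, MERGE_PREDICATE, SYNC, OTHER };

    explicit LogPuller(bool readonly_) : is_readonly(readonly_) {}

    /// One shared entry point; the reason tag lets it enforce
    /// caller-specific invariants in a single place.
    void pullLogsToQueue(PullLogsReason reason = OTHER)
    {
        if (is_readonly && reason == SYNC)
            throw std::runtime_error("Cannot SYNC REPLICA, because replica is readonly");
        /// ... copy new log entries into the replica's local queue ...
    }

private:
    bool is_readonly;
};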
23 changes: 20 additions & 3 deletions src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp
@@ -25,6 +25,8 @@ namespace DB
namespace ErrorCodes
{
extern const int REPLICA_IS_ALREADY_ACTIVE;
+extern const int REPLICA_STATUS_CHANGED;
+
}

namespace
@@ -55,6 +57,7 @@ void ReplicatedMergeTreeRestartingThread::run()
if (need_stop)
return;

+bool reschedule_now = false;
try
{
if (first_time || readonly_mode_was_set || storage.getZooKeeper()->expired())
@@ -131,15 +134,29 @@
first_time = false;
}
}
-catch (...)
+catch (const Exception & e)
{
/// We couldn't activate table let's set it into readonly mode
setReadonly();
partialShutdown();
storage.startup_event.set();
tryLogCurrentException(log, __PRETTY_FUNCTION__);
+
+if (e.code() == ErrorCodes::REPLICA_STATUS_CHANGED)
+reschedule_now = true;
}
+catch (...)
+{
+setReadonly();
+partialShutdown();
+storage.startup_event.set();
+tryLogCurrentException(log, __PRETTY_FUNCTION__);
+}

-task->scheduleAfter(check_period_ms);
+if (reschedule_now)
+task->schedule();
+else
+task->scheduleAfter(check_period_ms);
}
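The rescheduling change above distinguishes a transient, expected failure from everything else: on REPLICA_STATUS_CHANGED the restarting task re-runs immediately instead of sleeping for the full check period. A condensed sketch of that retry policy (Task here is a stand-in for the background-schedule-pool task, not the real API):

#include <chrono>
#include <functional>
#include <stdexcept>

struct Task
{
    void schedule() { /* run again as soon as possible */ }
    void scheduleAfter(std::chrono::milliseconds) { /* run again after a delay */ }
};

void runOnce(Task & task, const std::function<void()> & attempt,
             std::chrono::milliseconds check_period)
{
    bool reschedule_now = false;
    try
    {
        attempt();
    }
    catch (const std::runtime_error &)
    {
        /// A known-transient failure: retry right away instead of
        /// waiting out the whole check period.
        reschedule_now = true;
    }

    if (reschedule_now)
        task.schedule();
    else
        task.scheduleAfter(check_period);
}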


@@ -159,7 +176,7 @@ bool ReplicatedMergeTreeRestartingThread::tryStartup()

/// pullLogsToQueue() after we mark replica 'is_active' (and after we repair if it was lost);
/// because cleanup_thread doesn't delete log_pointer of active replicas.
-storage.queue.pullLogsToQueue(zookeeper);
+storage.queue.pullLogsToQueue(zookeeper, {}, ReplicatedMergeTreeQueue::LOAD);
storage.queue.removeCurrentPartsFromMutations();
storage.last_queue_update_finish_time.store(time(nullptr));

51 changes: 27 additions & 24 deletions src/Storages/StorageReplicatedMergeTree.cpp
@@ -141,6 +141,7 @@ namespace ErrorCodes
extern const int DUPLICATE_DATA_PART;
extern const int BAD_ARGUMENTS;
extern const int CONCURRENT_ACCESS_NOT_SUPPORTED;
+extern const int CHECKSUM_DOESNT_MATCH;
}

namespace ActionLocks
@@ -1314,32 +1315,35 @@ void StorageReplicatedMergeTree::checkPartChecksumsAndAddCommitOps(const zkutil:
}

ReplicatedMergeTreePartHeader replica_part_header;
-if (!part_zk_str.empty())
-replica_part_header = ReplicatedMergeTreePartHeader::fromString(part_zk_str);
-else
+if (part_zk_str.empty())
{
-Coordination::Stat columns_stat_before, columns_stat_after;
String columns_str;
String checksums_str;
-/// Let's check that the node's version with the columns did not change while we were reading the checksums.
-/// This ensures that the columns and the checksum refer to the same
-if (!zookeeper->tryGet(fs::path(current_part_path) / "columns", columns_str, &columns_stat_before) ||
-!zookeeper->tryGet(fs::path(current_part_path) / "checksums", checksums_str) ||
-!zookeeper->exists(fs::path(current_part_path) / "columns", &columns_stat_after) ||
-columns_stat_before.version != columns_stat_after.version)
+if (zookeeper->tryGet(fs::path(current_part_path) / "columns", columns_str) &&
+zookeeper->tryGet(fs::path(current_part_path) / "checksums", checksums_str))
{
+replica_part_header = ReplicatedMergeTreePartHeader::fromColumnsAndChecksumsZNodes(columns_str, checksums_str);
}
+else
+{
-LOG_INFO(log, "Not checking checksums of part {} with replica {} because part changed while we were reading its checksums", part_name, replica);
+if (zookeeper->exists(current_part_path))
+throw Exception(ErrorCodes::LOGICAL_ERROR, "Part {} has empty header and does not have columns and checksums. "
+"Looks like a bug.", current_part_path);
+LOG_INFO(log, "Not checking checksums of part {} with replica {} because part was removed from ZooKeeper", part_name, replica);
continue;
}

-replica_part_header = ReplicatedMergeTreePartHeader::fromColumnsAndChecksumsZNodes(
-columns_str, checksums_str);
}
+else
+{
+replica_part_header = ReplicatedMergeTreePartHeader::fromString(part_zk_str);
+}

if (replica_part_header.getColumnsHash() != local_part_header.getColumnsHash())
{
LOG_INFO(log, "Not checking checksums of part {} with replica {} because columns are different", part_name, replica);
continue;
/// Either it's a bug or ZooKeeper contains broken data.
/// TODO Fix KILL MUTATION and replace CHECKSUM_DOESNT_MATCH with LOGICAL_ERROR
/// (some replicas may skip killed mutation even if it was executed on other replicas)
throw Exception(ErrorCodes::CHECKSUM_DOESNT_MATCH, "Part {} from {} has different columns hash", part_name, replica);
}

replica_part_header.getChecksums().checkEqual(local_part_header.getChecksums(), true);
@@ -2137,6 +2141,8 @@ bool StorageReplicatedMergeTree::executeFetch(LogEntry & entry)
if (!parts_for_merge.empty() && replica.empty())
{
LOG_INFO(log, "No active replica has part {}. Will fetch merged part instead.", entry.new_part_name);
+/// We should enqueue it for check, because merged part may never appear if source part is lost
+enqueuePartForCheck(entry.new_part_name);
return false;
}

@@ -3083,7 +3089,7 @@ void StorageReplicatedMergeTree::queueUpdatingTask()
}
try
{
-queue.pullLogsToQueue(getZooKeeper(), queue_updating_task->getWatchCallback());
+queue.pullLogsToQueue(getZooKeeper(), queue_updating_task->getWatchCallback(), ReplicatedMergeTreeQueue::UPDATE);
last_queue_update_finish_time.store(time(nullptr));
queue_update_in_progress = false;
}
@@ -4321,9 +4327,6 @@ void StorageReplicatedMergeTree::startup()
/// Wait while restarting_thread initializes LeaderElection (and so on) or makes first attempt to do it
startup_event.wait();

-/// If we don't separate create/start steps, race condition will happen
-/// between the assignment of queue_task_handle and queueTask that use the queue_task_handle.
-background_executor.start();
Review comment (Member): Why don't we need it now?

Reply (Member Author): Because we start it here:

startBackgroundMovesIfNeeded();

part_moves_between_shards_orchestrator.start();
@@ -5460,9 +5463,9 @@ bool StorageReplicatedMergeTree::waitForTableReplicaToProcessLogEntry(

const auto & stop_waiting = [&]()
{
-bool stop_waiting_itself = waiting_itself && (partial_shutdown_called || is_dropped);
+bool stop_waiting_itself = waiting_itself && partial_shutdown_called;
bool stop_waiting_non_active = !wait_for_non_active && !getZooKeeper()->exists(fs::path(table_zookeeper_path) / "replicas" / replica / "is_active");
-return stop_waiting_itself || stop_waiting_non_active;
+return is_dropped || stop_waiting_itself || stop_waiting_non_active;
};

/// Don't recheck ZooKeeper too often
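The stop_waiting change above fixes a subtle precedence bug: is_dropped used to be AND-ed under waiting_itself, so dropping the table while waiting for another replica to process a log entry could block until the timeout. Pulling is_dropped out of the conjunction makes it unconditional. A side-by-side sketch with simplified names (replica_inactive stands in for stop_waiting_non_active):

/// Before: a dropped table only stopped the wait when we were waiting
/// on our own queue (waiting_itself == true).
bool stopWaitingBefore(bool waiting_itself, bool partial_shutdown,
                       bool is_dropped, bool replica_inactive)
{
    bool stop_waiting_itself = waiting_itself && (partial_shutdown || is_dropped);
    return stop_waiting_itself || replica_inactive;
}

/// After: a dropped table always stops the wait, whichever replica we watch.
bool stopWaitingAfter(bool waiting_itself, bool partial_shutdown,
                      bool is_dropped, bool replica_inactive)
{
    bool stop_waiting_itself = waiting_itself && partial_shutdown;
    return is_dropped || stop_waiting_itself || replica_inactive;
}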
@@ -6058,7 +6061,7 @@ CancellationCode StorageReplicatedMergeTree::killMutation(const String & mutatio

zkutil::ZooKeeperPtr zookeeper = getZooKeeper();

-LOG_TRACE(log, "Killing mutation {}", mutation_id);
+LOG_INFO(log, "Killing mutation {}", mutation_id);

auto mutation_entry = queue.removeMutation(zookeeper, mutation_id);
if (!mutation_entry)
@@ -6964,7 +6967,7 @@ bool StorageReplicatedMergeTree::waitForShrinkingQueueSize(size_t queue_size, UI
Stopwatch watch;

/// Let's fetch new log entries firstly
-queue.pullLogsToQueue(getZooKeeper());
+queue.pullLogsToQueue(getZooKeeper(), {}, ReplicatedMergeTreeQueue::SYNC);

/// This is significant, because the execution of this task could be delayed at BackgroundPool.
/// And we force it to be executed.