Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Keeper: retry on failure in Disk related operations #59980

Merged
merged 3 commits into from
Feb 21, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion programs/keeper-converter/KeeperConverter.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include <iostream>
#include <boost/program_options.hpp>

#include <Coordination/CoordinationSettings.h>
#include <Coordination/KeeperSnapshotManager.h>
#include <Coordination/ZooKeeperDataReader.h>
#include <Common/TerminalSize.h>
Expand Down Expand Up @@ -39,7 +40,7 @@ int mainEntryClickHouseKeeperConverter(int argc, char ** argv)

try
{
auto keeper_context = std::make_shared<KeeperContext>(true);
auto keeper_context = std::make_shared<KeeperContext>(true, std::make_shared<CoordinationSettings>());
keeper_context->setDigestEnabled(true);
keeper_context->setSnapshotDisk(std::make_shared<DiskLocal>("Keeper-snapshots", options["output-dir"].as<std::string>()));

Expand Down
203 changes: 139 additions & 64 deletions src/Coordination/Changelog.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#include <chrono>
#include <filesystem>
#include <Coordination/Changelog.h>
#include <Coordination/CoordinationSettings.h>
#include <Disks/DiskLocal.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadHelpers.h>
Expand Down Expand Up @@ -35,21 +37,86 @@ namespace

constexpr std::string_view tmp_prefix = "tmp_";

/// Moves the changelog described by `description` from `disk_from` to `path_to` on `disk_to`.
///
/// The move consists of four steps, executed in order and each retried on failure:
///   1. create an empty marker file (source file name prefixed with `tmp_`) on the destination disk;
///   2. copy the changelog to the destination disk;
///   3. remove the marker file from the destination disk;
///   4. remove the changelog from the source disk.
///
/// A failed step is retried after waiting `disk_move_retries_wait_ms` until it succeeds or shutdown
/// is called. While the server is in the INIT phase the number of attempts per step is additionally
/// capped by `disk_move_retries_during_init`.
///
/// If a step ultimately fails, the remaining steps are skipped and `description` is left untouched.
/// On success `description->path` and `description->disk` are updated to the new location.
void moveFileBetweenDisks(
    DiskPtr disk_from,
    ChangelogFileDescriptionPtr description,
    DiskPtr disk_to,
    const std::string & path_to,
    const KeeperContextPtr & keeper_context)
{
    auto logger = getLogger("Changelog");
    LOG_TRACE(logger, "Moving {} to {} from disk {} to disk {}", description->path, path_to, disk_from->getName(), disk_to->getName());

    /// We use an empty file with the tmp_ prefix to detect incomplete copies.
    /// Once a copy is complete we don't care from which disk the file is used,
    /// so it's okay if a failure happens after the tmp file is removed but before
    /// the changelog is removed from the source disk.
    auto from_path = fs::path(description->path);
    auto tmp_changelog_name = from_path.parent_path() / (std::string{tmp_prefix} + from_path.filename().string());

    const auto & coordination_settings = keeper_context->getCoordinationSettings();
    auto max_retries_on_init = coordination_settings->disk_move_retries_during_init.value;
    auto retries_sleep = std::chrono::milliseconds(coordination_settings->disk_move_retries_wait_ms);

    /// Runs `op` until it stops throwing. Returns true on success, false if we gave up.
    auto run_with_retries = [&](const auto & op, std::string_view operation_description)
    {
        /// We limit the number of retries during the initialization phase because shutdown won't be set
        /// before initialization is done, i.e. we would be stuck in an infinite loop.
        size_t failed_attempts = 0;
        do
        {
            try
            {
                op();
                return true;
            }
            catch (...)
            {
                tryLogCurrentException(
                    logger,
                    fmt::format(
                        "While moving changelog {} to disk {} and running '{}'",
                        description->path,
                        disk_to->getName(),
                        operation_description));
            }

            ++failed_attempts;
            /// `>=` instead of `==`: with a limit of 0 an equality check would never match
            /// and the loop could not terminate during initialization.
            if (keeper_context->getServerState() == KeeperContext::Phase::INIT && failed_attempts >= max_retries_on_init)
            {
                LOG_ERROR(logger, "Operation '{}' failed too many times", operation_description);
                break;
            }

            /// Wait only when another attempt will actually follow.
            std::this_thread::sleep_for(retries_sleep);
        } while (!keeper_context->isShutdownCalled());

        LOG_ERROR(
            logger,
            "Failed to run '{}' while moving changelog {} to disk {}",
            operation_description,
            description->path,
            disk_to->getName());
        return false;
    };

    /// Short-circuit evaluation guarantees that a step runs only if all previous steps succeeded.
    const bool moved
        = run_with_retries(
              [&]
              {
                  auto buf = disk_to->writeFile(tmp_changelog_name);
                  buf->finalize();
              },
              "creating temporary file")
        && run_with_retries([&] { disk_from->copyFile(from_path, *disk_to, path_to, {}); }, "copying file")
        && run_with_retries([&] { disk_to->removeFileIfExists(tmp_changelog_name); }, "removing temporary file")
        && run_with_retries([&] { disk_from->removeFileIfExists(description->path); }, "removing changelog file from source disk");

    if (!moved)
        return;

    description->path = path_to;
    description->disk = disk_to;
}
Expand Down Expand Up @@ -173,7 +240,7 @@ class ChangelogWriter
}
else
{
moveFileBetweenDisks(log_disk, current_file_description, disk, new_path);
moveFileBetweenDisks(log_disk, current_file_description, disk, new_path, keeper_context);
}
}
}
Expand All @@ -196,7 +263,7 @@ class ChangelogWriter
}
catch (...)
{
tryLogCurrentException(log);
tryLogCurrentException(log, "While setting new changelog file");
throw;
}
}
Expand Down Expand Up @@ -813,7 +880,7 @@ void Changelog::readChangelogAndInitWriter(uint64_t last_commited_log_index, uin
auto disk = getDisk();

if (latest_log_disk != disk && latest_log_disk == description->disk)
moveFileBetweenDisks(latest_log_disk, description, disk, description->path);
moveFileBetweenDisks(latest_log_disk, description, disk, description->path, keeper_context);
};

/// we can have empty log (with zero entries) and last_log_read_result will be initialized
Expand Down Expand Up @@ -899,7 +966,7 @@ void Changelog::readChangelogAndInitWriter(uint64_t last_commited_log_index, uin
}

if (description->disk != disk)
moveFileBetweenDisks(description->disk, description, disk, description->path);
moveFileBetweenDisks(description->disk, description, disk, description->path, keeper_context);
}


Expand All @@ -921,7 +988,7 @@ void Changelog::initWriter(ChangelogFileDescriptionPtr description)
auto log_disk = description->disk;
auto latest_log_disk = getLatestLogDisk();
if (log_disk != latest_log_disk)
moveFileBetweenDisks(log_disk, description, latest_log_disk, description->path);
moveFileBetweenDisks(log_disk, description, latest_log_disk, description->path, keeper_context);

current_writer->setFile(std::move(description), WriteMode::Append);
}
Expand Down Expand Up @@ -984,11 +1051,11 @@ void Changelog::removeExistingLogs(ChangelogIter begin, ChangelogIter end)
catch (const DB::Exception & e)
{
if (e.code() == DB::ErrorCodes::NOT_IMPLEMENTED)
moveFileBetweenDisks(changelog_disk, changelog_description, disk, new_path);
moveFileBetweenDisks(changelog_disk, changelog_description, disk, new_path, keeper_context);
}
}
else
moveFileBetweenDisks(changelog_disk, changelog_description, disk, new_path);
moveFileBetweenDisks(changelog_disk, changelog_description, disk, new_path, keeper_context);

itr = existing_changelogs.erase(itr);
}
Expand Down Expand Up @@ -1085,70 +1152,78 @@ void Changelog::writeThread()
LOG_WARNING(log, "Changelog is shut down");
};

/// NuRaft writes a batch of requests by first issuing multiple store requests, i.e. AppendLog,
/// followed by a flush request.
/// We assume that after some number of appends we always get a flush request.
while (true)
try
{
if (try_batch_flush)
/// NuRaft writes a batch of requests by first issuing multiple store requests, i.e. AppendLog,
/// followed by a flush request.
/// We assume that after some number of appends we always get a flush request.
while (true)
{
try_batch_flush = false;
/// we have Flush request stored in write operation
/// but we try to get new append operations
/// if there are none, we apply the currently set Flush
chassert(std::holds_alternative<Flush>(write_operation));
if (!write_operations.tryPop(write_operation))
if (try_batch_flush)
{
chassert(batch_append_ok);
const auto & flush = std::get<Flush>(write_operation);
flush_logs(flush);
notify_append_completion();
if (!write_operations.pop(write_operation))
break;
try_batch_flush = false;
/// we have Flush request stored in write operation
/// but we try to get new append operations
/// if there are none, we apply the currently set Flush
chassert(std::holds_alternative<Flush>(write_operation));
if (!write_operations.tryPop(write_operation))
{
chassert(batch_append_ok);
const auto & flush = std::get<Flush>(write_operation);
flush_logs(flush);
notify_append_completion();
if (!write_operations.pop(write_operation))
break;
}
}
else if (!write_operations.pop(write_operation))
{
break;
}
}
else if (!write_operations.pop(write_operation))
{
break;
}

assert(initialized);

if (auto * append_log = std::get_if<AppendLog>(&write_operation))
{
if (!batch_append_ok)
continue;

std::lock_guard writer_lock(writer_mutex);
assert(current_writer);

batch_append_ok = current_writer->appendRecord(buildRecord(append_log->index, append_log->log_entry));
++pending_appends;
}
else
{
const auto & flush = std::get<Flush>(write_operation);
assert(initialized);

if (batch_append_ok)
if (auto * append_log = std::get_if<AppendLog>(&write_operation))
{
/// we can try batching more logs for flush
if (pending_appends < flush_settings.max_flush_batch_size)
{
try_batch_flush = true;
if (!batch_append_ok)
continue;
}
/// we need to flush because we have maximum allowed pending records
flush_logs(flush);

std::lock_guard writer_lock(writer_mutex);
assert(current_writer);

batch_append_ok = current_writer->appendRecord(buildRecord(append_log->index, append_log->log_entry));
++pending_appends;
}
else
{
std::lock_guard lock{durable_idx_mutex};
*flush.failed = true;
const auto & flush = std::get<Flush>(write_operation);

if (batch_append_ok)
{
/// we can try batching more logs for flush
if (pending_appends < flush_settings.max_flush_batch_size)
{
try_batch_flush = true;
continue;
}
/// we need to flush because we have maximum allowed pending records
flush_logs(flush);
}
else
{
std::lock_guard lock{durable_idx_mutex};
*flush.failed = true;
}
notify_append_completion();
batch_append_ok = true;
}
notify_append_completion();
batch_append_ok = true;
}
}
catch (...)
{
tryLogCurrentException(log, "Write thread failed, aborting");
std::abort();
}
}


Expand Down Expand Up @@ -1191,7 +1266,7 @@ void Changelog::writeAt(uint64_t index, const LogEntryPtr & log_entry)
auto log_disk = description->disk;
auto latest_log_disk = getLatestLogDisk();
if (log_disk != latest_log_disk)
moveFileBetweenDisks(log_disk, description, latest_log_disk, description->path);
moveFileBetweenDisks(log_disk, description, latest_log_disk, description->path, keeper_context);

current_writer->setFile(std::move(description), WriteMode::Append);

Expand Down
5 changes: 4 additions & 1 deletion src/Coordination/CoordinationSettings.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,10 @@ struct Settings;
M(UInt64, log_file_overallocate_size, 50 * 1024 * 1024, "If max_log_file_size is not set to 0, this value will be added to it for preallocating bytes on disk. If a log record is larger than this value, it could lead to uncaught out-of-space issues so a larger value is preferred", 0) \
M(UInt64, min_request_size_for_cache, 50 * 1024, "Minimal size of the request to cache the deserialization result. Caching can have negative effect on latency for smaller requests, set to 0 to disable", 0) \
M(UInt64, raft_limits_reconnect_limit, 50, "If connection to a peer is silent longer than this limit * (multiplied by heartbeat interval), we re-establish the connection.", 0) \
M(Bool, async_replication, false, "Enable async replication. All write and read guarantees are preserved while better performance is achieved. Settings is disabled by default to not break backwards compatibility.", 0)
M(Bool, async_replication, false, "Enable async replication. All write and read guarantees are preserved while better performance is achieved. Settings is disabled by default to not break backwards compatibility.", 0) \
M(UInt64, disk_move_retries_wait_ms, 1000, "How long to wait between retries after a failure which happened while a file was being moved between disks.", 0) \
M(UInt64, disk_move_retries_during_init, 100, "The amount of retries after a failure which happened while a file was being moved between disks during initialization.", 0)


DECLARE_SETTINGS_TRAITS(CoordinationSettingsTraits, LIST_OF_COORDINATION_SETTINGS)

Expand Down
8 changes: 3 additions & 5 deletions src/Coordination/KeeperAsynchronousMetrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ void updateKeeperInformation(KeeperDispatcher & keeper_dispatcher, AsynchronousM
size_t ephemerals_count = 0;
size_t approximate_data_size = 0;
size_t key_arena_size = 0;
size_t latest_snapshot_size = 0;
size_t open_file_descriptor_count = 0;
std::optional<size_t> max_file_descriptor_count = 0;
size_t followers = 0;
Expand All @@ -46,11 +45,8 @@ void updateKeeperInformation(KeeperDispatcher & keeper_dispatcher, AsynchronousM
ephemerals_count = state_machine.getTotalEphemeralNodesCount();
approximate_data_size = state_machine.getApproximateDataSize();
key_arena_size = state_machine.getKeyArenaSize();
latest_snapshot_size = state_machine.getLatestSnapshotBufSize();
session_with_watches = state_machine.getSessionsWithWatchesCount();
paths_watched = state_machine.getWatchedPathsCount();
//snapshot_dir_size = keeper_dispatcher.getSnapDirSize();
//log_dir_size = keeper_dispatcher.getLogDirSize();

# if defined(__linux__) || defined(__APPLE__)
open_file_descriptor_count = getCurrentProcessFDCount();
Expand All @@ -76,7 +72,9 @@ void updateKeeperInformation(KeeperDispatcher & keeper_dispatcher, AsynchronousM

new_values["KeeperApproximateDataSize"] = { approximate_data_size, "The approximate data size of ClickHouse Keeper, in bytes." };
new_values["KeeperKeyArenaSize"] = { key_arena_size, "The size in bytes of the memory arena for keys in ClickHouse Keeper." };
new_values["KeeperLatestSnapshotSize"] = { latest_snapshot_size, "The uncompressed size in bytes of the latest snapshot created by ClickHouse Keeper." };
/// TODO: value was incorrectly set to 0 previously for local snapshots
/// it needs to be fixed and it needs to be atomic to avoid deadlock
///new_values["KeeperLatestSnapshotSize"] = { latest_snapshot_size, "The uncompressed size in bytes of the latest snapshot created by ClickHouse Keeper." };

new_values["KeeperOpenFileDescriptorCount"] = { open_file_descriptor_count, "The number of open file descriptors in ClickHouse Keeper." };
if (max_file_descriptor_count.has_value())
Expand Down
16 changes: 12 additions & 4 deletions src/Coordination/KeeperContext.cpp
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
#include <Coordination/KeeperContext.h>

#include <Coordination/Defines.h>
#include <Coordination/KeeperConstants.h>
#include <Coordination/KeeperFeatureFlags.h>
#include <Disks/DiskLocal.h>
#include <Interpreters/Context.h>
#include <Disks/DiskSelector.h>
#include <IO/S3/Credentials.h>
#include <Interpreters/Context.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Coordination/KeeperConstants.h>
#include <Common/logger_useful.h>
#include <Coordination/KeeperFeatureFlags.h>

#include <boost/algorithm/string.hpp>

namespace DB
Expand All @@ -20,9 +22,10 @@ extern const int BAD_ARGUMENTS;

}

KeeperContext::KeeperContext(bool standalone_keeper_)
KeeperContext::KeeperContext(bool standalone_keeper_, CoordinationSettingsPtr coordination_settings_)
: disk_selector(std::make_shared<DiskSelector>())
, standalone_keeper(standalone_keeper_)
, coordination_settings(std::move(coordination_settings_))
{
/// enable by default some feature flags
feature_flags.enableFeatureFlag(KeeperFeatureFlag::FILTERED_LIST);
Expand Down Expand Up @@ -416,4 +419,9 @@ void KeeperContext::waitLocalLogsPreprocessedOrShutdown()
local_logs_preprocessed_cv.wait(lock, [this]{ return shutdown_called || local_logs_preprocessed; });
}

/// Accessor for the coordination settings held by this context.
/// Returns a reference to the stored pointer rather than a copy of it.
const CoordinationSettingsPtr & KeeperContext::getCoordinationSettings() const
{
    return this->coordination_settings;
}

}