Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup moving parts #50489

Merged
merged 9 commits into from
Jun 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
13 changes: 11 additions & 2 deletions src/Storages/MergeTree/IMergeTreeDataPart.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -492,13 +492,17 @@ void IMergeTreeDataPart::removeIfNeeded()

if (is_temp)
{
String file_name = fileName(getDataPartStorage().getPartDirectory());
const auto & part_directory = getDataPartStorage().getPartDirectory();

String file_name = fileName(part_directory);

if (file_name.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "relative_path {} of part {} is invalid or not set",
getDataPartStorage().getPartDirectory(), name);

if (!startsWith(file_name, "tmp") && !endsWith(file_name, ".tmp_proj"))
const auto part_parent_directory = directoryPath(part_directory);
bool is_moving_part = part_parent_directory.ends_with("moving/");
if (!startsWith(file_name, "tmp") && !endsWith(file_name, ".tmp_proj") && !is_moving_part)
{
LOG_ERROR(
storage.log,
Expand All @@ -507,6 +511,11 @@ void IMergeTreeDataPart::removeIfNeeded()
path);
return;
}

if (is_moving_part)
{
LOG_TRACE(storage.log, "Removing unneeded moved part from {}", path);
}
}

remove();
Expand Down
30 changes: 24 additions & 6 deletions src/Storages/MergeTree/MergeTreeData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <Common/CurrentMetrics.h>
#include <Common/ThreadFuzzer.h>
#include <Common/getNumberOfPhysicalCPUCores.h>
#include <Common/Config/ConfigHelper.h>
#include <Compression/CompressedReadBuffer.h>
#include <Core/QueryProcessingStage.h>
#include <DataTypes/DataTypeEnum.h>
Expand Down Expand Up @@ -2000,6 +2001,21 @@ static bool isOldPartDirectory(const DiskPtr & disk, const String & directory_pa


/// Sweeps stale temporary directories of this table and returns the sum of the counts
/// reported by the individual sweeps.
/// The table's own data directory is always processed with the caller-supplied `valid_prefixes`;
/// the `moving` subdirectory is processed in addition only if allowRemoveStaleMovingParts() is true.
size_t MergeTreeData::clearOldTemporaryDirectories(size_t custom_directories_lifetime_seconds, const NameSet & valid_prefixes)
{
    size_t removed_total = clearOldTemporaryDirectories(relative_data_path, custom_directories_lifetime_seconds, valid_prefixes);

    if (!allowRemoveStaleMovingParts())
        return removed_total;

    /// Clear _all_ parts from the `moving` directory
    /// (the single empty prefix is presumably what makes every entry qualify - confirm against the prefix check).
    const auto moving_root = fs::path(relative_data_path) / "moving";
    removed_total += clearOldTemporaryDirectories(moving_root, custom_directories_lifetime_seconds, {""});

    return removed_total;
}

size_t MergeTreeData::clearOldTemporaryDirectories(const String & root_path, size_t custom_directories_lifetime_seconds, const NameSet & valid_prefixes)
{
/// If the method is already called from another thread, then we don't need to do anything.
std::unique_lock lock(clear_old_temporary_directories_mutex, std::defer_lock);
Expand All @@ -2018,7 +2034,7 @@ size_t MergeTreeData::clearOldTemporaryDirectories(size_t custom_directories_lif
if (disk->isBroken())
continue;

for (auto it = disk->iterateDirectory(relative_data_path); it->isValid(); it->next())
for (auto it = disk->iterateDirectory(root_path); it->isValid(); it->next())
{
const std::string & basename = it->name();
bool start_with_valid_prefix = false;
Expand Down Expand Up @@ -7802,7 +7818,7 @@ MovePartsOutcome MergeTreeData::moveParts(const CurrentlyMovingPartsTaggerPtr &
for (const auto & moving_part : moving_tagger->parts_to_move)
{
Stopwatch stopwatch;
MutableDataPartPtr cloned_part;
MergeTreePartsMover::TemporaryClonedPart cloned_part;
ProfileEventsScope profile_events_scope;

auto write_part_log = [&](const ExecutionStatus & execution_status)
Expand All @@ -7812,7 +7828,7 @@ MovePartsOutcome MergeTreeData::moveParts(const CurrentlyMovingPartsTaggerPtr &
execution_status,
stopwatch.elapsed(),
moving_part.part->name,
cloned_part,
cloned_part.part,
{moving_part.part},
nullptr,
profile_events_scope.getSnapshot());
Expand Down Expand Up @@ -7888,9 +7904,6 @@ MovePartsOutcome MergeTreeData::moveParts(const CurrentlyMovingPartsTaggerPtr &
catch (...)
{
write_part_log(ExecutionStatus::fromCurrentException("", true));
if (cloned_part)
cloned_part->remove();

throw;
}
}
Expand Down Expand Up @@ -8405,6 +8418,11 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::createEmptyPart(
return new_data_part;
}

/// Reads the `allow_remove_stale_moving_parts` flag from the configuration of the storage's context.
bool MergeTreeData::allowRemoveStaleMovingParts() const
{
    /// Hold the context for the duration of the lookup, as the original temporary did.
    const auto context_holder = getContext();
    return ConfigHelper::getBool(context_holder->getConfigRef(), "allow_remove_stale_moving_parts");
}

CurrentlySubmergingEmergingTagger::~CurrentlySubmergingEmergingTagger()
{
std::lock_guard lock(storage.currently_submerging_emerging_mutex);
Expand Down
4 changes: 4 additions & 0 deletions src/Storages/MergeTree/MergeTreeData.h
Original file line number Diff line number Diff line change
Expand Up @@ -676,6 +676,7 @@ class MergeTreeData : public IStorage, public WithMutableContext
/// Delete all directories whose names begin with "tmp"
/// Must be called with locked lockForShare() because it's using relative_data_path.
size_t clearOldTemporaryDirectories(size_t custom_directories_lifetime_seconds, const NameSet & valid_prefixes = {"tmp_", "tmp-fetch_"});
size_t clearOldTemporaryDirectories(const String & root_path, size_t custom_directories_lifetime_seconds, const NameSet & valid_prefixes);

size_t clearEmptyParts();

Expand Down Expand Up @@ -1059,6 +1060,9 @@ class MergeTreeData : public IStorage, public WithMutableContext
void waitForOutdatedPartsToBeLoaded() const;
bool canUsePolymorphicParts() const;

/// TODO: make enabled by default in the next release if no problems found.
bool allowRemoveStaleMovingParts() const;

protected:
friend class IMergeTreeDataPart;
friend class MergeTreeDataMergerMutator;
Expand Down
46 changes: 31 additions & 15 deletions src/Storages/MergeTree/MergeTreePartsMover.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ namespace DB
namespace ErrorCodes
{
extern const int ABORTED;
extern const int DIRECTORY_ALREADY_EXISTS;
}

namespace
Expand Down Expand Up @@ -203,7 +204,7 @@ bool MergeTreePartsMover::selectPartsForMove(
return false;
}

MergeTreeMutableDataPartPtr MergeTreePartsMover::clonePart(const MergeTreeMoveEntry & moving_part) const
MergeTreePartsMover::TemporaryClonedPart MergeTreePartsMover::clonePart(const MergeTreeMoveEntry & moving_part) const
{
if (moves_blocker.isCancelled())
throw Exception(ErrorCodes::ABORTED, "Cancelled moving parts.");
Expand All @@ -212,6 +213,8 @@ MergeTreeMutableDataPartPtr MergeTreePartsMover::clonePart(const MergeTreeMoveEn
auto part = moving_part.part;
auto disk = moving_part.reserved_space->getDisk();
LOG_DEBUG(log, "Cloning part {} from '{}' to '{}'", part->name, part->getDataPartStorage().getDiskName(), disk->getName());
TemporaryClonedPart cloned_part;
cloned_part.temporary_directory_lock = data->getTemporaryPartDirectoryHolder(part->name);

MutableDataPartStoragePtr cloned_part_storage;
if (disk->supportZeroCopyReplication() && settings->allow_remote_fs_zero_copy_replication)
Expand All @@ -222,8 +225,10 @@ MergeTreeMutableDataPartPtr MergeTreePartsMover::clonePart(const MergeTreeMoveEn
String relative_path = part->getDataPartStorage().getPartDirectory();
if (disk->exists(path_to_clone + relative_path))
{
LOG_WARNING(log, "Path {} already exists. Will remove it and clone again.", fullPath(disk, path_to_clone + relative_path));
disk->removeRecursive(fs::path(path_to_clone) / relative_path / "");
throw Exception(ErrorCodes::DIRECTORY_ALREADY_EXISTS,
"Cannot clone part {} from '{}' to '{}': path '{}' already exists",
part->name, part->getDataPartStorage().getDiskName(), disk->getName(),
fullPath(disk, path_to_clone + relative_path));
}

disk->createDirectories(path_to_clone);
Expand All @@ -242,37 +247,48 @@ MergeTreeMutableDataPartPtr MergeTreePartsMover::clonePart(const MergeTreeMoveEn
}

MergeTreeDataPartBuilder builder(*data, part->name, cloned_part_storage);
auto cloned_part = std::move(builder).withPartFormatFromDisk().build();
LOG_TRACE(log, "Part {} was cloned to {}", part->name, cloned_part->getDataPartStorage().getFullPath());
cloned_part.part = std::move(builder).withPartFormatFromDisk().build();
LOG_TRACE(log, "Part {} was cloned to {}", part->name, cloned_part.part->getDataPartStorage().getFullPath());

cloned_part->loadColumnsChecksumsIndexes(true, true);
cloned_part->loadVersionMetadata();
cloned_part->modification_time = cloned_part->getDataPartStorage().getLastModified().epochTime();
cloned_part.part->is_temp = data->allowRemoveStaleMovingParts();
cloned_part.part->loadColumnsChecksumsIndexes(true, true);
cloned_part.part->loadVersionMetadata();
cloned_part.part->modification_time = cloned_part.part->getDataPartStorage().getLastModified().epochTime();
return cloned_part;
}


void MergeTreePartsMover::swapClonedPart(const MergeTreeMutableDataPartPtr & cloned_part) const
void MergeTreePartsMover::swapClonedPart(TemporaryClonedPart & cloned_part) const
{
if (moves_blocker.isCancelled())
throw Exception(ErrorCodes::ABORTED, "Cancelled moving parts.");

auto active_part = data->getActiveContainingPart(cloned_part->name);
auto active_part = data->getActiveContainingPart(cloned_part.part->name);

/// It's ok, because we don't block moving parts for merges or mutations
if (!active_part || active_part->name != cloned_part->name)
if (!active_part || active_part->name != cloned_part.part->name)
{
LOG_INFO(log, "Failed to swap {}. Active part doesn't exist. Possible it was merged or mutated. Will remove copy on path '{}'.", cloned_part->name, cloned_part->getDataPartStorage().getFullPath());
LOG_INFO(log,
"Failed to swap {}. Active part doesn't exist (containing part {}). "
"Possible it was merged or mutated. Part on path '{}' {}",
cloned_part.part->name,
active_part ? active_part->name : "doesn't exist",
cloned_part.part->getDataPartStorage().getFullPath(),
data->allowRemoveStaleMovingParts() ? "will be removed" : "will remain intact (set <allow_remove_stale_moving_parts> in config.xml, exercise caution when using)");
return;
}

cloned_part.part->is_temp = false;

/// Don't remove new directory but throw an error because it may contain part which is currently in use.
cloned_part->renameTo(active_part->name, false);
cloned_part.part->renameTo(active_part->name, false);

/// TODO what happen if server goes down here?
data->swapActivePart(cloned_part);
data->swapActivePart(cloned_part.part);

LOG_TRACE(log, "Part {} was moved to {}", cloned_part.part->name, cloned_part.part->getDataPartStorage().getFullPath());

LOG_TRACE(log, "Part {} was moved to {}", cloned_part->name, cloned_part->getDataPartStorage().getFullPath());
cloned_part.temporary_directory_lock = {};
}

}
12 changes: 10 additions & 2 deletions src/Storages/MergeTree/MergeTreePartsMover.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <functional>
#include <optional>
#include <vector>
#include <base/scope_guard.h>
#include <Disks/StoragePolicy.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Storages/MergeTree/MovesList.h>
Expand Down Expand Up @@ -43,12 +44,19 @@ class MergeTreePartsMover
using AllowedMovingPredicate = std::function<bool(const std::shared_ptr<const IMergeTreeDataPart> &, String * reason)>;

public:

explicit MergeTreePartsMover(MergeTreeData * data_)
: data(data_)
, log(&Poco::Logger::get("MergeTreePartsMover"))
{
}

/// Result of clonePart(): the cloned data part bundled with a guard for its temporary directory.
/// NOTE(review): members are destroyed in reverse declaration order, so the guard is released
/// before this struct drops its reference to `part` - keep the field order unless that is re-checked.
struct TemporaryClonedPart
{
/// The cloned part; filled in by clonePart() and consumed by swapClonedPart().
MergeTreeMutableDataPartPtr part;
/// Guard obtained via getTemporaryPartDirectoryHolder() in clonePart(); reset by swapClonedPart() after a successful swap.
scope_guard temporary_directory_lock;
};

/// Select parts for background moves according to storage_policy configuration.
/// Returns true if at least one part was selected for move.
bool selectPartsForMove(
Expand All @@ -57,14 +65,14 @@ class MergeTreePartsMover
const std::lock_guard<std::mutex> & moving_parts_lock);

/// Copies part to selected reservation in detached folder. Throws exception if part already exists.
MergeTreeMutableDataPartPtr clonePart(const MergeTreeMoveEntry & moving_part) const;
TemporaryClonedPart clonePart(const MergeTreeMoveEntry & moving_part) const;

/// Replaces cloned part from detached directory into active data parts set.
/// Replacing part changes state to DeleteOnDestroy and will be removed from disk after the destructor of
/// IMergeTreeDataPart is called. If replacing part doesn't exist or is not active (committed), then
/// cloned part will be removed and a log message will be reported. It may happen in case of concurrent
/// merge or mutation.
void swapClonedPart(const MergeTreeMutableDataPartPtr & cloned_parts) const;
void swapClonedPart(TemporaryClonedPart & cloned_part) const;

/// Can stop background moves and moves from queries
ActionBlocker moves_blocker;
Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<clickhouse>
<!-- Cluster definition: `test_cluster` has a single shard with two replicas, node1 and node2, both on port 9000. -->
<remote_servers>
<test_cluster>
<shard>
<replica>
<host>node1</host>
<port>9000</port>
</replica>
<replica>
<host>node2</host>
<port>9000</port>
</replica>
</shard>
</test_cluster>
</remote_servers>
</clickhouse>
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
<clickhouse>
<!-- Storage setup: declares one disk (`s3`) and one policy (`two_disks`) with two volumes. -->
<storage_configuration>
<disks>
<!-- Disk `s3` of type s3; endpoint and credentials below point at host minio1. -->
<s3>
<type>s3</type>
<endpoint>http://minio1:9001/root/data/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</s3>
</disks>
<policies>
<!-- Policy `two_disks`: volume `default` uses disk `default`, volume `external` uses disk `s3`. -->
<two_disks>
<volumes>
<default>
<disk>default</disk>
</default>
<external>
<disk>s3</disk>
</external>
</volumes>
</two_disks>
</policies>
</storage_configuration>

<!-- Flag read by MergeTreeData::allowRemoveStaleMovingParts(); enabled here. -->
<allow_remove_stale_moving_parts>true</allow_remove_stale_moving_parts>
</clickhouse>
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
<clickhouse>
<!-- Basic server settings: TCP port 9000 on 127.0.0.1, data path ./clickhouse/, users taken from users.xml. -->
<tcp_port>9000</tcp_port>
<listen_host>127.0.0.1</listen_host>
<max_concurrent_queries>500</max_concurrent_queries>
<path>./clickhouse/</path>
<users_config>users.xml</users_config>
</clickhouse>