Skip to content

Commit

Permalink
Merge pull request #51256 from ClickHouse/backport/23.3/50489
Browse files Browse the repository at this point in the history
Backport #50489 to 23.3: Cleanup moving parts
  • Loading branch information
KochetovNicolai committed Jun 22, 2023
2 parents 79f278f + 3c63533 commit f5fbc2f
Show file tree
Hide file tree
Showing 32 changed files with 315 additions and 34 deletions.
13 changes: 11 additions & 2 deletions src/Storages/MergeTree/IMergeTreeDataPart.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -489,13 +489,17 @@ void IMergeTreeDataPart::removeIfNeeded()

if (is_temp)
{
String file_name = fileName(getDataPartStorage().getPartDirectory());
const auto & part_directory = getDataPartStorage().getPartDirectory();

String file_name = fileName(part_directory);

if (file_name.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "relative_path {} of part {} is invalid or not set",
getDataPartStorage().getPartDirectory(), name);

if (!startsWith(file_name, "tmp") && !endsWith(file_name, ".tmp_proj"))
const auto part_parent_directory = directoryPath(part_directory);
bool is_moving_part = part_parent_directory.ends_with("moving/");
if (!startsWith(file_name, "tmp") && !endsWith(file_name, ".tmp_proj") && !is_moving_part)
{
LOG_ERROR(
storage.log,
Expand All @@ -504,6 +508,11 @@ void IMergeTreeDataPart::removeIfNeeded()
path);
return;
}

if (is_moving_part)
{
LOG_TRACE(storage.log, "Removing unneeded moved part from {}", path);
}
}

remove();
Expand Down
32 changes: 26 additions & 6 deletions src/Storages/MergeTree/MergeTreeData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
#include <Common/StringUtils/StringUtils.h>
#include <Common/typeid_cast.h>
#include <Common/CurrentMetrics.h>
#include <Common/ThreadFuzzer.h>
#include <Common/getNumberOfPhysicalCPUCores.h>
#include <Common/Config/ConfigHelper.h>
#include <Compression/CompressedReadBuffer.h>
#include <Core/QueryProcessingStage.h>
#include <DataTypes/DataTypeEnum.h>
Expand Down Expand Up @@ -1927,6 +1930,21 @@ static bool isOldPartDirectory(const DiskPtr & disk, const String & directory_pa


size_t MergeTreeData::clearOldTemporaryDirectories(size_t custom_directories_lifetime_seconds, const NameSet & valid_prefixes)
{
size_t cleared_count = 0;

cleared_count += clearOldTemporaryDirectories(relative_data_path, custom_directories_lifetime_seconds, valid_prefixes);

if (allowRemoveStaleMovingParts())
{
/// Clear _all_ parts from the `moving` directory
cleared_count += clearOldTemporaryDirectories(fs::path(relative_data_path) / "moving", custom_directories_lifetime_seconds, {""});
}

return cleared_count;
}

size_t MergeTreeData::clearOldTemporaryDirectories(const String & root_path, size_t custom_directories_lifetime_seconds, const NameSet & valid_prefixes)
{
/// If the method is already called from another thread, then we don't need to do anything.
std::unique_lock lock(clear_old_temporary_directories_mutex, std::defer_lock);
Expand All @@ -1945,7 +1963,7 @@ size_t MergeTreeData::clearOldTemporaryDirectories(size_t custom_directories_lif
if (disk->isBroken())
continue;

for (auto it = disk->iterateDirectory(relative_data_path); it->isValid(); it->next())
for (auto it = disk->iterateDirectory(root_path); it->isValid(); it->next())
{
const std::string & basename = it->name();
bool start_with_valid_prefix = false;
Expand Down Expand Up @@ -7586,7 +7604,7 @@ MovePartsOutcome MergeTreeData::moveParts(const CurrentlyMovingPartsTaggerPtr &
for (const auto & moving_part : moving_tagger->parts_to_move)
{
Stopwatch stopwatch;
MutableDataPartPtr cloned_part;
MergeTreePartsMover::TemporaryClonedPart cloned_part;
ProfileEventsScope profile_events_scope;

auto write_part_log = [&](const ExecutionStatus & execution_status)
Expand All @@ -7596,7 +7614,7 @@ MovePartsOutcome MergeTreeData::moveParts(const CurrentlyMovingPartsTaggerPtr &
execution_status,
stopwatch.elapsed(),
moving_part.part->name,
cloned_part,
cloned_part.part,
{moving_part.part},
nullptr,
profile_events_scope.getSnapshot());
Expand Down Expand Up @@ -7672,9 +7690,6 @@ MovePartsOutcome MergeTreeData::moveParts(const CurrentlyMovingPartsTaggerPtr &
catch (...)
{
write_part_log(ExecutionStatus::fromCurrentException("", true));
if (cloned_part)
cloned_part->remove();

throw;
}
}
Expand Down Expand Up @@ -8188,6 +8203,11 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::createEmptyPart(
return new_data_part;
}

bool MergeTreeData::allowRemoveStaleMovingParts() const
{
return ConfigHelper::getBool(getContext()->getConfigRef(), "allow_remove_stale_moving_parts");
}

CurrentlySubmergingEmergingTagger::~CurrentlySubmergingEmergingTagger()
{
std::lock_guard lock(storage.currently_submerging_emerging_mutex);
Expand Down
4 changes: 4 additions & 0 deletions src/Storages/MergeTree/MergeTreeData.h
Original file line number Diff line number Diff line change
Expand Up @@ -668,6 +668,7 @@ class MergeTreeData : public IStorage, public WithMutableContext
/// Delete all directories which names begin with "tmp"
/// Must be called with locked lockForShare() because it's using relative_data_path.
size_t clearOldTemporaryDirectories(size_t custom_directories_lifetime_seconds, const NameSet & valid_prefixes = {"tmp_", "tmp-fetch_"});
size_t clearOldTemporaryDirectories(const String & root_path, size_t custom_directories_lifetime_seconds, const NameSet & valid_prefixes);

size_t clearEmptyParts();

Expand Down Expand Up @@ -1054,6 +1055,9 @@ class MergeTreeData : public IStorage, public WithMutableContext
void waitForOutdatedPartsToBeLoaded() const;
bool canUsePolymorphicParts() const;

/// TODO: make enabled by default in the next release if no problems found.
bool allowRemoveStaleMovingParts() const;

protected:
friend class IMergeTreeDataPart;
friend class MergeTreeDataMergerMutator;
Expand Down
46 changes: 31 additions & 15 deletions src/Storages/MergeTree/MergeTreePartsMover.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ namespace DB
namespace ErrorCodes
{
extern const int ABORTED;
extern const int DIRECTORY_ALREADY_EXISTS;
}

namespace
Expand Down Expand Up @@ -202,7 +203,7 @@ bool MergeTreePartsMover::selectPartsForMove(
return false;
}

MergeTreeMutableDataPartPtr MergeTreePartsMover::clonePart(const MergeTreeMoveEntry & moving_part) const
MergeTreePartsMover::TemporaryClonedPart MergeTreePartsMover::clonePart(const MergeTreeMoveEntry & moving_part) const
{
if (moves_blocker.isCancelled())
throw Exception(ErrorCodes::ABORTED, "Cancelled moving parts.");
Expand All @@ -211,6 +212,8 @@ MergeTreeMutableDataPartPtr MergeTreePartsMover::clonePart(const MergeTreeMoveEn
auto part = moving_part.part;
auto disk = moving_part.reserved_space->getDisk();
LOG_DEBUG(log, "Cloning part {} from '{}' to '{}'", part->name, part->getDataPartStorage().getDiskName(), disk->getName());
TemporaryClonedPart cloned_part;
cloned_part.temporary_directory_lock = data->getTemporaryPartDirectoryHolder(part->name);

MutableDataPartStoragePtr cloned_part_storage;
if (disk->supportZeroCopyReplication() && settings->allow_remote_fs_zero_copy_replication)
Expand All @@ -221,8 +224,10 @@ MergeTreeMutableDataPartPtr MergeTreePartsMover::clonePart(const MergeTreeMoveEn
String relative_path = part->getDataPartStorage().getPartDirectory();
if (disk->exists(path_to_clone + relative_path))
{
LOG_WARNING(log, "Path {} already exists. Will remove it and clone again.", fullPath(disk, path_to_clone + relative_path));
disk->removeRecursive(fs::path(path_to_clone) / relative_path / "");
throw Exception(ErrorCodes::DIRECTORY_ALREADY_EXISTS,
"Cannot clone part {} from '{}' to '{}': path '{}' already exists",
part->name, part->getDataPartStorage().getDiskName(), disk->getName(),
fullPath(disk, path_to_clone + relative_path));
}

disk->createDirectories(path_to_clone);
Expand All @@ -241,37 +246,48 @@ MergeTreeMutableDataPartPtr MergeTreePartsMover::clonePart(const MergeTreeMoveEn
}

MergeTreeDataPartBuilder builder(*data, part->name, cloned_part_storage);
auto cloned_part = std::move(builder).withPartFormatFromDisk().build();
LOG_TRACE(log, "Part {} was cloned to {}", part->name, cloned_part->getDataPartStorage().getFullPath());
cloned_part.part = std::move(builder).withPartFormatFromDisk().build();
LOG_TRACE(log, "Part {} was cloned to {}", part->name, cloned_part.part->getDataPartStorage().getFullPath());

cloned_part->loadColumnsChecksumsIndexes(true, true);
cloned_part->loadVersionMetadata();
cloned_part->modification_time = cloned_part->getDataPartStorage().getLastModified().epochTime();
cloned_part.part->is_temp = data->allowRemoveStaleMovingParts();
cloned_part.part->loadColumnsChecksumsIndexes(true, true);
cloned_part.part->loadVersionMetadata();
cloned_part.part->modification_time = cloned_part.part->getDataPartStorage().getLastModified().epochTime();
return cloned_part;
}


void MergeTreePartsMover::swapClonedPart(const MergeTreeMutableDataPartPtr & cloned_part) const
void MergeTreePartsMover::swapClonedPart(TemporaryClonedPart & cloned_part) const
{
if (moves_blocker.isCancelled())
throw Exception(ErrorCodes::ABORTED, "Cancelled moving parts.");

auto active_part = data->getActiveContainingPart(cloned_part->name);
auto active_part = data->getActiveContainingPart(cloned_part.part->name);

/// It's ok, because we don't block moving parts for merges or mutations
if (!active_part || active_part->name != cloned_part->name)
if (!active_part || active_part->name != cloned_part.part->name)
{
LOG_INFO(log, "Failed to swap {}. Active part doesn't exist. Possible it was merged or mutated. Will remove copy on path '{}'.", cloned_part->name, cloned_part->getDataPartStorage().getFullPath());
LOG_INFO(log,
"Failed to swap {}. Active part doesn't exist (containing part {}). "
"Possible it was merged or mutated. Part on path '{}' {}",
cloned_part.part->name,
active_part ? active_part->name : "doesn't exist",
cloned_part.part->getDataPartStorage().getFullPath(),
data->allowRemoveStaleMovingParts() ? "will be removed" : "will remain intact (set <allow_remove_stale_moving_parts> in config.xml, exercise caution when using)");
return;
}

cloned_part.part->is_temp = false;

/// Don't remove new directory but throw an error because it may contain part which is currently in use.
cloned_part->renameTo(active_part->name, false);
cloned_part.part->renameTo(active_part->name, false);

/// TODO what happen if server goes down here?
data->swapActivePart(cloned_part);
data->swapActivePart(cloned_part.part);

LOG_TRACE(log, "Part {} was moved to {}", cloned_part.part->name, cloned_part.part->getDataPartStorage().getFullPath());

LOG_TRACE(log, "Part {} was moved to {}", cloned_part->name, cloned_part->getDataPartStorage().getFullPath());
cloned_part.temporary_directory_lock = {};
}

}
12 changes: 10 additions & 2 deletions src/Storages/MergeTree/MergeTreePartsMover.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <functional>
#include <optional>
#include <vector>
#include <base/scope_guard.h>
#include <Disks/StoragePolicy.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Storages/MergeTree/MovesList.h>
Expand Down Expand Up @@ -43,12 +44,19 @@ class MergeTreePartsMover
using AllowedMovingPredicate = std::function<bool(const std::shared_ptr<const IMergeTreeDataPart> &, String * reason)>;

public:

explicit MergeTreePartsMover(MergeTreeData * data_)
: data(data_)
, log(&Poco::Logger::get("MergeTreePartsMover"))
{
}

struct TemporaryClonedPart
{
MergeTreeMutableDataPartPtr part;
scope_guard temporary_directory_lock;
};

/// Select parts for background moves according to storage_policy configuration.
/// Returns true if at least one part was selected for move.
bool selectPartsForMove(
Expand All @@ -57,14 +65,14 @@ class MergeTreePartsMover
const std::lock_guard<std::mutex> & moving_parts_lock);

/// Copies part to selected reservation in detached folder. Throws exception if part already exists.
MergeTreeMutableDataPartPtr clonePart(const MergeTreeMoveEntry & moving_part) const;
TemporaryClonedPart clonePart(const MergeTreeMoveEntry & moving_part) const;

/// Replaces cloned part from detached directory into active data parts set.
/// Replacing part changes state to DeleteOnDestroy and will be removed from disk after destructor of
/// IMergeTreeDataPart called. If replacing part doesn't exists or not active (committed) than
/// cloned part will be removed and log message will be reported. It may happen in case of concurrent
/// merge or mutation.
void swapClonedPart(const MergeTreeMutableDataPartPtr & cloned_parts) const;
void swapClonedPart(TemporaryClonedPart & cloned_part) const;

/// Can stop background moves and moves from queries
ActionBlocker moves_blocker;
Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<clickhouse>
<remote_servers>
<test_cluster>
<shard>
<replica>
<host>node1</host>
<port>9000</port>
</replica>
<replica>
<host>node2</host>
<port>9000</port>
</replica>
</shard>
</test_cluster>
</remote_servers>
</clickhouse>
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
<clickhouse>
<storage_configuration>
<disks>
<s3>
<type>s3</type>
<endpoint>http://minio1:9001/root/data/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</s3>
</disks>
<policies>
<two_disks>
<volumes>
<default>
<disk>default</disk>
</default>
<external>
<disk>s3</disk>
</external>
</volumes>
</two_disks>
</policies>
</storage_configuration>

<allow_remove_stale_moving_parts>true</allow_remove_stale_moving_parts>
</clickhouse>
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
<clickhouse>
<tcp_port>9000</tcp_port>
<listen_host>127.0.0.1</listen_host>
<max_concurrent_queries>500</max_concurrent_queries>
<path>./clickhouse/</path>
<users_config>users.xml</users_config>
</clickhouse>

0 comments on commit f5fbc2f

Please sign in to comment.