Skip to content

Commit

Permalink
Merge pull request #54157 from ClickHouse/backport/23.3-moving-parts-…
Browse files Browse the repository at this point in the history
…checker
  • Loading branch information
vdimir committed Sep 5, 2023
2 parents 0d6d3c3 + c71e21a commit 25635e2
Show file tree
Hide file tree
Showing 9 changed files with 48 additions and 27 deletions.
7 changes: 5 additions & 2 deletions src/Storages/MergeTree/IMergeTreeDataPart.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -497,8 +497,11 @@ void IMergeTreeDataPart::removeIfNeeded()
throw Exception(ErrorCodes::LOGICAL_ERROR, "relative_path {} of part {} is invalid or not set",
getDataPartStorage().getPartDirectory(), name);

const auto part_parent_directory = directoryPath(part_directory);
bool is_moving_part = part_parent_directory.ends_with("moving/");
fs::path part_directory_path = getDataPartStorage().getRelativePath();
if (part_directory_path.filename().empty())
part_directory_path = part_directory_path.parent_path();
const bool is_moving_part = part_directory_path.parent_path().filename() == "moving";

if (!startsWith(file_name, "tmp") && !endsWith(file_name, ".tmp_proj") && !is_moving_part)
{
LOG_ERROR(
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/MergeTree/MergeTreeData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8205,7 +8205,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::createEmptyPart(

bool MergeTreeData::allowRemoveStaleMovingParts() const
{
return ConfigHelper::getBool(getContext()->getConfigRef(), "allow_remove_stale_moving_parts");
return ConfigHelper::getBool(getContext()->getConfigRef(), "allow_remove_stale_moving_parts", /* default_ = */ true);
}

CurrentlySubmergingEmergingTagger::~CurrentlySubmergingEmergingTagger()
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/MergeTree/MergeTreeData.h
Original file line number Diff line number Diff line change
Expand Up @@ -1026,7 +1026,7 @@ class MergeTreeData : public IStorage, public WithMutableContext

/// Fetch part only if some replica has it on shared storage like S3
/// Overridden in StorageReplicatedMergeTree
virtual MutableDataPartStoragePtr tryToFetchIfShared(const IMergeTreeDataPart &, const DiskPtr &, const String &) { return nullptr; }
virtual MutableDataPartPtr tryToFetchIfShared(const IMergeTreeDataPart &, const DiskPtr &, const String &) { return nullptr; }

/// Check shared data usage on other replicas for detached/freezed part
/// Remove local files and remote files if needed
Expand Down
10 changes: 8 additions & 2 deletions src/Storages/MergeTree/MergeTreePartsMover.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -232,9 +232,15 @@ MergeTreePartsMover::TemporaryClonedPart MergeTreePartsMover::clonePart(const Me

disk->createDirectories(path_to_clone);

cloned_part_storage = data->tryToFetchIfShared(*part, disk, fs::path(path_to_clone) / part->name);
auto zero_copy_part = data->tryToFetchIfShared(*part, disk, fs::path(path_to_clone) / part->name);

if (!cloned_part_storage)
if (zero_copy_part)
{
/// FIXME for some reason we cannot just use this part, we have to re-create it through MergeTreeDataPartBuilder
zero_copy_part->is_temp = false; /// Do not remove it in dtor
cloned_part_storage = zero_copy_part->getDataPartStoragePtr();
}
else
{
LOG_INFO(log, "Part {} was not fetched, we are the first who move it to another disk, so we will copy it", part->name);
cloned_part_storage = part->getDataPartStorage().clonePart(path_to_clone, part->getDataPartStorage().getPartDirectory(), disk, log);
Expand Down
8 changes: 4 additions & 4 deletions src/Storages/StorageReplicatedMergeTree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1863,7 +1863,7 @@ bool StorageReplicatedMergeTree::executeFetch(LogEntry & entry, bool need_to_che
}


MutableDataPartStoragePtr StorageReplicatedMergeTree::executeFetchShared(
MergeTreeData::MutableDataPartPtr StorageReplicatedMergeTree::executeFetchShared(
const String & source_replica,
const String & new_part_name,
const DiskPtr & disk,
Expand Down Expand Up @@ -4218,7 +4218,7 @@ bool StorageReplicatedMergeTree::fetchPart(
}


MutableDataPartStoragePtr StorageReplicatedMergeTree::fetchExistsPart(
MergeTreeData::MutableDataPartPtr StorageReplicatedMergeTree::fetchExistsPart(
const String & part_name,
const StorageMetadataPtr & metadata_snapshot,
const String & source_replica_path,
Expand Down Expand Up @@ -4321,7 +4321,7 @@ MutableDataPartStoragePtr StorageReplicatedMergeTree::fetchExistsPart(
ProfileEvents::increment(ProfileEvents::ReplicatedPartFetches);

LOG_DEBUG(log, "Fetched part {} from {}", part_name, source_replica_path);
return part->getDataPartStoragePtr();
return part;
}

void StorageReplicatedMergeTree::startup()
Expand Down Expand Up @@ -8472,7 +8472,7 @@ std::pair<bool, NameSet> StorageReplicatedMergeTree::unlockSharedDataByID(
}


MutableDataPartStoragePtr StorageReplicatedMergeTree::tryToFetchIfShared(
MergeTreeData::MutableDataPartPtr StorageReplicatedMergeTree::tryToFetchIfShared(
const IMergeTreeDataPart & part,
const DiskPtr & disk,
const String & path)
Expand Down
6 changes: 3 additions & 3 deletions src/Storages/StorageReplicatedMergeTree.h
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ class StorageReplicatedMergeTree final : public MergeTreeData
bool canExecuteFetch(const ReplicatedMergeTreeLogEntry & entry, String & disable_reason) const;

/// Fetch part only when it stored on shared storage like S3
MutableDataPartStoragePtr executeFetchShared(const String & source_replica, const String & new_part_name, const DiskPtr & disk, const String & path);
MutableDataPartPtr executeFetchShared(const String & source_replica, const String & new_part_name, const DiskPtr & disk, const String & path);

/// Lock part in zookeeper for use shared data in several nodes
void lockSharedData(const IMergeTreeDataPart & part, bool replace_existing_lock, std::optional<HardlinkedFiles> hardlinked_files) const override;
Expand Down Expand Up @@ -278,7 +278,7 @@ class StorageReplicatedMergeTree final : public MergeTreeData
MergeTreeDataFormatVersion data_format_version);

/// Fetch part only if some replica has it on shared storage like S3
MutableDataPartStoragePtr tryToFetchIfShared(const IMergeTreeDataPart & part, const DiskPtr & disk, const String & path) override;
MutableDataPartPtr tryToFetchIfShared(const IMergeTreeDataPart & part, const DiskPtr & disk, const String & path) override;

/// Get best replica having this partition on a same type remote disk
String getSharedDataReplica(const IMergeTreeDataPart & part, DataSourceType data_source_type) const;
Expand Down Expand Up @@ -694,7 +694,7 @@ class StorageReplicatedMergeTree final : public MergeTreeData
* Used for replace local part on the same s3-shared part in hybrid storage.
* Returns false if part is already fetching right now.
*/
MutableDataPartStoragePtr fetchExistsPart(
MutableDataPartPtr fetchExistsPart(
const String & part_name,
const StorageMetadataPtr & metadata_snapshot,
const String & replica_path,
Expand Down
36 changes: 23 additions & 13 deletions tests/integration/test_alter_moving_garbage/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,22 +218,32 @@ def test_delete_race_leftovers(cluster):
time.sleep(5)

# Check that we correctly deleted all outdated parts and no leftovers on s3
known_remote_paths = set(
node.query(
f"SELECT remote_path FROM system.remote_data_paths WHERE disk_name = 's32'"
).splitlines()
)
# Do it with retries because we delete blobs in the background
# and it can be race condition between removing from remote_data_paths and deleting blobs
all_remote_paths = set()
known_remote_paths = set()
for i in range(3):
known_remote_paths = set(
node.query(
f"SELECT remote_path FROM system.remote_data_paths WHERE disk_name = 's32'"
).splitlines()
)

all_remote_paths = set(
obj.object_name
for obj in cluster.minio_client.list_objects(
cluster.minio_bucket, "data2/", recursive=True
all_remote_paths = set(
obj.object_name
for obj in cluster.minio_client.list_objects(
cluster.minio_bucket, "data2/", recursive=True
)
)
)

# Some blobs can be deleted after we listed remote_data_paths
# It's alright, thus we check only that all remote paths are known
# (in other words, all remote paths is subset of known paths)
# Some blobs can be deleted after we listed remote_data_paths
# It's alright, thus we check only that all remote paths are known
# (in other words, all remote paths is subset of known paths)
if all_remote_paths == {p for p in known_remote_paths if p in all_remote_paths}:
break

time.sleep(1)

assert all_remote_paths == {p for p in known_remote_paths if p in all_remote_paths}

# Check that we have all data
Expand Down
2 changes: 2 additions & 0 deletions tests/integration/test_s3_zero_copy_ttl/configs/s3.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,6 @@
<merge_tree>
<allow_remote_fs_zero_copy_replication>true</allow_remote_fs_zero_copy_replication>
</merge_tree>

<allow_remove_stale_moving_parts>true</allow_remove_stale_moving_parts>
</clickhouse>
2 changes: 1 addition & 1 deletion tests/integration/test_s3_zero_copy_ttl/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def test_ttl_move_and_s3(started_cluster):
ORDER BY id
PARTITION BY id
TTL date TO DISK 's3_disk'
SETTINGS storage_policy='s3_and_default'
SETTINGS storage_policy='s3_and_default', temporary_directories_lifetime=1
""".format(
i
)
Expand Down

0 comments on commit 25635e2

Please sign in to comment.