diff --git a/src/Disks/ObjectStorages/DiskObjectStorageTransaction.cpp b/src/Disks/ObjectStorages/DiskObjectStorageTransaction.cpp
index 0ae577602b1d..160ed766c7ee 100644
--- a/src/Disks/ObjectStorages/DiskObjectStorageTransaction.cpp
+++ b/src/Disks/ObjectStorages/DiskObjectStorageTransaction.cpp
@@ -7,6 +7,7 @@
 #include
 #include
+#include <boost/algorithm/string/join.hpp>

 namespace DB
 {
@@ -156,14 +157,13 @@ struct RemoveObjectStorageOperation final : public IDiskObjectStorageOperation

 struct RemoveManyObjectStorageOperation final : public IDiskObjectStorageOperation
 {
-    RemoveBatchRequest remove_paths;
-    bool keep_all_batch_data;
-    NameSet file_names_remove_metadata_only;
+    const RemoveBatchRequest remove_paths;
+    const bool keep_all_batch_data;
+    const NameSet file_names_remove_metadata_only;

+    std::vector<String> paths_removed_with_objects;
     std::vector<ObjectsToRemove> objects_to_remove;

-    bool remove_from_cache = false;
-
     RemoveManyObjectStorageOperation(
         IObjectStorage & object_storage_,
         IMetadataStorage & metadata_storage_,
@@ -203,6 +203,7 @@ struct RemoveManyObjectStorageOperation final : public IDiskObjectStorageOperati
             if (unlink_outcome && !keep_all_batch_data && !file_names_remove_metadata_only.contains(fs::path(path).filename()))
             {
                 objects_to_remove.emplace_back(ObjectsToRemove{std::move(objects), std::move(unlink_outcome)});
+                paths_removed_with_objects.push_back(path);
             }
         }
         catch (const Exception & e)
@@ -213,6 +214,12 @@ struct RemoveManyObjectStorageOperation final : public IDiskObjectStorageOperati
                 || e.code() == ErrorCodes::CANNOT_READ_ALL_DATA
                 || e.code() == ErrorCodes::CANNOT_OPEN_FILE)
            {
+                LOG_DEBUG(
+                    &Poco::Logger::get("RemoveManyObjectStorageOperation"),
+                    "Can't read metadata because of an exception; removing only the local metadata file. Path: {}, exception: {}",
+                    metadata_storage.getPath() + path,
+                    e.message());
+
                 tx->unlinkFile(path);
             }
             else
@@ -238,16 +245,31 @@ struct RemoveManyObjectStorageOperation final : public IDiskObjectStorageOperati
         /// TL;DR Don't pay any attention to 404 status code
         if (!remove_from_remote.empty())
             object_storage.removeObjectsIfExist(remove_from_remote);
+
+        if (!keep_all_batch_data)
+        {
+            LOG_DEBUG(
+                &Poco::Logger::get("RemoveManyObjectStorageOperation"),
+                "Metadata and objects were removed for [{}], "
+                "only metadata was removed for [{}].",
+                boost::algorithm::join(paths_removed_with_objects, ", "),
+                boost::algorithm::join(file_names_remove_metadata_only, ", "));
+        }
     }
 };

 struct RemoveRecursiveObjectStorageOperation final : public IDiskObjectStorageOperation
 {
-    std::string path;
+    /// path inside the disk with metadata
+    const std::string path;
+    const bool keep_all_batch_data;
+    /// paths inside 'this->path'
+    const NameSet file_names_remove_metadata_only;
+
+    /// map from local_path to its remote objects with a hardlink counter
+    /// local_path is the path inside 'this->path'
     std::unordered_map<std::string, ObjectsToRemove> objects_to_remove_by_path;
-    bool keep_all_batch_data;
-    NameSet file_names_remove_metadata_only;

     RemoveRecursiveObjectStorageOperation(
         IObjectStorage & object_storage_,
@@ -274,11 +296,16 @@ struct RemoveRecursiveObjectStorageOperation final : public IDiskObjectStorageOp
         {
             try
             {
+                chassert(path_to_remove.starts_with(path));
+                auto rel_path = String(fs::relative(fs::path(path_to_remove), fs::path(path)));
+
                 auto objects_paths = metadata_storage.getStorageObjects(path_to_remove);
                 auto unlink_outcome = tx->unlinkMetadata(path_to_remove);
-                if (unlink_outcome)
+
+                if (unlink_outcome && !file_names_remove_metadata_only.contains(rel_path))
                 {
-                    objects_to_remove_by_path[path_to_remove] = ObjectsToRemove{std::move(objects_paths), std::move(unlink_outcome)};
+                    objects_to_remove_by_path[std::move(rel_path)]
+                        = ObjectsToRemove{std::move(objects_paths), std::move(unlink_outcome)};
                 }
             }
             catch (const Exception & e)
@@ -320,25 +347,38 @@ struct RemoveRecursiveObjectStorageOperation final : public IDiskObjectStorageOp

     void undo() override
     {
-
     }

     void finalize() override
     {
         if (!keep_all_batch_data)
         {
+            std::vector<String> total_removed_paths;
+            total_removed_paths.reserve(objects_to_remove_by_path.size());
+
             StoredObjects remove_from_remote;
             for (auto && [local_path, objects_to_remove] : objects_to_remove_by_path)
             {
-                if (!file_names_remove_metadata_only.contains(fs::path(local_path).filename()))
+                chassert(!file_names_remove_metadata_only.contains(local_path));
+                if (objects_to_remove.unlink_outcome->num_hardlinks == 0)
                 {
-                    if (objects_to_remove.unlink_outcome->num_hardlinks == 0)
-                        std::move(objects_to_remove.objects.begin(), objects_to_remove.objects.end(), std::back_inserter(remove_from_remote));
+                    std::move(objects_to_remove.objects.begin(), objects_to_remove.objects.end(), std::back_inserter(remove_from_remote));
+                    total_removed_paths.push_back(local_path);
                 }
             }
+
             /// Read comment inside RemoveObjectStorageOperation class
             /// TL;DR Don't pay any attention to 404 status code
             object_storage.removeObjectsIfExist(remove_from_remote);
+
+            LOG_DEBUG(
+                &Poco::Logger::get("RemoveRecursiveObjectStorageOperation"),
+                "Recursively removed path {}: "
+                "metadata and objects were removed for [{}], "
+                "only metadata was removed for [{}].",
+                path,
+                boost::algorithm::join(total_removed_paths, ", "),
+                boost::algorithm::join(file_names_remove_metadata_only, ", "));
         }
     }
 };
diff --git a/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp b/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp
index bbfc6609079f..0d9670efebe1 100644
--- a/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp
+++ b/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp
@@ -320,7 +320,7 @@ void S3ObjectStorage::removeObjectImpl(const StoredObject & object, bool if_exis

     throwIfUnexpectedError(outcome, if_exists);

-    LOG_TRACE(log, "Object with path {} was removed from S3", object.remote_path);
+    LOG_DEBUG(log, "Object with path {} was removed from S3", object.remote_path);
 }

 void S3ObjectStorage::removeObjectsImpl(const StoredObjects & objects, bool if_exists)
@@ -368,7 +368,7 @@ void S3ObjectStorage::removeObjectsImpl(const StoredObjects & objects, bool if_e

             throwIfUnexpectedError(outcome, if_exists);

-            LOG_TRACE(log, "Objects with paths [{}] were removed from S3", keys);
+            LOG_DEBUG(log, "Objects with paths [{}] were removed from S3", keys);
         }
     }
 }
diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp
index cabdf67a3152..d1be8bd249d0 100644
--- a/src/Storages/StorageReplicatedMergeTree.cpp
+++ b/src/Storages/StorageReplicatedMergeTree.cpp
@@ -1593,6 +1593,7 @@ MergeTreeData::DataPartsVector StorageReplicatedMergeTree::checkPartChecksumsAnd
     while (true)
     {
         LOG_DEBUG(log, "Committing part {} to zookeeper", part->name);
+
         Coordination::Requests ops;
         NameSet absent_part_paths_on_replicas;

@@ -8788,6 +8789,14 @@ void StorageReplicatedMergeTree::getLockSharedDataOps(
     {
         String zookeeper_node = fs::path(zc_zookeeper_path) / id / replica_name;

+        if (!path_to_set_hardlinked_files.empty() && !hardlinks.empty())
+        {
+            LOG_DEBUG(log, "Locking shared node {} with hardlinks from the other shared node {}, "
+                           "hardlinks: [{}]",
+                      zookeeper_node, path_to_set_hardlinked_files,
+                      boost::algorithm::join(hardlinks, ","));
+        }
+
         getZeroCopyLockNodeCreateOps(
             zookeeper, zookeeper_node, requests, zkutil::CreateMode::Persistent,
             replace_existing_lock, path_to_set_hardlinked_files, hardlinks);
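The crux of the C++ changes above: `unlinkMetadata` reports how many hardlinks still reference a metadata file, and remote blobs are deleted only by whoever drops the last link; excluded paths never even enter the map. A minimal Python sketch of that bookkeeping (all names are illustrative stand-ins, not the ClickHouse API):

from dataclasses import dataclass


@dataclass
class UnlinkOutcome:
    # hardlinks still referencing the metadata file after the unlink
    num_hardlinks: int


@dataclass
class ObjectsToRemove:
    # remote blob keys listed from the metadata before unlinking
    objects: list
    unlink_outcome: UnlinkOutcome


def finalize(objects_to_remove_by_path, keep_all_batch_data):
    """Return the remote keys that are safe to delete."""
    if keep_all_batch_data:
        return []
    remove_from_remote = []
    for local_path, to_remove in objects_to_remove_by_path.items():
        # only the holder of the last hardlink may drop the shared blobs
        if to_remove.unlink_outcome.num_hardlinks == 0:
            remove_from_remote.extend(to_remove.objects)
    return remove_from_remote


# Two metadata files hardlinked to one blob: unlinking the first leaves
# num_hardlinks == 1 and deletes nothing; the second unlink reaches zero.
assert finalize({"a.bin": ObjectsToRemove(["blob1"], UnlinkOutcome(1))}, False) == []
assert finalize({"a.bin": ObjectsToRemove(["blob1"], UnlinkOutcome(0))}, False) == ["blob1"]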
",")); + } + getZeroCopyLockNodeCreateOps( zookeeper, zookeeper_node, requests, zkutil::CreateMode::Persistent, replace_existing_lock, path_to_set_hardlinked_files, hardlinks); diff --git a/tests/integration/test_replicated_zero_copy_projection_mutation/__init__.py b/tests/integration/test_replicated_zero_copy_projection_mutation/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/tests/integration/test_replicated_zero_copy_projection_mutation/configs/config.d/storage_conf.xml b/tests/integration/test_replicated_zero_copy_projection_mutation/configs/config.d/storage_conf.xml new file mode 100644 index 000000000000..44d043b944f4 --- /dev/null +++ b/tests/integration/test_replicated_zero_copy_projection_mutation/configs/config.d/storage_conf.xml @@ -0,0 +1,28 @@ + + + + + + s3 + http://minio1:9001/root/data/ + minio + minio123 + true + + + + + +
+                        <disk>s3</disk>
+                    </main>
+                </volumes>
+            </s3>
+        </policies>
+
+    </storage_configuration>
+
+    <merge_tree>
+        <allow_remote_fs_zero_copy_replication>1</allow_remote_fs_zero_copy_replication>
+    </merge_tree>
+</clickhouse>
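The config above wires a MinIO-backed `s3` disk into a single-volume `s3` storage policy and turns on zero-copy replication, which is what makes replicas share blobs via hardlink-counted locks. A hedged sketch of how a test could confirm the wiring from SQL (`system.storage_policies` and `system.merge_tree_settings` are real system tables; `node` stands for a ClickHouseCluster instance):

# Sanity checks for the storage configuration; `node` is a cluster instance.
def check_s3_policy_wiring(node):
    # the 's3' policy must expose the 's3' disk
    disks = node.query(
        "SELECT disks FROM system.storage_policies WHERE policy_name = 's3'"
    ).strip()
    assert "s3" in disks

    # zero-copy replication must be enabled server-wide for MergeTree tables
    zero_copy = node.query(
        "SELECT value FROM system.merge_tree_settings"
        " WHERE name = 'allow_remote_fs_zero_copy_replication'"
    ).strip()
    assert zero_copy == "1"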
diff --git a/tests/integration/test_replicated_zero_copy_projection_mutation/configs/config.d/users.xml b/tests/integration/test_replicated_zero_copy_projection_mutation/configs/config.d/users.xml
new file mode 100644
index 000000000000..246de9ecb961
--- /dev/null
+++ b/tests/integration/test_replicated_zero_copy_projection_mutation/configs/config.d/users.xml
@@ -0,0 +1,7 @@
+<clickhouse>
+    <profiles>
+        <default>
+            <allow_drop_detached>1</allow_drop_detached>
+        </default>
+    </profiles>
+</clickhouse>
diff --git a/tests/integration/test_replicated_zero_copy_projection_mutation/test.py b/tests/integration/test_replicated_zero_copy_projection_mutation/test.py
new file mode 100644
index 000000000000..1b68aac08a7d
--- /dev/null
+++ b/tests/integration/test_replicated_zero_copy_projection_mutation/test.py
@@ -0,0 +1,355 @@
+import logging
+import time
+from contextlib import contextmanager
+import pathlib
+
+import pytest
+
+from helpers.mock_servers import start_s3_mock
+from helpers.cluster import ClickHouseCluster
+from helpers.test_tools import assert_eq_with_retry
+
+
+def args_to_dict(**kwargs):
+    return kwargs
+
+
+@pytest.fixture(scope="module")
+def cluster():
+    try:
+        cluster = ClickHouseCluster(__file__)
+
+        kwargs = args_to_dict(
+            main_configs=[
+                "configs/config.d/storage_conf.xml",
+            ],
+            user_configs=[
+                "configs/config.d/users.xml",
+            ],
+            with_minio=True,
+            with_zookeeper=True,
+            stay_alive=True,
+        )
+
+        cluster.add_instance("node1", **kwargs)
+        cluster.add_instance("node2", **kwargs)
+
+        cluster.start()
+        yield cluster
+
+    finally:
+        cluster.shutdown()
+
+
+@pytest.fixture(scope="module")
+def all_cluster_nodes(cluster):
+    yield cluster.instances.values()
+
+
+@pytest.fixture(scope="module")
+def first_cluster_node(cluster):
+    yield cluster.instances["node1"]
+
+
+@pytest.fixture(scope="module")
+def second_cluster_node(cluster):
+    yield cluster.instances["node2"]
+
+
+@pytest.fixture(scope="module")
+def init_broken_s3(cluster):
+    yield start_s3_mock(cluster, "broken_s3", "8081")
+
+
+@pytest.fixture(scope="function")
+def broken_s3(init_broken_s3):
+    init_broken_s3.reset()
+    yield init_broken_s3
+
+
+def list_objects(cluster, path="data/", hint="list_objects"):
+    minio = cluster.minio_client
+    objects = list(minio.list_objects(cluster.minio_bucket, path, recursive=True))
+    names = [x.object_name for x in objects]
+    names.sort()
+    logging.info(f"{hint} ({len(objects)}): {names}")
+    return names
+
+
+def wait_for_delete_s3_objects(cluster, expected, timeout=30):
+    while timeout > 0:
+        if len(list_objects(cluster, "data/")) == expected:
+            return
+        timeout -= 1
+        time.sleep(1)
+    final_listing = list_objects(cluster, "data/")
+    assert len(final_listing) == expected, ",".join(final_listing)
+
+
+def remove_all_s3_objects(cluster):
+    minio = cluster.minio_client
+    for obj in list_objects(cluster, "data/"):
+        minio.remove_object(cluster.minio_bucket, obj)
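+
+
+# NOTE: the helpers above poll the recursive MinIO listing under data/ once per
+# second, so a wait compares the whole bucket prefix against the expected
+# count; any stray object shows up in the assertion message via the joined
+# final listing.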
+
+
+@pytest.fixture(autouse=True, scope="function")
+def clear_minio(cluster):
+    try:
+        # ClickHouse writes some objects to S3 at startup, for example the file
+        # data/clickhouse_access_check_{server_uuid}. The 10 sec timeout is
+        # there to resolve the race with the removal of that file.
+        wait_for_delete_s3_objects(cluster, 0, timeout=10)
+    except Exception:
+        # remove extra objects to prevent cascading failures of the tests
+        remove_all_s3_objects(cluster)
+
+    yield
+
+
+@contextmanager
+def drop_table_guard(nodes, table):
+    for node in nodes:
+        node.query(f"DROP TABLE IF EXISTS {table} SYNC")
+    try:
+        yield
+    finally:
+        for node in nodes:
+            node.query(f"DROP TABLE IF EXISTS {table} SYNC")
+
+
+def test_all_projection_files_are_dropped_when_part_is_dropped(
+    cluster, first_cluster_node
+):
+    node = first_cluster_node
+
+    with drop_table_guard([node], "test_all_projection_files_are_dropped"):
+        node.query(
+            """
+            CREATE TABLE test_all_projection_files_are_dropped(a UInt32, b UInt32)
+            ENGINE MergeTree()
+            ORDER BY a
+            SETTINGS storage_policy='s3', old_parts_lifetime=0
+            """
+        )
+
+        objects_empty_table = list_objects(cluster)
+
+        node.query(
+            "ALTER TABLE test_all_projection_files_are_dropped ADD projection b_order (SELECT a, b ORDER BY b)"
+        )
+        node.query(
+            "ALTER TABLE test_all_projection_files_are_dropped MATERIALIZE projection b_order"
+        )
+
+        node.query(
+            """
+            INSERT INTO test_all_projection_files_are_dropped
+            VALUES (1, 105), (5, 101), (3, 103), (4, 102), (2, 104)
+            """
+        )
+
+        node.query(
+            "ALTER TABLE test_all_projection_files_are_dropped DROP PARTITION ID 'all'"
+        )
+
+        objects_at_the_end = list_objects(cluster)
+        assert objects_at_the_end == objects_empty_table
+
+
+def test_hardlinks_preserved_when_projection_dropped(
+    cluster, all_cluster_nodes, first_cluster_node, second_cluster_node
+):
+    with drop_table_guard(
+        all_cluster_nodes, "test_hardlinks_preserved_when_projection_dropped"
+    ):
+        create_query = """
+            CREATE TABLE test_hardlinks_preserved_when_projection_dropped
+            (
+                a UInt32,
+                b UInt32,
+                c UInt32,
+                PROJECTION projection_order_by_b
+                (
+                    SELECT a, b ORDER BY b
+                )
+            )
+            ENGINE ReplicatedMergeTree('/clickhouse/tables/test_projection', '{instance}')
+            ORDER BY a
+        """
+
+        first_node_settings = """
+            SETTINGS
+                storage_policy='s3',
+                old_parts_lifetime=0
+        """
+
+        # a big old_parts_lifetime value makes the second node keep the outdated
+        # part for us; later we corrupt it so it is detached as broken-on-start
+        second_node_settings = """
+            SETTINGS
+                storage_policy='s3',
+                old_parts_lifetime=10000
+        """
+
+        first_cluster_node.query(create_query + first_node_settings)
+        second_cluster_node.query(create_query + second_node_settings)
+
+        objects_empty_table = list_objects(cluster)
+
+        first_cluster_node.query("SYSTEM FLUSH LOGS")
+        table_uuid = first_cluster_node.query(
+            """
+            SELECT uuid FROM system.tables
+            WHERE name = 'test_hardlinks_preserved_when_projection_dropped'
+            """
+        ).strip()
+
+        first_cluster_node.query(
+            """
+            INSERT INTO test_hardlinks_preserved_when_projection_dropped
+            VALUES (1, 105, 1), (5, 101, 1), (3, 103, 1), (4, 102, 1), (2, 104, 1)
+            """
+        )
+
+        # second_cluster_node will fetch the mutated part when it is ready on first_cluster_node
+        second_cluster_node.query("SYSTEM STOP MERGES")
+
+        first_cluster_node.query(
+            """
+            ALTER TABLE test_hardlinks_preserved_when_projection_dropped
+            UPDATE c = 2 WHERE c = 1
+            """,
+            settings={"mutations_sync": "1"},
+        )
+
+        assert_eq_with_retry(
+            first_cluster_node, "SELECT COUNT() FROM system.replication_queue", "0"
+        )
+
+        # the mutated part is ready on first_cluster_node, the second replica just fetches it
+        second_cluster_node.query("SYSTEM START MERGES")
+
+        # wait until the first node has removed the outdated part
+        assert_eq_with_retry(
+            first_cluster_node,
+            """
+            SELECT removal_state FROM system.parts
+            WHERE name = 'all_0_0_0'
+                AND table = 'test_hardlinks_preserved_when_projection_dropped'
+                AND not active
+            """,
+            "",
+            retry_count=300,
+            sleep_time=1,
+        )
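+
+        # zero-copy locks live in ZooKeeper under
+        # /clickhouse/zero_copy/zero_copy_s3/{table_uuid}; the value stored on
+        # the old part's lock node lists the files that the mutated part
+        # reuses via hardlinks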
+
+        # make sure the ALTER UPDATE created hardlinks inside the projection
+        hardlinks = (
+            first_cluster_node.query(
+                f"""
+                SELECT value
+                FROM system.zookeeper
+                WHERE
+                    path like '/clickhouse/zero_copy/zero_copy_s3/{table_uuid}' AND name = 'all_0_0_0'
+                """,
+                settings={"allow_unrestricted_reads_from_keeper": "1"},
+            )
+            .strip()
+            .split()
+        )
+        assert len(hardlinks) > 0, ",".join(hardlinks)
+        assert any(["proj/" in x for x in hardlinks]), ",".join(hardlinks)
+
+        part_path_on_second_node = second_cluster_node.query(
+            """
+            SELECT path FROM system.parts
+            WHERE
+                name = 'all_0_0_0' AND table = 'test_hardlinks_preserved_when_projection_dropped'
+            """
+        ).strip()
+
+        # corrupt the outdated part all_0_0_0
+        script = (
+            f"INDEX_FILE={part_path_on_second_node}/primary.cidx"
+            """
+            cp $INDEX_FILE $INDEX_FILE.backup
+            echo "unexpected data in metadata file" | cat > $INDEX_FILE
+            """
+        )
+        second_cluster_node.exec_in_container(["bash", "-c", script])
+
+        # the corrupted outdated part all_0_0_0 is detached as broken-on-start
+        second_cluster_node.restart_clickhouse()
+
+        second_cluster_node.query(
+            "SYSTEM WAIT LOADING PARTS test_hardlinks_preserved_when_projection_dropped"
+        )
+
+        second_cluster_node.query("SYSTEM FLUSH LOGS")
+
+        # make sure there is an outdated broken-on-start part
+        broken_parts = (
+            second_cluster_node.query(
+                """
+                SELECT name, reason, path FROM system.detached_parts
+                WHERE
+                    table = 'test_hardlinks_preserved_when_projection_dropped'
+                """
+            )
+            .strip()
+            .split("\n")
+        )
+        assert len(broken_parts) == 1, broken_parts
+        # unpacked this way to satisfy the black formatter
+        broken_part_name, reason, broken_part_path_on_second_node = broken_parts[
+            0
+        ].split("\t")
+        assert "broken-on-start" == reason
+
+        script = (
+            f"INDEX_FILE={broken_part_path_on_second_node}/primary.cidx"
+            """
+            mv $INDEX_FILE.backup $INDEX_FILE
+            """
+        )
+        second_cluster_node.exec_in_container(["bash", "-c", script])
+
+        # when a detached part is removed, removeSharedRecursive is called
+        second_cluster_node.query(
+            f"""
+            ALTER TABLE test_hardlinks_preserved_when_projection_dropped
+            DROP DETACHED PART '{broken_part_name}'
+            """,
+            settings={"allow_drop_detached": "1"},
+        )
+
+        # CHECK TABLE is an easy way to read all the data in the part;
+        # "0" means corrupted, see https://clickhouse.com/docs/en/sql-reference/statements/check-table
+        assert (
+            "1"
+            == first_cluster_node.query(
+                """
+                CHECK TABLE test_hardlinks_preserved_when_projection_dropped
+                """
+            ).strip()
+        )
+
+        assert (
+            "1"
+            == second_cluster_node.query(
+                """
+                CHECK TABLE test_hardlinks_preserved_when_projection_dropped
+                """
+            ).strip()
+        )
+
+        second_cluster_node.query(
+            f"""
+            ALTER TABLE test_hardlinks_preserved_when_projection_dropped
+            DROP PART 'all_0_0_0_1'
+            """,
+            settings={"alter_sync": 2},
+        )
+
+        wait_for_delete_s3_objects(cluster, len(objects_empty_table))
+
+        objects_at_the_end = list_objects(cluster)
+        assert objects_at_the_end == objects_empty_table
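The scenario above is exactly the case the relative-path keying in RemoveRecursiveObjectStorageOperation protects: when the detached part is dropped, removeSharedRecursive must skip files that live under a projection subdirectory and are still hardlinked into the mutated part. A toy illustration of why matching by relative path (the new behaviour) differs from matching by bare filename, with pathlib standing in for std::filesystem (paths here are made up for the example):

from pathlib import PurePosixPath

root = PurePosixPath("store/abc/all_0_0_0")
inner = PurePosixPath("store/abc/all_0_0_0/projection_order_by_b.proj/primary.cidx")

# the operation now keys entries by the path relative to its root ...
rel = inner.relative_to(root)
assert str(rel) == "projection_order_by_b.proj/primary.cidx"

# ... so an exclusion set naming a file inside a projection matches it,
# while a bare-filename match cannot distinguish the copies:
exclusions = {"projection_order_by_b.proj/primary.cidx"}
assert str(rel) in exclusions
assert inner.name not in exclusions  # "primary.cidx" alone does not match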