diff --git a/src/Common/ZooKeeper/ZooKeeper.cpp b/src/Common/ZooKeeper/ZooKeeper.cpp
index 54a2e2dc5193..09047b5b2325 100644
--- a/src/Common/ZooKeeper/ZooKeeper.cpp
+++ b/src/Common/ZooKeeper/ZooKeeper.cpp
@@ -342,6 +342,35 @@ void ZooKeeper::createAncestors(const std::string & path)
     }
 }
 
+/// Appends to `requests` a persistent-create request for every ancestor of `path`
+/// for which the existence check does not return ZOK. Nothing is created here.
+void ZooKeeper::checkExistsAndGetCreateAncestorsOps(const std::string & path, Coordination::Requests & requests)
+{
+    std::vector<std::string> paths_to_check;
+    /// Start searching at index 1 so that a leading '/' does not yield an empty ancestor.
+    size_t pos = 1;
+    while (true)
+    {
+        pos = path.find('/', pos);
+        if (pos == std::string::npos)
+            break;
+        paths_to_check.emplace_back(path.substr(0, pos));
+        ++pos;
+    }
+
+    /// Check all ancestors with a single `exists` call.
+    MultiExistsResponse response = exists(paths_to_check);
+
+    for (size_t i = 0; i < paths_to_check.size(); ++i)
+    {
+        if (response[i].error != Coordination::Error::ZOK)
+        {
+            /// Ephemeral nodes cannot have children
+            requests.emplace_back(makeCreateRequest(paths_to_check[i], "", CreateMode::Persistent));
+        }
+    }
+}
+
 Coordination::Error ZooKeeper::removeImpl(const std::string & path, int32_t version)
 {
     auto future_result = asyncTryRemoveNoThrow(path, version);
diff --git a/src/Common/ZooKeeper/ZooKeeper.h b/src/Common/ZooKeeper/ZooKeeper.h
index 636c9049af07..ca6a44c4cbc7 100644
--- a/src/Common/ZooKeeper/ZooKeeper.h
+++ b/src/Common/ZooKeeper/ZooKeeper.h
@@ -237,6 +237,10 @@ class ZooKeeper
     /// Does not create the node itself.
     void createAncestors(const std::string & path);
 
+    /// Checks which ancestors of `path` exist and appends a create request for each missing one
+    /// to `requests`. Nothing is created here; the caller is expected to execute the requests.
+    void checkExistsAndGetCreateAncestorsOps(const std::string & path, Coordination::Requests & requests);
+
     /// Remove the node if the version matches. (if version == -1, remove any version).
     void remove(const std::string & path, int32_t version = -1);
 
@@ -522,8 +526,6 @@ class ZooKeeper
     void setServerCompletelyStarted();
 
 private:
-    friend class EphemeralNodeHolder;
-
     void init(ZooKeeperArgs args_);
 
     /// The following methods don't any throw exceptions but return error codes.
diff --git a/src/Common/ZooKeeper/ZooKeeperWithFaultInjection.h b/src/Common/ZooKeeper/ZooKeeperWithFaultInjection.h
index da65f9d96b08..a0b8527f4805 100644
--- a/src/Common/ZooKeeper/ZooKeeperWithFaultInjection.h
+++ b/src/Common/ZooKeeper/ZooKeeperWithFaultInjection.h
@@ -361,6 +361,11 @@ class ZooKeeperWithFaultInjection
         return access("trySet", path, [&]() { return keeper->trySet(path, data, version, stat); });
     }
 
+    /// Forwards to `keeper` through `access`, like the neighbouring wrappers.
+    void checkExistsAndGetCreateAncestorsOps(const std::string & path, Coordination::Requests & requests)
+    {
+        return access("checkExistsAndGetCreateAncestorsOps", path, [&]() { return keeper->checkExistsAndGetCreateAncestorsOps(path, requests); });
+    }
 
     void handleEphemeralNodeExistenceNoFailureInjection(const std::string & path, const std::string & fast_delete_if_equal_value)
     {
diff --git a/src/Storages/MergeTree/DataPartStorageOnDiskBase.cpp b/src/Storages/MergeTree/DataPartStorageOnDiskBase.cpp
index e8f79a785fde..ec00cc3d2b9f 100644
--- a/src/Storages/MergeTree/DataPartStorageOnDiskBase.cpp
+++ b/src/Storages/MergeTree/DataPartStorageOnDiskBase.cpp
@@ -578,6 +578,9 @@ void DataPartStorageOnDiskBase::remove(
             if (e.code() == ErrorCodes::FILE_DOESNT_EXIST)
             {
                 LOG_ERROR(log, "Directory {} (part to remove) doesn't exist or one of nested files has gone. Most likely this is due to manual removing. This should be discouraged. Ignoring.", fullPath(disk, from));
+                /// We will never touch this part again, so unlocking it from zero-copy
+                if (!can_remove_description)
+                    can_remove_description.emplace(can_remove_callback());
                 return;
             }
             throw;
@@ -588,6 +591,10 @@ void DataPartStorageOnDiskBase::remove(
             {
                 LOG_ERROR(log, "Directory {} (part to remove) doesn't exist or one of nested files has gone. "
                           "Most likely this is due to manual removing. This should be discouraged. Ignoring.", fullPath(disk, from));
+                /// We will never touch this part again, so unlocking it from zero-copy
+                if (!can_remove_description)
+                    can_remove_description.emplace(can_remove_callback());
+
                 return;
             }
             throw;
diff --git a/src/Storages/MergeTree/DataPartsExchange.cpp b/src/Storages/MergeTree/DataPartsExchange.cpp
index 46c6d09eca47..0e707a821354 100644
--- a/src/Storages/MergeTree/DataPartsExchange.cpp
+++ b/src/Storages/MergeTree/DataPartsExchange.cpp
@@ -821,6 +821,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDisk(
     const auto data_settings = data.getSettings();
     MergeTreeData::DataPart::Checksums data_checksums;
 
+    zkutil::EphemeralNodeHolderPtr zero_copy_temporary_lock_holder;
     if (to_remote_disk)
     {
         readStringBinary(part_id, in);
@@ -829,7 +830,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDisk(
             throw Exception(ErrorCodes::ZERO_COPY_REPLICATION_ERROR, "Part {} unique id {} doesn't exist on {} (with type {}).", part_name, part_id, disk->getName(), toString(disk->getDataSourceDescription().type));
 
         LOG_DEBUG(log, "Downloading part {} unique id {} metadata onto disk {}.", part_name, part_id, disk->getName());
-        data.lockSharedDataTemporary(part_name, part_id, disk);
+        zero_copy_temporary_lock_holder = data.lockSharedDataTemporary(part_name, part_id, disk);
     }
     else
     {
@@ -938,7 +939,6 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDisk(
 
     if (to_remote_disk)
     {
-        data.lockSharedData(*new_data_part, /* replace_existing_lock = */ true, {});
         LOG_DEBUG(log, "Download of part {} unique id {} metadata onto disk {} finished.", part_name, part_id, disk->getName());
     }
     else
@@ -948,6 +948,11 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDisk(
         LOG_DEBUG(log, "Download of part {} onto disk {} finished.", part_name, disk->getName());
     }
 
+    /// NOTE(review): presumably this keeps the ephemeral lock in ZooKeeper when the holder is destroyed,
+    /// so the part stays locked until it is committed with a persistent lock - confirm against EphemeralNodeHolder.
+    if (zero_copy_temporary_lock_holder)
+        zero_copy_temporary_lock_holder->setAlreadyRemoved();
+
     return new_data_part;
 }
 
diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp
index e2f41f9b4545..20839a61c92d 100644
--- a/src/Storages/StorageReplicatedMergeTree.cpp
+++ b/src/Storages/StorageReplicatedMergeTree.cpp
@@ -1473,16 +1473,18 @@ void StorageReplicatedMergeTree::checkPartChecksumsAndAddCommitOps(const zkutil:
 }
 
 MergeTreeData::DataPartsVector StorageReplicatedMergeTree::checkPartChecksumsAndCommit(Transaction & transaction,
-    const MutableDataPartPtr & part, std::optional<HardlinkedFiles> hardlinked_files)
+    const MutableDataPartPtr & part, std::optional<HardlinkedFiles> hardlinked_files, bool replace_zero_copy_lock)
 {
     auto zookeeper = getZooKeeper();
 
     while (true)
     {
+        LOG_DEBUG(log, "Committing part {} to zookeeper", part->name);
         Coordination::Requests ops;
         NameSet absent_part_paths_on_replicas;
 
-        lockSharedData(*part, false, hardlinked_files);
+        getLockSharedDataOps(*part, std::make_shared<ZooKeeperWithFaultInjection>(zookeeper), replace_zero_copy_lock, hardlinked_files, ops);
+        size_t zero_copy_lock_ops_size = ops.size();
 
         /// Checksums are checked here and `ops` is filled. In fact, the part is added to ZK just below, when executing `multi`.
         checkPartChecksumsAndAddCommitOps(zookeeper, part, ops, part->name, &absent_part_paths_on_replicas);
@@ -1510,11 +1512,14 @@ MergeTreeData::DataPartsVector StorageReplicatedMergeTree::checkPartChecksumsAnd
         Coordination::Responses responses;
         Coordination::Error e = zookeeper->tryMulti(ops, responses);
         if (e == Coordination::Error::ZOK)
+        {
+            LOG_DEBUG(log, "Part {} committed to zookeeper", part->name);
             return transaction.commit();
+        }
 
         if (e == Coordination::Error::ZNODEEXISTS)
         {
-            size_t num_check_ops = 2 * absent_part_paths_on_replicas.size();
+            size_t num_check_ops = 2 * absent_part_paths_on_replicas.size() + zero_copy_lock_ops_size;
             size_t failed_op_index = zkutil::getFailedOpIndex(e, responses);
             if (failed_op_index < num_check_ops)
             {
@@ -4165,7 +4170,7 @@ bool StorageReplicatedMergeTree::fetchPart(
             Transaction transaction(*this, NO_TRANSACTION_RAW);
             renameTempPartAndReplace(part, transaction);
 
-            replaced_parts = checkPartChecksumsAndCommit(transaction, part, hardlinked_files);
+            replaced_parts = checkPartChecksumsAndCommit(transaction, part, hardlinked_files, !part_to_clone);
 
             /** If a quorum is tracked for this part, you must update it.
               * If you do not have time, in case of losing the session, when you restart the server - see the `ReplicatedMergeTreeRestartingThread::updateQuorumIfWeHavePart` method.
@@ -8116,31 +8121,31 @@ std::optional StorageReplicatedMergeTree::tryGetTableSharedIDFromCreateQ
 }
 
 
-void StorageReplicatedMergeTree::lockSharedDataTemporary(const String & part_name, const String & part_id, const DiskPtr & disk) const
+zkutil::EphemeralNodeHolderPtr StorageReplicatedMergeTree::lockSharedDataTemporary(const String & part_name, const String & part_id, const DiskPtr & disk) const
 {
     auto settings = getSettings();
 
     if (!disk || !disk->supportZeroCopyReplication() || !settings->allow_remote_fs_zero_copy_replication)
-        return;
+        return {};
 
     zkutil::ZooKeeperPtr zookeeper = tryGetZooKeeper();
     if (!zookeeper)
-        return;
+        return {};
 
     String id = part_id;
     boost::replace_all(id, "/", "_");
 
-    Strings zc_zookeeper_paths = getZeroCopyPartPath(*getSettings(), toString(disk->getDataSourceDescription().type), getTableSharedID(),
-        part_name, zookeeper_path);
+    String zc_zookeeper_path = getZeroCopyPartPath(*getSettings(), toString(disk->getDataSourceDescription().type), getTableSharedID(),
+        part_name, zookeeper_path)[0];
 
-    for (const auto & zc_zookeeper_path : zc_zookeeper_paths)
-    {
-        String zookeeper_node = fs::path(zc_zookeeper_path) / id / replica_name;
+    String zookeeper_node = fs::path(zc_zookeeper_path) / id / replica_name;
 
-        LOG_TRACE(log, "Set zookeeper temporary ephemeral lock {}", zookeeper_node);
-        createZeroCopyLockNode(
-            std::make_shared<ZooKeeperWithFaultInjection>(zookeeper), zookeeper_node, zkutil::CreateMode::Ephemeral, false);
-    }
+    LOG_TRACE(log, "Set zookeeper temporary ephemeral lock {}", zookeeper_node);
+    createZeroCopyLockNode(
+        std::make_shared<ZooKeeperWithFaultInjection>(zookeeper), zookeeper_node, zkutil::CreateMode::Ephemeral, false);
+
+    LOG_TRACE(log, "Zookeeper temporary ephemeral lock {} created", zookeeper_node);
+    return zkutil::EphemeralNodeHolder::existing(zookeeper_node, *zookeeper);
 }
 
 void StorageReplicatedMergeTree::lockSharedData(
@@ -8148,6 +8153,7 @@ void StorageReplicatedMergeTree::lockSharedData(
     bool replace_existing_lock,
     std::optional<HardlinkedFiles> hardlinked_files) const
 {
+    LOG_DEBUG(log, "Trying to create zero-copy lock for part {}", part.name);
     auto zookeeper = tryGetZooKeeper();
     if (zookeeper)
         return lockSharedData(part, std::make_shared<ZooKeeperWithFaultInjection>(zookeeper), replace_existing_lock, hardlinked_files);
@@ -8155,6 +8161,57 @@ void StorageReplicatedMergeTree::lockSharedData(
         return lockSharedData(part, std::make_shared<ZooKeeperWithFaultInjection>(nullptr), replace_existing_lock, hardlinked_files);
 }
 
+/// Appends to `requests` the operations that create this replica's persistent zero-copy lock for `part`,
+/// so the lock can be taken in the same multi-request as the part commit.
+/// Does nothing if zero-copy replication does not apply to the part or `zookeeper` is null.
+void StorageReplicatedMergeTree::getLockSharedDataOps(
+    const IMergeTreeDataPart & part,
+    const ZooKeeperWithFaultInjectionPtr & zookeeper,
+    bool replace_existing_lock,
+    std::optional<HardlinkedFiles> hardlinked_files,
+    Coordination::Requests & requests) const
+{
+    auto settings = getSettings();
+
+    if (!part.isStoredOnDisk() || !settings->allow_remote_fs_zero_copy_replication)
+        return;
+
+    if (!part.getDataPartStorage().supportZeroCopyReplication())
+        return;
+
+    if (zookeeper->isNull())
+        return;
+
+    String id = part.getUniqueId();
+    boost::replace_all(id, "/", "_");
+
+    Strings zc_zookeeper_paths = getZeroCopyPartPath(
+        *settings, part.getDataPartStorage().getDiskType(), getTableSharedID(),
+        part.name, zookeeper_path);
+
+    String path_to_set_hardlinked_files;
+    NameSet hardlinks;
+
+    if (hardlinked_files.has_value() && !hardlinked_files->hardlinks_from_source_part.empty())
+    {
+        path_to_set_hardlinked_files = getZeroCopyPartPath(
+            *settings, part.getDataPartStorage().getDiskType(), hardlinked_files->source_table_shared_id,
+            hardlinked_files->source_part_name, zookeeper_path)[0];
+
+        hardlinks = hardlinked_files->hardlinks_from_source_part;
+    }
+
+    for (const auto & zc_zookeeper_path : zc_zookeeper_paths)
+    {
+        String zookeeper_node = fs::path(zc_zookeeper_path) / id / replica_name;
+
+        getZeroCopyLockNodeCreateOps(
+            zookeeper, zookeeper_node, requests, zkutil::CreateMode::Persistent,
+            replace_existing_lock, path_to_set_hardlinked_files, hardlinks);
+    }
+}
+
+
 void StorageReplicatedMergeTree::lockSharedData(
     const IMergeTreeDataPart & part,
     const ZooKeeperWithFaultInjectionPtr & zookeeper,
@@ -8195,11 +8252,13 @@ void StorageReplicatedMergeTree::lockSharedData(
     {
         String zookeeper_node = fs::path(zc_zookeeper_path) / id / replica_name;
 
-        LOG_TRACE(log, "Set zookeeper persistent lock {}", zookeeper_node);
+        LOG_TRACE(log, "Trying to create zookeeper persistent lock {}", zookeeper_node);
 
         createZeroCopyLockNode(
             zookeeper, zookeeper_node, zkutil::CreateMode::Persistent,
             replace_existing_lock, path_to_set_hardlinked_files, hardlinks);
+
+        LOG_TRACE(log, "Zookeeper persistent lock {} created", zookeeper_node);
     }
 }
 
@@ -8333,6 +8392,7 @@ std::pair getParentLockedBlobs(const ZooKeeperWithFaultInjectionP
     /// all_0_0_0_1
     /// all_0_0_0
     std::sort(parts_infos.begin(), parts_infos.end());
+    std::string part_info_str = part_info.getPartNameV1();
 
     /// In reverse order to process from bigger to smaller
     for (const auto & [parent_candidate_info, part_candidate_info_str] : parts_infos | std::views::reverse)
@@ -8343,7 +8403,7 @@ std::pair getParentLockedBlobs(const ZooKeeperWithFaultInjectionP
         /// We are mutation child of this parent
         if (part_info.isMutationChildOf(parent_candidate_info))
         {
-            LOG_TRACE(log, "Found mutation parent {} for part {}", part_candidate_info_str, part_info.getPartNameV1());
+            LOG_TRACE(log, "Found mutation parent {} for part {}", part_candidate_info_str, part_info_str);
             /// Get hardlinked files
             String files_not_to_remove_str;
             Coordination::Error code;
@@ -8360,6 +8420,7 @@ std::pair getParentLockedBlobs(const ZooKeeperWithFaultInjectionP
             return {true, files_not_to_remove};
         }
     }
+    LOG_TRACE(log, "No mutation parent found for part {}", part_info_str);
     return {false, files_not_to_remove};
 }
 
@@ -8411,6 +8472,10 @@ std::pair StorageReplicatedMergeTree::unlockSharedDataByID(
                 LOG_INFO(logger, "Lock on path {} for part {} doesn't exist, refuse to remove blobs", zookeeper_part_replica_node, part_name);
                 return {false, {}};
             }
+            else
+            {
+                LOG_INFO(logger, "Lock on path {} for part {} doesn't exist, but we don't have mutation parent, can remove blobs", zookeeper_part_replica_node, part_name);
+            }
         }
         else
         {
@@ -8931,6 +8996,49 @@ bool StorageReplicatedMergeTree::createEmptyPartInsteadOfLost(zkutil::ZooKeeperP
     return true;
 }
 
+/// Appends to `requests` the operations needed to create the lock node `zookeeper_node`:
+/// missing ancestors (persistent mode only), removal of an existing node when `replace_existing_lock` is set,
+/// the hardlinks list for `path_to_set_hardlinked_files` and the node itself.
+/// Only reads from ZooKeeper; the caller is expected to execute the requests in one multi-request.
+void StorageReplicatedMergeTree::getZeroCopyLockNodeCreateOps(
+    const ZooKeeperWithFaultInjectionPtr & zookeeper, const String & zookeeper_node, Coordination::Requests & requests,
+    int32_t mode, bool replace_existing_lock,
+    const String & path_to_set_hardlinked_files, const NameSet & hardlinked_files)
+{
+
+    /// Ephemeral locks can be created only when we fetch shared data.
+    /// So it never require to create ancestors. If we create them
+    /// race condition with source replica drop is possible.
+    if (mode == zkutil::CreateMode::Persistent)
+        zookeeper->checkExistsAndGetCreateAncestorsOps(zookeeper_node, requests);
+
+    if (replace_existing_lock && zookeeper->exists(zookeeper_node))
+    {
+        requests.emplace_back(zkutil::makeRemoveRequest(zookeeper_node, -1));
+        requests.emplace_back(zkutil::makeCreateRequest(zookeeper_node, "", mode));
+        if (!path_to_set_hardlinked_files.empty() && !hardlinked_files.empty())
+        {
+            std::string data = boost::algorithm::join(hardlinked_files, "\n");
+            /// List of files used to detect hardlinks. path_to_set_hardlinked_files --
+            /// is a path to source part zero copy node. During part removal hardlinked
+            /// files will be left for source part.
+            requests.emplace_back(zkutil::makeSetRequest(path_to_set_hardlinked_files, data, -1));
+        }
+    }
+    else
+    {
+        if (!path_to_set_hardlinked_files.empty() && !hardlinked_files.empty())
+        {
+            std::string data = boost::algorithm::join(hardlinked_files, "\n");
+            /// List of files used to detect hardlinks. path_to_set_hardlinked_files --
+            /// is a path to source part zero copy node. During part removal hardlinked
+            /// files will be left for source part.
+            requests.emplace_back(zkutil::makeSetRequest(path_to_set_hardlinked_files, data, -1));
+        }
+        requests.emplace_back(zkutil::makeCreateRequest(zookeeper_node, "", mode));
+    }
+}
+
 
 void StorageReplicatedMergeTree::createZeroCopyLockNode(
     const ZooKeeperWithFaultInjectionPtr & zookeeper, const String & zookeeper_node, int32_t mode,
@@ -8942,75 +9050,49 @@ void StorageReplicatedMergeTree::createZeroCopyLockNode(
     bool created = false;
     for (int attempts = 5; attempts > 0; --attempts)
     {
-        try
+        Coordination::Requests ops;
+        Coordination::Responses responses;
+        getZeroCopyLockNodeCreateOps(zookeeper, zookeeper_node, ops, mode, replace_existing_lock, path_to_set_hardlinked_files, hardlinked_files);
+        auto error = zookeeper->tryMulti(ops, responses);
+        if (error == Coordination::Error::ZOK)
         {
-            /// Ephemeral locks can be created only when we fetch shared data.
-            /// So it never require to create ancestors. If we create them
-            /// race condition with source replica drop is possible.
-            if (mode == zkutil::CreateMode::Persistent)
-                zookeeper->createAncestors(zookeeper_node);
+            created = true;
+            break;
+        }
+        else if (mode == zkutil::CreateMode::Persistent)
+        {
+            if (error == Coordination::Error::ZNONODE)
+                continue;
 
-            if (replace_existing_lock && zookeeper->exists(zookeeper_node))
+            if (error == Coordination::Error::ZNODEEXISTS)
             {
-                Coordination::Requests ops;
-                ops.emplace_back(zkutil::makeRemoveRequest(zookeeper_node, -1));
-                ops.emplace_back(zkutil::makeCreateRequest(zookeeper_node, "", mode));
-                if (!path_to_set_hardlinked_files.empty() && !hardlinked_files.empty())
-                {
-                    std::string data = boost::algorithm::join(hardlinked_files, "\n");
-                    /// List of files used to detect hardlinks. path_to_set_hardlinked_files --
-                    /// is a path to source part zero copy node. During part removal hardlinked
-                    /// files will be left for source part.
-                    ops.emplace_back(zkutil::makeSetRequest(path_to_set_hardlinked_files, data, -1));
-                }
-                Coordination::Responses responses;
-                auto error = zookeeper->tryMulti(ops, responses);
-                if (error == Coordination::Error::ZOK)
+                size_t failed_op = zkutil::getFailedOpIndex(error, responses);
+                /// Part was locked before, unfortunately it's possible during moves
+                if (ops[failed_op]->getPath() == zookeeper_node)
                 {
                     created = true;
                     break;
                 }
-                else if (error == Coordination::Error::ZNONODE && mode != zkutil::CreateMode::Persistent)
-                {
-                    throw Exception(ErrorCodes::NOT_FOUND_NODE,
-                        "Cannot create ephemeral zero copy lock {} because part was unlocked from zookeeper", zookeeper_node);
-                }
+                continue;
             }
-            else
+        }
+        else if (mode == zkutil::CreateMode::Ephemeral)
+        {
+            /// It is super rare case when we had part, but it was lost and we were unable to unlock it from keeper.
+            /// Now we are trying to fetch it from other replica and unlocking.
+            if (error == Coordination::Error::ZNODEEXISTS)
             {
-                Coordination::Requests ops;
-                if (!path_to_set_hardlinked_files.empty() && !hardlinked_files.empty())
+                size_t failed_op = zkutil::getFailedOpIndex(error, responses);
+                if (ops[failed_op]->getPath() == zookeeper_node)
                 {
-                    std::string data = boost::algorithm::join(hardlinked_files, "\n");
-                    /// List of files used to detect hardlinks. path_to_set_hardlinked_files --
-                    /// is a path to source part zero copy node. During part removal hardlinked
-                    /// files will be left for source part.
-                    ops.emplace_back(zkutil::makeSetRequest(path_to_set_hardlinked_files, data, -1));
-                }
-                ops.emplace_back(zkutil::makeCreateRequest(zookeeper_node, "", mode));
-
-                Coordination::Responses responses;
-                auto error = zookeeper->tryMulti(ops, responses);
-                if (error == Coordination::Error::ZOK || error == Coordination::Error::ZNODEEXISTS)
-                {
-                    created = true;
-                    break;
-                }
-                else if (error == Coordination::Error::ZNONODE && mode != zkutil::CreateMode::Persistent)
-                {
-                    /// Ephemeral locks used during fetches so if parent node was removed we cannot do anything
-                    throw Exception(ErrorCodes::NOT_FOUND_NODE,
-                        "Cannot create ephemeral zero copy lock {} because part was unlocked from zookeeper", zookeeper_node);
+                    LOG_WARNING(&Poco::Logger::get("ZeroCopyLocks"), "Replacing persistent lock with ephemeral for path {}. It can happen only in case of local part loss", zookeeper_node);
+                    replace_existing_lock = true;
+                    continue;
                 }
             }
         }
-        catch (const zkutil::KeeperException & e)
-        {
-            if (e.code == Coordination::Error::ZNONODE)
-                continue;
 
-            throw;
-        }
+        zkutil::KeeperMultiException::check(error, ops, responses);
     }
 
     if (!created)
diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h
index ade4e4f0b4bc..a148618744c6 100644
--- a/src/Storages/StorageReplicatedMergeTree.h
+++ b/src/Storages/StorageReplicatedMergeTree.h
@@ -252,7 +252,15 @@ class StorageReplicatedMergeTree final : public MergeTreeData
         bool replace_existing_lock,
         std::optional<HardlinkedFiles> hardlinked_files) const;
 
-    void lockSharedDataTemporary(const String & part_name, const String & part_id, const DiskPtr & disk) const;
+    /// Collects the ZooKeeper operations that lock the shared part into `requests` without executing them.
+    void getLockSharedDataOps(
+        const IMergeTreeDataPart & part,
+        const ZooKeeperWithFaultInjectionPtr & zookeeper,
+        bool replace_existing_lock,
+        std::optional<HardlinkedFiles> hardlinked_files,
+        Coordination::Requests & requests) const;
+
+    zkutil::EphemeralNodeHolderPtr lockSharedDataTemporary(const String & part_name, const String & part_id, const DiskPtr & disk) const;
 
     /// Unlock shared data part in zookeeper
     /// Return true if data unlocked
@@ -542,7 +550,7 @@ class StorageReplicatedMergeTree final : public MergeTreeData
     String getChecksumsForZooKeeper(const MergeTreeDataPartChecksums & checksums) const;
 
     /// Accepts a PreActive part, atomically checks its checksums with ones on other replicas and commit the part
-    DataPartsVector checkPartChecksumsAndCommit(Transaction & transaction, const MutableDataPartPtr & part, std::optional<HardlinkedFiles> hardlinked_files = {});
+    DataPartsVector checkPartChecksumsAndCommit(Transaction & transaction, const MutableDataPartPtr & part, std::optional<HardlinkedFiles> hardlinked_files = {}, bool replace_zero_copy_lock = false);
 
     bool partIsAssignedToBackgroundOperation(const DataPartPtr & part) const override;
 
@@ -861,6 +869,13 @@ class StorageReplicatedMergeTree final : public MergeTreeData
         int32_t mode = zkutil::CreateMode::Persistent, bool replace_existing_lock = false,
         const String & path_to_set_hardlinked_files = "", const NameSet & hardlinked_files = {});
 
+    /// Collects the operations that create a zero-copy lock node into `requests` without executing them.
+    static void getZeroCopyLockNodeCreateOps(
+        const ZooKeeperWithFaultInjectionPtr & zookeeper, const String & zookeeper_node, Coordination::Requests & requests,
+        int32_t mode = zkutil::CreateMode::Persistent, bool replace_existing_lock = false,
+        const String & path_to_set_hardlinked_files = "", const NameSet & hardlinked_files = {});
+
+
     bool removeDetachedPart(DiskPtr disk, const String & path, const String & part_name) override;
 
     /// Create freeze metadata for table and save in zookeeper. Required only if zero-copy replication enabled.
diff --git a/tests/queries/0_stateless/02725_start_stop_fetches.reference b/tests/queries/0_stateless/02725_start_stop_fetches.reference
new file mode 100644
index 000000000000..e69de29bb2d1
diff --git a/tests/queries/0_stateless/02725_start_stop_fetches.sh b/tests/queries/0_stateless/02725_start_stop_fetches.sh
new file mode 100755
index 000000000000..0ca687ae951a
--- /dev/null
+++ b/tests/queries/0_stateless/02725_start_stop_fetches.sh
@@ -0,0 +1,82 @@
+#!/usr/bin/env bash
+# Tags: race, zookeeper, no-parallel, no-upgrade-check, no-replicated-database
+
+CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
+# shellcheck source=../shell_config.sh
+. "$CURDIR"/../shell_config.sh
+
+set -e
+
+export NUM_REPLICAS=5
+
+for i in $(seq 1 $NUM_REPLICAS); do
+    $CLICKHOUSE_CLIENT -n -q "
+        DROP TABLE IF EXISTS r$i SYNC;
+        CREATE TABLE r$i (x UInt64) ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/r', 'r$i') ORDER BY x SETTINGS replicated_deduplication_window = 1, allow_remote_fs_zero_copy_replication = 1;
+    "
+done
+
+# Inserts into a random replica until killed by timeout.
+function thread {
+    while true; do
+        REPLICA=$(($RANDOM % NUM_REPLICAS + 1))
+        $CLICKHOUSE_CLIENT --query "INSERT INTO r$REPLICA SELECT rand()"
+    done
+}
+
+# Repeatedly stops and restarts replicated sends on a random replica.
+function nemesis_thread1 {
+    while true; do
+        REPLICA=$(($RANDOM % NUM_REPLICAS + 1))
+        $CLICKHOUSE_CLIENT --query "SYSTEM STOP REPLICATED SENDS r$REPLICA"
+        sleep 0.5
+        $CLICKHOUSE_CLIENT --query "SYSTEM START REPLICATED SENDS r$REPLICA"
+    done
+}
+
+# Repeatedly stops and restarts fetches on a random replica.
+function nemesis_thread2 {
+    while true; do
+        REPLICA=$(($RANDOM % NUM_REPLICAS + 1))
+        $CLICKHOUSE_CLIENT --query "SYSTEM STOP FETCHES r$REPLICA"
+        sleep 0.5
+        $CLICKHOUSE_CLIENT --query "SYSTEM START FETCHES r$REPLICA"
+    done
+}
+
+
+
+export -f thread
+export -f nemesis_thread1
+export -f nemesis_thread2
+
+TIMEOUT=20
+
+timeout $TIMEOUT bash -c thread 2>/dev/null &
+timeout $TIMEOUT bash -c thread 2>/dev/null &
+timeout $TIMEOUT bash -c thread 2>/dev/null &
+timeout $TIMEOUT bash -c nemesis_thread1 2>/dev/null &
+timeout $TIMEOUT bash -c nemesis_thread1 2>/dev/null &
+timeout $TIMEOUT bash -c nemesis_thread1 2>/dev/null &
+timeout $TIMEOUT bash -c nemesis_thread2 2>/dev/null &
+timeout $TIMEOUT bash -c nemesis_thread2 2>/dev/null &
+timeout $TIMEOUT bash -c nemesis_thread2 2>/dev/null &
+
+wait
+
+
+# A nemesis may have been killed between STOP and START, so re-enable everything on every replica.
+for i in $(seq 1 $NUM_REPLICAS); do
+    $CLICKHOUSE_CLIENT -q "SYSTEM START FETCHES r$i"
+    $CLICKHOUSE_CLIENT -q "SYSTEM START REPLICATED SENDS r$i"
+done
+
+for i in $(seq 1 $NUM_REPLICAS); do
+    $CLICKHOUSE_CLIENT --max_execution_time 60 -q "SYSTEM SYNC REPLICA r$i PULL"
+done
+
+for i in $(seq 1 $NUM_REPLICAS); do
+    $CLICKHOUSE_CLIENT -q "DROP TABLE r$i" 2>/dev/null &
+done
+
+wait