Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport #47631 to 23.1: Fix wait for zero copy lock during move #47906

Merged
merged 2 commits into from
Mar 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/Common/ZooKeeper/ZooKeeperLock.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,16 @@ ZooKeeperLock::~ZooKeeperLock()
}
}

/// Returns whether this object currently believes it holds the lock.
/// NOTE(review): reflects the cached `locked` flag (presumably set by
/// tryLock()/unlock()), not a live check against ZooKeeper -- confirm in class.
bool ZooKeeperLock::isLocked() const
{
    return locked;
}

/// Returns the ZooKeeper znode path this lock operates on.
/// Valid regardless of whether the lock is currently held; callers use it
/// e.g. to wait for the node to disappear when another replica holds it.
const std::string & ZooKeeperLock::getLockPath() const
{
    return lock_path;
}

void ZooKeeperLock::unlock()
{
if (!locked)
Expand Down
2 changes: 2 additions & 0 deletions src/Common/ZooKeeper/ZooKeeperLock.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ class ZooKeeperLock

void unlock();
bool tryLock();
bool isLocked() const;
const std::string & getLockPath() const;

private:
zkutil::ZooKeeperPtr zookeeper;
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/MergeTree/MergeFromLogEntryTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ ReplicatedMergeMutateTaskBase::PrepareResult MergeFromLogEntryTask::prepare()

zero_copy_lock = storage.tryCreateZeroCopyExclusiveLock(entry.new_part_name, disk);

if (!zero_copy_lock)
if (!zero_copy_lock || !zero_copy_lock->isLocked())
{
LOG_DEBUG(log, "Merge of part {} started by some other replica, will wait it and fetch merged part", entry.new_part_name);
/// Don't check for missing part -- it's missing because other replica still not
Expand Down
52 changes: 36 additions & 16 deletions src/Storages/MergeTree/MergeTreeData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7353,7 +7353,7 @@ bool MergeTreeData::movePartsToSpace(const DataPartsVector & parts, SpacePtr spa
if (moving_tagger->parts_to_move.empty())
return false;

return moveParts(moving_tagger);
return moveParts(moving_tagger, true);
}

MergeTreeData::CurrentlyMovingPartsTaggerPtr MergeTreeData::selectPartsForMove()
Expand Down Expand Up @@ -7408,7 +7408,7 @@ MergeTreeData::CurrentlyMovingPartsTaggerPtr MergeTreeData::checkPartsForMove(co
return std::make_shared<CurrentlyMovingPartsTagger>(std::move(parts_to_move), *this);
}

bool MergeTreeData::moveParts(const CurrentlyMovingPartsTaggerPtr & moving_tagger)
bool MergeTreeData::moveParts(const CurrentlyMovingPartsTaggerPtr & moving_tagger, bool wait_for_move_if_zero_copy)
{
LOG_INFO(log, "Got {} parts to move.", moving_tagger->parts_to_move.size());

Expand Down Expand Up @@ -7455,21 +7455,41 @@ bool MergeTreeData::moveParts(const CurrentlyMovingPartsTaggerPtr & moving_tagge
auto disk = moving_part.reserved_space->getDisk();
if (supportsReplication() && disk->supportZeroCopyReplication() && settings->allow_remote_fs_zero_copy_replication)
{
/// If we acquired lock than let's try to move. After one
/// replica will actually move the part from disk to some
/// zero-copy storage other replicas will just fetch
/// metainformation.
if (auto lock = tryCreateZeroCopyExclusiveLock(moving_part.part->name, disk); lock)
{
cloned_part = parts_mover.clonePart(moving_part);
parts_mover.swapClonedPart(cloned_part);
}
else
/// This loop is not endless, if shutdown called/connection failed/replica became readonly
/// we will return true from waitZeroCopyLock and createZeroCopyLock will return nullopt.
while (true)
{
/// Move will be retried but with backoff.
LOG_DEBUG(log, "Move of part {} postponed, because zero copy mode enabled and someone other moving this part right now", moving_part.part->name);
result = false;
continue;
/// If we acquired the lock, then let's try to move. After one
/// replica will actually move the part from disk to some
/// zero-copy storage other replicas will just fetch
/// metainformation.
if (auto lock = tryCreateZeroCopyExclusiveLock(moving_part.part->name, disk); lock)
{
if (lock->isLocked())
{
cloned_part = parts_mover.clonePart(moving_part);
parts_mover.swapClonedPart(cloned_part);
break;
}
else if (wait_for_move_if_zero_copy)
{
LOG_DEBUG(log, "Other replica is working on move of {}, will wait until lock disappear", moving_part.part->name);
/// Wait and checks not only for timeout but also for shutdown and so on.
while (!waitZeroCopyLockToDisappear(*lock, 3000))
{
LOG_DEBUG(log, "Waiting until some replica will move {} and zero copy lock disappear", moving_part.part->name);
}
}
else
break;
}
else
{
/// Move will be retried but with backoff.
LOG_DEBUG(log, "Move of part {} postponed, because zero copy mode enabled and someone other moving this part right now", moving_part.part->name);
result = MovePartsOutcome::false;
break;
}
}
}
else /// Ordinary move as it should be
Expand Down
3 changes: 2 additions & 1 deletion src/Storages/MergeTree/MergeTreeData.h
Original file line number Diff line number Diff line change
Expand Up @@ -1455,7 +1455,7 @@ class MergeTreeData : public IStorage, public WithMutableContext
using CurrentlyMovingPartsTaggerPtr = std::shared_ptr<CurrentlyMovingPartsTagger>;

/// Move selected parts to corresponding disks
bool moveParts(const CurrentlyMovingPartsTaggerPtr & moving_tagger);
bool moveParts(const CurrentlyMovingPartsTaggerPtr & moving_tagger, bool wait_for_move_if_zero_copy=false);

/// Select parts for move and disks for them. Used in background moving processes.
CurrentlyMovingPartsTaggerPtr selectPartsForMove();
Expand Down Expand Up @@ -1510,6 +1510,7 @@ class MergeTreeData : public IStorage, public WithMutableContext
/// Create zero-copy exclusive lock for part and disk. Useful for coordination of
/// distributed operations which can lead to data duplication. Implemented only in ReplicatedMergeTree.
virtual std::optional<ZeroCopyLock> tryCreateZeroCopyExclusiveLock(const String &, const DiskPtr &) { return std::nullopt; }
virtual bool waitZeroCopyLockToDisappear(const ZeroCopyLock &, size_t) { return false; }

/// Remove parts from disk calling part->remove(). Can do it in parallel in case of big set of parts and enabled settings.
/// If we fail to remove some part and throw_on_error equal to `true` will throw an exception on the first failed part.
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/MergeTree/MutateFromLogEntryTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ ReplicatedMergeMutateTaskBase::PrepareResult MutateFromLogEntryTask::prepare()

zero_copy_lock = storage.tryCreateZeroCopyExclusiveLock(entry.new_part_name, disk);

if (!zero_copy_lock)
if (!zero_copy_lock || !zero_copy_lock->isLocked())
{
LOG_DEBUG(log, "Mutation of part {} started by some other replica, will wait it and mutated merged part", entry.new_part_name);
return PrepareResult{
Expand Down
1 change: 1 addition & 0 deletions src/Storages/MergeTree/ZeroCopyLock.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ struct ZeroCopyLock
{
ZeroCopyLock(const zkutil::ZooKeeperPtr & zookeeper, const std::string & lock_path);

bool isLocked() const { return lock->isLocked(); }
/// Actual lock
std::unique_ptr<zkutil::ZooKeeperLock> lock;
};
Expand Down
32 changes: 28 additions & 4 deletions src/Storages/StorageReplicatedMergeTree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8492,11 +8492,37 @@ std::optional<String> StorageReplicatedMergeTree::getZeroCopyPartPath(const Stri
return getZeroCopyPartPath(*getSettings(), toString(disk->getDataSourceDescription().type), getTableSharedID(), part_name, zookeeper_path)[0];
}

/// Wait until the zero-copy lock node disappears from ZooKeeper (i.e. the
/// replica that is moving/merging this part finished), or until waiting no
/// longer makes sense. Per the declaration comment, returns true on node
/// disappearance AND on shutdown / read-only / timeout / no-ZooKeeper cases,
/// so callers can use the result as a plain "stop retrying" signal.
bool StorageReplicatedMergeTree::waitZeroCopyLockToDisappear(const ZeroCopyLock & lock, size_t milliseconds_to_wait)
{
    /// We are the ones holding the lock -- nothing to wait for.
    if (lock.isLocked())
        return true;

    /// Table is shutting down -- abort the wait immediately.
    if (partial_shutdown_called.load(std::memory_order_relaxed))
        return true;

    auto lock_path = lock.lock->getLockPath();
    zkutil::ZooKeeperPtr zookeeper = tryGetZooKeeper();
    /// No ZooKeeper session -- we cannot observe the node, so give up waiting.
    if (!zookeeper)
        return true;

    Stopwatch time_waiting;
    /// Predicate polled by waitForDisappear: stop waiting on shutdown, on a
    /// read-only replica, or once the millisecond budget is exhausted.
    const auto & stop_waiting = [&]()
    {
        bool timeout_exceeded = milliseconds_to_wait < time_waiting.elapsedMilliseconds();
        return partial_shutdown_called.load(std::memory_order_relaxed) || is_readonly.load(std::memory_order_relaxed) || timeout_exceeded;
    };

    /// NOTE(review): assumes waitForDisappear also returns when stop_waiting
    /// fires (not only when the node vanishes) -- confirm zkutil contract.
    return zookeeper->waitForDisappear(lock_path, stop_waiting);
}

std::optional<ZeroCopyLock> StorageReplicatedMergeTree::tryCreateZeroCopyExclusiveLock(const String & part_name, const DiskPtr & disk)
{
if (!disk || !disk->supportZeroCopyReplication())
return std::nullopt;

if (partial_shutdown_called.load(std::memory_order_relaxed) || is_readonly.load(std::memory_order_relaxed))
return std::nullopt;

zkutil::ZooKeeperPtr zookeeper = tryGetZooKeeper();
if (!zookeeper)
return std::nullopt;
Expand All @@ -8509,10 +8535,8 @@ std::optional<ZeroCopyLock> StorageReplicatedMergeTree::tryCreateZeroCopyExclusi

/// Create actual lock
ZeroCopyLock lock(zookeeper, zc_zookeeper_path);
if (lock.lock->tryLock())
return lock;
else
return std::nullopt;
lock.lock->tryLock();
return lock;
}

String StorageReplicatedMergeTree::findReplicaHavingPart(
Expand Down
7 changes: 6 additions & 1 deletion src/Storages/StorageReplicatedMergeTree.h
Original file line number Diff line number Diff line change
Expand Up @@ -863,9 +863,14 @@ class StorageReplicatedMergeTree final : public MergeTreeData
std::optional<String> getZeroCopyPartPath(const String & part_name, const DiskPtr & disk);

/// Create ephemeral lock in zookeeper for part and disk which support zero copy replication.
/// If somebody already holding the lock -- return std::nullopt.
/// If no connection to zookeeper, shutdown, readonly -- return std::nullopt.
/// If somebody is already holding the lock -- return an unlocked ZeroCopyLock object (not std::nullopt).
std::optional<ZeroCopyLock> tryCreateZeroCopyExclusiveLock(const String & part_name, const DiskPtr & disk) override;

/// Wait for the ephemeral lock to disappear. Return true if table is shut down / read-only / timeout exceeded, etc.
/// Or if node actually disappeared.
bool waitZeroCopyLockToDisappear(const ZeroCopyLock & lock, size_t milliseconds_to_wait) override;

void startupImpl(bool from_attach_thread);
};

Expand Down
79 changes: 79 additions & 0 deletions tests/integration/test_zero_copy_fetch/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import string
import time

from multiprocessing.dummy import Pool

import pytest
from helpers.cluster import ClickHouseCluster

Expand Down Expand Up @@ -102,3 +104,80 @@ def get_part_to_disk(query_result):
assert part_to_disk["20230102_0_0_0"] == "s3"
assert part_to_disk["20230109_0_0_0"] == "s3"
assert part_to_disk["20230116_0_0_0"] == "default"


def test_concurrent_move_to_s3(started_cluster):
    """Regression test for concurrent MOVE PARTITION with zero-copy replication.

    Both replicas issue MOVE PARTITION ... TO DISK 's3' for the same
    partitions at the same time. With the zero-copy lock wait in place, one
    replica performs the physical move while the other waits for the lock to
    disappear -- neither should throw. Afterwards every active part must
    reside on the 's3' disk on both replicas.

    NOTE: indentation of this test was reconstructed from a whitespace-mangled
    source; all query strings are preserved verbatim.
    """
    node1 = cluster.instances["node1"]
    node2 = cluster.instances["node2"]

    node1.query(
        """
CREATE TABLE test_concurrent_move (EventDate Date, CounterID UInt32)
ENGINE = ReplicatedMergeTree('/clickhouse-tables/test_concurrent_move', 'r1')
PARTITION BY CounterID
ORDER BY (CounterID, EventDate)
SETTINGS index_granularity = 8192, storage_policy = 's3'"""
    )

    node2.query(
        """
CREATE TABLE test_concurrent_move (EventDate Date, CounterID UInt32)
ENGINE = ReplicatedMergeTree('/clickhouse-tables/test_concurrent_move', 'r2')
PARTITION BY CounterID
ORDER BY (CounterID, EventDate)
SETTINGS index_granularity = 8192, storage_policy = 's3'"""
    )
    partitions = range(10)

    # Four inserts per partition so each partition has several parts to move.
    for i in partitions:
        node1.query(
            f"INSERT INTO test_concurrent_move SELECT toDate('2023-01-01') + toIntervalDay(number), {i} from system.numbers limit 20"
        )
        node1.query(
            f"INSERT INTO test_concurrent_move SELECT toDate('2023-01-01') + toIntervalDay(number) + rand(), {i} from system.numbers limit 20"
        )
        node1.query(
            f"INSERT INTO test_concurrent_move SELECT toDate('2023-01-01') + toIntervalDay(number) + rand(), {i} from system.numbers limit 20"
        )
        node1.query(
            f"INSERT INTO test_concurrent_move SELECT toDate('2023-01-01') + toIntervalDay(number) + rand(), {i} from system.numbers limit 20"
        )

    node2.query("SYSTEM SYNC REPLICA test_concurrent_move")

    # check that we can move parts concurrently without exceptions
    p = Pool(3)
    for i in partitions:
        # Late binding of `i` in the closure is safe here: both async calls
        # are joined via .get() before the next iteration rebinds `i`.
        def move_partition_to_s3(node):
            node.query(
                f"ALTER TABLE test_concurrent_move MOVE PARTITION '{i}' TO DISK 's3'"
            )

        j1 = p.apply_async(move_partition_to_s3, (node1,))
        j2 = p.apply_async(move_partition_to_s3, (node2,))
        j1.get()
        j2.get()

    def get_part_to_disk(query_result):
        # Parse "disk\tpart" TSV rows into a {part_name: disk_name} dict.
        part_to_disk = {}
        for row in query_result.strip().split("\n"):
            disk, part = row.split("\t")
            part_to_disk[part] = disk
        return part_to_disk

    part_to_disk = get_part_to_disk(
        node1.query(
            "SELECT disk_name, name FROM system.parts where table = 'test_concurrent_move' and active"
        )
    )
    assert all([value == "s3" for value in part_to_disk.values()])

    part_to_disk = get_part_to_disk(
        node2.query(
            "SELECT disk_name, name FROM system.parts where table = 'test_concurrent_move' and active"
        )
    )
    assert all([value == "s3" for value in part_to_disk.values()])