Skip to content

Commit

Permalink
Backport #50026 to 23.4: Avoid deadlock when starting table in attach…
Browse files Browse the repository at this point in the history
… thread of `ReplicatedMergeTree`
  • Loading branch information
robot-clickhouse committed May 22, 2023
1 parent f03e374 commit a005e61
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 11 deletions.
11 changes: 8 additions & 3 deletions src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,19 @@ class ReplicatedMergeTreeRestartingThread
public:
explicit ReplicatedMergeTreeRestartingThread(StorageReplicatedMergeTree & storage_);

void start() { task->activateAndSchedule(); }
/// Activates the restarting task.
/// With `schedule == true` (the default) the task is activated and scheduled in one step.
/// With `schedule == false` the task is only activated and nothing is scheduled here;
/// the caller is then expected to drive the first iteration itself (e.g. by calling run()).
void start(bool schedule = true)
{
    if (!schedule)
    {
        task->activate();
        return;
    }

    task->activateAndSchedule();
}

/// Asks the task to be scheduled; does not activate it.
void wakeup()
{
    task->schedule();
}

void shutdown(bool part_of_full_shutdown);

void run();
private:
StorageReplicatedMergeTree & storage;
String log_name;
Expand All @@ -43,8 +50,6 @@ class ReplicatedMergeTreeRestartingThread
UInt32 consecutive_check_failures = 0; /// How many consecutive checks have failed
bool first_time = true; /// Activate replica for the first time.

void run();

/// Restarts table if needed, returns false if it failed to restart replica.
bool runImpl();

Expand Down
25 changes: 17 additions & 8 deletions src/Storages/StorageReplicatedMergeTree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4370,21 +4370,30 @@ void StorageReplicatedMergeTree::startupImpl(bool from_attach_thread)

startBeingLeader();

/// In this thread replica will be activated.
restarting_thread.start();
/// Activate replica in a separate thread if we are not calling from attach thread
restarting_thread.start(/*schedule=*/!from_attach_thread);

if (from_attach_thread)
{
/// Try activating replica in current thread.
restarting_thread.run();
}
else
{
/// Wait until restarting_thread finishes initialization.
/// NOTE: It does not mean that replication is actually started after receiving this event.
/// It only means that an attempt to start up replication was made.
/// The table may still be in readonly mode if this attempt failed for any reason.
startup_event.wait();
}

/// And this is just a callback
session_expired_callback_handler = EventNotifier::instance().subscribe(Coordination::Error::ZSESSIONEXPIRED, [this]()
{
LOG_TEST(log, "Received event for expired session. Waking up restarting thread");
restarting_thread.start();
});

/// Wait while restarting_thread finishing initialization.
/// NOTE It does not mean that replication is actually started after receiving this event.
/// It only means that an attempt to startup replication was made.
/// Table may be still in readonly mode if this attempt failed for any reason.
startup_event.wait();

startBackgroundMovesIfNeeded();

part_moves_between_shards_orchestrator.start();
Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
<clickhouse>
<!-- Limit the background schedule pool to a single thread.
     NOTE(review): presumably this is what makes table startup sensitive to a task
     that waits on another task from the same pool - confirm against the test. -->
<background_schedule_pool_size>1</background_schedule_pool_size>
<merge_tree>
<!-- NOTE(review): presumably the retry period (in seconds) for table initialization - confirm. -->
<initialization_retry_period>5</initialization_retry_period>
</merge_tree>
</clickhouse>
63 changes: 63 additions & 0 deletions tests/integration/test_replicated_table_attach/test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import pytest

from helpers.cluster import ClickHouseCluster
from helpers.network import PartitionManager


# Cluster object shared by every test in this module.
cluster = ClickHouseCluster(__file__)

# Single instance that loads configs/config.xml and is started together with ZooKeeper.
# NOTE(review): stay_alive=True is presumably required for restart_clickhouse() - confirm.
node = cluster.add_instance(
"node",
main_configs=["configs/config.xml"],
with_zookeeper=True,
stay_alive=True,
)


@pytest.fixture(scope="module")
def started_cluster():
    """Start the cluster once per module, hand it to the tests, and always shut it down."""
    try:
        cluster.start()
        yield cluster
    finally:
        # Runs on normal teardown as well as when startup or a test raised.
        cluster.shutdown()


def test_startup_with_small_bg_pool(started_cluster):
    """The replicated table must still return its row after a plain server restart."""
    create_query = "CREATE TABLE replicated_table (k UInt64, i32 Int32) ENGINE=ReplicatedMergeTree('/clickhouse/replicated_table', 'r1') ORDER BY k"
    node.query(create_query)
    node.query("INSERT INTO replicated_table VALUES(20, 30)")

    def check_rows():
        # The single inserted row must be the whole table content.
        rows = node.query("SELECT * FROM replicated_table")
        assert rows == "20\t30\n"

    check_rows()
    node.restart_clickhouse(stop_start_wait_sec=10)
    check_rows()

    node.query("DROP TABLE replicated_table SYNC")


def test_startup_with_small_bg_pool_partitioned(started_cluster):
    """The replicated table must be readable after a restart performed while ZooKeeper
    connections of the node are dropped, and must accept inserts again afterwards."""
    create_query = "CREATE TABLE replicated_table_partitioned (k UInt64, i32 Int32) ENGINE=ReplicatedMergeTree('/clickhouse/replicated_table_partitioned', 'r1') ORDER BY k"
    node.query(create_query)
    node.query("INSERT INTO replicated_table_partitioned VALUES(20, 30)")

    def check_rows():
        # The single inserted row must be the whole table content.
        rows = node.query("SELECT * FROM replicated_table_partitioned")
        assert rows == "20\t30\n"

    check_rows()

    # Restart and read while the node's ZooKeeper connections are dropped.
    with PartitionManager() as partition_manager:
        partition_manager.drop_instance_zk_connections(node)
        node.restart_clickhouse(stop_start_wait_sec=20)
        check_rows()

    # Once the partition is lifted the replica has to become active eventually,
    # so a retried insert is expected to succeed.
    node.query_with_retry("INSERT INTO replicated_table_partitioned VALUES(20, 30)")

    node.query("DROP TABLE replicated_table_partitioned SYNC")

0 comments on commit a005e61

Please sign in to comment.