An option to avoid waiting for inactive Replicated database replicas #58350

Merged (4 commits, Jan 4, 2024)
2 changes: 2 additions & 0 deletions docs/en/operations/settings/settings.md
@@ -3847,6 +3847,8 @@ Possible values:
- `none` — Similar to `throw`, but the distributed DDL query returns no result set.
- `null_status_on_timeout` — Returns `NULL` as the execution status in some rows of the result set instead of throwing `TIMEOUT_EXCEEDED` if the query is not finished on the corresponding hosts.
- `never_throw` — Do not throw `TIMEOUT_EXCEEDED` and do not rethrow exceptions if the query has failed on some hosts.
- `null_status_on_timeout_only_active` — Similar to `null_status_on_timeout`, but doesn't wait for inactive replicas of the `Replicated` database.
- `throw_only_active` — Similar to `throw`, but doesn't wait for inactive replicas of the `Replicated` database.

Default value: `throw`.

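As a usage illustration (not part of the diff): a minimal sketch of driving the new modes from a client, assuming the clickhouse-driver Python package, a server on localhost, and a Replicated database named rdb — all illustrative names, not taken from this PR.

from clickhouse_driver import Client

client = Client("localhost")

# DDL inside a Replicated database is distributed automatically; with
# throw_only_active the query waits only for replicas that are currently up.
client.execute(
    "CREATE TABLE rdb.events (n UInt64) ENGINE = MergeTree ORDER BY n",
    settings={
        "distributed_ddl_output_mode": "throw_only_active",
        "distributed_ddl_task_timeout": 30,
    },
)
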
2 changes: 2 additions & 0 deletions src/Core/SettingsEnums.cpp
@@ -115,6 +115,8 @@ IMPLEMENT_SETTING_ENUM(DistributedDDLOutputMode, ErrorCodes::BAD_ARGUMENTS,
{{"none", DistributedDDLOutputMode::NONE},
{"throw", DistributedDDLOutputMode::THROW},
{"null_status_on_timeout", DistributedDDLOutputMode::NULL_STATUS_ON_TIMEOUT},
{"throw_only_active", DistributedDDLOutputMode::THROW_ONLY_ACTIVE},
{"null_status_on_timeout_only_active", DistributedDDLOutputMode::NULL_STATUS_ON_TIMEOUT_ONLY_ACTIVE},
{"never_throw", DistributedDDLOutputMode::NEVER_THROW}})

IMPLEMENT_SETTING_ENUM(StreamingHandleErrorMode, ErrorCodes::BAD_ARGUMENTS,
2 changes: 2 additions & 0 deletions src/Core/SettingsEnums.h
@@ -173,6 +173,8 @@ enum class DistributedDDLOutputMode
THROW,
NULL_STATUS_ON_TIMEOUT,
NEVER_THROW,
THROW_ONLY_ACTIVE,
NULL_STATUS_ON_TIMEOUT_ONLY_ACTIVE,
};

DECLARE_SETTING_ENUM(DistributedDDLOutputMode)
101 changes: 77 additions & 24 deletions src/Interpreters/executeDDLQueryOnCluster.cpp
@@ -200,8 +200,6 @@ class DDLQueryStatusSource final : public ISource
Status prepare() override;

private:
static Strings getChildrenAllowNoNode(const std::shared_ptr<zkutil::ZooKeeper> & zookeeper, const String & node_path);

static Block getSampleBlock(ContextPtr context_, bool hosts_to_wait);

Strings getNewAndUpdate(const Strings & current_list_of_finished_hosts);
@@ -228,7 +226,8 @@ class DDLQueryStatusSource final : public ISource
NameSet waiting_hosts; /// hosts from task host list
NameSet finished_hosts; /// finished hosts from host list
NameSet ignoring_hosts; /// appeared hosts that are not in hosts list
Strings current_active_hosts; /// Hosts that were in active state at the last check
Strings current_active_hosts; /// Hosts that are currently executing the task
NameSet offline_hosts; /// Hosts that are not currently running
size_t num_hosts_finished = 0;

/// Save the first detected error and throw it at the end of execution
@@ -237,7 +236,10 @@
Int64 timeout_seconds = 120;
bool is_replicated_database = false;
bool throw_on_timeout = true;
bool only_running_hosts = false;

bool timeout_exceeded = false;
bool stop_waiting_offline_hosts = false;
};


@@ -310,12 +312,15 @@ DDLQueryStatusSource::DDLQueryStatusSource(
, log(&Poco::Logger::get("DDLQueryStatusSource"))
{
auto output_mode = context->getSettingsRef().distributed_ddl_output_mode;
throw_on_timeout = output_mode == DistributedDDLOutputMode::THROW || output_mode == DistributedDDLOutputMode::NONE;
throw_on_timeout = output_mode == DistributedDDLOutputMode::THROW || output_mode == DistributedDDLOutputMode::THROW_ONLY_ACTIVE
|| output_mode == DistributedDDLOutputMode::NONE;

if (hosts_to_wait)
{
waiting_hosts = NameSet(hosts_to_wait->begin(), hosts_to_wait->end());
is_replicated_database = true;
only_running_hosts = output_mode == DistributedDDLOutputMode::THROW_ONLY_ACTIVE ||
output_mode == DistributedDDLOutputMode::NULL_STATUS_ON_TIMEOUT_ONLY_ACTIVE;
}
else
{
@@ -377,6 +382,38 @@ Chunk DDLQueryStatusSource::generateChunkWithUnfinishedHosts() const
return Chunk(std::move(columns), unfinished_hosts.size());
}

static NameSet getOfflineHosts(const String & node_path, const NameSet & hosts_to_wait, const ZooKeeperPtr & zookeeper, Poco::Logger * log)
{
fs::path replicas_path;
if (node_path.ends_with('/'))
replicas_path = fs::path(node_path).parent_path().parent_path().parent_path() / "replicas";
else
replicas_path = fs::path(node_path).parent_path().parent_path() / "replicas";

Strings paths;
Strings hosts_array;
for (const auto & host : hosts_to_wait)
{
hosts_array.push_back(host);
paths.push_back(replicas_path / host / "active");
}

NameSet offline;
auto res = zookeeper->tryGet(paths);
for (size_t i = 0; i < res.size(); ++i)
if (res[i].error == Coordination::Error::ZNONODE)
offline.insert(hosts_array[i]);

if (offline.size() == hosts_to_wait.size())
{
/// Avoid reporting that all hosts are offline
LOG_WARNING(log, "Did not find active hosts, will wait for all {} hosts. This should not happen often", offline.size());
return {};
}

return offline;
}

Chunk DDLQueryStatusSource::generate()
{
bool all_hosts_finished = num_hosts_finished >= waiting_hosts.size();
@@ -398,15 +435,15 @@ Chunk DDLQueryStatusSource::generate()
if (isCancelled())
return {};

if (timeout_seconds >= 0 && watch.elapsedSeconds() > timeout_seconds)
if (stop_waiting_offline_hosts || (timeout_seconds >= 0 && watch.elapsedSeconds() > timeout_seconds))
{
timeout_exceeded = true;

size_t num_unfinished_hosts = waiting_hosts.size() - num_hosts_finished;
size_t num_active_hosts = current_active_hosts.size();

constexpr auto msg_format = "Watching task {} is executing longer than distributed_ddl_task_timeout (={}) seconds. "
"There are {} unfinished hosts ({} of them are currently active), "
"There are {} unfinished hosts ({} of them are currently executing the task), "
"they are going to execute the query in background";
if (throw_on_timeout)
{
@@ -425,10 +462,7 @@
return generateChunkWithUnfinishedHosts();
}

if (num_hosts_finished != 0 || try_number != 0)
{
sleepForMilliseconds(std::min<size_t>(1000, 50 * (try_number + 1)));
}
sleepForMilliseconds(std::min<size_t>(1000, 50 * try_number));

bool node_exists = false;
Strings tmp_hosts;
@@ -440,9 +474,21 @@
retries_ctl.retryLoop([&]()
{
auto zookeeper = context->getZooKeeper();
node_exists = zookeeper->exists(node_path);
tmp_hosts = getChildrenAllowNoNode(zookeeper, fs::path(node_path) / node_to_wait);
tmp_active_hosts = getChildrenAllowNoNode(zookeeper, fs::path(node_path) / "active");
Strings paths = {String(fs::path(node_path) / node_to_wait), String(fs::path(node_path) / "active")};
auto res = zookeeper->tryGetChildren(paths);
for (size_t i = 0; i < res.size(); ++i)
if (res[i].error != Coordination::Error::ZOK && res[i].error != Coordination::Error::ZNONODE)
throw Coordination::Exception::fromPath(res[i].error, paths[i]);

if (res[0].error == Coordination::Error::ZNONODE)
node_exists = zookeeper->exists(node_path);
else
node_exists = true;
tmp_hosts = res[0].names;
tmp_active_hosts = res[1].names;

if (only_running_hosts)
offline_hosts = getOfflineHosts(node_path, waiting_hosts, zookeeper, log);
});
}

@@ -460,6 +506,17 @@

Strings new_hosts = getNewAndUpdate(tmp_hosts);
++try_number;

if (only_running_hosts)
{
size_t num_finished_or_offline = 0;
for (const auto & host : waiting_hosts)
num_finished_or_offline += finished_hosts.contains(host) || offline_hosts.contains(host);

if (num_finished_or_offline == waiting_hosts.size())
stop_waiting_offline_hosts = true;
}

if (new_hosts.empty())
continue;

@@ -470,7 +527,13 @@
{
ExecutionStatus status(-1, "Cannot obtain error message");

if (node_to_wait == "finished")
/// Replicated database retries in case of error, it should not write error status.
#ifdef ABORT_ON_LOGICAL_ERROR
bool need_check_status = true;
#else
bool need_check_status = !is_replicated_database;
#endif
if (need_check_status)
{
String status_data;
bool finished_exists = false;
Expand All @@ -496,7 +559,6 @@ Chunk DDLQueryStatusSource::generate()
if (status.code != 0 && !first_exception
&& context->getSettingsRef().distributed_ddl_output_mode != DistributedDDLOutputMode::NEVER_THROW)
{
/// Replicated database retries in case of error, it should not write error status.
if (is_replicated_database)
throw Exception(ErrorCodes::LOGICAL_ERROR, "There was an error on {}: {} (probably it's a bug)", host_id, status.message);

@@ -555,15 +617,6 @@ IProcessor::Status DDLQueryStatusSource::prepare()
return ISource::prepare();
}

Strings DDLQueryStatusSource::getChildrenAllowNoNode(const std::shared_ptr<zkutil::ZooKeeper> & zookeeper, const String & node_path)
{
Strings res;
Coordination::Error code = zookeeper->tryGetChildren(node_path, res);
if (code != Coordination::Error::ZOK && code != Coordination::Error::ZNONODE)
throw Coordination::Exception::fromPath(code, node_path);
return res;
}

Strings DDLQueryStatusSource::getNewAndUpdate(const Strings & current_list_of_finished_hosts)
{
Strings diff;
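A rough standalone sketch of what getOfflineHosts above does (not part of the diff), assuming the kazoo ZooKeeper client and the <db_path>/log/query-NNN task-node layout of Replicated databases, where each running replica keeps an ephemeral "active" node — paths and host names illustrative.

from posixpath import dirname, join

from kazoo.client import KazooClient

def get_offline_hosts(zk, node_path, hosts_to_wait):
    # Two levels up from <db_path>/log/query-NNN is the database path.
    replicas_path = join(dirname(dirname(node_path.rstrip("/"))), "replicas")
    offline = {host for host in hosts_to_wait
               if zk.exists(join(replicas_path, host, "active")) is None}
    # If nothing looks active, assume a transient glitch and keep waiting
    # for everyone, mirroring the LOG_WARNING branch above.
    return set() if offline == hosts_to_wait else offline

zk = KazooClient(hosts="127.0.0.1:2181")  # assumed local ZooKeeper
zk.start()
print(get_offline_hosts(zk, "/clickhouse/databases/rdb/log/query-0000000007", {"s1|r1", "s1|r2"}))
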
2 changes: 1 addition & 1 deletion tests/integration/test_replicated_database/test.py
@@ -507,7 +507,7 @@ def test_alters_from_different_replicas(started_cluster):

settings = {"distributed_ddl_task_timeout": 5}
assert (
"There are 1 unfinished hosts (0 of them are currently active)"
"There are 1 unfinished hosts (0 of them are currently executing the task"
in competing_node.query_and_get_error(
"ALTER TABLE alters_from_different_replicas.concurrent_test ADD COLUMN Added0 UInt32;",
settings=settings,
@@ -96,7 +96,7 @@ def test_cluster_groups(started_cluster):
main_node_2.stop_clickhouse()
settings = {"distributed_ddl_task_timeout": 5}
assert (
"There are 1 unfinished hosts (0 of them are currently active)"
"There are 1 unfinished hosts (0 of them are currently executing the task)"
in main_node_1.query_and_get_error(
"CREATE TABLE cluster_groups.table_2 (d Date, k UInt64) ENGINE=ReplicatedMergeTree ORDER BY k PARTITION BY toYYYYMM(d);",
settings=settings,
@@ -3,7 +3,7 @@ Received exception from server:
Code: 57. Error: Received from localhost:9000. Error: There was an error on [localhost:9000]: Code: 57. Error: Table default.none already exists. (TABLE_ALREADY_EXISTS)
(query: create table none on cluster test_shard_localhost (n int) engine=Memory;)
Received exception from server:
Code: 159. Error: Received from localhost:9000. Error: Watching task <task> is executing longer than distributed_ddl_task_timeout (=1) seconds. There are 1 unfinished hosts (0 of them are currently active), they are going to execute the query in background. (TIMEOUT_EXCEEDED)
Code: 159. Error: Received from localhost:9000. Error: Watching task <task> is executing longer than distributed_ddl_task_timeout (=1) seconds. There are 1 unfinished hosts (0 of them are currently executing the task), they are going to execute the query in background. (TIMEOUT_EXCEEDED)
(query: drop table if exists none on cluster test_unavailable_shard;)
throw
localhost 9000 0 0 0
@@ -12,7 +12,7 @@ Code: 57. Error: Received from localhost:9000. Error: There was an error on [loc
(query: create table throw on cluster test_shard_localhost (n int) engine=Memory format Null;)
localhost 9000 0 1 0
Received exception from server:
Code: 159. Error: Received from localhost:9000. Error: Watching task <task> is executing longer than distributed_ddl_task_timeout (=1) seconds. There are 1 unfinished hosts (0 of them are currently active), they are going to execute the query in background. (TIMEOUT_EXCEEDED)
Code: 159. Error: Received from localhost:9000. Error: Watching task <task> is executing longer than distributed_ddl_task_timeout (=1) seconds. There are 1 unfinished hosts (0 of them are currently executing the task), they are going to execute the query in background. (TIMEOUT_EXCEEDED)
(query: drop table if exists throw on cluster test_unavailable_shard;)
null_status_on_timeout
localhost 9000 0 0 0
@@ -33,7 +33,7 @@ function run_until_out_contains()
done
}

RAND_COMMENT="01175_DDL_$RANDOM"
RAND_COMMENT="01175_DDL_$CLICKHOUSE_DATABASE"
LOG_COMMENT="${CLICKHOUSE_LOG_COMMENT}_$RAND_COMMENT"

CLICKHOUSE_CLIENT_WITH_SETTINGS=${CLICKHOUSE_CLIENT/--log_comment ${CLICKHOUSE_LOG_COMMENT}/--log_comment ${LOG_COMMENT}}
@@ -13,9 +13,15 @@ t
rdb_default 1 1 s1 r1 1
2
2
s1 r1 OK 2 0
s1 r2 QUEUED 2 0
s2 r1 QUEUED 2 0
2
rdb_default 1 1 s1 r1 1
rdb_default 1 2 s1 r2 0
2
2
t
t2
t3
rdb_default_4 1 1 s1 r1 1
4 changes: 4 additions & 0 deletions tests/queries/0_stateless/02447_drop_database_replica.sh
@@ -32,6 +32,10 @@ $CLICKHOUSE_CLIENT -q "system sync database replica $db"
$CLICKHOUSE_CLIENT -q "select cluster, shard_num, replica_num, database_shard_name, database_replica_name, is_active from system.clusters where cluster='$db' and shard_num=1 and replica_num=1"
$CLICKHOUSE_CLIENT -q "system drop database replica 's1|r1' from database $db2" 2>&1| grep -Fac "is active, cannot drop it"

# Also check that it doesn't exceed distributed_ddl_task_timeout waiting for inactive replicas
timeout 60s $CLICKHOUSE_CLIENT --distributed_ddl_task_timeout=1000 --distributed_ddl_output_mode=throw_only_active -q "create table $db.t2 (n int) engine=Log" 2>&1| grep -Fac "TIMEOUT_EXCEEDED"
timeout 60s $CLICKHOUSE_CLIENT --distributed_ddl_task_timeout=1000 --distributed_ddl_output_mode=null_status_on_timeout_only_active -q "create table $db.t3 (n int) engine=Log" | sort

$CLICKHOUSE_CLIENT -q "detach database $db3"
$CLICKHOUSE_CLIENT -q "system drop database replica 'r1' from shard 's2' from database $db"
$CLICKHOUSE_CLIENT -q "attach database $db3" 2>/dev/null
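Restating what the two added checks assert, as a hedged client-side sketch (again assuming clickhouse-driver and a Replicated database rdb whose other replicas are stopped; names illustrative):

from clickhouse_driver import Client
from clickhouse_driver.errors import ServerException

client = Client("localhost")
common = {"distributed_ddl_task_timeout": 1000}

# throw_only_active still reports the offline replicas via TIMEOUT_EXCEEDED,
# but it raises promptly instead of sitting out the 1000-second timeout.
try:
    client.execute(
        "CREATE TABLE rdb.t2 (n Int32) ENGINE = Log",
        settings={**common, "distributed_ddl_output_mode": "throw_only_active"})
except ServerException as e:
    assert e.code == 159  # 159 == TIMEOUT_EXCEEDED

# null_status_on_timeout_only_active returns a row per host instead of raising;
# the offline replicas show up as still queued (see the reference output above).
rows = client.execute(
    "CREATE TABLE rdb.t3 (n Int32) ENGINE = Log",
    settings={**common, "distributed_ddl_output_mode": "null_status_on_timeout_only_active"})
print(rows)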