From 0492f6724c9ccd7482837932372706af93a521c4 Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Sun, 1 Jun 2025 06:41:27 +0000 Subject: [PATCH 1/3] Merge pull request #79369 from ilejn/ignore_error_distributed_ddl_queue Ignore parse error in system.distributed_ddl_queue --- .../System/StorageSystemDDLWorkerQueue.cpp | 26 +++- .../test_system_ddl_worker_queue/test.py | 128 +++++++++++++++--- 2 files changed, 130 insertions(+), 24 deletions(-) diff --git a/src/Storages/System/StorageSystemDDLWorkerQueue.cpp b/src/Storages/System/StorageSystemDDLWorkerQueue.cpp index 9f88955e9204..1132e990eed7 100644 --- a/src/Storages/System/StorageSystemDDLWorkerQueue.cpp +++ b/src/Storages/System/StorageSystemDDLWorkerQueue.cpp @@ -29,6 +29,11 @@ namespace Setting extern const SettingsUInt64 max_query_size; } +namespace ErrorCodes +{ + extern const int SYNTAX_ERROR; +} + enum class Status : uint8_t { INACTIVE, @@ -62,7 +67,7 @@ ColumnsDescription StorageSystemDDLWorkerQueue::getColumnsDescription() {"entry_version", std::make_shared(std::make_shared()), "Version of the entry."}, {"initiator_host", std::make_shared(std::make_shared()), "Host that initiated the DDL operation."}, {"initiator_port", std::make_shared(std::make_shared()), "Port used by the initiator."}, - {"cluster", std::make_shared(), "Cluster name."}, + {"cluster", std::make_shared(), "Cluster name, empty if not determined."}, {"query", std::make_shared(), "Query executed."}, {"settings", std::make_shared(std::make_shared(), std::make_shared()), "Settings used in the DDL operation."}, {"query_create_time", std::make_shared(), "Query created time."}, @@ -85,8 +90,23 @@ static String clusterNameFromDDLQuery(ContextPtr context, const DDLTask & task) String description = fmt::format("from {}", task.entry_path); ParserQuery parser_query(end, settings[Setting::allow_settings_after_format_in_insert]); - ASTPtr query = parseQuery( - parser_query, begin, end, description, settings[Setting::max_query_size], settings[Setting::max_parser_depth], settings[Setting::max_parser_backtracks]); + ASTPtr query; + + try + { + query = parseQuery( + parser_query, begin, end, description, settings[Setting::max_query_size], settings[Setting::max_parser_depth], settings[Setting::max_parser_backtracks]); + } + catch (const Exception & e) + { + LOG_INFO(getLogger("StorageSystemDDLWorkerQueue"), "Failed to determine cluster"); + if (e.code() == ErrorCodes::SYNTAX_ERROR) + { + /// ignore parse error and present available information + return ""; + } + throw; + } String cluster_name; if (const auto * query_on_cluster = dynamic_cast(query.get())) diff --git a/tests/integration/test_system_ddl_worker_queue/test.py b/tests/integration/test_system_ddl_worker_queue/test.py index 4659e5b92e84..1bebf709a821 100644 --- a/tests/integration/test_system_ddl_worker_queue/test.py +++ b/tests/integration/test_system_ddl_worker_queue/test.py @@ -1,4 +1,5 @@ import pytest +import time from helpers.cluster import ClickHouseCluster @@ -25,46 +26,131 @@ def started_cluster(): try: cluster.start() - for i, node in enumerate([node1, node2]): - node.query("CREATE DATABASE testdb") - node.query( - """CREATE TABLE testdb.test_table(id UInt32, val String) ENGINE = ReplicatedMergeTree('/clickhouse/test/test_table1', '{}') ORDER BY id;""".format( - i - ) - ) - for i, node in enumerate([node3, node4]): - node.query("CREATE DATABASE testdb") - node.query( - """CREATE TABLE testdb.test_table(id UInt32, val String) ENGINE = ReplicatedMergeTree('/clickhouse/test/test_table2', '{}') ORDER BY id;""".format( - i - ) - ) yield cluster finally: cluster.shutdown() +def maintain_test_table(test_table): + tmark = time.time() # to guarantee ZK path uniqueness + + for i, node in enumerate([node1, node2]): + node.query(f"DROP TABLE IF EXISTS testdb.{test_table} SYNC") + node.query("DROP DATABASE IF EXISTS testdb") + + node.query("CREATE DATABASE testdb") + node.query( + f"CREATE TABLE testdb.{test_table}(id UInt32, val String) ENGINE = ReplicatedMergeTree('/clickhouse/test/{test_table}1-{tmark}', '{i}') ORDER BY id;" + ) + for i, node in enumerate([node3, node4]): + node.query(f"DROP TABLE IF EXISTS testdb.{test_table} SYNC") + node.query("DROP DATABASE IF EXISTS testdb") + + node.query("CREATE DATABASE testdb") + node.query( + f"CREATE TABLE testdb.{test_table}(id UInt32, val String) ENGINE = ReplicatedMergeTree('/clickhouse/test/{test_table}2-{tmark}', '{i}') ORDER BY id;" + ) + + def test_distributed_ddl_queue(started_cluster): + test_table = "test_table" + maintain_test_table(test_table) node1.query( - "INSERT INTO testdb.test_table SELECT number, toString(number) FROM numbers(100)" + f"INSERT INTO testdb.{test_table} SELECT number, toString(number) FROM numbers(100)" ) node3.query( - "INSERT INTO testdb.test_table SELECT number, toString(number) FROM numbers(100)" + f"INSERT INTO testdb.{test_table} SELECT number, toString(number) FROM numbers(100)" ) - node2.query("SYSTEM SYNC REPLICA testdb.test_table") - node4.query("SYSTEM SYNC REPLICA testdb.test_table") + node2.query(f"SYSTEM SYNC REPLICA testdb.{test_table}") + node4.query(f"SYSTEM SYNC REPLICA testdb.{test_table}") node1.query( - "ALTER TABLE testdb.test_table ON CLUSTER test_cluster ADD COLUMN somecolumn UInt8 AFTER val", + f"ALTER TABLE testdb.{test_table} ON CLUSTER test_cluster ADD COLUMN somecolumn UInt8 AFTER val", settings={"replication_alter_partitions_sync": "2"}, ) for node in nodes: - node.query("SYSTEM SYNC REPLICA testdb.test_table") - assert node.query("SELECT somecolumn FROM testdb.test_table LIMIT 1") == "0\n" + node.query(f"SYSTEM SYNC REPLICA testdb.{test_table}") + assert ( + node.query(f"SELECT somecolumn FROM testdb.{test_table} LIMIT 1") == "0\n" + ) assert ( node.query( "SELECT If((SELECT count(*) FROM system.distributed_ddl_queue WHERE cluster='test_cluster' AND entry='query-0000000000') > 0, 'ok', 'fail')" ) == "ok\n" ) + + node1.query( + f"ALTER TABLE testdb.{test_table} ON CLUSTER test_cluster DROP COLUMN somecolumn", + settings={"replication_alter_partitions_sync": "2"}, + ) + + +def test_distributed_ddl_rubbish(started_cluster): + test_table = "test_table_rubbish" + maintain_test_table(test_table) + node1.query( + f"ALTER TABLE testdb.{test_table} ON CLUSTER test_cluster ADD COLUMN somenewcolumn UInt8 AFTER val", + settings={"replication_alter_partitions_sync": "2"}, + ) + + zk_content = node1.query( + "SELECT name, value, path FROM system.zookeeper WHERE path LIKE '/clickhouse/task_queue/ddl%' SETTINGS allow_unrestricted_reads_from_keeper=true", + parse=True, + ).to_dict("records") + + original_query = "" + new_query = "query-artificial-" + str(time.monotonic_ns()) + + # Copy information about query (one that added 'somenewcolumn') with new query ID + # and broken query text (TABLE => TUBLE) + for row in zk_content: + if row["value"].find("somenewcolumn") >= 0: + original_query = row["name"] + break + + rows_to_insert = [] + + for row in zk_content: + if row["name"] == original_query: + rows_to_insert.append( + { + "name": new_query, + "path": row["path"], + "value": row["value"].replace("TABLE", "TUBLE"), + } + ) + continue + pos = row["path"].find(original_query) + if pos >= 0: + rows_to_insert.append( + { + "name": row["name"], + "path": row["path"].replace(original_query, new_query), + "value": row["value"], + } + ) + + # Ingest it to ZK + for row in rows_to_insert: + node1.query( + "insert into system.zookeeper (name, path, value) values ('{}', '{}', '{}')".format( + f'{row["name"]}', f'{row["path"]}', f'{row["value"]}' + ) + ) + + # Ensure that data is visible via system.distributed_ddl_queue + assert ( + int( + node1.query( + f"SELECT count(1) FROM system.distributed_ddl_queue WHERE entry='{new_query}' AND cluster=''" + ) + ) + == 4 + ) + + node1.query( + f"ALTER TABLE testdb.{test_table} ON CLUSTER test_cluster DROP COLUMN somenewcolumn", + settings={"replication_alter_partitions_sync": "2"}, + ) From 2df8357155f71970e937676614887b9d1df56cee Mon Sep 17 00:00:00 2001 From: Ilya Golshtein Date: Fri, 7 Nov 2025 11:05:05 +0000 Subject: [PATCH 2/3] Revert "Merge pull request #79369 from ilejn/ignore_error_distributed_ddl_queue" This reverts commit 0492f6724c9ccd7482837932372706af93a521c4. --- .../System/StorageSystemDDLWorkerQueue.cpp | 26 +--- .../test_system_ddl_worker_queue/test.py | 128 +++--------------- 2 files changed, 24 insertions(+), 130 deletions(-) diff --git a/src/Storages/System/StorageSystemDDLWorkerQueue.cpp b/src/Storages/System/StorageSystemDDLWorkerQueue.cpp index 1132e990eed7..9f88955e9204 100644 --- a/src/Storages/System/StorageSystemDDLWorkerQueue.cpp +++ b/src/Storages/System/StorageSystemDDLWorkerQueue.cpp @@ -29,11 +29,6 @@ namespace Setting extern const SettingsUInt64 max_query_size; } -namespace ErrorCodes -{ - extern const int SYNTAX_ERROR; -} - enum class Status : uint8_t { INACTIVE, @@ -67,7 +62,7 @@ ColumnsDescription StorageSystemDDLWorkerQueue::getColumnsDescription() {"entry_version", std::make_shared(std::make_shared()), "Version of the entry."}, {"initiator_host", std::make_shared(std::make_shared()), "Host that initiated the DDL operation."}, {"initiator_port", std::make_shared(std::make_shared()), "Port used by the initiator."}, - {"cluster", std::make_shared(), "Cluster name, empty if not determined."}, + {"cluster", std::make_shared(), "Cluster name."}, {"query", std::make_shared(), "Query executed."}, {"settings", std::make_shared(std::make_shared(), std::make_shared()), "Settings used in the DDL operation."}, {"query_create_time", std::make_shared(), "Query created time."}, @@ -90,23 +85,8 @@ static String clusterNameFromDDLQuery(ContextPtr context, const DDLTask & task) String description = fmt::format("from {}", task.entry_path); ParserQuery parser_query(end, settings[Setting::allow_settings_after_format_in_insert]); - ASTPtr query; - - try - { - query = parseQuery( - parser_query, begin, end, description, settings[Setting::max_query_size], settings[Setting::max_parser_depth], settings[Setting::max_parser_backtracks]); - } - catch (const Exception & e) - { - LOG_INFO(getLogger("StorageSystemDDLWorkerQueue"), "Failed to determine cluster"); - if (e.code() == ErrorCodes::SYNTAX_ERROR) - { - /// ignore parse error and present available information - return ""; - } - throw; - } + ASTPtr query = parseQuery( + parser_query, begin, end, description, settings[Setting::max_query_size], settings[Setting::max_parser_depth], settings[Setting::max_parser_backtracks]); String cluster_name; if (const auto * query_on_cluster = dynamic_cast(query.get())) diff --git a/tests/integration/test_system_ddl_worker_queue/test.py b/tests/integration/test_system_ddl_worker_queue/test.py index 1bebf709a821..4659e5b92e84 100644 --- a/tests/integration/test_system_ddl_worker_queue/test.py +++ b/tests/integration/test_system_ddl_worker_queue/test.py @@ -1,5 +1,4 @@ import pytest -import time from helpers.cluster import ClickHouseCluster @@ -26,131 +25,46 @@ def started_cluster(): try: cluster.start() + for i, node in enumerate([node1, node2]): + node.query("CREATE DATABASE testdb") + node.query( + """CREATE TABLE testdb.test_table(id UInt32, val String) ENGINE = ReplicatedMergeTree('/clickhouse/test/test_table1', '{}') ORDER BY id;""".format( + i + ) + ) + for i, node in enumerate([node3, node4]): + node.query("CREATE DATABASE testdb") + node.query( + """CREATE TABLE testdb.test_table(id UInt32, val String) ENGINE = ReplicatedMergeTree('/clickhouse/test/test_table2', '{}') ORDER BY id;""".format( + i + ) + ) yield cluster finally: cluster.shutdown() -def maintain_test_table(test_table): - tmark = time.time() # to guarantee ZK path uniqueness - - for i, node in enumerate([node1, node2]): - node.query(f"DROP TABLE IF EXISTS testdb.{test_table} SYNC") - node.query("DROP DATABASE IF EXISTS testdb") - - node.query("CREATE DATABASE testdb") - node.query( - f"CREATE TABLE testdb.{test_table}(id UInt32, val String) ENGINE = ReplicatedMergeTree('/clickhouse/test/{test_table}1-{tmark}', '{i}') ORDER BY id;" - ) - for i, node in enumerate([node3, node4]): - node.query(f"DROP TABLE IF EXISTS testdb.{test_table} SYNC") - node.query("DROP DATABASE IF EXISTS testdb") - - node.query("CREATE DATABASE testdb") - node.query( - f"CREATE TABLE testdb.{test_table}(id UInt32, val String) ENGINE = ReplicatedMergeTree('/clickhouse/test/{test_table}2-{tmark}', '{i}') ORDER BY id;" - ) - - def test_distributed_ddl_queue(started_cluster): - test_table = "test_table" - maintain_test_table(test_table) node1.query( - f"INSERT INTO testdb.{test_table} SELECT number, toString(number) FROM numbers(100)" + "INSERT INTO testdb.test_table SELECT number, toString(number) FROM numbers(100)" ) node3.query( - f"INSERT INTO testdb.{test_table} SELECT number, toString(number) FROM numbers(100)" + "INSERT INTO testdb.test_table SELECT number, toString(number) FROM numbers(100)" ) - node2.query(f"SYSTEM SYNC REPLICA testdb.{test_table}") - node4.query(f"SYSTEM SYNC REPLICA testdb.{test_table}") + node2.query("SYSTEM SYNC REPLICA testdb.test_table") + node4.query("SYSTEM SYNC REPLICA testdb.test_table") node1.query( - f"ALTER TABLE testdb.{test_table} ON CLUSTER test_cluster ADD COLUMN somecolumn UInt8 AFTER val", + "ALTER TABLE testdb.test_table ON CLUSTER test_cluster ADD COLUMN somecolumn UInt8 AFTER val", settings={"replication_alter_partitions_sync": "2"}, ) for node in nodes: - node.query(f"SYSTEM SYNC REPLICA testdb.{test_table}") - assert ( - node.query(f"SELECT somecolumn FROM testdb.{test_table} LIMIT 1") == "0\n" - ) + node.query("SYSTEM SYNC REPLICA testdb.test_table") + assert node.query("SELECT somecolumn FROM testdb.test_table LIMIT 1") == "0\n" assert ( node.query( "SELECT If((SELECT count(*) FROM system.distributed_ddl_queue WHERE cluster='test_cluster' AND entry='query-0000000000') > 0, 'ok', 'fail')" ) == "ok\n" ) - - node1.query( - f"ALTER TABLE testdb.{test_table} ON CLUSTER test_cluster DROP COLUMN somecolumn", - settings={"replication_alter_partitions_sync": "2"}, - ) - - -def test_distributed_ddl_rubbish(started_cluster): - test_table = "test_table_rubbish" - maintain_test_table(test_table) - node1.query( - f"ALTER TABLE testdb.{test_table} ON CLUSTER test_cluster ADD COLUMN somenewcolumn UInt8 AFTER val", - settings={"replication_alter_partitions_sync": "2"}, - ) - - zk_content = node1.query( - "SELECT name, value, path FROM system.zookeeper WHERE path LIKE '/clickhouse/task_queue/ddl%' SETTINGS allow_unrestricted_reads_from_keeper=true", - parse=True, - ).to_dict("records") - - original_query = "" - new_query = "query-artificial-" + str(time.monotonic_ns()) - - # Copy information about query (one that added 'somenewcolumn') with new query ID - # and broken query text (TABLE => TUBLE) - for row in zk_content: - if row["value"].find("somenewcolumn") >= 0: - original_query = row["name"] - break - - rows_to_insert = [] - - for row in zk_content: - if row["name"] == original_query: - rows_to_insert.append( - { - "name": new_query, - "path": row["path"], - "value": row["value"].replace("TABLE", "TUBLE"), - } - ) - continue - pos = row["path"].find(original_query) - if pos >= 0: - rows_to_insert.append( - { - "name": row["name"], - "path": row["path"].replace(original_query, new_query), - "value": row["value"], - } - ) - - # Ingest it to ZK - for row in rows_to_insert: - node1.query( - "insert into system.zookeeper (name, path, value) values ('{}', '{}', '{}')".format( - f'{row["name"]}', f'{row["path"]}', f'{row["value"]}' - ) - ) - - # Ensure that data is visible via system.distributed_ddl_queue - assert ( - int( - node1.query( - f"SELECT count(1) FROM system.distributed_ddl_queue WHERE entry='{new_query}' AND cluster=''" - ) - ) - == 4 - ) - - node1.query( - f"ALTER TABLE testdb.{test_table} ON CLUSTER test_cluster DROP COLUMN somenewcolumn", - settings={"replication_alter_partitions_sync": "2"}, - ) From 5238553f64faff041eed74a991276092557db167 Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Sun, 1 Jun 2025 06:41:27 +0000 Subject: [PATCH 3/3] Merge pull request #79369 from ilejn/ignore_error_distributed_ddl_queue Ignore parse error in system.distributed_ddl_queue --- .../System/StorageSystemDDLWorkerQueue.cpp | 26 ++++++++++++++++--- 1 file changed, 23 insertions(+), 3 deletions(-) diff --git a/src/Storages/System/StorageSystemDDLWorkerQueue.cpp b/src/Storages/System/StorageSystemDDLWorkerQueue.cpp index 9f88955e9204..1132e990eed7 100644 --- a/src/Storages/System/StorageSystemDDLWorkerQueue.cpp +++ b/src/Storages/System/StorageSystemDDLWorkerQueue.cpp @@ -29,6 +29,11 @@ namespace Setting extern const SettingsUInt64 max_query_size; } +namespace ErrorCodes +{ + extern const int SYNTAX_ERROR; +} + enum class Status : uint8_t { INACTIVE, @@ -62,7 +67,7 @@ ColumnsDescription StorageSystemDDLWorkerQueue::getColumnsDescription() {"entry_version", std::make_shared(std::make_shared()), "Version of the entry."}, {"initiator_host", std::make_shared(std::make_shared()), "Host that initiated the DDL operation."}, {"initiator_port", std::make_shared(std::make_shared()), "Port used by the initiator."}, - {"cluster", std::make_shared(), "Cluster name."}, + {"cluster", std::make_shared(), "Cluster name, empty if not determined."}, {"query", std::make_shared(), "Query executed."}, {"settings", std::make_shared(std::make_shared(), std::make_shared()), "Settings used in the DDL operation."}, {"query_create_time", std::make_shared(), "Query created time."}, @@ -85,8 +90,23 @@ static String clusterNameFromDDLQuery(ContextPtr context, const DDLTask & task) String description = fmt::format("from {}", task.entry_path); ParserQuery parser_query(end, settings[Setting::allow_settings_after_format_in_insert]); - ASTPtr query = parseQuery( - parser_query, begin, end, description, settings[Setting::max_query_size], settings[Setting::max_parser_depth], settings[Setting::max_parser_backtracks]); + ASTPtr query; + + try + { + query = parseQuery( + parser_query, begin, end, description, settings[Setting::max_query_size], settings[Setting::max_parser_depth], settings[Setting::max_parser_backtracks]); + } + catch (const Exception & e) + { + LOG_INFO(getLogger("StorageSystemDDLWorkerQueue"), "Failed to determine cluster"); + if (e.code() == ErrorCodes::SYNTAX_ERROR) + { + /// ignore parse error and present available information + return ""; + } + throw; + } String cluster_name; if (const auto * query_on_cluster = dynamic_cast(query.get()))