From 688419b0efea60ca7fea42fa72d4aab1d3e52011 Mon Sep 17 00:00:00 2001 From: Mathias Fussenegger Date: Tue, 30 Sep 2025 15:18:41 +0200 Subject: [PATCH 1/3] Split rolling upgrade logic a bit `_test_rolling_upgrade` was getting rather long, this extracts two methods - one to bootstrap the initial data on a cluster, and one to run all the queries after a individual node was upgraded. --- tests/bwc/test_rolling_upgrade.py | 368 ++++++++++++++++-------------- 1 file changed, 195 insertions(+), 173 deletions(-) diff --git a/tests/bwc/test_rolling_upgrade.py b/tests/bwc/test_rolling_upgrade.py index 5d65a56d..3975d398 100644 --- a/tests/bwc/test_rolling_upgrade.py +++ b/tests/bwc/test_rolling_upgrade.py @@ -1,6 +1,9 @@ import unittest from crate.client import connect +from crate.client.cursor import Cursor +from crate.client.connection import Connection from crate.client.exceptions import ProgrammingError +from cr8.run_crate import CrateNode from crate.qa.tests import NodeProvider, insert_data, wait_for_active_shards, UpgradePath @@ -69,7 +72,7 @@ def test_rolling_upgrade_5_to_6(self): finally: self.tearDown() - def _test_rolling_upgrade(self, path, nodes): + def _test_rolling_upgrade(self, path: UpgradePath, nodes: int): """ Test a rolling upgrade across given versions. An initial test cluster is started and then subsequently each node in @@ -79,8 +82,9 @@ def _test_rolling_upgrade(self, path, nodes): is possible. """ - shards, replicas = (nodes, 1) - expected_active_shards = shards + shards * replicas + shards = nodes + replicas = 1 + expected_active_shards = 0 settings = { "transport.netty.worker_count": 16, @@ -88,69 +92,10 @@ def _test_rolling_upgrade(self, path, nodes): } cluster = self._new_cluster(path.from_version, nodes, settings=settings) cluster.start() - with connect(cluster.node().http_url, error_trace=True) as conn: - c = conn.cursor() - c.execute("create user arthur with (password = 'secret')") - c.execute("grant dql to arthur") - c.execute(f''' - CREATE TABLE doc.t1 ( - type BYTE, - value FLOAT, - title string, - author object as ( - name string - ), - o object(ignored) as (a int), - index composite_nested_ft using fulltext(title, author['name']) with(analyzer = 'stop') - ) CLUSTERED INTO {shards} SHARDS - WITH (number_of_replicas={replicas}) - ''') - c.execute("deny dql on table doc.t1 to arthur") - c.execute("CREATE VIEW doc.v1 AS SELECT type, title, value FROM doc.t1") - insert_data(conn, 'doc', 't1', 1000) - c.execute("INSERT INTO doc.t1 (type, value, title, author) VALUES (1, 1, 'matchMe title', {name='no match name'})") - c.execute("INSERT INTO doc.t1 (type, value, title, author) VALUES (2, 2, 'no match title', {name='matchMe name'})") - c.execute("INSERT INTO doc.t1 (title, author, o) VALUES ('prefix_check', {\"dyn_empty_array\" = []}, {\"dyn_ignored_subcol\" = 'hello'})") - - if int(path.from_version.split('.')[0]) >= 5: - c.execute(''' - create table doc.t2 ( - a int primary key, - b int not null, - c int default (random() + 1), - d generated always as (a + b + c), - constraint d CHECK (d > a + b) - ) clustered into 1 shards with (number_of_replicas = 0) - ''') - expected_active_shards += 1 - c.execute(''' - create table doc.t3 ( - a int primary key, - b int not null, - c int default (random() + 1), - d generated always as (a + b + c), - constraint d CHECK (d > a + b) - ) partitioned by (a) clustered into 1 shards with (number_of_replicas = 0) - ''') - - c.execute(''' - CREATE FUNCTION foo(INT) - RETURNS INT - LANGUAGE JAVASCRIPT - AS 'function foo(a) { return a + 1 }'; - ''') - c.execute(f''' - CREATE TABLE doc.parted ( - id INT, - value INT, - f_value GENERATED ALWAYS AS foo(value) - ) CLUSTERED INTO {shards} SHARDS - PARTITIONED BY (id) - WITH (number_of_replicas=0, "write.wait_for_active_shards"=1) - ''') - c.execute("INSERT INTO doc.parted (id, value) VALUES (1, 1)") - # Add the shards of the new partition primaries - expected_active_shards += shards + node = cluster.node() + with connect(node.http_url, error_trace=True) as conn: + new_shards = init_data(conn, node.version, shards, replicas) + expected_active_shards += new_shards for idx, node in enumerate(cluster): # Enforce an old version node be a handler to make sure that an upgraded node can serve 'select *' from an old version node. @@ -158,9 +103,7 @@ def _test_rolling_upgrade(self, path, nodes): # Was a regression for 5.7 <-> 5.8 with connect(node.http_url, error_trace=True) as old_node_conn: c = old_node_conn.cursor() - c.execute(''' - SELECT * from sys.nodes - ''') + c.execute("SELECT * from sys.nodes") res = c.fetchall() self.assertEqual(len(res), 3) @@ -181,110 +124,8 @@ def _test_rolling_upgrade(self, path, nodes): cluster[idx] = new_node with connect(new_node.http_url, error_trace=True) as conn: c = conn.cursor() - wait_for_active_shards(c, expected_active_shards) - - c.execute("select name from sys.users order by 1") - self.assertEqual(c.fetchall(), [["arthur"], ["crate"]]) - - c.execute("select * from sys.privileges order by ident") - self.assertEqual( - c.fetchall(), - [['TABLE', 'arthur', 'crate', 'doc.t1', 'DENY', 'DQL'], - ['CLUSTER', 'arthur', 'crate', None, 'GRANT', 'DQL']]) - - c.execute("REFRESH TABLE doc.t1") - c.execute(''' - SELECT type, AVG(value) - FROM doc.t1 - GROUP BY type - ''') - c.fetchall() - # Ensure aggregation with different intermediate input works, this was an regression for 4.1 <-> 4.2 - c.execute(''' - SELECT type, count(distinct value) - FROM doc.t1 - GROUP BY type - ''') - c.fetchall() - - # Ensure scalar symbols are working across versions - c.execute(''' - SELECT type, value + 1 - FROM doc.t1 - WHERE value > 1 - LIMIT 1 - ''') - c.fetchone() - c.execute('SELECT type, value + 1 FROM doc.v1 WHERE value > 1 LIMIT 1') - c.fetchone() - - # Ensure match queries work. Table level dedicated index column mapping has been changed in 5.4. - c.execute(''' - SELECT value, title, author - FROM doc.t1 - WHERE MATCH(composite_nested_ft, 'matchMe') - ORDER BY value - ''') - res = c.fetchall() - print("Results for match query:") - print(res[0]) - print(res[1]) - self.assertEqual(len(res), 2) - # only title matches - self.assertEqual(res[0][1], 'matchMe title') - self.assertEqual(res[0][2], {'name': 'no match name'}) - # only name matches - self.assertEqual(res[1][1], 'no match title') - self.assertEqual(res[1][2], {'name': 'matchMe name'}) - - # Dynamically added empty arrays and ignored object sub-columns are indexed with special prefix starting from 5.5 - # Ensure that reading such columns work across all versions. - # Related to https://github.com/crate/crate/commit/278d45f176e7d1d3215118255cd69afd2d3786ee - c.execute(''' - SELECT author, o['dyn_ignored_subcol'] - FROM doc.t1 - WHERE title = 'prefix_check' - ''') - res = c.fetchall() - self.assertEqual(len(res), 1) - self.assertEqual(res[0][0], {'dyn_empty_array': []}) - self.assertEqual(res[0][1], 'hello') - - # Ensure that inserts are working while upgrading - c.execute( - "INSERT INTO doc.t1 (type, value, title, author) VALUES (3, 3, 'some title', {name='nothing to see, move on'})") - - # Ensure that inserts, which will create a new partition, are working while upgrading - c.execute("INSERT INTO doc.parted (id, value) VALUES (?, ?)", [idx + 10, idx + 10]) - # Add the shards of the new partition primaries - expected_active_shards += shards - - # Ensure table/partition versions created are correct - if int(path.from_version.split('.')[0]) >= 5: - c.execute("insert into doc.t2(a, b) values (?, ?)", [idx, idx]) - c.execute("refresh table t2") - c.execute("select a, b, c>=1 and c<=2, d>a+b from doc.t2 where a = ?", [idx]) - self.assertEqual(c.fetchall(), [[idx, idx, True, True]]) - old_version = '.'.join(map(str, node.version)) - c.execute("select distinct(version['created']) from information_schema.tables where table_name = 't2'") - self.assertEqual(c.fetchall(), [[old_version]]) - # There was a behavior change in 5.9. After fully upgrading all nodes in the cluster, newly added - # partitions' version created will follow the upgraded version. - # E.g., when 5.9 -> 5.10 is completed, the version created for new partitions will be 5.10 - if int(path.from_version.split('.')[1]) >= 9: - c.execute("insert into doc.t3(a, b) values (?, ?)", [idx, idx]) - expected_active_shards += 1 - c.execute("refresh table t3") - c.execute("select a, b, c>=1 and c<=2, d>a+b from doc.t3 where a = ?", [idx]) - self.assertEqual(c.fetchall(), [[idx, idx, True, True]]) - c.execute("select distinct(version['created']) from information_schema.tables where table_name = 't3'") - self.assertEqual(c.fetchall(), [[old_version]]) - partition_version = old_version - if idx == nodes - 1: - # the partition added after all nodes are upgraded should follow the upgraded(latest) version - partition_version = '.'.join(map(str, new_node.version)) - c.execute("select version['created'] from information_schema.table_partitions where table_name = 't3' and values['a'] = ?", [idx]) - self.assertEqual(c.fetchall(), [[partition_version]]) + new_shards = self._test_queries_on_new_node(idx, c, node, new_node, nodes, shards, expected_active_shards) + expected_active_shards += new_shards # Finally validate that all shards (primaries and replicas) of all partitions are started # and writes into the partitioned table while upgrading were successful @@ -309,3 +150,184 @@ def _test_rolling_upgrade(self, path, nodes): c.execute("CREATE VIEW doc.v1 AS SELECT 11") c.execute("SELECT * FROM doc.v1") self.assertEqual(c.fetchall(), [[11]]) + + def _test_queries_on_new_node(self, + idx: int, + c: Cursor, + old_node: CrateNode, + new_node: CrateNode, + num_nodes: int, + shards_per_partition: int, + current_shards: int) -> int: + wait_for_active_shards(c, current_shards) + new_shards = 0 + + c.execute("select name from sys.users order by 1") + self.assertEqual(c.fetchall(), [["arthur"], ["crate"]]) + + c.execute("select * from sys.privileges order by ident") + self.assertEqual( + c.fetchall(), + [['TABLE', 'arthur', 'crate', 'doc.t1', 'DENY', 'DQL'], + ['CLUSTER', 'arthur', 'crate', None, 'GRANT', 'DQL']]) + + c.execute("REFRESH TABLE doc.t1") + c.execute(''' + SELECT type, AVG(value) + FROM doc.t1 + GROUP BY type + ''') + c.fetchall() + # Ensure aggregation with different intermediate input works, this was an regression for 4.1 <-> 4.2 + c.execute(''' + SELECT type, count(distinct value) + FROM doc.t1 + GROUP BY type + ''') + c.fetchall() + + # Ensure scalar symbols are working across versions + c.execute(''' + SELECT type, value + 1 + FROM doc.t1 + WHERE value > 1 + LIMIT 1 + ''') + c.fetchone() + c.execute('SELECT type, value + 1 FROM doc.v1 WHERE value > 1 LIMIT 1') + c.fetchone() + + # Ensure match queries work. Table level dedicated index column mapping has been changed in 5.4. + c.execute(''' + SELECT value, title, author + FROM doc.t1 + WHERE MATCH(composite_nested_ft, 'matchMe') + ORDER BY value + ''') + res = c.fetchall() + print("Results for match query:") + print(res[0]) + print(res[1]) + self.assertEqual(len(res), 2) + # only title matches + self.assertEqual(res[0][1], 'matchMe title') + self.assertEqual(res[0][2], {'name': 'no match name'}) + # only name matches + self.assertEqual(res[1][1], 'no match title') + self.assertEqual(res[1][2], {'name': 'matchMe name'}) + + # Dynamically added empty arrays and ignored object sub-columns are indexed with special prefix starting from 5.5 + # Ensure that reading such columns work across all versions. + # Related to https://github.com/crate/crate/commit/278d45f176e7d1d3215118255cd69afd2d3786ee + c.execute(''' + SELECT author, o['dyn_ignored_subcol'] + FROM doc.t1 + WHERE title = 'prefix_check' + ''') + res = c.fetchall() + self.assertEqual(len(res), 1) + self.assertEqual(res[0][0], {'dyn_empty_array': []}) + self.assertEqual(res[0][1], 'hello') + + # Ensure that inserts are working while upgrading + c.execute( + "INSERT INTO doc.t1 (type, value, title, author) VALUES (3, 3, 'some title', {name='nothing to see, move on'})") + + # Ensure that inserts, which will create a new partition, are working while upgrading + c.execute("INSERT INTO doc.parted (id, value) VALUES (?, ?)", [idx + 10, idx + 10]) + new_shards += shards_per_partition + + # Ensure table/partition versions created are correct + if old_node.version >= (5, 0, 0): + c.execute("insert into doc.t2 (a, b) values (?, ?)", [idx, idx]) + c.execute("refresh table t2") + c.execute("select a, b, c>=1 and c<=2, d>a+b from doc.t2 where a = ?", [idx]) + self.assertEqual(c.fetchall(), [[idx, idx, True, True]]) + old_version = '.'.join(map(str, old_node.version)) + c.execute("select distinct(version['created']) from information_schema.tables where table_name = 't2'") + self.assertEqual(c.fetchall(), [[old_version]]) + # There was a behavior change in 5.9. After fully upgrading all nodes in the cluster, newly added + # partitions' version created will follow the upgraded version. + # E.g., when 5.9 -> 5.10 is completed, the version created for new partitions will be 5.10 + if old_node.version[1] >= 9: + c.execute("insert into doc.t3 (a, b) values (?, ?)", [idx, idx]) + new_shards += 1 + c.execute("refresh table t3") + c.execute("select a, b, c>=1 and c<=2, d>a+b from doc.t3 where a = ?", [idx]) + self.assertEqual(c.fetchall(), [[idx, idx, True, True]]) + c.execute("select distinct(version['created']) from information_schema.tables where table_name = 't3'") + self.assertEqual(c.fetchall(), [[old_version]]) + partition_version = old_version + if idx == num_nodes - 1: + # the partition added after all nodes are upgraded should follow the upgraded(latest) version + partition_version = '.'.join(map(str, new_node.version)) + c.execute("select version['created'] from information_schema.table_partitions where table_name = 't3' and values['a'] = ?", [idx]) + self.assertEqual(c.fetchall(), [[partition_version]]) + return new_shards + + +def init_data(conn: Connection, version: tuple[int, int, int], shards: int, replicas: int) -> int: + new_shards = 0 + c = conn.cursor() + c.execute("create user arthur with (password = 'secret')") + c.execute("grant dql to arthur") + c.execute(f''' + CREATE TABLE doc.t1 ( + type BYTE, + value FLOAT, + title string, + author object as ( + name string + ), + o object(ignored) as (a int), + index composite_nested_ft using fulltext(title, author['name']) with(analyzer = 'stop') + ) CLUSTERED INTO {shards} SHARDS + WITH (number_of_replicas={replicas}) + ''') + new_shards = shards + (shards * replicas) + c.execute("deny dql on table doc.t1 to arthur") + c.execute("CREATE VIEW doc.v1 AS SELECT type, title, value FROM doc.t1") + insert_data(conn, 'doc', 't1', 1000) + c.execute("INSERT INTO doc.t1 (type, value, title, author) VALUES (1, 1, 'matchMe title', {name='no match name'})") + c.execute("INSERT INTO doc.t1 (type, value, title, author) VALUES (2, 2, 'no match title', {name='matchMe name'})") + c.execute("INSERT INTO doc.t1 (title, author, o) VALUES ('prefix_check', {\"dyn_empty_array\" = []}, {\"dyn_ignored_subcol\" = 'hello'})") + + if version >= (5, 0, 0): + c.execute(''' + create table doc.t2 ( + a int primary key, + b int not null, + c int default (random() + 1), + d generated always as (a + b + c), + constraint d CHECK (d > a + b) + ) clustered into 1 shards with (number_of_replicas = 0) + ''') + new_shards += 1 + c.execute(''' + create table doc.t3 ( + a int primary key, + b int not null, + c int default (random() + 1), + d generated always as (a + b + c), + constraint d CHECK (d > a + b) + ) partitioned by (a) clustered into 1 shards with (number_of_replicas = 0) + ''') + + c.execute(''' + CREATE FUNCTION foo(INT) + RETURNS INT + LANGUAGE JAVASCRIPT + AS 'function foo(a) { return a + 1 }'; + ''') + c.execute(f''' + CREATE TABLE doc.parted ( + id INT, + value INT, + f_value GENERATED ALWAYS AS foo(value) + ) CLUSTERED INTO {shards} SHARDS + PARTITIONED BY (id) + WITH (number_of_replicas=0, "write.wait_for_active_shards"=1) + ''') + c.execute("INSERT INTO doc.parted (id, value) VALUES (1, 1)") + new_shards += shards + return new_shards From d9a09124eec439465ce68ada7663a78e4c5d3413 Mon Sep 17 00:00:00 2001 From: Mathias Fussenegger Date: Tue, 30 Sep 2025 15:19:57 +0200 Subject: [PATCH 2/3] Fix old_node version check to work with new major version --- tests/bwc/test_rolling_upgrade.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/bwc/test_rolling_upgrade.py b/tests/bwc/test_rolling_upgrade.py index 3975d398..25256b59 100644 --- a/tests/bwc/test_rolling_upgrade.py +++ b/tests/bwc/test_rolling_upgrade.py @@ -249,7 +249,7 @@ def _test_queries_on_new_node(self, # There was a behavior change in 5.9. After fully upgrading all nodes in the cluster, newly added # partitions' version created will follow the upgraded version. # E.g., when 5.9 -> 5.10 is completed, the version created for new partitions will be 5.10 - if old_node.version[1] >= 9: + if old_node.version >= (5, 9, 0): c.execute("insert into doc.t3 (a, b) values (?, ?)", [idx, idx]) new_shards += 1 c.execute("refresh table t3") From 02cca4efed12a76780a9c5b7c28792a8ad398646 Mon Sep 17 00:00:00 2001 From: Mathias Fussenegger Date: Mon, 6 Oct 2025 11:30:26 +0200 Subject: [PATCH 3/3] Remove match query result output again The issue is fixed with https://github.com/crate/crate/pull/18470 --- tests/bwc/test_rolling_upgrade.py | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/tests/bwc/test_rolling_upgrade.py b/tests/bwc/test_rolling_upgrade.py index 25256b59..e462eb1b 100644 --- a/tests/bwc/test_rolling_upgrade.py +++ b/tests/bwc/test_rolling_upgrade.py @@ -205,16 +205,10 @@ def _test_queries_on_new_node(self, ORDER BY value ''') res = c.fetchall() - print("Results for match query:") - print(res[0]) - print(res[1]) - self.assertEqual(len(res), 2) - # only title matches - self.assertEqual(res[0][1], 'matchMe title') - self.assertEqual(res[0][2], {'name': 'no match name'}) - # only name matches - self.assertEqual(res[1][1], 'no match title') - self.assertEqual(res[1][2], {'name': 'matchMe name'}) + self.assertEqual(res, [ + [1.0, "matchMe title", {"name": "no match name"}], + [2.0, "no match title", {"name": "matchMe name"}], + ]) # Dynamically added empty arrays and ignored object sub-columns are indexed with special prefix starting from 5.5 # Ensure that reading such columns work across all versions.