From 0628ae62681d970a37414a251e0894de1e3b4569 Mon Sep 17 00:00:00 2001
From: kssenii
Date: Mon, 12 Feb 2024 17:13:30 +0100
Subject: [PATCH 1/2] S3 queue fix uninitialized value

---
 src/Storages/S3Queue/S3QueueTableMetadata.cpp | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/src/Storages/S3Queue/S3QueueTableMetadata.cpp b/src/Storages/S3Queue/S3QueueTableMetadata.cpp
index 3ee2594135dd..1830bac47434 100644
--- a/src/Storages/S3Queue/S3QueueTableMetadata.cpp
+++ b/src/Storages/S3Queue/S3QueueTableMetadata.cpp
@@ -69,16 +69,23 @@ void S3QueueTableMetadata::read(const String & metadata_str)
 {
     Poco::JSON::Parser parser;
     auto json = parser.parse(metadata_str).extract<Poco::JSON::Object::Ptr>();
+
     after_processing = json->getValue<String>("after_processing");
     mode = json->getValue<String>("mode");
     s3queue_tracked_files_limit = json->getValue<UInt64>("s3queue_tracked_files_limit");
     s3queue_tracked_file_ttl_sec = json->getValue<UInt64>("s3queue_tracked_file_ttl_sec");
     format_name = json->getValue<String>("format_name");
     columns = json->getValue<String>("columns");
 
+    if (json->has("s3queue_total_shards_num"))
         s3queue_total_shards_num = json->getValue<UInt64>("s3queue_total_shards_num");
+    else
+        s3queue_total_shards_num = 1;
 
+    if (json->has("s3queue_processing_threads_num"))
         s3queue_processing_threads_num = json->getValue<UInt64>("s3queue_processing_threads_num");
+    else
+        s3queue_processing_threads_num = 1;
 }
 
 S3QueueTableMetadata S3QueueTableMetadata::parse(const String & metadata_str)

From d008ee725f7d1e1bad802c5153111e41622481b1 Mon Sep 17 00:00:00 2001
From: kssenii
Date: Mon, 12 Feb 2024 20:23:21 +0100
Subject: [PATCH 2/2] Add a test

---
 src/Storages/S3Queue/S3QueueTableMetadata.h   |  8 +--
 .../integration/test_storage_s3_queue/test.py | 50 +++++++++++++++++++
 2 files changed, 54 insertions(+), 4 deletions(-)

diff --git a/src/Storages/S3Queue/S3QueueTableMetadata.h b/src/Storages/S3Queue/S3QueueTableMetadata.h
index 30642869930f..84087f72a6aa 100644
--- a/src/Storages/S3Queue/S3QueueTableMetadata.h
+++ b/src/Storages/S3Queue/S3QueueTableMetadata.h
@@ -21,10 +21,10 @@ struct S3QueueTableMetadata
     String columns;
     String after_processing;
     String mode;
-    UInt64 s3queue_tracked_files_limit;
-    UInt64 s3queue_tracked_file_ttl_sec;
-    UInt64 s3queue_total_shards_num;
-    UInt64 s3queue_processing_threads_num;
+    UInt64 s3queue_tracked_files_limit = 0;
+    UInt64 s3queue_tracked_file_ttl_sec = 0;
+    UInt64 s3queue_total_shards_num = 1;
+    UInt64 s3queue_processing_threads_num = 1;
 
     S3QueueTableMetadata() = default;
     S3QueueTableMetadata(const StorageS3::Configuration & configuration, const S3QueueSettings & engine_settings, const StorageInMemoryMetadata & storage_metadata);

diff --git a/tests/integration/test_storage_s3_queue/test.py b/tests/integration/test_storage_s3_queue/test.py
index 810c4f29e9dd..a7abd8408341 100644
--- a/tests/integration/test_storage_s3_queue/test.py
+++ b/tests/integration/test_storage_s3_queue/test.py
@@ -101,6 +101,15 @@ def started_cluster():
         ],
         stay_alive=True,
     )
+    cluster.add_instance(
+        "old_instance",
+        with_zookeeper=True,
+        image="clickhouse/clickhouse-server",
+        tag="23.12",
+        stay_alive=True,
+        with_installed_binary=True,
+        allow_analyzer=False,
+    )
 
     logging.info("Starting cluster...")
     cluster.start()
@@ -1386,3 +1395,44 @@ def get_count():
             break
         time.sleep(1)
     assert expected_rows == get_count()
+
+
+def test_upgrade(started_cluster):
+    node = started_cluster.instances["old_instance"]
+    table_name = f"test_upgrade"
+    dst_table_name = f"{table_name}_dst"
+    keeper_path = f"/clickhouse/test_{table_name}"
+    files_path = f"{table_name}_data"
+    files_to_generate = 10
+
+    create_table(
+        started_cluster,
+        node,
+        table_name,
+        "ordered",
+        files_path,
+        additional_settings={
+            "keeper_path": keeper_path,
+        },
+    )
+
+    total_values = generate_random_files(
+        started_cluster, files_path, files_to_generate, start_ind=0, row_num=1
+    )
+
+    create_mv(node, table_name, dst_table_name)
+
+    def get_count():
+        return int(node.query(f"SELECT count() FROM {dst_table_name}"))
+
+    expected_rows = 10
+    for _ in range(20):
+        if expected_rows == get_count():
+            break
+        time.sleep(1)
+
+    assert expected_rows == get_count()
+
+    node.restart_with_latest_version()
+
+    assert expected_rows == get_count()
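
Note on the pattern the two patches combine (this note and sketch are not part of the patches; the Metadata struct and the metadata string below are illustrative stand-ins, assuming only a local Poco installation): metadata written by a 23.12 server predates the s3queue_total_shards_num and s3queue_processing_threads_num keys, so reading it back must both guard each getValue() with has() — Poco::JSON's getValue() throws when the key is absent — and give the struct members in-class defaults so they can never be read uninitialized. A minimal standalone sketch:

#include <cstdint>
#include <iostream>
#include <string>

#include <Poco/JSON/Object.h>
#include <Poco/JSON/Parser.h>

// Illustrative stand-in for S3QueueTableMetadata: in-class initializers give
// every member a defined value even if read() never assigns it.
struct Metadata
{
    std::string mode;
    uint64_t total_shards_num = 1;
    uint64_t processing_threads_num = 1;

    void read(const std::string & metadata_str)
    {
        Poco::JSON::Parser parser;
        auto json = parser.parse(metadata_str).extract<Poco::JSON::Object::Ptr>();

        mode = json->getValue<std::string>("mode");

        // Keys that only newer versions write: guard with has(), because
        // getValue() throws when the key is absent from older metadata.
        if (json->has("s3queue_total_shards_num"))
            total_shards_num = json->getValue<uint64_t>("s3queue_total_shards_num");
        if (json->has("s3queue_processing_threads_num"))
            processing_threads_num = json->getValue<uint64_t>("s3queue_processing_threads_num");
    }
};

int main()
{
    // Metadata as an old server would have written it: no shard/thread keys.
    Metadata m;
    m.read(R"({"mode": "ordered"})");
    std::cout << m.total_shards_num << " " << m.processing_threads_num << "\n"; // prints: 1 1
}

The defaults of 1 presumably match the effective behavior of versions without the sharding settings, which is what lets test_upgrade read the same 10 rows from the materialized view before and after restart_with_latest_version().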