From 0612f5d7820e70921748353efcea600cab0581ab Mon Sep 17 00:00:00 2001 From: Anton Popov Date: Mon, 8 May 2023 11:50:21 +0000 Subject: [PATCH 1/2] fix sparse columns after reload --- .../Serializations/SerializationInfo.cpp | 23 +++++++++++++++---- .../Serializations/SerializationInfo.h | 8 +++++-- src/Storages/MergeTree/IMergeTreeDataPart.cpp | 4 ++-- src/Storages/MergeTree/checkDataPart.cpp | 5 ++-- .../02733_sparse_columns_reload.reference | 2 ++ .../02733_sparse_columns_reload.sql | 18 +++++++++++++++ 6 files changed, 50 insertions(+), 10 deletions(-) create mode 100644 tests/queries/0_stateless/02733_sparse_columns_reload.reference create mode 100644 tests/queries/0_stateless/02733_sparse_columns_reload.sql diff --git a/src/DataTypes/Serializations/SerializationInfo.cpp b/src/DataTypes/Serializations/SerializationInfo.cpp index 4e5790ad58d4..4e9b99054540 100644 --- a/src/DataTypes/Serializations/SerializationInfo.cpp +++ b/src/DataTypes/Serializations/SerializationInfo.cpp @@ -246,7 +246,8 @@ void SerializationInfoByName::writeJSON(WriteBuffer & out) const return writeString(oss.str(), out); } -void SerializationInfoByName::readJSON(ReadBuffer & in) +SerializationInfoByName SerializationInfoByName::readJSON( + const NamesAndTypesList & columns, const Settings & settings, ReadBuffer & in) { String json_str; readString(json_str, in); @@ -262,8 +263,13 @@ void SerializationInfoByName::readJSON(ReadBuffer & in) "Unknown version of serialization infos ({}). Should be less or equal than {}", object->getValue(KEY_VERSION), SERIALIZATION_INFO_VERSION); + SerializationInfoByName infos; if (object->has(KEY_COLUMNS)) { + std::unordered_map column_type_by_name; + for (const auto & [name, type] : columns) + column_type_by_name.emplace(name, type.get()); + auto array = object->getArray(KEY_COLUMNS); for (const auto & elem : *array) { @@ -271,13 +277,22 @@ void SerializationInfoByName::readJSON(ReadBuffer & in) if (!elem_object->has(KEY_NAME)) throw Exception(ErrorCodes::CORRUPTED_DATA, - "Missed field '{}' in SerializationInfo of columns", KEY_NAME); + "Missed field '{}' in serialization infos", KEY_NAME); auto name = elem_object->getValue(KEY_NAME); - if (auto it = find(name); it != end()) - it->second->fromJSON(*elem_object); + auto it = column_type_by_name.find(name); + + if (it == column_type_by_name.end()) + throw Exception(ErrorCodes::CORRUPTED_DATA, + "Found unexpected column '{}' in serialization infos", name); + + auto info = it->second->createSerializationInfo(settings); + info->fromJSON(*elem_object); + infos.emplace(name, std::move(info)); } } + + return infos; } } diff --git a/src/DataTypes/Serializations/SerializationInfo.h b/src/DataTypes/Serializations/SerializationInfo.h index 22a9d62d312c..3d8f4f1d00ce 100644 --- a/src/DataTypes/Serializations/SerializationInfo.h +++ b/src/DataTypes/Serializations/SerializationInfo.h @@ -96,8 +96,10 @@ using MutableSerializationInfos = std::vector; class SerializationInfoByName : public std::map { public: + using Settings = SerializationInfo::Settings; + SerializationInfoByName() = default; - SerializationInfoByName(const NamesAndTypesList & columns, const SerializationInfo::Settings & settings); + SerializationInfoByName(const NamesAndTypesList & columns, const Settings & settings); void add(const Block & block); void add(const SerializationInfoByName & other); @@ -108,7 +110,9 @@ class SerializationInfoByName : public std::mapexists(SERIALIZATION_FILE_NAME)) { auto in = metadata_manager->read(SERIALIZATION_FILE_NAME); - infos.readJSON(*in); + infos = SerializationInfoByName::readJSON(loaded_columns, settings, *in); } int32_t loaded_metadata_version; diff --git a/src/Storages/MergeTree/checkDataPart.cpp b/src/Storages/MergeTree/checkDataPart.cpp index de31258b2f98..00710ed3ed69 100644 --- a/src/Storages/MergeTree/checkDataPart.cpp +++ b/src/Storages/MergeTree/checkDataPart.cpp @@ -94,12 +94,13 @@ IMergeTreeDataPart::Checksums checkDataPart( }; auto ratio_of_defaults = data_part->storage.getSettings()->ratio_of_defaults_for_sparse_serialization; - SerializationInfoByName serialization_infos(columns_txt, SerializationInfo::Settings{ratio_of_defaults, false}); + SerializationInfoByName serialization_infos; if (data_part_storage.exists(IMergeTreeDataPart::SERIALIZATION_FILE_NAME)) { auto serialization_file = data_part_storage.readFile(IMergeTreeDataPart::SERIALIZATION_FILE_NAME, {}, std::nullopt, std::nullopt); - serialization_infos.readJSON(*serialization_file); + SerializationInfo::Settings settings{ratio_of_defaults, false}; + serialization_infos = SerializationInfoByName::readJSON(columns_txt, settings, *serialization_file); } auto get_serialization = [&serialization_infos](const auto & column) diff --git a/tests/queries/0_stateless/02733_sparse_columns_reload.reference b/tests/queries/0_stateless/02733_sparse_columns_reload.reference new file mode 100644 index 000000000000..7ab314964ee9 --- /dev/null +++ b/tests/queries/0_stateless/02733_sparse_columns_reload.reference @@ -0,0 +1,2 @@ +100000 +100000 diff --git a/tests/queries/0_stateless/02733_sparse_columns_reload.sql b/tests/queries/0_stateless/02733_sparse_columns_reload.sql new file mode 100644 index 000000000000..d4b482741079 --- /dev/null +++ b/tests/queries/0_stateless/02733_sparse_columns_reload.sql @@ -0,0 +1,18 @@ +DROP TABLE IF EXISTS t_sparse_reload; + +CREATE TABLE t_sparse_reload (id UInt64, v UInt64) +ENGINE = MergeTree ORDER BY id +SETTINGS ratio_of_defaults_for_sparse_serialization = 0.95; + +INSERT INTO t_sparse_reload SELECT number, 0 FROM numbers(100000); + +SELECT count() FROM t_sparse_reload WHERE NOT ignore(*); + +ALTER TABLE t_sparse_reload MODIFY SETTING ratio_of_defaults_for_sparse_serialization = 1.0; + +DETACH TABLE t_sparse_reload; +ATTACH TABLE t_sparse_reload; + +SELECT count() FROM t_sparse_reload WHERE NOT ignore(*); + +DROP TABLE t_sparse_reload; From 996fcfe120177ce3f05ceaef5c6fe243453542d8 Mon Sep 17 00:00:00 2001 From: Anton Popov Date: Tue, 9 May 2023 21:37:43 +0000 Subject: [PATCH 2/2] fix column ttl with sparse columns --- src/Storages/MergeTree/MergeTask.cpp | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/Storages/MergeTree/MergeTask.cpp b/src/Storages/MergeTree/MergeTask.cpp index eee550f8dd63..2b7dab91934a 100644 --- a/src/Storages/MergeTree/MergeTask.cpp +++ b/src/Storages/MergeTree/MergeTask.cpp @@ -326,6 +326,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare() if (!ctx->need_remove_expired_values) { size_t expired_columns = 0; + auto part_serialization_infos = global_ctx->new_data_part->getSerializationInfos(); for (auto & [column_name, ttl] : global_ctx->new_data_part->ttl_infos.columns_ttl) { @@ -335,6 +336,8 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare() LOG_TRACE(ctx->log, "Adding expired column {} for part {}", column_name, global_ctx->new_data_part->name); std::erase(global_ctx->gathering_column_names, column_name); std::erase(global_ctx->merging_column_names, column_name); + std::erase(global_ctx->all_column_names, column_name); + part_serialization_infos.erase(column_name); ++expired_columns; } } @@ -343,6 +346,12 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare() { global_ctx->gathering_columns = global_ctx->gathering_columns.filter(global_ctx->gathering_column_names); global_ctx->merging_columns = global_ctx->merging_columns.filter(global_ctx->merging_column_names); + global_ctx->storage_columns = global_ctx->storage_columns.filter(global_ctx->all_column_names); + + global_ctx->new_data_part->setColumns( + global_ctx->storage_columns, + part_serialization_infos, + global_ctx->metadata_snapshot->getMetadataVersion()); } }