Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix reading from sparse columns after restart #49660

Merged
merged 2 commits into from May 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
23 changes: 19 additions & 4 deletions src/DataTypes/Serializations/SerializationInfo.cpp
Expand Up @@ -246,7 +246,8 @@ void SerializationInfoByName::writeJSON(WriteBuffer & out) const
return writeString(oss.str(), out);
}

void SerializationInfoByName::readJSON(ReadBuffer & in)
SerializationInfoByName SerializationInfoByName::readJSON(
const NamesAndTypesList & columns, const Settings & settings, ReadBuffer & in)
{
String json_str;
readString(json_str, in);
Expand All @@ -262,22 +263,36 @@ void SerializationInfoByName::readJSON(ReadBuffer & in)
"Unknown version of serialization infos ({}). Should be less or equal than {}",
object->getValue<size_t>(KEY_VERSION), SERIALIZATION_INFO_VERSION);

SerializationInfoByName infos;
if (object->has(KEY_COLUMNS))
{
std::unordered_map<std::string_view, const IDataType *> column_type_by_name;
for (const auto & [name, type] : columns)
column_type_by_name.emplace(name, type.get());

auto array = object->getArray(KEY_COLUMNS);
for (const auto & elem : *array)
{
auto elem_object = elem.extract<Poco::JSON::Object::Ptr>();

if (!elem_object->has(KEY_NAME))
throw Exception(ErrorCodes::CORRUPTED_DATA,
"Missed field '{}' in SerializationInfo of columns", KEY_NAME);
"Missed field '{}' in serialization infos", KEY_NAME);

auto name = elem_object->getValue<String>(KEY_NAME);
if (auto it = find(name); it != end())
it->second->fromJSON(*elem_object);
auto it = column_type_by_name.find(name);

if (it == column_type_by_name.end())
throw Exception(ErrorCodes::CORRUPTED_DATA,
"Found unexpected column '{}' in serialization infos", name);

auto info = it->second->createSerializationInfo(settings);
info->fromJSON(*elem_object);
infos.emplace(name, std::move(info));
}
}

return infos;
}

}
8 changes: 6 additions & 2 deletions src/DataTypes/Serializations/SerializationInfo.h
Expand Up @@ -96,8 +96,10 @@ using MutableSerializationInfos = std::vector<MutableSerializationInfoPtr>;
class SerializationInfoByName : public std::map<String, MutableSerializationInfoPtr>
{
public:
using Settings = SerializationInfo::Settings;

SerializationInfoByName() = default;
SerializationInfoByName(const NamesAndTypesList & columns, const SerializationInfo::Settings & settings);
SerializationInfoByName(const NamesAndTypesList & columns, const Settings & settings);

void add(const Block & block);
void add(const SerializationInfoByName & other);
Expand All @@ -108,7 +110,9 @@ class SerializationInfoByName : public std::map<String, MutableSerializationInfo
void replaceData(const SerializationInfoByName & other);

void writeJSON(WriteBuffer & out) const;
void readJSON(ReadBuffer & in);

static SerializationInfoByName readJSON(
const NamesAndTypesList & columns, const Settings & settings, ReadBuffer & in);
};

}
4 changes: 2 additions & 2 deletions src/Storages/MergeTree/IMergeTreeDataPart.cpp
Expand Up @@ -1341,11 +1341,11 @@ void IMergeTreeDataPart::loadColumns(bool require)
.choose_kind = false,
};

SerializationInfoByName infos(loaded_columns, settings);
SerializationInfoByName infos;
if (metadata_manager->exists(SERIALIZATION_FILE_NAME))
Comment on lines -1344 to 1345
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it intended that `infos` is now left uninitialised when the condition on line 1345 is not satisfied?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it's OK. If SerializationInfoByName is not initialized, the default serialization will be returned for all columns. That is what is expected.

{
auto in = metadata_manager->read(SERIALIZATION_FILE_NAME);
infos.readJSON(*in);
infos = SerializationInfoByName::readJSON(loaded_columns, settings, *in);
}

int32_t loaded_metadata_version;
Expand Down
9 changes: 9 additions & 0 deletions src/Storages/MergeTree/MergeTask.cpp
Expand Up @@ -326,6 +326,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
if (!ctx->need_remove_expired_values)
{
size_t expired_columns = 0;
auto part_serialization_infos = global_ctx->new_data_part->getSerializationInfos();

for (auto & [column_name, ttl] : global_ctx->new_data_part->ttl_infos.columns_ttl)
{
Expand All @@ -335,6 +336,8 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
LOG_TRACE(ctx->log, "Adding expired column {} for part {}", column_name, global_ctx->new_data_part->name);
std::erase(global_ctx->gathering_column_names, column_name);
std::erase(global_ctx->merging_column_names, column_name);
std::erase(global_ctx->all_column_names, column_name);
part_serialization_infos.erase(column_name);
++expired_columns;
}
}
Expand All @@ -343,6 +346,12 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
{
global_ctx->gathering_columns = global_ctx->gathering_columns.filter(global_ctx->gathering_column_names);
global_ctx->merging_columns = global_ctx->merging_columns.filter(global_ctx->merging_column_names);
global_ctx->storage_columns = global_ctx->storage_columns.filter(global_ctx->all_column_names);

global_ctx->new_data_part->setColumns(
global_ctx->storage_columns,
part_serialization_infos,
global_ctx->metadata_snapshot->getMetadataVersion());
}
}

Expand Down
5 changes: 3 additions & 2 deletions src/Storages/MergeTree/checkDataPart.cpp
Expand Up @@ -94,12 +94,13 @@ IMergeTreeDataPart::Checksums checkDataPart(
};

auto ratio_of_defaults = data_part->storage.getSettings()->ratio_of_defaults_for_sparse_serialization;
SerializationInfoByName serialization_infos(columns_txt, SerializationInfo::Settings{ratio_of_defaults, false});
SerializationInfoByName serialization_infos;

if (data_part_storage.exists(IMergeTreeDataPart::SERIALIZATION_FILE_NAME))
{
auto serialization_file = data_part_storage.readFile(IMergeTreeDataPart::SERIALIZATION_FILE_NAME, {}, std::nullopt, std::nullopt);
serialization_infos.readJSON(*serialization_file);
SerializationInfo::Settings settings{ratio_of_defaults, false};
serialization_infos = SerializationInfoByName::readJSON(columns_txt, settings, *serialization_file);
}

auto get_serialization = [&serialization_infos](const auto & column)
Expand Down
@@ -0,0 +1,2 @@
100000
100000
18 changes: 18 additions & 0 deletions tests/queries/0_stateless/02733_sparse_columns_reload.sql
@@ -0,0 +1,18 @@
-- Regression test: data inserted under one value of
-- ratio_of_defaults_for_sparse_serialization must still be readable after the
-- setting is changed and the table is reloaded via DETACH/ATTACH.
-- Both SELECTs below are expected to return the same row count.
DROP TABLE IF EXISTS t_sparse_reload;

-- Start with a threshold of 0.95.
-- NOTE(review): presumably a column whose share of default values exceeds this
-- ratio is written with sparse serialization -- confirm against the setting docs.
CREATE TABLE t_sparse_reload (id UInt64, v UInt64)
ENGINE = MergeTree ORDER BY id
SETTINGS ratio_of_defaults_for_sparse_serialization = 0.95;

-- Every row gets v = 0, so column `v` consists entirely of one constant value.
INSERT INTO t_sparse_reload SELECT number, 0 FROM numbers(100000);

-- Baseline read before any reload.
-- NOTE(review): `NOT ignore(*)` presumably forces all columns to be read while
-- keeping every row -- confirm.
SELECT count() FROM t_sparse_reload WHERE NOT ignore(*);

-- Change the threshold after the data has already been written, so the table
-- setting no longer matches the value in effect at insert time.
ALTER TABLE t_sparse_reload MODIFY SETTING ratio_of_defaults_for_sparse_serialization = 1.0;

-- Reload the table; this stands in for the "restart" scenario.
DETACH TABLE t_sparse_reload;
ATTACH TABLE t_sparse_reload;

-- Same read as the baseline, now after the reload with the new setting value.
SELECT count() FROM t_sparse_reload WHERE NOT ignore(*);

DROP TABLE t_sparse_reload;