Backport #46045 to 22.12: Fix reading of non-existing nested columns with multiple levels in compact parts #46218

Merged (1 commit, Feb 13, 2023)
21 changes: 12 additions & 9 deletions src/Storages/MergeTree/IMergeTreeReader.cpp
@@ -211,19 +211,19 @@ void IMergeTreeReader::performRequiredConversions(Columns & res_columns) const
     }
 }
 
-IMergeTreeReader::ColumnPosition IMergeTreeReader::findColumnForOffsets(const NameAndTypePair & required_column) const
+IMergeTreeReader::ColumnPositionLevel IMergeTreeReader::findColumnForOffsets(const NameAndTypePair & required_column) const
 {
     auto get_offsets_streams = [](const auto & serialization, const auto & name_in_storage)
     {
-        Names offsets_streams;
+        std::vector<std::pair<String, size_t>> offsets_streams;
         serialization->enumerateStreams([&](const auto & subpath)
         {
             if (subpath.empty() || subpath.back().type != ISerialization::Substream::ArraySizes)
                 return;
 
             auto subname = ISerialization::getSubcolumnNameForStream(subpath);
             auto full_name = Nested::concatenateName(name_in_storage, subname);
-            offsets_streams.push_back(full_name);
+            offsets_streams.emplace_back(full_name, ISerialization::getArrayLevel(subpath));
         });
 
         return offsets_streams;
@@ -233,7 +233,7 @@ IMergeTreeReader::ColumnPosition IMergeTreeReader::findColumnForOffsets(const NameAndTypePair & required_column) const
     auto required_offsets_streams = get_offsets_streams(getSerializationInPart(required_column), required_name_in_storage);
 
     size_t max_matched_streams = 0;
-    ColumnPosition position;
+    ColumnPositionLevel position_level;
 
     /// Find column that has maximal number of matching
     /// offsets columns with required_column.
@@ -244,23 +244,26 @@ IMergeTreeReader::ColumnPosition IMergeTreeReader::findColumnForOffsets(const NameAndTypePair & required_column) const
             continue;
 
         auto offsets_streams = get_offsets_streams(data_part_info_for_read->getSerialization(part_column), name_in_storage);
-        NameSet offsets_streams_set(offsets_streams.begin(), offsets_streams.end());
+        NameToIndexMap offsets_streams_map(offsets_streams.begin(), offsets_streams.end());
 
         size_t i = 0;
+        auto it = offsets_streams_map.end();
         for (; i < required_offsets_streams.size(); ++i)
         {
-            if (!offsets_streams_set.contains(required_offsets_streams[i]))
+            auto current_it = offsets_streams_map.find(required_offsets_streams[i].first);
+            if (current_it == offsets_streams_map.end())
                 break;
+            it = current_it;
         }
 
-        if (i && (!position || i > max_matched_streams))
+        if (i && (!position_level || i > max_matched_streams))
         {
             max_matched_streams = i;
-            position = data_part_info_for_read->getColumnPosition(part_column.name);
+            position_level.emplace(*data_part_info_for_read->getColumnPosition(part_column.name), it->second);
         }
     }
 
-    return position;
+    return position_level;
 }
 
 void IMergeTreeReader::checkNumberOfColumns(size_t num_columns_to_read) const
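
To make the change easier to follow, here is a standalone sketch of the matching idea (simplified, not the actual ClickHouse implementation): offsets streams are now tracked as (stream name, array level) pairs, and the deepest level that an alternative column of the same Nested table can provide is remembered together with that column's position. The stream names and the level convention below are illustrative assumptions.

#include <cstddef>
#include <iostream>
#include <map>
#include <optional>
#include <string>
#include <utility>
#include <vector>

/// An offsets stream of a column: its full stream name plus the array
/// nesting level it belongs to (1 for Array(T), 2 for Array(Array(T)), ...).
using OffsetsStream = std::pair<std::string, size_t>;

/// Returns the nesting level of the deepest offsets stream of `required`
/// that also exists in `candidate`, or nullopt if nothing matches.
std::optional<size_t> deepestSharedOffsetsLevel(
    const std::vector<OffsetsStream> & required,
    const std::vector<OffsetsStream> & candidate)
{
    std::map<std::string, size_t> candidate_map(candidate.begin(), candidate.end());

    std::optional<size_t> level;
    for (const auto & stream : required)
    {
        auto it = candidate_map.find(stream.first);
        if (it == candidate_map.end())
            break;              /// streams must match as a prefix
        level = it->second;     /// level of the last (deepest) matched stream
    }
    return level;
}

int main()
{
    /// root.nested_array Array(Array(UInt8)) needs two offsets streams, while
    /// root.array Array(UInt8), present in the part, provides only the first.
    std::vector<OffsetsStream> required = {{"root.size0", 1}, {"root.nested_array.size1", 2}};
    std::vector<OffsetsStream> candidate = {{"root.size0", 1}};

    auto level = deepestSharedOffsetsLevel(required, candidate);
    std::cout << (level ? std::to_string(*level) : "none") << '\n';  /// prints "1"
}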
8 changes: 6 additions & 2 deletions src/Storages/MergeTree/IMergeTreeReader.h
@@ -91,8 +91,12 @@ class IMergeTreeReader : private boost::noncopyable
     StorageMetadataPtr metadata_snapshot;
     MarkRanges all_mark_ranges;
 
-    using ColumnPosition = std::optional<size_t>;
-    ColumnPosition findColumnForOffsets(const NameAndTypePair & column) const;
+    /// Position of the column in the part and its level of array nesting.
+    using ColumnPositionLevel = std::optional<std::pair<size_t, size_t>>;
+    /// If a part of the nested column does not exist in the part, its offsets
+    /// still have to be read, but only the offsets available for the current
+    /// column; that is why a pair of size_t is returned, not a single value.
+    ColumnPositionLevel findColumnForOffsets(const NameAndTypePair & column) const;
 
     NameSet partially_read_columns;
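
A minimal illustration of what the two members of ColumnPositionLevel stand for (not ClickHouse code; the concrete values are assumptions for the root.array / root.nested_array case used in the new test):

#include <cstddef>
#include <iostream>
#include <optional>
#include <utility>

using ColumnPositionLevel = std::optional<std::pair<size_t, size_t>>;

int main()
{
    /// Hypothetical result of findColumnForOffsets(root.nested_array) for a
    /// part that contains only root.array:
    ///   first  - position of root.array inside the part (assumed 0 here)
    ///   second - nesting level of the offsets that can be borrowed from it
    ColumnPositionLevel position_level = std::pair<size_t, size_t>{0, 1};

    if (position_level)
        std::cout << "read offsets of column #" << position_level->first
                  << " up to array level " << position_level->second << '\n';
}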
49 changes: 39 additions & 10 deletions src/Storages/MergeTree/MergeTreeReaderCompact.cpp
@@ -141,13 +141,16 @@ void MergeTreeReaderCompact::fillColumnPositions()
         {
             /// If array of Nested column is missing in part,
             /// we have to read its offsets if they exist.
-            position = findColumnForOffsets(column_to_read);
-            read_only_offsets[i] = (position != std::nullopt);
+            auto position_level = findColumnForOffsets(column_to_read);
+            if (position_level.has_value())
+            {
+                column_positions[i].emplace(position_level->first);
+                read_only_offsets[i].emplace(position_level->second);
+                partially_read_columns.insert(column_to_read.name);
+            }
         }
-
-        column_positions[i] = std::move(position);
-        if (read_only_offsets[i])
-            partially_read_columns.insert(column_to_read.name);
+        else
+            column_positions[i] = std::move(position);
     }
 }
 
@@ -217,7 +220,8 @@ size_t MergeTreeReaderCompact::readRows(
 
 void MergeTreeReaderCompact::readData(
     const NameAndTypePair & name_and_type, ColumnPtr & column,
-    size_t from_mark, size_t current_task_last_mark, size_t column_position, size_t rows_to_read, bool only_offsets)
+    size_t from_mark, size_t current_task_last_mark, size_t column_position, size_t rows_to_read,
+    std::optional<size_t> only_offsets_level)
 {
     const auto & [name, type] = name_and_type;
 
@@ -228,9 +232,34 @@ void MergeTreeReaderCompact::readData(
 
     auto buffer_getter = [&](const ISerialization::SubstreamPath & substream_path) -> ReadBuffer *
     {
+        /// The offsets stream of another column may be read here, in case the
+        /// current column does not exist (see findColumnForOffsets() in
+        /// MergeTreeReaderCompact::fillColumnPositions()).
        bool is_offsets = !substream_path.empty() && substream_path.back().type == ISerialization::Substream::ArraySizes;
-        if (only_offsets && !is_offsets)
-            return nullptr;
+        if (only_offsets_level.has_value())
+        {
+            if (!is_offsets)
+                return nullptr;
+
+            /// The offsets stream can be read only for the current level of
+            /// nesting or below (it is OK to read all parent streams from the
+            /// alternative column).
+            ///
+            /// Consider the following columns in the Nested column "root":
+            /// - root.array Array(UInt8) - exists
+            /// - root.nested_array Array(Array(UInt8)) - does not exist (only_offsets_level=1)
+            ///
+            /// For root.nested_array the reader will try to read multiple streams:
+            /// - offsets (substream_path = {ArraySizes})
+            ///   OK
+            /// - root.nested_array elements (substream_path = {ArrayElements, ArraySizes})
+            ///   NOT OK - the offsets stream of root.array cannot be used for this
+            ///
+            /// Here only_offsets_level is the level of the alternative stream, and
+            /// ISerialization::getArrayLevel(substream_path) is the level of the current stream.
+            if (only_offsets_level.value() < ISerialization::getArrayLevel(substream_path))
+                return nullptr;
+        }
 
         return data_buffer;
     };
@@ -267,7 +296,7 @@ void MergeTreeReaderCompact::readData(
     }
 
     /// The buffer is left in inconsistent state after reading single offsets
-    if (only_offsets)
+    if (only_offsets_level.has_value())
         last_read_granule.reset();
     else
         last_read_granule.emplace(from_mark, column_position);
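
The level check in buffer_getter can be summarized by the following standalone sketch (not the actual reader; stream names, flags, and the level convention are illustrative assumptions): when a column is read only for borrowed offsets, a substream is served only if it is an offsets stream whose nesting level does not exceed the level that the alternative column provides.

#include <cstddef>
#include <iostream>
#include <optional>
#include <string>
#include <vector>

struct SubstreamInfo
{
    std::string name;    /// illustrative label
    bool is_offsets;     /// path ends in an ArraySizes-like element
    size_t array_level;  /// array nesting level of the stream
};

/// Returns true if the substream may be served from the alternative column.
bool shouldReadFromAlternative(std::optional<size_t> only_offsets_level, const SubstreamInfo & s)
{
    if (!only_offsets_level)
        return true;                                /// normal read: all streams allowed
    if (!s.is_offsets)
        return false;                               /// only offsets may be borrowed
    return s.array_level <= *only_offsets_level;    /// and only up to the shared level
}

int main()
{
    std::optional<size_t> only_offsets_level = 1;   /// offsets shared up to the first level

    std::vector<SubstreamInfo> streams = {
        {"root offsets {ArraySizes}", true, 1},
        {"root.nested_array inner offsets {ArrayElements, ArraySizes}", true, 2},
        {"root.nested_array elements", false, 2},
    };

    for (const auto & s : streams)
        std::cout << s.name << ": "
                  << (shouldReadFromAlternative(only_offsets_level, s) ? "read" : "skip") << '\n';
    /// root offsets {ArraySizes}: read
    /// root.nested_array inner offsets {ArrayElements, ArraySizes}: skip
    /// root.nested_array elements: skip
}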
10 changes: 6 additions & 4 deletions src/Storages/MergeTree/MergeTreeReaderCompact.h
@@ -50,10 +50,11 @@ class MergeTreeReaderCompact : public IMergeTreeReader
     MergeTreeMarksLoader marks_loader;
 
     /// Positions of columns in part structure.
-    using ColumnPositions = std::vector<ColumnPosition>;
+    using ColumnPositions = std::vector<std::optional<size_t>>;
     ColumnPositions column_positions;
-    /// Should we read full column or only it's offsets
-    std::vector<bool> read_only_offsets;
+    /// Should we read the full column or only its offsets.
+    /// Each element is the level of the alternative stream.
+    std::vector<std::optional<size_t>> read_only_offsets;
 
     /// For asynchronous reading from remote fs. Same meaning as in MergeTreeReaderStream.
     std::optional<size_t> last_right_offset;
@@ -64,7 +65,8 @@ class MergeTreeReaderCompact : public IMergeTreeReader
     void seekToMark(size_t row_index, size_t column_index);
 
     void readData(const NameAndTypePair & name_and_type, ColumnPtr & column, size_t from_mark,
-        size_t current_task_last_mark, size_t column_position, size_t rows_to_read, bool only_offsets);
+        size_t current_task_last_mark, size_t column_position, size_t rows_to_read,
+        std::optional<size_t> only_offsets_level);
 
     /// Returns maximal value of granule size in compressed file from @mark_ranges.
     /// This value is used as size of read buffer.
2 changes: 1 addition & 1 deletion src/Storages/MergeTree/MergeTreeReaderInMemory.cpp
@@ -42,7 +42,7 @@ MergeTreeReaderInMemory::MergeTreeReaderInMemory(
         {
             if (auto offsets_position = findColumnForOffsets(column_to_read))
             {
-                positions_for_offsets[column_to_read.name] = *offsets_position;
+                positions_for_offsets[column_to_read.name] = offsets_position->first;
                 partially_read_columns.insert(column_to_read.name);
             }
         }
6 changes: 6 additions & 0 deletions tests/queries/0_stateless/02559_nested_multiple_levels_default.reference
@@ -0,0 +1,6 @@
+data_compact Compact
+[[]]
+data_memory InMemory
+[[]]
+data_wide Wide
+[[]]
45 changes: 45 additions & 0 deletions tests/queries/0_stateless/02559_nested_multiple_levels_default.sql
@@ -0,0 +1,45 @@
+DROP TABLE IF EXISTS data_compact;
+DROP TABLE IF EXISTS data_memory;
+DROP TABLE IF EXISTS data_wide;
+
+-- compact
+DROP TABLE IF EXISTS data_compact;
+CREATE TABLE data_compact
+(
+    `root.array` Array(UInt8),
+)
+ENGINE = MergeTree()
+ORDER BY tuple()
+SETTINGS min_rows_for_compact_part=0, min_bytes_for_compact_part=0, min_rows_for_wide_part=100, min_bytes_for_wide_part=1e9;
+INSERT INTO data_compact VALUES ([0]);
+ALTER TABLE data_compact ADD COLUMN root.nested_array Array(Array(UInt8));
+SELECT table, part_type FROM system.parts WHERE table = 'data_compact' AND database = currentDatabase();
+SELECT root.nested_array FROM data_compact;
+
+-- memory
+DROP TABLE IF EXISTS data_memory;
+CREATE TABLE data_memory
+(
+    `root.array` Array(UInt8),
+)
+ENGINE = MergeTree()
+ORDER BY tuple()
+SETTINGS min_rows_for_compact_part=100, min_bytes_for_compact_part=1e9, min_rows_for_wide_part=100, min_bytes_for_wide_part=1e9, in_memory_parts_enable_wal=0;
+INSERT INTO data_memory VALUES ([0]);
+ALTER TABLE data_memory ADD COLUMN root.nested_array Array(Array(UInt8));
+SELECT table, part_type FROM system.parts WHERE table = 'data_memory' AND database = currentDatabase();
+SELECT root.nested_array FROM data_memory;
+
+-- wide
+DROP TABLE IF EXISTS data_wide;
+CREATE TABLE data_wide
+(
+    `root.array` Array(UInt8),
+)
+ENGINE = MergeTree()
+ORDER BY tuple()
+SETTINGS min_rows_for_wide_part=0, min_bytes_for_wide_part=0, min_rows_for_wide_part=0, min_bytes_for_wide_part=0;
+INSERT INTO data_wide VALUES ([0]);
+ALTER TABLE data_wide ADD COLUMN root.nested_array Array(Array(UInt8));
+SELECT table, part_type FROM system.parts WHERE table = 'data_wide' AND database = currentDatabase();
+SELECT root.nested_array FROM data_wide;
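
For context, the behaviour the test exercises can be sketched with one extra query (not part of the committed test; the expected values are inferred from the reference file above): the missing root.nested_array column is filled with defaults, but its outer offsets are borrowed from root.array, so both columns report the same outer size.

-- Illustrative follow-up (not in the committed test): outer sizes of the
-- missing nested column follow the offsets borrowed from root.array.
SELECT
    root.array,
    root.nested_array,
    length(root.array) AS outer_size,
    length(root.nested_array) AS nested_outer_size
FROM data_compact;
-- expected: [0]  [[]]  1  1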