Skip to content

Commit

Permalink
Backport #37978 to 22.4: Fix reading of sparse columns from s3
Browse files Browse the repository at this point in the history
  • Loading branch information
robot-clickhouse committed Jul 14, 2022
1 parent be942f9 commit d99a3f4
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 56 deletions.
94 changes: 38 additions & 56 deletions src/Storages/MergeTree/MergeTreeReaderStream.cpp
Expand Up @@ -125,63 +125,45 @@ size_t MergeTreeReaderStream::getRightOffset(size_t right_mark_non_included)
size_t result_right_offset;
if (0 < right_mark_non_included && right_mark_non_included < marks_count)
{
auto right_mark = marks_loader.getMark(right_mark_non_included);
result_right_offset = right_mark.offset_in_compressed_file;

bool need_to_check_marks_from_the_right = false;

/// If the end of range is inside the block, we will need to read it too.
if (right_mark.offset_in_decompressed_block > 0)
{
need_to_check_marks_from_the_right = true;
}
else if (is_low_cardinality_dictionary)
{
/// Also, in LowCardinality dictionary several consecutive marks can point to
/// the same offset. So to get true bytes offset we have to get first
/// non-equal mark.
/// Example:
/// Mark 186, points to [2003111, 0]
/// Mark 187, points to [2003111, 0]
/// Mark 188, points to [2003111, 0] <--- for example need to read until 188
/// Mark 189, points to [2003111, 0] <--- not suitable, because have same offset
/// Mark 190, points to [2003111, 0]
/// Mark 191, points to [2003111, 0]
/// Mark 192, points to [2081424, 0] <--- what we are looking for
/// Mark 193, points to [2081424, 0]
/// Mark 194, points to [2081424, 0]

/// Also, in some cases, when one granule is not-atomically written (which is possible at merges)
/// one granule may require reading of two dictionaries which starts from different marks.
/// The only correct way is to take offset from at least next different granule from the right one.

/// Check test_s3_low_cardinality_right_border.

need_to_check_marks_from_the_right = true;
}


/// Let's go to the right and find mark with bigger offset in compressed file
if (need_to_check_marks_from_the_right)
{
bool found_bigger_mark = false;
for (size_t i = right_mark_non_included + 1; i < marks_count; ++i)
/// Find the right border of the last mark we need to read.
/// To do that let's find the upper bound of the offset of the last
/// included mark.

/// In LowCardinality dictionary and in values of Sparse columns
/// several consecutive marks can point to the same offset.
///
/// Example:
/// Mark 186, points to [2003111, 0]
/// Mark 187, points to [2003111, 0]
/// Mark 188, points to [2003111, 0] <--- for example need to read until 188
/// Mark 189, points to [2003111, 0] <--- not suitable, because have same offset
/// Mark 190, points to [2003111, 0]
/// Mark 191, points to [2003111, 0]
/// Mark 192, points to [2081424, 0] <--- what we are looking for
/// Mark 193, points to [2081424, 0]
/// Mark 194, points to [2081424, 0]

/// Also, in some cases, when one granule is not-atomically written (which is possible at merges)
/// one granule may require reading of two dictionaries which starts from different marks.
/// The only correct way is to take offset from at least next different granule from the right one.
/// So, that's why we have to read one extra granule to the right,
/// while reading dictionary of LowCardinality.

size_t right_mark_included = is_low_cardinality_dictionary
? right_mark_non_included
: right_mark_non_included - 1;

auto indices = collections::range(right_mark_included, marks_count);
auto it = std::upper_bound(indices.begin(), indices.end(), right_mark_included,
[&](auto lhs, auto rhs)
{
const auto & candidate_mark = marks_loader.getMark(i);
if (result_right_offset < candidate_mark.offset_in_compressed_file)
{
result_right_offset = candidate_mark.offset_in_compressed_file;
found_bigger_mark = true;
break;
}
}

if (!found_bigger_mark)
{
/// If there are no marks after the end of range, just use file size
result_right_offset = file_size;
}
}
return marks_loader.getMark(lhs).offset_in_compressed_file < marks_loader.getMark(rhs).offset_in_compressed_file;
});

if (it != indices.end())
result_right_offset = marks_loader.getMark(*it).offset_in_compressed_file;
else
result_right_offset = file_size;
}
else if (right_mark_non_included == 0)
result_right_offset = marks_loader.getMark(right_mark_non_included).offset_in_compressed_file;
Expand Down
2 changes: 2 additions & 0 deletions tests/queries/0_stateless/02336_sparse_columns_s3.reference
@@ -0,0 +1,2 @@
Sparse
50787
42 changes: 42 additions & 0 deletions tests/queries/0_stateless/02336_sparse_columns_s3.sql
@@ -0,0 +1,42 @@
-- Tags: no-parallel, no-fasttest, no-s3-storage

DROP TABLE IF EXISTS t_sparse_s3;

CREATE TABLE t_sparse_s3 (id UInt32, cond UInt8, s String)
engine = MergeTree ORDER BY id
settings ratio_of_defaults_for_sparse_serialization = 0.01, storage_policy = 's3_cache',
min_bytes_for_wide_part = 0, min_compress_block_size = 1;

INSERT INTO t_sparse_s3 SELECT 1, number % 2, '' FROM numbers(8192);
INSERT INTO t_sparse_s3 SELECT 2, number % 2, '' FROM numbers(24576);
INSERT INTO t_sparse_s3 SELECT 3, number % 2, '' FROM numbers(8192);
INSERT INTO t_sparse_s3 SELECT 4, number % 2, '' FROM numbers(24576);
INSERT INTO t_sparse_s3 SELECT 5, number % 2, '' FROM numbers(8192);
INSERT INTO t_sparse_s3 SELECT 6, number % 2, '' FROM numbers(24576);
INSERT INTO t_sparse_s3 SELECT 7, number % 2, '' FROM numbers(8192);
INSERT INTO t_sparse_s3 SELECT 8, number % 2, '' FROM numbers(24576);
INSERT INTO t_sparse_s3 SELECT 9, number % 2, '' FROM numbers(8192);
INSERT INTO t_sparse_s3 SELECT 10, number % 2, '' FROM numbers(24576);
INSERT INTO t_sparse_s3 SELECT 11, number % 2, '' FROM numbers(8000);
INSERT INTO t_sparse_s3 SELECT 12, number % 2, 'foo' FROM numbers(192);
INSERT INTO t_sparse_s3 SELECT 13, number % 2, '' FROM numbers(24576);
INSERT INTO t_sparse_s3 SELECT 14, number % 2, 'foo' FROM numbers(8192);
INSERT INTO t_sparse_s3 SELECT 15, number % 2, '' FROM numbers(24576);
INSERT INTO t_sparse_s3 SELECT 16, number % 2, 'foo' FROM numbers(4730);
INSERT INTO t_sparse_s3 SELECT 17, number % 2, 'foo' FROM numbers(3462);
INSERT INTO t_sparse_s3 SELECT 18, number % 2, '' FROM numbers(24576);

OPTIMIZE TABLE t_sparse_s3 FINAL;

SELECT serialization_kind FROM system.parts_columns
WHERE table = 't_sparse_s3' AND active AND column = 's'
AND database = currentDatabase();

SET max_threads = 1;

SELECT count() FROM t_sparse_s3
PREWHERE cond
WHERE id IN (1, 3, 5, 7, 9, 11, 13, 15, 17)
AND NOT ignore(s);

DROP TABLE t_sparse_s3;

0 comments on commit d99a3f4

Please sign in to comment.