Skip to content

Commit

Permalink
Merge pull request #58627 from ClickHouse/backport/23.12/58595
Browse files Browse the repository at this point in the history
Backport #58595 to 23.12: Disable max_joined_block_rows in ConcurrentHashJoin
  • Loading branch information
robot-ch-test-poll2 committed Jan 9, 2024
2 parents c493ad2 + b6d59c9 commit 56527c7
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 3 deletions.
3 changes: 3 additions & 0 deletions src/Interpreters/ConcurrentHashJoin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ ConcurrentHashJoin::ConcurrentHashJoin(ContextPtr context_, std::shared_ptr<Tabl
auto inner_hash_join = std::make_shared<InternalHashJoin>();

inner_hash_join->data = std::make_unique<HashJoin>(table_join_, right_sample_block, any_take_last_row_, 0, fmt::format("concurrent{}", i));
/// A non-zero `max_joined_block_rows` allows a block to be processed partially, returning the unprocessed part.
/// TODO: This is not handled properly in the ConcurrentHashJoin case, so we set it to 0 to disable the feature.
inner_hash_join->data->setMaxJoinedBlockRows(0);
hash_joins.emplace_back(std::move(inner_hash_join));
}
}
Expand Down
6 changes: 3 additions & 3 deletions src/Interpreters/HashJoin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ HashJoin::HashJoin(std::shared_ptr<TableJoin> table_join_, const Block & right_s
, asof_inequality(table_join->getAsofInequality())
, data(std::make_shared<RightTableData>())
, right_sample_block(right_sample_block_)
, max_joined_block_rows(table_join->maxJoinedBlockRows())
, instance_log_id(!instance_id_.empty() ? "(" + instance_id_ + ") " : "")
, log(&Poco::Logger::get("HashJoin"))
{
Expand Down Expand Up @@ -1401,7 +1402,7 @@ NO_INLINE size_t joinRightColumns(
{
if constexpr (join_features.need_replication)
{
if (unlikely(current_offset > max_joined_block_rows))
if (unlikely(current_offset >= max_joined_block_rows))
{
added_columns.offsets_to_replicate->resize_assume_reserved(i);
added_columns.filter.resize_assume_reserved(i);
Expand Down Expand Up @@ -1690,7 +1691,7 @@ Block HashJoin::joinBlockImpl(

bool has_required_right_keys = (required_right_keys.columns() != 0);
added_columns.need_filter = join_features.need_filter || has_required_right_keys;
added_columns.max_joined_block_rows = table_join->maxJoinedBlockRows();
added_columns.max_joined_block_rows = max_joined_block_rows;
if (!added_columns.max_joined_block_rows)
added_columns.max_joined_block_rows = std::numeric_limits<size_t>::max();
else
Expand Down Expand Up @@ -1771,7 +1772,6 @@ Block HashJoin::joinBlockImpl(

void HashJoin::joinBlockImplCross(Block & block, ExtraBlockPtr & not_processed) const
{
size_t max_joined_block_rows = table_join->maxJoinedBlockRows();
size_t start_left_row = 0;
size_t start_right_block = 0;
if (not_processed)
Expand Down
5 changes: 5 additions & 0 deletions src/Interpreters/HashJoin.h
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,8 @@ class HashJoin : public IJoin

void shrinkStoredBlocksToFit(size_t & total_bytes_in_join);

/// Overrides the limit on the number of rows in a joined result block.
/// A value of 0 means that no limit is applied.
void setMaxJoinedBlockRows(size_t value)
{
    max_joined_block_rows = value;
}

private:
template<bool> friend class NotJoinedHash;

Expand Down Expand Up @@ -433,6 +435,9 @@ class HashJoin : public IJoin
/// Left table column names that are sources for required_right_keys columns
std::vector<String> required_right_keys_sources;

/// Maximum number of rows in a result block. If it is 0, there is no limit.
size_t max_joined_block_rows = 0;

/// When tracked memory consumption is more than a threshold, we will shrink to fit stored blocks.
bool shrink_blocks = false;
Int64 memory_usage_before_adding_blocks = 0;
Expand Down
32 changes: 32 additions & 0 deletions tests/queries/0_stateless/02962_max_joined_block_rows.reference
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
1 0
1 1
1 2
1 3
1 4
1 5
1 6
1 7
1 8
1 9
--
1 0
1 1
1 2
1 3
1 4
1 5
1 6
1 7
1 8
1 9
--
1 0
1 1
1 2
1 3
1 4
1 5
1 6
1 7
1 8
1 9
38 changes: 38 additions & 0 deletions tests/queries/0_stateless/02962_max_joined_block_rows.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
-- Checks how max_joined_block_size_rows affects the size of the blocks produced by a JOIN:
-- the limit is expected to hold with the default join algorithm and to be ignored
-- with join_algorithm = 'parallel_hash'.

DROP TABLE IF EXISTS t1;
DROP TABLE IF EXISTS t2;

-- Left table: 10 rows; `a` alternates between 0 and 1, `b` takes the values 0..9.
CREATE table t1 (a UInt64, b UInt64) ENGINE = Memory;
INSERT INTO t1 SELECT number % 2, number FROM numbers(10);

-- Right table: 10 rows, i.e. 5 rows for each of the keys 0 and 1.
CREATE table t2 (a UInt64) ENGINE = Memory;

INSERT INTO t2 SELECT number % 2 FROM numbers(10);

-- block size is always multiple of 5 because we have 5 rows for each key in right table
-- we do not split rows corresponding to the same key

-- With a limit of 5, no block seen for any value of `b` may contain more than 5 rows.
SELECT max(bs) <= 5, b FROM (
SELECT blockSize() as bs, * FROM t1 JOIN t2 ON t1.a = t2.a
) GROUP BY b
ORDER BY b
SETTINGS max_joined_block_size_rows = 5;

SELECT '--';

-- With a limit of 10, no block seen for any value of `b` may contain more than 10 rows.
SELECT max(bs) <= 10, b FROM (
SELECT blockSize() as bs, * FROM t1 JOIN t2 ON t1.a = t2.a
) GROUP BY b
ORDER BY b
SETTINGS max_joined_block_size_rows = 10;

SELECT '--';

-- parallel_hash doesn't support max_joined_block_size_rows

SET join_algorithm = 'parallel_hash';

-- The same limit of 10 is requested, but the result blocks are expected to exceed it.
SELECT max(bs) > 10, b FROM (
SELECT blockSize() as bs, * FROM t1 JOIN t2 ON t1.a = t2.a
) GROUP BY b
ORDER BY b
SETTINGS max_joined_block_size_rows = 10;

0 comments on commit 56527c7

Please sign in to comment.