Skip to content

Commit

Permalink
Merge pull request #51032 from ClickHouse/backport/23.5/50699
Browse files Browse the repository at this point in the history
Backport #50699 to 23.5: Revert recent grace hash join changes
  • Loading branch information
tavplubix committed Jun 16, 2023
2 parents 298b093 + 0f19fe5 commit dfc1df7
Show file tree
Hide file tree
Showing 16 changed files with 147 additions and 203 deletions.
2 changes: 0 additions & 2 deletions docs/en/operations/settings/settings.md
Original file line number Diff line number Diff line change
Expand Up @@ -535,8 +535,6 @@ Possible values:

The first phase of a grace join reads the right table and splits it into N buckets depending on the hash value of key columns (initially, N is `grace_hash_join_initial_buckets`). This is done in a way to ensure that each bucket can be processed independently. Rows from the first bucket are added to an in-memory hash table while the others are saved to disk. If the hash table grows beyond the memory limit (e.g., as set by [`max_bytes_in_join`](/docs/en/operations/settings/query-complexity.md/#settings-max_bytes_in_join)), the number of buckets is increased and the assigned bucket for each row. Any rows which don’t belong to the current bucket are flushed and reassigned.

Supports `INNER/LEFT/RIGHT/FULL ALL/ANY JOIN`.

- hash

[Hash join algorithm](https://en.wikipedia.org/wiki/Hash_join) is used. The most generic implementation that supports all combinations of kind and strictness and multiple join keys that are combined with `OR` in the `JOIN ON` section.
Expand Down
68 changes: 22 additions & 46 deletions src/Interpreters/GraceHashJoin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -304,10 +304,8 @@ void GraceHashJoin::initBuckets()

bool GraceHashJoin::isSupported(const std::shared_ptr<TableJoin> & table_join)
{

bool is_asof = (table_join->strictness() == JoinStrictness::Asof);
auto kind = table_join->kind();
return !is_asof && (isInner(kind) || isLeft(kind) || isRight(kind) || isFull(kind)) && table_join->oneDisjunct();
return !is_asof && isInnerOrLeft(table_join->kind()) && table_join->oneDisjunct();
}

GraceHashJoin::~GraceHashJoin() = default;
Expand All @@ -327,6 +325,7 @@ bool GraceHashJoin::hasMemoryOverflow(size_t total_rows, size_t total_bytes) con
/// One row can't be split, avoid loop
if (total_rows < 2)
return false;

bool has_overflow = !table_join->sizeLimits().softCheck(total_rows, total_bytes);

if (has_overflow)
Expand Down Expand Up @@ -471,30 +470,18 @@ bool GraceHashJoin::alwaysReturnsEmptySet() const

return hash_join_is_empty;
}
/// Each bucket are handled by the following steps
/// 1. build hash_join by the right side blocks.
/// 2. join left side with the hash_join,
/// 3. read right non-joined blocks from hash_join.
/// buckets are handled one by one, each hash_join will not be release before the right non-joined blocks are emitted.
///
/// There is a finished counter in JoiningTransform/DelayedJoinedBlocksWorkerTransform,
/// only one processor could take the non-joined blocks from right stream, and ensure all rows from
/// left stream have been emitted before this.
IBlocksStreamPtr
GraceHashJoin::getNonJoinedBlocks(const Block & left_sample_block_, const Block & result_sample_block_, UInt64 max_block_size_) const

IBlocksStreamPtr GraceHashJoin::getNonJoinedBlocks(const Block &, const Block &, UInt64) const
{
return hash_join->getNonJoinedBlocks(left_sample_block_, result_sample_block_, max_block_size_);
/// We do no support returning non joined blocks here.
/// TODO: They _should_ be reported by getDelayedBlocks instead
return nullptr;
}

class GraceHashJoin::DelayedBlocks : public IBlocksStream
{
public:
explicit DelayedBlocks(
size_t current_bucket_,
Buckets buckets_,
InMemoryJoinPtr hash_join_,
const Names & left_key_names_,
const Names & right_key_names_)
explicit DelayedBlocks(size_t current_bucket_, Buckets buckets_, InMemoryJoinPtr hash_join_, const Names & left_key_names_, const Names & right_key_names_)
: current_bucket(current_bucket_)
, buckets(std::move(buckets_))
, hash_join(std::move(hash_join_))
Expand All @@ -512,15 +499,12 @@ class GraceHashJoin::DelayedBlocks : public IBlocksStream

do
{
// One DelayedBlocks is shared among multiple DelayedJoinedBlocksWorkerTransform.
// There is a lock inside left_reader.read().
block = left_reader.read();
if (!block)
{
return {};
}

// block comes from left_reader, need to join with right table to get the result.
Blocks blocks = JoinCommon::scatterBlockByHash(left_key_names, block, num_buckets);
block = std::move(blocks[current_idx]);

Expand Down Expand Up @@ -571,14 +555,18 @@ IBlocksStreamPtr GraceHashJoin::getDelayedBlocks()

size_t bucket_idx = current_bucket->idx;

size_t prev_keys_num = 0;
// If there is only one bucket, don't take this check.
if (hash_join && buckets.size() > 1)
if (hash_join)
{
// Use previous hash_join's keys number to estimate next hash_join's size is reasonable.
prev_keys_num = hash_join->getTotalRowCount();
auto right_blocks = hash_join->releaseJoinedBlocks(/* restructure */ false);
for (auto & block : right_blocks)
{
Blocks blocks = JoinCommon::scatterBlockByHash(right_key_names, block, buckets.size());
flushBlocksToBuckets<JoinTableSide::Right>(blocks, buckets, bucket_idx);
}
}

hash_join = makeInMemoryJoin();

for (bucket_idx = bucket_idx + 1; bucket_idx < buckets.size(); ++bucket_idx)
{
current_bucket = buckets[bucket_idx].get();
Expand All @@ -591,7 +579,6 @@ IBlocksStreamPtr GraceHashJoin::getDelayedBlocks()
continue;
}

hash_join = makeInMemoryJoin(prev_keys_num);
auto right_reader = current_bucket->startJoining();
size_t num_rows = 0; /// count rows that were written and rehashed
while (Block block = right_reader.read())
Expand All @@ -602,6 +589,7 @@ IBlocksStreamPtr GraceHashJoin::getDelayedBlocks()

LOG_TRACE(log, "Loaded bucket {} with {}(/{}) rows",
bucket_idx, hash_join->getTotalRowCount(), num_rows);

return std::make_unique<DelayedBlocks>(current_bucket->idx, buckets, hash_join, left_key_names, right_key_names);
}

Expand All @@ -611,9 +599,9 @@ IBlocksStreamPtr GraceHashJoin::getDelayedBlocks()
return nullptr;
}

GraceHashJoin::InMemoryJoinPtr GraceHashJoin::makeInMemoryJoin(size_t reserve_num)
GraceHashJoin::InMemoryJoinPtr GraceHashJoin::makeInMemoryJoin()
{
return std::make_unique<InMemoryJoin>(table_join, right_sample_block, any_take_last_row, reserve_num);
return std::make_unique<InMemoryJoin>(table_join, right_sample_block, any_take_last_row);
}

Block GraceHashJoin::prepareRightBlock(const Block & block)
Expand Down Expand Up @@ -642,19 +630,6 @@ void GraceHashJoin::addJoinedBlockImpl(Block block)
if (!hash_join)
hash_join = makeInMemoryJoin();

// buckets size has been changed in other threads. Need to scatter current_block again.
// rehash could only happen under hash_join_mutex's scope.
auto current_buckets = getCurrentBuckets();
if (buckets_snapshot.size() != current_buckets.size())
{
LOG_TRACE(log, "mismatch buckets size. previous:{}, current:{}", buckets_snapshot.size(), getCurrentBuckets().size());
Blocks blocks = JoinCommon::scatterBlockByHash(right_key_names, current_block, current_buckets.size());
flushBlocksToBuckets<JoinTableSide::Right>(blocks, current_buckets, bucket_index);
current_block = std::move(blocks[bucket_index]);
if (!current_block.rows())
return;
}
auto prev_keys_num = hash_join->getTotalRowCount();
hash_join->addJoinedBlock(current_block, /* check_limits = */ false);

if (!hasMemoryOverflow(hash_join))
Expand All @@ -663,6 +638,7 @@ void GraceHashJoin::addJoinedBlockImpl(Block block)
current_block = {};

auto right_blocks = hash_join->releaseJoinedBlocks(/* restructure */ false);
hash_join = nullptr;

buckets_snapshot = rehashBuckets(buckets_snapshot.size() * 2);

Expand All @@ -682,7 +658,7 @@ void GraceHashJoin::addJoinedBlockImpl(Block block)
current_block = concatenateBlocks(current_blocks);
}

hash_join = makeInMemoryJoin(prev_keys_num);
hash_join = makeInMemoryJoin();

if (current_block.rows() > 0)
hash_join->addJoinedBlock(current_block, /* check_limits = */ false);
Expand Down
6 changes: 3 additions & 3 deletions src/Interpreters/GraceHashJoin.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

namespace DB
{

class TableJoin;
class HashJoin;

Expand Down Expand Up @@ -78,7 +79,7 @@ class GraceHashJoin final : public IJoin
bool supportTotals() const override { return false; }

IBlocksStreamPtr
getNonJoinedBlocks(const Block & left_sample_block_, const Block & result_sample_block_, UInt64 max_block_size) const override;
getNonJoinedBlocks(const Block & left_sample_block, const Block & result_sample_block, UInt64 max_block_size) const override;

/// Open iterator over joined blocks.
/// Must be called after all @joinBlock calls.
Expand All @@ -90,8 +91,7 @@ class GraceHashJoin final : public IJoin
private:
void initBuckets();
/// Create empty join for in-memory processing.
/// reserve_num for reserving space in hash table.
InMemoryJoinPtr makeInMemoryJoin(size_t reserve_num = 0);
InMemoryJoinPtr makeInMemoryJoin();

/// Add right table block to the @join. Calls @rehash on overflow.
void addJoinedBlockImpl(Block block);
Expand Down
8 changes: 3 additions & 5 deletions src/Interpreters/HashJoin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ static void correctNullabilityInplace(ColumnWithTypeAndName & column, bool nulla
JoinCommon::removeColumnNullability(column);
}

HashJoin::HashJoin(std::shared_ptr<TableJoin> table_join_, const Block & right_sample_block_, bool any_take_last_row_, size_t reserve_num)
HashJoin::HashJoin(std::shared_ptr<TableJoin> table_join_, const Block & right_sample_block_, bool any_take_last_row_)
: table_join(table_join_)
, kind(table_join->kind())
, strictness(table_join->strictness())
Expand Down Expand Up @@ -302,7 +302,7 @@ HashJoin::HashJoin(std::shared_ptr<TableJoin> table_join_, const Block & right_s
}

for (auto & maps : data->maps)
dataMapInit(maps, reserve_num);
dataMapInit(maps);
}

HashJoin::Type HashJoin::chooseMethod(JoinKind kind, const ColumnRawPtrs & key_columns, Sizes & key_sizes)
Expand Down Expand Up @@ -454,15 +454,13 @@ struct KeyGetterForType
using Type = typename KeyGetterForTypeImpl<type, Value, Mapped>::Type;
};

void HashJoin::dataMapInit(MapsVariant & map, size_t reserve_num)
void HashJoin::dataMapInit(MapsVariant & map)
{

if (kind == JoinKind::Cross)
return;
joinDispatchInit(kind, strictness, map);
joinDispatch(kind, strictness, map, [&](auto, auto, auto & map_) { map_.create(data->type); });
if (reserve_num)
joinDispatch(kind, strictness, map, [&](auto, auto, auto & map_) { map_.reserve(data->type, reserve_num); });
}

bool HashJoin::empty() const
Expand Down
30 changes: 2 additions & 28 deletions src/Interpreters/HashJoin.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ class JoinUsedFlags
class HashJoin : public IJoin
{
public:
HashJoin(std::shared_ptr<TableJoin> table_join_, const Block & right_sample_block, bool any_take_last_row_ = false, size_t reserve_num = 0);
HashJoin(std::shared_ptr<TableJoin> table_join_, const Block & right_sample_block, bool any_take_last_row_ = false);

~HashJoin() override;

Expand Down Expand Up @@ -217,16 +217,6 @@ class HashJoin : public IJoin
M(keys256) \
M(hashed)

/// Only for maps using hash table.
#define APPLY_FOR_HASH_JOIN_VARIANTS(M) \
M(key32) \
M(key64) \
M(key_string) \
M(key_fixed_string) \
M(keys128) \
M(keys256) \
M(hashed)


/// Used for reading from StorageJoin and applying joinGet function
#define APPLY_FOR_JOIN_VARIANTS_LIMITED(M) \
Expand Down Expand Up @@ -276,22 +266,6 @@ class HashJoin : public IJoin
}
}

void reserve(Type which, size_t num)
{
switch (which)
{
case Type::EMPTY: break;
case Type::CROSS: break;
case Type::key8: break;
case Type::key16: break;

#define M(NAME) \
case Type::NAME: NAME->reserve(num); break;
APPLY_FOR_HASH_JOIN_VARIANTS(M)
#undef M
}
}

size_t getTotalRowCount(Type which) const
{
switch (which)
Expand Down Expand Up @@ -435,7 +409,7 @@ class HashJoin : public IJoin
/// If set HashJoin instance is not available for modification (addJoinedBlock)
TableLockHolder storage_join_lock = nullptr;

void dataMapInit(MapsVariant &, size_t);
void dataMapInit(MapsVariant &);

void initRightBlockStructure(Block & saved_block_sample);

Expand Down

0 comments on commit dfc1df7

Please sign in to comment.