Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert recent grace hash join changes #50699

Merged
merged 2 commits into from Jun 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 0 additions & 2 deletions docs/en/operations/settings/settings.md
Expand Up @@ -452,8 +452,6 @@ Possible values:

The first phase of a grace join reads the right table and splits it into N buckets depending on the hash value of key columns (initially, N is `grace_hash_join_initial_buckets`). This is done in a way to ensure that each bucket can be processed independently. Rows from the first bucket are added to an in-memory hash table while the others are saved to disk. If the hash table grows beyond the memory limit (e.g., as set by [`max_bytes_in_join`](/docs/en/operations/settings/query-complexity.md/#settings-max_bytes_in_join)), the number of buckets is increased and the assigned bucket for each row is recomputed. Any rows which don’t belong to the current bucket are flushed and reassigned.

Supports `INNER/LEFT/RIGHT/FULL ALL/ANY JOIN`.

- hash

[Hash join algorithm](https://en.wikipedia.org/wiki/Hash_join) is used. The most generic implementation that supports all combinations of kind and strictness and multiple join keys that are combined with `OR` in the `JOIN ON` section.
Expand Down
68 changes: 22 additions & 46 deletions src/Interpreters/GraceHashJoin.cpp
Expand Up @@ -304,10 +304,8 @@ void GraceHashJoin::initBuckets()

bool GraceHashJoin::isSupported(const std::shared_ptr<TableJoin> & table_join)
{

bool is_asof = (table_join->strictness() == JoinStrictness::Asof);
auto kind = table_join->kind();
return !is_asof && (isInner(kind) || isLeft(kind) || isRight(kind) || isFull(kind)) && table_join->oneDisjunct();
return !is_asof && isInnerOrLeft(table_join->kind()) && table_join->oneDisjunct();
}

GraceHashJoin::~GraceHashJoin() = default;
Expand All @@ -327,6 +325,7 @@ bool GraceHashJoin::hasMemoryOverflow(size_t total_rows, size_t total_bytes) con
/// One row can't be split, avoid loop
if (total_rows < 2)
return false;

bool has_overflow = !table_join->sizeLimits().softCheck(total_rows, total_bytes);

if (has_overflow)
Expand Down Expand Up @@ -471,30 +470,18 @@ bool GraceHashJoin::alwaysReturnsEmptySet() const

return hash_join_is_empty;
}
/// Each bucket is handled by the following steps:
/// 1. build hash_join from the right side blocks.
/// 2. join the left side with the hash_join.
/// 3. read the right non-joined blocks from hash_join.
/// Buckets are handled one by one; each hash_join will not be released before the right non-joined blocks are emitted.
///
/// There is a finished counter in JoiningTransform/DelayedJoinedBlocksWorkerTransform,
/// only one processor could take the non-joined blocks from right stream, and ensure all rows from
/// left stream have been emitted before this.
IBlocksStreamPtr
GraceHashJoin::getNonJoinedBlocks(const Block & left_sample_block_, const Block & result_sample_block_, UInt64 max_block_size_) const

IBlocksStreamPtr GraceHashJoin::getNonJoinedBlocks(const Block &, const Block &, UInt64) const
{
return hash_join->getNonJoinedBlocks(left_sample_block_, result_sample_block_, max_block_size_);
/// We do not support returning non-joined blocks here.
/// TODO: They _should_ be reported by getDelayedBlocks instead
return nullptr;
}

class GraceHashJoin::DelayedBlocks : public IBlocksStream
{
public:
explicit DelayedBlocks(
size_t current_bucket_,
Buckets buckets_,
InMemoryJoinPtr hash_join_,
const Names & left_key_names_,
const Names & right_key_names_)
explicit DelayedBlocks(size_t current_bucket_, Buckets buckets_, InMemoryJoinPtr hash_join_, const Names & left_key_names_, const Names & right_key_names_)
: current_bucket(current_bucket_)
, buckets(std::move(buckets_))
, hash_join(std::move(hash_join_))
Expand All @@ -512,15 +499,12 @@ class GraceHashJoin::DelayedBlocks : public IBlocksStream

do
{
// One DelayedBlocks is shared among multiple DelayedJoinedBlocksWorkerTransform.
// There is a lock inside left_reader.read().
block = left_reader.read();
if (!block)
{
return {};
}

// block comes from left_reader, need to join with right table to get the result.
Blocks blocks = JoinCommon::scatterBlockByHash(left_key_names, block, num_buckets);
block = std::move(blocks[current_idx]);

Expand Down Expand Up @@ -571,14 +555,18 @@ IBlocksStreamPtr GraceHashJoin::getDelayedBlocks()

size_t bucket_idx = current_bucket->idx;

size_t prev_keys_num = 0;
// If there is only one bucket, skip this check.
if (hash_join && buckets.size() > 1)
if (hash_join)
{
// It is reasonable to use the previous hash_join's number of keys to estimate the next hash_join's size.
prev_keys_num = hash_join->getTotalRowCount();
auto right_blocks = hash_join->releaseJoinedBlocks(/* restructure */ false);
for (auto & block : right_blocks)
{
Blocks blocks = JoinCommon::scatterBlockByHash(right_key_names, block, buckets.size());
flushBlocksToBuckets<JoinTableSide::Right>(blocks, buckets, bucket_idx);
}
}

hash_join = makeInMemoryJoin();

for (bucket_idx = bucket_idx + 1; bucket_idx < buckets.size(); ++bucket_idx)
{
current_bucket = buckets[bucket_idx].get();
Expand All @@ -591,7 +579,6 @@ IBlocksStreamPtr GraceHashJoin::getDelayedBlocks()
continue;
}

hash_join = makeInMemoryJoin(prev_keys_num);
auto right_reader = current_bucket->startJoining();
size_t num_rows = 0; /// count rows that were written and rehashed
while (Block block = right_reader.read())
Expand All @@ -602,6 +589,7 @@ IBlocksStreamPtr GraceHashJoin::getDelayedBlocks()

LOG_TRACE(log, "Loaded bucket {} with {}(/{}) rows",
bucket_idx, hash_join->getTotalRowCount(), num_rows);

return std::make_unique<DelayedBlocks>(current_bucket->idx, buckets, hash_join, left_key_names, right_key_names);
}

Expand All @@ -611,9 +599,9 @@ IBlocksStreamPtr GraceHashJoin::getDelayedBlocks()
return nullptr;
}

GraceHashJoin::InMemoryJoinPtr GraceHashJoin::makeInMemoryJoin(size_t reserve_num)
GraceHashJoin::InMemoryJoinPtr GraceHashJoin::makeInMemoryJoin()
{
return std::make_unique<InMemoryJoin>(table_join, right_sample_block, any_take_last_row, reserve_num);
return std::make_unique<InMemoryJoin>(table_join, right_sample_block, any_take_last_row);
}

Block GraceHashJoin::prepareRightBlock(const Block & block)
Expand Down Expand Up @@ -641,19 +629,6 @@ void GraceHashJoin::addJoinedBlockImpl(Block block)
if (!hash_join)
hash_join = makeInMemoryJoin();

// The number of buckets may have been changed by other threads; if so, current_block needs to be scattered again.
// Rehashing can only happen while hash_join_mutex is held.
auto current_buckets = getCurrentBuckets();
if (buckets_snapshot.size() != current_buckets.size())
{
LOG_TRACE(log, "mismatch buckets size. previous:{}, current:{}", buckets_snapshot.size(), getCurrentBuckets().size());
Blocks blocks = JoinCommon::scatterBlockByHash(right_key_names, current_block, current_buckets.size());
flushBlocksToBuckets<JoinTableSide::Right>(blocks, current_buckets, bucket_index);
current_block = std::move(blocks[bucket_index]);
if (!current_block.rows())
return;
}
auto prev_keys_num = hash_join->getTotalRowCount();
hash_join->addJoinedBlock(current_block, /* check_limits = */ false);

if (!hasMemoryOverflow(hash_join))
Expand All @@ -662,6 +637,7 @@ void GraceHashJoin::addJoinedBlockImpl(Block block)
current_block = {};

auto right_blocks = hash_join->releaseJoinedBlocks(/* restructure */ false);
hash_join = nullptr;

buckets_snapshot = rehashBuckets(buckets_snapshot.size() * 2);

Expand All @@ -681,7 +657,7 @@ void GraceHashJoin::addJoinedBlockImpl(Block block)
current_block = concatenateBlocks(current_blocks);
}

hash_join = makeInMemoryJoin(prev_keys_num);
hash_join = makeInMemoryJoin();

if (current_block.rows() > 0)
hash_join->addJoinedBlock(current_block, /* check_limits = */ false);
Expand Down
6 changes: 3 additions & 3 deletions src/Interpreters/GraceHashJoin.h
Expand Up @@ -13,6 +13,7 @@

namespace DB
{

class TableJoin;
class HashJoin;

Expand Down Expand Up @@ -78,7 +79,7 @@ class GraceHashJoin final : public IJoin
bool supportTotals() const override { return false; }

IBlocksStreamPtr
getNonJoinedBlocks(const Block & left_sample_block_, const Block & result_sample_block_, UInt64 max_block_size) const override;
getNonJoinedBlocks(const Block & left_sample_block, const Block & result_sample_block, UInt64 max_block_size) const override;

/// Open iterator over joined blocks.
/// Must be called after all @joinBlock calls.
Expand All @@ -90,8 +91,7 @@ class GraceHashJoin final : public IJoin
private:
void initBuckets();
/// Create empty join for in-memory processing.
/// reserve_num for reserving space in hash table.
InMemoryJoinPtr makeInMemoryJoin(size_t reserve_num = 0);
InMemoryJoinPtr makeInMemoryJoin();

/// Add right table block to the @join. Calls @rehash on overflow.
void addJoinedBlockImpl(Block block);
Expand Down
8 changes: 3 additions & 5 deletions src/Interpreters/HashJoin.cpp
Expand Up @@ -217,7 +217,7 @@ static void correctNullabilityInplace(ColumnWithTypeAndName & column, bool nulla
JoinCommon::removeColumnNullability(column);
}

HashJoin::HashJoin(std::shared_ptr<TableJoin> table_join_, const Block & right_sample_block_, bool any_take_last_row_, size_t reserve_num)
HashJoin::HashJoin(std::shared_ptr<TableJoin> table_join_, const Block & right_sample_block_, bool any_take_last_row_)
: table_join(table_join_)
, kind(table_join->kind())
, strictness(table_join->strictness())
Expand Down Expand Up @@ -302,7 +302,7 @@ HashJoin::HashJoin(std::shared_ptr<TableJoin> table_join_, const Block & right_s
}

for (auto & maps : data->maps)
dataMapInit(maps, reserve_num);
dataMapInit(maps);
}

HashJoin::Type HashJoin::chooseMethod(JoinKind kind, const ColumnRawPtrs & key_columns, Sizes & key_sizes)
Expand Down Expand Up @@ -454,15 +454,13 @@ struct KeyGetterForType
using Type = typename KeyGetterForTypeImpl<type, Value, Mapped>::Type;
};

void HashJoin::dataMapInit(MapsVariant & map, size_t reserve_num)
void HashJoin::dataMapInit(MapsVariant & map)
{

if (kind == JoinKind::Cross)
return;
joinDispatchInit(kind, strictness, map);
joinDispatch(kind, strictness, map, [&](auto, auto, auto & map_) { map_.create(data->type); });
if (reserve_num)
joinDispatch(kind, strictness, map, [&](auto, auto, auto & map_) { map_.reserve(data->type, reserve_num); });
}

bool HashJoin::empty() const
Expand Down
30 changes: 2 additions & 28 deletions src/Interpreters/HashJoin.h
Expand Up @@ -146,7 +146,7 @@ class JoinUsedFlags
class HashJoin : public IJoin
{
public:
HashJoin(std::shared_ptr<TableJoin> table_join_, const Block & right_sample_block, bool any_take_last_row_ = false, size_t reserve_num = 0);
HashJoin(std::shared_ptr<TableJoin> table_join_, const Block & right_sample_block, bool any_take_last_row_ = false);

~HashJoin() override;

Expand Down Expand Up @@ -217,16 +217,6 @@ class HashJoin : public IJoin
M(keys256) \
M(hashed)

/// Only for maps using hash table.
#define APPLY_FOR_HASH_JOIN_VARIANTS(M) \
M(key32) \
M(key64) \
M(key_string) \
M(key_fixed_string) \
M(keys128) \
M(keys256) \
M(hashed)


/// Used for reading from StorageJoin and applying joinGet function
#define APPLY_FOR_JOIN_VARIANTS_LIMITED(M) \
Expand Down Expand Up @@ -276,22 +266,6 @@ class HashJoin : public IJoin
}
}

void reserve(Type which, size_t num)
{
switch (which)
{
case Type::EMPTY: break;
case Type::CROSS: break;
case Type::key8: break;
case Type::key16: break;

#define M(NAME) \
case Type::NAME: NAME->reserve(num); break;
APPLY_FOR_HASH_JOIN_VARIANTS(M)
#undef M
}
}

size_t getTotalRowCount(Type which) const
{
switch (which)
Expand Down Expand Up @@ -435,7 +409,7 @@ class HashJoin : public IJoin
/// If set HashJoin instance is not available for modification (addJoinedBlock)
TableLockHolder storage_join_lock = nullptr;

void dataMapInit(MapsVariant &, size_t);
void dataMapInit(MapsVariant &);

void initRightBlockStructure(Block & saved_block_sample);

Expand Down