Grace hash join supports right/full join #49483

Merged (15 commits) on May 22, 2023
2 changes: 2 additions & 0 deletions docs/en/operations/settings/settings.md
@@ -452,6 +452,8 @@ Possible values:

The first phase of a grace join reads the right table and splits it into N buckets depending on the hash value of the key columns (initially, N is `grace_hash_join_initial_buckets`). The split is done so that each bucket can be processed independently. Rows from the first bucket are added to an in-memory hash table while the others are saved to disk. If the hash table grows beyond the memory limit (e.g., as set by [`max_bytes_in_join`](/docs/en/operations/settings/query-complexity.md/#settings-max_bytes_in_join)), the number of buckets is increased and the bucket assigned to each row is recomputed. Any rows that no longer belong to the current bucket are flushed and reassigned.

Supports `INNER/LEFT/RIGHT/FULL ALL/ANY JOIN`.

- hash

[Hash join algorithm](https://en.wikipedia.org/wiki/Hash_join) is used. The most generic implementation that supports all combinations of kind and strictness and multiple join keys that are combined with `OR` in the `JOIN ON` section.
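
The bucket splitting and rehashing described above for the first phase of a grace join can be sketched as follows. This is a minimal illustration, not the ClickHouse implementation: `Row`, `bucketOf`, `scatter` and `rehash` are hypothetical names, and the sketch assumes the bucket index is `hash % N` with the bucket count doubling on overflow.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical row type: a join key plus a payload.
struct Row
{
    std::uint64_t key;
    int payload;
};

using Bucket = std::vector<Row>;

// Assumption for this sketch: the bucket index is hash(key) % num_buckets.
inline std::size_t bucketOf(const Row & row, std::size_t num_buckets)
{
    return std::hash<std::uint64_t>{}(row.key) % num_buckets;
}

// Split rows into num_buckets buckets by the hash of the key.
inline std::vector<Bucket> scatter(const std::vector<Row> & rows, std::size_t num_buckets)
{
    std::vector<Bucket> buckets(num_buckets);
    for (const Row & row : rows)
        buckets[bucketOf(row, num_buckets)].push_back(row);
    return buckets;
}

// Double the number of buckets and re-check the rows of the bucket that is
// currently held in memory. Rows that no longer belong to it are moved out.
// The other buckets are left untouched here; they would be re-scattered
// when they are loaded.
inline void rehash(std::vector<Bucket> & buckets, std::size_t current)
{
    const std::size_t new_size = buckets.size() * 2;
    buckets.resize(new_size);

    Bucket kept;
    for (const Row & row : buckets[current])
    {
        const std::size_t idx = bucketOf(row, new_size);
        if (idx == current)
            kept.push_back(row);
        else
            buckets[idx].push_back(row); // "flushed and reassigned"
    }
    buckets[current] = std::move(kept);
}
```

Because `h % 2N` is always either `h % N` or `h % N + N`, a row that leaves bucket `i` after doubling can only move to bucket `i + N`, so earlier buckets never receive new rows.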
55 changes: 36 additions & 19 deletions src/Interpreters/GraceHashJoin.cpp
@@ -304,8 +304,10 @@ void GraceHashJoin::initBuckets()

bool GraceHashJoin::isSupported(const std::shared_ptr<TableJoin> & table_join)
{

bool is_asof = (table_join->strictness() == JoinStrictness::Asof);
return !is_asof && isInnerOrLeft(table_join->kind()) && table_join->oneDisjunct();
auto kind = table_join->kind();
return !is_asof && (isInner(kind) || isLeft(kind) || isRight(kind) || isFull(kind)) && table_join->oneDisjunct();
}

GraceHashJoin::~GraceHashJoin() = default;
@@ -325,7 +327,6 @@ bool GraceHashJoin::hasMemoryOverflow(size_t total_rows, size_t total_bytes) con
/// One row can't be split, avoid loop
if (total_rows < 2)
return false;

bool has_overflow = !table_join->sizeLimits().softCheck(total_rows, total_bytes);

if (has_overflow)
@@ -470,18 +471,30 @@ bool GraceHashJoin::alwaysReturnsEmptySet() const

return hash_join_is_empty;
}

IBlocksStreamPtr GraceHashJoin::getNonJoinedBlocks(const Block &, const Block &, UInt64) const
/// Each bucket is handled in the following steps:
/// 1. build hash_join from the right-side blocks,
/// 2. join the left side with hash_join,
/// 3. read the right non-joined blocks from hash_join.
/// Buckets are handled one by one; a hash_join is not released before its right non-joined blocks are emitted.
///
/// There is a finish counter in JoiningTransform/DelayedJoinedBlocksWorkerTransform:
/// only one processor can take the non-joined blocks from the right stream, and only after
/// all rows from the left stream have been emitted.
IBlocksStreamPtr
GraceHashJoin::getNonJoinedBlocks(const Block & left_sample_block_, const Block & result_sample_block_, UInt64 max_block_size_) const
{
/// We do not support returning non-joined blocks here.
/// TODO: They _should_ be reported by getDelayedBlocks instead
return nullptr;
return hash_join->getNonJoinedBlocks(left_sample_block_, result_sample_block_, max_block_size_);
}
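
Why the non-joined blocks can only be read after the whole left side has been joined can be shown with a small sketch of "used" flags. This is an illustration under assumptions, not the HashJoin code: `TinyHashJoin`, `addRight`, `probe` and `nonJoined` are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

// Hypothetical right-side row with a flag that records whether it matched.
struct RightRow
{
    int key;
    std::string value;
    bool used;
};

class TinyHashJoin
{
public:
    // Step 1: build the hash table from the right side.
    void addRight(int key, std::string value)
    {
        rows.push_back(RightRow{key, std::move(value), false});
        index.emplace(key, rows.size() - 1);
    }

    // Step 2: join one left key; every matched right row is marked as used.
    std::vector<std::string> probe(int left_key)
    {
        std::vector<std::string> matched;
        auto range = index.equal_range(left_key);
        for (auto it = range.first; it != range.second; ++it)
        {
            rows[it->second].used = true;
            matched.push_back(rows[it->second].value);
        }
        return matched;
    }

    // Step 3: right rows that never matched (needed for RIGHT/FULL joins).
    // The result is only correct once every left row has been probed.
    std::vector<std::string> nonJoined() const
    {
        std::vector<std::string> result;
        for (const RightRow & row : rows)
            if (!row.used)
                result.push_back(row.value);
        return result;
    }

private:
    std::vector<RightRow> rows;
    std::unordered_multimap<int, std::size_t> index;
};
```

If `nonJoined()` is called while left rows are still pending, it reports right rows that a later `probe` would have matched, which is the incomplete-flags problem the worker transform has to avoid.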

class GraceHashJoin::DelayedBlocks : public IBlocksStream
{
public:
explicit DelayedBlocks(size_t current_bucket_, Buckets buckets_, InMemoryJoinPtr hash_join_, const Names & left_key_names_, const Names & right_key_names_)
explicit DelayedBlocks(
size_t current_bucket_,
Buckets buckets_,
InMemoryJoinPtr hash_join_,
const Names & left_key_names_,
const Names & right_key_names_)
: current_bucket(current_bucket_)
, buckets(std::move(buckets_))
, hash_join(std::move(hash_join_))
@@ -499,12 +512,15 @@ class GraceHashJoin::DelayedBlocks : public IBlocksStream

do
{
// One DelayedBlocks is shared among multiple DelayedJoinedBlocksWorkerTransform.
// There is a lock inside left_reader.read().
block = left_reader.read();
if (!block)
{
return {};
}

// The block comes from left_reader and must be joined with the right table to produce the result.
Blocks blocks = JoinCommon::scatterBlockByHash(left_key_names, block, num_buckets);
block = std::move(blocks[current_idx]);

@@ -555,16 +571,6 @@ IBlocksStreamPtr GraceHashJoin::getDelayedBlocks()

size_t bucket_idx = current_bucket->idx;

if (hash_join)
{
auto right_blocks = hash_join->releaseJoinedBlocks(/* restructure */ false);
for (auto & block : right_blocks)
{
Blocks blocks = JoinCommon::scatterBlockByHash(right_key_names, block, buckets.size());
flushBlocksToBuckets<JoinTableSide::Right>(blocks, buckets, bucket_idx);
}
}

hash_join = makeInMemoryJoin();

for (bucket_idx = bucket_idx + 1; bucket_idx < buckets.size(); ++bucket_idx)
@@ -589,7 +595,6 @@ IBlocksStreamPtr GraceHashJoin::getDelayedBlocks()

LOG_TRACE(log, "Loaded bucket {} with {}(/{}) rows",
bucket_idx, hash_join->getTotalRowCount(), num_rows);

return std::make_unique<DelayedBlocks>(current_bucket->idx, buckets, hash_join, left_key_names, right_key_names);
}

@@ -629,6 +634,18 @@ void GraceHashJoin::addJoinedBlockImpl(Block block)
if (!hash_join)
hash_join = makeInMemoryJoin();

// The number of buckets may have been changed by another thread, in which case current_block must be scattered again.
// Rehashing can only happen within the scope of hash_join_mutex.
auto current_buckets = getCurrentBuckets();
if (buckets_snapshot.size() != current_buckets.size())
{
LOG_TRACE(log, "mismatch buckets size. previous:{}, current:{}", buckets_snapshot.size(), getCurrentBuckets().size());
Blocks blocks = JoinCommon::scatterBlockByHash(right_key_names, current_block, current_buckets.size());
flushBlocksToBuckets<JoinTableSide::Right>(blocks, current_buckets, bucket_index);
current_block = std::move(blocks[bucket_index]);
if (!current_block.rows())
return;
}
hash_join->addJoinedBlock(current_block, /* check_limits = */ false);

if (!hasMemoryOverflow(hash_join))
3 changes: 1 addition & 2 deletions src/Interpreters/GraceHashJoin.h
@@ -13,7 +13,6 @@

namespace DB
{

class TableJoin;
class HashJoin;

@@ -79,7 +78,7 @@ class GraceHashJoin final : public IJoin
bool supportTotals() const override { return false; }

IBlocksStreamPtr
getNonJoinedBlocks(const Block & left_sample_block, const Block & result_sample_block, UInt64 max_block_size) const override;
getNonJoinedBlocks(const Block & left_sample_block_, const Block & result_sample_block_, UInt64 max_block_size) const override;

/// Open iterator over joined blocks.
/// Must be called after all @joinBlock calls.
73 changes: 66 additions & 7 deletions src/Processors/Transforms/JoiningTransform.cpp
@@ -189,7 +189,6 @@ void JoiningTransform::transform(Chunk & chunk)
}
else
block = readExecute(chunk);

auto num_rows = block.rows();
chunk.setColumns(block.getColumns(), num_rows);
}
@@ -311,8 +310,16 @@ void FillingRightJoinSideTransform::work()
}


DelayedJoinedBlocksWorkerTransform::DelayedJoinedBlocksWorkerTransform(Block output_header)
: IProcessor(InputPorts{Block()}, OutputPorts{output_header})
DelayedJoinedBlocksWorkerTransform::DelayedJoinedBlocksWorkerTransform(
Block left_header_,
Block output_header_,
size_t max_block_size_,
JoinPtr join_)
: IProcessor(InputPorts{Block()}, OutputPorts{output_header_})
, left_header(left_header_)
, output_header(output_header_)
, max_block_size(max_block_size_)
, join(join_)
{
}

@@ -366,6 +373,7 @@ IProcessor::Status DelayedJoinedBlocksWorkerTransform::prepare()
if (!data.chunk.hasChunkInfo())
throw Exception(ErrorCodes::LOGICAL_ERROR, "DelayedJoinedBlocksWorkerTransform must have chunk info");
task = std::dynamic_pointer_cast<const DelayedBlocksTask>(data.chunk.getChunkInfo());

}
else
{
@@ -386,12 +394,24 @@ void DelayedJoinedBlocksWorkerTransform::work()
{
if (!task)
return;
Block block;
if (!left_delayed_stream_finished)
{
block = task->delayed_blocks->next();

Block block = task->delayed_blocks->next();

if (!block)
{
left_delayed_stream_finished = true;
block = nextNonJoinedBlock();
}
}
else
{
block = nextNonJoinedBlock();
}
if (!block)
{
task.reset();
resetTask();
return;
}

@@ -400,6 +420,38 @@ void DelayedJoinedBlocksWorkerTransform::work()
output_chunk.setColumns(block.getColumns(), rows);
}

void DelayedJoinedBlocksWorkerTransform::resetTask()
{
task.reset();
left_delayed_stream_finished = false;
setup_non_joined_stream = false;
non_joined_delayed_stream = nullptr;
}

Block DelayedJoinedBlocksWorkerTransform::nextNonJoinedBlock()
{
if (!setup_non_joined_stream)
{
setup_non_joined_stream = true;
// Before reading from the non-joined stream, all blocks in the left file reader must have been joined.
// For example, HashJoin may return invalid rows from the non-joined stream if it is read before
// all blocks in the left file reader have been processed, since the used flags are still incomplete.
// Letting only one processor read from the non-joined stream seems to be an easy way to ensure this.
if (task && task->left_delayed_stream_finish_counter->isLast())
{
if (!non_joined_delayed_stream)
{
non_joined_delayed_stream = join->getNonJoinedBlocks(left_header, output_header, max_block_size);
}
}
}
if (non_joined_delayed_stream)
{
return non_joined_delayed_stream->next();
}
return {};
}
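
The "only the last worker reads the non-joined stream" rule used in `nextNonJoinedBlock()` can be illustrated with a minimal counter sketch. `FinishCounterSketch` is a hypothetical stand-in written for this illustration; it is an assumption about the general technique and is not claimed to match `JoiningTransform::FinishCounter`.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>

// Hypothetical counter shared by all workers that process one bucket.
class FinishCounterSketch
{
public:
    explicit FinishCounterSketch(std::size_t total_) : total(total_) {}

    // Each worker calls this exactly once, after it has drained the left
    // delayed stream. fetch_add is atomic, so exactly one caller observes
    // the final count and gets `true`.
    // Assumption: no worker calls it twice; in the diff above the
    // setup_non_joined_stream flag plays that role.
    bool isLast()
    {
        return finished.fetch_add(1) + 1 >= total;
    }

private:
    const std::size_t total;
    std::atomic<std::size_t> finished{0};
};
```

With three workers, the first two calls return `false` and those workers emit nothing further for the bucket; the third call returns `true`, and only that worker opens the non-joined stream, at which point all left blocks are known to have been joined.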

DelayedJoinedBlocksTransform::DelayedJoinedBlocksTransform(size_t num_streams, JoinPtr join_)
: IProcessor(InputPorts{}, OutputPorts(num_streams, Block()))
, join(std::move(join_))
@@ -433,6 +485,9 @@ IProcessor::Status DelayedJoinedBlocksTransform::prepare()

if (finished)
{
// Because of the memory limit, the buckets cannot all be handled in parallel by different
// DelayedJoinedBlocksWorkerTransforms, so the same task is sent to all outputs.
// Wait until all DelayedJoinedBlocksWorkerTransforms are idle before getting the next bucket.
for (auto & output : outputs)
{
if (output.isFinished())
@@ -448,10 +503,14 @@ IProcessor::Status DelayedJoinedBlocksTransform::prepare()

if (delayed_blocks)
{
// This counter ensures that only the last DelayedJoinedBlocksWorkerTransform
// can read the right non-joined blocks from the join.
auto left_delayed_stream_finished_counter = std::make_shared<JoiningTransform::FinishCounter>(outputs.size());
for (auto & output : outputs)
{
Chunk chunk;
chunk.setChunkInfo(std::make_shared<DelayedBlocksTask>(delayed_blocks));
auto task = std::make_shared<DelayedBlocksTask>(delayed_blocks, left_delayed_stream_finished_counter);
chunk.setChunkInfo(task);
output.push(std::move(chunk));
}
delayed_blocks = nullptr;
25 changes: 22 additions & 3 deletions src/Processors/Transforms/JoiningTransform.h
@@ -116,9 +116,14 @@ class DelayedBlocksTask : public ChunkInfo
public:

explicit DelayedBlocksTask() : finished(true) {}
explicit DelayedBlocksTask(IBlocksStreamPtr delayed_blocks_) : delayed_blocks(std::move(delayed_blocks_)) {}
explicit DelayedBlocksTask(IBlocksStreamPtr delayed_blocks_, JoiningTransform::FinishCounterPtr left_delayed_stream_finish_counter_)
: delayed_blocks(std::move(delayed_blocks_))
, left_delayed_stream_finish_counter(left_delayed_stream_finish_counter_)
{
}

IBlocksStreamPtr delayed_blocks = nullptr;
JoiningTransform::FinishCounterPtr left_delayed_stream_finish_counter = nullptr;

bool finished = false;
};
@@ -147,18 +152,32 @@ class DelayedJoinedBlocksTransform : public IProcessor
class DelayedJoinedBlocksWorkerTransform : public IProcessor
{
public:
explicit DelayedJoinedBlocksWorkerTransform(Block output_header);
explicit DelayedJoinedBlocksWorkerTransform(
Block left_header_,
Block output_header_,
size_t max_block_size_,
JoinPtr join_);

String getName() const override { return "DelayedJoinedBlocksWorkerTransform"; }

Status prepare() override;
void work() override;

private:
Block left_header;
Block output_header;
size_t max_block_size;
JoinPtr join;
DelayedBlocksTaskPtr task;
Chunk output_chunk;

bool finished = false;
/// All joined and non-joined rows from the left stream have been emitted; only right non-joined rows remain.
bool left_delayed_stream_finished = false;
bool setup_non_joined_stream = false;
IBlocksStreamPtr non_joined_delayed_stream = nullptr;

void resetTask();
Block nextNonJoinedBlock();
};

}
2 changes: 1 addition & 1 deletion src/QueryPipeline/QueryPipelineBuilder.cpp
@@ -491,7 +491,7 @@ std::unique_ptr<QueryPipelineBuilder> QueryPipelineBuilder::joinPipelinesRightLe
if (delayed_root)
{
// Process delayed joined blocks when all JoiningTransform are finished.
auto delayed = std::make_shared<DelayedJoinedBlocksWorkerTransform>(joined_header);
auto delayed = std::make_shared<DelayedJoinedBlocksWorkerTransform>(left_header, joined_header, max_block_size, join);
if (delayed->getInputs().size() != 1 || delayed->getOutputs().size() != 1)
throw Exception(ErrorCodes::LOGICAL_ERROR, "DelayedJoinedBlocksWorkerTransform should have one input and one output");
