Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve grace_hash join by reserving hash table's size #49816

Merged
merged 6 commits into from May 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
17 changes: 12 additions & 5 deletions src/Interpreters/GraceHashJoin.cpp
Expand Up @@ -571,7 +571,13 @@ IBlocksStreamPtr GraceHashJoin::getDelayedBlocks()

size_t bucket_idx = current_bucket->idx;

hash_join = makeInMemoryJoin();
size_t prev_keys_num = 0;
// Skip the estimate when there is only one bucket.
if (hash_join && buckets.size() > 1)
{
// The previous hash_join's row count is a reasonable estimate of the next hash_join's size.
prev_keys_num = hash_join->getTotalRowCount();
}

for (bucket_idx = bucket_idx + 1; bucket_idx < buckets.size(); ++bucket_idx)
{
Expand All @@ -585,6 +591,7 @@ IBlocksStreamPtr GraceHashJoin::getDelayedBlocks()
continue;
}

hash_join = makeInMemoryJoin(prev_keys_num);
auto right_reader = current_bucket->startJoining();
size_t num_rows = 0; /// count rows that were written and rehashed
while (Block block = right_reader.read())
Expand All @@ -604,9 +611,9 @@ IBlocksStreamPtr GraceHashJoin::getDelayedBlocks()
return nullptr;
}

GraceHashJoin::InMemoryJoinPtr GraceHashJoin::makeInMemoryJoin()
GraceHashJoin::InMemoryJoinPtr GraceHashJoin::makeInMemoryJoin(size_t reserve_num)
{
return std::make_unique<InMemoryJoin>(table_join, right_sample_block, any_take_last_row);
return std::make_unique<InMemoryJoin>(table_join, right_sample_block, any_take_last_row, reserve_num);
}

Block GraceHashJoin::prepareRightBlock(const Block & block)
Expand Down Expand Up @@ -646,6 +653,7 @@ void GraceHashJoin::addJoinedBlockImpl(Block block)
if (!current_block.rows())
return;
}
auto prev_keys_num = hash_join->getTotalRowCount();
hash_join->addJoinedBlock(current_block, /* check_limits = */ false);

if (!hasMemoryOverflow(hash_join))
Expand All @@ -654,7 +662,6 @@ void GraceHashJoin::addJoinedBlockImpl(Block block)
current_block = {};

auto right_blocks = hash_join->releaseJoinedBlocks(/* restructure */ false);
hash_join = nullptr;

buckets_snapshot = rehashBuckets(buckets_snapshot.size() * 2);

Expand All @@ -674,7 +681,7 @@ void GraceHashJoin::addJoinedBlockImpl(Block block)
current_block = concatenateBlocks(current_blocks);
}

hash_join = makeInMemoryJoin();
hash_join = makeInMemoryJoin(prev_keys_num);

if (current_block.rows() > 0)
hash_join->addJoinedBlock(current_block, /* check_limits = */ false);
Expand Down
3 changes: 2 additions & 1 deletion src/Interpreters/GraceHashJoin.h
Expand Up @@ -90,7 +90,8 @@ class GraceHashJoin final : public IJoin
private:
void initBuckets();
/// Create empty join for in-memory processing.
InMemoryJoinPtr makeInMemoryJoin();
/// @reserve_num is the number of elements to reserve space for in the join's hash table.
InMemoryJoinPtr makeInMemoryJoin(size_t reserve_num = 0);

/// Add right table block to the @join. Calls @rehash on overflow.
void addJoinedBlockImpl(Block block);
Expand Down
8 changes: 5 additions & 3 deletions src/Interpreters/HashJoin.cpp
Expand Up @@ -217,7 +217,7 @@ static void correctNullabilityInplace(ColumnWithTypeAndName & column, bool nulla
JoinCommon::removeColumnNullability(column);
}

HashJoin::HashJoin(std::shared_ptr<TableJoin> table_join_, const Block & right_sample_block_, bool any_take_last_row_)
HashJoin::HashJoin(std::shared_ptr<TableJoin> table_join_, const Block & right_sample_block_, bool any_take_last_row_, size_t reserve_num)
: table_join(table_join_)
, kind(table_join->kind())
, strictness(table_join->strictness())
Expand Down Expand Up @@ -302,7 +302,7 @@ HashJoin::HashJoin(std::shared_ptr<TableJoin> table_join_, const Block & right_s
}

for (auto & maps : data->maps)
dataMapInit(maps);
dataMapInit(maps, reserve_num);
}

HashJoin::Type HashJoin::chooseMethod(JoinKind kind, const ColumnRawPtrs & key_columns, Sizes & key_sizes)
Expand Down Expand Up @@ -454,13 +454,15 @@ struct KeyGetterForType
using Type = typename KeyGetterForTypeImpl<type, Value, Mapped>::Type;
};

void HashJoin::dataMapInit(MapsVariant & map)
/// Prepares one map variant for use by this join.
/// Nothing is done for CROSS joins. Otherwise the variant is initialised for the
/// current kind/strictness, the map matching data->type is created, and, when
/// reserve_num is non-zero, reserve(data->type, reserve_num) is invoked on it.
void HashJoin::dataMapInit(MapsVariant & map, size_t reserve_num)
{
    if (kind == JoinKind::Cross)
        return;

    joinDispatchInit(kind, strictness, map);

    auto create_map = [&](auto, auto, auto & map_) { map_.create(data->type); };
    joinDispatch(kind, strictness, map, create_map);

    /// A zero hint means the caller did not ask for any reservation.
    if (reserve_num == 0)
        return;

    auto reserve_map = [&](auto, auto, auto & map_) { map_.reserve(data->type, reserve_num); };
    joinDispatch(kind, strictness, map, reserve_map);
}

bool HashJoin::empty() const
Expand Down
30 changes: 28 additions & 2 deletions src/Interpreters/HashJoin.h
Expand Up @@ -146,7 +146,7 @@ class JoinUsedFlags
class HashJoin : public IJoin
{
public:
HashJoin(std::shared_ptr<TableJoin> table_join_, const Block & right_sample_block, bool any_take_last_row_ = false);
HashJoin(std::shared_ptr<TableJoin> table_join_, const Block & right_sample_block, bool any_take_last_row_ = false, size_t reserve_num = 0);

~HashJoin() override;

Expand Down Expand Up @@ -217,6 +217,16 @@ class HashJoin : public IJoin
M(keys256) \
M(hashed)

/// Applies M to every join map variant that uses a hash table.
/// This is the list reserve() expands to forward a reservation request to the map.
/// key8 and key16 are not listed here; reserve() handles them as no-ops
/// (presumably because they are not hash-table based — confirm).
#define APPLY_FOR_HASH_JOIN_VARIANTS(M) \
M(key32) \
M(key64) \
M(key_string) \
M(key_fixed_string) \
M(keys128) \
M(keys256) \
M(hashed)


/// Used for reading from StorageJoin and applying joinGet function
#define APPLY_FOR_JOIN_VARIANTS_LIMITED(M) \
Expand Down Expand Up @@ -266,6 +276,22 @@ class HashJoin : public IJoin
}
}

/// Forwards a reservation request for `num` elements to the map selected by `which`.
/// EMPTY, CROSS, key8 and key16 are left untouched; every variant listed in
/// APPLY_FOR_HASH_JOIN_VARIANTS calls reserve(num) on its own map.
void reserve(Type which, size_t num)
{
    switch (which)
    {
        /// Variants for which no reservation is performed.
        case Type::EMPTY:
        case Type::CROSS:
        case Type::key8:
        case Type::key16:
            return;

    #define M(NAME) \
        case Type::NAME: \
            NAME->reserve(num); \
            return;
        APPLY_FOR_HASH_JOIN_VARIANTS(M)
    #undef M
    }
}

size_t getTotalRowCount(Type which) const
{
switch (which)
Expand Down Expand Up @@ -409,7 +435,7 @@ class HashJoin : public IJoin
/// If set HashJoin instance is not available for modification (addJoinedBlock)
TableLockHolder storage_join_lock = nullptr;

void dataMapInit(MapsVariant &);
void dataMapInit(MapsVariant &, size_t);

void initRightBlockStructure(Block & saved_block_sample);

Expand Down