Skip to content

Commit

Permalink
hash join use memory tracker delta to decide on shrink to fit
Browse files Browse the repository at this point in the history
  • Loading branch information
vdimir committed Sep 14, 2023
1 parent b11063f commit 0d51fc7
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 11 deletions.
64 changes: 53 additions & 11 deletions src/Interpreters/HashJoin.cpp
Expand Up @@ -59,6 +59,16 @@ struct NotProcessedCrossJoin : public ExtraBlock
size_t right_block;
};


Int64 getCurrentQueryMemoryUsage()
{
/// Use query-level memory tracker
if (auto * memory_tracker_child = CurrentThread::getMemoryTracker())
if (auto * memory_tracker = memory_tracker_child->getParent())
return memory_tracker->get();
return 0;
}

}

namespace JoinStuff
Expand Down Expand Up @@ -738,6 +748,13 @@ bool HashJoin::addBlockToJoin(const Block & source_block_, bool check_limits)
if (unlikely(source_block_.rows() > std::numeric_limits<RowRef::SizeT>::max()))
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Too many rows in right table block for HashJoin: {}", source_block_.rows());

/** We do not allocate memory for stored blocks inside HashJoin, only for hash table.
* In case when we have all the blocks allocated before the first `addBlockToJoin` call, will already be quite high.
* In that case memory consumed by stored blocks will be underestimated.
*/
if (!memory_usage_before_adding_blocks)
memory_usage_before_adding_blocks = getCurrentQueryMemoryUsage();

Block source_block = source_block_;
if (strictness == JoinStrictness::Asof)
{
Expand Down Expand Up @@ -882,12 +899,35 @@ bool HashJoin::addBlockToJoin(const Block & source_block_, bool check_limits)
}
}

auto max_total_bytes = table_join->sizeLimits().max_bytes;
if (!shrink_blocks && max_total_bytes && total_bytes > max_total_bytes / 2)
shrinkStoredBlocksToFit(total_bytes);


return table_join->sizeLimits().check(total_rows, total_bytes, "JOIN", ErrorCodes::SET_SIZE_LIMIT_EXCEEDED);
}

void HashJoin::shrinkStoredBlocksToFit(size_t & total_bytes_in_join)
{
if (shrink_blocks)
return; /// Already shrunk

Int64 current_memory_usage = getCurrentQueryMemoryUsage();
Int64 query_memory_usage_delta = current_memory_usage - memory_usage_before_adding_blocks;
Int64 max_total_bytes_for_query = memory_usage_before_adding_blocks ? table_join->getMaxMemoryUsage() : 0;

auto max_total_bytes_in_join = table_join->sizeLimits().max_bytes;

/** If accounted data size is more than half of `max_bytes_in_join`
* or query memory consumption growth from the beginning of adding blocks (estimation of memory consumed by join using memory tracker)
* is bigger than half of all memory available for query,
* then shrink stored blocks to fit.
*/
if ((max_total_bytes_in_join && total_bytes_in_join > max_total_bytes_in_join / 2) ||
(max_total_bytes_for_query && query_memory_usage_delta > max_total_bytes_for_query / 2))
{
shrink_blocks = true;
LOG_DEBUG(log, "Shrinking stored blocks table after {} of {} consumed",
ReadableSize(total_bytes), ReadableSize(max_total_bytes));
LOG_DEBUG(log, "Shrinking stored blocks, memory consumption is {} / {} caclulated by join, {} / {} by memory tracker",
ReadableSize(total_bytes_in_join), ReadableSize(max_total_bytes_in_join),
ReadableSize(query_memory_usage_delta), ReadableSize(max_total_bytes_for_query));
for (auto & stored_block : data->blocks)
{
size_t old_size = stored_block.allocatedBytes();
Expand All @@ -896,15 +936,17 @@ bool HashJoin::addBlockToJoin(const Block & source_block_, bool check_limits)
chassert(old_size >= new_size);
data->blocks_allocated_size -= old_size - new_size;
}
auto new_total_bytes = getTotalByteCount();
size_t total_bytes_delta = total_bytes - new_total_bytes;
chassert(new_total_bytes <= total_bytes);
LOG_DEBUG(log, "Shrunk stored blocks {} freed, new memory consumption is {}",
ReadableSize(total_bytes_delta), ReadableSize(new_total_bytes));
total_bytes = new_total_bytes;
auto new_total_bytes_in_join = getTotalByteCount();
Int64 new_current_memory_usage = getCurrentQueryMemoryUsage();

size_t total_bytes_delta = total_bytes_in_join - new_total_bytes_in_join;
chassert(new_total_bytes_in_join <= total_bytes_in_join);
LOG_DEBUG(log, "Shrunk stored blocks {} freed ({} by memory tracker), new memory consumption is {} ({} by memory tracker)",
ReadableSize(total_bytes_delta), ReadableSize(new_current_memory_usage - current_memory_usage),
ReadableSize(new_total_bytes_in_join), ReadableSize(new_current_memory_usage));
total_bytes_in_join = new_total_bytes_in_join;
}

return table_join->sizeLimits().check(total_rows, total_bytes, "JOIN", ErrorCodes::SET_SIZE_LIMIT_EXCEEDED);
}


Expand Down
3 changes: 3 additions & 0 deletions src/Interpreters/HashJoin.h
Expand Up @@ -393,6 +393,8 @@ class HashJoin : public IJoin

void debugKeys() const;

void shrinkStoredBlocksToFit(size_t & total_bytes_in_join);

private:
template<bool> friend class NotJoinedHash;

Expand Down Expand Up @@ -432,6 +434,7 @@ class HashJoin : public IJoin

/// When tracked memory consumption is more than a threshold, we will shrink to fit stored blocks.
bool shrink_blocks = false;
Int64 memory_usage_before_adding_blocks = 0;

Poco::Logger * log;

Expand Down
7 changes: 7 additions & 0 deletions src/Interpreters/TableJoin.cpp
Expand Up @@ -112,6 +112,7 @@ TableJoin::TableJoin(const Settings & settings, VolumePtr tmp_volume_)
, partial_merge_join_left_table_buffer_bytes(settings.partial_merge_join_left_table_buffer_bytes)
, max_files_to_merge(settings.join_on_disk_max_files_to_merge)
, temporary_files_codec(settings.temporary_files_codec)
, max_memory_usage(settings.max_memory_usage)
, tmp_volume(tmp_volume_)
{
}
Expand Down Expand Up @@ -953,4 +954,10 @@ ActionsDAGPtr TableJoin::createJoinedBlockActions(ContextPtr context) const
return ExpressionAnalyzer(expression_list, syntax_result, context).getActionsDAG(true, false);
}

size_t TableJoin::getMaxMemoryUsage() const
{
return max_memory_usage;
}


}
5 changes: 5 additions & 0 deletions src/Interpreters/TableJoin.h
Expand Up @@ -146,6 +146,9 @@ class TableJoin
const size_t max_files_to_merge = 0;
const String temporary_files_codec = "LZ4";

/// Value if setting max_memory_usage for query, can be used when max_bytes_in_join is not specified.
size_t max_memory_usage = 0;

ASTs key_asts_left;
ASTs key_asts_right;

Expand Down Expand Up @@ -244,6 +247,8 @@ class TableJoin
JoinStrictness strictness() const { return table_join.strictness; }
bool sameStrictnessAndKind(JoinStrictness, JoinKind) const;
const SizeLimits & sizeLimits() const { return size_limits; }
size_t getMaxMemoryUsage() const;

VolumePtr getGlobalTemporaryVolume() { return tmp_volume; }

ActionsDAGPtr createJoinedBlockActions(ContextPtr context) const;
Expand Down

0 comments on commit 0d51fc7

Please sign in to comment.