Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "common/compiler_util.h" // IWYU pragma: keep
#include "exec/operator/streaming_agg_min_reduction.h"
#include "exprs/vectorized_agg_fn.h"
#include "util/cpu_info.h"

namespace doris {
class ExecNode;
Expand Down
6 changes: 1 addition & 5 deletions be/src/exec/operator/streaming_aggregation_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,14 @@

#include "common/cast_set.h"
#include "common/compiler_util.h" // IWYU pragma: keep
#include "exec/operator/operator.h"
#include "exec/operator/streaming_agg_min_reduction.h"
#include "exprs/aggregate/aggregate_function_simple_factory.h"
#include "exprs/vectorized_agg_fn.h"
#include "exprs/vslot_ref.h"
#include "util/cpu_info.h"

namespace doris {
#include "common/compile_check_begin.h"
class RuntimeState;
} // namespace doris

namespace doris {

StreamingAggLocalState::StreamingAggLocalState(RuntimeState* state, OperatorXBase* parent)
: Base(state, parent),
Expand Down
50 changes: 50 additions & 0 deletions be/src/util/cpu_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,56 @@ class CpuInfo {

static std::string debug_string();

static long get_cache_size(CacheLevel level) {
long cache_sizes[NUM_CACHE_LEVELS];
long cache_line_sizes[NUM_CACHE_LEVELS];
_get_cache_info(cache_sizes, cache_line_sizes);
return cache_sizes[level];
}

static long get_cache_line_size(CacheLevel level) {
long cache_sizes[NUM_CACHE_LEVELS];
long cache_line_sizes[NUM_CACHE_LEVELS];
_get_cache_info(cache_sizes, cache_line_sizes);
return cache_line_sizes[level];
}
Comment on lines +151 to +163

struct StreamingHtMinReductionEntry {
long min_ht_mem;
double streaming_ht_min_reduction;
};

static const std::vector<StreamingHtMinReductionEntry>& get_streaming_ht_min_reduction() {
static std::vector<StreamingHtMinReductionEntry> entries;
Comment on lines +165 to +171
static bool initialized = false;

if (!initialized) {
long l2_cache_size = CpuInfo::get_cache_size(CpuInfo::L2_CACHE);
long l3_cache_size = CpuInfo::get_cache_size(CpuInfo::L3_CACHE);

entries.push_back({.min_ht_mem = 0, .streaming_ht_min_reduction = 0.0});

if (l2_cache_size > 256 * 1024) {
entries.push_back(
{.min_ht_mem = l2_cache_size / 4, .streaming_ht_min_reduction = 1.1});
} else {
entries.push_back({.min_ht_mem = 256 * 1024, .streaming_ht_min_reduction = 1.1});
}

if (l3_cache_size > 4 * 1024 * 1024) {
entries.push_back(
{.min_ht_mem = l3_cache_size / 2, .streaming_ht_min_reduction = 2.0});
} else {
entries.push_back(
{.min_ht_mem = 16 * 1024 * 1024, .streaming_ht_min_reduction = 2.0});
}

initialized = true;
}
Comment on lines +171 to +196

return entries;
}

/// A utility class for temporarily disabling CPU features. Usage:
///
/// {
Expand Down
Loading