Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,12 @@ class DistinctStreamingAggOperatorX final
? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
}
return StatefulOperatorX<DistinctStreamingAggLocalState>::required_data_distribution(state);
if (state->enable_distinct_streaming_agg_force_passthrough()) {
return {ExchangeType::PASSTHROUGH};
} else {
return StatefulOperatorX<DistinctStreamingAggLocalState>::required_data_distribution(
state);
}
}

bool is_colocated_operator() const override { return _is_colocate; }
Expand Down
24 changes: 16 additions & 8 deletions be/src/pipeline/exec/hashjoin_probe_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,20 +130,28 @@ class HashJoinProbeOperatorX MOCK_REMOVE(final)
bool* eos) const override;

bool need_more_input_data(RuntimeState* state) const override;
DataDistribution required_data_distribution(RuntimeState* /*state*/) const override {
DataDistribution required_data_distribution(RuntimeState* state) const override {
if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
return {ExchangeType::NOOP};
} else if (_is_broadcast_join) {
if (state->enable_broadcast_join_force_passthrough()) {
return DataDistribution(ExchangeType::PASSTHROUGH);
} else {
return _child && _child->is_serial_operator()
? DataDistribution(ExchangeType::PASSTHROUGH)
: DataDistribution(ExchangeType::NOOP);
}
}
return _is_broadcast_join
? DataDistribution(ExchangeType::PASSTHROUGH)
: (_join_distribution == TJoinDistributionType::BUCKET_SHUFFLE ||
_join_distribution == TJoinDistributionType::COLOCATE
? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE,
_partition_exprs)
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs));

return (_join_distribution == TJoinDistributionType::BUCKET_SHUFFLE ||
_join_distribution == TJoinDistributionType::COLOCATE
? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs));
}
bool is_broadcast_join() const { return _is_broadcast_join; }

bool is_hash_join_probe() const override { return true; }

bool is_shuffled_operator() const override {
return _join_distribution == TJoinDistributionType::PARTITIONED ||
_join_distribution == TJoinDistributionType::BUCKET_SHUFFLE ||
Expand Down
2 changes: 2 additions & 0 deletions be/src/pipeline/exec/operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ class OperatorBase {
return Status::OK();
}

virtual bool is_hash_join_probe() const { return false; }

/**
* Pipeline task is blockable means it will be blocked in the next run. So we should put the
* pipeline task into the blocking task scheduler.
Expand Down
4 changes: 4 additions & 0 deletions be/src/pipeline/exec/streaming_aggregation_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,10 @@ class StreamingAggOperatorX MOCK_REMOVE(final) : public StatefulOperatorX<Stream
_spill_streaming_agg_mem_limit = 1024 * 1024;
}
DataDistribution required_data_distribution(RuntimeState* state) const override {
if (_child && _child->is_hash_join_probe() &&
state->enable_streaming_agg_hash_join_force_passthrough()) {
return DataDistribution(ExchangeType::PASSTHROUGH);
}
if (!state->get_query_ctx()->should_be_shuffled_agg(
StatefulOperatorX<StreamingAggLocalState>::node_id())) {
return StatefulOperatorX<StreamingAggLocalState>::required_data_distribution(state);
Expand Down
15 changes: 15 additions & 0 deletions be/src/runtime/runtime_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,21 @@ class RuntimeState {
return _query_options.__isset.enable_profile && _query_options.enable_profile;
}

bool enable_streaming_agg_hash_join_force_passthrough() const {
return _query_options.__isset.enable_streaming_agg_hash_join_force_passthrough &&
_query_options.enable_streaming_agg_hash_join_force_passthrough;
}

bool enable_distinct_streaming_agg_force_passthrough() const {
return _query_options.__isset.enable_distinct_streaming_agg_force_passthrough &&
_query_options.enable_distinct_streaming_agg_force_passthrough;
}

bool enable_broadcast_join_force_passthrough() const {
return _query_options.__isset.enable_broadcast_join_force_passthrough &&
_query_options.enable_broadcast_join_force_passthrough;
}

int rpc_verbose_profile_max_instance_count() const {
return _query_options.__isset.rpc_verbose_profile_max_instance_count
? _query_options.rpc_verbose_profile_max_instance_count
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,11 @@ public class SessionVariable implements Serializable, Writable {
public static final String BROKER_LOAD_BATCH_SIZE = "broker_load_batch_size";
public static final String DISABLE_STREAMING_PREAGGREGATIONS = "disable_streaming_preaggregations";
public static final String ENABLE_DISTINCT_STREAMING_AGGREGATION = "enable_distinct_streaming_aggregation";
public static final String ENABLE_STREAMING_AGG_HASH_JOIN_FORCE_PASSTHROUGH =
"enable_streaming_agg_hash_join_force_passthrough";
public static final String ENABLE_DISTINCT_STREAMING_AGG_FORCE_PASSTHROUGH =
"enable_distinct_streaming_agg_force_passthrough";
public static final String ENABLE_BROADCAST_JOIN_FORCE_PASSTHROUGH = "enable_broadcast_join_force_passthrough";
public static final String DISABLE_COLOCATE_PLAN = "disable_colocate_plan";
public static final String ENABLE_BUCKET_SHUFFLE_JOIN = "enable_bucket_shuffle_join";
public static final String PARALLEL_FRAGMENT_EXEC_INSTANCE_NUM = "parallel_fragment_exec_instance_num";
Expand Down Expand Up @@ -1228,6 +1233,15 @@ public void checkQuerySlotCount(String slotCnt) {
@VariableMgr.VarAttr(name = ENABLE_DISTINCT_STREAMING_AGGREGATION, fuzzy = true)
public boolean enableDistinctStreamingAggregation = true;

@VariableMgr.VarAttr(name = ENABLE_STREAMING_AGG_HASH_JOIN_FORCE_PASSTHROUGH, fuzzy = true)
public boolean enableStreamingAggHashJoinForcePassthrough = true;

@VariableMgr.VarAttr(name = ENABLE_DISTINCT_STREAMING_AGG_FORCE_PASSTHROUGH, fuzzy = true)
public boolean enableDistinctStreamingAggForcePassthrough = true;

@VariableMgr.VarAttr(name = ENABLE_BROADCAST_JOIN_FORCE_PASSTHROUGH, fuzzy = true)
public boolean enableBroadcastJoinForcePassthrough = false;

@VariableMgr.VarAttr(name = DISABLE_COLOCATE_PLAN)
public boolean disableColocatePlan = false;

Expand Down Expand Up @@ -3294,6 +3308,9 @@ public void initFuzzyModeVariables() {
this.useSerialExchange = random.nextBoolean();
this.enableCommonExpPushDownForInvertedIndex = random.nextBoolean();
this.disableStreamPreaggregations = random.nextBoolean();
this.enableStreamingAggHashJoinForcePassthrough = random.nextBoolean();
this.enableDistinctStreamingAggForcePassthrough = random.nextBoolean();
this.enableBroadcastJoinForcePassthrough = random.nextBoolean();
this.enableShareHashTableForBroadcastJoin = random.nextBoolean();
this.shortCircuitEvaluation = random.nextBoolean();

Expand Down Expand Up @@ -4951,6 +4968,9 @@ public TQueryOptions toThrift() {
tResult.setBatchSize(batchSize);
tResult.setDisableStreamPreaggregations(disableStreamPreaggregations);
tResult.setEnableDistinctStreamingAggregation(enableDistinctStreamingAggregation);
tResult.setEnableStreamingAggHashJoinForcePassthrough(enableStreamingAggHashJoinForcePassthrough);
tResult.setEnableDistinctStreamingAggForcePassthrough(enableDistinctStreamingAggForcePassthrough);
tResult.setEnableBroadcastJoinForcePassthrough(enableBroadcastJoinForcePassthrough);
tResult.setPartitionTopnMaxPartitions(partitionTopNMaxPartitions);
tResult.setPartitionTopnPrePartitionRows(partitionTopNPerPartitionRows);

Expand Down
6 changes: 6 additions & 0 deletions gensrc/thrift/PaloInternalService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,12 @@ struct TQueryOptions {

179: optional bool enable_parquet_filter_by_bloom_filter = true;

186: optional bool enable_streaming_agg_hash_join_force_passthrough;

187: optional bool enable_distinct_streaming_agg_force_passthrough;

188: optional bool enable_broadcast_join_force_passthrough;

195: optional bool enable_left_semi_direct_return_opt;

// For cloud, to control if the content would be written into file cache
Expand Down
Loading