Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 16 additions & 3 deletions be/src/pipeline/pipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ class Pipeline : public std::enable_shared_from_this<Pipeline> {
public:
Pipeline() = delete;
explicit Pipeline(PipelineId pipeline_id, std::weak_ptr<PipelineFragmentContext> context)
: _complete_dependency(0), _pipeline_id(pipeline_id), _context(context) {
: _complete_dependency(0),
_pipeline_id(pipeline_id),
_context(context),
_can_steal(true) {
_init_profile();
}

Expand All @@ -48,9 +51,13 @@ class Pipeline : public std::enable_shared_from_this<Pipeline> {

// If all dependencies are finished, this pipeline task should be scheduled.
// e.g. Hash join probe task will be scheduled once Hash join build task is finished.
bool finish_one_dependency() {
bool finish_one_dependency(int dependency_core_id) {
DCHECK(_complete_dependency < _dependencies.size());
return _complete_dependency.fetch_add(1) == _dependencies.size() - 1;
bool finish = _complete_dependency.fetch_add(1) == _dependencies.size() - 1;
if (finish) {
_previous_schedule_id = dependency_core_id;
}
return finish;
}

bool has_dependency() { return _complete_dependency.load() < _dependencies.size(); }
Expand All @@ -65,6 +72,10 @@ class Pipeline : public std::enable_shared_from_this<Pipeline> {

RuntimeProfile* pipeline_profile() { return _pipeline_profile.get(); }

bool can_steal() const { return _can_steal; }

void disable_task_steal() { _can_steal = false; }

private:
void _init_profile();
std::atomic<uint32_t> _complete_dependency;
Expand All @@ -77,6 +88,8 @@ class Pipeline : public std::enable_shared_from_this<Pipeline> {

PipelineId _pipeline_id;
std::weak_ptr<PipelineFragmentContext> _context;
bool _can_steal;
int _previous_schedule_id = -1;

std::unique_ptr<RuntimeProfile> _pipeline_profile;
};
Expand Down
1 change: 1 addition & 0 deletions be/src/pipeline/pipeline_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,7 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur
OperatorBuilderPtr join_sink =
std::make_shared<HashJoinBuildSinkBuilder>(next_operator_builder_id(), join_node);
RETURN_IF_ERROR(new_pipe->set_sink(join_sink));
new_pipe->disable_task_steal();

RETURN_IF_ERROR(_build_pipelines(node->child(0), cur_pipe));
OperatorBuilderPtr join_source = std::make_shared<HashJoinProbeOperatorBuilder>(
Expand Down
11 changes: 9 additions & 2 deletions be/src/pipeline/pipeline_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ class PipelineTask {
_sink(sink),
_prepared(false),
_opened(false),
_can_steal(pipeline->_can_steal),
_state(state),
_cur_state(NOT_READY),
_data_state(SourceState::DEPEND_ON_SOURCE),
Expand Down Expand Up @@ -136,19 +137,24 @@ class PipelineTask {

bool sink_can_write() { return _sink->can_write(); }

bool can_steal() const { return _can_steal; }

Status finalize();

void finish_p_dependency() {
for (const auto& p : _pipeline->_parents) {
p->finish_one_dependency();
p->finish_one_dependency(_previous_schedule_id);
}
}

PipelineFragmentContext* fragment_context() { return _fragment_context; }

QueryFragmentsCtx* query_fragments_context();

int get_previous_core_id() const { return _previous_schedule_id; }
int get_previous_core_id() const {
return _previous_schedule_id != -1 ? _previous_schedule_id
: _pipeline->_previous_schedule_id;
}

void set_previous_core_id(int id) { _previous_schedule_id = id; }

Expand Down Expand Up @@ -180,6 +186,7 @@ class PipelineTask {

bool _prepared;
bool _opened;
bool _can_steal;
RuntimeState* _state;
int _previous_schedule_id = -1;
uint32_t _schedule_time = 0;
Expand Down
23 changes: 13 additions & 10 deletions be/src/pipeline/task_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@
namespace doris {
namespace pipeline {

PipelineTask* SubWorkTaskQueue::try_take() {
PipelineTask* SubWorkTaskQueue::try_take(bool is_steal) {
if (_queue.empty()) {
return nullptr;
}
++_schedule_time;
auto task = _queue.front();
if (!task->can_steal() && is_steal) {
return nullptr;
}
++_schedule_time;
_queue.pop();
return task;
}
Expand All @@ -52,7 +55,7 @@ void WorkTaskQueue::close() {
_wait_task.notify_all();
}

PipelineTask* WorkTaskQueue::try_take_unprotected() {
PipelineTask* WorkTaskQueue::try_take_unprotected(bool is_steal) {
if (_total_task_size == 0 || _closed) {
return nullptr;
}
Expand All @@ -76,7 +79,7 @@ PipelineTask* WorkTaskQueue::try_take_unprotected() {
}
}

auto task = _sub_queues[idx].try_take();
auto task = _sub_queues[idx].try_take(is_steal);
if (task) {
_total_task_size--;
}
Expand All @@ -93,15 +96,15 @@ int WorkTaskQueue::_compute_level(PipelineTask* task) {
return SUB_QUEUE_LEVEL - 1;
}

PipelineTask* WorkTaskQueue::try_take() {
PipelineTask* WorkTaskQueue::try_take(bool is_steal) {
// TODO other efficient lock? e.g. if get lock fail, return null_ptr
std::unique_lock<std::mutex> lock(_work_size_mutex);
return try_take_unprotected();
return try_take_unprotected(is_steal);
}

PipelineTask* WorkTaskQueue::take(uint32_t timeout_ms) {
std::unique_lock<std::mutex> lock(_work_size_mutex);
auto task = try_take_unprotected();
auto task = try_take_unprotected(false);
if (task) {
return task;
} else {
Expand All @@ -110,7 +113,7 @@ PipelineTask* WorkTaskQueue::take(uint32_t timeout_ms) {
} else {
_wait_task.wait(lock);
}
return try_take_unprotected();
return try_take_unprotected(false);
}
}

Expand Down Expand Up @@ -138,7 +141,7 @@ void TaskQueue::close() {
PipelineTask* TaskQueue::try_take(size_t core_id) {
PipelineTask* task;
while (!_closed) {
task = _async_queue[core_id].try_take();
task = _async_queue[core_id].try_take(false);
if (task) {
break;
}
Expand Down Expand Up @@ -166,7 +169,7 @@ PipelineTask* TaskQueue::steal_take(size_t core_id) {
next_id = 0;
}
DCHECK(next_id < _core_size);
auto task = _async_queue[next_id].try_take();
auto task = _async_queue[next_id].try_take(true);
if (task) {
return task;
}
Expand Down
6 changes: 3 additions & 3 deletions be/src/pipeline/task_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class SubWorkTaskQueue {
public:
void push_back(PipelineTask* task) { _queue.emplace(task); }

PipelineTask* try_take();
PipelineTask* try_take(bool is_steal);

void set_factor_for_normal(double factor_for_normal) { _factor_for_normal = factor_for_normal; }

Expand All @@ -53,9 +53,9 @@ class WorkTaskQueue {

void close();

PipelineTask* try_take_unprotected();
PipelineTask* try_take_unprotected(bool is_steal);

PipelineTask* try_take();
PipelineTask* try_take(bool is_steal);

PipelineTask* take(uint32_t timeout_ms = 0);

Expand Down