Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions be/src/pipeline/exec/operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ const RowDescriptor& OperatorBase::row_desc() {

std::string OperatorBase::debug_string() const {
std::stringstream ss;
ss << _operator_builder->get_name() << ", source: " << is_source();
ss << ", sink: " << is_sink() << ", is closed: " << _is_closed;
ss << _operator_builder->get_name() << ", is source: " << is_source();
ss << ", is sink: " << is_sink() << ", is closed: " << _is_closed;
ss << ", is pending finish: " << is_pending_finish();
return ss.str();
}
Expand Down
4 changes: 3 additions & 1 deletion be/src/pipeline/exec/operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,8 @@ class OperatorBase {
*/
virtual bool is_pending_finish() const { return false; }

virtual Status try_close() { return Status::OK(); }

bool is_closed() const { return _is_closed; }

MemTracker* mem_tracker() const { return _mem_tracker.get(); }
Expand All @@ -225,7 +227,7 @@ class OperatorBase {
const RowDescriptor& row_desc();

RuntimeProfile* runtime_profile() { return _runtime_profile.get(); }
std::string debug_string() const;
virtual std::string debug_string() const;
int32_t id() const { return _operator_builder->id(); }

protected:
Expand Down
16 changes: 16 additions & 0 deletions be/src/pipeline/exec/scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,24 @@ bool ScanOperator::is_pending_finish() const {
return _node->_scanner_ctx && !_node->_scanner_ctx->no_schedule();
}

Status ScanOperator::try_close() {
return _node->try_close();
}

bool ScanOperator::runtime_filters_are_ready_or_timeout() {
return _node->runtime_filters_are_ready_or_timeout();
}

std::string ScanOperator::debug_string() const {
fmt::memory_buffer debug_string_buffer;
fmt::format_to(debug_string_buffer, "{}, scanner_ctx is null: {} ",
SourceOperator::debug_string(), _node->_scanner_ctx == nullptr);
if (_node->_scanner_ctx) {
fmt::format_to(debug_string_buffer, ", num_running_scanners = {}, num_scheduling_ctx = {} ",
_node->_scanner_ctx->get_num_running_scanners(),
_node->_scanner_ctx->get_num_scheduling_ctx());
}
return fmt::to_string(debug_string_buffer);
}

} // namespace doris::pipeline
4 changes: 4 additions & 0 deletions be/src/pipeline/exec/scan_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ class ScanOperator : public SourceOperator<ScanOperatorBuilder> {
bool is_pending_finish() const override;

bool runtime_filters_are_ready_or_timeout() override;

std::string debug_string() const override;

Status try_close() override;
};

} // namespace doris::pipeline
4 changes: 4 additions & 0 deletions be/src/pipeline/pipeline_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,10 @@ Status PipelineTask::finalize() {
return _sink->finalize(_state);
}

Status PipelineTask::try_close() {
return _source->try_close();
}

Status PipelineTask::close() {
int64_t close_ns = 0;
Status s;
Expand Down
3 changes: 3 additions & 0 deletions be/src/pipeline/pipeline_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ class PipelineTask {

Status execute(bool* eos);

// Try to close this pipeline task. If there are still some resources need to be released after `try_close`,
// this task will enter the `PENDING_FINISH` state.
Status try_close();
// if the pipeline create a bunch of pipeline task
// must be call after all pipeline task is finish to release resource
Status close();
Expand Down
1 change: 1 addition & 0 deletions be/src/pipeline/task_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,7 @@ void TaskScheduler::_do_work(size_t index) {

void TaskScheduler::_try_close_task(PipelineTask* task, PipelineTaskState state) {
// state only should be CANCELED or FINISHED
task->try_close();
if (task->is_pending_finish()) {
task->set_state(PENDING_FINISH);
_blocked_task_scheduler->add_blocked_task(task);
Expand Down
4 changes: 4 additions & 0 deletions be/src/vec/exec/scan/scanner_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,10 @@ class ScannerContext {
_ctx_finish_cv.notify_one();
}

const int get_num_running_scanners() const { return _num_running_scanners; }

const int get_num_scheduling_ctx() const { return _num_scheduling_ctx; }

void get_next_batch_of_scanners(std::list<VScanner*>* current_run);

void clear_and_join();
Expand Down
9 changes: 9 additions & 0 deletions be/src/vec/exec/scan/vscan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,15 @@ void VScanNode::release_resource(RuntimeState* state) {
ExecNode::release_resource(state);
}

Status VScanNode::try_close() {
if (_scanner_ctx.get()) {
// mark this scanner ctx as should_stop to make sure scanners will not be scheduled anymore
// TODO: there is a lock in `set_should_stop` may cause some slight impact
_scanner_ctx->set_should_stop();
}
return Status::OK();
}

Status VScanNode::_normalize_conjuncts() {
// The conjuncts is always on output tuple, so use _output_tuple_desc;
std::vector<SlotDescriptor*> slots = _output_tuple_desc->slots();
Expand Down
2 changes: 2 additions & 0 deletions be/src/vec/exec/scan/vscan_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ class VScanNode : public ExecNode {
void release_resource(RuntimeState* state) override;
bool runtime_filters_are_ready_or_timeout();

Status try_close();

enum class PushDownType {
// The predicate can not be pushed down to data source
UNACCEPTABLE,
Expand Down