Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improvement](schema scan) Use async scanner for schema scanners #38403

Merged
merged 5 commits into from
Jul 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 68 additions & 1 deletion be/src/exec/schema_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,10 @@
#include "exec/schema_scanner/schema_workload_groups_scanner.h"
#include "exec/schema_scanner/schema_workload_sched_policy_scanner.h"
#include "olap/hll.h"
#include "pipeline/dependency.h"
#include "runtime/define_primitive_type.h"
#include "runtime/fragment_mgr.h"
#include "runtime/types.h"
#include "util/string_util.h"
#include "util/types.h"
#include "vec/columns/column.h"
Expand All @@ -65,6 +68,7 @@
#include "vec/core/column_with_type_and_name.h"
#include "vec/core/types.h"
#include "vec/data_types/data_type.h"
#include "vec/data_types/data_type_factory.hpp"

namespace doris {
class ObjectPool;
Expand All @@ -85,7 +89,60 @@ Status SchemaScanner::start(RuntimeState* state) {
return Status::OK();
}

Status SchemaScanner::get_next_block(vectorized::Block* block, bool* eos) {
// Hands the block prefetched by the async worker over to the caller.
//
// Appends every row of `_data_block` to the columns of `block`, clears
// `_data_block`, reports end-of-stream through `eos`, and -- unless the stream
// has ended -- schedules the next prefetch via get_next_block_async().
//
// Returns:
//   - InternalError("No data left!") when `_data_block` has not been created;
//   - the status held by `_scanner_status` when it is an error;
//   - any error returned while scheduling the next prefetch;
//   - OK otherwise.
Status SchemaScanner::get_next_block(RuntimeState* state, vectorized::Block* block, bool* eos) {
    if (_data_block == nullptr) {
        return Status::InternalError("No data left!");
    }
    // The worker task must not be running while its output is consumed here.
    DCHECK(_async_thread_running == false);
    RETURN_IF_ERROR(_scanner_status.status());
    // NOTE(review): `_data_block` is indexed with positions taken from `block`;
    // this assumes both blocks share the same column layout -- confirm.
    for (size_t i = 0; i < block->columns(); i++) {
        std::move(*block->get_by_position(i).column)
                .mutate()
                ->insert_range_from(*_data_block->get_by_position(i).column, 0,
                                    _data_block->rows());
    }
    _data_block->clear_column_data();
    *eos = _eos;
    if (!*eos) {
        // More data may follow: start fetching the next block in the background.
        RETURN_IF_ERROR(get_next_block_async(state));
    }
    return Status::OK();
}

// Schedules an asynchronous fetch of the next block.
//
// Blocks `_dependency`, then submits a task to the fragment manager's thread
// pool. The task opens the scanner on first use (creates `_data_block` and
// calls start()), calls get_next_block_internal() to fill `_data_block`,
// records the resulting statuses in `_scanner_status`, publishes end-of-stream
// in `_eos`, and finally sets `_dependency` ready (and `_finish_dependency`
// as well once end-of-stream has been reached).
//
// Returns the error from submit_func() when the task could not be queued and
// OK otherwise; failures inside the task are reported through
// `_scanner_status`.
Status SchemaScanner::get_next_block_async(RuntimeState* state) {
    _dependency->block();
    auto task_ctx = state->get_task_execution_context();
    RETURN_IF_ERROR(ExecEnv::GetInstance()->fragment_mgr()->get_thread_pool()->submit_func(
            [this, task_ctx, state]() {
                auto task_lock = task_ctx.lock();
                if (task_lock == nullptr) {
                    // `this` and `state` are captured as raw pointers and the task
                    // context is the only liveness guard available here. Once it
                    // is gone, nothing reachable through them may be touched, so
                    // bail out without reading or writing any member.
                    return;
                }
                DCHECK(_async_thread_running == false);
                SCOPED_ATTACH_TASK(state);
                // Re-asserted on the worker thread; presumably redundant with the
                // block() issued before submission -- TODO confirm.
                _dependency->block();
                _async_thread_running = true;
                _finish_dependency->block();
                if (!_opened) {
                    _data_block = vectorized::Block::create_unique();
                    _init_block(_data_block.get());
                    _scanner_status.update(start(state));
                    _opened = true;
                }
                // NOTE(review): get_next_block_internal() still runs when start()
                // reported an error above -- confirm this is intended.
                bool eos = false;
                _scanner_status.update(get_next_block_internal(_data_block.get(), &eos));
                _eos = eos;
                _async_thread_running = false;
                _dependency->set_ready();
                if (eos) {
                    _finish_dependency->set_ready();
                }
            }));
    return Status::OK();
}

Status SchemaScanner::get_next_block_internal(vectorized::Block* block, bool* eos) {
if (!_is_init) {
return Status::InternalError("used before initialized.");
}
Expand Down Expand Up @@ -176,6 +233,16 @@ std::unique_ptr<SchemaScanner> SchemaScanner::create(TSchemaTableType::type type
}
}

// Appends one column to `src_block` for every entry of get_column_desc(),
// built from that entry's type and name.
void SchemaScanner::_init_block(vectorized::Block* src_block) {
    // get_column_desc() returns a const reference, so iterating it directly is
    // safe and avoids comparing a signed index against size().
    for (const auto& col_desc : get_column_desc()) {
        TypeDescriptor descriptor(col_desc.type);
        // Second argument `true`: presumably requests a nullable data type --
        // TODO confirm against DataTypeFactory::create_data_type.
        auto data_type = vectorized::DataTypeFactory::instance().create_data_type(descriptor, true);
        src_block->insert(vectorized::ColumnWithTypeAndName(data_type->create_column(), data_type,
                                                            col_desc.name));
    }
}

Status SchemaScanner::fill_dest_column_for_range(vectorized::Block* block, size_t pos,
const std::vector<void*>& datas) {
const ColumnDesc& col_desc = _columns[pos];
Expand Down
24 changes: 23 additions & 1 deletion be/src/exec/schema_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <stddef.h>
#include <stdint.h>

#include <condition_variable>
#include <memory>
#include <string>
#include <utility>
#include <vector>
Expand All @@ -43,6 +44,10 @@ namespace vectorized {
class Block;
}

namespace pipeline {
class Dependency;
}

struct SchemaScannerCommonParam {
SchemaScannerCommonParam()
: db(nullptr),
Expand Down Expand Up @@ -94,15 +99,23 @@ class SchemaScanner {

// Initializes the object with the information it needs (schema etc.).
virtual Status init(SchemaScannerParam* param, ObjectPool* pool);
// Appends the rows of the block prefetched by the async worker to `block`,
// writes the end-of-stream flag to `*eos`, and schedules the next async fetch
// unless end-of-stream was reached.
Status get_next_block(RuntimeState* state, vectorized::Block* block, bool* eos);
// Start to work
virtual Status start(RuntimeState* state);
virtual Status get_next_block(vectorized::Block* block, bool* eos);
// Produces the next block; called from the task submitted by
// get_next_block_async() and overridden by the concrete scanners.
virtual Status get_next_block_internal(vectorized::Block* block, bool* eos);
// Returns the column descriptors held in `_columns`.
const std::vector<ColumnDesc>& get_column_desc() const { return _columns; }
// Factory function: creates the scanner for the given schema table type.
static std::unique_ptr<SchemaScanner> create(TSchemaTableType::type type);
// Returns the schema table type held in `_schema_table_type`.
TSchemaTableType::type type() const { return _schema_table_type; }
// Installs the two dependencies used by the async fetch path.
// `dep` becomes `_dependency` and `fin_dep` becomes `_finish_dependency`.
// Both are taken by value as sink parameters and moved into the members, so
// no additional reference-count increment/decrement pair is performed.
void set_dependency(std::shared_ptr<pipeline::Dependency> dep,
                    std::shared_ptr<pipeline::Dependency> fin_dep) {
    _dependency = std::move(dep);
    _finish_dependency = std::move(fin_dep);
}
// Blocks `_dependency` and submits a task to the fragment manager's thread
// pool that fetches the next block into `_data_block`; returns the submit
// error, if any.
Status get_next_block_async(RuntimeState* state);

protected:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

schema scanner
FeBasedSchemaScanner
vector _block;
bool has_started = false
dependency dep;
get_block_async() {
thread_pool.submit() {
if (!has_started) {
start();
}
get_block_internal();
queue.push_block();
}
}

    get_block(Block* block) {
        block = _block;
    }

void _init_block(vectorized::Block* src_block);
Status fill_dest_column_for_range(vectorized::Block* block, size_t pos,
const std::vector<void*>& datas);

Expand All @@ -125,6 +138,15 @@ class SchemaScanner {
RuntimeProfile::Counter* _get_table_timer = nullptr;
RuntimeProfile::Counter* _get_describe_timer = nullptr;
RuntimeProfile::Counter* _fill_block_timer = nullptr;

// Dependencies installed through set_dependency(). `_dependency` is blocked
// when an async fetch is scheduled and set ready when the worker task finishes
// a fetch; `_finish_dependency` is blocked by the worker task and set ready
// once that task observes end-of-stream.
std::shared_ptr<pipeline::Dependency> _dependency = nullptr;
std::shared_ptr<pipeline::Dependency> _finish_dependency = nullptr;

// Block filled by the async worker task and drained by
// get_next_block(RuntimeState*, ...).
std::unique_ptr<vectorized::Block> _data_block;
// Collects the statuses produced inside the worker task; checked by
// get_next_block(RuntimeState*, ...) before data is handed out.
AtomicStatus _scanner_status;
// End-of-stream flag reported by the latest get_next_block_internal() call.
std::atomic<bool> _eos = false;
// Set once the worker task has created `_data_block` and called start().
std::atomic<bool> _opened = false;
// True while the worker task is performing a fetch.
std::atomic<bool> _async_thread_running = false;
};

} // namespace doris
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ Status SchemaActiveQueriesScanner::_get_active_queries_block_from_fe() {
return Status::OK();
}

Status SchemaActiveQueriesScanner::get_next_block(vectorized::Block* block, bool* eos) {
Status SchemaActiveQueriesScanner::get_next_block_internal(vectorized::Block* block, bool* eos) {
if (!_is_init) {
return Status::InternalError("Used before initialized.");
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/schema_scanner/schema_active_queries_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class SchemaActiveQueriesScanner : public SchemaScanner {
~SchemaActiveQueriesScanner() override;

Status start(RuntimeState* state) override;
Status get_next_block(vectorized::Block* block, bool* eos) override;
Status get_next_block_internal(vectorized::Block* block, bool* eos) override;

static std::vector<SchemaScanner::ColumnDesc> _s_tbls_columns;

Expand Down
3 changes: 2 additions & 1 deletion be/src/exec/schema_scanner/schema_backend_active_tasks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ Status SchemaBackendActiveTasksScanner::start(RuntimeState* state) {
return Status::OK();
}

Status SchemaBackendActiveTasksScanner::get_next_block(vectorized::Block* block, bool* eos) {
Status SchemaBackendActiveTasksScanner::get_next_block_internal(vectorized::Block* block,
bool* eos) {
if (!_is_init) {
return Status::InternalError("Used before initialized.");
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/schema_scanner/schema_backend_active_tasks.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class SchemaBackendActiveTasksScanner : public SchemaScanner {
~SchemaBackendActiveTasksScanner() override;

Status start(RuntimeState* state) override;
Status get_next_block(vectorized::Block* block, bool* eos) override;
Status get_next_block_internal(vectorized::Block* block, bool* eos) override;

static std::vector<SchemaScanner::ColumnDesc> _s_tbls_columns;

Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/schema_scanner/schema_charsets_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ SchemaCharsetsScanner::SchemaCharsetsScanner()

// Nothing to release explicitly; defaulted like the other scanners' destructors.
SchemaCharsetsScanner::~SchemaCharsetsScanner() = default;

Status SchemaCharsetsScanner::get_next_block(vectorized::Block* block, bool* eos) {
Status SchemaCharsetsScanner::get_next_block_internal(vectorized::Block* block, bool* eos) {
if (!_is_init) {
return Status::InternalError("call this before initial.");
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/schema_scanner/schema_charsets_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class SchemaCharsetsScanner : public SchemaScanner {
SchemaCharsetsScanner();
~SchemaCharsetsScanner() override;

Status get_next_block(vectorized::Block* block, bool* eos) override;
Status get_next_block_internal(vectorized::Block* block, bool* eos) override;

private:
struct CharsetStruct {
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/schema_scanner/schema_collations_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ SchemaCollationsScanner::SchemaCollationsScanner()

// Nothing to release explicitly; defaulted like the other scanners' destructors.
SchemaCollationsScanner::~SchemaCollationsScanner() = default;

Status SchemaCollationsScanner::get_next_block(vectorized::Block* block, bool* eos) {
Status SchemaCollationsScanner::get_next_block_internal(vectorized::Block* block, bool* eos) {
if (!_is_init) {
return Status::InternalError("call this before initial.");
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/schema_scanner/schema_collations_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class SchemaCollationsScanner : public SchemaScanner {
SchemaCollationsScanner();
~SchemaCollationsScanner() override;

Status get_next_block(vectorized::Block* block, bool* eos) override;
Status get_next_block_internal(vectorized::Block* block, bool* eos) override;

private:
struct CollationStruct {
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/schema_scanner/schema_columns_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ Status SchemaColumnsScanner::_get_new_table() {
return Status::OK();
}

Status SchemaColumnsScanner::get_next_block(vectorized::Block* block, bool* eos) {
Status SchemaColumnsScanner::get_next_block_internal(vectorized::Block* block, bool* eos) {
if (!_is_init) {
return Status::InternalError("use this class before inited.");
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/schema_scanner/schema_columns_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class SchemaColumnsScanner : public SchemaScanner {
SchemaColumnsScanner();
~SchemaColumnsScanner() override;
Status start(RuntimeState* state) override;
Status get_next_block(vectorized::Block* block, bool* eos) override;
Status get_next_block_internal(vectorized::Block* block, bool* eos) override;

private:
Status _get_new_table();
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/schema_scanner/schema_dummy_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ Status SchemaDummyScanner::start(RuntimeState* state) {
return Status::OK();
}

Status SchemaDummyScanner::get_next_block(vectorized::Block* block, bool* eos) {
// Dummy scanner: reports end-of-stream right away.
// `block` is left untouched; only `*eos` is written, and OK is always returned.
Status SchemaDummyScanner::get_next_block_internal(vectorized::Block* block, bool* eos) {
    *eos = true;
    return Status::OK();
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/schema_scanner/schema_dummy_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class SchemaDummyScanner : public SchemaScanner {
SchemaDummyScanner();
~SchemaDummyScanner() override;
Status start(RuntimeState* state = nullptr) override;
Status get_next_block(vectorized::Block* block, bool* eos) override;
Status get_next_block_internal(vectorized::Block* block, bool* eos) override;
};

} // namespace doris
2 changes: 1 addition & 1 deletion be/src/exec/schema_scanner/schema_files_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ Status SchemaFilesScanner::start(RuntimeState* state) {
return Status::OK();
}

Status SchemaFilesScanner::get_next_block(vectorized::Block* block, bool* eos) {
Status SchemaFilesScanner::get_next_block_internal(vectorized::Block* block, bool* eos) {
if (!_is_init) {
return Status::InternalError("Used before initialized.");
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/schema_scanner/schema_files_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class SchemaFilesScanner : public SchemaScanner {
~SchemaFilesScanner() override;

Status start(RuntimeState* state) override;
Status get_next_block(vectorized::Block* block, bool* eos) override;
Status get_next_block_internal(vectorized::Block* block, bool* eos) override;

int _db_index;
int _table_index;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ Status SchemaMetadataNameIdsScanner::_fill_block_impl(vectorized::Block* block)
return Status::OK();
}

Status SchemaMetadataNameIdsScanner::get_next_block(vectorized::Block* block, bool* eos) {
Status SchemaMetadataNameIdsScanner::get_next_block_internal(vectorized::Block* block, bool* eos) {
if (!_is_init) {
return Status::InternalError("Used before initialized.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class SchemaMetadataNameIdsScanner : public SchemaScanner {
~SchemaMetadataNameIdsScanner() override;

Status start(RuntimeState* state) override;
Status get_next_block(vectorized::Block* block, bool* eos) override;
Status get_next_block_internal(vectorized::Block* block, bool* eos) override;

private:
Status _get_new_table();
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/schema_scanner/schema_partitions_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ Status SchemaPartitionsScanner::start(RuntimeState* state) {
return Status::OK();
}

Status SchemaPartitionsScanner::get_next_block(vectorized::Block* block, bool* eos) {
Status SchemaPartitionsScanner::get_next_block_internal(vectorized::Block* block, bool* eos) {
if (!_is_init) {
return Status::InternalError("Used before initialized.");
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/schema_scanner/schema_partitions_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class SchemaPartitionsScanner : public SchemaScanner {
~SchemaPartitionsScanner() override;

Status start(RuntimeState* state) override;
Status get_next_block(vectorized::Block* block, bool* eos) override;
Status get_next_block_internal(vectorized::Block* block, bool* eos) override;

int _db_index;
int _table_index;
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/schema_scanner/schema_processlist_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ Status SchemaProcessListScanner::start(RuntimeState* state) {
return Status::OK();
}

Status SchemaProcessListScanner::get_next_block(vectorized::Block* block, bool* eos) {
Status SchemaProcessListScanner::get_next_block_internal(vectorized::Block* block, bool* eos) {
if (!_is_init) {
return Status::InternalError("call this before initial.");
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/schema_scanner/schema_processlist_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class SchemaProcessListScanner : public SchemaScanner {
~SchemaProcessListScanner() override;

Status start(RuntimeState* state) override;
Status get_next_block(vectorized::Block* block, bool* eos) override;
Status get_next_block_internal(vectorized::Block* block, bool* eos) override;

static std::vector<SchemaScanner::ColumnDesc> _s_processlist_columns;

Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/schema_scanner/schema_profiling_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ Status SchemaProfilingScanner::start(RuntimeState* state) {
return Status::OK();
}

Status SchemaProfilingScanner::get_next_block(vectorized::Block* block, bool* eos) {
Status SchemaProfilingScanner::get_next_block_internal(vectorized::Block* block, bool* eos) {
if (!_is_init) {
return Status::InternalError("Used before initialized.");
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/schema_scanner/schema_profiling_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class SchemaProfilingScanner : public SchemaScanner {
~SchemaProfilingScanner() override;

Status start(RuntimeState* state) override;
Status get_next_block(vectorized::Block* block, bool* eos) override;
Status get_next_block_internal(vectorized::Block* block, bool* eos) override;

static std::vector<SchemaScanner::ColumnDesc> _s_tbls_columns;
};
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/schema_scanner/schema_routine_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ Status SchemaRoutinesScanner::get_block_from_fe() {
return Status::OK();
}

Status SchemaRoutinesScanner::get_next_block(vectorized::Block* block, bool* eos) {
Status SchemaRoutinesScanner::get_next_block_internal(vectorized::Block* block, bool* eos) {
if (!_is_init) {
return Status::InternalError("Used before initialized.");
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/schema_scanner/schema_routine_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class SchemaRoutinesScanner : public SchemaScanner {
~SchemaRoutinesScanner() override = default;

Status start(RuntimeState* state) override;
Status get_next_block(vectorized::Block* block, bool* eos) override;
Status get_next_block_internal(vectorized::Block* block, bool* eos) override;

static std::vector<SchemaScanner::ColumnDesc> _s_tbls_columns;

Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/schema_scanner/schema_rowsets_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ Status SchemaRowsetsScanner::_get_all_rowsets() {
return Status::OK();
}

Status SchemaRowsetsScanner::get_next_block(vectorized::Block* block, bool* eos) {
Status SchemaRowsetsScanner::get_next_block_internal(vectorized::Block* block, bool* eos) {
if (!_is_init) {
return Status::InternalError("Used before initialized.");
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/schema_scanner/schema_rowsets_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class SchemaRowsetsScanner : public SchemaScanner {
~SchemaRowsetsScanner() override = default;

Status start(RuntimeState* state) override;
Status get_next_block(vectorized::Block* block, bool* eos) override;
Status get_next_block_internal(vectorized::Block* block, bool* eos) override;

private:
Status _get_all_rowsets();
Expand Down
Loading
Loading