Skip to content

Commit

Permalink
[Improvement](schema scan) Use async scanner for schema scanners
Browse files Browse the repository at this point in the history
  • Loading branch information
Gabriel39 committed Jul 26, 2024
1 parent 00fca3e commit b51c223
Show file tree
Hide file tree
Showing 7 changed files with 155 additions and 81 deletions.
15 changes: 15 additions & 0 deletions be/src/exec/schema_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@
#include "exec/schema_scanner/schema_workload_groups_scanner.h"
#include "exec/schema_scanner/schema_workload_sched_policy_scanner.h"
#include "olap/hll.h"
#include "pipeline/dependency.h"
#include "runtime/define_primitive_type.h"
#include "runtime/fragment_mgr.h"
#include "util/string_util.h"
#include "util/types.h"
#include "vec/columns/column.h"
Expand Down Expand Up @@ -82,6 +84,19 @@ Status SchemaScanner::start(RuntimeState* state) {
return Status::InternalError("call Start before Init.");
}

if (_is_async_scanner()) {
_finish_dependency->block();
_dependency->block();
auto task_ctx = state->get_task_execution_context();
RETURN_IF_ERROR(ExecEnv::GetInstance()->fragment_mgr()->get_thread_pool()->submit_func(
[this, task_ctx]() {
auto task_lock = task_ctx.lock();
if (task_lock == nullptr) {
return;
}
_get_next_block_internal();
}));
}
return Status::OK();
}

Expand Down
23 changes: 23 additions & 0 deletions be/src/exec/schema_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <stddef.h>
#include <stdint.h>

#include <condition_variable>
#include <memory>
#include <string>
#include <vector>
Expand All @@ -43,6 +44,10 @@ namespace vectorized {
class Block;
}

namespace pipeline {
class Dependency;
}

struct SchemaScannerCommonParam {
SchemaScannerCommonParam()
: db(nullptr),
Expand Down Expand Up @@ -103,6 +108,14 @@ class SchemaScanner {
TSchemaTableType::type type() const { return _schema_table_type; }

protected:
// For async scanner (which has blocking operation (e.g. RPC), this function should return true
// and all blocking operations should be done in `_get_next_block_internal`)
virtual bool _is_async_scanner() { return false; }
bool _data_queue_is_full() { return _data_queue.size() > QUEUE_SIZE; }
// TODO:
virtual void _get_next_block_internal() {
throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Not Implement");
}
Status fill_dest_column_for_range(vectorized::Block* block, size_t pos,
const std::vector<void*>& datas);

Expand All @@ -125,6 +138,16 @@ class SchemaScanner {
RuntimeProfile::Counter* _get_table_timer = nullptr;
RuntimeProfile::Counter* _get_describe_timer = nullptr;
RuntimeProfile::Counter* _fill_block_timer = nullptr;

pipeline::Dependency* _dependency = nullptr;
pipeline::Dependency* _finish_dependency = nullptr;

static constexpr auto QUEUE_SIZE = 3;
std::mutex _m;
std::condition_variable _cv;
std::deque<std::unique_ptr<vectorized::Block>> _data_queue;
Status _scanner_status;
bool _eos = false;
};

} // namespace doris
73 changes: 41 additions & 32 deletions be/src/exec/schema_scanner/schema_active_queries_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include "exec/schema_scanner/schema_active_queries_scanner.h"

#include "pipeline/dependency.h"
#include "runtime/client_cache.h"
#include "runtime/exec_env.h"
#include "runtime/runtime_state.h"
Expand Down Expand Up @@ -47,10 +48,14 @@ SchemaActiveQueriesScanner::~SchemaActiveQueriesScanner() {}
Status SchemaActiveQueriesScanner::start(RuntimeState* state) {
_block_rows_limit = state->batch_size();
_rpc_timeout = state->execution_timeout() * 1000;
return Status::OK();
return SchemaScanner::start(state);
}

Status SchemaActiveQueriesScanner::_get_active_queries_block_from_fe() {
void SchemaActiveQueriesScanner::_get_next_block_internal() {
Defer defer {[&]() {
_dependency->set_ready();
_finish_dependency->set_ready();
}};
TNetworkAddress master_addr = ExecEnv::GetInstance()->master_info()->network_address;

TSchemaTableRequestParams schema_table_params;
Expand All @@ -67,34 +72,43 @@ Status SchemaActiveQueriesScanner::_get_active_queries_block_from_fe() {

TFetchSchemaTableDataResult result;

RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
_scanner_status = ThriftRpcHelper::rpc<FrontendServiceClient>(
master_addr.hostname, master_addr.port,
[&request, &result](FrontendServiceConnection& client) {
client->fetchSchemaTableData(result, request);
},
_rpc_timeout));
_rpc_timeout);
if (!_scanner_status.ok()) {
return;
}

Status status(Status::create(result.status));
if (!status.ok()) {
LOG(WARNING) << "fetch active queries from FE failed, errmsg=" << status;
return status;
_scanner_status = status;
return;
}
std::vector<TRow> result_data = result.data_batch;

_active_query_block = vectorized::Block::create_unique();
std::unique_lock l(_m);
_data_queue.emplace_back(vectorized::Block::create_unique());
DCHECK(_data_queue.size() == 1);
for (int i = 0; i < _s_tbls_columns.size(); ++i) {
TypeDescriptor descriptor(_s_tbls_columns[i].type);
auto data_type = vectorized::DataTypeFactory::instance().create_data_type(descriptor, true);
_active_query_block->insert(vectorized::ColumnWithTypeAndName(
_data_queue[0]->insert(vectorized::ColumnWithTypeAndName(
data_type->create_column(), data_type, _s_tbls_columns[i].name));
}

_active_query_block->reserve(_block_rows_limit);
_data_queue[0]->reserve(_block_rows_limit);

_total_rows = _data_queue[0]->rows();
if (result_data.size() > 0) {
int col_size = result_data[0].column_value.size();
if (col_size != _s_tbls_columns.size()) {
return Status::InternalError<false>("active queries schema is not match for FE and BE");
_scanner_status = Status::InternalError<false>(
"active queries schema is not match for FE and BE");
return;
}
}

Expand Down Expand Up @@ -123,32 +137,24 @@ Status SchemaActiveQueriesScanner::_get_active_queries_block_from_fe() {
for (int i = 0; i < result_data.size(); i++) {
TRow row = result_data[i];

insert_string_value(0, row.column_value[0].stringVal, _active_query_block.get());
insert_string_value(1, row.column_value[1].stringVal, _active_query_block.get());
insert_int_value(2, row.column_value[2].longVal, _active_query_block.get());
insert_int_value(3, row.column_value[3].longVal, _active_query_block.get());
insert_string_value(4, row.column_value[4].stringVal, _active_query_block.get());
insert_string_value(5, row.column_value[5].stringVal, _active_query_block.get());
insert_string_value(6, row.column_value[6].stringVal, _active_query_block.get());
insert_string_value(7, row.column_value[7].stringVal, _active_query_block.get());
insert_string_value(8, row.column_value[8].stringVal, _active_query_block.get());
insert_string_value(9, row.column_value[9].stringVal, _active_query_block.get());
insert_string_value(0, row.column_value[0].stringVal, _data_queue[0].get());
insert_string_value(1, row.column_value[1].stringVal, _data_queue[0].get());
insert_int_value(2, row.column_value[2].longVal, _data_queue[0].get());
insert_int_value(3, row.column_value[3].longVal, _data_queue[0].get());
insert_string_value(4, row.column_value[4].stringVal, _data_queue[0].get());
insert_string_value(5, row.column_value[5].stringVal, _data_queue[0].get());
insert_string_value(6, row.column_value[6].stringVal, _data_queue[0].get());
insert_string_value(7, row.column_value[7].stringVal, _data_queue[0].get());
insert_string_value(8, row.column_value[8].stringVal, _data_queue[0].get());
insert_string_value(9, row.column_value[9].stringVal, _data_queue[0].get());
}
return Status::OK();
}

Status SchemaActiveQueriesScanner::get_next_block(vectorized::Block* block, bool* eos) {
if (!_is_init) {
return Status::InternalError("Used before initialized.");
}

if (nullptr == block || nullptr == eos) {
return Status::InternalError("input pointer is nullptr.");
}

if (_active_query_block == nullptr) {
RETURN_IF_ERROR(_get_active_queries_block_from_fe());
_total_rows = _active_query_block->rows();
std::unique_lock l(_m);
RETURN_IF_ERROR(_scanner_status);
if (_data_queue.empty()) {
return Status::InternalError("No active queries!");
}

if (_row_idx == _total_rows) {
Expand All @@ -158,10 +164,13 @@ Status SchemaActiveQueriesScanner::get_next_block(vectorized::Block* block, bool

int current_batch_rows = std::min(_block_rows_limit, _total_rows - _row_idx);
vectorized::MutableBlock mblock = vectorized::MutableBlock::build_mutable_block(block);
RETURN_IF_ERROR(mblock.add_rows(_active_query_block.get(), _row_idx, current_batch_rows));
RETURN_IF_ERROR(mblock.add_rows(_data_queue[0].get(), _row_idx, current_batch_rows));
_row_idx += current_batch_rows;

*eos = _row_idx == _total_rows;
if (*eos) {
_data_queue.clear();
}
return Status::OK();
}

Expand Down
4 changes: 2 additions & 2 deletions be/src/exec/schema_scanner/schema_active_queries_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,12 @@ class SchemaActiveQueriesScanner : public SchemaScanner {
static std::vector<SchemaScanner::ColumnDesc> _s_tbls_columns;

private:
Status _get_active_queries_block_from_fe();
void _get_next_block_internal() override;
bool _is_async_scanner() override { return true; }

int _block_rows_limit = 4096;
int _row_idx = 0;
int _total_rows = 0;
std::unique_ptr<vectorized::Block> _active_query_block = nullptr;
int _rpc_timeout = 3000;
};
}; // namespace doris
2 changes: 1 addition & 1 deletion be/src/exec/schema_scanner/schema_backend_active_tasks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ SchemaBackendActiveTasksScanner::~SchemaBackendActiveTasksScanner() {}

Status SchemaBackendActiveTasksScanner::start(RuntimeState* state) {
_block_rows_limit = state->batch_size();
return Status::OK();
return SchemaScanner::start(state);
}

Status SchemaBackendActiveTasksScanner::get_next_block(vectorized::Block* block, bool* eos) {
Expand Down
115 changes: 71 additions & 44 deletions be/src/exec/schema_scanner/schema_columns_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <cstdint>

#include "exec/schema_scanner/schema_helper.h"
#include "pipeline/dependency.h"
#include "runtime/define_primitive_type.h"
#include "util/runtime_profile.h"
#include "vec/common/string_ref.h"
Expand Down Expand Up @@ -71,38 +72,68 @@ SchemaColumnsScanner::SchemaColumnsScanner()

SchemaColumnsScanner::~SchemaColumnsScanner() = default;

Status SchemaColumnsScanner::start(RuntimeState* state) {
SCOPED_TIMER(_get_db_timer);
if (!_is_init) {
return Status::InternalError("schema columns scanner not inited.");
}
// get all database
TGetDbsParams db_params;
if (nullptr != _param->common_param->db) {
db_params.__set_pattern(*(_param->common_param->db));
}
if (nullptr != _param->common_param->catalog) {
db_params.__set_catalog(*(_param->common_param->catalog));
}
if (nullptr != _param->common_param->current_user_ident) {
db_params.__set_current_user_ident(*_param->common_param->current_user_ident);
} else {
if (nullptr != _param->common_param->user) {
db_params.__set_user(*(_param->common_param->user));
void SchemaColumnsScanner::_get_next_block_internal() {
Defer defer {[&]() {
_dependency->set_ready();
_finish_dependency->set_ready();
}};
{
SCOPED_TIMER(_get_db_timer);
// get all database
TGetDbsParams db_params;
if (nullptr != _param->common_param->db) {
db_params.__set_pattern(*(_param->common_param->db));
}
if (nullptr != _param->common_param->user_ip) {
db_params.__set_user_ip(*(_param->common_param->user_ip));
if (nullptr != _param->common_param->catalog) {
db_params.__set_catalog(*(_param->common_param->catalog));
}
if (nullptr != _param->common_param->current_user_ident) {
db_params.__set_current_user_ident(*_param->common_param->current_user_ident);
} else {
if (nullptr != _param->common_param->user) {
db_params.__set_user(*(_param->common_param->user));
}
if (nullptr != _param->common_param->user_ip) {
db_params.__set_user_ip(*(_param->common_param->user_ip));
}
}
if (nullptr != _param->common_param->ip && 0 != _param->common_param->port) {
_scanner_status = SchemaHelper::get_db_names(
*(_param->common_param->ip), _param->common_param->port, db_params, &_db_result);
if (!_scanner_status.ok()) {
return ;
}
} else {
_scanner_status = Status::InternalError("IP or port doesn't exists");
return;
}
}

if (nullptr != _param->common_param->ip && 0 != _param->common_param->port) {
RETURN_IF_ERROR(SchemaHelper::get_db_names(
*(_param->common_param->ip), _param->common_param->port, db_params, &_db_result));
} else {
return Status::InternalError("IP or port doesn't exists");
std::unique_lock l(_m);
_data_queue.emplace_back(vectorized::Block::create_unique());
while (_table_index >= _table_result.tables.size()) {
if (_db_index < _db_result.dbs.size()) {
_scanner_status = _get_new_table();
if (!_scanner_status.ok()) {
return ;
}
_scanner_status = _get_new_desc();
if (!_scanner_status.ok()) {
return ;
}
_scanner_status = _fill_block_impl(_data_queue.back().get());
if (!_scanner_status.ok()) {
return ;
}
_dependency->set_ready();
while (!_eos && _data_queue_is_full() && _scanner_status.ok()) {
_cv.wait_for(l, std::chrono::seconds(1));
}
} else {
_eos = true;
return;
}
}

return Status::OK();
}

//For compatibility with mysql the result of DATA_TYPE in information_schema.columns
Expand Down Expand Up @@ -348,25 +379,21 @@ Status SchemaColumnsScanner::_get_new_table() {
}

Status SchemaColumnsScanner::get_next_block(vectorized::Block* block, bool* eos) {
if (!_is_init) {
return Status::InternalError("use this class before inited.");
}
if (nullptr == block || nullptr == eos) {
return Status::InternalError("input parameter is nullptr.");
}
std::unique_lock l(_m);
RETURN_IF_ERROR(_scanner_status);
block->swap(*_data_queue.front());
_data_queue.pop_front();

while (_table_index >= _table_result.tables.size()) {
if (_db_index < _db_result.dbs.size()) {
RETURN_IF_ERROR(_get_new_table());
} else {
*eos = true;
return Status::OK();
}
if (_data_queue.empty()) {
_dependency->block();
} else if (!_data_queue_is_full()) {
_cv.notify_all();
}
RETURN_IF_ERROR(_get_new_desc());

*eos = false;
return _fill_block_impl(block);
*eos = _eos;
if (_eos) {
_data_queue.clear();
}
return Status::OK();
}

Status SchemaColumnsScanner::_fill_block_impl(vectorized::Block* block) {
Expand Down
Loading

0 comments on commit b51c223

Please sign in to comment.