Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 65 additions & 0 deletions be/src/pipeline/exec/olap_scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "cloud/config.h"
#include "common/config.h"
#include "olap/parallel_scanner_builder.h"
#include "olap/rowset/beta_rowset.h"
#include "olap/storage_engine.h"
#include "olap/tablet_manager.h"
#include "pipeline/common/runtime_filter_consumer.h"
Expand Down Expand Up @@ -367,6 +368,70 @@ Status OlapScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s
bool has_cpu_limit = state()->query_options().__isset.resource_limit &&
state()->query_options().resource_limit.__isset.cpu_limit;

bool enable_segment_cache = state()->query_options().__isset.enable_segment_cache
? state()->query_options().enable_segment_cache
: true;
std::vector<std::shared_ptr<std::promise<Status>>> proms;
proms.reserve(_read_sources.size());
auto pool = ExecEnv::GetInstance()->scanner_scheduler()->get_remote_scan_thread_pool();
{
SCOPED_TIMER(_delete_bitmap_get_agg_timer);
srand(static_cast<unsigned int>(time(nullptr)));
const int MIN = 1000000;
const int MAX = 10000000;
for (auto& read_source : _read_sources) {
// The rowset_id across different rs_splits is guaranteed to be unique
for (auto& rs_split : read_source.rs_splits) {
auto rowset = rs_split.rs_reader->rowset();
RETURN_IF_ERROR(rowset->load());
auto prom = std::make_shared<std::promise<Status>>();
proms.emplace_back(prom);

SCOPED_TIMER(_projection_timer);
int num = (rand() % (MAX - MIN + 1)) + MIN;
auto start_subimt_time = std::chrono::high_resolution_clock::now();
auto st = pool->submit_scan_task(vectorized::SimplifiedScanTask(
[esc = enable_segment_cache, rowset, p = std::move(prom), num,
start_subimt_time] {
auto start_exec_time = std::chrono::high_resolution_clock::now();

SegmentCacheHandle sch;
auto task_st = SegmentLoader::instance()->load_segments(
std::dynamic_pointer_cast<BetaRowset>(rowset), &sch, esc,
false);
p->set_value(task_st);

auto end_exec_time = std::chrono::high_resolution_clock::now();

auto exec_time_us =
std::chrono::duration_cast<std::chrono::microseconds>(
end_exec_time - start_exec_time)
.count();
auto exec_time_ms = exec_time_us / 1000.0;

auto submit_to_exec_us =
std::chrono::duration_cast<std::chrono::microseconds>(
start_exec_time - start_subimt_time)
.count();
auto submit_to_exec_ms = submit_to_exec_us / 1000.0;

LOG(INFO) << "happen lee " << num << " func exec time: " << exec_time_ms
<< "ms"
<< " func submit time: " << submit_to_exec_ms << "ms";
},
nullptr));

if (!st.ok()) {
LOG(WARNING) << "failed to submit scan task, err=" << st;
return st;
}
}
}
SCOPED_TIMER(_process_conjunct_timer);
for (auto& prom : proms) RETURN_IF_ERROR(prom->get_future().get());
}

LOG(INFO) << "happen lee rowset size: " << proms.size();
if (enable_parallel_scan && !p._should_run_serial && !has_cpu_limit &&
p._push_down_agg_type == TPushAggOp::NONE &&
(_storage_no_merge() || p._olap_scan_node.is_preaggregation)) {
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/exec/scan/scanner_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ Status ScannerScheduler::init(ExecEnv* env) {
_remote_scan_thread_pool =
std::make_unique<vectorized::SimplifiedScanScheduler>("RemoteScanThreadPool", nullptr);
Status ret2 = _remote_scan_thread_pool->start(_remote_thread_pool_max_thread_num,
config::doris_scanner_min_thread_pool_thread_num,
config::thrift_connect_timeout_seconds,
remote_scan_pool_queue_size);
RETURN_IF_ERROR(ret2);

Expand Down
Loading