diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp b/be/src/pipeline/exec/olap_scan_operator.cpp index 64437641d4e8b1..bfc65d1052ea30 100644 --- a/be/src/pipeline/exec/olap_scan_operator.cpp +++ b/be/src/pipeline/exec/olap_scan_operator.cpp @@ -29,6 +29,7 @@ #include "cloud/config.h" #include "common/config.h" #include "olap/parallel_scanner_builder.h" +#include "olap/rowset/beta_rowset.h" #include "olap/storage_engine.h" #include "olap/tablet_manager.h" #include "pipeline/common/runtime_filter_consumer.h" @@ -367,6 +368,70 @@ Status OlapScanLocalState::_init_scanners(std::list* s bool has_cpu_limit = state()->query_options().__isset.resource_limit && state()->query_options().resource_limit.__isset.cpu_limit; + bool enable_segment_cache = state()->query_options().__isset.enable_segment_cache + ? state()->query_options().enable_segment_cache + : true; + std::vector>> proms; + proms.reserve(_read_sources.size()); + auto pool = ExecEnv::GetInstance()->scanner_scheduler()->get_remote_scan_thread_pool(); + { + SCOPED_TIMER(_delete_bitmap_get_agg_timer); + srand(static_cast(time(nullptr))); + const int MIN = 1000000; + const int MAX = 10000000; + for (auto& read_source : _read_sources) { + // The rowset_id across different rs_splits is guaranteed to be unique + for (auto& rs_split : read_source.rs_splits) { + auto rowset = rs_split.rs_reader->rowset(); + RETURN_IF_ERROR(rowset->load()); + auto prom = std::make_shared>(); + proms.emplace_back(prom); + + SCOPED_TIMER(_projection_timer); + int num = (rand() % (MAX - MIN + 1)) + MIN; + auto start_subimt_time = std::chrono::high_resolution_clock::now(); + auto st = pool->submit_scan_task(vectorized::SimplifiedScanTask( + [esc = enable_segment_cache, rowset, p = std::move(prom), num, + start_subimt_time] { + auto start_exec_time = std::chrono::high_resolution_clock::now(); + + SegmentCacheHandle sch; + auto task_st = SegmentLoader::instance()->load_segments( + std::dynamic_pointer_cast(rowset), &sch, esc, + false); + p->set_value(task_st); + + auto end_exec_time = std::chrono::high_resolution_clock::now(); + + auto exec_time_us = + std::chrono::duration_cast( + end_exec_time - start_exec_time) + .count(); + auto exec_time_ms = exec_time_us / 1000.0; + + auto submit_to_exec_us = + std::chrono::duration_cast( + start_exec_time - start_subimt_time) + .count(); + auto submit_to_exec_ms = submit_to_exec_us / 1000.0; + + LOG(INFO) << "happen lee " << num << " func exec time: " << exec_time_ms + << "ms" + << " func submit time: " << submit_to_exec_ms << "ms"; + }, + nullptr)); + + if (!st.ok()) { + LOG(WARNING) << "failed to submit scan task, err=" << st; + return st; + } + } + } + SCOPED_TIMER(_process_conjunct_timer); + for (auto& prom : proms) RETURN_IF_ERROR(prom->get_future().get()); + } + + LOG(INFO) << "happen lee rowset size: " << proms.size(); if (enable_parallel_scan && !p._should_run_serial && !has_cpu_limit && p._push_down_agg_type == TPushAggOp::NONE && (_storage_no_merge() || p._olap_scan_node.is_preaggregation)) { diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp index db767cf9228011..0380ac81f69563 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.cpp +++ b/be/src/vec/exec/scan/scanner_scheduler.cpp @@ -91,7 +91,7 @@ Status ScannerScheduler::init(ExecEnv* env) { _remote_scan_thread_pool = std::make_unique("RemoteScanThreadPool", nullptr); Status ret2 = _remote_scan_thread_pool->start(_remote_thread_pool_max_thread_num, - config::doris_scanner_min_thread_pool_thread_num, + config::thrift_connect_timeout_seconds, remote_scan_pool_queue_size); RETURN_IF_ERROR(ret2);