Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions src/common/base/src/runtime/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,17 @@ impl Runtime {

Ok(handlers)
}

// TODO(Winter): remove
// Please do not use this method(it's temporary)
#[async_backtrace::framed]
pub async fn spawn_blocking<F, R>(&self, f: F) -> Result<R>
where
F: FnOnce() -> Result<R> + Send + 'static,
R: Send + 'static,
{
match_join_handle(self.handle.spawn_blocking(f)).await
}
}

impl TrySpawn for Runtime {
Expand Down
1 change: 0 additions & 1 deletion src/query/storages/fuse/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ serde_json = { workspace = true }
sha2 = "0.10.6"
siphasher = "0.3.10"
streaming-decompression = "0.1.2"
tokio-rayon = "2.1.0"
tracing = "0.1.36"
typetag = "0.2.3"
uuid = { version = "1.1.2", features = ["serde", "v4"] }
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::hash::Hasher;
use std::sync::Arc;

use common_arrow::arrow::bitmap::MutableBitmap;
use common_base::runtime::GlobalIORuntime;
use common_catalog::plan::Projection;
use common_catalog::table_context::TableContext;
use common_exception::ErrorCode;
Expand Down Expand Up @@ -226,7 +227,7 @@ impl MergeIntoOperationAggregator {
})?
.value
.as_column()
.ok_or_else(||{
.ok_or_else(|| {
ErrorCode::Internal(format!(
"unexpected, cast block entry (index {}) to column failed, got None. segment index {}, block index {}",
on_conflict_field_index, segment_index, block_index
Expand Down Expand Up @@ -277,7 +278,9 @@ impl MergeIntoOperationAggregator {
// serialization and compression is cpu intensive, send them to dedicated thread pool
// and wait (asyncly, which will NOT block the executor thread)
let block_builder = self.block_builder.clone();
let serialized = tokio_rayon::spawn(move || block_builder.build(new_block)).await?;
let serialized = GlobalIORuntime::instance()
.spawn_blocking(move || block_builder.build(new_block))
.await?;

// persistent data
let new_block_meta = serialized.block_meta;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::sync::Arc;
use std::time::Instant;

use async_trait::async_trait;
use common_base::runtime::GlobalIORuntime;
use common_catalog::table_context::TableContext;
use common_exception::Result;
use common_expression::BlockThresholds;
Expand Down Expand Up @@ -170,8 +171,9 @@ impl AsyncAccumulatingTransform for AppendTransform {
}
// 1. serialize block and index
let block_builder = self.block_builder.clone();
let serialized_block_state =
tokio_rayon::spawn(move || block_builder.build(data_block)).await?;
let serialized_block_state = GlobalIORuntime::instance()
.spawn_blocking(move || block_builder.build(data_block))
.await?;

let start = Instant::now();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::time::Instant;

use common_base::base::Progress;
use common_base::base::ProgressValues;
use common_base::runtime::GlobalIORuntime;
use common_catalog::table_context::TableContext;
use common_exception::Result;
use common_expression::DataBlock;
Expand Down Expand Up @@ -166,7 +167,9 @@ impl Processor for CompactSource {
DataBlock::concat(&blocks)?
};
// build block serialization.
let serialized = tokio_rayon::spawn(move || block_builder.build(new_block)).await?;
let serialized = GlobalIORuntime::instance()
.spawn_blocking(move || block_builder.build(new_block))
.await?;

let start = Instant::now();

Expand Down