diff --git a/Cargo.lock b/Cargo.lock
index dc4ee5809c79c..86e7b6767203b 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2032,7 +2032,6 @@ dependencies = [
  "dashmap",
  "globiter",
  "itertools",
- "num",
  "num-derive",
  "num-traits",
  "once_cell",
@@ -2064,7 +2063,6 @@ dependencies = [
  "common-meta-app",
  "futures",
  "opendal",
- "parking_lot 0.12.1",
  "regex",
  "serde",
 ]
diff --git a/src/common/storage/Cargo.toml b/src/common/storage/Cargo.toml
index 940f5d82ea7d6..501caab22ae5a 100644
--- a/src/common/storage/Cargo.toml
+++ b/src/common/storage/Cargo.toml
@@ -22,7 +22,6 @@
 async-trait = "0.1"
 bytes = "1"
 futures = "0.3"
 opendal = { workspace = true }
-parking_lot = "0.12.1"
 regex = "1.6.0"
 serde = { workspace = true }
diff --git a/src/common/storage/src/metrics.rs b/src/common/storage/src/metrics.rs
index cfec59f6f3934..cbcfadc0d1117 100644
--- a/src/common/storage/src/metrics.rs
+++ b/src/common/storage/src/metrics.rs
@@ -33,7 +33,6 @@ use opendal::raw::RpRead;
 use opendal::raw::RpScan;
 use opendal::raw::RpWrite;
 use opendal::Result;
-use parking_lot::RwLock;
 
 /// StorageMetrics represents the metrics of storage (all bytes metrics are compressed size).
 #[derive(Debug, Default)]
@@ -50,8 +49,6 @@ pub struct StorageMetrics {
     partitions_scanned: AtomicU64,
     /// Number of partitions, before pruning
     partitions_total: AtomicU64,
-    /// Status of the operation.
-    status: Arc<RwLock<String>>,
 }
 
 impl StorageMetrics {
@@ -72,13 +69,6 @@ impl StorageMetrics {
             partitions_total: AtomicU64::new(
                 vs.iter().map(|v| v.as_ref().get_partitions_total()).sum(),
             ),
-            // Get the last one status, mainly used for the single table operation.
-            status: Arc::new(RwLock::new(
-                vs.iter()
-                    .map(|v| v.as_ref().get_status())
-                    .collect::<Vec<String>>()
-                    .join("|"),
-            )),
         }
     }
 
@@ -141,16 +131,6 @@ impl StorageMetrics {
     pub fn get_partitions_total(&self) -> u64 {
         self.partitions_total.load(Ordering::Relaxed)
     }
-
-    pub fn set_status(&self, new: &str) {
-        let mut status = self.status.write();
-        *status = new.to_string();
-    }
-
-    pub fn get_status(&self) -> String {
-        let status = self.status.read();
-        status.clone()
-    }
 }
 
 #[derive(Clone, Debug)]
@@ -210,6 +190,14 @@ impl<A: Accessor> LayeredAccessor for StorageMetricsAccessor<A> {
             .map(|(rp, r)| (rp, StorageMetricsWrapper::new(r, self.metrics.clone())))
     }
 
+    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> {
+        self.inner.list(path, args).await
+    }
+
+    async fn scan(&self, path: &str, args: OpScan) -> Result<(RpScan, Self::Pager)> {
+        self.inner.scan(path, args).await
+    }
+
     fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
         self.inner
             .blocking_read(path, args)
@@ -222,14 +210,6 @@
             .map(|(rp, r)| (rp, StorageMetricsWrapper::new(r, self.metrics.clone())))
     }
 
-    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> {
-        self.inner.list(path, args).await
-    }
-
-    async fn scan(&self, path: &str, args: OpScan) -> Result<(RpScan, Self::Pager)> {
-        self.inner.scan(path, args).await
-    }
-
     fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingPager)> {
         self.inner.blocking_list(path, args)
     }
diff --git a/src/query/catalog/src/table_context.rs b/src/query/catalog/src/table_context.rs
index ad5a1e1e9760a..c3da8bdde68c6 100644
--- a/src/query/catalog/src/table_context.rs
+++ b/src/query/catalog/src/table_context.rs
@@ -81,6 +81,8 @@ pub trait TableContext: Send + Sync {
     fn get_write_progress_value(&self) -> ProgressValues;
     fn get_result_progress(&self) -> Arc<Progress>;
     fn get_result_progress_value(&self) -> ProgressValues;
+    fn get_status_info(&self) -> String;
+    fn set_status_info(&self, info: &str);
     fn get_partition(&self) -> Option<PartInfoPtr>;
     fn get_partitions(&self, num: usize) -> Vec<PartInfoPtr>;
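Note: the hunks above move the operation status off StorageMetrics, where it was a per-data-source string merged with "|", and onto the TableContext trait, so each query carries exactly one status string. Below is a minimal, self-contained sketch of that holder pattern, assuming only the parking_lot crate; StatusHolder is an illustrative stand-in, not a type from the patch.

    use std::sync::Arc;

    use parking_lot::RwLock;

    // One status string per query, shared across threads. parking_lot's
    // read()/write() do not return Result, which matches the patch's
    // unwrapped `.read()`/`.write()` calls.
    #[derive(Default)]
    struct StatusHolder {
        status: Arc<RwLock<String>>,
    }

    impl StatusHolder {
        fn set_status_info(&self, info: &str) {
            *self.status.write() = info.to_string();
        }

        fn get_status_info(&self) -> String {
            self.status.read().clone()
        }
    }

    fn main() {
        let ctx = StatusHolder::default();
        ctx.set_status_info("begin to list files");
        assert_eq!(ctx.get_status_info(), "begin to list files");
    }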
diff --git a/src/query/service/src/interpreters/interpreter_copy.rs b/src/query/service/src/interpreters/interpreter_copy.rs
index 2bead7157a89d..f1ad73a32a19e 100644
--- a/src/query/service/src/interpreters/interpreter_copy.rs
+++ b/src/query/service/src/interpreters/interpreter_copy.rs
@@ -49,7 +49,7 @@ use crate::sessions::TableContext;
 use crate::sql::plans::CopyPlan;
 use crate::sql::plans::Plan;
 
-const MAX_QUERY_COPIED_FILES_NUM: usize = 50;
+const MAX_QUERY_COPIED_FILES_NUM: usize = 1000;
 
 pub struct CopyInterpreter {
     ctx: Arc<QueryContext>,
@@ -217,7 +217,7 @@ impl CopyInterpreter {
         }
 
         // Colored.
-        let mut results = vec![];
+        let mut results = Vec::with_capacity(files.len());
         for mut file in files {
             if let Some(copied_file) = copied_files.get(&file.path) {
                 match &copied_file.etag {
@@ -280,10 +280,32 @@ impl CopyInterpreter {
         let start = Instant::now();
         let ctx = self.ctx.clone();
         let table_ctx: Arc<dyn TableContext> = ctx.clone();
+
+        // Status.
+        {
+            let status = "begin to list files";
+            ctx.set_status_info(status);
+            info!(status);
+        }
+
         let mut stage_table_info = stage_table_info.clone();
         let mut all_source_file_infos = StageTable::list_files(&stage_table_info).await?;
 
+        // Status.
+        {
+            let status = format!("end to list files: {}", all_source_file_infos.len());
+            ctx.set_status_info(&status);
+            info!(status);
+        }
+
         if !force {
+            // Status.
+            {
+                let status = "begin to color copied files";
+                ctx.set_status_info(status);
+                info!(status);
+            }
+
             all_source_file_infos = CopyInterpreter::color_copied_files(
                 &table_ctx,
                 catalog_name,
@@ -293,7 +315,12 @@ impl CopyInterpreter {
             )
             .await?;
 
-            // Need copied file info.
+            // Status.
+            {
+                let status = format!("end to color copied files: {}", all_source_file_infos.len());
+                ctx.set_status_info(&status);
+                info!(status);
+            }
         }
 
         let mut need_copied_file_infos = vec![];
@@ -315,8 +342,14 @@ impl CopyInterpreter {
             return Ok(build_res);
         }
 
-        stage_table_info.files_to_copy = Some(need_copied_file_infos.clone());
+        // Status.
+        {
+            let status = "begin to read stage table plan";
+            ctx.set_status_info(status);
+            info!(status);
+        }
 
+        stage_table_info.files_to_copy = Some(need_copied_file_infos.clone());
         let stage_table = StageTable::try_create(stage_table_info.clone())?;
         let read_source_plan = {
             stage_table
@@ -324,6 +357,13 @@ impl CopyInterpreter {
                 .await?
         };
 
+        // Status.
+        {
+            let status = "begin to read stage table data";
+            ctx.set_status_info(status);
+            info!(status);
+        }
+
         let to_table = ctx
             .get_table(catalog_name, database_name, table_name)
             .await?;
@@ -429,17 +469,26 @@ impl CopyInterpreter {
                 &all_source_files,
             )
             .await;
-            info!(
-                "copy: try to purge files:{}, elapsed:{}",
-                all_source_files.len(),
-                purge_start.elapsed().as_secs()
-            );
+
+            // Status.
+            {
+                let status = format!(
+                    "try to purge files:{}, elapsed:{}",
+                    all_source_files.len(),
+                    purge_start.elapsed().as_secs()
+                );
+                ctx.set_status_info(&status);
+                info!(status);
+            }
         }
 
-        info!(
-            "copy: all copy finished, elapsed:{}",
-            start.elapsed().as_secs()
-        );
+        // Status.
+        {
+            let status =
+                format!("all copy finished, elapsed:{}", start.elapsed().as_secs());
+            ctx.set_status_info(&status);
+            info!(status);
+        }
 
         Ok(())
     });
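Note: every "// Status." block above has the same shape: build a phase message, sometimes with a count or an elapsed time, store it with set_status_info so observers can read it, then log the identical string. A compact, self-contained sketch of that shape, with a closure standing in for the query context and the tracing logger, neither of which is part of the patch:

    use std::time::Instant;

    fn main() {
        let start = Instant::now();
        let mut current_status = String::new();

        // Stand-in for `ctx.set_status_info(&status); info!(status);`.
        let mut report = |status: String| {
            println!("{status}");
            current_status = status;
        };

        report("begin to list files".to_string());
        let file_count = 1000; // stand-in for all_source_file_infos.len()
        report(format!("end to list files: {}", file_count));
        report(format!("all copy finished, elapsed:{}", start.elapsed().as_secs()));

        assert!(current_status.starts_with("all copy finished"));
    }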
diff --git a/src/query/service/src/interpreters/interpreter_list.rs b/src/query/service/src/interpreters/interpreter_list.rs
index 2eb46e7b64d5b..54716cdace2d9 100644
--- a/src/query/service/src/interpreters/interpreter_list.rs
+++ b/src/query/service/src/interpreters/interpreter_list.rs
@@ -14,7 +14,6 @@
 
 use std::sync::Arc;
 
-use common_exception::ErrorCode;
 use common_exception::Result;
 use common_expression::types::number::UInt64Type;
 use common_expression::types::StringType;
@@ -25,7 +24,6 @@ use common_expression::FromOptData;
 use common_sql::plans::ListPlan;
 use common_storages_stage::list_file;
 use common_storages_stage::StageTable;
-use regex::Regex;
 
 use crate::interpreters::Interpreter;
 use crate::pipelines::PipelineBuildResult;
@@ -57,20 +55,7 @@ impl Interpreter for ListInterpreter {
     async fn execute2(&self) -> Result<PipelineBuildResult> {
         let plan = &self.plan;
         let op = StageTable::get_op(&plan.stage)?;
-        let mut files = list_file(&op, &plan.path).await?;
-
-        let files = if plan.pattern.is_empty() {
-            files
-        } else {
-            let regex = Regex::new(&plan.pattern).map_err(|e| {
-                ErrorCode::SyntaxException(format!(
-                    "Pattern format invalid, got:{}, error:{:?}",
-                    &plan.pattern, e
-                ))
-            })?;
-            files.retain(|v| regex.is_match(&v.path));
-            files
-        };
+        let files = list_file(&op, &plan.path, &plan.pattern).await?;
 
         let names: Vec<Vec<u8>> = files
             .iter()
diff --git a/src/query/service/src/interpreters/interpreter_user_stage_remove.rs b/src/query/service/src/interpreters/interpreter_user_stage_remove.rs
index a53ead8c9cc67..95bf5431f7283 100644
--- a/src/query/service/src/interpreters/interpreter_user_stage_remove.rs
+++ b/src/query/service/src/interpreters/interpreter_user_stage_remove.rs
@@ -15,13 +15,11 @@
 use std::sync::Arc;
 
 use common_catalog::table_context::TableContext;
-use common_exception::ErrorCode;
 use common_exception::Result;
 use common_sql::plans::RemoveStagePlan;
 use common_storages_fuse::io::Files;
 use common_storages_stage::list_file;
 use common_storages_stage::StageTable;
-use regex::Regex;
 use tracing::error;
 
 use crate::interpreters::Interpreter;
@@ -50,21 +48,7 @@ impl Interpreter for RemoveUserStageInterpreter {
     async fn execute2(&self) -> Result<PipelineBuildResult> {
         let plan = self.plan.clone();
         let op = StageTable::get_op(&self.plan.stage)?;
-        let mut stage_files = list_file(&op, &plan.path).await?;
-
-        let files = if plan.pattern.is_empty() {
-            stage_files
-        } else {
-            let regex = Regex::new(&plan.pattern).map_err(|e| {
-                ErrorCode::SyntaxException(format!(
-                    "Pattern format invalid, got:{}, error:{:?}",
-                    &plan.pattern, e
-                ))
-            })?;
-
-            stage_files.retain(|v| regex.is_match(&v.path));
-            stage_files
-        };
+        let files = list_file(&op, &plan.path, &plan.pattern).await?;
 
         let table_ctx: Arc<dyn TableContext> = self.ctx.clone();
         let file_op = Files::create(table_ctx, op);
diff --git a/src/query/service/src/sessions/query_ctx.rs b/src/query/service/src/sessions/query_ctx.rs
index efdb79265fa46..7907ea06f7521 100644
--- a/src/query/service/src/sessions/query_ctx.rs
+++ b/src/query/service/src/sessions/query_ctx.rs
@@ -247,6 +247,16 @@ impl TableContext for QueryContext {
         self.shared.result_progress.as_ref().get_values()
     }
 
+    fn get_status_info(&self) -> String {
+        let status = self.shared.status.read();
+        status.clone()
+    }
+
+    fn set_status_info(&self, info: &str) {
+        let mut status = self.shared.status.write();
+        *status = info.to_string();
+    }
+
     fn get_partition(&self) -> Option<PartInfoPtr> {
         self.partition_queue.write().pop_front()
     }
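Note: the two interpreter hunks above delete the same copy-pasted regex block and delegate filtering to list_file. A standalone sketch of that filtering logic, assuming the regex crate; the function name and the String error type are illustrative (the patch maps the error to ErrorCode::SyntaxException instead):

    use regex::Regex;

    // Empty pattern keeps everything; otherwise compile once and retain matches.
    fn filter_by_pattern(mut paths: Vec<String>, pattern: &str) -> Result<Vec<String>, String> {
        if pattern.is_empty() {
            return Ok(paths);
        }
        let regex = Regex::new(pattern)
            .map_err(|e| format!("Pattern format invalid, got:{}, error:{:?}", pattern, e))?;
        paths.retain(|p| regex.is_match(p));
        Ok(paths)
    }

    fn main() {
        let paths = vec!["data/a.csv".to_string(), "data/a.json".to_string()];
        let kept = filter_by_pattern(paths, r".*\.csv$").unwrap();
        assert_eq!(kept, vec!["data/a.csv".to_string()]);
    }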
diff --git a/src/query/service/src/sessions/query_ctx_shared.rs b/src/query/service/src/sessions/query_ctx_shared.rs
index 74a6e30dd1b5e..1c34d8c6f171e 100644
--- a/src/query/service/src/sessions/query_ctx_shared.rs
+++ b/src/query/service/src/sessions/query_ctx_shared.rs
@@ -83,6 +83,8 @@ pub struct QueryContextShared {
     /// partitions_sha for each table in the query. Not empty only when enabling query result cache.
     pub(in crate::sessions) partitions_shas: Arc<RwLock<Vec<String>>>,
     pub(in crate::sessions) cacheable: Arc<AtomicBool>,
+    // Status info.
+    pub(in crate::sessions) status: Arc<RwLock<String>>,
 }
 
 impl QueryContextShared {
@@ -115,6 +117,7 @@
             on_error_map: Arc::new(RwLock::new(None)),
             partitions_shas: Arc::new(RwLock::new(vec![])),
             cacheable: Arc::new(AtomicBool::new(true)),
+            status: Arc::new(RwLock::new("null".to_string())),
         }))
     }
diff --git a/src/query/service/tests/it/storages/fuse/operations/commit.rs b/src/query/service/tests/it/storages/fuse/operations/commit.rs
index 8f20dc596ae55..809e92926ced7 100644
--- a/src/query/service/tests/it/storages/fuse/operations/commit.rs
+++ b/src/query/service/tests/it/storages/fuse/operations/commit.rs
@@ -345,6 +345,12 @@ impl TableContext for CtxDelegation {
         todo!()
     }
 
+    fn get_status_info(&self) -> String {
+        "".to_string()
+    }
+
+    fn set_status_info(&self, _info: &str) {}
+
     fn get_partition(&self) -> Option<PartInfoPtr> {
         todo!()
     }
diff --git a/src/query/storages/fuse/src/operations/gc.rs b/src/query/storages/fuse/src/operations/gc.rs
index 0e13e8b3c1762..d42d785a4c36b 100644
--- a/src/query/storages/fuse/src/operations/gc.rs
+++ b/src/query/storages/fuse/src/operations/gc.rs
@@ -132,8 +132,8 @@ impl FuseTable {
                 None,
                 &ListSnapshotLiteOption::NeedSegmentsWithExclusion(segments_excluded.clone()),
                 min_snapshot_timestamp,
-                |x| {
-                    self.data_metrics.set_status(&x);
+                |status| {
+                    ctx.set_status_info(&status);
                 },
             )
             .await?;
@@ -221,7 +221,7 @@ impl FuseTable {
                 "gc: scan table statistic files:{} takes:{} sec.",
                 status_ts_scan_count, status_ts_scan_cost,
             );
-            self.data_metrics.set_status(&status);
+            ctx.set_status_info(&status);
             info!(status);
         }
     }
@@ -301,7 +301,7 @@ impl FuseTable {
                 status_segment_to_be_purged_count,
                 start.elapsed().as_secs()
             );
-            self.data_metrics.set_status(&status);
+            ctx.set_status_info(&status);
             info!(status);
         }
     }
@@ -346,7 +346,7 @@ impl FuseTable {
                 status_purged_count,
                 start.elapsed().as_secs()
             );
-            self.data_metrics.set_status(&status);
+            ctx.set_status_info(&status);
             info!(status);
         }
     }
@@ -376,7 +376,7 @@ impl FuseTable {
                 status_purged_count,
                 start.elapsed().as_secs()
             );
-            self.data_metrics.set_status(&status);
+            ctx.set_status_info(&status);
             info!(status);
         }
     }
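Note: in the first gc.rs hunk, status reporting is injected as a closure argument, so the snapshot-listing helper stays decoupled from where the status lands; the patch merely retargets the closure from data_metrics.set_status to ctx.set_status_info. A self-contained sketch of that callback style; the names and the loop body are illustrative, not the crate's actual API:

    // The walker reports progress through a caller-supplied closure.
    fn walk_snapshots(count: usize, mut report: impl FnMut(String)) {
        for i in 0..count {
            // ... real code would read and prune one snapshot here ...
            report(format!("gc: processed snapshot {}/{}", i + 1, count));
        }
    }

    fn main() {
        let mut last = String::new();
        walk_snapshots(3, |status| {
            println!("{status}");
            last = status;
        });
        assert_eq!(last, "gc: processed snapshot 3/3");
    }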
diff --git a/src/query/storages/stage/src/file.rs b/src/query/storages/stage/src/file.rs
index e23c1c390c89d..3e6f184bcb641 100644
--- a/src/query/storages/stage/src/file.rs
+++ b/src/query/storages/stage/src/file.rs
@@ -16,12 +16,14 @@
 use chrono::TimeZone;
 use chrono::Utc;
 use common_catalog::plan::StageFileInfo;
 use common_catalog::plan::StageFileStatus;
+use common_exception::ErrorCode;
 use common_exception::Result;
 use futures::TryStreamExt;
 use opendal::Entry;
 use opendal::Metadata;
 use opendal::Metakey;
 use opendal::Operator;
+use regex::Regex;
 use tracing::warn;
@@ -28,11 +30,11 @@ use tracing::warn;
 /// List files from DAL in recursive way.
 ///
 /// - If the path itself is a dir, return directly.
 /// - Otherwise, return directly.
 /// - If not exist, we will try to list `path/` too.
 ///
 /// TODO(@xuanwo): return a stream instead.
-pub async fn list_file(op: &Operator, path: &str) -> Result<Vec<StageFileInfo>> {
+pub async fn list_file(op: &Operator, path: &str, pattern: &str) -> Result<Vec<StageFileInfo>> {
     let mut files = Vec::new();
 
     // - If the path itself is a dir, return directly.
@@ -64,6 +66,19 @@ pub async fn list_file(op: &Operator, path: &str) -> Result<Vec<StageFileInfo>>
         };
     }
 
+    let files = if pattern.is_empty() {
+        files
+    } else {
+        let regex = Regex::new(pattern).map_err(|e| {
+            ErrorCode::SyntaxException(format!(
+                "Pattern format invalid, got:{}, error:{:?}",
+                pattern, e
+            ))
+        })?;
+        files.retain(|v| regex.is_match(&v.path));
+        files
+    };
+
     Ok(files)
 }
diff --git a/src/query/storages/stage/src/stage_table.rs b/src/query/storages/stage/src/stage_table.rs
index 22efa9cf89b79..87e93fbcbe0dc 100644
--- a/src/query/storages/stage/src/stage_table.rs
+++ b/src/query/storages/stage/src/stage_table.rs
@@ -43,7 +43,6 @@ use common_pipeline_sources::input_formats::SplitInfo;
 use common_storage::init_stage_operator;
 use opendal::Operator;
 use parking_lot::Mutex;
-use regex::Regex;
 
 use crate::format_stage_file_info;
 use crate::list_file;
@@ -76,11 +75,10 @@ impl StageTable {
     }
 
     pub async fn list_files(stage_info: &StageTableInfo) -> Result<Vec<StageFileInfo>> {
-        // 1. List all files.
         let path = &stage_info.path;
         let files = &stage_info.files;
         let op = Self::get_op(&stage_info.stage_info)?;
-        let mut all_files = if !files.is_empty() {
+        let all_files = if !files.is_empty() {
             let mut res = vec![];
             for file in files {
                 // Here we add the path to the file: /path/to/path/file1.
@@ -93,23 +91,9 @@ impl StageTable {
             }
             res
         } else {
-            list_file(&op, path).await?
+            list_file(&op, path, &stage_info.pattern).await?
         };
 
-        // 2. Retain pattern match files.
-        {
-            let pattern = &stage_info.pattern;
-            if !pattern.is_empty() {
-                let regex = Regex::new(pattern).map_err(|e| {
-                    ErrorCode::SyntaxException(format!(
-                        "Pattern format invalid, got:{}, error:{:?}",
-                        pattern, e
-                    ))
-                })?;
-                all_files.retain(|v| regex.is_match(&v.path));
-            }
-        }
-
         Ok(all_files)
     }
 
@@ -210,6 +194,7 @@ impl Table for StageTable {
             ctx.get_scan_progress(),
             compact_threshold,
         )?);
+
         input_ctx.format.exec_copy(input_ctx.clone(), pipeline)?;
         ctx.set_on_error_map(input_ctx.get_maximum_error_per_file());
         Ok(())
diff --git a/src/query/storages/system/src/processes_table.rs b/src/query/storages/system/src/processes_table.rs
index 41a4df498a431..0ea98d809399e 100644
--- a/src/query/storages/system/src/processes_table.rs
+++ b/src/query/storages/system/src/processes_table.rs
@@ -100,12 +100,13 @@ impl SyncSystemTable for ProcessesTable {
             if let Some(data_metrics) = data_metrics {
                 processes_data_read_bytes.push(data_metrics.get_read_bytes() as u64);
                 processes_data_write_bytes.push(data_metrics.get_write_bytes() as u64);
-                processes_status.push(data_metrics.get_status().clone().into_bytes());
             } else {
                 processes_data_read_bytes.push(0);
                 processes_data_write_bytes.push(0);
-                processes_status.push("".to_string().into_bytes());
             }
+
+            // Status info.
+            processes_status.push(ctx.get_status_info().clone().into_bytes());
         }
 
         Ok(DataBlock::new_from_columns(vec![
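Note: with the pattern filter now inside list_file, the processes_table hunk is the consumer-facing end of the change: the query's status string is surfaced as a column of system.processes for every process row, whether or not the row has table data metrics. A reduced, self-contained sketch of that column assembly; the types are simplified stand-ins for the real column builders feeding a DataBlock:

    fn build_status_column(process_count: usize, ctx_status: &str) -> Vec<Vec<u8>> {
        let mut processes_status = Vec::with_capacity(process_count);
        for _ in 0..process_count {
            // Previously only metrics-backed rows carried a status; now every
            // row gets the query context's status info.
            processes_status.push(ctx_status.as_bytes().to_vec());
        }
        processes_status
    }

    fn main() {
        let col = build_status_column(2, "begin to list files");
        assert_eq!(col, vec![b"begin to list files".to_vec(); 2]);
    }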