From c00bc0a2fcf2e750dc0fa3d2089338da26db6aef Mon Sep 17 00:00:00 2001 From: BohuTANG Date: Sun, 12 Mar 2023 08:14:31 +0800 Subject: [PATCH 1/6] feat: add status for the operation in show processlist --- Cargo.lock | 1 - src/common/storage/src/metrics.rs | 36 +++++-------------- src/query/catalog/src/table_context.rs | 2 ++ src/query/service/src/sessions/query_ctx.rs | 10 ++++++ .../service/src/sessions/query_ctx_shared.rs | 3 ++ .../it/storages/fuse/operations/commit.rs | 6 ++++ src/query/storages/fuse/src/operations/gc.rs | 12 +++---- .../storages/system/src/processes_table.rs | 5 +-- 8 files changed, 38 insertions(+), 37 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index dc4ee5809c79c..899be0f6bc2f5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2032,7 +2032,6 @@ dependencies = [ "dashmap", "globiter", "itertools", - "num", "num-derive", "num-traits", "once_cell", diff --git a/src/common/storage/src/metrics.rs b/src/common/storage/src/metrics.rs index cfec59f6f3934..cbcfadc0d1117 100644 --- a/src/common/storage/src/metrics.rs +++ b/src/common/storage/src/metrics.rs @@ -33,7 +33,6 @@ use opendal::raw::RpRead; use opendal::raw::RpScan; use opendal::raw::RpWrite; use opendal::Result; -use parking_lot::RwLock; /// StorageMetrics represents the metrics of storage (all bytes metrics are compressed size). #[derive(Debug, Default)] @@ -50,8 +49,6 @@ pub struct StorageMetrics { partitions_scanned: AtomicU64, /// Number of partitions, before pruning partitions_total: AtomicU64, - /// Status of the operation. - status: Arc>, } impl StorageMetrics { @@ -72,13 +69,6 @@ impl StorageMetrics { partitions_total: AtomicU64::new( vs.iter().map(|v| v.as_ref().get_partitions_total()).sum(), ), - // Get the last one status, mainly used for the single table operation. - status: Arc::new(RwLock::new( - vs.iter() - .map(|v| v.as_ref().get_status()) - .collect::>() - .join("|"), - )), } } @@ -141,16 +131,6 @@ impl StorageMetrics { pub fn get_partitions_total(&self) -> u64 { self.partitions_total.load(Ordering::Relaxed) } - - pub fn set_status(&self, new: &str) { - let mut status = self.status.write(); - *status = new.to_string(); - } - - pub fn get_status(&self) -> String { - let status = self.status.read(); - status.clone() - } } #[derive(Clone, Debug)] @@ -210,6 +190,14 @@ impl LayeredAccessor for StorageMetricsAccessor { .map(|(rp, r)| (rp, StorageMetricsWrapper::new(r, self.metrics.clone()))) } + async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> { + self.inner.list(path, args).await + } + + async fn scan(&self, path: &str, args: OpScan) -> Result<(RpScan, Self::Pager)> { + self.inner.scan(path, args).await + } + fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> { self.inner .blocking_read(path, args) @@ -222,14 +210,6 @@ impl LayeredAccessor for StorageMetricsAccessor { .map(|(rp, r)| (rp, StorageMetricsWrapper::new(r, self.metrics.clone()))) } - async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Pager)> { - self.inner.list(path, args).await - } - - async fn scan(&self, path: &str, args: OpScan) -> Result<(RpScan, Self::Pager)> { - self.inner.scan(path, args).await - } - fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingPager)> { self.inner.blocking_list(path, args) } diff --git a/src/query/catalog/src/table_context.rs b/src/query/catalog/src/table_context.rs index ad5a1e1e9760a..c3da8bdde68c6 100644 --- a/src/query/catalog/src/table_context.rs +++ b/src/query/catalog/src/table_context.rs @@ -81,6 +81,8 @@ pub trait TableContext: Send + Sync { fn get_write_progress_value(&self) -> ProgressValues; fn get_result_progress(&self) -> Arc; fn get_result_progress_value(&self) -> ProgressValues; + fn get_status_info(&self) -> String; + fn set_status_info(&self, info: &str); fn get_partition(&self) -> Option; fn get_partitions(&self, num: usize) -> Vec; diff --git a/src/query/service/src/sessions/query_ctx.rs b/src/query/service/src/sessions/query_ctx.rs index efdb79265fa46..7907ea06f7521 100644 --- a/src/query/service/src/sessions/query_ctx.rs +++ b/src/query/service/src/sessions/query_ctx.rs @@ -247,6 +247,16 @@ impl TableContext for QueryContext { self.shared.result_progress.as_ref().get_values() } + fn get_status_info(&self) -> String { + let status = self.shared.status.read(); + status.clone() + } + + fn set_status_info(&self, info: &str) { + let mut status = self.shared.status.write(); + *status = info.to_string(); + } + fn get_partition(&self) -> Option { self.partition_queue.write().pop_front() } diff --git a/src/query/service/src/sessions/query_ctx_shared.rs b/src/query/service/src/sessions/query_ctx_shared.rs index 74a6e30dd1b5e..0c10b3e1cdbaa 100644 --- a/src/query/service/src/sessions/query_ctx_shared.rs +++ b/src/query/service/src/sessions/query_ctx_shared.rs @@ -83,6 +83,8 @@ pub struct QueryContextShared { /// partitions_sha for each table in the query. Not empty only when enabling query result cache. pub(in crate::sessions) partitions_shas: Arc>>, pub(in crate::sessions) cacheable: Arc, + // Status info. + pub(in crate::sessions) status: Arc>, } impl QueryContextShared { @@ -115,6 +117,7 @@ impl QueryContextShared { on_error_map: Arc::new(RwLock::new(None)), partitions_shas: Arc::new(RwLock::new(vec![])), cacheable: Arc::new(AtomicBool::new(true)), + status: Arc::new(Default::default()), })) } diff --git a/src/query/service/tests/it/storages/fuse/operations/commit.rs b/src/query/service/tests/it/storages/fuse/operations/commit.rs index 8f20dc596ae55..809e92926ced7 100644 --- a/src/query/service/tests/it/storages/fuse/operations/commit.rs +++ b/src/query/service/tests/it/storages/fuse/operations/commit.rs @@ -345,6 +345,12 @@ impl TableContext for CtxDelegation { todo!() } + fn get_status_info(&self) -> String { + "".to_string() + } + + fn set_status_info(&self, _info: &str) {} + fn get_partition(&self) -> Option { todo!() } diff --git a/src/query/storages/fuse/src/operations/gc.rs b/src/query/storages/fuse/src/operations/gc.rs index 0e13e8b3c1762..d42d785a4c36b 100644 --- a/src/query/storages/fuse/src/operations/gc.rs +++ b/src/query/storages/fuse/src/operations/gc.rs @@ -132,8 +132,8 @@ impl FuseTable { None, &ListSnapshotLiteOption::NeedSegmentsWithExclusion(segments_excluded.clone()), min_snapshot_timestamp, - |x| { - self.data_metrics.set_status(&x); + |status| { + ctx.set_status_info(&status); }, ) .await?; @@ -221,7 +221,7 @@ impl FuseTable { "gc: scan table statistic files:{} takes:{} sec.", status_ts_scan_count, status_ts_scan_cost, ); - self.data_metrics.set_status(&status); + ctx.set_status_info(&status); info!(status); } } @@ -301,7 +301,7 @@ impl FuseTable { status_segment_to_be_purged_count, start.elapsed().as_secs() ); - self.data_metrics.set_status(&status); + ctx.set_status_info(&status); info!(status); } } @@ -346,7 +346,7 @@ impl FuseTable { status_purged_count, start.elapsed().as_secs() ); - self.data_metrics.set_status(&status); + ctx.set_status_info(&status); info!(status); } } @@ -376,7 +376,7 @@ impl FuseTable { status_purged_count, start.elapsed().as_secs() ); - self.data_metrics.set_status(&status); + ctx.set_status_info(&status); info!(status); } } diff --git a/src/query/storages/system/src/processes_table.rs b/src/query/storages/system/src/processes_table.rs index 41a4df498a431..0ea98d809399e 100644 --- a/src/query/storages/system/src/processes_table.rs +++ b/src/query/storages/system/src/processes_table.rs @@ -100,12 +100,13 @@ impl SyncSystemTable for ProcessesTable { if let Some(data_metrics) = data_metrics { processes_data_read_bytes.push(data_metrics.get_read_bytes() as u64); processes_data_write_bytes.push(data_metrics.get_write_bytes() as u64); - processes_status.push(data_metrics.get_status().clone().into_bytes()); } else { processes_data_read_bytes.push(0); processes_data_write_bytes.push(0); - processes_status.push("".to_string().into_bytes()); } + + // Status info. + processes_status.push(ctx.get_status_info().clone().into_bytes()); } Ok(DataBlock::new_from_columns(vec![ From f76da9a6771a9b3c752d840fd45006e6822d60fc Mon Sep 17 00:00:00 2001 From: BohuTANG Date: Sun, 12 Mar 2023 08:48:57 +0800 Subject: [PATCH 2/6] add status info to copy --- .../src/interpreters/interpreter_copy.rs | 49 +++++++++++++++---- src/query/storages/stage/src/stage_table.rs | 1 + 2 files changed, 40 insertions(+), 10 deletions(-) diff --git a/src/query/service/src/interpreters/interpreter_copy.rs b/src/query/service/src/interpreters/interpreter_copy.rs index 2bead7157a89d..daf99587f4254 100644 --- a/src/query/service/src/interpreters/interpreter_copy.rs +++ b/src/query/service/src/interpreters/interpreter_copy.rs @@ -49,7 +49,7 @@ use crate::sessions::TableContext; use crate::sql::plans::CopyPlan; use crate::sql::plans::Plan; -const MAX_QUERY_COPIED_FILES_NUM: usize = 50; +const MAX_QUERY_COPIED_FILES_NUM: usize = 1000; pub struct CopyInterpreter { ctx: Arc, @@ -217,7 +217,7 @@ impl CopyInterpreter { } // Colored. - let mut results = vec![]; + let mut results = Vec::with_capacity(files.len()); for mut file in files { if let Some(copied_file) = copied_files.get(&file.path) { match &copied_file.etag { @@ -280,10 +280,23 @@ impl CopyInterpreter { let start = Instant::now(); let ctx = self.ctx.clone(); let table_ctx: Arc = ctx.clone(); + + // Status. + ctx.set_status_info("begin to list files"); + let mut stage_table_info = stage_table_info.clone(); let mut all_source_file_infos = StageTable::list_files(&stage_table_info).await?; + // Status. + ctx.set_status_info(&format!( + "end to list files: {}", + all_source_file_infos.len() + )); + if !force { + // Status. + ctx.set_status_info("begin to color copied files"); + all_source_file_infos = CopyInterpreter::color_copied_files( &table_ctx, catalog_name, @@ -293,7 +306,11 @@ impl CopyInterpreter { ) .await?; - // Need copied file info. + // Status. + ctx.set_status_info(&format!( + "end to color copied files: {}", + all_source_file_infos.len() + )); } let mut need_copied_file_infos = vec![]; @@ -315,8 +332,10 @@ impl CopyInterpreter { return Ok(build_res); } - stage_table_info.files_to_copy = Some(need_copied_file_infos.clone()); + // Status. + ctx.set_status_info("begin to read stage table read plan"); + stage_table_info.files_to_copy = Some(need_copied_file_infos.clone()); let stage_table = StageTable::try_create(stage_table_info.clone())?; let read_source_plan = { stage_table @@ -324,6 +343,9 @@ impl CopyInterpreter { .await? }; + // Status. + ctx.set_status_info("begin to read stage table data"); + let to_table = ctx .get_table(catalog_name, database_name, table_name) .await?; @@ -429,17 +451,24 @@ impl CopyInterpreter { &all_source_files, ) .await; - info!( - "copy: try to purge files:{}, elapsed:{}", + + let status = format!( + "try to purge files:{}, elapsed:{}", all_source_files.len(), purge_start.elapsed().as_secs() ); + info!(status); + + // Status. + ctx.set_status_info(&status); } - info!( - "copy: all copy finished, elapsed:{}", - start.elapsed().as_secs() - ); + let status = + format!("all copy finished, elapsed:{}", start.elapsed().as_secs()); + info!(status); + + // Status. + ctx.set_status_info(&status); Ok(()) }); diff --git a/src/query/storages/stage/src/stage_table.rs b/src/query/storages/stage/src/stage_table.rs index 22efa9cf89b79..1b50162c5147b 100644 --- a/src/query/storages/stage/src/stage_table.rs +++ b/src/query/storages/stage/src/stage_table.rs @@ -210,6 +210,7 @@ impl Table for StageTable { ctx.get_scan_progress(), compact_threshold, )?); + input_ctx.format.exec_copy(input_ctx.clone(), pipeline)?; ctx.set_on_error_map(input_ctx.get_maximum_error_per_file()); Ok(()) From 372953d43dfa1c672d99742b75b3d4bb39cc030f Mon Sep 17 00:00:00 2001 From: BohuTANG Date: Sun, 12 Mar 2023 09:00:58 +0800 Subject: [PATCH 3/6] fix unused deps --- Cargo.lock | 1 - src/common/storage/Cargo.toml | 1 - 2 files changed, 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 899be0f6bc2f5..86e7b6767203b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2063,7 +2063,6 @@ dependencies = [ "common-meta-app", "futures", "opendal", - "parking_lot 0.12.1", "regex", "serde", ] diff --git a/src/common/storage/Cargo.toml b/src/common/storage/Cargo.toml index 940f5d82ea7d6..501caab22ae5a 100644 --- a/src/common/storage/Cargo.toml +++ b/src/common/storage/Cargo.toml @@ -22,7 +22,6 @@ async-trait = "0.1" bytes = "1" futures = "0.3" opendal = { workspace = true } -parking_lot = "0.12.1" regex = "1.6.0" serde = { workspace = true } From d82a0378a88575771ec3052aa2f908d82610361a Mon Sep 17 00:00:00 2001 From: BohuTANG Date: Sun, 12 Mar 2023 09:20:15 +0800 Subject: [PATCH 4/6] add more log for status --- .../src/interpreters/interpreter_copy.rs | 70 ++++++++++++------- 1 file changed, 45 insertions(+), 25 deletions(-) diff --git a/src/query/service/src/interpreters/interpreter_copy.rs b/src/query/service/src/interpreters/interpreter_copy.rs index daf99587f4254..f1ad73a32a19e 100644 --- a/src/query/service/src/interpreters/interpreter_copy.rs +++ b/src/query/service/src/interpreters/interpreter_copy.rs @@ -282,20 +282,29 @@ impl CopyInterpreter { let table_ctx: Arc = ctx.clone(); // Status. - ctx.set_status_info("begin to list files"); + { + let status = "begin to list files"; + ctx.set_status_info(status); + info!(status); + } let mut stage_table_info = stage_table_info.clone(); let mut all_source_file_infos = StageTable::list_files(&stage_table_info).await?; // Status. - ctx.set_status_info(&format!( - "end to list files: {}", - all_source_file_infos.len() - )); + { + let status = format!("end to list files: {}", all_source_file_infos.len()); + ctx.set_status_info(&status); + info!(status); + } if !force { // Status. - ctx.set_status_info("begin to color copied files"); + { + let status = "begin to color copied files"; + ctx.set_status_info(status); + info!(status); + } all_source_file_infos = CopyInterpreter::color_copied_files( &table_ctx, @@ -307,10 +316,11 @@ impl CopyInterpreter { .await?; // Status. - ctx.set_status_info(&format!( - "end to color copied files: {}", - all_source_file_infos.len() - )); + { + let status = format!("end to color copied files: {}", all_source_file_infos.len()); + ctx.set_status_info(&status); + info!(status); + } } let mut need_copied_file_infos = vec![]; @@ -333,7 +343,11 @@ impl CopyInterpreter { } // Status. - ctx.set_status_info("begin to read stage table read plan"); + { + let status = "begin to read stage table plan"; + ctx.set_status_info(status); + info!(status); + } stage_table_info.files_to_copy = Some(need_copied_file_infos.clone()); let stage_table = StageTable::try_create(stage_table_info.clone())?; @@ -344,7 +358,11 @@ impl CopyInterpreter { }; // Status. - ctx.set_status_info("begin to read stage table data"); + { + let status = "begin to read stage table data"; + ctx.set_status_info(status); + info!(status); + } let to_table = ctx .get_table(catalog_name, database_name, table_name) @@ -452,23 +470,25 @@ impl CopyInterpreter { ) .await; - let status = format!( - "try to purge files:{}, elapsed:{}", - all_source_files.len(), - purge_start.elapsed().as_secs() - ); - info!(status); - // Status. - ctx.set_status_info(&status); + { + let status = format!( + "try to purge files:{}, elapsed:{}", + all_source_files.len(), + purge_start.elapsed().as_secs() + ); + ctx.set_status_info(&status); + info!(status); + } } - let status = - format!("all copy finished, elapsed:{}", start.elapsed().as_secs()); - info!(status); - // Status. - ctx.set_status_info(&status); + { + let status = + format!("all copy finished, elapsed:{}", start.elapsed().as_secs()); + ctx.set_status_info(&status); + info!(status); + } Ok(()) }); From bc2237182eeb78264ed4734099c911ae8320f3a2 Mon Sep 17 00:00:00 2001 From: BohuTANG Date: Sun, 12 Mar 2023 09:25:47 +0800 Subject: [PATCH 5/6] change status default --- src/query/service/src/sessions/query_ctx_shared.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/query/service/src/sessions/query_ctx_shared.rs b/src/query/service/src/sessions/query_ctx_shared.rs index 0c10b3e1cdbaa..1c34d8c6f171e 100644 --- a/src/query/service/src/sessions/query_ctx_shared.rs +++ b/src/query/service/src/sessions/query_ctx_shared.rs @@ -117,7 +117,7 @@ impl QueryContextShared { on_error_map: Arc::new(RwLock::new(None)), partitions_shas: Arc::new(RwLock::new(vec![])), cacheable: Arc::new(AtomicBool::new(true)), - status: Arc::new(Default::default()), + status: Arc::new(RwLock::new("null".to_string())), })) } From d05770ee6a04ea6aa6ceaa2bcae4a6f35f9f4d75 Mon Sep 17 00:00:00 2001 From: BohuTANG Date: Sun, 12 Mar 2023 09:49:01 +0800 Subject: [PATCH 6/6] refine the list_file --- .../src/interpreters/interpreter_list.rs | 17 +--------------- .../interpreter_user_stage_remove.rs | 18 +---------------- src/query/storages/stage/src/file.rs | 17 +++++++++++++++- src/query/storages/stage/src/stage_table.rs | 20 ++----------------- 4 files changed, 20 insertions(+), 52 deletions(-) diff --git a/src/query/service/src/interpreters/interpreter_list.rs b/src/query/service/src/interpreters/interpreter_list.rs index 2eb46e7b64d5b..54716cdace2d9 100644 --- a/src/query/service/src/interpreters/interpreter_list.rs +++ b/src/query/service/src/interpreters/interpreter_list.rs @@ -14,7 +14,6 @@ use std::sync::Arc; -use common_exception::ErrorCode; use common_exception::Result; use common_expression::types::number::UInt64Type; use common_expression::types::StringType; @@ -25,7 +24,6 @@ use common_expression::FromOptData; use common_sql::plans::ListPlan; use common_storages_stage::list_file; use common_storages_stage::StageTable; -use regex::Regex; use crate::interpreters::Interpreter; use crate::pipelines::PipelineBuildResult; @@ -57,20 +55,7 @@ impl Interpreter for ListInterpreter { async fn execute2(&self) -> Result { let plan = &self.plan; let op = StageTable::get_op(&plan.stage)?; - let mut files = list_file(&op, &plan.path).await?; - - let files = if plan.pattern.is_empty() { - files - } else { - let regex = Regex::new(&plan.pattern).map_err(|e| { - ErrorCode::SyntaxException(format!( - "Pattern format invalid, got:{}, error:{:?}", - &plan.pattern, e - )) - })?; - files.retain(|v| regex.is_match(&v.path)); - files - }; + let files = list_file(&op, &plan.path, &plan.pattern).await?; let names: Vec> = files .iter() diff --git a/src/query/service/src/interpreters/interpreter_user_stage_remove.rs b/src/query/service/src/interpreters/interpreter_user_stage_remove.rs index a53ead8c9cc67..95bf5431f7283 100644 --- a/src/query/service/src/interpreters/interpreter_user_stage_remove.rs +++ b/src/query/service/src/interpreters/interpreter_user_stage_remove.rs @@ -15,13 +15,11 @@ use std::sync::Arc; use common_catalog::table_context::TableContext; -use common_exception::ErrorCode; use common_exception::Result; use common_sql::plans::RemoveStagePlan; use common_storages_fuse::io::Files; use common_storages_stage::list_file; use common_storages_stage::StageTable; -use regex::Regex; use tracing::error; use crate::interpreters::Interpreter; @@ -50,21 +48,7 @@ impl Interpreter for RemoveUserStageInterpreter { async fn execute2(&self) -> Result { let plan = self.plan.clone(); let op = StageTable::get_op(&self.plan.stage)?; - let mut stage_files = list_file(&op, &plan.path).await?; - - let files = if plan.pattern.is_empty() { - stage_files - } else { - let regex = Regex::new(&plan.pattern).map_err(|e| { - ErrorCode::SyntaxException(format!( - "Pattern format invalid, got:{}, error:{:?}", - &plan.pattern, e - )) - })?; - - stage_files.retain(|v| regex.is_match(&v.path)); - stage_files - }; + let files = list_file(&op, &plan.path, &plan.pattern).await?; let table_ctx: Arc = self.ctx.clone(); let file_op = Files::create(table_ctx, op); diff --git a/src/query/storages/stage/src/file.rs b/src/query/storages/stage/src/file.rs index e23c1c390c89d..3e6f184bcb641 100644 --- a/src/query/storages/stage/src/file.rs +++ b/src/query/storages/stage/src/file.rs @@ -16,12 +16,14 @@ use chrono::TimeZone; use chrono::Utc; use common_catalog::plan::StageFileInfo; use common_catalog::plan::StageFileStatus; +use common_exception::ErrorCode; use common_exception::Result; use futures::TryStreamExt; use opendal::Entry; use opendal::Metadata; use opendal::Metakey; use opendal::Operator; +use regex::Regex; use tracing::warn; /// List files from DAL in recursive way. @@ -31,7 +33,7 @@ use tracing::warn; /// - If not exist, we will try to list `path/` too. /// /// TODO(@xuanwo): return a stream instead. -pub async fn list_file(op: &Operator, path: &str) -> Result> { +pub async fn list_file(op: &Operator, path: &str, pattern: &str) -> Result> { let mut files = Vec::new(); // - If the path itself is a dir, return directly. @@ -64,6 +66,19 @@ pub async fn list_file(op: &Operator, path: &str) -> Result> }; } + let files = if pattern.is_empty() { + files + } else { + let regex = Regex::new(pattern).map_err(|e| { + ErrorCode::SyntaxException(format!( + "Pattern format invalid, got:{}, error:{:?}", + pattern, e + )) + })?; + files.retain(|v| regex.is_match(&v.path)); + files + }; + Ok(files) } diff --git a/src/query/storages/stage/src/stage_table.rs b/src/query/storages/stage/src/stage_table.rs index 1b50162c5147b..87e93fbcbe0dc 100644 --- a/src/query/storages/stage/src/stage_table.rs +++ b/src/query/storages/stage/src/stage_table.rs @@ -43,7 +43,6 @@ use common_pipeline_sources::input_formats::SplitInfo; use common_storage::init_stage_operator; use opendal::Operator; use parking_lot::Mutex; -use regex::Regex; use crate::format_stage_file_info; use crate::list_file; @@ -76,11 +75,10 @@ impl StageTable { } pub async fn list_files(stage_info: &StageTableInfo) -> Result> { - // 1. List all files. let path = &stage_info.path; let files = &stage_info.files; let op = Self::get_op(&stage_info.stage_info)?; - let mut all_files = if !files.is_empty() { + let all_files = if !files.is_empty() { let mut res = vec![]; for file in files { // Here we add the path to the file: /path/to/path/file1. @@ -93,23 +91,9 @@ impl StageTable { } res } else { - list_file(&op, path).await? + list_file(&op, path, &stage_info.pattern).await? }; - // 2. Retain pattern match files. - { - let pattern = &stage_info.pattern; - if !pattern.is_empty() { - let regex = Regex::new(pattern).map_err(|e| { - ErrorCode::SyntaxException(format!( - "Pattern format invalid, got:{}, error:{:?}", - pattern, e - )) - })?; - all_files.retain(|v| regex.is_match(&v.path)); - } - } - Ok(all_files) }