diff --git a/Cargo.lock b/Cargo.lock index a99f0b51e3a8f..e8ce6368f0b97 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2063,6 +2063,7 @@ dependencies = [ "common-exception", "common-expression", "common-meta-app", + "flagset", "futures", "opendal", "regex", diff --git a/src/common/storage/Cargo.toml b/src/common/storage/Cargo.toml index bbbbf5c15c6f0..8a32bf3018f98 100644 --- a/src/common/storage/Cargo.toml +++ b/src/common/storage/Cargo.toml @@ -21,6 +21,7 @@ anyhow = { workspace = true } async-trait = "0.1" bytes = "1" chrono = { workspace = true } +flagset = "0.4" futures = "0.3" opendal = { workspace = true } regex = "1.6.0" diff --git a/src/common/storage/src/lib.rs b/src/common/storage/src/lib.rs index 9d04c0331e3b8..3f2f20173134a 100644 --- a/src/common/storage/src/lib.rs +++ b/src/common/storage/src/lib.rs @@ -55,7 +55,6 @@ pub use parquet::read_parquet_schema_async; mod stage; pub use stage::init_stage_operator; -pub use stage::FileWithMeta; pub use stage::StageFileInfo; pub use stage::StageFileStatus; pub use stage::StageFilesInfo; diff --git a/src/common/storage/src/stage.rs b/src/common/storage/src/stage.rs index 6a3fee266ba8c..0cf0abbce347e 100644 --- a/src/common/storage/src/stage.rs +++ b/src/common/storage/src/stage.rs @@ -33,20 +33,6 @@ use regex::Regex; use crate::init_operator; use crate::DataOperator; -pub struct FileWithMeta { - pub path: String, - pub metadata: Metadata, -} - -impl FileWithMeta { - fn new(path: &str, meta: Metadata) -> Self { - Self { - path: path.to_string(), - metadata: meta, - } - } -} - #[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq, Eq)] pub enum StageFileStatus { NeedCopy, @@ -79,11 +65,10 @@ impl StageFileInfo { creator: None, } } -} -impl From for StageFileInfo { - fn from(value: FileWithMeta) -> Self { - StageFileInfo::new(value.path, &value.metadata) + /// NOTE: update this query when add new meta + pub fn meta_query() -> flagset::FlagSet { + Metakey::ContentLength | Metakey::ContentMd5 | Metakey::LastModified | Metakey::Etag } } @@ -122,7 +107,7 @@ impl StageFilesInfo { } } - pub async fn list(&self, operator: &Operator, first_only: bool) -> Result> { + pub async fn list(&self, operator: &Operator, first_only: bool) -> Result> { if let Some(files) = &self.files { let mut res = Vec::new(); for file in files { @@ -132,7 +117,7 @@ impl StageFilesInfo { .to_string(); let meta = operator.stat(&full_path).await?; if meta.mode().is_file() { - res.push(FileWithMeta::new(&full_path, meta)) + res.push(StageFileInfo::new(full_path, &meta)) } else { return Err(ErrorCode::BadArguments(format!( "{full_path} is not a file" @@ -149,7 +134,7 @@ impl StageFilesInfo { } } - pub async fn first_file(&self, operator: &Operator) -> Result { + pub async fn first_file(&self, operator: &Operator) -> Result { let mut files = self.list(operator, true).await?; match files.pop() { None => Err(ErrorCode::BadArguments("no file found")), @@ -157,7 +142,7 @@ impl StageFilesInfo { } } - pub fn blocking_first_file(&self, operator: &Operator) -> Result { + pub fn blocking_first_file(&self, operator: &Operator) -> Result { let mut files = self.blocking_list(operator, true)?; match files.pop() { None => Err(ErrorCode::BadArguments("no file found")), @@ -169,7 +154,7 @@ impl StageFilesInfo { &self, operator: &Operator, first_only: bool, - ) -> Result> { + ) -> Result> { if let Some(files) = &self.files { let mut res = Vec::new(); for file in files { @@ -179,7 +164,7 @@ impl StageFilesInfo { .to_string(); let meta = operator.blocking().stat(&full_path)?; if meta.mode().is_file() { - res.push(FileWithMeta::new(&full_path, meta)) + res.push(StageFileInfo::new(full_path, &meta)) } else { return Err(ErrorCode::BadArguments(format!( "{full_path} is not a file" @@ -201,11 +186,11 @@ impl StageFilesInfo { path: &str, pattern: Option, first_only: bool, - ) -> Result> { + ) -> Result> { let root_meta = operator.stat(path).await; match root_meta { Ok(meta) => match meta.mode() { - EntryMode::FILE => return Ok(vec![FileWithMeta::new(path, meta)]), + EntryMode::FILE => return Ok(vec![StageFileInfo::new(path.to_string(), &meta)]), EntryMode::DIR => {} EntryMode::Unknown => { return Err(ErrorCode::BadArguments("object mode is unknown")); @@ -224,10 +209,9 @@ impl StageFilesInfo { let mut files = Vec::new(); let mut list = operator.scan(path).await?; while let Some(obj) = list.try_next().await? { - // todo(youngsofun): not always need Metakey::Complete - let meta = operator.metadata(&obj, Metakey::Complete).await?; + let meta = operator.metadata(&obj, StageFileInfo::meta_query()).await?; if check_file(obj.path(), meta.mode(), &pattern) { - files.push(FileWithMeta::new(obj.path(), meta)); + files.push(StageFileInfo::new(obj.path().to_string(), &meta)); if first_only { return Ok(files); } @@ -253,13 +237,13 @@ fn blocking_list_files_with_pattern( path: &str, pattern: Option, first_only: bool, -) -> Result> { +) -> Result> { let operator = operator.blocking(); let root_meta = operator.stat(path); match root_meta { Ok(meta) => match meta.mode() { - EntryMode::FILE => return Ok(vec![FileWithMeta::new(path, meta)]), + EntryMode::FILE => return Ok(vec![StageFileInfo::new(path.to_string(), &meta)]), EntryMode::DIR => {} EntryMode::Unknown => return Err(ErrorCode::BadArguments("object mode is unknown")), }, @@ -277,9 +261,9 @@ fn blocking_list_files_with_pattern( let list = operator.list(path)?; for obj in list { let obj = obj?; - let meta = operator.metadata(&obj, Metakey::Complete)?; + let meta = operator.metadata(&obj, StageFileInfo::meta_query())?; if check_file(obj.path(), meta.mode(), &pattern) { - files.push(FileWithMeta::new(obj.path(), meta)); + files.push(StageFileInfo::new(obj.path().to_string(), &meta)); if first_only { return Ok(files); } diff --git a/src/query/service/src/interpreters/interpreter_list.rs b/src/query/service/src/interpreters/interpreter_list.rs index 03629b6ae6a1f..bfb4aec923778 100644 --- a/src/query/service/src/interpreters/interpreter_list.rs +++ b/src/query/service/src/interpreters/interpreter_list.rs @@ -68,12 +68,7 @@ impl Interpreter for ListInterpreter { files: None, pattern, }; - let files: Vec = files_info - .list(&op, false) - .await? - .into_iter() - .map(|file_with_meta| file_with_meta.into()) - .collect::>(); + let files: Vec = files_info.list(&op, false).await?; let names: Vec> = files .iter() diff --git a/src/query/sql/src/planner/binder/copy.rs b/src/query/sql/src/planner/binder/copy.rs index 22a094eaba841..64f1c6c51f7ab 100644 --- a/src/query/sql/src/planner/binder/copy.rs +++ b/src/query/sql/src/planner/binder/copy.rs @@ -41,7 +41,6 @@ use common_exception::Result; use common_meta_app::principal::OnErrorMode; use common_meta_app::principal::StageInfo; use common_storage::init_stage_operator; -use common_storage::StageFileInfo; use common_storage::StageFileStatus; use common_storage::StageFilesInfo; use common_users::UserApiProvider; @@ -540,18 +539,13 @@ impl<'a> Binder { } let operator = init_stage_operator(&stage_info)?; - let files = if operator.info().can_blocking() { + let mut files = if operator.info().can_blocking() { files_info.blocking_list(&operator, false) } else { files_info.list(&operator, false).await }?; - let mut all_source_file_infos = files - .into_iter() - .map(|file_with_meta| StageFileInfo::new(file_with_meta.path, &file_with_meta.metadata)) - .collect::>(); - - info!("end to list files: {}", all_source_file_infos.len()); + info!("end to list files: {}", files.len()); if !stmt.force { // Status. @@ -561,21 +555,16 @@ impl<'a> Binder { info!(status); } - all_source_file_infos = self + files = self .ctx - .color_copied_files( - dst_catalog_name, - dst_database_name, - dst_table_name, - all_source_file_infos, - ) + .color_copied_files(dst_catalog_name, dst_database_name, dst_table_name, files) .await?; - info!("end to color copied files: {}", all_source_file_infos.len()); + info!("end to color copied files: {}", files.len()); } let mut need_copy_file_infos = vec![]; - for file in &all_source_file_infos { + for file in &files { if file.status == StageFileStatus::NeedCopy { need_copy_file_infos.push(file.clone()); } @@ -583,7 +572,7 @@ impl<'a> Binder { info!( "copy: read all files finished, all:{}, need copy:{}, elapsed:{}", - all_source_file_infos.len(), + files.len(), need_copy_file_infos.len(), start.elapsed().as_secs() ); @@ -630,7 +619,7 @@ impl<'a> Binder { schema: dst_table.schema(), from: Box::new(query_plan), stage_info: Box::new(stage_info), - all_source_file_infos, + all_source_file_infos: files, need_copy_file_infos, validation_mode, }))) diff --git a/src/query/storages/parquet/src/parquet_table/partition.rs b/src/query/storages/parquet/src/parquet_table/partition.rs index 590dda36946e5..7ae970c314867 100644 --- a/src/query/storages/parquet/src/parquet_table/partition.rs +++ b/src/query/storages/parquet/src/parquet_table/partition.rs @@ -118,7 +118,7 @@ impl ParquetTable { self.files_info.list(&self.operator, false).await }? .into_iter() - .map(|f| (f.path, f.metadata.content_length())) + .map(|f| (f.path, f.size)) .collect::>(), }; diff --git a/src/query/storages/stage/src/stage_table.rs b/src/query/storages/stage/src/stage_table.rs index 85949194dc7b1..4f05dedae4107 100644 --- a/src/query/storages/stage/src/stage_table.rs +++ b/src/query/storages/stage/src/stage_table.rs @@ -78,7 +78,6 @@ impl StageTable { .list(&op, false) .await? .into_iter() - .map(|file_with_meta| StageFileInfo::new(file_with_meta.path, &file_with_meta.metadata)) .collect::>(); Ok(infos) }