diff --git a/Cargo.toml b/Cargo.toml
index f4164090..722a5186 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -31,12 +31,7 @@ sqlx = { version = "0.8.3", features = [
     "runtime-tokio",
     "uuid",
 ] }
-tokio = { version = "1.44.1", features = [
-    "macros",
-    "rt-multi-thread",
-    "full",
-    "tracing",
-] }
+tokio = { version = "1.44.1", features = ["macros", "rt-multi-thread", "full", "tracing", "fs"] }
 tower = "0.5.2"
 tower-http = { version = "0.6.2", features = ["cors", "trace"] }
 indexmap = { version = "2.8.0", features = ["serde"] }
diff --git a/src/execution/dumper.rs b/src/execution/dumper.rs
index 39c78322..1447dc08 100644
--- a/src/execution/dumper.rs
+++ b/src/execution/dumper.rs
@@ -1,5 +1,6 @@
 use anyhow::Result;
 use futures::future::try_join_all;
+use futures::StreamExt;
 use indexmap::IndexMap;
 use itertools::Itertools;
 use serde::ser::SerializeSeq;
@@ -14,6 +15,7 @@ use super::indexer;
 use super::memoization::EvaluationMemoryOptions;
 use crate::base::{schema, value};
 use crate::builder::plan::{AnalyzedSourceOp, ExecutionPlan};
+use crate::ops::interface::SourceExecutorListOptions;
 use crate::utils::yaml_ser::YamlSerializer;
 
 #[derive(Debug, Clone, Deserialize)]
@@ -163,22 +165,27 @@ impl<'a> Dumper<'a> {
     }
 
     async fn evaluate_and_dump_for_source_op(&self, source_op: &AnalyzedSourceOp) -> Result<()> {
-        let all_keys = source_op.executor.list_keys().await?;
-
         let mut keys_by_filename_prefix: IndexMap<String, Vec<value::KeyValue>> = IndexMap::new();
-        for key in all_keys {
-            let mut s = key
-                .to_strs()
-                .into_iter()
-                .map(|s| urlencoding::encode(&s).into_owned())
-                .join(":");
-            s.truncate(
-                (0..(FILENAME_PREFIX_MAX_LENGTH - source_op.name.as_str().len()))
-                    .rev()
-                    .find(|i| s.is_char_boundary(*i))
-                    .unwrap_or(0),
-            );
-            keys_by_filename_prefix.entry(s).or_default().push(key);
+
+        let mut rows_stream = source_op.executor.list(SourceExecutorListOptions {
+            include_ordinal: false,
+        });
+        while let Some(rows) = rows_stream.next().await {
+            for row in rows?.into_iter() {
+                let mut s = row
+                    .key
+                    .to_strs()
+                    .into_iter()
+                    .map(|s| urlencoding::encode(&s).into_owned())
+                    .join(":");
+                s.truncate(
+                    (0..(FILENAME_PREFIX_MAX_LENGTH - source_op.name.as_str().len()))
+                        .rev()
+                        .find(|i| s.is_char_boundary(*i))
+                        .unwrap_or(0),
+                );
+                keys_by_filename_prefix.entry(s).or_default().push(row.key);
+            }
         }
         let output_dir = Path::new(&self.options.output_dir);
         let evaluate_futs =
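The dumper now drains the executor's batched stream instead of awaiting a single `Vec` of keys. A minimal sketch of that consumption pattern in isolation (a hypothetical `collect_all_keys` helper, not part of this change; it assumes the `SourceExecutor`, `SourceExecutorListOptions`, and `KeyValue` items from `src/ops/interface.rs` are in scope):

```rust
use futures::StreamExt;

// Drain the batched stream into a flat list of keys. The same loop shape
// appears in indexer.rs and flows.rs below.
async fn collect_all_keys(executor: &dyn SourceExecutor) -> Result<Vec<KeyValue>> {
    let mut keys = Vec::new();
    let mut rows_stream = executor.list(SourceExecutorListOptions {
        include_ordinal: false,
    });
    // Each stream item is one batch (e.g. one page of an API listing).
    while let Some(rows) = rows_stream.next().await {
        keys.extend(rows?.into_iter().map(|row| row.key));
    }
    Ok(keys)
}
```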
diff --git a/src/execution/indexer.rs b/src/execution/indexer.rs
index 16358b64..0e854e36 100644
--- a/src/execution/indexer.rs
+++ b/src/execution/indexer.rs
@@ -1,6 +1,6 @@
 use crate::prelude::*;
 
-use futures::future::{join, join_all, try_join, try_join_all};
+use futures::future::{join, join_all, try_join_all};
 use itertools::Itertools;
 use log::error;
 use serde::Serialize;
@@ -14,7 +14,9 @@ use super::memoization::{EvaluationMemory, EvaluationMemoryOptions, StoredMemoiz
 use crate::base::schema;
 use crate::base::value::{self, FieldValues, KeyValue};
 use crate::builder::plan::*;
-use crate::ops::interface::{ExportTargetMutation, ExportTargetUpsertEntry, Ordinal};
+use crate::ops::interface::{
+    ExportTargetMutation, ExportTargetUpsertEntry, Ordinal, SourceExecutorListOptions,
+};
 use crate::utils::db::WriteAction;
 use crate::utils::fingerprint::{Fingerprint, Fingerprinter};
 
@@ -638,16 +640,21 @@ async fn update_source(
     schema: &schema::DataSchema,
     pool: &PgPool,
 ) -> Result<SourceUpdateInfo> {
-    let (keys, existing_keys_json) = try_join(
-        source_op.executor.list_keys(),
-        db_tracking::list_source_tracking_keys(
-            source_op.source_id,
-            &plan.tracking_table_setup,
-            pool,
-        ),
+    let existing_keys_json = db_tracking::list_source_tracking_keys(
+        source_op.source_id,
+        &plan.tracking_table_setup,
+        pool,
     )
     .await?;
+    let mut keys = Vec::new();
+    let mut rows_stream = source_op.executor.list(SourceExecutorListOptions {
+        include_ordinal: false,
+    });
+    while let Some(rows) = rows_stream.next().await {
+        keys.extend(rows?.into_iter().map(|row| row.key));
+    }
+
     let stats = UpdateStats::default();
     let upsert_futs = join_all(keys.iter().map(|key| {
         update_source_entry_with_err_handling(plan, source_op, schema, key, false, pool, &stats)
diff --git a/src/ops/interface.rs b/src/ops/interface.rs
index 05b73cba..8e70a6cd 100644
--- a/src/ops/interface.rs
+++ b/src/ops/interface.rs
@@ -47,6 +47,12 @@ pub struct SourceData<'a> {
     pub value: BoxFuture<'a, Result<Option<FieldValues>>>,
 }
 
+pub struct SourceRowMetadata {
+    pub key: KeyValue,
+    /// None means the ordinal is unavailable.
+    pub ordinal: Option<Ordinal>,
+}
+
 pub struct SourceChange<'a> {
     /// Last update/deletion ordinal. None means unavailable.
     pub ordinal: Option<Ordinal>,
@@ -53,11 +59,19 @@ pub struct SourceChange<'a> {
     pub key: KeyValue,
     pub value: Option<BoxFuture<'a, Result<Option<FieldValues>>>>,
 }
 
+#[derive(Debug, Default)]
+pub struct SourceExecutorListOptions {
+    pub include_ordinal: bool,
+}
+
 #[async_trait]
 pub trait SourceExecutor: Send + Sync {
     /// Get the list of keys for the source.
-    async fn list_keys(&self) -> Result<Vec<KeyValue>>;
+    fn list<'a>(
+        &'a self,
+        options: SourceExecutorListOptions,
+    ) -> BoxStream<'a, Result<Vec<SourceRowMetadata>>>;
 
     // Get the value for the given key.
     async fn get_value(&self, key: &KeyValue) -> Result<Option<SourceData<'_>>>;
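The trait change above is the core of the patch: `list_keys` returned every key at once, while `list` returns a `BoxStream` of batches, and `include_ordinal` lets callers skip modification times they don't need. A sketch of what a trivial implementer might look like (a hypothetical `StaticSource`, assuming the interface items above plus the crate prelude are in scope; the real sources below use `async_stream::try_stream!` the same way):

```rust
use async_stream::try_stream;

// A toy source backed by a fixed list of ids.
struct StaticSource {
    ids: Vec<Arc<str>>,
}

#[async_trait]
impl SourceExecutor for StaticSource {
    fn list<'a>(
        &'a self,
        _options: SourceExecutorListOptions,
    ) -> BoxStream<'a, Result<Vec<SourceRowMetadata>>> {
        try_stream! {
            // Yield rows in fixed-size batches, mirroring how the real
            // sources yield one page of results at a time.
            for chunk in self.ids.chunks(100) {
                let batch: Vec<SourceRowMetadata> = chunk
                    .iter()
                    .map(|id| SourceRowMetadata {
                        key: KeyValue::Str(id.clone()),
                        // This source has no modification times, so the
                        // ordinal stays None regardless of the options.
                        ordinal: None,
                    })
                    .collect();
                yield batch;
            }
        }
        .boxed()
    }

    async fn get_value(&self, _key: &KeyValue) -> Result<Option<SourceData<'_>>> {
        // A real source would fetch the row's content here.
        Ok(None)
    }
}
```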
diff --git a/src/ops/sources/google_drive.rs b/src/ops/sources/google_drive.rs
index f3823d52..208cc549 100644
--- a/src/ops/sources/google_drive.rs
+++ b/src/ops/sources/google_drive.rs
@@ -1,18 +1,18 @@
 use std::{
-    collections::HashMap,
+    collections::{HashMap, HashSet},
     sync::{Arc, LazyLock},
 };
 
+use async_stream::try_stream;
 use google_drive3::{
-    api::Scope,
+    api::{File, Scope},
     yup_oauth2::{read_service_account_key, ServiceAccountAuthenticator},
     DriveHub,
 };
 use http_body_util::BodyExt;
 use hyper_rustls::HttpsConnector;
 use hyper_util::client::legacy::connect::HttpConnector;
-use indexmap::IndexSet;
-use log::warn;
+use log::{trace, warn};
 
 use crate::base::field_attrs;
 use crate::ops::sdk::*;
@@ -80,7 +80,7 @@ pub struct Spec {
 struct Executor {
     drive_hub: DriveHub<HttpsConnector<HttpConnector>>,
     binary: bool,
-    root_folder_ids: Vec<String>,
+    root_folder_ids: Vec<Arc<str>>,
 }
 
 impl Executor {
@@ -105,7 +105,7 @@
         Ok(Self {
             drive_hub,
            binary: spec.binary,
-            root_folder_ids: spec.root_folder_ids,
+            root_folder_ids: spec.root_folder_ids.into_iter().map(Arc::from).collect(),
         })
     }
 }
@@ -123,55 +123,60 @@ fn escape_string(s: &str) -> String {
 }
 
 impl Executor {
-    async fn traverse_folder(
+    fn visit_file(
         &self,
-        folder_id: &str,
-        visited_folder_ids: &mut IndexSet<String>,
-        result: &mut IndexSet<KeyValue>,
-    ) -> Result<()> {
-        if !visited_folder_ids.insert(folder_id.to_string()) {
-            return Ok(());
+        file: File,
+        new_folder_ids: &mut Vec<Arc<str>>,
+        seen_ids: &mut HashSet<Arc<str>>,
+    ) -> Result<Option<SourceRowMetadata>> {
+        if file.trashed == Some(true) {
+            return Ok(None);
         }
-        let query = format!("'{}' in parents", escape_string(folder_id));
-        let mut next_page_token: Option<String> = None;
-        loop {
-            let mut list_call = self
-                .drive_hub
-                .files()
-                .list()
-                .add_scope(Scope::Readonly)
-                .q(&query);
-            if let Some(next_page_token) = &next_page_token {
-                list_call = list_call.page_token(next_page_token);
-            }
-            let (_, files) = list_call.doit().await?;
-            if let Some(files) = files.files {
-                for file in files {
-                    match (file.id, file.mime_type) {
-                        (Some(id), Some(mime_type)) => {
-                            if mime_type == FOLDER_MIME_TYPE {
-                                Box::pin(self.traverse_folder(&id, visited_folder_ids, result))
-                                    .await?;
-                            } else if is_supported_file_type(&mime_type) {
-                                result.insert(KeyValue::Str(Arc::from(id)));
-                            } else {
-                                warn!("Skipping file with unsupported mime type: id={id}, mime_type={mime_type}, name={:?}", file.name);
-                            }
-                        }
-                        (id, mime_type) => {
-                            warn!(
-                                "Skipping file with incomplete metadata: id={id:?}, mime_type={mime_type:?}",
-                            );
-                        }
-                    }
-                }
-            }
-            next_page_token = files.next_page_token;
-            if next_page_token.is_none() {
-                break;
+        let (id, mime_type) = match (file.id, file.mime_type) {
+            (Some(id), Some(mime_type)) => (Arc::<str>::from(id), mime_type),
+            (id, mime_type) => {
+                warn!("Skipping file with incomplete metadata: id={id:?}, mime_type={mime_type:?}",);
+                return Ok(None);
             }
+        };
+        if !seen_ids.insert(id.clone()) {
+            return Ok(None);
+        }
+        let result = if mime_type == FOLDER_MIME_TYPE {
+            new_folder_ids.push(id);
+            None
+        } else if is_supported_file_type(&mime_type) {
+            Some(SourceRowMetadata {
+                key: KeyValue::Str(Arc::from(id)),
+                ordinal: file.modified_time.map(|t| t.try_into()).transpose()?,
+            })
+        } else {
+            trace!("Skipping file with unsupported mime type: id={id}, mime_type={mime_type}, name={:?}", file.name);
+            None
+        };
+        Ok(result)
+    }
+
+    async fn list_files(
+        &self,
+        folder_id: &str,
+        fields: &str,
+        next_page_token: &mut Option<String>,
+    ) -> Result<impl Iterator<Item = File>> {
+        let query = format!("'{}' in parents", escape_string(folder_id));
+        let mut list_call = self
+            .drive_hub
+            .files()
+            .list()
+            .add_scope(Scope::Readonly)
+            .q(&query)
+            .param("fields", fields);
+        if let Some(next_page_token) = &next_page_token {
+            list_call = list_call.page_token(next_page_token);
         }
-        Ok(())
+        let (_, files) = list_call.doit().await?;
+        let file_iter = files.files.into_iter().flat_map(|file| file.into_iter());
+        Ok(file_iter)
     }
 }
@@ -202,13 +207,43 @@ impl<T> ResultExt<T> for google_drive3::Result<T> {
 
 #[async_trait]
 impl SourceExecutor for Executor {
-    async fn list_keys(&self) -> Result<Vec<KeyValue>> {
-        let mut result = IndexSet::new();
-        for root_folder_id in &self.root_folder_ids {
-            self.traverse_folder(root_folder_id, &mut IndexSet::new(), &mut result)
-                .await?;
+    fn list<'a>(
+        &'a self,
+        options: SourceExecutorListOptions,
+    ) -> BoxStream<'a, Result<Vec<SourceRowMetadata>>> {
+        let mut seen_ids = HashSet::new();
+        let mut folder_ids = self.root_folder_ids.clone();
+        let fields = format!(
+            "files(id,name,mimeType,trashed{})",
+            if options.include_ordinal {
+                ",modifiedTime"
+            } else {
+                ""
+            }
+        );
+        let mut new_folder_ids = Vec::new();
+        try_stream! {
+            while let Some(folder_id) = folder_ids.pop() {
+                let mut next_page_token = None;
+                loop {
+                    let mut curr_rows = Vec::new();
+                    let files = self
+                        .list_files(&folder_id, &fields, &mut next_page_token)
+                        .await?;
+                    for file in files {
+                        curr_rows
+                            .extend(self.visit_file(file, &mut new_folder_ids, &mut seen_ids)?);
+                    }
+                    if !curr_rows.is_empty() {
+                        yield curr_rows;
+                    }
+                    if next_page_token.is_none() {
+                        break;
+                    }
+                }
+                folder_ids.extend(new_folder_ids.drain(..).rev());
+            }
         }
-        Ok(result.into_iter().collect())
+        .boxed()
    }
 
     async fn get_value(&self, key: &KeyValue) -> Result<Option<SourceData<'_>>> {
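Two details worth noting in this rewrite: the recursive `traverse_folder` (which needed `Box::pin` for async recursion) becomes a flat work-list loop that `try_stream!` can drive, with `visit_file` pushing newly discovered folders instead of recursing; and the `fields` string only requests `modifiedTime` from the Drive API when the caller asked for ordinals. The `.drain(..).rev()` keeps sibling folders in their original order, since the work list is popped from the back. The same shape in isolation (hypothetical, with a synchronous `children` standing in for one listing call):

```rust
// Depth-first traversal with an explicit stack instead of recursion.
fn walk(root: String, children: impl Fn(&str) -> Vec<String>) -> Vec<String> {
    let mut pending = vec![root];
    let mut visited = Vec::new();
    while let Some(id) = pending.pop() {
        let mut found = children(&id);
        // Push in reverse so pop() visits subfolders in discovery order --
        // the same reason the code above uses `.drain(..).rev()`.
        found.reverse();
        pending.extend(found);
        visited.push(id);
    }
    visited
}
```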
diff --git a/src/ops/sources/local_file.rs b/src/ops/sources/local_file.rs
index 2659ce5f..8f7bcc6a 100644
--- a/src/ops/sources/local_file.rs
+++ b/src/ops/sources/local_file.rs
@@ -1,5 +1,8 @@
+use async_stream::try_stream;
 use globset::{Glob, GlobSet, GlobSetBuilder};
 use log::warn;
+use std::borrow::Cow;
+use std::path::Path;
 use std::{path::PathBuf, sync::Arc};
 
 use crate::base::field_attrs;
@@ -14,7 +17,6 @@ pub struct Spec {
 }
 
 struct Executor {
-    root_path_str: String,
     root_path: PathBuf,
     binary: bool,
     included_glob_set: Option<GlobSet>,
@@ -22,46 +24,64 @@ struct Executor {
 }
 
 impl Executor {
-    fn is_excluded(&self, path: &str) -> bool {
+    fn is_excluded(&self, path: impl AsRef<Path> + Copy) -> bool {
         self.excluded_glob_set
             .as_ref()
             .is_some_and(|glob_set| glob_set.is_match(path))
     }
 
-    fn is_file_included(&self, path: &str) -> bool {
+    fn is_file_included(&self, path: impl AsRef<Path> + Copy) -> bool {
         self.included_glob_set
             .as_ref()
             .is_none_or(|glob_set| glob_set.is_match(path))
             && !self.is_excluded(path)
     }
+}
 
-    async fn traverse_dir(&self, dir_path: &PathBuf, result: &mut Vec<KeyValue>) -> Result<()> {
-        for entry in std::fs::read_dir(dir_path)? {
-            let entry = entry?;
-            let path = entry.path();
-            if let Some(file_name) = path.to_str() {
-                let relative_path = &file_name[self.root_path_str.len() + 1..];
-                if path.is_dir() {
-                    if !self.is_excluded(relative_path) {
-                        Box::pin(self.traverse_dir(&path, result)).await?;
+#[async_trait]
+impl SourceExecutor for Executor {
+    fn list<'a>(
+        &'a self,
+        options: SourceExecutorListOptions,
+    ) -> BoxStream<'a, Result<Vec<SourceRowMetadata>>> {
+        let root_component_size = self.root_path.components().count();
+        let mut dirs = Vec::new();
+        dirs.push(Cow::Borrowed(&self.root_path));
+        let mut new_dirs = Vec::new();
+        try_stream! {
+            while let Some(dir) = dirs.pop() {
+                let mut entries = tokio::fs::read_dir(dir.as_ref()).await?;
+                while let Some(entry) = entries.next_entry().await? {
+                    let path = entry.path();
+                    let mut path_components = path.components();
+                    for _ in 0..root_component_size {
+                        path_components.next();
+                    }
+                    let relative_path = path_components.as_path();
+                    if path.is_dir() {
+                        if !self.is_excluded(relative_path) {
+                            new_dirs.push(Cow::Owned(path));
+                        }
+                    } else if self.is_file_included(relative_path) {
+                        let ordinal: Option<Ordinal> = if options.include_ordinal {
+                            Some(path.metadata()?.modified()?.try_into()?)
+                        } else {
+                            None
+                        };
+                        if let Some(relative_path) = relative_path.to_str() {
+                            yield vec![SourceRowMetadata {
+                                key: KeyValue::Str(relative_path.into()),
+                                ordinal,
+                            }];
+                        } else {
+                            warn!("Skipped ill-formed file path: {}", path.display());
+                        }
                     }
-                } else if self.is_file_included(relative_path) {
-                    result.push(KeyValue::Str(Arc::from(relative_path)));
                 }
-            } else {
-                warn!("Skipped ill-formed file path: {}", path.display());
+                dirs.extend(new_dirs.drain(..).rev());
             }
         }
-        Ok(())
-    }
-}
-
-#[async_trait]
-impl SourceExecutor for Executor {
-    async fn list_keys(&self) -> Result<Vec<KeyValue>> {
-        let mut result = Vec::new();
-        self.traverse_dir(&self.root_path, &mut result).await?;
-        Ok(result)
+        .boxed()
     }
 
     async fn get_value(&self, key: &KeyValue) -> Result<Option<SourceData<'_>>> {
@@ -145,7 +165,6 @@ impl SourceFactoryBase for Factory {
         _context: Arc<FlowInstanceContext>,
     ) -> Result<Box<dyn SourceExecutor>> {
         Ok(Box::new(Executor {
-            root_path_str: spec.path.clone(),
             root_path: PathBuf::from(spec.path),
             binary: spec.binary,
             included_glob_set: spec.included_patterns.map(build_glob_set).transpose()?,
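The local-file source drops `root_path_str` and the byte-offset slice (`&file_name[self.root_path_str.len() + 1..]`) in favor of counting `Path` components, which no longer requires the full path to be valid UTF-8 up front and is robust to how the root was spelled (e.g. a trailing separator). Standalone, the idea looks roughly like this (hypothetical helper):

```rust
use std::path::{Path, PathBuf};

// Strip the root by component count rather than byte offset.
fn relative_to(root: &Path, path: &Path) -> PathBuf {
    let root_component_count = root.components().count();
    path.components().skip(root_component_count).collect()
}

// relative_to(Path::new("/data"), Path::new("/data/a/b.txt")) -> "a/b.txt"
```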
diff --git a/src/prelude.rs b/src/prelude.rs
index a49f4704..88613106 100644
--- a/src/prelude.rs
+++ b/src/prelude.rs
@@ -1,5 +1,5 @@
 pub use anyhow::Result;
 pub use async_trait::async_trait;
-pub use futures::FutureExt;
-pub use futures::{future::BoxFuture, stream::BoxStream};
+pub use futures::{future::BoxFuture, prelude::*, stream::BoxStream};
+pub use futures::{FutureExt, StreamExt};
 pub use std::sync::Arc;
diff --git a/src/service/flows.rs b/src/service/flows.rs
index 7a35814d..bebb1033 100644
--- a/src/service/flows.rs
+++ b/src/service/flows.rs
@@ -7,10 +7,10 @@ use axum::{
     Json,
 };
 use axum_extra::extract::Query;
+use futures::StreamExt;
 use serde::{Deserialize, Serialize};
 
 use super::error::ApiError;
-use crate::base::{schema::DataSchema, value};
 use crate::lib_context::LibContext;
 use crate::{
     api_bail, api_error,
@@ -18,6 +18,10 @@ use crate::{
     execution::indexer,
     execution::memoization,
 };
+use crate::{
+    base::{schema::DataSchema, value},
+    ops::interface::SourceExecutorListOptions,
+};
 
 pub async fn list_flows(
     State(lib_context): State<Arc<LibContext>>,
@@ -95,7 +99,13 @@ pub async fn get_keys(
         )
     })?;
 
-    let keys = source_op.executor.list_keys().await?;
+    let mut rows_stream = source_op.executor.list(SourceExecutorListOptions {
+        include_ordinal: false,
+    });
+    let mut keys = Vec::new();
+    while let Some(rows) = rows_stream.next().await {
+        keys.extend(rows?.into_iter().map(|row| row.key));
+    }
     Ok(Json(GetKeysResponse {
         key_type: key_type.clone(),
         keys,
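The `get_keys` handler ends up with the same drain-the-stream loop as `update_source` in indexer.rs. One practical benefit of the stream shape, sketched below, is that a caller is free to stop early instead of always listing the entire source (a hypothetical `first_n_keys` helper, not part of this change):

```rust
use futures::StreamExt;

// Stop as soon as n keys have been collected; remaining batches are
// never requested from the underlying source.
async fn first_n_keys(executor: &dyn SourceExecutor, n: usize) -> Result<Vec<KeyValue>> {
    let mut keys = Vec::new();
    let mut rows_stream = executor.list(SourceExecutorListOptions {
        include_ordinal: false,
    });
    while let Some(rows) = rows_stream.next().await {
        for row in rows? {
            if keys.len() == n {
                return Ok(keys);
            }
            keys.push(row.key);
        }
    }
    Ok(keys)
}
```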