diff --git a/src/ops/sources/google_drive.rs b/src/ops/sources/google_drive.rs index 937509cd..8befd7b0 100644 --- a/src/ops/sources/google_drive.rs +++ b/src/ops/sources/google_drive.rs @@ -9,10 +9,13 @@ use google_drive3::{ use http_body_util::BodyExt; use hyper_rustls::HttpsConnector; use hyper_util::client::legacy::connect::HttpConnector; +use indexmap::IndexSet; use log::debug; use crate::ops::sdk::*; +const FOLDER_MIME_TYPE: &'static str = "application/vnd.google-apps.folder"; + #[derive(Debug, Deserialize)] pub struct Spec { service_account_credential_path: String, @@ -28,7 +31,6 @@ struct Executor { impl Executor { async fn new(spec: Spec) -> Result { - // let user_secret = read_authorized_user_secret(spec.service_account_credential_path).await?; let service_account_key = read_service_account_key(spec.service_account_credential_path).await?; let auth = ServiceAccountAuthenticator::builder(service_account_key) @@ -64,12 +66,18 @@ fn escape_string(s: &str) -> String { escaped } -#[async_trait] -impl SourceExecutor for Executor { - async fn list_keys(&self) -> Result> { - let query = format!("'{}' in parents", escape_string(&self.root_folder_id)); +impl Executor { + async fn traverse_folder( + &self, + folder_id: &str, + visited_folder_ids: &mut IndexSet, + result: &mut IndexSet, + ) -> Result<()> { + if !visited_folder_ids.insert(folder_id.to_string()) { + return Ok(()); + } + let query = format!("'{}' in parents", escape_string(folder_id)); let mut next_page_token: Option = None; - let mut result = Vec::new(); loop { let mut list_call = self .drive_hub @@ -83,9 +91,12 @@ impl SourceExecutor for Executor { let (_, files) = list_call.doit().await?; if let Some(files) = files.files { for file in files { - debug!("file: {:?}", file); if let Some(id) = file.id { - result.push(KeyValue::Str(Arc::from(id))); + if file.mime_type.as_ref() == Some(&FOLDER_MIME_TYPE.to_string()) { + Box::pin(self.traverse_folder(&id, visited_folder_ids, result)).await?; + } else { + result.insert(KeyValue::Str(Arc::from(id))); + } } } } @@ -94,7 +105,17 @@ impl SourceExecutor for Executor { break; } } - Ok(result) + Ok(()) + } +} + +#[async_trait] +impl SourceExecutor for Executor { + async fn list_keys(&self) -> Result> { + let mut result = IndexSet::new(); + self.traverse_folder(&self.root_folder_id, &mut IndexSet::new(), &mut result) + .await?; + Ok(result.into_iter().collect()) } async fn get_value(&self, key: &KeyValue) -> Result> {