From e733713dff9a4e0bb8ea1fe9276649fb4f4d28db Mon Sep 17 00:00:00 2001 From: LJ Date: Sun, 30 Mar 2025 16:15:18 -0700 Subject: [PATCH] Source API simplification: `get_value()` no longer returns ordinal --- src/execution/row_indexer.rs | 6 +- src/execution/source_indexer.rs | 4 +- src/ops/interface.rs | 9 +-- src/ops/sources/google_drive.rs | 106 +++++++++++++++----------------- src/ops/sources/local_file.rs | 38 ++++-------- 5 files changed, 63 insertions(+), 100 deletions(-) diff --git a/src/execution/row_indexer.rs b/src/execution/row_indexer.rs index df81d10f..ef87b292 100644 --- a/src/execution/row_indexer.rs +++ b/src/execution/row_indexer.rs @@ -463,14 +463,10 @@ pub async fn evaluate_source_entry_with_memory( None }; let memory = EvaluationMemory::new(chrono::Utc::now(), stored_info, options); - let source_data = match source_op.executor.get_value(key).await? { + let source_value = match source_op.executor.get_value(key).await? { Some(d) => d, None => return Ok(None), }; - let source_value = match source_data.value.await? { - Some(value) => value, - None => return Ok(None), - }; let output = evaluate_source_entry(plan, source_op, schema, key, source_value, &memory).await?; Ok(Some(output)) } diff --git a/src/execution/source_indexer.rs b/src/execution/source_indexer.rs index ece58575..40bb82d1 100644 --- a/src/execution/source_indexer.rs +++ b/src/execution/source_indexer.rs @@ -94,15 +94,13 @@ impl SourceContext { let source_op = &plan.source_ops[self.source_idx]; let source_value = if source_version.kind == row_indexer::SourceVersionKind::Deleted { None - } else if let Some(d) = source_op.executor.get_value(&key).await? { - d.value.await? } else { // Even if the source version kind is not Deleted, the source value might be gone one polling. // In this case, we still use the current source version even if it's already stale - actually this version skew // also happens for update cases and there's no way to keep them always in sync for many sources. // // We only need source version <= actual version for value. - None + source_op.executor.get_value(&key).await? }; let schema = &self.flow.data_schema; let result = row_indexer::update_source_row( diff --git a/src/ops/interface.rs b/src/ops/interface.rs index 8e70a6cd..49cc41fd 100644 --- a/src/ops/interface.rs +++ b/src/ops/interface.rs @@ -40,13 +40,6 @@ impl TryFrom> for Ordinal { } } -pub struct SourceData<'a> { - /// Value that must increase monotonically after change. E.g. can be from the update time. - pub ordinal: Option, - /// None means the item is gone when polling. - pub value: BoxFuture<'a, Result>>, -} - pub struct SourceRowMetadata { pub key: KeyValue, /// None means the ordinal is unavailable. @@ -75,7 +68,7 @@ pub trait SourceExecutor: Send + Sync { ) -> BoxStream<'a, Result>>; // Get the value for the given key. - async fn get_value(&self, key: &KeyValue) -> Result>>; + async fn get_value(&self, key: &KeyValue) -> Result>; fn change_stream<'a>(&'a self) -> Option>> { None diff --git a/src/ops/sources/google_drive.rs b/src/ops/sources/google_drive.rs index 208cc549..86145ba3 100644 --- a/src/ops/sources/google_drive.rs +++ b/src/ops/sources/google_drive.rs @@ -246,15 +246,14 @@ impl SourceExecutor for Executor { .boxed() } - async fn get_value(&self, key: &KeyValue) -> Result>> { + async fn get_value(&self, key: &KeyValue) -> Result> { let file_id = key.str_value()?; - let resp = self .drive_hub .files() .get(file_id) .add_scope(Scope::Readonly) - .param("fields", "id,name,mimeType,trashed,modifiedTime") + .param("fields", "id,name,mimeType,trashed") .doit() .await .or_not_found()?; @@ -262,64 +261,55 @@ impl SourceExecutor for Executor { Some((_, file)) if file.trashed != Some(true) => file, _ => return Ok(None), }; - - let modified_time = file.modified_time; - let value = async move { - let type_n_body = if let Some(export_mime_type) = file - .mime_type - .as_ref() - .and_then(|mime_type| EXPORT_MIME_TYPES.get(mime_type.as_str())) - { - let target_mime_type = if self.binary { - export_mime_type.binary - } else { - export_mime_type.text - }; - self.drive_hub - .files() - .export(file_id, target_mime_type) - .add_scope(Scope::Readonly) - .doit() - .await - .or_not_found()? - .map(|content| (Some(target_mime_type.to_string()), content.into_body())) + let type_n_body = if let Some(export_mime_type) = file + .mime_type + .as_ref() + .and_then(|mime_type| EXPORT_MIME_TYPES.get(mime_type.as_str())) + { + let target_mime_type = if self.binary { + export_mime_type.binary } else { - self.drive_hub - .files() - .get(file_id) - .add_scope(Scope::Readonly) - .param("alt", "media") - .doit() - .await - .or_not_found()? - .map(|(resp, _)| (file.mime_type, resp.into_body())) + export_mime_type.text }; - let value = match type_n_body { - Some((mime_type, resp_body)) => { - let content = resp_body.collect().await?; + self.drive_hub + .files() + .export(file_id, target_mime_type) + .add_scope(Scope::Readonly) + .doit() + .await + .or_not_found()? + .map(|content| (Some(target_mime_type.to_string()), content.into_body())) + } else { + self.drive_hub + .files() + .get(file_id) + .add_scope(Scope::Readonly) + .param("alt", "media") + .doit() + .await + .or_not_found()? + .map(|(resp, _)| (file.mime_type, resp.into_body())) + }; + let value = match type_n_body { + Some((mime_type, resp_body)) => { + let content = resp_body.collect().await?; - let fields = vec![ - file.name.unwrap_or_default().into(), - mime_type.into(), - if self.binary { - content.to_bytes().to_vec().into() - } else { - String::from_utf8_lossy(&content.to_bytes()) - .to_string() - .into() - }, - ]; - Some(FieldValues { fields }) - } - None => None, - }; - Ok(value) - } - .boxed(); - Ok(Some(SourceData { - ordinal: modified_time.map(|t| t.try_into()).transpose()?, - value, - })) + let fields = vec![ + file.name.unwrap_or_default().into(), + mime_type.into(), + if self.binary { + content.to_bytes().to_vec().into() + } else { + String::from_utf8_lossy(&content.to_bytes()) + .to_string() + .into() + }, + ]; + Some(FieldValues { fields }) + } + None => None, + }; + Ok(value) } } diff --git a/src/ops/sources/local_file.rs b/src/ops/sources/local_file.rs index 8f7bcc6a..097c8e97 100644 --- a/src/ops/sources/local_file.rs +++ b/src/ops/sources/local_file.rs @@ -84,38 +84,24 @@ impl SourceExecutor for Executor { .boxed() } - async fn get_value(&self, key: &KeyValue) -> Result>> { + async fn get_value(&self, key: &KeyValue) -> Result> { if !self.is_file_included(key.str_value()?.as_ref()) { return Ok(None); } let path = self.root_path.join(key.str_value()?.as_ref()); - let modified_time = match std::fs::metadata(&path) { - Ok(metadata) => metadata.modified()?, - Err(e) if e.kind() == std::io::ErrorKind::NotFound => { - return Ok(None); + let value = match std::fs::read(path) { + Ok(content) => { + let content = if self.binary { + fields_value!(content) + } else { + fields_value!(String::from_utf8_lossy(&content).to_string()) + }; + Some(content) } - Err(e) => return Err(e.into()), + Err(e) if e.kind() == std::io::ErrorKind::NotFound => None, + Err(e) => Err(e)?, }; - let value = async move { - let value = match std::fs::read(path) { - Ok(content) => { - let content = if self.binary { - fields_value!(content) - } else { - fields_value!(String::from_utf8_lossy(&content).to_string()) - }; - Some(content) - } - Err(e) if e.kind() == std::io::ErrorKind::NotFound => None, - Err(e) => Err(e)?, - }; - Ok(value) - } - .boxed(); - Ok(Some(SourceData { - ordinal: Some(modified_time.try_into()?), - value, - })) + Ok(value) } }