diff --git a/Cargo.toml b/Cargo.toml
index b065d3e8..f4164090 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -95,3 +95,5 @@ http-body-util = "0.1.3"
 yaml-rust2 = "0.10.0"
 urlencoding = "2.1.3"
 uuid = { version = "1.16.0", features = ["serde", "v4", "v8"] }
+tokio-stream = "0.1.17"
+async-stream = "0.3.6"
diff --git a/src/execution/evaluator.rs b/src/execution/evaluator.rs
index d808ce17..e9a9b590 100644
--- a/src/execution/evaluator.rs
+++ b/src/execution/evaluator.rs
@@ -443,8 +443,9 @@ pub async fn evaluate_source_entry(
     source_op: &AnalyzedSourceOp,
     schema: &schema::DataSchema,
     key: &value::KeyValue,
+    source_value: value::FieldValues,
     memory: &EvaluationMemory,
-) -> Result<Option<ScopeValueBuilder>> {
+) -> Result<ScopeValueBuilder> {
     let root_schema = &schema.schema;
     let root_scope_value =
         ScopeValueBuilder::new(root_schema.fields.len(), schema.collectors.len());
@@ -464,26 +465,20 @@
         }
     };
 
-    let result = match source_op.executor.get_value(key).await? {
-        Some(val) => {
-            let scope_value =
-                ScopeValueBuilder::augmented_from(&value::ScopeValue(val), collection_schema)?;
-            root_scope_entry.define_field_w_builder(
-                &source_op.output,
-                value::Value::Table(BTreeMap::from([(key.clone(), scope_value)])),
-            );
-
-            evaluate_op_scope(
-                &plan.op_scope,
-                RefList::Nil.prepend(&root_scope_entry),
-                memory,
-            )
-            .await?;
-            Some(root_scope_value)
-        }
-        None => None,
-    };
-    anyhow::Ok(result)
+    let scope_value =
+        ScopeValueBuilder::augmented_from(&value::ScopeValue(source_value), collection_schema)?;
+    root_scope_entry.define_field_w_builder(
+        &source_op.output,
+        value::Value::Table(BTreeMap::from([(key.clone(), scope_value)])),
+    );
+
+    evaluate_op_scope(
+        &plan.op_scope,
+        RefList::Nil.prepend(&root_scope_entry),
+        memory,
+    )
+    .await?;
+    Ok(root_scope_value)
 }
 
 pub async fn evaluate_transient_flow(
diff --git a/src/execution/indexer.rs b/src/execution/indexer.rs
index f144e2de..453ebbd4 100644
--- a/src/execution/indexer.rs
+++ b/src/execution/indexer.rs
@@ -1,4 +1,5 @@
-use anyhow::Result;
+use crate::prelude::*;
+
 use futures::future::{join, join_all, try_join, try_join_all};
 use itertools::Itertools;
 use log::error;
@@ -13,7 +14,7 @@ use super::memoization::{EvaluationMemory, EvaluationMemoryOptions, StoredMemoiz
 use crate::base::schema;
 use crate::base::value::{self, FieldValues, KeyValue};
 use crate::builder::plan::*;
-use crate::ops::interface::{ExportTargetMutation, ExportTargetUpsertEntry};
+use crate::ops::interface::{ExportTargetMutation, ExportTargetUpsertEntry, Ordinal};
 use crate::utils::db::WriteAction;
 use crate::utils::fingerprint::{Fingerprint, Fingerprinter};
 
@@ -93,9 +94,9 @@ pub fn extract_primary_key(
     Ok(key)
 }
 
-enum WithApplyStatus<T> {
+pub enum UnchangedOr<T> {
     Normal(T),
-    Collapsed,
+    Unchanged,
 }
 
 #[derive(Default)]
@@ -140,7 +141,7 @@ async fn precommit_source_tracking_info(
     db_setup: &db_tracking_setup::TrackingTableSetupState,
     export_ops: &[AnalyzedExportOp],
     pool: &PgPool,
-) -> Result<WithApplyStatus<PrecommitOutput>> {
+) -> Result<UnchangedOr<PrecommitOutput>> {
     let mut txn = pool.begin().await?;
 
     let tracking_info = db_tracking::read_source_tracking_info_for_precommit(
@@ -157,7 +158,7 @@
         .and_then(|info| info.processed_source_ordinal)
         > source_ordinal
     {
-        return Ok(WithApplyStatus::Collapsed);
+        return Ok(UnchangedOr::Unchanged);
     }
     let process_ordinal = (tracking_info
         .as_ref()
@@ -323,7 +324,7 @@
 
     txn.commit().await?;
 
-    Ok(WithApplyStatus::Normal(PrecommitOutput {
+    Ok(UnchangedOr::Normal(PrecommitOutput {
         metadata: PrecommitMetadata {
             source_entry_exists: data.is_some(),
             process_ordinal,
@@ -343,7 +344,7 @@ async fn commit_source_tracking_info(
     process_timestamp: &chrono::DateTime<chrono::Utc>,
     db_setup: &db_tracking_setup::TrackingTableSetupState,
     pool: &PgPool,
-) -> Result<WithApplyStatus<()>> {
+) -> Result<UnchangedOr<()>> {
     let mut txn = pool.begin().await?;
 
     let tracking_info = db_tracking::read_source_tracking_info_for_commit(
@@ -357,7 +358,7 @@
     if tracking_info.as_ref().and_then(|info| info.process_ordinal)
         >= Some(precommit_metadata.process_ordinal)
     {
-        return Ok(WithApplyStatus::Collapsed);
+        return Ok(UnchangedOr::Unchanged);
     }
 
     let cleaned_staging_target_keys = tracking_info
@@ -417,7 +418,7 @@
 
     txn.commit().await?;
 
-    Ok(WithApplyStatus::Normal(()))
+    Ok(UnchangedOr::Normal(()))
 }
 
 pub async fn evaluate_source_entry_with_memory(
@@ -444,8 +445,16 @@ pub async fn evaluate_source_entry_with_memory(
         None
     };
     let memory = EvaluationMemory::new(chrono::Utc::now(), stored_info, options);
-    let data_builder = evaluate_source_entry(plan, source_op, schema, key, &memory).await?;
-    Ok(data_builder)
+    let source_data = match source_op.executor.get_value(key).await? {
+        Some(d) => d,
+        None => return Ok(None),
+    };
+    let source_value = match source_data.value.await? {
+        Some(value) => value,
+        None => return Ok(None),
+    };
+    let output = evaluate_source_entry(plan, source_op, schema, key, source_value, &memory).await?;
+    Ok(Some(output))
 }
 
 pub async fn update_source_entry(
@@ -461,8 +470,6 @@
     let process_timestamp = chrono::Utc::now();
 
     // Phase 1: Evaluate with memoization info.
-
-    // TODO: Skip if the source is not newer and the processing logic is not changed.
     let existing_tracking_info = read_source_tracking_info(
         source_op.source_id,
         &source_key_json,
@@ -471,57 +478,85 @@ pub async fn update_source_entry(
     )
     .await?;
     let already_exists = existing_tracking_info.is_some();
-    let memoization_info = existing_tracking_info
-        .and_then(|info| info.memoization_info.map(|info| info.0))
-        .flatten();
-    let evaluation_memory = EvaluationMemory::new(
-        process_timestamp,
-        memoization_info,
-        EvaluationMemoryOptions {
-            enable_cache: true,
-            evaluation_only: false,
-        },
-    );
-    let value_builder = if !only_for_deletion {
-        evaluate_source_entry(plan, source_op, schema, key, &evaluation_memory).await?
+    let (existing_source_ordinal, existing_logic_fingerprint, memoization_info) =
+        match existing_tracking_info {
+            Some(info) => (
+                info.processed_source_ordinal.map(Ordinal),
+                info.process_logic_fingerprint,
+                info.memoization_info.map(|info| info.0).flatten(),
+            ),
+            None => Default::default(),
+        };
+    let (source_ordinal, output, stored_mem_info) = if !only_for_deletion {
+        let source_data = source_op.executor.get_value(key).await?;
+        let source_ordinal = source_data.as_ref().and_then(|d| d.ordinal);
+        match (source_ordinal, existing_source_ordinal) {
+            // TODO: Collapse if the source is not newer and the processing logic is not changed.
+            (Some(source_ordinal), Some(existing_source_ordinal)) => {
+                if source_ordinal < existing_source_ordinal
+                    || (source_ordinal == existing_source_ordinal
+                        && existing_logic_fingerprint.as_deref() == Some(plan.logic_fingerprint.0.as_slice()))
+                {
+                    return Ok(());
+                }
+            }
+            _ => {}
+        }
+        let source_value = match source_data {
+            Some(d) => d.value.await?,
+            None => None,
+        };
+        match source_value {
+            Some(source_value) => {
+                let evaluation_memory = EvaluationMemory::new(
+                    process_timestamp,
+                    memoization_info,
+                    EvaluationMemoryOptions {
+                        enable_cache: true,
+                        evaluation_only: false,
+                    },
+                );
+                let output = evaluate_source_entry(
+                    plan,
+                    source_op,
+                    schema,
+                    key,
+                    source_value,
+                    &evaluation_memory,
+                )
+                .await?;
+                (
+                    source_ordinal,
+                    Some(output),
+                    evaluation_memory.into_stored()?,
+                )
+            }
+            None => Default::default(),
+        }
     } else {
-        None
+        Default::default()
     };
-    let exists = value_builder.is_some();
-
     if already_exists {
-        if exists {
+        if output.is_some() {
             stats.num_already_exists.fetch_add(1, Relaxed);
         } else {
             stats.num_deletions.fetch_add(1, Relaxed);
         }
-    } else if exists {
+    } else if output.is_some() {
         stats.num_insertions.fetch_add(1, Relaxed);
     } else {
         return Ok(());
    }
 
-    let memoization_info = evaluation_memory.into_stored()?;
-    let (source_ordinal, precommit_data) = match &value_builder {
-        Some(scope_value) => {
-            (
-                // TODO: Generate the actual source ordinal.
-                Some(1),
-                Some(PrecommitData {
-                    scope_value,
-                    memoization_info: &memoization_info,
-                }),
-            )
-        }
-        None => (None, None),
-    };
-
     // Phase 2 (precommit): Update with the memoization info and stage target keys.
     let precommit_output = precommit_source_tracking_info(
         source_op.source_id,
         &source_key_json,
-        source_ordinal,
-        precommit_data,
+        source_ordinal.map(|o| o.into()),
+        output.as_ref().map(|scope_value| PrecommitData {
+            scope_value,
+            memoization_info: &stored_mem_info,
+        }),
         &process_timestamp,
         &plan.tracking_table_setup,
         &plan.export_ops,
@@ -529,8 +564,8 @@
     )
     .await?;
     let precommit_output = match precommit_output {
-        WithApplyStatus::Normal(output) => output,
-        WithApplyStatus::Collapsed => return Ok(()),
+        UnchangedOr::Normal(output) => output,
+        UnchangedOr::Unchanged => return Ok(()),
     };
 
     // Phase 3: Apply changes to the target storage, including upserting new target records and removing existing ones.
@@ -554,7 +589,7 @@
     commit_source_tracking_info(
         source_op.source_id,
         &source_key_json,
-        source_ordinal,
+        source_ordinal.map(|o| o.into()),
         &plan.logic_fingerprint,
         precommit_output.metadata,
         &process_timestamp,
diff --git a/src/lib.rs b/src/lib.rs
index 8ccb271b..6ce035d8 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -4,6 +4,7 @@ mod execution;
 mod lib_context;
 mod llm;
 mod ops;
+mod prelude;
 mod py;
 mod server;
 mod service;
diff --git a/src/ops/interface.rs b/src/ops/interface.rs
index 80be8615..05b73cba 100644
--- a/src/ops/interface.rs
+++ b/src/ops/interface.rs
@@ -1,26 +1,71 @@
+use std::time::SystemTime;
+
 use crate::base::{
     schema::*,
     spec::{IndexOptions, VectorSimilarityMetric},
     value::*,
 };
+use crate::prelude::*;
 use crate::setup;
-use anyhow::Result;
-use async_trait::async_trait;
-use futures::future::BoxFuture;
+use chrono::TimeZone;
 use serde::Serialize;
-use std::sync::Arc;
 
 pub struct FlowInstanceContext {
     pub flow_instance_name: String,
 }
 
+#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
+pub struct Ordinal(pub i64);
+
+impl Into<i64> for Ordinal {
+    fn into(self) -> i64 {
+        self.0
+    }
+}
+
+impl TryFrom<SystemTime> for Ordinal {
+    type Error = anyhow::Error;
+
+    fn try_from(time: SystemTime) -> Result<Self, Self::Error> {
+        let duration = time.duration_since(std::time::UNIX_EPOCH)?;
+        Ok(duration.as_micros().try_into().map(Ordinal)?)
+    }
+}
+
+impl<TZ: TimeZone> TryFrom<chrono::DateTime<TZ>> for Ordinal {
+    type Error = anyhow::Error;
+
+    fn try_from(time: chrono::DateTime<TZ>) -> Result<Self, Self::Error> {
+        Ok(Ordinal(time.timestamp_micros()))
+    }
+}
+
+pub struct SourceData<'a> {
+    /// Value that must increase monotonically with each change, e.g. one derived from the update time.
+    pub ordinal: Option<Ordinal>,
+    /// None means the item is gone when polling.
+    pub value: BoxFuture<'a, Result<Option<FieldValues>>>,
+}
+
+pub struct SourceChange<'a> {
+    /// Last update/deletion ordinal. None means unavailable.
+    pub ordinal: Option<Ordinal>,
+    pub key: KeyValue,
+    /// None means a deletion. None within the `BoxFuture` means the item is gone when polling.
+    pub value: Option<BoxFuture<'a, Result<Option<FieldValues>>>>,
+}
+
 #[async_trait]
 pub trait SourceExecutor: Send + Sync {
     /// Get the list of keys for the source.
     async fn list_keys(&self) -> Result<Vec<KeyValue>>;
 
     // Get the value for the given key.
-    async fn get_value(&self, key: &KeyValue) -> Result<Option<FieldValues>>;
+    async fn get_value(&self, key: &KeyValue) -> Result<Option<SourceData<'_>>>;
+
+    fn change_stream<'a>(&'a self) -> Option<BoxStream<'a, SourceChange<'a>>> {
+        None
+    }
 }
 
 pub trait SourceFactory {
diff --git a/src/ops/sdk.rs b/src/ops/sdk.rs
index bbe970a6..fe7c7d8b 100644
--- a/src/ops/sdk.rs
+++ b/src/ops/sdk.rs
@@ -1,15 +1,14 @@
+pub use crate::prelude::*;
+
 use crate::builder::plan::AnalyzedFieldReference;
 use crate::builder::plan::AnalyzedLocalFieldReference;
 use std::collections::BTreeMap;
-use std::sync::Arc;
 
 pub use super::factory_bases::*;
 pub use super::interface::*;
 pub use crate::base::schema::*;
 pub use crate::base::spec::*;
 pub use crate::base::value::*;
-pub use anyhow::Result;
-pub use axum::async_trait;
 pub use serde::Deserialize;
 
 /// Defined for all types convertible to ValueType, to ease creation for ValueType in various operation factories.
diff --git a/src/ops/sources/google_drive.rs b/src/ops/sources/google_drive.rs
index 7d219f1e..f3823d52 100644
--- a/src/ops/sources/google_drive.rs
+++ b/src/ops/sources/google_drive.rs
@@ -175,6 +175,31 @@ impl Executor {
     }
 }
 
+trait ResultExt {
+    type OptResult;
+    fn or_not_found(self) -> Self::OptResult;
+}
+
+impl<T> ResultExt for google_drive3::Result<T> {
+    type OptResult = google_drive3::Result<Option<T>>;
+
+    fn or_not_found(self) -> Self::OptResult {
+        match self {
+            Ok(value) => Ok(Some(value)),
+            Err(google_drive3::Error::BadRequest(err_msg))
+                if err_msg
+                    .get("error")
+                    .and_then(|e| e.get("code"))
+                    .and_then(|code| code.as_i64())
+                    == Some(404) =>
+            {
+                Ok(None)
+            }
+            Err(e) => Err(e),
+        }
+    }
+}
+
 #[async_trait]
 impl SourceExecutor for Executor {
     async fn list_keys(&self) -> Result<Vec<KeyValue>> {
@@ -186,80 +211,80 @@ impl SourceExecutor for Executor {
         Ok(result.into_iter().collect())
     }
 
-    async fn get_value(&self, key: &KeyValue) -> Result<Option<FieldValues>> {
+    async fn get_value(&self, key: &KeyValue) -> Result<Option<SourceData<'_>>> {
         let file_id = key.str_value()?;
-        let file = match self
+        let resp = self
             .drive_hub
             .files()
             .get(file_id)
             .add_scope(Scope::Readonly)
-            .param("fields", "id,name,mimeType,trashed")
+            .param("fields", "id,name,mimeType,trashed,modifiedTime")
             .doit()
             .await
-        {
-            Ok((_, file)) => {
-                if file.trashed == Some(true) {
-                    return Ok(None);
-                }
-                file
-            }
-            Err(google_drive3::Error::BadRequest(err_msg))
-                if err_msg
-                    .get("error")
-                    .and_then(|e| e.get("code"))
-                    .and_then(|code| code.as_i64())
-                    == Some(404) =>
-            {
-                return Ok(None);
-            }
-            Err(e) => Err(e)?,
+            .or_not_found()?;
+        let file = match resp {
+            Some((_, file)) if file.trashed != Some(true) => file,
+            _ => return Ok(None),
         };
-        let (mime_type, resp_body) = if let Some(export_mime_type) = file
-            .mime_type
-            .as_ref()
-            .and_then(|mime_type| EXPORT_MIME_TYPES.get(mime_type.as_str()))
-        {
-            let target_mime_type = if self.binary {
-                export_mime_type.binary
-            } else {
-                export_mime_type.text
-            };
-            let content = self
-                .drive_hub
-                .files()
-                .export(file_id, target_mime_type)
-                .add_scope(Scope::Readonly)
-                .doit()
-                .await?
-                .into_body();
-            (Some(target_mime_type.to_string()), content)
-        } else {
-            let (resp, _) = self
-                .drive_hub
-                .files()
-                .get(file_id)
-                .add_scope(Scope::Readonly)
-                .param("alt", "media")
-                .doit()
-                .await?;
-            (file.mime_type, resp.into_body())
-        };
-        let content = resp_body.collect().await?;
-
-        let fields = vec![
-            file.name.unwrap_or_default().into(),
-            mime_type.into(),
-            if self.binary {
-                content.to_bytes().to_vec().into()
-            } else {
-                String::from_utf8_lossy(&content.to_bytes())
-                    .to_string()
-                    .into()
-            },
-        ];
-        Ok(Some(FieldValues { fields }))
+        let modified_time = file.modified_time;
+        let value = async move {
+            let type_n_body = if let Some(export_mime_type) = file
+                .mime_type
+                .as_ref()
+                .and_then(|mime_type| EXPORT_MIME_TYPES.get(mime_type.as_str()))
+            {
+                let target_mime_type = if self.binary {
+                    export_mime_type.binary
+                } else {
+                    export_mime_type.text
+                };
+                self.drive_hub
+                    .files()
+                    .export(file_id, target_mime_type)
+                    .add_scope(Scope::Readonly)
+                    .doit()
+                    .await
+                    .or_not_found()?
+                    .map(|content| (Some(target_mime_type.to_string()), content.into_body()))
+            } else {
+                self.drive_hub
+                    .files()
+                    .get(file_id)
+                    .add_scope(Scope::Readonly)
+                    .param("alt", "media")
+                    .doit()
+                    .await
+                    .or_not_found()?
+                    .map(|(resp, _)| (file.mime_type, resp.into_body()))
+            };
+            let value = match type_n_body {
+                Some((mime_type, resp_body)) => {
+                    let content = resp_body.collect().await?;
+
+                    let fields = vec![
+                        file.name.unwrap_or_default().into(),
+                        mime_type.into(),
+                        if self.binary {
+                            content.to_bytes().to_vec().into()
+                        } else {
+                            String::from_utf8_lossy(&content.to_bytes())
+                                .to_string()
+                                .into()
+                        },
+                    ];
+                    Some(FieldValues { fields })
+                }
+                None => None,
+            };
+            Ok(value)
+        }
+        .boxed();
+        Ok(Some(SourceData {
+            ordinal: modified_time.map(|t| t.try_into()).transpose()?,
+            value,
+        }))
     }
 }
diff --git a/src/ops/sources/local_file.rs b/src/ops/sources/local_file.rs
index 1fb5aec7..2659ce5f 100644
--- a/src/ops/sources/local_file.rs
+++ b/src/ops/sources/local_file.rs
@@ -64,24 +64,38 @@ impl SourceExecutor for Executor {
         Ok(result)
     }
 
-    async fn get_value(&self, key: &KeyValue) -> Result<Option<FieldValues>> {
+    async fn get_value(&self, key: &KeyValue) -> Result<Option<SourceData<'_>>> {
         if !self.is_file_included(key.str_value()?.as_ref()) {
             return Ok(None);
         }
         let path = self.root_path.join(key.str_value()?.as_ref());
-        let result = match std::fs::read(path) {
-            Ok(content) => {
-                let content = if self.binary {
-                    fields_value!(content)
-                } else {
-                    fields_value!(String::from_utf8_lossy(&content).to_string())
-                };
-                Some(content)
+        let modified_time = match std::fs::metadata(&path) {
+            Ok(metadata) => metadata.modified()?,
+            Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
+                return Ok(None);
             }
-            Err(e) if e.kind() == std::io::ErrorKind::NotFound => None,
             Err(e) => return Err(e.into()),
         };
-        Ok(result)
+        let value = async move {
+            let value = match std::fs::read(path) {
+                Ok(content) => {
+                    let content = if self.binary {
+                        fields_value!(content)
+                    } else {
+                        fields_value!(String::from_utf8_lossy(&content).to_string())
+                    };
+                    Some(content)
+                }
+                Err(e) if e.kind() == std::io::ErrorKind::NotFound => None,
+                Err(e) => Err(e)?,
+            };
+            Ok(value)
+        }
+        .boxed();
+        Ok(Some(SourceData {
+            ordinal: Some(modified_time.try_into()?),
+            value,
+        }))
     }
 }
diff --git a/src/prelude.rs b/src/prelude.rs
new file mode 100644
index 00000000..a49f4704
--- /dev/null
+++ b/src/prelude.rs
@@ -0,0 +1,5 @@
+pub use anyhow::Result;
+pub use async_trait::async_trait;
+pub use futures::FutureExt;
+pub use futures::{future::BoxFuture, stream::BoxStream};
+pub use std::sync::Arc;
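
Illustration (not part of the patch): under the new `SourceExecutor` contract, `get_value` returns a `SourceData` whose `ordinal` is cheap to obtain, while the payload stays behind a lazy `BoxFuture`, so the indexer can compare ordinals against the tracking table and skip fetching unchanged items. Below is a minimal sketch of a hypothetical in-memory source against this trait; `MemorySource`, its `items` map, and the `KeyValue::Str` constructor are assumptions for illustration, not code from this change:

```rust
use std::collections::BTreeMap;
use std::time::SystemTime;

use crate::base::value::{FieldValues, KeyValue};
use crate::ops::interface::{Ordinal, SourceData, SourceExecutor};
use crate::prelude::*;

struct MemorySource {
    /// key -> (last update time, content); invented for this example.
    items: BTreeMap<String, (SystemTime, String)>,
}

#[async_trait]
impl SourceExecutor for MemorySource {
    async fn list_keys(&self) -> Result<Vec<KeyValue>> {
        // Assumes a `KeyValue::Str`-style string variant; adjust to the crate's actual constructor.
        Ok(self
            .items
            .keys()
            .map(|k| KeyValue::Str(k.clone().into()))
            .collect())
    }

    async fn get_value(&self, key: &KeyValue) -> Result<Option<SourceData<'_>>> {
        let (modified, content) = match self.items.get(key.str_value()?.as_ref()) {
            Some(entry) => entry,
            // None means the item is gone, per the trait's contract.
            None => return Ok(None),
        };
        // Ordinal derived from the update time: it increases monotonically
        // across changes, letting the indexer collapse stale/unchanged rows.
        let ordinal: Ordinal = (*modified).try_into()?;
        let content = content.clone();
        // Payload stays behind a lazy future; it is only awaited when the
        // ordinal comparison decides the row actually needs reprocessing.
        let value = async move {
            Ok(Some(FieldValues {
                fields: vec![content.into()],
            }))
        }
        .boxed();
        Ok(Some(SourceData {
            ordinal: Some(ordinal),
            value,
        }))
    }
}
```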