Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,5 @@ http-body-util = "0.1.3"
yaml-rust2 = "0.10.0"
urlencoding = "2.1.3"
uuid = { version = "1.16.0", features = ["serde", "v4", "v8"] }
tokio-stream = "0.1.17"
async-stream = "0.3.6"
37 changes: 16 additions & 21 deletions src/execution/evaluator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -443,8 +443,9 @@ pub async fn evaluate_source_entry(
source_op: &AnalyzedSourceOp,
schema: &schema::DataSchema,
key: &value::KeyValue,
source_value: value::FieldValues,
memory: &EvaluationMemory,
) -> Result<Option<ScopeValueBuilder>> {
) -> Result<ScopeValueBuilder> {
let root_schema = &schema.schema;
let root_scope_value =
ScopeValueBuilder::new(root_schema.fields.len(), schema.collectors.len());
Expand All @@ -464,26 +465,20 @@ pub async fn evaluate_source_entry(
}
};

let result = match source_op.executor.get_value(key).await? {
Some(val) => {
let scope_value =
ScopeValueBuilder::augmented_from(&value::ScopeValue(val), collection_schema)?;
root_scope_entry.define_field_w_builder(
&source_op.output,
value::Value::Table(BTreeMap::from([(key.clone(), scope_value)])),
);

evaluate_op_scope(
&plan.op_scope,
RefList::Nil.prepend(&root_scope_entry),
memory,
)
.await?;
Some(root_scope_value)
}
None => None,
};
anyhow::Ok(result)
let scope_value =
ScopeValueBuilder::augmented_from(&value::ScopeValue(source_value), collection_schema)?;
root_scope_entry.define_field_w_builder(
&source_op.output,
value::Value::Table(BTreeMap::from([(key.clone(), scope_value)])),
);

evaluate_op_scope(
&plan.op_scope,
RefList::Nil.prepend(&root_scope_entry),
memory,
)
.await?;
Ok(root_scope_value)
}

pub async fn evaluate_transient_flow(
Expand Down
139 changes: 87 additions & 52 deletions src/execution/indexer.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use anyhow::Result;
use crate::prelude::*;

use futures::future::{join, join_all, try_join, try_join_all};
use itertools::Itertools;
use log::error;
Expand All @@ -13,7 +14,7 @@ use super::memoization::{EvaluationMemory, EvaluationMemoryOptions, StoredMemoiz
use crate::base::schema;
use crate::base::value::{self, FieldValues, KeyValue};
use crate::builder::plan::*;
use crate::ops::interface::{ExportTargetMutation, ExportTargetUpsertEntry};
use crate::ops::interface::{ExportTargetMutation, ExportTargetUpsertEntry, Ordinal};
use crate::utils::db::WriteAction;
use crate::utils::fingerprint::{Fingerprint, Fingerprinter};

Expand Down Expand Up @@ -93,9 +94,9 @@ pub fn extract_primary_key(
Ok(key)
}

/// Outcome of a conditional tracking-table write.
///
/// `Normal(T)` means the write was applied and produced `T`;
/// `Unchanged` means it was skipped because the stored state is already
/// at least as new (e.g. a newer ordinal or process ordinal is recorded).
pub enum UnchangedOr<T> {
    Normal(T),
    Unchanged,
}

#[derive(Default)]
Expand Down Expand Up @@ -140,7 +141,7 @@ async fn precommit_source_tracking_info(
db_setup: &db_tracking_setup::TrackingTableSetupState,
export_ops: &[AnalyzedExportOp],
pool: &PgPool,
) -> Result<WithApplyStatus<PrecommitOutput>> {
) -> Result<UnchangedOr<PrecommitOutput>> {
let mut txn = pool.begin().await?;

let tracking_info = db_tracking::read_source_tracking_info_for_precommit(
Expand All @@ -157,7 +158,7 @@ async fn precommit_source_tracking_info(
.and_then(|info| info.processed_source_ordinal)
> source_ordinal
{
return Ok(WithApplyStatus::Collapsed);
return Ok(UnchangedOr::Unchanged);
}
let process_ordinal = (tracking_info
.as_ref()
Expand Down Expand Up @@ -323,7 +324,7 @@ async fn precommit_source_tracking_info(

txn.commit().await?;

Ok(WithApplyStatus::Normal(PrecommitOutput {
Ok(UnchangedOr::Normal(PrecommitOutput {
metadata: PrecommitMetadata {
source_entry_exists: data.is_some(),
process_ordinal,
Expand All @@ -343,7 +344,7 @@ async fn commit_source_tracking_info(
process_timestamp: &chrono::DateTime<chrono::Utc>,
db_setup: &db_tracking_setup::TrackingTableSetupState,
pool: &PgPool,
) -> Result<WithApplyStatus<()>> {
) -> Result<UnchangedOr<()>> {
let mut txn = pool.begin().await?;

let tracking_info = db_tracking::read_source_tracking_info_for_commit(
Expand All @@ -357,7 +358,7 @@ async fn commit_source_tracking_info(
if tracking_info.as_ref().and_then(|info| info.process_ordinal)
>= Some(precommit_metadata.process_ordinal)
{
return Ok(WithApplyStatus::Collapsed);
return Ok(UnchangedOr::Unchanged);
}

let cleaned_staging_target_keys = tracking_info
Expand Down Expand Up @@ -417,7 +418,7 @@ async fn commit_source_tracking_info(

txn.commit().await?;

Ok(WithApplyStatus::Normal(()))
Ok(UnchangedOr::Normal(()))
}

pub async fn evaluate_source_entry_with_memory(
Expand All @@ -444,8 +445,16 @@ pub async fn evaluate_source_entry_with_memory(
None
};
let memory = EvaluationMemory::new(chrono::Utc::now(), stored_info, options);
let data_builder = evaluate_source_entry(plan, source_op, schema, key, &memory).await?;
Ok(data_builder)
let source_data = match source_op.executor.get_value(key).await? {
Some(d) => d,
None => return Ok(None),
};
let source_value = match source_data.value.await? {
Some(value) => value,
None => return Ok(None),
};
let output = evaluate_source_entry(plan, source_op, schema, key, source_value, &memory).await?;
Ok(Some(output))
}

pub async fn update_source_entry(
Expand All @@ -461,8 +470,6 @@ pub async fn update_source_entry(
let process_timestamp = chrono::Utc::now();

// Phase 1: Evaluate with memoization info.

// TODO: Skip if the source is not newer and the processing logic is not changed.
let existing_tracking_info = read_source_tracking_info(
source_op.source_id,
&source_key_json,
Expand All @@ -471,66 +478,94 @@ pub async fn update_source_entry(
)
.await?;
let already_exists = existing_tracking_info.is_some();
let memoization_info = existing_tracking_info
.and_then(|info| info.memoization_info.map(|info| info.0))
.flatten();
let evaluation_memory = EvaluationMemory::new(
process_timestamp,
memoization_info,
EvaluationMemoryOptions {
enable_cache: true,
evaluation_only: false,
},
);
let value_builder = if !only_for_deletion {
evaluate_source_entry(plan, source_op, schema, key, &evaluation_memory).await?
let (existing_source_ordinal, existing_logic_fingerprint, memoization_info) =
match existing_tracking_info {
Some(info) => (
info.processed_source_ordinal.map(Ordinal),
info.process_logic_fingerprint,
info.memoization_info.map(|info| info.0).flatten(),
),
None => Default::default(),
};
let (source_ordinal, output, stored_mem_info) = if !only_for_deletion {
let source_data = source_op.executor.get_value(key).await?;
let source_ordinal = source_data.as_ref().and_then(|d| d.ordinal);
match (source_ordinal, existing_source_ordinal) {
// TODO: Collapse if the source is not newer and the processing logic is not changed.
(Some(source_ordinal), Some(existing_source_ordinal)) => {
if source_ordinal < existing_source_ordinal
|| (source_ordinal == existing_source_ordinal
&& existing_logic_fingerprint == source_op.)
{
return Ok(());
}
}
_ => {}
}
let source_value = match source_data {
Some(d) => d.value.await?,
None => None,
};
match source_value {
Some(source_value) => {
let evaluation_memory = EvaluationMemory::new(
process_timestamp,
memoization_info,
EvaluationMemoryOptions {
enable_cache: true,
evaluation_only: false,
},
);
let output = evaluate_source_entry(
plan,
source_op,
schema,
key,
source_value,
&evaluation_memory,
)
.await?;
(
source_ordinal,
Some(output),
evaluation_memory.into_stored()?,
)
}
None => Default::default(),
}
} else {
None
Default::default()
};
let exists = value_builder.is_some();

if already_exists {
if exists {
if output.is_some() {
stats.num_already_exists.fetch_add(1, Relaxed);
} else {
stats.num_deletions.fetch_add(1, Relaxed);
}
} else if exists {
} else if output.is_some() {
stats.num_insertions.fetch_add(1, Relaxed);
} else {
return Ok(());
}

let memoization_info = evaluation_memory.into_stored()?;
let (source_ordinal, precommit_data) = match &value_builder {
Some(scope_value) => {
(
// TODO: Generate the actual source ordinal.
Some(1),
Some(PrecommitData {
scope_value,
memoization_info: &memoization_info,
}),
)
}
None => (None, None),
};

// Phase 2 (precommit): Update with the memoization info and stage target keys.
let precommit_output = precommit_source_tracking_info(
source_op.source_id,
&source_key_json,
source_ordinal,
precommit_data,
source_ordinal.map(|o| o.into()),
output.as_ref().map(|scope_value| PrecommitData {
scope_value,
memoization_info: &stored_mem_info,
}),
&process_timestamp,
&plan.tracking_table_setup,
&plan.export_ops,
pool,
)
.await?;
let precommit_output = match precommit_output {
WithApplyStatus::Normal(output) => output,
WithApplyStatus::Collapsed => return Ok(()),
UnchangedOr::Normal(output) => output,
UnchangedOr::Unchanged => return Ok(()),
};

// Phase 3: Apply changes to the target storage, including upserting new target records and removing existing ones.
Expand All @@ -554,7 +589,7 @@ pub async fn update_source_entry(
commit_source_tracking_info(
source_op.source_id,
&source_key_json,
source_ordinal,
source_ordinal.map(|o| o.into()),
&plan.logic_fingerprint,
precommit_output.metadata,
&process_timestamp,
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ mod execution;
mod lib_context;
mod llm;
mod ops;
mod prelude;
mod py;
mod server;
mod service;
Expand Down
55 changes: 50 additions & 5 deletions src/ops/interface.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,71 @@
use std::time::SystemTime;

use crate::base::{
schema::*,
spec::{IndexOptions, VectorSimilarityMetric},
value::*,
};
use crate::prelude::*;
use crate::setup;
use anyhow::Result;
use async_trait::async_trait;
use futures::future::BoxFuture;
use chrono::TimeZone;
use serde::Serialize;
use std::sync::Arc;

/// Context identifying the flow instance an operator executor belongs to.
pub struct FlowInstanceContext {
    // Name of the flow instance; used to scope operator state per instance.
    pub flow_instance_name: String,
}

/// Version stamp for a source entry. Must increase monotonically when the
/// entry changes (e.g. derived from the last-update time in microseconds),
/// so stale updates can be detected by comparison.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct Ordinal(pub i64);

// Implement `From` rather than a hand-written `Into` (clippy::from_over_into):
// the blanket impl still provides `Ordinal: Into<i64>`, so existing
// `o.into()` call sites keep working unchanged.
impl From<Ordinal> for i64 {
    fn from(ordinal: Ordinal) -> i64 {
        ordinal.0
    }
}

impl TryFrom<SystemTime> for Ordinal {
type Error = anyhow::Error;

fn try_from(time: SystemTime) -> Result<Self, Self::Error> {
let duration = time.duration_since(std::time::UNIX_EPOCH)?;
Ok(duration.as_micros().try_into().map(Ordinal)?)
}
}

impl<TZ: TimeZone> TryFrom<chrono::DateTime<TZ>> for Ordinal {
    type Error = anyhow::Error;

    /// Maps the timestamp to microseconds since the Unix epoch.
    /// `timestamp_micros` is infallible, so this conversion never errors;
    /// it is `TryFrom` only for signature symmetry with the `SystemTime` impl.
    fn try_from(time: chrono::DateTime<TZ>) -> Result<Self, Self::Error> {
        let micros = time.timestamp_micros();
        Ok(Ordinal(micros))
    }
}

/// A single entry fetched from a source, with its version stamp and a lazy
/// future that resolves to the entry's field values.
pub struct SourceData<'a> {
    /// Value that must increase monotonically after change. E.g. can be from the update time.
    /// `None` when the source cannot provide an ordinal.
    pub ordinal: Option<Ordinal>,
    /// None means the item is gone when polling.
    pub value: BoxFuture<'a, Result<Option<FieldValues>>>,
}

/// A change event emitted by a source's change stream (see
/// `SourceExecutor::change_stream`), keyed by the affected entry.
pub struct SourceChange<'a> {
    /// Last update/deletion ordinal. None means unavailable.
    pub ordinal: Option<Ordinal>,
    // Key of the entry the change applies to.
    pub key: KeyValue,
    /// None means a deletion. None within the `BoxFuture` means the item is gone when polling.
    pub value: Option<BoxFuture<'a, Result<Option<FieldValues>>>>,
}

/// Executor interface a source connector implements: polling by key listing,
/// plus an optional push-based change stream.
#[async_trait]
pub trait SourceExecutor: Send + Sync {
    /// Get the list of keys for the source.
    async fn list_keys(&self) -> Result<Vec<KeyValue>>;

    /// Get the data for the given key. `None` means the entry does not exist.
    async fn get_value(&self, key: &KeyValue) -> Result<Option<SourceData<'async_trait>>>;

    /// Optional stream of change events; the default `None` means the source
    /// only supports polling via `list_keys` / `get_value`.
    fn change_stream<'a>(&'a self) -> Option<BoxStream<'a, SourceChange<'a>>> {
        None
    }
}

pub trait SourceFactory {
Expand Down
Loading
Loading