diff --git a/src/execution/dumper.rs b/src/execution/dumper.rs
index 1447dc08..1fcb3381 100644
--- a/src/execution/dumper.rs
+++ b/src/execution/dumper.rs
@@ -11,8 +11,8 @@ use std::collections::BTreeMap;
 use std::path::{Path, PathBuf};
 use yaml_rust2::YamlEmitter;
 
-use super::indexer;
 use super::memoization::EvaluationMemoryOptions;
+use super::row_indexer;
 use crate::base::{schema, value};
 use crate::builder::plan::{AnalyzedSourceOp, ExecutionPlan};
 use crate::ops::interface::SourceExecutorListOptions;
@@ -76,7 +76,7 @@ impl<'a> Dumper<'a> {
     where
         'a: 'b,
     {
-        let data_builder = indexer::evaluate_source_entry_with_memory(
+        let data_builder = row_indexer::evaluate_source_entry_with_memory(
             self.plan,
             source_op,
             self.schema,
@@ -113,8 +113,10 @@ impl<'a> Dumper<'a> {
                     data: collected_values_buffer[collector_idx]
                         .iter()
                         .map(|v| -> Result<_> {
-                            let key =
-                                indexer::extract_primary_key(&export_op.primary_key_def, v)?;
+                            let key = row_indexer::extract_primary_key(
+                                &export_op.primary_key_def,
+                                v,
+                            )?;
                             Ok((key, v))
                         })
                         .collect::<Result<_>>()?,
diff --git a/src/execution/mod.rs b/src/execution/mod.rs
index 5de6b5e7..fce1a8c8 100644
--- a/src/execution/mod.rs
+++ b/src/execution/mod.rs
@@ -1,9 +1,10 @@
+pub(crate) mod db_tracking_setup;
 pub(crate) mod dumper;
 pub(crate) mod evaluator;
-pub(crate) mod indexer;
+pub(crate) mod memoization;
 pub(crate) mod query;
+pub(crate) mod row_indexer;
+pub(crate) mod source_indexer;
+pub(crate) mod stats;
 
 mod db_tracking;
-pub mod db_tracking_setup;
-
-pub(crate) mod memoization;
diff --git a/src/execution/indexer.rs b/src/execution/row_indexer.rs
similarity index 81%
rename from src/execution/indexer.rs
rename to src/execution/row_indexer.rs
index 0e854e36..49b7c171 100644
--- a/src/execution/indexer.rs
+++ b/src/execution/row_indexer.rs
@@ -1,92 +1,24 @@
 use crate::prelude::*;
 
-use futures::future::{join, join_all, try_join_all};
-use itertools::Itertools;
+use futures::future::try_join_all;
 use log::error;
-use serde::Serialize;
 use sqlx::PgPool;
 use std::collections::{HashMap, HashSet};
-use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};
+use std::sync::atomic::Ordering::Relaxed;
 
 use super::db_tracking::{self, read_source_tracking_info_for_processing, TrackedTargetKey};
 use super::db_tracking_setup;
+use super::evaluator::{evaluate_source_entry, ScopeValueBuilder};
 use super::memoization::{EvaluationMemory, EvaluationMemoryOptions, StoredMemoizationInfo};
+use super::stats;
+
 use crate::base::schema;
 use crate::base::value::{self, FieldValues, KeyValue};
 use crate::builder::plan::*;
-use crate::ops::interface::{
-    ExportTargetMutation, ExportTargetUpsertEntry, Ordinal, SourceExecutorListOptions,
-};
+use crate::ops::interface::{ExportTargetMutation, ExportTargetUpsertEntry, Ordinal};
 use crate::utils::db::WriteAction;
 use crate::utils::fingerprint::{Fingerprint, Fingerprinter};
 
-use super::evaluator::{evaluate_source_entry, ScopeValueBuilder};
-
-#[derive(Debug, Serialize, Default)]
-pub struct UpdateStats {
-    pub num_skipped: AtomicUsize,
-    pub num_insertions: AtomicUsize,
-    pub num_deletions: AtomicUsize,
-    pub num_repreocesses: AtomicUsize,
-    pub num_errors: AtomicUsize,
-}
-
-impl std::fmt::Display for UpdateStats {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        let num_skipped = self.num_skipped.load(Relaxed);
-        if num_skipped > 0 {
-            write!(f, "{} rows skipped", num_skipped)?;
-        }
-
-        let num_insertions = self.num_insertions.load(Relaxed);
-        let num_deletions = self.num_deletions.load(Relaxed);
-        let num_reprocesses = self.num_repreocesses.load(Relaxed);
-        let num_source_rows = num_insertions + num_deletions + num_reprocesses;
-        if num_source_rows > 0 {
-            if num_skipped > 0 {
-                write!(f, ", ")?;
-            }
-            write!(f, "{num_source_rows} source rows processed",)?;
-
-            let num_errors = self.num_errors.load(Relaxed);
-            if num_errors > 0 {
-                write!(f, " with {num_errors} ERRORS",)?;
-            }
-            write!(
-                f,
-                ": {num_insertions} added, {num_deletions} removed, {num_reprocesses} repocessed",
-            )?;
-        }
-        Ok(())
-    }
-}
-
-#[derive(Debug, Serialize)]
-pub struct SourceUpdateInfo {
-    pub source_name: String,
-    pub stats: UpdateStats,
-}
-
-impl std::fmt::Display for SourceUpdateInfo {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        write!(f, "{}: {}", self.source_name, self.stats)
-    }
-}
-
-#[derive(Debug, Serialize)]
-pub struct IndexUpdateInfo {
-    pub sources: Vec<SourceUpdateInfo>,
-}
-
-impl std::fmt::Display for IndexUpdateInfo {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        for source in self.sources.iter() {
-            writeln!(f, "{}", source)?;
-        }
-        Ok(())
-    }
-}
-
 pub fn extract_primary_key(
     primary_key_def: &AnalyzedPrimaryKeyDef,
     record: &FieldValues,
@@ -470,14 +402,14 @@ pub async fn evaluate_source_entry_with_memory(
     Ok(Some(output))
 }
 
-pub async fn update_source_entry(
+pub async fn update_source_row(
     plan: &ExecutionPlan,
     source_op: &AnalyzedSourceOp,
     schema: &schema::DataSchema,
     key: &value::KeyValue,
     only_for_deletion: bool,
     pool: &PgPool,
-    stats: &UpdateStats,
+    stats: &stats::UpdateStats,
 ) -> Result<()> {
     let source_key_json = serde_json::to_value(key)?;
     let process_timestamp = chrono::Utc::now();
@@ -617,85 +549,18 @@
     Ok(())
 }
 
-async fn update_source_entry_with_err_handling(
+pub(super) async fn update_source_row_with_err_handling(
     plan: &ExecutionPlan,
     source_op: &AnalyzedSourceOp,
     schema: &schema::DataSchema,
     key: &value::KeyValue,
     only_for_deletion: bool,
     pool: &PgPool,
-    stats: &UpdateStats,
+    stats: &stats::UpdateStats,
 ) {
-    let r = update_source_entry(plan, source_op, schema, key, only_for_deletion, pool, stats).await;
+    let r = update_source_row(plan, source_op, schema, key, only_for_deletion, pool, stats).await;
     if let Err(e) = r {
         stats.num_errors.fetch_add(1, Relaxed);
         error!("{:?}", e.context("Error in indexing a source row"));
     }
 }
-
-async fn update_source(
-    source_name: &str,
-    plan: &ExecutionPlan,
-    source_op: &AnalyzedSourceOp,
-    schema: &schema::DataSchema,
-    pool: &PgPool,
-) -> Result<SourceUpdateInfo> {
-    let existing_keys_json = db_tracking::list_source_tracking_keys(
-        source_op.source_id,
-        &plan.tracking_table_setup,
-        pool,
-    )
-    .await?;
-
-    let mut keys = Vec::new();
-    let mut rows_stream = source_op.executor.list(SourceExecutorListOptions {
-        include_ordinal: false,
-    });
-    while let Some(rows) = rows_stream.next().await {
-        keys.extend(rows?.into_iter().map(|row| row.key));
-    }
-
-    let stats = UpdateStats::default();
-    let upsert_futs = join_all(keys.iter().map(|key| {
-        update_source_entry_with_err_handling(plan, source_op, schema, key, false, pool, &stats)
-    }));
-    let deleted_keys = existing_keys_json
-        .into_iter()
-        .map(|existing_key_json| {
-            value::Value::<value::ScopeValue>::from_json(
-                existing_key_json.source_key,
-                &source_op.primary_key_type,
-            )?
-            .as_key()
-        })
-        .filter_ok(|existing_key| !keys.contains(existing_key))
-        .collect::<Result<Vec<_>>>()?;
-    let delete_futs = join_all(deleted_keys.iter().map(|key| {
-        update_source_entry_with_err_handling(plan, source_op, schema, key, true, pool, &stats)
-    }));
-    join(upsert_futs, delete_futs).await;
-
-    Ok(SourceUpdateInfo {
-        source_name: source_name.to_string(),
-        stats,
-    })
-}
-
-pub async fn update(
-    plan: &ExecutionPlan,
-    schema: &schema::DataSchema,
-    pool: &PgPool,
-) -> Result<IndexUpdateInfo> {
-    let source_update_stats = try_join_all(
-        plan.source_ops
-            .iter()
-            .map(|source_op| async move {
-                update_source(source_op.name.as_str(), plan, source_op, schema, pool).await
-            })
-            .collect::<Vec<_>>(),
-    )
-    .await?;
-    Ok(IndexUpdateInfo {
-        sources: source_update_stats,
-    })
-}
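Note the error-isolation pattern preserved in `update_source_row_with_err_handling`: a failure on one row is counted and logged instead of aborting the whole batch. Below is a minimal, self-contained sketch of that pattern; all names here are stand-ins for illustration, not the crate's real types.

```rust
use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};

// Hypothetical stand-in for the crate's UpdateStats.
#[derive(Default)]
struct UpdateStats {
    num_errors: AtomicUsize,
}

// Hypothetical stand-in for processing a single source row.
fn update_row(key: &str) -> Result<(), String> {
    if key.is_empty() {
        Err("row has an empty key".to_string())
    } else {
        Ok(())
    }
}

// Same shape as update_source_row_with_err_handling: a bad row is counted
// and logged so the rest of the batch keeps going.
fn update_row_with_err_handling(key: &str, stats: &UpdateStats) {
    if let Err(e) = update_row(key) {
        stats.num_errors.fetch_add(1, Relaxed);
        eprintln!("Error in indexing a source row: {e}");
    }
}

fn main() {
    let stats = UpdateStats::default();
    for key in ["a", "", "b"] {
        update_row_with_err_handling(key, &stats);
    }
    // Exactly one row (the empty key) failed; the other two were processed.
    assert_eq!(stats.num_errors.load(Relaxed), 1);
}
```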
diff --git a/src/execution/source_indexer.rs b/src/execution/source_indexer.rs
new file mode 100644
index 00000000..111b01a1
--- /dev/null
+++ b/src/execution/source_indexer.rs
@@ -0,0 +1,78 @@
+use crate::prelude::*;
+
+use super::{db_tracking, row_indexer, stats};
+use futures::future::{join, join_all, try_join_all};
+use sqlx::PgPool;
+
+async fn update_source(
+    source_name: &str,
+    plan: &plan::ExecutionPlan,
+    source_op: &plan::AnalyzedSourceOp,
+    schema: &schema::DataSchema,
+    pool: &PgPool,
+) -> Result<stats::SourceUpdateInfo> {
+    let existing_keys_json = db_tracking::list_source_tracking_keys(
+        source_op.source_id,
+        &plan.tracking_table_setup,
+        pool,
+    )
+    .await?;
+
+    let mut keys = Vec::new();
+    let mut rows_stream = source_op
+        .executor
+        .list(interface::SourceExecutorListOptions {
+            include_ordinal: false,
+        });
+    while let Some(rows) = rows_stream.next().await {
+        keys.extend(rows?.into_iter().map(|row| row.key));
+    }
+
+    let stats = stats::UpdateStats::default();
+    let upsert_futs = join_all(keys.iter().map(|key| {
+        row_indexer::update_source_row_with_err_handling(
+            plan, source_op, schema, key, false, pool, &stats,
+        )
+    }));
+    let deleted_keys = existing_keys_json
+        .into_iter()
+        .map(|existing_key_json| {
+            value::Value::<value::ScopeValue>::from_json(
+                existing_key_json.source_key,
+                &source_op.primary_key_type,
+            )?
+            .as_key()
+        })
+        .filter_ok(|existing_key| !keys.contains(existing_key))
+        .collect::<Result<Vec<_>>>()?;
+    let delete_futs = join_all(deleted_keys.iter().map(|key| {
+        row_indexer::update_source_row_with_err_handling(
+            plan, source_op, schema, key, true, pool, &stats,
+        )
+    }));
+    join(upsert_futs, delete_futs).await;
+
+    Ok(stats::SourceUpdateInfo {
+        source_name: source_name.to_string(),
+        stats,
+    })
+}
+
+pub async fn update(
+    plan: &plan::ExecutionPlan,
+    schema: &schema::DataSchema,
+    pool: &PgPool,
+) -> Result<stats::IndexUpdateInfo> {
+    let source_update_stats = try_join_all(
+        plan.source_ops
+            .iter()
+            .map(|source_op| async move {
+                update_source(source_op.name.as_str(), plan, source_op, schema, pool).await
+            })
+            .collect::<Vec<_>>(),
+    )
+    .await?;
+    Ok(stats::IndexUpdateInfo {
+        sources: source_update_stats,
+    })
+}
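The relocated `update_source` reconciles the source listing against the tracking table and then drives two batches of futures concurrently: upserts for keys the source still lists, and deletions for previously tracked keys that disappeared. A runnable sketch of that reconcile-then-join shape, assuming the `futures` and `tokio` crates are available; the row update here is a stand-in, not the crate's real function:

```rust
use futures::future::{join, join_all};

// Stand-in for row_indexer::update_source_row_with_err_handling.
async fn update_row(key: u32, for_deletion: bool) {
    println!("{} row {key}", if for_deletion { "delete" } else { "upsert" });
}

#[tokio::main]
async fn main() {
    let listed_keys = vec![1, 2, 3]; // keys the source currently reports
    let tracked_keys = vec![2, 3, 4]; // keys recorded in the tracking table

    // Tracked keys missing from the source listing become deletions.
    let deleted_keys: Vec<u32> = tracked_keys
        .into_iter()
        .filter(|k| !listed_keys.contains(k))
        .collect();

    // Each batch is fanned out with join_all, and the two batches
    // themselves run concurrently via join.
    let upserts = join_all(listed_keys.iter().map(|k| update_row(*k, false)));
    let deletes = join_all(deleted_keys.iter().map(|k| update_row(*k, true)));
    join(upserts, deletes).await;
}
```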
diff --git a/src/execution/stats.rs b/src/execution/stats.rs
new file mode 100644
index 00000000..eb978b2b
--- /dev/null
+++ b/src/execution/stats.rs
@@ -0,0 +1,68 @@
+use crate::prelude::*;
+
+use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};
+
+#[derive(Debug, Serialize, Default)]
+pub struct UpdateStats {
+    pub num_skipped: AtomicUsize,
+    pub num_insertions: AtomicUsize,
+    pub num_deletions: AtomicUsize,
+    pub num_repreocesses: AtomicUsize,
+    pub num_errors: AtomicUsize,
+}
+
+impl std::fmt::Display for UpdateStats {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let num_skipped = self.num_skipped.load(Relaxed);
+        if num_skipped > 0 {
+            write!(f, "{} rows skipped", num_skipped)?;
+        }
+
+        let num_insertions = self.num_insertions.load(Relaxed);
+        let num_deletions = self.num_deletions.load(Relaxed);
+        let num_reprocesses = self.num_repreocesses.load(Relaxed);
+        let num_source_rows = num_insertions + num_deletions + num_reprocesses;
+        if num_source_rows > 0 {
+            if num_skipped > 0 {
+                write!(f, ", ")?;
+            }
+            write!(f, "{num_source_rows} source rows processed",)?;
+
+            let num_errors = self.num_errors.load(Relaxed);
+            if num_errors > 0 {
+                write!(f, " with {num_errors} ERRORS",)?;
+            }
+            write!(
+                f,
+                ": {num_insertions} added, {num_deletions} removed, {num_reprocesses} reprocessed",
+            )?;
+        }
+        Ok(())
+    }
+}
+
+#[derive(Debug, Serialize)]
+pub struct SourceUpdateInfo {
+    pub source_name: String,
+    pub stats: UpdateStats,
+}
+
+impl std::fmt::Display for SourceUpdateInfo {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}: {}", self.source_name, self.stats)
+    }
+}
+
+#[derive(Debug, Serialize)]
+pub struct IndexUpdateInfo {
+    pub sources: Vec<SourceUpdateInfo>,
+}
+
+impl std::fmt::Display for IndexUpdateInfo {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        for source in self.sources.iter() {
+            writeln!(f, "{}", source)?;
+        }
+        Ok(())
+    }
+}
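The counters in `UpdateStats` are plain `AtomicUsize`s updated with `Relaxed` ordering: many row tasks bump them concurrently, and the totals are only read once the batch has finished, so no stronger ordering (or a mutex) is needed. A standalone sketch of the same pattern, using threads in place of async tasks:

```rust
use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};
use std::sync::Arc;
use std::thread;

fn main() {
    // Shared counter, analogous to one field of UpdateStats.
    let num_insertions = Arc::new(AtomicUsize::new(0));
    let handles: Vec<_> = (0..4)
        .map(|_| {
            let n = Arc::clone(&num_insertions);
            // Each "row task" records its outcome without locking.
            thread::spawn(move || n.fetch_add(1, Relaxed))
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    // The total is read only after every task has finished. With 4
    // insertions, 1 deletion, and 0 reprocesses, the Display impl above
    // would render: "5 source rows processed: 4 added, 1 removed,
    // 0 reprocessed".
    assert_eq!(num_insertions.load(Relaxed), 4);
}
```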
diff --git a/src/ops/sdk.rs b/src/ops/sdk.rs
index fe7c7d8b..0938b0f5 100644
--- a/src/ops/sdk.rs
+++ b/src/ops/sdk.rs
@@ -9,7 +9,6 @@ pub use super::interface::*;
 pub use crate::base::schema::*;
 pub use crate::base::spec::*;
 pub use crate::base::value::*;
-pub use serde::Deserialize;
 
 /// Defined for all types convertible to ValueType, to ease creation for ValueType in various operation factories.
 pub trait TypeCore {
diff --git a/src/prelude.rs b/src/prelude.rs
index 88613106..60984ee1 100644
--- a/src/prelude.rs
+++ b/src/prelude.rs
@@ -2,4 +2,12 @@ pub use anyhow::Result;
 pub use async_trait::async_trait;
 pub use futures::{future::BoxFuture, prelude::*, stream::BoxStream};
 pub use futures::{FutureExt, StreamExt};
+pub use itertools::Itertools;
+pub use serde::{Deserialize, Serialize};
 pub use std::sync::Arc;
+
+pub use crate::base::{schema, spec, value};
+pub use crate::builder::plan;
+pub use crate::ops::interface;
+pub use crate::service::error::ApiError;
+pub use crate::{api_bail, api_error};
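The expanded `prelude.rs` is what lets the new modules open with a single `use crate::prelude::*;` instead of a long block of use items. A minimal sketch of that prelude pattern; the module contents here are illustrative stand-ins:

```rust
// One module re-exports the names nearly every file needs.
mod prelude {
    pub use std::sync::Arc;
    // In the real crate this also re-exports anyhow::Result, the serde
    // traits, itertools::Itertools, and the schema/spec/value/plan modules.
}

mod consumer {
    // Call sites pull in the whole working set with one line.
    use crate::prelude::*;

    pub fn shared_counter() -> Arc<u32> {
        Arc::new(42)
    }
}

fn main() {
    println!("{}", consumer::shared_counter());
}
```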
diff --git a/src/py/mod.rs b/src/py/mod.rs
index ccb61e9f..61b5836c 100644
--- a/src/py/mod.rs
+++ b/src/py/mod.rs
@@ -77,7 +77,7 @@ fn register_function_factory(name: String, py_function_factory: Py<PyAny>) -> PyResult<()> {
 }
 
 #[pyclass]
-pub struct IndexUpdateInfo(pub execution::indexer::IndexUpdateInfo);
+pub struct IndexUpdateInfo(pub execution::stats::IndexUpdateInfo);
 
 #[pymethods]
 impl IndexUpdateInfo {
@@ -115,8 +115,12 @@ impl Flow {
             .runtime
             .block_on(async {
                 let exec_plan = self.0.get_execution_plan().await?;
-                execution::indexer::update(&exec_plan, &self.0.data_schema, &lib_context.pool)
-                    .await
+                execution::source_indexer::update(
+                    &exec_plan,
+                    &self.0.data_schema,
+                    &lib_context.pool,
+                )
+                .await
             })
             .into_py_result()?;
         Ok(IndexUpdateInfo(update_info))
diff --git a/src/service/flows.rs b/src/service/flows.rs
index bebb1033..47689493 100644
--- a/src/service/flows.rs
+++ b/src/service/flows.rs
@@ -1,27 +1,17 @@
-use std::sync::Arc;
+use crate::prelude::*;
 
-use anyhow::Result;
+use crate::{base::schema::DataSchema, ops::interface::SourceExecutorListOptions};
+use crate::{
+    execution::memoization,
+    execution::{row_indexer, stats},
+};
+use crate::{execution::source_indexer, lib_context::LibContext};
 use axum::{
     extract::{Path, State},
     http::StatusCode,
     Json,
 };
 use axum_extra::extract::Query;
-use futures::StreamExt;
-use serde::{Deserialize, Serialize};
-
-use super::error::ApiError;
-use crate::lib_context::LibContext;
-use crate::{
-    api_bail, api_error,
-    base::{schema, spec},
-    execution::indexer,
-    execution::memoization,
-};
-use crate::{
-    base::{schema::DataSchema, value},
-    ops::interface::SourceExecutorListOptions,
-};
 
 pub async fn list_flows(
     State(lib_context): State<Arc<LibContext>>,
@@ -155,7 +145,7 @@
         .ok_or_else(|| api_error!("field {} does not have a key", query.field))?;
     let key = value::KeyValue::from_strs(query.key, &key_field.value_type.typ)?;
 
-    let value_builder = indexer::evaluate_source_entry_with_memory(
+    let value_builder = row_indexer::evaluate_source_entry_with_memory(
         &plan,
         source_op,
         schema,
@@ -178,9 +168,10 @@
 pub async fn update(
     Path(flow_name): Path<String>,
     State(lib_context): State<Arc<LibContext>>,
-) -> Result<Json<indexer::IndexUpdateInfo>, ApiError> {
+) -> Result<Json<stats::IndexUpdateInfo>, ApiError> {
     let fl = &lib_context.with_flow_context(&flow_name, |ctx| ctx.flow.clone())?;
     let execution_plan = fl.get_execution_plan().await?;
-    let update_info = indexer::update(&execution_plan, &fl.data_schema, &lib_context.pool).await?;
+    let update_info =
+        source_indexer::update(&execution_plan, &fl.data_schema, &lib_context.pool).await?;
     Ok(Json(update_info))
 }
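After the refactor the `update` endpoint stays a thin wrapper: resolve the flow, build the execution plan, run `source_indexer::update`, and serialize the returned `stats::IndexUpdateInfo` as JSON. A trimmed-down, self-contained analogue of that handler shape, assuming axum 0.7 with the tokio runtime; the payload type and route here are hypothetical stand-ins, not the crate's real API:

```rust
use axum::{extract::Path, routing::post, Json, Router};
use serde::Serialize;

// Hypothetical stand-in for execution::stats::IndexUpdateInfo.
#[derive(Serialize)]
struct IndexUpdateInfo {
    sources: Vec<String>,
}

// Mirrors the handler shape: path parameter in, JSON stats out. The real
// handler also consults LibContext and can fail with ApiError.
async fn update(Path(flow_name): Path<String>) -> Json<IndexUpdateInfo> {
    Json(IndexUpdateInfo {
        sources: vec![flow_name],
    })
}

#[tokio::main]
async fn main() {
    let app = Router::new().route("/flows/:flow_name/update", post(update));
    let listener = tokio::net::TcpListener::bind("127.0.0.1:3000").await.unwrap();
    axum::serve(listener, app).await.unwrap();
}
```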