From 4a645e3788819670ac058ea8afad2c270b3cf2de Mon Sep 17 00:00:00 2001 From: LJ Date: Sat, 22 Mar 2025 08:40:02 -0700 Subject: [PATCH 1/8] Expose `ScopeValueBuilder` from `evaluate_source_entry_with_cache()`. --- src/execution/indexer.rs | 4 ++-- src/service/flows.rs | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/execution/indexer.rs b/src/execution/indexer.rs index a18ffdf9..05a39f67 100644 --- a/src/execution/indexer.rs +++ b/src/execution/indexer.rs @@ -423,7 +423,7 @@ pub async fn evaluate_source_entry_with_cache( schema: &schema::DataSchema, key: &value::KeyValue, pool: &PgPool, -) -> Result> { +) -> Result> { let source_key_json = serde_json::to_value(key)?; let existing_tracking_info = read_source_tracking_info( source_op.source_id, @@ -440,7 +440,7 @@ pub async fn evaluate_source_entry_with_cache( EvaluationCache::new(process_timestamp, memoization_info.map(|info| info.cache)); let data_builder = evaluate_source_entry(plan, source_op, schema, key, Some(&evaluation_cache)).await?; - Ok(data_builder.map(|builder| builder.into())) + Ok(data_builder) } pub async fn update_source_entry( diff --git a/src/service/flows.rs b/src/service/flows.rs index b665c5b0..977c4748 100644 --- a/src/service/flows.rs +++ b/src/service/flows.rs @@ -145,7 +145,7 @@ pub async fn evaluate_data( .ok_or_else(|| api_error!("field {} does not have a key", query.field))?; let key = value::KeyValue::from_strs(query.key, &key_field.value_type.typ)?; - let data = indexer::evaluate_source_entry_with_cache( + let value_builder = indexer::evaluate_source_entry_with_cache( &plan, source_op, schema, @@ -157,7 +157,7 @@ pub async fn evaluate_data( Ok(Json(EvaluateDataResponse { schema: schema.clone(), - data, + data: value_builder.into(), })) } From 0730083e80835ba3fd5235e993c709b4eff85db3 Mon Sep 17 00:00:00 2001 From: LJ Date: Sat, 22 Mar 2025 09:16:11 -0700 Subject: [PATCH 2/8] Take cache enablement as an option in `evalute_source_entry_with_cache` --- src/execution/indexer.rs | 45 +++++++++++++++++++++++++--------------- src/service/flows.rs | 9 ++++---- 2 files changed, 32 insertions(+), 22 deletions(-) diff --git a/src/execution/indexer.rs b/src/execution/indexer.rs index 05a39f67..ebba37d9 100644 --- a/src/execution/indexer.rs +++ b/src/execution/indexer.rs @@ -417,29 +417,40 @@ async fn commit_source_tracking_info( Ok(WithApplyStatus::Normal(())) } +pub enum EvaluationCacheOption<'a> { + NoCache, + UseCache(&'a PgPool), +} + pub async fn evaluate_source_entry_with_cache( plan: &ExecutionPlan, source_op: &AnalyzedSourceOp, schema: &schema::DataSchema, key: &value::KeyValue, - pool: &PgPool, + cache_option: EvaluationCacheOption<'_>, ) -> Result> { - let source_key_json = serde_json::to_value(key)?; - let existing_tracking_info = read_source_tracking_info( - source_op.source_id, - &source_key_json, - &plan.tracking_table_setup, - pool, - ) - .await?; - let process_timestamp = chrono::Utc::now(); - let memoization_info = existing_tracking_info - .and_then(|info| info.memoization_info.map(|info| info.0)) - .flatten(); - let evaluation_cache = - EvaluationCache::new(process_timestamp, memoization_info.map(|info| info.cache)); - let data_builder = - evaluate_source_entry(plan, source_op, schema, key, Some(&evaluation_cache)).await?; + let cache = match cache_option { + EvaluationCacheOption::NoCache => None, + EvaluationCacheOption::UseCache(pool) => { + let source_key_json = serde_json::to_value(key)?; + let existing_tracking_info = read_source_tracking_info( + source_op.source_id, + 
&source_key_json, + &plan.tracking_table_setup, + pool, + ) + .await?; + let process_timestamp = chrono::Utc::now(); + let memoization_info = existing_tracking_info + .and_then(|info| info.memoization_info.map(|info| info.0)) + .flatten(); + Some(EvaluationCache::new( + process_timestamp, + memoization_info.map(|info| info.cache), + )) + } + }; + let data_builder = evaluate_source_entry(plan, source_op, schema, key, cache.as_ref()).await?; Ok(data_builder) } diff --git a/src/service/flows.rs b/src/service/flows.rs index 977c4748..fc055b78 100644 --- a/src/service/flows.rs +++ b/src/service/flows.rs @@ -10,14 +10,13 @@ use axum_extra::extract::Query; use serde::{Deserialize, Serialize}; use super::error::ApiError; +use crate::base::{schema::DataSchema, value}; +use crate::lib_context::LibContext; use crate::{ api_bail, api_error, base::{schema, spec}, execution::indexer, }; -use crate::{execution::indexer::IndexUpdateInfo, lib_context::LibContext}; - -use crate::base::{schema::DataSchema, value}; pub async fn list_flows( State(lib_context): State>, @@ -150,7 +149,7 @@ pub async fn evaluate_data( source_op, schema, &key, - &lib_context.pool, + indexer::EvaluationCacheOption::UseCache(&lib_context.pool), ) .await? .ok_or_else(|| api_error!("value not found for source at the specified key: {key:?}"))?; @@ -164,7 +163,7 @@ pub async fn evaluate_data( pub async fn update( Path(flow_name): Path, State(lib_context): State>, -) -> Result, ApiError> { +) -> Result, ApiError> { let fl = &lib_context.with_flow_context(&flow_name, |ctx| ctx.flow.clone())?; let execution_plan = fl.get_execution_plan().await?; let update_info = indexer::update(&execution_plan, &fl.data_schema, &lib_context.pool).await?; From cbf40d9227d541c34cfece12d3c067e87b807f75 Mon Sep 17 00:00:00 2001 From: LJ Date: Sat, 22 Mar 2025 09:26:52 -0700 Subject: [PATCH 3/8] Add `to_strs()` for `KeyValue`. 
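`to_strs()` is intended as the inverse of the existing `from_strs()`: it
flattens a key back into the string parts that `from_strs()` accepts (one
part per scalar, two for a range, recursively for struct keys). A rough
sketch of the intended round-trip, assuming `KeyValue` and the key field's
`ValueType` are in scope and that `from_strs()` accepts the `Vec<String>`
produced here (helper name and module paths are illustrative):

    // Hypothetical helper: round-trips a key through its string form.
    fn assert_key_roundtrip(
        key: &value::KeyValue,
        key_type: &schema::ValueType,
    ) -> anyhow::Result<()> {
        let parts: Vec<String> = key.to_strs(); // e.g. ["intro.md"] for a Str key
        let parsed = value::KeyValue::from_strs(parts, key_type)?;
        anyhow::ensure!(&parsed == key, "key round-trip mismatch");
        Ok(())
    }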
--- src/base/value.rs | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/src/base/value.rs b/src/base/value.rs index 9eae89aa..9ce6c08f 100644 --- a/src/base/value.rs +++ b/src/base/value.rs @@ -195,6 +195,24 @@ impl KeyValue { Ok(result) } + fn parts_to_strs(&self, output: &mut Vec) { + match self { + KeyValue::Bytes(v) => output.push(BASE64_STANDARD.encode(v)), + KeyValue::Str(v) => output.push(v.to_string()), + KeyValue::Bool(v) => output.push(v.to_string()), + KeyValue::Int64(v) => output.push(v.to_string()), + KeyValue::Range(v) => { + output.push(v.start.to_string()); + output.push(v.end.to_string()); + } + KeyValue::Struct(v) => { + for part in v { + part.parts_to_strs(output); + } + } + } + } + pub fn from_strs(value: impl IntoIterator, schema: &ValueType) -> Result { let mut values_iter = value.into_iter(); let result = Self::parts_from_str(&mut values_iter, schema)?; @@ -204,6 +222,12 @@ impl KeyValue { Ok(result) } + pub fn to_strs(&self) -> Vec { + let mut output = Vec::with_capacity(self.num_parts()); + self.parts_to_strs(&mut output); + output + } + pub fn kind_str(&self) -> &'static str { match self { KeyValue::Bytes(_) => "bytes", @@ -256,6 +280,14 @@ impl KeyValue { _ => anyhow::bail!("expected struct value, but got {}", self.kind_str()), } } + + pub fn num_parts(&self) -> usize { + match self { + KeyValue::Range(_) => 2, + KeyValue::Struct(v) => v.iter().map(|v| v.num_parts()).sum(), + _ => 1, + } + } } #[derive(Debug, Clone)] From 342d4bcd0f1365852be13cf26f5f276a4362492b Mon Sep 17 00:00:00 2001 From: LJ Date: Sat, 22 Mar 2025 11:24:44 -0700 Subject: [PATCH 4/8] Evaluate and write YTML files for exported data. --- Cargo.toml | 1 + src/base/value.rs | 14 +-- src/builder/plan.rs | 1 + src/execution/dumper.rs | 233 +++++++++++++++++++++++++++++++++++++++ src/execution/indexer.rs | 4 +- src/execution/mod.rs | 1 + 6 files changed, 245 insertions(+), 9 deletions(-) create mode 100644 src/execution/dumper.rs diff --git a/Cargo.toml b/Cargo.toml index e9dac76a..2f59b9af 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -88,3 +88,4 @@ yup-oauth2 = "12.1.0" rustls = { version = "0.23.25" } http-body-util = "0.1.3" yaml-rust2 = "0.10.0" +urlencoding = "2.1.3" diff --git a/src/base/value.rs b/src/base/value.rs index 9ce6c08f..a465bd07 100644 --- a/src/base/value.rs +++ b/src/base/value.rs @@ -909,7 +909,7 @@ impl Serialize for TypedValue<'_> { (_, Value::Null) => serializer.serialize_none(), (ValueType::Basic(_), v) => v.serialize(serializer), (ValueType::Struct(s), Value::Struct(field_values)) => TypedFieldsValue { - schema: s, + schema: &s.fields, values_iter: field_values.fields.iter(), } .serialize(serializer), @@ -917,7 +917,7 @@ impl Serialize for TypedValue<'_> { let mut seq = serializer.serialize_seq(Some(rows.len()))?; for row in rows { seq.serialize_element(&TypedFieldsValue { - schema: &c.row, + schema: &c.row.fields, values_iter: row.fields.iter(), })?; } @@ -927,7 +927,7 @@ impl Serialize for TypedValue<'_> { let mut seq = serializer.serialize_seq(Some(rows.len()))?; for (k, v) in rows { seq.serialize_element(&TypedFieldsValue { - schema: &c.row, + schema: &c.row.fields, values_iter: std::iter::once(&Value::from(k.clone())) .chain(v.fields.iter()), })?; @@ -943,15 +943,15 @@ impl Serialize for TypedValue<'_> { } pub struct TypedFieldsValue<'a, I: Iterator + Clone> { - schema: &'a StructSchema, - values_iter: I, + pub schema: &'a [FieldSchema], + pub values_iter: I, } impl<'a, I: Iterator + Clone> Serialize for TypedFieldsValue<'a, I> { fn 
serialize(&self, serializer: S) -> Result { - let mut map = serializer.serialize_map(Some(self.schema.fields.len()))?; + let mut map = serializer.serialize_map(Some(self.schema.len()))?; let values_iter = self.values_iter.clone(); - for (field, value) in self.schema.fields.iter().zip(values_iter) { + for (field, value) in self.schema.iter().zip(values_iter) { map.serialize_entry( &field.name, &TypedValue { diff --git a/src/builder/plan.rs b/src/builder/plan.rs index 2334bf24..de45cfa7 100644 --- a/src/builder/plan.rs +++ b/src/builder/plan.rs @@ -104,6 +104,7 @@ pub struct AnalyzedExportOp { pub query_target: Option>, pub primary_key_def: AnalyzedPrimaryKeyDef, pub primary_key_type: ValueType, + /// idx for value fields - excluding the primary key field. pub value_fields: Vec, } diff --git a/src/execution/dumper.rs b/src/execution/dumper.rs new file mode 100644 index 00000000..ae09953a --- /dev/null +++ b/src/execution/dumper.rs @@ -0,0 +1,233 @@ +use anyhow::Result; +use futures::future::try_join_all; +use indexmap::IndexMap; +use itertools::Itertools; +use serde::ser::SerializeSeq; +use serde::{Deserialize, Serialize}; +use sqlx::PgPool; +use std::collections::BTreeMap; +use std::path::{Path, PathBuf}; +use yaml_rust2::YamlEmitter; + +use super::indexer; +use crate::base::{schema, value}; +use crate::builder::plan::{AnalyzedExportOp, AnalyzedSourceOp, ExecutionPlan}; +use crate::utils::yaml_ser::YamlSerializer; + +#[derive(Debug, Clone, Deserialize)] +pub struct DumpEvaluationOutputOptions { + pub output_dir: String, + pub use_cache: bool, +} + +const FILENAME_PREFIX_MAX_LENGTH: usize = 128; + +struct TargetExportData<'a> { + schema: &'a Vec, + data: BTreeMap, +} + +impl<'a> Serialize for TargetExportData<'a> { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + let mut seq = serializer.serialize_seq(Some(self.data.len()))?; + for (_, values) in self.data.iter() { + seq.serialize_element(&value::TypedFieldsValue { + schema: self.schema, + values_iter: values.fields.iter(), + })?; + } + seq.end() + } +} + +#[derive(Serialize)] +struct SourceOutputData<'a> { + key: value::TypedValue<'a>, + exports: Option>>, + error: Option, +} + +struct Dumper<'a> { + plan: &'a ExecutionPlan, + schema: &'a schema::DataSchema, + pool: &'a PgPool, + options: DumpEvaluationOutputOptions, +} + +impl<'a> Dumper<'a> { + async fn evaluate_source_entry<'b>( + &'a self, + source_op: &'a AnalyzedSourceOp, + key: &value::KeyValue, + collected_values_buffer: &'b mut Vec>, + ) -> Result>>> + where + 'a: 'b, + { + let cache_option = if self.options.use_cache { + indexer::EvaluationCacheOption::UseCache(self.pool) + } else { + indexer::EvaluationCacheOption::NoCache + }; + + let data_builder = indexer::evaluate_source_entry_with_cache( + self.plan, + source_op, + self.schema, + key, + cache_option, + ) + .await?; + + let data_builder = if let Some(data_builder) = data_builder { + data_builder + } else { + return Ok(None); + }; + + *collected_values_buffer = data_builder + .collected_values + .into_iter() + .map(|v| v.into_inner().unwrap()) + .collect(); + let exports = self + .plan + .export_ops + .iter() + .map(|export_op| -> Result<_> { + let collector_idx = export_op.input.collector_idx as usize; + let entry = ( + export_op.name.as_str(), + TargetExportData { + schema: &self.schema.collectors[collector_idx].spec.fields, + data: collected_values_buffer + .iter() + .map(|v| -> Result<_> { + let key = indexer::extract_primary_key( + &export_op.primary_key_def, + &v[collector_idx], + )?; 
+ Ok((key, &v[collector_idx])) + }) + .collect::>()?, + }, + ); + Ok(entry) + }) + .collect::>()?; + Ok(Some(exports)) + } + + async fn evaluate_and_dump_source_entry( + &self, + source_op: &AnalyzedSourceOp, + key: value::KeyValue, + file_name: PathBuf, + ) -> Result<()> { + let mut collected_values_buffer = Vec::new(); + let (exports, error) = match self + .evaluate_source_entry(source_op, &key, &mut collected_values_buffer) + .await + { + Ok(exports) => (exports, None), + Err(e) => (None, Some(format!("{e:?}"))), + }; + let key_value = value::Value::from(key); + let file_data = SourceOutputData { + key: value::TypedValue { + t: &self.schema.fields[source_op.output.field_idx as usize] + .value_type + .typ, + v: &key_value, + }, + exports, + error, + }; + let yaml_output = { + let mut yaml_output = String::new(); + let yaml_data = YamlSerializer::serialize(&file_data)?; + let mut yaml_emitter = YamlEmitter::new(&mut yaml_output); + yaml_emitter.dump(&yaml_data)?; + yaml_output + }; + let mut file_path = file_name; + file_path.push(".yaml"); + tokio::fs::write(file_path, yaml_output).await?; + Ok(()) + } + + async fn evaluate_and_dump_for_source_op(&self, source_op: &AnalyzedSourceOp) -> Result<()> { + let all_keys = source_op.executor.list_keys().await?; + + let mut keys_by_filename_prefix: IndexMap> = IndexMap::new(); + for key in all_keys { + let mut s = key + .to_strs() + .into_iter() + .map(|s| urlencoding::encode(&s).into_owned()) + .join(":"); + s.truncate( + (0..FILENAME_PREFIX_MAX_LENGTH) + .rev() + .find(|i| s.is_char_boundary(*i)) + .unwrap_or(0), + ); + keys_by_filename_prefix.entry(s).or_default().push(key); + } + + let evaluate_futs = + keys_by_filename_prefix + .into_iter() + .flat_map(|(filename_prefix, keys)| { + let num_keys = keys.len(); + keys.into_iter().enumerate().map(move |(i, key)| { + let mut file_path = + Path::new(&self.options.output_dir).join(Path::new(&filename_prefix)); + if num_keys > 1 { + file_path.push(format!(".{}", i)); + } + self.evaluate_and_dump_source_entry(source_op, key, file_path) + }) + }); + try_join_all(evaluate_futs).await?; + Ok(()) + } + + async fn evaluate_and_dump(&self) -> Result<()> { + try_join_all( + self.plan + .source_ops + .iter() + .map(|source_op| self.evaluate_and_dump_for_source_op(source_op)), + ) + .await?; + Ok(()) + } +} + +pub async fn dump_evaluation_output( + plan: &ExecutionPlan, + schema: &schema::DataSchema, + options: DumpEvaluationOutputOptions, + pool: &PgPool, +) -> Result<()> { + let output_dir = Path::new(&options.output_dir); + if output_dir.exists() { + if !output_dir.is_dir() { + return Err(anyhow::anyhow!("The path exists and is not a directory")); + } + } else { + tokio::fs::create_dir(output_dir).await?; + } + + let dumper = Dumper { + plan, + schema, + pool, + options, + }; + dumper.evaluate_and_dump().await +} diff --git a/src/execution/indexer.rs b/src/execution/indexer.rs index ebba37d9..525a42ed 100644 --- a/src/execution/indexer.rs +++ b/src/execution/indexer.rs @@ -73,7 +73,7 @@ impl std::fmt::Display for IndexUpdateInfo { } } -fn make_primary_key( +pub fn extract_primary_key( primary_key_def: &AnalyzedPrimaryKeyDef, record: &FieldValues, ) -> Result { @@ -215,7 +215,7 @@ async fn precommit_source_tracking_info( .or_default(); let mut keys_info = Vec::new(); for value in collected_values.iter() { - let primary_key = make_primary_key(&export_op.primary_key_def, value)?; + let primary_key = extract_primary_key(&export_op.primary_key_def, value)?; let primary_key_json = 
serde_json::to_value(&primary_key)?; let mut field_values = FieldValues { diff --git a/src/execution/mod.rs b/src/execution/mod.rs index 277d718a..463cc43e 100644 --- a/src/execution/mod.rs +++ b/src/execution/mod.rs @@ -1,3 +1,4 @@ +pub mod dumper; pub mod evaluator; pub mod indexer; pub mod query; From 98a00b4c3c616c46590f8cf7cc59fe33d3d21f61 Mon Sep 17 00:00:00 2001 From: LJ Date: Sat, 22 Mar 2025 16:38:06 -0700 Subject: [PATCH 5/8] Expose `evaluate_and_dump()` method to Python. --- src/execution/dumper.rs | 4 ++-- src/py/mod.rs | 25 +++++++++++++++++++++++++ 2 files changed, 27 insertions(+), 2 deletions(-) diff --git a/src/execution/dumper.rs b/src/execution/dumper.rs index ae09953a..ef02fa51 100644 --- a/src/execution/dumper.rs +++ b/src/execution/dumper.rs @@ -11,7 +11,7 @@ use yaml_rust2::YamlEmitter; use super::indexer; use crate::base::{schema, value}; -use crate::builder::plan::{AnalyzedExportOp, AnalyzedSourceOp, ExecutionPlan}; +use crate::builder::plan::{AnalyzedSourceOp, ExecutionPlan}; use crate::utils::yaml_ser::YamlSerializer; #[derive(Debug, Clone, Deserialize)] @@ -208,7 +208,7 @@ impl<'a> Dumper<'a> { } } -pub async fn dump_evaluation_output( +pub async fn evaluate_and_dump( plan: &ExecutionPlan, schema: &schema::DataSchema, options: DumpEvaluationOutputOptions, diff --git a/src/py/mod.rs b/src/py/mod.rs index 61a235d5..a382022b 100644 --- a/src/py/mod.rs +++ b/src/py/mod.rs @@ -118,6 +118,31 @@ impl Flow { Ok(IndexUpdateInfo(update_info)) }) } + + pub fn evaluate_and_dump( + &self, + py: Python<'_>, + options: Pythonized, + ) -> PyResult<()> { + py.allow_threads(|| { + let lib_context = get_lib_context() + .ok_or_else(|| PyException::new_err("cocoindex library not initialized"))?; + lib_context + .runtime + .block_on(async { + let exec_plan = self.0.get_execution_plan().await?; + execution::dumper::evaluate_and_dump( + &exec_plan, + &self.0.data_schema, + options.into_inner(), + &lib_context.pool, + ) + .await + }) + .into_py_result()?; + Ok(()) + }) + } } #[pyclass] From 21c3a03f7165f4afa8300af8493f4ee70a053c1d Mon Sep 17 00:00:00 2001 From: LJ Date: Sat, 22 Mar 2025 16:46:17 -0700 Subject: [PATCH 6/8] Put source name into filename. 
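The filename prefix budget now subtracts the source name's length so the
combined name stays under FILENAME_PREFIX_MAX_LENGTH. A minimal sketch of
the char-boundary-safe truncation this relies on (free-standing helper for
illustration only; the real code does this inline on the joined key string):

    /// Truncate `s` to at most `budget` bytes without splitting a UTF-8
    /// character; falls back to the empty string if no boundary fits.
    fn truncate_to_char_boundary(s: &mut String, budget: usize) {
        let cut = (0..=budget.min(s.len()))
            .rev()
            .find(|i| s.is_char_boundary(*i))
            .unwrap_or(0);
        s.truncate(cut);
    }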
--- src/execution/dumper.rs | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/execution/dumper.rs b/src/execution/dumper.rs index ef02fa51..533f7a7b 100644 --- a/src/execution/dumper.rs +++ b/src/execution/dumper.rs @@ -170,7 +170,7 @@ impl<'a> Dumper<'a> { .map(|s| urlencoding::encode(&s).into_owned()) .join(":"); s.truncate( - (0..FILENAME_PREFIX_MAX_LENGTH) + (0..(FILENAME_PREFIX_MAX_LENGTH - source_op.name.as_str().len())) .rev() .find(|i| s.is_char_boundary(*i)) .unwrap_or(0), @@ -178,14 +178,18 @@ impl<'a> Dumper<'a> { keys_by_filename_prefix.entry(s).or_default().push(key); } + let mut file_path_base = + PathBuf::from(&self.options.output_dir).join(source_op.name.as_str()); + file_path_base.push(":"); let evaluate_futs = keys_by_filename_prefix .into_iter() .flat_map(|(filename_prefix, keys)| { let num_keys = keys.len(); + let file_path_base = &file_path_base; keys.into_iter().enumerate().map(move |(i, key)| { - let mut file_path = - Path::new(&self.options.output_dir).join(Path::new(&filename_prefix)); + let mut file_path = file_path_base.clone(); + file_path.push(&filename_prefix); if num_keys > 1 { file_path.push(format!(".{}", i)); } From 93e640b580135a8629fed489e131cd670c6aba1c Mon Sep 17 00:00:00 2001 From: LJ Date: Sat, 22 Mar 2025 17:17:19 -0700 Subject: [PATCH 7/8] Expose eval functionality by the CLI. --- python/cocoindex/cli.py | 19 +++++++++++++++++++ python/cocoindex/flow.py | 38 +++++++++++++++++++++++++++++--------- src/execution/dumper.rs | 6 +++--- src/py/mod.rs | 6 +++++- 4 files changed, 56 insertions(+), 13 deletions(-) diff --git a/python/cocoindex/cli.py b/python/cocoindex/cli.py index a50cd6c0..c70e8948 100644 --- a/python/cocoindex/cli.py +++ b/python/cocoindex/cli.py @@ -1,4 +1,5 @@ import click +import datetime from . import flow, lib from .setup import check_setup_status, CheckSetupStatusOptions, apply_setup_changes @@ -52,6 +53,24 @@ def update(flow_name: str | None): stats = _flow_by_name(flow_name).update() print(stats) +@cli.command() +@click.argument("flow_name", type=str, required=False) +@click.option( + "-o", "--output-dir", type=str, required=False, + help="The directory to dump the evaluation output to.") +@click.option( + "-c", "--use-cache", is_flag=True, show_default=True, default=True, + help="Use cached evaluation results if available.") +def evaluate(flow_name: str | None, output_dir: str | None, use_cache: bool = True): + """ + Evaluate and dump the flow. + """ + fl = _flow_by_name(flow_name) + if output_dir is None: + output_dir = f"eval_{fl.name}_{datetime.datetime.now().strftime('%y%m%d_%H%M%S')}" + options = flow.EvaluateAndDumpOptions(output_dir=output_dir, use_cache=use_cache) + fl.evaluate_and_dump(options) + _default_server_settings = lib.ServerSettings.from_env() @cli.command() diff --git a/python/cocoindex/flow.py b/python/cocoindex/flow.py index 163605eb..5af0a89d 100644 --- a/python/cocoindex/flow.py +++ b/python/cocoindex/flow.py @@ -9,6 +9,7 @@ from typing import Any, Callable, Sequence, TypeVar, get_origin from threading import Lock from enum import Enum +from dataclasses import dataclass from . import _engine from . import vector @@ -61,18 +62,18 @@ def _create_data_slice( def _spec_kind(spec: Any) -> str: return spec.__class__.__name__ -def _spec_value_dump(v: Any) -> Any: - """Recursively dump a spec object and its nested attributes to a dictionary.""" +def _dump_engine_object(v: Any) -> Any: + """Recursively dump an object for engine. 
Engine side uses `Pythonzized` to catch.""" if isinstance(v, type) or get_origin(v) is not None: return encode_enriched_type(v) elif isinstance(v, Enum): return v.value elif hasattr(v, '__dict__'): - return {k: _spec_value_dump(v) for k, v in v.__dict__.items()} + return {k: _dump_engine_object(v) for k, v in v.__dict__.items()} elif isinstance(v, (list, tuple)): - return [_spec_value_dump(item) for item in v] + return [_dump_engine_object(item) for item in v] elif isinstance(v, dict): - return {k: _spec_value_dump(v) for k, v in v.items()} + return {k: _dump_engine_object(v) for k, v in v.items()} return v T = TypeVar('T') @@ -177,7 +178,7 @@ def transform(self, fn_spec: op.FunctionSpec, *args, **kwargs) -> DataSlice: lambda target_scope, name: flow_builder_state.engine_flow_builder.transform( _spec_kind(fn_spec), - _spec_value_dump(fn_spec), + _dump_engine_object(fn_spec), transform_args, target_scope, flow_builder_state.field_name_builder.build_name( @@ -267,7 +268,7 @@ def export(self, name: str, target_spec: op.StorageSpec, /, *, {"field_name": field_name, "metric": metric.value} for field_name, metric in vector_index] self._flow_builder_state.engine_flow_builder.export( - name, _spec_kind(target_spec), _spec_value_dump(target_spec), + name, _spec_kind(target_spec), _dump_engine_object(target_spec), index_options, self._engine_data_collector) @@ -316,13 +317,20 @@ def add_source(self, spec: op.SourceSpec, /, name: str | None = None) -> DataSli self._state, lambda target_scope, name: self._state.engine_flow_builder.add_source( _spec_kind(spec), - _spec_value_dump(spec), + _dump_engine_object(spec), target_scope, self._state.field_name_builder.build_name( name, prefix=_to_snake_case(_spec_kind(spec))+'_'), ), name ) +@dataclass +class EvaluateAndDumpOptions: + """ + Options for evaluating and dumping a flow. + """ + output_dir: str + use_cache: bool = True class Flow: """ @@ -348,6 +356,13 @@ def __str__(self): def __repr__(self): return repr(self._lazy_engine_flow()) + @property + def name(self) -> str: + """ + Get the name of the flow. + """ + return self._lazy_engine_flow().name() + def update(self): """ Update the index defined by the flow. @@ -355,13 +370,18 @@ def update(self): """ return self._lazy_engine_flow().update() + def evaluate_and_dump(self, options: EvaluateAndDumpOptions): + """ + Evaluate and dump the flow. + """ + return self._lazy_engine_flow().evaluate_and_dump(_dump_engine_object(options)) + def internal_flow(self) -> _engine.Flow: """ Get the engine flow. """ return self._lazy_engine_flow() - def _create_lazy_flow(name: str | None, fl_def: Callable[[FlowBuilder, DataScope], None]) -> Flow: """ Create a flow without really building it yet. 
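On the engine side, `Pythonized<EvaluateAndDumpOptions>` deserializes the
dict produced by `_dump_engine_object`, so the Rust struct only needs the
two fields declared in the Python dataclass. A rough illustration of the
expected shape, using serde_json in place of the real Python bridge
(function name and directory value are made up):

    fn example_options() -> anyhow::Result<crate::execution::dumper::EvaluateAndDumpOptions> {
        // Mirrors the {"output_dir": ..., "use_cache": ...} dict coming from Python.
        Ok(serde_json::from_value(serde_json::json!({
            "output_dir": "eval_DemoFlow_250322_181422",
            "use_cache": true,
        }))?)
    }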
diff --git a/src/execution/dumper.rs b/src/execution/dumper.rs index 533f7a7b..3cb3f8ec 100644 --- a/src/execution/dumper.rs +++ b/src/execution/dumper.rs @@ -15,7 +15,7 @@ use crate::builder::plan::{AnalyzedSourceOp, ExecutionPlan}; use crate::utils::yaml_ser::YamlSerializer; #[derive(Debug, Clone, Deserialize)] -pub struct DumpEvaluationOutputOptions { +pub struct EvaluateAndDumpOptions { pub output_dir: String, pub use_cache: bool, } @@ -54,7 +54,7 @@ struct Dumper<'a> { plan: &'a ExecutionPlan, schema: &'a schema::DataSchema, pool: &'a PgPool, - options: DumpEvaluationOutputOptions, + options: EvaluateAndDumpOptions, } impl<'a> Dumper<'a> { @@ -215,7 +215,7 @@ impl<'a> Dumper<'a> { pub async fn evaluate_and_dump( plan: &ExecutionPlan, schema: &schema::DataSchema, - options: DumpEvaluationOutputOptions, + options: EvaluateAndDumpOptions, pool: &PgPool, ) -> Result<()> { let output_dir = Path::new(&options.output_dir); diff --git a/src/py/mod.rs b/src/py/mod.rs index a382022b..ccb61e9f 100644 --- a/src/py/mod.rs +++ b/src/py/mod.rs @@ -103,6 +103,10 @@ impl Flow { self.__str__() } + pub fn name(&self) -> &str { + &self.0.flow_instance.name + } + pub fn update(&self, py: Python<'_>) -> PyResult { py.allow_threads(|| { let lib_context = get_lib_context() @@ -122,7 +126,7 @@ impl Flow { pub fn evaluate_and_dump( &self, py: Python<'_>, - options: Pythonized, + options: Pythonized, ) -> PyResult<()> { py.allow_threads(|| { let lib_context = get_lib_context() From d8745b70d1ad1f92664c968ae79d4d690504a4d6 Mon Sep 17 00:00:00 2001 From: LJ Date: Sat, 22 Mar 2025 18:14:22 -0700 Subject: [PATCH 8/8] Bug fix for the dumper to make it work as intended. --- src/execution/dumper.rs | 49 ++++++++++++++++++++++------------------- 1 file changed, 26 insertions(+), 23 deletions(-) diff --git a/src/execution/dumper.rs b/src/execution/dumper.rs index 3cb3f8ec..10298b50 100644 --- a/src/execution/dumper.rs +++ b/src/execution/dumper.rs @@ -5,6 +5,7 @@ use itertools::Itertools; use serde::ser::SerializeSeq; use serde::{Deserialize, Serialize}; use sqlx::PgPool; +use std::borrow::Cow; use std::collections::BTreeMap; use std::path::{Path, PathBuf}; use yaml_rust2::YamlEmitter; @@ -24,10 +25,11 @@ const FILENAME_PREFIX_MAX_LENGTH: usize = 128; struct TargetExportData<'a> { schema: &'a Vec, + // The purpose is to make rows sorted by primary key. 
data: BTreeMap, } -impl<'a> Serialize for TargetExportData<'a> { +impl Serialize for TargetExportData<'_> { fn serialize(&self, serializer: S) -> Result where S: serde::Serializer, @@ -46,7 +48,11 @@ impl<'a> Serialize for TargetExportData<'a> { #[derive(Serialize)] struct SourceOutputData<'a> { key: value::TypedValue<'a>, + + #[serde(skip_serializing_if = "Option::is_none")] exports: Option>>, + + #[serde(skip_serializing_if = "Option::is_none")] error: Option, } @@ -103,14 +109,12 @@ impl<'a> Dumper<'a> { export_op.name.as_str(), TargetExportData { schema: &self.schema.collectors[collector_idx].spec.fields, - data: collected_values_buffer + data: collected_values_buffer[collector_idx] .iter() .map(|v| -> Result<_> { - let key = indexer::extract_primary_key( - &export_op.primary_key_def, - &v[collector_idx], - )?; - Ok((key, &v[collector_idx])) + let key = + indexer::extract_primary_key(&export_op.primary_key_def, v)?; + Ok((key, v)) }) .collect::>()?, }, @@ -125,7 +129,7 @@ impl<'a> Dumper<'a> { &self, source_op: &AnalyzedSourceOp, key: value::KeyValue, - file_name: PathBuf, + file_path: PathBuf, ) -> Result<()> { let mut collected_values_buffer = Vec::new(); let (exports, error) = match self @@ -138,24 +142,24 @@ impl<'a> Dumper<'a> { let key_value = value::Value::from(key); let file_data = SourceOutputData { key: value::TypedValue { - t: &self.schema.fields[source_op.output.field_idx as usize] - .value_type - .typ, + t: &source_op.primary_key_type, v: &key_value, }, exports, error, }; + let yaml_output = { let mut yaml_output = String::new(); let yaml_data = YamlSerializer::serialize(&file_data)?; let mut yaml_emitter = YamlEmitter::new(&mut yaml_output); + yaml_emitter.multiline_strings(true); + yaml_emitter.compact(true); yaml_emitter.dump(&yaml_data)?; yaml_output }; - let mut file_path = file_name; - file_path.push(".yaml"); tokio::fs::write(file_path, yaml_output).await?; + Ok(()) } @@ -177,22 +181,21 @@ impl<'a> Dumper<'a> { ); keys_by_filename_prefix.entry(s).or_default().push(key); } - - let mut file_path_base = - PathBuf::from(&self.options.output_dir).join(source_op.name.as_str()); - file_path_base.push(":"); + let output_dir = Path::new(&self.options.output_dir); let evaluate_futs = keys_by_filename_prefix .into_iter() .flat_map(|(filename_prefix, keys)| { let num_keys = keys.len(); - let file_path_base = &file_path_base; keys.into_iter().enumerate().map(move |(i, key)| { - let mut file_path = file_path_base.clone(); - file_path.push(&filename_prefix); - if num_keys > 1 { - file_path.push(format!(".{}", i)); - } + let extra_id = if num_keys > 1 { + Cow::Owned(format!(".{}", i)) + } else { + Cow::Borrowed("") + }; + let file_name = + format!("{}@{}{}.yaml", source_op.name, filename_prefix, extra_id); + let file_path = output_dir.join(Path::new(&file_name)); self.evaluate_and_dump_source_entry(source_op, key, file_path) }) });
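A worked example of the final naming scheme (source name and key are made
up): each source entry becomes one YAML file directly under the output
directory, and the `.{i}` suffix only appears when several keys share the
same truncated prefix.

    fn main() {
        let source_name = "documents";            // source_op.name
        let filename_prefix = "docs%2Fintro.md";  // url-encoded, ':'-joined key parts
        let extra_id = "";                        // ".0", ".1", ... on prefix collisions
        let file_name = format!("{}@{}{}.yaml", source_name, filename_prefix, extra_id);
        assert_eq!(file_name, "documents@docs%2Fintro.md.yaml");
    }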