diff --git a/Cargo.toml b/Cargo.toml
index e9dac76a..2f59b9af 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -88,3 +88,4 @@ yup-oauth2 = "12.1.0"
 rustls = { version = "0.23.25" }
 http-body-util = "0.1.3"
 yaml-rust2 = "0.10.0"
+urlencoding = "2.1.3"
diff --git a/python/cocoindex/cli.py b/python/cocoindex/cli.py
index a50cd6c0..c70e8948 100644
--- a/python/cocoindex/cli.py
+++ b/python/cocoindex/cli.py
@@ -1,4 +1,5 @@
 import click
+import datetime
 
 from . import flow, lib
 from .setup import check_setup_status, CheckSetupStatusOptions, apply_setup_changes
@@ -52,6 +53,24 @@ def update(flow_name: str | None):
     stats = _flow_by_name(flow_name).update()
     print(stats)
 
+@cli.command()
+@click.argument("flow_name", type=str, required=False)
+@click.option(
+    "-o", "--output-dir", type=str, required=False,
+    help="The directory to dump the evaluation output to.")
+@click.option(
+    "-c", "--use-cache", is_flag=True, show_default=True, default=True,
+    help="Use cached evaluation results if available.")
+def evaluate(flow_name: str | None, output_dir: str | None, use_cache: bool = True):
+    """
+    Evaluate and dump the flow.
+    """
+    fl = _flow_by_name(flow_name)
+    if output_dir is None:
+        output_dir = f"eval_{fl.name}_{datetime.datetime.now().strftime('%y%m%d_%H%M%S')}"
+    options = flow.EvaluateAndDumpOptions(output_dir=output_dir, use_cache=use_cache)
+    fl.evaluate_and_dump(options)
+
 _default_server_settings = lib.ServerSettings.from_env()
 
 @cli.command()
diff --git a/python/cocoindex/flow.py b/python/cocoindex/flow.py
index 163605eb..5af0a89d 100644
--- a/python/cocoindex/flow.py
+++ b/python/cocoindex/flow.py
@@ -9,6 +9,7 @@
 from typing import Any, Callable, Sequence, TypeVar, get_origin
 from threading import Lock
 from enum import Enum
+from dataclasses import dataclass
 
 from . import _engine
 from . import vector
@@ -61,18 +62,18 @@ def _create_data_slice(
 def _spec_kind(spec: Any) -> str:
     return spec.__class__.__name__
 
-def _spec_value_dump(v: Any) -> Any:
-    """Recursively dump a spec object and its nested attributes to a dictionary."""
+def _dump_engine_object(v: Any) -> Any:
+    """Recursively dump an object for the engine. The engine side uses `Pythonized` to deserialize it."""
     if isinstance(v, type) or get_origin(v) is not None:
         return encode_enriched_type(v)
     elif isinstance(v, Enum):
         return v.value
     elif hasattr(v, '__dict__'):
-        return {k: _spec_value_dump(v) for k, v in v.__dict__.items()}
+        return {k: _dump_engine_object(v) for k, v in v.__dict__.items()}
     elif isinstance(v, (list, tuple)):
-        return [_spec_value_dump(item) for item in v]
+        return [_dump_engine_object(item) for item in v]
     elif isinstance(v, dict):
-        return {k: _spec_value_dump(v) for k, v in v.items()}
+        return {k: _dump_engine_object(v) for k, v in v.items()}
     return v
 
 T = TypeVar('T')
@@ -177,7 +178,7 @@ def transform(self, fn_spec: op.FunctionSpec, *args, **kwargs) -> DataSlice:
             lambda target_scope, name: flow_builder_state.engine_flow_builder.transform(
                 _spec_kind(fn_spec),
-                _spec_value_dump(fn_spec),
+                _dump_engine_object(fn_spec),
                 transform_args,
                 target_scope,
                 flow_builder_state.field_name_builder.build_name(
@@ -267,7 +268,7 @@ def export(self, name: str, target_spec: op.StorageSpec, /, *,
             {"field_name": field_name, "metric": metric.value}
             for field_name, metric in vector_index]
         self._flow_builder_state.engine_flow_builder.export(
-            name, _spec_kind(target_spec), _spec_value_dump(target_spec),
+            name, _spec_kind(target_spec), _dump_engine_object(target_spec),
             index_options, self._engine_data_collector)
@@ -316,13 +317,20 @@ def add_source(self, spec: op.SourceSpec, /, name: str | None = None) -> DataSli
             self._state,
             lambda target_scope, name: self._state.engine_flow_builder.add_source(
                 _spec_kind(spec),
-                _spec_value_dump(spec),
+                _dump_engine_object(spec),
                 target_scope,
                 self._state.field_name_builder.build_name(
                     name, prefix=_to_snake_case(_spec_kind(spec))+'_'),
             ),
             name
         )
 
+@dataclass
+class EvaluateAndDumpOptions:
+    """
+    Options for evaluating and dumping a flow.
+    """
+    output_dir: str
+    use_cache: bool = True
 
 class Flow:
     """
@@ -348,6 +356,13 @@ def __str__(self):
     def __repr__(self):
         return repr(self._lazy_engine_flow())
 
+    @property
+    def name(self) -> str:
+        """
+        Get the name of the flow.
+        """
+        return self._lazy_engine_flow().name()
+
     def update(self):
         """
         Update the index defined by the flow.
@@ -355,13 +370,18 @@ def update(self):
         """
         return self._lazy_engine_flow().update()
 
+    def evaluate_and_dump(self, options: EvaluateAndDumpOptions):
+        """
+        Evaluate and dump the flow.
+        """
+        return self._lazy_engine_flow().evaluate_and_dump(_dump_engine_object(options))
+
     def internal_flow(self) -> _engine.Flow:
         """
        Get the engine flow.
         """
         return self._lazy_engine_flow()
-
 def _create_lazy_flow(name: str | None, fl_def: Callable[[FlowBuilder, DataScope], None]) -> Flow:
     """
     Create a flow without really building it yet.
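
Aside (illustrative only, not part of the patch): the `_dump_engine_object` helper above reduces nested spec/option objects to plain dicts, lists, and scalars before they are handed to the engine. A minimal sketch of what it produces for the new `EvaluateAndDumpOptions` dataclass:

    # A dataclass instance is dumped via its __dict__; strings and bools pass through unchanged.
    _dump_engine_object(EvaluateAndDumpOptions(output_dir="out", use_cache=True))
    # -> {'output_dir': 'out', 'use_cache': True}
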
diff --git a/src/base/value.rs b/src/base/value.rs
index 9eae89aa..a465bd07 100644
--- a/src/base/value.rs
+++ b/src/base/value.rs
@@ -195,6 +195,24 @@ impl KeyValue {
         Ok(result)
     }
 
+    fn parts_to_strs(&self, output: &mut Vec<String>) {
+        match self {
+            KeyValue::Bytes(v) => output.push(BASE64_STANDARD.encode(v)),
+            KeyValue::Str(v) => output.push(v.to_string()),
+            KeyValue::Bool(v) => output.push(v.to_string()),
+            KeyValue::Int64(v) => output.push(v.to_string()),
+            KeyValue::Range(v) => {
+                output.push(v.start.to_string());
+                output.push(v.end.to_string());
+            }
+            KeyValue::Struct(v) => {
+                for part in v {
+                    part.parts_to_strs(output);
+                }
+            }
+        }
+    }
+
     pub fn from_strs(value: impl IntoIterator, schema: &ValueType) -> Result<Self> {
         let mut values_iter = value.into_iter();
         let result = Self::parts_from_str(&mut values_iter, schema)?;
@@ -204,6 +222,12 @@ impl KeyValue {
         Ok(result)
     }
 
+    pub fn to_strs(&self) -> Vec<String> {
+        let mut output = Vec::with_capacity(self.num_parts());
+        self.parts_to_strs(&mut output);
+        output
+    }
+
     pub fn kind_str(&self) -> &'static str {
         match self {
             KeyValue::Bytes(_) => "bytes",
@@ -256,6 +280,14 @@ impl KeyValue {
             _ => anyhow::bail!("expected struct value, but got {}", self.kind_str()),
         }
     }
+
+    pub fn num_parts(&self) -> usize {
+        match self {
+            KeyValue::Range(_) => 2,
+            KeyValue::Struct(v) => v.iter().map(|v| v.num_parts()).sum(),
+            _ => 1,
+        }
+    }
 }
 
 #[derive(Debug, Clone)]
@@ -877,7 +909,7 @@ impl Serialize for TypedValue<'_> {
            (_, Value::Null) => serializer.serialize_none(),
            (ValueType::Basic(_), v) => v.serialize(serializer),
            (ValueType::Struct(s), Value::Struct(field_values)) => TypedFieldsValue {
-                schema: s,
+                schema: &s.fields,
                values_iter: field_values.fields.iter(),
            }
            .serialize(serializer),
@@ -885,7 +917,7 @@
                let mut seq = serializer.serialize_seq(Some(rows.len()))?;
                for row in rows {
                    seq.serialize_element(&TypedFieldsValue {
-                        schema: &c.row,
+                        schema: &c.row.fields,
                        values_iter: row.fields.iter(),
                    })?;
                }
@@ -895,7 +927,7 @@
                let mut seq = serializer.serialize_seq(Some(rows.len()))?;
                for (k, v) in rows {
                    seq.serialize_element(&TypedFieldsValue {
-                        schema: &c.row,
+                        schema: &c.row.fields,
                        values_iter: std::iter::once(&Value::from(k.clone()))
                            .chain(v.fields.iter()),
                    })?;
@@ -911,15 +943,15 @@
     }
 }
 
 pub struct TypedFieldsValue<'a, I: Iterator<Item = &'a Value> + Clone> {
-    schema: &'a StructSchema,
-    values_iter: I,
+    pub schema: &'a [FieldSchema],
+    pub values_iter: I,
 }
 
 impl<'a, I: Iterator<Item = &'a Value> + Clone> Serialize for TypedFieldsValue<'a, I> {
     fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
-        let mut map = serializer.serialize_map(Some(self.schema.fields.len()))?;
+        let mut map = serializer.serialize_map(Some(self.schema.len()))?;
         let values_iter = self.values_iter.clone();
-        for (field, value) in self.schema.fields.iter().zip(values_iter) {
+        for (field, value) in self.schema.iter().zip(values_iter) {
             map.serialize_entry(
                 &field.name,
                 &TypedValue {
diff --git a/src/builder/plan.rs b/src/builder/plan.rs
index 2334bf24..de45cfa7 100644
--- a/src/builder/plan.rs
+++ b/src/builder/plan.rs
@@ -104,6 +104,7 @@ pub struct AnalyzedExportOp {
     pub query_target: Option>,
     pub primary_key_def: AnalyzedPrimaryKeyDef,
     pub primary_key_type: ValueType,
+    /// Indices of the value fields, excluding the primary key field.
     pub value_fields: Vec,
 }
diff --git a/src/execution/dumper.rs b/src/execution/dumper.rs
new file mode 100644
index 00000000..10298b50
--- /dev/null
+++ b/src/execution/dumper.rs
@@ -0,0 +1,240 @@
+use anyhow::Result;
+use futures::future::try_join_all;
+use indexmap::IndexMap;
+use itertools::Itertools;
+use serde::ser::SerializeSeq;
+use serde::{Deserialize, Serialize};
+use sqlx::PgPool;
+use std::borrow::Cow;
+use std::collections::BTreeMap;
+use std::path::{Path, PathBuf};
+use yaml_rust2::YamlEmitter;
+
+use super::indexer;
+use crate::base::{schema, value};
+use crate::builder::plan::{AnalyzedSourceOp, ExecutionPlan};
+use crate::utils::yaml_ser::YamlSerializer;
+
+#[derive(Debug, Clone, Deserialize)]
+pub struct EvaluateAndDumpOptions {
+    pub output_dir: String,
+    pub use_cache: bool,
+}
+
+const FILENAME_PREFIX_MAX_LENGTH: usize = 128;
+
+struct TargetExportData<'a> {
+    schema: &'a Vec<schema::FieldSchema>,
+    // Keyed by primary key, so rows are emitted in a stable, sorted order.
+    data: BTreeMap<value::KeyValue, &'a value::FieldValues>,
+}
+
+impl Serialize for TargetExportData<'_> {
+    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+    where
+        S: serde::Serializer,
+    {
+        let mut seq = serializer.serialize_seq(Some(self.data.len()))?;
+        for (_, values) in self.data.iter() {
+            seq.serialize_element(&value::TypedFieldsValue {
+                schema: self.schema,
+                values_iter: values.fields.iter(),
+            })?;
+        }
+        seq.end()
+    }
+}
+
+#[derive(Serialize)]
+struct SourceOutputData<'a> {
+    key: value::TypedValue<'a>,
+
+    #[serde(skip_serializing_if = "Option::is_none")]
+    exports: Option<IndexMap<&'a str, TargetExportData<'a>>>,
+
+    #[serde(skip_serializing_if = "Option::is_none")]
+    error: Option<String>,
+}
+
+struct Dumper<'a> {
+    plan: &'a ExecutionPlan,
+    schema: &'a schema::DataSchema,
+    pool: &'a PgPool,
+    options: EvaluateAndDumpOptions,
+}
+
+impl<'a> Dumper<'a> {
+    async fn evaluate_source_entry<'b>(
+        &'a self,
+        source_op: &'a AnalyzedSourceOp,
+        key: &value::KeyValue,
+        collected_values_buffer: &'b mut Vec<Vec<value::FieldValues>>,
+    ) -> Result<Option<IndexMap<&'a str, TargetExportData<'b>>>>
+    where
+        'a: 'b,
+    {
+        let cache_option = if self.options.use_cache {
+            indexer::EvaluationCacheOption::UseCache(self.pool)
+        } else {
+            indexer::EvaluationCacheOption::NoCache
+        };
+
+        let data_builder = indexer::evaluate_source_entry_with_cache(
+            self.plan,
+            source_op,
+            self.schema,
+            key,
+            cache_option,
+        )
+        .await?;
+
+        let data_builder = if let Some(data_builder) = data_builder {
+            data_builder
+        } else {
+            return Ok(None);
+        };
+
+        *collected_values_buffer = data_builder
+            .collected_values
+            .into_iter()
+            .map(|v| v.into_inner().unwrap())
+            .collect();
+        let exports = self
+            .plan
+            .export_ops
+            .iter()
+            .map(|export_op| -> Result<_> {
+                let collector_idx = export_op.input.collector_idx as usize;
+                let entry = (
+                    export_op.name.as_str(),
+                    TargetExportData {
+                        schema: &self.schema.collectors[collector_idx].spec.fields,
+                        data: collected_values_buffer[collector_idx]
+                            .iter()
+                            .map(|v| -> Result<_> {
+                                let key =
+                                    indexer::extract_primary_key(&export_op.primary_key_def, v)?;
+                                Ok((key, v))
+                            })
+                            .collect::<Result<_>>()?,
+                    },
+                );
+                Ok(entry)
+            })
+            .collect::<Result<_>>()?;
+        Ok(Some(exports))
+    }
+
+    async fn evaluate_and_dump_source_entry(
+        &self,
+        source_op: &AnalyzedSourceOp,
+        key: value::KeyValue,
+        file_path: PathBuf,
+    ) -> Result<()> {
+        let mut collected_values_buffer = Vec::new();
+        let (exports, error) = match self
+            .evaluate_source_entry(source_op, &key, &mut collected_values_buffer)
+            .await
+        {
+            Ok(exports) => (exports, None),
+            Err(e) => (None, Some(format!("{e:?}"))),
+        };
+        let key_value = value::Value::from(key);
+        let file_data = SourceOutputData {
+            key: value::TypedValue {
+                t: &source_op.primary_key_type,
+                v: &key_value,
+            },
+            exports,
+            error,
+        };
+
+        let yaml_output = {
+            let mut yaml_output = String::new();
+            let yaml_data = YamlSerializer::serialize(&file_data)?;
+            let mut yaml_emitter = YamlEmitter::new(&mut yaml_output);
+            yaml_emitter.multiline_strings(true);
+            yaml_emitter.compact(true);
+            yaml_emitter.dump(&yaml_data)?;
+            yaml_output
+        };
+        tokio::fs::write(file_path, yaml_output).await?;
+
+        Ok(())
+    }
+
+    async fn evaluate_and_dump_for_source_op(&self, source_op: &AnalyzedSourceOp) -> Result<()> {
+        let all_keys = source_op.executor.list_keys().await?;
+
+        let mut keys_by_filename_prefix: IndexMap<String, Vec<value::KeyValue>> = IndexMap::new();
+        for key in all_keys {
+            let mut s = key
+                .to_strs()
+                .into_iter()
+                .map(|s| urlencoding::encode(&s).into_owned())
+                .join(":");
+            // Truncate at a char boundary so the source op name plus prefix stays within FILENAME_PREFIX_MAX_LENGTH.
+            s.truncate(
+                (0..(FILENAME_PREFIX_MAX_LENGTH - source_op.name.as_str().len()))
+                    .rev()
+                    .find(|i| s.is_char_boundary(*i))
+                    .unwrap_or(0),
+            );
+            keys_by_filename_prefix.entry(s).or_default().push(key);
+        }
+        let output_dir = Path::new(&self.options.output_dir);
+        let evaluate_futs =
+            keys_by_filename_prefix
+                .into_iter()
+                .flat_map(|(filename_prefix, keys)| {
+                    let num_keys = keys.len();
+                    keys.into_iter().enumerate().map(move |(i, key)| {
+                        let extra_id = if num_keys > 1 {
+                            Cow::Owned(format!(".{}", i))
+                        } else {
+                            Cow::Borrowed("")
+                        };
+                        let file_name =
+                            format!("{}@{}{}.yaml", source_op.name, filename_prefix, extra_id);
+                        let file_path = output_dir.join(Path::new(&file_name));
+                        self.evaluate_and_dump_source_entry(source_op, key, file_path)
+                    })
+                });
+        try_join_all(evaluate_futs).await?;
+        Ok(())
+    }
+
+    async fn evaluate_and_dump(&self) -> Result<()> {
+        try_join_all(
+            self.plan
+                .source_ops
+                .iter()
+                .map(|source_op| self.evaluate_and_dump_for_source_op(source_op)),
+        )
+        .await?;
+        Ok(())
+    }
+}
+
+pub async fn evaluate_and_dump(
+    plan: &ExecutionPlan,
+    schema: &schema::DataSchema,
+    options: EvaluateAndDumpOptions,
+    pool: &PgPool,
+) -> Result<()> {
+    let output_dir = Path::new(&options.output_dir);
+    if output_dir.exists() {
+        if !output_dir.is_dir() {
+            return Err(anyhow::anyhow!("The path exists and is not a directory"));
+        }
+    } else {
+        tokio::fs::create_dir(output_dir).await?;
+    }
+
+    let dumper = Dumper {
+        plan,
+        schema,
+        pool,
+        options,
+    };
+    dumper.evaluate_and_dump().await
+}
diff --git a/src/execution/indexer.rs b/src/execution/indexer.rs
index a18ffdf9..525a42ed 100644
--- a/src/execution/indexer.rs
+++ b/src/execution/indexer.rs
@@ -73,7 +73,7 @@ impl std::fmt::Display for IndexUpdateInfo {
     }
 }
 
-fn make_primary_key(
+pub fn extract_primary_key(
     primary_key_def: &AnalyzedPrimaryKeyDef,
     record: &FieldValues,
 ) -> Result {
@@ -215,7 +215,7 @@ async fn precommit_source_tracking_info(
         .or_default();
     let mut keys_info = Vec::new();
     for value in collected_values.iter() {
-        let primary_key = make_primary_key(&export_op.primary_key_def, value)?;
+        let primary_key = extract_primary_key(&export_op.primary_key_def, value)?;
         let primary_key_json = serde_json::to_value(&primary_key)?;
 
         let mut field_values = FieldValues {
@@ -417,30 +417,41 @@ async fn commit_source_tracking_info(
     Ok(WithApplyStatus::Normal(()))
 }
 
+pub enum EvaluationCacheOption<'a> {
+    NoCache,
+    UseCache(&'a PgPool),
+}
+
 pub async fn evaluate_source_entry_with_cache(
     plan: &ExecutionPlan,
     source_op: &AnalyzedSourceOp,
     schema: &schema::DataSchema,
     key: &value::KeyValue,
-    pool: &PgPool,
-) -> Result> {
-    let source_key_json = serde_json::to_value(key)?;
-    let existing_tracking_info = read_source_tracking_info(
-        source_op.source_id,
-        &source_key_json,
-        &plan.tracking_table_setup,
-        pool,
-    )
-    .await?;
-    let process_timestamp = chrono::Utc::now();
-    let memoization_info = existing_tracking_info
-        .and_then(|info| info.memoization_info.map(|info| info.0))
-        .flatten();
-    let evaluation_cache =
-        EvaluationCache::new(process_timestamp, memoization_info.map(|info| info.cache));
-    let data_builder =
-        evaluate_source_entry(plan, source_op, schema, key, Some(&evaluation_cache)).await?;
-    Ok(data_builder.map(|builder| builder.into()))
+    cache_option: EvaluationCacheOption<'_>,
+) -> Result> {
+    let cache = match cache_option {
+        EvaluationCacheOption::NoCache => None,
+        EvaluationCacheOption::UseCache(pool) => {
+            let source_key_json = serde_json::to_value(key)?;
+            let existing_tracking_info = read_source_tracking_info(
+                source_op.source_id,
+                &source_key_json,
+                &plan.tracking_table_setup,
+                pool,
+            )
+            .await?;
+            let process_timestamp = chrono::Utc::now();
+            let memoization_info = existing_tracking_info
+                .and_then(|info| info.memoization_info.map(|info| info.0))
+                .flatten();
+            Some(EvaluationCache::new(
+                process_timestamp,
+                memoization_info.map(|info| info.cache),
+            ))
+        }
+    };
+    let data_builder = evaluate_source_entry(plan, source_op, schema, key, cache.as_ref()).await?;
+    Ok(data_builder)
 }
 
 pub async fn update_source_entry(
diff --git a/src/execution/mod.rs b/src/execution/mod.rs
index 277d718a..463cc43e 100644
--- a/src/execution/mod.rs
+++ b/src/execution/mod.rs
@@ -1,3 +1,4 @@
+pub mod dumper;
 pub mod evaluator;
 pub mod indexer;
 pub mod query;
diff --git a/src/py/mod.rs b/src/py/mod.rs
index 61a235d5..ccb61e9f 100644
--- a/src/py/mod.rs
+++ b/src/py/mod.rs
@@ -103,6 +103,10 @@ impl Flow {
         self.__str__()
     }
 
+    pub fn name(&self) -> &str {
+        &self.0.flow_instance.name
+    }
+
     pub fn update(&self, py: Python<'_>) -> PyResult<IndexUpdateInfo> {
         py.allow_threads(|| {
             let lib_context = get_lib_context()
                 .ok_or_else(|| PyException::new_err("cocoindex library not initialized"))?;
@@ -118,6 +122,31 @@ impl Flow {
             Ok(IndexUpdateInfo(update_info))
         })
     }
+
+    pub fn evaluate_and_dump(
+        &self,
+        py: Python<'_>,
+        options: Pythonized,
+    ) -> PyResult<()> {
+        py.allow_threads(|| {
+            let lib_context = get_lib_context()
+                .ok_or_else(|| PyException::new_err("cocoindex library not initialized"))?;
+            lib_context
+                .runtime
+                .block_on(async {
+                    let exec_plan = self.0.get_execution_plan().await?;
+                    execution::dumper::evaluate_and_dump(
+                        &exec_plan,
+                        &self.0.data_schema,
+                        options.into_inner(),
+                        &lib_context.pool,
+                    )
+                    .await
+                })
+                .into_py_result()?;
+            Ok(())
+        })
+    }
 }
 
 #[pyclass]
diff --git a/src/service/flows.rs b/src/service/flows.rs
index b665c5b0..fc055b78 100644
--- a/src/service/flows.rs
+++ b/src/service/flows.rs
@@ -10,14 +10,13 @@ use axum_extra::extract::Query;
 use serde::{Deserialize, Serialize};
 
 use super::error::ApiError;
+use crate::base::{schema::DataSchema, value};
+use crate::lib_context::LibContext;
 use crate::{
     api_bail, api_error,
     base::{schema, spec},
     execution::indexer,
 };
-use crate::{execution::indexer::IndexUpdateInfo, lib_context::LibContext};
-
-use crate::base::{schema::DataSchema, value};
 
 pub async fn list_flows(
     State(lib_context): State<Arc<LibContext>>,
@@ -145,26 +144,26 @@ pub async fn evaluate_data(
         .ok_or_else(|| api_error!("field {} does not have a key", query.field))?;
     let key = value::KeyValue::from_strs(query.key, &key_field.value_type.typ)?;
 
-    let data = indexer::evaluate_source_entry_with_cache(
+    let value_builder = indexer::evaluate_source_entry_with_cache(
         &plan,
         source_op,
         schema,
         &key,
-        &lib_context.pool,
+        indexer::EvaluationCacheOption::UseCache(&lib_context.pool),
     )
     .await?
     .ok_or_else(|| api_error!("value not found for source at the specified key: {key:?}"))?;
 
     Ok(Json(EvaluateDataResponse {
         schema: schema.clone(),
-        data,
+        data: value_builder.into(),
     }))
 }
 
 pub async fn update(
     Path(flow_name): Path<String>,
     State(lib_context): State<Arc<LibContext>>,
-) -> Result<Json<IndexUpdateInfo>, ApiError> {
+) -> Result<Json<indexer::IndexUpdateInfo>, ApiError> {
     let fl = &lib_context.with_flow_context(&flow_name, |ctx| ctx.flow.clone())?;
     let execution_plan = fl.get_execution_plan().await?;
     let update_info = indexer::update(&execution_plan, &fl.data_schema, &lib_context.pool).await?;
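
Usage sketch (illustrative aside, not part of the patch): with this change, a flow can be evaluated and its collected outputs dumped to YAML, either from the CLI or from Python. One file is written per source entry, named `<source_op>@<url-encoded key>.yaml` under the output directory. The `cocoindex` entry-point name and the `demo_flow` object are assumptions for the example.

    # CLI: dump into an explicit directory (defaults to eval_<flow>_<timestamp>);
    # --use-cache is on by default, so cached evaluation results are reused.
    #   cocoindex evaluate demo_flow -o eval_demo_flow_output
    #
    # Python API, given an already-defined Flow object `demo_flow`:
    from cocoindex import flow

    demo_flow.evaluate_and_dump(
        flow.EvaluateAndDumpOptions(output_dir="eval_demo_flow_output", use_cache=True)
    )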