1 change: 1 addition & 0 deletions Cargo.toml
@@ -88,3 +88,4 @@ yup-oauth2 = "12.1.0"
 rustls = { version = "0.23.25" }
 http-body-util = "0.1.3"
 yaml-rust2 = "0.10.0"
+urlencoding = "2.1.3"
19 changes: 19 additions & 0 deletions python/cocoindex/cli.py
@@ -1,4 +1,5 @@
 import click
+import datetime
 
 from . import flow, lib
 from .setup import check_setup_status, CheckSetupStatusOptions, apply_setup_changes
@@ -52,6 +53,24 @@ def update(flow_name: str | None):
     stats = _flow_by_name(flow_name).update()
     print(stats)
 
+@cli.command()
+@click.argument("flow_name", type=str, required=False)
+@click.option(
+    "-o", "--output-dir", type=str, required=False,
+    help="The directory to dump the evaluation output to.")
+@click.option(
+    "-c", "--use-cache", is_flag=True, show_default=True, default=True,
+    help="Use cached evaluation results if available.")
+def evaluate(flow_name: str | None, output_dir: str | None, use_cache: bool = True):
+    """
+    Evaluate and dump the flow.
+    """
+    fl = _flow_by_name(flow_name)
+    if output_dir is None:
+        output_dir = f"eval_{fl.name}_{datetime.datetime.now().strftime('%y%m%d_%H%M%S')}"
+    options = flow.EvaluateAndDumpOptions(output_dir=output_dir, use_cache=use_cache)
+    fl.evaluate_and_dump(options)
+
 _default_server_settings = lib.ServerSettings.from_env()
 
 @cli.command()
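The new `evaluate` subcommand wires `EvaluateAndDumpOptions` into the CLI, falling back to a timestamped `eval_<flow>_<yymmdd_HHMMSS>` directory when `-o` is omitted. A minimal sketch of exercising it with click's test runner; the flow name `DemoFlow` and the output directory are hypothetical, and a flow by that name must already be registered:

from click.testing import CliRunner

from cocoindex.cli import cli  # the click group the commands above attach to

runner = CliRunner()
# Same as running `evaluate DemoFlow --output-dir eval_out` from the installed CLI.
result = runner.invoke(cli, ["evaluate", "DemoFlow", "--output-dir", "eval_out"])
print(result.output)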
38 changes: 29 additions & 9 deletions python/cocoindex/flow.py
@@ -9,6 +9,7 @@
 from typing import Any, Callable, Sequence, TypeVar, get_origin
 from threading import Lock
 from enum import Enum
+from dataclasses import dataclass
 
 from . import _engine
 from . import vector
@@ -61,18 +62,18 @@ def _create_data_slice(
 def _spec_kind(spec: Any) -> str:
     return spec.__class__.__name__
 
-def _spec_value_dump(v: Any) -> Any:
-    """Recursively dump a spec object and its nested attributes to a dictionary."""
+def _dump_engine_object(v: Any) -> Any:
+    """Recursively dump an object for the engine. The engine side uses `Pythonized` to deserialize it."""
     if isinstance(v, type) or get_origin(v) is not None:
         return encode_enriched_type(v)
     elif isinstance(v, Enum):
         return v.value
     elif hasattr(v, '__dict__'):
-        return {k: _spec_value_dump(v) for k, v in v.__dict__.items()}
+        return {k: _dump_engine_object(v) for k, v in v.__dict__.items()}
     elif isinstance(v, (list, tuple)):
-        return [_spec_value_dump(item) for item in v]
+        return [_dump_engine_object(item) for item in v]
     elif isinstance(v, dict):
-        return {k: _spec_value_dump(v) for k, v in v.items()}
+        return {k: _dump_engine_object(v) for k, v in v.items()}
     return v
 
 T = TypeVar('T')
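`_dump_engine_object` is what carries specs and options across the Python/engine boundary: classes and type annotations go through `encode_enriched_type`, enums collapse to their values, and anything with a `__dict__` (dataclasses included) becomes a plain dict, recursively. A self-contained mirror of that logic to show the shape of the output; `DemoSpec` is hypothetical and the type/`get_origin` branch is omitted:

from dataclasses import dataclass
from enum import Enum

def dump(v):
    # Simplified mirror of _dump_engine_object (type/get_origin branch omitted).
    if isinstance(v, Enum):
        return v.value
    if hasattr(v, '__dict__'):
        return {k: dump(x) for k, x in v.__dict__.items()}
    if isinstance(v, (list, tuple)):
        return [dump(x) for x in v]
    if isinstance(v, dict):
        return {k: dump(x) for k, x in v.items()}
    return v

class Mode(Enum):
    FULL = "full"

@dataclass
class DemoSpec:  # hypothetical spec class, for illustration only
    path: str
    mode: Mode

assert dump(DemoSpec("/tmp/data", Mode.FULL)) == {"path": "/tmp/data", "mode": "full"}

By the same rule, an `EvaluateAndDumpOptions(output_dir="out", use_cache=True)` instance dumps to `{"output_dir": "out", "use_cache": True}`, which is what the engine receives from `evaluate_and_dump` below.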
@@ -177,7 +178,7 @@ def transform(self, fn_spec: op.FunctionSpec, *args, **kwargs) -> DataSlice:
             lambda target_scope, name:
             flow_builder_state.engine_flow_builder.transform(
                 _spec_kind(fn_spec),
-                _spec_value_dump(fn_spec),
+                _dump_engine_object(fn_spec),
                 transform_args,
                 target_scope,
                 flow_builder_state.field_name_builder.build_name(
@@ -267,7 +268,7 @@ def export(self, name: str, target_spec: op.StorageSpec, /, *,
             {"field_name": field_name, "metric": metric.value}
             for field_name, metric in vector_index]
         self._flow_builder_state.engine_flow_builder.export(
-            name, _spec_kind(target_spec), _spec_value_dump(target_spec),
+            name, _spec_kind(target_spec), _dump_engine_object(target_spec),
             index_options, self._engine_data_collector)
 
 
@@ -316,13 +317,20 @@ def add_source(self, spec: op.SourceSpec, /, name: str | None = None) -> DataSlice:
             self._state,
             lambda target_scope, name: self._state.engine_flow_builder.add_source(
                 _spec_kind(spec),
-                _spec_value_dump(spec),
+                _dump_engine_object(spec),
                 target_scope,
                 self._state.field_name_builder.build_name(
                     name, prefix=_to_snake_case(_spec_kind(spec))+'_'),
             ),
             name
         )
+@dataclass
+class EvaluateAndDumpOptions:
+    """
+    Options for evaluating and dumping a flow.
+    """
+    output_dir: str
+    use_cache: bool = True
 
 class Flow:
     """
@@ -348,20 +356,32 @@ def __str__(self):
     def __repr__(self):
         return repr(self._lazy_engine_flow())
 
+    @property
+    def name(self) -> str:
+        """
+        Get the name of the flow.
+        """
+        return self._lazy_engine_flow().name()
+
     def update(self):
         """
         Update the index defined by the flow.
         Once the function returns, the index is fresh up to the moment when the function was called.
         """
         return self._lazy_engine_flow().update()
 
+    def evaluate_and_dump(self, options: EvaluateAndDumpOptions):
+        """
+        Evaluate and dump the flow.
+        """
+        return self._lazy_engine_flow().evaluate_and_dump(_dump_engine_object(options))
+
     def internal_flow(self) -> _engine.Flow:
         """
         Get the engine flow.
         """
         return self._lazy_engine_flow()
 
 
 def _create_lazy_flow(name: str | None, fl_def: Callable[[FlowBuilder, DataScope], None]) -> Flow:
     """
     Create a flow without really building it yet.
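The same evaluation is reachable programmatically through the new `Flow.evaluate_and_dump` method; a sketch, assuming `demo_flow` is a `Flow` instance obtained from the flow-definition machinery this diff does not touch:

from cocoindex.flow import EvaluateAndDumpOptions

# `demo_flow` is assumed to be an existing Flow instance; creating one is
# outside the scope of this diff.
options = EvaluateAndDumpOptions(output_dir="eval_out", use_cache=False)
demo_flow.evaluate_and_dump(options)  # options are serialized via _dump_engine_object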
46 changes: 39 additions & 7 deletions src/base/value.rs
@@ -195,6 +195,24 @@ impl KeyValue {
         Ok(result)
     }
 
+    fn parts_to_strs(&self, output: &mut Vec<String>) {
+        match self {
+            KeyValue::Bytes(v) => output.push(BASE64_STANDARD.encode(v)),
+            KeyValue::Str(v) => output.push(v.to_string()),
+            KeyValue::Bool(v) => output.push(v.to_string()),
+            KeyValue::Int64(v) => output.push(v.to_string()),
+            KeyValue::Range(v) => {
+                output.push(v.start.to_string());
+                output.push(v.end.to_string());
+            }
+            KeyValue::Struct(v) => {
+                for part in v {
+                    part.parts_to_strs(output);
+                }
+            }
+        }
+    }
+
     pub fn from_strs(value: impl IntoIterator<Item = String>, schema: &ValueType) -> Result<Self> {
         let mut values_iter = value.into_iter();
         let result = Self::parts_from_str(&mut values_iter, schema)?;
@@ -204,6 +222,12 @@ impl KeyValue {
         Ok(result)
     }
 
+    pub fn to_strs(&self) -> Vec<String> {
+        let mut output = Vec::with_capacity(self.num_parts());
+        self.parts_to_strs(&mut output);
+        output
+    }
+
     pub fn kind_str(&self) -> &'static str {
         match self {
             KeyValue::Bytes(_) => "bytes",
@@ -256,6 +280,14 @@ impl KeyValue {
             _ => anyhow::bail!("expected struct value, but got {}", self.kind_str()),
         }
     }
+
+    pub fn num_parts(&self) -> usize {
+        match self {
+            KeyValue::Range(_) => 2,
+            KeyValue::Struct(v) => v.iter().map(|v| v.num_parts()).sum(),
+            _ => 1,
+        }
+    }
 }
 
 #[derive(Debug, Clone)]
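Together, `parts_to_strs`, `to_strs`, and `num_parts` flatten a possibly nested key into positional string parts: each scalar contributes one part, a `Range` contributes its start and end, and a `Struct` recurses over its fields, with `num_parts` letting `to_strs` pre-size the output vector. A Python mirror of the scheme for illustration only (Python's `range` and `tuple` stand in for `KeyValue::Range` and `KeyValue::Struct`; the bytes/base64 case is omitted):

def parts_to_strs(key, out):
    # Mirrors KeyValue::parts_to_strs: flatten a nested key into string parts.
    if isinstance(key, range):    # KeyValue::Range -> two parts: start, end
        out += [str(key.start), str(key.stop)]
    elif isinstance(key, tuple):  # KeyValue::Struct -> recurse over the fields
        for part in key:
            parts_to_strs(part, out)
    else:                         # Str/Int64/... -> a single part
        out.append(str(key))

def to_strs(key):
    out = []
    parts_to_strs(key, out)
    return out

assert to_strs(("doc1", range(10, 20))) == ["doc1", "10", "20"]

`from_strs` above is the inverse direction; the `urlencoding` crate newly added in Cargo.toml suggests these parts are meant to round-trip through URLs, though that wiring is outside this diff.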
@@ -877,15 +909,15 @@ impl Serialize for TypedValue<'_> {
             (_, Value::Null) => serializer.serialize_none(),
             (ValueType::Basic(_), v) => v.serialize(serializer),
             (ValueType::Struct(s), Value::Struct(field_values)) => TypedFieldsValue {
-                schema: s,
+                schema: &s.fields,
                 values_iter: field_values.fields.iter(),
             }
             .serialize(serializer),
             (ValueType::Collection(c), Value::Collection(rows) | Value::List(rows)) => {
                 let mut seq = serializer.serialize_seq(Some(rows.len()))?;
                 for row in rows {
                     seq.serialize_element(&TypedFieldsValue {
-                        schema: &c.row,
+                        schema: &c.row.fields,
                         values_iter: row.fields.iter(),
                     })?;
@@ -895,7 +927,7 @@ impl Serialize for TypedValue<'_> {
                 let mut seq = serializer.serialize_seq(Some(rows.len()))?;
                 for (k, v) in rows {
                     seq.serialize_element(&TypedFieldsValue {
-                        schema: &c.row,
+                        schema: &c.row.fields,
                         values_iter: std::iter::once(&Value::from(k.clone()))
                             .chain(v.fields.iter()),
                     })?;
@@ -911,15 +943,15 @@ impl Serialize for TypedValue<'_> {
 }
 
 pub struct TypedFieldsValue<'a, I: Iterator<Item = &'a Value> + Clone> {
-    schema: &'a StructSchema,
-    values_iter: I,
+    pub schema: &'a [FieldSchema],
+    pub values_iter: I,
 }
 
 impl<'a, I: Iterator<Item = &'a Value> + Clone> Serialize for TypedFieldsValue<'a, I> {
     fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
-        let mut map = serializer.serialize_map(Some(self.schema.fields.len()))?;
+        let mut map = serializer.serialize_map(Some(self.schema.len()))?;
         let values_iter = self.values_iter.clone();
-        for (field, value) in self.schema.fields.iter().zip(values_iter) {
+        for (field, value) in self.schema.iter().zip(values_iter) {
             map.serialize_entry(
                 &field.name,
                 &TypedValue {
1 change: 1 addition & 0 deletions src/builder/plan.rs
@@ -104,6 +104,7 @@ pub struct AnalyzedExportOp {
     pub query_target: Option<Arc<dyn QueryTarget>>,
     pub primary_key_def: AnalyzedPrimaryKeyDef,
     pub primary_key_type: ValueType,
+    /// Indices of value fields, excluding the primary key field.
     pub value_fields: Vec<u32>,
 }
