From fd54e3f8200bb11b9aebd1126c1581a6a192762e Mon Sep 17 00:00:00 2001 From: LJ Date: Wed, 5 Mar 2025 22:57:38 -0800 Subject: [PATCH] Leverage `pythonize` to bypass JSON serialization in Python<->Rust --- Cargo.toml | 1 + python/cocoindex/flow.py | 13 +++++---- python/cocoindex/functions.py | 5 ++-- python/cocoindex/lib.py | 4 +-- python/cocoindex/op.py | 7 +++-- python/cocoindex/query.py | 7 ++--- python/cocoindex/setup.py | 3 +-- python/cocoindex/typing.py | 7 +++-- src/builder/flow_builder.rs | 10 +++---- src/ops/py_factory.rs | 18 ++++++------- src/py.rs | 51 ++++++++++++++++++----------------- 11 files changed, 60 insertions(+), 66 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 35e98254..b2230c1e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,3 +37,4 @@ itertools = "0.14.0" derivative = "2.2.0" async-lock = "3.4.0" hex = "0.4.3" +pythonize = "0.23.0" diff --git a/python/cocoindex/flow.py b/python/cocoindex/flow.py index 67e5d8f5..99c8ed08 100644 --- a/python/cocoindex/flow.py +++ b/python/cocoindex/flow.py @@ -5,7 +5,6 @@ from __future__ import annotations import re -import json import inspect from typing import Any, Callable, Sequence, TypeVar from threading import Lock @@ -61,8 +60,8 @@ def _create_data_slice( def _spec_kind(spec: Any) -> str: return spec.__class__.__name__ -def _spec_json_dump(spec: Any) -> str: - return json.dumps(spec.__dict__) +def _spec_dump(spec: Any) -> dict[str, Any]: + return spec.__dict__ T = TypeVar('T') @@ -162,7 +161,7 @@ def transform(self, fn_spec: op.FunctionSpec, /, name: str | None = None) -> Dat lambda target_scope, name: flow_builder_state.engine_flow_builder.transform( _spec_kind(fn_spec), - _spec_json_dump(fn_spec), + _spec_dump(fn_spec), args, target_scope, flow_builder_state.field_name_builder.build_name( @@ -253,8 +252,8 @@ def export(self, name: str, target_spec: op.StorageSpec, /, *, {"field_name": field_name, "metric": metric.value} for field_name, metric in vector_index] self._flow_builder_state.engine_flow_builder.export( - name, _spec_kind(target_spec), _spec_json_dump(target_spec), - json.dumps(index_options), self._engine_data_collector) + name, _spec_kind(target_spec), _spec_dump(target_spec), + index_options, self._engine_data_collector) _flow_name_builder = _NameBuilder() @@ -294,7 +293,7 @@ def add_source(self, spec: op.SourceSpec, /, name: str | None = None) -> DataSli self._state, lambda target_scope, name: self._state.engine_flow_builder.add_source( _spec_kind(spec), - _spec_json_dump(spec), + _spec_dump(spec), target_scope, self._state.field_name_builder.build_name( name, prefix=_to_snake_case(_spec_kind(spec))+'_'), diff --git a/python/cocoindex/functions.py b/python/cocoindex/functions.py index 33898fc7..d88052c4 100644 --- a/python/cocoindex/functions.py +++ b/python/cocoindex/functions.py @@ -1,7 +1,6 @@ """All builtin functions.""" from typing import Annotated, Any -import json import sentence_transformers from .typing import Float32, Vector, TypeAttr from . import op @@ -31,11 +30,11 @@ class SentenceTransformerEmbedExecutor: spec: SentenceTransformerEmbed _model: sentence_transformers.SentenceTransformer - def analyze(self, text = None): + def analyze(self, text): args = self.spec.args or {} self._model = sentence_transformers.SentenceTransformer(self.spec.model, **args) dim = self._model.get_sentence_embedding_dimension() - return Annotated[list[Float32], Vector(dim=dim), TypeAttr("cocoindex.io/vector_origin_text", json.loads(text.analyzed_value))] + return Annotated[list[Float32], Vector(dim=dim), TypeAttr("cocoindex.io/vector_origin_text", text.analyzed_value)] def __call__(self, text: str) -> list[Float32]: return self._model.encode(text).tolist() diff --git a/python/cocoindex/lib.py b/python/cocoindex/lib.py index 57d950cd..d385e677 100644 --- a/python/cocoindex/lib.py +++ b/python/cocoindex/lib.py @@ -35,7 +35,7 @@ def from_env(cls) -> Self: def init(settings: Settings): """Initialize the cocoindex library.""" - _engine.init(json.dumps(settings.__dict__)) + _engine.init(settings.__dict__) @dataclass class ServerSettings: @@ -62,7 +62,7 @@ def start_server(settings: ServerSettings): """Start the cocoindex server.""" flow.ensure_all_flows_built() query.ensure_all_handlers_built() - _engine.start_server(json.dumps(settings.__dict__)) + _engine.start_server(settings.__dict__) def stop(): """Stop the cocoindex library.""" diff --git a/python/cocoindex/op.py b/python/cocoindex/op.py index 9fe11b5d..a51e4b3a 100644 --- a/python/cocoindex/op.py +++ b/python/cocoindex/op.py @@ -2,9 +2,8 @@ Facilities for defining cocoindex operations. """ import inspect -import json -from typing import get_type_hints, Protocol, Callable, dataclass_transform +from typing import get_type_hints, Protocol, Any, Callable, dataclass_transform from dataclasses import dataclass from enum import Enum from threading import Lock @@ -54,8 +53,8 @@ def __init__(self, spec_cls: type, executor_cls: type): self._spec_cls = spec_cls self._executor_cls = executor_cls - def __call__(self, spec_json: str, *args, **kwargs): - spec = self._spec_cls(**json.loads(spec_json)) + def __call__(self, spec: dict[str, Any], *args, **kwargs): + spec = self._spec_cls(**spec) executor = self._executor_cls(spec) result_type = executor.analyze(*args, **kwargs) return (dump_type(result_type), executor) diff --git a/python/cocoindex/query.py b/python/cocoindex/query.py index 11bb0d51..59e8d822 100644 --- a/python/cocoindex/query.py +++ b/python/cocoindex/query.py @@ -1,7 +1,6 @@ from typing import Callable, Any from dataclasses import dataclass from threading import Lock -import json from . import flow as fl from . import vector @@ -52,7 +51,7 @@ def _lazy_handler() -> _engine.SimpleSemanticsQueryHandler: engine_handler = _engine.SimpleSemanticsQueryHandler( flow.internal_flow(), target_name, fl.TransientFlow(query_transform_flow, [str]).internal_flow(), - json.dumps(default_similarity_metric.value)) + default_similarity_metric.value) engine_handler.register_query_handler(name) return engine_handler self._lazy_query_handler = _lazy_handler @@ -71,11 +70,9 @@ def search(self, query: str, limit: int, vector_field_name: str | None = None, """ Search the index with the given query, limit, vector field name, and similarity metric. """ - internal_results_json, internal_info_json = self.internal_handler().search( + internal_results, internal_info = self.internal_handler().search( query, limit, vector_field_name, similarity_matric.value if similarity_matric is not None else None) - internal_results = json.loads(internal_results_json) - internal_info = json.loads(internal_info_json) fields = [field['name'] for field in internal_results['fields']] results = [QueryResult(data=dict(zip(fields, result['data'])), score=result['score']) for result in internal_results['results']] info = SimpleSemanticsQueryInfo( diff --git a/python/cocoindex/setup.py b/python/cocoindex/setup.py index 437990ff..27ee9475 100644 --- a/python/cocoindex/setup.py +++ b/python/cocoindex/setup.py @@ -1,4 +1,3 @@ -import json from typing import NamedTuple from . import flow @@ -9,7 +8,7 @@ class CheckSetupStatusOptions(NamedTuple): def check_setup_status(options: CheckSetupStatusOptions) -> _engine.SetupStatusCheck: flow.ensure_all_flows_built() - return _engine.check_setup_status(json.dumps(options)) + return _engine.check_setup_status(options) def apply_setup_changes(status_check: _engine.SetupStatusCheck): _engine.apply_setup_changes(status_check) diff --git a/python/cocoindex/typing.py b/python/cocoindex/typing.py index 39b22e68..dfa231f2 100644 --- a/python/cocoindex/typing.py +++ b/python/cocoindex/typing.py @@ -1,4 +1,3 @@ -import json import typing import collections from typing import Annotated, NamedTuple, Any @@ -67,7 +66,7 @@ def _basic_type_to_json_value(t, metadata): return type_json -def _enriched_type_to_json_value(t): +def _enriched_type_to_json_value(t) -> dict[str, Any] | None: if t is None: return None t, metadata = _get_origin_type_and_metadata(t) @@ -83,8 +82,8 @@ def _enriched_type_to_json_value(t): return enriched_type_json -def dump_type(t) -> str: +def dump_type(t) -> dict[str, Any] | None: """ Convert a Python type to a CocoIndex's type in JSON. """ - return json.dumps(_enriched_type_to_json_value(t)) + return _enriched_type_to_json_value(t) diff --git a/src/builder/flow_builder.rs b/src/builder/flow_builder.rs index 44aa4a38..48340a84 100644 --- a/src/builder/flow_builder.rs +++ b/src/builder/flow_builder.rs @@ -386,7 +386,7 @@ impl FlowBuilder { pub fn add_source( &mut self, kind: String, - op_spec: py::Json>, + op_spec: py::Pythonized>, target_scope: Option, name: String, ) -> PyResult { @@ -423,7 +423,7 @@ impl FlowBuilder { pub fn add_direct_input( &mut self, name: String, - value_type: py::Json, + value_type: py::Pythonized, ) -> PyResult { let mut root_data_scope = self.root_data_scope.lock().unwrap(); root_data_scope @@ -453,7 +453,7 @@ impl FlowBuilder { pub fn transform( &mut self, kind: String, - op_spec: py::Json>, + op_spec: py::Pythonized>, args: Vec<(DataSlice, Option)>, target_scope: Option, name: String, @@ -555,8 +555,8 @@ impl FlowBuilder { &mut self, name: String, kind: String, - op_spec: py::Json>, - index_options: py::Json, + op_spec: py::Pythonized>, + index_options: py::Pythonized, input: &DataCollector, ) -> PyResult<()> { let spec = spec::OpSpec { diff --git a/src/ops/py_factory.rs b/src/ops/py_factory.rs index 6694da7f..4c21636c 100644 --- a/src/ops/py_factory.rs +++ b/src/ops/py_factory.rs @@ -113,26 +113,26 @@ fn value_from_py_object<'py>( #[pyclass(name = "OpArgSchema")] pub struct PyOpArgSchema { - value_type: crate::py::Json, - analyzed_value: crate::py::Json, + value_type: crate::py::Pythonized, + analyzed_value: crate::py::Pythonized, } #[pymethods] impl PyOpArgSchema { #[getter] - fn value_type(&self) -> &crate::py::Json { + fn value_type(&self) -> &crate::py::Pythonized { &self.value_type } #[getter] - fn analyzed_value(&self) -> &crate::py::Json { + fn analyzed_value(&self) -> &crate::py::Pythonized { &self.analyzed_value } fn validate_arg( &self, name: &str, - typ: crate::py::Json, + typ: crate::py::Pythonized, ) -> PyResult<()> { if self.value_type.0.typ != typ.0.typ { return Err(PyException::new_err(format!( @@ -222,13 +222,13 @@ impl SimpleFunctionFactory for PyFunctionFactory { )> { let (result_type, executor, kw_args_names, num_positional_args) = Python::with_gil(|py| -> anyhow::Result<_> { - let mut args = vec![crate::py::Json(spec).into_py_any(py)?]; + let mut args = vec![crate::py::Pythonized(spec).into_py_any(py)?]; let mut kwargs = vec![]; let mut num_positional_args = 0; for arg in input_schema.into_iter() { let py_arg_schema = PyOpArgSchema { - value_type: crate::py::Json(arg.value_type.clone()), - analyzed_value: crate::py::Json(arg.analyzed_value.clone()), + value_type: crate::py::Pythonized(arg.value_type.clone()), + analyzed_value: crate::py::Pythonized(arg.analyzed_value.clone()), }; match arg.name.0 { Some(name) => { @@ -251,7 +251,7 @@ impl SimpleFunctionFactory for PyFunctionFactory { Some(&kwargs.into_py_dict(py)?), )?; let (result_type, executor) = result - .extract::<(crate::py::Json, Py)>(py)?; + .extract::<(crate::py::Pythonized, Py)>(py)?; Ok(( result_type.into_inner(), executor, diff --git a/src/py.rs b/src/py.rs index f8c22c8e..4bad52db 100644 --- a/src/py.rs +++ b/src/py.rs @@ -11,8 +11,8 @@ use crate::LIB_CONTEXT; use crate::{api_error, setup}; use crate::{builder, execution}; use anyhow::anyhow; -use pyo3::types::PyString; use pyo3::{exceptions::PyException, prelude::*}; +use pythonize::{depythonize, pythonize}; use serde::de::DeserializeOwned; use serde::Serialize; use std::collections::btree_map; @@ -32,31 +32,27 @@ impl IntoPyResult for Result { } } -pub struct Json(pub T); +pub struct Pythonized(pub T); -impl<'py, T: DeserializeOwned> FromPyObject<'py> for Json { - fn extract_bound(ob: &Bound<'py, PyAny>) -> PyResult { - let s = ob.extract::<&str>().into_py_result()?; - let t: T = serde_json::from_str(s).into_py_result()?; - Ok(Json(t)) +impl<'py, T: DeserializeOwned> FromPyObject<'py> for Pythonized { + fn extract_bound(obj: &Bound<'py, PyAny>) -> PyResult { + Ok(Pythonized(depythonize(obj).into_py_result()?)) } } -impl<'py, T: Serialize> IntoPyObject<'py> for &Json { - type Target = PyString; - type Output = Bound<'py, PyString>; +impl<'py, T: Serialize> IntoPyObject<'py> for &Pythonized { + type Target = PyAny; + type Output = Bound<'py, PyAny>; type Error = PyErr; fn into_pyobject(self, py: Python<'py>) -> PyResult { - let s = serde_json::to_string(&self.0).into_py_result()?; - // TODO: Avoid another copy by using a writer into Python-native String buffer. - Ok(PyString::new(py, &s).into()) + pythonize(py, &self.0).into_py_result() } } -impl<'py, T: Serialize> IntoPyObject<'py> for Json { - type Target = PyString; - type Output = Bound<'py, PyString>; +impl<'py, T: Serialize> IntoPyObject<'py> for Pythonized { + type Target = PyAny; + type Output = Bound<'py, PyAny>; type Error = PyErr; fn into_pyobject(self, py: Python<'py>) -> PyResult { @@ -64,13 +60,13 @@ impl<'py, T: Serialize> IntoPyObject<'py> for Json { } } -impl Json { +impl Pythonized { pub fn into_inner(self) -> T { self.0 } } -impl Deref for Json { +impl Deref for Pythonized { type Target = T; fn deref(&self) -> &Self::Target { &self.0 @@ -78,7 +74,7 @@ impl Deref for Json { } #[pyfunction] -fn init(py: Python<'_>, settings: Json) -> PyResult<()> { +fn init(py: Python<'_>, settings: Pythonized) -> PyResult<()> { py.allow_threads(|| -> anyhow::Result<()> { let mut lib_context_locked = LIB_CONTEXT.write().unwrap(); if lib_context_locked.is_some() { @@ -91,7 +87,7 @@ fn init(py: Python<'_>, settings: Json) -> PyResult<()> { } #[pyfunction] -fn start_server(py: Python<'_>, settings: Json) -> PyResult<()> { +fn start_server(py: Python<'_>, settings: Pythonized) -> PyResult<()> { py.allow_threads(|| -> anyhow::Result<()> { let lib_context = get_lib_context().ok_or_else(|| api_error!("Cocoindex is not initialized"))?; @@ -196,7 +192,7 @@ impl SimpleSemanticsQueryHandler { flow: &Flow, target_name: &str, query_transform_flow: &TransientFlow, - default_similarity_metric: Json, + default_similarity_metric: Pythonized, ) -> PyResult { py.allow_threads(|| { let lib_context = get_lib_context() @@ -243,8 +239,11 @@ impl SimpleSemanticsQueryHandler { query: String, limit: u32, vector_field_name: Option, - similarity_matric: Option>, - ) -> PyResult<(Json, Json)> { + similarity_matric: Option>, + ) -> PyResult<( + Pythonized, + Pythonized, + )> { py.allow_threads(|| { let lib_context = get_lib_context() .ok_or_else(|| anyhow!("cocoindex library not initialized")) @@ -262,7 +261,7 @@ impl SimpleSemanticsQueryHandler { .await }) .into_py_result()?; - Ok((Json(results), Json(info))) + Ok((Pythonized(results), Pythonized(info))) }) } } @@ -286,7 +285,9 @@ impl SetupStatusCheck { } #[pyfunction] -fn check_setup_status(options: Json) -> PyResult { +fn check_setup_status( + options: Pythonized, +) -> PyResult { let lib_context = get_lib_context() .ok_or_else(|| PyException::new_err("cocoindex library not initialized"))?; let all_css = lib_context.combined_setup_states.read().unwrap();