diff --git a/src/ops/py_factory.rs b/src/ops/py_factory.rs index 6ed35adf..fc2794e0 100644 --- a/src/ops/py_factory.rs +++ b/src/ops/py_factory.rs @@ -6,7 +6,7 @@ use futures::FutureExt; use pyo3::{ exceptions::PyException, pyclass, pymethods, - types::{IntoPyDict, PyAnyMethods, PyString, PyTuple}, + types::{IntoPyDict, PyAnyMethods, PyList, PyString, PyTuple}, Bound, IntoPyObjectExt, Py, PyAny, PyResult, Python, }; use pythonize::pythonize; @@ -22,6 +22,70 @@ use super::sdk::{ ExecutorFuture, FlowInstanceContext, SimpleFunctionExecutor, SimpleFunctionFactory, }; +fn basic_value_to_py_object<'py>( + py: Python<'py>, + v: &value::BasicValue, +) -> PyResult> { + let result = match v { + value::BasicValue::Bytes(v) => v.into_bound_py_any(py)?, + value::BasicValue::Str(v) => v.into_bound_py_any(py)?, + value::BasicValue::Bool(v) => v.into_bound_py_any(py)?, + value::BasicValue::Int64(v) => v.into_bound_py_any(py)?, + value::BasicValue::Float32(v) => v.into_bound_py_any(py)?, + value::BasicValue::Float64(v) => v.into_bound_py_any(py)?, + value::BasicValue::Vector(v) => v + .iter() + .map(|v| basic_value_to_py_object(py, v)) + .collect::>>()? + .into_bound_py_any(py)?, + _ => { + return Err(PyException::new_err(format!( + "unsupported value type: {}", + v.kind() + ))) + } + }; + Ok(result) +} + +fn field_values_to_py_object<'py, 'a>( + py: Python<'py>, + values: impl Iterator, +) -> PyResult> { + let fields = values + .map(|v| value_to_py_object(py, v)) + .collect::>>()?; + Ok(PyTuple::new(py, fields)?.into_any()) +} + +fn value_to_py_object<'py>(py: Python<'py>, v: &value::Value) -> PyResult> { + let result = match v { + value::Value::Null => py.None().into_bound(py), + value::Value::Basic(v) => basic_value_to_py_object(py, v)?, + value::Value::Struct(v) => field_values_to_py_object(py, v.fields.iter())?, + value::Value::Collection(v) | value::Value::List(v) => { + let rows = v + .iter() + .map(|v| field_values_to_py_object(py, v.0.fields.iter())) + .collect::>>()?; + PyList::new(py, rows)?.into_any() + } + value::Value::Table(v) => { + let rows = v + .iter() + .map(|(k, v)| { + field_values_to_py_object( + py, + std::iter::once(&value::Value::from(k.clone())).chain(v.0.fields.iter()), + ) + }) + .collect::>>()?; + PyList::new(py, rows)?.into_any() + } + }; + Ok(result) +} + fn basic_value_from_py_object<'py>( typ: &schema::BasicValueType, v: &Bound<'py, PyAny>, @@ -159,7 +223,7 @@ impl SimpleFunctionExecutor for Arc { Python::with_gil(|py| -> Result<_> { let mut args = Vec::with_capacity(self.num_positional_args); for v in input[0..self.num_positional_args].iter() { - args.push(pythonize(py, v)?); + args.push(value_to_py_object(py, v)?); } let kwargs = if self.kw_args_names.is_empty() { @@ -171,7 +235,7 @@ impl SimpleFunctionExecutor for Arc { .iter() .zip(input[self.num_positional_args..].iter()) { - kwargs.push((name.bind(py), pythonize(py, v)?)); + kwargs.push((name.bind(py), value_to_py_object(py, v)?)); } Some(kwargs) };