Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 16 additions & 17 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,15 @@ rust-version = "1.89"
license = "Apache-2.0"

[workspace.dependencies]
pyo3 = { version = "0.25.1", features = [
pyo3 = { version = "0.27.1", features = [
"abi3-py311",
"auto-initialize",
"chrono",
"uuid",
] }
pythonize = "0.25.0"
pyo3-async-runtimes = { version = "0.25.0", features = ["tokio-runtime"] }
pythonize = "0.27.0"
pyo3-async-runtimes = { version = "0.27.0", features = ["tokio-runtime"] }
numpy = "0.27.0"

anyhow = { version = "1.0.100", features = ["std"] }
async-trait = "0.1.89"
Expand Down Expand Up @@ -89,7 +90,6 @@ aws-config = "1.8.11"
aws-sdk-s3 = "1.115.0"
aws-sdk-sqs = "1.90.0"
time = { version = "0.3", features = ["macros", "serde"] }
numpy = "0.25.0"
infer = "0.19.0"
serde_with = { version = "3.16.0", features = ["base64"] }
google-cloud-aiplatform-v1 = { version = "0.4.5", default-features = false, features = [
Expand Down
10 changes: 5 additions & 5 deletions rust/cocoindex/src/builder/flow_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ impl FlowBuilder {
#[new]
pub fn new(py: Python<'_>, name: &str, py_event_loop: Py<PyAny>) -> PyResult<Self> {
let lib_context = py
.allow_threads(|| -> anyhow::Result<Arc<LibContext>> {
.detach(|| -> anyhow::Result<Arc<LibContext>> {
get_runtime().block_on(get_lib_context())
})
.into_py_result()?;
Expand Down Expand Up @@ -331,7 +331,7 @@ impl FlowBuilder {
flow_ctx: self.flow_inst_context.clone(),
};
let analyzed = py
.allow_threads(|| {
.detach(|| {
get_runtime().block_on(
analyzer_ctx.analyze_import_op(&self.root_op_scope, import_op.clone()),
)
Expand Down Expand Up @@ -486,7 +486,7 @@ impl FlowBuilder {
flow_ctx: self.flow_inst_context.clone(),
};
let analyzed = py
.allow_threads(|| {
.detach(|| {
get_runtime().block_on(analyzer_ctx.analyze_reactive_op(op_scope, &reactive_op))
})
.into_py_result()?;
Expand Down Expand Up @@ -536,7 +536,7 @@ impl FlowBuilder {
flow_ctx: self.flow_inst_context.clone(),
};
let analyzed = py
.allow_threads(|| {
.detach(|| {
get_runtime().block_on(analyzer_ctx.analyze_reactive_op(common_scope, &reactive_op))
})
.into_py_result()?;
Expand Down Expand Up @@ -645,7 +645,7 @@ impl FlowBuilder {
};
let flow_instance_ctx = self.flow_inst_context.clone();
let flow_ctx = py
.allow_threads(|| {
.detach(|| {
get_runtime().block_on(async move {
let analyzed_flow =
super::AnalyzedFlow::from_flow_instance(spec, flow_instance_ctx).await?;
Expand Down
50 changes: 25 additions & 25 deletions rust/cocoindex/src/ops/py_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,14 @@ impl PyFunctionExecutor {
impl interface::SimpleFunctionExecutor for Arc<PyFunctionExecutor> {
async fn evaluate(&self, input: Vec<value::Value>) -> Result<value::Value> {
let self = self.clone();
let result_fut = Python::with_gil(|py| -> Result<_> {
let result_fut = Python::attach(|py| -> Result<_> {
let result_coro = self.call_py_fn(py, input)?;
let task_locals =
pyo3_async_runtimes::TaskLocals::new(self.py_exec_ctx.event_loop.bind(py).clone());
Ok(from_py_future(py, &task_locals, result_coro)?)
})?;
let result = result_fut.await;
Python::with_gil(|py| -> Result<_> {
Python::attach(|py| -> Result<_> {
let result = result.to_result_with_py_trace(py)?;
Ok(py::value_from_py_object(
&self.result_type.typ,
Expand Down Expand Up @@ -129,7 +129,7 @@ struct PyBatchedFunctionExecutor {
#[async_trait]
impl BatchedFunctionExecutor for PyBatchedFunctionExecutor {
async fn evaluate_batch(&self, args: Vec<Vec<value::Value>>) -> Result<Vec<value::Value>> {
let result_fut = Python::with_gil(|py| -> pyo3::PyResult<_> {
let result_fut = Python::attach(|py| -> pyo3::PyResult<_> {
let py_args = PyList::new(
py,
args.into_iter()
Expand All @@ -155,7 +155,7 @@ impl BatchedFunctionExecutor for PyBatchedFunctionExecutor {
)?)
})?;
let result = result_fut.await;
Python::with_gil(|py| -> Result<_> {
Python::attach(|py| -> Result<_> {
let result = result.to_result_with_py_trace(py)?;
let result_bound = result.into_bound(py);
let result_list = result_bound.extract::<Vec<Bound<'_, PyAny>>>()?;
Expand Down Expand Up @@ -189,7 +189,7 @@ impl interface::SimpleFunctionFactory for PyFunctionFactory {
context: Arc<interface::FlowInstanceContext>,
) -> Result<interface::SimpleFunctionBuildOutput> {
let (result_type, executor, kw_args_names, num_positional_args, behavior_version) =
Python::with_gil(|py| -> anyhow::Result<_> {
Python::attach(|py| -> anyhow::Result<_> {
let mut args = vec![pythonize(py, &spec)?];
let mut kwargs = vec![];
let mut num_positional_args = 0;
Expand Down Expand Up @@ -246,7 +246,7 @@ impl interface::SimpleFunctionFactory for PyFunctionFactory {
.ok_or_else(|| anyhow!("Python execution context is missing"))?
.clone();
let (prepare_fut, enable_cache, timeout, batching_options) =
Python::with_gil(|py| -> anyhow::Result<_> {
Python::attach(|py| -> anyhow::Result<_> {
let prepare_coro = executor
.call_method(py, "prepare", (), None)
.to_result_with_py_trace(py)
Expand Down Expand Up @@ -342,10 +342,10 @@ impl interface::SourceExecutor for PySourceExecutor {
options: &interface::SourceExecutorReadOptions,
) -> Result<BoxStream<'async_trait, Result<Vec<interface::PartialSourceRow>>>> {
let py_exec_ctx = self.py_exec_ctx.clone();
let py_source_executor = Python::with_gil(|py| self.py_source_executor.clone_ref(py));
let py_source_executor = Python::attach(|py| self.py_source_executor.clone_ref(py));

// Get the Python async iterator
let py_async_iter = Python::with_gil(|py| {
let py_async_iter = Python::attach(|py| {
py_source_executor
.call_method(py, "list_async", (pythonize(py, options)?,), None)
.to_result_with_py_trace(py)
Expand Down Expand Up @@ -375,10 +375,10 @@ impl interface::SourceExecutor for PySourceExecutor {
options: &interface::SourceExecutorReadOptions,
) -> Result<interface::PartialSourceRowData> {
let py_exec_ctx = self.py_exec_ctx.clone();
let py_source_executor = Python::with_gil(|py| self.py_source_executor.clone_ref(py));
let py_source_executor = Python::attach(|py| self.py_source_executor.clone_ref(py));
let key_clone = key.clone();

let py_result = Python::with_gil(|py| -> Result<_> {
let py_result = Python::attach(|py| -> Result<_> {
let result_coro = py_source_executor
.call_method(
py,
Expand Down Expand Up @@ -406,7 +406,7 @@ impl interface::SourceExecutor for PySourceExecutor {
})?
.await;

Python::with_gil(|py| -> Result<_> {
Python::attach(|py| -> Result<_> {
let result = py_result.to_result_with_py_trace(py)?;
let result_bound = result.into_bound(py);
let data = self.parse_partial_source_row_data(py, &result_bound)?;
Expand All @@ -432,7 +432,7 @@ impl PySourceExecutor {
py_exec_ctx: &Arc<crate::py::PythonExecutionContext>,
) -> Result<Option<interface::PartialSourceRow>> {
// Call the Python method to get the next item, avoiding storing Python objects across await points
let next_item_coro = Python::with_gil(|py| -> Result<_> {
let next_item_coro = Python::attach(|py| -> Result<_> {
let coro = py_async_iter
.call_method0(py, "__anext__")
.to_result_with_py_trace(py)
Expand All @@ -446,7 +446,7 @@ impl PySourceExecutor {
let py_item_result = next_item_coro.await;

// Handle StopAsyncIteration and convert to Rust data immediately to avoid Send issues
Python::with_gil(|py| -> Result<Option<interface::PartialSourceRow>> {
Python::attach(|py| -> Result<Option<interface::PartialSourceRow>> {
match py_item_result {
Ok(item) => {
let bound_item = item.into_bound(py);
Expand All @@ -472,7 +472,7 @@ impl PySourceExecutor {
) -> Result<interface::PartialSourceRow> {
// Each item should be a tuple of (key, data)
let tuple = bound_item
.downcast::<PyTuple>()
.cast::<PyTuple>()
.map_err(|e| anyhow!("Failed to downcast to PyTuple: {}", e))?;
if tuple.len() != 2 {
api_bail!("Expected tuple of length 2 from Python source iterator");
Expand Down Expand Up @@ -583,7 +583,7 @@ impl interface::SourceFactory for PySourceConnectorFactory {
.clone();

// First get the table type (this doesn't require executor)
let table_type = Python::with_gil(|py| -> Result<_> {
let table_type = Python::attach(|py| -> Result<_> {
let value_type_result = self
.py_source_connector
.call_method(py, "get_table_type", (), None)
Expand Down Expand Up @@ -625,7 +625,7 @@ impl interface::SourceFactory for PySourceConnectorFactory {
let source_name = source_name.to_string();
let executor_fut = async move {
// Create the executor using the async create_executor method
let create_future = Python::with_gil(|py| -> Result<_> {
let create_future = Python::attach(|py| -> Result<_> {
let create_coro = self
.py_source_connector
.call_method(py, "create_executor", (pythonize(py, &spec)?,), None)
Expand All @@ -645,7 +645,7 @@ impl interface::SourceFactory for PySourceConnectorFactory {
let py_executor_context_result = create_future.await;

let (py_source_executor_context, provides_ordinal) =
Python::with_gil(|py| -> Result<_> {
Python::attach(|py| -> Result<_> {
let executor_context = py_executor_context_result
.to_result_with_py_trace(py)
.with_context(|| {
Expand Down Expand Up @@ -748,7 +748,7 @@ impl interface::TargetFactory for PyExportTargetFactory {
.ok_or_else(|| anyhow!("Python execution context is missing"))?
.clone();
for data_collection in data_collections.into_iter() {
let (py_export_ctx, persistent_key, setup_state) = Python::with_gil(|py| {
let (py_export_ctx, persistent_key, setup_state) = Python::attach(|py| {
// Deserialize the spec to Python object.
let py_export_ctx = self
.py_target_connector
Expand Down Expand Up @@ -805,7 +805,7 @@ impl interface::TargetFactory for PyExportTargetFactory {
let py_exec_ctx = py_exec_ctx.clone();
let build_output = interface::ExportDataCollectionBuildOutput {
export_context: Box::pin(async move {
Python::with_gil(|py| {
Python::attach(|py| {
let prepare_coro = factory
.py_target_connector
.call_method(py, "prepare_async", (&py_export_ctx,), None)
Expand Down Expand Up @@ -872,7 +872,7 @@ impl interface::TargetFactory for PyExportTargetFactory {
desired_state: &serde_json::Value,
existing_state: &serde_json::Value,
) -> Result<SetupStateCompatibility> {
let compatibility = Python::with_gil(|py| -> Result<_> {
let compatibility = Python::attach(|py| -> Result<_> {
let result = self
.py_target_connector
.call_method(
Expand All @@ -895,7 +895,7 @@ impl interface::TargetFactory for PyExportTargetFactory {
}

fn describe_resource(&self, key: &serde_json::Value) -> Result<String> {
Python::with_gil(|py| -> Result<String> {
Python::attach(|py| -> Result<String> {
let result = self
.py_target_connector
.call_method(py, "describe_resource", (pythonize(py, key)?,), None)
Expand Down Expand Up @@ -950,7 +950,7 @@ impl interface::TargetFactory for PyExportTargetFactory {
.as_ref()
.ok_or_else(|| anyhow!("Python execution context is missing"))?
.clone();
let py_result = Python::with_gil(move |py| -> Result<_> {
let py_result = Python::attach(move |py| -> Result<_> {
let result_coro = self
.py_target_connector
.call_method(
Expand All @@ -972,7 +972,7 @@ impl interface::TargetFactory for PyExportTargetFactory {
)?)
})?
.await;
Python::with_gil(move |py| {
Python::attach(move |py| {
py_result
.to_result_with_py_trace(py)
.with_context(|| format!("while applying setup changes in user-configured target"))
Expand All @@ -991,7 +991,7 @@ impl interface::TargetFactory for PyExportTargetFactory {
return Ok(());
}

let py_result = Python::with_gil(|py| -> Result<_> {
let py_result = Python::attach(|py| -> Result<_> {
// Create a `list[tuple[export_ctx, list[tuple[key, value | None]]]]` for Python, and collect `py_exec_ctx`.
let mut py_args = Vec::with_capacity(mutations.len());
let mut py_exec_ctx: Option<&Arc<crate::py::PythonExecutionContext>> = None;
Expand Down Expand Up @@ -1039,7 +1039,7 @@ impl interface::TargetFactory for PyExportTargetFactory {
})?
.await;

Python::with_gil(move |py| {
Python::attach(move |py| {
py_result
.to_result_with_py_trace(py)
.with_context(|| format!("while applying mutations in user-configured target"))
Expand Down
4 changes: 2 additions & 2 deletions rust/cocoindex/src/py/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ fn handle_ndarray_from_py<'py>(
) -> PyResult<Option<value::BasicValue>> {
macro_rules! try_convert {
($t:ty, $cast:expr) => {
if let Ok(array) = v.downcast::<PyArrayDyn<$t>>() {
if let Ok(array) = v.cast::<PyArrayDyn<$t>>() {
let data = array.readonly().as_slice()?.to_vec();
let vec = data.into_iter().map($cast).collect::<Vec<_>>();
return Ok(Some(value::BasicValue::Vector(Arc::from(vec))));
Expand Down Expand Up @@ -357,7 +357,7 @@ mod tests {
use std::sync::Arc;

fn assert_roundtrip_conversion(original_value: &value::Value, value_type: &schema::ValueType) {
Python::with_gil(|py| {
Python::attach(|py| {
// Convert Rust value to Python object using value_to_py_object
let py_object = value_to_py_object(py, original_value)
.expect("Failed to convert Rust value to Python object");
Expand Down
Loading
Loading