From 2dec05ecff522ee0772b85c3c1fc486423f7dc35 Mon Sep 17 00:00:00 2001 From: Michael-J-Ward Date: Thu, 3 Oct 2024 13:06:50 -0500 Subject: [PATCH 1/8] bump rust-version to match upstream datafusion --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index a0723984f..4f2602316 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,7 +25,7 @@ description = "Apache DataFusion DataFrame and SQL Query Engine" readme = "README.md" license = "Apache-2.0" edition = "2021" -rust-version = "1.64" +rust-version = "1.78" include = ["/src", "/datafusion", "/LICENSE.txt", "pyproject.toml", "Cargo.toml", "Cargo.lock"] [features] From 2934dac943945979ef2ddb233fb02ca957528b48 Mon Sep 17 00:00:00 2001 From: Michael-J-Ward Date: Thu, 3 Oct 2024 13:12:55 -0500 Subject: [PATCH 2/8] use std::sync::OnceLock to store tokio runtime instead of round-tripping to python --- src/utils.rs | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/src/utils.rs b/src/utils.rs index 0d72eaf75..0500e9a89 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -20,20 +20,18 @@ use crate::TokioRuntime; use datafusion::logical_expr::Volatility; use pyo3::prelude::*; use std::future::Future; +use std::sync::{Arc, OnceLock}; use tokio::runtime::Runtime; /// Utility to get the Tokio Runtime from Python -pub(crate) fn get_tokio_runtime(py: Python) -> PyRef { - let datafusion = py.import_bound("datafusion._internal").unwrap(); - let tmp = datafusion.getattr("runtime").unwrap(); - match tmp.extract::>() { - Ok(runtime) => runtime, - Err(_e) => { +pub(crate) fn get_tokio_runtime(_: Python) -> Arc { + static RUNTIME: OnceLock> = OnceLock::new(); + RUNTIME + .get_or_init(|| { let rt = TokioRuntime(tokio::runtime::Runtime::new().unwrap()); - let obj: Bound<'_, TokioRuntime> = Py::new(py, rt).unwrap().into_bound(py); - obj.extract().unwrap() - } - } + Arc::new(rt) + }) + .clone() } /// Utility to collect rust futures with GIL released From 4c347188ecfbf2ba3ae1ec3f325ed90861aa617f Mon Sep 17 00:00:00 2001 From: Michael-J-Ward Date: Fri, 4 Oct 2024 12:29:46 -0500 Subject: [PATCH 3/8] stop exporting TokioRuntime to python --- python/datafusion/__init__.py | 3 +-- src/lib.rs | 6 ------ 2 files changed, 1 insertion(+), 8 deletions(-) diff --git a/python/datafusion/__init__.py b/python/datafusion/__init__.py index 63c19b3e1..e0bc57f44 100644 --- a/python/datafusion/__init__.py +++ b/python/datafusion/__init__.py @@ -36,7 +36,7 @@ from .catalog import Catalog, Database, Table # The following imports are okay to remain as opaque to the user. -from ._internal import Config, runtime +from ._internal import Config from .record_batch import RecordBatchStream, RecordBatch @@ -75,7 +75,6 @@ "literal", "lit", "DFSchema", - "runtime", "Catalog", "Database", "Table", diff --git a/src/lib.rs b/src/lib.rs index 98821833d..0b57e0999 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -66,7 +66,6 @@ pub mod utils; static GLOBAL: MiMalloc = MiMalloc; // Used to define Tokio Runtime as a Python module attribute -#[pyclass] pub(crate) struct TokioRuntime(tokio::runtime::Runtime); /// Low-level DataFusion internal package. @@ -75,11 +74,6 @@ pub(crate) struct TokioRuntime(tokio::runtime::Runtime); /// datafusion directory. #[pymodule] fn _internal(py: Python, m: Bound<'_, PyModule>) -> PyResult<()> { - // Register the Tokio Runtime as a module attribute so we can reuse it - m.add( - "runtime", - TokioRuntime(tokio::runtime::Runtime::new().unwrap()), - )?; // Register the python classes m.add_class::()?; m.add_class::()?; From cafd59e35d4488dcbb539742d4d74e7241d5be25 Mon Sep 17 00:00:00 2001 From: Michael-J-Ward Date: Thu, 3 Oct 2024 15:49:30 -0500 Subject: [PATCH 4/8] remove unused argument from get_tokio_runtime --- src/context.rs | 2 +- src/dataframe.rs | 4 ++-- src/utils.rs | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/context.rs b/src/context.rs index fde442ce4..5317a3eda 100644 --- a/src/context.rs +++ b/src/context.rs @@ -982,7 +982,7 @@ impl PySessionContext { ) -> PyResult { let ctx: TaskContext = TaskContext::from(&self.ctx.state()); // create a Tokio runtime to run the async code - let rt = &get_tokio_runtime(py).0; + let rt = &get_tokio_runtime().0; let plan = plan.plan.clone(); let fut: JoinHandle> = rt.spawn(async move { plan.execute(part, Arc::new(ctx)) }); diff --git a/src/dataframe.rs b/src/dataframe.rs index 1f7f2e643..e77ca8425 100644 --- a/src/dataframe.rs +++ b/src/dataframe.rs @@ -543,7 +543,7 @@ impl PyDataFrame { fn execute_stream(&self, py: Python) -> PyResult { // create a Tokio runtime to run the async code - let rt = &get_tokio_runtime(py).0; + let rt = &get_tokio_runtime().0; let df = self.df.as_ref().clone(); let fut: JoinHandle> = rt.spawn(async move { df.execute_stream().await }); @@ -553,7 +553,7 @@ impl PyDataFrame { fn execute_stream_partitioned(&self, py: Python) -> PyResult> { // create a Tokio runtime to run the async code - let rt = &get_tokio_runtime(py).0; + let rt = &get_tokio_runtime().0; let df = self.df.as_ref().clone(); let fut: JoinHandle>> = rt.spawn(async move { df.execute_stream_partitioned().await }); diff --git a/src/utils.rs b/src/utils.rs index 0500e9a89..e2ff3e29c 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -24,7 +24,7 @@ use std::sync::{Arc, OnceLock}; use tokio::runtime::Runtime; /// Utility to get the Tokio Runtime from Python -pub(crate) fn get_tokio_runtime(_: Python) -> Arc { +pub(crate) fn get_tokio_runtime() -> Arc { static RUNTIME: OnceLock> = OnceLock::new(); RUNTIME .get_or_init(|| { @@ -40,7 +40,7 @@ where F: Future + Send, F::Output: Send, { - let runtime: &Runtime = &get_tokio_runtime(py).0; + let runtime: &Runtime = &get_tokio_runtime().0; py.allow_threads(|| runtime.block_on(f)) } From 85f3ca3a6b4f8d03907fff0174369fa211c2edcc Mon Sep 17 00:00:00 2001 From: Michael-J-Ward Date: Fri, 4 Oct 2024 13:09:11 -0500 Subject: [PATCH 5/8] remove superflous Arc from get_tokio_runtime --- src/utils.rs | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/src/utils.rs b/src/utils.rs index e2ff3e29c..0cf7152ba 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -20,18 +20,16 @@ use crate::TokioRuntime; use datafusion::logical_expr::Volatility; use pyo3::prelude::*; use std::future::Future; -use std::sync::{Arc, OnceLock}; +use std::sync::OnceLock; use tokio::runtime::Runtime; /// Utility to get the Tokio Runtime from Python -pub(crate) fn get_tokio_runtime() -> Arc { - static RUNTIME: OnceLock> = OnceLock::new(); - RUNTIME - .get_or_init(|| { - let rt = TokioRuntime(tokio::runtime::Runtime::new().unwrap()); - Arc::new(rt) - }) - .clone() +pub(crate) fn get_tokio_runtime() -> &'static TokioRuntime { + static RUNTIME: OnceLock = OnceLock::new(); + RUNTIME.get_or_init(|| { + let rt = TokioRuntime(tokio::runtime::Runtime::new().unwrap()); + rt + }) } /// Utility to collect rust futures with GIL released From d2dbc25f2da03913af68fb67b87c784a851fcb87 Mon Sep 17 00:00:00 2001 From: Michael-J-Ward Date: Fri, 4 Oct 2024 14:35:17 -0500 Subject: [PATCH 6/8] add #[inline] annotation to get_tokio_runtime I also included a reference comment in case future users experience problems with using datafusion-python behind a forking app server l ike `gunicorn`. --- src/utils.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/utils.rs b/src/utils.rs index 0cf7152ba..5d34f01c1 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -24,7 +24,13 @@ use std::sync::OnceLock; use tokio::runtime::Runtime; /// Utility to get the Tokio Runtime from Python +#[inline] pub(crate) fn get_tokio_runtime() -> &'static TokioRuntime { + // NOTE: Other pyo3 python libraries have had issues with using tokio + // behind a forking app-server like `gunicorn` + // If we run into that problem, in the future we can look to `delta-rs` + // which adds a check in that disallows calls from a forked process + // https://github.com/delta-io/delta-rs/blob/87010461cfe01563d91a4b9cd6fa468e2ad5f283/python/src/utils.rs#L10-L31 static RUNTIME: OnceLock = OnceLock::new(); RUNTIME.get_or_init(|| { let rt = TokioRuntime(tokio::runtime::Runtime::new().unwrap()); From 76dc0e2139ca6878425ec99aaa57507f700daab8 Mon Sep 17 00:00:00 2001 From: Michael-J-Ward Date: Fri, 4 Oct 2024 14:37:48 -0500 Subject: [PATCH 7/8] fix clippy lint --- src/utils.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/utils.rs b/src/utils.rs index 5d34f01c1..7bd920e0e 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -33,8 +33,7 @@ pub(crate) fn get_tokio_runtime() -> &'static TokioRuntime { // https://github.com/delta-io/delta-rs/blob/87010461cfe01563d91a4b9cd6fa468e2ad5f283/python/src/utils.rs#L10-L31 static RUNTIME: OnceLock = OnceLock::new(); RUNTIME.get_or_init(|| { - let rt = TokioRuntime(tokio::runtime::Runtime::new().unwrap()); - rt + TokioRuntime(tokio::runtime::Runtime::new().unwrap()) }) } From f99152ed4ac82f45741ec9dfd2cd7268e662ac8e Mon Sep 17 00:00:00 2001 From: Michael-J-Ward Date: Fri, 4 Oct 2024 14:55:27 -0500 Subject: [PATCH 8/8] cargo fmt --- src/utils.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/utils.rs b/src/utils.rs index 7bd920e0e..7fb23cafe 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -32,9 +32,7 @@ pub(crate) fn get_tokio_runtime() -> &'static TokioRuntime { // which adds a check in that disallows calls from a forked process // https://github.com/delta-io/delta-rs/blob/87010461cfe01563d91a4b9cd6fa468e2ad5f283/python/src/utils.rs#L10-L31 static RUNTIME: OnceLock = OnceLock::new(); - RUNTIME.get_or_init(|| { - TokioRuntime(tokio::runtime::Runtime::new().unwrap()) - }) + RUNTIME.get_or_init(|| TokioRuntime(tokio::runtime::Runtime::new().unwrap())) } /// Utility to collect rust futures with GIL released