From f92b583c1d4a8a08a3aa60d6658da6d74eeb58de Mon Sep 17 00:00:00 2001 From: naivedogger <623628963@qq.com> Date: Wed, 27 Aug 2025 17:29:03 +0800 Subject: [PATCH 1/4] create bindings for connection and admin --- bindings/python/src/admin.rs | 107 +++++++++++++++++++++++++ bindings/python/src/connection.rs | 128 ++++++++++++++++++++++++++++++ 2 files changed, 235 insertions(+) create mode 100644 bindings/python/src/admin.rs create mode 100644 bindings/python/src/connection.rs diff --git a/bindings/python/src/admin.rs b/bindings/python/src/admin.rs new file mode 100644 index 00000000..7ec6eee9 --- /dev/null +++ b/bindings/python/src/admin.rs @@ -0,0 +1,107 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use pyo3::prelude::*; +use pyo3_async_runtimes::tokio::future_into_py; +use crate::*; +use std::sync::Arc; + +/// Administrative client for managing Fluss tables +#[pyclass] +pub struct FlussAdmin { + __admin: Arc, +} + +#[pymethods] +impl FlussAdmin { + /// Create a table with the given schema + #[pyo3(signature = (table_path, table_descriptor, ignore_if_exists=None))] + pub fn create_table<'py>( + &self, + py: Python<'py>, + table_path: &TablePath, + table_descriptor: &TableDescriptor, + ignore_if_exists: Option, + ) -> PyResult> { + let ignore = ignore_if_exists.unwrap_or(false); + + let core_table_path = table_path.to_core().clone(); + let core_descriptor = table_descriptor.to_core().clone(); + let admin = self.__admin.clone(); + + future_into_py(py, async move { + admin.create_table(&core_table_path, &core_descriptor, ignore) + .await + .map_err(|e| FlussError::new_err(e.to_string()))?; + + Python::with_gil(|py| Ok(py.None())) + }) + } + + /// Get table information + pub fn get_table<'py>( + &self, + py: Python<'py>, + table_path: &TablePath, + ) -> PyResult> { + let core_table_path = table_path.to_core().clone(); + let admin = self.__admin.clone(); + + future_into_py(py, async move { + let core_table_info = admin.get_table(&core_table_path).await + .map_err(|e| FlussError::new_err(format!("Failed to get table: {}", e)))?; + + Python::with_gil(|py| { + let table_info = TableInfo::from_core(core_table_info); + Py::new(py, table_info) + }) + }) + } + + /// Get the latest lake snapshot for a table + pub fn get_latest_lake_snapshot<'py>( + &self, + py: Python<'py>, + table_path: &TablePath, + ) -> PyResult> { + let core_table_path = table_path.to_core().clone(); + let admin = self.__admin.clone(); + + future_into_py(py, async move { + let core_lake_snapshot = admin.get_latest_lake_snapshot(&core_table_path).await + .map_err(|e| FlussError::new_err(format!("Failed to get lake snapshot: {}", e)))?; + + Python::with_gil(|py| { + let lake_snapshot = LakeSnapshot::from_core(core_lake_snapshot); + Py::new(py, lake_snapshot) + }) + }) + } + + fn __repr__(&self) -> String { + "FlussAdmin()".to_string() + } +} + +impl FlussAdmin { + // Internal method to create FlussAdmin from core admin + pub fn from_core(admin: fcore::client::FlussAdmin) -> Self { + Self { + __admin: Arc::new(admin), + } + } +} diff --git a/bindings/python/src/connection.rs b/bindings/python/src/connection.rs new file mode 100644 index 00000000..756c3389 --- /dev/null +++ b/bindings/python/src/connection.rs @@ -0,0 +1,128 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use pyo3::prelude::*; +use crate::*; +use std::sync::Arc; +use pyo3_async_runtimes::tokio::future_into_py; + +/// Connection to a Fluss cluster +#[pyclass] +pub struct FlussConnection { + inner: Arc, +} + +#[pymethods] +impl FlussConnection { + #[new] + fn new(config: &Config) -> PyResult { + // Always use connect to create a new connection + Err(FlussError::new_err("Use FlussConnection.connect() instead.")) + } + + /// Create a new FlussConnection (async) + #[staticmethod] + fn connect<'py>(py: Python<'py>, config: &Config) -> PyResult> { + let rust_config = config.get_core_config(); + + future_into_py(py, async move { + let connection = fcore::client::FlussConnection::new(rust_config) + .await + .map_err(|e| FlussError::new_err(e.to_string()))?; + + let py_connection = FlussConnection { + inner: Arc::new(connection), + }; + + Python::with_gil(|py| { + Py::new(py, py_connection) + }) + }) + } + + /// Get admin interface + fn get_admin<'py>(&self, py: Python<'py>) -> PyResult> { + let client = self.inner.clone(); + + future_into_py(py, async move { + let admin = client.get_admin() + .await + .map_err(|e| FlussError::new_err(e.to_string()))?; + + let py_admin = FlussAdmin::from_core(admin); + + Python::with_gil(|py| { + Py::new(py, py_admin) + }) + }) + } + + /// Get a table + fn get_table<'py>(&self, py: Python<'py>, table_path: &TablePath) -> PyResult> { + let client = self.inner.clone(); + let core_path = table_path.to_core().clone(); + + future_into_py(py, async move { + let metadata = client.get_metadata(); + + metadata.update_table_metadata(&core_path).await + .map_err(|e| FlussError::new_err(e.to_string()))?; + + let table_info = metadata.get_cluster().get_table(&core_path).clone(); + let table_path = table_info.table_path.clone(); + let has_primary_key = table_info.has_primary_key(); + + let py_table = FlussTable::new_table( + client, + metadata, + table_info, + table_path, + has_primary_key, + ); + + Python::with_gil(|py| { + Py::new(py, py_table) + }) + }) + } + + // Close the connection + fn close(&mut self) -> PyResult<()> { + Ok(()) + } + + // Enter the runtime context (for 'with' statement) + fn __enter__(slf: PyRef) -> PyRef { + slf + } + + // Exit the runtime context (for 'with' statement) + #[pyo3(signature = (_exc_type=None, _exc_value=None, _traceback=None))] + fn __exit__( + &mut self, + _exc_type: Option, + _exc_value: Option, + _traceback: Option, + ) -> PyResult { + self.close()?; + Ok(false) + } + + fn __repr__(&self) -> String { + "FlussConnection()".to_string() + } +} From 624072502b4e4e9b1fe23bea42ad5987c0a73e96 Mon Sep 17 00:00:00 2001 From: naivedogger <623628963@qq.com> Date: Wed, 27 Aug 2025 17:31:32 +0800 Subject: [PATCH 2/4] nit --- .../python/python/fluss_python/__init__.py | 20 ++ bindings/python/src/error.rs | 114 +++++++++++ bindings/python/src/lib.rs | 67 +++++++ bindings/python/src/utils.rs | 178 ++++++++++++++++++ 4 files changed, 379 insertions(+) create mode 100644 bindings/python/python/fluss_python/__init__.py create mode 100644 bindings/python/src/error.rs create mode 100644 bindings/python/src/lib.rs create mode 100644 bindings/python/src/utils.rs diff --git a/bindings/python/python/fluss_python/__init__.py b/bindings/python/python/fluss_python/__init__.py new file mode 100644 index 00000000..cceee102 --- /dev/null +++ b/bindings/python/python/fluss_python/__init__.py @@ -0,0 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from .fluss_python import * + +__version__ = "0.1.0" diff --git a/bindings/python/src/error.rs b/bindings/python/src/error.rs new file mode 100644 index 00000000..a262d2ca --- /dev/null +++ b/bindings/python/src/error.rs @@ -0,0 +1,114 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use pyo3::prelude::*; +use pyo3::exceptions::{PyException, PyConnectionError, PyValueError, PyIOError}; +use fluss::error::Error as FlussRustError; +use std::fmt; + +/// Custom exception for Fluss errors +#[pyclass(extends=PyException)] +#[derive(Debug, Clone)] +pub struct FlussError { + #[pyo3(get)] + pub message: String, + #[pyo3(get)] + pub error_code: Option, +} + +#[pymethods] +impl FlussError { + #[new] + #[pyo3(signature = (message, error_code = None))] + fn new(message: String, error_code: Option) -> Self { + FlussError { + message, + error_code, + } + } + + fn __str__(&self) -> String { + if let Some(code) = self.error_code { + format!("FlussError({}): {}", code, self.message) + } else { + format!("FlussError: {}", self.message) + } + } + + fn __repr__(&self) -> String { + if let Some(code) = self.error_code { + format!("FlussError(message='{}', error_code={})", self.message, code) + } else { + format!("FlussError(message='{}')", self.message) + } + } +} + +impl fmt::Display for FlussError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.__str__()) + } +} + +impl std::error::Error for FlussError {} + +// Convert from Rust fluss error to Python FlussError +impl From for FlussError { + fn from(error: FlussRustError) -> Self { + FlussError { + message: error.to_string(), + error_code: None, + } + } +} + +// Convert FlussError to PyErr for use with PyO3 +impl From for PyErr { + fn from(error: FlussError) -> Self { + PyErr::new::(error.message.clone()) + } +} + +// Helper function to convert Rust fluss error to appropriate Python exception +pub fn fluss_error_to_pyerr(error: FlussRustError) -> PyErr { + match error { + FlussRustError::Io(_) => PyIOError::new_err(error.to_string()), + FlussRustError::IllegalArgument(_) => PyValueError::new_err(error.to_string()), + FlussRustError::RpcError(_) => PyConnectionError::new_err(error.to_string()), + FlussRustError::InvalidTableError(_) => PyValueError::new_err(error.to_string()), + _ => FlussError::from(error).into(), + } +} + +// Helper functions for creating FlussError +impl FlussError { + pub fn new_err(message: impl ToString) -> PyErr { + PyErr::new::(message.to_string()) + } + + pub fn new_with_code(message: impl ToString, code: i32) -> Self { + FlussError { + message: message.to_string(), + error_code: Some(code), + } + } + + pub fn new_with_code_err(message: impl ToString, code: i32) -> PyErr { + let error = FlussError::new_with_code(message, code); + PyErr::new::(error.message) + } +} diff --git a/bindings/python/src/lib.rs b/bindings/python/src/lib.rs new file mode 100644 index 00000000..0d8b7a5a --- /dev/null +++ b/bindings/python/src/lib.rs @@ -0,0 +1,67 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +pub use ::fluss as fcore; +use pyo3::prelude::*; +use once_cell::sync::Lazy; +use tokio::runtime::Runtime; + +mod config; +mod connection; +mod table; +mod admin; +mod types; +mod error; +mod utils; + +pub use config::*; +pub use connection::*; +pub use table::*; +pub use admin::*; +pub use types::*; +pub use error::*; +pub use utils::*; + +static TOKIO_RUNTIME: Lazy = Lazy::new(|| { + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .expect("Failed to create Tokio runtime") +}); + +#[pymodule] +fn fluss_python(m: &Bound<'_, PyModule>) -> PyResult<()> { + // Register all classes + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + + // Register exception types + // TODO: maybe implement a separate module for exceptions + m.add("FlussError", m.py().get_type::())?; + + Ok(()) +} diff --git a/bindings/python/src/utils.rs b/bindings/python/src/utils.rs new file mode 100644 index 00000000..c40104bf --- /dev/null +++ b/bindings/python/src/utils.rs @@ -0,0 +1,178 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use pyo3::prelude::*; +use arrow::datatypes::{Schema as ArrowSchema, SchemaRef}; +use std::sync::Arc; +use arrow_pyarrow::ToPyArrow; +use crate::*; + +/// Utilities for schema conversion between PyArrow, Arrow, and Fluss +pub struct Utils; + +impl Utils { + /// Convert PyArrow schema to Rust Arrow schema + pub fn pyarrow_to_arrow_schema(py_schema: &PyObject) -> PyResult { + Python::with_gil(|py| { + let schema_bound = py_schema.bind(py); + + let schema: ArrowSchema = arrow_pyarrow::FromPyArrow::from_pyarrow_bound(&schema_bound) + .map_err(|e| FlussError::new_err(format!("Failed to convert PyArrow schema: {}", e)))?; + Ok(Arc::new(schema)) + }) + } + + /// Convert Arrow DataType to Fluss DataType + pub fn arrow_type_to_fluss_type(arrow_type: &arrow::datatypes::DataType) -> PyResult { + use arrow::datatypes::DataType as ArrowDataType; + use fcore::metadata::DataTypes; + + let fluss_type = match arrow_type { + ArrowDataType::Boolean => DataTypes::boolean(), + ArrowDataType::Int8 => DataTypes::tinyint(), + ArrowDataType::Int16 => DataTypes::smallint(), + ArrowDataType::Int32 => DataTypes::int(), + ArrowDataType::Int64 => DataTypes::bigint(), + ArrowDataType::UInt8 => DataTypes::tinyint(), + ArrowDataType::UInt16 => DataTypes::smallint(), + ArrowDataType::UInt32 => DataTypes::int(), + ArrowDataType::UInt64 => DataTypes::bigint(), + ArrowDataType::Float32 => DataTypes::float(), + ArrowDataType::Float64 => DataTypes::double(), + ArrowDataType::Utf8 | ArrowDataType::LargeUtf8 => DataTypes::string(), + ArrowDataType::Binary | ArrowDataType::LargeBinary => DataTypes::bytes(), + ArrowDataType::Date32 => DataTypes::date(), + ArrowDataType::Date64 => DataTypes::date(), + ArrowDataType::Time32(_) | ArrowDataType::Time64(_) => DataTypes::time(), + ArrowDataType::Timestamp(_, _) => DataTypes::timestamp(), + ArrowDataType::Decimal128(precision, scale) => DataTypes::decimal(*precision as u32, *scale as u32), + _ => { + return Err(FlussError::new_err(format!( + "Unsupported Arrow data type: {:?}", arrow_type + ))); + } + }; + + Ok(fluss_type) + } + + /// Convert Fluss DataType to string representation + pub fn datatype_to_string(data_type: &fcore::metadata::DataType) -> String { + match data_type { + fcore::metadata::DataType::Boolean(_) => "boolean".to_string(), + fcore::metadata::DataType::TinyInt(_) => "tinyint".to_string(), + fcore::metadata::DataType::SmallInt(_) => "smallint".to_string(), + fcore::metadata::DataType::Int(_) => "int".to_string(), + fcore::metadata::DataType::BigInt(_) => "bigint".to_string(), + fcore::metadata::DataType::Float(_) => "float".to_string(), + fcore::metadata::DataType::Double(_) => "double".to_string(), + fcore::metadata::DataType::String(_) => "string".to_string(), + fcore::metadata::DataType::Bytes(_) => "bytes".to_string(), + fcore::metadata::DataType::Date(_) => "date".to_string(), + fcore::metadata::DataType::Time(t) => { + if t.precision() == 0 { + "time".to_string() + } else { + format!("time({})", t.precision()) + } + }, + fcore::metadata::DataType::Timestamp(t) => { + if t.precision() == 6 { + "timestamp".to_string() + } else { + format!("timestamp({})", t.precision()) + } + }, + fcore::metadata::DataType::TimestampLTz(t) => { + if t.precision() == 6 { + "timestamp_ltz".to_string() + } else { + format!("timestamp_ltz({})", t.precision()) + } + }, + fcore::metadata::DataType::Char(c) => format!("char({})", c.length()), + fcore::metadata::DataType::Decimal(d) => format!("decimal({},{})", d.precision(), d.scale()), + fcore::metadata::DataType::Binary(b) => format!("binary({})", b.length()), + fcore::metadata::DataType::Array(arr) => format!("array<{}>", Utils::datatype_to_string(arr.get_element_type())), + fcore::metadata::DataType::Map(map) => format!("map<{},{}>", + Utils::datatype_to_string(map.key_type()), + Utils::datatype_to_string(map.value_type())), + fcore::metadata::DataType::Row(row) => { + let fields: Vec = row.fields().iter() + .map(|field| format!("{}: {}", field.name(), Utils::datatype_to_string(field.data_type()))) + .collect(); + format!("row<{}>", fields.join(", ")) + }, + } + } + + /// Parse log format string to LogFormat enum + pub fn parse_log_format(format_str: &str) -> PyResult { + fcore::metadata::LogFormat::parse(format_str) + .map_err(|e| FlussError::new_err(format!("Invalid log format '{}': {}", format_str, e))) + } + + /// Parse kv format string to KvFormat enum + pub fn parse_kv_format(format_str: &str) -> PyResult { + fcore::metadata::KvFormat::parse(format_str) + .map_err(|e| FlussError::new_err(format!("Invalid kv format '{}': {}", format_str, e))) + } + + /// Convert ScanRecords to Arrow RecordBatch + pub fn convert_scan_records_to_arrow( + _scan_records: fcore::record::ScanRecords, + ) -> Vec> { + let mut result = Vec::new(); + for(_, records) in _scan_records.into_records() { + for record in records { + let columnar_row = record.row(); + let row_id = columnar_row.get_row_id(); + if row_id == 0 { + let record_batch = columnar_row.get_record_batch(); + result.push(record_batch.clone()); + } + } + } + result + } + + /// Combine multiple Arrow batches into a single Table + pub fn combine_batches_to_table(py: Python, batches: Vec>) -> PyResult { + if batches.is_empty() { + return Err(FlussError::new_err("No batches to combine")); + } + + // Convert Rust Arrow RecordBatch to PyObject + let py_batches: Result, _> = batches.iter() + .map(|batch| { + batch.as_ref().to_pyarrow(py) + .map_err(|e| FlussError::new_err(format!("Failed to convert RecordBatch to PyObject: {}", e))) + }) + .collect(); + + let py_batches = py_batches?; + + let pyarrow = py.import("pyarrow")?; + + // Use pyarrow.Table.from_batches to combine batches + let table = pyarrow + .getattr("Table")? + .call_method1("from_batches", (py_batches,))?; + + Ok(table.into()) + } +} From 9747a6a5c6c68e0a3fa60c8ae81b59b2efdac5c0 Mon Sep 17 00:00:00 2001 From: naivedogger <623628963@qq.com> Date: Wed, 17 Sep 2025 23:57:28 +0800 Subject: [PATCH 3/4] address comments --- .../{fluss_python => fluss}/__init__.py | 0 bindings/python/src/connection.rs | 27 ++----- bindings/python/src/error.rs | 81 +------------------ 3 files changed, 11 insertions(+), 97 deletions(-) rename bindings/python/python/{fluss_python => fluss}/__init__.py (100%) diff --git a/bindings/python/python/fluss_python/__init__.py b/bindings/python/python/fluss/__init__.py similarity index 100% rename from bindings/python/python/fluss_python/__init__.py rename to bindings/python/python/fluss/__init__.py diff --git a/bindings/python/src/connection.rs b/bindings/python/src/connection.rs index 756c3389..ba1fa505 100644 --- a/bindings/python/src/connection.rs +++ b/bindings/python/src/connection.rs @@ -28,12 +28,6 @@ pub struct FlussConnection { #[pymethods] impl FlussConnection { - #[new] - fn new(config: &Config) -> PyResult { - // Always use connect to create a new connection - Err(FlussError::new_err("Use FlussConnection.connect() instead.")) - } - /// Create a new FlussConnection (async) #[staticmethod] fn connect<'py>(py: Python<'py>, config: &Config) -> PyResult> { @@ -77,21 +71,16 @@ impl FlussConnection { let core_path = table_path.to_core().clone(); future_into_py(py, async move { - let metadata = client.get_metadata(); - - metadata.update_table_metadata(&core_path).await + let core_table = client.get_table(&core_path) + .await .map_err(|e| FlussError::new_err(e.to_string()))?; - - let table_info = metadata.get_cluster().get_table(&core_path).clone(); - let table_path = table_info.table_path.clone(); - let has_primary_key = table_info.has_primary_key(); - + let py_table = FlussTable::new_table( - client, - metadata, - table_info, - table_path, - has_primary_key, + client, + core_table.metadata, + core_table.table_info, + core_table.table_path, + core_table.has_primary_key, ); Python::with_gil(|py| { diff --git a/bindings/python/src/error.rs b/bindings/python/src/error.rs index a262d2ca..2db2991c 100644 --- a/bindings/python/src/error.rs +++ b/bindings/python/src/error.rs @@ -16,99 +16,24 @@ // under the License. use pyo3::prelude::*; -use pyo3::exceptions::{PyException, PyConnectionError, PyValueError, PyIOError}; -use fluss::error::Error as FlussRustError; -use std::fmt; -/// Custom exception for Fluss errors +/// Fluss errors #[pyclass(extends=PyException)] #[derive(Debug, Clone)] pub struct FlussError { #[pyo3(get)] pub message: String, - #[pyo3(get)] - pub error_code: Option, } #[pymethods] impl FlussError { - #[new] - #[pyo3(signature = (message, error_code = None))] - fn new(message: String, error_code: Option) -> Self { - FlussError { - message, - error_code, - } - } - fn __str__(&self) -> String { - if let Some(code) = self.error_code { - format!("FlussError({}): {}", code, self.message) - } else { - format!("FlussError: {}", self.message) - } - } - - fn __repr__(&self) -> String { - if let Some(code) = self.error_code { - format!("FlussError(message='{}', error_code={})", self.message, code) - } else { - format!("FlussError(message='{}')", self.message) - } - } -} - -impl fmt::Display for FlussError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{}", self.__str__()) - } -} - -impl std::error::Error for FlussError {} - -// Convert from Rust fluss error to Python FlussError -impl From for FlussError { - fn from(error: FlussRustError) -> Self { - FlussError { - message: error.to_string(), - error_code: None, - } + format!("FlussError: {}", self.message) } } -// Convert FlussError to PyErr for use with PyO3 -impl From for PyErr { - fn from(error: FlussError) -> Self { - PyErr::new::(error.message.clone()) - } -} - -// Helper function to convert Rust fluss error to appropriate Python exception -pub fn fluss_error_to_pyerr(error: FlussRustError) -> PyErr { - match error { - FlussRustError::Io(_) => PyIOError::new_err(error.to_string()), - FlussRustError::IllegalArgument(_) => PyValueError::new_err(error.to_string()), - FlussRustError::RpcError(_) => PyConnectionError::new_err(error.to_string()), - FlussRustError::InvalidTableError(_) => PyValueError::new_err(error.to_string()), - _ => FlussError::from(error).into(), - } -} - -// Helper functions for creating FlussError impl FlussError { pub fn new_err(message: impl ToString) -> PyErr { PyErr::new::(message.to_string()) } - - pub fn new_with_code(message: impl ToString, code: i32) -> Self { - FlussError { - message: message.to_string(), - error_code: Some(code), - } - } - - pub fn new_with_code_err(message: impl ToString, code: i32) -> PyErr { - let error = FlussError::new_with_code(message, code); - PyErr::new::(error.message) - } -} +} \ No newline at end of file From ab1311153bebdfc202a7c07f649127c48775e464 Mon Sep 17 00:00:00 2001 From: luoyuxia Date: Thu, 18 Sep 2025 12:00:24 +0800 Subject: [PATCH 4/4] rename directory of fluss core --- bindings/python/{python => }/fluss/__init__.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename bindings/python/{python => }/fluss/__init__.py (100%) diff --git a/bindings/python/python/fluss/__init__.py b/bindings/python/fluss/__init__.py similarity index 100% rename from bindings/python/python/fluss/__init__.py rename to bindings/python/fluss/__init__.py