diff --git a/.licenserc.yaml b/.licenserc.yaml
index 3813b48..a3cfcd1 100644
--- a/.licenserc.yaml
+++ b/.licenserc.yaml
@@ -26,4 +26,5 @@ header:
     - 'LICENSE'
     - 'NOTICE'
     - 'DISCLAIMER'
+    - 'bindings/python/fluss/py.typed'
 comment: on-failure
diff --git a/bindings/python/README.md b/bindings/python/README.md
index 5258f53..44d6099 100644
--- a/bindings/python/README.md
+++ b/bindings/python/README.md
@@ -108,7 +108,7 @@ uv run python example/example.py
 ### Build API docs:
 
 ```bash
-uv run pdoc fluss_python
+uv run pdoc fluss
 ```
 
 ### Release
@@ -124,10 +124,10 @@ uv run maturin publish
 ## Project Structure
 ```
 bindings/python/
-├── Cargo.toml # Rust dependency configuration
-├── pyproject.toml # Python project configuration
-├── README.md # This file
-├── src/ # Rust source code
+├── Cargo.toml          # Rust dependency configuration
+├── pyproject.toml      # Python project configuration
+├── README.md           # This file
+├── src/                # Rust source code
 │   ├── lib.rs          # Main entry module
 │   ├── config.rs       # Configuration related
 │   ├── connection.rs   # Connection management
@@ -135,11 +135,10 @@ bindings/python/
 │   ├── table.rs        # Table operations
 │   ├── types.rs        # Data types
 │   └── error.rs        # Error handling
-├── python/ # Python package source
-│   └── fluss_python/
-│       ├── __init__.py # Python package entry
-│       ├── __init__.pyi # Stub file
-│       └── py.typed # Type declarations
+├── fluss/              # Python package source
+│   ├── __init__.py     # Python package entry
+│   ├── __init__.pyi    # Stub file
+│   └── py.typed        # Type declarations
 └── example/            # Example code
     └── example.py
 ```
diff --git a/bindings/python/example/example.py b/bindings/python/example/example.py
new file mode 100644
index 0000000..0523f94
--- /dev/null
+++ b/bindings/python/example/example.py
@@ -0,0 +1,188 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
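+
+# NOTE: this example assumes a Fluss cluster is already running and
+# reachable at 127.0.0.1:9123 (the bootstrap address configured below),
+# with `pandas` and `pyarrow` installed alongside the `fluss` bindings.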
+
+import asyncio
+
+import pandas as pd
+import pyarrow as pa
+
+import fluss
+
+
+async def main():
+    # Create connection configuration
+    config_spec = {
+        "bootstrap.servers": "127.0.0.1:9123",
+        # Add other configuration options as needed
+        "request.max.size": "10485760",  # 10 MB
+        "writer.acks": "all",  # Wait for all replicas to acknowledge
+        "writer.retries": "3",  # Retry up to 3 times on failure
+        "writer.batch.size": "1000",  # Batch size for writes
+    }
+    config = fluss.Config(config_spec)
+
+    # Create connection using the static connect method
+    conn = await fluss.FlussConnection.connect(config)
+
+    # Define fields for PyArrow
+    fields = [
+        pa.field("id", pa.int32()),
+        pa.field("name", pa.string()),
+        pa.field("score", pa.float32()),
+        pa.field("age", pa.int32()),
+    ]
+
+    # Create a PyArrow schema
+    schema = pa.schema(fields)
+
+    # Create a Fluss Schema first (this is what TableDescriptor expects)
+    fluss_schema = fluss.Schema(schema)
+
+    # Create a Fluss TableDescriptor
+    table_descriptor = fluss.TableDescriptor(fluss_schema)
+
+    # Get the admin for Fluss
+    admin = await conn.get_admin()
+
+    # Create a Fluss table
+    table_path = fluss.TablePath("fluss", "sample_table")
+
+    try:
+        await admin.create_table(table_path, table_descriptor, True)
+        print(f"Created table: {table_path}")
+    except Exception as e:
+        print(f"Table creation failed: {e}")
+
+    # Get table information via admin
+    try:
+        table_info = await admin.get_table(table_path)
+        print(f"Table info: {table_info}")
+        print(f"Table ID: {table_info.table_id}")
+        print(f"Schema ID: {table_info.schema_id}")
+        print(f"Created time: {table_info.created_time}")
+        print(f"Primary keys: {table_info.get_primary_keys()}")
+    except Exception as e:
+        print(f"Failed to get table info: {e}")
+
+    # Get the table instance
+    table = await conn.get_table(table_path)
+    print(f"Got table: {table}")
+
+    # Create a writer for the table
+    append_writer = await table.new_append_writer()
+    print(f"Created append writer: {append_writer}")
+
+    try:
+        # Test 1: Write PyArrow Table
+        print("\n--- Testing PyArrow Table write ---")
+        pa_table = pa.Table.from_arrays(
+            [
+                pa.array([1, 2, 3], type=pa.int32()),
+                pa.array(["Alice", "Bob", "Charlie"], type=pa.string()),
+                pa.array([95.2, 87.2, 92.1], type=pa.float32()),
+                pa.array([25, 30, 35], type=pa.int32()),
+            ],
+            schema=schema,
+        )
+
+        append_writer.write_arrow(pa_table)
+        print("Successfully wrote PyArrow Table")
+
+        # Test 2: Write PyArrow RecordBatch
+        print("\n--- Testing PyArrow RecordBatch write ---")
+        pa_record_batch = pa.RecordBatch.from_arrays(
+            [
+                pa.array([4, 5], type=pa.int32()),
+                pa.array(["David", "Eve"], type=pa.string()),
+                pa.array([88.5, 91.0], type=pa.float32()),
+                pa.array([28, 32], type=pa.int32()),
+            ],
+            schema=schema,
+        )
+
+        append_writer.write_arrow_batch(pa_record_batch)
+        print("Successfully wrote PyArrow RecordBatch")
+
+        # Test 3: Write Pandas DataFrame
+        print("\n--- Testing Pandas DataFrame write ---")
+        df = pd.DataFrame(
+            {
+                "id": [6, 7],
+                "name": ["Frank", "Grace"],
+                "score": [89.3, 94.7],
+                "age": [29, 27],
+            }
+        )
+
+        append_writer.write_pandas(df)
+        print("Successfully wrote Pandas DataFrame")
+
+        # Flush all pending data
+        print("\n--- Flushing data ---")
+        append_writer.flush()
+        print("Successfully flushed data")
+
+    except Exception as e:
+        print(f"Error during writing: {e}")
+
+    # Now scan the table to verify data was written
+    print("\n--- Scanning table ---")
+    try:
+        log_scanner = await table.new_log_scanner()
+        print(f"Created log scanner: {log_scanner}")
+
+        # Subscribe to scan from earliest to latest:
+        # start_timestamp=None (earliest), end_timestamp=None (latest)
+        log_scanner.subscribe(None, None)
+
+        print("Scanning results using to_arrow():")
+
+        # Try to get the results as a PyArrow Table
+        try:
+            pa_table_result = log_scanner.to_arrow()
+            print(f"\nAs PyArrow Table: {pa_table_result}")
+        except Exception as e:
+            print(f"Could not convert to PyArrow: {e}")
+
+        # Reset the subscription to scan from the beginning again
+        log_scanner.subscribe(None, None)
+
+        # Try to get the results as a Pandas DataFrame
+        try:
+            df_result = log_scanner.to_pandas()
+            print(f"\nAs Pandas DataFrame:\n{df_result}")
+        except Exception as e:
+            print(f"Could not convert to Pandas: {e}")
+
+        # TODO: support to_arrow_batch_reader(),
+        # which is reserved for streaming use cases
+
+        # TODO: support to_duckdb()
+
+    except Exception as e:
+        print(f"Error during scanning: {e}")
+
+    # Close connection
+    conn.close()
+    print("\nConnection closed")
+
+
+if __name__ == "__main__":
+    # Run the async main function
+    asyncio.run(main())
diff --git a/bindings/python/fluss/__init__.pyi b/bindings/python/fluss/__init__.pyi
new file mode 100644
index 0000000..4565242
--- /dev/null
+++ b/bindings/python/fluss/__init__.pyi
@@ -0,0 +1,171 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Type stubs for Fluss Python bindings."""
+
+from types import TracebackType
+from typing import Dict, List, Optional, Tuple
+
+import pandas as pd
+import pyarrow as pa
+
+class Config:
+    def __init__(self, properties: Optional[Dict[str, str]] = None) -> None: ...
+    @property
+    def bootstrap_server(self) -> Optional[str]: ...
+    @bootstrap_server.setter
+    def bootstrap_server(self, server: str) -> None: ...
+    @property
+    def request_max_size(self) -> int: ...
+    @request_max_size.setter
+    def request_max_size(self, size: int) -> None: ...
+    @property
+    def writer_batch_size(self) -> int: ...
+    @writer_batch_size.setter
+    def writer_batch_size(self, size: int) -> None: ...
+
+class FlussConnection:
+    @staticmethod
+    async def connect(config: Config) -> FlussConnection: ...
+    async def get_admin(self) -> FlussAdmin: ...
+    async def get_table(self, table_path: TablePath) -> FlussTable: ...
+    def close(self) -> None: ...
+    def __enter__(self) -> FlussConnection: ...
+    def __exit__(
+        self,
+        exc_type: Optional[type],
+        exc_value: Optional[BaseException],
+        traceback: Optional[TracebackType],
+    ) -> bool: ...
+    def __repr__(self) -> str: ...
+
+class FlussAdmin:
+    async def create_table(
+        self,
+        table_path: TablePath,
+        table_descriptor: TableDescriptor,
+        ignore_if_exists: Optional[bool] = False,
+    ) -> None: ...
+    async def get_table(self, table_path: TablePath) -> TableInfo: ...
+    async def get_latest_lake_snapshot(self, table_path: TablePath) -> LakeSnapshot: ...
+    def __repr__(self) -> str: ...
+
+class FlussTable:
+    async def new_append_writer(self) -> AppendWriter: ...
+    async def new_log_scanner(self) -> LogScanner: ...
+    def get_table_info(self) -> TableInfo: ...
+    def get_table_path(self) -> TablePath: ...
+    def has_primary_key(self) -> bool: ...
+    def __repr__(self) -> str: ...
+
+class AppendWriter:
+    def write_arrow(self, table: pa.Table) -> None: ...
+    def write_arrow_batch(self, batch: pa.RecordBatch) -> None: ...
+    def write_pandas(self, df: pd.DataFrame) -> None: ...
+    def flush(self) -> None: ...
+    def __repr__(self) -> str: ...
+
+class LogScanner:
+    def subscribe(
+        self, start_timestamp: Optional[int], end_timestamp: Optional[int]
+    ) -> None: ...
+    def to_pandas(self) -> pd.DataFrame: ...
+    def to_arrow(self) -> pa.Table: ...
+    def __repr__(self) -> str: ...
+
+class Schema:
+    def __init__(self, schema: pa.Schema, primary_keys: Optional[List[str]] = None) -> None: ...
+    def get_column_names(self) -> List[str]: ...
+    def get_column_types(self) -> List[str]: ...
+    def get_columns(self) -> List[Tuple[str, str]]: ...
+    def __str__(self) -> str: ...
+
+class TableDescriptor:
+    def __init__(self, schema: Schema, **kwargs: str) -> None: ...
+    def get_schema(self) -> Schema: ...
+
+class TablePath:
+    def __init__(self, database: str, table: str) -> None: ...
+    @property
+    def database_name(self) -> str: ...
+    @property
+    def table_name(self) -> str: ...
+    def table_path_str(self) -> str: ...
+    def __str__(self) -> str: ...
+    def __repr__(self) -> str: ...
+    def __hash__(self) -> int: ...
+    def __eq__(self, other: object) -> bool: ...
+
+class TableInfo:
+    @property
+    def table_id(self) -> int: ...
+    @property
+    def schema_id(self) -> int: ...
+    @property
+    def created_time(self) -> int: ...
+    @property
+    def modified_time(self) -> int: ...
+    @property
+    def table_path(self) -> TablePath: ...
+    @property
+    def num_buckets(self) -> int: ...
+    @property
+    def comment(self) -> Optional[str]: ...
+    def get_primary_keys(self) -> List[str]: ...
+    def get_bucket_keys(self) -> List[str]: ...
+    def get_partition_keys(self) -> List[str]: ...
+    def has_primary_key(self) -> bool: ...
+    def is_partitioned(self) -> bool: ...
+    def get_properties(self) -> Dict[str, str]: ...
+    def get_custom_properties(self) -> Dict[str, str]: ...
+    def get_schema(self) -> Schema: ...
+    def get_column_names(self) -> List[str]: ...
+    def get_column_count(self) -> int: ...
+
+class FlussError(Exception):
+    message: str
+    def __init__(self, message: str) -> None: ...
+    def __str__(self) -> str: ...
+
+class LakeSnapshot:
+    def __init__(self, snapshot_id: int) -> None: ...
+    @property
+    def snapshot_id(self) -> int: ...
+    @property
+    def table_buckets_offset(self) -> Dict[TableBucket, int]: ...
+    def get_bucket_offset(self, bucket: TableBucket) -> Optional[int]: ...
+    def get_table_buckets(self) -> List[TableBucket]: ...
+    def __str__(self) -> str: ...
+    def __repr__(self) -> str: ...
+
+class TableBucket:
+    def __init__(self, table_id: int, bucket: int) -> None: ...
+    @staticmethod
+    def with_partition(
+        table_id: int, partition_id: int, bucket: int
+    ) -> TableBucket: ...
+    @property
+    def table_id(self) -> int: ...
+    @property
+    def bucket_id(self) -> int: ...
+    @property
+    def partition_id(self) -> Optional[int]: ...
+    def __hash__(self) -> int: ...
+    def __eq__(self, other: object) -> bool: ...
+    def __str__(self) -> str: ...
+    def __repr__(self) -> str: ...
+
+class TableDistribution:
+    def bucket_keys(self) -> List[str]: ...
+    def bucket_count(self) -> Optional[int]: ...
+
+__version__: str
diff --git a/bindings/python/fluss/py.typed b/bindings/python/fluss/py.typed
new file mode 100644
index 0000000..e69de29
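
A minimal end-to-end read against the stubbed API, for quickly exercising the typed surface (a sketch, assuming a reachable cluster and the `sample_table` created by `example.py` above; since `py.typed` marks the package as PEP 561-compliant, a type checker such as mypy, if installed, can validate code like this against the stubs):

```python
import asyncio

import fluss


async def read_sample() -> None:
    # Same bootstrap address as example.py.
    config = fluss.Config({"bootstrap.servers": "127.0.0.1:9123"})
    conn = await fluss.FlussConnection.connect(config)
    table = await conn.get_table(fluss.TablePath("fluss", "sample_table"))

    # Subscribe from earliest (None) to latest (None), then read as a DataFrame.
    scanner = await table.new_log_scanner()
    scanner.subscribe(None, None)
    print(scanner.to_pandas())

    conn.close()


asyncio.run(read_sample())
```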