1 change: 1 addition & 0 deletions .licenserc.yaml
@@ -26,4 +26,5 @@ header:
     - 'LICENSE'
     - 'NOTICE'
     - 'DISCLAIMER'
+    - 'bindings/python/fluss/py.typed'
   comment: on-failure
19 changes: 9 additions & 10 deletions bindings/python/README.md
@@ -108,7 +108,7 @@ uv run python example/example.py
### Build API docs:

```bash
-uv run pdoc fluss_python
+uv run pdoc fluss
```

### Release
@@ -124,22 +124,21 @@ uv run maturin publish
## Project Structure
```
 bindings/python/
 ├── Cargo.toml            # Rust dependency configuration
 ├── pyproject.toml        # Python project configuration
 ├── README.md             # This file
 ├── src/                  # Rust source code
 │   ├── lib.rs            # Main entry module
 │   ├── config.rs         # Configuration related
 │   ├── connection.rs     # Connection management
 │   ├── admin.rs          # Admin operations
 │   ├── table.rs          # Table operations
 │   ├── types.rs          # Data types
 │   └── error.rs          # Error handling
-├── python/               # Python package source
-│   └── fluss_python/
-│       ├── __init__.py   # Python package entry
-│       ├── __init__.pyi  # Stub file
-│       └── py.typed      # Type declarations
+├── fluss/                # Python package source
+│   ├── __init__.py       # Python package entry
+│   ├── __init__.pyi      # Stub file
+│   └── py.typed          # Type declarations
 └── example/              # Example code
     └── example.py
```
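With the package moved to `fluss/` and a PEP 561 `py.typed` marker shipped next to the `__init__.pyi` stubs, type checkers such as mypy or pyright can validate user code against the bindings statically. A minimal sketch of what that enables, reusing only names that appear in the `example.py` added by this PR (the `smoke_test` helper name is illustrative):

```python
# Minimal sketch, assuming the layout above: `fluss` resolves to
# bindings/python/fluss/__init__.py, and the __init__.pyi stubs plus
# the py.typed marker let type checkers verify these calls statically.
import asyncio

import fluss


async def smoke_test() -> None:
    config = fluss.Config({"bootstrap.servers": "127.0.0.1:9123"})
    conn = await fluss.FlussConnection.connect(config)
    conn.close()


asyncio.run(smoke_test())
```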
188 changes: 188 additions & 0 deletions bindings/python/example/example.py
@@ -0,0 +1,188 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import asyncio

import pandas as pd
import pyarrow as pa

import fluss


async def main():
    # Create connection configuration
    config_spec = {
        "bootstrap.servers": "127.0.0.1:9123",
        # Add other configuration options as needed
        "request.max.size": "10485760",  # 10 MB
        "writer.acks": "all",  # Wait for all replicas to acknowledge
        "writer.retries": "3",  # Retry up to 3 times on failure
        "writer.batch.size": "1000",  # Batch size for writes
    }
    config = fluss.Config(config_spec)

    # Create connection using the static connect method
    conn = await fluss.FlussConnection.connect(config)

    # Define fields for PyArrow
    fields = [
        pa.field("id", pa.int32()),
        pa.field("name", pa.string()),
        pa.field("score", pa.float32()),
        pa.field("age", pa.int32()),
    ]

    # Create a PyArrow schema
    schema = pa.schema(fields)

    # Create a Fluss Schema first (this is what TableDescriptor expects)
    fluss_schema = fluss.Schema(schema)

    # Create a Fluss TableDescriptor
    table_descriptor = fluss.TableDescriptor(fluss_schema)

    # Get the admin for Fluss
    admin = await conn.get_admin()

    # Create a Fluss table
    table_path = fluss.TablePath("fluss", "sample_table")

    try:
        await admin.create_table(table_path, table_descriptor, True)
        print(f"Created table: {table_path}")
    except Exception as e:
        print(f"Table creation failed: {e}")

    # Get table information via admin
    try:
        table_info = await admin.get_table(table_path)
        print(f"Table info: {table_info}")
        print(f"Table ID: {table_info.table_id}")
        print(f"Schema ID: {table_info.schema_id}")
        print(f"Created time: {table_info.created_time}")
        print(f"Primary keys: {table_info.get_primary_keys()}")
    except Exception as e:
        print(f"Failed to get table info: {e}")

    # Get the table instance
    table = await conn.get_table(table_path)
    print(f"Got table: {table}")

    # Create a writer for the table
    append_writer = await table.new_append_writer()
    print(f"Created append writer: {append_writer}")

    try:
        # Test 1: Write PyArrow Table
        print("\n--- Testing PyArrow Table write ---")
        pa_table = pa.Table.from_arrays(
            [
                pa.array([1, 2, 3], type=pa.int32()),
                pa.array(["Alice", "Bob", "Charlie"], type=pa.string()),
                pa.array([95.2, 87.2, 92.1], type=pa.float32()),
                pa.array([25, 30, 35], type=pa.int32()),
            ],
            schema=schema,
        )

        append_writer.write_arrow(pa_table)
        print("Successfully wrote PyArrow Table")

        # Test 2: Write PyArrow RecordBatch
        print("\n--- Testing PyArrow RecordBatch write ---")
        pa_record_batch = pa.RecordBatch.from_arrays(
            [
                pa.array([4, 5], type=pa.int32()),
                pa.array(["David", "Eve"], type=pa.string()),
                pa.array([88.5, 91.0], type=pa.float32()),
                pa.array([28, 32], type=pa.int32()),
            ],
            schema=schema,
        )

        append_writer.write_arrow_batch(pa_record_batch)
        print("Successfully wrote PyArrow RecordBatch")

        # Test 3: Write Pandas DataFrame
        print("\n--- Testing Pandas DataFrame write ---")
        df = pd.DataFrame(
            {
                "id": [6, 7],
                "name": ["Frank", "Grace"],
                "score": [89.3, 94.7],
                "age": [29, 27],
            }
        )

        append_writer.write_pandas(df)
        print("Successfully wrote Pandas DataFrame")

        # Flush all pending data
        print("\n--- Flushing data ---")
        append_writer.flush()
        print("Successfully flushed data")

    except Exception as e:
        print(f"Error during writing: {e}")

    # Now scan the table to verify data was written
    print("\n--- Scanning table ---")
    try:
        log_scanner = await table.new_log_scanner()
        print(f"Created log scanner: {log_scanner}")

        # Subscribe to scan from earliest to latest:
        # start_timestamp=None (earliest), end_timestamp=None (latest)
        log_scanner.subscribe(None, None)

        print("Scanning results using to_arrow():")

        # Try to get as PyArrow Table
        try:
            pa_table_result = log_scanner.to_arrow()
            print(f"\nAs PyArrow Table: {pa_table_result}")
        except Exception as e:
            print(f"Could not convert to PyArrow: {e}")

        # Subscribe from the beginning again to reset the subscription
        log_scanner.subscribe(None, None)

        # Try to get as Pandas DataFrame
        try:
            df_result = log_scanner.to_pandas()
            print(f"\nAs Pandas DataFrame:\n{df_result}")
        except Exception as e:
            print(f"Could not convert to Pandas: {e}")

        # TODO: support to_arrow_batch_reader(),
        # which is reserved for streaming use cases

        # TODO: support to_duckdb()

    except Exception as e:
        print(f"Error during scanning: {e}")

    # Close connection
    conn.close()
    print("\nConnection closed")


if __name__ == "__main__":
    # Run the async main function
    asyncio.run(main())