π Enterprise-grade distributed transactions made simple
FireflyTX implements the "Python defines, Java executes" architecture for distributed transactions. Write your business logic in Python with simple decorators, while the battle-tested Java lib-transactional-engine handles all the complex orchestration, reliability, and execution.
from fireflytx import SagaEngine
from fireflytx.decorators.saga import saga, saga_step
@saga("payment-processing")
class PaymentSaga:
@saga_step("validate", retry=3, timeout_ms=5000)
async def validate_payment(self, amount: float):
return {"validated": True, "amount": amount}
@saga_step("charge", depends_on=["validate"])
async def charge_payment(self, amount: float):
return {"charged": True, "transaction_id": "tx_123"}
# Execute with enterprise reliability
engine = SagaEngine()
await engine.initialize()
result = await engine.execute(PaymentSaga, {"amount": 100.0})
print(f"β
Payment completed: {result.is_success}")
await engine.shutdown()| You Write (Python) | Engine Handles (Java) |
|---|---|
@saga business logic |
Transaction orchestration |
@saga_step methods |
Retry & timeout logic |
| Event configuration | Kafka/Redis integration |
| Compensation logic | Automatic rollbacks |
- π₯ Zero Boilerplate: Write business logic, not infrastructure code
- ποΈ Production Ready: Battle-tested Java lib-transactional-engine under the hood
- π Built-in Observability: Events, logging, and distributed tracing
- π Automatic Compensation: Rollbacks handled transparently
- β‘ Async Native: Full asyncio support for modern Python apps
- π‘οΈ Type Safe: Complete type hints and validation
β οΈ Important: FireflyTX is not published to PyPI. You must install from source or use the install script.
One-line install script:
curl -fsSL https://raw.githubusercontent.com/firefly-oss/python-transactional-engine/main/install.sh | bashThis automated script will:
- β Check system requirements (Python 3.9+, Java 11+)
- β Clone the repository from GitHub
- β Install FireflyTX and all dependencies
- β Download/build lib-transactional-engine JAR
- β Verify the installation
- β Show you next steps
If you prefer manual control:
# 1. Clone the repository
git clone https://github.com/firefly-oss/python-transactional-engine.git
cd python-transactional-engine
# 2. Install FireflyTX
pip install .
# For development (editable mode):
pip install -e .
# 3. Verify installation
python -c "import fireflytx; print(f'β
FireflyTX {fireflytx.__version__} installed')"- Python: 3.9 or higher
- Java: JDK 11 or higher (for lib-transactional-engine)
- Git: For cloning the repository
- OS: Linux, macOS, or Windows (WSL recommended)
Note: PyPI package is not yet published. Install from source using the methods above.
After installation, create your first distributed transaction:
# my_first_saga.py
import asyncio
from fireflytx import SagaEngine, saga, saga_step
@saga("hello-world")
class HelloWorldSaga:
@saga_step("greet", retry=3, timeout_ms=5000)
async def greet(self, name: str) -> dict:
print(f"π Hello, {name}!")
return {"message": f"Hello, {name}!"}
async def main():
# Create and initialize the engine
engine = SagaEngine()
await engine.initialize()
# Execute the SAGA
result = await engine.execute(
HelloWorldSaga,
{"name": "World"}
)
print(f"β
Success: {result.is_success}")
# Cleanup
await engine.shutdown()
if __name__ == "__main__":
asyncio.run(main())Run it:
python my_first_saga.pyOutput:
π Hello, World!
β
Success: True
FireflyTX uses a simple configuration hierarchy:
- Engine-level: Global settings for all SAGAs and TCC transactions
- SAGA/TCC-level: Settings for a specific workflow
- Step/Participant-level: Settings for individual operations
from fireflytx import SagaEngine, TccEngine
from fireflytx.config import ConfigurationManager
# β
Method 1: Simplest setup - uses sensible defaults
saga_engine = SagaEngine()
await saga_engine.initialize()
tcc_engine = TccEngine()
tcc_engine.start()
# β
Method 2: Explicit default configuration
config = ConfigurationManager.get_default_config()
saga_engine = SagaEngine(config=config)
await saga_engine.initialize()
# β
Method 3: Production configuration helper
config = ConfigurationManager.get_production_config(
persistence_type="redis",
persistence_connection_string="redis://localhost:6379",
heap_size="2g",
max_concurrent_executions=200
)
saga_engine = SagaEngine(config=config)
await saga_engine.initialize()
# β
Method 4: High-performance configuration helper
config = ConfigurationManager.get_high_performance_config(
persistence_connection_string="redis://redis-cluster:6379"
)
saga_engine = SagaEngine(config=config)
await saga_engine.initialize()Default Configuration Values:
- Max concurrent executions: 100
- Default timeout: 30 seconds
- Thread pool size: 50
- JVM heap size: 256MB
- Persistence: In-memory (development only)
- Retry attempts: 3 with exponential backoff
Production Configuration Values:
- Max concurrent executions: 200
- Default timeout: 60 seconds
- Thread pool size: 100
- JVM heap size: 2GB
- Persistence: Redis with auto-checkpointing
- Retry attempts: 5 with exponential backoff
- GC: G1GC with 200ms max pause
High-Performance Configuration Values:
- Max concurrent executions: 500
- Default timeout: 30 seconds
- Thread pool size: 200
- JVM heap size: 4GB
- Persistence: Redis with high connection pool
- Retry attempts: 3 with exponential backoff
- GC: G1GC with 100ms max pause
For production use, configure engines with EngineConfig:
from fireflytx import SagaEngine, TccEngine, EngineConfig, PersistenceConfig, JvmConfig
# Create comprehensive configuration
config = EngineConfig(
# Execution settings
max_concurrent_executions=200,
default_timeout_ms=60000, # 60 seconds
thread_pool_size=100,
enable_monitoring=True,
# Persistence configuration
persistence=PersistenceConfig(
type="redis",
connection_string="redis://localhost:6379",
auto_checkpoint=True,
checkpoint_interval_seconds=60
),
# JVM configuration
jvm=JvmConfig(
heap_size="1g",
gc_algorithm="G1GC",
max_gc_pause_ms=200,
additional_jvm_args=[
"-XX:+UseStringDeduplication",
"-XX:+OptimizeStringConcat"
]
)
)
# Initialize engines with configuration
saga_engine = SagaEngine(config=config)
await saga_engine.initialize()
tcc_engine = TccEngine(config=config)
tcc_engine.start()from fireflytx import SagaEngine, saga, saga_step, compensation_step
# 1. Engine-level configuration (applies to all SAGAs)
engine = SagaEngine(
compensation_policy="STRICT_SEQUENTIAL",
auto_optimization_enabled=True,
config=config # Optional EngineConfig object
)
# 2. SAGA-level configuration (applies to all steps in this SAGA)
@saga("order-processing", layer_concurrency=10)
class OrderSaga:
# 3. Step-level configuration (applies to this step only)
@saga_step(
step_id="validate",
retry=5, # Override engine default
timeout_ms=5000, # Step-specific timeout
compensate="undo_validate" # Compensation method
)
async def validate_order(self, order: dict) -> dict:
# Your business logic here
return {"validated": True}
@compensation_step("validate")
async def undo_validate(self, result: dict) -> None:
# Compensation logic
passfrom fireflytx import EngineConfig
# Load from environment variables
config = EngineConfig.from_env(prefix="FIREFLYTX_")
# Or from YAML file
config = EngineConfig.from_file("config/production.yaml")
# Use with engines
engine = SagaEngine(config=config)Environment Variables:
export FIREFLYTX_MAX_CONCURRENT_EXECUTIONS=200
export FIREFLYTX_DEFAULT_TIMEOUT_MS=60000
export FIREFLYTX_THREAD_POOL_SIZE=100
export FIREFLYTX_PERSISTENCE_TYPE=redis
export FIREFLYTX_PERSISTENCE_CONNECTION_STRING=redis://localhost:6379
export FIREFLYTX_JVM_HEAP_SIZE=1gSee Configuration Reference for complete details.
For contributors and developers:
# Clone and install in development mode
git clone https://github.com/firefly-oss/python-transactional-engine.git
cd python-transactional-engine
pip install -e .
# Build the Java bridge
python -m fireflytx.cli build
# Verify installation
python -m fireflytx.cli statusOr use the install script with --dev flag:
curl -fsSL https://raw.githubusercontent.com/firefly-oss/python-transactional-engine/main/install.sh | bash -s -- --dev# Custom Python version
curl -fsSL https://raw.githubusercontent.com/firefly-oss/python-transactional-engine/main/install.sh | bash -s -- --python python3.11
# Quiet installation
curl -fsSL https://raw.githubusercontent.com/firefly-oss/python-transactional-engine/main/install.sh | bash -s -- --quiet
# Force reinstall
curl -fsSL https://raw.githubusercontent.com/firefly-oss/python-transactional-engine/main/install.sh | bash -s -- --force
# Development mode with custom Python
curl -fsSL https://raw.githubusercontent.com/firefly-oss/python-transactional-engine/main/install.sh | bash -s -- --dev --python python3.11
# Download and run locally
wget https://raw.githubusercontent.com/firefly-oss/python-transactional-engine/main/install.sh
chmod +x install.sh
./install.sh --help| Component | Version | Purpose | Installation |
|---|---|---|---|
| Python | 3.9+ | Your business logic | python.org |
| Java | 11+ | Transaction engine | openjdk.org |
| Redis | 5.0+ (optional) | Distributed state | brew install redis |
| Kafka | 2.8+ (optional) | Event streaming | kafka.apache.org |
import fireflytx
from fireflytx import SagaEngine, saga, saga_step
print("β
FireflyTX installed successfully!")
print(f"π₯ Version: {fireflytx.__version__}")π Real Java Integration: FireflyTX uses the actual lib-transactional-engine Java library, not mocks or simulations.
After installation, build the Java bridge (happens automatically on first use, or manually):
# Check status
python -m fireflytx.cli status
# Build manually (optional)
python -m fireflytx.cli build
# Verify Java integration
python -m fireflytx.cli versionThe Java bridge is built once and cached in .cache/fireflytx/. The CLI will show:
- β Java version and location
- β Build environment status
- β JAR dependencies status
- β Integration readiness
We use Task for project automation:
# Install Task (macOS)
brew install go-task/tap/go-task
# Or download from https://taskfile.dev
# See all available tasks
task --list
# Run tests
task test-unit # Fast unit tests
task test-integration # Integration tests
task all-tests # All tests
# Development workflow
task clean # Clean build artifacts
task install-dev # Install dev dependencies
task format # Format code
task lint # Run linting
task pre-commit # Pre-commit checksFireflyTX follows a unique architecture:
| You Write (Python) | Engine Handles (Java) |
|---|---|
| Business logic with decorators | Transaction orchestration |
| Step definitions and dependencies | Retry logic and timeouts |
| Compensation methods | Automatic rollbacks |
| Configuration and events | State persistence |
| Simple async functions | Distributed execution |
Key Concept: You focus on WHAT your business logic does. The Java engine handles HOW it executes reliably.
FireflyTX configuration happens in three layers:
When you create an engine, Python sends configuration to Java:
from fireflytx import SagaEngine
# Python defines the configuration
engine = SagaEngine(
compensation_policy="STRICT_SEQUENTIAL", # β Python defines
auto_optimization_enabled=True, # β Python defines
persistence_enabled=False # β Python defines
)
# β Configuration sent to Java engine via IPC
# β Java engine starts with these settingsWhat happens:
- Python creates configuration object
- Configuration serialized to JSON
- Sent to Java subprocess via IPC
- Java engine initializes with settings
- Python receives confirmation
When you define a SAGA/TCC, Python sends the structure to Java:
from fireflytx import saga, saga_step
@saga("payment-processing") # β Python defines SAGA name
class PaymentSaga:
@saga_step("validate", retry=3) # β Python defines step config
async def validate(self, data):
# Your business logic here
return {"validated": True}What happens:
- Python decorators capture SAGA structure
- Step metadata (name, retry, timeout, dependencies) extracted
- SAGA definition sent to Java engine
- Java engine registers the SAGA workflow
- Java engine ready to orchestrate execution
When you execute a SAGA, Java orchestrates but Python runs the logic:
# Execute the SAGA
result = await engine.execute_by_class(PaymentSaga, {"amount": 100})What happens:
- Python β Java: "Execute PaymentSaga with this data"
- Java β Determines execution order based on dependencies
- Java β Python: "Execute step 'validate' with this data" (HTTP callback)
- Python β Runs
validate()method with your business logic - Python β Java: "Step completed, here's the result"
- Java β Applies retry logic if needed, manages state
- Java β Python: "Execute next step..." (repeat)
- Java β Python: "SAGA completed successfully" (or failed with compensations)
# Good: Engine-level configuration
engine = SagaEngine(
compensation_policy="STRICT_SEQUENTIAL", # All SAGAs use this
auto_optimization_enabled=True, # Enable optimizations
persistence_enabled=False # Disable persistence for dev
)# Good: SAGA-specific configuration
@saga("critical-payment",
timeout_ms=30000, # This SAGA's timeout
max_retries=5) # This SAGA's retry limit
class CriticalPaymentSaga:
pass# Good: Step-specific configuration
@saga_step("validate",
retry=3, # This step's retry count
timeout_ms=5000, # This step's timeout
critical=True) # This step's criticality
async def validate(self, data):
pass# Bad: Don't configure engine settings in SAGA
@saga("payment")
class PaymentSaga:
@saga_step("validate")
async def validate(self, data):
# β Don't create engine here
engine = SagaEngine() # Wrong!βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β YOUR PYTHON CODE β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β β
β 1. Define Configuration β
β engine = SagaEngine(...) β
β β β
β 2. Define SAGA Structure β
β @saga("name") β
β class MySaga: ... β
β β β
β 3. Execute SAGA β
β result = await engine.execute_by_class(MySaga, data) β
β β
ββββββββββββββββββββββββββ¬βββββββββββββββββββββββββββββββββββββ
β JSON IPC
β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β JAVA ENGINE (lib-transactional-engine) β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β β
β 1. Receive Configuration β Initialize Engine β
β 2. Receive SAGA Definition β Register Workflow β
β 3. Receive Execute Request β Orchestrate Steps β
β ββ Manage dependencies β
β ββ Apply retry logic β
β ββ Handle timeouts β
β ββ Persist state β
β ββ Trigger compensations on failure β
β β β
β 4. Call Python for Business Logic (HTTP callbacks) β
β β
ββββββββββββββββββββββββββ¬βββββββββββββββββββββββββββββββββββββ
β HTTP Callbacks
β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β YOUR BUSINESS LOGIC (Python Methods) β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β β
β async def validate(self, data): β
β # Your code runs here β
β return result β
β β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
from fireflytx import SagaEngine, saga, saga_step
# Step 1: Configure the engine (Python β Java)
engine = SagaEngine(
compensation_policy="STRICT_SEQUENTIAL",
auto_optimization_enabled=True
)
# Step 2: Define your SAGA (Python β Java)
@saga("hello-world")
class HelloWorldSaga:
@saga_step("greet")
async def greet(self, name: str):
return {"message": f"Hello, {name}!"}
# Step 3: Execute (Java orchestrates, Python executes)
result = await engine.execute_by_class(
HelloWorldSaga,
{"name": "World"}
)
print(result.output) # {"message": "Hello, World!"}What happened:
- β Python created engine config β Java initialized engine
- β Python defined SAGA structure β Java registered workflow
- β Python requested execution β Java orchestrated
- β
Java called Python's
greet()method β Python executed business logic - β Python returned result β Java completed SAGA
- β Java returned final result β Python received output
FireflyTX includes a powerful interactive shell for development, testing, and debugging - just like PySpark!
# Method 1: Using CLI
python -m fireflytx.cli shell
# Method 2: Using Taskfile
task shell
# Method 3: Direct module
python -m fireflytx.shellThe FireflyTX shell provides:
- β
Custom Prompt:
fireflytx>>>prompt like PySpark - β
Top-level Await: Use
awaitdirectly (with IPython) - β Pre-loaded Context: All engines, decorators, and utilities ready to use
- β Tab Completion: Auto-complete for all FireflyTX objects
- β
Command History: Persistent history saved to
~/.fireflytx_history - β Syntax Highlighting: Beautiful colored output (with IPython)
- β Developer Tools: Inspect, benchmark, and debug SAGAs interactively
$ python -m fireflytx.cli shellYou'll see:
_____.__ _____.__
_/ ____\__|______ _____/ ____\ | ___.__.
\ __\| \_ __ \_/ __ \ __\| |< | |
| | | || | \/\ ___/| | | |_\___ |
|__| |__||__| \___ >__| |____/ ____|
\/ \/
:: fireflytx :: (v2025-08)
π₯ FireflyTX Interactive Shell v1.0.0
Enterprise-grade distributed transactions for Python
"Python defines, Java executes"
Python 3.9.6 | IPython 8.x.x
Session started: 2025-10-19 14:30:00
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
π Quick Start:
help() - Show all available commands
init_engines() - Initialize SAGA and TCC engines (async)
status() - Show engine and Java bridge status
examples() - Show example code snippets
fireflytx>>>
fireflytx>>> await init_engines()
π Initializing FireflyTX engines...
π¦ Creating SAGA engine...
β
SAGA engine initialized
π¦ Creating TCC engine...
β
TCC engine initialized
β
Java bridge connected
β¨ All engines initialized successfully!
Use 'saga_engine' and 'tcc_engine' to interact with enginesfireflytx>>> status()
π§ FireflyTX Status
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
Version: 1.0.0
SAGA Engine: β
Initialized
TCC Engine: β
Initialized
Java Bridge: β
Connected
Java Process: β
Running
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββfireflytx>>> @saga("payment-processing")
... ... class PaymentSaga:
... ... @saga_step("validate", retry=3)
... ... async def validate_payment(self, amount: float):
... ... print(f"Validating payment: ${amount}")
... ... return {"validated": True, "amount": amount}
... ...
... ... @saga_step("charge", depends_on=["validate"])
... ... async def charge_payment(self, amount: float):
... ... print(f"Charging payment: ${amount}")
... ... return {"charged": True, "transaction_id": "tx_123"}fireflytx>>> result = await saga_engine.execute(
... ... PaymentSaga,
... ... {"amount": 100.0}
... ... )
Validating payment: $100.0
Charging payment: $100.0
fireflytx>>> print(f"Success: {result.is_success}")
Success: Truefireflytx>>> inspect(PaymentSaga)
π Inspecting SAGA: PaymentSaga
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
SAGA Name: payment-processing
Steps: 2
Steps:
β’ validate
Retry: 3
β’ charge
Depends on: ['validate']
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββfireflytx>>> await benchmark(PaymentSaga, {"amount": 50.0}, iterations=100)
β±οΈ Benchmarking PaymentSaga
Iterations: 100
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
Progress: 10/100
Progress: 20/100
...
Progress: 100/100
π Results:
Total: 100
Successes: 100 (100.0%)
Failures: 0 (0.0%)
β±οΈ Timing:
Average: 45.23ms
Min: 38.12ms
Max: 67.89ms
Throughput: 22.11 ops/sec
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββfireflytx>>> examples()
π― FireflyTX Code Examples
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
1οΈβ£ INITIALIZE ENGINES
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
await init_engines()
2οΈβ£ DEFINE A SIMPLE SAGA
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
@saga("payment-processing")
class PaymentSaga:
@saga_step("validate", retry=3)
async def validate_payment(self, amount: float):
print(f"Validating payment: ${amount}")
return {"validated": True, "amount": amount}
...fireflytx>>> help()
π₯ FireflyTX Shell Help
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
π¦ PRE-LOADED OBJECTS:
saga_engine - SAGA engine instance
tcc_engine - TCC engine instance
java_bridge - Java subprocess bridge
π¨ DECORATORS:
@saga - Define a SAGA workflow
@saga_step - Define a SAGA step
@compensation_step - Define compensation logic
@tcc - Define a TCC transaction
@tcc_participant - Define a TCC participant
π οΈ HELPER FUNCTIONS:
init_engines() - Initialize engines (async)
shutdown_engines() - Shutdown engines (async)
reset() - Reset engines (shutdown + reinit) (async)
status() - Show current status
java_info() - Show Java subprocess info
config() - Show engine configuration
help() - Show this help
examples() - Show code examples
clear() - Clear screen and show banner
π§ DEVELOPER TOOLS:
inspect(SagaClass) - Inspect SAGA structure
benchmark(Saga, {}) - Benchmark SAGA execution (async)
logs(lines=50) - Show recent Java logs
...fireflytx>>> await shutdown_engines()
π Shutting down FireflyTX engines...
β
SAGA engine shutdown
β
TCC engine shutdown
β¨ All engines shutdown successfully!
fireflytx>>> exit()
π Goodbye! FireflyTX shell exiting...# Install IPython for enhanced features
pip install ipython
# Then launch shell - you'll get:
# β’ Top-level await (no need for run())
# β’ Syntax highlighting
# β’ Better tab completion
# β’ Custom fireflytx>>> promptIf IPython is not installed, use the run() helper:
fireflytx>>> run(init_engines()) # Instead of await
fireflytx>>> run(saga_engine.execute_by_class(MySaga, {}))# History is saved to ~/.fireflytx_history
# Use β/β arrows to navigate previous commands
# Search history with Ctrl+R (in IPython)fireflytx>>> saga_<TAB>
saga_engine saga_step saga
fireflytx>>> saga_engine.<TAB>
execute_by_class initialize shutdown ...fireflytx>>> clear()
# Clears screen and shows banner againfireflytx>>> await reset()
# Shuts down and reinitializes all engines# Start shell
$ python -m fireflytx.cli shell
# Initialize
fireflytx>>> await init_engines()
# Define and test SAGA
fireflytx>>> @saga("test")
... ... class TestSaga:
... ... @saga_step("step1")
... ... async def step1(self, data):
... ... return {"result": "ok"}
# Execute
fireflytx>>> result = await saga_engine.execute(TestSaga, {})
fireflytx>>> print(result.is_success)
True
# Done
fireflytx>>> await shutdown_engines()
fireflytx>>> exit()# Start with engines initialized
fireflytx>>> await init_engines()
# Check Java bridge
fireflytx>>> java_info()
# Inspect your SAGA
fireflytx>>> inspect(MySaga)
# Run with debugging
fireflytx>>> result = await saga_engine.execute(MySaga, {"debug": True})
# Check logs
fireflytx>>> logs(100)# Initialize
fireflytx>>> await init_engines()
# Benchmark different configurations
fireflytx>>> await benchmark(FastSaga, {}, iterations=1000)
fireflytx>>> await benchmark(SlowSaga, {}, iterations=100)
# Compare results| Command | Description | Example |
|---|---|---|
help() |
Show all commands | help() |
init_engines() |
Initialize engines | await init_engines() |
shutdown_engines() |
Shutdown engines | await shutdown_engines() |
reset() |
Reset engines | await reset() |
status() |
Show status | status() |
java_info() |
Java subprocess info | java_info() |
config() |
Show configuration | config() |
examples() |
Show examples | examples() |
inspect(Saga) |
Inspect SAGA | inspect(MySaga) |
benchmark(Saga, {}) |
Benchmark SAGA | await benchmark(MySaga, {}, 100) |
logs(n) |
Show logs | logs(50) |
clear() |
Clear screen | clear() |
exit() |
Exit shell | exit() or Ctrl+D |
import asyncio
from fireflytx import (
SagaEngine, saga, saga_step, compensation_step, step_events,
KafkaStepEventPublisher, RedisSagaPersistenceProvider
)
@saga("e-commerce-order")
class OrderProcessingSaga:
"""Complete order processing with automatic rollbacks."""
@step_events(
topic="order-events",
include_timing=True,
custom_headers={"service": "orders"}
)
@saga_step("validate-order", retry=3, timeout_ms=10000)
async def validate_order(self, order_data):
# Your validation logic
if order_data.get("amount", 0) <= 0:
raise ValueError("Invalid amount")
return {"validated": True, "order_id": order_data["order_id"]}
@saga_step("reserve-inventory", depends_on=["validate-order"], compensate="release_inventory")
async def reserve_inventory(self, order_data):
# Reserve inventory
return {"reserved": True, "reservation_id": "res_123"}
@saga_step("charge-payment", depends_on=["reserve-inventory"], compensate="refund_payment")
async def charge_payment(self, order_data):
# Process payment
return {"charged": True, "transaction_id": "tx_456"}
# Automatic compensation methods
@compensation_step("reserve-inventory")
async def release_inventory(self, reservation_data):
print(f"π Rolling back inventory reservation")
@compensation_step("charge-payment")
async def refund_payment(self, payment_data):
print(f"πΈ Refunding payment")
# Production setup with Kafka + Redis
engine = SagaEngine(
event_publisher=KafkaStepEventPublisher("localhost:9092"),
persistence_provider=RedisSagaPersistenceProvider()
)
await engine.initialize()
# Execute with full enterprise reliability
order_data = {
"order_id": "ORDER-123",
"amount": 99.99,
"customer_id": "CUST-456"
}
result = await engine.execute_by_class(OrderProcessingSaga, order_data)
if result.is_success:
print(f"β
Order processed in {result.duration_ms}ms")
else:
print(f"β Order failed: {result.error}")
print(f"π Auto-compensated: {len(result.compensated_steps)} steps")- π Python: You defined business logic with simple decorators
- β Java Engine: Handled orchestration, retries, events, and persistence
- π Auto-Rollback: Failed steps automatically triggered compensations
- π Events: Published to Kafka with timing and custom headers
- πΎ State: Persisted to Redis for crash recovery
β¨ Zero infrastructure code, maximum reliability!
import asyncio
from typing import Dict, Any
from fireflytx import (
SagaEngine, saga, saga_step, step_events, compensation_step,
KafkaStepEventPublisher, RedisSagaPersistenceProvider
)
@saga("payment-processing")
class PaymentProcessingSaga:
"""Payment processing SAGA with step events - Python defines, Java executes."""
@step_events(
topic="payment-validation",
key_template="{saga_id}-validation",
include_timing=True,
custom_headers={"service": "payment", "operation": "validate"}
)
@saga_step("validate-payment", retry=3, timeout_ms=10000)
async def validate_payment(self, payment_data: Dict[str, Any]) -> Dict[str, Any]:
"""Validate payment information - Python business logic."""
if payment_data.get("amount", 0) <= 0:
raise ValueError("Invalid payment amount")
validation_id = f"validation_{payment_data['customer_id']}"
print(f"β
Payment validated: {validation_id}")
return {"validation_id": validation_id, "status": "validated"}
@step_events(
topic="payment-processing",
key_template="{customer_id}-payment",
include_result=True,
publish_on_failure=True
)
@saga_step("process-payment", depends_on=["validate-payment"],
retry=5, compensate="refund_payment")
async def process_payment(self, payment_data: Dict[str, Any]) -> Dict[str, Any]:
"""Process the payment - Python business logic."""
payment_id = f"pay_{payment_data['customer_id']}_{payment_data['amount']}"
print(f"π³ Payment processed: {payment_id}")
return {"payment_id": payment_id, "status": "completed", "amount": payment_data['amount']}
@compensation_step("process-payment")
async def refund_payment(self, payment_result: Dict[str, Any]) -> None:
"""Refund the payment - Python compensation logic."""
payment_id = payment_result.get("payment_id")
if payment_id:
print(f"πΈ Payment refunded: {payment_id}")
# Usage with "Python defines, Java executes" architecture
async def main():
# Create engine with event publishing and persistence (Python defines)
engine = SagaEngine(
event_publisher=KafkaStepEventPublisher(
bootstrap_servers="localhost:9092",
default_topic="saga-events"
),
persistence_provider=RedisSagaPersistenceProvider(
host="localhost",
key_prefix="saga:"
)
)
# Payment data
payment_data = {
"customer_id": "customer_123",
"amount": 99.99,
"payment_method": "credit_card"
}
# Execute SAGA - Java handles orchestration, events, persistence
result = await engine.execute_by_class(
PaymentProcessingSaga,
input_data=payment_data
)
if result.is_success:
print(f"β
SAGA completed successfully in {result.duration_ms}ms!")
print(f" - Correlation ID: {result.correlation_id}")
print(f" - Steps executed: {len(result.steps)}")
else:
print(f"β SAGA failed: {result.error}")
print(f" - Compensated steps: {result.compensated_steps}")
await engine.shutdown()
if __name__ == "__main__":
asyncio.run(main())from fireflytx import saga, saga_step, step_events, compensation_step
@saga("e-commerce-order", layer_concurrency=5)
class ECommerceOrderSaga:
"""Complete e-commerce order processing with step events."""
@step_events(
topic="order-validation",
key_template="{saga_id}-{step_id}",
include_timing=True,
include_payload=True,
custom_headers={"service": "validation", "priority": "high"},
publish_on_start=True,
publish_on_success=True,
publish_on_failure=True,
publish_on_retry=True
)
@saga_step("validate-order", retry=5, timeout_ms=10000)
async def validate_order(self, order_data: Dict[str, Any]) -> Dict[str, Any]:
"""Validate order - Python defines business logic."""
# Your validation logic here
return {"validated": True, "order_id": order_data["order_id"]}
@step_events(topic="inventory-events")
@saga_step("reserve-inventory", depends_on=["validate-order"], compensate="release_inventory")
async def reserve_inventory(self, order_data: Dict[str, Any]) -> Dict[str, Any]:
"""Reserve inventory - Python defines business logic."""
# Your inventory logic here
return {"reserved": True, "reservation_id": "res_123"}
@compensation_step("reserve-inventory")
async def release_inventory(self, reservation_data: Dict[str, Any]) -> None:
"""Release inventory - Python defines compensation logic."""
# Your compensation logic here
passfrom fireflytx import (
SagaEngine,
KafkaStepEventPublisher,
RedisSagaPersistenceProvider
)
# Python defines all configuration, Java executes
engine = SagaEngine(
# Event publishing configuration (Python defines)
event_publisher=KafkaStepEventPublisher(
bootstrap_servers="kafka-cluster:9092",
default_topic="saga-events",
key_serializer="string",
value_serializer="json",
acks="all",
retries=5,
compression_type="snappy"
),
# Persistence configuration (Python defines)
persistence_provider=RedisSagaPersistenceProvider(
host="redis-cluster",
port=6379,
database=1,
key_prefix="saga:",
ttl_seconds=3600,
max_connections=50
),
# Engine configuration (Python defines)
compensation_policy="STRICT_SEQUENTIAL",
auto_optimization=True,
thread_pool_size=20,
max_concurrent_sagas=500
)# Register and execute - Java handles orchestration
result = await engine.execute_by_class(
ECommerceOrderSaga,
input_data={
"order_id": "ORDER-12345",
"customer_id": "CUST-456",
"items": [{"sku": "ITEM-001", "quantity": 2}],
"total_amount": 199.99
},
context={"region": "US", "currency": "USD"},
timeout_ms=30000
)
# Java execution results mapped to Python
if result.is_success:
print(f"Order processed in {result.duration_ms}ms")
print(f"Steps completed: {list(result.steps.keys())}")
else:
print(f"Order failed: {result.error}")
print(f"Compensated: {result.compensated_steps}")# No-op publisher (for testing)
from fireflytx import NoOpStepEventPublisher
publisher = NoOpStepEventPublisher()
# Kafka publisher (for production)
from fireflytx import KafkaStepEventPublisher
publisher = KafkaStepEventPublisher(
bootstrap_servers="kafka1:9092,kafka2:9092",
default_topic="saga-events",
acks="all",
retries=10
)# No-op provider (for testing)
from fireflytx import NoOpSagaPersistenceProvider
provider = NoOpSagaPersistenceProvider()
# Redis provider (for production)
from fireflytx import RedisSagaPersistenceProvider
provider = RedisSagaPersistenceProvider(
host="redis-cluster",
port=6379,
key_prefix="saga:",
ttl_seconds=86400
)
# Database provider (for enterprise)
from fireflytx import DatabaseSagaPersistenceProvider
provider = DatabaseSagaPersistenceProvider(
connection_url="jdbc:postgresql://db:5432/saga_db",
table_prefix="saga_",
schema="transactions"
)The @step_events decorator allows fine-grained control over event publishing:
@step_events(
enabled=True, # Enable/disable events for this step
topic="custom-topic", # Custom Kafka topic
key_template="{saga_id}-{step_id}", # Event key template
include_payload=True, # Include step input/output
include_context=True, # Include SAGA context
include_result=True, # Include step results
include_timing=True, # Include execution timing
custom_headers={ # Custom event headers
"service": "payment",
"version": "v2",
"priority": "high"
},
publish_on_start=True, # Publish when step starts
publish_on_success=True, # Publish when step succeeds
publish_on_failure=True, # Publish when step fails
publish_on_retry=False, # Publish on retry attempts
publish_on_compensation=True # Publish during compensation
)
@saga_step("my-step")
async def my_step(self, data):
return {"processed": True}This library follows a clear separation of concerns:
- Business Logic: Define saga steps and compensation methods
- Configuration: Specify event publishing, persistence, and engine settings
- Decorators: Configure step behavior, events, and dependencies
- Data Structures: Input/output schemas and context definitions
- Orchestration: Execute saga steps in correct order with dependencies
- Event Publishing: Publish step events to Kafka or other message brokers
- Persistence: Save/restore saga state to Redis, databases, or other stores
- Transaction Management: Handle retries, timeouts, and compensation
- Concurrency: Manage parallel execution and resource allocation
# 1. Python defines configuration
saga_config = JavaConfigGenerator().generate_config(
sagas=[MySaga],
event_publisher=KafkaStepEventPublisher(...),
persistence_provider=RedisSagaPersistenceProvider(...)
)
# 2. Configuration sent to Java engine
engine.send_configuration(saga_config)
# 3. Java engine processes SAGA orchestration
# 4. Java calls back to Python for business logic
# 5. Java publishes events and manages persistence
# 6. Results returned to Pythonengine = SagaEngine(
# Execution settings
compensation_policy="STRICT_SEQUENTIAL", # or "PARALLEL", "BEST_EFFORT"
auto_optimization_enabled=True,
thread_pool_size=50,
max_concurrent_sagas=1000,
# Observability
metrics_enabled=True,
tracing_enabled=True,
log_level="INFO",
# Security
enable_step_validation=True,
max_step_execution_time_ms=300000,
# Resource management
memory_limit_mb=2048,
cleanup_interval_ms=60000
)from fireflytx.events import StepEventPublisher
from typing import Dict, Any
class CustomWebhookEventPublisher(StepEventPublisher):
"""Custom webhook event publisher implementation."""
def __init__(self, webhook_url: str, auth_token: str):
self.webhook_url = webhook_url
self.auth_token = auth_token
async def publish(self, event):
# Custom publishing logic
print(f"Publishing to webhook: {event.event_type.value}")
def get_publisher_config(self) -> Dict[str, Any]:
return {
"type": "webhook",
"webhook_url": self.webhook_url,
"auth_token": self.auth_token,
"timeout_ms": 5000
}
# Use custom publisher
engine = SagaEngine(
event_publisher=CustomWebhookEventPublisher(
webhook_url="https://api.example.com/events",
auth_token="your-auth-token"
)
)@saga("complex-workflow", layer_concurrency=10)
class ComplexWorkflowSaga:
"""Demonstrates advanced SAGA patterns."""
# Parallel execution branches
@saga_step("process-a", timeout_ms=30000)
async def process_a(self, data): pass
@saga_step("process-b", timeout_ms=30000)
async def process_b(self, data): pass
# Convergence step
@saga_step("merge-results", depends_on=["process-a", "process-b"])
async def merge_results(self, data): pass
# Conditional execution
@saga_step("conditional-step", depends_on=["merge-results"])
async def conditional_step(self, data):
if data.get("condition"):
return {"executed": True}
else:
return {"skipped": True}
# Dynamic branching
@saga_step("dynamic-branch", depends_on=["conditional-step"])
async def dynamic_branch(self, data):
# Java engine can handle dynamic step creation
return {"next_steps": ["step-1", "step-2", "step-3"]}import logging
# Enable comprehensive logging for observability
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Monitor SAGA execution through logs
logger = logging.getLogger('fireflytx')
logger.setLevel(logging.DEBUG)
# SAGA execution automatically logs:
# - SAGA start/completion
# - Step execution progress
# - Event publishing
# - Compensation actions
# - Error detailsimport pytest
from fireflytx import SagaEngine, NoOpStepEventPublisher, NoOpSagaPersistenceProvider
@pytest.mark.asyncio
async def test_order_validation():
"""Test individual saga step."""
saga = ECommerceOrderSaga()
# Test step in isolation
result = await saga.validate_order({
"order_id": "TEST-123",
"customer_id": "CUST-456"
})
assert result["validated"] is True
assert result["order_id"] == "TEST-123"
@pytest.mark.asyncio
async def test_full_saga_execution():
"""Test complete SAGA execution."""
# Use test engine with no-op providers
engine = SagaEngine(
event_publisher=NoOpStepEventPublisher(),
persistence_provider=NoOpSagaPersistenceProvider()
)
await engine.initialize()
try:
result = await engine.execute_by_class(
ECommerceOrderSaga,
input_data={"order_id": "TEST-123"},
timeout_ms=10000
)
assert result.is_success
assert len(result.steps) > 0
finally:
await engine.shutdown()import os
from fireflytx import (
SagaEngine,
KafkaStepEventPublisher,
RedisSagaPersistenceProvider
)
from flask import Flask, request, jsonify
app = Flask(__name__)
# Initialize engine on startup
engine = SagaEngine(
event_publisher=KafkaStepEventPublisher(
bootstrap_servers=os.getenv("KAFKA_BROKERS"),
default_topic="web-saga-events"
),
persistence_provider=RedisSagaPersistenceProvider(
host=os.getenv("REDIS_HOST"),
port=int(os.getenv("REDIS_PORT", 6379))
)
)
@app.route('/orders', methods=['POST'])
async def create_order():
"""REST endpoint that triggers SAGA execution."""
order_data = request.json
try:
result = await engine.execute_by_class(
ECommerceOrderSaga,
input_data=order_data,
context={"user_id": request.headers.get("User-ID")},
timeout_ms=30000
)
if result.is_success:
return jsonify({
"status": "success",
"saga_id": result.saga_id,
"duration_ms": result.duration_ms
}), 201
else:
return jsonify({
"status": "failed",
"error": str(result.error),
"compensated_steps": result.compensated_steps
}), 400
except Exception as e:
return jsonify({"error": str(e)}), 500
if __name__ == "__main__":
app.run(host="0.0.0.0", port=8080)apiVersion: apps/v1
kind: Deployment
metadata:
name: saga-service
spec:
replicas: 3
selector:
matchLabels:
app: saga-service
template:
metadata:
labels:
app: saga-service
spec:
containers:
- name: saga-service
image: your-registry/saga-service:latest
ports:
- containerPort: 8080
env:
- name: KAFKA_BROKERS
value: "kafka-cluster:9092"
- name: REDIS_HOST
value: "redis-cluster"
- name: JAVA_ENGINE_JAR
value: "/app/lib-transactional-engine.jar"
resources:
requests:
memory: "512Mi"
cpu: "500m"
limits:
memory: "2Gi"
cpu: "2000m"
livenessProbe:
httpGet:
path: /health
port: 8080
initialDelaySeconds: 30
periodSeconds: 10import asyncio
from typing import List, Dict
from fireflytx import SagaEngine, NoOpStepEventPublisher, NoOpSagaPersistenceProvider
async def process_batch_orders(orders: List[Dict]):
"""Process multiple orders concurrently."""
engine = SagaEngine(
event_publisher=NoOpStepEventPublisher(),
persistence_provider=NoOpSagaPersistenceProvider()
)
await engine.initialize()
async def process_single_order(order_data):
return await engine.execute_by_class(
ECommerceOrderSaga,
input_data=order_data,
timeout_ms=60000
)
# Process orders concurrently
tasks = [process_single_order(order) for order in orders]
results = await asyncio.gather(*tasks, return_exceptions=True)
# Analyze results
successful = [r for r in results if hasattr(r, 'is_success') and r.is_success]
failed = [r for r in results if isinstance(r, Exception) or (hasattr(r, 'is_success') and not r.is_success)]
print(f"Processed {len(successful)} orders successfully, {len(failed)} failed")
await engine.shutdown()
return results
# Usage
# orders = load_orders_from_database()
# results = await process_batch_orders(orders)# Enable debug logging
import logging
logging.getLogger('fireflytx').setLevel(logging.DEBUG)
# Initialize engine and check logs for issues
engine = SagaEngine()
try:
await engine.initialize()
print("Engine initialized successfully")
except Exception as e:
print(f"Engine initialization failed: {e}")# Check configuration generation
from fireflytx import generate_saga_engine_config
try:
config = generate_saga_engine_config(
engine_name="test-engine",
saga_classes=[MySaga],
event_publisher=KafkaStepEventPublisher("localhost:9092")
)
print("Configuration generated successfully")
except Exception as e:
print(f"Configuration error: {e}")# Increase timeouts for long-running steps
@saga_step("long-running-step", timeout_ms=300000) # 5 minutes
async def long_running_step(self, data):
# Your long-running logic here
pass
# Or configure engine-level timeouts
engine = SagaEngine(
# Configure via engine options if needed
)# Configure memory limits via Java options if needed
engine = SagaEngine()
await engine.initialize()-
Enable comprehensive logging:
logging.basicConfig( level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' )
-
Monitor Java engine logs:
# View Java engine logs tail -f /var/log/saga-engine.log -
Use no-op providers for development:
# Use no-op providers for debugging engine = SagaEngine( event_publisher=NoOpStepEventPublisher(), persistence_provider=NoOpSagaPersistenceProvider() ) await engine.initialize()
-
Monitor execution through logs:
# Enable debug logging to monitor execution import logging logging.getLogger('fireflytx').setLevel(logging.DEBUG)
# Optimize for high throughput
engine = SagaEngine(
auto_optimization_enabled=True,
# Batch event publishing
event_publisher=KafkaStepEventPublisher(
batch_size=100,
linger_ms=10,
compression_type="lz4"
),
# Connection pooling
persistence_provider=RedisSagaPersistenceProvider(
max_connections=50,
connection_pool_timeout=5000
)
)import asyncio
from pydantic import BaseModel
from fireflytx import TccEngine, tcc, tcc_participant, try_method, confirm_method, cancel_method
# Define Pydantic models for type safety
class DepositRequest(BaseModel):
account_id: str
amount: float
class CreditReservation(BaseModel):
reserved: bool
reservation_id: str
amount: float
@tcc("account-deposit")
class AccountDepositTcc:
"""Simple account deposit TCC transaction with Pydantic models."""
@tcc_participant("credit-account")
class CreditAccountParticipant:
@try_method
async def try_credit(self, data: DepositRequest) -> CreditReservation:
"""Try to reserve credit for account."""
reservation_id = f"reserve_{data.account_id}_{data.amount}"
# Reserve account credit (example business logic)
print(f"π° Reserving ${data.amount} credit for account {data.account_id}")
return CreditReservation(
reserved=True,
reservation_id=reservation_id,
amount=data.amount
)
@confirm_method
async def confirm_credit(self, data: DepositRequest, try_result: CreditReservation):
"""Confirm the credit to account."""
# Access Pydantic model attributes directly
print(f"β
Confirmed ${try_result.amount} credit to account {data.account_id} (reservation: {try_result.reservation_id})")
return {"confirmed": True, "final_amount": try_result.amount}
@cancel_method
async def cancel_credit(self, data: DepositRequest, try_result: CreditReservation):
"""Cancel the credit reservation."""
# Access Pydantic model attributes directly
print(f"β Cancelled ${try_result.amount} credit reservation for account {data.account_id} (reservation: {try_result.reservation_id})")
return {"cancelled": True, "reservation_id": try_result.reservation_id}
# Usage
async def main():
# Initialize the TCC engine (connects to Java lib-transactional-engine)
engine = TccEngine()
await engine.initialize()
# Define deposit data
deposit_data = {
"account_id": "account_123",
"amount": 50.0
}
# Execute the TCC (Python methods executed via Java engine)
result = await engine.execute_by_class(AccountDepositTcc, deposit_data)
if result.is_success:
print(f"β
TCC completed successfully!")
print(f" - Correlation ID: {result.correlation_id}")
print(f" - Confirmed: {result.is_confirmed}")
print(f" - Participants: {result.participants_count}")
print(f" - Java engine version: {result.lib_transactional_version}")
else:
print(f"β TCC failed: {result.error}")
print(f" - Final phase: {result.final_phase}")
print(f" - Cancelled: {result.is_canceled}")
await engine.shutdown()
if __name__ == "__main__":
asyncio.run(main())FireflyTX includes powerful visualization tools to understand your transaction flows:
# Visualize SAGA topology
fireflytx visualize order_saga.py OrderProcessingSaga
# Generate DOT format for Graphviz
fireflytx visualize order_saga.py OrderProcessingSaga --format=dot
# Generate Mermaid diagram
fireflytx visualize payment_tcc.py PaymentTcc --format=mermaidFireflyTX now streams the Java lib-transactional-engine logs into Python logging and exposes helper APIs to inspect them programmatically.
import logging
from fireflytx.utils.java_subprocess_bridge import get_java_bridge
# Optional: configure Python logging
logging.basicConfig(level=logging.INFO)
bridge = get_java_bridge()
bridge.start_jvm() # starts Java and log streaming
# Tail recent Java logs
recent = bridge.get_java_logs(count=100)
for line in recent:
print(line)
# Follow logs in real-time via callback
bridge.follow_java_logs(lambda line, stream: print(f"FOLLOW: {line}"))
# Adjust verbosity of Java log forwarding
bridge.set_java_log_level("DEBUG") # or logging.WARNING
# Tee Java logs to a file
bridge.enable_java_log_file("java-engine.log")
# Environment variable to set level at startup
# export FIREFLYTX_JAVA_LOG_LEVEL=DEBUGNote: A dedicated CLI command (fireflytx java-logs) may be added in a future release; for now, use the programmatic APIs above or rely on the Python logging output from the dedicated logger name "fireflytx.java".
FireflyTX provides a comprehensive CLI for development and operations:
# Engine management
fireflytx status # Show engine status
fireflytx test # Test connectivity
# Development tools
fireflytx validate saga.py # Validate decorators
fireflytx visualize saga.py MySaga # Visualize topology
fireflytx list-examples # List examples
fireflytx run-example saga-basic # Run example
# Java integration
fireflytx jar-info # JAR information
fireflytx jar-build # Build from source
fireflytx java-logs --follow # Monitor logs
# Configuration
fireflytx init-config config.yaml # Generate configFireflyTX provides flexible configuration options for different deployment scenarios.
from fireflytx import SagaEngine, TccEngine
from fireflytx.config import ConfigurationManager
# π Development (default) - in-memory, minimal resources
config = ConfigurationManager.get_default_config()
saga_engine = SagaEngine(config=config)
# π Production - Redis persistence, optimized settings
config = ConfigurationManager.get_production_config(
persistence_type="redis",
persistence_connection_string="redis://localhost:6379",
heap_size="2g",
max_concurrent_executions=200
)
saga_engine = SagaEngine(config=config)
# β‘ High-Performance - maximum throughput
config = ConfigurationManager.get_high_performance_config(
persistence_connection_string="redis://redis-cluster:6379"
)
saga_engine = SagaEngine(config=config)
# π¨ Custom - full control
from fireflytx.config import EngineConfig, PersistenceConfig, JvmConfig
config = EngineConfig(
max_concurrent_executions=150,
default_timeout_ms=45000,
thread_pool_size=75,
persistence=PersistenceConfig(
type="postgresql",
connection_string="postgresql://user:pass@localhost:5432/fireflytx",
auto_checkpoint=True
),
jvm=JvmConfig(
heap_size="3g",
gc_algorithm="G1GC",
max_gc_pause_ms=150
)
)
saga_engine = SagaEngine(config=config)| Feature | Default | Production | High-Performance |
|---|---|---|---|
| Max Concurrent | 100 | 200 | 500 |
| Timeout | 30s | 60s | 30s |
| Thread Pool | 50 | 100 | 200 |
| JVM Heap | 256m | 2g | 4g |
| Persistence | Memory | Redis | Redis (high pool) |
| Retry Attempts | 3 | 5 | 3 |
| GC Algorithm | Default | G1GC | G1GC (optimized) |
# Override any configuration via environment variables
export FIREFLYTX_MAX_CONCURRENT_EXECUTIONS=200
export FIREFLYTX_DEFAULT_TIMEOUT_MS=60000
export FIREFLYTX_THREAD_POOL_SIZE=100
export FIREFLYTX_PERSISTENCE_TYPE=redis
export FIREFLYTX_PERSISTENCE_CONNECTION_STRING=redis://localhost:6379
export FIREFLYTX_JVM_HEAP_SIZE=2g
export FIREFLYTX_JVM_GC_ALGORITHM=G1GC# fireflytx-config.yaml
engine:
max_concurrent_executions: 200
default_timeout_ms: 60000
thread_pool_size: 100
enable_monitoring: true
retry:
max_attempts: 5
initial_delay_ms: 1000
max_delay_ms: 60000
backoff_multiplier: 2.0
persistence:
type: redis
connection_string: redis://localhost:6379
auto_checkpoint: true
checkpoint_interval_seconds: 60
jvm:
heap_size: 2g
gc_algorithm: G1GC
max_gc_pause_ms: 200
additional_jvm_args:
- "-XX:+UseStringDeduplication"
- "-XX:+OptimizeStringConcat"See Configuration Reference for complete details and examples.
- Step Dependencies: Define execution order with
depends_on - Automatic Compensation: Rollback on failures with compensation steps
- Retry Logic: Configurable retry with exponential backoff
- Parallel Execution: Execute independent steps concurrently
- Compensation Policies: Multiple strategies (sequential, parallel, circuit breaker)
- Type Safety: Full Pydantic model support
- Resource Reservation: Try phase reserves resources
- Two-Phase Commit: Atomic confirm/cancel operations
- Participant Ordering: Control execution sequence
- Timeout Management: Per-phase timeout configuration
- Failure Handling: Automatic compensation on try failures
- Strong Consistency: ACID properties across distributed services
from fireflytx.events import SagaEvents
class CustomSagaEvents(SagaEvents):
async def on_saga_started(self, saga_name: str, correlation_id: str):
print(f"Started SAGA {saga_name}: {correlation_id}")
async def on_step_completed(self, step_id: str, result: any):
print(f"Step {step_id} completed with result: {result}")
engine = SagaEngine(events_handler=CustomSagaEvents())from fireflytx.persistence import SagaPersistenceProvider
class CustomPersistenceProvider(SagaPersistenceProvider):
async def save_saga_state(self, correlation_id: str, state: dict):
# Custom persistence implementation
pass
engine = SagaEngine(persistence_provider=CustomPersistenceProvider())from fireflytx import SagaContext
# Access context in steps
@saga_step("process_order")
async def process_order(self, order: OrderData, context: SagaContext) -> dict:
# Set context variables
context.set_variable("user_id", order.customer_id)
context.put_header("trace_id", "abc-123")
# Access context in subsequent steps
user_id = context.get_variable("user_id")
return {"processed_by": user_id}FireflyTX provides excellent testing support, allowing you to test your Python transaction logic against the real Java engine:
import pytest
import asyncio
from fireflytx import SagaEngine, TccEngine
@pytest.mark.asyncio
async def test_successful_payment_saga():
"""Test successful payment processing SAGA."""
engine = SagaEngine()
await engine.initialize()
payment_data = {
"customer_id": "test_customer_123",
"amount": 50.0,
"payment_method": "credit_card"
}
# Execute SAGA through real Java engine
result = await engine.execute_by_class(PaymentProcessingSaga, payment_data)
# Verify successful execution
assert result.is_success
assert result.saga_name == "payment-processing"
assert result.engine_used is True # Confirms Java engine was used
assert result.lib_transactional_version is not None
assert len(result.failed_steps) == 0
await engine.shutdown()
@pytest.mark.asyncio
async def test_saga_compensation():
"""Test SAGA compensation when steps fail."""
engine = SagaEngine()
await engine.initialize()
# Execute a SAGA designed to fail
result = await engine.execute_by_class(FailingSaga, {"test": "data"})
# Verify compensation was executed
assert not result.is_success
assert len(result.failed_steps) > 0
assert len(result.compensated_steps) > 0 # Compensation executed
await engine.shutdown()
@pytest.mark.asyncio
async def test_tcc_transaction():
"""Test TCC transaction execution."""
engine = TccEngine()
await engine.initialize()
deposit_data = {
"account_id": "test_account",
"amount": 25.0
}
# Execute TCC through real Java engine
result = await engine.execute_by_class(AccountDepositTcc, deposit_data)
# Verify successful TCC execution
assert result.is_success
assert result.is_confirmed
assert not result.is_canceled
assert result.participants_count > 0
await engine.shutdown()
@pytest.mark.asyncio
async def test_tcc_rollback():
"""Test TCC rollback when confirm fails."""
engine = TccEngine()
await engine.initialize()
# Execute a TCC designed to fail during confirm
result = await engine.execute_by_class(FailingTcc, {"test": "rollback"})
# Verify rollback was executed
assert not result.is_success
assert not result.is_confirmed
assert result.is_canceled
await engine.shutdown()
# Run tests with:
# pytest tests/integration/ -vKey Testing Features:
- π’ Real Engine Testing: Tests run against actual lib-transactional-engine
- π Automatic Setup: Engine initialization and JAR building handled automatically
- β±οΈ Async Support: Full asyncio support with
@pytest.mark.asyncio - π Comprehensive Results: Test success, failure, compensation, and rollback scenarios
- π Detailed Assertions: Verify engine integration, transaction states, and step execution
- Architecture Guide - System design and components
- SAGA Pattern Guide - Detailed SAGA implementation
- TCC Pattern Guide - TCC transaction details
- Configuration Reference - Complete config options
- API Reference - Full API documentation
- Developers Guide - How the wrapper is designed and implemented
- Project Structure - Where features live and how execution flows
- Tutorial - Step-by-step getting started guide
- Examples - Working code examples
-
JVM Startup Failures
# Check Java installation fireflytx status # Test connectivity fireflytx test
-
Memory Issues
jvm: heap_size: 2g # Increase heap size
-
Connection Issues
# Check persistence backend fireflytx validate-config config.yaml
FireflyTX is designed as a Python wrapper that provides a seamless interface to the powerful lib-transactional-engine Java library:
What FireflyTX Does:
- π Provides Python decorators (
@saga,@tcc_participant, etc.) for defining transactions - π Automatically downloads and builds lib-transactional-engine JAR from GitHub
- π Manages subprocess communication bridge to Java engine
- π Marshalls Python method calls to/from Java execution context
- π Provides unified Python API for results and monitoring
What lib-transactional-engine Does:
- βοΈ Handles the actual distributed transaction orchestration
- πΎ Manages persistence, checkpointing, and recovery
- π Provides enterprise-grade reliability and fault tolerance
- β±οΈ Implements timeout handling, retry logic, and compensation
- π Ensures ACID properties across distributed operations
# Verify the wrapper's Java engine integration
fireflytx jar-info
# Test wrapper connectivity to Java engine
fireflytx test --verbose
# Monitor both Python and Java logs
fireflytx java-logs --followThis architecture gives you:
- π Fast Python Development - Write and test transaction logic in Python
- π’ Enterprise Java Execution - Run with production-grade transaction processing
- π§ Zero Java Setup - No manual JAR management or Java configuration needed
- π Cross-Platform - Works everywhere Python runs, including Apple Silicon
We welcome contributions! Please see our Contributing Guide for details.
- Fork the repository
- Create a feature branch
- Add tests for new functionality
- Submit a pull request
Licensed under the Apache License, Version 2.0. See LICENSE file for details.
FireflyTX is developed by Firefly Software Solutions Inc, specialists in distributed systems and transaction processing.
- Website: https://getfirefly.io
- GitHub: https://github.com/firefly-oss
- Documentation: https://docs.getfirefly.io
- Support: support@getfirefly.io
Build robust distributed systems with confidence using FireflyTX π