Large-scale database operations are often challenging for data engineers and developers: poor performance due to excessive round-trips, memory overflows during bulk operations, network bottlenecks, and complex transaction management to ensure consistency. SQL Batcher provides a comprehensive toolkit designed to help lessen these problems through intelligent statement batching, optimized memory usage, and database-specific optimizations.
SQL Batcher's core components include:
- SQL Batcher Class: The central engine that intelligently batches SQL statements with column-aware capabilities based on configurable size limits and database constraints
- Insert Merger: Optimizes performance by combining compatible INSERT statements while preserving execution order of other statements
- Database Adapters: Provides database-specific optimizations for Trino, PostgreSQL, Snowflake, and more with a consistent interface
- Async Support: Offers full asynchronous execution capabilities with dedicated async adapters and context managers
- Transaction Management: Simplifies data consistency with transaction control and savepoint support for partial rollbacks
- Context Managers: Enables clean resource management with automatic flushing and proper cleanup in both sync and async modes
- Retry Facility: Implements intelligent error recovery with configurable retry strategies
- Developer Experience: Provides an intuitive API with extensibility options for custom adapters and configurations
SQL Batcher is particularly valuable in data engineering workflows, ETL pipelines, large dataset ingestion, and any scenario requiring high-performance database operations.
SQL Batcher provides a comprehensive set of features for efficient SQL statement execution:
Efficiently batch SQL statements based on size limits and other constraints. The core component that handles:
- Smart batching based on database-specific size limits
- Dynamic batch size adjustment based on column count
- Memory and network optimization
- Learn more about SQL Batcher →
Collect and track SQL queries for debugging, logging, and monitoring:
- Query collection with metadata support
- Size tracking and batch management
- Column count detection for INSERT statements
- Learn more about Query Collector →
Optimize database operations by combining compatible INSERT statements:
- Automatic detection of compatible statements
- Size-aware merging respecting query limits
- Table and column structure awareness
- Preserves execution order of non-INSERT statements
- Learn more about Insert Merging →
Optimized adapters for popular databases:
- Database-specific optimizations
- Consistent interface across databases
- Connection and resource management
- Learn more about Database Adapters →
Comprehensive async support for modern Python applications:
- Async batching and execution
- Async adapters for all supported databases
- Async context managers and transaction management
- Learn more about Async Support →
Clean resource management and automatic flushing of batched statements:
- Automatic flushing when exiting the context
- Proper resource cleanup and error handling
- Support for both synchronous and asynchronous operations
- Seamless integration with transaction management
- Learn more about Context Manager →
Control transaction boundaries and ensure data consistency:
- Begin, commit, and rollback transactions
- Error handling and recovery
- Integration with context managers
- Learn more about Transaction Management →
Create intermediate points within a transaction for partial rollbacks:
- Create, rollback to, and release savepoints
- Error recovery within transactions
- Support for complex transaction workflows
- Learn more about Savepoint Support →
Install SQL Batcher using pip:
pip install sql-batcherWith database-specific dependencies:
# For Trino support
pip install "sql-batcher[trino]"
# For PostgreSQL support
pip install "sql-batcher[postgresql]"
# For Snowflake support
pip install "sql-batcher[snowflake]"
# For BigQuery support
pip install "sql-batcher[bigquery]"
# For all supported databases
pip install "sql-batcher[all]"
# For development (includes testing and linting tools)
pip install "sql-batcher[dev]"Here's a simple example to get you started with SQL Batcher:
from sql_batcher import SQLBatcher
from sql_batcher.adapters import TrinoAdapter
# Create adapter and batcher
adapter = TrinoAdapter(
host="trino.example.com",
port=8080,
user="trino",
catalog="hive",
schema="default",
role="admin", # Trino role (sets 'x-trino-role' HTTP header as 'system=ROLE{role}')
max_query_size=600_000 # 600KB limit to provide buffer for Trino's 1MB limit
)
batcher = SQLBatcher(
adapter=adapter,
max_bytes=500_000, # 500KB limit
batch_mode=True,
auto_adjust_for_columns=True # Adjust batch size based on column count
)
# Process statements
statements = [
"INSERT INTO table1 VALUES (1, 'a')",
"INSERT INTO table1 VALUES (2, 'b')",
# ... many more statements
]
# Process all statements in batches
batcher.process_statements(statements, adapter.execute)For async usage:
import asyncio
from sql_batcher import AsyncSQLBatcher
from sql_batcher.adapters.async_trino import AsyncTrinoAdapter
async def main():
# Create async adapter and batcher
adapter = AsyncTrinoAdapter(
host="trino.example.com",
port=8080,
user="trino",
catalog="hive",
schema="default",
role="admin", # Trino role (sets 'x-trino-role' HTTP header as 'system=ROLE{role}')
max_query_size=600_000 # 600KB limit to provide buffer for Trino's 1MB limit
)
batcher = AsyncSQLBatcher(
adapter=adapter,
max_bytes=500_000, # 500KB limit
batch_mode=True,
auto_adjust_for_columns=True # Adjust batch size based on column count
)
# Process statements asynchronously
statements = [
"INSERT INTO table1 VALUES (1, 'a')",
"INSERT INTO table1 VALUES (2, 'b')",
# ... many more statements
]
await batcher.process_statements(statements, adapter.execute)
# Close the connection
await adapter.close()
# Run the async function
asyncio.run(main())For more detailed documentation, see the following pages:
- SQL Batcher - Core batching functionality
- Query Collector - Query collection and tracking
- Database Adapters - Database-specific adapters
- Async Support - Async functionality
- Context Manager - Clean resource management
- Transaction Management - Transaction control
- Savepoint Support - Savepoint functionality
- Insert Merging - INSERT statement optimization
- Usage Examples - Collection of usage examples
- Testing - Testing guidelines and examples
- Code Style Guide - Code style guidelines
Contributions are welcome! Please feel free to submit a Pull Request.
- Fork the repository
- Create your feature branch (
git checkout -b feature/amazing-feature) - Install pre-commit hooks (
pip install pre-commit && pre-commit install) - Make your changes (the pre-commit hooks will automatically format your code)
- Commit your changes (
git commit -m 'Add some amazing feature') - Push to the branch (
git push origin feature/amazing-feature) - Open a Pull Request
The project uses pre-commit hooks to ensure code quality:
- black for code formatting
- isort for import sorting
- flake8 for code linting
- autoflake for removing unused imports and variables
This project is licensed under the MIT License - see the LICENSE file for details.