Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 133 additions & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
# Project Context for Agents

## Project Overview

This is a Python library called `db-try` that provides PostgreSQL and SQLAlchemy utilities, specifically focusing on:

1. **Retry decorators** for handling database connection issues and serialization errors
2. **Connection factory builders** for managing PostgreSQL connections with multiple hosts
3. **DSN (Data Source Name) utilities** for parsing and manipulating database connection strings
4. **Transaction helpers** for managing SQLAlchemy async sessions

The library is built with modern Python practices (3.13+) and uses type hints extensively. It's designed to work with PostgreSQL databases using the asyncpg driver and SQLAlchemy's asyncio extension.

## Key Technologies

- **Python 3.13+**
- **SQLAlchemy** with asyncio extension
- **asyncpg** PostgreSQL driver
- **tenacity** for retry logic
- **uv** for package management and building
- **Docker** for development and testing environments
- **pytest** for testing
- **ruff** and **mypy** for linting and type checking

## Project Structure

```
db_try/
├── __init__.py # Exports all public APIs
├── connections.py # Connection factory builders
├── dsn.py # DSN parsing and manipulation utilities
├── retry.py # Retry decorators for database operations
├── settings.py # Configuration settings
├── transaction.py # Transaction helper classes
└── py.typed # Marker file for type checking
tests/
├── test_connection_factory.py
├── test_dsn.py
├── test_retry.py
├── test_transaction.py
├── conftest.py # pytest configuration
└── __init__.py
```

## Main Components

### Retry Decorators (`retry.py`)
Provides `@postgres_retry` decorator that automatically retries database operations when encountering:
- PostgreSQL connection errors
- Serialization errors

The retry logic uses exponential backoff with jitter and is configurable via environment variables.

### Connection Factory (`connections.py`)
Provides `build_connection_factory()` function that creates connection factories for PostgreSQL databases with support for:
- Multiple fallback hosts
- Randomized host selection
- Target session attributes (read-write vs standby)

### DSN Utilities (`dsn.py`)
Provides functions for:
- `build_db_dsn()`: Parse and modify DSN strings, replacing database names and setting target session attributes
- `is_dsn_multihost()`: Check if a DSN contains multiple hosts

### Transaction Helpers (`transaction.py`)
Provides `Transaction` class that wraps SQLAlchemy AsyncSession with automatic transaction management.

## Building and Running

### Development Environment Setup
```bash
# Install dependencies
just install

# Run tests
just test

# Run linting and type checking
just lint

# Run all checks (default)
just
```

### Docker-based Development
```bash
# Run tests in Docker
just test

# Run shell in Docker container
just sh
```

### Testing
Tests are written using pytest and can be run with:
```bash
# Run all tests
just test

# Run specific test file
just test tests/test_retry.py

# Run tests with coverage
just test --cov=.
```

## Configuration

The library can be configured using environment variables:

- `DB_UTILS_RETRIES_NUMBER`: Number of retry attempts (default: 3)

## Development Conventions

1. **Type Safety**: Strict mypy checking is enforced
2. **Code Style**: Ruff is used for linting with specific rules configured
3. **Testing**: All functionality should have corresponding tests
4. **Async/Await**: All database operations are asynchronous
5. **Documentation**: Public APIs should be documented with docstrings

## Common Tasks

### Adding New Features
1. Implement the feature in the appropriate module
2. Add tests in the corresponding test file
3. Update exports in `__init__.py` if adding public APIs
4. Run `just` to ensure all checks pass

### Modifying Retry Logic
The retry behavior is defined in `retry.py` and uses the tenacity library. Modify the `_retry_handler` function to change which exceptions trigger retries.

### Working with Connections
Connection handling is in `connections.py`. The `build_connection_factory` function handles connecting to PostgreSQL with support for multiple hosts and fallback mechanisms.
21 changes: 21 additions & 0 deletions LICENSE
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
MIT License

Copyright (c) 2024 modern-python

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
199 changes: 196 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,199 @@
# db-try

This library provides retry decorators for sqlalchemy and some helpers
A Python library providing robust retry mechanisms, connection utilities, and transaction helpers for PostgreSQL and SQLAlchemy applications.

Default settings are in [./db_try/settings.py](db_try/settings.py).
You can redefine them by environment variables.
## Features

- **Retry Decorators**: Automatic retry logic for transient database errors
- **Connection Factories**: Robust connection handling with multi-host support
- **DSN Utilities**: Flexible Data Source Name parsing and manipulation
- **Transaction Helpers**: Simplified transaction management with automatic cleanup

## Installation

### Using uv

```bash
uv add db-try
```

### Using pip

```bash
pip install db-try
```

## ORM-Based Usage Examples

### 1. Database Operations with Automatic Retry

Protect your database operations from transient failures using ORM models:

```python
import asyncio
import sqlalchemy as sa
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from db_try import postgres_retry

class User(DeclarativeBase):
__tablename__ = "users"

id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(sa.String())
email: Mapped[str] = mapped_column(sa.String(), index=True)

# Apply retry logic to ORM operations
@postgres_retry
async def get_user_by_email(session: AsyncSession, email: str) -> User:
return await session.scalar(
sa.select(User).where(User.email == email)
)


async def main():
engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/mydb")
async with AsyncSession(engine) as session:
# Automatically retries on connection failures or serialization errors
user = await get_user_by_email(session, "john.doe@example.com")
if user:
print(f"Found user: {user.name}")

asyncio.run(main())
```

### 2. High Availability Database Connections

Set up resilient database connections with multiple fallback hosts:

```python
import sqlalchemy as sa
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from db_try import build_connection_factory, build_db_dsn


# Configure multiple database hosts for high availability
multi_host_dsn = (
"postgresql://user:password@/"
"myapp_db?"
"host=primary-db:5432&"
"host=secondary-db:5432&"
"host=backup-db:5432"
)

# Build production-ready DSN
dsn = build_db_dsn(
db_dsn=multi_host_dsn,
database_name="production_database",
drivername="postgresql+asyncpg"
)

# Create connection factory with timeout
connection_factory = build_connection_factory(
url=dsn,
timeout=5.0 # 5 second connection timeout
)

# Engine will automatically try different hosts on failure
engine = create_async_engine(dsn, async_creator=connection_factory)
```

### 3. Simplified Transaction Management

Handle database transactions with automatic cleanup using ORM:

```python
import dataclasses
import datetime
import typing

from schemas import AnalyticsEventCreate, AnalyticsEvent
from db_try import Transaction, postgres_retry

from your_service_name.database.tables import EventsTable
from your_service_name.producers.analytics_service_events_producer import AnalyticsEventsProducer
from your_service_name.repositories.events_repository import EventsRepository
from your_service_name.settings import settings


@dataclasses.dataclass(kw_only=True, frozen=True, slots=True)
class CreateEventUseCase:
events_repository: EventsRepository
transaction: Transaction
analytics_events_producer: AnalyticsEventsProducer

@postgres_retry
async def __call__(
self,
event_create_data: AnalyticsEventCreate,
) -> AnalyticsEvent:
async with self.transaction:
model: typing.Final = EventsTable(
**event_create_data.model_dump(),
created_at=datetime.datetime.now(tz=settings.common.default_timezone),
)
saved_event: typing.Final[EventsTable] = await self.events_repository.create(model)
event: typing.Final = AnalyticsEvent.model_validate(saved_event)
await self.analytics_events_producer.send_message(event)
await self.transaction.commit()
return event

```

### 4. Serializable Transactions for Consistency

Use serializable isolation level to prevent race conditions with ORM:

```python
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from db_try import Transaction


async def main():
engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/mydb")

async with AsyncSession(engine) as session:
strict_transaction = Transaction(
session=session,
isolation_level="SERIALIZABLE",
)
# use strict_transaction where needed
```

## Configuration

The library can be configured using environment variables:

| Variable | Description | Default |
|---------------------------|--------------------------------------------------|---------|
| `DB_UTILS_RETRIES_NUMBER` | Number of retry attempts for database operations | 3 |

Example:
```bash
export DB_UTILS_RETRIES_NUMBER=5
```

## API Reference

### Retry Decorator
- `@postgres_retry` - Decorator for async functions that should retry on database errors

### Connection Utilities
- `build_connection_factory(url, timeout)` - Creates a connection factory for multi-host setups
- `build_db_dsn(db_dsn, database_name, use_replica=False, drivername="postgresql")` - Builds a DSN with specified parameters
- `is_dsn_multihost(db_dsn)` - Checks if a DSN contains multiple hosts

### Transaction Helper
- `Transaction(session, isolation_level=None)` - Context manager for simplified transaction handling

## Requirements

- Python 3.13+
- SQLAlchemy with asyncio support
- asyncpg PostgreSQL driver
- tenacity for retry logic

## License

This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details.
Loading