Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
.env
.coverage
.gitignore
.idea
.mypy_cache
.ruff_cache
.vscode
.git
.pytest_cache
.DS_Store
*.yml
Dockerfile
**/__pycache__
.hypothesis
.venv
61 changes: 61 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
name: main

on:
  push:
    branches:
      - main
  pull_request: {}

# Cancel superseded runs for the same branch/PR.
concurrency:
  group: ${{ github.head_ref || github.run_id }}
  cancel-in-progress: true

jobs:
  lint:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: extractions/setup-just@v2
      - uses: astral-sh/setup-uv@v3
        with:
          enable-cache: true
          cache-dependency-glob: "**/pyproject.toml"
      # NOTE(review): lint runs on 3.10 while tests run on 3.13 — presumably to
      # lint against the oldest supported interpreter; confirm this is intended.
      - run: uv python install 3.10
      - run: just install lint-ci

  pytest:
    runs-on: ubuntu-latest
    services:
      postgres:
        image: postgres:latest
        env:
          POSTGRES_DB: postgres
          POSTGRES_PASSWORD: password
          POSTGRES_USER: postgres
        ports:
          - 5432:5432
        # Set health checks to wait until postgres has started
        options: >-
          --health-cmd pg_isready
          --health-interval 10s
          --health-timeout 5s
          --health-retries 5
    steps:
      # was actions/checkout@v3 — aligned with the lint job's @v4
      - uses: actions/checkout@v4
      - uses: astral-sh/setup-uv@v3
      - run: uv python install 3.13
      - run: |
          uv sync --all-extras --no-install-project
          uv run --no-sync pytest . --cov=. --cov-report xml
        env:
          PYTHONDONTWRITEBYTECODE: 1
          PYTHONUNBUFFERED: 1
          DB_DSN: postgresql+asyncpg://postgres:password@127.0.0.1/postgres
      - name: Upload coverage to Codecov
        uses: codecov/codecov-action@v4.0.1
        env:
          CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
        with:
          files: ./coverage.xml
          flags: unittests
          # was codecov-${{ matrix.python-version }} — no matrix is defined in
          # this workflow, so that expression rendered empty
          name: codecov-py3.13
20 changes: 20 additions & 0 deletions .github/workflows/publish.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Publish the package to PyPI whenever a GitHub release is published.
name: Publish Package

on:
  release:
    types:
      - published

jobs:
  publish:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: extractions/setup-just@v2
      - uses: astral-sh/setup-uv@v3
        with:
          enable-cache: true
          cache-dependency-glob: "**/pyproject.toml"
      # `just publish` builds the dist and uploads it with `uv publish`.
      - run: just publish
        env:
          PYPI_TOKEN: ${{ secrets.PYPI_TOKEN }}
22 changes: 22 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Generic things
*.pyc
*~
__pycache__/*
*.swp
*.sqlite3
*.map
.vscode
.idea
.DS_Store
.env
.mypy_cache
.pytest_cache
.ruff_cache
.coverage
htmlcov/
coverage.xml
pytest.xml
dist/
.python-version
.venv
uv.lock
28 changes: 28 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
FROM python:3.13-slim

# Build toolchain + libpq headers for packages with native extensions.
# NOTE(review): original comment said "required for psycopg2", but the DSNs in
# CI and docker-compose use asyncpg (which does not link against libpq) —
# confirm whether build-essential/libpq-dev are still needed.
RUN apt update \
    && apt install -y --no-install-recommends \
    build-essential \
    libpq-dev \
    && apt clean \
    && rm -rf /var/lib/apt/lists/*

# Copy the uv binary from the official image; run as a non-root user.
COPY --from=ghcr.io/astral-sh/uv:latest /uv /bin/uv
RUN useradd --no-create-home --gid root runner

# Use the system interpreter (no uv-managed Python) and skip uv's cache layer.
ENV UV_PYTHON_PREFERENCE=only-system
ENV UV_NO_CACHE=true

WORKDIR /code

# Copy only the dependency manifests first so the `uv sync` layer is cached
# until dependencies actually change.
COPY pyproject.toml .
COPY uv.lock .

RUN uv sync --all-extras --frozen --no-install-project

COPY . .

# Group-writable so the non-root runner (gid root) can use the tree.
RUN chown -R runner:root /code && chmod -R g=u /code

USER runner
32 changes: 32 additions & 0 deletions Justfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Default workflow: refresh deps, lint, build the image, run the tests.
default: install lint build test

# Stop compose services and remove orphaned containers.
down:
    docker compose down --remove-orphans

# Interactive shell inside the application container.
sh:
    docker compose run --service-ports application bash

# `down` before the colon is a prerequisite; `&& down` runs it again after the
# recipe, so the stack is torn down both before and after the test run.
test *args: down && down
    docker compose run application uv run --no-sync pytest {{ args }}

build:
    docker compose build application

# Re-resolve the lockfile, then install exactly what it pins.
install:
    uv lock --upgrade
    uv sync --all-extras --frozen

# Auto-fixing lint pass for local development.
lint:
    uv run --frozen ruff format
    uv run --frozen ruff check --fix
    uv run --frozen mypy .

# Check-only variant for CI (fails instead of rewriting files).
lint-ci:
    uv run --frozen ruff format --check
    uv run --frozen ruff check --no-fix
    uv run --frozen mypy .

# Build sdist/wheel from scratch and upload to PyPI.
publish:
    rm -rf dist
    uv build
    uv publish --token $PYPI_TOKEN
26 changes: 26 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
services:
  application:
    build:
      context: .
      dockerfile: ./Dockerfile
    restart: always
    volumes:
      # NOTE(review): the image's WORKDIR is /code, so this mount to /srv/www/
      # does not overlay the code the container actually runs — confirm whether
      # this should be .:/code (live reload) or is intentionally inert.
      - .:/srv/www/
    depends_on:
      db:
        condition: service_healthy
    environment:
      - DB_DSN=postgresql+asyncpg://postgres:password@db/postgres
    # Keep stdin/tty open so `docker compose run ... bash` is usable.
    stdin_open: true
    tty: true

  db:
    image: postgres
    restart: always
    environment:
      - POSTGRES_PASSWORD=password
    # Aggressive health check so dependent services start quickly.
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U postgres -d postgres"]
      interval: 1s
      timeout: 5s
      retries: 15
14 changes: 14 additions & 0 deletions pg_tools/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
"""Public API of the ``pg_tools`` package.

Re-exports the connection factory, retry decorators, transaction context and
DSN helpers so callers can import everything directly from ``pg_tools``.
"""

from pg_tools.connections import build_connection_factory
from pg_tools.decorators import postgres_reconnect, transaction_retry
from pg_tools.helpers import build_db_dsn, is_dsn_multihost
from pg_tools.transaction import Transaction


# Explicit public surface; keep sorted (uppercase names first).
__all__ = [
    "Transaction",
    "build_connection_factory",
    "build_db_dsn",
    "is_dsn_multihost",
    "postgres_reconnect",
    "transaction_retry",
]
79 changes: 79 additions & 0 deletions pg_tools/connections.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import logging
import random
import typing
from operator import itemgetter

import asyncpg
import sqlalchemy
from asyncpg.connect_utils import SessionAttribute
from sqlalchemy.dialects.postgresql.asyncpg import PGDialect_asyncpg


if typing.TYPE_CHECKING:
ConnectionType = asyncpg.Connection[typing.Any]


logger = logging.getLogger(__name__)


def build_connection_factory(
    url: sqlalchemy.URL,
    timeout: float,
) -> typing.Callable[[], typing.Awaitable["ConnectionType"]]:
    """Build a zero-argument async factory of raw asyncpg connections for *url*.

    The SQLAlchemy URL is translated into asyncpg connect arguments. For a
    multihost DSN the (host, port) pairs are shuffled once, so independent
    factories start from different replicas; if connecting with the full host
    list times out, the factory falls back to trying each host individually in
    a freshly shuffled order.

    Args:
        url: SQLAlchemy URL (asyncpg dialect) describing the target database.
        timeout: Per-attempt timeout in seconds, passed to ``asyncpg.connect``.

    Returns:
        An async callable producing a new ``asyncpg`` connection on each call.

    Raises:
        TimeoutError: single-host DSN whose connection attempt timed out.
        asyncpg.TargetServerAttributeNotMatched: multihost DSN where no host
            yielded a usable connection.
    """
    connect_args: typing.Final[dict[str, typing.Any]] = PGDialect_asyncpg().create_connect_args(url)[1]  # type: ignore[no-untyped-call]
    raw_target_session_attrs: typing.Final[str | None] = connect_args.pop("target_session_attrs", None)
    target_session_attrs: typing.Final[SessionAttribute | None] = (
        SessionAttribute(raw_target_session_attrs) if raw_target_session_attrs else None
    )

    raw_hosts: typing.Final[str | list[str]] = connect_args.pop("host")
    raw_ports: typing.Final[int | list[int] | None] = connect_args.pop("port", None)
    hosts_and_ports: list[tuple[str, int]]
    hosts: str | list[str]
    ports: int | list[int] | None
    if isinstance(raw_hosts, list) and isinstance(raw_ports, list):
        # Multihost DSN: shuffle once so load spreads across replicas.
        hosts_and_ports = list(zip(raw_hosts, raw_ports, strict=True))
        random.shuffle(hosts_and_ports)
        hosts = list(map(itemgetter(0), hosts_and_ports))
        ports = list(map(itemgetter(1), hosts_and_ports))
    else:
        # Single host (or host list without a matching port list): no
        # host-by-host fallback is possible.
        hosts_and_ports = []
        hosts = raw_hosts
        ports = raw_ports

    async def _connection_factory() -> "ConnectionType":
        connection: ConnectionType
        try:
            # Fast path: let asyncpg handle the full (possibly multihost) spec.
            connection = await asyncpg.connect(
                **connect_args,
                host=hosts,
                port=ports,
                timeout=timeout,
                target_session_attrs=target_session_attrs,
            )
            return connection  # noqa: TRY300
        except TimeoutError:
            if not hosts_and_ports:
                raise

            logger.warning("Failed to fetch asyncpg connection. Trying host by host.")

            # Track the last per-host failure so the final raise can chain it.
            last_error: BaseException | None = None
            # Shuffle a copy so repeated fallbacks do not hammer the same host.
            hosts_and_ports_copy: typing.Final = hosts_and_ports.copy()
            random.shuffle(hosts_and_ports_copy)
            for one_host, one_port in hosts_and_ports_copy:
                try:
                    connection = await asyncpg.connect(
                        **connect_args,
                        host=one_host,
                        port=one_port,
                        timeout=timeout,
                        target_session_attrs=target_session_attrs,
                    )
                    return connection  # noqa: TRY300
                except (TimeoutError, OSError, asyncpg.TargetServerAttributeNotMatched) as exc:  # noqa: PERF203
                    last_error = exc
                    logger.warning("Failed to fetch asyncpg connection from %s, %s", one_host, exc)
            msg: typing.Final = f"None of the hosts match the target attribute requirement {target_session_attrs}"
            # Chain the last per-host failure so the root cause is preserved
            # (the original raise dropped it).
            raise asyncpg.TargetServerAttributeNotMatched(msg) from last_error

    return _connection_factory
72 changes: 72 additions & 0 deletions pg_tools/decorators.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import functools
import logging
import typing

import asyncpg
import tenacity
from sqlalchemy.exc import DBAPIError

from pg_tools import settings


P = typing.ParamSpec("P")
T = typing.TypeVar("T")
logger = logging.getLogger(__name__)


def _connection_retry_handler(exception: BaseException) -> bool:
    """Tell tenacity whether *exception* is a retryable Postgres connection error.

    Retries only SQLAlchemy ``DBAPIError``s whose driver-level cause is an
    ``asyncpg.PostgresConnectionError``.
    """
    should_retry = (
        isinstance(exception, DBAPIError)
        and hasattr(exception, "orig")
        and isinstance(exception.orig.__cause__, asyncpg.PostgresConnectionError)  # type: ignore[union-attr]
    )
    if should_retry:
        logger.debug("postgres_reconnect, backoff triggered")
    else:
        logger.debug("postgres_reconnect, giving up on backoff")
    return should_retry


def postgres_reconnect(func: typing.Callable[P, typing.Awaitable[T]]) -> typing.Callable[P, typing.Awaitable[T]]:
    """Retry *func* on Postgres connection errors with jittered exponential backoff.

    Attempts up to ``settings.DB_UTILS_CONNECTION_TRIES`` calls; once exhausted
    the last exception is re-raised unchanged.
    """
    retry_policy = tenacity.retry(
        stop=tenacity.stop_after_attempt(settings.DB_UTILS_CONNECTION_TRIES),
        wait=tenacity.wait_exponential_jitter(),
        retry=tenacity.retry_if_exception(_connection_retry_handler),
        reraise=True,
        before=tenacity.before_log(logger, logging.DEBUG),
    )

    @functools.wraps(func)
    async def wrapped_method(*args: P.args, **kwargs: P.kwargs) -> T:
        return await func(*args, **kwargs)

    return retry_policy(wrapped_method)


def _transaction_retry_handler(exception: BaseException) -> bool:
    """Tell tenacity whether *exception* is a retryable serialization failure.

    Retries only SQLAlchemy ``DBAPIError``s whose driver-level cause is an
    ``asyncpg.SerializationError`` (serializable-isolation conflicts).
    """
    should_retry = (
        isinstance(exception, DBAPIError)
        and hasattr(exception, "orig")
        and isinstance(exception.orig.__cause__, asyncpg.SerializationError)  # type: ignore[union-attr]
    )
    if should_retry:
        logger.debug("transaction_retry, backoff triggered")
    else:
        logger.debug("transaction_retry, giving up on backoff")
    return should_retry


def transaction_retry(
    func: typing.Callable[P, typing.Coroutine[typing.Any, typing.Any, T]],
) -> typing.Callable[P, typing.Coroutine[typing.Any, typing.Any, T]]:
    """Retry *func* on transaction serialization conflicts with jittered backoff.

    Attempts up to ``settings.DB_UTILS_TRANSACTIONS_TRIES`` calls; once
    exhausted the last exception is re-raised unchanged.
    """
    retry_policy = tenacity.retry(
        stop=tenacity.stop_after_attempt(settings.DB_UTILS_TRANSACTIONS_TRIES),
        wait=tenacity.wait_exponential_jitter(),
        retry=tenacity.retry_if_exception(_transaction_retry_handler),
        reraise=True,
        before=tenacity.before_log(logger, logging.DEBUG),
    )

    @functools.wraps(func)
    async def wrapped_method(*args: P.args, **kwargs: P.kwargs) -> T:
        return await func(*args, **kwargs)

    return retry_policy(wrapped_method)
Loading
Loading