Async PostgreSQL with connection pooling, prepared statements, transactions, and LISTEN/NOTIFY — all non-blocking, all composable with Phalanx's concurrency primitives.
composer require phalanx/postgresRequires ext-pgsql and PHP 8.4+.
Register the service bundle when building your application:
<?php
use Phalanx\Postgres\PgServiceBundle;
$app = Application::starting()
->providers(new PgServiceBundle())
->compile();PgServiceBundle registers PgPool and PgListener as singletons with automatic shutdown hooks. Connections close cleanly when the scope tears down.
PgConfig accepts a DSN string or individual parameters:
<?php
// DSN
PgConfig::fromDsn('postgresql://user:pass@localhost:5432/mydb');
// Individual params
new PgConfig(
host: 'localhost',
port: 5432,
user: 'app',
password: 'secret',
database: 'mydb',
maxConnections: 10,
idleTimeout: 30,
);Environment variables work out of the box — PgConfig resolves PG_HOST, PG_PORT, PG_USER, PG_PASSWORD, PG_DATABASE when no explicit values are provided.
PgPool wraps Amphp's PostgresConnectionPool with a clean async interface:
<?php
$pg = $scope->service(PgPool::class);
// Parameterized SELECT — returns rows as associative arrays
$users = $pg->query(
'SELECT * FROM users WHERE active = $1 LIMIT $2',
[true, 50]
);
// Non-SELECT statements — returns affected row count
$pg->execute(
'UPDATE users SET last_seen = NOW() WHERE id = $1',
[$userId]
);
// Prepared statements — parse once, execute many
$stmt = $pg->prepare('INSERT INTO events (type, payload) VALUES ($1, $2)');
foreach ($events as $event) {
$stmt->execute([$event->type, json_encode($event->data)]);
}Every query uses parameterized placeholders ($1, $2, ...). No string interpolation, no injection surface.
Because PgPool manages multiple connections, you can run queries concurrently through Phalanx's concurrency primitives:
<?php
[$users, $orders, $stats] = $scope->concurrent([
Task::of(static fn($s) => $s->service(PgPool::class)->query('SELECT * FROM users')),
Task::of(static fn($s) => $s->service(PgPool::class)->query('SELECT * FROM orders')),
Task::of(static fn($s) => $s->service(PgPool::class)->query('SELECT count(*) FROM events')),
]);Three queries, three connections, one await. The pool handles connection checkout and return automatically.
<?php
$pg = $scope->service(PgPool::class);
$tx = $pg->beginTransaction();
try {
$tx->execute('INSERT INTO orders (user_id, total) VALUES ($1, $2)', [$userId, $total]);
$tx->execute('UPDATE inventory SET qty = qty - $1 WHERE sku = $2', [$qty, $sku]);
$tx->commit();
} catch (\Throwable $e) {
$tx->rollback();
throw $e;
}The transaction holds a dedicated connection from the pool until committed or rolled back.
PostgreSQL's LISTEN/NOTIFY turns your database into a lightweight message broker. PgListener provides a dedicated connection for subscriptions so notifications never compete with query traffic.
<?php
$listener = $scope->service(PgListener::class);
// Subscribe — returns an async iterable
foreach ($listener->listen('order_created') as $notification) {
$order = json_decode($notification->payload, true);
processOrder($order);
}Publishing from any connection:
<?php
$pg = $scope->service(PgPool::class);
$pg->notify('order_created', json_encode(['id' => $orderId, 'total' => $total]));Combine LISTEN with Phalanx's concurrency to fan out event handling across multiple channels without blocking:
<?php
$scope->concurrent([
Task::of(static fn($s) => handleChannel($s, 'order_created')),
Task::of(static fn($s) => handleChannel($s, 'payment_received')),
Task::of(static fn($s) => handleChannel($s, 'inventory_low')),
]);PgServiceBundle registers disposal hooks on the scope. When the scope tears down — whether from normal completion, cancellation, or error — open connections drain and close. No manual cleanup required.