基于 pgx/v5 的 PostgreSQL 连接管理 + 事务封装 + 健康检查 + 观察性工具包。
pgorm 只做连接层工程化封装,不抽象查询 API —— 查询可直接使用 pgx 原生
Query/Exec/CopyFrom,或搭配 sqlc、go-jet、riverqueue 等。
pgx 是 Go 社区 PG 的事实共识:riverqueue、pgvector-go、neon-go 等高质量
项目都基于 pgx。GORM 的 PG 方言在 jsonb、upsert、RETURNING 上有已知坑;
pgx 能直达 CopyFrom(批量插入 10-100x)、binary protocol、pipeline mode、
LISTEN/NOTIFY 等 PG 独家能力。
go get github.com/gtkit/pgorm- 纯 pgx/v5:底层直连
*pgxpool.Pool,零反射开销 - 安全默认值:MaxConns=50,ConnectTimeout=10s,HealthCheckPeriod=1m
- 前置配置校验:非法
MaxConns/MinConns或负数池时长在建池前直接返回pgorm错误 - 密码脱敏:
json:"-"+String()/GoString()/RedactedDSN()三重防泄漏 - 冲突自动重试:
WithTx识别40P01(死锁)/40001(序列化失败)/55P03(锁不可用)并指数退避重试 - 观察性:
TxRetryObserver记录每次重试、HealthProbe注入业务探针、Metrics()输出 13 项池指标 - Trace ID 链路追踪:使用内置
zlogger时,可通过pgorm.WithTraceIDExtractor或zlogger.WithTraceIDExtractor将请求 id 注入每条 SQL 日志 - 启动期 Ping retry:容器冷启动时 DB 瞬时不可达可配置 backoff 重试
- 外部池包装:
OpenWithPool让已有的*pgxpool.Pool享受相同封装
package main
import (
"context"
"log"
"time"
"github.com/gtkit/pgorm"
"github.com/gtkit/pgorm/zlogger"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/tracelog"
"go.uber.org/zap"
)
func main() {
ctx := context.Background()
zl, _ := zap.NewProduction()
logger := zlogger.New(
zlogger.WithLogger(zl),
zlogger.WithSlowThreshold(200*time.Millisecond),
)
client, err := pgorm.Open(ctx,
pgorm.WithName("orders"),
pgorm.WithHost("127.0.0.1"),
pgorm.WithPort(5432),
pgorm.WithDatabase("app"),
pgorm.WithUser("postgres"),
pgorm.WithPassword("secret"),
pgorm.WithMaxConns(50),
pgorm.WithStatementTimeout(5*time.Second),
pgorm.WithLogger(logger),
pgorm.WithTraceIDExtractor(func(ctx context.Context) string {
if v, ok := ctx.Value("request_id").(string); ok {
return v
}
return ""
}),
pgorm.WithLogLevel(tracelog.LogLevelInfo),
)
if err != nil {
log.Fatal(err)
}
defer client.Close()
// 直接使用 pgx 原生 API
var n int
if err := client.Pool().QueryRow(ctx, "SELECT 1").Scan(&n); err != nil {
log.Fatal(err)
}
// 事务 + 冲突自动重试
_ = client.WithTx(ctx, nil, func(tx pgx.Tx) error {
_, err := tx.Exec(ctx, "INSERT INTO orders(sku) VALUES ($1)", "A-001")
return err
})
}pgorm.WithTraceIDExtractor(...) 会在你使用内置 zlogger 作为 WithLogger(...)
参数时自动生效;如果你自己直接构造了其它 tracelog.Logger,则是否支持
trace id 注入取决于该 logger 本身是否实现了对应能力。
对内置 zlogger,pgorm 会为每个 client 使用带 extractor 的克隆 logger,
不会原地改写你传入的共享 logger 实例。
- 显式配置校验:可在应用启动阶段先调用
cfg.Validate(),提前发现 DSN 解析或池配置错误。 - nil context 兜底:
Open/OpenWithPool/ 启动期 Ping retry / 事务 retry 遇到 nilcontext.Context时,会先规范化为可用 context,避免运行时 panic。 - 非法池配置前置失败:非法
MaxConns/MinConns/ 负数池时间配置会在 建池前直接返回pgorm错误,而不是延后到pgxpool.NewWithConfig。 - 空闲连接预热:
WithMinIdleConns(...)可单独控制最小空闲连接数,适合优化请求尾延迟。 - Trace extractor 兼容边界:
pgorm.WithTraceIDExtractor(...)只会对兼容 logger 自动接线;内置zlogger支持自动注入,其它tracelog.Logger是否支持取决于 实现本身。
cfg := pgorm.NewConfig(
pgorm.WithDSN("postgres://postgres:postgres@127.0.0.1:5432/app?sslmode=disable"),
pgorm.WithMaxConns(32),
pgorm.WithMinIdleConns(8),
)
if err := cfg.Validate(); err != nil {
log.Fatal(err)
}client, _ := pgorm.Open(ctx,
pgorm.WithDSN("postgres://user:pw@db.internal:5432/app?sslmode=require"),
)// 默认:冲突自动重试 3 次,退避 5ms-50ms
err := client.WithTx(ctx, nil, func(tx pgx.Tx) error {
return createOrder(ctx, tx, order)
})
// 自定义重试
err := client.WithTx(ctx, nil, fn,
pgorm.WithMaxRetries(5),
pgorm.WithRetryBaseWait(10*time.Millisecond),
pgorm.WithRetryMaxWait(100*time.Millisecond),
)
// 禁用重试
err := client.WithTx(ctx, nil, fn, pgorm.WithMaxRetries(0))
// 序列化隔离级别
err := client.WithTx(ctx, &pgx.TxOptions{IsoLevel: pgx.Serializable}, fn)
// 只读事务
err := client.WithReadTx(ctx, func(tx pgx.Tx) error {
rows, err := tx.Query(ctx, "SELECT id FROM users")
// ...
return err
})WithTx 只重试 40P01 / 40001 / 55P03 三种已由 PG 显式回滚的冲突错误。
不重试连接级错误(例如 connection reset),避免非幂等写入的重复执行。
观察重试:
client, _ := pgorm.Open(ctx,
pgorm.WithTxRetryObserver(func(ctx context.Context, e pgorm.TxRetryEvent) {
metrics.TxRetryCounter.WithLabelValues(e.ClientName).Inc()
}),
)WithTx 回调内 panic 会先 Rollback 再 re-panic,不吞 panic。
report := client.HealthCheck(ctx)
if !report.Healthy() {
log.Printf("db down: %v", report.Error)
}
for _, m := range client.Metrics() {
prometheus.GaugeVec.WithLabelValues(m.Labels...).Set(m.Value)
}自定义业务探针:
client, _ := pgorm.Open(ctx,
pgorm.WithHealthProbe(func(ctx context.Context, c *pgorm.Client) error {
var lag float64
return c.Pool().QueryRow(ctx,
"SELECT EXTRACT(EPOCH FROM (now() - pg_last_xact_replay_timestamp()))").Scan(&lag)
}),
)指标清单:
pgorm_pool_max_conns/pgorm_pool_total_conns/pgorm_pool_acquired_connspgorm_pool_idle_conns/pgorm_pool_constructing_connspgorm_pool_acquire_count_total/pgorm_pool_acquire_duration_seconds_totalpgorm_pool_empty_acquire_count_total/pgorm_pool_canceled_acquire_count_totalpgorm_pool_new_conns_total/pgorm_pool_max_lifetime_destroy_total/pgorm_pool_max_idle_destroy_totalpgorm_pool_utilization
client, _ := pgorm.Open(ctx,
pgorm.WithStartupPingRetry(5, 200*time.Millisecond, 2*time.Second),
)适用于 K8s 冷启动时 DB 偶发不可达的场景。禁用启动 Ping:WithStartupPing(false)。
已经有 *pgxpool.Pool 时可复用:
pool, _ := pgxpool.New(ctx, dsn)
client, _ := pgorm.OpenWithPool(ctx, pool,
pgorm.WithName("legacy"),
pgorm.WithStartupPing(false),
)
// Close() 不会关闭外部传入的 pool// sqlc
queries := db.New(client.Pool())
order, _ := queries.GetOrder(ctx, orderID)
// go-jet
stmt := SELECT(Orders.AllColumns).FROM(Orders).WHERE(Orders.ID.EQ(Int64(1)))
_ = stmt.Query(client.Pool(), &result) // 需要自己实现 pgx -> go-jet 适配
// river (后台任务)
riverClient, _ := river.NewClient(riverpgxv5.New(client.Pool()), ...)pgorm 只负责连接池、事务重试、健康检查和观测,不提供查询抽象。查询层推荐按下面的优先级选择:
| 组合 | 适合场景 | PostgreSQL 特性适配 | 推荐度 |
|---|---|---|---|
pgorm + sqlc |
中大型业务、复杂 SQL、性能敏感 | 很强 | 很高 |
pgorm + Bun |
想要 ORM 便利,但仍希望贴近 SQL | 很强 | 高 |
pgorm + Ent |
大型团队、强 schema/代码生成治理 | 强 | 中高 |
pgorm + GORM |
快速 CRUD、原型验证 | 中 | 中 |
默认推荐:
- 首选:
pgorm + sqlc - 想保留 ORM 手感:
pgorm + Bun - 重 schema / 代码生成治理:
pgorm + Ent - 快速原型:
pgorm + GORM
原因很直接:你既然已经用了 pgx 和 pgorm,通常说明你希望保留 PostgreSQL 特性的可控性。这个前提下,sqlc 和 Bun 往往比重抽象 ORM 更合适。
最推荐的组合。pgorm 管连接和事务,sqlc 把 SQL 生成为类型安全的方法。
queries.sql:
-- name: CreateUser :one
INSERT INTO users (name, email)
VALUES ($1, $2)
RETURNING id, name, email, created_at;
-- name: GetUser :one
SELECT id, name, email, created_at
FROM users
WHERE id = $1;
-- name: UpdateUserEmail :one
UPDATE users
SET email = $2
WHERE id = $1
RETURNING id, name, email, created_at;
-- name: DeleteUser :exec
DELETE FROM users
WHERE id = $1;CRUD:
queries := db.New(client.Pool())
created, _ := queries.CreateUser(ctx, db.CreateUserParams{
Name: "alice",
Email: "alice@example.com",
})
user, _ := queries.GetUser(ctx, created.ID)
updated, _ := queries.UpdateUserEmail(ctx, db.UpdateUserEmailParams{
ID: user.ID,
Email: "alice+new@example.com",
})
_ = queries.DeleteUser(ctx, updated.ID)事务中使用:
_ = client.WithTx(ctx, nil, func(tx pgx.Tx) error {
q := queries.WithTx(tx)
_, err := q.CreateUser(ctx, db.CreateUserParams{
Name: "bob",
Email: "bob@example.com",
})
return err
})适合想要 ORM 风格,但不想完全失去 SQL 控制的场景。
模型:
type User struct {
bun.BaseModel `bun:"table:users"`
ID int64 `bun:",pk,autoincrement"`
Name string `bun:",notnull"`
Email string `bun:",notnull,unique"`
CreatedAt time.Time `bun:",nullzero,notnull,default:current_timestamp"`
}CRUD:
user := &User{
Name: "alice",
Email: "alice@example.com",
}
_, _ = bunDB.NewInsert().Model(user).Exec(ctx)
found := new(User)
_ = bunDB.NewSelect().Model(found).Where("id = ?", user.ID).Scan(ctx)
found.Email = "alice+new@example.com"
_, _ = bunDB.NewUpdate().
Model(found).
Column("email").
WherePK().
Exec(ctx)
_, _ = bunDB.NewDelete().
Model((*User)(nil)).
Where("id = ?", user.ID).
Exec(ctx)适合大型项目和强 schema 约束场景。
Schema:
type User struct {
ent.Schema
}
func (User) Fields() []ent.Field {
return []ent.Field{
field.String("name").NotEmpty(),
field.String("email").Unique(),
field.Time("created_at").Default(time.Now),
}
}CRUD:
created, _ := entClient.User.
Create().
SetName("alice").
SetEmail("alice@example.com").
Save(ctx)
found, _ := entClient.User.Get(ctx, created.ID)
updated, _ := entClient.User.
UpdateOneID(found.ID).
SetEmail("alice+new@example.com").
Save(ctx)
_ = entClient.User.DeleteOneID(updated.ID).Exec(ctx)适合快速起 CRUD,但不建议作为 PostgreSQL 特性较重项目的默认方案。
模型:
type User struct {
ID int64 `gorm:"primaryKey"`
Name string
Email string `gorm:"uniqueIndex"`
CreatedAt time.Time
}CRUD:
user := &User{
Name: "alice",
Email: "alice@example.com",
}
_ = gormDB.Create(user).Error
var found User
_ = gormDB.First(&found, user.ID).Error
_ = gormDB.Model(&found).Update("email", "alice+new@example.com").Error
_ = gormDB.Delete(&User{}, user.ID).Error- PostgreSQL 特性越多,越建议优先
sqlc或直接pgx - 想在 ORM 便利和 SQL 可控性之间平衡,优先
Bun pgorm本身不绑定 ORM,推荐把查询层选择权保留给业务- 对这个仓库来说,默认推荐写法是:
pgorm + sqlc
推荐方式是使用独立示例模块,而不是把 pgvector 依赖直接引入 pgorm 核心包:
- 示例位置:
examples/pgvector/ - 关键思路:通过
Config.PoolConfig()取得pgxpool.Config,在AfterConnect中调用pgxvec.RegisterTypes(...),然后再用OpenWithPool(...)包装回pgorm.Client
最小集成路径:
cfg := pgorm.NewConfig(
pgorm.WithDSN(dsn),
pgorm.WithStartupPing(false),
)
poolCfg, _ := cfg.PoolConfig()
poolCfg.AfterConnect = func(ctx context.Context, conn *pgx.Conn) error {
return pgxvec.RegisterTypes(ctx, conn)
}
pool, _ := pgxpool.NewWithConfig(ctx, poolCfg)
client, _ := pgorm.OpenWithPool(ctx, pool, pgorm.WithStartupPing(false))这样做的好处是:
pgorm保持连接层边界,不内建向量类型抽象pgvector-go只出现在业务或示例模块,不污染根go.mod- 你仍然可以直接使用
client.Pool()执行CREATE EXTENSION、INSERT embedding、ORDER BY embedding <=> $1等原生向量查询
pgbouncer transaction pooling 与 prepared statement 缓存冲突,需要:
client, _ := pgorm.Open(ctx,
pgorm.WithDSN("postgres://.../app"),
pgorm.WithPreferSimpleProtocol(true),
)- CPU 或数据库连接预算较紧时,不要直接使用默认
MaxConns=50 - Web API 常见起点可先用
MaxConns = 4-16 * 实例数的量级,再根据pgorm_pool_utilization和 DB 侧等待情况调整 - 后台任务或批处理服务通常比同步 API 更容易占满连接,建议单独配置
WithName和池大小
- 优先同时配置调用侧
context deadline和服务端WithStatementTimeout(...) - 长事务场景建议同时配置
WithLockTimeout(...)和WithIdleInTxSessionTimeout(...) HealthCheck自带 5 秒默认超时;如果你的探针查询更重,应该传入带 deadline 的ctx
WithTx只会自动重试 PostgreSQL 已明确回滚的冲突错误,但回调函数本身仍应避免把不可回滚的外部副作用放进事务闭包- 发送 MQ、调用外部 HTTP、写本地文件这类动作,建议放到事务提交成功之后,或通过 outbox 模式衔接
- 如果业务不能容忍重试,请显式
WithMaxRetries(0)
- 先从负载均衡摘流量,再等待请求排空,最后调用
Close() Close()会立即关闭连接池,不会等待外部已经拿到的查询自动完成- 对
OpenWithPool包装的外部池,Client.Close()是 no-op,生命周期仍由外层管理
- 建议至少采集
pgorm_pool_utilization、pgorm_pool_empty_acquire_count_total、pgorm_pool_canceled_acquire_count_total - 事务冲突较多的业务建议接入
WithTxRetryObserver(...),把重试次数和等待时间打到 metrics 或日志 - 多个连接池并存时务必配置
WithName(...),否则跨实例指标不易区分
PGORM_RUN_INTEGRATION=1 \
PGORM_TEST_DSN='postgres://postgres:postgres@127.0.0.1:5432/postgres?sslmode=disable' \
make integration默认 go test ./... 不会连接 DB;只有 PGORM_RUN_INTEGRATION=1 时才会执行
TestIntegration*。每个测试使用独立 schema,结束自动清理。
- 库边界清晰:pgorm 只管连接、事务重试、健康、观测。查询、migration、 schema 管理不属于本库,由上层选择(sqlc、goose、atlas 等)
- 不做集群 / HA:读写分离、failover 留给 pg 原生方案(Patroni、repmgr、 Aurora RDS、PgCat)。单应用 pgorm 看到的永远是一个逻辑端点
- 启动即失败,不兜底:配置错误、首次连接失败等在
Open阶段明确返回, 不隐藏为"先启起来再说"
WithPreferSimpleProtocol(true)会禁用 binary protocol,损失少量性能但 解决 pgbouncer transaction pooling 的 prepared statement 冲突Close()立即关闭连接池,不等待外部已拿到的查询排空;优雅停机请在上层 先停流量再调用Close()MetricSample是中立样本,接 Prometheus / OTel 由上层适配
MIT