Skip to content

Commit

Permalink
[v13] pgbk: add change_feed_conn_string option (#31938)
Browse files Browse the repository at this point in the history
* pgbk: add change_feed_conn_string option

* Add timeout on pg_create_logical_replication_slot
  • Loading branch information
espadolini committed Sep 15, 2023
1 parent 1b8a137 commit 74445e8
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 23 deletions.
59 changes: 38 additions & 21 deletions lib/backend/pgbk/background.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package pgbk

import (
"context"
"encoding/hex"
"encoding/json"
"fmt"
"time"
Expand Down Expand Up @@ -116,26 +115,30 @@ func (b *Backend) backgroundChangeFeed(ctx context.Context) {
// events. Assumes that b.buf is not yet initialized and not closed, and will
// reset it before returning.
func (b *Backend) runChangeFeed(ctx context.Context) error {
// we manually copy the pool configuration and connect because we don't want
// to hit a connection limit or mess with the connection pool stats; we need
// a separate, long-running connection here anyway.
poolConfig := b.pool.Config()
if poolConfig.BeforeConnect != nil {
if err := poolConfig.BeforeConnect(ctx, poolConfig.ConnConfig); err != nil {
connConfig := b.feedConfig.ConnConfig.Copy()
if bc := b.feedConfig.BeforeConnect; bc != nil {
if err := bc(ctx, connConfig); err != nil {
return trace.Wrap(err)
}
}
conn, err := pgx.ConnectConfig(ctx, poolConfig.ConnConfig)
// TODO(espadolini): use a replication connection if
// connConfig.RuntimeParams["replication"] == "database"
conn, err := pgx.ConnectConfig(ctx, connConfig)
if err != nil {
return trace.Wrap(err)
}
defer func() {
ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
closeCtx, cancel := context.WithTimeout(ctx, 3*time.Second)
defer cancel()
if err := conn.Close(ctx); err != nil && ctx.Err() != nil {
if err := conn.Close(closeCtx); err != nil && closeCtx.Err() != nil {
b.log.WithError(err).Warn("Error closing change feed connection.")
}
}()
if ac := b.feedConfig.AfterConnect; ac != nil {
if err := ac(ctx, conn); err != nil {
return trace.Wrap(err)
}
}

// reading from a replication slot adds to the postgres log at "log" level
// (right below "fatal") for every poll, and we poll every second here, so
Expand All @@ -146,26 +149,40 @@ func (b *Backend) runChangeFeed(ctx context.Context) error {
b.log.WithError(err).Debug("Failed to silence log messages for change feed session.")
}

// this can be useful if we're some sort of admin but we haven't gotten the
// REPLICATION attribute yet
// HACK(espadolini): ALTER ROLE CURRENT_USER REPLICATION just crashes postgres on Azure
if _, err := conn.Exec(ctx,
fmt.Sprintf("ALTER ROLE %v REPLICATION", pgx.Identifier{poolConfig.ConnConfig.User}.Sanitize()),
pgx.QueryExecModeExec,
); err != nil {
b.log.WithError(err).Debug("Failed to enable replication for the current user.")
// this can be useful on Azure if we have azure_pg_admin permissions but not
// the REPLICATION attribute; in vanilla Postgres you have to be SUPERUSER
// to grant REPLICATION, and if you are SUPERUSER you can do replication
// things even without the attribute anyway
//
// HACK(espadolini): ALTER ROLE CURRENT_USER crashes Postgres on Azure, so
// we have to use an explicit username
if b.cfg.AuthMode == AzureADAuth && connConfig.User != "" {
if _, err := conn.Exec(ctx,
fmt.Sprintf("ALTER ROLE %v REPLICATION", pgx.Identifier{connConfig.User}.Sanitize()),
pgx.QueryExecModeExec,
); err != nil {
b.log.WithError(err).Debug("Failed to enable replication for the current user.")
}
}

u := uuid.New()
slotName := hex.EncodeToString(u[:])
// a replication slot must be 1-63 lowercase letters, numbers and
// underscores, as per
// https://github.com/postgres/postgres/blob/b0ec61c9c27fb932ae6524f92a18e0d1fadbc144/src/backend/replication/slot.c#L193-L194
slotName := fmt.Sprintf("teleport_%x", [16]byte(uuid.New()))

b.log.WithField("slot_name", slotName).Info("Setting up change feed.")
if _, err := conn.Exec(ctx,

// be noisy about pg_create_logical_replication_slot taking too long, since
// hanging here leaves the backend non-functional
createCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
if _, err := conn.Exec(createCtx,
"SELECT * FROM pg_create_logical_replication_slot($1, 'wal2json', true)",
pgx.QueryExecModeExec, slotName,
); err != nil {
cancel()
return trace.Wrap(err)
}
cancel()

b.log.WithField("slot_name", slotName).Info("Change feed started.")
b.buf.SetInit()
Expand Down
17 changes: 15 additions & 2 deletions lib/backend/pgbk/pgbk.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ type Config struct {

AuthMode AuthMode `json:"auth_mode"`

ChangeFeedConnString string `json:"change_feed_conn_string"`
ChangeFeedPollInterval types.Duration `json:"change_feed_poll_interval"`
ChangeFeedBatchSize int `json:"change_feed_batch_size"`

Expand All @@ -95,6 +96,9 @@ func (c *Config) CheckAndSetDefaults() error {
return trace.Wrap(err)
}

if c.ChangeFeedConnString == "" {
c.ChangeFeedConnString = c.ConnString
}
if c.ChangeFeedPollInterval < 0 {
return trace.BadParameter("change feed poll interval must be non-negative")
}
Expand Down Expand Up @@ -150,6 +154,10 @@ func NewWithConfig(ctx context.Context, cfg Config) (*Backend, error) {
if err != nil {
return nil, trace.Wrap(err)
}
feedConfig, err := pgxpool.ParseConfig(cfg.ChangeFeedConnString)
if err != nil {
return nil, trace.Wrap(err)
}

log := logrus.WithField(trace.Component, componentName)

Expand All @@ -159,6 +167,7 @@ func NewWithConfig(ctx context.Context, cfg Config) (*Backend, error) {
return nil, trace.Wrap(err)
}
poolConfig.BeforeConnect = bc
feedConfig.BeforeConnect = bc
}

const defaultTxIsoParamName = "default_transaction_isolation"
Expand All @@ -185,7 +194,9 @@ func NewWithConfig(ctx context.Context, cfg Config) (*Backend, error) {

ctx, cancel := context.WithCancel(ctx)
b := &Backend{
cfg: cfg,
cfg: cfg,
feedConfig: feedConfig,

log: log,
pool: pool,
buf: backend.NewCircularBuffer(),
Expand All @@ -211,7 +222,9 @@ func NewWithConfig(ctx context.Context, cfg Config) (*Backend, error) {

// Backend is a PostgreSQL-backed [backend.Backend].
type Backend struct {
cfg Config
cfg Config
feedConfig *pgxpool.Config

log logrus.FieldLogger
pool *pgxpool.Pool
buf *backend.CircularBuffer
Expand Down

0 comments on commit 74445e8

Please sign in to comment.