Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v13] pgbk: add change_feed_conn_string option #31938

Merged
merged 2 commits into from
Sep 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
59 changes: 38 additions & 21 deletions lib/backend/pgbk/background.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package pgbk

import (
"context"
"encoding/hex"
"encoding/json"
"fmt"
"time"
Expand Down Expand Up @@ -116,26 +115,30 @@ func (b *Backend) backgroundChangeFeed(ctx context.Context) {
// events. Assumes that b.buf is neither initialized nor closed, and will reset
// it before returning.
func (b *Backend) runChangeFeed(ctx context.Context) error {
// we manually copy the pool configuration and connect because we don't want
// to hit a connection limit or mess with the connection pool stats; we need
// a separate, long-running connection here anyway.
poolConfig := b.pool.Config()
if poolConfig.BeforeConnect != nil {
if err := poolConfig.BeforeConnect(ctx, poolConfig.ConnConfig); err != nil {
connConfig := b.feedConfig.ConnConfig.Copy()
if bc := b.feedConfig.BeforeConnect; bc != nil {
if err := bc(ctx, connConfig); err != nil {
return trace.Wrap(err)
}
}
conn, err := pgx.ConnectConfig(ctx, poolConfig.ConnConfig)
// TODO(espadolini): use a replication connection if
// connConfig.RuntimeParams["replication"] == "database"
conn, err := pgx.ConnectConfig(ctx, connConfig)
if err != nil {
return trace.Wrap(err)
}
defer func() {
ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
closeCtx, cancel := context.WithTimeout(ctx, 3*time.Second)
defer cancel()
if err := conn.Close(ctx); err != nil && ctx.Err() != nil {
if err := conn.Close(closeCtx); err != nil && closeCtx.Err() != nil {
b.log.WithError(err).Warn("Error closing change feed connection.")
}
}()
if ac := b.feedConfig.AfterConnect; ac != nil {
if err := ac(ctx, conn); err != nil {
return trace.Wrap(err)
}
}

// reading from a replication slot adds to the postgres log at "log" level
// (right below "fatal") for every poll, and we poll every second here, so
Expand All @@ -146,26 +149,40 @@ func (b *Backend) runChangeFeed(ctx context.Context) error {
b.log.WithError(err).Debug("Failed to silence log messages for change feed session.")
}

// this can be useful if we're some sort of admin but we haven't gotten the
// REPLICATION attribute yet
// HACK(espadolini): ALTER ROLE CURRENT_USER REPLICATION just crashes postgres on Azure
if _, err := conn.Exec(ctx,
fmt.Sprintf("ALTER ROLE %v REPLICATION", pgx.Identifier{poolConfig.ConnConfig.User}.Sanitize()),
pgx.QueryExecModeExec,
); err != nil {
b.log.WithError(err).Debug("Failed to enable replication for the current user.")
// this can be useful on Azure if we have azure_pg_admin permissions but not
// the REPLICATION attribute; in vanilla Postgres you have to be SUPERUSER
// to grant REPLICATION, and if you are SUPERUSER you can do replication
// things even without the attribute anyway
//
// HACK(espadolini): ALTER ROLE CURRENT_USER crashes Postgres on Azure, so
// we have to use an explicit username
if b.cfg.AuthMode == AzureADAuth && connConfig.User != "" {
if _, err := conn.Exec(ctx,
fmt.Sprintf("ALTER ROLE %v REPLICATION", pgx.Identifier{connConfig.User}.Sanitize()),
pgx.QueryExecModeExec,
); err != nil {
b.log.WithError(err).Debug("Failed to enable replication for the current user.")
}
}

u := uuid.New()
slotName := hex.EncodeToString(u[:])
// a replication slot name must be 1-63 lowercase letters, numbers and
// underscores, as per
// https://github.com/postgres/postgres/blob/b0ec61c9c27fb932ae6524f92a18e0d1fadbc144/src/backend/replication/slot.c#L193-L194
slotName := fmt.Sprintf("teleport_%x", [16]byte(uuid.New()))

b.log.WithField("slot_name", slotName).Info("Setting up change feed.")
if _, err := conn.Exec(ctx,

// be noisy about pg_create_logical_replication_slot taking too long, since
// hanging here leaves the backend non-functional
createCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
if _, err := conn.Exec(createCtx,
"SELECT * FROM pg_create_logical_replication_slot($1, 'wal2json', true)",
pgx.QueryExecModeExec, slotName,
); err != nil {
cancel()
return trace.Wrap(err)
}
cancel()

b.log.WithField("slot_name", slotName).Info("Change feed started.")
b.buf.SetInit()
Expand Down
17 changes: 15 additions & 2 deletions lib/backend/pgbk/pgbk.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ type Config struct {

AuthMode AuthMode `json:"auth_mode"`

ChangeFeedConnString string `json:"change_feed_conn_string"`
ChangeFeedPollInterval types.Duration `json:"change_feed_poll_interval"`
ChangeFeedBatchSize int `json:"change_feed_batch_size"`

Expand All @@ -95,6 +96,9 @@ func (c *Config) CheckAndSetDefaults() error {
return trace.Wrap(err)
}

if c.ChangeFeedConnString == "" {
c.ChangeFeedConnString = c.ConnString
}
if c.ChangeFeedPollInterval < 0 {
return trace.BadParameter("change feed poll interval must be non-negative")
}
Expand Down Expand Up @@ -150,6 +154,10 @@ func NewWithConfig(ctx context.Context, cfg Config) (*Backend, error) {
if err != nil {
return nil, trace.Wrap(err)
}
feedConfig, err := pgxpool.ParseConfig(cfg.ChangeFeedConnString)
if err != nil {
return nil, trace.Wrap(err)
}

log := logrus.WithField(trace.Component, componentName)

Expand All @@ -159,6 +167,7 @@ func NewWithConfig(ctx context.Context, cfg Config) (*Backend, error) {
return nil, trace.Wrap(err)
}
poolConfig.BeforeConnect = bc
feedConfig.BeforeConnect = bc
}

const defaultTxIsoParamName = "default_transaction_isolation"
Expand All @@ -185,7 +194,9 @@ func NewWithConfig(ctx context.Context, cfg Config) (*Backend, error) {

ctx, cancel := context.WithCancel(ctx)
b := &Backend{
cfg: cfg,
cfg: cfg,
feedConfig: feedConfig,

log: log,
pool: pool,
buf: backend.NewCircularBuffer(),
Expand All @@ -211,7 +222,9 @@ func NewWithConfig(ctx context.Context, cfg Config) (*Backend, error) {

// Backend is a PostgreSQL-backed [backend.Backend].
type Backend struct {
cfg Config
cfg Config
feedConfig *pgxpool.Config

log logrus.FieldLogger
pool *pgxpool.Pool
buf *backend.CircularBuffer
Expand Down