From 129922aae306ce181df73bfd7061725cba4ae23b Mon Sep 17 00:00:00 2001 From: Andreas Emilsson Date: Wed, 20 Sep 2023 09:59:55 +0200 Subject: [PATCH 1/3] Added support for pgx locking table In order to support running migrations through PgBouncer which does not support advisory locks. --- database/pgx/pgx.go | 158 ++++++++++++++++++++++++++++++++++++++------ 1 file changed, 139 insertions(+), 19 deletions(-) diff --git a/database/pgx/pgx.go b/database/pgx/pgx.go index deaca94ea..9e7d36d9e 100644 --- a/database/pgx/pgx.go +++ b/database/pgx/pgx.go @@ -25,6 +25,11 @@ import ( _ "github.com/jackc/pgx/v4/stdlib" ) +const ( + LockStrategyAdvisory = "advisory" + LockStrategyTable = "table" +) + func init() { db := Postgres{} database.Register("pgx", &db) @@ -36,6 +41,8 @@ var ( DefaultMigrationsTable = "schema_migrations" DefaultMultiStatementMaxSize = 10 * 1 << 20 // 10 MB + DefaultLockTable = "schema_lock" + DefaultLockStrategy = "advisory" ) var ( @@ -49,6 +56,8 @@ type Config struct { MigrationsTable string DatabaseName string SchemaName string + LockTable string + LockStrategy string migrationsSchemaName string migrationsTableName string StatementTimeout time.Duration @@ -108,6 +117,14 @@ func WithInstance(instance *sql.DB, config *Config) (database.Driver, error) { config.MigrationsTable = DefaultMigrationsTable } + if len(config.LockTable) == 0 { + config.LockTable = DefaultLockTable + } + + if len(config.LockStrategy) == 0 { + config.LockStrategy = DefaultLockStrategy + } + config.migrationsSchemaName = config.SchemaName config.migrationsTableName = config.MigrationsTable if config.MigrationsTableQuoted { @@ -133,6 +150,10 @@ func WithInstance(instance *sql.DB, config *Config) (database.Driver, error) { config: config, } + if err := px.ensureLockTable(); err != nil { + return nil, err + } + if err := px.ensureVersionTable(); err != nil { return nil, err } @@ -196,6 +217,8 @@ func (p *Postgres) Open(url string) (database.Driver, error) { } } + lockStrategy := purl.Query().Get("x-lock-strategy") + px, err := WithInstance(db, &Config{ DatabaseName: purl.Path, MigrationsTable: migrationsTable, @@ -203,6 +226,7 @@ func (p *Postgres) Open(url string) (database.Driver, error) { StatementTimeout: time.Duration(statementTimeout) * time.Millisecond, MultiStatementEnabled: multiStatementEnabled, MultiStatementMaxSize: multiStatementMaxSize, + LockStrategy: lockStrategy, }) if err != nil { @@ -221,36 +245,110 @@ func (p *Postgres) Close() error { return nil } -// https://www.postgresql.org/docs/9.6/static/explicit-locking.html#ADVISORY-LOCKS func (p *Postgres) Lock() error { return database.CasRestoreOnErr(&p.isLocked, false, true, database.ErrLocked, func() error { - aid, err := database.GenerateAdvisoryLockId(p.config.DatabaseName, p.config.migrationsSchemaName, p.config.migrationsTableName) - if err != nil { - return err - } - - // This will wait indefinitely until the lock can be acquired. - query := `SELECT pg_advisory_lock($1)` - if _, err := p.conn.ExecContext(context.Background(), query, aid); err != nil { - return &database.Error{OrigErr: err, Err: "try lock failed", Query: []byte(query)} + switch p.config.LockStrategy { + case LockStrategyAdvisory: + return p.applyAdvisoryLock() + case LockStrategyTable: + return p.applyTableLock() + default: + return fmt.Errorf("unknown lock strategy \"%s\"", p.config.LockStrategy) } - return nil }) } func (p *Postgres) Unlock() error { return database.CasRestoreOnErr(&p.isLocked, true, false, database.ErrNotLocked, func() error { - aid, err := database.GenerateAdvisoryLockId(p.config.DatabaseName, p.config.migrationsSchemaName, p.config.migrationsTableName) - if err != nil { - return err + switch p.config.LockStrategy { + case LockStrategyAdvisory: + return p.releaseAdvisoryLock() + case LockStrategyTable: + return p.releaseTableLock() + default: + return fmt.Errorf("unknown lock strategy \"%s\"", p.config.LockStrategy) } + }) +} - query := `SELECT pg_advisory_unlock($1)` - if _, err := p.conn.ExecContext(context.Background(), query, aid); err != nil { - return &database.Error{OrigErr: err, Query: []byte(query)} +// https://www.postgresql.org/docs/9.6/static/explicit-locking.html#ADVISORY-LOCKS +func (p *Postgres) applyAdvisoryLock() error { + aid, err := database.GenerateAdvisoryLockId(p.config.DatabaseName, p.config.migrationsSchemaName, p.config.migrationsTableName) + if err != nil { + return err + } + + // This will wait indefinitely until the lock can be acquired. + query := `SELECT pg_advisory_lock($1)` + if _, err := p.conn.ExecContext(context.Background(), query, aid); err != nil { + return &database.Error{OrigErr: err, Err: "try lock failed", Query: []byte(query)} + } + return nil +} + +func (p *Postgres) applyTableLock() error { + tx, err := p.conn.BeginTx(context.Background(), &sql.TxOptions{}) + if err != nil { + return &database.Error{OrigErr: err, Err: "transaction start failed"} + } + + aid, err := database.GenerateAdvisoryLockId(p.config.DatabaseName) + if err != nil { + return err + } + + query := "SELECT * FROM " + p.config.LockTable + " WHERE lock_id = $1" + rows, err := tx.Query(query, aid) + if err != nil { + return database.Error{OrigErr: err, Err: "failed to fetch migration lock", Query: []byte(query)} + } + + defer func() { + if errClose := rows.Close(); errClose != nil { + err = multierror.Append(err, errClose) } - return nil - }) + }() + + // If row exists at all, lock is present + locked := rows.Next() + if locked { + return database.ErrLocked + } + + query = "INSERT INTO " + p.config.LockTable + " (lock_id) VALUES ($1)" + if _, err := tx.Exec(query, aid); err != nil { + return database.Error{OrigErr: err, Err: "failed to set migration lock", Query: []byte(query)} + } + + return tx.Commit() +} + +func (p *Postgres) releaseAdvisoryLock() error { + aid, err := database.GenerateAdvisoryLockId(p.config.DatabaseName, p.config.migrationsSchemaName, p.config.migrationsTableName) + if err != nil { + return err + } + + query := `SELECT pg_advisory_unlock($1)` + if _, err := p.conn.ExecContext(context.Background(), query, aid); err != nil { + return &database.Error{OrigErr: err, Query: []byte(query)} + } + + return nil +} + +func (p *Postgres) releaseTableLock() error { + aid, err := database.GenerateAdvisoryLockId(p.config.DatabaseName) + if err != nil { + return err + } + + query := "DELETE FROM " + p.config.LockTable + " WHERE lock_id = $1" + if _, err := p.db.Exec(query, aid); err != nil { + return database.Error{OrigErr: err, Err: "failed to release migration lock", Query: []byte(query)} + } + + return nil } func (p *Postgres) Run(migration io.Reader) error { @@ -478,6 +576,28 @@ func (p *Postgres) ensureVersionTable() (err error) { return nil } +func (p *Postgres) ensureLockTable() error { + if p.config.LockStrategy != LockStrategyTable { + return nil + } + + var count int + query := `SELECT COUNT(1) FROM information_schema.tables WHERE table_name = $1 AND table_schema = (SELECT current_schema()) LIMIT 1` + if err := p.db.QueryRow(query, p.config.LockTable).Scan(&count); err != nil { + return &database.Error{OrigErr: err, Query: []byte(query)} + } + if count == 1 { + return nil + } + + query = `CREATE TABLE "` + p.config.LockTable + `" (lock_id BIGINT NOT NULL PRIMARY KEY)` + if _, err := p.db.Exec(query); err != nil { + return &database.Error{OrigErr: err, Query: []byte(query)} + } + + return nil +} + // Copied from lib/pq implementation: https://github.com/lib/pq/blob/v1.9.0/conn.go#L1611 func quoteIdentifier(name string) string { end := strings.IndexRune(name, 0) From bead4a952b892495f44c8df81adf010988929a01 Mon Sep 17 00:00:00 2001 From: Tommy Karlsson Date: Wed, 20 Sep 2023 14:27:26 +0200 Subject: [PATCH 2/3] Added documentation and test for lock strategy --- database/pgx/README.md | 2 ++ database/pgx/pgx.go | 8 ++++++++ database/pgx/pgx_test.go | 26 ++++++++++++++++++++++++++ 3 files changed, 36 insertions(+) diff --git a/database/pgx/README.md b/database/pgx/README.md index bad669315..bec7c5c75 100644 --- a/database/pgx/README.md +++ b/database/pgx/README.md @@ -11,6 +11,8 @@ This package is for [pgx/v4](https://pkg.go.dev/github.com/jackc/pgx/v4). A back | `x-statement-timeout` | `StatementTimeout` | Abort any statement that takes more than the specified number of milliseconds | | `x-multi-statement` | `MultiStatementEnabled` | Enable multi-statement execution (default: false) | | `x-multi-statement-max-size` | `MultiStatementMaxSize` | Maximum size of single statement in bytes (default: 10MB) | +| `x-lock-strategy` | `LockStrategy` | Strategy used for locking during migration (default: advisory) | +| `x-lock-table` | `LockTable` | Name of the table which maintains the migration lock (default: schema_lock) | | `dbname` | `DatabaseName` | The name of the database to connect to | | `search_path` | | This variable specifies the order in which schemas are searched when an object is referenced by a simple name with no schema specified. | | `user` | | The user to sign in as | diff --git a/database/pgx/pgx.go b/database/pgx/pgx.go index 9e7d36d9e..849785e38 100644 --- a/database/pgx/pgx.go +++ b/database/pgx/pgx.go @@ -218,6 +218,7 @@ func (p *Postgres) Open(url string) (database.Driver, error) { } lockStrategy := purl.Query().Get("x-lock-strategy") + lockTable := purl.Query().Get("x-lock-table") px, err := WithInstance(db, &Config{ DatabaseName: purl.Path, @@ -227,6 +228,7 @@ func (p *Postgres) Open(url string) (database.Driver, error) { MultiStatementEnabled: multiStatementEnabled, MultiStatementMaxSize: multiStatementMaxSize, LockStrategy: lockStrategy, + LockTable: lockTable, }) if err != nil { @@ -512,6 +514,12 @@ func (p *Postgres) Drop() (err error) { if err := tables.Scan(&tableName); err != nil { return err } + + // do not drop lock table + if tableName == p.config.LockTable && p.config.LockStrategy == LockStrategyTable { + continue + } + if len(tableName) > 0 { tableNames = append(tableNames, tableName) } diff --git a/database/pgx/pgx_test.go b/database/pgx/pgx_test.go index 5d7a5238e..53e8e1d86 100644 --- a/database/pgx/pgx_test.go +++ b/database/pgx/pgx_test.go @@ -134,6 +134,32 @@ func TestMigrate(t *testing.T) { }) } +func TestMigrateLockTable(t *testing.T) { + dktesting.ParallelTest(t, specs, func(t *testing.T, c dktest.ContainerInfo) { + ip, port, err := c.FirstPort() + if err != nil { + t.Fatal(err) + } + + addr := pgConnectionString(ip, port, "x-lock-strategy=table", "x-lock-table=lock_table") + p := &Postgres{} + d, err := p.Open(addr) + if err != nil { + t.Fatal(err) + } + defer func() { + if err := d.Close(); err != nil { + t.Error(err) + } + }() + m, err := migrate.NewWithDatabaseInstance("file://./examples/migrations", "pgx", d) + if err != nil { + t.Fatal(err) + } + dt.TestMigrate(t, m) + }) +} + func TestMultipleStatements(t *testing.T) { dktesting.ParallelTest(t, specs, func(t *testing.T, c dktest.ContainerInfo) { ip, port, err := c.FirstPort() From 091ad5db99b020ab8e6046b82435a283a1ce1803 Mon Sep 17 00:00:00 2001 From: Tommy Karlsson Date: Wed, 20 Dec 2023 10:25:29 +0100 Subject: [PATCH 3/3] Quote locktable from config in queries Defer rollback of transactions --- database/pgx/pgx.go | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/database/pgx/pgx.go b/database/pgx/pgx.go index 849785e38..7e42d29c9 100644 --- a/database/pgx/pgx.go +++ b/database/pgx/pgx.go @@ -23,6 +23,7 @@ import ( "github.com/jackc/pgconn" "github.com/jackc/pgerrcode" _ "github.com/jackc/pgx/v4/stdlib" + "github.com/lib/pq" ) const ( @@ -42,7 +43,7 @@ var ( DefaultMigrationsTable = "schema_migrations" DefaultMultiStatementMaxSize = 10 * 1 << 20 // 10 MB DefaultLockTable = "schema_lock" - DefaultLockStrategy = "advisory" + DefaultLockStrategy = LockStrategyAdvisory ) var ( @@ -293,13 +294,19 @@ func (p *Postgres) applyTableLock() error { if err != nil { return &database.Error{OrigErr: err, Err: "transaction start failed"} } + defer func() { + errRollback := tx.Rollback() + if errRollback != nil { + err = multierror.Append(err, errRollback) + } + }() aid, err := database.GenerateAdvisoryLockId(p.config.DatabaseName) if err != nil { return err } - query := "SELECT * FROM " + p.config.LockTable + " WHERE lock_id = $1" + query := "SELECT * FROM " + pq.QuoteIdentifier(p.config.LockTable) + " WHERE lock_id = $1" rows, err := tx.Query(query, aid) if err != nil { return database.Error{OrigErr: err, Err: "failed to fetch migration lock", Query: []byte(query)} @@ -317,7 +324,7 @@ func (p *Postgres) applyTableLock() error { return database.ErrLocked } - query = "INSERT INTO " + p.config.LockTable + " (lock_id) VALUES ($1)" + query = "INSERT INTO " + pq.QuoteIdentifier(p.config.LockTable) + " (lock_id) VALUES ($1)" if _, err := tx.Exec(query, aid); err != nil { return database.Error{OrigErr: err, Err: "failed to set migration lock", Query: []byte(query)} } @@ -345,7 +352,7 @@ func (p *Postgres) releaseTableLock() error { return err } - query := "DELETE FROM " + p.config.LockTable + " WHERE lock_id = $1" + query := "DELETE FROM " + pq.QuoteIdentifier(p.config.LockTable) + " WHERE lock_id = $1" if _, err := p.db.Exec(query, aid); err != nil { return database.Error{OrigErr: err, Err: "failed to release migration lock", Query: []byte(query)} } @@ -598,7 +605,7 @@ func (p *Postgres) ensureLockTable() error { return nil } - query = `CREATE TABLE "` + p.config.LockTable + `" (lock_id BIGINT NOT NULL PRIMARY KEY)` + query = `CREATE TABLE ` + pq.QuoteIdentifier(p.config.LockTable) + ` (lock_id BIGINT NOT NULL PRIMARY KEY)` if _, err := p.db.Exec(query); err != nil { return &database.Error{OrigErr: err, Query: []byte(query)} }