diff --git a/database/postgres/README.md b/database/postgres/README.md index bc823f4e1..ccb54d00c 100644 --- a/database/postgres/README.md +++ b/database/postgres/README.md @@ -9,6 +9,7 @@ | `x-statement-timeout` | `StatementTimeout` | Abort any statement that takes more than the specified number of milliseconds | | `x-multi-statement` | `MultiStatementEnabled` | Enable multi-statement execution (default: false) | | `x-multi-statement-max-size` | `MultiStatementMaxSize` | Maximum size of single statement in bytes (default: 10MB) | +| `x-lock-retry-max-interval` | `Locking` | When acquiring a lock fails, retries are used with an exponential backoff. This parameter specifies what is the maximum interval between retries in milliseconds (default: 1000ms). Values below the [initial retry interval](./postgres.go#L39) (100ms) will be ignored. | | `dbname` | `DatabaseName` | The name of the database to connect to | | `search_path` | | This variable specifies the order in which schemas are searched when an object is referenced by a simple name with no schema specified. | | `user` | | The user to sign in as | diff --git a/database/postgres/postgres.go b/database/postgres/postgres.go index 9e6d6277f..89d60d5d2 100644 --- a/database/postgres/postgres.go +++ b/database/postgres/postgres.go @@ -16,6 +16,7 @@ import ( "go.uber.org/atomic" + "github.com/cenkalti/backoff/v4" "github.com/golang-migrate/migrate/v4" "github.com/golang-migrate/migrate/v4/database" "github.com/golang-migrate/migrate/v4/database/multistmt" @@ -34,6 +35,9 @@ var ( DefaultMigrationsTable = "schema_migrations" DefaultMultiStatementMaxSize = 10 * 1 << 20 // 10 MB + + DefaultLockInitialRetryInterval = 100 * time.Millisecond + DefaultLockMaxRetryInterval = 1000 * time.Millisecond ) var ( @@ -53,6 +57,17 @@ type Config struct { migrationsTableName string StatementTimeout time.Duration MultiStatementMaxSize int + Locking LockConfig +} + +type LockConfig struct { + // InitialRetryInterval the initial (minimum) retry interval used for exponential backoff + // to try acquire a lock + InitialRetryInterval time.Duration + + // MaxRetryInterval the maximum retry interval. Once the exponential backoff reaches this limit, + // the retry interval remains the same + MaxRetryInterval time.Duration } type Postgres struct { @@ -167,7 +182,7 @@ func (p *Postgres) Open(url string) (database.Driver, error) { if s := purl.Query().Get("x-migrations-table-quoted"); len(s) > 0 { migrationsTableQuoted, err = strconv.ParseBool(s) if err != nil { - return nil, fmt.Errorf("Unable to parse option x-migrations-table-quoted: %w", err) + return nil, fmt.Errorf("unable to parse option x-migrations-table-quoted: %w", err) } } if (len(migrationsTable) > 0) && (migrationsTableQuoted) && ((migrationsTable[0] != '"') || (migrationsTable[len(migrationsTable)-1] != '"')) { @@ -198,7 +213,22 @@ func (p *Postgres) Open(url string) (database.Driver, error) { if s := purl.Query().Get("x-multi-statement"); len(s) > 0 { multiStatementEnabled, err = strconv.ParseBool(s) if err != nil { - return nil, fmt.Errorf("Unable to parse option x-multi-statement: %w", err) + return nil, fmt.Errorf("unable to parse option x-multi-statement: %w", err) + } + } + + lockConfig := LockConfig{ + InitialRetryInterval: DefaultLockInitialRetryInterval, + MaxRetryInterval: DefaultLockMaxRetryInterval, + } + if s := purl.Query().Get("x-lock-retry-max-interval"); len(s) > 0 { + maxRetryIntervalMillis, err := strconv.Atoi(s) + if err != nil { + return nil, fmt.Errorf("unable to parse option x-lock-retry-max-interval: %w", err) + } + maxRetryInterval := time.Duration(maxRetryIntervalMillis) * time.Millisecond + if maxRetryInterval > DefaultLockInitialRetryInterval { + lockConfig.MaxRetryInterval = maxRetryInterval } } @@ -209,6 +239,7 @@ func (p *Postgres) Open(url string) (database.Driver, error) { StatementTimeout: time.Duration(statementTimeout) * time.Millisecond, MultiStatementEnabled: multiStatementEnabled, MultiStatementMaxSize: multiStatementMaxSize, + Locking: lockConfig, }) if err != nil { @@ -231,24 +262,45 @@ func (p *Postgres) Close() error { return nil } +// Lock tries to acquire an advisory lock and retries indefinitely with an exponential backoff strategy // https://www.postgresql.org/docs/9.6/static/explicit-locking.html#ADVISORY-LOCKS func (p *Postgres) Lock() error { return database.CasRestoreOnErr(&p.isLocked, false, true, database.ErrLocked, func() error { - aid, err := database.GenerateAdvisoryLockId(p.config.DatabaseName, p.config.migrationsSchemaName, p.config.migrationsTableName) - if err != nil { - return err - } + backOff := p.config.Locking.nonStopBackoff() + err := backoff.Retry(func() error { + ok, err := p.tryLock() + if err != nil { + return fmt.Errorf("p.tryLock: %w", err) + } - // This will wait indefinitely until the lock can be acquired. - query := `SELECT pg_advisory_lock($1)` - if _, err := p.conn.ExecContext(context.Background(), query, aid); err != nil { - return &database.Error{OrigErr: err, Err: "try lock failed", Query: []byte(query)} - } + if ok { + return nil + } - return nil + return fmt.Errorf("could not acquire lock") // causes retry + }, backOff) + + return err }) } +func (p *Postgres) tryLock() (bool, error) { + aid, err := database.GenerateAdvisoryLockId(p.config.DatabaseName, p.config.migrationsSchemaName, p.config.migrationsTableName) + if err != nil { + return false, err + } + + // https://www.postgresql.org/docs/current/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS + // should always return true or false + query := `SELECT pg_try_advisory_lock($1)` + var ok bool + if err := p.conn.QueryRowContext(context.Background(), query, aid).Scan(&ok); err != nil { + return false, &database.Error{OrigErr: err, Err: "pg_try_advisory_lock failed", Query: []byte(query)} + } + + return ok, nil +} + func (p *Postgres) Unlock() error { return database.CasRestoreOnErr(&p.isLocked, true, false, database.ErrNotLocked, func() error { aid, err := database.GenerateAdvisoryLockId(p.config.DatabaseName, p.config.migrationsSchemaName, p.config.migrationsTableName) @@ -491,3 +543,13 @@ func (p *Postgres) ensureVersionTable() (err error) { return nil } + +func (l *LockConfig) nonStopBackoff() backoff.BackOff { + b := backoff.NewExponentialBackOff() + b.InitialInterval = l.InitialRetryInterval + b.MaxInterval = l.MaxRetryInterval + b.MaxElapsedTime = 0 // this backoff won't stop + b.Reset() + + return b +} diff --git a/database/postgres/postgres_test.go b/database/postgres/postgres_test.go index 02bf991b8..128637085 100644 --- a/database/postgres/postgres_test.go +++ b/database/postgres/postgres_test.go @@ -96,6 +96,7 @@ func Test(t *testing.T) { t.Run("testFailToCreateTableWithoutPermissions", testFailToCreateTableWithoutPermissions) t.Run("testCheckBeforeCreateTable", testCheckBeforeCreateTable) t.Run("testParallelSchema", testParallelSchema) + t.Run("testPostgresConcurrentMigrations", testPostgresConcurrentMigrations) t.Run("testPostgresLock", testPostgresLock) t.Run("testWithInstanceConcurrent", testWithInstanceConcurrent) t.Run("testWithConnection", testWithConnection) @@ -628,6 +629,50 @@ func testParallelSchema(t *testing.T) { }) } +func testPostgresConcurrentMigrations(t *testing.T) { + dktesting.ParallelTest(t, specs, func(t *testing.T, c dktest.ContainerInfo) { + // GIVEN - a set of concurrent processes running migrations + const concurrency = 3 + var wg sync.WaitGroup + + ip, port, err := c.FirstPort() + if err != nil { + t.Fatal(err) + } + addr := pgConnectionString(ip, port, "x-lock-retry-max-interval=2000") + + // WHEN + for i := 0; i < concurrency; i++ { + wg.Add(1) + + go func() { + defer wg.Done() + + p := &Postgres{} + d, err := p.Open(addr) + if err != nil { + t.Error(err) + } + defer func() { + if err := d.Close(); err != nil { + t.Error(err) + } + }() + + m, err := migrate.NewWithDatabaseInstance("file://./examples/migrations", "postgres", d) + if err != nil { + t.Error(err) + } + dt.TestMigrate(t, m) + }() + } + + wg.Wait() + + // THEN + }) +} + func testPostgresLock(t *testing.T) { dktesting.ParallelTest(t, specs, func(t *testing.T, c dktest.ContainerInfo) { ip, port, err := c.FirstPort()