diff --git a/internal/app/app.go b/internal/app/app.go index 95942433..a2d0ecf5 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -5,8 +5,10 @@ import ( "errors" "os" "os/signal" + "strings" "sync" "syscall" + "time" "github.com/hookdeck/outpost/internal/config" "github.com/hookdeck/outpost/internal/infra" @@ -191,33 +193,119 @@ func constructServices( return services, nil } +// runMigration handles database schema migrations with retry logic for lock conflicts. +// +// MIGRATION LOCK BEHAVIOR: +// - Database locks are only acquired when migrations need to be performed +// - When multiple nodes start simultaneously and migrations are pending: +// 1. One node acquires the lock and performs migrations (ideally < 5 seconds) +// 2. Other nodes fail with lock errors ("try lock failed", "can't acquire lock") +// 3. Failed nodes wait 5 seconds and retry +// 4. On retry, migrations are complete and nodes proceed successfully +// +// RETRY STRATEGY: +// - Max 3 attempts with 5-second delays between retries +// - 5 seconds is sufficient because most migrations complete quickly +// - If no migrations are needed (common case), all nodes proceed immediately without lock contention func runMigration(ctx context.Context, cfg *config.Config, logger *logging.Logger) error { - migrator, err := migrator.New(cfg.ToMigratorOpts()) - if err != nil { - return err - } + const ( + maxRetries = 3 + retryDelay = 5 * time.Second + ) - defer func() { + var lastErr error + + for attempt := 1; attempt <= maxRetries; attempt++ { + migrator, err := migrator.New(cfg.ToMigratorOpts()) + if err != nil { + return err + } + + version, versionJumped, err := migrator.Up(ctx, -1) + + // Always close the migrator after each attempt sourceErr, dbErr := migrator.Close(ctx) if sourceErr != nil { - logger.Error("failed to close migrator", zap.Error(sourceErr)) + logger.Error("failed to close migrator source", zap.Error(sourceErr)) } if dbErr != nil { - logger.Error("failed to close migrator", 
zap.Error(dbErr)) + logger.Error("failed to close migrator database connection", zap.Error(dbErr)) } - }() - version, versionJumped, err := migrator.Up(ctx, -1) - if err != nil { - return err + if err == nil { + // Migration succeeded + if versionJumped > 0 { + logger.Info("migrations applied", + zap.Int("version", version), + zap.Int("version_applied", versionJumped)) + } else { + logger.Info("no migrations applied", zap.Int("version", version)) + } + return nil + } + + // Check if this is a lock-related error + // Lock errors can manifest as: + // - "can't acquire lock" (database.ErrLocked) + // - "try lock failed" (postgres advisory lock failure) + // - "pg_advisory_lock" (postgres lock function errors) + isLockError := isLockRelatedError(err) + lastErr = err + + if !isLockError { + // Not a lock error, fail immediately + logger.Error("migration failed", zap.Error(err)) + return err + } + + // Lock error - retry if we have attempts remaining + if attempt < maxRetries { + logger.Warn("migration lock conflict, retrying", + zap.Int("attempt", attempt), + zap.Int("max_retries", maxRetries), + zap.Duration("retry_delay", retryDelay), + zap.Error(err)) + + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(retryDelay): + // Continue to next attempt + } + } else { + // Exhausted all retries + logger.Error("migration failed after retries", + zap.Int("attempts", maxRetries), + zap.Error(err)) + } } - if versionJumped > 0 { - logger.Info("migrations applied", - zap.Int("version", version), - zap.Int("version_applied", versionJumped)) - } else { - logger.Info("no migrations applied", zap.Int("version", version)) + + return lastErr +} + +// isLockRelatedError checks if an error is related to database migration lock acquisition. +// This includes errors from golang-migrate's locking mechanism. 
// isLockRelatedError reports whether err looks like a failure to acquire the
// database migration lock, as opposed to a genuine migration or connectivity
// failure.
//
// Detection is by substring match on err.Error(), so it also recognizes lock
// errors that have been wrapped or prefixed with extra context. A nil error is
// never lock related.
func isLockRelatedError(err error) bool {
	if err == nil {
		return false
	}

	// Known lock-related messages from golang-migrate:
	//  1. "can't acquire lock" - database.ErrLocked from
	//     golang-migrate/migrate/v4/database.
	//  2. "try lock failed" - returned by the postgres driver when
	//     pg_advisory_lock() fails.
	//  3. "can't acquire database lock" - migrate.ErrLockTimeout, returned when
	//     the lock could not be obtained within the migrator's lock timeout.
	// See: https://github.com/golang-migrate/migrate/blob/master/database/postgres/postgres.go
	lockIndicators := []string{
		"can't acquire lock",
		"try lock failed",
		"can't acquire database lock",
	}

	errMsg := err.Error()
	for _, indicator := range lockIndicators {
		if strings.Contains(errMsg, indicator) {
			return true
		}
	}

	return false
}

// TestIsLockRelatedError verifies lock error detection for all known lock
// error patterns, and that unrelated failures are not classified as lock errors.
func TestIsLockRelatedError(t *testing.T) {
	tests := []struct {
		name        string
		err         error
		shouldMatch bool
	}{
		// Lock errors that should be retried
		{
			name:        "database.ErrLocked",
			err:         errors.New("can't acquire lock"),
			shouldMatch: true,
		},
		{
			name:        "postgres advisory lock failure with pg_advisory_lock",
			err:         errors.New("migrate.New: failed to open database: try lock failed in line 0: SELECT pg_advisory_lock($1) (details: pq: unnamed prepared statement does not exist)"),
			shouldMatch: true,
		},
		{
			name:        "try lock failed",
			err:         errors.New("try lock failed"),
			shouldMatch: true,
		},
		{
			name:        "lock acquisition timeout",
			err:         errors.New("timeout: can't acquire database lock"),
			shouldMatch: true,
		},

		// Non-lock errors that should NOT be retried
		{
			name:        "connection refused",
			err:         errors.New("connection refused"),
			shouldMatch: false,
		},
		{
			name:        "SQL syntax error",
			err:         errors.New("syntax error at or near"),
			shouldMatch: false,
		},
		{
			name:        "authentication error",
			err:         errors.New("password authentication failed"),
			shouldMatch: false,
		},
		{
			name:        "timeout error",
			err:         errors.New("context deadline exceeded"),
			shouldMatch: false,
		},
		{
			name:        "nil error",
			err:         nil,
			shouldMatch: false,
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			if got := isLockRelatedError(tt.err); got != tt.shouldMatch {
				t.Errorf("isLockRelatedError should return %v for: %v", tt.shouldMatch, tt.err)
			}
		})
	}
}