Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 106 additions & 18 deletions internal/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ import (
"errors"
"os"
"os/signal"
"strings"
"sync"
"syscall"
"time"

"github.com/hookdeck/outpost/internal/config"
"github.com/hookdeck/outpost/internal/infra"
Expand Down Expand Up @@ -191,33 +193,119 @@ func constructServices(
return services, nil
}

// runMigration handles database schema migrations with retry logic for lock conflicts.
//
// MIGRATION LOCK BEHAVIOR:
// - Database locks are only acquired when migrations need to be performed
// - When multiple nodes start simultaneously and migrations are pending:
// 1. One node acquires the lock and performs migrations (ideally < 5 seconds)
// 2. Other nodes fail with lock errors ("try lock failed", "can't acquire lock")
// 3. Failed nodes wait 5 seconds and retry
// 4. On retry, migrations are complete and nodes proceed successfully
//
// RETRY STRATEGY:
// - Max 3 attempts with 5-second delays between retries
// - 5 seconds is sufficient because most migrations complete quickly
// - If no migrations are needed (common case), all nodes proceed immediately without lock contention
func runMigration(ctx context.Context, cfg *config.Config, logger *logging.Logger) error {
migrator, err := migrator.New(cfg.ToMigratorOpts())
if err != nil {
return err
}
const (
maxRetries = 3
retryDelay = 5 * time.Second
)

defer func() {
var lastErr error

for attempt := 1; attempt <= maxRetries; attempt++ {
migrator, err := migrator.New(cfg.ToMigratorOpts())
if err != nil {
return err
}

version, versionJumped, err := migrator.Up(ctx, -1)

// Always close the migrator after each attempt
sourceErr, dbErr := migrator.Close(ctx)
if sourceErr != nil {
logger.Error("failed to close migrator", zap.Error(sourceErr))
logger.Error("failed to close migrator source", zap.Error(sourceErr))
}
if dbErr != nil {
logger.Error("failed to close migrator", zap.Error(dbErr))
logger.Error("failed to close migrator database connection", zap.Error(dbErr))
}
}()

version, versionJumped, err := migrator.Up(ctx, -1)
if err != nil {
return err
if err == nil {
// Migration succeeded
if versionJumped > 0 {
logger.Info("migrations applied",
zap.Int("version", version),
zap.Int("version_applied", versionJumped))
} else {
logger.Info("no migrations applied", zap.Int("version", version))
}
return nil
}

// Check if this is a lock-related error
// Lock errors can manifest as:
// - "can't acquire lock" (database.ErrLocked)
// - "try lock failed" (postgres advisory lock failure)
// - "pg_advisory_lock" (postgres lock function errors)
isLockError := isLockRelatedError(err)
lastErr = err

if !isLockError {
// Not a lock error, fail immediately
logger.Error("migration failed", zap.Error(err))
return err
}

// Lock error - retry if we have attempts remaining
if attempt < maxRetries {
logger.Warn("migration lock conflict, retrying",
zap.Int("attempt", attempt),
zap.Int("max_retries", maxRetries),
zap.Duration("retry_delay", retryDelay),
zap.Error(err))

select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(retryDelay):
// Continue to next attempt
}
} else {
// Exhausted all retries
logger.Error("migration failed after retries",
zap.Int("attempts", maxRetries),
zap.Error(err))
}
}
if versionJumped > 0 {
logger.Info("migrations applied",
zap.Int("version", version),
zap.Int("version_applied", versionJumped))
} else {
logger.Info("no migrations applied", zap.Int("version", version))

return lastErr
}

// isLockRelatedError reports whether err looks like a failure to acquire the
// database migration lock, i.e. an error worth retrying after a short delay.
//
// Detection is by substring match on err.Error(), so the indicator is found
// even when the original error has been wrapped with additional context.
// A nil error is never lock-related.
func isLockRelatedError(err error) bool {
	if err == nil {
		return false
	}

	errMsg := err.Error()

	// Lock-related error messages from golang-migrate:
	// 1. "can't acquire lock" - database.ErrLocked from golang-migrate/migrate/v4/database
	// 2. "try lock failed" - returned by postgres driver when pg_advisory_lock() fails
	// See: https://github.com/golang-migrate/migrate/blob/master/database/postgres/postgres.go
	lockIndicators := []string{
		"can't acquire lock",
		"try lock failed",
	}

	for _, indicator := range lockIndicators {
		if strings.Contains(errMsg, indicator) {
			return true
		}
	}

	return false
}
69 changes: 69 additions & 0 deletions internal/app/lock_error_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package app

import (
"errors"
"testing"

"github.com/stretchr/testify/assert"
)

// TestIsLockRelatedError is a table-driven check that isLockRelatedError
// flags every known lock-acquisition message as retryable and leaves
// unrelated failures (and nil) alone.
func TestIsLockRelatedError(t *testing.T) {
	type lockCase struct {
		name string
		err  error
		want bool
	}

	cases := []lockCase{
		// Retryable: messages produced when the migration lock is held elsewhere.
		{name: "database.ErrLocked", err: errors.New("can't acquire lock"), want: true},
		{
			name: "postgres advisory lock failure with pg_advisory_lock",
			err:  errors.New("migrate.New: failed to open database: try lock failed in line 0: SELECT pg_advisory_lock($1) (details: pq: unnamed prepared statement does not exist)"),
			want: true,
		},
		{name: "try lock failed", err: errors.New("try lock failed"), want: true},

		// Not retryable: failures unrelated to lock contention.
		{name: "connection refused", err: errors.New("connection refused"), want: false},
		{name: "SQL syntax error", err: errors.New("syntax error at or near"), want: false},
		{name: "authentication error", err: errors.New("password authentication failed"), want: false},
		{name: "timeout error", err: errors.New("context deadline exceeded"), want: false},
		{name: "nil error", err: nil, want: false},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			got := isLockRelatedError(tc.err)
			assert.Equal(t, tc.want, got,
				"isLockRelatedError should return %v for: %v", tc.want, tc.err)
		})
	}
}
Loading