diff --git a/cmd/s3/s3.go b/cmd/s3/s3.go
index 8f20d47..7640913 100644
--- a/cmd/s3/s3.go
+++ b/cmd/s3/s3.go
@@ -15,6 +15,11 @@ package s3
 import (
+	"log/slog"
+	"os"
+	"os/signal"
+	"syscall"
+
 	"github.com/spf13/cobra"
 
 	"github.com/cockroachdb/field-eng-powertools/stopper"
@@ -29,7 +34,22 @@ func command(env *env.Env) *cobra.Command {
 		Use:   "s3",
 		Short: "Performs a validation test for a s3 object store",
 		RunE: func(cmd *cobra.Command, args []string) error {
-			ctx := stopper.WithContext(cmd.Context())
+			// Parent context for cleanup operations
+			parentCtx := stopper.WithContext(cmd.Context())
+			// Child context for validator operations that can be stopped
+			ctx := stopper.WithContext(parentCtx)
+
+			// Set up signal handler
+			sigChan := make(chan os.Signal, 1)
+			signal.Notify(sigChan, syscall.SIGTERM, syscall.SIGINT)
+			defer signal.Stop(sigChan)
+
+			go func() {
+				sig := <-sigChan
+				slog.Info("Received signal, stopping validator", slog.String("signal", sig.String()))
+				ctx.Stop(0)
+			}()
+
 			store, err := blob.S3FromEnv(ctx, env)
 			if err != nil {
 				return err
@@ -44,12 +64,16 @@ func command(env *env.Env) *cobra.Command {
 			if err != nil {
 				return err
 			}
-			defer validator.Clean(ctx)
+			// Use parent context for cleanup so it can access the database
+			defer validator.Clean(parentCtx)
+
 			report, err := validator.Validate(ctx)
 			if err != nil {
 				return err
 			}
-			format.Report(cmd.OutOrStdout(), report)
+			if report != nil {
+				format.Report(cmd.OutOrStdout(), report)
+			}
 			return nil
 		},
 	}
diff --git a/internal/db/kvtable.go b/internal/db/kvtable.go
index 7046a64..db1921d 100644
--- a/internal/db/kvtable.go
+++ b/internal/db/kvtable.go
@@ -122,3 +122,28 @@ func (t *KvTable) Fingerprint(ctx *stopper.Context, conn *pgxpool.Conn) (string,
 	}
 	return b.String(), rows.Err()
 }
+
+const jobsStmt = `
+SELECT job_id
+FROM [SHOW JOBS]
+WHERE
+	NOT status = ANY (@status)
+	AND description LIKE @desc
+`
+
+var pendingStatuses = []string{"succeeded", "failed"}
+
+// PendingJobs returns a list of job IDs that are still pending (not succeeded or failed).
+func (t *KvTable) PendingJobs(ctx *stopper.Context, conn *pgxpool.Conn) ([]int64, error) {
+	slog.Debug("Checking for pending jobs", slog.String("table", t.String()))
+	rows, err := conn.Query(ctx, jobsStmt, pgx.NamedArgs{
+		"status": pendingStatuses,
+		"desc":   fmt.Sprintf("%%%s%%", t.Name),
+	})
+	if err != nil {
+		return nil, err
+	}
+	defer rows.Close()
+
+	return pgx.CollectRows(rows, pgx.RowTo[int64])
+}
diff --git a/internal/validate/backup.go b/internal/validate/backup.go
index 4fb2f6e..399efb5 100644
--- a/internal/validate/backup.go
+++ b/internal/validate/backup.go
@@ -17,8 +17,6 @@ package validate
 import (
 	"log/slog"
 
-	"github.com/jackc/pgx/v5/pgxpool"
-
 	"github.com/cockroachdb/errors"
 	"github.com/cockroachdb/field-eng-powertools/stopper"
 	"github.com/cockroachlabs-field/blobcheck/internal/db"
@@ -62,9 +60,13 @@ func (v *Validator) checkBackups(ctx *stopper.Context, extConn *db.ExternalConn)
 }
 
 // performRestore restores the backup to a separate database.
-func (v *Validator) performRestore(
-	ctx *stopper.Context, conn *pgxpool.Conn, extConn *db.ExternalConn,
-) error {
+func (v *Validator) performRestore(ctx *stopper.Context, extConn *db.ExternalConn) error {
+	conn, err := v.acquireConn(ctx)
+	if err != nil {
+		return err
+	}
+	defer conn.Release()
+
 	slog.Info("restoring backup")
 	if err := v.restoredTable.Restore(ctx, conn, extConn, &v.sourceTable); err != nil {
 		return errors.Wrap(err, "failed to restore backup")
@@ -88,9 +90,12 @@ func (v *Validator) runFullBackup(ctx *stopper.Context, extConn *db.ExternalConn
 }
 
 // runIncrementalBackup runs an incremental backup.
-func (v *Validator) runIncrementalBackup(
-	ctx *stopper.Context, conn *pgxpool.Conn, extConn *db.ExternalConn,
-) error {
+func (v *Validator) runIncrementalBackup(ctx *stopper.Context, extConn *db.ExternalConn) error {
+	conn, err := v.acquireConn(ctx)
+	if err != nil {
+		return err
+	}
+	defer conn.Release()
 	slog.Info("starting incremental backup")
 	if err := v.sourceTable.Backup(ctx, conn, extConn, true); err != nil {
 		return errors.Wrap(err, "failed to create incremental backup")
@@ -99,7 +104,13 @@ func (v *Validator) runIncrementalBackup(
 }
 
 // verifyIntegrity checks that the restored data matches the original.
-func (v *Validator) verifyIntegrity(ctx *stopper.Context, conn *pgxpool.Conn) error {
+func (v *Validator) verifyIntegrity(ctx *stopper.Context) error {
+	conn, err := v.acquireConn(ctx)
+	if err != nil {
+		return err
+	}
+	defer conn.Release()
+
 	slog.Info("checking integrity")
 	original, err := v.sourceTable.Fingerprint(ctx, conn)
 	if err != nil {
diff --git a/internal/validate/db.go b/internal/validate/db.go
index a230c81..7efb184 100644
--- a/internal/validate/db.go
+++ b/internal/validate/db.go
@@ -33,6 +33,24 @@ func (v *Validator) acquireConn(ctx *stopper.Context) (*pgxpool.Conn, error) {
 	return conn, nil
 }
 
+// captureInitialStats captures initial database statistics.
+func (v *Validator) captureInitialStats(
+	ctx *stopper.Context, extConn *db.ExternalConn,
+) ([]*db.Stats, error) {
+	conn, err := v.acquireConn(ctx)
+	if err != nil {
+		return nil, err
+	}
+	defer conn.Release()
+
+	slog.Info("capturing initial statistics")
+	stats, err := extConn.Stats(ctx, conn)
+	if err != nil {
+		return nil, errors.Wrap(err, "failed to capture initial statistics")
+	}
+	return stats, nil
+}
+
 // createSourceTable creates the source database and table.
 func createSourceTable(ctx *stopper.Context, conn *pgxpool.Conn) (db.KvTable, error) {
 	source := db.Database{Name: "_blobcheck"}
@@ -66,15 +84,3 @@ func createRestoredTable(ctx *stopper.Context, conn *pgxpool.Conn) (db.KvTable,
 	}
 	return restoredTable, nil
 }
-
-// captureInitialStats captures initial database statistics.
-func captureInitialStats(
-	ctx *stopper.Context, extConn *db.ExternalConn, conn *pgxpool.Conn,
-) ([]*db.Stats, error) {
-	slog.Info("capturing initial statistics")
-	stats, err := extConn.Stats(ctx, conn)
-	if err != nil {
-		return nil, errors.Wrap(err, "failed to capture initial statistics")
-	}
-	return stats, nil
-}
diff --git a/internal/validate/validate.go b/internal/validate/validate.go
index 94e3224..40d7272 100644
--- a/internal/validate/validate.go
+++ b/internal/validate/validate.go
@@ -77,6 +77,16 @@ func New(ctx *stopper.Context, env *env.Env, blobStorage blob.Storage) (*Validat
 		return nil, err
 	}
 
+	// Check for pending jobs on the source table
+	pendingJobs, err := sourceTable.PendingJobs(ctx, conn)
+	if err != nil {
+		return nil, errors.Wrap(err, "failed to check for pending jobs on source table")
+	}
+	if len(pendingJobs) > 0 {
+		slog.Error("pending jobs found on source table. Please review and cancel them.", slog.Any("job_ids", pendingJobs))
+		return nil, errors.New("pending jobs found on source table")
+	}
+
 	restoredTable, err := createRestoredTable(ctx, conn)
 	if err != nil {
 		return nil, err
@@ -113,6 +123,7 @@ func preflight(ctx *stopper.Context, env *env.Env, blobStorage blob.Storage) err
 
 // Clean removes all resources created by the validator.
 func (v *Validator) Clean(ctx *stopper.Context) error {
+	slog.Debug("Starting cleanup of validator resources")
 	conn, err := v.acquireConn(ctx)
 	if err != nil {
 		return err
@@ -120,17 +131,26 @@ func (v *Validator) Clean(ctx *stopper.Context) error {
 	defer conn.Release()
 
 	var e1, e2 error
+	slog.Debug("Dropping source database", slog.String("database", v.sourceTable.Database.String()))
 	if err := v.sourceTable.Database.Drop(ctx, conn); err != nil {
-		slog.Error("drop source DB", "err", err)
 		e1 = errors.Wrap(err, "failed to drop source database")
 	}
+	slog.Debug("Dropping restored database", slog.String("database", v.restoredTable.Database.String()))
 	if err := v.restoredTable.Database.Drop(ctx, conn); err != nil {
-		slog.Error("drop restored DB", "err", err)
 		e2 = errors.Wrap(err, "failed to drop restored database")
 	}
 	return errors.Join(e1, e2)
 }
 
+// validationStepFn is a function that performs a validation step.
+type validationStepFn func(ctx *stopper.Context, extConn *db.ExternalConn) error
+
+// validationStep represents a step in the validation process.
+type validationStep struct {
+	name string
+	fn   validationStepFn
+}
+
 // Validate performs a backup/restore against a storage provider
 // to asses minimum compatibility at the functional level.
 // This does not imply that a storage provider passing the test is supported.
@@ -148,31 +168,55 @@ func (v *Validator) Validate(ctx *stopper.Context) (*Report, error) {
 	}
 	defer extConn.Drop(ctx, conn)
 
-	stats, err := captureInitialStats(ctx, extConn, conn)
-	if err != nil {
-		return nil, err
-	}
-
-	if err := v.runWorkloadWithBackup(ctx, extConn); err != nil {
-		return nil, err
-	}
-
-	if err := v.runIncrementalBackup(ctx, conn, extConn); err != nil {
-		return nil, err
-	}
-
-	if err := v.checkBackups(ctx, extConn); err != nil {
-		return nil, err
-	}
-
-	if err := v.performRestore(ctx, conn, extConn); err != nil {
-		return nil, err
-	}
-
-	if err := v.verifyIntegrity(ctx, conn); err != nil {
-		// If we fail to verify the integrity, just log the error, but
-		// still provide a complete report
-		slog.Error("failed to verify integrity", slog.Any("error", err))
+	var stats []*db.Stats
+
+	// Define validation steps
+	steps := []validationStep{
+		{
+			name: "capture initial stats",
+			fn: func(ctx *stopper.Context, extConn *db.ExternalConn) error {
+				var err error
+				stats, err = v.captureInitialStats(ctx, extConn)
+				return err
+			},
+		},
+		{
+			name: "workload with backup",
+			fn:   v.runWorkloadWithBackup,
+		},
+		{
+			name: "incremental backup",
+			fn:   v.runIncrementalBackup,
+		},
+		{
+			name: "check backups",
+			fn:   v.checkBackups,
+		},
+		{
+			name: "restore",
+			fn:   v.performRestore,
+		},
+		{
+			name: "verify integrity",
+			fn: func(ctx *stopper.Context, extConn *db.ExternalConn) error {
+				if err := v.verifyIntegrity(ctx); err != nil {
+					// If we fail to verify the integrity, just log the error, but
+					// still provide a complete report
+					slog.Error("failed to verify integrity", slog.Any("error", err))
+				}
+				return nil
+			},
+		},
+	}
+
+	// Execute steps
+	for _, step := range steps {
+		if ctx.IsStopping() {
+			return nil, ctx.Err()
+		}
+		if err := step.fn(ctx, extConn); err != nil {
+			return nil, errors.Wrapf(err, "failed during step: %s", step.name)
+		}
 	}
 
 	return &Report{
diff --git a/internal/validate/workload.go b/internal/validate/workload.go
index a1cf305..baec4e0 100644
--- a/internal/validate/workload.go
+++ b/internal/validate/workload.go
@@ -33,6 +33,9 @@ func (v *Validator) runWorkloadWithBackup(ctx *stopper.Context, extConn *db.Exte
 	if err := v.runWorkload(ctx, v.env.WorkloadDuration); err != nil {
 		return errors.Wrap(err, "failed to run initial workload")
 	}
+	if ctx.IsStopping() {
+		return nil
+	}
 	return v.runConcurrentWorkloadAndBackup(ctx, extConn)
 }
 
@@ -45,8 +48,9 @@ func (v *Validator) runConcurrentWorkloadAndBackup(
 	for w := range v.env.Workers {
 		g.Add(1)
 		ctx.Go(func(ctx *stopper.Context) error {
+			slog.Info("starting", "worker", w)
 			defer g.Done()
-			return v.runWorkloadWorker(ctx, w)
+			return v.runWorkload(ctx, v.env.WorkloadDuration)
 		})
 	}
 
@@ -86,13 +90,3 @@ func (v *Validator) runWorkload(ctx *stopper.Context, duration time.Duration) er
 	}
 	return nil
 }
-
-// runWorkloadWorker runs a single worker instance.
-func (v *Validator) runWorkloadWorker(ctx *stopper.Context, workerID int) error {
-	slog.Info("starting", "worker", workerID)
-	if err := v.runWorkload(ctx, v.env.WorkloadDuration); err != nil {
-		slog.Error("worker failed", "worker", workerID, "error", err)
-		return errors.Wrapf(err, "worker %d failed", workerID)
-	}
-	return nil
-}