Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 27 additions & 3 deletions cmd/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@
package s3

import (
"log/slog"
"os"
"os/signal"
"syscall"

"github.com/spf13/cobra"

"github.com/cockroachdb/field-eng-powertools/stopper"
Expand All @@ -29,7 +34,22 @@ func command(env *env.Env) *cobra.Command {
Use: "s3",
Short: "Performs a validation test for a s3 object store",
RunE: func(cmd *cobra.Command, args []string) error {
ctx := stopper.WithContext(cmd.Context())
// Parent context for cleanup operations
parentCtx := stopper.WithContext(cmd.Context())
// Child context for validator operations that can be stopped
ctx := stopper.WithContext(parentCtx)

// Set up signal handler
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGTERM, syscall.SIGINT)
defer signal.Stop(sigChan)

go func() {
sig := <-sigChan
slog.Info("Received signal, stopping validator", slog.String("signal", sig.String()))
ctx.Stop(0)
}()

store, err := blob.S3FromEnv(ctx, env)
if err != nil {
return err
Expand All @@ -44,12 +64,16 @@ func command(env *env.Env) *cobra.Command {
if err != nil {
return err
}
defer validator.Clean(ctx)
// Use parent context for cleanup so it can access the database
defer validator.Clean(parentCtx)

report, err := validator.Validate(ctx)
if err != nil {
return err
}
format.Report(cmd.OutOrStdout(), report)
if report != nil {
format.Report(cmd.OutOrStdout(), report)
}
return nil
},
}
Expand Down
25 changes: 25 additions & 0 deletions internal/db/kvtable.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,3 +122,28 @@ func (t *KvTable) Fingerprint(ctx *stopper.Context, conn *pgxpool.Conn) (string,
}
return b.String(), rows.Err()
}

// jobsStmt selects the IDs of jobs whose description matches a LIKE
// pattern (@desc) and whose status is outside the given set of terminal
// states (@status).
const jobsStmt = `
SELECT job_id
FROM [SHOW JOBS]
WHERE
NOT status = ANY (@status)
AND description LIKE @desc
`

// terminalJobStatuses lists the job states that count as finished; a job
// in any other state is considered pending.
// (Renamed from the misspelled "pendingStatues": the slice actually holds
// the terminal states that are excluded, not the pending ones.)
var terminalJobStatuses = []string{"succeeded", "failed"}

// PendingJobs returns the IDs of jobs whose description mentions this
// table and that have not yet succeeded or failed.
func (t *KvTable) PendingJobs(ctx *stopper.Context, conn *pgxpool.Conn) ([]int64, error) {
	slog.Debug("Checking for pending jobs", slog.String("table", t.String()))
	rows, err := conn.Query(ctx, jobsStmt, pgx.NamedArgs{
		"status": terminalJobStatuses,
		// Match the table name anywhere inside the job description.
		"desc": fmt.Sprintf("%%%s%%", t.Name),
	})
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	return pgx.CollectRows(rows, pgx.RowTo[int64])
}
29 changes: 20 additions & 9 deletions internal/validate/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ package validate
import (
"log/slog"

"github.com/jackc/pgx/v5/pgxpool"

"github.com/cockroachdb/errors"
"github.com/cockroachdb/field-eng-powertools/stopper"
"github.com/cockroachlabs-field/blobcheck/internal/db"
Expand Down Expand Up @@ -62,9 +60,13 @@ func (v *Validator) checkBackups(ctx *stopper.Context, extConn *db.ExternalConn)
}

// performRestore restores the backup to a separate database.
func (v *Validator) performRestore(
ctx *stopper.Context, conn *pgxpool.Conn, extConn *db.ExternalConn,
) error {
func (v *Validator) performRestore(ctx *stopper.Context, extConn *db.ExternalConn) error {
conn, err := v.acquireConn(ctx)
if err != nil {
return err
}
defer conn.Release()

slog.Info("restoring backup")
if err := v.restoredTable.Restore(ctx, conn, extConn, &v.sourceTable); err != nil {
return errors.Wrap(err, "failed to restore backup")
Expand All @@ -88,9 +90,12 @@ func (v *Validator) runFullBackup(ctx *stopper.Context, extConn *db.ExternalConn
}

// runIncrementalBackup runs an incremental backup.
func (v *Validator) runIncrementalBackup(
ctx *stopper.Context, conn *pgxpool.Conn, extConn *db.ExternalConn,
) error {
func (v *Validator) runIncrementalBackup(ctx *stopper.Context, extConn *db.ExternalConn) error {
conn, err := v.acquireConn(ctx)
if err != nil {
return err
}
defer conn.Release()
slog.Info("starting incremental backup")
if err := v.sourceTable.Backup(ctx, conn, extConn, true); err != nil {
return errors.Wrap(err, "failed to create incremental backup")
Expand All @@ -99,7 +104,13 @@ func (v *Validator) runIncrementalBackup(
}

// verifyIntegrity checks that the restored data matches the original.
func (v *Validator) verifyIntegrity(ctx *stopper.Context, conn *pgxpool.Conn) error {
func (v *Validator) verifyIntegrity(ctx *stopper.Context) error {
conn, err := v.acquireConn(ctx)
if err != nil {
return err
}
defer conn.Release()

slog.Info("checking integrity")
original, err := v.sourceTable.Fingerprint(ctx, conn)
if err != nil {
Expand Down
30 changes: 18 additions & 12 deletions internal/validate/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,24 @@ func (v *Validator) acquireConn(ctx *stopper.Context) (*pgxpool.Conn, error) {
return conn, nil
}

// captureInitialStats records the database statistics as they stand
// before the validation workload runs, so the final report can compare
// against a known baseline.
//
// A pooled connection is acquired for the duration of this call only and
// released before returning.
func (v *Validator) captureInitialStats(
	ctx *stopper.Context, extConn *db.ExternalConn,
) ([]*db.Stats, error) {
	conn, acquireErr := v.acquireConn(ctx)
	if acquireErr != nil {
		return nil, acquireErr
	}
	defer conn.Release()

	slog.Info("capturing initial statistics")
	stats, statsErr := extConn.Stats(ctx, conn)
	if statsErr == nil {
		return stats, nil
	}
	return nil, errors.Wrap(statsErr, "failed to capture initial statistics")
}

// createSourceTable creates the source database and table.
func createSourceTable(ctx *stopper.Context, conn *pgxpool.Conn) (db.KvTable, error) {
source := db.Database{Name: "_blobcheck"}
Expand Down Expand Up @@ -66,15 +84,3 @@ func createRestoredTable(ctx *stopper.Context, conn *pgxpool.Conn) (db.KvTable,
}
return restoredTable, nil
}

// captureInitialStats captures initial database statistics.
func captureInitialStats(
ctx *stopper.Context, extConn *db.ExternalConn, conn *pgxpool.Conn,
) ([]*db.Stats, error) {
slog.Info("capturing initial statistics")
stats, err := extConn.Stats(ctx, conn)
if err != nil {
return nil, errors.Wrap(err, "failed to capture initial statistics")
}
return stats, nil
}
98 changes: 71 additions & 27 deletions internal/validate/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,16 @@ func New(ctx *stopper.Context, env *env.Env, blobStorage blob.Storage) (*Validat
return nil, err
}

// Check for pending jobs on the source table
pendingJobs, err := sourceTable.PendingJobs(ctx, conn)
if err != nil {
return nil, errors.Wrap(err, "failed to check for pending jobs on source table")
}
if len(pendingJobs) > 0 {
slog.Error("pending jobs found on source table. Please review and cancel them.", slog.Any("job_ids", pendingJobs))
return nil, errors.New("pending jobs found on source table")
}

restoredTable, err := createRestoredTable(ctx, conn)
if err != nil {
return nil, err
Expand Down Expand Up @@ -113,24 +123,34 @@ func preflight(ctx *stopper.Context, env *env.Env, blobStorage blob.Storage) err

// Clean removes all resources created by the validator, dropping both the
// source and the restored databases. Each drop is attempted even if the
// other fails; the individual errors are combined so neither masks the
// other.
func (v *Validator) Clean(ctx *stopper.Context) error {
	slog.Debug("Starting cleanup of validator resources")
	conn, err := v.acquireConn(ctx)
	if err != nil {
		return err
	}
	defer conn.Release()

	var srcErr, restErr error
	slog.Debug("Dropping source database", slog.String("database", v.sourceTable.Database.String()))
	if dropErr := v.sourceTable.Database.Drop(ctx, conn); dropErr != nil {
		slog.Error("drop source DB", "err", dropErr)
		srcErr = errors.Wrap(dropErr, "failed to drop source database")
	}
	slog.Debug("Dropping restored database", slog.String("database", v.restoredTable.Database.String()))
	if dropErr := v.restoredTable.Database.Drop(ctx, conn); dropErr != nil {
		slog.Error("drop restored DB", "err", dropErr)
		restErr = errors.Wrap(dropErr, "failed to drop restored database")
	}
	return errors.Join(srcErr, restErr)
}

// validationStepFn is a function that performs a single validation step
// against the storage provider via the external connection, returning a
// non-nil error if the step fails.
type validationStepFn func(ctx *stopper.Context, extConn *db.ExternalConn) error

// validationStep pairs a human-readable step name (used to label errors
// when a step fails) with the function that executes it.
type validationStep struct {
	name string
	fn   validationStepFn
}

// Validate performs a backup/restore against a storage provider
// to assess minimum compatibility at the functional level.
// This does not imply that a storage provider passing the test is supported.
Expand All @@ -148,31 +168,55 @@ func (v *Validator) Validate(ctx *stopper.Context) (*Report, error) {
}
defer extConn.Drop(ctx, conn)

stats, err := captureInitialStats(ctx, extConn, conn)
if err != nil {
return nil, err
}

if err := v.runWorkloadWithBackup(ctx, extConn); err != nil {
return nil, err
}

if err := v.runIncrementalBackup(ctx, conn, extConn); err != nil {
return nil, err
}

if err := v.checkBackups(ctx, extConn); err != nil {
return nil, err
}

if err := v.performRestore(ctx, conn, extConn); err != nil {
return nil, err
}

if err := v.verifyIntegrity(ctx, conn); err != nil {
// If we fail to verify the integrity, just log the error, but
// still provide a complete report
slog.Error("failed to verify integrity", slog.Any("error", err))
var stats []*db.Stats

// Define validation steps
steps := []validationStep{
{
name: "capture initial stats",
fn: func(ctx *stopper.Context, extConn *db.ExternalConn) error {
var err error
stats, err = v.captureInitialStats(ctx, extConn)
return err
},
},
{
name: "workload with backup",
fn: v.runWorkloadWithBackup,
},
{
name: "incremental backup",
fn: v.runIncrementalBackup,
},
{
name: "check backups",
fn: v.checkBackups,
},
{
name: "restore",
fn: v.performRestore,
},
{
name: "verify integrity",
fn: func(ctx *stopper.Context, extConn *db.ExternalConn) error {
if err := v.verifyIntegrity(ctx); err != nil {
// If we fail to verify the integrity, just log the error, but
// still provide a complete report
slog.Error("failed to verify integrity", slog.Any("error", err))
}
return nil
},
},
}

// Execute steps
for _, step := range steps {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is a full state machine really needed here? It's fine I guess, just seems over-complicated. And is begging for a test. You could just repeat a check for is stopping between the calls. Anyway, up to you.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had the check repeated originally; I decided to refactor it to make the steps more declarative.
I guess it's just a matter of preference. As for the testing, it's covered by the minio integration test.

if ctx.IsStopping() {
return nil, ctx.Err()
}
if err := step.fn(ctx, extConn); err != nil {
return nil, errors.Wrapf(err, "failed during step: %s", step.name)
}
}

return &Report{
Expand Down
16 changes: 5 additions & 11 deletions internal/validate/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ func (v *Validator) runWorkloadWithBackup(ctx *stopper.Context, extConn *db.Exte
if err := v.runWorkload(ctx, v.env.WorkloadDuration); err != nil {
return errors.Wrap(err, "failed to run initial workload")
}
if ctx.IsStopping() {
return nil
}
return v.runConcurrentWorkloadAndBackup(ctx, extConn)
}

Expand All @@ -45,8 +48,9 @@ func (v *Validator) runConcurrentWorkloadAndBackup(
for w := range v.env.Workers {
g.Add(1)
ctx.Go(func(ctx *stopper.Context) error {
slog.Info("starting", "worker", w)
defer g.Done()
return v.runWorkloadWorker(ctx, w)
return v.runWorkload(ctx, v.env.WorkloadDuration)
})
}

Expand Down Expand Up @@ -86,13 +90,3 @@ func (v *Validator) runWorkload(ctx *stopper.Context, duration time.Duration) er
}
return nil
}

// runWorkloadWorker runs a single worker instance.
func (v *Validator) runWorkloadWorker(ctx *stopper.Context, workerID int) error {
slog.Info("starting", "worker", workerID)
if err := v.runWorkload(ctx, v.env.WorkloadDuration); err != nil {
slog.Error("worker failed", "worker", workerID, "error", err)
return errors.Wrapf(err, "worker %d failed", workerID)
}
return nil
}