diff --git a/cmd/root.go b/cmd/root.go index d33bb1f..424abac 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -19,6 +19,7 @@ import ( "fmt" "log/slog" "os" + "time" "github.com/spf13/cobra" @@ -41,7 +42,7 @@ and integration with CockroachDB backup/restore workflows. It verifies that the storage provider is correctly configured, runs synthetic workloads, and produces network performance statistics.`, PersistentPreRunE: func(_ *cobra.Command, _ []string) error { - if envConfig.DatabaseURL == "" { + if envConfig.DatabaseURL == "" && !envConfig.Guess { return errors.New("database URL cannot be blank") } if envConfig.URI != "" { @@ -74,7 +75,13 @@ func Execute() { f.StringVar(&envConfig.Path, "path", envConfig.Path, "destination path (e.g. bucket/folder)") f.StringVar(&envConfig.Endpoint, "endpoint", envConfig.Path, "http endpoint") f.StringVar(&envConfig.URI, "uri", envConfig.URI, "S3 URI") + f.BoolVar(&envConfig.Guess, "guess", false, `perform a short test to guess suggested parameters: +it only require access to the bucket; +it does not try to run a full backup/restore cycle +in the CockroachDB cluster.`) f.CountVarP(&verbosity, "verbosity", "v", "increase logging verbosity to debug") + f.IntVar(&envConfig.Workers, "workers", 5, "number of concurrent workers") + f.DurationVar(&envConfig.WorkloadDuration, "workload-duration", 5*time.Second, "duration of the workload") err := rootCmd.Execute() if err != nil { diff --git a/cmd/s3/s3.go b/cmd/s3/s3.go index 5fc0907..8f20d47 100644 --- a/cmd/s3/s3.go +++ b/cmd/s3/s3.go @@ -34,6 +34,12 @@ func command(env *env.Env) *cobra.Command { if err != nil { return err } + if env.Guess { + format.Report(cmd.OutOrStdout(), &validate.Report{ + SuggestedParams: store.Params(), + }) + return nil + } validator, err := validate.New(ctx, env, store) if err != nil { return err diff --git a/internal/env/env.go b/internal/env/env.go index cb7fba6..25146c2 100644 --- a/internal/env/env.go +++ b/internal/env/env.go @@ -14,16 +14,21 @@ package env +import "time" + // LookupEnv is a function that retrieves the value of an environment variable. type LookupEnv func(key string) (string, bool) // Env holds the environment configuration. type Env struct { - DatabaseURL string // the database connection URL - Endpoint string // the S3 endpoint - Path string // the S3 bucket path - LookupEnv LookupEnv // allows injection of environment variable lookup for testing - Testing bool // enables testing mode - URI string // the S3 object URI (if not provided,will be constructed from Endpoint and Path) - Verbose bool // enables verbose logging + DatabaseURL string // the database connection URL + Endpoint string // the S3 endpoint + Guess bool // Guess the URL parameters, no validation. + LookupEnv LookupEnv // allows injection of environment variable lookup for testing + Path string // the S3 bucket path + Testing bool // enables testing mode + URI string // the S3 object URI (if not provided,will be constructed from Endpoint and Path) + Verbose bool // enables verbose logging + Workers int // number of concurrent workers + WorkloadDuration time.Duration // duration to run the workload } diff --git a/internal/validate/backup.go b/internal/validate/backup.go new file mode 100644 index 0000000..4fb2f6e --- /dev/null +++ b/internal/validate/backup.go @@ -0,0 +1,119 @@ +// Copyright 2025 Cockroach Labs, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package validate + +import ( + "log/slog" + + "github.com/jackc/pgx/v5/pgxpool" + + "github.com/cockroachdb/errors" + "github.com/cockroachdb/field-eng-powertools/stopper" + "github.com/cockroachlabs-field/blobcheck/internal/db" +) + +// checkBackups verifies that there is exactly one full and one incremental backup. +func (v *Validator) checkBackups(ctx *stopper.Context, extConn *db.ExternalConn) error { + conn, err := v.acquireConn(ctx) + if err != nil { + return err + } + defer conn.Release() + + backups, err := extConn.ListTableBackups(ctx, conn) + if err != nil { + return errors.Wrap(err, "failed to list table backups") + } + if len(backups) != expectedBackupCollections { + return errors.Newf("expected exactly %d backup collection, got %d", expectedBackupCollections, len(backups)) + } + + v.latest = backups[0] + info, err := extConn.BackupInfo(ctx, conn, backups[0], v.sourceTable) + if err != nil { + return errors.Wrap(err, "failed to get backup info") + } + if len(info) != expectedBackupCount { + return errors.Newf("expected exactly %d backups (1 full, 1 incremental), got %d backups", expectedBackupCount, len(info)) + } + + fullCount := 0 + for _, i := range info { + if i.Full { + fullCount++ + } + } + if fullCount != expectedFullBackupCount { + return errors.Newf("expected exactly %d full backup, got %d", expectedFullBackupCount, fullCount) + } + return nil +} + +// performRestore restores the backup to a separate database. +func (v *Validator) performRestore( + ctx *stopper.Context, conn *pgxpool.Conn, extConn *db.ExternalConn, +) error { + slog.Info("restoring backup") + if err := v.restoredTable.Restore(ctx, conn, extConn, &v.sourceTable); err != nil { + return errors.Wrap(err, "failed to restore backup") + } + return nil +} + +// runFullBackup runs a full backup in a separate database connection. +func (v *Validator) runFullBackup(ctx *stopper.Context, extConn *db.ExternalConn) error { + conn, err := v.acquireConn(ctx) + if err != nil { + return err + } + defer conn.Release() + + slog.Info("starting full backup") + if err := v.sourceTable.Backup(ctx, conn, extConn, false); err != nil { + return errors.Wrap(err, "failed to create full backup") + } + return nil +} + +// runIncrementalBackup runs an incremental backup. +func (v *Validator) runIncrementalBackup( + ctx *stopper.Context, conn *pgxpool.Conn, extConn *db.ExternalConn, +) error { + slog.Info("starting incremental backup") + if err := v.sourceTable.Backup(ctx, conn, extConn, true); err != nil { + return errors.Wrap(err, "failed to create incremental backup") + } + return nil +} + +// verifyIntegrity checks that the restored data matches the original. +func (v *Validator) verifyIntegrity(ctx *stopper.Context, conn *pgxpool.Conn) error { + slog.Info("checking integrity") + original, err := v.sourceTable.Fingerprint(ctx, conn) + if err != nil { + return errors.Wrap(err, "failed to get original table fingerprint") + } + + restore, err := v.restoredTable.Fingerprint(ctx, conn) + if err != nil { + return errors.Wrap(err, "failed to get restored table fingerprint") + } + + if original != restore { + return errors.Errorf("integrity check failed: got %s, expected %s while comparing restored data with original", + restore, original) + } + return nil +} diff --git a/internal/validate/db.go b/internal/validate/db.go new file mode 100644 index 0000000..a230c81 --- /dev/null +++ b/internal/validate/db.go @@ -0,0 +1,80 @@ +// Copyright 2025 Cockroach Labs, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package validate + +import ( + "log/slog" + + "github.com/jackc/pgx/v5/pgxpool" + + "github.com/cockroachdb/errors" + "github.com/cockroachdb/field-eng-powertools/stopper" + "github.com/cockroachlabs-field/blobcheck/internal/db" +) + +// acquireConn acquires a database connection from the pool. +func (v *Validator) acquireConn(ctx *stopper.Context) (*pgxpool.Conn, error) { + conn, err := v.pool.Acquire(ctx) + if err != nil { + return nil, errors.Wrap(err, "failed to acquire database connection") + } + return conn, nil +} + +// createSourceTable creates the source database and table. +func createSourceTable(ctx *stopper.Context, conn *pgxpool.Conn) (db.KvTable, error) { + source := db.Database{Name: "_blobcheck"} + if err := source.Create(ctx, conn); err != nil { + return db.KvTable{}, errors.Wrap(err, "failed to create source database") + } + + // TODO (silvano): presplit table to have ranges in all nodes + sourceTable := db.KvTable{ + Database: source, + Schema: db.Public, + Name: "mytable", + } + if err := sourceTable.Create(ctx, conn); err != nil { + return db.KvTable{}, errors.Wrap(err, "failed to create source table") + } + return sourceTable, nil +} + +// createRestoredTable creates the restored database and table. +func createRestoredTable(ctx *stopper.Context, conn *pgxpool.Conn) (db.KvTable, error) { + dest := db.Database{Name: "_blobcheck_restored"} + if err := dest.Create(ctx, conn); err != nil { + return db.KvTable{}, errors.Wrap(err, "failed to create restored database") + } + + restoredTable := db.KvTable{ + Database: dest, + Schema: db.Public, + Name: "mytable", + } + return restoredTable, nil +} + +// captureInitialStats captures initial database statistics. +func captureInitialStats( + ctx *stopper.Context, extConn *db.ExternalConn, conn *pgxpool.Conn, +) ([]*db.Stats, error) { + slog.Info("capturing initial statistics") + stats, err := extConn.Stats(ctx, conn) + if err != nil { + return nil, errors.Wrap(err, "failed to capture initial statistics") + } + return stats, nil +} diff --git a/internal/validate/minio_integration_test.go b/internal/validate/minio_integration_test.go index 46b756d..00cc088 100644 --- a/internal/validate/minio_integration_test.go +++ b/internal/validate/minio_integration_test.go @@ -82,11 +82,13 @@ func TestMinio(t *testing.T) { bucketName := fmt.Sprintf("bucket-%d", time.Now().UnixMilli()) var env = &env.Env{ - DatabaseURL: "postgresql://root@localhost:26257?sslmode=disable", - Endpoint: endpoint, - LookupEnv: lookup, - Path: bucketName, - Testing: true, + DatabaseURL: "postgresql://root@localhost:26257?sslmode=disable", + Endpoint: endpoint, + LookupEnv: lookup, + Path: bucketName, + Testing: true, + Workers: 5, + WorkloadDuration: 5 * time.Second, } r.NoError(createMinioBucket(ctx, vars, env, bucketName)) blobStorage, err := blob.S3FromEnv(ctx, env) diff --git a/internal/validate/validate.go b/internal/validate/validate.go index 853fac6..dfc90cb 100644 --- a/internal/validate/validate.go +++ b/internal/validate/validate.go @@ -17,10 +17,7 @@ package validate import ( "log/slog" - "sync" - "time" - "github.com/google/uuid" "github.com/jackc/pgx/v5/pgxpool" "github.com/cockroachdb/errors" @@ -28,13 +25,13 @@ import ( "github.com/cockroachlabs-field/blobcheck/internal/blob" "github.com/cockroachlabs-field/blobcheck/internal/db" "github.com/cockroachlabs-field/blobcheck/internal/env" - "github.com/cockroachlabs-field/blobcheck/internal/workload" ) const ( - maxConns = 10 - workers = 5 - defaultTime = 5 * time.Second + maxConns = 10 + expectedBackupCount = 2 + expectedBackupCollections = 1 + expectedFullBackupCount = 1 ) // Report contains the results of a validation run. @@ -45,6 +42,7 @@ type Report struct { // Validator verifies backup/restore functionality type Validator struct { + env *env.Env pool *pgxpool.Pool blobStorage blob.Storage sourceTable, restoredTable db.KvTable @@ -53,43 +51,39 @@ type Validator struct { // New creates a new Validator. func New(ctx *stopper.Context, env *env.Env, blobStorage blob.Storage) (*Validator, error) { + if err := preflight(ctx, env, blobStorage); err != nil { + return nil, err + } + config, err := pgxpool.ParseConfig(env.DatabaseURL) if err != nil { - return nil, err + return nil, errors.Wrap(err, "failed to parse database URL") } config.MaxConns = maxConns + pool, err := pgxpool.NewWithConfig(ctx, config) if err != nil { - return nil, err + return nil, errors.Wrap(err, "failed to create database pool") } + conn, err := pool.Acquire(ctx) if err != nil { - return nil, err + return nil, errors.Wrap(err, "failed to acquire database connection") } defer conn.Release() - source := db.Database{Name: "_blobcheck"} - if err := source.Create(ctx, conn); err != nil { - return nil, err - } - // TODO (silvano): presplit table to have ranges in all nodes - sourceTable := db.KvTable{ - Database: source, - Schema: db.Public, - Name: "mytable", - } - if err := sourceTable.Create(ctx, conn); err != nil { + + sourceTable, err := createSourceTable(ctx, conn) + if err != nil { return nil, err } - dest := db.Database{Name: "_blobcheck_restored"} - if err := dest.Create(ctx, conn); err != nil { + + restoredTable, err := createRestoredTable(ctx, conn) + if err != nil { return nil, err } - restoredTable := db.KvTable{ - Database: dest, - Schema: db.Public, - Name: "mytable", - } + return &Validator{ + env: env, pool: pool, restoredTable: restoredTable, sourceTable: sourceTable, @@ -97,56 +91,42 @@ func New(ctx *stopper.Context, env *env.Env, blobStorage blob.Storage) (*Validat }, nil } -// checkBackups verifies that there is exactly one full and one incremental backup. -func (v *Validator) checkBackups(ctx *stopper.Context, extConn *db.ExternalConn) error { - conn, err := v.pool.Acquire(ctx) - if err != nil { - return err - } - defer conn.Release() - backups, err := extConn.ListTableBackups(ctx, conn) - if err != nil { - return err - } - if len(backups) != 1 { - return errors.Newf("expected exactly 1 backup collection, got %d", len(backups)) +// preflight validates the input parameters for New. +func preflight(ctx *stopper.Context, env *env.Env, blobStorage blob.Storage) error { + if env == nil { + return errors.New("environment cannot be nil") } - v.latest = backups[0] - info, err := extConn.BackupInfo(ctx, conn, backups[0], v.sourceTable) - if err != nil { - return err + if blobStorage == nil { + return errors.New("blob storage cannot be nil") } - if len(info) != 2 { - return errors.Newf("expected exactly 2 backups (1 full, 1 incremental), got %d backups", len(info)) + if env.DatabaseURL == "" { + return errors.New("database URL cannot be empty") } - fullCount := 0 - for _, i := range info { - if i.Full { - fullCount++ - } + if env.Workers < 0 { + return errors.New("workers count cannot be negative") } - if fullCount != 1 { - return errors.Newf("expected exactly 1 full backup, got %d", fullCount) + if env.WorkloadDuration <= 0 { + return errors.New("workload duration must be positive") } return nil - } // Clean removes all resources created by the validator. func (v *Validator) Clean(ctx *stopper.Context) error { - conn, err := v.pool.Acquire(ctx) + conn, err := v.acquireConn(ctx) if err != nil { return err } defer conn.Release() + var e1, e2 error if err := v.sourceTable.Database.Drop(ctx, conn); err != nil { slog.Error("drop source DB", "err", err) - e1 = err + e1 = errors.Wrap(err, "failed to drop source database") } if err := v.restoredTable.Database.Drop(ctx, conn); err != nil { slog.Error("drop restored DB", "err", err) - e2 = err + e2 = errors.Wrap(err, "failed to drop restored database") } return errors.Join(e1, e2) } @@ -156,79 +136,41 @@ func (v *Validator) Clean(ctx *stopper.Context) error { // This does not imply that a storage provider passing the test is supported. func (v *Validator) Validate(ctx *stopper.Context) (*Report, error) { // TODO (silvano): add a progress writer "github.com/jedib0t/go-pretty/v6/progress" - conn, err := v.pool.Acquire(ctx) + conn, err := v.acquireConn(ctx) if err != nil { return nil, err } defer conn.Release() + extConn, err := db.NewExternalConn(ctx, conn, v.blobStorage) if err != nil { - return nil, err + return nil, errors.Wrap(err, "failed to create external connection") } defer extConn.Drop(ctx, conn) - slog.Info("capturing initial statistics") - // Capture initial stats - stats, err := extConn.Stats(ctx, conn) + + stats, err := captureInitialStats(ctx, extConn, conn) if err != nil { return nil, err } - // write some data - slog.Info("running workload to populate some data") - if err := v.runWorkload(ctx, defaultTime); err != nil { + if err := v.runWorkloadWithBackup(ctx, extConn); err != nil { return nil, err } - g := sync.WaitGroup{} - for w := range workers { - g.Add(1) - ctx.Go(func(ctx *stopper.Context) error { - defer g.Done() - slog.Info("starting", "worker", w) - return v.runWorkload(ctx, defaultTime) - }) - } - g.Add(1) - ctx.Go(func(ctx *stopper.Context) error { - defer g.Done() - conn, err := v.pool.Acquire(ctx) - if err != nil { - return err - } - defer conn.Release() - slog.Info("starting full backup") - return v.sourceTable.Backup(ctx, conn, extConn, false) - }) - g.Wait() - // Run an incremental backup - slog.Info("workers done") - slog.Info("starting incremental backup") - if err := v.sourceTable.Backup(ctx, conn, extConn, true); err != nil { + if err := v.runIncrementalBackup(ctx, conn, extConn); err != nil { return nil, err } - // Verify that we have the backups, then restore in a separate database. if err := v.checkBackups(ctx, extConn); err != nil { return nil, err } - slog.Info("restoring backup") - if err := v.restoredTable.Restore(ctx, conn, extConn, &v.sourceTable); err != nil { - return nil, err - } - slog.Info("checking integrity") - originalBank, err := v.sourceTable.Fingerprint(ctx, conn) - if err != nil { - return nil, err - } - restore, err := v.restoredTable.Fingerprint(ctx, conn) - if err != nil { + if err := v.performRestore(ctx, conn, extConn); err != nil { return nil, err } - if originalBank != restore { - return nil, errors.Errorf("got %s, expected %s while comparing restoreDB with originalBank", - restore, originalBank) + if err := v.verifyIntegrity(ctx, conn); err != nil { + return nil, err } return &Report{ @@ -236,28 +178,3 @@ func (v *Validator) Validate(ctx *stopper.Context) (*Report, error) { Stats: stats, }, nil } - -// runWorkload runs the bank workload for the specified duration. -func (v *Validator) runWorkload(ctx *stopper.Context, duration time.Duration) error { - // TODO (silvano): if table is presplit, use prefix according to the split - w := workload.Workload{ - Prefix: uuid.New().String(), - Table: v.sourceTable, - } - done := make(chan bool) - ctx.Go(func(ctx *stopper.Context) error { - conn, err := v.pool.Acquire(ctx) - if err != nil { - return err - } - defer conn.Release() - return w.Run(ctx, conn, done) - }) - select { - case <-time.Tick(duration): - // signal workload to stop - close(done) - case <-ctx.Stopping(): - } - return nil -} diff --git a/internal/validate/workload.go b/internal/validate/workload.go new file mode 100644 index 0000000..a1cf305 --- /dev/null +++ b/internal/validate/workload.go @@ -0,0 +1,98 @@ +// Copyright 2025 Cockroach Labs, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package validate + +import ( + "log/slog" + "sync" + "time" + + "github.com/google/uuid" + + "github.com/cockroachdb/errors" + "github.com/cockroachdb/field-eng-powertools/stopper" + "github.com/cockroachlabs-field/blobcheck/internal/db" + "github.com/cockroachlabs-field/blobcheck/internal/workload" +) + +// runWorkloadWithBackup runs the workload concurrently with a full backup. +func (v *Validator) runWorkloadWithBackup(ctx *stopper.Context, extConn *db.ExternalConn) error { + slog.Info("running workload to populate some data") + if err := v.runWorkload(ctx, v.env.WorkloadDuration); err != nil { + return errors.Wrap(err, "failed to run initial workload") + } + return v.runConcurrentWorkloadAndBackup(ctx, extConn) +} + +// runConcurrentWorkloadAndBackup runs multiple workers and a backup concurrently. +func (v *Validator) runConcurrentWorkloadAndBackup( + ctx *stopper.Context, extConn *db.ExternalConn, +) error { + g := sync.WaitGroup{} + // Start worker goroutines + for w := range v.env.Workers { + g.Add(1) + ctx.Go(func(ctx *stopper.Context) error { + defer g.Done() + return v.runWorkloadWorker(ctx, w) + }) + } + + // Start backup goroutine + g.Add(1) + ctx.Go(func(ctx *stopper.Context) error { + defer g.Done() + return v.runFullBackup(ctx, extConn) + }) + + g.Wait() + slog.Info("workers done") + return nil +} + +// runWorkload runs a simple kv-style workload for the specified duration. +func (v *Validator) runWorkload(ctx *stopper.Context, duration time.Duration) error { + // TODO (silvano): if table is presplit, use prefix according to the split + w := workload.Workload{ + Prefix: uuid.New().String(), + Table: v.sourceTable, + } + done := make(chan bool) + ctx.Go(func(ctx *stopper.Context) error { + conn, err := v.pool.Acquire(ctx) + if err != nil { + return err + } + defer conn.Release() + return w.Run(ctx, conn, done) + }) + select { + case <-time.Tick(duration): + // signal workload to stop + close(done) + case <-ctx.Stopping(): + } + return nil +} + +// runWorkloadWorker runs a single worker instance. +func (v *Validator) runWorkloadWorker(ctx *stopper.Context, workerID int) error { + slog.Info("starting", "worker", workerID) + if err := v.runWorkload(ctx, v.env.WorkloadDuration); err != nil { + slog.Error("worker failed", "worker", workerID, "error", err) + return errors.Wrapf(err, "worker %d failed", workerID) + } + return nil +}