From 4f812346a707d66b00e441c53463a3998462214e Mon Sep 17 00:00:00 2001 From: Silvano Ravotto Date: Thu, 21 Aug 2025 20:29:51 -0400 Subject: [PATCH 1/2] validate: add Validator for backup/restore compatibility checks Introduces a Validator component to verify backup/restore functionality against storage providers. It: - Creates source and restored databases/tables - Runs workloads to generate and mutate data - Executes full + incremental backups - Validates backup metadata (1 full, 1 incremental) - Restores into a new database and compares fingerprints - Cleans up resources after validation It also introduce a simple workload that adds rows into a kv style table. --- go.mod | 16 +- go.sum | 29 +++ internal/db/db.go | 2 +- internal/db/ext_conn.go | 17 +- internal/db/kvtable.go | 10 +- internal/db/testutil.go | 2 +- internal/validate/integration_test.go | 112 +++++++++++ internal/validate/validate.go | 263 ++++++++++++++++++++++++++ internal/workload/workload.go | 58 ++++++ scripts/test.sh | 2 +- 10 files changed, 492 insertions(+), 19 deletions(-) create mode 100644 internal/validate/integration_test.go create mode 100644 internal/validate/validate.go create mode 100644 internal/workload/workload.go diff --git a/go.mod b/go.mod index db08cff..49de232 100644 --- a/go.mod +++ b/go.mod @@ -37,14 +37,25 @@ require ( github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b // indirect github.com/cockroachdb/redact v1.1.5 // indirect github.com/davecgh/go-spew v1.1.1 // indirect + github.com/dustin/go-humanize v1.0.1 // indirect github.com/getsentry/sentry-go v0.27.0 // indirect + github.com/go-ini/ini v1.67.0 // indirect + github.com/goccy/go-json v0.10.5 // indirect github.com/gogo/protobuf v1.3.2 // indirect + github.com/klauspost/compress v1.18.0 // indirect + github.com/klauspost/cpuid/v2 v2.2.11 // indirect github.com/kr/text v0.2.0 // indirect + github.com/minio/crc64nvme v1.0.2 // indirect + github.com/minio/md5-simd v1.1.2 // indirect + github.com/philhofer/fwd v1.2.0 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/rogpeppe/go-internal v1.13.1 // indirect + github.com/rs/xid v1.6.0 // indirect + github.com/tinylib/msgp v1.3.0 // indirect golang.org/x/exp/typeparams v0.0.0-20231108232855-2478ac86f678 // indirect golang.org/x/mod v0.27.0 // indirect + golang.org/x/net v0.42.0 // indirect golang.org/x/sys v0.34.0 // indirect golang.org/x/tools v0.35.0 // indirect golang.org/x/tools/go/expect v0.1.1-deprecated // indirect @@ -61,8 +72,9 @@ require ( github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect github.com/jackc/puddle/v2 v2.2.2 // indirect + github.com/minio/minio-go/v7 v7.0.95 github.com/stretchr/testify v1.10.0 - golang.org/x/crypto v0.39.0 // indirect + golang.org/x/crypto v0.40.0 // indirect golang.org/x/sync v0.16.0 // indirect - golang.org/x/text v0.26.0 // indirect + golang.org/x/text v0.27.0 // indirect ) diff --git a/go.sum b/go.sum index fee8d6c..71218a6 100644 --- a/go.sum +++ b/go.sum @@ -54,10 +54,16 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= +github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= github.com/getsentry/sentry-go v0.27.0 h1:Pv98CIbtB3LkMWmXi4Joa5OOcwbmnX88sF5qbK3r3Ps= github.com/getsentry/sentry-go v0.27.0/go.mod h1:lc76E2QywIyW8WuBnwl8Lc4bkmQH4+w1gwTf25trprY= github.com/go-errors/errors v1.4.2 h1:J6MZopCL4uSllY1OfXM374weqZFFItUbrImctkmUxIA= github.com/go-errors/errors v1.4.2/go.mod h1:sIVyrIiJhuEF+Pj9Ebtd6P/rEYROXFi3BopGUQ5a5Og= +github.com/go-ini/ini v1.67.0 h1:z6ZrTEZqSWOTyH2FlglNbNgARyHG8oLW9gMELqKr06A= +github.com/go-ini/ini v1.67.0/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8= +github.com/goccy/go-json v0.10.5 h1:Fq85nIqj+gXn/S5ahsiTlK3TmC85qgirsdTP/+DeaC4= +github.com/goccy/go-json v0.10.5/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/google/addlicense v1.2.0 h1:W+DP4A639JGkcwBGMDvjSurZHvaq2FN0pP7se9czsKA= @@ -76,10 +82,23 @@ github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= +github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= +github.com/klauspost/cpuid/v2 v2.0.1/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= +github.com/klauspost/cpuid/v2 v2.2.11 h1:0OwqZRYI2rFrjS4kvkDnqJkKHdHaRnCm68/DY4OxRzU= +github.com/klauspost/cpuid/v2 v2.2.11/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/minio/crc64nvme v1.0.2 h1:6uO1UxGAD+kwqWWp7mBFsi5gAse66C4NXO8cmcVculg= +github.com/minio/crc64nvme v1.0.2/go.mod h1:eVfm2fAzLlxMdUGc0EEBGSMmPwmXD5XiNRpnu9J3bvg= +github.com/minio/md5-simd v1.1.2 h1:Gdi1DZK69+ZVMoNHRXJyNcxrMA4dSxoYHZSQbirFg34= +github.com/minio/md5-simd v1.1.2/go.mod h1:MzdKDxYpY2BT9XQFocsiZf/NKVtR7nkE4RoEpN+20RM= +github.com/minio/minio-go/v7 v7.0.95 h1:ywOUPg+PebTMTzn9VDsoFJy32ZuARN9zhB+K3IYEvYU= +github.com/minio/minio-go/v7 v7.0.95/go.mod h1:wOOX3uxS334vImCNRVyIDdXX9OsXDm89ToynKgqUKlo= +github.com/philhofer/fwd v1.2.0 h1:e6DnBTl7vGY+Gz322/ASL4Gyp1FspeMvx1RNDoToZuM= +github.com/philhofer/fwd v1.2.0/go.mod h1:RqIHx9QI14HlwKwm98g9Re5prTQ6LdeRQn+gXJFxsJM= github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4= github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= @@ -90,11 +109,15 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= +github.com/rs/xid v1.6.0 h1:fV591PaemRlL6JfRxGDEPl69wICngIQ3shQtzfy2gxU= +github.com/rs/xid v1.6.0/go.mod h1:7XoLgs4eV+QndskICGsho+ADou8ySMSjJKDIan90Nz0= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/tinylib/msgp v1.3.0 h1:ULuf7GPooDaIlbyvgAxBV/FI7ynli6LZ1/nVUNu+0ww= +github.com/tinylib/msgp v1.3.0/go.mod h1:ykjzy2wzgrlvpDCRc4LA8UXy6D8bzMSuAF3WD57Gok0= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= @@ -102,6 +125,8 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.39.0 h1:SHs+kF4LP+f+p14esP5jAoDpHU8Gu/v9lFRK6IT5imM= golang.org/x/crypto v0.39.0/go.mod h1:L+Xg3Wf6HoL4Bn4238Z6ft6KfEpN0tJGo53AAPC632U= +golang.org/x/crypto v0.40.0 h1:r4x+VvoG5Fm+eJcxMaY8CQM7Lb0l1lsmjGBQ6s8BfKM= +golang.org/x/crypto v0.40.0/go.mod h1:Qr1vMER5WyS2dfPHAlsOj01wgLbsyWtFn/aY+5+ZdxY= golang.org/x/exp/typeparams v0.0.0-20231108232855-2478ac86f678 h1:1P7xPZEwZMoBoz0Yze5Nx2/4pxj6nw9ZqHWXqP0iRgQ= golang.org/x/exp/typeparams v0.0.0-20231108232855-2478ac86f678/go.mod h1:AbB0pIl9nAr9wVwH+Z2ZpaocVmF5I4GyWCDIsVjR0bk= golang.org/x/lint v0.0.0-20241112194109-818c5a804067 h1:adDmSQyFTCiv19j015EGKJBoaa7ElV0Q1Wovb/4G7NA= @@ -115,6 +140,8 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.42.0 h1:jzkYrhi3YQWD6MLBJcsklgQsoAcw89EcZbJw8Z614hs= +golang.org/x/net v0.42.0/go.mod h1:FF1RA5d3u7nAYA4z2TkclSCKh68eSXtiFwcWQpPXdt8= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -129,6 +156,8 @@ golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.26.0 h1:P42AVeLghgTYr4+xUnTRKDMqpar+PtX7KWuNQL21L8M= golang.org/x/text v0.26.0/go.mod h1:QK15LZJUUQVJxhz7wXgxSy/CJaTFjd0G+YLonydOVQA= +golang.org/x/text v0.27.0 h1:4fGWRpyh641NLlecmyl4LOe6yDdfaYNrGb2zdfo4JV4= +golang.org/x/text v0.27.0/go.mod h1:1D28KMCvyooCX9hBiosv5Tz/+YLxj0j7XhWjpSUF7CU= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= diff --git a/internal/db/db.go b/internal/db/db.go index 0d26066..09a87a6 100644 --- a/internal/db/db.go +++ b/internal/db/db.go @@ -63,7 +63,7 @@ const dropDbStmt = `DROP database IF EXISTS %[1]s CASCADE;` // Drop removes the database. func (d *Database) Drop(ctx *stopper.Context, conn *pgxpool.Conn) error { - slog.Info("Dropping database", slog.String("database", d.Name.String())) + slog.Debug("Dropping database", slog.String("database", d.Name.String())) _, err := conn.Exec(ctx, fmt.Sprintf(dropDbStmt, d.Name)) return err } diff --git a/internal/db/ext_conn.go b/internal/db/ext_conn.go index b0e9391..92b4cd1 100644 --- a/internal/db/ext_conn.go +++ b/internal/db/ext_conn.go @@ -79,7 +79,6 @@ func (c *ExternalConn) ListTableBackups( ) ([]string, error) { res := make([]string, 0) stmt := fmt.Sprintf(backupsStmt, c.String()) - slog.Info(stmt) rows, err := conn.Query(ctx, stmt) if err != nil { return nil, err @@ -119,7 +118,7 @@ func (c *ExternalConn) BackupInfo( return nil, err } t.Full = (backupType == "full") - slog.Info("backup info", "type", backupType, "full", + slog.Debug("backup info", "type", backupType, "full", t.Full, "table", tableName, "schema", schemeName) res = append(res, t) } @@ -131,20 +130,20 @@ const createExtConnStmt = `CREATE EXTERNAL CONNECTION '%[1]s' AS '%[2]s'` func (c *ExternalConn) create(ctx *stopper.Context, conn *pgxpool.Conn) error { destURL := c.store.URL() stmt := fmt.Sprintf(createExtConnStmt, c.name, destURL) - slog.Info("trying", slog.String("url", destURL)) + slog.Debug("trying", slog.String("url", destURL)) if _, err := conn.Exec(ctx, stmt); err != nil { - slog.Info("failed", slog.Any("error", err)) + slog.Error("failed", slog.Any("error", err)) return err } - slog.Info("checking existing backups") + slog.Debug("checking existing backups") backups, err := c.ListTableBackups(ctx, conn) if err == nil { - slog.Info("success", + slog.Debug("success", slog.String("url", destURL), slog.Any("existing", backups)) return nil } - slog.Info("failed", slog.Any("error", err)) + slog.Error("failed", slog.Any("error", err)) return errors.Newf("external connection failed") } @@ -156,7 +155,7 @@ func (c *ExternalConn) Drop(ctx *stopper.Context, conn *pgxpool.Conn) error { var name string err := conn.QueryRow(ctx, fmt.Sprintf(showExtConnStmt, c.name)).Scan(&name) if err == pgx.ErrNoRows { - slog.Info("external connection not found", slog.String("name", name)) + slog.Debug("external connection not found", slog.String("name", name)) return nil } if err != nil { @@ -204,6 +203,6 @@ func (c *ExternalConn) String() string { } // SuggestedParams returns the suggested parameters for the external connection. -func (c *ExternalConn) SuggestedParams() map[string]string { +func (c *ExternalConn) SuggestedParams() store.Params { return c.store.Params() } diff --git a/internal/db/kvtable.go b/internal/db/kvtable.go index c456e27..7046a64 100644 --- a/internal/db/kvtable.go +++ b/internal/db/kvtable.go @@ -63,16 +63,16 @@ DROP TABLE IF EXISTS %[1]s;` // Drop removes the table. func (t *KvTable) Drop(ctx *stopper.Context, conn *pgxpool.Conn) error { - slog.Info("Dropping table", slog.String("table", t.String())) + slog.Debug("Dropping table", slog.String("table", t.String())) _, err := conn.Exec(ctx, fmt.Sprintf(dropTableStmt, t.String())) return err } const insertTableStmt = ` -INSERT INTO %[1]s (k, v) values (@key, @value);` +UPSERT INTO %[1]s (k, v) values (@key, @value);` -// Insert adds a new row to the table. -func (t *KvTable) Insert(ctx *stopper.Context, conn *pgxpool.Conn, key, value string) error { +// Upsert adds a new row to the table. +func (t *KvTable) Upsert(ctx *stopper.Context, conn *pgxpool.Conn, key, value string) error { _, err := conn.Exec(ctx, fmt.Sprintf(insertTableStmt, t.String()), pgx.NamedArgs{ "key": key, "value": value, @@ -87,7 +87,7 @@ func (t *KvTable) Restore( ctx *stopper.Context, conn *pgxpool.Conn, from *ExternalConn, original *KvTable, ) error { stmt := fmt.Sprintf(restoreTableStmt, original.String(), "LATEST", from, t.Database.Name) - slog.Info(stmt) + slog.Debug(stmt) _, err := conn.Exec(ctx, stmt) return err } diff --git a/internal/db/testutil.go b/internal/db/testutil.go index 932ac83..94e5ced 100644 --- a/internal/db/testutil.go +++ b/internal/db/testutil.go @@ -64,7 +64,7 @@ func NewTestEnv(ctx *stopper.Context, numRows int) (TestEnv, error) { } for idx := range numRows { - if err := table.Insert(ctx, conn, fmt.Sprintf("key-%020d", idx), fmt.Sprintf("value-%020d", idx)); err != nil { + if err := table.Upsert(ctx, conn, fmt.Sprintf("key-%020d", idx), fmt.Sprintf("value-%020d", idx)); err != nil { return TestEnv{}, err } } diff --git a/internal/validate/integration_test.go b/internal/validate/integration_test.go new file mode 100644 index 0000000..b17e1bf --- /dev/null +++ b/internal/validate/integration_test.go @@ -0,0 +1,112 @@ +// Copyright 2025 Cockroach Labs, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package validate + +import ( + "fmt" + "log/slog" + "testing" + "time" + + "github.com/minio/minio-go/v7" + "github.com/minio/minio-go/v7/pkg/credentials" + "github.com/stretchr/testify/require" + + "github.com/cockroachdb/field-eng-powertools/stopper" + "github.com/cockroachlabs-field/blobcheck/internal/db" + "github.com/cockroachlabs-field/blobcheck/internal/env" + "github.com/cockroachlabs-field/blobcheck/internal/store" +) + +const minioEndpoint = "localhost:29000" + +func createMinioBucket( + ctx *stopper.Context, vars map[string]string, env *env.Env, bucketName string, +) error { + minioClient, err := minio.New(minioEndpoint, &minio.Options{ + Creds: credentials.NewStaticV4(vars["AWS_ACCESS_KEY_ID"], vars["AWS_SECRET_ACCESS_KEY"], ""), + }) + if err != nil { + return err + } + found, err := minioClient.BucketExists(ctx, bucketName) + if err != nil { + return err + } + if found { + slog.Debug("Bucket already exists", slog.String("bucket", bucketName)) + } + minioClient.MakeBucket(ctx, bucketName, + minio.MakeBucketOptions{ + Region: "us-east-1", + ObjectLocking: false, + }, + ) + return nil + +} + +// TestValidation performs a minimal validation test. +func TestValidation(t *testing.T) { + ctx := stopper.WithContext(t.Context()) + r := require.New(t) + endpoint := fmt.Sprintf("http://%s", minioEndpoint) + vars := store.Params{ + store.AccountParam: "cockroach", + store.SecretParam: "cockroach", + store.RegionParam: "us-east-1", + } + expected := store.Params{ + store.AccountParam: "cockroach", + store.SecretParam: store.Obfuscated, + store.RegionParam: "us-east-1", + store.EndPointParam: endpoint, + store.UsePathStyleParam: "true", + } + lookup := func(key string) (string, bool) { + val, ok := vars[key] + return val, ok + } + + bucketName := fmt.Sprintf("bucket-%d", time.Now().UnixMilli()) + var env = &env.Env{ + DatabaseURL: "postgresql://root@localhost:26257?sslmode=disable", + Endpoint: endpoint, + LookupEnv: lookup, + Path: bucketName, + Testing: true, + } + r.NoError(createMinioBucket(ctx, vars, env, bucketName)) + store, err := store.S3FromEnv(ctx, env) + r.NoError(err) + validator, err := New(ctx, env, store) + r.NoError(err) + defer validator.Clean(ctx) + report, err := validator.Validate(ctx) + r.NoError(err) + r.NotNil(report) + // Validate the report contents + r.NotEmpty(report.SuggestedParams) + r.Equal(expected, report.SuggestedParams) + // Validate the report stats, if applicable + conn, err := validator.pool.Acquire(ctx) + r.NoError(err) + defer conn.Release() + version, err := db.Version(ctx, conn) + r.NoError(err) + if version.MinVersion(db.MinVersionForStats) { + r.Equal(1, len(report.Stats)) + } +} diff --git a/internal/validate/validate.go b/internal/validate/validate.go new file mode 100644 index 0000000..346dc00 --- /dev/null +++ b/internal/validate/validate.go @@ -0,0 +1,263 @@ +// Copyright 2025 Cockroach Labs, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package validate provides functionality to validate backups and restores. +package validate + +import ( + "log/slog" + "sync" + "time" + + "github.com/google/uuid" + "github.com/jackc/pgx/v5/pgxpool" + + "github.com/cockroachdb/errors" + "github.com/cockroachdb/field-eng-powertools/stopper" + "github.com/cockroachlabs-field/blobcheck/internal/db" + "github.com/cockroachlabs-field/blobcheck/internal/env" + "github.com/cockroachlabs-field/blobcheck/internal/store" + "github.com/cockroachlabs-field/blobcheck/internal/workload" +) + +const ( + maxConns = 10 + workers = 5 + defaultTime = 5 * time.Second +) + +// Report contains the results of a validation run. +type Report struct { + SuggestedParams store.Params + Stats []*db.Stats +} + +// Validator verifies backup/restore functionality +type Validator struct { + pool *pgxpool.Pool + store store.Store + sourceTable, restoredTable db.KvTable + latest string +} + +// New creates a new Validator. +func New(ctx *stopper.Context, env *env.Env, store store.Store) (*Validator, error) { + config, err := pgxpool.ParseConfig(env.DatabaseURL) + if err != nil { + return nil, err + } + config.MaxConns = maxConns + pool, err := pgxpool.NewWithConfig(ctx, config) + if err != nil { + return nil, err + } + conn, err := pool.Acquire(ctx) + if err != nil { + return nil, err + } + defer conn.Release() + source := db.Database{Name: "_blobcheck"} + if err := source.Create(ctx, conn); err != nil { + return nil, err + } + // TODO (silvano): presplit table to have ranges in all nodes + sourceTable := db.KvTable{ + Database: source, + Schema: db.Public, + Name: "mytable", + } + if err := sourceTable.Create(ctx, conn); err != nil { + return nil, err + } + dest := db.Database{Name: "_blobcheck_restored"} + if err := dest.Create(ctx, conn); err != nil { + return nil, err + } + restoredTable := db.KvTable{ + Database: dest, + Schema: db.Public, + Name: "mytable", + } + return &Validator{ + pool: pool, + restoredTable: restoredTable, + sourceTable: sourceTable, + store: store, + }, nil + +} + +// checkBackups verifies that there is exactly one full and one incremental backup. +func (v *Validator) checkBackups(ctx *stopper.Context, extConn *db.ExternalConn) error { + conn, err := v.pool.Acquire(ctx) + if err != nil { + return err + } + defer conn.Release() + backups, err := extConn.ListTableBackups(ctx, conn) + if err != nil { + return err + } + if len(backups) != 1 { + return errors.Newf("expected exactly 1 backup collection, got %d", len(backups)) + } + v.latest = backups[0] + info, err := extConn.BackupInfo(ctx, conn, backups[0], v.sourceTable) + if err != nil { + return err + } + if len(info) != 2 { + return errors.Newf("expected exactly 2 backups (1 full, 1 incremental), got %d", len(info)) + } + fullCount := 0 + for _, i := range info { + if i.Full { + fullCount++ + } + } + if fullCount != 1 { + return errors.Errorf("expected one full backup and one incremental backup") + } + return nil + +} + +// Clean removes all resources created by the validator. +func (v *Validator) Clean(ctx *stopper.Context) error { + conn, err := v.pool.Acquire(ctx) + if err != nil { + return err + } + defer conn.Release() + var e1, e2 error + if err := v.sourceTable.Database.Drop(ctx, conn); err != nil { + slog.Error("drop source DB", "err", err) + e1 = err + } + if err := v.restoredTable.Database.Drop(ctx, conn); err != nil { + slog.Error("drop restored DB", "err", err) + e2 = err + } + return errors.Join(e1, e2) +} + +// Validate performs a backup/restore against a storage provider +// to asses minimum compatibility at the functional level. +// This does not imply that a storage provider passing the test is supported. +func (v *Validator) Validate(ctx *stopper.Context) (*Report, error) { + conn, err := v.pool.Acquire(ctx) + if err != nil { + return nil, err + } + defer conn.Release() + extConn, err := db.NewExternalConn(ctx, conn, v.store) + if err != nil { + return nil, err + } + defer extConn.Drop(ctx, conn) + slog.Info("capturing initial statistics") + // Capture initial stats + stats, err := extConn.Stats(ctx, conn) + if err != nil { + return nil, err + } + + // write some data + slog.Info("running workload to populate some data") + if err := v.runWorkload(ctx, defaultTime); err != nil { + return nil, err + } + + g := sync.WaitGroup{} + for w := range workers { + g.Add(1) + ctx.Go(func(ctx *stopper.Context) error { + defer g.Done() + slog.Info("starting", "worker", w) + return v.runWorkload(ctx, defaultTime) + }) + } + g.Add(1) + ctx.Go(func(ctx *stopper.Context) error { + defer g.Done() + conn, err := v.pool.Acquire(ctx) + if err != nil { + return err + } + defer conn.Release() + slog.Info("starting full backup") + return v.sourceTable.Backup(ctx, conn, extConn, false) + }) + g.Wait() + // Run an incremental backup + slog.Info("workers done") + slog.Info("starting incremental backup") + if err := v.sourceTable.Backup(ctx, conn, extConn, true); err != nil { + return nil, err + } + + // Verify that we have the backups, then restore in a separate database. + if err := v.checkBackups(ctx, extConn); err != nil { + return nil, err + } + + slog.Info("restoring backup") + if err := v.restoredTable.Restore(ctx, conn, extConn, &v.sourceTable); err != nil { + return nil, err + } + slog.Info("checking integrity") + originalBank, err := v.sourceTable.Fingerprint(ctx, conn) + if err != nil { + return nil, err + } + restore, err := v.restoredTable.Fingerprint(ctx, conn) + if err != nil { + return nil, err + } + + if originalBank != restore { + return nil, errors.Errorf("got %s, expected %s while comparing restoreDB with originalBank", + restore, originalBank) + } + + return &Report{ + SuggestedParams: extConn.SuggestedParams(), + Stats: stats, + }, nil +} + +// runWorkload runs the bank workload for the specified duration. +func (v *Validator) runWorkload(ctx *stopper.Context, duration time.Duration) error { + // TODO (silvano): if table is presplit, use prefix according to the split + w := workload.Workload{ + Prefix: uuid.New().String(), + Table: v.sourceTable, + } + done := make(chan bool) + ctx.Go(func(ctx *stopper.Context) error { + conn, err := v.pool.Acquire(ctx) + if err != nil { + return err + } + defer conn.Release() + return w.Run(ctx, conn, done) + }) + select { + case <-time.Tick(duration): + // signal workload to stop + close(done) + case <-ctx.Stopping(): + } + return nil +} diff --git a/internal/workload/workload.go b/internal/workload/workload.go new file mode 100644 index 0000000..b254e0b --- /dev/null +++ b/internal/workload/workload.go @@ -0,0 +1,58 @@ +// Copyright 2025 Cockroach Labs, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package workload + +import ( + "fmt" + "log/slog" + "time" + + "github.com/google/uuid" + "github.com/jackc/pgx/v5/pgxpool" + + "github.com/cockroachdb/field-eng-powertools/stopper" + "github.com/cockroachlabs-field/blobcheck/internal/db" +) + +const ( + thinkTime = time.Millisecond +) + +// Workload represents a workload to be run. +type Workload struct { + // Table is the database table to operate on. + Table db.KvTable + Prefix string +} + +// Run executes a simple workload that inserts rows into the database. +func (w *Workload) Run(ctx *stopper.Context, conn *pgxpool.Conn, done <-chan bool) error { + var idx int + for { + err := w.Table.Upsert(ctx, conn, fmt.Sprintf("%s-%d", w.Prefix, idx), uuid.NewString()) + if err != nil { + slog.Error("failed to upsert row", "idx", idx, "err", err) + return err + } + select { + case <-done: + return nil + case <-ctx.Stopping(): + return nil + case <-time.Tick(thinkTime): + idx++ + } + } +} diff --git a/scripts/test.sh b/scripts/test.sh index 3a300de..9429435 100755 --- a/scripts/test.sh +++ b/scripts/test.sh @@ -17,4 +17,4 @@ set -e go clean --testcache -go test ./... +go test -v ./... From 96d1438b2db3a89b09074531f69866072c3cc271 Mon Sep 17 00:00:00 2001 From: Silvano Ravotto Date: Fri, 22 Aug 2025 11:45:35 -0400 Subject: [PATCH 2/2] blob: renamed store.Store to blob.Storage --- internal/{store => blob}/s3.go | 18 +++++------ internal/{store => blob}/s3_test.go | 10 +++---- internal/{store/store.go => blob/storage.go} | 8 ++--- .../store_test.go => blob/storage_test.go} | 2 +- internal/db/ext_conn.go | 20 ++++++------- internal/db/integration_test.go | 4 +-- internal/db/testutil.go | 22 +++++++------- ...tion_test.go => minio_integration_test.go} | 30 +++++++++---------- internal/validate/validate.go | 17 +++++------ 9 files changed, 65 insertions(+), 66 deletions(-) rename internal/{store => blob}/s3.go (96%) rename internal/{store => blob}/s3_test.go (98%) rename internal/{store/store.go => blob/storage.go} (88%) rename internal/{store/store_test.go => blob/storage_test.go} (99%) rename internal/validate/{integration_test.go => minio_integration_test.go} (81%) diff --git a/internal/store/s3.go b/internal/blob/s3.go similarity index 96% rename from internal/store/s3.go rename to internal/blob/s3.go index c2d4113..6df5e34 100644 --- a/internal/store/s3.go +++ b/internal/blob/s3.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package store +package blob import ( "context" @@ -60,7 +60,7 @@ const ( DefaultRegion = "aws-global" ) -// ValidParams lists the valid parameters for the S3 store. +// ValidParams lists the valid parameters for the S3 object storage. var ValidParams = []string{ AccountParam, SecretParam, TokenParam, EndPointParam, RegionParam, UsePathStyleParam, SkipChecksum, SkipTLSVerify, @@ -86,7 +86,7 @@ type s3Store struct { // S3FromEnv creates a new S3 store from the environment. // It will try to connect to the S3 service using the environment variables provided, // and adding any parameters that are required. -func S3FromEnv(ctx *stopper.Context, env *env.Env) (Store, error) { +func S3FromEnv(ctx *stopper.Context, env *env.Env) (Storage, error) { creds, ok := lookupEnv(env, []string{AccountParam, SecretParam}, []string{TokenParam, RegionParam}) if !ok { return nil, ErrMissingParam @@ -105,7 +105,7 @@ func S3FromEnv(ctx *stopper.Context, env *env.Env) (Store, error) { return initial.try(ctx, initial.BucketName()) } -// BucketName implements Store. +// BucketName implements BlobStorage. func (s *s3Store) BucketName() string { cleanedPath := path.Clean(s.dest) components := strings.Split(cleanedPath, "/") @@ -115,7 +115,7 @@ func (s *s3Store) BucketName() string { return components[0] } -// Params implements Store. +// Params implements BlobStorage. func (s *s3Store) Params() Params { params := maps.Clone(s.params) for param := range params { @@ -126,7 +126,7 @@ func (s *s3Store) Params() Params { return params } -// URL implements Store. +// URL implements BlobStorage. func (s *s3Store) URL() string { res := s.escapeValues() res = fmt.Sprintf("s3://%s?%s", s.dest, res) @@ -144,8 +144,8 @@ func (s *s3Store) addParam(key string, value string) error { // candidateConfigs provides a set of candidate configurations for the S3 store. // TODO(silvano): consider making this public. -func (s *s3Store) candidateConfigs() iter.Seq[Store] { - return func(yield func(Store) bool) { +func (s *s3Store) candidateConfigs() iter.Seq[Storage] { + return func(yield func(Storage) bool) { combos := [][]string{ {}, // baseline first {SkipChecksum}, @@ -211,7 +211,7 @@ const ( ) // try attempts to connect to the S3 store using alternative configurations. -func (s *s3Store) try(ctx context.Context, bucketName string) (Store, error) { +func (s *s3Store) try(ctx context.Context, bucketName string) (Storage, error) { var clientMode aws.ClientLogMode if s.verbose { clientMode |= aws.LogRetries | aws.LogRequestWithBody | aws.LogRequestEventMessage | aws.LogResponse | aws.LogResponseEventMessage | aws.LogSigning diff --git a/internal/store/s3_test.go b/internal/blob/s3_test.go similarity index 98% rename from internal/store/s3_test.go rename to internal/blob/s3_test.go index 496f881..9df8ca0 100644 --- a/internal/store/s3_test.go +++ b/internal/blob/s3_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package store +package blob import ( "fmt" @@ -110,7 +110,7 @@ func TestS3Alternates(t *testing.T) { } gotSeq := s.candidateConfigs() var got []Params - gotSeq(func(d Store) bool { + gotSeq(func(d Storage) bool { if alt, ok := d.(*s3Store); ok { got = append(got, alt.params) } @@ -269,14 +269,14 @@ func TestMinioFromEnv(t *testing.T) { Testing: true, } - store, err := S3FromEnv(ctx, env) + blobStorage, err := S3FromEnv(ctx, env) if tt.wantErr != nil { - assert.Nil(t, store) + assert.Nil(t, blobStorage) assert.ErrorIs(t, err, tt.wantErr) return } require.NoError(t, err) - s3 := (store.(*s3Store)) + s3 := (blobStorage.(*s3Store)) assert.Equal(t, tt.want, s3.params) assert.Regexp(t, fmt.Sprintf("^%s", testPath), s3.dest) }) diff --git a/internal/store/store.go b/internal/blob/storage.go similarity index 88% rename from internal/store/store.go rename to internal/blob/storage.go index bf4e062..607241b 100644 --- a/internal/store/store.go +++ b/internal/blob/storage.go @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -// TODO(silvano): rename this package to "externalStorage"/"blobStorage" -package store +// Package blob provides an interface for interacting with blob storage. +package blob import ( "iter" @@ -43,8 +43,8 @@ func (p Params) Iter() iter.Seq2[string, string] { } } -// Store represents a destination to perform a backup/restore. -type Store interface { +// Storage represents a destination to perform a backup/restore. +type Storage interface { // Params returns a copy of the params. Params() Params // URL returns a escaped URL. diff --git a/internal/store/store_test.go b/internal/blob/storage_test.go similarity index 99% rename from internal/store/store_test.go rename to internal/blob/storage_test.go index 025920d..b7e053c 100644 --- a/internal/store/store_test.go +++ b/internal/blob/storage_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package store +package blob import ( "testing" diff --git a/internal/db/ext_conn.go b/internal/db/ext_conn.go index 92b4cd1..d0b5b10 100644 --- a/internal/db/ext_conn.go +++ b/internal/db/ext_conn.go @@ -25,16 +25,16 @@ import ( "github.com/cockroachdb/errors" "github.com/cockroachdb/field-eng-powertools/semver" "github.com/cockroachdb/field-eng-powertools/stopper" - "github.com/cockroachlabs-field/blobcheck/internal/store" + "github.com/cockroachlabs-field/blobcheck/internal/blob" ) // MinVersionForStats is the minimum version required for retrieving statistics. var MinVersionForStats = semver.MustSemver("v25.1.0") -// ExternalConn represents an external connection to an object store. +// ExternalConn represents an external connection to blob storage. type ExternalConn struct { - name Ident - store store.Store + name Ident + blob blob.Storage } // Stats represents statistics about the external connection. @@ -58,11 +58,11 @@ type TableBackup struct { // NewExternalConn creates a new external connection. func NewExternalConn( - ctx *stopper.Context, conn *pgxpool.Conn, store store.Store, + ctx *stopper.Context, conn *pgxpool.Conn, blob blob.Storage, ) (*ExternalConn, error) { extConn := &ExternalConn{ - name: "_blobcheck_backup", - store: store, + name: "_blobcheck_backup", + blob: blob, } err := extConn.Drop(ctx, conn) if err != nil { @@ -128,7 +128,7 @@ func (c *ExternalConn) BackupInfo( const createExtConnStmt = `CREATE EXTERNAL CONNECTION '%[1]s' AS '%[2]s'` func (c *ExternalConn) create(ctx *stopper.Context, conn *pgxpool.Conn) error { - destURL := c.store.URL() + destURL := c.blob.URL() stmt := fmt.Sprintf(createExtConnStmt, c.name, destURL) slog.Debug("trying", slog.String("url", destURL)) if _, err := conn.Exec(ctx, stmt); err != nil { @@ -203,6 +203,6 @@ func (c *ExternalConn) String() string { } // SuggestedParams returns the suggested parameters for the external connection. -func (c *ExternalConn) SuggestedParams() store.Params { - return c.store.Params() +func (c *ExternalConn) SuggestedParams() blob.Params { + return c.blob.Params() } diff --git a/internal/db/integration_test.go b/internal/db/integration_test.go index ee45183..6c98682 100644 --- a/internal/db/integration_test.go +++ b/internal/db/integration_test.go @@ -55,8 +55,8 @@ func TestIntegration(t *testing.T) { r.NoError(err) extConn := &ExternalConn{ - name: "test-conn", - store: &testStore{}, + name: "test-conn", + blob: &testBlobStorage{}, } extConn.create(ctx, conn) defer func() { a.NoError(extConn.Drop(ctx, conn)) }() diff --git a/internal/db/testutil.go b/internal/db/testutil.go index 94e5ced..a3c0c0a 100644 --- a/internal/db/testutil.go +++ b/internal/db/testutil.go @@ -20,7 +20,7 @@ import ( "github.com/jackc/pgx/v5/pgxpool" "github.com/cockroachdb/field-eng-powertools/stopper" - "github.com/cockroachlabs-field/blobcheck/internal/store" + "github.com/cockroachlabs-field/blobcheck/internal/blob" ) const ( @@ -33,7 +33,7 @@ const ( type TestEnv struct { Database Database KvTable KvTable - Store store.Store + Blob blob.Storage Pool *pgxpool.Pool } @@ -87,22 +87,22 @@ func (e TestEnv) Cleanup(ctx *stopper.Context) error { return nil } -type testStore struct { +type testBlobStorage struct { } -var _ store.Store = &testStore{} +var _ blob.Storage = &testBlobStorage{} -// BucketName implements store.Store. -func (t *testStore) BucketName() string { +// BucketName implements blob.BlobStorage. +func (t *testBlobStorage) BucketName() string { return testBucket } -// Params implements store.Store. -func (t *testStore) Params() store.Params { - return store.Params{} +// Params implements blob.BlobStorage. +func (t *testBlobStorage) Params() blob.Params { + return blob.Params{} } -// URL implements store.Store. -func (t *testStore) URL() string { +// URL implements blob.BlobStorage. +func (t *testBlobStorage) URL() string { return externalURL } diff --git a/internal/validate/integration_test.go b/internal/validate/minio_integration_test.go similarity index 81% rename from internal/validate/integration_test.go rename to internal/validate/minio_integration_test.go index b17e1bf..46b756d 100644 --- a/internal/validate/integration_test.go +++ b/internal/validate/minio_integration_test.go @@ -25,9 +25,9 @@ import ( "github.com/stretchr/testify/require" "github.com/cockroachdb/field-eng-powertools/stopper" + "github.com/cockroachlabs-field/blobcheck/internal/blob" "github.com/cockroachlabs-field/blobcheck/internal/db" "github.com/cockroachlabs-field/blobcheck/internal/env" - "github.com/cockroachlabs-field/blobcheck/internal/store" ) const minioEndpoint = "localhost:29000" @@ -58,22 +58,22 @@ func createMinioBucket( } -// TestValidation performs a minimal validation test. -func TestValidation(t *testing.T) { +// TestMinio performs a minimal validation test against a MinIO instance. +func TestMinio(t *testing.T) { ctx := stopper.WithContext(t.Context()) r := require.New(t) endpoint := fmt.Sprintf("http://%s", minioEndpoint) - vars := store.Params{ - store.AccountParam: "cockroach", - store.SecretParam: "cockroach", - store.RegionParam: "us-east-1", + vars := blob.Params{ + blob.AccountParam: "cockroach", + blob.SecretParam: "cockroach", + blob.RegionParam: "us-east-1", } - expected := store.Params{ - store.AccountParam: "cockroach", - store.SecretParam: store.Obfuscated, - store.RegionParam: "us-east-1", - store.EndPointParam: endpoint, - store.UsePathStyleParam: "true", + expected := blob.Params{ + blob.AccountParam: "cockroach", + blob.SecretParam: blob.Obfuscated, + blob.RegionParam: "us-east-1", + blob.EndPointParam: endpoint, + blob.UsePathStyleParam: "true", } lookup := func(key string) (string, bool) { val, ok := vars[key] @@ -89,9 +89,9 @@ func TestValidation(t *testing.T) { Testing: true, } r.NoError(createMinioBucket(ctx, vars, env, bucketName)) - store, err := store.S3FromEnv(ctx, env) + blobStorage, err := blob.S3FromEnv(ctx, env) r.NoError(err) - validator, err := New(ctx, env, store) + validator, err := New(ctx, env, blobStorage) r.NoError(err) defer validator.Clean(ctx) report, err := validator.Validate(ctx) diff --git a/internal/validate/validate.go b/internal/validate/validate.go index 346dc00..43aab1c 100644 --- a/internal/validate/validate.go +++ b/internal/validate/validate.go @@ -25,9 +25,9 @@ import ( "github.com/cockroachdb/errors" "github.com/cockroachdb/field-eng-powertools/stopper" + "github.com/cockroachlabs-field/blobcheck/internal/blob" "github.com/cockroachlabs-field/blobcheck/internal/db" "github.com/cockroachlabs-field/blobcheck/internal/env" - "github.com/cockroachlabs-field/blobcheck/internal/store" "github.com/cockroachlabs-field/blobcheck/internal/workload" ) @@ -39,20 +39,20 @@ const ( // Report contains the results of a validation run. type Report struct { - SuggestedParams store.Params + SuggestedParams blob.Params Stats []*db.Stats } // Validator verifies backup/restore functionality type Validator struct { pool *pgxpool.Pool - store store.Store + blobStorage blob.Storage sourceTable, restoredTable db.KvTable latest string } // New creates a new Validator. -func New(ctx *stopper.Context, env *env.Env, store store.Store) (*Validator, error) { +func New(ctx *stopper.Context, env *env.Env, blobStorage blob.Storage) (*Validator, error) { config, err := pgxpool.ParseConfig(env.DatabaseURL) if err != nil { return nil, err @@ -93,9 +93,8 @@ func New(ctx *stopper.Context, env *env.Env, store store.Store) (*Validator, err pool: pool, restoredTable: restoredTable, sourceTable: sourceTable, - store: store, + blobStorage: blobStorage, }, nil - } // checkBackups verifies that there is exactly one full and one incremental backup. @@ -118,7 +117,7 @@ func (v *Validator) checkBackups(ctx *stopper.Context, extConn *db.ExternalConn) return err } if len(info) != 2 { - return errors.Newf("expected exactly 2 backups (1 full, 1 incremental), got %d", len(info)) + return errors.Newf("expected exactly 2 backups (1 full, 1 incremental), got %d backups", len(info)) } fullCount := 0 for _, i := range info { @@ -127,7 +126,7 @@ func (v *Validator) checkBackups(ctx *stopper.Context, extConn *db.ExternalConn) } } if fullCount != 1 { - return errors.Errorf("expected one full backup and one incremental backup") + return errors.Newf("expected exactly 1 full backup, got %d", fullCount) } return nil @@ -161,7 +160,7 @@ func (v *Validator) Validate(ctx *stopper.Context) (*Report, error) { return nil, err } defer conn.Release() - extConn, err := db.NewExternalConn(ctx, conn, v.store) + extConn, err := db.NewExternalConn(ctx, conn, v.blobStorage) if err != nil { return nil, err }