roachtest: add tpcc OR roachtest #123942

Merged 3 commits on May 23, 2024
2 changes: 1 addition & 1 deletion pkg/cmd/roachtest/tests/backup_fixtures.go
@@ -356,7 +356,7 @@ func registerBackupFixtures(r registry.Registry) {
hardware: makeHardwareSpecs(hardwareSpecs{workloadNode: true}),
scheduledBackupSpecs: makeBackupFixtureSpecs(scheduledBackupSpecs{
backupSpecs: backupSpecs{
workload: tpccRestore{opts: tpccRestoreOptions{warehouses: 500}},
workload: tpccRestore{opts: tpccRestoreOptions{warehouses: 5000}},
nonRevisionHistory: true,
},
}),
76 changes: 68 additions & 8 deletions pkg/cmd/roachtest/tests/online_restore.go
@@ -21,10 +21,12 @@ import (
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/clusterstats"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/roachprod/logger"
"github.com/cockroachdb/cockroach/pkg/roachprod/prometheus"
"github.com/cockroachdb/cockroach/pkg/testutils/jobutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
crdbworkload "github.com/cockroachdb/cockroach/pkg/workload"
@@ -80,17 +82,31 @@ func registerOnlineRestorePerf(r registry.Registry) {
skip: "fails because of #118283",
},
{
// 8TB tpce Online Restore
hardware: makeHardwareSpecs(hardwareSpecs{nodes: 10, volumeSize: 2000,
ebsThroughput: 1000 /* MB/s */, workloadNode: true}),
// 350 GB tpcc Online Restore
hardware: makeHardwareSpecs(hardwareSpecs{workloadNode: true}),
backup: makeRestoringBackupSpecs(backupSpecs{
nonRevisionHistory: true,
version: fixtureFromMasterVersion,
workload: tpceRestore{customers: 500000}}),
timeout: 5 * time.Hour,
cloud: spec.GCE,
version: "24.1",
workload: tpccRestore{tpccRestoreOptions{warehouses: 5000, waitFraction: 0, workers: 100, maxRate: 300}},
customFixtureDir: `'gs://cockroach-fixtures-us-east1/backups/tpc-c/v24.1/db/warehouses=5k?AUTH=implicit'`}),
timeout: 1 * time.Hour,
suites: registry.Suites(registry.Nightly),
restoreUptoIncremental: 1,
skip: "used for ad hoc experiments",
restoreUptoIncremental: 0,
},
{
// 8.5TB tpcc Online Restore
hardware: makeHardwareSpecs(hardwareSpecs{nodes: 10, volumeSize: 1500, workloadNode: true}),
backup: makeRestoringBackupSpecs(backupSpecs{
nonRevisionHistory: true,
cloud: spec.GCE,
version: "24.1",
workload: tpccRestore{tpccRestoreOptions{warehouses: 150000, waitFraction: 0, workers: 100, maxRate: 1000}},
customFixtureDir: `'gs://cockroach-fixtures-us-east1/backups/tpc-c/v24.1/db/warehouses=150k?AUTH=implicit'`}),
timeout: 3 * time.Hour,
suites: registry.Suites(registry.Nightly),
restoreUptoIncremental: 0,
skip: "link phase is really slow, which will cause the test to time out",
},
} {
for _, runOnline := range []bool{true, false} {
@@ -107,6 +123,9 @@ func registerOnlineRestorePerf(r registry.Registry) {
sp.namePrefix = "offline/"
sp.skip = "used for ad hoc experiments"
}
if !runWorkload {
sp.skip = "used for ad hoc experiments"
}

sp.namePrefix = sp.namePrefix + fmt.Sprintf("workload=%t", runWorkload)
if !useWorkarounds {
@@ -158,6 +177,30 @@ func registerOnlineRestorePerf(r registry.Registry) {
}
}

// maybeAddSomeEmptyTables adds some empty tables to the cluster to exercise
// prefix rewrite rules.
func maybeAddSomeEmptyTables(ctx context.Context, rd restoreDriver) error {
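// Coin flip: roughly half of the runs restore into a cluster with no extra tables at all.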
if rd.rng.Intn(2) == 0 {
return nil
}
rd.t.L().Printf("adding some empty tables")
db, err := rd.c.ConnE(ctx, rd.t.L(), rd.c.Node(1)[0])
if err != nil {
return err
}
defer db.Close()
if _, err := db.Exec(`CREATE DATABASE empty`); err != nil {
return err
}
numTables := rd.rng.Intn(10)
for i := 0; i < numTables; i++ {
if _, err := db.Exec(fmt.Sprintf(`CREATE TABLE empty.t%d (a INT)`, i)); err != nil {
return err
}
}
return nil
}

func registerOnlineRestoreCorrectness(r registry.Registry) {
sp := restoreSpecs{
hardware: makeHardwareSpecs(hardwareSpecs{workloadNode: true}),
@@ -409,6 +452,13 @@ func waitForDownloadJob(
return downloadJobEndTimeLowerBound, err
}
if status == string(jobs.StatusSucceeded) {
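// Any bytes still backed by external files mean the download job has not actually finished its work.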
var externalBytes uint64
if err := conn.QueryRow(jobutils.GetExternalBytesForConnectedTenant).Scan(&externalBytes); err != nil {
return downloadJobEndTimeLowerBound, errors.Wrapf(err, "could not get external bytes")
}
if externalBytes != 0 {
return downloadJobEndTimeLowerBound, errors.Newf("not all data downloaded: %d external bytes still in cluster", externalBytes)
}
postDownloadDelay := time.Minute
l.Printf("Download job completed; let workload run for %.2f minute before proceeding", postDownloadDelay.Minutes())
time.Sleep(postDownloadDelay)
@@ -500,11 +550,21 @@ func runRestore(
if _, err := db.Exec("SET CLUSTER SETTING admission.sql_kv_response.enabled=false"); err != nil {
return err
}
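// The two settings below presumably keep background work off not-yet-downloaded data: the consistency queue is paused and range merges skip external bytes.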
if _, err := db.Exec("SET CLUSTER SETTING kv.consistency_queue.enabled=false"); err != nil {
return err
}
if _, err := db.Exec("SET CLUSTER SETTING kv.range_merge.skip_external_bytes.enabled=true"); err != nil {
return err
}

}
opts := ""
if runOnline {
opts = "WITH EXPERIMENTAL DEFERRED COPY"
}
if err := maybeAddSomeEmptyTables(ctx, rd); err != nil {
return errors.Wrapf(err, "failed to add some empty tables")
}
restoreStartTime = timeutil.Now()
restoreCmd := rd.restoreCmd(fmt.Sprintf("DATABASE %s", sp.backup.workload.DatabaseName()), opts)
t.L().Printf("Running %s", restoreCmd)
55 changes: 36 additions & 19 deletions pkg/cmd/roachtest/tests/restore.go
@@ -38,6 +38,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/ts/tspb"
"github.com/cockroachdb/cockroach/pkg/util/httputil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/workload/histogram"
"github.com/cockroachdb/errors"
@@ -638,6 +639,10 @@ type backupSpecs struct {
// TODO(msbutler): if another fixture requires a different backup option,
// create a new backupOpts struct.
nonRevisionHistory bool

// customFixtureDir is used when an engineer is naughty and doesn't create the
// backup fixture in the correct fixture path.
customFixtureDir string
}

func (bs backupSpecs) CloudIsCompatible(cloud string) error {
@@ -670,6 +675,9 @@ func (bs backupSpecs) storagePrefix() string {
func (bs backupSpecs) backupCollection() string {
// N.B. AWS buckets are _regional_ whereas GCS buckets are _multi-regional_. Thus, in order to avoid egress (cost),
// we use us-east-2 for AWS, which is the default region for all roachprod clusters. (See roachprod/vm/aws/aws.go)
if bs.customFixtureDir != "" {
return bs.customFixtureDir
}
properties := ""
if bs.nonRevisionHistory {
properties = "/rev-history=false"
@@ -719,6 +727,9 @@ func makeBackupSpecs(override backupSpecs, specs backupSpecs) backupSpecs {
if override.workload != nil {
specs.workload = override.workload
}
if override.customFixtureDir != "" {
specs.customFixtureDir = override.customFixtureDir
}
return specs
}
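The helper above appears to fill in only the fields that are set on the override, so a caller can layer a custom fixture path onto an existing default spec. A minimal sketch of that pattern (the field values are illustrative, not real fixture defaults):

// Hypothetical caller: only customFixtureDir differs from the defaults.
defaults := backupSpecs{cloud: spec.GCE, version: "24.1", nonRevisionHistory: true}
override := backupSpecs{customFixtureDir: `'gs://example-bucket/backups/tpcc?AUTH=implicit'`}
merged := makeBackupSpecs(override, defaults)
// merged keeps cloud/version/nonRevisionHistory from the defaults, while
// backupCollection() now short-circuits to the custom directory.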

@@ -820,6 +831,7 @@ type tpccRestoreOptions struct {
warehouses int
workers int
maxOps int
maxRate int
waitFraction float64
queryTraceFile string
seed uint64
@@ -850,6 +862,7 @@ func (tpcc tpccRestore) run(
MaybeFlag(tpcc.opts.workers > 0, "workers", tpcc.opts.workers).
MaybeFlag(tpcc.opts.waitFraction != 1, "wait", tpcc.opts.waitFraction).
MaybeFlag(tpcc.opts.maxOps != 0, "max-ops", tpcc.opts.maxOps).
MaybeFlag(tpcc.opts.maxRate != 0, "max-rate", tpcc.opts.maxRate).
MaybeFlag(tpcc.opts.seed != 0, "seed", tpcc.opts.seed).
MaybeFlag(tpcc.opts.fakeTime != 0, "fake-time", tpcc.opts.fakeTime).
MaybeFlag(tpcc.opts.queryTraceFile != "", "query-trace-file", tpcc.opts.queryTraceFile).
@@ -867,14 +880,12 @@ func (tpcc tpccRestore) String() string {
switch tpcc.opts.warehouses {
case 10:
builder.WriteString("150MB")
case 500:
builder.WriteString("8GB")
case 7000:
builder.WriteString("115GB")
case 25000:
builder.WriteString("400GB")
case 5000:
builder.WriteString("350GB")
case 150000:
builder.WriteString("8TB")
default:
panic("tpcc warehouse count not recognized")
panic(fmt.Sprintf("tpcc warehouse count %d not recognized", tpcc.opts.warehouses))
}
return builder.String()
}
@@ -895,7 +906,7 @@ type restoreSpecs struct {
suites registry.SuiteSet

// restoreUptoIncremental specifies the number of incremental backups in the
// chain to restore up to.
// chain to restore up to. If set to 0, no AOST is used.
restoreUptoIncremental int

// namePrefix appears in the name of the roachtest, i.e. `restore/{prefix}/{config}`.
@@ -955,20 +966,18 @@ type restoreDriver struct {
// gets computed during test execution, it is stored in the restoreDriver
// rather than the restoreSpecs.
aost string
}

func validateRestoreSpecs(t test.Test, sp restoreSpecs) {
if sp.restoreUptoIncremental == 0 {
t.Fatalf("invalid restoreSpecs; unspecified restoreUptoIncremental field")
}
rng *rand.Rand
}

func makeRestoreDriver(t test.Test, c cluster.Cluster, sp restoreSpecs) restoreDriver {
validateRestoreSpecs(t, sp)
rng, seed := randutil.NewPseudoRand()
t.L().Printf(`Random Seed is %d`, seed)
return restoreDriver{
t: t,
c: c,
sp: sp,
t: t,
c: c,
sp: sp,
rng: rng,
}
}

@@ -994,6 +1003,10 @@ func (rd *restoreDriver) prepareCluster(ctx context.Context) {

// getAOST gets the AOST to use in the restore cmd.
func (rd *restoreDriver) getAOST(ctx context.Context) {
if rd.sp.restoreUptoIncremental == 0 {
rd.aost = ""
return
}
var aost string
conn := rd.c.Conn(ctx, rd.t.L(), 1)
defer conn.Close()
@@ -1003,8 +1016,12 @@
}

func (rd *restoreDriver) restoreCmd(target, opts string) string {
query := fmt.Sprintf(`RESTORE %s FROM %s IN %s AS OF SYSTEM TIME '%s' %s`,
target, rd.sp.backup.fullBackupDir, rd.sp.backup.backupCollection(), rd.aost, opts)
var aostSubCmd string
if rd.aost != "" {
aostSubCmd = fmt.Sprintf("AS OF SYSTEM TIME '%s'", rd.aost)
}
query := fmt.Sprintf(`RESTORE %s FROM %s IN %s %s %s`,
target, rd.sp.backup.fullBackupDir, rd.sp.backup.backupCollection(), aostSubCmd, opts)
rd.t.L().Printf("Running restore cmd: %s", query)
return query
}
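For reference, with restoreUptoIncremental at 0 the AOST clause drops out entirely, so the online variant of the 150k-warehouse test above would issue a statement shaped roughly like the following (the full-backup subdirectory is a placeholder; the real value comes from sp.backup.fullBackupDir):

RESTORE DATABASE tpcc FROM '<full-backup-subdir>' IN 'gs://cockroach-fixtures-us-east1/backups/tpc-c/v24.1/db/warehouses=150k?AUTH=implicit' WITH EXPERIMENTAL DEFERRED COPY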