From d73d7c4a04e2ab6785bbd70db07ccf69572bf6c5 Mon Sep 17 00:00:00 2001 From: Radu Berinde Date: Wed, 4 Oct 2023 05:35:49 -0700 Subject: [PATCH 1/2] roachtest: remove clusterAllocator argument The only reason for this argument was so we could inject errors in one test. This commit replaces it with a "pre allocate" function in the cluster opts. Epic: none Release note: None --- pkg/cmd/roachtest/run.go | 2 +- pkg/cmd/roachtest/test_runner.go | 19 +++++++++++++++---- pkg/cmd/roachtest/test_test.go | 20 ++++++++++---------- 3 files changed, 26 insertions(+), 15 deletions(-) diff --git a/pkg/cmd/roachtest/run.go b/pkg/cmd/roachtest/run.go index f92868e43e5f..cc882db7fc6c 100644 --- a/pkg/cmd/roachtest/run.go +++ b/pkg/cmd/roachtest/run.go @@ -279,7 +279,7 @@ func runTests(register func(registry.Registry), args []string, benchOnly bool) e skipInit: skipInit, goCoverEnabled: goCoverEnabled, }, - lopt, nil /* clusterAllocator */) + lopt) // Make sure we attempt to clean up. We run with a non-canceled ctx; the // ctx above might be canceled in case a signal was received. If that's diff --git a/pkg/cmd/roachtest/test_runner.go b/pkg/cmd/roachtest/test_runner.go index 7eb8879dbc4e..46b5d587e6e6 100644 --- a/pkg/cmd/roachtest/test_runner.go +++ b/pkg/cmd/roachtest/test_runner.go @@ -175,6 +175,14 @@ type clustersOpt struct { // Controls whether the cluster is cleaned up at the end of the test. debugMode debugMode + + // preAllocateClusterFn is a function called right before allocating a + // cluster. It allows the caller to e.g. inject errors for testing. + preAllocateClusterFn func( + ctx context.Context, + t registry.TestSpec, + arch vm.CPUArch, + ) error } type debugMode int @@ -235,7 +243,6 @@ func (r *testRunner) Run( clustersOpt clustersOpt, topt testOpts, lopt loggingOpt, - clusterAllocator clusterAllocatorFn, ) error { // Validate options. if len(tests) == 0 { @@ -279,9 +286,7 @@ func (r *testRunner) Run( } } } - if clusterAllocator == nil { - clusterAllocator = defaultClusterAllocator(r, clustersOpt, lopt) - } + clusterAllocator := defaultClusterAllocator(r, clustersOpt, lopt) n := len(tests) if n*count < parallelism { @@ -422,6 +427,12 @@ func defaultClusterAllocator( wStatus.SetStatus(fmt.Sprintf("creating cluster (arch=%q)", arch)) defer wStatus.SetStatus("") + if clustersOpt.preAllocateClusterFn != nil { + if err := clustersOpt.preAllocateClusterFn(ctx, t, arch); err != nil { + return nil, nil, err + } + } + existingClusterName := clustersOpt.clusterName if existingClusterName != "" { // Logs for attaching to a cluster go to a dedicated log file. diff --git a/pkg/cmd/roachtest/test_test.go b/pkg/cmd/roachtest/test_test.go index daaeb87dfe6b..a68fbb8cc017 100644 --- a/pkg/cmd/roachtest/test_test.go +++ b/pkg/cmd/roachtest/test_test.go @@ -174,19 +174,19 @@ func TestRunnerRun(t *testing.T) { t.Run("", func(t *testing.T) { rt := setupRunnerTest(t, r, c.filters) - var clusterAllocator clusterAllocatorFn - // run without cluster allocator error injection - err := rt.runner.Run(ctx, rt.tests, 1, /* count */ - defaultParallelism, rt.copt, testOpts{}, rt.lopt, clusterAllocator) + const count = 1 + err := rt.runner.Run(ctx, rt.tests, count, defaultParallelism, rt.copt, testOpts{}, rt.lopt) assertTestCompletion(t, rt.tests, c.filters, rt.runner.getCompletedTests(), err, c.expErr) // N.B. skip the case of no matching tests if len(rt.tests) > 0 { // run _with_ cluster allocator error injection - clusterAllocator = alwaysFailingClusterAllocator - err = rt.runner.Run(ctx, rt.tests, 1, /* count */ - defaultParallelism, rt.copt, testOpts{}, rt.lopt, clusterAllocator) + copt := rt.copt + copt.preAllocateClusterFn = func(ctx context.Context, t registry.TestSpec, arch vm.CPUArch) error { + return errors.New("cluster creation failed") + } + err = rt.runner.Run(ctx, rt.tests, count, defaultParallelism, copt, testOpts{}, rt.lopt) assertTestCompletion(t, rt.tests, c.filters, rt.runner.getCompletedTests(), @@ -235,7 +235,7 @@ func TestRunnerEncryptionAtRest(t *testing.T) { for i := 0; i < 10000; i++ { require.NoError(t, rt.runner.Run( context.Background(), rt.tests, 1 /* count */, 1, /* parallelism */ - rt.copt, testOpts{}, rt.lopt, nil, // clusterAllocator + rt.copt, testOpts{}, rt.lopt, )) if atomic.LoadInt32(&sawEncrypted) == 0 { // NB: since it's a 50% chance, the probability of *not* hitting @@ -371,7 +371,7 @@ func TestRunnerTestTimeout(t *testing.T) { }, } err := runner.Run(ctx, []registry.TestSpec{test}, 1, /* count */ - defaultParallelism, copt, testOpts{}, lopt, nil /* clusterAllocator */) + defaultParallelism, copt, testOpts{}, lopt) if !testutils.IsError(err, "some tests failed") { t.Fatalf("expected error \"some tests failed\", got: %v", err) } @@ -466,7 +466,7 @@ func runExitCodeTest(t *testing.T, injectedError error) error { stderr: io.Discard, artifactsDir: "", } - return runner.Run(ctx, tests, 1, 1, clustersOpt{}, testOpts{}, lopt, nil /* clusterAllocator */) + return runner.Run(ctx, tests, 1, 1, clustersOpt{}, testOpts{}, lopt) } func TestExitCode(t *testing.T) { From fc571e3da0dc61472d5123e699422dfab361a5a2 Mon Sep 17 00:00:00 2001 From: Radu Berinde Date: Wed, 4 Oct 2023 05:35:49 -0700 Subject: [PATCH 2/2] roachtest: clean up allocate cluster function Replace the allocate cluster function with a method. This makes things more clear, especially the lifetime of the cluster factory. Epic: none Release note: None --- pkg/cmd/roachtest/BUILD.bazel | 1 - pkg/cmd/roachtest/cluster.go | 4 +- pkg/cmd/roachtest/test_runner.go | 193 ++++++++++++++----------------- pkg/cmd/roachtest/test_test.go | 28 ++--- 4 files changed, 95 insertions(+), 131 deletions(-) diff --git a/pkg/cmd/roachtest/BUILD.bazel b/pkg/cmd/roachtest/BUILD.bazel index 5d19c0f44e02..d0b3b7242ee3 100644 --- a/pkg/cmd/roachtest/BUILD.bazel +++ b/pkg/cmd/roachtest/BUILD.bazel @@ -101,7 +101,6 @@ go_test( "//pkg/roachprod/vm/gce", "//pkg/testutils", "//pkg/testutils/echotest", - "//pkg/util/quotapool", "//pkg/util/stop", "//pkg/util/syncutil", "//pkg/util/version", diff --git a/pkg/cmd/roachtest/cluster.go b/pkg/cmd/roachtest/cluster.go index 480477f57d52..dfcae3cef05d 100644 --- a/pkg/cmd/roachtest/cluster.go +++ b/pkg/cmd/roachtest/cluster.go @@ -785,7 +785,7 @@ type clusterFactory struct { namePrefix string // counter is incremented with every new cluster. It's used as part of the cluster's name. // Accessed atomically. - counter uint64 + counter atomic.Uint64 // The registry with whom all clustered will be registered. r *clusterRegistry // artifactsDir is the directory in which the cluster creation log file will be placed. @@ -831,7 +831,7 @@ func (f *clusterFactory) genName(cfg clusterConfig) string { if cfg.nameOverride != "" { return cfg.nameOverride } - count := atomic.AddUint64(&f.counter, 1) + count := f.counter.Add(1) return makeClusterName( fmt.Sprintf("%s-%02d-%s", f.namePrefix, count, cfg.spec.String())) } diff --git a/pkg/cmd/roachtest/test_runner.go b/pkg/cmd/roachtest/test_runner.go index 46b5d587e6e6..b1f6b3534f6d 100644 --- a/pkg/cmd/roachtest/test_runner.go +++ b/pkg/cmd/roachtest/test_runner.go @@ -286,7 +286,11 @@ func (r *testRunner) Run( } } } - clusterAllocator := defaultClusterAllocator(r, clustersOpt, lopt) + + clusterFactory := newClusterFactory( + clustersOpt.user, clustersOpt.clusterID, lopt.artifactsDir, + r.cr, numConcurrentClusterCreations(), + ) n := len(tests) if n*count < parallelism { @@ -317,9 +321,9 @@ func (r *testRunner) Run( err := r.runWorker( ctx, fmt.Sprintf("w%d", i) /* name */, r.work, qp, r.stopper.ShouldQuiesce(), - clustersOpt.debugMode, - lopt.artifactsDir, lopt.literalArtifactsDir, lopt.tee, lopt.stdout, - clusterAllocator, + clusterFactory, + clustersOpt, + lopt, topt, l, ) @@ -406,91 +410,72 @@ func generateRunID(cOpts clustersOpt) string { return fmt.Sprintf("%s-%s", cOpts.user, cOpts.clusterID) } -// defaultClusterAllocator is used by workers to create new clusters (or to attach -// to an existing one). -// -// N.B. the resulting clusterAllocatorFn reuses the same clusterFactory to allocate clusters. -func defaultClusterAllocator( - r *testRunner, clustersOpt clustersOpt, lopt loggingOpt, -) clusterAllocatorFn { - clusterFactory := newClusterFactory( - clustersOpt.user, clustersOpt.clusterID, lopt.artifactsDir, r.cr, numConcurrentClusterCreations()) +func (r *testRunner) allocateCluster( + ctx context.Context, + clusterFactory *clusterFactory, + clustersOpt clustersOpt, + lopt loggingOpt, + t registry.TestSpec, + arch vm.CPUArch, + alloc *quotapool.IntAlloc, + wStatus *workerStatus, +) (*clusterImpl, *vm.CreateOpts, error) { + wStatus.SetStatus(fmt.Sprintf("creating cluster (arch=%q)", arch)) + defer wStatus.SetStatus("") - allocateCluster := func( - ctx context.Context, - t registry.TestSpec, - arch vm.CPUArch, - alloc *quotapool.IntAlloc, - artifactsDir string, - wStatus *workerStatus, - ) (*clusterImpl, *vm.CreateOpts, error) { - wStatus.SetStatus(fmt.Sprintf("creating cluster (arch=%q)", arch)) - defer wStatus.SetStatus("") - - if clustersOpt.preAllocateClusterFn != nil { - if err := clustersOpt.preAllocateClusterFn(ctx, t, arch); err != nil { - return nil, nil, err - } + if clustersOpt.preAllocateClusterFn != nil { + if err := clustersOpt.preAllocateClusterFn(ctx, t, arch); err != nil { + return nil, nil, err } + } - existingClusterName := clustersOpt.clusterName - if existingClusterName != "" { - // Logs for attaching to a cluster go to a dedicated log file. - logPath := filepath.Join(artifactsDir, runnerLogsDir, "cluster-create", existingClusterName+".log") - clusterL, err := logger.RootLogger(logPath, lopt.tee) - if err != nil { - return nil, nil, err - } - defer clusterL.Close() - opt := attachOpt{ - skipValidation: r.config.skipClusterValidationOnAttach, - skipStop: r.config.skipClusterStopOnAttach, - skipWipe: r.config.skipClusterWipeOnAttach, - } - // TODO(srosenberg): we need to think about validation here. Attaching to an incompatible cluster, e.g., - // using arm64 AMI with amd64 binary, would result in obscure errors. The test runner ensures compatibility - // during cluster reuse, whereas attachment via CLI (e.g., via roachprod) does not. - lopt.l.PrintfCtx(ctx, "Attaching to existing cluster %s for test %s", existingClusterName, t.Name) - c, err := attachToExistingCluster(ctx, existingClusterName, clusterL, t.Cluster, opt, r.cr) - if err == nil { - // Pretend pre-existing's cluster architecture matches the desired one; see the above TODO wrt validation. - c.arch = arch - return c, nil, nil - } - if !errors.Is(err, errClusterNotFound) { - return nil, nil, err - } - // Fall through to create new cluster with name override. - lopt.l.PrintfCtx( - ctx, "Creating new cluster with custom name %q for test %s: %s (arch=%q)", - clustersOpt.clusterName, t.Name, t.Cluster, arch, - ) - } else { - lopt.l.PrintfCtx(ctx, "Creating new cluster for test %s: %s (arch=%q)", t.Name, t.Cluster, arch) + existingClusterName := clustersOpt.clusterName + if existingClusterName != "" { + // Logs for attaching to a cluster go to a dedicated log file. + logPath := filepath.Join(lopt.artifactsDir, runnerLogsDir, "cluster-create", existingClusterName+".log") + clusterL, err := logger.RootLogger(logPath, lopt.tee) + if err != nil { + return nil, nil, err + } + defer clusterL.Close() + opt := attachOpt{ + skipValidation: r.config.skipClusterValidationOnAttach, + skipStop: r.config.skipClusterStopOnAttach, + skipWipe: r.config.skipClusterWipeOnAttach, + } + // TODO(srosenberg): we need to think about validation here. Attaching to an incompatible cluster, e.g., + // using arm64 AMI with amd64 binary, would result in obscure errors. The test runner ensures compatibility + // during cluster reuse, whereas attachment via CLI (e.g., via roachprod) does not. + lopt.l.PrintfCtx(ctx, "Attaching to existing cluster %s for test %s", existingClusterName, t.Name) + c, err := attachToExistingCluster(ctx, existingClusterName, clusterL, t.Cluster, opt, r.cr) + if err == nil { + // Pretend pre-existing's cluster architecture matches the desired one; see the above TODO wrt validation. + c.arch = arch + return c, nil, nil } - - cfg := clusterConfig{ - nameOverride: clustersOpt.clusterName, // only set if we hit errClusterFound above - spec: t.Cluster, - artifactsDir: artifactsDir, - username: clustersOpt.user, - localCluster: clustersOpt.typ == localCluster, - alloc: alloc, - arch: arch, + if !errors.Is(err, errClusterNotFound) { + return nil, nil, err } - return clusterFactory.newCluster(ctx, cfg, wStatus.SetStatus, lopt.tee) + // Fall through to create new cluster with name override. + lopt.l.PrintfCtx( + ctx, "Creating new cluster with custom name %q for test %s: %s (arch=%q)", + clustersOpt.clusterName, t.Name, t.Cluster, arch, + ) + } else { + lopt.l.PrintfCtx(ctx, "Creating new cluster for test %s: %s (arch=%q)", t.Name, t.Cluster, arch) } - return allocateCluster -} -type clusterAllocatorFn func( - ctx context.Context, - t registry.TestSpec, - arch vm.CPUArch, - alloc *quotapool.IntAlloc, - artifactsDir string, - wStatus *workerStatus, -) (*clusterImpl, *vm.CreateOpts, error) + cfg := clusterConfig{ + nameOverride: clustersOpt.clusterName, // only set if we hit errClusterFound above + spec: t.Cluster, + artifactsDir: lopt.artifactsDir, + username: clustersOpt.user, + localCluster: clustersOpt.typ == localCluster, + alloc: alloc, + arch: arch, + } + return clusterFactory.newCluster(ctx, cfg, wStatus.SetStatus, lopt.tee) +} // runWorker runs tests in a loop until work is exhausted. // @@ -509,37 +494,24 @@ type clusterAllocatorFn func( // // runWorker returns either error (other than cluster provisioning) or the count of cluster provisioning errors. // -// Args: -// name: The worker's name, to be used as a prefix for log messages. -// artifactsRootDir: The artifacts dir. Each test's logs are going to be under a -// -// run_ dir. If empty, test log files will not be created. -// -// literalArtifactsDir: The literal on-agent path where artifacts are stored. -// -// Only used for teamcity[publishArtifacts] messages. -// -// stdout: The Writer to use for messages that need to go to stdout (e.g. the +// The worker's name will be used as a prefix for log messages. // -// "=== RUN" and "--- FAIL" lines). -// -// teeOpt: The teeing option for future test loggers. -// l: The logger to use for more verbose messages. +// Each test's logs are going to be under a /run_ dir inside +// lotp.artifactsDir. If empty, test log files will not be created. func (r *testRunner) runWorker( ctx context.Context, name string, work *workPool, qp *quotapool.IntPool, interrupt <-chan struct{}, - debugMode debugMode, - artifactsRootDir string, - literalArtifactsDir string, - teeOpt logger.TeeOptType, - stdout io.Writer, - allocateCluster clusterAllocatorFn, + clusterFactory *clusterFactory, + clustersOpt clustersOpt, + lopt loggingOpt, topt testOpts, l *logger.Logger, ) error { + stdout := lopt.stdout + ctx = logtags.AddTag(ctx, name, nil /* value */) wStatus := r.addWorker(ctx, name) defer func() { @@ -704,7 +676,9 @@ func (r *testRunner) runWorker( // Create a new cluster if can't reuse or reuse attempt failed. // N.B. non-reusable cluster would have been destroyed above. wStatus.SetTest(nil /* test */, testToRun) - c, vmCreateOpts, clusterCreateErr = allocateCluster(ctx, testToRun.spec, arch, testToRun.alloc, artifactsRootDir, wStatus) + c, vmCreateOpts, clusterCreateErr = r.allocateCluster( + ctx, clusterFactory, clustersOpt, lopt, + testToRun.spec, arch, testToRun.alloc, wStatus) if clusterCreateErr != nil { atomic.AddInt32(&r.numClusterErrs, 1) shout(ctx, l, stdout, "Unable to create (or reuse) cluster for test %s due to: %s.", @@ -715,6 +689,7 @@ func (r *testRunner) runWorker( } // Prepare the test's logger. Always set this up with real files, using a // temp dir if necessary. This simplifies testing. + artifactsRootDir := lopt.artifactsDir if artifactsRootDir == "" { artifactsRootDir, _ = os.MkdirTemp("", "roachtest-logger") } @@ -728,9 +703,9 @@ func (r *testRunner) runWorker( // Map artifacts/TestFoo/run_?/** => TestFoo/run_?/**, i.e. collect the artifacts // for this test exactly as they are laid out on disk (when the time // comes). - artifactsSpec := fmt.Sprintf("%s/%s/** => %s/%s", filepath.Join(literalArtifactsDir, escapedTestName), runSuffix, escapedTestName, runSuffix) + artifactsSpec := fmt.Sprintf("%s/%s/** => %s/%s", filepath.Join(lopt.literalArtifactsDir, escapedTestName), runSuffix, escapedTestName, runSuffix) - testL, err := logger.RootLogger(logPath, teeOpt) + testL, err := logger.RootLogger(logPath, lopt.tee) if err != nil { return err } @@ -749,7 +724,7 @@ func (r *testRunner) runWorker( l: testL, versionsBinaryOverride: topt.versionsBinaryOverride, skipInit: topt.skipInit, - debug: debugMode.IsDebug(), + debug: clustersOpt.debugMode.IsDebug(), goCoverEnabled: topt.goCoverEnabled, } github := newGithubIssues(r.config.disableIssue, c, vmCreateOpts) @@ -863,7 +838,7 @@ func (r *testRunner) runWorker( failureMsg += t.failureMsg() } if c != nil { - switch debugMode { + switch clustersOpt.debugMode { case DebugKeepAlways, DebugKeepOnFailure: // Save the cluster for future debugging. c.Save(ctx, failureMsg, l) @@ -887,7 +862,7 @@ func (r *testRunner) runWorker( if t.spec.Benchmark { getPerfArtifacts(ctx, c, t) } - if debugMode == DebugKeepAlways { + if clustersOpt.debugMode == DebugKeepAlways { c.Save(ctx, "cluster saved since --debug-always set", l) c = nil } diff --git a/pkg/cmd/roachtest/test_test.go b/pkg/cmd/roachtest/test_test.go index a68fbb8cc017..067994e3a79c 100644 --- a/pkg/cmd/roachtest/test_test.go +++ b/pkg/cmd/roachtest/test_test.go @@ -30,7 +30,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachprod/logger" "github.com/cockroachdb/cockroach/pkg/roachprod/vm" "github.com/cockroachdb/cockroach/pkg/testutils" - "github.com/cockroachdb/cockroach/pkg/util/quotapool" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/errors" @@ -97,17 +96,6 @@ func nilLogger() *logger.Logger { return l } -func alwaysFailingClusterAllocator( - ctx context.Context, - t registry.TestSpec, - arch vm.CPUArch, - alloc *quotapool.IntAlloc, - artifactsDir string, - wStatus *workerStatus, -) (*clusterImpl, *vm.CreateOpts, error) { - return nil, nil, errors.New("cluster creation failed") -} - func TestRunnerRun(t *testing.T) { ctx := context.Background() @@ -307,18 +295,20 @@ func assertTestCompletion( expectedErr string, ) { t.Helper() - require.True(t, len(completed) == len(tests)) - for _, info := range completed { + if !testutils.IsError(actualErr, expectedErr) { + t.Fatalf("expected err: %q, but found %v. Filters: %s", expectedErr, actualErr, filters) + } + + require.Equal(t, len(tests), len(completed), "len(completed) invalid") + + for i, info := range completed { if info.test == "pass" { - require.True(t, info.pass) + require.Truef(t, info.pass, "expected test %s to pass", tests[i].Name) } else if info.test == "fail" { - require.True(t, !info.pass) + require.Falsef(t, info.pass, "expected test %s to fail", tests[i].Name) } } - if !testutils.IsError(actualErr, expectedErr) { - t.Fatalf("expected err: %q, but found %v. Filters: %s", expectedErr, actualErr, filters) - } } type syncedBuffer struct {