Merge #111534
111534: roachtest: clean up allocate cluster function r=RaduBerinde a=RaduBerinde

#### roachtest: remove clusterAllocator argument

The only reason for this argument was so we could inject errors in one
test. This commit replaces it with a "pre-allocate" function in the
cluster opts.
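
A minimal, self-contained sketch of the resulting pattern (the types here are simplified stand-ins for `registry.TestSpec`, `vm.CPUArch`, and the full `clustersOpt`; this illustrates the shape of the change, not the actual roachtest code):

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Simplified stand-ins for the roachtest types referenced by the hook.
type testSpec struct{ Name string }
type cpuArch string

// clustersOpt mirrors the new field added by this commit: a hook invoked
// right before a cluster is allocated, letting a test inject errors.
type clustersOpt struct {
	preAllocateClusterFn func(ctx context.Context, t testSpec, arch cpuArch) error
}

// allocateCluster consults the hook before doing any real allocation work,
// so a test can fail the allocation deterministically.
func allocateCluster(ctx context.Context, opts clustersOpt, t testSpec, arch cpuArch) error {
	if opts.preAllocateClusterFn != nil {
		if err := opts.preAllocateClusterFn(ctx, t, arch); err != nil {
			return err
		}
	}
	// ... real cluster creation would happen here ...
	return nil
}

func main() {
	opts := clustersOpt{
		preAllocateClusterFn: func(ctx context.Context, t testSpec, arch cpuArch) error {
			// Simulate a provisioning failure.
			return errors.New("injected allocation failure")
		},
	}
	err := allocateCluster(context.Background(), opts, testSpec{Name: "some/test"}, "amd64")
	fmt.Println(err) // injected allocation failure
}
```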

Epic: none
Release note: None

#### roachtest: clean up allocate cluster function

Replace the allocate cluster function with a method. This makes things
clearer, in particular the lifetime of the cluster factory.
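
A toy sketch of the refactor's shape (not the roachtest implementation): before, a function built the factory internally and returned an allocator closure, so the factory's lifetime was hidden inside the closure; after, the caller constructs the factory once and passes it to a method, making that lifetime explicit at the call site.

```go
package main

import "fmt"

type clusterFactory struct{ namePrefix string }

// Before: the factory is created inside this function and captured by the
// returned closure; nothing at the call site shows how long it lives.
func defaultAllocator(namePrefix string) func(spec string) string {
	f := &clusterFactory{namePrefix: namePrefix}
	return func(spec string) string { return f.namePrefix + spec }
}

// After: a plain method that receives the factory as an argument; the
// caller owns the factory and its lifetime is visible.
type testRunner struct{}

func (r *testRunner) allocateCluster(f *clusterFactory, spec string) string {
	return f.namePrefix + spec
}

func main() {
	// Closure style: factory lifetime tied to the closure.
	alloc := defaultAllocator("teamcity-")
	fmt.Println(alloc("01-kv"))

	// Method style: factory created once, passed explicitly.
	f := &clusterFactory{namePrefix: "teamcity-"}
	r := &testRunner{}
	fmt.Println(r.allocateCluster(f, "01-kv"))
}
```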

Epic: none
Release note: None

Co-authored-by: Radu Berinde <radu@cockroachlabs.com>
craig[bot] and RaduBerinde committed Oct 4, 2023
2 parents b74d8c6 + fc571e3 commit 1eadb77
Showing 5 changed files with 115 additions and 140 deletions.
1 change: 0 additions & 1 deletion pkg/cmd/roachtest/BUILD.bazel
@@ -101,7 +101,6 @@ go_test(
         "//pkg/roachprod/vm/gce",
         "//pkg/testutils",
         "//pkg/testutils/echotest",
-        "//pkg/util/quotapool",
         "//pkg/util/stop",
         "//pkg/util/syncutil",
         "//pkg/util/version",
4 changes: 2 additions & 2 deletions pkg/cmd/roachtest/cluster.go
@@ -785,7 +785,7 @@ type clusterFactory struct {
 	namePrefix string
 	// counter is incremented with every new cluster. It's used as part of the cluster's name.
 	// Accessed atomically.
-	counter uint64
+	counter atomic.Uint64
 	// The registry with which all clusters will be registered.
 	r *clusterRegistry
 	// artifactsDir is the directory in which the cluster creation log file will be placed.
@@ -831,7 +831,7 @@ func (f *clusterFactory) genName(cfg clusterConfig) string {
 	if cfg.nameOverride != "" {
 		return cfg.nameOverride
 	}
-	count := atomic.AddUint64(&f.counter, 1)
+	count := f.counter.Add(1)
 	return makeClusterName(
 		fmt.Sprintf("%s-%02d-%s", f.namePrefix, count, cfg.spec.String()))
 }
2 changes: 1 addition & 1 deletion pkg/cmd/roachtest/run.go
@@ -279,7 +279,7 @@ func runTests(register func(registry.Registry), args []string, benchOnly bool) e
 			skipInit:       skipInit,
 			goCoverEnabled: goCoverEnabled,
 		},
-		lopt, nil /* clusterAllocator */)
+		lopt)
 
 	// Make sure we attempt to clean up. We run with a non-canceled ctx; the
 	// ctx above might be canceled in case a signal was received. If that's
200 changes: 93 additions & 107 deletions pkg/cmd/roachtest/test_runner.go
@@ -175,6 +175,14 @@ type clustersOpt struct {
 
 	// Controls whether the cluster is cleaned up at the end of the test.
 	debugMode debugMode
+
+	// preAllocateClusterFn is a function called right before allocating a
+	// cluster. It allows the caller to e.g. inject errors for testing.
+	preAllocateClusterFn func(
+		ctx context.Context,
+		t registry.TestSpec,
+		arch vm.CPUArch,
+	) error
 }
 
 type debugMode int
@@ -235,7 +243,6 @@ func (r *testRunner) Run(
 	clustersOpt clustersOpt,
 	topt testOpts,
 	lopt loggingOpt,
-	clusterAllocator clusterAllocatorFn,
 ) error {
 	// Validate options.
 	if len(tests) == 0 {
@@ -279,9 +286,11 @@ func (r *testRunner) Run(
 			}
 		}
 	}
-	if clusterAllocator == nil {
-		clusterAllocator = defaultClusterAllocator(r, clustersOpt, lopt)
-	}
+
+	clusterFactory := newClusterFactory(
+		clustersOpt.user, clustersOpt.clusterID, lopt.artifactsDir,
+		r.cr, numConcurrentClusterCreations(),
+	)
 
 	n := len(tests)
 	if n*count < parallelism {
@@ -312,9 +321,9 @@ func (r *testRunner) Run(
 			err := r.runWorker(
 				ctx, fmt.Sprintf("w%d", i) /* name */, r.work, qp,
 				r.stopper.ShouldQuiesce(),
-				clustersOpt.debugMode,
-				lopt.artifactsDir, lopt.literalArtifactsDir, lopt.tee, lopt.stdout,
-				clusterAllocator,
+				clusterFactory,
+				clustersOpt,
+				lopt,
 				topt,
 				l,
 			)
@@ -401,85 +410,72 @@ func generateRunID(cOpts clustersOpt) string {
 	return fmt.Sprintf("%s-%s", cOpts.user, cOpts.clusterID)
 }
 
-// defaultClusterAllocator is used by workers to create new clusters (or to attach
-// to an existing one).
-//
-// N.B. the resulting clusterAllocatorFn reuses the same clusterFactory to allocate clusters.
-func defaultClusterAllocator(
-	r *testRunner, clustersOpt clustersOpt, lopt loggingOpt,
-) clusterAllocatorFn {
-	clusterFactory := newClusterFactory(
-		clustersOpt.user, clustersOpt.clusterID, lopt.artifactsDir, r.cr, numConcurrentClusterCreations())
-
-	allocateCluster := func(
-		ctx context.Context,
-		t registry.TestSpec,
-		arch vm.CPUArch,
-		alloc *quotapool.IntAlloc,
-		artifactsDir string,
-		wStatus *workerStatus,
-	) (*clusterImpl, *vm.CreateOpts, error) {
-		wStatus.SetStatus(fmt.Sprintf("creating cluster (arch=%q)", arch))
-		defer wStatus.SetStatus("")
-
-		existingClusterName := clustersOpt.clusterName
-		if existingClusterName != "" {
-			// Logs for attaching to a cluster go to a dedicated log file.
-			logPath := filepath.Join(artifactsDir, runnerLogsDir, "cluster-create", existingClusterName+".log")
-			clusterL, err := logger.RootLogger(logPath, lopt.tee)
-			if err != nil {
-				return nil, nil, err
-			}
-			defer clusterL.Close()
-			opt := attachOpt{
-				skipValidation: r.config.skipClusterValidationOnAttach,
-				skipStop:       r.config.skipClusterStopOnAttach,
-				skipWipe:       r.config.skipClusterWipeOnAttach,
-			}
-			// TODO(srosenberg): we need to think about validation here. Attaching to an incompatible cluster, e.g.,
-			// using arm64 AMI with amd64 binary, would result in obscure errors. The test runner ensures compatibility
-			// during cluster reuse, whereas attachment via CLI (e.g., via roachprod) does not.
-			lopt.l.PrintfCtx(ctx, "Attaching to existing cluster %s for test %s", existingClusterName, t.Name)
-			c, err := attachToExistingCluster(ctx, existingClusterName, clusterL, t.Cluster, opt, r.cr)
-			if err == nil {
-				// Pretend the pre-existing cluster's architecture matches the desired one; see the above TODO wrt validation.
-				c.arch = arch
-				return c, nil, nil
-			}
-			if !errors.Is(err, errClusterNotFound) {
-				return nil, nil, err
-			}
-			// Fall through to create new cluster with name override.
-			lopt.l.PrintfCtx(
-				ctx, "Creating new cluster with custom name %q for test %s: %s (arch=%q)",
-				clustersOpt.clusterName, t.Name, t.Cluster, arch,
-			)
-		} else {
-			lopt.l.PrintfCtx(ctx, "Creating new cluster for test %s: %s (arch=%q)", t.Name, t.Cluster, arch)
-		}
-
-		cfg := clusterConfig{
-			nameOverride: clustersOpt.clusterName, // only set if we hit errClusterNotFound above
-			spec:         t.Cluster,
-			artifactsDir: artifactsDir,
-			username:     clustersOpt.user,
-			localCluster: clustersOpt.typ == localCluster,
-			alloc:        alloc,
-			arch:         arch,
-		}
-		return clusterFactory.newCluster(ctx, cfg, wStatus.SetStatus, lopt.tee)
-	}
-	return allocateCluster
-}
-
-type clusterAllocatorFn func(
-	ctx context.Context,
-	t registry.TestSpec,
-	arch vm.CPUArch,
-	alloc *quotapool.IntAlloc,
-	artifactsDir string,
-	wStatus *workerStatus,
-) (*clusterImpl, *vm.CreateOpts, error)
+func (r *testRunner) allocateCluster(
+	ctx context.Context,
+	clusterFactory *clusterFactory,
+	clustersOpt clustersOpt,
+	lopt loggingOpt,
+	t registry.TestSpec,
+	arch vm.CPUArch,
+	alloc *quotapool.IntAlloc,
+	wStatus *workerStatus,
+) (*clusterImpl, *vm.CreateOpts, error) {
+	wStatus.SetStatus(fmt.Sprintf("creating cluster (arch=%q)", arch))
+	defer wStatus.SetStatus("")
+
+	if clustersOpt.preAllocateClusterFn != nil {
+		if err := clustersOpt.preAllocateClusterFn(ctx, t, arch); err != nil {
+			return nil, nil, err
+		}
+	}
+
+	existingClusterName := clustersOpt.clusterName
+	if existingClusterName != "" {
+		// Logs for attaching to a cluster go to a dedicated log file.
+		logPath := filepath.Join(lopt.artifactsDir, runnerLogsDir, "cluster-create", existingClusterName+".log")
+		clusterL, err := logger.RootLogger(logPath, lopt.tee)
+		if err != nil {
+			return nil, nil, err
+		}
+		defer clusterL.Close()
+		opt := attachOpt{
+			skipValidation: r.config.skipClusterValidationOnAttach,
+			skipStop:       r.config.skipClusterStopOnAttach,
+			skipWipe:       r.config.skipClusterWipeOnAttach,
+		}
+		// TODO(srosenberg): we need to think about validation here. Attaching to an incompatible cluster, e.g.,
+		// using arm64 AMI with amd64 binary, would result in obscure errors. The test runner ensures compatibility
+		// during cluster reuse, whereas attachment via CLI (e.g., via roachprod) does not.
+		lopt.l.PrintfCtx(ctx, "Attaching to existing cluster %s for test %s", existingClusterName, t.Name)
+		c, err := attachToExistingCluster(ctx, existingClusterName, clusterL, t.Cluster, opt, r.cr)
+		if err == nil {
+			// Pretend the pre-existing cluster's architecture matches the desired one; see the above TODO wrt validation.
+			c.arch = arch
+			return c, nil, nil
+		}
+		if !errors.Is(err, errClusterNotFound) {
+			return nil, nil, err
+		}
+		// Fall through to create new cluster with name override.
+		lopt.l.PrintfCtx(
+			ctx, "Creating new cluster with custom name %q for test %s: %s (arch=%q)",
+			clustersOpt.clusterName, t.Name, t.Cluster, arch,
+		)
+	} else {
+		lopt.l.PrintfCtx(ctx, "Creating new cluster for test %s: %s (arch=%q)", t.Name, t.Cluster, arch)
+	}
+
+	cfg := clusterConfig{
+		nameOverride: clustersOpt.clusterName, // only set if we hit errClusterNotFound above
+		spec:         t.Cluster,
+		artifactsDir: lopt.artifactsDir,
+		username:     clustersOpt.user,
+		localCluster: clustersOpt.typ == localCluster,
+		alloc:        alloc,
+		arch:         arch,
+	}
+	return clusterFactory.newCluster(ctx, cfg, wStatus.SetStatus, lopt.tee)
+}
 
 // runWorker runs tests in a loop until work is exhausted.
 //
@@ -498,37 +494,24 @@ type clusterAllocatorFn func(
 //
 // runWorker returns either error (other than cluster provisioning) or the count of cluster provisioning errors.
 //
-// Args:
-// name: The worker's name, to be used as a prefix for log messages.
-// artifactsRootDir: The artifacts dir. Each test's logs are going to be under a
-//
-//	run_<n> dir. If empty, test log files will not be created.
-//
-// literalArtifactsDir: The literal on-agent path where artifacts are stored.
-//
-//	Only used for teamcity[publishArtifacts] messages.
-//
-// stdout: The Writer to use for messages that need to go to stdout (e.g. the
-//
-//	"=== RUN" and "--- FAIL" lines).
-//
-// teeOpt: The teeing option for future test loggers.
-// l: The logger to use for more verbose messages.
+// The worker's name will be used as a prefix for log messages.
+//
+// Each test's logs are going to be under a <test-name>/run_<n> dir inside
+// lopt.artifactsDir. If empty, test log files will not be created.
 func (r *testRunner) runWorker(
 	ctx context.Context,
 	name string,
 	work *workPool,
 	qp *quotapool.IntPool,
 	interrupt <-chan struct{},
-	debugMode debugMode,
-	artifactsRootDir string,
-	literalArtifactsDir string,
-	teeOpt logger.TeeOptType,
-	stdout io.Writer,
-	allocateCluster clusterAllocatorFn,
+	clusterFactory *clusterFactory,
+	clustersOpt clustersOpt,
+	lopt loggingOpt,
 	topt testOpts,
 	l *logger.Logger,
 ) error {
+	stdout := lopt.stdout
+
 	ctx = logtags.AddTag(ctx, name, nil /* value */)
 	wStatus := r.addWorker(ctx, name)
 	defer func() {
@@ -693,7 +676,9 @@ func (r *testRunner) runWorker(
 			// Create a new cluster if can't reuse or reuse attempt failed.
 			// N.B. non-reusable cluster would have been destroyed above.
 			wStatus.SetTest(nil /* test */, testToRun)
-			c, vmCreateOpts, clusterCreateErr = allocateCluster(ctx, testToRun.spec, arch, testToRun.alloc, artifactsRootDir, wStatus)
+			c, vmCreateOpts, clusterCreateErr = r.allocateCluster(
+				ctx, clusterFactory, clustersOpt, lopt,
+				testToRun.spec, arch, testToRun.alloc, wStatus)
 			if clusterCreateErr != nil {
 				atomic.AddInt32(&r.numClusterErrs, 1)
 				shout(ctx, l, stdout, "Unable to create (or reuse) cluster for test %s due to: %s.",
@@ -704,6 +689,7 @@ func (r *testRunner) runWorker(
 			}
 			// Prepare the test's logger. Always set this up with real files, using a
 			// temp dir if necessary. This simplifies testing.
+			artifactsRootDir := lopt.artifactsDir
 			if artifactsRootDir == "" {
 				artifactsRootDir, _ = os.MkdirTemp("", "roachtest-logger")
 			}
@@ -717,9 +703,9 @@ func (r *testRunner) runWorker(
 			// Map artifacts/TestFoo/run_?/** => TestFoo/run_?/**, i.e. collect the artifacts
 			// for this test exactly as they are laid out on disk (when the time
 			// comes).
-			artifactsSpec := fmt.Sprintf("%s/%s/** => %s/%s", filepath.Join(literalArtifactsDir, escapedTestName), runSuffix, escapedTestName, runSuffix)
+			artifactsSpec := fmt.Sprintf("%s/%s/** => %s/%s", filepath.Join(lopt.literalArtifactsDir, escapedTestName), runSuffix, escapedTestName, runSuffix)
 
-			testL, err := logger.RootLogger(logPath, teeOpt)
+			testL, err := logger.RootLogger(logPath, lopt.tee)
 			if err != nil {
 				return err
 			}
@@ -738,7 +724,7 @@ func (r *testRunner) runWorker(
 			l:                      testL,
 			versionsBinaryOverride: topt.versionsBinaryOverride,
 			skipInit:               topt.skipInit,
-			debug:                  debugMode.IsDebug(),
+			debug:                  clustersOpt.debugMode.IsDebug(),
 			goCoverEnabled:         topt.goCoverEnabled,
 		}
 		github := newGithubIssues(r.config.disableIssue, c, vmCreateOpts)
@@ -852,7 +838,7 @@ func (r *testRunner) runWorker(
 			failureMsg += t.failureMsg()
 		}
 		if c != nil {
-			switch debugMode {
+			switch clustersOpt.debugMode {
 			case DebugKeepAlways, DebugKeepOnFailure:
 				// Save the cluster for future debugging.
 				c.Save(ctx, failureMsg, l)
@@ -876,7 +862,7 @@ func (r *testRunner) runWorker(
 		if t.spec.Benchmark {
 			getPerfArtifacts(ctx, c, t)
 		}
-		if debugMode == DebugKeepAlways {
+		if clustersOpt.debugMode == DebugKeepAlways {
 			c.Save(ctx, "cluster saved since --debug-always set", l)
 			c = nil
 		}
