112661: roachtest: move quota inside test runner r=RaduBerinde a=RaduBerinde

Currently the quota is freed by the cluster when it is destroyed. But
this is not always what we want, so there is fragile logic around
removing the allocation from the cluster before destruction.

This change removes the cluster's responsibility for the allocation. The
main worker loop now manages the quota allocation, which makes things a lot
simpler.

We reorganize the main loop to first look for a test that can reuse the
existing cluster and, if that fails, to destroy the cluster and release the
allocation before looking for a new test. We now log when we acquire,
release, and wait for quota.
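
To make the new contract concrete, here is a minimal, self-contained sketch of
the acquire/release pattern the worker loop now owns. It uses the
`util/quotapool` calls that appear in the diff below (`Release`, `Acquired`,
`String`); the `NewIntPool`/`Acquire` signatures are assumed from that package,
the pool size and per-test CPU counts are hypothetical, and the real loop
additionally tries to reuse the previous cluster before releasing its quota.

```go
package main

import (
	"context"
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/util/quotapool"
)

func main() {
	ctx := context.Background()
	// Stand-in for the test runner's CPU quota pool.
	qp := quotapool.NewIntPool("cpu", 8)

	var alloc *quotapool.IntAlloc
	defer func() {
		// Mirrors the new defer in runWorker: release quota on any exit path.
		if alloc != nil {
			qp.Release(alloc)
		}
	}()

	for _, cpus := range []uint64{4, 2, 8} {
		// Release the previous allocation before blocking for a new one,
		// just as the loop now destroys the old cluster and releases its
		// quota before selecting the next test.
		if alloc != nil {
			fmt.Printf("releasing quota for %s CPUs\n", alloc.String())
			qp.Release(alloc)
			alloc = nil
		}
		var err error
		alloc, err = qp.Acquire(ctx, cpus) // blocks until enough quota is free
		if err != nil {
			return
		}
		fmt.Printf("acquired quota for %d CPUs\n", alloc.Acquired())
		// ... create the cluster and run the selected test here ...
	}
}
```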

Epic: none
Release note: None

113878: cast_test: add refcursor type r=yuzefovich a=yuzefovich

Fixes: #111618.

Release note: None

113925: kvserver: remove rangefeed restarter r=erikgrinaker a=erikgrinaker

In 2e80942, we added paced rangefeed restarts when changing `kv.rangefeed.scheduler.enabled`. This could result in very uneven rangefeed distributions due to the subsequent retries, which overloaded nodes.

We don't have time to properly fix this for 23.2, so we instead remove the restarts and have operators pause/resume the changefeeds when changing the setting.

Resolves #112952.
Epic: none
Release note: None

Co-authored-by: Radu Berinde <radu@cockroachlabs.com>
Co-authored-by: Yahor Yuzefovich <yahor@cockroachlabs.com>
Co-authored-by: Erik Grinaker <grinaker@cockroachlabs.com>
4 people committed Nov 7, 2023
4 parents 85e4a51 + d547c16 + 85556e1 + 4b3ffa5 commit 7a0b7f4
Showing 10 changed files with 223 additions and 656 deletions.
31 changes: 0 additions & 31 deletions pkg/cmd/roachtest/cluster.go
@@ -48,7 +48,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachprod/vm"
"github.com/cockroachdb/cockroach/pkg/util/httputil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/quotapool"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
@@ -691,10 +690,6 @@ type destroyState struct {
// If not set, Destroy() only wipes the cluster.
owned bool

// alloc is set if owned is set. If set, it represents resources in a
// QuotaPool that need to be released when the cluster is destroyed.
alloc *quotapool.IntAlloc

mu struct {
syncutil.Mutex
loggerClosed bool
@@ -733,7 +728,6 @@ type clusterConfig struct {
username string
localCluster bool
useIOBarrier bool
alloc *quotapool.IntAlloc
// Specifies CPU architecture which may require a custom AMI and cockroach binary.
arch vm.CPUArch
// Specifies the OS which may require a custom AMI and cockroach binary.
@@ -853,10 +847,6 @@ func (f *clusterFactory) newCluster(
}
return c, nil, nil
}
// Ensure an allocation is specified.
if cfg.alloc == nil {
return nil, nil, errors.New("no allocation specified; cfg.alloc must be set")
}

if cfg.localCluster {
// Local clusters never expire.
@@ -884,9 +874,6 @@ func (f *clusterFactory) newCluster(
// that each create attempt gets a unique cluster name.
createVMOpts, providerOpts, err := cfg.spec.RoachprodOpts(params)
if err != nil {
// We must release the allocation because cluster creation is not possible at this point.
cfg.alloc.Release()

return nil, nil, err
}
if cfg.spec.Cloud != spec.Local {
@@ -937,7 +924,6 @@ func (f *clusterFactory) newCluster(
os: cfg.os,
destroyState: destroyState{
owned: true,
alloc: cfg.alloc,
},
l: l,
}
@@ -963,14 +949,8 @@
}

l.PrintfCtx(ctx, "cluster creation failed, cleaning up in case it was partially created: %s", err)
// Set the alloc to nil so that Destroy won't release it.
// This is ugly, but given that the alloc is created very far away from this code
// (when selecting the test) it's the best we can do for now.
c.destroyState.alloc = nil
c.Destroy(ctx, closeLogger, l)
if i >= maxAttempts {
// Here we have to release the alloc, as we are giving up.
cfg.alloc.Release()
return nil, nil, err
}
// Try again to create the cluster.
@@ -1074,10 +1054,6 @@ func (c *clusterImpl) StopCockroachGracefullyOnNode(
// Save marks the cluster as "saved" so that it doesn't get destroyed.
func (c *clusterImpl) Save(ctx context.Context, msg string, l *logger.Logger) {
l.PrintfCtx(ctx, "saving cluster %s for debugging (--debug specified)", c)
// TODO(andrei): should we extend the cluster here? For how long?
if c.destroyState.owned && c.destroyState.alloc != nil { // we won't have an alloc for an unowned cluster
c.destroyState.alloc.Freeze()
}
c.r.markClusterAsSaved(c, msg)
c.destroyState.mu.Lock()
c.destroyState.mu.saved = true
@@ -1687,13 +1663,6 @@ func (c *clusterImpl) doDestroy(ctx context.Context, l *logger.Logger) <-chan st
} else {
l.PrintfCtx(ctx, "destroying cluster %s... done", c)
}
if c.destroyState.alloc != nil {
// We should usually have an alloc here, but if we're getting into this
// code path while retrying cluster creation, we don't want the alloc
// to be released (as we're going to retry cluster creation) and it will
// be nil here.
c.destroyState.alloc.Release()
}
} else {
l.PrintfCtx(ctx, "wiping cluster %s", c)
c.status("wiping cluster")
136 changes: 66 additions & 70 deletions pkg/cmd/roachtest/test_runner.go
@@ -418,7 +418,6 @@ func (r *testRunner) allocateCluster(
lopt loggingOpt,
t registry.TestSpec,
arch vm.CPUArch,
alloc *quotapool.IntAlloc,
wStatus *workerStatus,
) (*clusterImpl, *vm.CreateOpts, error) {
wStatus.SetStatus(fmt.Sprintf("creating cluster (arch=%q)", arch))
@@ -472,7 +471,6 @@ func (r *testRunner) allocateCluster(
artifactsDir: lopt.artifactsDir,
username: clustersOpt.user,
localCluster: clustersOpt.typ == localCluster,
alloc: alloc,
arch: arch,
}
return clusterFactory.newCluster(ctx, cfg, wStatus.SetStatus, lopt.tee)
@@ -543,6 +541,17 @@ func (r *testRunner) runWorker(
}
}()

var alloc *quotapool.IntAlloc
defer func() {
// Release any quota, in case we exit from the loop from an error path.
if alloc != nil {
if alloc.Acquired() > 0 {
l.PrintfCtx(ctx, "Releasing quota for %s CPUs", alloc.String())
}
qp.Release(alloc)
}
}()

// Loop until there's no more work in the pool, we get interrupted, or an
// error occurs.
for {
@@ -558,48 +567,64 @@
}

wStatus.SetTest(nil /* test */, testToRunRes{})
testToRun, err := r.getWork(ctx, work, qp, c, interrupt, l)
if err != nil {
// Problem selecting a test, bail out.
return err
}

// If we are reusing a cluster, wipe it.
if testToRun.canReuseCluster {
err := c.WipeForReuse(ctx, l, testToRun.spec.Cluster)
if err != nil {
shout(ctx, l, stdout, "Unable to reuse cluster: %s due to: %s. Will attempt to create a fresh one",
c.Name(), err)
// N.B. we do not count reuse attempt error toward clusterCreateErr.
// Let's attempt to create a fresh cluster.
testToRun.canReuseCluster = false
// We need an allocation quota to start a new cluster; steal it from the
// old cluster before we destroy it (we know the cluster configurations
// will be identical).
testToRun.alloc = c.destroyState.alloc
c.destroyState.alloc = nil
testToRun := testToRunRes{noWork: true}
if c != nil {
// Try to reuse cluster.
testToRun = work.selectTestForCluster(ctx, c.spec, r.cr)
if !testToRun.noWork {
// We found a test to run on this cluster. Wipe the cluster.
if err := c.WipeForReuse(ctx, l, testToRun.spec.Cluster); err != nil {
shout(ctx, l, stdout, "Unable to reuse cluster: %s due to: %s. Will attempt to create a fresh one",
c.Name(), err)
// We do not count reuse attempt error toward clusterCreateErr. Let's
// destroy the cluster and attempt to create a fresh cluster for the
// selected test.
//
// We don't release the quota allocation - the new cluster will be
// identical.
testToRun.canReuseCluster = false
// We use a context that can't be canceled for the Destroy().
c.Destroy(context.Background(), closeLogger, l)
wStatus.SetCluster(nil)
c = nil
}
}
}

// If we are not reusing a cluster (this includes the noWork case), destroy it.
if c != nil && !testToRun.canReuseCluster {
wStatus.SetStatus("destroying cluster")
// We failed to find a test that can take advantage of this cluster. So
// we're going to release it, which will deallocate its resources.
if testToRun.noWork {
l.PrintfCtx(ctx, "No more tests. Destroying %s.", c)
} else {
// We could not find a test that can reuse the cluster. Destroy the cluster
// and search for a new test.
if testToRun.noWork {
if c != nil {
wStatus.SetStatus("destroying cluster")
// We failed to find a test that can take advantage of this cluster. So
// we're going to release it, which will deallocate its resources.
l.PrintfCtx(ctx, "No tests that can reuse cluster %s found. Destroying.", c)
// We use a context that can't be canceled for the Destroy().
c.Destroy(context.Background(), closeLogger, l)
wStatus.SetCluster(nil)
c = nil
}
// We use a context that can't be canceled for the Destroy().
c.Destroy(context.Background(), closeLogger, l)
wStatus.SetCluster(nil)
c = nil
}

if testToRun.noWork {
shout(ctx, l, stdout, "no work remaining; runWorker is bailing out...")
return nil
// At this point, any previous cluster was destroyed; release any
// associated quota allocation.
if alloc != nil {
if alloc.Acquired() > 0 {
l.PrintfCtx(ctx, "Releasing quota for %s CPUs", alloc.String())
}
qp.Release(alloc)
alloc = nil
}

var err error
testToRun, alloc, err = work.selectTest(ctx, qp, l)
if err != nil {
return err
}
if testToRun.noWork {
shout(ctx, l, stdout, "No work remaining; runWorker is bailing out...")
return nil
}
}

// From this point onward, c != nil iff we are reusing the cluster.
@@ -628,7 +653,7 @@ func (r *testRunner) runWorker(
// TODO(radu): the arch is not guaranteed and another arch can be selected
// (in RoachprodOpts). All the code below using arch is incorrect in this
// case.
if err = VerifyLibraries(testToRun.spec.NativeLibs, arch); err != nil {
if err := VerifyLibraries(testToRun.spec.NativeLibs, arch); err != nil {
shout(ctx, l, stdout, "Library verification failed: %s", err)
return err
}
@@ -642,7 +667,7 @@
wStatus.SetTest(nil /* test */, testToRun)
c, vmCreateOpts, clusterCreateErr = r.allocateCluster(
ctx, clusterFactory, clustersOpt, lopt,
testToRun.spec, arch, testToRun.alloc, wStatus)
testToRun.spec, arch, wStatus)
if clusterCreateErr != nil {
atomic.AddInt32(&r.numClusterErrs, 1)
shout(ctx, l, stdout, "Unable to create (or reuse) cluster for test %s due to: %s.",
@@ -827,6 +852,8 @@ func (r *testRunner) runWorker(
getPerfArtifacts(ctx, c, t)
}
if clustersOpt.debugMode == DebugKeepAlways {
alloc.Freeze()
alloc = nil
c.Save(ctx, "cluster saved since --debug-always set", l)
c = nil
}
@@ -1418,37 +1445,6 @@ func (r *testRunner) generateReport() string {
return msg
}

// getWork selects the next test to run and creates a suitable cluster for it if
// need be. If a new cluster needs to be created, the method blocks until there
// are enough resources available to run it.
// getWork takes in a cluster; if not nil, tests that can reuse it are
// preferred. If a test that can reuse it is not found (or if there's no more
// work), the cluster is destroyed (and so its resources are released).
func (r *testRunner) getWork(
ctx context.Context,
work *workPool,
qp *quotapool.IntPool,
c *clusterImpl,
interrupt <-chan struct{},
l *logger.Logger,
) (testToRunRes, error) {

select {
case <-interrupt:
return testToRunRes{}, fmt.Errorf("interrupted")
default:
}

testToRun, err := work.getTestToRun(ctx, c, qp, r.cr)
if err != nil {
return testToRunRes{}, err
}
if !testToRun.noWork {
l.PrintfCtx(ctx, "Selected test: %s run: %d.", testToRun.spec.Name, testToRun.runNum)
}
return testToRun, nil
}

// addWorker updates the bookkeeping for one more worker.
func (r *testRunner) addWorker(ctx context.Context, name string) *workerStatus {
r.workersMu.Lock()