diff --git a/go.mod b/go.mod index 4f55a76fc666..84ca647c1eb5 100644 --- a/go.mod +++ b/go.mod @@ -95,7 +95,6 @@ require ( github.com/andygrunwald/go-jira v1.14.0 github.com/apache/arrow/go/arrow v0.0.0-20200923215132-ac86123a3f01 github.com/apache/arrow/go/v11 v11.0.0 - github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.18.2 github.com/axiomhq/hyperloglog v0.0.0-20181223111420-4b99d0c2c99e github.com/bazelbuild/rules_go v0.26.0 diff --git a/go.sum b/go.sum index 8228d27d3d23..a1a465d1b713 100644 --- a/go.sum +++ b/go.sum @@ -296,7 +296,6 @@ github.com/apache/thrift v0.15.0/go.mod h1:PHK3hniurgQaNMZYaCLEqXKsYK8upmhPbmdP2 github.com/apache/thrift v0.16.0 h1:qEy6UW60iVOlUy+b9ZR0d5WzUWYGOo4HfopoyBaNmoY= github.com/apache/thrift v0.16.0/go.mod h1:PHK3hniurgQaNMZYaCLEqXKsYK8upmhPbmdP2FXSqgU= github.com/araddon/dateparse v0.0.0-20210429162001-6b43995a97de/go.mod h1:DCaWoUhZrYW9p1lxo/cm8EmUOOzAPSEZNGF2DK1dJgw= -github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e h1:QEF07wC0T1rKkctt1RINW/+RMTVmiwxETico2l3gxJA= github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY= diff --git a/pkg/ccl/changefeedccl/scheduled_changefeed_test.go b/pkg/ccl/changefeedccl/scheduled_changefeed_test.go index 0d74975ce95d..86e346752590 100644 --- a/pkg/ccl/changefeedccl/scheduled_changefeed_test.go +++ b/pkg/ccl/changefeedccl/scheduled_changefeed_test.go @@ -343,7 +343,7 @@ func TestCreateChangefeedScheduleIfNotExists(t *testing.T) { th.sqlDB.Exec(t, "CREATE TABLE t1 (a INT)") const scheduleLabel = "foo" - const createQuery = "CREATE SCHEDULE IF NOT EXISTS '%s' FOR CHANGEFEED TABLE t1 INTO 's3://bucket?AUTH=implicit' WITH initial_scan = 'only' RECURRING '@daily'" + const createQuery = "CREATE SCHEDULE IF NOT EXISTS '%s' FOR CHANGEFEED TABLE t1 INTO 'null://' WITH initial_scan = 'only' RECURRING '@daily'" th.sqlDB.Exec(t, fmt.Sprintf(createQuery, scheduleLabel)) @@ -387,7 +387,7 @@ func TestCreateChangefeedScheduleInExplicitTxnRollback(t *testing.T) { require.NoError(t, res.Err()) th.sqlDB.Exec(t, "BEGIN;") - th.sqlDB.Exec(t, "CREATE SCHEDULE FOR CHANGEFEED TABLE t1 INTO 's3://bucket?AUTH=implicit' WITH initial_scan = 'only' RECURRING '@daily';") + th.sqlDB.Exec(t, "CREATE SCHEDULE FOR CHANGEFEED TABLE t1 INTO 'null://' WITH initial_scan = 'only' RECURRING '@daily';") th.sqlDB.Exec(t, "ROLLBACK;") res = th.sqlDB.Query(t, "SELECT id FROM [SHOW SCHEDULES FOR CHANGEFEED]") diff --git a/pkg/cmd/roachtest/BUILD.bazel b/pkg/cmd/roachtest/BUILD.bazel index 4ecc4165fc4a..26894bde884d 100644 --- a/pkg/cmd/roachtest/BUILD.bazel +++ b/pkg/cmd/roachtest/BUILD.bazel @@ -46,7 +46,6 @@ go_library( "//pkg/util/syncutil", "//pkg/util/timeutil", "//pkg/util/version", - "@com_github_armon_circbuf//:circbuf", "@com_github_cockroachdb_errors//:errors", "@com_github_cockroachdb_logtags//:logtags", "@com_github_lib_pq//:pq", diff --git a/pkg/cmd/roachtest/cluster.go b/pkg/cmd/roachtest/cluster.go index 11199ede3cac..a791d5818683 100644 --- a/pkg/cmd/roachtest/cluster.go +++ b/pkg/cmd/roachtest/cluster.go @@ -32,7 +32,6 @@ import ( "sync/atomic" "time" - "github.com/armon/circbuf" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster" 
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil" @@ -434,161 +433,6 @@ func initBinariesAndLibraries() { } } -// execCmd is like execCmdEx, but doesn't return the command's output. -func execCmd( - ctx context.Context, l *logger.Logger, clusterName string, secure bool, args ...string, -) error { - return execCmdEx(ctx, l, clusterName, secure, args...).err -} - -type cmdRes struct { - err error - // stdout and stderr are the commands output. Note that this is truncated and - // only a tail is returned. - stdout, stderr string -} - -// execCmdEx runs a command and returns its error and output. -// -// Note that the output is truncated; only a tail is returned. -// Also note that if the command exits with an error code, its output is also -// included in cmdRes.err. -func execCmdEx( - ctx context.Context, l *logger.Logger, clusterName string, secure bool, args ...string, -) cmdRes { - var cancel func() - ctx, cancel = context.WithCancel(ctx) - defer cancel() - - l.Printf("> %s\n", strings.Join(args, " ")) - var roachprodRunStdout, roachprodRunStderr io.Writer - - debugStdoutBuffer, _ := circbuf.NewBuffer(4096) - debugStderrBuffer, _ := circbuf.NewBuffer(4096) - - // Do a dance around https://github.com/golang/go/issues/23019. - // When the command we run launches a subprocess, that subprocess receives - // a copy of our Command's Stdout/Stderr file descriptor, which effectively - // means that the file descriptors close only when that subcommand returns. - // However, proactively killing the subcommand is not really possible - we - // will only manage to kill the parent process that we launched directly. - // In practice this means that if we try to react to context cancellation, - // the pipes we read the output from will wait for the *subprocess* to - // terminate, leaving us hanging, potentially indefinitely. - // To work around it, use pipes and set a read deadline on our (read) end of - // the pipes when we detect a context cancellation. - var closePipes func(ctx context.Context) - var wg sync.WaitGroup - { - - var wOut, wErr, rOut, rErr *os.File - var cwOnce sync.Once - closePipes = func(ctx context.Context) { - // Idempotently closes the writing end of the pipes. This is called either - // when the process returns or when it was killed due to context - // cancellation. In the former case, close the writing ends of the pipe - // so that the copy goroutines started below return (without missing any - // output). In the context cancellation case, we set a deadline to force - // the goroutines to quit eagerly. This is important since the command - // may have duplicated wOut and wErr to its possible subprocesses, which - // may continue to run for long periods of time, and would otherwise - // block this command. In theory this is possible also when the command - // returns on its own accord, so we set a (more lenient) deadline in the - // first case as well. 
- // - // NB: there's also the option (at least on *nix) to use a process group, - // but it doesn't look portable: - // https://medium.com/@felixge/killing-a-child-process-and-all-of-its-children-in-go-54079af94773 - cwOnce.Do(func() { - if wOut != nil { - _ = wOut.Close() - } - if wErr != nil { - _ = wErr.Close() - } - dur := 10 * time.Second // wait up to 10s for subprocesses - if ctx.Err() != nil { - dur = 10 * time.Millisecond - } - deadline := timeutil.Now().Add(dur) - if rOut != nil { - _ = rOut.SetReadDeadline(deadline) - } - if rErr != nil { - _ = rErr.SetReadDeadline(deadline) - } - }) - } - defer closePipes(ctx) - - var err error - rOut, wOut, err = os.Pipe() - if err != nil { - return cmdRes{err: err} - } - - rErr, wErr, err = os.Pipe() - if err != nil { - return cmdRes{err: err} - } - roachprodRunStdout = wOut - wg.Add(1) - go func() { - defer wg.Done() - _, _ = io.Copy(l.Stdout, io.TeeReader(rOut, debugStdoutBuffer)) - }() - - if l.Stderr == l.Stdout { - // If l.Stderr == l.Stdout, we use only one pipe to avoid - // duplicating everything. - roachprodRunStderr = wOut - } else { - roachprodRunStderr = wErr - wg.Add(1) - go func() { - defer wg.Done() - _, _ = io.Copy(l.Stderr, io.TeeReader(rErr, debugStderrBuffer)) - }() - } - } - - err := roachprod.Run(ctx, l, clusterName, "" /* SSHOptions */, "" /* processTag */, secure, roachprodRunStdout, roachprodRunStderr, args) - closePipes(ctx) - wg.Wait() - - stdoutString := debugStdoutBuffer.String() - if debugStdoutBuffer.TotalWritten() > debugStdoutBuffer.Size() { - stdoutString = "<... some data truncated by circular buffer; go to artifacts for details ...>\n" + stdoutString - } - stderrString := debugStderrBuffer.String() - if debugStderrBuffer.TotalWritten() > debugStderrBuffer.Size() { - stderrString = "<... some data truncated by circular buffer; go to artifacts for details ...>\n" + stderrString - } - - if err != nil { - // Context errors opaquely appear as "signal killed" when manifested. - // We surface this error explicitly. - if ctx.Err() != nil { - err = errors.CombineErrors(ctx.Err(), err) - } - - if err != nil { - err = &cluster.WithCommandDetails{ - Wrapped: err, - Cmd: strings.Join(args, " "), - Stderr: stderrString, - Stdout: stdoutString, - } - } - } - - return cmdRes{ - err: err, - stdout: stdoutString, - stderr: stderrString, - } -} - type clusterRegistry struct { mu struct { syncutil.Mutex @@ -2300,41 +2144,31 @@ func (c *clusterImpl) Run(ctx context.Context, node option.NodeListOption, args // will be redirected to a file which is logged via the cluster-wide logger in // case of an error. Logs will sort chronologically. Failing invocations will // have an additional marker file with a `.failed` extension instead of `.log`. -func (c *clusterImpl) RunE(ctx context.Context, node option.NodeListOption, args ...string) error { +func (c *clusterImpl) RunE(ctx context.Context, nodes option.NodeListOption, args ...string) error { if len(args) == 0 { return errors.New("No command passed") } - l, logFile, err := c.loggerForCmd(node, args...) + l, logFile, err := c.loggerForCmd(nodes, args...) if err != nil { return err } + defer l.Close() - if err := errors.Wrap(ctx.Err(), "cluster.RunE"); err != nil { - return err - } - err = execCmd(ctx, l, c.MakeNodes(node), c.IsSecure(), args...) 
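For reference, the execCmdEx block removed above worked around golang.org/issue/23019 with pipes plus read deadlines. A minimal, self-contained sketch of that technique (illustrative command and names, not the roachtest code itself): the parent copies from its end of an os.Pipe in a goroutine, and once the command returns it closes the write end and puts a deadline on the read end so the copier cannot hang on descriptors inherited by grandchild processes.

package main

import (
	"context"
	"io"
	"os"
	"os/exec"
	"time"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	// The child writes to w; we read from r. The backgrounded sleep inherits
	// a duplicate of w, so r would never see EOF on its own.
	r, w, err := os.Pipe()
	if err != nil {
		panic(err)
	}
	cmd := exec.CommandContext(ctx, "bash", "-c", "echo hi; sleep 1000 &")
	cmd.Stdout = w
	cmd.Stderr = w

	done := make(chan struct{})
	go func() {
		defer close(done)
		_, _ = io.Copy(os.Stdout, r) // returns on EOF or read deadline
	}()

	_ = cmd.Run()

	// Close our write end and force the reader out with a deadline so the
	// copier does not wait on the grandchild's duplicated descriptor.
	_ = w.Close()
	_ = r.SetReadDeadline(time.Now().Add(10 * time.Millisecond))
	<-done
}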
- - l.Printf("> result: %+v", err) - if err := ctx.Err(); err != nil { - l.Printf("(note: incoming context was canceled: %s", err) - } - // We need to protect ourselves from a race where cluster logger is - // concurrently closed before child logger is created. In that case child - // logger will have no log file but would write to stderr instead and we can't - // create a meaningful ".failed" file for it. - physicalFileName := "" - if l.File != nil { - physicalFileName = l.File.Name() - } - l.Close() - if err != nil && len(physicalFileName) > 0 { - failedPhysicalFileName := strings.TrimSuffix(physicalFileName, ".log") + ".failed" - if failedFile, err2 := os.Create(failedPhysicalFileName); err2 != nil { - failedFile.Close() + cmd := strings.Join(args, " ") + c.t.L().Printf("running cmd `%s` on nodes [%v]; details in %s.log", roachprod.TruncateString(cmd, 30), nodes, logFile) + l.Printf("> %s", cmd) + if err := roachprod.Run(ctx, l, c.MakeNodes(nodes), "", "", c.IsSecure(), l.Stdout, l.Stderr, args); err != nil { + if err := ctx.Err(); err != nil { + l.Printf("(note: incoming context was canceled: %s)", err) + return err } + + l.Printf("> result: %s", err) + createFailedFile(l.File.Name()) + return errors.Wrapf(err, "full command output in %s.log", logFile) } - err = errors.Wrapf(err, "output in %s", logFile) - return err + l.Printf("> result: ") + return nil } // RunWithDetailsSingleNode is just like RunWithDetails but used when 1) operating @@ -2348,10 +2182,7 @@ func (c *clusterImpl) RunWithDetailsSingleNode( return install.RunResultDetails{}, errors.Newf("RunWithDetailsSingleNode received %d nodes. Use RunWithDetails if you need to run on multiple nodes.", len(nodes)) } results, err := c.RunWithDetails(ctx, testLogger, nodes, args...) - if err != nil { - return install.RunResultDetails{}, err - } - return results[0], results[0].Err + return results[0], errors.CombineErrors(err, results[0].Err) } // RunWithDetails runs a command on the specified nodes, returning the results @@ -2364,49 +2195,55 @@ func (c *clusterImpl) RunWithDetails( if len(args) == 0 { return nil, errors.New("No command passed") } - l, _, err := c.loggerForCmd(nodes, args...) + l, logFile, err := c.loggerForCmd(nodes, args...) if err != nil { return nil, err } - physicalFileName := "" - if l.File != nil { - physicalFileName = l.File.Name() - } + defer l.Close() - if err := ctx.Err(); err != nil { - l.Printf("(note: incoming context was canceled: %s", err) - return nil, err - } + cmd := strings.Join(args, " ") - l.Printf("running %s on nodes: %v", strings.Join(args, " "), nodes) + // This could probably be removed in favour of c.t.L() but it's used extensively in roachtests. 
if testLogger != nil { - testLogger.Printf("> %s\n", strings.Join(args, " ")) + testLogger.Printf("running cmd `%s` on nodes [%v]; details in %s.log", roachprod.TruncateString(cmd, 30), nodes, logFile) } + l.Printf("> %s", cmd) results, err := roachprod.RunWithDetails(ctx, l, c.MakeNodes(nodes), "" /* SSHOptions */, "" /* processTag */, c.IsSecure(), args) - if err != nil && len(physicalFileName) > 0 { - l.Printf("> result: %+v", err) - createFailedFile(physicalFileName) - return results, err + + logFileFull := l.File.Name() + if err != nil { + if ctxErr := ctx.Err(); ctxErr != nil { + l.Printf("(note: incoming context was canceled: %s)", err) + return nil, ctxErr + } + + l.Printf("> result: %s", err) + createFailedFile(logFileFull) + return nil, err } + hasError := false for _, result := range results { if result.Err != nil { - err = result.Err - l.Printf("> Error for Node %d: %+v", int(result.Node), result.Err) + hasError = true + l.Printf("> result: Error for Node %d: %+v", int(result.Node), result.Err) } } - if err != nil { - createFailedFile(physicalFileName) + if hasError { + createFailedFile(logFileFull) + } else { + l.Printf("> result: ") } - l.Close() return results, nil } -func createFailedFile(logFileName string) { - failedPhysicalFileName := strings.TrimSuffix(logFileName, ".log") + ".failed" - if failedFile, err2 := os.Create(failedPhysicalFileName); err2 != nil { - failedFile.Close() +func createFailedFile(logFile string) { + if logFile == "" { + return + } + if file, err := os.Create(strings.TrimSuffix(logFile, ".log") + ".failed"); err == nil { + file.Close() } } @@ -2495,7 +2332,7 @@ func (c *clusterImpl) ExternalPGUrl( return c.pgURLErr(ctx, l, node, true, tenant) } -func addrToAdminUIAddr(c *clusterImpl, addr string) (string, error) { +func addrToAdminUIAddr(addr string) (string, error) { host, port, err := net.SplitHostPort(addr) if err != nil { return "", err @@ -2547,7 +2384,7 @@ func (c *clusterImpl) InternalAdminUIAddr( return nil, err } for _, u := range urls { - adminUIAddr, err := addrToAdminUIAddr(c, u) + adminUIAddr, err := addrToAdminUIAddr(u) if err != nil { return nil, err } @@ -2567,7 +2404,7 @@ func (c *clusterImpl) ExternalAdminUIAddr( return nil, err } for _, u := range externalAddrs { - adminUIAddr, err := addrToAdminUIAddr(c, u) + adminUIAddr, err := addrToAdminUIAddr(u) if err != nil { return nil, err } diff --git a/pkg/cmd/roachtest/test_runner.go b/pkg/cmd/roachtest/test_runner.go index 05f27f15997a..faa032a42447 100644 --- a/pkg/cmd/roachtest/test_runner.go +++ b/pkg/cmd/roachtest/test_runner.go @@ -1111,16 +1111,19 @@ func (r *testRunner) runTest( t.ReplaceL(logger) } - // Awkward file name to keep it close to test.log. - l.Printf("running post test assertions (test-post-assertions.log)") - replaceLogger("test-post-assertions") - - // We still want to run the post-test assertions even if the test timed out as it - // might provide useful information about the health of the nodes. Any assertion failures - // will will be recorded against, and eventually fail, the test. - // TODO (miral): consider not running these assertions if the test has already failed - if err := r.postTestAssertions(ctx, t, c, 10*time.Minute); err != nil { - l.Printf("error during post test assertions: %v; see test-post-assertions.log for details", err) + if !t.Failed() { + // Awkward file name to keep it close to test.log. 
+ l.Printf("running post test assertions (test-post-assertions.log)") + replaceLogger("test-post-assertions") + + // We still want to run the post-test assertions even if the test timed out as it + // might provide useful information about the health of the nodes. Any assertion failures + // will will be recorded against, and eventually fail, the test. + if err := r.postTestAssertions(ctx, t, c, 10*time.Minute); err != nil { + l.Printf("error during post test assertions: %v; see test-post-assertions.log for details", err) + } + } else { + l.Printf("skipping post test assertions as test failed") } // From now on, all logging goes to test-teardown.log to give a clear separation between diff --git a/pkg/kv/kvserver/asim/gen/generator.go b/pkg/kv/kvserver/asim/gen/generator.go index f897899c6c29..aa3889d49b45 100644 --- a/pkg/kv/kvserver/asim/gen/generator.go +++ b/pkg/kv/kvserver/asim/gen/generator.go @@ -192,12 +192,15 @@ type PlacementType int const ( Even PlacementType = iota Skewed + Random + WeightedRandom ) // BaseRanges provide fundamental range functionality and are embedded in // specialized range structs. These structs implement the RangeGen interface // which is then utilized to generate allocator simulation. Key structs that -// embed BaseRanges are: BasicRanges. +// embed BaseRanges are: BasicRanges, RandomizedBasicRanges, and +// WeightedRandomizedBasicRanges. type BaseRanges struct { Ranges int KeySpace int @@ -205,21 +208,27 @@ type BaseRanges struct { Bytes int64 } -// getRangesInfo generates and distributes ranges across stores based on +// GetRangesInfo generates and distributes ranges across stores based on // PlacementType while using other BaseRanges fields for range configuration. -func (b BaseRanges) getRangesInfo(pType PlacementType, numOfStores int) state.RangesInfo { +func (b BaseRanges) GetRangesInfo( + pType PlacementType, numOfStores int, randSource *rand.Rand, weightedRandom []float64, +) state.RangesInfo { switch pType { case Even: return state.RangesInfoEvenDistribution(numOfStores, b.Ranges, b.KeySpace, b.ReplicationFactor, b.Bytes) case Skewed: return state.RangesInfoSkewedDistribution(numOfStores, b.Ranges, b.KeySpace, b.ReplicationFactor, b.Bytes) + case Random: + return state.RangesInfoRandDistribution(randSource, numOfStores, b.Ranges, b.KeySpace, b.ReplicationFactor, b.Bytes) + case WeightedRandom: + return state.RangesInfoWeightedRandDistribution(randSource, weightedRandom, b.Ranges, b.KeySpace, b.ReplicationFactor, b.Bytes) default: panic(fmt.Sprintf("unexpected range placement type %v", pType)) } } -// loadRangeInfo loads the given state with the specified rangesInfo. -func (b BaseRanges) loadRangeInfo(s state.State, rangesInfo state.RangesInfo) { +// LoadRangeInfo loads the given state with the specified rangesInfo. 
+func (b BaseRanges) LoadRangeInfo(s state.State, rangesInfo state.RangesInfo) { for _, rangeInfo := range rangesInfo { rangeInfo.Size = b.Bytes } @@ -239,8 +248,11 @@ type BasicRanges struct { func (br BasicRanges) Generate( seed int64, settings *config.SimulationSettings, s state.State, ) state.State { - rangesInfo := br.getRangesInfo(br.PlacementType, len(s.Stores())) - br.loadRangeInfo(s, rangesInfo) + if br.PlacementType == Random || br.PlacementType == WeightedRandom { + panic("BasicRanges generate only uniform or skewed distributions") + } + rangesInfo := br.GetRangesInfo(br.PlacementType, len(s.Stores()), nil, []float64{}) + br.LoadRangeInfo(s, rangesInfo) return s } diff --git a/pkg/kv/kvserver/asim/state/new_state.go b/pkg/kv/kvserver/asim/state/new_state.go index 4d0c9c870594..07de25c07a73 100644 --- a/pkg/kv/kvserver/asim/state/new_state.go +++ b/pkg/kv/kvserver/asim/state/new_state.go @@ -12,6 +12,7 @@ package state import ( "fmt" + "math/rand" "sort" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/config" @@ -65,6 +66,79 @@ func exactDistribution(counts []int) []float64 { return distribution } +// weighted struct handles weighted random index selection from an input array, +// weightedStores. +// +// For example, consider input weightedStores = [0.1, 0.2, 0.7]. +// - newWeighted constructs cumulative weighs, creating cumulativeWeighted [0.1, +// 0.3, 1.0]. +// - rand function then randomly selects a number n within the range of [0.0, +// 1.0) and finds which bucket ([0.0, 0.1], (0.1, 0.3], (0.3, 1.0]) n falls +// under. It finds the smallest index within cumulativeWeights that >= n. Thus, +// indices with greater weights have a higher probability of being selected as +// they cover larger cumulative weights range. For instance, if it selects 0.5, +// Rand would return index 2 since 0.7 is the smallest index that is >= 0.5. +type weighted struct { + cumulativeWeights []float64 +} + +// newWeighted constructs cumulative weights that are used later to select a +// single random index from weightedStores based on the associated weights. +func newWeighted(weightedStores []float64) weighted { + cumulativeWeights := make([]float64, len(weightedStores)) + prefixSumWeight := float64(0) + for i, item := range weightedStores { + prefixSumWeight += item + cumulativeWeights[i] = prefixSumWeight + } + if cumulativeWeights[len(weightedStores)-1] != float64(1) { + panic(fmt.Sprintf("total cumulative weights for all stores should sum up to one but got %.2f\n", + cumulativeWeights[len(weightedStores)-1])) + } + return weighted{cumulativeWeights: cumulativeWeights} +} + +// rand randomly picks an index from weightedStores based on the associated +// weights. +func (w weighted) rand(randSource *rand.Rand) int { + r := randSource.Float64() + index := sort.Search(len(w.cumulativeWeights), func(i int) bool { return w.cumulativeWeights[i] >= r }) + return index +} + +// weightedRandDistribution generates a weighted random distribution across +// stores. It achieves this by randomly selecting an index from weightedStores +// 10 times while considering the weights, and repeating this process ten times. +// The output is a weighted random distribution reflecting the selections made. 
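The weighted helper above is explained with the [0.1, 0.2, 0.7] walkthrough; the following standalone sketch (a hypothetical pickWeighted helper, not the asim code) shows the same cumulative-weights-plus-sort.Search lookup, scaled so the weights need not sum to exactly one:

package main

import (
	"fmt"
	"math/rand"
	"sort"
)

// pickWeighted builds cumulative weights and binary-searches for the first
// bucket whose upper bound covers the random draw, mirroring the lookup
// described in the weighted struct comment.
func pickWeighted(rng *rand.Rand, weights []float64) int {
	cum := make([]float64, len(weights))
	sum := 0.0
	for i, w := range weights {
		sum += w
		cum[i] = sum
	}
	r := rng.Float64() * sum // scale in case weights don't sum to exactly 1
	return sort.Search(len(cum), func(i int) bool { return cum[i] >= r })
}

func main() {
	rng := rand.New(rand.NewSource(42))
	counts := make([]int, 3)
	for i := 0; i < 10000; i++ {
		counts[pickWeighted(rng, []float64{0.1, 0.2, 0.7})]++
	}
	fmt.Println(counts) // roughly 1000 / 2000 / 7000
}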
+func weightedRandDistribution(randSource *rand.Rand, weightedStores []float64) []float64 { + w := newWeighted(weightedStores) + numSamples := 10 + votes := make([]int, len(weightedStores)) + for i := 0; i < numSamples; i++ { + index := w.rand(randSource) + votes[index] += 1 + } + return exactDistribution(votes) +} + +// randDistribution generates a random distribution across stores. It achieves +// this by creating an array of size n, selecting random numbers from [0, 10) +// for each index, and returning the exact distribution of this result. +func randDistribution(randSource *rand.Rand, n int) []float64 { + total := float64(0) + distribution := make([]float64, n) + for i := 0; i < n; i++ { + num := float64(randSource.Intn(10)) + distribution[i] = num + total += num + } + + for i := 0; i < n; i++ { + distribution[i] = distribution[i] / total + } + return distribution +} + // RangesInfoWithDistribution returns a RangesInfo, where the stores given are // initialized with the specified % of the replicas. This is done on a best // effort basis, given the replication factor. It may be impossible to satisfy @@ -250,6 +324,61 @@ func RangesInfoEvenDistribution( int64(MinKey), int64(keyspace), rangeSize) } +// RangesInfoWeightedRandDistribution returns a RangesInfo, where ranges are +// generated with a weighted random distribution across stores. +func RangesInfoWeightedRandDistribution( + randSource *rand.Rand, + weightedStores []float64, + ranges int, + keyspace int, + replicationFactor int, + rangeSize int64, +) RangesInfo { + if randSource == nil || len(weightedStores) == 0 { + panic("randSource cannot be nil and weightedStores must be non-empty in order to generate weighted random range info") + } + distribution := weightedRandDistribution(randSource, weightedStores) + storeList := makeStoreList(len(weightedStores)) + spanConfig := defaultSpanConfig + spanConfig.NumReplicas = int32(replicationFactor) + spanConfig.NumVoters = int32(replicationFactor) + return RangesInfoWithDistribution( + storeList, + distribution, + distribution, + ranges, + spanConfig, + int64(MinKey), + int64(keyspace), + rangeSize, /* rangeSize */ + ) +} + +// RangesInfoRandDistribution returns a RangesInfo, where ranges are generated +// with a random distribution across stores. +func RangesInfoRandDistribution( + randSource *rand.Rand, + stores int, + ranges int, + keyspace int, + replicationFactor int, + rangeSize int64, +) RangesInfo { + if randSource == nil { + panic("randSource cannot be nil in order to generate random range info") + } + distribution := randDistribution(randSource, stores) + storeList := makeStoreList(stores) + + spanConfig := defaultSpanConfig + spanConfig.NumReplicas = int32(replicationFactor) + spanConfig.NumVoters = int32(replicationFactor) + + return RangesInfoWithDistribution( + storeList, distribution, distribution, ranges, spanConfig, + int64(MinKey), int64(keyspace), rangeSize) +} + // NewStateWithDistribution returns a State where the stores given are // initialized with the specified % of the replicas. This is done on a best // effort basis, given the replication factor. It may be impossible to satisfy @@ -320,3 +449,35 @@ func NewStateSkewedDistribution( rangesInfo := RangesInfoSkewedDistribution(stores, ranges, keyspace, replicationFactor, 0 /* rangeSize */) return LoadConfig(clusterInfo, rangesInfo, settings) } + +// NewStateRandDistribution returns a new State where the replica count per +// store is randomized. 
+func NewStateRandDistribution( + seed int64, + stores int, + ranges int, + keyspace int, + replicationFactor int, + settings *config.SimulationSettings, +) State { + randSource := rand.New(rand.NewSource(seed)) + clusterInfo := ClusterInfoWithStoreCount(stores, 1 /* storesPerNode */) + rangesInfo := RangesInfoRandDistribution(randSource, stores, ranges, keyspace, replicationFactor, 0 /* rangeSize */) + return LoadConfig(clusterInfo, rangesInfo, settings) +} + +// NewStateWeightedRandDistribution returns a new State where the replica count +// per store is weighted randomized based on weightedStores. +func NewStateWeightedRandDistribution( + seed int64, + weightedStores []float64, + ranges int, + keyspace int, + replicationFactor int, + settings *config.SimulationSettings, +) State { + randSource := rand.New(rand.NewSource(seed)) + clusterInfo := ClusterInfoWithStoreCount(len(weightedStores), 1 /* storesPerNode */) + rangesInfo := RangesInfoWeightedRandDistribution(randSource, weightedStores, ranges, keyspace, replicationFactor, 0 /* rangeSize */) + return LoadConfig(clusterInfo, rangesInfo, settings) +} diff --git a/pkg/kv/kvserver/asim/state/state_test.go b/pkg/kv/kvserver/asim/state/state_test.go index 0f8208f2938d..b83012f8d4c9 100644 --- a/pkg/kv/kvserver/asim/state/state_test.go +++ b/pkg/kv/kvserver/asim/state/state_test.go @@ -384,13 +384,20 @@ func TestOrderedStateLists(t *testing.T) { // Test a skewed distribution with 100 stores, 10k ranges and 1m keyspace. s = NewStateSkewedDistribution(100, 10000, 3, 1000000, settings) assertListsOrdered(s) + + const defaultSeed = 42 + s = NewStateRandDistribution(defaultSeed, 7, 1400, 10000, 3, settings) + assertListsOrdered(s) + + s = NewStateWeightedRandDistribution(defaultSeed, []float64{0.0, 0.1, 0.3, 0.6}, 1400, 10000, 3, settings) + assertListsOrdered(s) } // TestNewStateDeterministic asserts that the state returned from the new state // utility functions is deterministic. func TestNewStateDeterministic(t *testing.T) { settings := config.DefaultSimulationSettings() - + const defaultSeed = 42 testCases := []struct { desc string newStateFn func() State @@ -409,6 +416,18 @@ func TestNewStateDeterministic(t *testing.T) { return NewStateWithDistribution([]float64{0.2, 0.2, 0.2, 0.2, 0.2}, 5, 3, 10000, settings) }, }, + { + desc: "rand distribution ", + newStateFn: func() State { + return NewStateRandDistribution(defaultSeed, 7, 1400, 10000, 3, settings) + }, + }, + { + desc: "weighted rand distribution ", + newStateFn: func() State { + return NewStateWeightedRandDistribution(defaultSeed, []float64{0.0, 0.1, 0.3, 0.6}, 1400, 10000, 3, settings) + }, + }, } for _, tc := range testCases { @@ -421,6 +440,35 @@ func TestNewStateDeterministic(t *testing.T) { } } +// TestRandDistribution asserts that the distribution returned from +// randDistribution and weightedRandDistribution sum up to 1. 
+func TestRandDistribution(t *testing.T) { + const defaultSeed = 42 + randSource := rand.New(rand.NewSource(defaultSeed)) + testCases := []struct { + desc string + distribution []float64 + }{ + { + desc: "random distribution", + distribution: randDistribution(randSource, 7), + }, + { + desc: "weighted random distribution", + distribution: weightedRandDistribution(randSource, []float64{0.0, 0.1, 0.3, 0.6}), + }, + } + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + total := float64(0) + for i := 0; i < len(tc.distribution); i++ { + total += tc.distribution[i] + } + require.Equal(t, float64(1), total) + }) + } +} + // TestSplitRangeDeterministic asserts that range splits are deterministic. func TestSplitRangeDeterministic(t *testing.T) { settings := config.DefaultSimulationSettings() diff --git a/pkg/kv/kvserver/asim/tests/default_settings.go b/pkg/kv/kvserver/asim/tests/default_settings.go index 5bfe875ec4da..a8fce04d8b83 100644 --- a/pkg/kv/kvserver/asim/tests/default_settings.go +++ b/pkg/kv/kvserver/asim/tests/default_settings.go @@ -64,7 +64,7 @@ func defaultLoadGen() gen.BasicLoad { const ( defaultRanges = 1 defaultPlacementType = gen.Even - defaultReplicationFactor = 1 + defaultReplicationFactor = 3 defaultBytes = 0 ) @@ -108,3 +108,24 @@ func defaultPlotSettings() plotSettings { width: defaultWidth, } } + +type rangeGenSettings struct { + rangeKeyGenType generatorType + keySpaceGenType generatorType + weightedRand []float64 +} + +const ( + defaultRangeKeyGenType = uniformGenerator + defaultKeySpaceGenType = uniformGenerator +) + +var defaultWeightedRand []float64 + +func defaultRangeGenSettings() rangeGenSettings { + return rangeGenSettings{ + rangeKeyGenType: defaultRangeKeyGenType, + keySpaceGenType: defaultKeySpaceGenType, + weightedRand: defaultWeightedRand, + } +} diff --git a/pkg/kv/kvserver/asim/tests/rand_framework.go b/pkg/kv/kvserver/asim/tests/rand_framework.go index 4b2d926fd8c4..e63bbb3df74e 100644 --- a/pkg/kv/kvserver/asim/tests/rand_framework.go +++ b/pkg/kv/kvserver/asim/tests/rand_framework.go @@ -13,6 +13,7 @@ package tests import ( "context" "fmt" + "math" "math/rand" "strings" "testing" @@ -40,10 +41,28 @@ type testSettings struct { randSource *rand.Rand assertions []SimulationAssertion randOptions testRandOptions + rangeGen rangeGenSettings } type randTestingFramework struct { - s testSettings + s testSettings + rangeGenerator generator + keySpaceGenerator generator +} + +func newRandTestingFramework(settings testSettings) randTestingFramework { + if int64(defaultMaxRange) > defaultMinKeySpace { + panic(fmt.Sprintf( + "Max number of ranges specified (%d) is greater than number of keys in key space (%d) ", + defaultMaxRange, defaultMinKeySpace)) + } + rangeGenerator := newGenerator(settings.randSource, defaultMinRange, defaultMaxRange, settings.rangeGen.rangeKeyGenType) + keySpaceGenerator := newGenerator(settings.randSource, defaultMinKeySpace, defaultMaxKeySpace, settings.rangeGen.keySpaceGenType) + return randTestingFramework{ + s: settings, + rangeGenerator: rangeGenerator, + keySpaceGenerator: keySpaceGenerator, + } } func (f randTestingFramework) getCluster() gen.ClusterGen { @@ -57,7 +76,7 @@ func (f randTestingFramework) getRanges() gen.RangeGen { if !f.s.randOptions.ranges { return defaultBasicRangesGen() } - return gen.BasicRanges{} + return f.randomBasicRangesGen() } func (f randTestingFramework) getLoad() gen.LoadGen { @@ -166,3 +185,46 @@ func checkAssertions( } return false, "" } + +const ( + defaultMinRange = 1 + 
defaultMaxRange = 1000 + defaultMinKeySpace = 1000 + defaultMaxKeySpace = 200000 +) + +func convertInt64ToInt(num int64) int { + if num < math.MinInt32 || num > math.MaxUint32 { + // Theoretically, this should be impossible given that we have defined + // min and max boundaries for ranges and key space. + panic(fmt.Sprintf("num overflows the max value or min value of int32 %d", num)) + } + return int(num) +} + +func (f randTestingFramework) randomBasicRangesGen() gen.RangeGen { + if len(f.s.rangeGen.weightedRand) == 0 { + return RandomizedBasicRanges{ + BaseRanges: gen.BaseRanges{ + Ranges: convertInt64ToInt(f.rangeGenerator.key()), + KeySpace: convertInt64ToInt(f.keySpaceGenerator.key()), + ReplicationFactor: defaultReplicationFactor, + Bytes: defaultBytes, + }, + placementType: gen.Random, + randSource: f.s.randSource, + } + } else { + return WeightedRandomizedBasicRanges{ + BaseRanges: gen.BaseRanges{ + Ranges: convertInt64ToInt(f.rangeGenerator.key()), + KeySpace: convertInt64ToInt(f.keySpaceGenerator.key()), + ReplicationFactor: defaultReplicationFactor, + Bytes: defaultBytes, + }, + placementType: gen.WeightedRandom, + randSource: f.s.randSource, + weightedRand: f.s.rangeGen.weightedRand, + } + } +} diff --git a/pkg/kv/kvserver/asim/tests/rand_gen.go b/pkg/kv/kvserver/asim/tests/rand_gen.go index 2c4111dc13e9..ce3ae50dfd4a 100644 --- a/pkg/kv/kvserver/asim/tests/rand_gen.go +++ b/pkg/kv/kvserver/asim/tests/rand_gen.go @@ -11,8 +11,10 @@ package tests import ( + "fmt" "math/rand" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/config" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/gen" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/asim/state" ) @@ -23,3 +25,122 @@ func (f randTestingFramework) randomClusterInfoGen(randSource *rand.Rand) gen.Lo chosenType := state.ClusterOptions[chosenIndex] return loadClusterInfo(chosenType) } + +// RandomizedBasicRanges implements the RangeGen interface, supporting random +// range info distribution. +type RandomizedBasicRanges struct { + gen.BaseRanges + placementType gen.PlacementType + randSource *rand.Rand +} + +var _ gen.RangeGen = &RandomizedBasicRanges{} + +func (r RandomizedBasicRanges) Generate( + seed int64, settings *config.SimulationSettings, s state.State, +) state.State { + if r.placementType != gen.Random { + panic("RandomizedBasicRanges generate only randomized distributions") + } + rangesInfo := r.GetRangesInfo(r.placementType, len(s.Stores()), r.randSource, []float64{}) + r.LoadRangeInfo(s, rangesInfo) + return s +} + +// WeightedRandomizedBasicRanges implements the RangeGen interface, supporting +// weighted random range info distribution. +type WeightedRandomizedBasicRanges struct { + gen.BaseRanges + placementType gen.PlacementType + randSource *rand.Rand + weightedRand []float64 +} + +var _ gen.RangeGen = &WeightedRandomizedBasicRanges{} + +func (wr WeightedRandomizedBasicRanges) Generate( + seed int64, settings *config.SimulationSettings, s state.State, +) state.State { + if wr.placementType != gen.WeightedRandom || len(wr.weightedRand) == 0 { + panic("RandomizedBasicRanges generate only weighted randomized distributions with non-empty weightedRand") + } + rangesInfo := wr.GetRangesInfo(wr.placementType, len(s.Stores()), wr.randSource, wr.weightedRand) + wr.LoadRangeInfo(s, rangesInfo) + return s +} + +// TODO(wenyihu6): Instead of duplicating the key generator logic in simulators, +// we should directly reuse the code from the repo pkg/workload/(kv|ycsb) to +// ensure consistent testing. 
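The zipfianKeyGenerator defined a few lines below wraps the standard library's rand.NewZipf, which draws from [0, imax]; adding min shifts the draw back into [min, max]. A small sketch of that mapping, using the same s=1.1, v=1 parameters that newGenerator passes (the bounds mirror the key-space defaults above and are illustrative only):

package main

import (
	"fmt"
	"math/rand"
)

func main() {
	const min, max = 1000, 200000 // defaultMinKeySpace / defaultMaxKeySpace above
	rng := rand.New(rand.NewSource(42))
	// rand.NewZipf requires s > 1 and v >= 1; Uint64 returns values in [0, max-min].
	zipf := rand.NewZipf(rng, 1.1, 1, uint64(max-min))
	for i := 0; i < 5; i++ {
		key := int64(zipf.Uint64()) + min // shift back into [min, max]
		fmt.Println(key)                  // small keys near min dominate
	}
}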
+ +// generator generates both ranges and keyspace parameters for ranges +// generations. +type generator interface { + key() int64 +} + +type uniformKeyGenerator struct { + min, max int64 + random *rand.Rand +} + +// newUniformKeyGen returns a generator that generates number∈[min, max] with a +// uniform distribution. +func newUniformKeyGen(min, max int64, rand *rand.Rand) generator { + if max <= min { + panic(fmt.Sprintf("max (%d) must be greater than min (%d)", max, min)) + } + return &uniformKeyGenerator{ + min: min, + max: max, + random: rand, + } +} + +func (g *uniformKeyGenerator) key() int64 { + return g.random.Int63n(g.max-g.min) + g.min +} + +type zipfianKeyGenerator struct { + min, max int64 + random *rand.Rand + zipf *rand.Zipf +} + +// newZipfianKeyGen returns a generator that generates number ∈[min, max] with a +// zipfian distribution. +func newZipfianKeyGen(min, max int64, s float64, v float64, random *rand.Rand) generator { + if max <= min { + panic(fmt.Sprintf("max (%d) must be greater than min (%d)", max, min)) + } + return &zipfianKeyGenerator{ + min: min, + max: max, + random: random, + zipf: rand.NewZipf(random, s, v, uint64(max-min)), + } +} + +func (g *zipfianKeyGenerator) key() int64 { + return int64(g.zipf.Uint64()) + g.min +} + +type generatorType int + +const ( + uniformGenerator generatorType = iota + zipfGenerator +) + +// newGenerator returns a generator that generates number ∈[min, max] following +// a distribution based on gType. +func newGenerator(randSource *rand.Rand, iMin int64, iMax int64, gType generatorType) generator { + switch gType { + case uniformGenerator: + return newUniformKeyGen(iMin, iMax, randSource) + case zipfGenerator: + return newZipfianKeyGen(iMin, iMax, 1.1, 1, randSource) + default: + panic(fmt.Sprintf("unexpected generator type %v", gType)) + } +} diff --git a/pkg/kv/kvserver/asim/tests/rand_test.go b/pkg/kv/kvserver/asim/tests/rand_test.go index e47e155b722c..6036e1fa492e 100644 --- a/pkg/kv/kvserver/asim/tests/rand_test.go +++ b/pkg/kv/kvserver/asim/tests/rand_test.go @@ -17,13 +17,13 @@ import ( ) const ( - defaultNumIterations = 5 + defaultNumIterations = 3 defaultSeed = 42 defaultDuration = 30 * time.Minute defaultVerbosity = false ) -func defaultSettings(randOptions testRandOptions) testSettings { +func defaultSettings(randOptions testRandOptions, rGenSettings rangeGenSettings) testSettings { return testSettings{ numIterations: defaultNumIterations, duration: defaultDuration, @@ -31,6 +31,7 @@ func defaultSettings(randOptions testRandOptions) testSettings { randSource: rand.New(rand.NewSource(defaultSeed)), assertions: defaultAssertions(), randOptions: randOptions, + rangeGen: rGenSettings, } } @@ -39,6 +40,7 @@ func defaultSettings(randOptions testRandOptions) testSettings { // allocator simulations, and validating assertions on the final state. // // Input of the framework (fields in the testSetting struct): +// // 1. numIterations (int, default: 3): specifies number of test iterations to be // run, each with different random configurations generated // 2. duration (time.Duration, default: 30min): defined simulated duration of @@ -49,6 +51,7 @@ func defaultSettings(randOptions testRandOptions) testSettings { // 4. assertions ([]SimulationAssertion, default: conformanceAssertion with 0 // under-replication, 0 over-replication, 0 violating, and 0 unavailable): // defines criteria for validation assertions +// // 5. randOptions: guides the aspect of the test configuration that should be // randomized. 
This includes: // - cluster (bool): indicates if the cluster configuration should be randomized @@ -59,6 +62,15 @@ func defaultSettings(randOptions testRandOptions) testSettings { // - staticEvents (bool): indicates if static events, including any delayed // events to be applied during the simulation, should be randomized // +// 6. rangeGen (default: uniform rangeGenType, uniform keySpaceGenType, empty +// weightedRand). +// - rangeKeyGenType: determines range generator type across iterations +// (default: uniformGenerator, min = 1, max = 1000) +// - keySpaceGenType: determines key space generator type across iterations +// (default: uniformGenerator, min = 1000, max = 200000) +// - weightedRand: if non-empty, enables weighted randomization for range +// distribution +// // RandTestingFramework is initialized with a specified testSetting and // maintained its state across all iterations. Each iteration in // RandTestingFramework executes the following steps: @@ -71,14 +83,13 @@ func defaultSettings(randOptions testRandOptions) testSettings { func TestRandomized(t *testing.T) { randOptions := testRandOptions{ cluster: true, - ranges: false, + ranges: true, load: false, staticSettings: false, staticEvents: false, } - settings := defaultSettings(randOptions) - f := randTestingFramework{ - s: settings, - } + rangeGenSettings := defaultRangeGenSettings() + settings := defaultSettings(randOptions, rangeGenSettings) + f := newRandTestingFramework(settings) f.runRandTestRepeated(t) } diff --git a/pkg/kv/kvserver/asim/workload/workload.go b/pkg/kv/kvserver/asim/workload/workload.go index 831878ef83ec..91cb83989011 100644 --- a/pkg/kv/kvserver/asim/workload/workload.go +++ b/pkg/kv/kvserver/asim/workload/workload.go @@ -162,6 +162,10 @@ func (rwg *RandomGenerator) Tick(maxTime time.Time) LoadBatch { return ret } +// TODO(wenyihu6): Instead of duplicating the key generator logic in simulators, +// we should directly reuse the code from the repo pkg/workload/(kv|ycsb) to +// ensure consistent testing. + // KeyGenerator generates read and write keys. type KeyGenerator interface { writeKey() int64 diff --git a/pkg/roachprod/install/cluster_synced.go b/pkg/roachprod/install/cluster_synced.go index 734306ea14cf..20d39ab332e2 100644 --- a/pkg/roachprod/install/cluster_synced.go +++ b/pkg/roachprod/install/cluster_synced.go @@ -23,7 +23,6 @@ import ( "os/signal" "path/filepath" "runtime" - "sort" "strings" "sync" "syscall" @@ -135,7 +134,7 @@ var DefaultSSHRetryOpts = newRunRetryOpts(defaultRetryOpt, func(res *RunResultDe var noScpRetrySubstrings = []string{"no such file or directory", "permission denied", "connection timed out"} var defaultSCPRetry = newRunRetryOpts(defaultRetryOpt, func(res *RunResultDetails) bool { - out := strings.ToLower(res.Stderr) + out := strings.ToLower(res.Output(false)) for _, s := range noScpRetrySubstrings { if strings.Contains(out, s) { return false @@ -147,10 +146,14 @@ var defaultSCPRetry = newRunRetryOpts(defaultRetryOpt, // runWithMaybeRetry will run the specified function `f` at least once, or only // once if `runRetryOpts` is nil -// Any returned error from `f` is passed to `runRetryOpts.shouldRetryFn` which, +// +// Any RunResultDetails containing a non nil err from `f` is passed to `runRetryOpts.shouldRetryFn` which, // if it returns true, will result in `f` being retried using the `retryOpts` // If the `shouldRetryFn` is not specified (nil), then retries will be performed -// regardless of the previous result / error +// regardless of the previous result / error. 
+// +// If a non-nil error (as opposed to the result containing a non-nil error) is returned, +// the function will *not* be retried. // // We operate on a pointer to RunResultDetails as it has already been // captured in a *RunResultDetails[] in Run, but here we may enrich with attempt @@ -163,19 +166,24 @@ func runWithMaybeRetry( ) (*RunResultDetails, error) { if retryOpts == nil { res, err := f(ctx) + if err != nil { + return nil, err + } res.Attempt = 1 - return res, err + return res, nil } - var res *RunResultDetails - var err error - var cmdErr error + var res = &RunResultDetails{} + var cmdErr, err error for r := retry.StartWithCtx(ctx, retryOpts.Options); r.Next(); { res, err = f(ctx) + if err != nil { + // non retryable roachprod error + return nil, err + } res.Attempt = r.CurrentAttempt() + 1 - // nil err (non-nil denotes a roachprod error) indicates a potentially retryable res.Err - if err == nil && res.Err != nil { + if res.Err != nil { cmdErr = errors.CombineErrors(cmdErr, res.Err) if retryOpts.shouldRetryFn == nil || retryOpts.shouldRetryFn(res) { l.Printf("encountered [%v] on attempt %v of %v", res.Err, r.CurrentAttempt()+1, retryOpts.MaxRetries+1) @@ -194,7 +202,7 @@ func runWithMaybeRetry( l.Printf("command successful after %v attempts", res.Attempt) } } - return res, err + return res, nil } func scpWithRetry( @@ -421,9 +429,7 @@ func (c *SyncedCluster) kill( // `kill -9` without wait is never what a caller wants. See #77334. wait = true } - return c.Parallel(ctx, l, len(c.Nodes), func(ctx context.Context, i int) (*RunResultDetails, error) { - node := c.Nodes[i] - + return c.Parallel(ctx, l, c.Nodes, func(ctx context.Context, node Node) (*RunResultDetails, error) { var waitCmd string if wait { waitCmd = fmt.Sprintf(` @@ -468,13 +474,7 @@ fi`, waitCmd, // [5] ) - sess := c.newSession(l, node, cmd, withDebugName("node-"+cmdName)) - defer sess.Close() - - out, cmdErr := sess.CombinedOutput(ctx) - res := newRunResultDetails(node, cmdErr) - res.CombinedOut = out - return res, res.Err + return c.runCmdOnSingleNode(ctx, l, node, cmd, defaultCmdOpts("kill")) }, WithDisplay(display), WithRetryOpts(nil)) // Disable SSH Retries } @@ -484,8 +484,7 @@ func (c *SyncedCluster) Wipe(ctx context.Context, l *logger.Logger, preserveCert if err := c.Stop(ctx, l, 9, true /* wait */, 0 /* maxWait */); err != nil { return err } - return c.Parallel(ctx, l, len(c.Nodes), func(ctx context.Context, i int) (*RunResultDetails, error) { - node := c.Nodes[i] + return c.Parallel(ctx, l, c.Nodes, func(ctx context.Context, node Node) (*RunResultDetails, error) { var cmd string if c.IsLocal() { // Not all shells like brace expansion, so we'll do it here @@ -495,7 +494,7 @@ func (c *SyncedCluster) Wipe(ctx context.Context, l *logger.Logger, preserveCert dirs = append(dirs, "tenant-certs*") } for _, dir := range dirs { - cmd += fmt.Sprintf(`rm -fr %s/%s ;`, c.localVMDir(c.Nodes[i]), dir) + cmd += fmt.Sprintf(`rm -fr %s/%s ;`, c.localVMDir(node), dir) } } else { rmCmds := []string{ @@ -509,13 +508,7 @@ func (c *SyncedCluster) Wipe(ctx context.Context, l *logger.Logger, preserveCert cmd = strings.Join(rmCmds, " && ") } - sess := c.newSession(l, node, cmd, withDebugName("node-wipe")) - defer sess.Close() - - out, cmdErr := sess.CombinedOutput(ctx) - res := newRunResultDetails(node, cmdErr) - res.CombinedOut = out - return res, res.Err + return c.runCmdOnSingleNode(ctx, l, node, cmd, defaultCmdOpts("wipe")) }, WithDisplay(display)) } @@ -531,10 +524,7 @@ type NodeStatus struct { // Status TODO(peter): document 
func (c *SyncedCluster) Status(ctx context.Context, l *logger.Logger) ([]NodeStatus, error) { display := fmt.Sprintf("%s: status", c.Name) - results := make([]NodeStatus, len(c.Nodes)) - if err := c.Parallel(ctx, l, len(c.Nodes), func(ctx context.Context, i int) (*RunResultDetails, error) { - node := c.Nodes[i] - + res, _, err := c.ParallelE(ctx, l, c.Nodes, func(ctx context.Context, node Node) (*RunResultDetails, error) { binary := cockroachNodeBinary(c, node) cmd := fmt.Sprintf(`out=$(ps axeww -o pid -o ucomm -o command | \ sed 's/export ROACHPROD=//g' | \ @@ -550,37 +540,28 @@ else echo ${out} fi ` - sess := c.newSession(l, node, cmd, withDebugName("node-status")) - defer sess.Close() - - out, cmdErr := sess.CombinedOutput(ctx) - res := newRunResultDetails(node, cmdErr) - res.CombinedOut = out + return c.runCmdOnSingleNode(ctx, l, node, cmd, defaultCmdOpts("status")) + }, WithDisplay(display)) - if res.Err != nil { - return res, errors.Wrapf(res.Err, "~ %s\n%s", cmd, res.CombinedOut) - } + if err != nil { + return nil, err + } - msg := strings.TrimSpace(string(res.CombinedOut)) + statuses := make([]NodeStatus, len(c.Nodes)) + for i, res := range res { + msg := strings.TrimSpace(res.CombinedOut) if msg == "" || strings.HasPrefix(msg, "not-running") { - results[i] = NodeStatus{Running: false} + statuses[i] = NodeStatus{NodeID: int(res.Node), Running: false} if msg != "" { info := strings.Split(msg, " ") - results[i].Version = info[1] + statuses[i].Version = info[1] } - return res, nil + continue } info := strings.Split(msg, " ") - results[i] = NodeStatus{Running: true, Version: info[0], Pid: info[1]} - - return res, nil - }, WithDisplay(display)); err != nil { - return nil, err + statuses[i] = NodeStatus{NodeID: int(res.Node), Running: true, Version: info[0], Pid: info[1]} } - for i := 0; i < len(results); i++ { - results[i].NodeID = int(c.Nodes[i]) - } - return results, nil + return statuses, nil } // MonitorNodeSkipped represents a node whose status was not checked. @@ -773,6 +754,7 @@ done return } + // This is the exception to funneling all SSH traffic through `c.runCmdOnSingleNode` sess := c.newSession(l, node, buf.String(), withDebugDisabled()) defer sess.Close() @@ -861,7 +843,7 @@ type RunResultDetails struct { Node Node Stdout string Stderr string - CombinedOut []byte + CombinedOut string Err error RemoteExitStatus int Attempt int @@ -879,23 +861,82 @@ func newRunResultDetails(node Node, err error) *RunResultDetails { return &res } +// Output prints either the combined, or separated stdout and stderr command output +func (r *RunResultDetails) Output(decorate bool) string { + var builder strings.Builder + outputExists := false + writeVal := func(label string, value string) { + s := strings.TrimSpace(value) + if builder.Len() > 0 { + builder.WriteByte('\n') + } + + if s == "" { + if decorate { + builder.WriteString(label) + builder.WriteString(": ") + } + return + } + + if label != "" && decorate { + builder.WriteString(label) + builder.WriteString(":") + } + builder.WriteString(s) + builder.WriteString("\n") + outputExists = true + } + + writeVal("stdout", r.Stdout) + writeVal("stderr", r.Stderr) + + // Only if stderr and stdout are empty do we check the combined output. 
+ if !outputExists { + builder.Reset() + writeVal("", r.CombinedOut) + } + + if !outputExists { + return "" + } + return builder.String() +} + +// RunCmdOptions is used to configure the behavior of `runCmdOnSingleNode` +type RunCmdOptions struct { + combinedOut bool + includeRoachprodEnvVars bool + stdin io.Reader + stdout, stderr io.Writer + remoteOptions []remoteSessionOption +} + +func defaultCmdOpts(debugName string) RunCmdOptions { + return RunCmdOptions{ + combinedOut: true, + remoteOptions: []remoteSessionOption{withDebugName(debugName)}, + } +} + +// runCmdOnSingleNode is a common entry point for all commands that run on a single node, +// including user commands from roachtests and roachprod commands. +// The `opts` struct is used to configure the behavior of the command, including +// - whether stdout and stderr or combined output is desired +// - specifying the stdin, stdout, and stderr streams +// - specifying the remote session options +// - whether the command should be run with the ROACHPROD env variable (true for all user commands) func (c *SyncedCluster) runCmdOnSingleNode( - ctx context.Context, - l *logger.Logger, - node Node, - cmd string, - combined bool, - stdout, stderr io.Writer, + ctx context.Context, l *logger.Logger, node Node, cmd string, opts RunCmdOptions, ) (*RunResultDetails, error) { // Argument template expansion is node specific (e.g. for {store-dir}). - e := expander{ - node: node, - } + e := expander{node: node} expandedCmd, err := e.expand(ctx, l, c, cmd) if err != nil { - return newRunResultDetails(node, err), err + return nil, errors.WithDetailf(err, "error expanding command: %s", cmd) } + nodeCmd := expandedCmd // Be careful about changing these command strings. In particular, we need // to support running commands in the background on both local and remote // nodes. For example: @@ -904,28 +945,43 @@ func (c *SyncedCluster) runCmdOnSingleNode( // // That command should return immediately. And a "roachprod status" should // reveal that the sleep command is running on the cluster. - envVars := append([]string{ - fmt.Sprintf("ROACHPROD=%s", c.roachprodEnvValue(node)), "GOTRACEBACK=crash", - }, config.DefaultEnvVars()...) - nodeCmd := fmt.Sprintf(`export %s && bash -c %s`, - strings.Join(envVars, " "), ssh.Escape1(expandedCmd)) - if c.IsLocal() { - nodeCmd = fmt.Sprintf("cd %s; %s", c.localVMDir(node), nodeCmd) + if opts.includeRoachprodEnvVars { + envVars := append([]string{ + fmt.Sprintf("ROACHPROD=%s", c.roachprodEnvValue(node)), "GOTRACEBACK=crash", + }, config.DefaultEnvVars()...) + nodeCmd = fmt.Sprintf(`export %s && bash -c %s`, + strings.Join(envVars, " "), ssh.Escape1(expandedCmd)) + if c.IsLocal() { + nodeCmd = fmt.Sprintf("cd %s; %s", c.localVMDir(node), nodeCmd) + } } - sess := c.newSession(l, node, nodeCmd, withDebugName(GenFilenameFromArgs(20, expandedCmd))) + // This default can be overridden by the caller, and is hence specified first + sessionOpts := []remoteSessionOption{withDebugName(GenFilenameFromArgs(20, expandedCmd))} + sess := c.newSession(l, node, nodeCmd, append(sessionOpts, opts.remoteOptions...)...) 
defer sess.Close() + if opts.stdin != nil { + sess.SetStdin(opts.stdin) + } + if opts.stdout == nil { + opts.stdout = io.Discard + } + if opts.stderr == nil { + opts.stderr = io.Discard + } + var res *RunResultDetails - if combined { + if opts.combinedOut { out, cmdErr := sess.CombinedOutput(ctx) res = newRunResultDetails(node, cmdErr) - res.CombinedOut = out + res.CombinedOut = string(out) } else { // We stream the output if running on a single node. var stdoutBuffer, stderrBuffer bytes.Buffer - multStdout := io.MultiWriter(&stdoutBuffer, stdout) - multStderr := io.MultiWriter(&stderrBuffer, stderr) + + multStdout := io.MultiWriter(&stdoutBuffer, opts.stdout) + multStderr := io.MultiWriter(&stderrBuffer, opts.stderr) sess.SetStdout(multStdout) sess.SetStderr(multStderr) @@ -935,7 +991,18 @@ func (c *SyncedCluster) runCmdOnSingleNode( } if res.Err != nil { - detailMsg := fmt.Sprintf("Node %d. Command with error:\n```\n%s\n```\n", node, cmd) + output := res.Output(true) + // Somewhat arbitrary limit to give us a chance to see some of the output + // in the failure_*.log, since the full output is in the run_*.log. + oLen := len(output) + if oLen > 1024 { + output = " ... " + output[oLen-1024:oLen-1] + } + + if output == "" { + output = "" + } + detailMsg := fmt.Sprintf("Node %d. Command with error:\n```\n%s\n```\n%s", node, cmd, output) res.Err = errors.WithDetail(res.Err, detailMsg) } return res, nil @@ -964,20 +1031,21 @@ func (c *SyncedCluster) Run( stream := len(nodes) == 1 var display string if !stream { - display = fmt.Sprintf("%s: %s", c.Name, title) + display = fmt.Sprintf("%s:%v: %s", c.Name, nodes, title) } - results := make([]*RunResultDetails, len(nodes)) - - // A result is the output of running a command (could be interpreted as an error) - if _, err := c.ParallelE(ctx, l, len(nodes), func(ctx context.Context, i int) (*RunResultDetails, error) { - // An err returned here is an unexpected state within roachprod (non-command error). - // For errors that occur as part of running a command over ssh, the `result` will contain - // the actual error on a specific node. - result, err := c.runCmdOnSingleNode(ctx, l, nodes[i], cmd, !stream, stdout, stderr) - results[i] = result + results, _, err := c.ParallelE(ctx, l, nodes, func(ctx context.Context, node Node) (*RunResultDetails, error) { + opts := RunCmdOptions{ + combinedOut: !stream, + includeRoachprodEnvVars: true, + stdout: stdout, + stderr: stderr, + } + result, err := c.runCmdOnSingleNode(ctx, l, node, cmd, opts) return result, err - }, append(opts, WithDisplay(display))...); err != nil { + }, append(opts, WithDisplay(display))...) + + if err != nil { return err } @@ -986,6 +1054,21 @@ func (c *SyncedCluster) Run( // processResults returns the error from the RunResultDetails with the highest RemoteExitStatus func processResults(results []*RunResultDetails, stream bool, stdout io.Writer) error { + + // Easier to read output when we indent each line of the output. If an error is + // present, we also include the error message at the top. 
+ format := func(s string, e error) string { + s = strings.ReplaceAll(s, "\n", "\n\t") + if e != nil { + return fmt.Sprintf("\t %v\n\t%s", e, s) + } + + if s == "" { + return "\t" + } + return fmt.Sprintf("\t\n\t%s", s) + } + var resultWithError *RunResultDetails for i, r := range results { // We no longer wait for all nodes to complete before returning in the case of an error (#100403) @@ -994,8 +1077,10 @@ func processResults(results []*RunResultDetails, stream bool, stdout io.Writer) continue } + // Emit the cached output of each result. When stream == true, the output is emitted + // as it is generated in `runCmdOnSingleNode`. if !stream { - fmt.Fprintf(stdout, " %2d: %s\n%v\n", i+1, strings.TrimSpace(string(r.CombinedOut)), r.Err) + fmt.Fprintf(stdout, " %2d: %s\n", i+1, format(r.Output(true), r.Err)) } if r.Err != nil { @@ -1021,18 +1106,20 @@ func processResults(results []*RunResultDetails, stream bool, stdout io.Writer) func (c *SyncedCluster) RunWithDetails( ctx context.Context, l *logger.Logger, nodes Nodes, title, cmd string, ) ([]RunResultDetails, error) { - display := fmt.Sprintf("%s: %s", c.Name, title) - - // We use pointers here as we are capturing the state of a result even though it may - // be processed further by the caller. - resultPtrs := make([]*RunResultDetails, len(nodes)) + display := fmt.Sprintf("%s:%v: %s", c.Name, nodes, title) // Failing slow here allows us to capture the output of all nodes even if one fails with a command error. - if _, err := c.ParallelE(ctx, l, len(nodes), func(ctx context.Context, i int) (*RunResultDetails, error) { //nolint:errcheck - result, err := c.runCmdOnSingleNode(ctx, l, nodes[i], cmd, false, l.Stdout, l.Stderr) - resultPtrs[i] = result + resultPtrs, _, err := c.ParallelE(ctx, l, nodes, func(ctx context.Context, node Node) (*RunResultDetails, error) { + opts := RunCmdOptions{ + includeRoachprodEnvVars: true, + stdout: l.Stdout, + stderr: l.Stderr, + } + result, err := c.runCmdOnSingleNode(ctx, l, node, cmd, opts) return result, err - }, WithDisplay(display), WithWaitOnFail()); err != nil { + }, WithDisplay(display), WithWaitOnFail()) + + if err != nil { return nil, err } @@ -1078,37 +1165,33 @@ func (c *SyncedCluster) RepeatRun( // Wait TODO(peter): document func (c *SyncedCluster) Wait(ctx context.Context, l *logger.Logger) error { display := fmt.Sprintf("%s: waiting for nodes to start", c.Name) - errs := make([]error, len(c.Nodes)) - if err := c.Parallel(ctx, l, len(c.Nodes), func(ctx context.Context, i int) (*RunResultDetails, error) { - node := c.Nodes[i] + _, hasError, err := c.ParallelE(ctx, l, c.Nodes, func(ctx context.Context, node Node) (*RunResultDetails, error) { res := &RunResultDetails{Node: node} + var err error cmd := "test -e /mnt/data1/.roachprod-initialized" + opts := defaultCmdOpts("wait-init") for j := 0; j < 600; j++ { - sess := c.newSession(l, node, cmd, withDebugDisabled()) - defer sess.Close() - - _, err := sess.CombinedOutput(ctx) + res, err = c.runCmdOnSingleNode(ctx, l, node, cmd, opts) if err != nil { + return nil, err + } + + if res.Err != nil { time.Sleep(500 * time.Millisecond) continue } return res, nil } - errs[i] = errors.New("timed out after 5m") - res.Err = errs[i] + res.Err = errors.New("timed out after 5m") + l.Printf(" %2d: %v", node, res.Err) return res, nil - }, WithDisplay(display), WithRetryOpts(nil)); err != nil { + }, WithDisplay(display), WithRetryOpts(nil)) + + if err != nil { return err } - var foundErr bool - for i, err := range errs { - if err != nil { - l.Printf(" %2d: %v", 
c.Nodes[i], err) - foundErr = true - } - } - if foundErr { + if hasError { return errors.New("not all nodes booted successfully") } return nil @@ -1139,10 +1222,9 @@ func (c *SyncedCluster) SetupSSH(ctx context.Context, l *logger.Logger) error { c.Name, len(c.Nodes), len(c.VMs)) } - // Generate an ssh key that we'll distribute to all of the nodes in the + // Generate an ssh key that we'll distribute to all the nodes in the // cluster in order to allow inter-node ssh. - var sshTar []byte - if err := c.Parallel(ctx, l, 1, func(ctx context.Context, i int) (*RunResultDetails, error) { + results, _, err := c.ParallelE(ctx, l, c.Nodes[0:1], func(ctx context.Context, n Node) (*RunResultDetails, error) { // Create the ssh key and then tar up the public, private and // authorized_keys files and output them to stdout. We'll take this output // and pipe it back into tar on the other nodes in the cluster. @@ -1152,47 +1234,22 @@ test -f .ssh/id_rsa || \ cat .ssh/id_rsa.pub >> .ssh/authorized_keys); tar cf - .ssh/id_rsa .ssh/id_rsa.pub .ssh/authorized_keys ` + runOpts := defaultCmdOpts("ssh-gen-key") + runOpts.combinedOut = false + return c.runCmdOnSingleNode(ctx, l, n, cmd, runOpts) + }, WithDisplay("generating ssh key")) - sess := c.newSession(l, 1, cmd, withDebugName("ssh-gen-key")) - defer sess.Close() - - var stdout bytes.Buffer - var stderr bytes.Buffer - sess.SetStdout(&stdout) - sess.SetStderr(&stderr) - - res := newRunResultDetails(1, sess.Run(ctx)) - - res.Stdout = stdout.String() - res.Stderr = stderr.String() - if res.Err != nil { - return res, errors.Wrapf(res.Err, "%s: stderr:\n%s", cmd, res.Stderr) - } - sshTar = []byte(res.Stdout) - return res, nil - }, WithDisplay("generating ssh key")); err != nil { + if err != nil { return err } + sshTar := []byte(results[0].Stdout) // Skip the first node which is where we generated the key. nodes := c.Nodes[1:] - if err := c.Parallel(ctx, l, len(nodes), func(ctx context.Context, i int) (*RunResultDetails, error) { - node := nodes[i] - cmd := `tar xf -` - - sess := c.newSession(l, node, cmd, withDebugName("ssh-dist-key")) - defer sess.Close() - - sess.SetStdin(bytes.NewReader(sshTar)) - - out, cmdErr := sess.CombinedOutput(ctx) - res := newRunResultDetails(node, cmdErr) - res.CombinedOut = out - - if res.Err != nil { - return res, errors.Wrapf(res.Err, "%s: output:\n%s", cmd, res.CombinedOut) - } - return res, nil + if err := c.Parallel(ctx, l, nodes, func(ctx context.Context, node Node) (*RunResultDetails, error) { + runOpts := defaultCmdOpts("ssh-dist-key") + runOpts.stdin = bytes.NewReader(sshTar) + return c.runCmdOnSingleNode(ctx, l, node, `tar xf -`, runOpts) }, WithDisplay("distributing ssh key")); err != nil { return err } @@ -1213,25 +1270,25 @@ tar cf - .ssh/id_rsa .ssh/id_rsa.pub .ssh/authorized_keys providerPrivateIPs := make(map[string][]nodeInfo) publicIPs := make([]string, 0, len(c.Nodes)) for _, node := range c.Nodes { - provider := c.VMs[node-1].Provider - ip, err := c.GetInternalIP(node) - if err != nil { - return err - } - providerPrivateIPs[provider] = append(providerPrivateIPs[provider], nodeInfo{node: node, ip: ip}) + v := c.VMs[node-1] + providerPrivateIPs[v.Provider] = append(providerPrivateIPs[v.Provider], nodeInfo{node: node, ip: v.PrivateIP}) publicIPs = append(publicIPs, c.Host(node)) } providerKnownHostData := make(map[string][]byte) providers := maps.Keys(providerPrivateIPs) + // Only need to scan on the first node of each provider. 
- if err := c.Parallel(ctx, l, len(providers), func(ctx context.Context, i int) (*RunResultDetails, error) { - provider := providers[i] - node := providerPrivateIPs[provider][0].node + firstNodes := make([]Node, len(providers)) + for i, provider := range providers { + firstNodes[i] = providerPrivateIPs[provider][0].node + } + if err := c.Parallel(ctx, l, firstNodes, func(ctx context.Context, node Node) (*RunResultDetails, error) { // Scan a combination of all remote IPs and local IPs pertaining to this // node's cloud provider. scanIPs := append([]string{}, publicIPs...) - for _, nodeInfo := range providerPrivateIPs[provider] { + nodeProvider := c.VMs[node-1].Provider + for _, nodeInfo := range providerPrivateIPs[nodeProvider] { scanIPs = append(scanIPs, nodeInfo.ip) } @@ -1257,32 +1314,25 @@ for i in {1..20}; do done exit 1 ` + runOpts := defaultCmdOpts("ssh-scan-hosts") + runOpts.combinedOut = false + res, err := c.runCmdOnSingleNode(ctx, l, node, cmd, runOpts) + if err != nil { + return nil, err + } - sess := c.newSession(l, node, cmd, withDebugName("ssh-scan-hosts")) - defer sess.Close() - - var stdout bytes.Buffer - var stderr bytes.Buffer - sess.SetStdout(&stdout) - sess.SetStderr(&stderr) - - res := newRunResultDetails(node, sess.Run(ctx)) - - res.Stdout = stdout.String() - res.Stderr = stderr.String() if res.Err != nil { - return res, errors.Wrapf(res.Err, "%s: stderr:\n%s", cmd, res.Stderr) + return res, nil } mu.Lock() - providerKnownHostData[provider] = stdout.Bytes() + providerKnownHostData[nodeProvider] = []byte(res.Stdout) mu.Unlock() return res, nil }, WithDisplay("scanning hosts")); err != nil { return err } - if err := c.Parallel(ctx, l, len(c.Nodes), func(ctx context.Context, i int) (*RunResultDetails, error) { - node := c.Nodes[i] + if err := c.Parallel(ctx, l, c.Nodes, func(ctx context.Context, node Node) (*RunResultDetails, error) { provider := c.VMs[node-1].Provider const cmd = ` known_hosts_data="$(cat)" @@ -1311,19 +1361,9 @@ if [[ "$(whoami)" != "` + config.SharedUser + `" ]]; then '"'"'{}'"'"' ~` + config.SharedUser + `/.ssh' \; fi ` - - sess := c.newSession(l, node, cmd, withDebugName("ssh-dist-known-hosts")) - defer sess.Close() - - sess.SetStdin(bytes.NewReader(providerKnownHostData[provider])) - out, cmdErr := sess.CombinedOutput(ctx) - res := newRunResultDetails(node, cmdErr) - res.CombinedOut = out - - if res.Err != nil { - return res, errors.Wrapf(res.Err, "%s: output:\n%s", cmd, res.CombinedOut) - } - return res, nil + runOpts := defaultCmdOpts("ssh-dist-known-hosts") + runOpts.stdin = bytes.NewReader(providerKnownHostData[provider]) + return c.runCmdOnSingleNode(ctx, l, node, cmd, runOpts) }, WithDisplay("distributing known_hosts")); err != nil { return err } @@ -1334,8 +1374,7 @@ fi // additional authorized_keys to both the current user (your username on // gce and the shared user on aws) as well as to the shared user on both // platforms. 
- if err := c.Parallel(ctx, l, len(c.Nodes), func(ctx context.Context, i int) (*RunResultDetails, error) { - node := c.Nodes[i] + if err := c.Parallel(ctx, l, c.Nodes, func(ctx context.Context, node Node) (*RunResultDetails, error) { const cmd = ` keys_data="$(cat)" set -e @@ -1359,18 +1398,9 @@ if [[ "$(whoami)" != "` + config.SharedUser + `" ]]; then fi ` - sess := c.newSession(l, node, cmd, withDebugName("ssh-add-extra-keys")) - defer sess.Close() - - sess.SetStdin(bytes.NewReader(c.AuthorizedKeys)) - out, cmdErr := sess.CombinedOutput(ctx) - res := newRunResultDetails(node, cmdErr) - res.CombinedOut = out - - if res.Err != nil { - return res, errors.Wrapf(res.Err, "~ %s\n%s", cmd, res.CombinedOut) - } - return res, nil + runOpts := defaultCmdOpts("ssh-add-extra-keys") + runOpts.stdin = bytes.NewReader(c.AuthorizedKeys) + return c.runCmdOnSingleNode(ctx, l, node, cmd, runOpts) }, WithDisplay("adding additional authorized keys")); err != nil { return err } @@ -1384,12 +1414,9 @@ const ( tenantCertsTarName = "tenant-certs.tar" ) -// DistributeCerts will generate and distribute certificates to all of the -// nodes. +// DistributeCerts will generate and distribute certificates to all the nodes. func (c *SyncedCluster) DistributeCerts(ctx context.Context, l *logger.Logger) error { - if found, err := c.checkForCertificates(ctx, l); err != nil { - return err - } else if found { + if c.checkForCertificates(ctx, l) { return nil } @@ -1399,9 +1426,8 @@ func (c *SyncedCluster) DistributeCerts(ctx context.Context, l *logger.Logger) e } // Generate the ca, client and node certificates on the first node. - var msg string display := fmt.Sprintf("%s: initializing certs", c.Name) - if err := c.Parallel(ctx, l, 1, func(ctx context.Context, i int) (*RunResultDetails, error) { + if err := c.Parallel(ctx, l, c.Nodes[0:1], func(ctx context.Context, node Node) (*RunResultDetails, error) { var cmd string if c.IsLocal() { cmd = fmt.Sprintf(`cd %s ; `, c.localVMDir(1)) @@ -1429,23 +1455,9 @@ fi tar cvf %[3]s certs `, cockroachNodeBinary(c, 1), strings.Join(nodeNames, " "), certsTarName) - sess := c.newSession(l, 1, cmd, withDebugName("init-certs")) - defer sess.Close() - - out, cmdErr := sess.CombinedOutput(ctx) - res := newRunResultDetails(1, cmdErr) - res.CombinedOut = out - - if res.Err != nil { - msg = fmt.Sprintf("%s: %v", res.CombinedOut, res.Err) - } - return res, nil + return c.runCmdOnSingleNode(ctx, l, node, cmd, defaultCmdOpts("init-certs")) }, WithDisplay(display)); err != nil { - return err - } - - if msg != "" { - fmt.Fprintln(os.Stderr, msg) + fmt.Fprintln(os.Stderr, err) exit.WithCode(exit.UnspecifiedError()) } @@ -1465,15 +1477,11 @@ tar cvf %[3]s certs func (c *SyncedCluster) DistributeTenantCerts( ctx context.Context, l *logger.Logger, hostCluster *SyncedCluster, tenantID int, ) error { - if found, err := hostCluster.checkForTenantCertificates(ctx, l); err != nil { - return err - } else if found { + if hostCluster.checkForTenantCertificates(ctx, l) { return nil } - if found, err := hostCluster.checkForCertificates(ctx, l); err != nil { - return err - } else if !found { + if !hostCluster.checkForCertificates(ctx, l) { return errors.New("host cluster missing certificate bundle") } @@ -1504,9 +1512,7 @@ func (c *SyncedCluster) createTenantCertBundle( ctx context.Context, l *logger.Logger, bundleName string, tenantID int, nodeNames []string, ) error { display := fmt.Sprintf("%s: initializing tenant certs", c.Name) - return c.Parallel(ctx, l, 1, func(ctx context.Context, i int) (*RunResultDetails, 
error) { - node := c.Nodes[i] - + return c.Parallel(ctx, l, c.Nodes[0:1], func(ctx context.Context, node Node) (*RunResultDetails, error) { cmd := "set -e;" if c.IsLocal() { cmd += fmt.Sprintf(`cd %s ; `, c.localVMDir(1)) @@ -1537,17 +1543,7 @@ fi bundleName, ) - sess := c.newSession(l, node, cmd, withDebugName("create-tenant-cert-bundle")) - defer sess.Close() - - out, cmdErr := sess.CombinedOutput(ctx) - res := newRunResultDetails(node, cmdErr) - res.CombinedOut = out - - if res.Err != nil { - return res, errors.Wrapf(res.Err, "certificate creation error: %s", res.CombinedOut) - } - return res, nil + return c.runCmdOnSingleNode(ctx, l, node, cmd, defaultCmdOpts("create-tenant-cert-bundle")) }, WithDisplay(display)) } @@ -1583,7 +1579,7 @@ func (c *SyncedCluster) getFileFromFirstNode( // checkForCertificates checks if the cluster already has a certs bundle created // on the first node. -func (c *SyncedCluster) checkForCertificates(ctx context.Context, l *logger.Logger) (bool, error) { +func (c *SyncedCluster) checkForCertificates(ctx context.Context, l *logger.Logger) bool { dir := "" if c.IsLocal() { dir = c.localVMDir(1) @@ -1593,9 +1589,7 @@ func (c *SyncedCluster) checkForCertificates(ctx context.Context, l *logger.Logg // checkForTenantCertificates checks if the cluster already has a tenant-certs bundle created // on the first node. -func (c *SyncedCluster) checkForTenantCertificates( - ctx context.Context, l *logger.Logger, -) (bool, error) { +func (c *SyncedCluster) checkForTenantCertificates(ctx context.Context, l *logger.Logger) bool { dir := "" if c.IsLocal() { dir = c.localVMDir(1) @@ -1605,13 +1599,13 @@ func (c *SyncedCluster) checkForTenantCertificates( func (c *SyncedCluster) fileExistsOnFirstNode( ctx context.Context, l *logger.Logger, path string, -) (bool, error) { +) bool { l.Printf("%s: checking %s", c.Name, path) - // We use `echo -n` below stop echo from including a newline - // character in the output, allowing us to compare it directly with - // "0". - result, err := c.runCmdOnSingleNode(ctx, l, 1, `$(test -e `+path+`); echo -n $?`, false, l.Stdout, l.Stderr) - return result.Stdout == "0", err + runOpts := defaultCmdOpts("check-file-exists") + runOpts.includeRoachprodEnvVars = true + res, _ := c.runCmdOnSingleNode(ctx, l, 1, `test -e `+path, runOpts) + // We only return true if the command succeeded. 
+ return res != nil && res.RemoteExitStatus == 0 } // createNodeCertArguments returns a list of strings appropriate for use as @@ -1654,8 +1648,7 @@ func (c *SyncedCluster) distributeLocalCertsTar( } display := c.Name + ": distributing certs" - return c.Parallel(ctx, l, len(nodes), func(ctx context.Context, i int) (*RunResultDetails, error) { - node := nodes[i] + return c.Parallel(ctx, l, nodes, func(ctx context.Context, node Node) (*RunResultDetails, error) { var cmd string if c.IsLocal() { cmd = fmt.Sprintf("cd %s ; ", c.localVMDir(node)) @@ -1666,18 +1659,9 @@ func (c *SyncedCluster) distributeLocalCertsTar( cmd += "tar xf -" } - sess := c.newSession(l, node, cmd, withDebugName("dist-local-certs")) - defer sess.Close() - - sess.SetStdin(bytes.NewReader(certsTar)) - out, cmdErr := sess.CombinedOutput(ctx) - res := newRunResultDetails(node, cmdErr) - res.CombinedOut = out - - if res.Err != nil { - return res, errors.Wrapf(res.Err, "~ %s\n%s", cmd, res.CombinedOut) - } - return res, nil + runOpts := defaultCmdOpts("dist-local-certs") + runOpts.stdin = bytes.NewReader(certsTar) + return c.runCmdOnSingleNode(ctx, l, node, cmd, runOpts) }, WithDisplay(display)) } @@ -2328,7 +2312,6 @@ func (c *SyncedCluster) Get( } if config.Quiet && l.File != nil { - l.Printf("\n") linesMu.Lock() for i := range lines { l.Printf(" %2d: %s", nodes[i], lines[i]) @@ -2361,21 +2344,15 @@ func (c *SyncedCluster) pgurls( func (c *SyncedCluster) pghosts( ctx context.Context, l *logger.Logger, nodes Nodes, ) (map[Node]string, error) { - ips := make([]string, len(nodes)) - if err := c.Parallel(ctx, l, len(nodes), func(ctx context.Context, i int) (*RunResultDetails, error) { - node := nodes[i] - res := &RunResultDetails{Node: node} - res.Stdout, res.Err = c.GetInternalIP(node) - ips[i] = res.Stdout - return res, nil - }); err != nil { - return nil, errors.Wrapf(err, "pghosts") - } + m := make(map[Node]string, len(nodes)) - m := make(map[Node]string, len(ips)) - for i, ip := range ips { - m[nodes[i]] = ip + for i := 0; i < len(nodes); i++ { + ip, err := c.GetInternalIP(nodes[i]) + if err == nil { + m[nodes[i]] = ip + } } + return m, nil } @@ -2480,18 +2457,10 @@ func scp(l *logger.Logger, src, dest string) (*RunResultDetails, error) { } res := newRunResultDetails(-1, err) - res.CombinedOut = out + res.CombinedOut = string(out) return res, nil } -// ParallelResult captures the result of a user-defined function -// passed to Parallel or ParallelE. -type ParallelResult struct { - Index int - Out []byte - Err error -} - type ParallelOptions struct { concurrency int display string @@ -2530,8 +2499,8 @@ func WithDisplay(display string) ParallelOption { } // Parallel runs a user-defined function across the nodes in the -// cluster. If any of the commands fail, Parallel will log an error -// and exit the program. +// cluster. If any of the commands fail, Parallel will log each failure +// and return an error. // // A user may also pass in a RunRetryOpts to control how the function is retried // in the case of a failure. @@ -2540,68 +2509,73 @@ func WithDisplay(display string) ParallelOption { func (c *SyncedCluster) Parallel( ctx context.Context, l *logger.Logger, - count int, - fn func(ctx context.Context, i int) (*RunResultDetails, error), + nodes Nodes, + fn func(ctx context.Context, n Node) (*RunResultDetails, error), opts ...ParallelOption, ) error { - // failed will contain command errors if any occur. - // err is an unexpected roachprod error, which we return immediately. 
-	failed, err := c.ParallelE(ctx, l, count, fn, opts...)
+	results, hasError, err := c.ParallelE(ctx, l, nodes, fn, opts...)
+	// `err` is an unexpected roachprod error, which we return immediately.
 	if err != nil {
 		return err
 	}
-	if len(failed) > 0 {
-		sort.Slice(failed, func(i, j int) bool { return failed[i].Index < failed[j].Index })
-		for _, f := range failed {
+	// `hasError` is true if any of the commands returned an error.
+	if hasError {
+		for _, r := range results {
 			// Since this function is potentially returning a single error despite
 			// having run on multiple nodes, we combine all the errors into a single
 			// error.
-			err = errors.CombineErrors(err, f.Err)
-			l.Errorf("%d: %+v: %s", f.Index, f.Err, f.Out)
+			if r != nil && r.Err != nil {
+				err = errors.CombineErrors(err, r.Err)
+				l.Errorf("%d: %+v: %s", r.Node, r.Err, r.CombinedOut)
+			}
 		}
-		return errors.Wrap(err, "one or more parallel execution failure")
+		return errors.Wrap(err, "one or more parallel execution failure(s)")
 	}
 	return nil
 }
-// ParallelE runs the given function in parallel across the given
-// nodes.
-//
-// By default, this will fail fast if a command error occurs on any node, in which
-// case the function will return a slice containing the erroneous result.
+type ParallelResult struct {
+	// Index is the order position in which the node was passed to Parallel.
+	// This is useful in maintaining the order of results.
+	Index int
+	*RunResultDetails
+}
+
+// ParallelE runs the given function in parallel on the specified nodes.
 //
-// If `WithWaitOnFail()` is passed in, then the function will wait for all
-// invocations to complete before returning a slice with all failed results.
+// By default, this will fail fast if a command error occurs on any node, and return
+// a slice containing all results up to that point, along with a boolean indicating
+// that at least one error occurred. If `WithWaitOnFail()` is passed in, then the function
+// will wait for all invocations to complete before returning.
 //
 // ParallelE only returns an error for roachprod itself, not any command errors run
-// on the cluster. It is up to the caller to check the slice for command errors. Any
-// such roachprod error will always be returned immediately.
+// on the cluster.
 //
-// ParallelE runs the user-defined functions on the first `count`
-// nodes in the cluster. It runs at most `concurrency` (or
-// `config.MaxConcurrency` if it is lower) in parallel. If `concurrency` is
-// 0, then it defaults to `count`.
+// ParallelE runs at most `concurrency` (or `config.MaxConcurrency` if it is lower) in parallel.
+// If `concurrency` is 0, then it defaults to `len(nodes)`.
 //
-// The function returns a pointer to RunResultDetails as we may enrich
+// The function returns pointers to *RunResultDetails as we may enrich
 // the result with retry information (attempt number, wrapper error).
 //
 // RunRetryOpts controls the retry behavior in the case that
 // the function fails, but returns a nil error. A non-nil error returned by the
 // function denotes a roachprod error and will not be retried regardless of the
 // retry options.
+// NB: Result order is the same as input node order func (c *SyncedCluster) ParallelE( ctx context.Context, l *logger.Logger, - count int, - fn func(ctx context.Context, i int) (*RunResultDetails, error), + nodes Nodes, + fn func(ctx context.Context, n Node) (*RunResultDetails, error), opts ...ParallelOption, -) ([]ParallelResult, error) { +) ([]*RunResultDetails, bool, error) { options := ParallelOptions{retryOpts: DefaultSSHRetryOpts} for _, opt := range opts { opt(&options) } + count := len(nodes) if options.concurrency == 0 || options.concurrency > count { options.concurrency = count } @@ -2609,8 +2583,9 @@ func (c *SyncedCluster) ParallelE( options.concurrency = config.MaxConcurrency } - results := make(chan ParallelResult, count) + completed := make(chan ParallelResult, count) errorChannel := make(chan error) + var wg sync.WaitGroup wg.Add(count) @@ -2626,12 +2601,13 @@ func (c *SyncedCluster) ParallelE( defer wg.Done() // This is rarely expected to return an error, but we fail fast in case. // Command errors, which are far more common, will be contained within the result. - res, err := runWithMaybeRetry(groupCtx, l, options.retryOpts, func(ctx context.Context) (*RunResultDetails, error) { return fn(ctx, i) }) + res, err := runWithMaybeRetry(groupCtx, l, options.retryOpts, func(ctx context.Context) (*RunResultDetails, error) { return fn(ctx, nodes[i]) }) if err != nil { errorChannel <- err return } - results <- ParallelResult{i, res.CombinedOut, res.Err} + // The index is captured here so that we can maintain the order of results. + completed <- ParallelResult{Index: i, RunResultDetails: res} }(index) index++ } @@ -2641,9 +2617,9 @@ func (c *SyncedCluster) ParallelE( } go func() { + defer close(completed) + defer close(errorChannel) wg.Wait() - close(results) - close(errorChannel) }() var writer ui.Writer @@ -2663,47 +2639,45 @@ func (c *SyncedCluster) ParallelE( } } defer ticker.Stop() - complete := make([]bool, count) - var failed []ParallelResult var spinner = []string{"|", "/", "-", "\\"} spinnerIdx := 0 + var hasError bool + n := 0 + results := make([]*RunResultDetails, count) for done := false; !done; { select { case <-ticker.C: if config.Quiet && l.File == nil { fmt.Fprintf(out, ".") } - case r, ok := <-results: - if r.Err != nil { // Command error - failed = append(failed, r) - if !options.waitOnFail { - groupCancel() - return failed, nil + case r, ok := <-completed: + if ok { + results[r.Index] = r.RunResultDetails + n++ + if r.Err != nil { // Command error + hasError = true + if !options.waitOnFail { + groupCancel() + return results, true, nil + } + } + if index < count { + startNext() } } done = !ok + case err, ok := <-errorChannel: // Roachprod error if ok { - complete[r.Index] = true - } - if index < count { - startNext() + groupCancel() + return nil, false, err } - case err := <-errorChannel: // Roachprod error - groupCancel() - return nil, err } if !config.Quiet && l.File == nil { fmt.Fprint(&writer, options.display) - var n int - for i := range complete { - if complete[i] { - n++ - } - } - fmt.Fprintf(&writer, " %d/%d", n, len(complete)) + fmt.Fprintf(&writer, " %d/%d", n, count) if !done { fmt.Fprintf(&writer, " %s", spinner[spinnerIdx%len(spinner)]) } @@ -2717,19 +2691,19 @@ func (c *SyncedCluster) ParallelE( fmt.Fprintf(out, "\n") } - return failed, nil + return results, hasError, nil } // Init initializes the cluster. 
It does it through node 1 (as per TargetNodes) // to maintain parity with auto-init behavior of `roachprod start` (when // --skip-init) is not specified. func (c *SyncedCluster) Init(ctx context.Context, l *logger.Logger, node Node) error { - if err := c.initializeCluster(ctx, l, node); err != nil { - return errors.WithDetail(err, "install.Init() failed: unable to initialize cluster.") + if res, err := c.initializeCluster(ctx, l, node); err != nil || (res != nil && res.Err != nil) { + return errors.WithDetail(errors.CombineErrors(err, res.Err), "install.Init() failed: unable to initialize cluster.") } - if err := c.setClusterSettings(ctx, l, node); err != nil { - return errors.WithDetail(err, "install.Init() failed: unable to set cluster settings.") + if res, err := c.setClusterSettings(ctx, l, node); err != nil || (res != nil && res.Err != nil) { + return errors.WithDetail(errors.CombineErrors(err, res.Err), "install.Init() failed: unable to set cluster settings.") } return nil diff --git a/pkg/roachprod/install/cockroach.go b/pkg/roachprod/install/cockroach.go index d4e756017293..c4ed56740f98 100644 --- a/pkg/roachprod/install/cockroach.go +++ b/pkg/roachprod/install/cockroach.go @@ -19,7 +19,6 @@ import ( "os/exec" "path/filepath" "runtime" - "sort" "strings" "text/template" @@ -185,51 +184,35 @@ func (c *SyncedCluster) Start(ctx context.Context, l *logger.Logger, startOpts S l.Printf("%s: starting nodes", c.Name) // SSH retries are disabled by passing nil RunRetryOpts - if err := c.Parallel(ctx, l, len(nodes), func(ctx context.Context, nodeIdx int) (*RunResultDetails, error) { - node := nodes[nodeIdx] - res := &RunResultDetails{Node: node} + if err := c.Parallel(ctx, l, nodes, func(ctx context.Context, node Node) (*RunResultDetails, error) { // NB: if cockroach started successfully, we ignore the output as it is // some harmless start messaging. - if _, err := c.startNode(ctx, l, node, startOpts); err != nil { - res.Err = err + res, err := c.startNode(ctx, l, node, startOpts) + if err != nil || res.Err != nil { + // If err is non-nil, then this will not be retried, but if res.Err is non-nil, it will be. return res, err } // Code that follows applies only for regular nodes. - if startOpts.Target != StartDefault { - return res, nil - } - // We reserve a few special operations (bootstrapping, and setting // cluster settings) to the InitTarget. - if startOpts.GetInitTarget() != node { + if startOpts.Target != StartDefault || startOpts.GetInitTarget() != node || startOpts.SkipInit { return res, nil } // NB: The code blocks below are not parallelized, so it's safe for us // to use fmt.Printf style logging. - // 1. We don't init invoked using `--skip-init`. - // 2. We don't init when invoking with `start-single-node`. - - if startOpts.SkipInit { - return res, nil - } - // For single node clusters, this can be skipped because during the c.StartNode call above, // the `--start-single-node` flag will handle all of this for us. shouldInit := !c.useStartSingleNode() if shouldInit { - if err := c.initializeCluster(ctx, l, node); err != nil { - res.Err = err - return res, errors.Wrap(err, "failed to initialize cluster") + if res, err := c.initializeCluster(ctx, l, node); err != nil || res.Err != nil { + // If err is non-nil, then this will not be retried, but if res.Err is non-nil, it will be. 
+ return res, err } } - if err := c.setClusterSettings(ctx, l, node); err != nil { - res.Err = err - return res, errors.Wrap(err, "failed to set cluster settings") - } - return res, nil + return c.setClusterSettings(ctx, l, node) }, WithConcurrency(parallelism)); err != nil { return err } @@ -324,16 +307,8 @@ func (c *SyncedCluster) ExecOrInteractiveSQL( func (c *SyncedCluster) ExecSQL( ctx context.Context, l *logger.Logger, tenantName string, args []string, ) error { - type result struct { - node Node - output string - } - resultChan := make(chan result, len(c.Nodes)) - display := fmt.Sprintf("%s: executing sql", c.Name) - if err := c.Parallel(ctx, l, len(c.Nodes), func(ctx context.Context, nodeIdx int) (*RunResultDetails, error) { - node := c.Nodes[nodeIdx] - + results, _, err := c.ParallelE(ctx, l, c.Nodes, func(ctx context.Context, node Node) (*RunResultDetails, error) { var cmd string if c.IsLocal() { cmd = fmt.Sprintf(`cd %s ; `, c.localVMDir(node)) @@ -342,31 +317,15 @@ func (c *SyncedCluster) ExecSQL( c.NodeURL("localhost", c.NodePort(node), tenantName) + " " + ssh.Escape(args) - sess := c.newSession(l, node, cmd, withDebugName("run-sql")) - defer sess.Close() - - out, cmdErr := sess.CombinedOutput(ctx) - res := newRunResultDetails(node, cmdErr) - res.CombinedOut = out + return c.runCmdOnSingleNode(ctx, l, node, cmd, defaultCmdOpts("run-sql")) + }, WithDisplay(display), WithWaitOnFail()) - if res.Err != nil { - res.Err = errors.Wrapf(res.Err, "~ %s\n%s", cmd, res.CombinedOut) - } - resultChan <- result{node: node, output: string(res.CombinedOut)} - return res, nil - }, WithDisplay(display), WithWaitOnFail()); err != nil { + if err != nil { return err } - results := make([]result, 0, len(c.Nodes)) - for range c.Nodes { - results = append(results, <-resultChan) - } - sort.Slice(results, func(i, j int) bool { - return results[i].node < results[j].node - }) for _, r := range results { - l.Printf("node %d:\n%s", r.node, r.output) + l.Printf("node %d:\n%s", r.Node, r.CombinedOut) } return nil @@ -374,46 +333,31 @@ func (c *SyncedCluster) ExecSQL( func (c *SyncedCluster) startNode( ctx context.Context, l *logger.Logger, node Node, startOpts StartOpts, -) (string, error) { +) (*RunResultDetails, error) { startCmd, err := c.generateStartCmd(ctx, l, node, startOpts) if err != nil { - return "", err - } - - if err := func() error { - var cmd string - if c.IsLocal() { - cmd = fmt.Sprintf(`cd %s ; `, c.localVMDir(node)) - } - cmd += `cat > cockroach.sh && chmod +x cockroach.sh` - - sess := c.newSession(l, node, cmd) - defer sess.Close() - - sess.SetStdin(strings.NewReader(startCmd)) - if out, err := sess.CombinedOutput(ctx); err != nil { - return errors.Wrapf(err, "failed to upload start script: %s", out) - } - - return nil - }(); err != nil { - return "", err + return nil, err } - - var cmd string + var uploadCmd string if c.IsLocal() { - cmd = fmt.Sprintf(`cd %s ; `, c.localVMDir(node)) + uploadCmd = fmt.Sprintf(`cd %s ; `, c.localVMDir(node)) } - cmd += "./cockroach.sh" + uploadCmd += `cat > cockroach.sh && chmod +x cockroach.sh` - sess := c.newSession(l, node, cmd) - defer sess.Close() + var res = &RunResultDetails{} + uploadOpts := defaultCmdOpts("upload-start-script") + uploadOpts.stdin = strings.NewReader(startCmd) + res, err = c.runCmdOnSingleNode(ctx, l, node, uploadCmd, uploadOpts) + if err != nil || res.Err != nil { + return res, err + } - out, err := sess.CombinedOutput(ctx) - if err != nil { - return "", errors.Wrapf(err, "~ %s\n%s", cmd, out) + var runScriptCmd string + if 
c.IsLocal() { + runScriptCmd = fmt.Sprintf(`cd %s ; `, c.localVMDir(node)) } - return strings.TrimSpace(string(out)), nil + runScriptCmd += "./cockroach.sh" + return c.runCmdOnSingleNode(ctx, l, node, runScriptCmd, defaultCmdOpts("run-start-script")) } func (c *SyncedCluster) generateStartCmd( @@ -645,39 +589,36 @@ func (c *SyncedCluster) maybeScaleMem(val int) int { return val } -func (c *SyncedCluster) initializeCluster(ctx context.Context, l *logger.Logger, node Node) error { +func (c *SyncedCluster) initializeCluster( + ctx context.Context, l *logger.Logger, node Node, +) (*RunResultDetails, error) { l.Printf("%s: initializing cluster\n", c.Name) cmd := c.generateInitCmd(node) - sess := c.newSession(l, node, cmd, withDebugName("init-cluster")) - defer sess.Close() - - out, err := sess.CombinedOutput(ctx) - if err != nil { - return errors.Wrapf(err, "~ %s\n%s", cmd, out) - } - - if out := strings.TrimSpace(string(out)); out != "" { - l.Printf(out) + res, err := c.runCmdOnSingleNode(ctx, l, node, cmd, defaultCmdOpts("init-cluster")) + if res != nil { + out := strings.TrimSpace(res.CombinedOut) + if out != "" { + l.Printf(out) + } } - return nil + return res, err } -func (c *SyncedCluster) setClusterSettings(ctx context.Context, l *logger.Logger, node Node) error { +func (c *SyncedCluster) setClusterSettings( + ctx context.Context, l *logger.Logger, node Node, +) (*RunResultDetails, error) { l.Printf("%s: setting cluster settings", c.Name) cmd := c.generateClusterSettingCmd(l, node) - sess := c.newSession(l, node, cmd, withDebugName("set-cluster-settings")) - defer sess.Close() - - out, err := sess.CombinedOutput(ctx) - if err != nil { - return errors.Wrapf(err, "~ %s\n%s", cmd, out) - } - if out := strings.TrimSpace(string(out)); out != "" { - l.Printf(out) + res, err := c.runCmdOnSingleNode(ctx, l, node, cmd, defaultCmdOpts("set-cluster-settings")) + if res != nil { + out := strings.TrimSpace(res.CombinedOut) + if out != "" { + l.Printf(out) + } } - return nil + return res, err } func (c *SyncedCluster) generateClusterSettingCmd(l *logger.Logger, node Node) string { @@ -846,18 +787,19 @@ func (c *SyncedCluster) createFixedBackupSchedule( url := c.NodeURL("localhost", c.NodePort(node), "" /* tenantName */) fullCmd := fmt.Sprintf(`COCKROACH_CONNECT_TIMEOUT=%d %s sql --url %s -e %q`, startSQLTimeout, binary, url, createScheduleCmd) - // Instead of using `c.ExecSQL()`, use the more flexible c.newSession(), which allows us to + // Instead of using `c.ExecSQL()`, use `c.runCmdOnSingleNode()`, which allows us to // 1) prefix the schedule backup cmd with COCKROACH_CONNECT_TIMEOUT. // 2) run the command against the first node in the cluster target. - sess := c.newSession(l, node, fullCmd, withDebugName("init-backup-schedule")) - defer sess.Close() - - out, err := sess.CombinedOutput(ctx) - if err != nil { + res, err := c.runCmdOnSingleNode(ctx, l, node, fullCmd, defaultCmdOpts("init-backup-schedule")) + if err != nil || res.Err != nil { + out := "" + if res != nil { + out = res.CombinedOut + } return errors.Wrapf(err, "~ %s\n%s", fullCmd, out) } - if out := strings.TrimSpace(string(out)); out != "" { + if out := strings.TrimSpace(res.CombinedOut); out != "" { l.Printf(out) } return nil diff --git a/pkg/roachprod/roachprod.go b/pkg/roachprod/roachprod.go index 5404804bb382..3a01b172eeee 100644 --- a/pkg/roachprod/roachprod.go +++ b/pkg/roachprod/roachprod.go @@ -361,6 +361,14 @@ func List( return filteredCloud, nil } +// TruncateString truncates a string to maxLength and adds "..." to the end. 
+func TruncateString(s string, maxLength int) string {
+	if len(s) > maxLength {
+		return s[:maxLength-3] + "..."
+	}
+	return s
+}
+
 // Run runs a command on the nodes in a cluster.
 func Run(
 	ctx context.Context,
@@ -386,11 +394,7 @@ func Run(
 	}
 	cmd := strings.TrimSpace(strings.Join(cmdArray, " "))
-	title := cmd
-	if len(title) > 30 {
-		title = title[:27] + "..."
-	}
-	return c.Run(ctx, l, stdout, stderr, c.Nodes, title, cmd, opts...)
+	return c.Run(ctx, l, stdout, stderr, c.Nodes, TruncateString(cmd, 30), cmd, opts...)
 }
 // RunWithDetails runs a command on the nodes in a cluster.
@@ -416,11 +420,7 @@ func RunWithDetails(
 	}
 	cmd := strings.TrimSpace(strings.Join(cmdArray, " "))
-	title := cmd
-	if len(title) > 30 {
-		title = title[:27] + "..."
-	}
-	return c.RunWithDetails(ctx, l, c.Nodes, title, cmd)
+	return c.RunWithDetails(ctx, l, c.Nodes, TruncateString(cmd, 30), cmd)
 }
 // SQL runs `cockroach sql` on a remote cluster. If a single node is passed,
@@ -931,14 +931,11 @@ func PgURL(
 			ips[i] = c.VMs[nodes[i]-1].PublicIP
 		}
 	} else {
-		if err := c.Parallel(ctx, l, len(nodes), func(ctx context.Context, i int) (*install.RunResultDetails, error) {
-			node := nodes[i]
-			res := &install.RunResultDetails{Node: node}
-			res.Stdout, res.Err = c.GetInternalIP(node)
-			ips[i] = res.Stdout
-			return res, nil
-		}); err != nil {
-			return nil, err
+		for i := 0; i < len(nodes); i++ {
+			ip, err := c.GetInternalIP(nodes[i])
+			if err == nil {
+				ips[i] = ip
+			}
 		}
 	}
@@ -1080,9 +1077,7 @@ func Pprof(ctx context.Context, l *logger.Logger, clusterName string, opts Pprof
 	httpClient := httputil.NewClientWithTimeout(timeout)
 	startTime := timeutil.Now().Unix()
-	nodes := c.TargetNodes()
-	err = c.Parallel(ctx, l, len(nodes), func(ctx context.Context, i int) (*install.RunResultDetails, error) {
-		node := nodes[i]
+	err = c.Parallel(ctx, l, c.TargetNodes(), func(ctx context.Context, node install.Node) (*install.RunResultDetails, error) {
 		res := &install.RunResultDetails{Node: node}
 		host := c.Host(node)
 		port := c.NodeUIPort(node)
@@ -1641,16 +1636,25 @@ func CreateSnapshot(
 	if err := LoadClusters(); err != nil {
 		return nil, err
 	}
+
 	c, err := newCluster(l, clusterName)
 	if err != nil {
 		return nil, err
 	}
+
 	nodes := c.TargetNodes()
 	nodesStatus, err := c.Status(ctx, l)
+
 	if err != nil {
 		return nil, err
 	}
+	// 1-indexed node IDs.
+	statusByNodeID := make(map[int]install.NodeStatus)
+	for _, status := range nodesStatus {
+		statusByNodeID[status.NodeID] = status
+	}
+
 	// TODO(irfansharif): Add validation that we're using some released version,
 	// probably the predecessor one. Also ensure that any running CRDB processes
 	// have been stopped since we're taking raw disk snapshots cluster-wide.
@@ -1659,12 +1663,11 @@ func CreateSnapshot(
 		syncutil.Mutex
 		snapshots []vm.VolumeSnapshot
 	}{}
-	if err := c.Parallel(ctx, l, len(nodes), func(ctx context.Context, i int) (*install.RunResultDetails, error) {
-		node := nodes[i]
+	if err := c.Parallel(ctx, l, nodes, func(ctx context.Context, node install.Node) (*install.RunResultDetails, error) {
 		res := &install.RunResultDetails{Node: node}
 		cVM := c.VMs[node-1]
-		crdbVersion := nodesStatus[i].Version
+		crdbVersion := statusByNodeID[int(node)].Version
 		if crdbVersion == "" {
 			crdbVersion = "unknown"
 		}
@@ -1779,11 +1782,10 @@ func ApplySnapshots(
 	}
 	// Detach and delete existing volumes. This is destructive.
-	if err := c.Parallel(ctx, l, len(c.TargetNodes()), func(ctx context.Context, i int) (*install.RunResultDetails, error) {
-		node := c.TargetNodes()[i]
+	if err := c.Parallel(ctx, l, c.TargetNodes(), func(ctx context.Context, node install.Node) (*install.RunResultDetails, error) {
 		res := &install.RunResultDetails{Node: node}
-		cVM := &c.VMs[i]
+		cVM := &c.VMs[node-1]
 		if err := vm.ForProvider(cVM.Provider, func(provider vm.Provider) error {
 			volumes, err := provider.ListVolumes(l, cVM)
 			if err != nil {
@@ -1804,8 +1806,7 @@ func ApplySnapshots(
 		return err
 	}
-	return c.Parallel(ctx, l, len(c.TargetNodes()), func(ctx context.Context, i int) (*install.RunResultDetails, error) {
-		node := c.TargetNodes()[i]
+	return c.Parallel(ctx, l, c.TargetNodes(), func(ctx context.Context, node install.Node) (*install.RunResultDetails, error) {
 		res := &install.RunResultDetails{Node: node}
 		volumeOpts := opts // make a copy
@@ -1814,13 +1815,14 @@ func ApplySnapshots(
 			volumeOpts.Labels[k] = v
 		}
-		cVM := &c.VMs[i]
+		// TODO: same issue as above if the target nodes are not sequential starting from 1
+		cVM := &c.VMs[node-1]
 		if err := vm.ForProvider(cVM.Provider, func(provider vm.Provider) error {
 			volumeOpts.Zone = cVM.Zone
 			// NB: The "-1" signifies that it's the first attached non-boot volume.
 			// This is typical naming convention in GCE clusters.
 			volumeOpts.Name = fmt.Sprintf("%s-%04d-1", clusterName, node)
-			volumeOpts.SourceSnapshotID = snapshots[i].ID
+			volumeOpts.SourceSnapshotID = snapshots[node-1].ID
 			volumes, err := provider.ListVolumes(l, cVM)
 			if err != nil {
@@ -1955,9 +1957,8 @@ func sendCaptureCommand(
 ) error {
 	nodes := c.TargetNodes()
 	httpClient := httputil.NewClientWithTimeout(0 /* timeout: None */)
-	_, err := c.ParallelE(ctx, l, len(nodes),
-		func(ctx context.Context, i int) (*install.RunResultDetails, error) {
-			node := nodes[i]
+	_, _, err := c.ParallelE(ctx, l, nodes,
+		func(ctx context.Context, node install.Node) (*install.RunResultDetails, error) {
			res := &install.RunResultDetails{Node: node}
			host := c.Host(node)
			port := c.NodeUIPort(node)
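For reference, the calling convention these hunks converge on looks roughly like the sketch below. It is an illustrative fragment rather than part of the patch: it assumes it sits in the same package as SyncedCluster (so the unexported runCmdOnSingleNode and defaultCmdOpts helpers are in scope), and the helper name runOnAllNodes, the debug name "example-cmd", and the display string are made up for the example.

// runOnAllNodes is a hypothetical helper showing how a caller drives the
// node-based ParallelE signature: per-node command failures are carried in the
// returned results (and the hasError flag), while a non-nil error means a
// roachprod-internal failure.
func runOnAllNodes(ctx context.Context, l *logger.Logger, c *SyncedCluster, cmd string) error {
	results, hasError, err := c.ParallelE(ctx, l, c.Nodes,
		func(ctx context.Context, node Node) (*RunResultDetails, error) {
			// The command error, if any, lands in the returned result.
			return c.runCmdOnSingleNode(ctx, l, node, cmd, defaultCmdOpts("example-cmd"))
		},
		WithDisplay(fmt.Sprintf("%s: running example command", c.Name)), WithWaitOnFail(),
	)
	if err != nil {
		// Unexpected roachprod error; returned immediately.
		return err
	}
	if hasError {
		// Surface each per-node failure, mirroring what Parallel does.
		for _, r := range results {
			if r != nil && r.Err != nil {
				l.Errorf("%d: %+v", r.Node, r.Err)
			}
		}
		return errors.New("command failed on at least one node")
	}
	return nil
}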