Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
104915: roachprod: roachtest: refactoring r=renatolabs,herkolategan a=smg260

This commit addresses some usability issues within roachprod/roachtest, and does some minor clean up.

Summary:

`roachprod`

- `c.Parallel` now accepts  `nodes Nodes` and `func(ctx context.Context, node Node)`. Previously, these was an `int n` representing the first `n` nodes to operate on, and `func(ctx context.Context, int n)`, where second arg would confusingly represented the index of the node being operated on, which was wholly dependent on a captured-by-reference slice at the call site.  This change makes it explicit to the caller and the function, which exact node is targeted. 

- `c.Parallel` now returns `[]*RunResultDetails, boolean, error`. The slice will contain results, successful and failed, for each node (or thus far, depending on fail-fast behaviour). The boolean is a signal to the caller that at least one of the results in the slice suffered from a command error, which can be inspected in the result itself. The `error` now more idiomatically represents an unrecoverable error encountered during `Parallel`; i.e. an un retryable error in `roachprod`. In the latter case, results will be `nil`.

- All calls to execute a remote command on a node are all funnelled to `c.runCmdOnSingleNode`, ensuring consistency, and reducing duplicated code. This function accepts a `RunCmdOptions`  struct to configure it's behaviour, including `stdin/out`

- `runCmdOnSingleNode` : in error cases, include stdout/err output (truncated after n bytes - effectively `tail`). This output will show in the `failure_n.log` and give convenient insight. Full output is always retained in the run log.

- `RunResultDetails.{Stdout,Stderr,CombinedOut}` are now all `string` to represent the most common use case, and a convenience `r.Output()` added to print out the contents of `Stderr,Stdout,CombinedOut`

- Remove complex legacy pipe processing logic from `roachtest` (`execCmd,execCmdEx`), previously used when `roachprod` was not a library of `roachtest`

- When showing the result of a command on each node on the CLI, explicitly display `<ok>` or `<err>` followed by output/error


`roachtest`

- skip post-test assertions on a test failure

- removal of `execCmd/execCmdRes` which were doing some complex pipe processing before calling roachprod.Run (not required since roachprod is a lib of roachtest)

Minor

- Clean up various redundant/unused variables
- Typos
- Command formatting

Informs: #104312 
Informs: #104316 (TBD)

Epic: none
Release note: none

107354: asim: add randomized range generation  r=kvoli a=wenyihu6

This patch enables random range configuration to be generated.

TestRandomized can now take another setting parameter rangeGen (default: uniform
rangeGenType, uniform keySpaceGenType, empty weightedRand).

These generators are part of the framework fields which persist across
iterations. The numbers produced by the generator shape the distribution across
iterations.

- rangeKeyGenType: determines range generator type across iterations (default:
uniformGenerator, min = 1, max = 1000)

- keySpaceGenType: determines key space generator type across iterations
(default: uniformGenerator, min = 1000, max = 200000)

- weightedRand: if non-empty, enables weighted randomization for range
distribution

This provides three modes for range generation:
1. Default: currently set to uniform distribution
2. Random: randomly generates range distribution across stores
3. Weighted Randomization: enables weighted randomization for range distribution
if and only if given weightedRand is non-empty

Part of: #106311

Release note: None

108906: changefeedccl: make tests work offline r=miretskiy a=jayshrivastava

This change updates two unit tests to pass when
running offline.

Fixes: #108867
Release note: None
Epic: None

Co-authored-by: Miral Gadani <miral@cockroachlabs.com>
Co-authored-by: wenyihu6 <wenyi.hu@cockroachlabs.com>
Co-authored-by: Jayant Shrivastava <jayants@cockroachlabs.com>
  • Loading branch information
4 people committed Aug 17, 2023
4 parents 17d0350 + 3c64af2 + 5e55ef7 + f353d4c commit 8d5ff3f
Show file tree
Hide file tree
Showing 17 changed files with 954 additions and 760 deletions.
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ require (
github.com/andygrunwald/go-jira v1.14.0
github.com/apache/arrow/go/arrow v0.0.0-20200923215132-ac86123a3f01
github.com/apache/arrow/go/v11 v11.0.0
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e
github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.18.2
github.com/axiomhq/hyperloglog v0.0.0-20181223111420-4b99d0c2c99e
github.com/bazelbuild/rules_go v0.26.0
Expand Down
1 change: 0 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,6 @@ github.com/apache/thrift v0.15.0/go.mod h1:PHK3hniurgQaNMZYaCLEqXKsYK8upmhPbmdP2
github.com/apache/thrift v0.16.0 h1:qEy6UW60iVOlUy+b9ZR0d5WzUWYGOo4HfopoyBaNmoY=
github.com/apache/thrift v0.16.0/go.mod h1:PHK3hniurgQaNMZYaCLEqXKsYK8upmhPbmdP2FXSqgU=
github.com/araddon/dateparse v0.0.0-20210429162001-6b43995a97de/go.mod h1:DCaWoUhZrYW9p1lxo/cm8EmUOOzAPSEZNGF2DK1dJgw=
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e h1:QEF07wC0T1rKkctt1RINW/+RMTVmiwxETico2l3gxJA=
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/changefeedccl/scheduled_changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ func TestCreateChangefeedScheduleIfNotExists(t *testing.T) {
th.sqlDB.Exec(t, "CREATE TABLE t1 (a INT)")

const scheduleLabel = "foo"
const createQuery = "CREATE SCHEDULE IF NOT EXISTS '%s' FOR CHANGEFEED TABLE t1 INTO 's3://bucket?AUTH=implicit' WITH initial_scan = 'only' RECURRING '@daily'"
const createQuery = "CREATE SCHEDULE IF NOT EXISTS '%s' FOR CHANGEFEED TABLE t1 INTO 'null://' WITH initial_scan = 'only' RECURRING '@daily'"

th.sqlDB.Exec(t, fmt.Sprintf(createQuery, scheduleLabel))

Expand Down Expand Up @@ -387,7 +387,7 @@ func TestCreateChangefeedScheduleInExplicitTxnRollback(t *testing.T) {
require.NoError(t, res.Err())

th.sqlDB.Exec(t, "BEGIN;")
th.sqlDB.Exec(t, "CREATE SCHEDULE FOR CHANGEFEED TABLE t1 INTO 's3://bucket?AUTH=implicit' WITH initial_scan = 'only' RECURRING '@daily';")
th.sqlDB.Exec(t, "CREATE SCHEDULE FOR CHANGEFEED TABLE t1 INTO 'null://' WITH initial_scan = 'only' RECURRING '@daily';")
th.sqlDB.Exec(t, "ROLLBACK;")

res = th.sqlDB.Query(t, "SELECT id FROM [SHOW SCHEDULES FOR CHANGEFEED]")
Expand Down
1 change: 0 additions & 1 deletion pkg/cmd/roachtest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ go_library(
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"//pkg/util/version",
"@com_github_armon_circbuf//:circbuf",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_logtags//:logtags",
"@com_github_lib_pq//:pq",
Expand Down
263 changes: 50 additions & 213 deletions pkg/cmd/roachtest/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
"sync/atomic"
"time"

"github.com/armon/circbuf"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil"
Expand Down Expand Up @@ -434,161 +433,6 @@ func initBinariesAndLibraries() {
}
}

// execCmd is like execCmdEx, but doesn't return the command's output.
func execCmd(
ctx context.Context, l *logger.Logger, clusterName string, secure bool, args ...string,
) error {
return execCmdEx(ctx, l, clusterName, secure, args...).err
}

type cmdRes struct {
err error
// stdout and stderr are the commands output. Note that this is truncated and
// only a tail is returned.
stdout, stderr string
}

// execCmdEx runs a command and returns its error and output.
//
// Note that the output is truncated; only a tail is returned.
// Also note that if the command exits with an error code, its output is also
// included in cmdRes.err.
func execCmdEx(
ctx context.Context, l *logger.Logger, clusterName string, secure bool, args ...string,
) cmdRes {
var cancel func()
ctx, cancel = context.WithCancel(ctx)
defer cancel()

l.Printf("> %s\n", strings.Join(args, " "))
var roachprodRunStdout, roachprodRunStderr io.Writer

debugStdoutBuffer, _ := circbuf.NewBuffer(4096)
debugStderrBuffer, _ := circbuf.NewBuffer(4096)

// Do a dance around https://github.com/golang/go/issues/23019.
// When the command we run launches a subprocess, that subprocess receives
// a copy of our Command's Stdout/Stderr file descriptor, which effectively
// means that the file descriptors close only when that subcommand returns.
// However, proactively killing the subcommand is not really possible - we
// will only manage to kill the parent process that we launched directly.
// In practice this means that if we try to react to context cancellation,
// the pipes we read the output from will wait for the *subprocess* to
// terminate, leaving us hanging, potentially indefinitely.
// To work around it, use pipes and set a read deadline on our (read) end of
// the pipes when we detect a context cancellation.
var closePipes func(ctx context.Context)
var wg sync.WaitGroup
{

var wOut, wErr, rOut, rErr *os.File
var cwOnce sync.Once
closePipes = func(ctx context.Context) {
// Idempotently closes the writing end of the pipes. This is called either
// when the process returns or when it was killed due to context
// cancellation. In the former case, close the writing ends of the pipe
// so that the copy goroutines started below return (without missing any
// output). In the context cancellation case, we set a deadline to force
// the goroutines to quit eagerly. This is important since the command
// may have duplicated wOut and wErr to its possible subprocesses, which
// may continue to run for long periods of time, and would otherwise
// block this command. In theory this is possible also when the command
// returns on its own accord, so we set a (more lenient) deadline in the
// first case as well.
//
// NB: there's also the option (at least on *nix) to use a process group,
// but it doesn't look portable:
// https://medium.com/@felixge/killing-a-child-process-and-all-of-its-children-in-go-54079af94773
cwOnce.Do(func() {
if wOut != nil {
_ = wOut.Close()
}
if wErr != nil {
_ = wErr.Close()
}
dur := 10 * time.Second // wait up to 10s for subprocesses
if ctx.Err() != nil {
dur = 10 * time.Millisecond
}
deadline := timeutil.Now().Add(dur)
if rOut != nil {
_ = rOut.SetReadDeadline(deadline)
}
if rErr != nil {
_ = rErr.SetReadDeadline(deadline)
}
})
}
defer closePipes(ctx)

var err error
rOut, wOut, err = os.Pipe()
if err != nil {
return cmdRes{err: err}
}

rErr, wErr, err = os.Pipe()
if err != nil {
return cmdRes{err: err}
}
roachprodRunStdout = wOut
wg.Add(1)
go func() {
defer wg.Done()
_, _ = io.Copy(l.Stdout, io.TeeReader(rOut, debugStdoutBuffer))
}()

if l.Stderr == l.Stdout {
// If l.Stderr == l.Stdout, we use only one pipe to avoid
// duplicating everything.
roachprodRunStderr = wOut
} else {
roachprodRunStderr = wErr
wg.Add(1)
go func() {
defer wg.Done()
_, _ = io.Copy(l.Stderr, io.TeeReader(rErr, debugStderrBuffer))
}()
}
}

err := roachprod.Run(ctx, l, clusterName, "" /* SSHOptions */, "" /* processTag */, secure, roachprodRunStdout, roachprodRunStderr, args)
closePipes(ctx)
wg.Wait()

stdoutString := debugStdoutBuffer.String()
if debugStdoutBuffer.TotalWritten() > debugStdoutBuffer.Size() {
stdoutString = "<... some data truncated by circular buffer; go to artifacts for details ...>\n" + stdoutString
}
stderrString := debugStderrBuffer.String()
if debugStderrBuffer.TotalWritten() > debugStderrBuffer.Size() {
stderrString = "<... some data truncated by circular buffer; go to artifacts for details ...>\n" + stderrString
}

if err != nil {
// Context errors opaquely appear as "signal killed" when manifested.
// We surface this error explicitly.
if ctx.Err() != nil {
err = errors.CombineErrors(ctx.Err(), err)
}

if err != nil {
err = &cluster.WithCommandDetails{
Wrapped: err,
Cmd: strings.Join(args, " "),
Stderr: stderrString,
Stdout: stdoutString,
}
}
}

return cmdRes{
err: err,
stdout: stdoutString,
stderr: stderrString,
}
}

type clusterRegistry struct {
mu struct {
syncutil.Mutex
Expand Down Expand Up @@ -2300,41 +2144,31 @@ func (c *clusterImpl) Run(ctx context.Context, node option.NodeListOption, args
// will be redirected to a file which is logged via the cluster-wide logger in
// case of an error. Logs will sort chronologically. Failing invocations will
// have an additional marker file with a `.failed` extension instead of `.log`.
func (c *clusterImpl) RunE(ctx context.Context, node option.NodeListOption, args ...string) error {
func (c *clusterImpl) RunE(ctx context.Context, nodes option.NodeListOption, args ...string) error {
if len(args) == 0 {
return errors.New("No command passed")
}
l, logFile, err := c.loggerForCmd(node, args...)
l, logFile, err := c.loggerForCmd(nodes, args...)
if err != nil {
return err
}
defer l.Close()

if err := errors.Wrap(ctx.Err(), "cluster.RunE"); err != nil {
return err
}
err = execCmd(ctx, l, c.MakeNodes(node), c.IsSecure(), args...)

l.Printf("> result: %+v", err)
if err := ctx.Err(); err != nil {
l.Printf("(note: incoming context was canceled: %s", err)
}
// We need to protect ourselves from a race where cluster logger is
// concurrently closed before child logger is created. In that case child
// logger will have no log file but would write to stderr instead and we can't
// create a meaningful ".failed" file for it.
physicalFileName := ""
if l.File != nil {
physicalFileName = l.File.Name()
}
l.Close()
if err != nil && len(physicalFileName) > 0 {
failedPhysicalFileName := strings.TrimSuffix(physicalFileName, ".log") + ".failed"
if failedFile, err2 := os.Create(failedPhysicalFileName); err2 != nil {
failedFile.Close()
cmd := strings.Join(args, " ")
c.t.L().Printf("running cmd `%s` on nodes [%v]; details in %s.log", roachprod.TruncateString(cmd, 30), nodes, logFile)
l.Printf("> %s", cmd)
if err := roachprod.Run(ctx, l, c.MakeNodes(nodes), "", "", c.IsSecure(), l.Stdout, l.Stderr, args); err != nil {
if err := ctx.Err(); err != nil {
l.Printf("(note: incoming context was canceled: %s)", err)
return err
}

l.Printf("> result: %s", err)
createFailedFile(l.File.Name())
return errors.Wrapf(err, "full command output in %s.log", logFile)
}
err = errors.Wrapf(err, "output in %s", logFile)
return err
l.Printf("> result: <ok>")
return nil
}

// RunWithDetailsSingleNode is just like RunWithDetails but used when 1) operating
Expand All @@ -2348,10 +2182,7 @@ func (c *clusterImpl) RunWithDetailsSingleNode(
return install.RunResultDetails{}, errors.Newf("RunWithDetailsSingleNode received %d nodes. Use RunWithDetails if you need to run on multiple nodes.", len(nodes))
}
results, err := c.RunWithDetails(ctx, testLogger, nodes, args...)
if err != nil {
return install.RunResultDetails{}, err
}
return results[0], results[0].Err
return results[0], errors.CombineErrors(err, results[0].Err)
}

// RunWithDetails runs a command on the specified nodes, returning the results
Expand All @@ -2364,49 +2195,55 @@ func (c *clusterImpl) RunWithDetails(
if len(args) == 0 {
return nil, errors.New("No command passed")
}
l, _, err := c.loggerForCmd(nodes, args...)
l, logFile, err := c.loggerForCmd(nodes, args...)
if err != nil {
return nil, err
}
physicalFileName := ""
if l.File != nil {
physicalFileName = l.File.Name()
}
defer l.Close()

if err := ctx.Err(); err != nil {
l.Printf("(note: incoming context was canceled: %s", err)
return nil, err
}
cmd := strings.Join(args, " ")

l.Printf("running %s on nodes: %v", strings.Join(args, " "), nodes)
// This could probably be removed in favour of c.t.L() but it's used extensively in roachtests.
if testLogger != nil {
testLogger.Printf("> %s\n", strings.Join(args, " "))
testLogger.Printf("running cmd `%s` on nodes [%v]; details in %s.log", roachprod.TruncateString(cmd, 30), nodes, logFile)
}

l.Printf("> %s", cmd)
results, err := roachprod.RunWithDetails(ctx, l, c.MakeNodes(nodes), "" /* SSHOptions */, "" /* processTag */, c.IsSecure(), args)
if err != nil && len(physicalFileName) > 0 {
l.Printf("> result: %+v", err)
createFailedFile(physicalFileName)
return results, err

logFileFull := l.File.Name()
if err != nil {
if ctxErr := ctx.Err(); ctxErr != nil {
l.Printf("(note: incoming context was canceled: %s)", err)
return nil, ctxErr
}

l.Printf("> result: %s", err)
createFailedFile(logFileFull)
return nil, err
}

hasError := false
for _, result := range results {
if result.Err != nil {
err = result.Err
l.Printf("> Error for Node %d: %+v", int(result.Node), result.Err)
hasError = true
l.Printf("> result: Error for Node %d: %+v", int(result.Node), result.Err)
}
}
if err != nil {
createFailedFile(physicalFileName)
if hasError {
createFailedFile(logFileFull)
} else {
l.Printf("> result: <ok>")
}
l.Close()
return results, nil
}

func createFailedFile(logFileName string) {
failedPhysicalFileName := strings.TrimSuffix(logFileName, ".log") + ".failed"
if failedFile, err2 := os.Create(failedPhysicalFileName); err2 != nil {
failedFile.Close()
func createFailedFile(logFile string) {
if logFile == "" {
return
}
if file, err := os.Create(strings.TrimSuffix(logFile, ".log") + ".failed"); err == nil {
file.Close()
}
}

Expand Down Expand Up @@ -2495,7 +2332,7 @@ func (c *clusterImpl) ExternalPGUrl(
return c.pgURLErr(ctx, l, node, true, tenant)
}

func addrToAdminUIAddr(c *clusterImpl, addr string) (string, error) {
func addrToAdminUIAddr(addr string) (string, error) {
host, port, err := net.SplitHostPort(addr)
if err != nil {
return "", err
Expand Down Expand Up @@ -2547,7 +2384,7 @@ func (c *clusterImpl) InternalAdminUIAddr(
return nil, err
}
for _, u := range urls {
adminUIAddr, err := addrToAdminUIAddr(c, u)
adminUIAddr, err := addrToAdminUIAddr(u)
if err != nil {
return nil, err
}
Expand All @@ -2567,7 +2404,7 @@ func (c *clusterImpl) ExternalAdminUIAddr(
return nil, err
}
for _, u := range externalAddrs {
adminUIAddr, err := addrToAdminUIAddr(c, u)
adminUIAddr, err := addrToAdminUIAddr(u)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit 8d5ff3f

Please sign in to comment.