Skip to content
This repository has been archived by the owner on Feb 9, 2024. It is now read-only.

Commit

Permalink
Expose stdout and stderr separately in all exec-related APIs (incl.
Browse files Browse the repository at this point in the history
remote calls) to be able to properly handle error conditions in the
debug reporter.
One of the issues previously was that the reports would
contain bogus logs (i.e. previous container logs that could not be
captured while they did not exist, etc.)
Also replaces the ad-hoc Kubernetes capture code with
'kubectl cluster-info dump' to capture all of the information as a
directory tree.
  • Loading branch information
a-palchikov committed Feb 7, 2020
1 parent f37329a commit 8da21c9
Show file tree
Hide file tree
Showing 28 changed files with 227 additions and 162 deletions.
26 changes: 13 additions & 13 deletions lib/checks/checks.go
Expand Up @@ -518,18 +518,18 @@ func (r *checker) checkServerDisk(ctx context.Context, server storage.Server, ta
defer func() {
// testfile was created only on real filesystem
if !strings.HasPrefix(target, "/dev") {
err := r.Remote.Exec(ctx, server.AdvertiseIP, []string{"rm", target}, &out)
err := r.Remote.Exec(ctx, server.AdvertiseIP, []string{"rm", target}, nil, &out)
if err != nil {
log.Errorf("Failed to remove test file: %v %v.", out.String(), trace.DebugReport(err))
log.WithError(err).Errorf("Failed to remove test file: %v.", out.String())
}
}
}()

err := r.Remote.Exec(ctx, server.AdvertiseIP, []string{
"dd", "if=/dev/zero", fmt.Sprintf("of=%v", target),
"bs=100K", "count=1024", "conv=fdatasync"}, &out)
"bs=100K", "count=1024", "conv=fdatasync"}, &out, &out)
if err != nil {
return 0, trace.Wrap(err)
return 0, trace.Wrap(err, "failed to copy file: %s", out.String())
}

speed, err := utils.ParseDDOutput(out.String())
Expand All @@ -543,18 +543,18 @@ func (r *checker) checkServerDisk(ctx context.Context, server storage.Server, ta
// checkTempDir makes sure agents can create temporary files on servers
func (r *checker) checkTempDir(ctx context.Context, server Server) error {
filename := filepath.Join(server.TempDir, fmt.Sprintf("tmpcheck.%v", uuid.New()))
var out bytes.Buffer
var stderr bytes.Buffer

err := r.Remote.Exec(ctx, server.AdvertiseIP, []string{"touch", filename}, &out)
err := r.Remote.Exec(ctx, server.AdvertiseIP, []string{"touch", filename}, nil, &stderr)
if err != nil {
return trace.BadParameter("couldn't create a test file in temp directory %v on %q: %v",
server.TempDir, server.ServerInfo.GetHostname(), out.String())
return trace.BadParameter("failed to create a test file in temp directory %v on %q: %v",
server.TempDir, server.ServerInfo.GetHostname(), stderr.String())
}

err = r.Remote.Exec(ctx, server.AdvertiseIP, []string{"rm", filename}, &out)
err = r.Remote.Exec(ctx, server.AdvertiseIP, []string{"rm", filename}, nil, &stderr)
if err != nil {
log.Errorf("Failed to delete %v on %v: %v %v.",
filename, server.AdvertiseIP, trace.DebugReport(err), out.String())
log.WithError(err).Errorf("Failed to delete %v on %v: %v.",
filename, server.AdvertiseIP, stderr.String())
}

log.Infof("Server %q passed temp directory check: %v.",
Expand Down Expand Up @@ -1062,8 +1062,8 @@ func ifTestsDisabled() bool {

// RunStream executes the specified command on r.server.
// Implements utils.CommandRunner
func (r *serverRemote) RunStream(ctx context.Context, w io.Writer, args ...string) error {
return trace.Wrap(r.remote.Exec(ctx, r.server.AdvertiseIP, args, w))
func (r *serverRemote) RunStream(ctx context.Context, stdout, stderr io.Writer, args ...string) error {
return trace.Wrap(r.remote.Exec(ctx, r.server.AdvertiseIP, args, stdout, stderr))
}

type serverRemote struct {
Expand Down
6 changes: 3 additions & 3 deletions lib/checks/remote.go
Expand Up @@ -36,7 +36,7 @@ import (
// Remote defines an interface for validating remote nodes.
type Remote interface {
// Exec executes the command remotely on the specified node.
Exec(ctx context.Context, addr string, command []string, out io.Writer) error
Exec(ctx context.Context, addr string, command []string, stdout, stderr io.Writer) error
// CheckPorts executes network test to test port availability.
CheckPorts(context.Context, PingPongGame) (PingPongGameResults, error)
// CheckBandwidth executes network bandwidth test.
Expand Down Expand Up @@ -77,12 +77,12 @@ type remote struct {
// Exec executes the command remotely on the specified node.
//
// The command's output is written to the provided writer.
func (r *remote) Exec(ctx context.Context, addr string, command []string, out io.Writer) error {
func (r *remote) Exec(ctx context.Context, addr string, command []string, stdout, stderr io.Writer) error {
clt, err := r.GetClient(ctx, addr)
if err != nil {
return trace.Wrap(err)
}
err = clt.Command(ctx, r.FieldLogger, out, command...)
err = clt.Command(ctx, r.FieldLogger, stdout, stderr, command...)
if err != nil {
return trace.Wrap(err)
}
Expand Down
12 changes: 6 additions & 6 deletions lib/expand/phases/etcd.go
Expand Up @@ -140,7 +140,7 @@ func (p *etcdExecutor) Rollback(ctx context.Context) error {

func (p *etcdExecutor) checkBackup(ctx context.Context, agent rpcclient.Client, backupPath string) error {
var out bytes.Buffer
err := agent.Command(ctx, p.FieldLogger, &out, utils.PlanetEnterCommand(
err := agent.Command(ctx, p.FieldLogger, &out, &out, utils.PlanetEnterCommand(
defaults.StatBin, backupPath)...)
if err != nil {
return trace.Wrap(err, "failed to check backup file %v: %s", backupPath, out.String())
Expand All @@ -150,7 +150,7 @@ func (p *etcdExecutor) checkBackup(ctx context.Context, agent rpcclient.Client,

func (p *etcdExecutor) stopEtcd(ctx context.Context, agent rpcclient.Client) error {
var out bytes.Buffer
err := agent.Command(ctx, p.FieldLogger, &out, utils.PlanetEnterCommand(
err := agent.Command(ctx, p.FieldLogger, &out, &out, utils.PlanetEnterCommand(
defaults.SystemctlBin, "stop", "etcd")...)
if err != nil {
return trace.Wrap(err, "failed to stop etcd: %s", out.String())
Expand All @@ -160,7 +160,7 @@ func (p *etcdExecutor) stopEtcd(ctx context.Context, agent rpcclient.Client) err

func (p *etcdExecutor) wipeEtcd(ctx context.Context, agent rpcclient.Client) error {
var out bytes.Buffer
err := agent.Command(ctx, p.FieldLogger, &out, utils.PlanetEnterCommand(
err := agent.Command(ctx, p.FieldLogger, &out, &out, utils.PlanetEnterCommand(
defaults.PlanetBin, "etcd", "wipe", "--confirm")...)
if err != nil {
return trace.Wrap(err, "failed to wipe out etcd data: %s", out.String())
Expand All @@ -170,7 +170,7 @@ func (p *etcdExecutor) wipeEtcd(ctx context.Context, agent rpcclient.Client) err

func (p *etcdExecutor) startEtcd(ctx context.Context, agent rpcclient.Client) error {
var out bytes.Buffer
err := agent.Command(ctx, p.FieldLogger, &out, utils.PlanetEnterCommand(
err := agent.Command(ctx, p.FieldLogger, &out, &out, utils.PlanetEnterCommand(
defaults.SystemctlBin, "start", "etcd")...)
if err != nil {
return trace.Wrap(err, "failed to start etcd: %s", out.String())
Expand All @@ -181,7 +181,7 @@ func (p *etcdExecutor) startEtcd(ctx context.Context, agent rpcclient.Client) er
func (p *etcdExecutor) restoreEtcd(ctx context.Context, agent rpcclient.Client, backupPath string) error {
var out bytes.Buffer
err := utils.Retry(defaults.RetryInterval, defaults.RetryLessAttempts, func() error {
return agent.Command(ctx, p.FieldLogger, &out, utils.PlanetEnterCommand(
return agent.Command(ctx, p.FieldLogger, &out, &out, utils.PlanetEnterCommand(
defaults.PlanetBin, "etcd", "restore", backupPath)...)
})
if err != nil {
Expand Down Expand Up @@ -247,7 +247,7 @@ func (p *etcdBackupExecutor) Execute(ctx context.Context) error {

func (p *etcdBackupExecutor) backupEtcd(ctx context.Context, agent rpcclient.Client, backupPath string) error {
var out bytes.Buffer
err := agent.Command(ctx, p.FieldLogger, &out, utils.PlanetEnterCommand(
err := agent.Command(ctx, p.FieldLogger, &out, &out, utils.PlanetEnterCommand(
defaults.PlanetBin, "etcd", "backup", backupPath)...)
if err != nil {
return trace.Wrap(err, "failed to backup etcd data: %s", out.String())
Expand Down
5 changes: 3 additions & 2 deletions lib/fsm/rpc.go
Expand Up @@ -119,8 +119,9 @@ func (r *agentRunner) Run(ctx context.Context, server storage.Server, args ...st
args, serverName(server))
}
logger.Debug("Executing remotely: ", args)
err = agent.GravityCommand(ctx, logger, nil, args...)
return trace.Wrap(err)
var stderr bytes.Buffer
err = agent.GravityCommand(ctx, logger, nil, &stderr, args...)
return trace.Wrap(err, "failed to execute command %v: %s", args, stderr.String())
default:
return trace.Errorf("internal error, canExecute=%v", canRun)
}
Expand Down
4 changes: 2 additions & 2 deletions lib/ops/checks.go
Expand Up @@ -82,8 +82,8 @@ func FormatValidationError(err error) error {

// Exec executes an arbitrary command on the remote node specified with addr.
// The output is written into out
func (r *remoteCommands) Exec(ctx context.Context, addr string, args []string, out io.Writer) error {
return trace.Wrap(r.AgentService.Exec(ctx, r.key, addr, args, out))
func (r *remoteCommands) Exec(ctx context.Context, addr string, args []string, stdout, stderr io.Writer) error {
return trace.Wrap(r.AgentService.Exec(ctx, r.key, addr, args, stdout, stderr))
}

// CheckPorts validates the cluster port availability
Expand Down
6 changes: 3 additions & 3 deletions lib/ops/ops.go
Expand Up @@ -81,7 +81,7 @@ type TeleportProxyService interface {

// ExecuteCommand executes a command on a remote node address
// for a given site domain
ExecuteCommand(ctx context.Context, domainName, nodeAddr, command string, out io.Writer) error
ExecuteCommand(ctx context.Context, domainName, nodeAddr, command string, stdout, stderr io.Writer) error

// GetClient returns admin client to local proxy
GetClient() teleauth.ClientI
Expand Down Expand Up @@ -1288,12 +1288,12 @@ type AgentService interface {

// Exec executes the command specified with args on a remote server given with addr.
// It streams the process's output to the given writer out.
Exec(ctx context.Context, opKey SiteOperationKey, addr string, args []string, out io.Writer) error
Exec(ctx context.Context, opKey SiteOperationKey, addr string, args []string, stdout, stderr io.Writer) error

// ExecNoLog executes the command specified with args on a remote server given with addr.
// It streams the process's output to the given writer out.
// Underlying remote call output is not logged
ExecNoLog(ctx context.Context, opKey SiteOperationKey, addr string, args []string, out io.Writer) error
ExecNoLog(ctx context.Context, opKey SiteOperationKey, addr string, args []string, stdout, stderr io.Writer) error

// Validate executes preflight checks on the node specified with addr
// against the specified manifest and profile.
Expand Down
12 changes: 6 additions & 6 deletions lib/ops/opsservice/agents.go
Expand Up @@ -143,25 +143,25 @@ func (r *AgentService) GetServerInfos(ctx context.Context, key ops.SiteOperation

// Exec executes command on a remote server
// that is identified by meeting point and agent's address addr
func (r *AgentService) Exec(ctx context.Context, key ops.SiteOperationKey, addr string, args []string, out io.Writer) error {
return r.exec(ctx, key, addr, args, out, r.FieldLogger)
func (r *AgentService) Exec(ctx context.Context, key ops.SiteOperationKey, addr string, args []string, stdout, stderr io.Writer) error {
return r.exec(ctx, key, addr, args, stdout, stderr, r.FieldLogger)
}

// ExecNoLog executes the command specified with args on a remote server given with addr.
// It streams the process's output to the given writer out.
// Underlying remote call output is not logged
func (r *AgentService) ExecNoLog(ctx context.Context, key ops.SiteOperationKey, addr string, args []string, out io.Writer) error {
return r.exec(ctx, key, addr, args, out, utils.DiscardingLog)
func (r *AgentService) ExecNoLog(ctx context.Context, key ops.SiteOperationKey, addr string, args []string, stdout, stderr io.Writer) error {
return r.exec(ctx, key, addr, args, stdout, stderr, utils.DiscardingLog)
}

func (r *AgentService) exec(ctx context.Context, key ops.SiteOperationKey, addr string, args []string, out io.Writer, log log.FieldLogger) error {
func (r *AgentService) exec(ctx context.Context, key ops.SiteOperationKey, addr string, args []string, stdout, stderr io.Writer, log log.FieldLogger) error {
group, err := r.peerStore.getOrCreateGroup(key)
if err != nil {
return trace.Wrap(err)
}

addr = rpc.AgentAddr(addr)
return trace.Wrap(group.WithContext(ctx, addr).Command(ctx, r.FieldLogger, out, args...))
return trace.Wrap(group.WithContext(ctx, addr).Command(ctx, r.FieldLogger, stdout, stderr, args...))
}

// Validate executes preflight checks on the node specified with addr
Expand Down
34 changes: 19 additions & 15 deletions lib/ops/opsservice/report.go
Expand Up @@ -141,11 +141,12 @@ func (s *site) getReport(runner remoteRunner, servers []remoteServer, master rem
}
serverRunner := &serverRunner{server: server, runner: runner}
reportWriter := getReportWriterForServer(dir, server)
logger := log.WithField("server", server.Address())
if err := s.collectKubernetesInfo(reportWriter, serverRunner); err != nil {
log.WithError(err).Error("Failed to collect Kubernetes info.")
logger.WithError(err).Error("Failed to collect Kubernetes info.")
}
if err := s.collectEtcdBackup(reportWriter, serverRunner); err != nil {
log.WithError(err).Error("Failed to collect etcd backup.")
logger.WithError(err).Error("Failed to collect etcd backup.")
}
if err := s.collectDebugInfoFromServers(dir, servers, runner); err != nil {
log.WithError(err).Error("Failed to collect diagnostics from some nodes.")
Expand Down Expand Up @@ -201,11 +202,12 @@ func (s *site) collectDebugInfo(reportWriter report.FileWriter, runner *serverRu
}
defer w.Close()

err = runner.RunStream(w, s.gravityCommand("system", "report",
var stderr bytes.Buffer
err = runner.RunStream(w, &stderr, s.gravityCommand("system", "report",
fmt.Sprintf("--filter=%v", report.FilterSystem),
"--compressed")...)
if err != nil {
return trace.Wrap(err, "failed to collect diagnostics")
return trace.Wrap(err, "failed to collect diagnostics: %s", stderr.String())
}
return nil
}
Expand All @@ -217,10 +219,11 @@ func (s *site) collectKubernetesInfo(reportWriter report.FileWriter, runner *ser
}
defer w.Close()

err = runner.RunStream(w, s.gravityCommand("system", "report",
var stderr bytes.Buffer
err = runner.RunStream(w, &stderr, s.gravityCommand("system", "report",
fmt.Sprintf("--filter=%v", report.FilterKubernetes), "--compressed")...)
if err != nil {
return trace.Wrap(err, "failed to collect kubernetes diagnostics")
return trace.Wrap(err, "failed to collect kubernetes diagnostics: %s", stderr.String())
}
return nil
}
Expand All @@ -231,10 +234,11 @@ func (s *site) collectEtcdBackup(reportWriter report.FileWriter, runner *serverR
return trace.Wrap(err)
}
defer w.Close()
err = runner.RunStream(w, s.gravityCommand("system", "report", fmt.Sprintf(
var stderr bytes.Buffer
err = runner.RunStream(w, &stderr, s.gravityCommand("system", "report", fmt.Sprintf(
"--filter=%v", report.FilterEtcd), "--compressed")...)
if err != nil {
return trace.Wrap(err)
return trace.Wrap(err, "failed to collect etcd backup: %s", stderr.String())
}
return nil
}
Expand All @@ -246,7 +250,7 @@ func runCollectors(site site, dir string) error {
}

collectors := []collectorFn{
collectSiteInfo(*storageSite),
collectClusterInfo(*storageSite),
collectDumpHook,
}
reportWriter := report.NewFileWriter(dir)
Expand All @@ -255,7 +259,7 @@ func runCollectors(site site, dir string) error {
for _, collector := range collectors {
err := collector(reportWriter, site)
if err != nil {
log.Errorf("failed to collect diagnostics: %v", trace.DebugReport(err))
log.WithError(err).Error("Failed to collect diagnostics.")
}
}
return nil
Expand All @@ -279,10 +283,10 @@ func collectOperationsLogs(site site, dir string) error {
return nil
}

// collectSiteInfo returns JSON-formatted site information
func collectSiteInfo(s storage.Site) collectorFn {
// collectClusterInfo returns JSON-formatted cluster information
func collectClusterInfo(s storage.Site) collectorFn {
return func(reportWriter report.FileWriter, site site) error {
w, err := reportWriter.NewWriter(siteInfoFilename)
w, err := reportWriter.NewWriter(clusterInfoFilename)
if err != nil {
return trace.Wrap(err)
}
Expand Down Expand Up @@ -355,8 +359,8 @@ func isActiveInstallOperation(op ops.SiteOperation) bool {
}

const (
// siteInfoFilename is the name of the file with JSON-dumped site
siteInfoFilename = "site.json"
// clusterInfoFilename is the name of the file with JSON-encoded cluster metadata
clusterInfoFilename = "cluster.json"
// dumpHookFilename is the name of the file with dump hook output
dumpHookFilename = "dump-hook"
// opLogsFilename defines the file pattern that stores operation log for a particular
Expand Down
2 changes: 1 addition & 1 deletion lib/ops/opsservice/report_test.go
Expand Up @@ -39,7 +39,7 @@ func (s *ReportSuite) TestClusterInfo(c *check.C) {
License: "license",
}
w := &bufferWriter{}
err := collectSiteInfo(cluster)(w, site{})
err := collectClusterInfo(cluster)(w, site{})
c.Assert(err, check.IsNil)
var fromReport storage.Site
c.Assert(json.Unmarshal(w.Bytes(), &fromReport), check.IsNil)
Expand Down
22 changes: 11 additions & 11 deletions lib/ops/opsservice/runner.go
Expand Up @@ -35,7 +35,7 @@ type remoteRunner interface {
// Run runs the provided command on the specified server
Run(server remoteServer, args ...string) ([]byte, error)
// RunStream runs the provided command on the specified server and streams output to w
RunStream(server remoteServer, w io.Writer, args ...string) error
RunStream(server remoteServer, stdout, stderr io.Writer, args ...string) error
// RunCmd runs the provided command on the specified server and logs
// its results into the operation context
RunCmd(operationContext, remoteServer, Command) ([]byte, error)
Expand All @@ -54,9 +54,9 @@ type teleportRunner struct {
}

// RunStream runs the provided command on the specified server and streams output to w
func (r *teleportRunner) RunStream(server remoteServer, w io.Writer, args ...string) error {
func (r *teleportRunner) RunStream(server remoteServer, stdout, stderr io.Writer, args ...string) error {
command := strings.Join(args, " ")
err := r.ExecuteCommand(context.TODO(), r.domainName, server.Address(), command, w)
err := r.ExecuteCommand(context.TODO(), r.domainName, server.Address(), command, stdout, stderr)

logger := r.WithFields(log.Fields{
constants.FieldServer: server.Address(),
Expand All @@ -73,8 +73,8 @@ func (r *teleportRunner) RunStream(server remoteServer, w io.Writer, args ...str

// Run runs the provided command on the specified server
func (r *teleportRunner) Run(server remoteServer, args ...string) ([]byte, error) {
out := &bytes.Buffer{}
err := r.RunStream(server, out, args...)
var out bytes.Buffer
err := r.RunStream(server, &out, &out, args...)
if err != nil {
return out.Bytes(), trace.Wrap(err, out.String())
}
Expand All @@ -97,8 +97,8 @@ type agentRunner struct {
}

// RunStream runs the provided command on the specified server and streams output to w
func (r *agentRunner) RunStream(server remoteServer, w io.Writer, args ...string) error {
err := r.AgentService.ExecNoLog(context.TODO(), r.ctx.key(), server.Address(), args, w)
func (r *agentRunner) RunStream(server remoteServer, stdout, stderr io.Writer, args ...string) error {
err := r.AgentService.ExecNoLog(context.TODO(), r.ctx.key(), server.Address(), args, stdout, stderr)

entry := r.ctx.WithFields(log.Fields{
constants.FieldServer: server.Address(),
Expand All @@ -116,8 +116,8 @@ func (r *agentRunner) RunStream(server remoteServer, w io.Writer, args ...string

// Run runs the provided command on the specified server
func (r *agentRunner) Run(server remoteServer, args ...string) ([]byte, error) {
out := &bytes.Buffer{}
err := r.RunStream(server, out, args...)
var out bytes.Buffer
err := r.RunStream(server, &out, &out, args...)
if err != nil {
return out.Bytes(), trace.Wrap(err)
}
Expand All @@ -141,8 +141,8 @@ type serverRunner struct {
}

// RunStream runs the provided command and streams output to w
func (r *serverRunner) RunStream(w io.Writer, args ...string) error {
return r.runner.RunStream(r.server, w, args...)
func (r *serverRunner) RunStream(stdout, stderr io.Writer, args ...string) error {
return r.runner.RunStream(r.server, stdout, stderr, args...)
}

// Run runs the provided command
Expand Down

0 comments on commit 8da21c9

Please sign in to comment.