diff --git a/tool/tsh/kubectl.go b/tool/tsh/kubectl.go index 579916aee652d..af193c95491c1 100644 --- a/tool/tsh/kubectl.go +++ b/tool/tsh/kubectl.go @@ -18,24 +18,29 @@ package main import ( "bufio" + "context" "errors" "fmt" "io" + "net/http" "os" "os/exec" "path/filepath" "regexp" "strings" + "time" "github.com/gravitational/trace" "golang.org/x/sync/errgroup" "k8s.io/cli-runtime/pkg/genericclioptions" _ "k8s.io/client-go/plugin/pkg/client/auth" + "k8s.io/client-go/rest" "k8s.io/component-base/cli" "k8s.io/kubectl/pkg/cmd" "k8s.io/kubectl/pkg/cmd/plugin" cmdutil "k8s.io/kubectl/pkg/cmd/util" + tracehttp "github.com/gravitational/teleport/api/observability/tracing/http" "github.com/gravitational/teleport/api/types" "github.com/gravitational/teleport/lib/kube/kubeconfig" ) @@ -72,13 +77,12 @@ type resourceKind struct { // outputs and decides if creating an access request is appropriate. // If the access request is created, tsh waits for the approval and runs the expected // command again. -func onKubectlCommand(cf *CLIConf, args []string) error { +func onKubectlCommand(cf *CLIConf, fullArgs []string, args []string) error { if os.Getenv(tshKubectlReexecEnvVar) == "" { - err := runKubectlAndCollectRun(cf, args) + err := runKubectlAndCollectRun(cf, fullArgs) return trace.Wrap(err) } - - runKubectlCode(args) + runKubectlCode(cf, args) return nil } @@ -101,17 +105,64 @@ func runKubectlReexec(selfExec string, args []string, collector io.Writer) error return trace.Wrap(cmd.Run()) } +// wrapConfigFn wraps the rest.Config with a custom RoundTripper if the user +// wants to sample traces. +func wrapConfigFn(cf *CLIConf) func(c *rest.Config) *rest.Config { + return func(c *rest.Config) *rest.Config { + c.Wrap( + func(rt http.RoundTripper) http.RoundTripper { + if cf.SampleTraces { + // If the user wants to sample traces, wrap the transport with a trace + // transport. + return tracehttp.NewTransport(rt) + } + return rt + }, + ) + return c + } +} + // runKubectlCode runs the actual kubectl package code with the default options. // This code is only executed when `tshKubectlReexec` env is present. This happens // because we need to retry kubectl calls and `kubectl` calls os.Exit in multiple // paths. -func runKubectlCode(args []string) { +func runKubectlCode(cf *CLIConf, args []string) { + closeTracer := func() {} + if cf.SampleTraces { + provider, err := newTraceProvider(cf, "", nil) + if err != nil { + log.WithError(err).Debug("Failed to set up span forwarding") + } else { + // only update the provider if we successfully set it up + cf.TracingProvider = provider + + // ensure that the provider is shutdown on exit to flush any spans + // that haven't been forwarded yet. + closeTracer = func() { + shutdownCtx, cancel := context.WithTimeout(cf.Context, 1*time.Second) + defer cancel() + err := provider.Shutdown(shutdownCtx) + if err != nil && !errors.Is(err, context.DeadlineExceeded) { + log.WithError(err).Debugf("Failed to shutdown trace provider") + } + } + } + } + // If the user opted to not sample traces, cf.TracingProvider is pre-initialized + // with a noop provider. + ctx, span := cf.TracingProvider.Tracer("kubectl").Start(cf.Context, "kubectl") + closeSpanAndTracer := func() { + span.End() + closeTracer() + } // These values are the defaults used by kubectl and can be found here: // https://github.com/kubernetes/kubectl/blob/3612c18ed86fc0a2f4467ca355b3e21569fabe0a/pkg/cmd/cmd.go#L94 defaultConfigFlags := genericclioptions.NewConfigFlags(true). WithDeprecatedPasswordFlag(). WithDiscoveryBurst(300). - WithDiscoveryQPS(50.0) + WithDiscoveryQPS(50.0). + WithWrapConfigFn(wrapConfigFn(cf)) command := cmd.NewDefaultKubectlCommandWithArgs( cmd.KubectlOptions{ @@ -123,13 +174,18 @@ func runKubectlCode(args []string) { IOStreams: genericclioptions.IOStreams{In: os.Stdin, Out: os.Stdout, ErrOut: os.Stderr}, }, ) + command.SetContext(ctx) // override args without kubectl to avoid errors. command.SetArgs(args[1:]) + // run command until it finishes. if err := cli.RunNoErrOutput(command); err != nil { + closeSpanAndTracer() // Pretty-print the error and exit with an error. cmdutil.CheckErr(err) } + + closeSpanAndTracer() os.Exit(0) } @@ -177,7 +233,6 @@ func runKubectlAndCollectRun(cf *CLIConf, args []string) error { return trace.Wrap(scanner.Err()) }, ) - err := runKubectlReexec(cf.executablePath, args, writer) writer.CloseWithError(io.EOF) diff --git a/tool/tsh/tsh.go b/tool/tsh/tsh.go index 5d4090cd1b1aa..b8d7b91ee165c 100644 --- a/tool/tsh/tsh.go +++ b/tool/tsh/tsh.go @@ -1074,7 +1074,15 @@ func Run(ctx context.Context, args []string, opts ...cliOption) error { // Connect to the span exporter and initialize the trace provider only if // the --trace flag was set. - if cf.SampleTraces { + // kubectl is a special case because it is the only command that we re-execute + // in order to be able to access the exit code and stdout/stderr of the command + // that was run and determine if we should create a new access request from + // the output data. + // We don't want to enable tracing for the master invocation of tsh kubectl + // because the data that we would be tracing would be the tsh kubectl command. + // Instead, we want to enable tracing for the re-executed kubectl command and + // we do that in the kubectl command handler. + if cf.SampleTraces && cf.command != kubectl.FullCommand() { // login only needs to be ignored if forwarding to auth var ignored []string if cf.TraceExporter == "" { @@ -1315,7 +1323,7 @@ func Run(ctx context.Context, args []string, opts ...cliOption) error { err = deviceCmd.keyget.run(&cf) case kubectl.FullCommand(): idx := slices.Index(args, kubectl.FullCommand()) - err = onKubectlCommand(&cf, args[idx:]) + err = onKubectlCommand(&cf, args, args[idx:]) case approve.FullCommand(): err = onHeadlessApprove(&cf) default: