Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v13] Add tsh kubectl support for tracer exporter #27130

Merged
merged 4 commits into from May 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
69 changes: 62 additions & 7 deletions tool/tsh/kubectl.go
Expand Up @@ -18,24 +18,29 @@ package main

import (
"bufio"
"context"
"errors"
"fmt"
"io"
"net/http"
"os"
"os/exec"
"path/filepath"
"regexp"
"strings"
"time"

"github.com/gravitational/trace"
"golang.org/x/sync/errgroup"
"k8s.io/cli-runtime/pkg/genericclioptions"
_ "k8s.io/client-go/plugin/pkg/client/auth"
"k8s.io/client-go/rest"
"k8s.io/component-base/cli"
"k8s.io/kubectl/pkg/cmd"
"k8s.io/kubectl/pkg/cmd/plugin"
cmdutil "k8s.io/kubectl/pkg/cmd/util"

tracehttp "github.com/gravitational/teleport/api/observability/tracing/http"
"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/lib/kube/kubeconfig"
)
Expand Down Expand Up @@ -72,13 +77,12 @@ type resourceKind struct {
// outputs and decides if creating an access request is appropriate.
// If the access request is created, tsh waits for the approval and runs the expected
// command again.
func onKubectlCommand(cf *CLIConf, args []string) error {
func onKubectlCommand(cf *CLIConf, fullArgs []string, args []string) error {
if os.Getenv(tshKubectlReexecEnvVar) == "" {
err := runKubectlAndCollectRun(cf, args)
err := runKubectlAndCollectRun(cf, fullArgs)
return trace.Wrap(err)
}

runKubectlCode(args)
runKubectlCode(cf, args)
return nil
}

Expand All @@ -101,17 +105,64 @@ func runKubectlReexec(selfExec string, args []string, collector io.Writer) error
return trace.Wrap(cmd.Run())
}

// wrapConfigFn wraps the rest.Config with a custom RoundTripper if the user
// wants to sample traces.
func wrapConfigFn(cf *CLIConf) func(c *rest.Config) *rest.Config {
return func(c *rest.Config) *rest.Config {
c.Wrap(
func(rt http.RoundTripper) http.RoundTripper {
if cf.SampleTraces {
// If the user wants to sample traces, wrap the transport with a trace
// transport.
return tracehttp.NewTransport(rt)
}
return rt
},
)
return c
}
}

// runKubectlCode runs the actual kubectl package code with the default options.
// This code is only executed when `tshKubectlReexec` env is present. This happens
// because we need to retry kubectl calls and `kubectl` calls os.Exit in multiple
// paths.
func runKubectlCode(args []string) {
func runKubectlCode(cf *CLIConf, args []string) {
closeTracer := func() {}
if cf.SampleTraces {
provider, err := newTraceProvider(cf, "", nil)
if err != nil {
log.WithError(err).Debug("Failed to set up span forwarding")
} else {
// only update the provider if we successfully set it up
cf.TracingProvider = provider

// ensure that the provider is shutdown on exit to flush any spans
// that haven't been forwarded yet.
closeTracer = func() {
shutdownCtx, cancel := context.WithTimeout(cf.Context, 1*time.Second)
defer cancel()
err := provider.Shutdown(shutdownCtx)
if err != nil && !errors.Is(err, context.DeadlineExceeded) {
log.WithError(err).Debugf("Failed to shutdown trace provider")
}
}
}
}
// If the user opted to not sample traces, cf.TracingProvider is pre-initialized
// with a noop provider.
ctx, span := cf.TracingProvider.Tracer("kubectl").Start(cf.Context, "kubectl")
closeSpanAndTracer := func() {
span.End()
closeTracer()
}
// These values are the defaults used by kubectl and can be found here:
// https://github.com/kubernetes/kubectl/blob/3612c18ed86fc0a2f4467ca355b3e21569fabe0a/pkg/cmd/cmd.go#L94
defaultConfigFlags := genericclioptions.NewConfigFlags(true).
WithDeprecatedPasswordFlag().
WithDiscoveryBurst(300).
WithDiscoveryQPS(50.0)
WithDiscoveryQPS(50.0).
WithWrapConfigFn(wrapConfigFn(cf))

command := cmd.NewDefaultKubectlCommandWithArgs(
cmd.KubectlOptions{
Expand All @@ -123,13 +174,18 @@ func runKubectlCode(args []string) {
IOStreams: genericclioptions.IOStreams{In: os.Stdin, Out: os.Stdout, ErrOut: os.Stderr},
},
)
command.SetContext(ctx)
// override args without kubectl to avoid errors.
command.SetArgs(args[1:])

// run command until it finishes.
if err := cli.RunNoErrOutput(command); err != nil {
closeSpanAndTracer()
// Pretty-print the error and exit with an error.
cmdutil.CheckErr(err)
}

closeSpanAndTracer()
os.Exit(0)
}

Expand Down Expand Up @@ -177,7 +233,6 @@ func runKubectlAndCollectRun(cf *CLIConf, args []string) error {
return trace.Wrap(scanner.Err())
},
)

err := runKubectlReexec(cf.executablePath, args, writer)
writer.CloseWithError(io.EOF)

Expand Down
12 changes: 10 additions & 2 deletions tool/tsh/tsh.go
Expand Up @@ -1074,7 +1074,15 @@ func Run(ctx context.Context, args []string, opts ...cliOption) error {

// Connect to the span exporter and initialize the trace provider only if
// the --trace flag was set.
if cf.SampleTraces {
// kubectl is a special case because it is the only command that we re-execute
// in order to be able to access the exit code and stdout/stderr of the command
// that was run and determine if we should create a new access request from
// the output data.
// We don't want to enable tracing for the master invocation of tsh kubectl
// because the data that we would be tracing would be the tsh kubectl command.
// Instead, we want to enable tracing for the re-executed kubectl command and
// we do that in the kubectl command handler.
if cf.SampleTraces && cf.command != kubectl.FullCommand() {
// login only needs to be ignored if forwarding to auth
var ignored []string
if cf.TraceExporter == "" {
Expand Down Expand Up @@ -1315,7 +1323,7 @@ func Run(ctx context.Context, args []string, opts ...cliOption) error {
err = deviceCmd.keyget.run(&cf)
case kubectl.FullCommand():
idx := slices.Index(args, kubectl.FullCommand())
err = onKubectlCommand(&cf, args[idx:])
err = onKubectlCommand(&cf, args, args[idx:])
case approve.FullCommand():
err = onHeadlessApprove(&cf)
default:
Expand Down