-
Notifications
You must be signed in to change notification settings - Fork 1.7k
/
kubectl.go
552 lines (491 loc) · 19 KB
/
kubectl.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
/*
Copyright 2023 Gravitational, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"bufio"
"context"
"errors"
"fmt"
"io"
"net/http"
"os"
"os/exec"
"path/filepath"
"regexp"
"strings"
"sync"
"time"
"github.com/gravitational/trace"
"github.com/spf13/cobra"
"golang.org/x/exp/slices"
"golang.org/x/sync/errgroup"
"k8s.io/cli-runtime/pkg/genericclioptions"
_ "k8s.io/client-go/plugin/pkg/client/auth"
"k8s.io/client-go/rest"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
"k8s.io/component-base/cli"
"k8s.io/kubectl/pkg/cmd"
"k8s.io/kubectl/pkg/cmd/plugin"
cmdutil "k8s.io/kubectl/pkg/cmd/util"
"github.com/gravitational/teleport"
tracehttp "github.com/gravitational/teleport/api/observability/tracing/http"
"github.com/gravitational/teleport/api/profile"
"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/lib/client"
"github.com/gravitational/teleport/lib/kube/kubeconfig"
)
var (
podForbiddenRe = regexp.MustCompile(`(?m)Error from server \(Forbidden\): pods "(.*)" is forbidden: User ".*" cannot get resource "pods" in API group "" in the namespace "(.*)"`)
clusterForbidden = "[00] access denied"
// clusterObjectDiscoveryFailed is printed when kubectl tries to do API discovery
// - calling /apis endpoint - but Teleport denies the request. Since it cannot
// discover the resources available in the cluster, it prints this message saying
// that the cluster does not have pod(s). Since every Kubernetes cluster supports
// pods, it's safe to create a resource access request.
clusterObjectDiscoveryFailed = regexp.MustCompile(`(?m)the server doesn't have a resource type "pods?"`)
)
// resourceKind identifies a Kubernetes resource.
type resourceKind struct {
kind string
subResourceName string
}
// onKubectlCommand re-execs itself if env var `tshKubectlRexec` is not set
// in order to execute the `kubectl` portion of the code. This is a requirement because
// `kubectl` calls `os.Exit()` in every code path, and we need to intercept the
// exit code to validate if the request was denied.
// When executing `tsh kubectl get pods`, tsh checks if `tshKubectlReexec`. Since
// it's the user call and the flag is not present, tsh reexecs the same exact
// the user executed and uses an io.MultiWriter to write the os.Stderr output
// from the kubectl command into an io.Pipe for analysis. It also sets the env
// `tshKubectlReexec` in the exec.Cmd.Env and runs the command. When running the
// command, `tsh` will be recalled, and since `tshKubectlReexec` is set only the
// kubectl portion of code is executed.
// On the caller side, once the callee execution finishes, tsh inspects the stderr
// outputs and decides if creating an access request is appropriate.
// If the access request is created, tsh waits for the approval and runs the expected
// command again.
func onKubectlCommand(cf *CLIConf, fullArgs []string, args []string) error {
if os.Getenv(tshKubectlReexecEnvVar) == "" {
err := runKubectlAndCollectRun(cf, fullArgs, args)
return trace.Wrap(err)
}
runKubectlCode(cf, args)
return nil
}
const (
// tshKubectlReexecEnvVar is the name of the environment variable used to control if
// tsh should re-exec or execute a kubectl command.
tshKubectlReexecEnvVar = "TSH_KUBE_REEXEC"
)
// runKubectlReexec reexecs itself and copies the `stderr` output into
// the provided collector.
// It also sets tshKubectlReexec for the command to prevent
// an exec loop
func runKubectlReexec(cf *CLIConf, fullArgs, args []string, collector io.Writer) error {
closeFn, newKubeConfigLocation, err := maybeStartKubeLocalProxy(cf, withKubectlArgs(args))
if err != nil {
return trace.Wrap(err)
}
defer closeFn()
cmdEnv := append(os.Environ(), fmt.Sprintf("%s=yes", tshKubectlReexecEnvVar))
// Update kubeconfig location.
if newKubeConfigLocation != "" {
cmdEnv = overwriteKubeconfigInEnv(cmdEnv, newKubeConfigLocation)
fullArgs = overwriteKubeconfigFlagInArgs(fullArgs, newKubeConfigLocation)
}
// Execute.
cmd := exec.Command(cf.executablePath, fullArgs...)
cmd.Stdin = os.Stdin
cmd.Stdout = os.Stdout
cmd.Stderr = io.MultiWriter(os.Stderr, collector)
cmd.Env = cmdEnv
return trace.Wrap(cmd.Run())
}
// wrapConfigFn wraps the rest.Config with a custom RoundTripper if the user
// wants to sample traces.
func wrapConfigFn(cf *CLIConf) func(c *rest.Config) *rest.Config {
return func(c *rest.Config) *rest.Config {
c.Wrap(
func(rt http.RoundTripper) http.RoundTripper {
if cf.SampleTraces {
// If the user wants to sample traces, wrap the transport with a trace
// transport.
return tracehttp.NewTransport(rt)
}
return rt
},
)
return c
}
}
// runKubectlCode runs the actual kubectl package code with the default options.
// This code is only executed when `tshKubectlReexec` env is present. This happens
// because we need to retry kubectl calls and `kubectl` calls os.Exit in multiple
// paths.
func runKubectlCode(cf *CLIConf, args []string) {
closeTracer := func() {}
if cf.SampleTraces {
provider, err := newTraceProvider(cf, "", nil)
if err != nil {
log.WithError(err).Debug("Failed to set up span forwarding")
} else {
// only update the provider if we successfully set it up
cf.TracingProvider = provider
// ensure that the provider is shutdown on exit to flush any spans
// that haven't been forwarded yet.
closeTracer = func() {
shutdownCtx, cancel := context.WithTimeout(cf.Context, 1*time.Second)
defer cancel()
err := provider.Shutdown(shutdownCtx)
if err != nil && !errors.Is(err, context.DeadlineExceeded) {
log.WithError(err).Debugf("Failed to shutdown trace provider")
}
}
}
}
// If the user opted to not sample traces, cf.TracingProvider is pre-initialized
// with a noop provider.
ctx, span := cf.TracingProvider.Tracer("kubectl").Start(cf.Context, "kubectl")
closeSpanAndTracer := func() {
span.End()
closeTracer()
}
// These values are the defaults used by kubectl and can be found here:
// https://github.com/kubernetes/kubectl/blob/3612c18ed86fc0a2f4467ca355b3e21569fabe0a/pkg/cmd/cmd.go#L94
defaultConfigFlags := genericclioptions.NewConfigFlags(true).
WithDeprecatedPasswordFlag().
WithDiscoveryBurst(300).
WithDiscoveryQPS(50.0).
WithWrapConfigFn(wrapConfigFn(cf))
command := cmd.NewDefaultKubectlCommandWithArgs(
cmd.KubectlOptions{
// init the default plugin handler.
PluginHandler: cmd.NewDefaultPluginHandler(plugin.ValidPluginFilenamePrefixes),
Arguments: args,
ConfigFlags: defaultConfigFlags,
// init the IOSStreams.
IOStreams: genericclioptions.IOStreams{In: os.Stdin, Out: os.Stdout, ErrOut: os.Stderr},
},
)
command.SetContext(ctx)
// override args without kubectl to avoid errors.
command.SetArgs(args[1:])
// run command until it finishes.
if err := cli.RunNoErrOutput(command); err != nil {
closeSpanAndTracer()
// Pretty-print the error and exit with an error.
cmdutil.CheckErr(err)
}
closeSpanAndTracer()
os.Exit(0)
}
func runKubectlAndCollectRun(cf *CLIConf, fullArgs, args []string) error {
var (
alreadyRequestedAccess bool
err error
exitErr *exec.ExitError
)
for {
// missingKubeResources will include the Kubernetes Resources whose access
// was rejected in this kubectl call.
missingKubeResources := make([]resourceKind, 0, 50)
reader, writer := io.Pipe()
group, _ := errgroup.WithContext(cf.Context)
group.Go(
func() error {
// This goroutine scans each line of output emitted to stderr by kubectl
// and parses it in order to check if the returned error was a problem with
// missing access level. If it's the case, tsh kubectl will create automatically
// the access request for the user to access the resource.
// Current supported resources:
// - pod
// - kube_cluster
scanner := bufio.NewScanner(reader)
scanner.Split(bufio.ScanLines)
for scanner.Scan() {
line := scanner.Text()
// Check if the request targeting a pod endpoint was denied due to
// Teleport Pod RBAC or if the operation was denied by Kubernetes RBAC.
// In the second case, we should create a Resource Access Request to allow
// the user to exec/read logs using different Kubernetes RBAC principals.
// using different Kubernetes RBAC principals.
if podForbiddenRe.MatchString(line) {
results := podForbiddenRe.FindStringSubmatch(line)
missingKubeResources = append(missingKubeResources, resourceKind{kind: types.KindKubePod, subResourceName: filepath.Join(results[2], results[1])})
// Check if cluster access was denied. If denied we should create
// a Resource Access Request for the cluster and not a pod.
} else if strings.Contains(line, clusterForbidden) || clusterObjectDiscoveryFailed.MatchString(line) {
missingKubeResources = append(missingKubeResources, resourceKind{kind: types.KindKubernetesCluster})
}
}
return trace.Wrap(scanner.Err())
},
)
err := runKubectlReexec(cf, fullArgs, args, writer)
writer.CloseWithError(io.EOF)
if scanErr := group.Wait(); scanErr != nil {
log.WithError(scanErr).Warn("unable to scan stderr payload")
}
if err == nil {
break
} else if !errors.As(err, &exitErr) {
return trace.Wrap(err)
} else if errors.As(err, &exitErr) && exitErr.ExitCode() != cmdutil.DefaultErrorExitCode {
// if the exit code is not 1, it was emitted by pod exec code and we should
// ignore it since the user was allowed to execute the command in the pod.
break
}
if len(missingKubeResources) > 0 && !alreadyRequestedAccess {
// create the access requests for the user and wait for approval.
if err := createKubeAccessRequest(cf, missingKubeResources, args); err != nil {
return trace.Wrap(err)
}
alreadyRequestedAccess = true
continue
}
break
}
// exit with the kubectl exit code to keep compatibility.
if errors.As(err, &exitErr) {
os.Exit(exitErr.ExitCode())
}
return nil
}
// createKubeAccessRequest creates an access request to the denied resources
// if the user's roles allow search_as_role.
func createKubeAccessRequest(cf *CLIConf, resources []resourceKind, args []string) error {
tc, err := makeClient(cf)
if err != nil {
return trace.Wrap(err)
}
kubeName, err := getKubeClusterName(args, tc.SiteName)
if err != nil {
return trace.Wrap(err)
}
for _, rec := range resources {
cf.RequestedResourceIDs = append(
cf.RequestedResourceIDs,
filepath.Join("/", tc.SiteName, rec.kind, kubeName, rec.subResourceName),
)
}
cf.Reason = fmt.Sprintf("Resource request automatically created for %v", args)
if err := executeAccessRequest(cf, tc); err != nil {
// TODO(tigrato): intercept the error to validate the origin
return trace.Wrap(err)
}
return nil
}
// extractKubeConfigAndContext parses the args and extracts:
// - the "--context" flag that overrides the default context to use, if present
// - the "--kubeconfig" flag that overrides the default kubeconfig location, if
// present
func extractKubeConfigAndContext(args []string) (string, string) {
if len(args) <= 2 {
return "", ""
}
command := makeKubectlCobraCommand()
return extractKubeConfigAndContextFromCommand(command, args)
}
// extractKubeConfigAndContextFromCommand parses the args using provided
// kubectl command and extracts:
// - the "--context" flag that overrides the default context to use, if present
// - the "--kubeconfig" flag that overrides the default kubeconfig location, if
// present
func extractKubeConfigAndContextFromCommand(command *cobra.Command, args []string) (kubeconfig string, context string) {
if len(args) <= 2 {
return
}
// Find subcommand.
if subcommand, _, err := command.Find(args[1:]); err == nil {
command = subcommand
}
// Ignore errors from ParseFlags.
command.ParseFlags(args[1:])
kubeconfig = command.Flag("kubeconfig").Value.String()
context = command.Flag("context").Value.String()
return
}
var makeKubectlCobraCommandLock sync.Mutex
// makeKubectlCobraCommand creates a cobra.Command for kubectl.
//
// Note that cmd.NewKubectlCommand is slow (15+ ms, 20k+ alloc), so avoid
// making/re-making it when possible.
//
// Also cmd.NewKubectlCommand is not goroutine-safe, thus using a lock.
func makeKubectlCobraCommand() *cobra.Command {
makeKubectlCobraCommandLock.Lock()
defer makeKubectlCobraCommandLock.Unlock()
return cmd.NewKubectlCommand(cmd.KubectlOptions{
// Use NewConfigFlags to avoid load existing values from
// defaultConfigFlags.
ConfigFlags: genericclioptions.NewConfigFlags(true),
})
}
// getKubeClusterName extracts the Kubernetes Cluster name if the Kube belongs to
// the teleportClusterName cluster. It parses the args to extract the `--kubeconfig`
// and `--context` flag values and to use them if any was overriten.
func getKubeClusterName(args []string, teleportClusterName string) (string, error) {
kubeconfigLocation, selectedContext := extractKubeConfigAndContext(args)
if selectedContext == "" {
kubeName, err := kubeconfig.SelectedKubeCluster(kubeconfigLocation, teleportClusterName)
return kubeName, trace.Wrap(err)
}
kc, err := kubeconfig.Load(kubeconfigLocation)
if err != nil {
return "", trace.Wrap(err)
}
kubeName := kubeconfig.KubeClusterFromContext(selectedContext, kc.Contexts[selectedContext], teleportClusterName)
if kubeName == "" {
return "", trace.BadParameter("selected context %q does not belong to Teleport cluster %q", selectedContext, teleportClusterName)
}
return kubeName, nil
}
type kubeLocalProxyOpts struct {
// kubectlArgs is a list of command arguments passed in for `tsh kubectl`.
// used to decide if local proxy is required.
kubectlArgs []string
// makeAndStartKubeLocalProxyFunc is a callback function to create and
// start a kube local proxy, when it is decided that a local proxy is
// required. Default to makeAndStartKubeLocalProxy. Can be set another
// function for testing.
makeAndStartKubeLocalProxyFunc func(*CLIConf, *clientcmdapi.Config, kubeconfig.LocalProxyClusters) (func(), string, error)
}
type applyKubeLocalProxyOpts func(o *kubeLocalProxyOpts)
func withKubectlArgs(args []string) applyKubeLocalProxyOpts {
return func(o *kubeLocalProxyOpts) {
o.kubectlArgs = args
}
}
func newKubeLocalProxyOpts(applyOpts ...applyKubeLocalProxyOpts) kubeLocalProxyOpts {
opts := kubeLocalProxyOpts{
makeAndStartKubeLocalProxyFunc: makeAndStartKubeLocalProxy,
}
for _, applyOpt := range applyOpts {
applyOpt(&opts)
}
return opts
}
// maybeStartKubeLocalProxy starts a kube local proxy if local proxy is
// required. A closeFn and the new kubeconfig path are returned if local proxy
// is successfully created. Called by `tsh kubectl` and `tsh kube exec`.
func maybeStartKubeLocalProxy(cf *CLIConf, applyOpts ...applyKubeLocalProxyOpts) (func(), string, error) {
opts := newKubeLocalProxyOpts(applyOpts...)
config, clusters, useLocalProxy := shouldUseKubeLocalProxy(cf, opts.kubectlArgs)
if !useLocalProxy {
return func() {}, "", nil
}
closeFn, newKubeConfigLocation, err := opts.makeAndStartKubeLocalProxyFunc(cf, config, clusters)
return closeFn, newKubeConfigLocation, trace.Wrap(err)
}
// makeAndStartKubeLocalProxy is a helper to create a kube local proxy and
// start it in a goroutine. If successful, a closeFn and the generated
// kubeconfig location are returned.
func makeAndStartKubeLocalProxy(cf *CLIConf, config *clientcmdapi.Config, clusters kubeconfig.LocalProxyClusters) (func(), string, error) {
tc, err := makeClient(cf)
if err != nil {
return nil, "", trace.Wrap(err)
}
localProxy, err := makeKubeLocalProxy(cf, tc, clusters, config, cf.LocalProxyPort)
if err != nil {
return nil, "", trace.Wrap(err)
}
go localProxy.Start(cf.Context)
closeFn := func() {
localProxy.Close()
}
return closeFn, localProxy.KubeConfigPath(), nil
}
// shouldUseKubeLocalProxy checks if a local proxy is required for kube
// access for `tsh kubectl` or `tsh kube exec`.
//
// The local proxy is required when all of these conditions are met:
// - profile is loadable
// - kube access is enabled, and is accessed through web proxy address
// - ALPN connection upgrade is required (e.g. Proxy behind ALB)
// - not `kubectl config` commands
// - original/default kubeconfig is loadable
// - Selected cluster is a Teleport cluster that uses KubeClusterAddr
func shouldUseKubeLocalProxy(cf *CLIConf, kubectlArgs []string) (*clientcmdapi.Config, kubeconfig.LocalProxyClusters, bool) {
// When failed to load profile, assume this CLI command is not running
// against Teleport clusters.
profile, err := cf.GetProfile()
if err != nil {
return nil, nil, false
}
if !profile.RequireKubeLocalProxy() {
return nil, nil, false
}
// Skip "kubectl config" commands.
var kubeconfigLocation, selectedContext string
if len(kubectlArgs) > 0 {
kubectlCommand := makeKubectlCobraCommand()
if isKubectlConfigCommand(kubectlCommand, kubectlArgs) {
return nil, nil, false
}
kubeconfigLocation, selectedContext = extractKubeConfigAndContextFromCommand(kubectlCommand, kubectlArgs)
}
// Nothing to do if cannot load original kubeconfig.
defaultConfig, err := kubeconfig.Load(kubeconfigLocation)
if err != nil {
return nil, nil, false
}
// Prepare Teleport kube cluster based on selected context.
kubeCluster, found := kubeconfig.FindTeleportClusterForLocalProxy(defaultConfig, kubeClusterAddrFromProfile(profile), selectedContext)
if !found {
return nil, nil, false
}
return defaultConfig, kubeconfig.LocalProxyClusters{kubeCluster}, true
}
func isKubectlConfigCommand(kubectlCommand *cobra.Command, args []string) bool {
if len(args) < 2 || args[0] != "kubectl" {
return false
}
find, _, _ := kubectlCommand.Find(args[1:])
for ; find != nil; find = find.Parent() {
if find.Name() == "config" {
return true
}
}
return false
}
func kubeClusterAddrFromProfile(profile *profile.Profile) string {
partialClientConfig := client.Config{
WebProxyAddr: profile.WebProxyAddr,
KubeProxyAddr: profile.KubeProxyAddr,
}
return partialClientConfig.KubeClusterAddr()
}
func overwriteKubeconfigFlagInArgs(args []string, newPath string) []string {
// Make a clone to avoid changing the original args.
args = slices.Clone(args)
for i, arg := range args {
switch {
case strings.HasPrefix(arg, "--kubeconfig="):
args[i] = fmt.Sprintf("--kubeconfig=%v", newPath)
case arg == "--kubeconfig" && len(args) > i+1:
args[i+1] = newPath
}
}
return args
}
func overwriteKubeconfigInEnv(env []string, newPath string) (output []string) {
kubeConfigEnvPrefix := teleport.EnvKubeConfig + "="
for _, entry := range env {
if strings.HasPrefix(entry, kubeConfigEnvPrefix) {
continue
}
output = append(output, entry)
}
output = append(output, kubeConfigEnvPrefix+newPath)
return
}