From 5366d2c31d62f1fb5c4ec92bcde49d7d3c4e2daa Mon Sep 17 00:00:00 2001 From: Tiago Silva Date: Tue, 10 Oct 2023 12:28:25 +0100 Subject: [PATCH] [v14] Header `Connection: close` causes `kubectl` to fail exec (#33172) * Header `Connection: close` causes `kubectl` to fail exec The header `Connection: close` causes failure in kubetl when it upgrades the connection to SPDY. The `ReadTimeout` and `WriteTimeout` are known to cause problems to Kubernetes watch streams. Fixes #33020 Signed-off-by: Tiago Silva * add unit tests --------- Signed-off-by: Tiago Silva --- lib/srv/alpnproxy/local_proxy.go | 7 -- tool/tsh/common/kube_proxy_test.go | 113 ++++++++++++++++++++++++++++- 2 files changed, 110 insertions(+), 10 deletions(-) diff --git a/lib/srv/alpnproxy/local_proxy.go b/lib/srv/alpnproxy/local_proxy.go index fa283c24e782e..b2aad987e958e 100644 --- a/lib/srv/alpnproxy/local_proxy.go +++ b/lib/srv/alpnproxy/local_proxy.go @@ -35,7 +35,6 @@ import ( "golang.org/x/exp/slices" "github.com/gravitational/teleport/api/client" - apidefaults "github.com/gravitational/teleport/api/defaults" "github.com/gravitational/teleport/api/utils/pingconn" "github.com/gravitational/teleport/lib/defaults" "github.com/gravitational/teleport/lib/srv/alpnproxy/common" @@ -275,9 +274,6 @@ func (l *LocalProxy) makeHTTPReverseProxy(certs []tls.Certificate) *httputil.Rev outReq.URL.Host = l.cfg.RemoteProxyAddr }, ModifyResponse: func(response *http.Response) error { - // Ask the client to close the connection to avoid re-use. - response.Header.Add("Connection", "close") - errHeader := response.Header.Get(commonApp.TeleportAPIErrorHeader) if errHeader != "" { // TODO: find a cleaner way of formatting the error. @@ -320,10 +316,7 @@ func (l *LocalProxy) StartHTTPAccessProxy(ctx context.Context) error { defaultProxy := l.makeHTTPReverseProxy(l.getCerts()) server := &http.Server{ - ReadTimeout: apidefaults.DefaultIOTimeout, ReadHeaderTimeout: defaults.ReadHeadersTimeout, - WriteTimeout: apidefaults.DefaultIOTimeout, - IdleTimeout: apidefaults.DefaultIdleTimeout, Handler: http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { if l.cfg.HTTPMiddleware.HandleRequest(rw, req) { return diff --git a/tool/tsh/common/kube_proxy_test.go b/tool/tsh/common/kube_proxy_test.go index 40c7c92fc5e06..78f40a546d2e4 100644 --- a/tool/tsh/common/kube_proxy_test.go +++ b/tool/tsh/common/kube_proxy_test.go @@ -17,7 +17,10 @@ limitations under the License. package common import ( + "bytes" "context" + "fmt" + "io" "net/http" "net/url" "os" @@ -28,10 +31,14 @@ import ( "github.com/google/uuid" "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" clientcmdapi "k8s.io/client-go/tools/clientcmd/api" + "k8s.io/client-go/tools/remotecommand" + "k8s.io/kubectl/pkg/scheme" "github.com/gravitational/teleport/api/types" "github.com/gravitational/teleport/api/utils/keypaths" @@ -130,15 +137,115 @@ func sendRequestToKubeLocalProxy(t *testing.T, config *clientcmdapi.Config, tele KeyData: config.AuthInfos[contextName].ClientKeyData, ServerName: common.KubeLocalProxySNI(teleportCluster, kubeCluster), } - - client, err := kubernetes.NewForConfig(&rest.Config{ + restConfig := &rest.Config{ Host: "https://" + teleportCluster, TLSClientConfig: tlsClientConfig, Proxy: http.ProxyURL(proxyURL), - }) + } + client, err := kubernetes.NewForConfig(restConfig) require.NoError(t, err) resp, err := client.CoreV1().Pods("default").List(context.Background(), metav1.ListOptions{}) require.Nil(t, err) require.GreaterOrEqual(t, len(resp.Items), 1) + + runKubectlExec(t, restConfig) +} + +// runKubectlExec runs a kubectl exec command in a dummy pod. +// The mock Kubernetes API server will return the pod name and the stdin data +// written to the pod. +func runKubectlExec(t *testing.T, config *rest.Config) { + var ( + stdinWrite = &bytes.Buffer{} + stdout = &bytes.Buffer{} + stderr = &bytes.Buffer{} + podName = "teleport" + podNamespace = "default" + podContainerName = "teleportContainer" + containerCommmandExecute = []string{"sh"} + stdinContent = []byte("stdin_data") + ) + + _, err := stdinWrite.Write(stdinContent) + require.NoError(t, err) + + streamOpts := remotecommand.StreamOptions{ + Stdin: io.NopCloser(stdinWrite), + Stdout: stdout, + Stderr: stderr, + Tty: false, + } + + req, err := generateExecRequest( + generateExecRequestConfig{ + config: config, + podName: podName, + podNamespace: podNamespace, + containerName: podContainerName, + cmd: containerCommmandExecute, // placeholder for commands to execute in the dummy pod + options: streamOpts, + }, + ) + require.NoError(t, err) + + exec, err := remotecommand.NewSPDYExecutor(config, http.MethodPost, req.URL()) + require.NoError(t, err) + + err = exec.StreamWithContext(context.Background(), streamOpts) + require.NoError(t, err) + require.Equal(t, fmt.Sprintf("%s\n%s", podContainerName, string(stdinContent)), stdout.String()) +} + +// generateExecRequestConfig is the config for generating a Kube API url for +// executing commands in pods. +type generateExecRequestConfig struct { + // config is the rest config for the cluster. + config *rest.Config + // podName is the name of the pod to execute the command in. + podName string + // podNamespace is the namespace of the pod to execute the command in. + podNamespace string + // containerName is the name of the container to execute the command in. + containerName string + // cmd is the command to execute in the container. + cmd []string + // options are the options for the command execution. + options remotecommand.StreamOptions +} + +// generateExecRequest generates a Kube API url for executing commands in pods. +// The url format is the following: +// "/api/v1/namespaces/{podNamespace}/pods/{podName}/exec?stderr={stdout}&stdout={stdout}&tty={tty}&reason={reason}&container={containerName}&command={command}" +func generateExecRequest(config generateExecRequestConfig) (*rest.Request, error) { + restClient, err := rest.RESTClientFor( + &rest.Config{ + Host: config.config.Host, + APIPath: "/api", + ContentConfig: rest.ContentConfig{ + GroupVersion: &corev1.SchemeGroupVersion, + NegotiatedSerializer: runtime.NewSimpleNegotiatedSerializer(runtime.SerializerInfo{}), + }, + TLSClientConfig: rest.TLSClientConfig{Insecure: true}, + }, + ) + if err != nil { + return nil, err + } + + req := restClient.Post(). + Resource("pods"). + Name(config.podName). + Namespace(config.podNamespace). + SubResource("exec"). + VersionedParams(&corev1.PodExecOptions{ + Container: config.containerName, + Command: config.cmd, + Stdin: config.options.Stdin != nil, + Stdout: config.options.Stdout != nil, + Stderr: config.options.Stderr != nil, + TTY: config.options.Tty, + }, scheme.ParameterCodec) + + return req, nil }