Skip to content

Commit

Permalink
[v14] Header Connection: close causes kubectl to fail exec (#33172)
Browse files Browse the repository at this point in the history
* Header `Connection: close` causes `kubectl` to fail exec

The header `Connection: close` causes failure in kubetl when it upgrades
the connection to SPDY.

The `ReadTimeout` and `WriteTimeout` are known to cause problems to
Kubernetes watch streams.

Fixes #33020

Signed-off-by: Tiago Silva <tiago.silva@goteleport.com>

* add unit tests

---------

Signed-off-by: Tiago Silva <tiago.silva@goteleport.com>
  • Loading branch information
tigrato committed Oct 10, 2023
1 parent b6825b5 commit 5366d2c
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 10 deletions.
7 changes: 0 additions & 7 deletions lib/srv/alpnproxy/local_proxy.go
Expand Up @@ -35,7 +35,6 @@ import (
"golang.org/x/exp/slices"

"github.com/gravitational/teleport/api/client"
apidefaults "github.com/gravitational/teleport/api/defaults"
"github.com/gravitational/teleport/api/utils/pingconn"
"github.com/gravitational/teleport/lib/defaults"
"github.com/gravitational/teleport/lib/srv/alpnproxy/common"
Expand Down Expand Up @@ -275,9 +274,6 @@ func (l *LocalProxy) makeHTTPReverseProxy(certs []tls.Certificate) *httputil.Rev
outReq.URL.Host = l.cfg.RemoteProxyAddr
},
ModifyResponse: func(response *http.Response) error {
// Ask the client to close the connection to avoid re-use.
response.Header.Add("Connection", "close")

errHeader := response.Header.Get(commonApp.TeleportAPIErrorHeader)
if errHeader != "" {
// TODO: find a cleaner way of formatting the error.
Expand Down Expand Up @@ -320,10 +316,7 @@ func (l *LocalProxy) StartHTTPAccessProxy(ctx context.Context) error {
defaultProxy := l.makeHTTPReverseProxy(l.getCerts())

server := &http.Server{
ReadTimeout: apidefaults.DefaultIOTimeout,
ReadHeaderTimeout: defaults.ReadHeadersTimeout,
WriteTimeout: apidefaults.DefaultIOTimeout,
IdleTimeout: apidefaults.DefaultIdleTimeout,
Handler: http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
if l.cfg.HTTPMiddleware.HandleRequest(rw, req) {
return
Expand Down
113 changes: 110 additions & 3 deletions tool/tsh/common/kube_proxy_test.go
Expand Up @@ -17,7 +17,10 @@ limitations under the License.
package common

import (
"bytes"
"context"
"fmt"
"io"
"net/http"
"net/url"
"os"
Expand All @@ -28,10 +31,14 @@ import (

"github.com/google/uuid"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
"k8s.io/client-go/tools/remotecommand"
"k8s.io/kubectl/pkg/scheme"

"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/api/utils/keypaths"
Expand Down Expand Up @@ -130,15 +137,115 @@ func sendRequestToKubeLocalProxy(t *testing.T, config *clientcmdapi.Config, tele
KeyData: config.AuthInfos[contextName].ClientKeyData,
ServerName: common.KubeLocalProxySNI(teleportCluster, kubeCluster),
}

client, err := kubernetes.NewForConfig(&rest.Config{
restConfig := &rest.Config{
Host: "https://" + teleportCluster,
TLSClientConfig: tlsClientConfig,
Proxy: http.ProxyURL(proxyURL),
})
}
client, err := kubernetes.NewForConfig(restConfig)
require.NoError(t, err)

resp, err := client.CoreV1().Pods("default").List(context.Background(), metav1.ListOptions{})
require.Nil(t, err)
require.GreaterOrEqual(t, len(resp.Items), 1)

runKubectlExec(t, restConfig)
}

// runKubectlExec runs a kubectl exec command in a dummy pod.
// The mock Kubernetes API server will return the pod name and the stdin data
// written to the pod.
func runKubectlExec(t *testing.T, config *rest.Config) {
var (
stdinWrite = &bytes.Buffer{}
stdout = &bytes.Buffer{}
stderr = &bytes.Buffer{}
podName = "teleport"
podNamespace = "default"
podContainerName = "teleportContainer"
containerCommmandExecute = []string{"sh"}
stdinContent = []byte("stdin_data")
)

_, err := stdinWrite.Write(stdinContent)
require.NoError(t, err)

streamOpts := remotecommand.StreamOptions{
Stdin: io.NopCloser(stdinWrite),
Stdout: stdout,
Stderr: stderr,
Tty: false,
}

req, err := generateExecRequest(
generateExecRequestConfig{
config: config,
podName: podName,
podNamespace: podNamespace,
containerName: podContainerName,
cmd: containerCommmandExecute, // placeholder for commands to execute in the dummy pod
options: streamOpts,
},
)
require.NoError(t, err)

exec, err := remotecommand.NewSPDYExecutor(config, http.MethodPost, req.URL())
require.NoError(t, err)

err = exec.StreamWithContext(context.Background(), streamOpts)
require.NoError(t, err)
require.Equal(t, fmt.Sprintf("%s\n%s", podContainerName, string(stdinContent)), stdout.String())
}

// generateExecRequestConfig is the config for generating a Kube API url for
// executing commands in pods.
type generateExecRequestConfig struct {
// config is the rest config for the cluster.
config *rest.Config
// podName is the name of the pod to execute the command in.
podName string
// podNamespace is the namespace of the pod to execute the command in.
podNamespace string
// containerName is the name of the container to execute the command in.
containerName string
// cmd is the command to execute in the container.
cmd []string
// options are the options for the command execution.
options remotecommand.StreamOptions
}

// generateExecRequest generates a Kube API url for executing commands in pods.
// The url format is the following:
// "/api/v1/namespaces/{podNamespace}/pods/{podName}/exec?stderr={stdout}&stdout={stdout}&tty={tty}&reason={reason}&container={containerName}&command={command}"
func generateExecRequest(config generateExecRequestConfig) (*rest.Request, error) {
restClient, err := rest.RESTClientFor(
&rest.Config{
Host: config.config.Host,
APIPath: "/api",
ContentConfig: rest.ContentConfig{
GroupVersion: &corev1.SchemeGroupVersion,
NegotiatedSerializer: runtime.NewSimpleNegotiatedSerializer(runtime.SerializerInfo{}),
},
TLSClientConfig: rest.TLSClientConfig{Insecure: true},
},
)
if err != nil {
return nil, err
}

req := restClient.Post().
Resource("pods").
Name(config.podName).
Namespace(config.podNamespace).
SubResource("exec").
VersionedParams(&corev1.PodExecOptions{
Container: config.containerName,
Command: config.cmd,
Stdin: config.options.Stdin != nil,
Stdout: config.options.Stdout != nil,
Stderr: config.options.Stderr != nil,
TTY: config.options.Tty,
}, scheme.ParameterCodec)

return req, nil
}

0 comments on commit 5366d2c

Please sign in to comment.