Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v14] Fix remote pool of signed certs when exec into leaf clusters #32768

Merged
merged 2 commits into from
Sep 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 9 additions & 1 deletion lib/kube/proxy/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,14 +368,22 @@ type Forwarder struct {
// use the heartbeat clusters.
getKubernetesServersForKubeCluster getKubeServersByNameFunc

// cachedTransport is a cache of http.Transport objects used to
// cachedTransport is a cache of cachedTransportEntry objects used to
// connect to Teleport services.
// TODO(tigrato): Implement a cache eviction policy using watchers.
cachedTransport *ttlmap.TTLMap
// cachedTransportMu is a mutex used to protect the cachedTransport.
cachedTransportMu sync.Mutex
}

// cachedTransportEntry is a cached transport entry used to connect to
// Teleport services. It contains a cached http.RoundTripper and a cached
// tls.Config.
type cachedTransportEntry struct {
transport http.RoundTripper
tlsConfig *tls.Config
}

// getKubeServersByNameFunc is a function that returns a list of
// kubernetes servers for a given kube cluster.
type getKubeServersByNameFunc = func(ctx context.Context, name string) ([]types.KubeServer, error)
Expand Down
74 changes: 70 additions & 4 deletions lib/kube/proxy/forwarder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1175,10 +1175,11 @@ func newMockForwader(ctx context.Context, t *testing.T) *Forwarder {
type mockCSRClient struct {
auth.ClientI

clock clockwork.Clock
ca *tlsca.CertAuthority
gotCSR auth.KubeCSR
lastCert *x509.Certificate
clock clockwork.Clock
ca *tlsca.CertAuthority
gotCSR auth.KubeCSR
lastCert *x509.Certificate
leafClusterName string
}

func newMockCSRClient(clock clockwork.Clock) (*mockCSRClient, error) {
Expand All @@ -1189,6 +1190,26 @@ func newMockCSRClient(clock clockwork.Clock) (*mockCSRClient, error) {
return &mockCSRClient{ca: ca, clock: clock}, nil
}

func (c *mockCSRClient) GetCertAuthority(ctx context.Context, id types.CertAuthID, loadKeys bool) (types.CertAuthority, error) {
if id.DomainName == c.leafClusterName {
return &types.CertAuthorityV2{
Kind: types.KindCertAuthority,
Version: types.V3,
Metadata: types.Metadata{
Name: "local",
},
Spec: types.CertAuthoritySpecV2{
Type: types.HostCA,
ClusterName: c.leafClusterName,
ActiveKeys: types.CAKeySet{
TLS: []*types.TLSKeyPair{{Cert: []byte(fixtures.TLSCACertPEM)}},
},
},
}, nil
}
return nil, trace.NotFound("cluster not found")
}

func (c *mockCSRClient) ProcessKubeCSR(csr auth.KubeCSR) (*auth.KubeCSRResponse, error) {
c.gotCSR = csr

Expand Down Expand Up @@ -1837,3 +1858,48 @@ func Test_authContext_eventClusterMeta(t *testing.T) {
})
}
}

func TestForwarderTLSConfigCAs(t *testing.T) {
clusterName := "leaf"

// Create a cert pool with the cert from fixtures.TLSCACertPEM
certPool := x509.NewCertPool()
certPool.AppendCertsFromPEM([]byte(fixtures.TLSCACertPEM))

// create the tls config used by the forwarder
originalTLSConfig := &tls.Config{}
// create the auth server mock client
clock := clockwork.NewFakeClock()
cl, err := newMockCSRClient(clock)
require.NoError(t, err)
cl.leafClusterName = clusterName

f := &Forwarder{
cfg: ForwarderConfig{
Keygen: testauthority.New(),
AuthClient: cl,
TracerProvider: otel.GetTracerProvider(),
tracer: otel.Tracer(teleport.ComponentKube),
KubeServiceType: ProxyService,
CachingAuthClient: cl,
ConnTLSConfig: originalTLSConfig,
},
log: logrus.NewEntry(logrus.New()),
ctx: context.Background(),
}
// generate tlsConfig for the leaf cluster
tlsConfig, err := f.getTLSConfigForLeafCluster(clusterName)
require.NoError(t, err)
// ensure that the tlsConfig is a clone of the originalTLSConfig
require.NotSame(t, originalTLSConfig, tlsConfig, "expected tlsConfig to be different from originalTLSConfig")
// ensure that the tlsConfig has the certPool as the RootCAs
require.True(t, tlsConfig.RootCAs.Equal(certPool), "expected root CAs to be equal to certPool")

// generate tlsConfig for the local cluster
_, localTLSConfig, err := f.newLocalClusterTransport(clusterName)
require.NoError(t, err)
// ensure that the localTLSConfig is a clone of the originalTLSConfig
require.NotSame(t, originalTLSConfig, localTLSConfig, "expected localTLSConfig pointer to be different from originalTLSConfig")
// ensure that the localTLSConfig doesn't have the certPool as the RootCAs
require.False(t, localTLSConfig.RootCAs.Equal(certPool), "root CAs should not include certPool")
}
60 changes: 38 additions & 22 deletions lib/kube/proxy/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ func (f *Forwarder) transportForRequest(sess *clusterSession) (http.RoundTripper
// If all servers support impersonation, use a new transport for each
// request. This will ensure that the client certificate is valid for the
// server that the request is being sent to.
return f.transportForRequestWithImpersonation(sess)
transport, _, err := f.transportForRequestWithImpersonation(sess)
return transport, trace.Wrap(err)
}
// Otherwise, use a single transport per request.
return f.transportForRequestWithoutImpersonation(sess)
Expand Down Expand Up @@ -130,7 +131,7 @@ func (f *Forwarder) transportForRequestWithoutImpersonation(sess *clusterSession
// requests to the cluster in order to improve performance.
// The transport is cached in the forwarder so that it can be reused for future
// requests. If the transport is not cached, a new one is created and cached.
func (f *Forwarder) transportForRequestWithImpersonation(sess *clusterSession) (http.RoundTripper, error) {
func (f *Forwarder) transportForRequestWithImpersonation(sess *clusterSession) (http.RoundTripper, *tls.Config, error) {
// transportCacheTTL is the TTL for the transport cache.
const transportCacheTTL = 5 * time.Hour
// If the cluster is remote, the key is the teleport cluster name.
Expand All @@ -143,36 +144,44 @@ func (f *Forwarder) transportForRequestWithImpersonation(sess *clusterSession) (
cachedI, ok := f.cachedTransport.Get(key)
f.cachedTransportMu.Unlock()
if ok {
if cached, ok := cachedI.(http.RoundTripper); ok {
return cached, nil
if cached, ok := cachedI.(cachedTransportEntry); ok {
return cached.transport, cached.tlsConfig.Clone(), nil
}
}

var httpTransport http.RoundTripper
var err error
var (
httpTransport http.RoundTripper
err error
tlsConfig *tls.Config
)
if sess.teleportCluster.isRemote {
// If the cluster is remote, create a new transport for the remote cluster.
httpTransport, err = f.newRemoteClusterTransport(sess.teleportCluster.name)
httpTransport, tlsConfig, err = f.newRemoteClusterTransport(sess.teleportCluster.name)
} else if sess.kubeAPICreds != nil {
// If agent is running in agent mode, get the transport from the configured cluster
// credentials.
return sess.kubeAPICreds.getTransport(), nil
return sess.kubeAPICreds.getTransport(), sess.kubeAPICreds.getTLSConfig(), nil
} else if f.cfg.ReverseTunnelSrv != nil {
// If agent is running in proxy mode, create a new transport for the local cluster.
httpTransport, err = f.newLocalClusterTransport(sess.kubeClusterName)
httpTransport, tlsConfig, err = f.newLocalClusterTransport(sess.kubeClusterName)
} else {
return nil, trace.BadParameter("no reverse tunnel server or credentials provided")
return nil, nil, trace.BadParameter("no reverse tunnel server or credentials provided")
}
if err != nil {
return nil, trace.Wrap(err)
return nil, nil, trace.Wrap(err)
}

// Cache the transport.
f.cachedTransportMu.Lock()
f.cachedTransport.Set(key, httpTransport, transportCacheTTL)
f.cachedTransport.Set(key,
cachedTransportEntry{
transport: httpTransport,
tlsConfig: tlsConfig,
},
transportCacheTTL)
f.cachedTransportMu.Unlock()

return httpTransport, nil
return httpTransport, tlsConfig.Clone(), nil
}

// transportCacheKey returns a key used to cache transports.
Expand Down Expand Up @@ -309,25 +318,28 @@ func validClientCreds(clock clockwork.Clock, c *tls.Config) bool {
// that can be used to dial Kubernetes Proxy in a remote Teleport cluster.
// The transport is configured to use a connection pool and to close idle
// connections after a timeout.
func (f *Forwarder) newRemoteClusterTransport(clusterName string) (http.RoundTripper, error) {
func (f *Forwarder) newRemoteClusterTransport(clusterName string) (http.RoundTripper, *tls.Config, error) {
// Tunnel is nil for a teleport process with "kubernetes_service" but
// not "proxy_service".
if f.cfg.ReverseTunnelSrv == nil {
return nil, trace.BadParameter("this Teleport process can not dial Kubernetes endpoints in remote Teleport clusters; only proxy_service supports this, make sure a Teleport proxy is first in the request path")
return nil, nil, trace.BadParameter("this Teleport process can not dial Kubernetes endpoints in remote Teleport clusters; only proxy_service supports this, make sure a Teleport proxy is first in the request path")
}
// Dialer that will be used to dial the remote cluster via the reverse tunnel.
dialFn := f.remoteClusterDialer(clusterName)
tlsConfig, err := f.getTLSConfigForLeafCluster(clusterName)
if err != nil {
return nil, trace.Wrap(err)
return nil, nil, trace.Wrap(err)
}
// Create a new HTTP/2 transport that will be used to dial the remote cluster.
h2Transport, err := newH2Transport(tlsConfig, dialFn)
if err != nil {
return nil, trace.Wrap(err)
return nil, nil, trace.Wrap(err)
}

return instrumentedRoundtripper(f.cfg.KubeServiceType, auth.NewImpersonatorRoundTripper(h2Transport)), nil
return instrumentedRoundtripper(
f.cfg.KubeServiceType,
auth.NewImpersonatorRoundTripper(h2Transport),
), tlsConfig.Clone(), nil
}

// getTLSConfigForLeafCluster returns a TLS config with the Proxy certificate
Expand Down Expand Up @@ -399,15 +411,18 @@ func (f *Forwarder) remoteClusterDialer(clusterName string) dialContextFunc {

// newLocalClusterTransport returns a new [http.Transport] (https://golang.org/pkg/net/http/#Transport)
// that can be used to dial Kubernetes Service in a local Teleport cluster.
func (f *Forwarder) newLocalClusterTransport(kubeClusterName string) (http.RoundTripper, error) {
func (f *Forwarder) newLocalClusterTransport(kubeClusterName string) (http.RoundTripper, *tls.Config, error) {
dialFn := f.localClusterDialer(kubeClusterName)
// Create a new HTTP/2 transport that will be used to dial the remote cluster.
h2Transport, err := newH2Transport(f.cfg.ConnTLSConfig, dialFn)
if err != nil {
return nil, trace.Wrap(err)
return nil, nil, trace.Wrap(err)
}

return instrumentedRoundtripper(f.cfg.KubeServiceType, auth.NewImpersonatorRoundTripper(h2Transport)), nil
return instrumentedRoundtripper(
f.cfg.KubeServiceType,
auth.NewImpersonatorRoundTripper(h2Transport),
), f.cfg.ConnTLSConfig.Clone(), nil
}

// localClusterDialer returns a dialer that can be used to dial Kubernetes Service
Expand Down Expand Up @@ -524,7 +539,8 @@ func (f *Forwarder) getTLSConfig(sess *clusterSession) (*tls.Config, bool, error
// if the next hop supports impersonation, we can use the TLS config
// of the proxy to connect to it.
if f.allServersSupportImpersonation(sess) {
return f.cfg.ConnTLSConfig.Clone(), true, nil
_, tlsConfig, err := f.transportForRequestWithImpersonation(sess)
return tlsConfig, err == nil, trace.Wrap(err)
}

// If the next hop does not support impersonation, we need to get a
Expand Down