Skip to content

Commit

Permalink
[v14] Fix remote pool of signed certs when exec into leaf clusters (#…
Browse files Browse the repository at this point in the history
…32768)

* Fix remote pool of signed certs when exec into leaf clusters

This PR fixes the list of acceptable CAs from the leaf cluster when
executing into a pod in a leaf cluster.

Fixes #32380

Signed-off-by: Tiago Silva <tiago.silva@goteleport.com>

* add unit test

---------

Signed-off-by: Tiago Silva <tiago.silva@goteleport.com>
  • Loading branch information
tigrato committed Sep 29, 2023
1 parent f7101b6 commit 0d2b682
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 27 deletions.
10 changes: 9 additions & 1 deletion lib/kube/proxy/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,14 +368,22 @@ type Forwarder struct {
// use the heartbeat clusters.
getKubernetesServersForKubeCluster getKubeServersByNameFunc

// cachedTransport is a cache of http.Transport objects used to
// cachedTransport is a cache of cachedTransportEntry objects used to
// connect to Teleport services.
// TODO(tigrato): Implement a cache eviction policy using watchers.
cachedTransport *ttlmap.TTLMap
// cachedTransportMu is a mutex used to protect the cachedTransport.
cachedTransportMu sync.Mutex
}

// cachedTransportEntry is a cached transport entry used to connect to
// Teleport services. It contains a cached http.RoundTripper and a cached
// tls.Config.
type cachedTransportEntry struct {
	// transport is the round tripper used to send requests to the service.
	transport http.RoundTripper
	// tlsConfig is the TLS configuration associated with the transport.
	tlsConfig *tls.Config
}

// getKubeServersByNameFunc is a function that returns a list of
// kubernetes servers for a given kube cluster.
type getKubeServersByNameFunc = func(ctx context.Context, name string) ([]types.KubeServer, error)
Expand Down
74 changes: 70 additions & 4 deletions lib/kube/proxy/forwarder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1175,10 +1175,11 @@ func newMockForwader(ctx context.Context, t *testing.T) *Forwarder {
type mockCSRClient struct {
auth.ClientI

clock clockwork.Clock
ca *tlsca.CertAuthority
gotCSR auth.KubeCSR
lastCert *x509.Certificate
clock clockwork.Clock
ca *tlsca.CertAuthority
gotCSR auth.KubeCSR
lastCert *x509.Certificate
leafClusterName string
}

func newMockCSRClient(clock clockwork.Clock) (*mockCSRClient, error) {
Expand All @@ -1189,6 +1190,26 @@ func newMockCSRClient(clock clockwork.Clock) (*mockCSRClient, error) {
return &mockCSRClient{ca: ca, clock: clock}, nil
}

// GetCertAuthority returns a fake host CA for the configured leaf cluster
// name, with an active TLS key pair built from fixtures.TLSCACertPEM.
// Any other cluster name results in a NotFound error.
func (c *mockCSRClient) GetCertAuthority(ctx context.Context, id types.CertAuthID, loadKeys bool) (types.CertAuthority, error) {
	if id.DomainName != c.leafClusterName {
		return nil, trace.NotFound("cluster not found")
	}
	leafCA := &types.CertAuthorityV2{
		Kind:    types.KindCertAuthority,
		Version: types.V3,
		Metadata: types.Metadata{
			Name: "local",
		},
		Spec: types.CertAuthoritySpecV2{
			Type:        types.HostCA,
			ClusterName: c.leafClusterName,
			ActiveKeys: types.CAKeySet{
				TLS: []*types.TLSKeyPair{{Cert: []byte(fixtures.TLSCACertPEM)}},
			},
		},
	}
	return leafCA, nil
}

func (c *mockCSRClient) ProcessKubeCSR(csr auth.KubeCSR) (*auth.KubeCSRResponse, error) {
c.gotCSR = csr

Expand Down Expand Up @@ -1837,3 +1858,48 @@ func Test_authContext_eventClusterMeta(t *testing.T) {
})
}
}

// TestForwarderTLSConfigCAs verifies which root CAs end up in the TLS
// configs produced by the forwarder: the config built for a leaf cluster
// must trust the leaf cluster's host CA, while the local-cluster config
// must not — and both must be clones of the forwarder's base TLS config.
func TestForwarderTLSConfigCAs(t *testing.T) {
	const leafName = "leaf"

	// Pool holding the leaf cluster's CA cert from fixtures.TLSCACertPEM.
	leafPool := x509.NewCertPool()
	leafPool.AppendCertsFromPEM([]byte(fixtures.TLSCACertPEM))

	// Base TLS config handed to the forwarder via ConnTLSConfig.
	baseTLS := &tls.Config{}

	// Mock auth client that serves the leaf cluster's CA.
	mockClient, err := newMockCSRClient(clockwork.NewFakeClock())
	require.NoError(t, err)
	mockClient.leafClusterName = leafName

	fwd := &Forwarder{
		cfg: ForwarderConfig{
			Keygen:            testauthority.New(),
			AuthClient:        mockClient,
			TracerProvider:    otel.GetTracerProvider(),
			tracer:            otel.Tracer(teleport.ComponentKube),
			KubeServiceType:   ProxyService,
			CachingAuthClient: mockClient,
			ConnTLSConfig:     baseTLS,
		},
		log: logrus.NewEntry(logrus.New()),
		ctx: context.Background(),
	}

	// The leaf-cluster TLS config must be a clone whose roots come from
	// the leaf cluster's host CA.
	leafTLS, err := fwd.getTLSConfigForLeafCluster(leafName)
	require.NoError(t, err)
	require.NotSame(t, baseTLS, leafTLS, "expected tlsConfig to be different from originalTLSConfig")
	require.True(t, leafTLS.RootCAs.Equal(leafPool), "expected root CAs to be equal to certPool")

	// The local-cluster TLS config is also a clone, but it must not pick
	// up the leaf cluster's CA pool.
	_, localTLS, err := fwd.newLocalClusterTransport(leafName)
	require.NoError(t, err)
	require.NotSame(t, baseTLS, localTLS, "expected localTLSConfig pointer to be different from originalTLSConfig")
	require.False(t, localTLS.RootCAs.Equal(leafPool), "root CAs should not include certPool")
}
60 changes: 38 additions & 22 deletions lib/kube/proxy/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ func (f *Forwarder) transportForRequest(sess *clusterSession) (http.RoundTripper
// If all servers support impersonation, use a new transport for each
// request. This will ensure that the client certificate is valid for the
// server that the request is being sent to.
return f.transportForRequestWithImpersonation(sess)
transport, _, err := f.transportForRequestWithImpersonation(sess)
return transport, trace.Wrap(err)
}
// Otherwise, use a single transport per request.
return f.transportForRequestWithoutImpersonation(sess)
Expand Down Expand Up @@ -130,7 +131,7 @@ func (f *Forwarder) transportForRequestWithoutImpersonation(sess *clusterSession
// requests to the cluster in order to improve performance.
// The transport is cached in the forwarder so that it can be reused for future
// requests. If the transport is not cached, a new one is created and cached.
func (f *Forwarder) transportForRequestWithImpersonation(sess *clusterSession) (http.RoundTripper, error) {
func (f *Forwarder) transportForRequestWithImpersonation(sess *clusterSession) (http.RoundTripper, *tls.Config, error) {
// transportCacheTTL is the TTL for the transport cache.
const transportCacheTTL = 5 * time.Hour
// If the cluster is remote, the key is the teleport cluster name.
Expand All @@ -143,36 +144,44 @@ func (f *Forwarder) transportForRequestWithImpersonation(sess *clusterSession) (
cachedI, ok := f.cachedTransport.Get(key)
f.cachedTransportMu.Unlock()
if ok {
if cached, ok := cachedI.(http.RoundTripper); ok {
return cached, nil
if cached, ok := cachedI.(cachedTransportEntry); ok {
return cached.transport, cached.tlsConfig.Clone(), nil
}
}

var httpTransport http.RoundTripper
var err error
var (
httpTransport http.RoundTripper
err error
tlsConfig *tls.Config
)
if sess.teleportCluster.isRemote {
// If the cluster is remote, create a new transport for the remote cluster.
httpTransport, err = f.newRemoteClusterTransport(sess.teleportCluster.name)
httpTransport, tlsConfig, err = f.newRemoteClusterTransport(sess.teleportCluster.name)
} else if sess.kubeAPICreds != nil {
// If agent is running in agent mode, get the transport from the configured cluster
// credentials.
return sess.kubeAPICreds.getTransport(), nil
return sess.kubeAPICreds.getTransport(), sess.kubeAPICreds.getTLSConfig(), nil
} else if f.cfg.ReverseTunnelSrv != nil {
// If agent is running in proxy mode, create a new transport for the local cluster.
httpTransport, err = f.newLocalClusterTransport(sess.kubeClusterName)
httpTransport, tlsConfig, err = f.newLocalClusterTransport(sess.kubeClusterName)
} else {
return nil, trace.BadParameter("no reverse tunnel server or credentials provided")
return nil, nil, trace.BadParameter("no reverse tunnel server or credentials provided")
}
if err != nil {
return nil, trace.Wrap(err)
return nil, nil, trace.Wrap(err)
}

// Cache the transport.
f.cachedTransportMu.Lock()
f.cachedTransport.Set(key, httpTransport, transportCacheTTL)
f.cachedTransport.Set(key,
cachedTransportEntry{
transport: httpTransport,
tlsConfig: tlsConfig,
},
transportCacheTTL)
f.cachedTransportMu.Unlock()

return httpTransport, nil
return httpTransport, tlsConfig.Clone(), nil
}

// transportCacheKey returns a key used to cache transports.
Expand Down Expand Up @@ -309,25 +318,28 @@ func validClientCreds(clock clockwork.Clock, c *tls.Config) bool {
// that can be used to dial Kubernetes Proxy in a remote Teleport cluster.
// The transport is configured to use a connection pool and to close idle
// connections after a timeout.
func (f *Forwarder) newRemoteClusterTransport(clusterName string) (http.RoundTripper, error) {
func (f *Forwarder) newRemoteClusterTransport(clusterName string) (http.RoundTripper, *tls.Config, error) {
// Tunnel is nil for a teleport process with "kubernetes_service" but
// not "proxy_service".
if f.cfg.ReverseTunnelSrv == nil {
return nil, trace.BadParameter("this Teleport process can not dial Kubernetes endpoints in remote Teleport clusters; only proxy_service supports this, make sure a Teleport proxy is first in the request path")
return nil, nil, trace.BadParameter("this Teleport process can not dial Kubernetes endpoints in remote Teleport clusters; only proxy_service supports this, make sure a Teleport proxy is first in the request path")
}
// Dialer that will be used to dial the remote cluster via the reverse tunnel.
dialFn := f.remoteClusterDialer(clusterName)
tlsConfig, err := f.getTLSConfigForLeafCluster(clusterName)
if err != nil {
return nil, trace.Wrap(err)
return nil, nil, trace.Wrap(err)
}
// Create a new HTTP/2 transport that will be used to dial the remote cluster.
h2Transport, err := newH2Transport(tlsConfig, dialFn)
if err != nil {
return nil, trace.Wrap(err)
return nil, nil, trace.Wrap(err)
}

return instrumentedRoundtripper(f.cfg.KubeServiceType, auth.NewImpersonatorRoundTripper(h2Transport)), nil
return instrumentedRoundtripper(
f.cfg.KubeServiceType,
auth.NewImpersonatorRoundTripper(h2Transport),
), tlsConfig.Clone(), nil
}

// getTLSConfigForLeafCluster returns a TLS config with the Proxy certificate
Expand Down Expand Up @@ -399,15 +411,18 @@ func (f *Forwarder) remoteClusterDialer(clusterName string) dialContextFunc {

// newLocalClusterTransport returns a new [http.Transport] (https://golang.org/pkg/net/http/#Transport)
// that can be used to dial Kubernetes Service in a local Teleport cluster.
func (f *Forwarder) newLocalClusterTransport(kubeClusterName string) (http.RoundTripper, error) {
func (f *Forwarder) newLocalClusterTransport(kubeClusterName string) (http.RoundTripper, *tls.Config, error) {
dialFn := f.localClusterDialer(kubeClusterName)
// Create a new HTTP/2 transport that will be used to dial the remote cluster.
h2Transport, err := newH2Transport(f.cfg.ConnTLSConfig, dialFn)
if err != nil {
return nil, trace.Wrap(err)
return nil, nil, trace.Wrap(err)
}

return instrumentedRoundtripper(f.cfg.KubeServiceType, auth.NewImpersonatorRoundTripper(h2Transport)), nil
return instrumentedRoundtripper(
f.cfg.KubeServiceType,
auth.NewImpersonatorRoundTripper(h2Transport),
), f.cfg.ConnTLSConfig.Clone(), nil
}

// localClusterDialer returns a dialer that can be used to dial Kubernetes Service
Expand Down Expand Up @@ -524,7 +539,8 @@ func (f *Forwarder) getTLSConfig(sess *clusterSession) (*tls.Config, bool, error
// if the next hop supports impersonation, we can use the TLS config
// of the proxy to connect to it.
if f.allServersSupportImpersonation(sess) {
return f.cfg.ConnTLSConfig.Clone(), true, nil
_, tlsConfig, err := f.transportForRequestWithImpersonation(sess)
return tlsConfig, err == nil, trace.Wrap(err)
}

// If the next hop does not support impersonation, we need to get a
Expand Down

0 comments on commit 0d2b682

Please sign in to comment.