Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Machine ID: Database Tunnel service #39880

Merged
merged 13 commits into from
Apr 3, 2024
9 changes: 7 additions & 2 deletions lib/tbot/service_database_tunnel.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
type DatabaseTunnelService struct {
botCfg *config.BotConfig
cfg *config.DatabaseTunnelService
proxyPingCache *proxyPingCache
log logrus.FieldLogger
resolver reversetunnelclient.Resolver
botClient *auth.Client
Expand All @@ -90,6 +91,12 @@
s.log.WithField("roles", roles).Debug("No roles configured, using all roles available.")
}

proxyPing, err := s.proxyPingCache.ping(ctx)
if err != nil {
return alpnproxy.LocalProxyConfig{}, trace.Wrap(err, "pinging proxy")
}
proxyAddr := proxyPing.Proxy.SSH.PublicAddr

// Fetch information about the database and then issue the initial
// certificate. We issue the initial certificate to allow us to fail faster.
// We cache the routeToDatabase as these will not change during the lifetime
Expand All @@ -114,8 +121,6 @@
}
s.log.Debug("Issued initial certificate for local proxy.")

proxyAddr := "leaf.tele.ottr.sh:443"

middleware := alpnProxyMiddleware{
onNewConnection: func(ctx context.Context, lp *alpnproxy.LocalProxy, conn net.Conn) error {
ctx, span := tracer.Start(ctx, "DatabaseTunnelService/OnNewConnection")
Expand Down Expand Up @@ -205,7 +210,7 @@

// lp.Start will block and continues to block until lp.Close() is called.
// Despite taking a context, it will not exit until the first connection is
// made after the context is cancelled.

Check failure on line 213 in lib/tbot/service_database_tunnel.go

View workflow job for this annotation

GitHub Actions / Lint (Go)

`cancelled` is a misspelling of `canceled` (misspell)
var errCh = make(chan error, 1)
go func() {
errCh <- lp.Start(ctx)
Expand Down
84 changes: 20 additions & 64 deletions lib/tbot/service_outputs.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ const renewalRetryLimit = 5
type outputsService struct {
log logrus.FieldLogger
reloadBroadcaster *channelBroadcaster
proxyPingCache *proxyPingCache
authPingCache *authPingCache
botClient *auth.Client
getBotIdentity getBotIdentityFn
cfg *config.BotConfig
Expand Down Expand Up @@ -90,8 +92,10 @@ func (s *outputsService) renewOutputs(
// create a cache shared across outputs so they don't hammer the auth
// server with similar requests
drc := &outputRenewalCache{
client: s.botClient,
cfg: s.cfg,
proxyPingCache: s.proxyPingCache,
authPingCache: s.authPingCache,
client: s.botClient,
cfg: s.cfg,
}

// Determine the default role list based on the bot role. The role's
Expand Down Expand Up @@ -633,14 +637,14 @@ func fetchDefaultRoles(ctx context.Context, roleGetter services.RoleGetter, iden
// requests for the same information. This is shared between all of the
// outputs.
type outputRenewalCache struct {
client *auth.Client
client *auth.Client
cfg *config.BotConfig
proxyPingCache *proxyPingCache
authPingCache *authPingCache

cfg *config.BotConfig
mu sync.Mutex
mu sync.Mutex
// These are protected by getter/setters with mutex locks
_cas map[types.CertAuthType][]types.CertAuthority
_authPong *proto.PingResponse
_proxyPong *webclient.PingResponse
_cas map[types.CertAuthType][]types.CertAuthority
}

func (orc *outputRenewalCache) getCertAuthorities(
Expand Down Expand Up @@ -672,71 +676,23 @@ func (orc *outputRenewalCache) GetCertAuthorities(
return orc.getCertAuthorities(ctx, caType)
}

func (orc *outputRenewalCache) authPing(ctx context.Context) (*proto.PingResponse, error) {
if orc._authPong != nil {
return orc._authPong, nil
}

pong, err := orc.client.Ping(ctx)
if err != nil {
return nil, trace.Wrap(err)
}
orc._authPong = &pong

return &pong, nil
}

// AuthPing pings the auth server and returns the (possibly cached) response.
func (orc *outputRenewalCache) AuthPing(ctx context.Context) (*proto.PingResponse, error) {
orc.mu.Lock()
defer orc.mu.Unlock()
return orc.authPing(ctx)
}

func (orc *outputRenewalCache) proxyPing(ctx context.Context) (*webclient.PingResponse, error) {
if orc._proxyPong != nil {
return orc._proxyPong, nil
}

// Determine the Proxy address to use.
addr, addrKind := orc.cfg.Address()
switch addrKind {
case config.AddressKindAuth:
// If the address is an auth address, ping auth to determine proxy addr.
authPong, err := orc.authPing(ctx)
if err != nil {
return nil, trace.Wrap(err)
}
addr = authPong.ProxyPublicAddr
case config.AddressKindProxy:
// If the address is a proxy address, use it directly.
default:
return nil, trace.BadParameter("unsupported address kind: %v", addrKind)
}

// We use find instead of Ping as it's less resource intense and we can
// ping the AuthServer directly for its configuration if necessary.
proxyPong, err := webclient.Find(&webclient.Config{
Context: ctx,
ProxyAddr: addr,
Insecure: orc.cfg.Insecure,
})
res, err := orc.authPingCache.ping(ctx)
if err != nil {
return nil, trace.Wrap(err)
}

orc._proxyPong = proxyPong

return proxyPong, nil
}
return &res, nil
}

// ProxyPing returns a (possibly cached) ping response from the Teleport proxy.
// Note that it relies on the auth server being configured with a sane proxy
// public address.
func (orc *outputRenewalCache) ProxyPing(ctx context.Context) (*webclient.PingResponse, error) {
orc.mu.Lock()
defer orc.mu.Unlock()
return orc.proxyPing(ctx)
res, err := orc.proxyPingCache.ping(ctx)
if err != nil {
return nil, trace.Wrap(err)
}
return res, nil
}

// Config returns the bots config.
Expand Down
93 changes: 93 additions & 0 deletions lib/tbot/tbot.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"google.golang.org/grpc"

"github.com/gravitational/teleport"
"github.com/gravitational/teleport/api/client/proto"
"github.com/gravitational/teleport/api/client/webclient"
"github.com/gravitational/teleport/api/metadata"
"github.com/gravitational/teleport/api/types"
Expand Down Expand Up @@ -191,6 +192,16 @@ func (b *Bot) Run(ctx context.Context) error {
}()
services = append(services, b.botIdentitySvc)

authPingCache := &authPingCache{
client: b.botIdentitySvc.GetClient(),
log: b.log,
}
proxyPingCache := &proxyPingCache{
authPingCache: authPingCache,
botCfg: b.cfg,
log: b.log,
}

// Setup all other services
if b.cfg.DiagAddr != "" {
services = append(services, &diagnosticsService{
Expand All @@ -202,6 +213,8 @@ func (b *Bot) Run(ctx context.Context) error {
})
}
services = append(services, &outputsService{
authPingCache: authPingCache,
proxyPingCache: proxyPingCache,
getBotIdentity: b.botIdentitySvc.GetIdentity,
botClient: b.botIdentitySvc.GetClient(),
cfg: b.cfg,
Expand Down Expand Up @@ -248,6 +261,7 @@ func (b *Bot) Run(ctx context.Context) error {
case *config.DatabaseTunnelService:
svc := &DatabaseTunnelService{
getBotIdentity: b.botIdentitySvc.GetIdentity,
proxyPingCache: proxyPingCache,
botClient: b.botIdentitySvc.GetClient(),
resolver: resolver,
botCfg: b.cfg,
Expand Down Expand Up @@ -466,3 +480,82 @@ func clientForFacade(
c, err := authclient.Connect(ctx, authClientConfig)
return c, trace.Wrap(err)
}

type authPingCache struct {
client *auth.Client
log logrus.FieldLogger

mu sync.RWMutex
fetched bool
strideynet marked this conversation as resolved.
Show resolved Hide resolved
cachedValue proto.PingResponse
}

func (a *authPingCache) ping(ctx context.Context) (proto.PingResponse, error) {
a.mu.Lock()
defer a.mu.Unlock()
if a.fetched {
return a.cachedValue, nil
}

a.log.Debug("Pinging auth server.")
res, err := a.client.Ping(ctx)
if err != nil {
a.log.WithError(err).Error("Failed to ping auth server.")
return proto.PingResponse{}, trace.Wrap(err)
}
a.fetched = true
a.cachedValue = res
a.log.WithField("pong", res).Debug("Successfully pinged auth server.")

return a.cachedValue, nil
}

type proxyPingCache struct {
authPingCache *authPingCache
botCfg *config.BotConfig
log logrus.FieldLogger

mu sync.RWMutex
fetched bool
cachedValue *webclient.PingResponse
}

func (p *proxyPingCache) ping(ctx context.Context) (*webclient.PingResponse, error) {
p.mu.Lock()
defer p.mu.Unlock()
if p.fetched {
return p.cachedValue, nil
}

// Determine the Proxy address to use.
addr, addrKind := p.botCfg.Address()
switch addrKind {
case config.AddressKindAuth:
// If the address is an auth address, ping auth to determine proxy addr.
authPong, err := p.authPingCache.ping(ctx)
if err != nil {
return nil, trace.Wrap(err)
}
addr = authPong.ProxyPublicAddr
case config.AddressKindProxy:
// If the address is a proxy address, use it directly.
default:
return nil, trace.BadParameter("unsupported address kind: %v", addrKind)
}

p.log.WithField("addr", addr).Debug("Pinging proxy.")
res, err := webclient.Find(&webclient.Config{
Context: ctx,
ProxyAddr: addr,
Insecure: p.botCfg.Insecure,
})
if err != nil {
p.log.WithError(err).Error("Failed to ping proxy.")
return nil, trace.Wrap(err)
}
p.log.WithField("pong", res).Debug("Successfully pinged proxy.")
p.fetched = true
p.cachedValue = res

return p.cachedValue, nil
}