diff --git a/.changelog/16497.txt b/.changelog/16497.txt
new file mode 100644
index 000000000000..3aa3633ac3a6
--- /dev/null
+++ b/.changelog/16497.txt
@@ -0,0 +1,3 @@
+```release-note:bug
+proxycfg: ensure that an irrecoverable error in proxycfg closes the xds session and triggers a replacement proxycfg watcher
+```
diff --git a/agent/agent.go b/agent/agent.go
index 9d3db7f00612..72dda87e22f7 100644
--- a/agent/agent.go
+++ b/agent/agent.go
@@ -701,11 +701,12 @@ func (a *Agent) Start(ctx context.Context) error {
 	go localproxycfg.Sync(
 		&lib.StopChannelContext{StopCh: a.shutdownCh},
 		localproxycfg.SyncConfig{
-			Manager:  a.proxyConfig,
-			State:    a.State,
-			Logger:   a.proxyConfig.Logger.Named("agent-state"),
-			Tokens:   a.baseDeps.Tokens,
-			NodeName: a.config.NodeName,
+			Manager:         a.proxyConfig,
+			State:           a.State,
+			Logger:          a.proxyConfig.Logger.Named("agent-state"),
+			Tokens:          a.baseDeps.Tokens,
+			NodeName:        a.config.NodeName,
+			ResyncFrequency: a.config.LocalProxyConfigResyncInterval,
 		},
 	)
 
diff --git a/agent/config/builder.go b/agent/config/builder.go
index c8d2d1f0c6a2..6321990121b9 100644
--- a/agent/config/builder.go
+++ b/agent/config/builder.go
@@ -1083,6 +1083,7 @@ func (b *builder) build() (rt RuntimeConfig, err error) {
 		Watches:                          c.Watches,
 		XDSUpdateRateLimit:               rate.Limit(float64Val(c.XDS.UpdateMaxPerSecond)),
 		AutoReloadConfigCoalesceInterval: 1 * time.Second,
+		LocalProxyConfigResyncInterval:   30 * time.Second,
 	}
 
 	rt.TLS, err = b.buildTLSConfig(rt, c.TLS)
diff --git a/agent/config/runtime.go b/agent/config/runtime.go
index 6ac9926e5a28..a855ac688c08 100644
--- a/agent/config/runtime.go
+++ b/agent/config/runtime.go
@@ -1467,6 +1467,10 @@ type RuntimeConfig struct {
 	Reporting ReportingConfig
 
+	// LocalProxyConfigResyncInterval is not a user-configurable value and exists
+	// here so that tests can use a smaller value.
+	LocalProxyConfigResyncInterval time.Duration
+
 	EnterpriseRuntimeConfig
 }
diff --git a/agent/config/runtime_test.go b/agent/config/runtime_test.go
index 38624dc14ce6..5ab3499bff85 100644
--- a/agent/config/runtime_test.go
+++ b/agent/config/runtime_test.go
@@ -5828,12 +5828,13 @@ func TestLoad_FullConfig(t *testing.T) {
 	nodeEntMeta := structs.NodeEnterpriseMetaInDefaultPartition()
 	expected := &RuntimeConfig{
 		// non-user configurable values
-		AEInterval:                 time.Minute,
-		CheckDeregisterIntervalMin: time.Minute,
-		CheckReapInterval:          30 * time.Second,
-		SegmentNameLimit:           64,
-		SyncCoordinateIntervalMin:  15 * time.Second,
-		SyncCoordinateRateTarget:   64,
+		AEInterval:                     time.Minute,
+		CheckDeregisterIntervalMin:     time.Minute,
+		CheckReapInterval:              30 * time.Second,
+		SegmentNameLimit:               64,
+		SyncCoordinateIntervalMin:      15 * time.Second,
+		SyncCoordinateRateTarget:       64,
+		LocalProxyConfigResyncInterval: 30 * time.Second,
 
 		Revision: "JNtPSav3",
 		Version:  "R909Hblt",
diff --git a/agent/config/testdata/TestRuntimeConfig_Sanitize.golden b/agent/config/testdata/TestRuntimeConfig_Sanitize.golden
index eb03c271a9ab..151bbee6ba5a 100644
--- a/agent/config/testdata/TestRuntimeConfig_Sanitize.golden
+++ b/agent/config/testdata/TestRuntimeConfig_Sanitize.golden
@@ -108,9 +108,9 @@
             "Method": "",
             "Name": "zoo",
             "Notes": "",
+            "OSService": "",
             "OutputMaxSize": 4096,
             "ScriptArgs": [],
-            "OSService": "",
             "ServiceID": "",
             "Shell": "",
             "Status": "",
@@ -231,6 +231,7 @@
     "KVMaxValueSize": 1234567800000000,
     "LeaveDrainTime": "0s",
     "LeaveOnTerm": false,
+    "LocalProxyConfigResyncInterval": "0s",
     "Logging": {
         "EnableSyslog": false,
         "LogFilePath": "",
@@ -256,6 +257,7 @@
     "PrimaryGatewaysInterval": "0s",
     "RPCAdvertiseAddr": "",
     "RPCBindAddr": "",
+    "RPCClientTimeout": "0s",
     "RPCConfig": {
         "EnableStreaming": false
     },
@@ -265,7 +267,6 @@
     "RPCMaxConnsPerClient": 0,
     "RPCProtocol": 0,
     "RPCRateLimit": 0,
-    "RPCClientTimeout": "0s",
     "RaftBoltDBConfig": {
         "NoFreelistSync": false
     },
@@ -334,6 +335,7 @@
             "Method": "",
             "Name": "blurb",
             "Notes": "",
+            "OSService": "",
             "OutputMaxSize": 4096,
             "ProxyGRPC": "",
             "ProxyHTTP": "",
             "Shell": "",
             "Status": "",
             "SuccessBeforePassing": 0,
-            "OSService": "",
             "TCP": "",
             "TLSServerName": "",
             "TLSSkipVerify": false,
diff --git a/agent/proxycfg-sources/local/sync.go b/agent/proxycfg-sources/local/sync.go
index c6cee8c61d15..5702d2f36841 100644
--- a/agent/proxycfg-sources/local/sync.go
+++ b/agent/proxycfg-sources/local/sync.go
@@ -2,6 +2,7 @@ package local
 
 import (
 	"context"
+	"time"
 
 	"github.com/hashicorp/go-hclog"
 
@@ -11,6 +12,8 @@ import (
 	"github.com/hashicorp/consul/agent/token"
 )
 
+const resyncFrequency = 30 * time.Second
+
 const source proxycfg.ProxySource = "local"
 
 // SyncConfig contains the dependencies required by Sync.
@@ -30,6 +33,10 @@ type SyncConfig struct {
 
 	// Logger will be used to write log messages.
 	Logger hclog.Logger
+
+	// ResyncFrequency is how often to do a resync and recreate any terminated
+	// watches.
+	ResyncFrequency time.Duration
 }
 
 // Sync watches the agent's local state and registers/deregisters services with
@@ -50,12 +57,19 @@ func Sync(ctx context.Context, cfg SyncConfig) {
 	cfg.State.Notify(stateCh)
 	defer cfg.State.StopNotify(stateCh)
 
+	var resyncCh <-chan time.Time
 	for {
 		sync(cfg)
 
+		if resyncCh == nil && cfg.ResyncFrequency > 0 {
+			resyncCh = time.After(cfg.ResyncFrequency)
+		}
+
 		select {
 		case <-stateCh:
 			// Wait for a state change.
+		case <-resyncCh:
+			resyncCh = nil
 		case <-ctx.Done():
 			return
 		}
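For readers unfamiliar with the timer re-arm idiom used in Sync above, the following is a minimal standalone sketch, not part of the patch: a nil channel never becomes ready in a select, so the resync case stays disabled until time.After re-arms it on the next pass. The names loop, stateCh, and sync here are illustrative placeholders.

```go
package sketch

import (
	"context"
	"time"
)

// loop mirrors the shape of Sync's loop: sync work runs when local state
// changes, and additionally at most once per resyncFrequency.
func loop(ctx context.Context, resyncFrequency time.Duration, stateCh <-chan struct{}, sync func()) {
	var resyncCh <-chan time.Time
	for {
		sync()

		// Arm the periodic resync only if a timer is not already pending;
		// a nil resyncCh means that select case can never fire.
		if resyncCh == nil && resyncFrequency > 0 {
			resyncCh = time.After(resyncFrequency)
		}

		select {
		case <-stateCh:
			// Local state changed; run sync again with the timer still armed.
		case <-resyncCh:
			// Timer fired; clear it so the next pass re-arms it.
			resyncCh = nil
		case <-ctx.Done():
			return
		}
	}
}
```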
diff --git a/agent/proxycfg/manager.go b/agent/proxycfg/manager.go
index c58268e7e039..d21ff4f1ea5b 100644
--- a/agent/proxycfg/manager.go
+++ b/agent/proxycfg/manager.go
@@ -158,7 +158,7 @@ func (m *Manager) Register(id ProxyID, ns *structs.NodeService, source ProxySour
 
 func (m *Manager) register(id ProxyID, ns *structs.NodeService, source ProxySource, token string, overwrite bool) error {
 	state, ok := m.proxies[id]
-	if ok {
+	if ok && !state.stoppedRunning() {
 		if state.source != source && !overwrite {
 			// Registered by a different source, leave as-is.
 			return nil
diff --git a/agent/proxycfg/state.go b/agent/proxycfg/state.go
index 1002f1445911..d7b92abb0ce2 100644
--- a/agent/proxycfg/state.go
+++ b/agent/proxycfg/state.go
@@ -81,10 +81,20 @@ type state struct {
 	ch     chan UpdateEvent
 	snapCh chan ConfigSnapshot
 	reqCh  chan chan *ConfigSnapshot
+	doneCh chan struct{}
 
 	rateLimiter *rate.Limiter
 }
 
+func (s *state) stoppedRunning() bool {
+	select {
+	case <-s.doneCh:
+		return true
+	default:
+		return false
+	}
+}
+
 // failed returns whether run exited because a data source is in an
 // irrecoverable state.
 func (s *state) failed() bool {
@@ -180,6 +190,7 @@ func newState(id ProxyID, ns *structs.NodeService, source ProxySource, token str
 		ch:          ch,
 		snapCh:      make(chan ConfigSnapshot, 1),
 		reqCh:       make(chan chan *ConfigSnapshot, 1),
+		doneCh:      make(chan struct{}),
 		rateLimiter: rateLimiter,
 	}, nil
 }
@@ -261,6 +272,9 @@ func (s *state) Watch() (<-chan ConfigSnapshot, error) {
 
 // Close discards the state and stops any long-running watches.
 func (s *state) Close(failed bool) error {
+	if s.stoppedRunning() {
+		return nil
+	}
 	if s.cancel != nil {
 		s.cancel()
 	}
@@ -310,6 +324,9 @@ func (s *state) run(ctx context.Context, snap *ConfigSnapshot) {
 }
 
 func (s *state) unsafeRun(ctx context.Context, snap *ConfigSnapshot) {
+	// Closing the done channel signals that this entire state is no longer
+	// going to be updated.
+	defer close(s.doneCh)
 	// Close the channel we return from Watch when we stop so consumers can stop
 	// watching and clean up their goroutines. It's important we do this here and
 	// not in Close since this routine sends on this chan and so might panic if it
@@ -425,9 +442,20 @@ func (s *state) unsafeRun(ctx context.Context, snap *ConfigSnapshot) {
 func (s *state) CurrentSnapshot() *ConfigSnapshot {
 	// Make a chan for the response to be sent on
 	ch := make(chan *ConfigSnapshot, 1)
-	s.reqCh <- ch
+
+	select {
+	case <-s.doneCh:
+		return nil
+	case s.reqCh <- ch:
+	}
+
 	// Wait for the response
-	return <-ch
+	select {
+	case <-s.doneCh:
+		return nil
+	case resp := <-ch:
+		return resp
+	}
 }
 
 // Changed returns whether or not the passed NodeService has had any of the
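The state.go changes above all serve one pattern. The sketch below is not part of the patch, and worker, stopped, and current are invented names; it shows the same idea in isolation: the owning goroutine closes doneCh when it exits, a non-blocking select answers "has it stopped?", and request/response calls also select on doneCh so callers return instead of blocking forever on a goroutine that is gone, which is the deadlock the changelog entry describes.

```go
package sketch

// worker stands in for proxycfg's state: one goroutine owns the data and
// serves requests over reqCh until it stops.
type worker struct {
	reqCh  chan chan int
	doneCh chan struct{}
}

func (w *worker) run(work <-chan int) {
	// Closing doneCh broadcasts to every caller that no further requests
	// will ever be served.
	defer close(w.doneCh)

	current := 0
	for {
		select {
		case v, ok := <-work:
			if !ok {
				// Irrecoverable: stop serving requests.
				return
			}
			current = v
		case respCh := <-w.reqCh:
			respCh <- current
		}
	}
}

// stopped is the analogue of stoppedRunning: a non-blocking check of doneCh.
func (w *worker) stopped() bool {
	select {
	case <-w.doneCh:
		return true
	default:
		return false
	}
}

// current is the analogue of CurrentSnapshot: both the request send and the
// response receive are guarded by doneCh, so the caller cannot block forever
// once run has exited.
func (w *worker) current() (int, bool) {
	respCh := make(chan int, 1)
	select {
	case <-w.doneCh:
		return 0, false
	case w.reqCh <- respCh:
	}
	select {
	case <-w.doneCh:
		return 0, false
	case v := <-respCh:
		return v, true
	}
}
```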
diff --git a/agent/proxycfg_test.go b/agent/proxycfg_test.go
new file mode 100644
index 000000000000..40a49f9b1626
--- /dev/null
+++ b/agent/proxycfg_test.go
@@ -0,0 +1,138 @@
+package agent
+
+import (
+	"encoding/json"
+	"net/http"
+	"net/http/httptest"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+
+	"github.com/hashicorp/consul/agent/grpc-external/limiter"
+	"github.com/hashicorp/consul/agent/proxycfg"
+	"github.com/hashicorp/consul/agent/structs"
+	"github.com/hashicorp/consul/api"
+	"github.com/hashicorp/consul/testrpc"
+)
+
+func TestAgent_local_proxycfg(t *testing.T) {
+	a := NewTestAgent(t, TestACLConfig())
+	defer a.Shutdown()
+
+	testrpc.WaitForLeader(t, a.RPC, "dc1")
+
+	token := generateUUID()
+
+	svc := &structs.NodeService{
+		ID:             "db",
+		Service:        "db",
+		Port:           5000,
+		EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
+	}
+	require.NoError(t, a.State.AddServiceWithChecks(svc, nil, token))
+
+	proxy := &structs.NodeService{
+		Kind:    structs.ServiceKindConnectProxy,
+		ID:      "db-sidecar-proxy",
+		Service: "db-sidecar-proxy",
+		Port:    5000,
+		// Set this internal state that we expect sidecar registrations to have.
+		LocallyRegisteredAsSidecar: true,
+		Proxy: structs.ConnectProxyConfig{
+			DestinationServiceName: "db",
+			Upstreams:              structs.TestUpstreams(t),
+		},
+		EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
+	}
+	require.NoError(t, a.State.AddServiceWithChecks(proxy, nil, token))
+
+	// This is a little gross, but this gives us the layered pair of
+	// local/catalog sources for now.
+	cfg := a.xdsServer.CfgSrc
+
+	var (
+		timer      = time.After(100 * time.Millisecond)
+		timerFired = false
+		finalTimer <-chan time.Time
+	)
+
+	var (
+		firstTime = true
+		ch        <-chan *proxycfg.ConfigSnapshot
+		stc       limiter.SessionTerminatedChan
+		cancel    proxycfg.CancelFunc
+	)
+	defer func() {
+		if cancel != nil {
+			cancel()
+		}
+	}()
+	for {
+		if ch == nil {
+			// Sign up for a stream of config snapshots, in the same manner as the xds server.
+			sid := proxy.CompoundServiceID()
+
+			if firstTime {
+				firstTime = false
+			} else {
+				t.Logf("re-creating watch")
+			}
+
+			// Prior to fixes in https://github.com/hashicorp/consul/pull/16497
+			// this call to Watch() would deadlock.
+			var err error
+			ch, stc, cancel, err = cfg.Watch(sid, a.config.NodeName, token)
+			require.NoError(t, err)
+		}
+		select {
+		case <-stc:
+			t.Fatal("session unexpectedly terminated")
+		case snap, ok := <-ch:
+			if !ok {
+				t.Logf("channel is closed")
+				cancel()
+				ch, stc, cancel = nil, nil, nil
+				continue
+			}
+			require.NotNil(t, snap)
+			if !timerFired {
+				t.Fatal("should not have gotten snapshot until after we manifested the token")
+			}
+			return
+		case <-timer:
+			timerFired = true
+			finalTimer = time.After(1 * time.Second)
+
+			// This simulates the eventual consistency of a token
+			// showing up on a server after its creation by
+			// pre-creating the UUID and later using that as the
+			// initial SecretID for a real token.
+			gotToken := testWriteToken(t, a, &api.ACLToken{
+				AccessorID:  generateUUID(),
+				SecretID:    token,
+				Description: "my token",
+				ServiceIdentities: []*api.ACLServiceIdentity{{
+					ServiceName: "db",
+				}},
+			})
+			require.Equal(t, token, gotToken)
+		case <-finalTimer:
+			t.Fatal("did not receive a snapshot after the token manifested")
+		}
+	}
+
+}
+
+func testWriteToken(t *testing.T, a *TestAgent, tok *api.ACLToken) string {
+	req, _ := http.NewRequest("PUT", "/v1/acl/token", jsonReader(tok))
+	req.Header.Add("X-Consul-Token", "root")
+	resp := httptest.NewRecorder()
+	a.srv.h.ServeHTTP(resp, req)
+	require.Equal(t, http.StatusOK, resp.Code)
+
+	dec := json.NewDecoder(resp.Body)
+	aclResp := &structs.ACLToken{}
+	require.NoError(t, dec.Decode(aclResp))
+	return aclResp.SecretID
+}
diff --git a/agent/testagent.go b/agent/testagent.go
index 0bb19a099b5c..8214c6059b7d 100644
--- a/agent/testagent.go
+++ b/agent/testagent.go
@@ -211,6 +211,9 @@ func (a *TestAgent) Start(t *testing.T) error {
 		} else {
 			result.RuntimeConfig.Telemetry.Disable = true
 		}
+
+		// Lower the resync interval for tests.
+		result.RuntimeConfig.LocalProxyConfigResyncInterval = 250 * time.Millisecond
 	}
 	return result, err
 }
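The test above also exercises what the changelog entry promises for real consumers: when a proxycfg state fails irrecoverably, its snapshot channel is closed, and the watcher is expected to drop the dead session and register a replacement watch. Below is a schematic, hedged version of that consumer loop; source, Watch, and handle are placeholder names, not the actual xDS server API.

```go
package sketch

type snapshot struct{}

// source is a placeholder for anything that hands out snapshot watches.
type source interface {
	Watch(id string) (snapCh <-chan *snapshot, cancel func(), err error)
}

// consume keeps a watch alive: when the snapshot channel is closed because
// the underlying state stopped running, it cancels the dead session and
// registers a replacement watch instead of blocking on the closed channel.
func consume(src source, id string, handle func(*snapshot)) error {
	for {
		ch, cancel, err := src.Watch(id)
		if err != nil {
			return err
		}
		for snap := range ch {
			handle(snap)
		}
		cancel()
	}
}
```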