proxycfg: ensure that an irrecoverable error in proxycfg closes the xds session and triggers a replacement proxycfg watcher (#16497)

When the agent cache or the streaming/event components receive an "acl not
found" error from an RPC, they stop the corresponding request loops, on the
assumption that the token was destroyed and the requests will never work
again. This prevents log spam (#14144, #9738).
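
To make that failure mode concrete, here is a minimal, hypothetical sketch of the kind of guard those PRs added, assuming a helper like `acl.IsErrNotFound` for the error check; the real logic lives in the agent cache and streaming/event components and is more involved:

```go
package example

import (
	"context"
	"time"

	"github.com/hashicorp/go-hclog"

	"github.com/hashicorp/consul/acl"
)

// watchLoop retries fetch until the context is cancelled, but gives up
// permanently on "ACL not found" so a destroyed token cannot cause an
// endless retry/log-spam loop. This is an illustrative sketch, not the
// actual agent cache code.
func watchLoop(ctx context.Context, logger hclog.Logger, fetch func(context.Context) error) {
	for {
		err := fetch(ctx)
		switch {
		case err == nil:
			// Success; loop around and wait for the next update.
		case acl.IsErrNotFound(err):
			// Assume the token was destroyed and will never work again.
			logger.Warn("stopping watch: ACL not found", "error", err)
			return
		default:
			logger.Error("watch failed; will retry", "error", err)
		}

		select {
		case <-ctx.Done():
			return
		case <-time.After(time.Second):
		}
	}
}
```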

Unfortunately, due to things like:

- authz requests going to stale servers that may not have witnessed the token
  creation yet

- authz requests in a secondary datacenter happening before the tokens get
  replicated to that datacenter

- authz requests from a primary TO a secondary datacenter happening before the
  tokens get replicated to that datacenter

the caller can get an "acl not found" error *before* the token exists, rather
than only after it has been destroyed. The machinery added in the PRs linked
above then kicks in and prevents the request loop from starting up again once
the tokens actually exist.

For `consul-dataplane` usages, where xDS is served by the Consul servers
rather than the client agents, this is ultimately not a problem: in that
scenario the `agent/proxycfg` machinery is launched on demand by a new xDS
stream needing data for a specific service in the catalog. If the watching
goroutines are terminated, that ripples down and terminates the xDS stream,
which CDP will eventually re-establish, restarting everything.

For Consul client agent usages, the `agent/proxycfg` machinery is launched
ahead of time, at service registration (called "local" in some of the proxycfg
machinery), so the data is already ready to go when the xDS stream comes in.
If the watching goroutines terminate, that should terminate the xDS stream,
but there is no mechanism to re-spawn the watching goroutines. When the xDS
stream reconnects it will see no `ConfigSnapshot` and will never get one until
the client agent is restarted or the service is re-registered with something
changed in it.

This PR fixes a few things in the machinery:

- There was an inadvertent deadlock when xDS fetched a snapshot from the
  proxycfg machinery: once the watching goroutine terminated, the snapshot
  could never be fetched, which left parts of the xDS machinery paused
  indefinitely so the teardown never finished properly (see the sketch after
  this list).

- Every 30s we now attempt to re-insert all locally registered services into
  the proxycfg machinery.

- When services are re-inserted into the proxycfg machinery, we special-case
  "dead" ones and replace them unconditionally rather than applying the usual
  conditional overwrite logic.
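
The core pattern behind the deadlock fix: both the snapshot request and the response are guarded by a done channel, so a caller can never block on a goroutine that has already exited. A simplified sketch with placeholder types (the real change is in the `agent/proxycfg/state.go` diff below):

```go
package example

// Snapshot stands in for proxycfg.ConfigSnapshot in this sketch.
type Snapshot struct{}

type state struct {
	reqCh  chan chan *Snapshot
	doneCh chan struct{} // closed when the watching goroutine exits
}

// CurrentSnapshot asks the running goroutine for its latest snapshot, but
// returns nil instead of blocking forever if that goroutine already stopped.
func (s *state) CurrentSnapshot() *Snapshot {
	ch := make(chan *Snapshot, 1)

	// Sending on reqCh only works while the goroutine is still draining it.
	select {
	case <-s.doneCh:
		return nil
	case s.reqCh <- ch:
	}

	// The goroutine may also exit between accepting the request and replying.
	select {
	case <-s.doneCh:
		return nil
	case snap := <-ch:
		return snap
	}
}

// stoppedRunning reports, without blocking, whether the goroutine has exited.
func (s *state) stoppedRunning() bool {
	select {
	case <-s.doneCh:
		return true
	default:
		return false
	}
}
```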
erichaberkorn committed Apr 18, 2023
1 parent 7eb1045 commit a4e6189
Showing 11 changed files with 211 additions and 17 deletions.
3 changes: 3 additions & 0 deletions .changelog/16497.txt
@@ -0,0 +1,3 @@
```release-note:bug
proxycfg: ensure that an irrecoverable error in proxycfg closes the xds session and triggers a replacement proxycfg watcher
```
11 changes: 6 additions & 5 deletions agent/agent.go
@@ -701,11 +701,12 @@ func (a *Agent) Start(ctx context.Context) error {
go localproxycfg.Sync(
&lib.StopChannelContext{StopCh: a.shutdownCh},
localproxycfg.SyncConfig{
Manager: a.proxyConfig,
State: a.State,
Logger: a.proxyConfig.Logger.Named("agent-state"),
Tokens: a.baseDeps.Tokens,
NodeName: a.config.NodeName,
Manager: a.proxyConfig,
State: a.State,
Logger: a.proxyConfig.Logger.Named("agent-state"),
Tokens: a.baseDeps.Tokens,
NodeName: a.config.NodeName,
ResyncFrequency: a.config.LocalProxyConfigResyncInterval,
},
)

1 change: 1 addition & 0 deletions agent/config/builder.go
@@ -1083,6 +1083,7 @@ func (b *builder) build() (rt RuntimeConfig, err error) {
Watches: c.Watches,
XDSUpdateRateLimit: rate.Limit(float64Val(c.XDS.UpdateMaxPerSecond)),
AutoReloadConfigCoalesceInterval: 1 * time.Second,
LocalProxyConfigResyncInterval: 30 * time.Second,
}

rt.TLS, err = b.buildTLSConfig(rt, c.TLS)
4 changes: 4 additions & 0 deletions agent/config/runtime.go
@@ -1467,6 +1467,10 @@ type RuntimeConfig struct {

Reporting ReportingConfig

// LocalProxyConfigResyncInterval is not a user-configurable value and exists
// here so that tests can use a smaller value.
LocalProxyConfigResyncInterval time.Duration

EnterpriseRuntimeConfig
}

13 changes: 7 additions & 6 deletions agent/config/runtime_test.go
@@ -5828,12 +5828,13 @@ func TestLoad_FullConfig(t *testing.T) {
nodeEntMeta := structs.NodeEnterpriseMetaInDefaultPartition()
expected := &RuntimeConfig{
// non-user configurable values
AEInterval: time.Minute,
CheckDeregisterIntervalMin: time.Minute,
CheckReapInterval: 30 * time.Second,
SegmentNameLimit: 64,
SyncCoordinateIntervalMin: 15 * time.Second,
SyncCoordinateRateTarget: 64,
AEInterval: time.Minute,
CheckDeregisterIntervalMin: time.Minute,
CheckReapInterval: 30 * time.Second,
SegmentNameLimit: 64,
SyncCoordinateIntervalMin: 15 * time.Second,
SyncCoordinateRateTarget: 64,
LocalProxyConfigResyncInterval: 30 * time.Second,

Revision: "JNtPSav3",
Version: "R909Hblt",
7 changes: 4 additions & 3 deletions agent/config/testdata/TestRuntimeConfig_Sanitize.golden
@@ -108,9 +108,9 @@
"Method": "",
"Name": "zoo",
"Notes": "",
"OSService": "",
"OutputMaxSize": 4096,
"ScriptArgs": [],
"OSService": "",
"ServiceID": "",
"Shell": "",
"Status": "",
@@ -231,6 +231,7 @@
"KVMaxValueSize": 1234567800000000,
"LeaveDrainTime": "0s",
"LeaveOnTerm": false,
"LocalProxyConfigResyncInterval": "0s",
"Logging": {
"EnableSyslog": false,
"LogFilePath": "",
@@ -256,6 +257,7 @@
"PrimaryGatewaysInterval": "0s",
"RPCAdvertiseAddr": "",
"RPCBindAddr": "",
"RPCClientTimeout": "0s",
"RPCConfig": {
"EnableStreaming": false
},
@@ -265,7 +267,6 @@
"RPCMaxConnsPerClient": 0,
"RPCProtocol": 0,
"RPCRateLimit": 0,
"RPCClientTimeout": "0s",
"RaftBoltDBConfig": {
"NoFreelistSync": false
},
@@ -334,14 +335,14 @@
"Method": "",
"Name": "blurb",
"Notes": "",
"OSService": "",
"OutputMaxSize": 4096,
"ProxyGRPC": "",
"ProxyHTTP": "",
"ScriptArgs": [],
"Shell": "",
"Status": "",
"SuccessBeforePassing": 0,
"OSService": "",
"TCP": "",
"TLSServerName": "",
"TLSSkipVerify": false,
14 changes: 14 additions & 0 deletions agent/proxycfg-sources/local/sync.go
@@ -2,6 +2,7 @@ package local

import (
"context"
"time"

"github.com/hashicorp/go-hclog"

@@ -11,6 +12,8 @@ import (
"github.com/hashicorp/consul/agent/token"
)

const resyncFrequency = 30 * time.Second

const source proxycfg.ProxySource = "local"

// SyncConfig contains the dependencies required by Sync.
@@ -30,6 +33,10 @@ type SyncConfig struct {

// Logger will be used to write log messages.
Logger hclog.Logger

// ResyncFrequency is how often to do a resync and recreate any terminated
// watches.
ResyncFrequency time.Duration
}

// Sync watches the agent's local state and registers/deregisters services with
@@ -50,12 +57,19 @@ func Sync(ctx context.Context, cfg SyncConfig) {
cfg.State.Notify(stateCh)
defer cfg.State.StopNotify(stateCh)

var resyncCh <-chan time.Time
for {
sync(cfg)

if resyncCh == nil && cfg.ResyncFrequency > 0 {
resyncCh = time.After(cfg.ResyncFrequency)
}

select {
case <-stateCh:
// Wait for a state change.
case <-resyncCh:
resyncCh = nil
case <-ctx.Done():
return
}
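
One detail worth noting in the `Sync` loop above: receiving from a nil channel blocks forever, so leaving `resyncCh` nil (or resetting it to nil after it fires) keeps that select case inert until the next pass re-arms it with `time.After`, and a non-positive `ResyncFrequency` disables periodic resync entirely. A stripped-down, generic sketch of the same pattern (hypothetical names, not the Consul code):

```go
package example

import (
	"context"
	"time"
)

// resyncLoop runs work immediately, then again whenever notifyCh fires or the
// resync timer expires. Receiving from a nil channel blocks forever, so the
// timer case is disabled until re-armed at the top of the loop; a non-positive
// frequency disables periodic resync entirely.
func resyncLoop(ctx context.Context, notifyCh <-chan struct{}, frequency time.Duration, work func()) {
	var resyncCh <-chan time.Time
	for {
		work()

		if resyncCh == nil && frequency > 0 {
			resyncCh = time.After(frequency)
		}

		select {
		case <-notifyCh:
			// State changed; run work again on the next iteration.
		case <-resyncCh:
			resyncCh = nil // let the next iteration re-arm the timer
		case <-ctx.Done():
			return
		}
	}
}
```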
2 changes: 1 addition & 1 deletion agent/proxycfg/manager.go
@@ -158,7 +158,7 @@ func (m *Manager) Register(id ProxyID, ns *structs.NodeService, source ProxySour

func (m *Manager) register(id ProxyID, ns *structs.NodeService, source ProxySource, token string, overwrite bool) error {
state, ok := m.proxies[id]
if ok {
if ok && !state.stoppedRunning() {
if state.source != source && !overwrite {
// Registered by a different source, leave as-is.
return nil
32 changes: 30 additions & 2 deletions agent/proxycfg/state.go
@@ -81,10 +81,20 @@ type state struct {
ch chan UpdateEvent
snapCh chan ConfigSnapshot
reqCh chan chan *ConfigSnapshot
doneCh chan struct{}

rateLimiter *rate.Limiter
}

func (s *state) stoppedRunning() bool {
select {
case <-s.doneCh:
return true
default:
return false
}
}

// failed returns whether run exited because a data source is in an
// irrecoverable state.
func (s *state) failed() bool {
@@ -180,6 +190,7 @@ func newState(id ProxyID, ns *structs.NodeService, source ProxySource, token str
ch: ch,
snapCh: make(chan ConfigSnapshot, 1),
reqCh: make(chan chan *ConfigSnapshot, 1),
doneCh: make(chan struct{}),
rateLimiter: rateLimiter,
}, nil
}
@@ -261,6 +272,9 @@ func (s *state) Watch() (<-chan ConfigSnapshot, error) {

// Close discards the state and stops any long-running watches.
func (s *state) Close(failed bool) error {
if s.stoppedRunning() {
return nil
}
if s.cancel != nil {
s.cancel()
}
@@ -310,6 +324,9 @@ func (s *state) run(ctx context.Context, snap *ConfigSnapshot) {
}

func (s *state) unsafeRun(ctx context.Context, snap *ConfigSnapshot) {
// Closing the done channel signals that this entire state is no longer
// going to be updated.
defer close(s.doneCh)
// Close the channel we return from Watch when we stop so consumers can stop
// watching and clean up their goroutines. It's important we do this here and
// not in Close since this routine sends on this chan and so might panic if it
@@ -425,9 +442,20 @@
func (s *state) CurrentSnapshot() *ConfigSnapshot {
// Make a chan for the response to be sent on
ch := make(chan *ConfigSnapshot, 1)
s.reqCh <- ch

select {
case <-s.doneCh:
return nil
case s.reqCh <- ch:
}

// Wait for the response
return <-ch
select {
case <-s.doneCh:
return nil
case resp := <-ch:
return resp
}
}

// Changed returns whether or not the passed NodeService has had any of the
138 changes: 138 additions & 0 deletions agent/proxycfg_test.go
@@ -0,0 +1,138 @@
package agent

import (
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/hashicorp/consul/agent/grpc-external/limiter"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/testrpc"
)

func TestAgent_local_proxycfg(t *testing.T) {
a := NewTestAgent(t, TestACLConfig())
defer a.Shutdown()

testrpc.WaitForLeader(t, a.RPC, "dc1")

token := generateUUID()

svc := &structs.NodeService{
ID: "db",
Service: "db",
Port: 5000,
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
}
require.NoError(t, a.State.AddServiceWithChecks(svc, nil, token))

proxy := &structs.NodeService{
Kind: structs.ServiceKindConnectProxy,
ID: "db-sidecar-proxy",
Service: "db-sidecar-proxy",
Port: 5000,
// Set this internal state that we expect sidecar registrations to have.
LocallyRegisteredAsSidecar: true,
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "db",
Upstreams: structs.TestUpstreams(t),
},
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
}
require.NoError(t, a.State.AddServiceWithChecks(proxy, nil, token))

// This is a little gross, but this gives us the layered pair of
// local/catalog sources for now.
cfg := a.xdsServer.CfgSrc

var (
timer = time.After(100 * time.Millisecond)
timerFired = false
finalTimer <-chan time.Time
)

var (
firstTime = true
ch <-chan *proxycfg.ConfigSnapshot
stc limiter.SessionTerminatedChan
cancel proxycfg.CancelFunc
)
defer func() {
if cancel != nil {
cancel()
}
}()
for {
if ch == nil {
// Sign up for a stream of config snapshots, in the same manner as the xds server.
sid := proxy.CompoundServiceID()

if firstTime {
firstTime = false
} else {
t.Logf("re-creating watch")
}

// Prior to fixes in https://github.com/hashicorp/consul/pull/16497
// this call to Watch() would deadlock.
var err error
ch, stc, cancel, err = cfg.Watch(sid, a.config.NodeName, token)
require.NoError(t, err)
}
select {
case <-stc:
t.Fatal("session unexpectedly terminated")
case snap, ok := <-ch:
if !ok {
t.Logf("channel is closed")
cancel()
ch, stc, cancel = nil, nil, nil
continue
}
require.NotNil(t, snap)
if !timerFired {
t.Fatal("should not have gotten snapshot until after we manifested the token")
}
return
case <-timer:
timerFired = true
finalTimer = time.After(1 * time.Second)

// This simulates the eventual consistency of a token
// showing up on a server after its creation by
// pre-creating the UUID and later using that as the
// initial SecretID for a real token.
gotToken := testWriteToken(t, a, &api.ACLToken{
AccessorID: generateUUID(),
SecretID: token,
Description: "my token",
ServiceIdentities: []*api.ACLServiceIdentity{{
ServiceName: "db",
}},
})
require.Equal(t, token, gotToken)
case <-finalTimer:
t.Fatal("did not receive a snapshot after the token manifested")
}
}

}

func testWriteToken(t *testing.T, a *TestAgent, tok *api.ACLToken) string {
req, _ := http.NewRequest("PUT", "/v1/acl/token", jsonReader(tok))
req.Header.Add("X-Consul-Token", "root")
resp := httptest.NewRecorder()
a.srv.h.ServeHTTP(resp, req)
require.Equal(t, http.StatusOK, resp.Code)

dec := json.NewDecoder(resp.Body)
aclResp := &structs.ACLToken{}
require.NoError(t, dec.Decode(aclResp))
return aclResp.SecretID
}
3 changes: 3 additions & 0 deletions agent/testagent.go
@@ -211,6 +211,9 @@ func (a *TestAgent) Start(t *testing.T) error {
} else {
result.RuntimeConfig.Telemetry.Disable = true
}

// Lower the resync interval for tests.
result.RuntimeConfig.LocalProxyConfigResyncInterval = 250 * time.Millisecond
}
return result, err
}
