Skip to content

Commit

Permalink
proxycfg: ensure that an irrecoverable error in proxycfg closes the x…
Browse files Browse the repository at this point in the history
…ds session and triggers a replacement proxycfg watcher
  • Loading branch information
rboyer committed Mar 1, 2023
1 parent ec593c2 commit 4ea8741
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 3 deletions.
4 changes: 4 additions & 0 deletions agent/proxycfg-sources/local/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package local

import (
"context"
"time"

"github.com/hashicorp/go-hclog"

Expand Down Expand Up @@ -50,12 +51,15 @@ func Sync(ctx context.Context, cfg SyncConfig) {
cfg.State.Notify(stateCh)
defer cfg.State.StopNotify(stateCh)

const resyncFrequency = 30 * time.Second

for {
sync(cfg)

select {
case <-stateCh:
// Wait for a state change.
case <-time.After(resyncFrequency):
case <-ctx.Done():
return
}
Expand Down
2 changes: 1 addition & 1 deletion agent/proxycfg/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func (m *Manager) Register(id ProxyID, ns *structs.NodeService, source ProxySour

func (m *Manager) register(id ProxyID, ns *structs.NodeService, source ProxySource, token string, overwrite bool) error {
state, ok := m.proxies[id]
if ok {
if ok && !state.stoppedRunning() {
if state.source != source && !overwrite {
// Registered by a different source, leave as-is.
return nil
Expand Down
32 changes: 30 additions & 2 deletions agent/proxycfg/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,20 @@ type state struct {
ch chan UpdateEvent
snapCh chan ConfigSnapshot
reqCh chan chan *ConfigSnapshot
doneCh chan struct{}

rateLimiter *rate.Limiter
}

func (s *state) stoppedRunning() bool {
select {
case <-s.doneCh:
return true
default:
return false
}
}

// failed returns whether run exited because a data source is in an
// irrecoverable state.
func (s *state) failed() bool {
Expand Down Expand Up @@ -182,6 +192,7 @@ func newState(id ProxyID, ns *structs.NodeService, source ProxySource, token str
ch: ch,
snapCh: make(chan ConfigSnapshot, 1),
reqCh: make(chan chan *ConfigSnapshot, 1),
doneCh: make(chan struct{}),
rateLimiter: rateLimiter,
}, nil
}
Expand Down Expand Up @@ -265,6 +276,9 @@ func (s *state) Watch() (<-chan ConfigSnapshot, error) {

// Close discards the state and stops any long-running watches.
func (s *state) Close(failed bool) error {
if s.stoppedRunning() {
return nil
}
if s.cancel != nil {
s.cancel()
}
Expand Down Expand Up @@ -314,6 +328,9 @@ func (s *state) run(ctx context.Context, snap *ConfigSnapshot) {
}

func (s *state) unsafeRun(ctx context.Context, snap *ConfigSnapshot) {
// Closing the done channel signals that this entire state is no longer
// going to be updated.
defer close(s.doneCh)
// Close the channel we return from Watch when we stop so consumers can stop
// watching and clean up their goroutines. It's important we do this here and
// not in Close since this routine sends on this chan and so might panic if it
Expand Down Expand Up @@ -429,9 +446,20 @@ func (s *state) unsafeRun(ctx context.Context, snap *ConfigSnapshot) {
func (s *state) CurrentSnapshot() *ConfigSnapshot {
// Make a chan for the response to be sent on
ch := make(chan *ConfigSnapshot, 1)
s.reqCh <- ch

select {
case <-s.doneCh:
return nil
case s.reqCh <- ch:
}

// Wait for the response
return <-ch
select {
case <-s.doneCh:
return nil
case resp := <-ch:
return resp
}
}

// Changed returns whether or not the passed NodeService has had any of the
Expand Down

0 comments on commit 4ea8741

Please sign in to comment.