From 10224a1fe2195f367485493ef7eb0b69fa6db59c Mon Sep 17 00:00:00 2001 From: Matt Keeler Date: Tue, 9 Feb 2021 10:15:53 -0500 Subject: [PATCH] Stop background refresh of cached data for requests that result in ACL not found errors (#9738) --- .changelog/9738.txt | 3 + agent/cache/cache.go | 22 ++++- agent/cache/cache_test.go | 190 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 212 insertions(+), 3 deletions(-) create mode 100644 .changelog/9738.txt diff --git a/.changelog/9738.txt b/.changelog/9738.txt new file mode 100644 index 000000000000..f8962e57f008 --- /dev/null +++ b/.changelog/9738.txt @@ -0,0 +1,3 @@ +```release-note:bug +cache: Prevent spamming the logs for days when a cached request encounters an "ACL not found" error. +``` diff --git a/agent/cache/cache.go b/agent/cache/cache.go index f7a1a0e8c7c8..7f849d25ae58 100644 --- a/agent/cache/cache.go +++ b/agent/cache/cache.go @@ -18,11 +18,14 @@ import ( "container/heap" "context" "fmt" + "strconv" "sync" "sync/atomic" "time" "github.com/armon/go-metrics" + + "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/lib" "golang.org/x/time/rate" ) @@ -626,6 +629,8 @@ func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ign newEntry.State = result.State } + preventRefresh := acl.IsErrNotFound(err) + // Error handling if err == nil { metrics.IncrCounter([]string{"consul", "cache", "fetch_success"}, 1) @@ -657,8 +662,13 @@ func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ign newEntry.RefreshLostContact = time.Time{} } } else { - metrics.IncrCounter([]string{"consul", "cache", "fetch_error"}, 1) - metrics.IncrCounter([]string{"consul", "cache", tEntry.Name, "fetch_error"}, 1) + // TODO (mkeeler) maybe change the name of this label to be more indicative of it just + // stopping the background refresh + labels := []metrics.Label{{Name: "fatal", Value: strconv.FormatBool(preventRefresh)}} + + // TODO(kit): Add tEntry.Name to label on fetch_error and deprecate second write + metrics.IncrCounterWithLabels([]string{"consul", "cache", "fetch_error"}, 1, labels) + metrics.IncrCounterWithLabels([]string{"consul", "cache", tEntry.Name, "fetch_error"}, 1, labels) // Increment attempt counter attempt++ @@ -695,7 +705,13 @@ func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ign // If refresh is enabled, run the refresh in due time. The refresh // below might block, but saves us from spawning another goroutine. - if tEntry.Opts.Refresh { + // + // We want to have ACL not found errors stop cache refresh for the cases + // where the token used for the query was deleted. If the request + // was coming from a cache notification then it will start the + // request back up again shortly but in the general case this prevents + // spamming the logs with tons of ACL not found errors for days. + if tEntry.Opts.Refresh && !preventRefresh { // Check if cache was stopped if atomic.LoadUint32(&c.stopped) == 1 { return diff --git a/agent/cache/cache_test.go b/agent/cache/cache_test.go index c2442ea7c12d..d8699d9984aa 100644 --- a/agent/cache/cache_test.go +++ b/agent/cache/cache_test.go @@ -15,6 +15,8 @@ import ( "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "golang.org/x/time/rate" + + "github.com/hashicorp/consul/acl" ) // Test a basic Get with no indexes (and therefore no blocking queries). @@ -1351,3 +1353,191 @@ OUT: } } } + +func TestCache_ExpiryLoop_ExitsWhenStopped(t *testing.T) { + c := &Cache{ + stopCh: make(chan struct{}), + entries: make(map[string]cacheEntry), + entriesExpiryHeap: &expiryHeap{NotifyCh: make(chan struct{}, 1)}, + } + chStart := make(chan struct{}) + chDone := make(chan struct{}) + go func() { + close(chStart) + c.runExpiryLoop() + close(chDone) + }() + + <-chStart + close(c.stopCh) + + select { + case <-chDone: + case <-time.After(50 * time.Millisecond): + t.Fatalf("expected loop to exit when stopped") + } +} + +func TestCache_Prepopulate(t *testing.T) { + typ := &fakeType{index: 5} + c := New(Options{}) + c.RegisterType("t", typ) + + c.Prepopulate("t", FetchResult{Value: 17, Index: 1}, "dc1", "token", "v1") + + ctx := context.Background() + req := fakeRequest{ + info: RequestInfo{ + Key: "v1", + Token: "token", + Datacenter: "dc1", + MinIndex: 1, + }, + } + result, _, err := c.Get(ctx, "t", req) + require.NoError(t, err) + require.Equal(t, 17, result) +} + +func TestCache_RefreshLifeCycle(t *testing.T) { + typ := &MockType{} + t.Cleanup(func() { typ.AssertExpectations(t) }) + + typ.On("RegisterOptions").Times(0).Return(RegisterOptions{ + // Maintain a blocking query, retry dropped connections quickly + Refresh: true, + SupportsBlocking: true, + RefreshTimer: 0 * time.Second, + QueryTimeout: 10 * time.Minute, + }) + + makeRequest := func(index uint64) fakeRequest { + return fakeRequest{ + info: RequestInfo{ + Key: "v1", + Token: "token", + Datacenter: "dc1", + MinIndex: index, + }, + } + } + + typ.On("Fetch", mock.Anything, mock.Anything).Once().Return(FetchResult{ + Value: true, + Index: 2, + }, nil) + + releaseSecondReq := make(chan time.Time) + typ.On("Fetch", mock.Anything, mock.Anything).Once().Return(FetchResult{}, acl.PermissionDenied("forced error")).WaitUntil(releaseSecondReq) + + releaseThirdReq := make(chan time.Time) + typ.On("Fetch", mock.Anything, mock.Anything).Once().Return(FetchResult{}, acl.ErrNotFound).WaitUntil(releaseThirdReq) + + c := New(Options{}) + c.RegisterType("t", typ) + + key := makeEntryKey("t", "dc1", "token", "v1") + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // get the background refresh going + result, _, err := c.Get(ctx, "t", makeRequest(1)) + require.NoError(t, err) + require.Equal(t, true, result) + + // ensure that the entry is fetching again + c.entriesLock.Lock() + entry, ok := c.entries[key] + require.True(t, ok) + require.True(t, entry.Fetching) + c.entriesLock.Unlock() + + requestChan := make(chan error) + + getError := func(index uint64) { + _, _, err := c.Get(ctx, "t", makeRequest(index)) + if ctx.Err() != nil { + return + } + requestChan <- err + } + + // background a call that will wait for a newer version + go getError(2) + + // I really dislike the arbitrary sleep here. However we want to test out some of the + // branching in getWithIndex (called by Get) and that doesn't expose any way for us to + // know when that go routine has gotten far enough and is waiting on the latest value. + // Therefore the only thing we can do for now is to sleep long enough to let that + // go routine progress far enough. + time.Sleep(100 * time.Millisecond) + + // release the blocking query to simulate an ACL permission denied error + close(releaseSecondReq) + + // ensure we were woken up and see the permission denied error + select { + case err := <-requestChan: + require.True(t, acl.IsErrPermissionDenied(err)) + case <-time.After(500 * time.Millisecond): + require.Fail(t, "blocking cache Get never returned") + } + + // ensure that the entry is fetching again + c.entriesLock.Lock() + entry, ok = c.entries[key] + require.True(t, ok) + require.True(t, entry.Fetching) + c.entriesLock.Unlock() + + // background a call that will wait for a newer version - will result in an acl not found error + go getError(5) + + // Same arbitrary sleep as the one after the second request and the same reasoning. + time.Sleep(100 * time.Millisecond) + + // release the blocking query to simulate an ACL not found error + close(releaseThirdReq) + + // ensure we were woken up and see the ACL not found error + select { + case err := <-requestChan: + require.True(t, acl.IsErrNotFound(err)) + case <-time.After(500 * time.Millisecond): + require.Fail(t, "blocking cache Get never returned") + } + + // ensure that the ACL not found error killed off the background refresh + // but didn't remove it from the cache + c.entriesLock.Lock() + entry, ok = c.entries[key] + require.True(t, ok) + require.False(t, entry.Fetching) + c.entriesLock.Unlock() +} + +type fakeType struct { + index uint64 +} + +func (f fakeType) Fetch(_ FetchOptions, _ Request) (FetchResult, error) { + idx := atomic.LoadUint64(&f.index) + return FetchResult{Value: int(idx * 2), Index: idx}, nil +} + +func (f fakeType) RegisterOptions() RegisterOptions { + return RegisterOptions{Refresh: true} +} + +var _ Type = (*fakeType)(nil) + +type fakeRequest struct { + info RequestInfo +} + +func (f fakeRequest) CacheInfo() RequestInfo { + return f.info +} + +var _ Request = (*fakeRequest)(nil)