From 309863defef0bc5755353b82661571672c69e8c8 Mon Sep 17 00:00:00 2001 From: Matt Keeler Date: Tue, 9 Feb 2021 11:32:38 -0500 Subject: [PATCH] Stop background refresh of cached data for requests that result in ACL not found errors (#9742) Backport of #9738 to release/1.7.x # Conflicts: # agent/cache/cache.go # agent/cache/cache_test.go --- .changelog/9738.txt | 3 + agent/cache/cache.go | 13 +++- agent/cache/cache_test.go | 141 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 154 insertions(+), 3 deletions(-) create mode 100644 .changelog/9738.txt diff --git a/.changelog/9738.txt b/.changelog/9738.txt new file mode 100644 index 000000000000..f8962e57f008 --- /dev/null +++ b/.changelog/9738.txt @@ -0,0 +1,3 @@ +```release-note:bug +cache: Prevent spamming the logs for days when a cached request encounters an "ACL not found" error. +``` diff --git a/agent/cache/cache.go b/agent/cache/cache.go index 7892272451e7..ebe80b588bad 100644 --- a/agent/cache/cache.go +++ b/agent/cache/cache.go @@ -17,11 +17,14 @@ package cache import ( "container/heap" "fmt" + "strconv" "sync" "sync/atomic" "time" "github.com/armon/go-metrics" + + "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/lib" ) @@ -540,6 +543,8 @@ func (c *Cache) fetch(t, key string, r Request, allowNew bool, attempt uint, min newEntry.State = result.State } + preventRefresh := acl.IsErrNotFound(err) + // Error handling if err == nil { metrics.IncrCounter([]string{"consul", "cache", "fetch_success"}, 1) @@ -571,8 +576,10 @@ func (c *Cache) fetch(t, key string, r Request, allowNew bool, attempt uint, min newEntry.RefreshLostContact = time.Time{} } } else { - metrics.IncrCounter([]string{"consul", "cache", "fetch_error"}, 1) - metrics.IncrCounter([]string{"consul", "cache", t, "fetch_error"}, 1) + labels := []metrics.Label{{Name: "fatal", Value: strconv.FormatBool(preventRefresh)}} + + metrics.IncrCounterWithLabels([]string{"consul", "cache", "fetch_error"}, 1, labels) + metrics.IncrCounterWithLabels([]string{"consul", "cache", t, "fetch_error"}, 1, labels) // Increment attempt counter attempt++ @@ -612,7 +619,7 @@ func (c *Cache) fetch(t, key string, r Request, allowNew bool, attempt uint, min // If refresh is enabled, run the refresh in due time. The refresh // below might block, but saves us from spawning another goroutine. - if tEntry.Opts.Refresh { + if tEntry.Opts.Refresh && !preventRefresh { c.refresh(tEntry.Opts, attempt, t, key, r) } }() diff --git a/agent/cache/cache_test.go b/agent/cache/cache_test.go index 8d7455839967..1df86d3c4894 100644 --- a/agent/cache/cache_test.go +++ b/agent/cache/cache_test.go @@ -9,6 +9,7 @@ import ( "testing" "time" + "github.com/hashicorp/consul/acl" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -1183,3 +1184,143 @@ func TestCacheGet_nonBlockingType(t *testing.T) { time.Sleep(20 * time.Millisecond) typ.AssertExpectations(t) } + +func TestCache_RefreshLifeCycle(t *testing.T) { + typ := &MockType{} + t.Cleanup(func() { typ.AssertExpectations(t) }) + + makeRequest := func(index uint64) fakeRequest { + return fakeRequest{ + info: RequestInfo{ + Key: "v1", + Token: "token", + Datacenter: "dc1", + MinIndex: index, + }, + } + } + + typ.On("SupportsBlocking").Return(true).Times(0) + + typ.On("Fetch", mock.Anything, mock.Anything).Once().Return(FetchResult{ + Value: true, + Index: 2, + }, nil) + + releaseSecondReq := make(chan time.Time) + typ.On("Fetch", mock.Anything, mock.Anything).Once().Return(FetchResult{}, acl.PermissionDeniedError{Cause: "forced error"}).WaitUntil(releaseSecondReq) + + releaseThirdReq := make(chan time.Time) + typ.On("Fetch", mock.Anything, mock.Anything).Once().Return(FetchResult{}, acl.ErrNotFound).WaitUntil(releaseThirdReq) + + c := TestCache(t) + c.RegisterType("t", typ, &RegisterOptions{ + // Maintain a blocking query, retry dropped connections quickly + Refresh: true, + RefreshTimer: 0 * time.Second, + RefreshTimeout: 10 * time.Minute, + }) + + key := makeEntryKey("t", "dc1", "token", "v1") + + // get the background refresh going + result, _, err := c.Get("t", makeRequest(1)) + require.NoError(t, err) + require.Equal(t, true, result) + + // ensure that the entry is fetching again + c.entriesLock.Lock() + entry, ok := c.entries[key] + require.True(t, ok) + require.True(t, entry.Fetching) + c.entriesLock.Unlock() + + requestChan := make(chan error, 1) + + getError := func(index uint64) { + _, _, err := c.Get("t", makeRequest(index)) + requestChan <- err + } + + // background a call that will wait for a newer version + go getError(2) + + // I really dislike the arbitrary sleep here. However we want to test out some of the + // branching in getWithIndex (called by Get) and that doesn't expose any way for us to + // know when that go routine has gotten far enough and is waiting on the latest value. + // Therefore the only thing we can do for now is to sleep long enough to let that + // go routine progress far enough. + time.Sleep(100 * time.Millisecond) + + // release the blocking query to simulate an ACL permission denied error + close(releaseSecondReq) + + // ensure we were woken up and see the permission denied error + select { + case err := <-requestChan: + require.True(t, acl.IsErrPermissionDenied(err)) + case <-time.After(500 * time.Millisecond): + require.Fail(t, "blocking cache Get never returned") + } + + // ensure that the entry is fetching again + c.entriesLock.Lock() + entry, ok = c.entries[key] + require.True(t, ok) + require.True(t, entry.Fetching) + c.entriesLock.Unlock() + + // background a call that will wait for a newer version - will result in an acl not found error + go getError(5) + + // Same arbitrary sleep as the one after the second request and the same reasoning. + time.Sleep(100 * time.Millisecond) + + // release the blocking query to simulate an ACL not found error + close(releaseThirdReq) + + // ensure we were woken up and see the ACL not found error + select { + case err := <-requestChan: + require.True(t, acl.IsErrNotFound(err)) + case <-time.After(500 * time.Millisecond): + require.Fail(t, "blocking cache Get never returned") + } + + // ensure that the ACL not found error killed off the background refresh + // but didn't remove it from the cache + c.entriesLock.Lock() + entry, ok = c.entries[key] + require.True(t, ok) + require.False(t, entry.Fetching) + c.entriesLock.Unlock() +} + +type fakeType struct { + index uint64 +} + +func (f fakeType) Fetch(_ FetchOptions, _ Request) (FetchResult, error) { + idx := atomic.LoadUint64(&f.index) + return FetchResult{Value: int(idx * 2), Index: idx}, nil +} + +func (f fakeType) RegisterOptions() RegisterOptions { + return RegisterOptions{Refresh: true} +} + +func (f fakeType) SupportsBlocking() bool { + return true +} + +var _ Type = (*fakeType)(nil) + +type fakeRequest struct { + info RequestInfo +} + +func (f fakeRequest) CacheInfo() RequestInfo { + return f.info +} + +var _ Request = (*fakeRequest)(nil)