Skip to content

Commit

Permalink
Stop background refresh of cached data for requests that result in AC…
Browse files Browse the repository at this point in the history
…L not found errors (#9738)
  • Loading branch information
mkeeler committed Feb 9, 2021
1 parent c18a218 commit 10224a1
Show file tree
Hide file tree
Showing 3 changed files with 212 additions and 3 deletions.
3 changes: 3 additions & 0 deletions .changelog/9738.txt
@@ -0,0 +1,3 @@
```release-note:bug
cache: Prevent spamming the logs for days when a cached request encounters an "ACL not found" error.
```
22 changes: 19 additions & 3 deletions agent/cache/cache.go
Expand Up @@ -18,11 +18,14 @@ import (
"container/heap"
"context"
"fmt"
"strconv"
"sync"
"sync/atomic"
"time"

"github.com/armon/go-metrics"

"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/lib"
"golang.org/x/time/rate"
)
Expand Down Expand Up @@ -626,6 +629,8 @@ func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ign
newEntry.State = result.State
}

preventRefresh := acl.IsErrNotFound(err)

// Error handling
if err == nil {
metrics.IncrCounter([]string{"consul", "cache", "fetch_success"}, 1)
Expand Down Expand Up @@ -657,8 +662,13 @@ func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ign
newEntry.RefreshLostContact = time.Time{}
}
} else {
metrics.IncrCounter([]string{"consul", "cache", "fetch_error"}, 1)
metrics.IncrCounter([]string{"consul", "cache", tEntry.Name, "fetch_error"}, 1)
// TODO (mkeeler) maybe change the name of this label to be more indicative of it just
// stopping the background refresh
labels := []metrics.Label{{Name: "fatal", Value: strconv.FormatBool(preventRefresh)}}

// TODO(kit): Add tEntry.Name to label on fetch_error and deprecate second write
metrics.IncrCounterWithLabels([]string{"consul", "cache", "fetch_error"}, 1, labels)
metrics.IncrCounterWithLabels([]string{"consul", "cache", tEntry.Name, "fetch_error"}, 1, labels)

// Increment attempt counter
attempt++
Expand Down Expand Up @@ -695,7 +705,13 @@ func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ign

// If refresh is enabled, run the refresh in due time. The refresh
// below might block, but saves us from spawning another goroutine.
if tEntry.Opts.Refresh {
//
// We want to have ACL not found errors stop cache refresh for the cases
// where the token used for the query was deleted. If the request
// was coming from a cache notification then it will start the
// request back up again shortly but in the general case this prevents
// spamming the logs with tons of ACL not found errors for days.
if tEntry.Opts.Refresh && !preventRefresh {
// Check if cache was stopped
if atomic.LoadUint32(&c.stopped) == 1 {
return
Expand Down
190 changes: 190 additions & 0 deletions agent/cache/cache_test.go
Expand Up @@ -15,6 +15,8 @@ import (
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"golang.org/x/time/rate"

"github.com/hashicorp/consul/acl"
)

// Test a basic Get with no indexes (and therefore no blocking queries).
Expand Down Expand Up @@ -1351,3 +1353,191 @@ OUT:
}
}
}

func TestCache_ExpiryLoop_ExitsWhenStopped(t *testing.T) {
c := &Cache{
stopCh: make(chan struct{}),
entries: make(map[string]cacheEntry),
entriesExpiryHeap: &expiryHeap{NotifyCh: make(chan struct{}, 1)},
}
chStart := make(chan struct{})
chDone := make(chan struct{})
go func() {
close(chStart)
c.runExpiryLoop()
close(chDone)
}()

<-chStart
close(c.stopCh)

select {
case <-chDone:
case <-time.After(50 * time.Millisecond):
t.Fatalf("expected loop to exit when stopped")
}
}

func TestCache_Prepopulate(t *testing.T) {
typ := &fakeType{index: 5}
c := New(Options{})
c.RegisterType("t", typ)

c.Prepopulate("t", FetchResult{Value: 17, Index: 1}, "dc1", "token", "v1")

ctx := context.Background()
req := fakeRequest{
info: RequestInfo{
Key: "v1",
Token: "token",
Datacenter: "dc1",
MinIndex: 1,
},
}
result, _, err := c.Get(ctx, "t", req)
require.NoError(t, err)
require.Equal(t, 17, result)
}

func TestCache_RefreshLifeCycle(t *testing.T) {
typ := &MockType{}
t.Cleanup(func() { typ.AssertExpectations(t) })

typ.On("RegisterOptions").Times(0).Return(RegisterOptions{
// Maintain a blocking query, retry dropped connections quickly
Refresh: true,
SupportsBlocking: true,
RefreshTimer: 0 * time.Second,
QueryTimeout: 10 * time.Minute,
})

makeRequest := func(index uint64) fakeRequest {
return fakeRequest{
info: RequestInfo{
Key: "v1",
Token: "token",
Datacenter: "dc1",
MinIndex: index,
},
}
}

typ.On("Fetch", mock.Anything, mock.Anything).Once().Return(FetchResult{
Value: true,
Index: 2,
}, nil)

releaseSecondReq := make(chan time.Time)
typ.On("Fetch", mock.Anything, mock.Anything).Once().Return(FetchResult{}, acl.PermissionDenied("forced error")).WaitUntil(releaseSecondReq)

releaseThirdReq := make(chan time.Time)
typ.On("Fetch", mock.Anything, mock.Anything).Once().Return(FetchResult{}, acl.ErrNotFound).WaitUntil(releaseThirdReq)

c := New(Options{})
c.RegisterType("t", typ)

key := makeEntryKey("t", "dc1", "token", "v1")

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// get the background refresh going
result, _, err := c.Get(ctx, "t", makeRequest(1))
require.NoError(t, err)
require.Equal(t, true, result)

// ensure that the entry is fetching again
c.entriesLock.Lock()
entry, ok := c.entries[key]
require.True(t, ok)
require.True(t, entry.Fetching)
c.entriesLock.Unlock()

requestChan := make(chan error)

getError := func(index uint64) {
_, _, err := c.Get(ctx, "t", makeRequest(index))
if ctx.Err() != nil {
return
}
requestChan <- err
}

// background a call that will wait for a newer version
go getError(2)

// I really dislike the arbitrary sleep here. However we want to test out some of the
// branching in getWithIndex (called by Get) and that doesn't expose any way for us to
// know when that go routine has gotten far enough and is waiting on the latest value.
// Therefore the only thing we can do for now is to sleep long enough to let that
// go routine progress far enough.
time.Sleep(100 * time.Millisecond)

// release the blocking query to simulate an ACL permission denied error
close(releaseSecondReq)

// ensure we were woken up and see the permission denied error
select {
case err := <-requestChan:
require.True(t, acl.IsErrPermissionDenied(err))
case <-time.After(500 * time.Millisecond):
require.Fail(t, "blocking cache Get never returned")
}

// ensure that the entry is fetching again
c.entriesLock.Lock()
entry, ok = c.entries[key]
require.True(t, ok)
require.True(t, entry.Fetching)
c.entriesLock.Unlock()

// background a call that will wait for a newer version - will result in an acl not found error
go getError(5)

// Same arbitrary sleep as the one after the second request and the same reasoning.
time.Sleep(100 * time.Millisecond)

// release the blocking query to simulate an ACL not found error
close(releaseThirdReq)

// ensure we were woken up and see the ACL not found error
select {
case err := <-requestChan:
require.True(t, acl.IsErrNotFound(err))
case <-time.After(500 * time.Millisecond):
require.Fail(t, "blocking cache Get never returned")
}

// ensure that the ACL not found error killed off the background refresh
// but didn't remove it from the cache
c.entriesLock.Lock()
entry, ok = c.entries[key]
require.True(t, ok)
require.False(t, entry.Fetching)
c.entriesLock.Unlock()
}

type fakeType struct {
index uint64
}

func (f fakeType) Fetch(_ FetchOptions, _ Request) (FetchResult, error) {
idx := atomic.LoadUint64(&f.index)
return FetchResult{Value: int(idx * 2), Index: idx}, nil
}

func (f fakeType) RegisterOptions() RegisterOptions {
return RegisterOptions{Refresh: true}
}

var _ Type = (*fakeType)(nil)

type fakeRequest struct {
info RequestInfo
}

func (f fakeRequest) CacheInfo() RequestInfo {
return f.info
}

var _ Request = (*fakeRequest)(nil)

0 comments on commit 10224a1

Please sign in to comment.