Skip to content

Commit

Permalink
Stop background refresh of cached data for requests that result in AC…
Browse files Browse the repository at this point in the history
…L not found errors (#9742)

Backport of #9738 to release/1.7.x

# Conflicts:
#	agent/cache/cache.go
#	agent/cache/cache_test.go
  • Loading branch information
mkeeler committed Feb 9, 2021
1 parent 93c13b9 commit 309863d
Show file tree
Hide file tree
Showing 3 changed files with 154 additions and 3 deletions.
3 changes: 3 additions & 0 deletions .changelog/9738.txt
@@ -0,0 +1,3 @@
```release-note:bug
cache: Prevent spamming the logs for days when a cached request encounters an "ACL not found" error.
```
13 changes: 10 additions & 3 deletions agent/cache/cache.go
Expand Up @@ -17,11 +17,14 @@ package cache
import (
"container/heap"
"fmt"
"strconv"
"sync"
"sync/atomic"
"time"

"github.com/armon/go-metrics"

"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/lib"
)

Expand Down Expand Up @@ -540,6 +543,8 @@ func (c *Cache) fetch(t, key string, r Request, allowNew bool, attempt uint, min
newEntry.State = result.State
}

preventRefresh := acl.IsErrNotFound(err)

// Error handling
if err == nil {
metrics.IncrCounter([]string{"consul", "cache", "fetch_success"}, 1)
Expand Down Expand Up @@ -571,8 +576,10 @@ func (c *Cache) fetch(t, key string, r Request, allowNew bool, attempt uint, min
newEntry.RefreshLostContact = time.Time{}
}
} else {
metrics.IncrCounter([]string{"consul", "cache", "fetch_error"}, 1)
metrics.IncrCounter([]string{"consul", "cache", t, "fetch_error"}, 1)
labels := []metrics.Label{{Name: "fatal", Value: strconv.FormatBool(preventRefresh)}}

metrics.IncrCounterWithLabels([]string{"consul", "cache", "fetch_error"}, 1, labels)
metrics.IncrCounterWithLabels([]string{"consul", "cache", t, "fetch_error"}, 1, labels)

// Increment attempt counter
attempt++
Expand Down Expand Up @@ -612,7 +619,7 @@ func (c *Cache) fetch(t, key string, r Request, allowNew bool, attempt uint, min

// If refresh is enabled, run the refresh in due time. The refresh
// below might block, but saves us from spawning another goroutine.
if tEntry.Opts.Refresh {
if tEntry.Opts.Refresh && !preventRefresh {
c.refresh(tEntry.Opts, attempt, t, key, r)
}
}()
Expand Down
141 changes: 141 additions & 0 deletions agent/cache/cache_test.go
Expand Up @@ -9,6 +9,7 @@ import (
"testing"
"time"

"github.com/hashicorp/consul/acl"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -1183,3 +1184,143 @@ func TestCacheGet_nonBlockingType(t *testing.T) {
time.Sleep(20 * time.Millisecond)
typ.AssertExpectations(t)
}

func TestCache_RefreshLifeCycle(t *testing.T) {
typ := &MockType{}
t.Cleanup(func() { typ.AssertExpectations(t) })

makeRequest := func(index uint64) fakeRequest {
return fakeRequest{
info: RequestInfo{
Key: "v1",
Token: "token",
Datacenter: "dc1",
MinIndex: index,
},
}
}

typ.On("SupportsBlocking").Return(true).Times(0)

typ.On("Fetch", mock.Anything, mock.Anything).Once().Return(FetchResult{
Value: true,
Index: 2,
}, nil)

releaseSecondReq := make(chan time.Time)
typ.On("Fetch", mock.Anything, mock.Anything).Once().Return(FetchResult{}, acl.PermissionDeniedError{Cause: "forced error"}).WaitUntil(releaseSecondReq)

releaseThirdReq := make(chan time.Time)
typ.On("Fetch", mock.Anything, mock.Anything).Once().Return(FetchResult{}, acl.ErrNotFound).WaitUntil(releaseThirdReq)

c := TestCache(t)
c.RegisterType("t", typ, &RegisterOptions{
// Maintain a blocking query, retry dropped connections quickly
Refresh: true,
RefreshTimer: 0 * time.Second,
RefreshTimeout: 10 * time.Minute,
})

key := makeEntryKey("t", "dc1", "token", "v1")

// get the background refresh going
result, _, err := c.Get("t", makeRequest(1))
require.NoError(t, err)
require.Equal(t, true, result)

// ensure that the entry is fetching again
c.entriesLock.Lock()
entry, ok := c.entries[key]
require.True(t, ok)
require.True(t, entry.Fetching)
c.entriesLock.Unlock()

requestChan := make(chan error, 1)

getError := func(index uint64) {
_, _, err := c.Get("t", makeRequest(index))
requestChan <- err
}

// background a call that will wait for a newer version
go getError(2)

// I really dislike the arbitrary sleep here. However we want to test out some of the
// branching in getWithIndex (called by Get) and that doesn't expose any way for us to
// know when that go routine has gotten far enough and is waiting on the latest value.
// Therefore the only thing we can do for now is to sleep long enough to let that
// go routine progress far enough.
time.Sleep(100 * time.Millisecond)

// release the blocking query to simulate an ACL permission denied error
close(releaseSecondReq)

// ensure we were woken up and see the permission denied error
select {
case err := <-requestChan:
require.True(t, acl.IsErrPermissionDenied(err))
case <-time.After(500 * time.Millisecond):
require.Fail(t, "blocking cache Get never returned")
}

// ensure that the entry is fetching again
c.entriesLock.Lock()
entry, ok = c.entries[key]
require.True(t, ok)
require.True(t, entry.Fetching)
c.entriesLock.Unlock()

// background a call that will wait for a newer version - will result in an acl not found error
go getError(5)

// Same arbitrary sleep as the one after the second request and the same reasoning.
time.Sleep(100 * time.Millisecond)

// release the blocking query to simulate an ACL not found error
close(releaseThirdReq)

// ensure we were woken up and see the ACL not found error
select {
case err := <-requestChan:
require.True(t, acl.IsErrNotFound(err))
case <-time.After(500 * time.Millisecond):
require.Fail(t, "blocking cache Get never returned")
}

// ensure that the ACL not found error killed off the background refresh
// but didn't remove it from the cache
c.entriesLock.Lock()
entry, ok = c.entries[key]
require.True(t, ok)
require.False(t, entry.Fetching)
c.entriesLock.Unlock()
}

type fakeType struct {
index uint64
}

func (f fakeType) Fetch(_ FetchOptions, _ Request) (FetchResult, error) {
idx := atomic.LoadUint64(&f.index)
return FetchResult{Value: int(idx * 2), Index: idx}, nil
}

func (f fakeType) RegisterOptions() RegisterOptions {
return RegisterOptions{Refresh: true}
}

func (f fakeType) SupportsBlocking() bool {
return true
}

var _ Type = (*fakeType)(nil)

type fakeRequest struct {
info RequestInfo
}

func (f fakeRequest) CacheInfo() RequestInfo {
return f.info
}

var _ Request = (*fakeRequest)(nil)

0 comments on commit 309863d

Please sign in to comment.