Revert "cache: prevent goroutine leak in agent cache (#14908)"
This reverts commit fe2d41d.
rboyer committed Oct 24, 2022
1 parent a80a49a commit fe0d91c
Showing 3 changed files with 23 additions and 94 deletions.
3 changes: 0 additions & 3 deletions .changelog/14908.txt

This file was deleted.

81 changes: 8 additions & 73 deletions agent/cache/cache.go
@@ -139,10 +139,6 @@ type Cache struct {
 	entries           map[string]cacheEntry
 	entriesExpiryHeap *ttlcache.ExpiryHeap
 
-	fetchLock    sync.Mutex
-	lastFetchID  uint64
-	fetchHandles map[string]fetchHandle
-
 	// stopped is used as an atomic flag to signal that the Cache has been
 	// discarded so background fetches and expiry processing should stop.
 	stopped uint32
@@ -154,11 +150,6 @@ type Cache struct {
 	rateLimitCancel  context.CancelFunc
 }
 
-type fetchHandle struct {
-	id     uint64
-	stopCh chan struct{}
-}
-
 // typeEntry is a single type that is registered with a Cache.
 type typeEntry struct {
 	// Name that was used to register the Type
@@ -234,7 +225,6 @@ func New(options Options) *Cache {
 		types:             make(map[string]typeEntry),
 		entries:           make(map[string]cacheEntry),
 		entriesExpiryHeap: ttlcache.NewExpiryHeap(),
-		fetchHandles:      make(map[string]fetchHandle),
 		stopCh:            make(chan struct{}),
 		options:           options,
 		rateLimitContext:  ctx,
@@ -624,18 +614,8 @@ func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ign
 	metrics.SetGauge([]string{"cache", "entries_count"}, float32(len(c.entries)))
 
 	tEntry := r.TypeEntry
-
-	// The actual Fetch must be performed in a goroutine. Ensure that we only
-	// have one in-flight at a time, but don't use a deferred
-	// context.WithCancel style termination so that these things outlive their
-	// requester.
-	//
-	// By the time we get here the system WANTS to make a replacement fetcher, so
-	// we terminate the prior one and replace it.
-	handle := c.getOrReplaceFetchHandle(key)
-	go func(handle fetchHandle) {
-		defer c.deleteFetchHandle(key, handle.id)
-
+	// The actual Fetch must be performed in a goroutine.
+	go func() {
 		// If we have background refresh and currently are in "disconnected" state,
 		// waiting for a response might mean we mark our results as stale for up to
 		// 10 minutes (max blocking timeout) after connection is restored. To reduce
@@ -686,14 +666,6 @@ func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ign
 			connectedTimer.Stop()
 		}
 
-		// If we were stopped while waiting on a blocking query now would be a
-		// good time to detect that.
-		select {
-		case <-handle.stopCh:
-			return
-		default:
-		}
-
 		// Copy the existing entry to start.
 		newEntry := entry
 		newEntry.Fetching = false
@@ -848,15 +820,13 @@ func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ign
 			}
 
 			// If we're over the attempt minimum, start an exponential backoff.
-			wait := backOffWait(attempt)
+			if wait := backOffWait(attempt); wait > 0 {
+				time.Sleep(wait)
+			}
 
 			// If we have a timer, wait for it
-			wait += tEntry.Opts.RefreshTimer
-
-			select {
-			case <-time.After(wait):
-			case <-handle.stopCh:
-				return
+			if tEntry.Opts.RefreshTimer > 0 {
+				time.Sleep(tEntry.Opts.RefreshTimer)
 			}
 
 			// Trigger. The "allowNew" field is false because in the time we were
@@ -866,46 +836,11 @@ func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ign
 			r.Info.MinIndex = 0
 			c.fetch(key, r, false, attempt, true)
 		}
-	}(handle)
+	}()
 
 	return entry.Waiter
 }
 
-func (c *Cache) getOrReplaceFetchHandle(key string) fetchHandle {
-	c.fetchLock.Lock()
-	defer c.fetchLock.Unlock()
-
-	if prevHandle, ok := c.fetchHandles[key]; ok {
-		close(prevHandle.stopCh)
-	}
-
-	c.lastFetchID++
-
-	handle := fetchHandle{
-		id:     c.lastFetchID,
-		stopCh: make(chan struct{}),
-	}
-
-	c.fetchHandles[key] = handle
-
-	return handle
-}
-
-func (c *Cache) deleteFetchHandle(key string, fetchID uint64) {
-	c.fetchLock.Lock()
-	defer c.fetchLock.Unlock()
-
-	// Only remove a fetchHandle if it's YOUR fetchHandle.
-	handle, ok := c.fetchHandles[key]
-	if !ok {
-		return
-	}
-
-	if handle.id == fetchID {
-		delete(c.fetchHandles, key)
-	}
-}
-
 func backOffWait(failures uint) time.Duration {
 	if failures > CacheRefreshBackoffMin {
 		shift := failures - CacheRefreshBackoffMin
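The mechanism deleted above is easier to see outside the diff. Roughly, the reverted change kept one background fetch goroutine per cache key behind a "fetch handle": starting a replacement closes the previous handle's stop channel so the superseded goroutine can exit instead of sleeping forever and leaking. The following is a minimal, self-contained sketch of that general pattern only, not Consul's implementation; the fetcher, handle, replaceHandle, deleteHandle, and fetchLoop names are made up for the example.

package main

import (
	"fmt"
	"sync"
	"time"
)

// fetcher owns one background loop per key. Starting a new loop for a key
// closes the previous loop's stop channel so it can exit instead of leaking.
type fetcher struct {
	mu      sync.Mutex
	lastID  uint64
	handles map[string]handle
}

type handle struct {
	id     uint64
	stopCh chan struct{}
}

func newFetcher() *fetcher {
	return &fetcher{handles: make(map[string]handle)}
}

// replaceHandle terminates any prior loop for key and registers a new handle.
func (f *fetcher) replaceHandle(key string) handle {
	f.mu.Lock()
	defer f.mu.Unlock()
	if prev, ok := f.handles[key]; ok {
		close(prev.stopCh) // tell the superseded loop to stop
	}
	f.lastID++
	h := handle{id: f.lastID, stopCh: make(chan struct{})}
	f.handles[key] = h
	return h
}

// deleteHandle removes the registration only if it still belongs to this loop.
func (f *fetcher) deleteHandle(key string, id uint64) {
	f.mu.Lock()
	defer f.mu.Unlock()
	if h, ok := f.handles[key]; ok && h.id == id {
		delete(f.handles, key)
	}
}

// fetchLoop simulates a refresh loop that sleeps between fetches but can be
// interrupted when a replacement loop takes over the key.
func (f *fetcher) fetchLoop(key string, wait time.Duration) {
	h := f.replaceHandle(key)
	go func() {
		defer f.deleteHandle(key, h.id)
		for {
			select {
			case <-time.After(wait):
				fmt.Println("refreshed", key)
			case <-h.stopCh:
				return // superseded: exit instead of leaking
			}
		}
	}()
}

func main() {
	f := newFetcher()
	f.fetchLoop("service:web", 50*time.Millisecond)
	time.Sleep(120 * time.Millisecond)
	f.fetchLoop("service:web", 50*time.Millisecond) // replaces and stops the first loop
	time.Sleep(120 * time.Millisecond)
}

The revert removes this stop-channel plumbing and goes back to plain goroutines that only exit when their own sleep-and-refetch cycle ends.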
33 changes: 15 additions & 18 deletions agent/cache/cache_test.go
@@ -18,7 +18,6 @@ import (
 	"github.com/hashicorp/consul/acl"
 	"github.com/hashicorp/consul/lib/ttlcache"
 	"github.com/hashicorp/consul/sdk/testutil"
-	"github.com/hashicorp/consul/sdk/testutil/retry"
 )
 
 // Test a basic Get with no indexes (and therefore no blocking queries).
@@ -1751,22 +1750,12 @@ func TestCache_RefreshLifeCycle(t *testing.T) {
 	require.NoError(t, err)
 	require.Equal(t, true, result)
 
-	waitUntilFetching := func(expectValue bool) {
-		retry.Run(t, func(t *retry.R) {
-			c.entriesLock.Lock()
-			defer c.entriesLock.Unlock()
-			entry, ok := c.entries[key]
-			require.True(t, ok)
-			if expectValue {
-				require.True(t, entry.Fetching)
-			} else {
-				require.False(t, entry.Fetching)
-			}
-		})
-	}
-
 	// ensure that the entry is fetching again
-	waitUntilFetching(true)
+	c.entriesLock.Lock()
+	entry, ok := c.entries[key]
+	require.True(t, ok)
+	require.True(t, entry.Fetching)
+	c.entriesLock.Unlock()
 
 	requestChan := make(chan error)
 
@@ -1800,7 +1789,11 @@ func TestCache_RefreshLifeCycle(t *testing.T) {
 	}
 
 	// ensure that the entry is fetching again
-	waitUntilFetching(true)
+	c.entriesLock.Lock()
+	entry, ok = c.entries[key]
+	require.True(t, ok)
+	require.True(t, entry.Fetching)
+	c.entriesLock.Unlock()
 
 	// background a call that will wait for a newer version - will result in an acl not found error
 	go getError(5)
@@ -1821,7 +1814,11 @@ func TestCache_RefreshLifeCycle(t *testing.T) {
 
 	// ensure that the ACL not found error killed off the background refresh
 	// but didn't remove it from the cache
-	waitUntilFetching(false)
+	c.entriesLock.Lock()
+	entry, ok = c.entries[key]
+	require.True(t, ok)
+	require.False(t, entry.Fetching)
+	c.entriesLock.Unlock()
 }
 
 type fakeType struct {
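The deleted waitUntilFetching helper above polled the entry's Fetching flag via sdk/testutil/retry because the flag is flipped by a background goroutine, so a single immediate check can race with it; the revert goes back to one-shot checks under entriesLock. A generic polling helper in plain Go illustrating the same idea is sketched below; waitForCondition is a hypothetical name for this example, not part of Consul's test utilities.

package cache

import (
	"testing"
	"time"
)

// waitForCondition polls cond until it returns true or the timeout elapses.
// Polling is needed when the condition is set by a background goroutine,
// since checking it exactly once can race with that goroutine.
func waitForCondition(t *testing.T, timeout time.Duration, cond func() bool) {
	t.Helper()
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		if cond() {
			return
		}
		time.Sleep(10 * time.Millisecond)
	}
	t.Fatal("condition not met before deadline")
}

A caller would wrap the locked lookup, for example: waitForCondition(t, time.Second, func() bool { c.entriesLock.Lock(); defer c.entriesLock.Unlock(); e, ok := c.entries[key]; return ok && e.Fetching }).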
