Skip to content

Commit

Permalink
fix: Compare config value then swap when caching param value
Browse files Browse the repository at this point in the history
See also milvus-io#33784

This PR change the behavior of `SetCacheValue` of config manager:

- Use mutex and map instead of concurrent map for `configCache`
- Compare config raw value before set cache value

With this implementation, concurrent caching & eviction shall always
have current output:

|time|caching |eviction|config   |cached   |
|----|--------|------- |---------|---------|
|t0  |get     |        |old value|null     |
|t1  |CAS OK  |        |old value|old value|
|t2  |        |update  |new value|old value|
|t3  |        |eviction|new value|null     |

|time|caching |eviction|config   |cached   |
|----|--------|------- |---------|---------|
|t0  |get     |        |old value|null     |
|t1  |        |update  |new value|null     |
|t2  |CAS fail|        |old value|null     |
|t3  |        |eviction|new value|null     |

|time|caching |eviction|config   |cached   |
|----|--------|------- |---------|---------|
|t0  |        |update  |new value|null     |
|t1  |get     |        |new value|null     |
|t2  |CAS OK  |        |new value|new value|
|t3  |        |eviction|new value|null     |

|time|caching |eviction|config   |cached   |
|----|--------|------- |---------|---------|
|t0  |        |update  |new value|null     |
|t1  |get     |        |new value|null     |
|t2  |        |eviction|new value|null     |
|t3  |CAS OK  |        |new value|new value|

|time|caching |eviction|config   |cached   |
|----|--------|------- |---------|---------|
|t0  |        |update  |new value|null     |
|t1  |        |eviction|new value|null     |
|t2  |get     |        |new value|null     |
|t3  |CAS OK  |        |new value|new value|

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
  • Loading branch information
congqixia committed Jun 12, 2024
1 parent 1697706 commit 4a41749
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 41 deletions.
46 changes: 32 additions & 14 deletions pkg/config/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package config
import (
"fmt"
"strings"
"sync"

"github.com/cockroachdb/errors"
"go.uber.org/zap"
Expand Down Expand Up @@ -84,7 +85,10 @@ type Manager struct {
keySourceMap *typeutil.ConcurrentMap[string, string] // store the key to config source, example: key is A.B.C and source is file which means the A.B.C's value is from file
overlays *typeutil.ConcurrentMap[string, string] // store the highest priority configs which modified at runtime
forbiddenKeys *typeutil.ConcurrentSet[string]
configCache *typeutil.ConcurrentMap[string, interface{}]

cacheMutex sync.RWMutex
configCache map[string]any
// configCache *typeutil.ConcurrentMap[string, interface{}]
}

func NewManager() *Manager {
Expand All @@ -94,36 +98,50 @@ func NewManager() *Manager {
keySourceMap: typeutil.NewConcurrentMap[string, string](),
overlays: typeutil.NewConcurrentMap[string, string](),
forbiddenKeys: typeutil.NewConcurrentSet[string](),
configCache: typeutil.NewConcurrentMap[string, interface{}](),
configCache: make(map[string]any),
}
resetConfigCacheFunc := NewHandler("reset.config.cache", func(event *Event) {
keyToRemove := strings.NewReplacer("/", ".").Replace(event.Key)
manager.configCache.Remove(keyToRemove)
manager.EvictCachedValue(keyToRemove)
})
manager.Dispatcher.RegisterForKeyPrefix("", resetConfigCacheFunc)
return manager
}

func (m *Manager) GetCachedValue(key string) (interface{}, bool) {
return m.configCache.Get(key)
m.cacheMutex.RLock()
defer m.cacheMutex.RUnlock()
value, ok := m.configCache[key]
return value, ok
}

func (m *Manager) SetCachedValue(key string, value interface{}) {
m.configCache.Insert(key, value)
func (m *Manager) CASCachedValue(key string, origin string, value interface{}) bool {
m.cacheMutex.Lock()
defer m.cacheMutex.Unlock()
current, err := m.GetConfig(key)
if err != nil {
return false
}
if current != origin {
return false
}
m.configCache[key] = value
return true
}

func (m *Manager) EvictCachedValue(key string) {
m.configCache.Remove(key)
m.cacheMutex.Lock()
defer m.cacheMutex.Unlock()
delete(m.configCache, key)
}

func (m *Manager) EvictCacheValueByFormat(keys ...string) {
set := typeutil.NewSet(keys...)
m.configCache.Range(func(key string, value interface{}) bool {
if set.Contain(formatKey(key)) {
m.configCache.Remove(key)
}
return true
})
m.cacheMutex.Lock()
defer m.cacheMutex.Unlock()

for _, key := range keys {
delete(m.configCache, key)
}
}

func (m *Manager) GetConfig(key string) (string, error) {
Expand Down
9 changes: 4 additions & 5 deletions pkg/config/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func TestCachedConfig(t *testing.T) {
time.Sleep(time.Second)
_, exist := mgr.GetCachedValue("a.b")
assert.False(t, exist)
mgr.SetCachedValue("a.b", "aaa")
mgr.CASCachedValue("a.b", "aaa", "aaa")
val, exist := mgr.GetCachedValue("a.b")
assert.True(t, exist)
assert.Equal(t, "aaa", val.(string))
Expand All @@ -237,10 +237,9 @@ func TestCachedConfig(t *testing.T) {
{
_, exist := mgr.GetCachedValue("c.d")
assert.False(t, exist)
mgr.SetCachedValue("cd", "xxx")
val, exist := mgr.GetCachedValue("cd")
assert.True(t, exist)
assert.Equal(t, "xxx", val.(string))
mgr.CASCachedValue("cd", "", "xxx")
_, exist = mgr.GetCachedValue("cd")
assert.False(t, exist)

// after refresh, the cached value should be reset
ctx := context.Background()
Expand Down
55 changes: 33 additions & 22 deletions pkg/util/paramtable/param_item.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,9 @@ func (pi *ParamItem) GetAsStrings() []string {
return strings
}
}
realStrs := getAsStrings(pi.GetValue())
pi.manager.SetCachedValue(pi.Key, realStrs)
val := pi.GetValue()
realStrs := getAsStrings(val)
pi.manager.CASCachedValue(pi.Key, val, realStrs)
return realStrs
}

Expand All @@ -111,8 +112,9 @@ func (pi *ParamItem) GetAsBool() bool {
return boolVal
}
}
boolVal := getAsBool(pi.GetValue())
pi.manager.SetCachedValue(pi.Key, boolVal)
val := pi.GetValue()
boolVal := getAsBool(val)
pi.manager.CASCachedValue(pi.Key, val, boolVal)
return boolVal
}

Expand All @@ -122,8 +124,9 @@ func (pi *ParamItem) GetAsInt() int {
return intVal
}
}
intVal := getAsInt(pi.GetValue())
pi.manager.SetCachedValue(pi.Key, intVal)
val := pi.GetValue()
intVal := getAsInt(val)
pi.manager.CASCachedValue(pi.Key, val, intVal)
return intVal
}

Expand All @@ -133,8 +136,9 @@ func (pi *ParamItem) GetAsInt32() int32 {
return int32Val
}
}
int32Val := int32(getAsInt64(pi.GetValue()))
pi.manager.SetCachedValue(pi.Key, int32Val)
val := pi.GetValue()
int32Val := int32(getAsInt64(val))
pi.manager.CASCachedValue(pi.Key, val, int32Val)
return int32Val
}

Expand All @@ -144,8 +148,9 @@ func (pi *ParamItem) GetAsUint() uint {
return uintVal
}
}
uintVal := uint(getAsUint64(pi.GetValue()))
pi.manager.SetCachedValue(pi.Key, uintVal)
val := pi.GetValue()
uintVal := uint(getAsUint64(val))
pi.manager.CASCachedValue(pi.Key, val, uintVal)
return uintVal
}

Expand All @@ -155,8 +160,9 @@ func (pi *ParamItem) GetAsUint32() uint32 {
return uint32Val
}
}
uint32Val := uint32(getAsUint64(pi.GetValue()))
pi.manager.SetCachedValue(pi.Key, uint32Val)
val := pi.GetValue()
uint32Val := uint32(getAsUint64(val))
pi.manager.CASCachedValue(pi.Key, val, uint32Val)
return uint32Val
}

Expand All @@ -166,8 +172,9 @@ func (pi *ParamItem) GetAsUint64() uint64 {
return uint64Val
}
}
uint64Val := getAsUint64(pi.GetValue())
pi.manager.SetCachedValue(pi.Key, uint64Val)
val := pi.GetValue()
uint64Val := getAsUint64(val)
pi.manager.CASCachedValue(pi.Key, val, uint64Val)
return uint64Val
}

Expand All @@ -177,8 +184,9 @@ func (pi *ParamItem) GetAsUint16() uint16 {
return uint16Val
}
}
uint16Val := uint16(getAsUint64(pi.GetValue()))
pi.manager.SetCachedValue(pi.Key, uint16Val)
val := pi.GetValue()
uint16Val := uint16(getAsUint64(val))
pi.manager.CASCachedValue(pi.Key, val, uint16Val)
return uint16Val
}

Expand All @@ -188,8 +196,9 @@ func (pi *ParamItem) GetAsInt64() int64 {
return int64Val
}
}
int64Val := getAsInt64(pi.GetValue())
pi.manager.SetCachedValue(pi.Key, int64Val)
val := pi.GetValue()
int64Val := getAsInt64(val)
pi.manager.CASCachedValue(pi.Key, val, int64Val)
return int64Val
}

Expand All @@ -199,8 +208,9 @@ func (pi *ParamItem) GetAsFloat() float64 {
return floatVal
}
}
floatVal := getAsFloat(pi.GetValue())
pi.manager.SetCachedValue(pi.Key, floatVal)
val := pi.GetValue()
floatVal := getAsFloat(val)
pi.manager.CASCachedValue(pi.Key, val, floatVal)
return floatVal
}

Expand All @@ -210,8 +220,9 @@ func (pi *ParamItem) GetAsDuration(unit time.Duration) time.Duration {
return durationVal
}
}
durationVal := getAsDuration(pi.GetValue(), unit)
pi.manager.SetCachedValue(pi.Key, durationVal)
val := pi.GetValue()
durationVal := getAsDuration(val, unit)
pi.manager.CASCachedValue(pi.Key, val, durationVal)
return durationVal
}

Expand Down

0 comments on commit 4a41749

Please sign in to comment.