Skip to content

Commit

Permalink
feat: add kv cache (#100)
Browse files Browse the repository at this point in the history
* feat: add kv cache

* feat: add kv cache for sdk

* feat: change cache lib to support threshold megabyte

* chore: update go.mod

* feat: feedserver不可用时降级从缓存获取kv value

* chore: remove unnecessary code

* feat: adjust code

* feat: adjust code

* feat: adjust code

* feat: get kv value from cache first

* feat: adjust code
  • Loading branch information
fireyun committed Jun 25, 2024
1 parent 2168d28 commit e413e24
Show file tree
Hide file tree
Showing 19 changed files with 605 additions and 476 deletions.
149 changes: 138 additions & 11 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package client
import (
"context"
"encoding/json"
"errors"
"fmt"
"time"

Expand All @@ -25,6 +26,7 @@ import (
pbfs "github.com/TencentBlueKing/bk-bcs/bcs-services/bcs-bscp/pkg/protocol/feed-server"
sfs "github.com/TencentBlueKing/bk-bcs/bcs-services/bcs-bscp/pkg/sf-share"
"github.com/TencentBlueKing/bk-bcs/bcs-services/bcs-bscp/pkg/version"
"github.com/allegro/bigcache/v3"
"golang.org/x/exp/slog"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -56,6 +58,9 @@ type Client interface {
ResetLabels(labels map[string]string)
}

// ErrNotFoundKvMD5 is err not found kv md5
var ErrNotFoundKvMD5 = errors.New("not found kv md5")

// Client is the bscp client
type client struct {
pairs map[string]string
Expand Down Expand Up @@ -130,13 +135,14 @@ func New(opts ...Option) (Client, error) {
if err != nil {
return nil, fmt.Errorf("init downloader failed, err: %s", err.Error())
}
if clientOpt.fileCache.Enabled {
if err = cache.Init(true, clientOpt.fileCache.CacheDir); err != nil {
return nil, fmt.Errorf("init file cache failed, err: %s", err.Error())
}
go cache.AutoCleanupFileCache(clientOpt.fileCache.CacheDir, DefaultCleanupIntervalSeconds,
clientOpt.fileCache.ThresholdGB, DefaultCacheRetentionRate)

if err = initFileCache(clientOpt); err != nil {
return nil, err
}
if err = initKvCache(clientOpt); err != nil {
return nil, err
}

watcher, err := newWatcher(u, clientOpt)
if err != nil {
return nil, fmt.Errorf("init watcher failed, err: %s", err.Error())
Expand All @@ -145,6 +151,44 @@ func New(opts ...Option) (Client, error) {
return c, nil
}

// initFileCache init file cache
func initFileCache(opts *options) error {
if opts.fileCache.Enabled {
logger.Info("enable file cache")
if err := cache.Init(opts.fileCache.CacheDir); err != nil {
return fmt.Errorf("init file cache failed, err: %s", err.Error())
}
go cache.AutoCleanupFileCache(opts.fileCache.CacheDir, DefaultCleanupIntervalSeconds,
opts.fileCache.ThresholdGB, DefaultCacheRetentionRate)
}
return nil
}

// initKvCache init kv cache
func initKvCache(opts *options) error {
if opts.kvCache.Enabled {
logger.Info("enable kv cache")
if err := cache.InitMemCache(opts.kvCache.ThresholdMB); err != nil {
return fmt.Errorf("init kv cache failed, err: %s", err.Error())
}

go func() {
mc := cache.GetMemCache()
for {
hit, miss, kvCnt := mc.Stats().Hits, mc.Stats().Misses, mc.Len()
var hitRatio float64
if hit+miss > 0 {
hitRatio = float64(hit) / float64(hit+miss)
}
logger.Debug("kv cache statistics", slog.Int64("hit", hit), slog.Int64("miss", miss),
slog.String("hit-ratio", fmt.Sprintf("%.3f", hitRatio)), slog.Int("kv-count", kvCnt))
time.Sleep(time.Second * 15)
}
}()
}
return nil
}

// AddWatcher add a watcher to client
func (c *client) AddWatcher(callback Callback, app string, opts ...AppOption) error {
_ = c.watcher.Subscribe(callback, app, opts...)
Expand Down Expand Up @@ -241,15 +285,16 @@ func (c *client) PullFiles(app string, opts ...AppOption) (*Release, error) { //
r.AppMate.FailedDetailReason = st.Err().Error()
}
if err = c.sendClientMessaging(vas, r.AppMate, nil); err != nil {
logger.Error("description failed to report the client change event, client_mode: %s, biz: %d,app: %s, err: %s",
r.ClientMode.String(), r.BizID, r.AppMate.App, err.Error())
logger.Error("description failed to report the client change event",
slog.String("client_mode", r.ClientMode.String()), slog.Uint64("biz", uint64(r.BizID)),
slog.String("app", r.AppMate.App), logger.ErrAttr(err))
}
}
}()

resp, err = c.upstream.PullAppFileMeta(vas, req)
if err != nil {
logger.Error("pull file meta failed, err: %s, rid: %s", err.Error(), vas.Rid)
logger.Error("pull file meta failed", logger.ErrAttr(err), slog.String("rid", vas.Rid))
return nil, err
}

Expand Down Expand Up @@ -335,7 +380,23 @@ func (c *client) PullKvs(app string, match []string, opts ...AppOption) (*Releas
}

// Get 读取 Key 的值
// 先从feed-server服务端拉取最新版本元数据,优先从缓存中获取该最新版本value,缓存中没有再调用feed-server获取value并缓存起来
// 在feed-server服务端连接不可用时则降级从缓存中获取(如果有缓存过),此时存在从缓存获取到的value值不是最新发布版本的风险
func (c *client) Get(app string, key string, opts ...AppOption) (string, error) {
// get kv value from cache
var val, md5 string
var err error
cacheKey := kvCacheKey(c.opts.bizID, app, key)
if cache.EnableMemCache {
val, md5, err = c.getKvValueFromCache(app, key, opts...)
if err == nil {
return val, nil
} else if err != bigcache.ErrEntryNotFound {
logger.Error("get kv value from cache failed", slog.String("key", cacheKey), logger.ErrAttr(err))
}
}

// get kv value from feed-server
option := &AppOptions{}
for _, opt := range opts {
opt(option)
Expand All @@ -355,12 +416,78 @@ func (c *client) Get(app string, key string, opts ...AppOption) (string, error)
if option.UID != "" {
req.AppMeta.Uid = option.UID
}

resp, err := c.upstream.GetKvValue(vas, req)
if err != nil {
return "", err
st, _ := status.FromError(err)
switch st.Code() {
case codes.Unavailable, codes.DeadlineExceeded, codes.Internal:
logger.Error("feed-server is unavailable", logger.ErrAttr(err))
// 降级从缓存中获取
if cache.EnableMemCache {
v, cErr := cache.GetMemCache().Get(cacheKey)
if cErr != nil {
logger.Error("get kv value from cache failed", slog.String("key", cacheKey), logger.ErrAttr(cErr))
return "", err
}
logger.Warn("feed-server is unavailable but get kv value from cache successfully",
slog.String("key", cacheKey))
return string(v[32:]), nil
}
default:
return "", err
}
}
val = resp.Value

// set kv md5 and value for cache
if cache.EnableMemCache {
if md5 == "" {
logger.Error("set kv cache failed", slog.String("key", cacheKey), logger.ErrAttr(ErrNotFoundKvMD5))
} else {
if err := cache.GetMemCache().Set(cacheKey, append([]byte(md5), []byte(val)...)); err != nil {
logger.Error("set kv cache failed", slog.String("key", cacheKey), logger.ErrAttr(err))
}
}
}

return resp.Value, nil
return val, nil
}

// getKvValueWithCache get kv value from the cache
func (c *client) getKvValueFromCache(app string, key string, opts ...AppOption) (string, string, error) {
release, err := c.PullKvs(app, []string{}, opts...)
if err != nil {
return "", "", err
}

var md5 string
for _, k := range release.KvItems {
if k.Key == key {
md5 = k.ContentSpec.Md5
break
}
}
if md5 == "" {
return "", "", ErrNotFoundKvMD5
}

var val []byte
val, err = cache.GetMemCache().Get(kvCacheKey(c.opts.bizID, app, key))
if err != nil {
return "", md5, err
}
// 判断是否为最新版本缓存,不是最新则仍从服务端获取value
if string(val[:32]) != md5 {
return "", md5, bigcache.ErrEntryNotFound
}

return string(val[32:]), md5, nil
}

// kvCacheKey is cache key for kv md5 and value, the cached data's first 32 character is md5, other is value
func kvCacheKey(bizID uint32, app, key string) string {
return fmt.Sprintf("%d_%s_%s", bizID, app, key)
}

// ListApps list app from remote, only return have perm by token
Expand Down
18 changes: 18 additions & 0 deletions client/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ type options struct {
token string
// fileCache file cache option
fileCache FileCache
// kvCache kv cache option
kvCache KvCache
// EnableMonitorResourceUsage 是否采集/监控资源使用率
enableMonitorResourceUsage bool
}
Expand All @@ -48,6 +50,14 @@ type FileCache struct {
// RetentionRate float64
}

// KvCache option for kv cache
type KvCache struct {
// Enabled is whether enable kv cache
Enabled bool
// ThresholdMB is threshold megabyte of kv cache
ThresholdMB float64
}

const (
// DefaultCleanupIntervalSeconds is the bscp cli default file cache cleanup interval.
DefaultCleanupIntervalSeconds = 300
Expand Down Expand Up @@ -116,6 +126,14 @@ func WithFileCache(c FileCache) Option {
}
}

// WithKvCache set kv cache
func WithKvCache(c KvCache) Option {
return func(o *options) error {
o.kvCache = c
return nil
}
}

// WithEnableMonitorResourceUsage 是否采集/监控资源使用率
func WithEnableMonitorResourceUsage(enable bool) Option {
return func(o *options) error {
Expand Down
9 changes: 5 additions & 4 deletions client/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,8 +343,9 @@ func (r *Release) Execute(steps ...Function) error {
}

if err = r.sendVersionChangeMessaging(bd); err != nil {
logger.Error("description failed to report the client change event, client_mode: %s, biz: %d,app: %s, err: %s",
r.ClientMode.String(), r.BizID, r.AppMate.App, err.Error())
logger.Error("description failed to report the client change event",
slog.String("client_mode", r.ClientMode.String()), slog.Uint64("biz", uint64(r.BizID)),
slog.String("app", r.AppMate.App), logger.ErrAttr(err))
}

}()
Expand All @@ -361,8 +362,8 @@ func (r *Release) Execute(steps ...Function) error {

// 发送拉取前事件
if err = r.sendVersionChangeMessaging(bd); err != nil {
logger.Error("failed to send the pull status event. biz: %d,app: %s, err: %s",
r.BizID, r.AppMate.App, err.Error())
logger.Error("failed to send the pull status event", slog.Uint64("biz", uint64(r.BizID)),
slog.String("app", r.AppMate.App), logger.ErrAttr(err))
}

// 发送心跳数据
Expand Down
3 changes: 2 additions & 1 deletion client/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,8 @@ func (w *watcher) StartWatch() error {
// 发送客户端连接信息
go func() {
if err = w.sendClientMessaging(apps, nil); err != nil {
logger.Error("failed to send the client connection event. biz: %d, err: %s", w.opts.bizID, err.Error())
logger.Error("failed to send the client connection event",
slog.Uint64("biz", uint64(w.opts.bizID)), logger.ErrAttr(err))
}
}()

Expand Down
9 changes: 9 additions & 0 deletions cmd/bscp/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ func Watch(cmd *cobra.Command, args []string) {
CacheDir: conf.FileCache.CacheDir,
ThresholdGB: conf.FileCache.ThresholdGB,
}),
client.WithKvCache(client.KvCache{
Enabled: conf.KvCache.Enabled,
ThresholdMB: conf.KvCache.ThresholdMB,
}),
client.WithEnableMonitorResourceUsage(conf.EnableMonitorResourceUsage),
)
if err != nil {
Expand Down Expand Up @@ -249,6 +253,11 @@ func init() {
WatchCmd.Flags().Float64P("cache-threshold-gb", "", constant.DefaultCacheThresholdGB,
"bscp file cache threshold gigabyte")
mustBindPFlag(watchViper, "file_cache.threshold_gb", WatchCmd.Flags().Lookup("cache-threshold-gb"))
WatchCmd.Flags().BoolP("kv-cache-enabled", "", constant.DefaultKvCacheEnabled, "enable kv cache or not")
mustBindPFlag(watchViper, "kv_cache.enabled", WatchCmd.Flags().Lookup("kv-cache-enabled"))
WatchCmd.Flags().Float64P("kv-cache-threshold-mb", "", constant.DefaultKvCacheThresholdMB,
"bscp kv cache threshold megabyte in memory")
mustBindPFlag(watchViper, "kv_cache.threshold_mb", WatchCmd.Flags().Lookup("kv-cache-threshold-mb"))
WatchCmd.Flags().BoolP("enable-resource", "e", true, "enable report resource usage")
mustBindPFlag(watchViper, "enable_resource", WatchCmd.Flags().Lookup("enable-resource"))

Expand Down
11 changes: 9 additions & 2 deletions examples/config/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,16 @@ apps:
- "app": "demo-2"
# 文件缓存配置
file_cache:
# 是否开启文件缓存,不配置时默认开启
# 是否开启文件缓存
enabled: true
# 缓存目录
cache_dir: /data/bscp/cache
# 缓存清理阈值,单位为GB,缓存目录达到该阈值时开始清理,按文件更新时间从最老的文件开始清理,直至达到设置的缓存保留比例为止
threshold_gb: 2
threshold_gb: 2
# kv缓存配置
kv_cache:
# 是否开启kv缓存
# 用于在feed-server服务端连接不可用时仍能降级从缓存获取到对应value(如果有缓存过),但存在value值不是最新发布版本的风险
enabled: true
# kv缓存容量阈值,单位为MB,超过后会丢弃旧缓存数据
threshold_mb: 500
Loading

0 comments on commit e413e24

Please sign in to comment.