Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add kv cache #100

Merged
merged 11 commits into from
Jun 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 138 additions & 11 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package client
import (
"context"
"encoding/json"
"errors"
"fmt"
"time"

Expand All @@ -25,6 +26,7 @@ import (
pbfs "github.com/TencentBlueKing/bk-bcs/bcs-services/bcs-bscp/pkg/protocol/feed-server"
sfs "github.com/TencentBlueKing/bk-bcs/bcs-services/bcs-bscp/pkg/sf-share"
"github.com/TencentBlueKing/bk-bcs/bcs-services/bcs-bscp/pkg/version"
"github.com/allegro/bigcache/v3"
"golang.org/x/exp/slog"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -56,6 +58,9 @@ type Client interface {
ResetLabels(labels map[string]string)
}

// ErrNotFoundKvMD5 is err not found kv md5
var ErrNotFoundKvMD5 = errors.New("not found kv md5")

// Client is the bscp client
type client struct {
pairs map[string]string
Expand Down Expand Up @@ -130,13 +135,14 @@ func New(opts ...Option) (Client, error) {
if err != nil {
return nil, fmt.Errorf("init downloader failed, err: %s", err.Error())
}
if clientOpt.fileCache.Enabled {
if err = cache.Init(true, clientOpt.fileCache.CacheDir); err != nil {
return nil, fmt.Errorf("init file cache failed, err: %s", err.Error())
}
go cache.AutoCleanupFileCache(clientOpt.fileCache.CacheDir, DefaultCleanupIntervalSeconds,
clientOpt.fileCache.ThresholdGB, DefaultCacheRetentionRate)

if err = initFileCache(clientOpt); err != nil {
return nil, err
}
if err = initKvCache(clientOpt); err != nil {
return nil, err
}

watcher, err := newWatcher(u, clientOpt)
if err != nil {
return nil, fmt.Errorf("init watcher failed, err: %s", err.Error())
Expand All @@ -145,6 +151,44 @@ func New(opts ...Option) (Client, error) {
return c, nil
}

// initFileCache init file cache
func initFileCache(opts *options) error {
if opts.fileCache.Enabled {
logger.Info("enable file cache")
if err := cache.Init(opts.fileCache.CacheDir); err != nil {
return fmt.Errorf("init file cache failed, err: %s", err.Error())
}
go cache.AutoCleanupFileCache(opts.fileCache.CacheDir, DefaultCleanupIntervalSeconds,
opts.fileCache.ThresholdGB, DefaultCacheRetentionRate)
}
return nil
}

// initKvCache init kv cache
func initKvCache(opts *options) error {
if opts.kvCache.Enabled {
logger.Info("enable kv cache")
if err := cache.InitMemCache(opts.kvCache.ThresholdMB); err != nil {
return fmt.Errorf("init kv cache failed, err: %s", err.Error())
}

go func() {
mc := cache.GetMemCache()
for {
hit, miss, kvCnt := mc.Stats().Hits, mc.Stats().Misses, mc.Len()
var hitRatio float64
if hit+miss > 0 {
hitRatio = float64(hit) / float64(hit+miss)
}
logger.Debug("kv cache statistics", slog.Int64("hit", hit), slog.Int64("miss", miss),
slog.String("hit-ratio", fmt.Sprintf("%.3f", hitRatio)), slog.Int("kv-count", kvCnt))
time.Sleep(time.Second * 15)
}
}()
}
return nil
}

// AddWatcher add a watcher to client
func (c *client) AddWatcher(callback Callback, app string, opts ...AppOption) error {
_ = c.watcher.Subscribe(callback, app, opts...)
Expand Down Expand Up @@ -241,15 +285,16 @@ func (c *client) PullFiles(app string, opts ...AppOption) (*Release, error) { //
r.AppMate.FailedDetailReason = st.Err().Error()
}
if err = c.sendClientMessaging(vas, r.AppMate, nil); err != nil {
logger.Error("description failed to report the client change event, client_mode: %s, biz: %d,app: %s, err: %s",
r.ClientMode.String(), r.BizID, r.AppMate.App, err.Error())
logger.Error("description failed to report the client change event",
slog.String("client_mode", r.ClientMode.String()), slog.Uint64("biz", uint64(r.BizID)),
slog.String("app", r.AppMate.App), logger.ErrAttr(err))
}
}
}()

resp, err = c.upstream.PullAppFileMeta(vas, req)
if err != nil {
logger.Error("pull file meta failed, err: %s, rid: %s", err.Error(), vas.Rid)
logger.Error("pull file meta failed", logger.ErrAttr(err), slog.String("rid", vas.Rid))
return nil, err
}

Expand Down Expand Up @@ -335,7 +380,23 @@ func (c *client) PullKvs(app string, match []string, opts ...AppOption) (*Releas
}

// Get 读取 Key 的值
// 先从feed-server服务端拉取最新版本元数据,优先从缓存中获取该最新版本value,缓存中没有再调用feed-server获取value并缓存起来
// 在feed-server服务端连接不可用时则降级从缓存中获取(如果有缓存过),此时存在从缓存获取到的value值不是最新发布版本的风险
func (c *client) Get(app string, key string, opts ...AppOption) (string, error) {
fireyun marked this conversation as resolved.
Show resolved Hide resolved
// get kv value from cache
var val, md5 string
var err error
cacheKey := kvCacheKey(c.opts.bizID, app, key)
if cache.EnableMemCache {
val, md5, err = c.getKvValueFromCache(app, key, opts...)
if err == nil {
return val, nil
} else if err != bigcache.ErrEntryNotFound {
logger.Error("get kv value from cache failed", slog.String("key", cacheKey), logger.ErrAttr(err))
}
}

// get kv value from feed-server
option := &AppOptions{}
for _, opt := range opts {
opt(option)
Expand All @@ -355,12 +416,78 @@ func (c *client) Get(app string, key string, opts ...AppOption) (string, error)
if option.UID != "" {
req.AppMeta.Uid = option.UID
}

resp, err := c.upstream.GetKvValue(vas, req)
if err != nil {
return "", err
st, _ := status.FromError(err)
switch st.Code() {
case codes.Unavailable, codes.DeadlineExceeded, codes.Internal:
logger.Error("feed-server is unavailable", logger.ErrAttr(err))
// 降级从缓存中获取
if cache.EnableMemCache {
v, cErr := cache.GetMemCache().Get(cacheKey)
if cErr != nil {
logger.Error("get kv value from cache failed", slog.String("key", cacheKey), logger.ErrAttr(cErr))
return "", err
}
logger.Warn("feed-server is unavailable but get kv value from cache successfully",
slog.String("key", cacheKey))
return string(v[32:]), nil
}
default:
return "", err
}
}
val = resp.Value

// set kv md5 and value for cache
if cache.EnableMemCache {
if md5 == "" {
logger.Error("set kv cache failed", slog.String("key", cacheKey), logger.ErrAttr(ErrNotFoundKvMD5))
} else {
if err := cache.GetMemCache().Set(cacheKey, append([]byte(md5), []byte(val)...)); err != nil {
logger.Error("set kv cache failed", slog.String("key", cacheKey), logger.ErrAttr(err))
}
}
}

return resp.Value, nil
return val, nil
}

// getKvValueWithCache get kv value from the cache
func (c *client) getKvValueFromCache(app string, key string, opts ...AppOption) (string, string, error) {
release, err := c.PullKvs(app, []string{}, opts...)
if err != nil {
return "", "", err
}

var md5 string
for _, k := range release.KvItems {
if k.Key == key {
md5 = k.ContentSpec.Md5
break
}
}
if md5 == "" {
return "", "", ErrNotFoundKvMD5
}

var val []byte
val, err = cache.GetMemCache().Get(kvCacheKey(c.opts.bizID, app, key))
if err != nil {
return "", md5, err
}
// 判断是否为最新版本缓存,不是最新则仍从服务端获取value
if string(val[:32]) != md5 {
return "", md5, bigcache.ErrEntryNotFound
}

return string(val[32:]), md5, nil
}

// kvCacheKey is cache key for kv md5 and value, the cached data's first 32 character is md5, other is value
func kvCacheKey(bizID uint32, app, key string) string {
return fmt.Sprintf("%d_%s_%s", bizID, app, key)
}

// ListApps list app from remote, only return have perm by token
Expand Down
18 changes: 18 additions & 0 deletions client/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ type options struct {
token string
// fileCache file cache option
fileCache FileCache
// kvCache kv cache option
kvCache KvCache
// EnableMonitorResourceUsage 是否采集/监控资源使用率
enableMonitorResourceUsage bool
}
Expand All @@ -48,6 +50,14 @@ type FileCache struct {
// RetentionRate float64
}

// KvCache option for kv cache
type KvCache struct {
// Enabled is whether enable kv cache
Enabled bool
// ThresholdMB is threshold megabyte of kv cache
ThresholdMB float64
}

const (
// DefaultCleanupIntervalSeconds is the bscp cli default file cache cleanup interval.
DefaultCleanupIntervalSeconds = 300
Expand Down Expand Up @@ -116,6 +126,14 @@ func WithFileCache(c FileCache) Option {
}
}

// WithKvCache set kv cache
func WithKvCache(c KvCache) Option {
return func(o *options) error {
o.kvCache = c
return nil
}
}

// WithEnableMonitorResourceUsage 是否采集/监控资源使用率
func WithEnableMonitorResourceUsage(enable bool) Option {
return func(o *options) error {
Expand Down
9 changes: 5 additions & 4 deletions client/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,8 +343,9 @@ func (r *Release) Execute(steps ...Function) error {
}

if err = r.sendVersionChangeMessaging(bd); err != nil {
logger.Error("description failed to report the client change event, client_mode: %s, biz: %d,app: %s, err: %s",
r.ClientMode.String(), r.BizID, r.AppMate.App, err.Error())
logger.Error("description failed to report the client change event",
slog.String("client_mode", r.ClientMode.String()), slog.Uint64("biz", uint64(r.BizID)),
slog.String("app", r.AppMate.App), logger.ErrAttr(err))
}

}()
Expand All @@ -361,8 +362,8 @@ func (r *Release) Execute(steps ...Function) error {

// 发送拉取前事件
if err = r.sendVersionChangeMessaging(bd); err != nil {
logger.Error("failed to send the pull status event. biz: %d,app: %s, err: %s",
r.BizID, r.AppMate.App, err.Error())
logger.Error("failed to send the pull status event", slog.Uint64("biz", uint64(r.BizID)),
slog.String("app", r.AppMate.App), logger.ErrAttr(err))
}

// 发送心跳数据
Expand Down
3 changes: 2 additions & 1 deletion client/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,8 @@ func (w *watcher) StartWatch() error {
// 发送客户端连接信息
go func() {
if err = w.sendClientMessaging(apps, nil); err != nil {
logger.Error("failed to send the client connection event. biz: %d, err: %s", w.opts.bizID, err.Error())
logger.Error("failed to send the client connection event",
slog.Uint64("biz", uint64(w.opts.bizID)), logger.ErrAttr(err))
}
}()

Expand Down
9 changes: 9 additions & 0 deletions cmd/bscp/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ func Watch(cmd *cobra.Command, args []string) {
CacheDir: conf.FileCache.CacheDir,
ThresholdGB: conf.FileCache.ThresholdGB,
}),
client.WithKvCache(client.KvCache{
Enabled: conf.KvCache.Enabled,
ThresholdMB: conf.KvCache.ThresholdMB,
}),
client.WithEnableMonitorResourceUsage(conf.EnableMonitorResourceUsage),
)
if err != nil {
Expand Down Expand Up @@ -250,6 +254,11 @@ func init() {
WatchCmd.Flags().Float64P("cache-threshold-gb", "", constant.DefaultCacheThresholdGB,
"bscp file cache threshold gigabyte")
mustBindPFlag(watchViper, "file_cache.threshold_gb", WatchCmd.Flags().Lookup("cache-threshold-gb"))
WatchCmd.Flags().BoolP("kv-cache-enabled", "", constant.DefaultKvCacheEnabled, "enable kv cache or not")
mustBindPFlag(watchViper, "kv_cache.enabled", WatchCmd.Flags().Lookup("kv-cache-enabled"))
WatchCmd.Flags().Float64P("kv-cache-threshold-mb", "", constant.DefaultKvCacheThresholdMB,
"bscp kv cache threshold megabyte in memory")
mustBindPFlag(watchViper, "kv_cache.threshold_mb", WatchCmd.Flags().Lookup("kv-cache-threshold-mb"))
WatchCmd.Flags().BoolP("enable-resource", "e", true, "enable report resource usage")
mustBindPFlag(watchViper, "enable_resource", WatchCmd.Flags().Lookup("enable-resource"))

Expand Down
11 changes: 9 additions & 2 deletions examples/config/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,16 @@ apps:
- "app": "demo-2"
# 文件缓存配置
file_cache:
# 是否开启文件缓存,不配置时默认开启
# 是否开启文件缓存
enabled: true
# 缓存目录
cache_dir: /data/bscp/cache
# 缓存清理阈值,单位为GB,缓存目录达到该阈值时开始清理,按文件更新时间从最老的文件开始清理,直至达到设置的缓存保留比例为止
threshold_gb: 2
threshold_gb: 2
# kv缓存配置
kv_cache:
# 是否开启kv缓存
# 用于在feed-server服务端连接不可用时仍能降级从缓存获取到对应value(如果有缓存过),但存在value值不是最新发布版本的风险
enabled: true
# kv缓存容量阈值,单位为MB,超过后会丢弃旧缓存数据
threshold_mb: 500
Loading
Loading