Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add kv cache #100

Merged
merged 11 commits into from
Jun 25, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 88 additions & 10 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,13 +130,14 @@ func New(opts ...Option) (Client, error) {
if err != nil {
return nil, fmt.Errorf("init downloader failed, err: %s", err.Error())
}
if clientOpt.fileCache.Enabled {
if err = cache.Init(true, clientOpt.fileCache.CacheDir); err != nil {
return nil, fmt.Errorf("init file cache failed, err: %s", err.Error())
}
go cache.AutoCleanupFileCache(clientOpt.fileCache.CacheDir, DefaultCleanupIntervalSeconds,
clientOpt.fileCache.ThresholdGB, DefaultCacheRetentionRate)

if err = initFileCache(clientOpt); err != nil {
return nil, err
}
if err = initKvCache(clientOpt); err != nil {
return nil, err
}

watcher, err := newWatcher(u, clientOpt)
if err != nil {
return nil, fmt.Errorf("init watcher failed, err: %s", err.Error())
Expand All @@ -145,6 +146,44 @@ func New(opts ...Option) (Client, error) {
return c, nil
}

// initFileCache init file cache
func initFileCache(opts *options) error {
if opts.fileCache.Enabled {
logger.Info("enable file cache")
if err := cache.Init(opts.fileCache.CacheDir); err != nil {
return fmt.Errorf("init file cache failed, err: %s", err.Error())
}
go cache.AutoCleanupFileCache(opts.fileCache.CacheDir, DefaultCleanupIntervalSeconds,
opts.fileCache.ThresholdGB, DefaultCacheRetentionRate)
}
return nil
}

// initKvCache init kv cache
func initKvCache(opts *options) error {
if opts.kvCache.Enabled {
logger.Info("enable kv cache")
if err := cache.InitMemCache(opts.kvCache.ThresholdMB); err != nil {
return fmt.Errorf("init kv cache failed, err: %s", err.Error())
}

go func() {
mc := cache.GetMemCache()
for {
hit, miss, kvCnt := mc.Stats().Hits, mc.Stats().Misses, mc.Len()
var hitRatio float64
if hit+miss > 0 {
hitRatio = float64(hit) / float64(hit+miss)
}
logger.Debug("kv cache statistics", slog.Int64("hit", hit), slog.Int64("miss", miss),
slog.String("hit-ratio", fmt.Sprintf("%.3f", hitRatio)), slog.Int("kv-count", kvCnt))
time.Sleep(time.Second * 15)
}
}()
}
return nil
}

// AddWatcher add a watcher to client
func (c *client) AddWatcher(callback Callback, app string, opts ...AppOption) error {
_ = c.watcher.Subscribe(callback, app, opts...)
Expand Down Expand Up @@ -241,15 +280,16 @@ func (c *client) PullFiles(app string, opts ...AppOption) (*Release, error) { //
r.AppMate.FailedDetailReason = st.Err().Error()
}
if err = c.sendClientMessaging(vas, r.AppMate, nil); err != nil {
logger.Error("description failed to report the client change event, client_mode: %s, biz: %d,app: %s, err: %s",
r.ClientMode.String(), r.BizID, r.AppMate.App, err.Error())
logger.Error("description failed to report the client change event",
slog.String("client_mode", r.ClientMode.String()), slog.Uint64("biz", uint64(r.BizID)),
slog.String("app", r.AppMate.App), logger.ErrAttr(err))
}
}
}()

resp, err = c.upstream.PullAppFileMeta(vas, req)
if err != nil {
logger.Error("pull file meta failed, err: %s, rid: %s", err.Error(), vas.Rid)
logger.Error("pull file meta failed", logger.ErrAttr(err), slog.String("rid", vas.Rid))
return nil, err
}

Expand Down Expand Up @@ -335,7 +375,10 @@ func (c *client) PullKvs(app string, match []string, opts ...AppOption) (*Releas
}

// Get 读取 Key 的值
// 优先从feed-server服务端获取最新版本value并缓存,在feed-server服务端连接不可用时则降级从缓存中获取(如果有缓存过),
// 在feed-server服务端连接不可用时,存在从缓存获取到的value值不是最新发布版本的风险
func (c *client) Get(app string, key string, opts ...AppOption) (string, error) {
fireyun marked this conversation as resolved.
Show resolved Hide resolved
// get kv value from remote service
option := &AppOptions{}
for _, opt := range opts {
opt(option)
Expand All @@ -355,14 +398,49 @@ func (c *client) Get(app string, key string, opts ...AppOption) (string, error)
if option.UID != "" {
req.AppMeta.Uid = option.UID
}
cacheKey := kvCacheKey(c.opts.bizID, app, key)

resp, err := c.upstream.GetKvValue(vas, req)
if err != nil {
return "", err
st, _ := status.FromError(err)
switch st.Code() {
case codes.Unavailable, codes.DeadlineExceeded, codes.Internal:
logger.Error("feed-server is unavailable", logger.ErrAttr(err))
// 降级从缓存中获取
if cache.EnableMemCache {
val, cErr := cache.GetMemCache().Get(cacheKey)
if cErr != nil {
logger.Error("get kv value from cache failed", slog.String("key", cacheKey), logger.ErrAttr(cErr))
return "", err
}
logger.Warn("feed-server is unavailable but get kv value from cache successfully",
slog.String("key", cacheKey))
return string(val), nil
}
default:
return "", err
}
}

// 缓存最新kv的value值
if cache.EnableMemCache {
v, e := cache.GetMemCache().Get(cacheKey)
// 已缓存过,则不用再次缓存
if e == nil && string(v) == resp.Value {
return resp.Value, nil
}
if err := cache.GetMemCache().Set(cacheKey, []byte(resp.Value)); err != nil {
logger.Error("set kv cache failed", slog.String("key", cacheKey), logger.ErrAttr(err))
}
}

return resp.Value, nil
}

func kvCacheKey(bizID uint32, app, key string) string {
return fmt.Sprintf("%d_%s_%s", bizID, app, key)
}

// ListApps list app from remote, only return have perm by token
func (c *client) ListApps(match []string) ([]*pbfs.App, error) {
vas, _ := c.buildVas()
Expand Down
18 changes: 18 additions & 0 deletions client/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ type options struct {
token string
// fileCache file cache option
fileCache FileCache
// kvCache kv cache option
kvCache KvCache
// EnableMonitorResourceUsage 是否采集/监控资源使用率
enableMonitorResourceUsage bool
}
Expand All @@ -48,6 +50,14 @@ type FileCache struct {
// RetentionRate float64
}

// KvCache option for kv cache
type KvCache struct {
// Enabled is whether enable kv cache
Enabled bool
// ThresholdMB is threshold megabyte of kv cache
ThresholdMB float64
}

const (
// DefaultCleanupIntervalSeconds is the bscp cli default file cache cleanup interval.
DefaultCleanupIntervalSeconds = 300
Expand Down Expand Up @@ -116,6 +126,14 @@ func WithFileCache(c FileCache) Option {
}
}

// WithKvCache set kv cache
func WithKvCache(c KvCache) Option {
return func(o *options) error {
o.kvCache = c
return nil
}
}

// WithEnableMonitorResourceUsage 是否采集/监控资源使用率
func WithEnableMonitorResourceUsage(enable bool) Option {
return func(o *options) error {
Expand Down
9 changes: 5 additions & 4 deletions client/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,8 +343,9 @@ func (r *Release) Execute(steps ...Function) error {
}

if err = r.sendVersionChangeMessaging(bd); err != nil {
logger.Error("description failed to report the client change event, client_mode: %s, biz: %d,app: %s, err: %s",
r.ClientMode.String(), r.BizID, r.AppMate.App, err.Error())
logger.Error("description failed to report the client change event",
slog.String("client_mode", r.ClientMode.String()), slog.Uint64("biz", uint64(r.BizID)),
slog.String("app", r.AppMate.App), logger.ErrAttr(err))
}

}()
Expand All @@ -361,8 +362,8 @@ func (r *Release) Execute(steps ...Function) error {

// 发送拉取前事件
if err = r.sendVersionChangeMessaging(bd); err != nil {
logger.Error("failed to send the pull status event. biz: %d,app: %s, err: %s",
r.BizID, r.AppMate.App, err.Error())
logger.Error("failed to send the pull status event", slog.Uint64("biz", uint64(r.BizID)),
slog.String("app", r.AppMate.App), logger.ErrAttr(err))
}

// 发送心跳数据
Expand Down
3 changes: 2 additions & 1 deletion client/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,8 @@ func (w *watcher) StartWatch() error {
// 发送客户端连接信息
go func() {
if err = w.sendClientMessaging(apps, nil); err != nil {
logger.Error("failed to send the client connection event. biz: %d, err: %s", w.opts.bizID, err.Error())
logger.Error("failed to send the client connection event",
slog.Uint64("biz", uint64(w.opts.bizID)), logger.ErrAttr(err))
}
}()

Expand Down
9 changes: 9 additions & 0 deletions cmd/bscp/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ func Watch(cmd *cobra.Command, args []string) {
CacheDir: conf.FileCache.CacheDir,
ThresholdGB: conf.FileCache.ThresholdGB,
}),
client.WithKvCache(client.KvCache{
Enabled: conf.KvCache.Enabled,
ThresholdMB: conf.KvCache.ThresholdMB,
}),
client.WithEnableMonitorResourceUsage(conf.EnableMonitorResourceUsage),
)
if err != nil {
Expand Down Expand Up @@ -250,6 +254,11 @@ func init() {
WatchCmd.Flags().Float64P("cache-threshold-gb", "", constant.DefaultCacheThresholdGB,
"bscp file cache threshold gigabyte")
mustBindPFlag(watchViper, "file_cache.threshold_gb", WatchCmd.Flags().Lookup("cache-threshold-gb"))
WatchCmd.Flags().BoolP("kv-cache-enabled", "", constant.DefaultKvCacheEnabled, "enable kv cache or not")
mustBindPFlag(watchViper, "kv_cache.enabled", WatchCmd.Flags().Lookup("kv-cache-enabled"))
WatchCmd.Flags().Float64P("kv-cache-threshold-mb", "", constant.DefaultKvCacheThresholdMB,
"bscp kv cache threshold megabyte in memory")
mustBindPFlag(watchViper, "kv_cache.threshold_mb", WatchCmd.Flags().Lookup("kv-cache-threshold-mb"))
WatchCmd.Flags().BoolP("enable-resource", "e", true, "enable report resource usage")
mustBindPFlag(watchViper, "enable_resource", WatchCmd.Flags().Lookup("enable-resource"))

Expand Down
11 changes: 9 additions & 2 deletions examples/config/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,16 @@ apps:
- "app": "demo-2"
# 文件缓存配置
file_cache:
# 是否开启文件缓存,不配置时默认开启
# 是否开启文件缓存
enabled: true
# 缓存目录
cache_dir: /data/bscp/cache
# 缓存清理阈值,单位为GB,缓存目录达到该阈值时开始清理,按文件更新时间从最老的文件开始清理,直至达到设置的缓存保留比例为止
threshold_gb: 2
threshold_gb: 2
# kv缓存配置
kv_cache:
# 是否开启kv缓存
# 用于在feed-server服务端连接不可用时仍能降级从缓存获取到对应value(如果有缓存过),但存在value值不是最新发布版本的风险
enabled: true
# kv缓存容量阈值,单位为MB,超过后会丢弃旧缓存数据
threshold_mb: 500
96 changes: 64 additions & 32 deletions examples/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,72 +5,104 @@ go 1.20
require (
github.com/TencentBlueKing/bscp-go v1.0.5
github.com/spf13/cobra v1.7.0
golang.org/x/exp v0.0.0-20231206192017-f3f8817b8deb
golang.org/x/exp v0.0.0-20240416160154-fe59bbe5cc7f
)

require (
github.com/Tencent/bk-bcs/bcs-common v0.0.0-20230912015319-acb7495967f5 // indirect
github.com/TencentBlueKing/bk-bcs/bcs-services/bcs-bscp v0.0.0-20240227114116-5cf70c21cd04 // indirect
dario.cat/mergo v1.0.0 // indirect
github.com/Microsoft/go-winio v0.6.2 // indirect
github.com/ProtonMail/go-crypto v1.0.0 // indirect
github.com/Tencent/bk-bcs/bcs-common v0.0.0-20240425034551-e53e35b46a7b // indirect
github.com/TencentBlueKing/bk-bcs/bcs-services/bcs-bscp v0.0.0-20240517115418-9396857e3664 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bitly/go-simplejson v0.5.1 // indirect
github.com/bluele/gcache v0.0.2 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cloudflare/circl v1.3.8 // indirect
github.com/coreos/go-semver v0.3.1 // indirect
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.4 // indirect
github.com/cyphar/filepath-securejoin v0.2.4 // indirect
github.com/denisbrodbeck/machineid v1.0.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/emirpasic/gods v1.18.1 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-git/gcfg v1.5.1-0.20230307220236-3a3c6141e376 // indirect
github.com/go-git/go-billy/v5 v5.5.0 // indirect
github.com/go-git/go-git/v5 v5.12.0 // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-redis/redis/v8 v8.11.5 // indirect
github.com/gobwas/glob v0.2.3 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/uuid v1.5.0 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/hashicorp/go-version v1.6.0 // indirect
github.com/hashicorp/hcl v1.0.1-vault-5 // indirect
github.com/imdario/mergo v0.3.16 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/kevinburke/ssh_config v1.2.0 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/miekg/dns v1.1.59 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pelletier/go-toml/v2 v2.0.8 // indirect
github.com/nxadm/tail v1.4.11 // indirect
github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c // indirect
github.com/patrickmn/go-cache v2.1.0+incompatible // indirect
github.com/pelletier/go-toml/v2 v2.2.1 // indirect
github.com/pjbgf/sha1cd v0.3.0 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.16.0 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/common v0.42.0 // indirect
github.com/prometheus/procfs v0.10.1 // indirect
github.com/prometheus/client_golang v1.19.0 // indirect
github.com/prometheus/client_model v0.5.0 // indirect
github.com/prometheus/common v0.48.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/sagikazarmark/locafero v0.4.0 // indirect
github.com/sagikazarmark/slog-shim v0.1.0 // indirect
github.com/samber/lo v1.39.0 // indirect
github.com/spf13/afero v1.9.5 // indirect
github.com/spf13/cast v1.5.1 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/sergi/go-diff v1.3.2-0.20230802210424-5b0b94c5c0d3 // indirect
github.com/skeema/knownhosts v1.2.2 // indirect
github.com/sourcegraph/conc v0.3.0 // indirect
github.com/spf13/afero v1.11.0 // indirect
github.com/spf13/cast v1.6.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/spf13/viper v1.16.0 // indirect
github.com/subosito/gotenv v1.4.2 // indirect
github.com/spf13/viper v1.18.2 // indirect
github.com/subosito/gotenv v1.6.0 // indirect
github.com/tidwall/gjson v1.16.0 // indirect
github.com/tidwall/match v1.1.1 // indirect
github.com/tidwall/pretty v1.2.1 // indirect
github.com/tjfoc/gmsm v1.4.1 // indirect
github.com/ugorji/go/codec v1.2.3 // indirect
go.etcd.io/etcd/api/v3 v3.5.9 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.9 // indirect
go.etcd.io/etcd/client/v3 v3.5.9 // indirect
github.com/ugorji/go/codec v1.2.12 // indirect
github.com/urfave/cli/v2 v2.27.1 // indirect
github.com/xanzy/ssh-agent v0.3.3 // indirect
github.com/xrash/smetrics v0.0.0-20240312152122-5f08fbb34913 // indirect
go-micro.dev/v4 v4.10.2 // indirect
go.etcd.io/etcd/api/v3 v3.5.13 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.13 // indirect
go.etcd.io/etcd/client/v3 v3.5.13 // indirect
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.24.0 // indirect
golang.org/x/crypto v0.14.0 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/sync v0.3.0 // indirect
golang.org/x/sys v0.13.0 // indirect
go.uber.org/zap v1.27.0 // indirect
golang.org/x/crypto v0.22.0 // indirect
golang.org/x/mod v0.17.0 // indirect
golang.org/x/net v0.24.0 // indirect
golang.org/x/sync v0.7.0 // indirect
golang.org/x/sys v0.19.0 // indirect
golang.org/x/text v0.14.0 // indirect
google.golang.org/genproto v0.0.0-20230822172742-b8732ec3820d // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20230822172742-b8732ec3820d // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d // indirect
google.golang.org/grpc v1.57.0 // indirect
google.golang.org/protobuf v1.31.0 // indirect
golang.org/x/tools v0.20.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240415180920-8c6c420018be // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240415180920-8c6c420018be // indirect
google.golang.org/grpc v1.63.2 // indirect
google.golang.org/protobuf v1.33.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
gopkg.in/warnings.v0 v0.1.2 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/klog/v2 v2.100.1 // indirect
k8s.io/klog/v2 v2.120.1 // indirect
)

replace github.com/TencentBlueKing/bscp-go => ../
Loading
Loading