From 490434eec2771a0b4e4740eee1f4074dcdf9a0fb Mon Sep 17 00:00:00 2001 From: johnypeng <2651903873@qq.com> Date: Sun, 28 Jan 2024 21:53:17 +0800 Subject: [PATCH 1/2] feat: cmdline supports to get multiple kv details --- client/client.go | 37 ++++++++++++++++++++++++++++++++--- cmd/bscp/get.go | 35 ++++++++++++++++++++++++++------- examples/README.md | 2 +- examples/kv-ctl/main.go | 2 +- go.mod | 4 ++++ internal/upstream/api.go | 11 ++++++++++- internal/upstream/upstream.go | 1 + 7 files changed, 79 insertions(+), 13 deletions(-) diff --git a/client/client.go b/client/client.go index 6ce579330..cf45b061e 100644 --- a/client/client.go +++ b/client/client.go @@ -37,10 +37,12 @@ type Client interface { ListApps(match []string) ([]*pbfs.App, error) // PullFiles pull files from remote PullFiles(app string, opts ...AppOption) (*Release, error) - // Get KV release from remote + // PullKvs pull KV release from remote PullKvs(app string, match []string, opts ...AppOption) (*Release, error) - // Pull Key Value from remote + // Get gets Key Value from remote Get(app string, key string, opts ...AppOption) (string, error) + // GetKvValues gets kv values from remote, get all kv values if keys are empty + GetKvValues(app string, keys []string, opts ...AppOption) ([]*pbfs.KV, error) // AddWatcher add a watcher to client AddWatcher(callback Callback, app string, opts ...AppOption) error // StartWatch start watch @@ -216,7 +218,7 @@ func (c *client) PullFiles(app string, opts ...AppOption) (*Release, error) { return r, nil } -// GetRelease get release from remote +// PullKvs get release from remote func (c *client) PullKvs(app string, match []string, opts ...AppOption) (*Release, error) { option := &AppOptions{} for _, opt := range opts { @@ -292,6 +294,35 @@ func (c *client) Get(app string, key string, opts ...AppOption) (string, error) return resp.Value, nil } +// GetKvValues gets kv values from remote, get all kv values if keys are empty +func (c *client) GetKvValues(app string, keys []string, opts ...AppOption) ([]*pbfs.KV, error) { + option := &AppOptions{} + for _, opt := range opts { + opt(option) + } + vas, _ := c.buildVas() + req := &pbfs.GetKvValuesReq{ + BizId: c.opts.bizID, + AppMeta: &pbfs.AppMeta{ + App: app, + Labels: c.opts.labels, + Uid: c.opts.uid, + }, + Keys: keys, + } + req.AppMeta.Labels = util.MergeLabels(c.opts.labels, option.Labels) + // reset uid + if option.UID != "" { + req.AppMeta.Uid = option.UID + } + resp, err := c.upstream.GetKvValues(vas, req) + if err != nil { + return nil, err + } + + return resp.Kvs, nil +} + // ListApps list app from remote, only return have perm by token func (c *client) ListApps(match []string) ([]*pbfs.App, error) { vas, _ := c.buildVas() diff --git a/cmd/bscp/get.go b/cmd/bscp/get.go index 53a30b7e8..cfb9a54eb 100644 --- a/cmd/bscp/get.go +++ b/cmd/bscp/get.go @@ -28,9 +28,10 @@ var ( ) const ( - outputFormatTable = "" - outputFormatJson = "json" - outputFormatValue = "value" + outputFormatTable = "" + outputFormatJson = "json" + outputFormatValue = "value" + outputFormatValueJson = "value_json" ) var ( @@ -90,7 +91,7 @@ func init() { // kv 参数 getKvCmd.Flags().StringVarP(&appName, "app", "a", "", "app name") getKvCmd.Flags().StringVarP(&labelsStr, "labels", "l", "", "labels") - getKvCmd.PersistentFlags().StringVarP(&outputFormat, "output", "o", "", "output format, One of: json|value") + getKvCmd.PersistentFlags().StringVarP(&outputFormat, "output", "o", "", "output format, One of: json|value|value_json") } // runGetApp executes the get app command. @@ -187,6 +188,23 @@ func runGetKvValue(bscp client.Client, app, key string) error { return err } +func runGetKvValues(bscp client.Client, app string, keys []string) error { + kvs, err := bscp.GetKvValues(app, keys, client.WithAppLabels(labels)) + if err != nil { + return err + } + + output := make(map[string]any, len(kvs)) + for _, kv := range kvs { + output[kv.Key] = map[string]string{ + "kv_type": kv.KvType, + "value": kv.Value, + } + } + + return jsonOutput(output) +} + // runGetKv executes the get kv command. func runGetKv(args []string) error { baseConf, err := initBaseConf() @@ -208,7 +226,8 @@ func runGetKv(args []string) error { return err } - if outputFormat == outputFormatValue { + switch outputFormat { + case outputFormatValue: if len(args) == 0 { return fmt.Errorf("res must not be empty") } @@ -216,7 +235,9 @@ func runGetKv(args []string) error { return fmt.Errorf("multiple res are not supported") } return runGetKvValue(bscp, appName, args[0]) + case outputFormatValueJson: + return runGetKvValues(bscp, appName, args) + default: + return runGetListKv(bscp, appName, args) } - - return runGetListKv(bscp, appName, args) } diff --git a/examples/README.md b/examples/README.md index 0709a1f40..2429efb71 100644 --- a/examples/README.md +++ b/examples/README.md @@ -14,7 +14,7 @@ bscp sdk examples 添加环境变量 ```bash # FEED 地址 -export BSCP_FEED_ADDR="bscp-feed.example.com:9510" +export BSCP_FEED_ADDRS="bscp-feed.example.com:9510" # 服务密钥 Token, 记得需要关联配置文件 export BSCP_TOKEN="xxx" # 当前业务 diff --git a/examples/kv-ctl/main.go b/examples/kv-ctl/main.go index 035439fb4..848bf89c8 100644 --- a/examples/kv-ctl/main.go +++ b/examples/kv-ctl/main.go @@ -114,7 +114,7 @@ func execute() { } else { result := map[string]string{} if len(keySlice) == 0 { - release, err := bscp.PullKvs(appName, opts...) + release, err := bscp.PullKvs(appName, []string{}, opts...) if err != nil { slog.Error("pull kv failed", logger.ErrAttr(err)) os.Exit(1) diff --git a/go.mod b/go.mod index 7289617e6..01d049ff6 100644 --- a/go.mod +++ b/go.mod @@ -74,3 +74,7 @@ require ( gopkg.in/yaml.v3 v3.0.1 // indirect k8s.io/klog/v2 v2.100.1 // indirect ) + +replace ( + github.com/TencentBlueKing/bk-bcs/bcs-services/bcs-bscp => /Users/yun/data/mycode/bk-bcs/bcs-services/bcs-bscp +) \ No newline at end of file diff --git a/internal/upstream/api.go b/internal/upstream/api.go index d666c1e12..9836c735f 100644 --- a/internal/upstream/api.go +++ b/internal/upstream/api.go @@ -103,7 +103,7 @@ func (uc *upstreamClient) GetDownloadURL(vas *kit.Vas, req *pbfs.GetDownloadURLR return uc.client.GetDownloadURL(vas.Ctx, req) } -// GetKvValue get the kvs value from upstream feed server. +// GetKvValue get the kv value from upstream feed server. func (uc *upstreamClient) GetKvValue(vas *kit.Vas, req *pbfs.GetKvValueReq) (*pbfs.GetKvValueResp, error) { if err := uc.wait.WaitWithContext(vas.Ctx); err != nil { return nil, err @@ -112,6 +112,15 @@ func (uc *upstreamClient) GetKvValue(vas *kit.Vas, req *pbfs.GetKvValueReq) (*pb return uc.client.GetKvValue(vas.Ctx, req) } +// GetKvValues get the kv values from upstream feed server. +func (uc *upstreamClient) GetKvValues(vas *kit.Vas, req *pbfs.GetKvValuesReq) (*pbfs.GetKvValuesResp, error) { + if err := uc.wait.WaitWithContext(vas.Ctx); err != nil { + return nil, err + } + + return uc.client.GetKvValues(vas.Ctx, req) +} + // ListApps list the apps value from upstream feed server. func (uc *upstreamClient) ListApps(vas *kit.Vas, req *pbfs.ListAppsReq) (*pbfs.ListAppsResp, error) { if err := uc.wait.WaitWithContext(vas.Ctx); err != nil { diff --git a/internal/upstream/upstream.go b/internal/upstream/upstream.go index b532cab30..f557f753e 100644 --- a/internal/upstream/upstream.go +++ b/internal/upstream/upstream.go @@ -48,6 +48,7 @@ type Upstream interface { PullAppFileMeta(vas *kit.Vas, req *pbfs.PullAppFileMetaReq) (*pbfs.PullAppFileMetaResp, error) PullKvMeta(vas *kit.Vas, req *pbfs.PullKvMetaReq) (*pbfs.PullKvMetaResp, error) GetKvValue(vas *kit.Vas, req *pbfs.GetKvValueReq) (*pbfs.GetKvValueResp, error) + GetKvValues(vas *kit.Vas, req *pbfs.GetKvValuesReq) (*pbfs.GetKvValuesResp, error) GetDownloadURL(vas *kit.Vas, req *pbfs.GetDownloadURLReq) (*pbfs.GetDownloadURLResp, error) Version() *pbbase.Versioning ListApps(vas *kit.Vas, req *pbfs.ListAppsReq) (*pbfs.ListAppsResp, error) From c29d2bad9ae177fb935f45e8dae67f4c765561ab Mon Sep 17 00:00:00 2001 From: johnypeng <2651903873@qq.com> Date: Mon, 29 Jan 2024 11:12:44 +0800 Subject: [PATCH 2/2] refactor: get kv details in client side --- client/client.go | 31 ------------------ cmd/bscp/get.go | 59 +++++++++++++++++++++++++++++++---- go.mod | 4 --- internal/upstream/api.go | 9 ------ internal/upstream/upstream.go | 1 - 5 files changed, 53 insertions(+), 51 deletions(-) diff --git a/client/client.go b/client/client.go index cf45b061e..3819119aa 100644 --- a/client/client.go +++ b/client/client.go @@ -41,8 +41,6 @@ type Client interface { PullKvs(app string, match []string, opts ...AppOption) (*Release, error) // Get gets Key Value from remote Get(app string, key string, opts ...AppOption) (string, error) - // GetKvValues gets kv values from remote, get all kv values if keys are empty - GetKvValues(app string, keys []string, opts ...AppOption) ([]*pbfs.KV, error) // AddWatcher add a watcher to client AddWatcher(callback Callback, app string, opts ...AppOption) error // StartWatch start watch @@ -294,35 +292,6 @@ func (c *client) Get(app string, key string, opts ...AppOption) (string, error) return resp.Value, nil } -// GetKvValues gets kv values from remote, get all kv values if keys are empty -func (c *client) GetKvValues(app string, keys []string, opts ...AppOption) ([]*pbfs.KV, error) { - option := &AppOptions{} - for _, opt := range opts { - opt(option) - } - vas, _ := c.buildVas() - req := &pbfs.GetKvValuesReq{ - BizId: c.opts.bizID, - AppMeta: &pbfs.AppMeta{ - App: app, - Labels: c.opts.labels, - Uid: c.opts.uid, - }, - Keys: keys, - } - req.AppMeta.Labels = util.MergeLabels(c.opts.labels, option.Labels) - // reset uid - if option.UID != "" { - req.AppMeta.Uid = option.UID - } - resp, err := c.upstream.GetKvValues(vas, req) - if err != nil { - return nil, err - } - - return resp.Kvs, nil -} - // ListApps list app from remote, only return have perm by token func (c *client) ListApps(match []string) ([]*pbfs.App, error) { vas, _ := c.buildVas() diff --git a/cmd/bscp/get.go b/cmd/bscp/get.go index cfb9a54eb..6ca170c24 100644 --- a/cmd/bscp/get.go +++ b/cmd/bscp/get.go @@ -16,6 +16,7 @@ import ( "encoding/json" "fmt" "os" + "sync" "github.com/spf13/cobra" @@ -189,22 +190,68 @@ func runGetKvValue(bscp client.Client, app, key string) error { } func runGetKvValues(bscp client.Client, app string, keys []string) error { - kvs, err := bscp.GetKvValues(app, keys, client.WithAppLabels(labels)) + release, err := bscp.PullKvs(app, []string{}, client.WithAppLabels(labels)) if err != nil { return err } + kvTypeMap := make(map[string]string) + isAll := false + if len(keys) == 0 { + isAll = true + } + for _, k := range release.KvItems { + kvTypeMap[k.Key] = k.KvType + if isAll { + keys = append(keys, k.Key) + } + } + + values, hitError := getKvValues(bscp, app, keys) + if hitError != nil { + return hitError + } - output := make(map[string]any, len(kvs)) - for _, kv := range kvs { - output[kv.Key] = map[string]string{ - "kv_type": kv.KvType, - "value": kv.Value, + output := make(map[string]any, len(keys)) + for idx, key := range keys { + output[key] = map[string]string{ + "kv_type": kvTypeMap[key], + "value": values[idx], } } return jsonOutput(output) } +// getKvValues get kv values concurrently +func getKvValues(bscp client.Client, app string, keys []string) ([]string, error) { + var hitError error + values := make([]string, len(keys)) + pipe := make(chan struct{}, 10) + wg := sync.WaitGroup{} + + for idx, key := range keys { + wg.Add(1) + + pipe <- struct{}{} + go func(idx int, key string) { + defer func() { + wg.Done() + <-pipe + }() + + value, err := bscp.Get(app, key, client.WithAppLabels(labels)) + if err != nil { + hitError = fmt.Errorf("get kv value failed for key: %s, err:%v", key, err) + return + } + values[idx] = value + }(idx, key) + } + wg.Wait() + + return values, hitError +} + // runGetKv executes the get kv command. func runGetKv(args []string) error { baseConf, err := initBaseConf() diff --git a/go.mod b/go.mod index 01d049ff6..7289617e6 100644 --- a/go.mod +++ b/go.mod @@ -74,7 +74,3 @@ require ( gopkg.in/yaml.v3 v3.0.1 // indirect k8s.io/klog/v2 v2.100.1 // indirect ) - -replace ( - github.com/TencentBlueKing/bk-bcs/bcs-services/bcs-bscp => /Users/yun/data/mycode/bk-bcs/bcs-services/bcs-bscp -) \ No newline at end of file diff --git a/internal/upstream/api.go b/internal/upstream/api.go index 9836c735f..5e83e8bb1 100644 --- a/internal/upstream/api.go +++ b/internal/upstream/api.go @@ -112,15 +112,6 @@ func (uc *upstreamClient) GetKvValue(vas *kit.Vas, req *pbfs.GetKvValueReq) (*pb return uc.client.GetKvValue(vas.Ctx, req) } -// GetKvValues get the kv values from upstream feed server. -func (uc *upstreamClient) GetKvValues(vas *kit.Vas, req *pbfs.GetKvValuesReq) (*pbfs.GetKvValuesResp, error) { - if err := uc.wait.WaitWithContext(vas.Ctx); err != nil { - return nil, err - } - - return uc.client.GetKvValues(vas.Ctx, req) -} - // ListApps list the apps value from upstream feed server. func (uc *upstreamClient) ListApps(vas *kit.Vas, req *pbfs.ListAppsReq) (*pbfs.ListAppsResp, error) { if err := uc.wait.WaitWithContext(vas.Ctx); err != nil { diff --git a/internal/upstream/upstream.go b/internal/upstream/upstream.go index f557f753e..b532cab30 100644 --- a/internal/upstream/upstream.go +++ b/internal/upstream/upstream.go @@ -48,7 +48,6 @@ type Upstream interface { PullAppFileMeta(vas *kit.Vas, req *pbfs.PullAppFileMetaReq) (*pbfs.PullAppFileMetaResp, error) PullKvMeta(vas *kit.Vas, req *pbfs.PullKvMetaReq) (*pbfs.PullKvMetaResp, error) GetKvValue(vas *kit.Vas, req *pbfs.GetKvValueReq) (*pbfs.GetKvValueResp, error) - GetKvValues(vas *kit.Vas, req *pbfs.GetKvValuesReq) (*pbfs.GetKvValuesResp, error) GetDownloadURL(vas *kit.Vas, req *pbfs.GetDownloadURLReq) (*pbfs.GetDownloadURLResp, error) Version() *pbbase.Versioning ListApps(vas *kit.Vas, req *pbfs.ListAppsReq) (*pbfs.ListAppsResp, error)