Skip to content

Commit

Permalink
Feat: refine kv-ctl (#47)
Browse files Browse the repository at this point in the history
* Feat: refine kv-ctl

* Feat: refine kv-ctl

* Feat: refine kv-ctl

* Feat: cache server refresh appid
  • Loading branch information
ifooth committed Dec 18, 2023
1 parent 1cf012c commit f21c331
Show file tree
Hide file tree
Showing 9 changed files with 109 additions and 9 deletions.
50 changes: 49 additions & 1 deletion client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ import (
type Client interface {
// PullFiles pull files from remote
PullFiles(app string, opts ...AppOption) (*Release, error)
// Get release from remote
PullKvs(app string, opts ...AppOption) (*Release, error)
// Pull Key Value from remote
Get(app string, key string, opts ...AppOption) (string, error)
// AddWatcher add a watcher to client
Expand Down Expand Up @@ -212,6 +214,52 @@ func (c *client) PullFiles(app string, opts ...AppOption) (*Release, error) {
return r, nil
}

// GetRelease get release from remote
func (c *client) PullKvs(app string, opts ...AppOption) (*Release, error) {
option := &AppOptions{}
for _, opt := range opts {
opt(option)
}
vas, _ := c.buildVas()
req := &pbfs.PullKvMetaReq{
ApiVersion: sfs.CurrentAPIVersion,
BizId: c.opts.bizID,
AppMeta: &pbfs.AppMeta{
App: app,
Labels: c.opts.labels,
Uid: c.opts.uid,
},
Token: c.opts.token,
}
// merge labels, if key conflict, app value will overwrite client value
req.AppMeta.Labels = util.MergeLabels(c.opts.labels, option.Labels)
// reset uid
if option.UID != "" {
req.AppMeta.Uid = option.UID
}
resp, err := c.upstream.PullKvMeta(vas, req)
if err != nil {
return nil, err
}

kvs := make([]*sfs.KvMetaV1, 0, len(resp.GetKvMetas()))
for _, v := range resp.GetKvMetas() {
kvs = append(kvs, &sfs.KvMetaV1{
Key: v.GetKey(),
KvAttachment: v.GetKvAttachment(),
})
}

r := &Release{
ReleaseID: resp.ReleaseId,
FileItems: []*ConfigItemFile{},
KvItems: kvs,
PreHook: nil,
PostHook: nil,
}
return r, nil
}

// Get 读取 Key 的值
func (c *client) Get(app string, key string, opts ...AppOption) (string, error) {
option := &AppOptions{}
Expand All @@ -237,7 +285,7 @@ func (c *client) Get(app string, key string, opts ...AppOption) (string, error)
}
resp, err := c.upstream.GetKvValue(vas, req)
if err != nil {
return "", fmt.Errorf("get kv value failed, err: %s, rid: %s", err, vas.Rid)
return "", err
}

return resp.Value, nil
Expand Down
2 changes: 2 additions & 0 deletions client/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,8 @@ func (w *watcher) loopReceiveWatchedEvent(wStream pbfs.Upstream_WatchClient) {
}

logger.Error("watch stream is corrupted", logger.ErrAttr(err), slog.String("rid", w.vas.Rid))
// 权限不足或者删除等会一直错误,限制重连频率
time.Sleep(time.Millisecond * 100)
w.NotifyReconnect(reconnectSignal{Reason: "watch stream corrupted"})
return
}
Expand Down
42 changes: 38 additions & 4 deletions examples/kv-ctl/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func execute() {
}

bscp, err := client.New(
client.WithFeedAddr(os.Getenv("BSCP_FEED_ADDR")),
client.WithFeedAddrs(strings.Split(os.Getenv("BSCP_FEED_ADDRS"), ",")),
client.WithBizID(uint32(biz)),
client.WithToken(os.Getenv("BSCP_TOKEN")),
client.WithLabels(labels),
Expand All @@ -100,22 +100,48 @@ func execute() {

appName := os.Getenv("BSCP_APP")
opts := []client.AppOption{}
keySlice := strings.Split(keys, ",")

keySlice := []string{}
if keys != "" {
keySlice = strings.Split(keys, ",")
}

if watchMode {
if err = watchAppKV(bscp, appName, keySlice, opts); err != nil {
logger.Error("watch", logger.ErrAttr(err))
os.Exit(1)
}
} else {
result := map[string]string{}
if len(keySlice) == 0 {
release, err := bscp.PullKvs(appName, opts...)
if err != nil {
slog.Error("pull kv failed", logger.ErrAttr(err))
os.Exit(1)
}

if len(release.KvItems) == 0 {
slog.Error("kv release is empty")
os.Exit(1)
}

for _, v := range release.KvItems {
keySlice = append(keySlice, v.Key)
}
}

errKeys := []string{}
for _, key := range keySlice {
value, err := bscp.Get(appName, key, opts...)
if err != nil {
errKeys = append(errKeys, key)
continue
}
result[key] = value
}
if len(errKeys) > 0 {
logger.Warn("get key failed", slog.Any("keys", errKeys))
}

json.NewEncoder(os.Stdout).Encode(result) // nolint
}
Expand All @@ -130,22 +156,30 @@ type watcher struct {
// callback watch 回调函数
func (w *watcher) callback(release *client.Release) error {
result := map[string]string{}
errKeys := []string{}

// kv 列表, 可以读取值
for _, item := range release.KvItems {
if _, ok := w.keyMap[item.Key]; !ok && len(keys) != 0 {
continue
}

value, err := w.bscp.Get(w.app, item.Key)
if err != nil {
logger.Error("get value failed: %d, %v, err: %s", release.ReleaseID, item.Key, err)
errKeys = append(errKeys, item.Key)
continue
}
logger.Info("get value success: %d, %v, %s", release.ReleaseID, item.Key, value)

// key匹配或者为空时,输出
if _, ok := w.keyMap[item.Key]; ok || len(keys) == 0 {
result[item.Key] = value
}
}

if len(errKeys) > 0 {
logger.Warn("get key failed", slog.Any("keys", errKeys))
}

json.NewEncoder(os.Stdout).Encode(result) // nolint

return nil
Expand Down
3 changes: 2 additions & 1 deletion examples/pull-file/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package main
import (
"os"
"strconv"
"strings"

"golang.org/x/exp/slog"

Expand All @@ -40,7 +41,7 @@ func main() {
}

bscp, err := client.New(
client.WithFeedAddr(os.Getenv("BSCP_FEED_ADDRS")),
client.WithFeedAddrs(strings.Split(os.Getenv("BSCP_FEED_ADDRS"), ",")),
client.WithBizID(uint32(biz)),
client.WithToken(os.Getenv("BSCP_TOKEN")),
)
Expand Down
3 changes: 2 additions & 1 deletion examples/pull-kv/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"encoding/json"
"os"
"strconv"
"strings"

"golang.org/x/exp/slog"

Expand Down Expand Up @@ -47,7 +48,7 @@ func main() {
}

bscp, err := client.New(
client.WithFeedAddr(os.Getenv("BSCP_FEED_ADDRS")),
client.WithFeedAddrs(strings.Split(os.Getenv("BSCP_FEED_ADDRS"), ",")),
client.WithBizID(uint32(biz)),
client.WithToken(os.Getenv("BSCP_TOKEN")),
client.WithLabels(labels),
Expand Down
3 changes: 2 additions & 1 deletion examples/watch-file/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"os"
"os/signal"
"strconv"
"strings"
"syscall"

"golang.org/x/exp/slog"
Expand All @@ -43,7 +44,7 @@ func main() {
}

bscp, err := client.New(
client.WithFeedAddr(os.Getenv("BSCP_FEED_ADDR")),
client.WithFeedAddrs(strings.Split(os.Getenv("BSCP_FEED_ADDRS"), ",")),
client.WithBizID(uint32(biz)),
client.WithToken(os.Getenv("BSCP_TOKEN")),
)
Expand Down
3 changes: 2 additions & 1 deletion examples/watch-kv/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"os"
"os/signal"
"strconv"
"strings"
"syscall"

"golang.org/x/exp/slog"
Expand Down Expand Up @@ -50,7 +51,7 @@ func main() {
}

bscp, err := client.New(
client.WithFeedAddr(os.Getenv("BSCP_FEED_ADDR")),
client.WithFeedAddrs(strings.Split(os.Getenv("BSCP_FEED_ADDRS"), ",")),
client.WithBizID(uint32(biz)),
client.WithToken(os.Getenv("BSCP_TOKEN")),
client.WithLabels(labels),
Expand Down
11 changes: 11 additions & 0 deletions internal/upstream/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,17 @@ func (uc *upstreamClient) PullAppFileMeta(vas *kit.Vas, req *pbfs.PullAppFileMet
return uc.client.PullAppFileMeta(vas.Ctx, req)
}

// PullKVMeta pulls the app kv meta from upstream feed server.
func (uc *upstreamClient) PullKvMeta(vas *kit.Vas, req *pbfs.PullKvMetaReq) (
*pbfs.PullKvMetaResp, error) {

if err := uc.wait.WaitWithContext(vas.Ctx); err != nil {
return nil, err
}

return uc.client.PullKvMeta(vas.Ctx, req)
}

// GetDownloadURL gets the file temp download url from upstream feed server.
func (uc *upstreamClient) GetDownloadURL(vas *kit.Vas, req *pbfs.GetDownloadURLReq) (*pbfs.GetDownloadURLResp, error) {

Expand Down
1 change: 1 addition & 0 deletions internal/upstream/upstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type Upstream interface {
Messaging(vas *kit.Vas, typ sfs.MessagingType, payload []byte) (*pbfs.MessagingResp, error)
EnableBounce(bounceIntervalHour uint)
PullAppFileMeta(vas *kit.Vas, req *pbfs.PullAppFileMetaReq) (*pbfs.PullAppFileMetaResp, error)
PullKvMeta(vas *kit.Vas, req *pbfs.PullKvMetaReq) (*pbfs.PullKvMetaResp, error)
GetKvValue(vas *kit.Vas, req *pbfs.GetKvValueReq) (*pbfs.GetKvValueResp, error)
GetDownloadURL(vas *kit.Vas, req *pbfs.GetDownloadURLReq) (*pbfs.GetDownloadURLResp, error)
Version() *pbbase.Versioning
Expand Down

0 comments on commit f21c331

Please sign in to comment.