/
apollo.go
139 lines (125 loc) · 3.28 KB
/
apollo.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package apollo
import (
"context"
"encoding/json"
"errors"
"fmt"
"reflect"
"strings"
"sync/atomic"
"time"
"unsafe"
"github.com/apollo-client/apollo-go/codec"
"github.com/apollo-client/apollo-go/codec/jsoncodec"
"github.com/apollo-client/apollo-go/codec/properties"
"github.com/apollo-client/apollo-go/codec/yamlcodec"
"github.com/apollo-client/apollo-go/log"
)
type Application struct {
AppId string `json:"appId"`
Cluster string `json:"cluster"`
Secret string `json:"secret"`
Addr string `json:"addr"`
}
// Client apollo client
type Client struct {
App *Application // application config
opts *Options // options
}
// NewClient new apollo client
func NewClient(c *Application, opt ...Option) (*Client, error) {
if c == nil {
return nil, errors.New("config nil")
}
opts := newOptions(opt...)
cli := &Client{
App: c,
opts: opts,
}
cli.asyncNotifications()
return cli, nil
}
// Watch watch namespace struct
func (c *Client) Watch(namespace string, deft interface{}, ptr *unsafe.Pointer) error {
var code codec.Codec
ext := namespace[strings.LastIndex(namespace, ".")+1:]
switch ext {
case "json":
code = jsoncodec.NewCodec()
case "yaml", "yml":
code = yamlcodec.NewCodec()
case "xml":
return errors.New("not support xml namespace")
case "txt":
return errors.New("not support txt namespace")
default:
code = properties.NewCodec()
}
cb, err := namespaceCallback(deft, ptr, code)
if err != nil {
return err
}
return c.asyncApollo(namespace, cb)
}
// namespaceCallback namespace callback function
func namespaceCallback(deft interface{}, ptr *unsafe.Pointer, code codec.Codec) (WatchCallback, error) {
if reflect.Ptr != reflect.TypeOf(deft).Kind() {
return nil, errors.New("default must be a pointer")
}
// store default pointer
dt := reflect.ValueOf(deft).Elem()
ua := unsafe.Pointer(dt.UnsafeAddr())
atomic.StorePointer(ptr, ua)
// default value map
var mdeft map[string]json.RawMessage
bs, _ := json.Marshal(deft)
if err := json.Unmarshal(bs, &mdeft); err != nil {
return nil, err
}
return func(_ context.Context, apol *Apollo) (err error) {
// apol or apol configurations nil, return
if apol == nil || apol.Configurations == nil {
return
}
// fill in default value
nd := reflect.New(dt.Type())
nt := reflect.TypeOf(deft).Elem()
nm, err := code.Parse(apol.Configurations, mdeft, nt)
if err != nil {
log.Errorf("parse config err: %v", err)
return err
}
// marshal and unmarshal
tbs, _ := json.Marshal(nm)
err = json.Unmarshal(tbs, nd.Interface())
// store new pointer
nptr := unsafe.Pointer(nd.Elem().UnsafeAddr())
atomic.StorePointer(ptr, nptr)
return
}, nil
}
// WatchCallback watch callback define
type WatchCallback func(ctx context.Context, apol *Apollo) error
// safeCallback recover if callback failed
func safeCallback(apol *Apollo, cb WatchCallback) (err error) {
defer func() {
if r := recover(); r != nil {
var msg string
switch e := r.(type) {
case string:
msg = e
case error:
msg = err.Error()
default:
msg = "unknown panic type"
}
err = errors.New("callback panic:" + msg)
}
}()
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
if err = cb(ctx, apol); err != nil {
return fmt.Errorf("callback failed err:%v", err)
}
return
}