/
config.go
158 lines (144 loc) · 3.26 KB
/
config.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
package config
import (
"context"
"errors"
"reflect"
"sync"
"time"
// init encoding
_ "github.com/go-kratos/kratos/v2/encoding/json"
_ "github.com/go-kratos/kratos/v2/encoding/proto"
_ "github.com/go-kratos/kratos/v2/encoding/xml"
_ "github.com/go-kratos/kratos/v2/encoding/yaml"
"github.com/go-kratos/kratos/v2/log"
)
var (
// ErrNotFound is key not found.
ErrNotFound = errors.New("key not found")
// ErrTypeAssert is type assert error.
ErrTypeAssert = errors.New("type assert error")
_ Config = (*config)(nil)
)
// Observer is config observer.
type Observer func(string, Value)
// Config is a config interface.
type Config interface {
Load() error
Scan(v interface{}) error
Value(key string) Value
Watch(key string, o Observer) error
Close() error
}
type config struct {
opts options
reader Reader
cached sync.Map
observers sync.Map
watchers []Watcher
}
// New new a config with options.
func New(opts ...Option) Config {
o := options{
decoder: defaultDecoder,
resolver: defaultResolver,
}
for _, opt := range opts {
opt(&o)
}
return &config{
opts: o,
reader: newReader(o),
}
}
func (c *config) watch(w Watcher) {
for {
kvs, err := w.Next()
if errors.Is(err, context.Canceled) {
log.Infof("watcher's ctx cancel : %v", err)
return
}
if err != nil {
time.Sleep(time.Second)
log.Errorf("failed to watch next config: %v", err)
continue
}
if err := c.reader.Merge(kvs...); err != nil {
log.Errorf("failed to merge next config: %v", err)
continue
}
if err := c.reader.Resolve(); err != nil {
log.Errorf("failed to resolve next config: %v", err)
continue
}
c.cached.Range(func(key, value interface{}) bool {
k := key.(string)
v := value.(Value)
if n, ok := c.reader.Value(k); ok && reflect.TypeOf(n.Load()) == reflect.TypeOf(v.Load()) && !reflect.DeepEqual(n.Load(), v.Load()) {
v.Store(n.Load())
if o, ok := c.observers.Load(k); ok {
o.(Observer)(k, v)
}
}
return true
})
}
}
func (c *config) Load() error {
for _, src := range c.opts.sources {
kvs, err := src.Load()
if err != nil {
return err
}
for _, v := range kvs {
log.Debugf("config loaded: %s format: %s", v.Key, v.Format)
}
if err = c.reader.Merge(kvs...); err != nil {
log.Errorf("failed to merge config source: %v", err)
return err
}
w, err := src.Watch()
if err != nil {
log.Errorf("failed to watch config source: %v", err)
return err
}
c.watchers = append(c.watchers, w)
go c.watch(w)
}
if err := c.reader.Resolve(); err != nil {
log.Errorf("failed to resolve config source: %v", err)
return err
}
return nil
}
func (c *config) Value(key string) Value {
if v, ok := c.cached.Load(key); ok {
return v.(Value)
}
if v, ok := c.reader.Value(key); ok {
c.cached.Store(key, v)
return v
}
return &errValue{err: ErrNotFound}
}
func (c *config) Scan(v interface{}) error {
data, err := c.reader.Source()
if err != nil {
return err
}
return unmarshalJSON(data, v)
}
func (c *config) Watch(key string, o Observer) error {
if v := c.Value(key); v.Load() == nil {
return ErrNotFound
}
c.observers.Store(key, o)
return nil
}
func (c *config) Close() error {
for _, w := range c.watchers {
if err := w.Stop(); err != nil {
return err
}
}
return nil
}