-
Notifications
You must be signed in to change notification settings - Fork 1
/
confdis.go
150 lines (133 loc) · 3.59 KB
/
confdis.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
// confdis manages JSON based configuration in redis
package confdis
import (
"encoding/json"
"fmt"
"github.com/vmihailenco/redis"
"reflect"
"sync"
)
type structCreator func() interface{}
type ConfDis struct {
rootKey string
structType reflect.Type
config interface{} // Read-only view of current config tree.
rev int64
mux sync.Mutex // Mutex to protect changes to config and rev.
redis *redis.Client
Changes chan error // Channel to receive config updates (value is return of reload())
}
const PUB_SUFFIX = ":_changes"
const PUB_VALUE = "{\"library\": \"confdis\"}"
func New(client *redis.Client, rootKey string, structVal interface{}) (*ConfDis, error) {
c := ConfDis{}
c.redis = client
c.rootKey = rootKey
c.structType = reflect.TypeOf(structVal)
c.config = createStruct(c.structType)
c.Changes = make(chan error)
if _, empty, err := c.reload(); err != nil {
// Ignore if config doesn't already exist; it can be created
// later.
if !empty {
return nil, err
}
}
return &c, c.watch()
}
// GetConfig returns the current snapshot of config struct.
func (c *ConfDis) GetConfig() interface{} {
c.mux.Lock()
defer c.mux.Unlock()
return c.config
}
// MustReceiveChanges listens for change notifications and updates the
// internal config. Will panic if there is an error reading the new
// config.
func (c *ConfDis) MustReceiveChanges() {
for err := range c.Changes {
if err != nil {
panic(err)
}
}
}
// AtomicSave is like save, but only writes the changed config back to
// redis if somebody else did not make a change already (notified via
// pubsub). Note that the converse is not necessarily true; somebody
// else -- specifically, reload() -- *could* overwrite the changes
// written by AtomicSave.
func (c *ConfDis) AtomicSave(editFn func(interface{}) error) error {
c.mux.Lock()
previousconfig := c.config
previousRev := c.rev
c.mux.Unlock()
if err := editFn(previousconfig); err != nil {
return err
}
c.mux.Lock()
defer c.mux.Unlock()
// Was config changed interim by reload()?
if c.rev != previousRev {
return fmt.Errorf(
"config already changed (rev %d -> %d)", previousRev, c.rev)
}
if err := c.save(); err != nil {
return err
}
c.rev += 1
return nil
}
func createStruct(t reflect.Type) interface{} {
return reflect.New(t).Interface()
}
func (c *ConfDis) save() error {
if data, err := json.Marshal(c.config); err != nil {
return err
} else {
if r := c.redis.Set(c.rootKey, string(data)); r.Err() != nil {
return r.Err()
}
if r := c.redis.Publish(c.rootKey+PUB_SUFFIX, PUB_VALUE); r.Err() != nil {
return r.Err()
}
}
return nil
}
// reload reloads the config tree from redis.
func (c *ConfDis) reload() (interface{}, bool, error) {
if r := c.redis.Get(c.rootKey); r.Err() != nil {
return nil, true, fmt.Errorf("redis: Failed to GET %v; %v", c.rootKey, r.Err())
} else {
config2 := createStruct(c.structType)
if err := json.Unmarshal([]byte(r.Val()), config2); err != nil {
return nil, false, fmt.Errorf("failed to decode JSON response for redis GET %v; %v", c.rootKey, err)
}
c.mux.Lock()
defer c.mux.Unlock()
config1 := c.config
c.config = config2
c.rev += 1
return config1, false, nil
}
panic("unreachable")
}
// watch watches for changes from other clients
func (c *ConfDis) watch() error {
pubsub, err := c.redis.PubSubClient()
if err != nil {
return err
}
ch, err := pubsub.Subscribe(c.rootKey + PUB_SUFFIX)
if err != nil {
return err
}
go func() {
for {
<-ch
_, _, err := c.reload()
// TODO: pass old config (_) if necessary in the future.
c.Changes <- err
}
}()
return nil
}