-
Notifications
You must be signed in to change notification settings - Fork 2
/
domain.go
374 lines (325 loc) · 10.1 KB
/
domain.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
package domain
import (
"encoding/json"
"errors"
"fmt"
"strings"
"time"
platformsync "github.com/hailo-platform/H2O/service/sync"
sjson "github.com/bitly/go-simplejson"
)
const (
// sjsonnull is the encoded form of a JSON null. readConfigAtPath compares the
// encoded node at a path against it to decide that the path was not found.
sjsonnull = "null"
)
var (
// ErrPathNotFound is returned when the requested path does not resolve to a
// non-null node within a config.
ErrPathNotFound = errors.New("Config path not found")
// ErrIdNotFound is returned by ReadConfig when the repository does not
// return exactly one config for the requested ID.
ErrIdNotFound = errors.New("Config ID not found")
// DefaultRepository is the ConfigRepository used by every package-level
// function in this file. Nothing in this file assigns it, so it is
// presumably set by the caller during startup — TODO confirm.
DefaultRepository ConfigRepository
// emptyConfig is the empty JSON object that compileConfig returns when the
// requested path is not found in the compiled config.
emptyConfig = []byte("{}")
)
// ChangeSet represents some change to our config. The repository also returns
// ChangeSets when reading a config, in which case Body holds the config's
// current JSON and the remaining fields describe the last update.
type ChangeSet struct {
// Id is the ID of the config this change applies to (the functions in this
// file set it from their id argument)
Id string `cf:"configservice" key:"Id" name:"id" json:"id"`
// Body is the complete JSON of the config after the change has been applied
Body []byte `name:"body" json:"body"`
// Timestamp is when this happened
Timestamp time.Time `name:"timestamp" json:"timestamp"`
// UserMech identifies the authentication mechanism of the scope from which this change was applied
UserMech string `name:"userMech" json:"userMech"`
// UserId identifies the authenticated user ID of the scope from which this change was applied
UserId string `name:"userId" json:"userId"`
// Message tells us some human-readable message about the change
Message string `name:"message" json:"message"`
// ChangeId is some unique ID for this change
ChangeId string `name:"changeId" json:"changeId"`
// Path of the config being updated; empty when the whole config is replaced
Path string `name:"path" json:"path"`
// OldConfig is the value found at Path before the change; it is empty when
// CreateOrUpdateConfig found no previous value there
OldConfig []byte `name:"oldConfig" json:"oldConfig"`
}
// ConfigRepository is the storage backend used by this package, reached
// through DefaultRepository.
type ConfigRepository interface {
// ReadConfig returns the stored ChangeSets for the given config IDs. Callers
// in this file treat Body as the config's JSON and, when compiling, treat
// the first result as the base — presumably results follow the order of
// ids; TODO confirm against the implementation.
ReadConfig(ids []string) ([]*ChangeSet, error)
// UpdateConfig stores cs. Callers in this file pass the complete new config
// in cs.Body together with the metadata describing the change.
UpdateConfig(cs *ChangeSet) error
// ChangeLog returns changes between start and end. The returned string is
// presumably a cursor to pass back as lastId for the next page, with count
// limiting the page size — TODO confirm.
ChangeLog(start, end time.Time, count int, lastId string) ([]*ChangeSet, string, error)
// ServiceChangeLog is ChangeLog restricted to the config with the given id.
ServiceChangeLog(id string, start, end time.Time, count int, lastId string) ([]*ChangeSet, string, error)
}
// readConfigAtPath extracts the JSON found at a "/"-separated path inside body.
//
// An empty path returns body untouched, without parsing it. Otherwise body is
// parsed and the node at path is re-encoded and returned. ErrPathNotFound is
// returned when body is empty or when the node at path encodes as JSON null,
// which means an explicit null value is reported as not found too.
func readConfigAtPath(body []byte, path string) ([]byte, error) {
	switch {
	case path == "":
		return body, nil
	case len(body) == 0:
		return nil, ErrPathNotFound
	}

	root, err := sjson.NewJson(body)
	if err != nil {
		return nil, fmt.Errorf("Error parsing JSON: %v", err)
	}

	segments := strings.Split(path, "/")
	encoded, err := root.GetPath(segments...).Encode()
	if err != nil {
		return nil, fmt.Errorf("Error getting bytes: %v", err)
	}

	// A missing node encodes as null, so null is how "not found" shows up here.
	if string(encoded) == sjsonnull {
		return nil, ErrPathNotFound
	}
	return encoded, nil
}
// ReadConfig fetches the config stored under id from DefaultRepository.
//
// When path is non-empty only the JSON found at that path is returned;
// otherwise the whole body is returned. The ChangeSet read from the repository
// is returned alongside, carrying the metadata of the last update. It is
// returned even when the path lookup fails, in which case the error (for
// example ErrPathNotFound) is non-nil and the bytes are nil.
// ErrIdNotFound is returned when the repository does not yield exactly one
// config for id.
func ReadConfig(id, path string) ([]byte, *ChangeSet, error) {
	found, err := DefaultRepository.ReadConfig([]string{id})
	switch {
	case err != nil:
		return nil, nil, fmt.Errorf("Error getting config from DAO: %v", err)
	case len(found) != 1:
		return nil, nil, ErrIdNotFound
	}

	cs := found[0]
	body, err := readConfigAtPath(cs.Body, path)
	return body, cs, err
}
// ChangeLog returns a time series list of changes between start and end.
// All arguments and results are passed straight through to and from
// DefaultRepository.ChangeLog.
func ChangeLog(start, end time.Time, count int, lastId string) ([]*ChangeSet, string, error) {
	return DefaultRepository.ChangeLog(start, end, count, lastId)
}
// ServiceChangeLog returns a time series list of changes for the config with
// the given id. All arguments and results are passed straight through to and
// from DefaultRepository.ServiceChangeLog.
func ServiceChangeLog(id string, start, end time.Time, count int, lastId string) ([]*ChangeSet, string, error) {
	return DefaultRepository.ServiceChangeLog(id, start, end, count, lastId)
}
// mergeMap starts with "a" and recursively adds "b" on top. "a" is modified in
// place; "b" is only read.
//
// Every leaf (non-object) value of "b" is written into "a" at the same nested
// path, creating intermediate objects in "a" as needed. If "a" already holds
// something other than an object at an intermediate position (a scalar, an
// array or null), it is replaced by a fresh object so the leaf from "b" can be
// placed; "b" wins. Empty objects in "b" contribute nothing.
//
// bId is the id of "b". When explain is true, leaves are set to bId instead of
// their real value, so the result shows which config each value came from.
//
// stack is the path (relative to the root of "a") at which "b" sits; top-level
// callers pass an empty slice. The return value is stack with its last element
// popped (or unchanged if empty), which is what the recursion uses to step
// back up one level.
func mergeMap(a, b map[string]interface{}, bId string, stack []string, explain bool) []string {
	for k, v := range b {
		if m, ok := v.(map[string]interface{}); ok {
			// Nested object: descend with k pushed onto the path. The
			// recursive call pops k again before returning.
			stack = mergeMap(a, m, bId, append(stack, k), explain)
			continue
		}

		// We're at a leaf. Walk down "a" along the current path, creating
		// (or replacing non-object) nodes as we go.
		anode := a
		for _, pos := range stack {
			child, ok := anode[pos].(map[string]interface{})
			if !ok || child == nil {
				// Missing, or present but not an object we can descend
				// into: a blind type assertion here would panic.
				child = make(map[string]interface{})
				anode[pos] = child
			}
			anode = child
		}

		// Replace the final node.
		if explain {
			// In explain mode every value becomes the id of the config it
			// came from.
			anode[k] = bId
		} else {
			anode[k] = v
		}
	}

	// Pop our own entry off the stack for the caller.
	if len(stack) > 0 {
		stack = stack[:len(stack)-1]
	}
	return stack
}
// compileConfig reads the configs for ids from DefaultRepository and merges
// them in order: the first config is the base and each subsequent config is
// laid on top with mergeMap. The JSON found at path within the merged result
// is returned (the whole result when path is empty). If the path is not found
// an empty JSON object is returned instead of an error.
//
// When explain is true, leaf values are replaced by the id of the config that
// set them. To make values that only exist in the base show up with the base's
// id, the first id is requested twice so the base gets merged onto itself.
//
// ids is never modified.
func compileConfig(ids []string, path string, explain bool) ([]byte, error) {
	if explain && len(ids) > 0 {
		// Build the duplicated list in a fresh slice. Appending onto
		// ids[:1] would overwrite the caller's elements whenever ids has
		// spare capacity, and slicing an empty ids would panic.
		withBase := make([]string, 0, len(ids)+1)
		withBase = append(withBase, ids[0])
		withBase = append(withBase, ids...)
		ids = withBase
	}

	configs, err := DefaultRepository.ReadConfig(ids)
	if err != nil {
		return nil, fmt.Errorf("Error getting configs: %v", err)
	}

	var compiled map[string]interface{}
	if len(configs) > 0 {
		if err := json.Unmarshal(configs[0].Body, &compiled); err != nil {
			return nil, fmt.Errorf("Error unmarshalling config: %v", err)
		}

		// Merge! Skip the first one since it's the base we're starting from.
		for _, cfg := range configs[1:] {
			var overlay map[string]interface{}
			if err := json.Unmarshal(cfg.Body, &overlay); err != nil {
				return nil, fmt.Errorf("Error unmarshalling config: %v", err)
			}
			if compiled == nil {
				// A base body of literal `null` decodes to a nil map, and
				// mergeMap writing into a nil map would panic.
				compiled = make(map[string]interface{})
			}
			mergeMap(compiled, overlay, cfg.Id, make([]string, 0), explain)
		}
	}

	data, err := json.Marshal(compiled)
	if err != nil {
		return nil, fmt.Errorf("Error marshalling compiled JSON: %v", err)
	}

	b, err := readConfigAtPath(data, path)
	if err == ErrPathNotFound {
		return emptyConfig, nil
	}
	return b, err
}
// CompileConfig combines the configs identified by ids into one, via
// compileConfig with explain mode off, and returns the JSON found at path
// within the result.
func CompileConfig(ids []string, path string) ([]byte, error) {
	const explain = false
	return compileConfig(ids, path, explain)
}
// ExplainConfig returns the same structure as CompileConfig, except that each
// value is replaced by the id of the config responsible for setting it.
func ExplainConfig(ids []string, path string) ([]byte, error) {
	const explain = true
	return compileConfig(ids, path, explain)
}
// DeleteConfig deletes the node at the specified "/"-separated path from the
// config stored under id and saves the result as a new ChangeSet. An empty
// path replaces the whole config with an empty object.
//
// changeId, userMech, userId and message are recorded on the ChangeSet, along
// with the value that was at path before the delete.
//
// It returns ErrPathNotFound (unwrapped, so callers can compare against it) if
// the path does not exist. Other failures are returned as formatted errors.
func DeleteConfig(changeId, id, path, userMech, userId, message string) error {
	// NOTE(review): unlike CreateOrUpdateConfig, this read-modify-write is not
	// guarded by platformsync.RegionLock, so a concurrent update could be
	// lost. Left as is because it is not visible here whether callers already
	// hold the lock — confirm before adding one.
	configs, err := DefaultRepository.ReadConfig([]string{id})
	if err != nil {
		// Keep the original message prefix but stop discarding the cause.
		return fmt.Errorf("Error getting config with id: %v: %v", id, err)
	}
	if len(configs) != 1 {
		return fmt.Errorf("Error getting config with id: %v", id)
	}

	var decoded map[string]interface{}
	if err := json.Unmarshal(configs[0].Body, &decoded); err != nil {
		return fmt.Errorf("Error decoding config: %v", err)
	}

	if path != "" {
		// Walk to the parent of the node we want to delete, returning early
		// if any intermediate node is missing or is not an object.
		parts := strings.Split(path, "/")
		node := decoded
		for _, part := range parts[:len(parts)-1] {
			next, ok := node[part].(map[string]interface{})
			if !ok {
				return ErrPathNotFound
			}
			node = next
		}
		delete(node, parts[len(parts)-1])
	} else {
		// No path, drop everything
		decoded = make(map[string]interface{})
	}

	encoded, err := json.Marshal(decoded)
	if err != nil {
		return fmt.Errorf("Error encoding config: %v", err)
	}

	oldConfig, err := readConfigAtPath(configs[0].Body, path)
	if err == ErrPathNotFound {
		// The final path element was not there to delete. Return the
		// sentinel itself, as documented, rather than hiding it inside a
		// formatted error.
		return ErrPathNotFound
	}
	if err != nil {
		return fmt.Errorf("Error reading config at path %s : %s ", path, err.Error())
	}

	err = DefaultRepository.UpdateConfig(&ChangeSet{
		Id:        id,
		Body:      encoded,
		Timestamp: time.Now(),
		UserMech:  userMech,
		UserId:    userId,
		Message:   message,
		ChangeId:  changeId,
		Path:      path,
		OldConfig: oldConfig,
	})
	if err != nil {
		return fmt.Errorf("Error saving config: %v", err)
	}
	return nil
}
// CreateOrUpdateConfig will create or update the config for id at the specified path.
//
// changeId is recorded as the ID of this change.
// path is a "/"-separated path within the config; when empty the whole config
// is replaced and data must be a JSON object.
// userMech identifies the authentication mechanism of the scope from which this change was applied.
// userId identifies the authenticated user the change was applied by.
// Message should be a description of the change.
// Data should be the JSON data.
//
// The read-modify-write is performed while holding the region lock for id.
// Missing parent objects along path are created. An error is returned if data
// is not valid JSON, if an existing node along path is a non-object value, or
// if the repository fails.
func CreateOrUpdateConfig(changeId, id, path, userMech, userId, message string, data []byte) error {
	var newNode interface{}
	if err := json.Unmarshal(data, &newNode); err != nil {
		return fmt.Errorf("New value is not valid JSON: %v", err)
	}

	lock, err := platformsync.RegionLock([]byte(id))
	if err != nil {
		return err
	}
	defer lock.Unlock()

	configs, err := DefaultRepository.ReadConfig([]string{id})
	if err != nil {
		return fmt.Errorf("Error getting config from DAO: %v", err)
	}

	// Capture what is currently at path so it can be recorded on the change.
	// Nothing there yet is fine and is recorded as empty.
	oldConfig := make([]byte, 0)
	var existing []byte
	if len(configs) == 1 {
		existing = configs[0].Body
		oldConfig, err = readConfigAtPath(existing, path)
		if err == ErrPathNotFound {
			oldConfig = make([]byte, 0)
		} else if err != nil {
			return fmt.Errorf("Error getting config at path %s : %v", path, err)
		}
	}

	body := data
	if path == "" {
		// If we are updating at the top level, it should be an object at the
		// top level. Checking the decoded value also rejects a literal
		// `null`, which unmarshals into a map without error.
		if _, ok := newNode.(map[string]interface{}); !ok {
			return fmt.Errorf("Top level config should be a JSON object")
		}
	} else {
		body, err = setAtPath(existing, path, newNode)
		if err != nil {
			return err
		}
	}

	return DefaultRepository.UpdateConfig(&ChangeSet{
		Id:        id,
		Body:      body,
		Timestamp: time.Now(),
		UserMech:  userMech,
		UserId:    userId,
		Message:   message,
		ChangeId:  changeId,
		Path:      path,
		OldConfig: oldConfig,
	})
}

// setAtPath returns the JSON encoding of the config in existing with the node
// at the non-empty "/"-separated path replaced by newNode, creating missing
// parent objects. An empty existing is treated as an empty config.
func setAtPath(existing []byte, path string, newNode interface{}) ([]byte, error) {
	decoded := make(map[string]interface{})
	if len(existing) > 0 {
		if err := json.Unmarshal(existing, &decoded); err != nil {
			return nil, fmt.Errorf("Error parsing JSON: %v", err)
		}
	}
	if decoded == nil {
		// A stored body of literal `null` resets the map to nil; writing
		// into it would panic.
		decoded = make(map[string]interface{})
	}

	// Walk down the path, making sure we have all the parent nodes we need.
	parts := strings.Split(path, "/")
	parent := decoded
	for _, part := range parts[:len(parts)-1] {
		switch node := parent[part].(type) {
		case map[string]interface{}:
			if node == nil {
				node = make(map[string]interface{})
				parent[part] = node
			}
			parent = node
		case nil:
			// Missing (or null): make a new node.
			fresh := make(map[string]interface{})
			parent[part] = fresh
			parent = fresh
		default:
			// A scalar or array sits where an object is needed. Refuse
			// rather than panic on a type assertion or silently discard
			// the existing value.
			return nil, fmt.Errorf("Error setting config at path %s : %q is not a JSON object", path, part)
		}
	}

	// Replace final node
	parent[parts[len(parts)-1]] = newNode

	b, err := json.Marshal(decoded)
	if err != nil {
		return nil, fmt.Errorf("Error encoding new config: %v", err)
	}
	return b, nil
}
// lockPath returns the lock path for the config with the given id, formed by
// appending id to the config service's fixed prefix.
func lockPath(id string) string {
	const prefix = "/com.hailocab.service.config/"
	return fmt.Sprintf("%s%s", prefix, id)
}