forked from purpleidea/mgmt
-
Notifications
You must be signed in to change notification settings - Fork 0
/
resources.go
182 lines (162 loc) · 6.53 KB
/
resources.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
// Mgmt
// Copyright (C) 2013-2018+ James Shubin and the project contributors
// Written by James Shubin <james@shubin.ca> and the project contributors
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package etcd
import (
"fmt"
"log"
"strings"
"github.com/purpleidea/mgmt/engine"
engineUtil "github.com/purpleidea/mgmt/engine/util"
"github.com/purpleidea/mgmt/util"
etcd "github.com/coreos/etcd/clientv3"
)
// WatchResources returns a channel that outputs events when exported resources
// change. It registers a prefix watcher on the $NS/exported/ path of the given
// EmbdEtcd, and every time that watcher's callback runs with a usable response
// a nil value is offered on the returned channel. The channel has a buffer of
// one and the offer never blocks, so an event which arrives while a previous
// one is still unread is dropped: a burst of changes is seen by the reader as a
// single notification.
// TODO: Filter our watch (on the server side if possible) based on the
// collection prefixes and filters that we care about...
func WatchResources(obj *EmbdEtcd) chan error {
	ch := make(chan error, 1) // buffer it so that exactly one event can be pending
	path := fmt.Sprintf("%s/exported/", NS)
	callback := func(re *RE) error {
		// TODO: is this even needed? it used to happen on conn errors
		log.Printf("Etcd: Watch: Path: %v", path) // event
		if re == nil || re.response.Canceled {
			return fmt.Errorf("watch is empty") // will cause a CtxError+retry
		}
		// we normally need to check if anything changed since the last
		// event, since a set (export) with no changes still causes the
		// watcher to trigger and this would cause an infinite loop. we
		// don't need to do this check anymore because we do the export
		// transactionally, and only if a change is needed. since it is
		// atomic, all the changes arrive together which avoids dupes!!

		// send an event only if one isn't already pending. this avoids
		// multiple events all queueing up and then being released
		// continuously long after the changes stopped. the select makes
		// the "is there room?" test and the send a single step, so this
		// can never block, even if the callback were to run concurrently
		select {
		case ch <- nil: // event
		default: // an event is already pending, drop this one
		}
		return nil
	}
	_, _ = obj.AddWatcher(path, callback, true, false, etcd.WithPrefix()) // no need to check errors
	return ch
}
// SetResources exports all of the resources which we pass in to etcd. It reads
// back everything that is currently exported under hostname, queues a put for
// every resource in resourceList, queues a delete for every previously exported
// resource which is no longer in resourceList, and then submits all of that as
// a single transaction. It returns an error if the existing exports can't be
// read, if any resource (new or previously exported) has an empty kind, if a
// resource can't be encoded, or if the transaction itself fails. An empty kind
// is reported to the caller as an error; it does not abort the process.
func SetResources(obj *EmbdEtcd, hostname string, resourceList []engine.Res) error {
	// key structure is $NS/exported/$hostname/resources/$uid = $data

	var kindFilter []string // empty to get from everyone
	hostnameFilter := []string{hostname}
	// this is not a race because we should only be reading keys which we
	// set, and there should not be any contention with other hosts here!
	originals, err := GetResources(obj, hostnameFilter, kindFilter)
	if err != nil {
		return err
	}

	if len(originals) == 0 && len(resourceList) == 0 { // special case of no add or del
		return nil
	}

	// resKey identifies a resource by its (kind, name) pair. The two parts
	// are kept separate so that a slash in either one can't make two
	// different resources look alike.
	type resKey struct{ kind, name string }
	wanted := make(map[resKey]struct{}, len(resourceList)) // what we're exporting now

	ifs := make([]etcd.Cmp, 0, len(resourceList))               // list matching the desired state
	ops := make([]etcd.Op, 0, len(resourceList)+len(originals)) // list of ops in this transaction
	for _, res := range resourceList {
		if res.Kind() == "" {
			return fmt.Errorf("empty kind for resource: %v", res.Name())
		}
		uid := fmt.Sprintf("%s/%s", res.Kind(), res.Name())
		path := fmt.Sprintf("%s/exported/%s/resources/%s", NS, hostname, uid)
		data, err := engineUtil.ResToB64(res)
		if err != nil {
			return fmt.Errorf("can't convert to B64: %v", err)
		}
		ifs = append(ifs, etcd.Compare(etcd.Value(path), "=", data)) // desired state
		ops = append(ops, etcd.OpPut(path, data))
		wanted[resKey{kind: res.Kind(), name: res.Name()}] = struct{}{}
	}

	hasDeletes := false
	// delete old, now unused resources here...
	for _, res := range originals {
		if res.Kind() == "" {
			return fmt.Errorf("empty kind for resource: %v", res.Name())
		}
		if _, exists := wanted[resKey{kind: res.Kind(), name: res.Name()}]; exists {
			continue // still being exported, no need to delete!
		}
		uid := fmt.Sprintf("%s/%s", res.Kind(), res.Name())
		path := fmt.Sprintf("%s/exported/%s/resources/%s", NS, hostname, uid)
		ops = append(ops, etcd.OpDelete(path))
		hasDeletes = true
	}

	// if everything is already correct, do nothing, otherwise, run the ops!
	// it's important to do this in one transaction, and atomically, because
	// this way, we only generate one watch event, and only when it's needed
	// NOTE(review): this presumes that Txn takes (ifs, then-ops, else-ops),
	// which is what the two calls below rely on -- confirm against Txn.
	if hasDeletes { // always run, ifs don't matter
		_, err = obj.Txn(nil, ops, nil) // TODO: does this run? it should!
		return err
	}
	// nothing to delete: the ops only run when the stored values differ
	// from the desired state that is described by the ifs
	_, err = obj.Txn(ifs, nil, ops) // TODO: do we need to look at response?
	return err
}
// GetResources collects all of the resources which match a filter from etcd.
// If the kindfilter or hostnameFilter is empty, then it assumes no filtering...
// Every key under $NS/exported/ is fetched and then filtered on the client. A
// key which doesn't have the $hostname/resources/$kind/$name shape, or a value
// which can't be decoded, fails the whole call with an error. The resource name
// is the remainder of the key after the kind, so it may contain slashes.
// NOTE: the results are ranged over as a map, so the order of the returned list
// is not deterministic, even though a sorted get is requested.
// TODO: Expand this with a more powerful filter based on what we eventually
// support in our collect DSL. Ideally a server side filter like WithFilter()
// We could do this if the pattern was $NS/exported/$kind/$hostname/$uid = $data.
func GetResources(obj *EmbdEtcd, hostnameFilter, kindFilter []string) ([]engine.Res, error) {
	// key structure is $NS/exported/$hostname/resources/$uid = $data
	path := fmt.Sprintf("%s/exported/", NS)
	resourceList := []engine.Res{}
	keyMap, err := obj.Get(path, etcd.WithPrefix(), etcd.WithSort(etcd.SortByKey, etcd.SortAscend))
	if err != nil {
		return nil, fmt.Errorf("could not get resources: %v", err)
	}
	for key, val := range keyMap {
		if !strings.HasPrefix(key, path) { // sanity check
			continue
		}

		// $uid is $kind/$name, and the name is written into the key as
		// is when it is exported, so it can contain slashes of its own
		// (eg: a file path). split into at most four chunks so that the
		// last chunk holds the whole name, instead of rejecting the key
		str := strings.SplitN(key[len(path):], "/", 4)
		if len(str) != 4 {
			return nil, fmt.Errorf("unexpected chunk count")
		}
		hostname, r, kind, name := str[0], str[1], str[2], str[3]
		if r != "resources" {
			return nil, fmt.Errorf("unexpected chunk pattern")
		}
		if kind == "" {
			return nil, fmt.Errorf("unexpected kind chunk")
		}

		// FIXME: ideally this would be a server side filter instead!
		if len(hostnameFilter) > 0 && !util.StrInList(hostname, hostnameFilter) {
			continue
		}

		// FIXME: ideally this would be a server side filter instead!
		if len(kindFilter) > 0 && !util.StrInList(kind, kindFilter) {
			continue
		}

		res, err := engineUtil.B64ToRes(val)
		if err != nil {
			return nil, fmt.Errorf("can't convert from B64: %v", err)
		}
		log.Printf("Etcd: Get: (Hostname, Kind, Name): (%s, %s, %s)", hostname, kind, name)
		resourceList = append(resourceList, res)
	}
	return resourceList, nil
}