-
Notifications
You must be signed in to change notification settings - Fork 14
/
integrate.go
150 lines (127 loc) · 3.4 KB
/
integrate.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
package sabakan
import (
"context"
"time"
"github.com/cybozu-go/cke"
"github.com/cybozu-go/cke/metrics"
"github.com/cybozu-go/cke/server"
"github.com/cybozu-go/log"
clientv3 "go.etcd.io/etcd/client/v3"
)
// sabakanContextKey is a private type for context keys defined in this
// package, so that keys set by other packages can never collide.
type sabakanContextKey string

const (
	// WaitSecs is a context key to pass to change the wait seconds
	// before removing retired nodes from the cluster.
	// The associated value must be a float64 (see run).
	WaitSecs = sabakanContextKey("wait secs")
)
// integrator implements server.Integrator using an etcd client to read
// and write CKE's configuration.
type integrator struct {
	etcd *clientv3.Client
}

// NewIntegrator returns server.Integrator to add sabakan integration
// feature to CKE.
func NewIntegrator(etcd *clientv3.Client) server.Integrator {
	return integrator{etcd: etcd}
}
// StartWatch watches etcd and signals ch whenever the constraints, the
// sabakan template, or the sabakan URL is updated.  The notification is
// a non-blocking send; if a signal is already pending, the new event is
// coalesced into it.  StartWatch returns a watch error, or nil once the
// watch channel is closed (e.g. when ctx is canceled).
func (ig integrator) StartWatch(ctx context.Context, ch chan<- struct{}) error {
	watchCh := ig.etcd.Watch(ctx, "", clientv3.WithPrefix(), clientv3.WithFilterDelete())
	for wresp := range watchCh {
		if err := wresp.Err(); err != nil {
			return err
		}
		for _, event := range wresp.Events {
			key := string(event.Kv.Key)
			if key != cke.KeyConstraints && key != cke.KeySabakanTemplate && key != cke.KeySabakanURL {
				continue
			}
			select {
			case ch <- struct{}{}:
			default:
			}
		}
	}
	return nil
}
// Init implements server.Integrator.  It runs the common logic with
// onlyRegenerate=true, so the cluster definition is regenerated only
// when the stored template has been updated; it never creates or
// updates a cluster from live cluster status.
func (ig integrator) Init(ctx context.Context, leaderKey string) error {
	return ig.run(ctx, leaderKey, nil, true)
}
// Do implements server.Integrator.  It generates or updates the CKE
// cluster definition from sabakan machines and clusterStatus.
// A float64 stored in ctx under WaitSecs changes the wait seconds
// before removing retired nodes (see run).
func (ig integrator) Do(ctx context.Context, leaderKey string, clusterStatus *cke.ClusterStatus) error {
	return ig.run(ctx, leaderKey, clusterStatus, false)
}
// run is the common logic of Init and Do.
//
// Unless the sabakan integration is disabled or no template is stored,
// it builds a new cluster definition from the stored template, the
// constraints, and the machines queried from sabakan, then saves it
// together with the template revision.
//
// A float64 stored in ctx under WaitSecs changes the wait seconds
// before removing retired nodes from the cluster.
//
// With onlyRegenerate=true (Init), the definition is only regenerated
// from an updated template; otherwise (Do) it is generated anew or
// updated from clusterStatus.  Generation failures are logged and
// reported through metrics, then swallowed so the caller retries later.
func (ig integrator) run(ctx context.Context, leaderKey string, clusterStatus *cke.ClusterStatus, onlyRegenerate bool) error {
	st := cke.Storage{Client: ig.etcd}

	// Do nothing while the integration is explicitly disabled.
	disabled, err := st.IsSabakanDisabled(ctx)
	if err != nil {
		return err
	}
	if disabled {
		return nil
	}

	// Without a registered template there is nothing to generate.
	tmpl, rev, err := st.GetSabakanTemplate(ctx)
	switch err {
	case cke.ErrNotFound:
		return nil
	case nil:
	default:
		return err
	}

	machines, err := Query(ctx, st)
	if err != nil {
		// the error is either harmless (cke.ErrNotFound) or already
		// logged by well.HTTPClient.
		if err != cke.ErrNotFound {
			log.Warn("sabakan: query failed", map[string]interface{}{
				log.FnError: err,
			})
		}
		return nil
	}

	cluster, crev, err := st.GetClusterWithRevision(ctx)
	if err != nil && err != cke.ErrNotFound {
		return err
	}
	// The template is considered updated when the stored cluster was
	// saved with a different template revision than the current one.
	tmplUpdated := (rev != crev)

	// Fall back to the defaults when no constraints are stored.
	cstr, err := st.GetConstraints(ctx)
	switch err {
	case cke.ErrNotFound:
		cstr = cke.DefaultConstraints()
	case nil:
	default:
		return err
	}

	g := NewGenerator(tmpl, cstr, machines, clusterStatus, time.Now())

	// Honor a caller-supplied wait period (float64 seconds) before
	// retired nodes are removed; non-float64 values are ignored.
	val := ctx.Value(WaitSecs)
	if val != nil {
		if secs, ok := val.(float64); ok {
			g.SetWaitSeconds(secs)
		}
	}

	var newc *cke.Cluster
	if onlyRegenerate {
		// Init path: regenerate only when a cluster already exists and
		// the template has changed since it was generated.
		if cluster != nil && tmplUpdated {
			newc, err = g.Regenerate(cluster)
		}
	} else {
		if cluster == nil {
			newc, err = g.Generate()
		} else {
			newc, err = g.Update(cluster)
			// Update found nothing to change, but the template itself
			// was updated: rebuild the definition from the new template.
			if newc == nil && err == nil && tmplUpdated {
				newc, err = g.Regenerate(cluster)
			}
		}
	}
	if err != nil {
		metrics.UpdateSabakanIntegration(false, nil, 0, time.Now().UTC())
		log.Warn("sabakan: failed to generate cluster", map[string]interface{}{
			log.FnError: err,
		})
		// lint:ignore nilerr Some restriction was not satisfied. Try again.
		return nil
	}

	metrics.UpdateSabakanIntegration(true, g.countWorkerByRole, len(g.nextUnused), time.Now().UTC())

	// A nil cluster with a nil error means the definition is already
	// up to date.
	if newc == nil {
		log.Debug("sabakan: nothing to do", nil)
		return nil
	}

	return st.PutClusterWithTemplateRevision(ctx, newc, rev, leaderKey)
}