-
Notifications
You must be signed in to change notification settings - Fork 575
/
pgtaskcontroller.go
291 lines (240 loc) · 9 KB
/
pgtaskcontroller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
package pgtask
/*
Copyright 2017 - 2020 Crunchy Data Solutions, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
import (
"context"
"strings"
"sync"
crv1 "github.com/crunchydata/postgres-operator/apis/cr/v1"
"github.com/crunchydata/postgres-operator/config"
"github.com/crunchydata/postgres-operator/kubeapi"
"github.com/crunchydata/postgres-operator/ns"
"github.com/crunchydata/postgres-operator/operator"
backrestoperator "github.com/crunchydata/postgres-operator/operator/backrest"
clusteroperator "github.com/crunchydata/postgres-operator/operator/cluster"
pgdumpoperator "github.com/crunchydata/postgres-operator/operator/pgdump"
taskoperator "github.com/crunchydata/postgres-operator/operator/task"
log "github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
)
// Controller bundles the Kubernetes clients, the work queue and the informer
// bookkeeping used by the pgtask controller.
type Controller struct {
	// Connection settings and clients used to read and update pgtasks and to
	// call into the operator packages.
	PgtaskConfig    *rest.Config
	PgtaskClient    *rest.RESTClient
	PgtaskScheme    *runtime.Scheme
	PgtaskClientset *kubernetes.Clientset

	// Queue carries the keys of added pgtasks from onAdd to the workers.
	Queue workqueue.RateLimitingInterface

	// Ctx bounds the controller's lifetime: Run blocks until it is done and
	// the informers are stopped through its Done channel.
	Ctx context.Context

	// informerNsMutex guards InformerNamespaces, the set of namespaces for
	// which SetupWatch has already created an informer.
	informerNsMutex    sync.Mutex
	InformerNamespaces map[string]struct{}
}
// Run starts the pgtask resource controller: it registers the pgtask watches
// and then blocks until the controller's context is done, returning the
// context's error. If registering the watches fails, that error is logged and
// returned instead. The work queue is shut down when Run returns.
func (c *Controller) Run() error {
	log.Debug("Watch Pgtask objects")

	// Shutting the queue down on the way out causes the workers to end.
	defer c.Queue.ShutDown()

	// Register the pgtask watches before blocking.
	if err := c.watchPgtasks(c.Ctx); err != nil {
		log.Errorf("Failed to register watch for Pgtask resource: %v", err)
		return err
	}

	// Block until the controller is told to stop.
	<-c.Ctx.Done()
	return c.Ctx.Err()
}
// watchPgtasks sets up a pgtask watch in every namespace that
// ns.GetNamespaces returns for this installation. The ctx argument is not
// used at present and the function always returns nil.
func (c *Controller) watchPgtasks(ctx context.Context) error {
	for _, namespace := range ns.GetNamespaces(c.PgtaskClientset, operator.InstallationName) {
		log.Infof("starting pgtask controller on ns [%s]", namespace)
		c.SetupWatch(namespace)
	}
	return nil
}
// RunWorker processes items from the work queue one after another and returns
// once processNextItem reports that the worker should stop.
func (c *Controller) RunWorker() {
	for {
		if !c.processNextItem() {
			return
		}
	}
}
// processNextItem takes one key off the work queue, marks the matching pgtask
// as processed and dispatches it to the operator that handles its task type.
//
// It returns false when the worker should stop: the queue has been shut down,
// the pgtask could not be fetched, or its status could not be patched. It
// returns true after a task was dispatched or after an unusable queue item
// was dropped.
func (c *Controller) processNextItem() bool {
	// Wait until there is a new item in the working queue
	item, quit := c.Queue.Get()
	if quit {
		return false
	}

	// Tell the queue that we are done with processing this key. This unblocks
	// the key for other workers and allows safe parallel processing, because
	// two items with the same key are never processed in parallel. Deferred
	// right after Get so the key is released on every exit path.
	defer c.Queue.Done(item)

	key, ok := item.(string)
	if !ok {
		log.Errorf("pgtask queue item has unexpected type %T, dropping it", item)
		return true
	}
	log.Debugf("working on %s", key)

	// onAdd queues keys built by cache.MetaNamespaceKeyFunc; this code expects
	// them in "<namespace>/<name>" form.
	keyParts := strings.Split(key, "/")
	if len(keyParts) < 2 {
		log.Errorf("pgtask queue key [%s] is not in namespace/name form, dropping it", key)
		return true
	}
	keyNamespace, keyResourceName := keyParts[0], keyParts[1]

	log.Debugf("queue got key ns=[%s] resource=[%s]", keyNamespace, keyResourceName)

	tmpTask := crv1.Pgtask{}
	found, err := kubeapi.Getpgtask(c.PgtaskClient, &tmpTask, keyResourceName, keyNamespace)
	if !found {
		// Formatted with %v rather than err.Error(): nothing visible here
		// guarantees that err is non-nil whenever found is false.
		// NOTE(review): returning false ends this worker's loop in RunWorker;
		// confirm that stopping the worker on a failed lookup is intended.
		log.Errorf("ERROR onAdd getting pgtask : %v", err)
		return false
	}

	// Mark the pgtask as processed before dispatching it; onAdd skips pgtasks
	// that are already in the processed state.
	state := crv1.PgtaskStateProcessed
	message := "Successfully processed Pgtask by controller"
	err = kubeapi.PatchpgtaskStatus(c.PgtaskClient, state, message, &tmpTask, keyNamespace)
	if err != nil {
		log.Errorf("ERROR onAdd updating pgtask status: %s", err.Error())
		return false
	}

	// process the incoming task
	switch tmpTask.Spec.TaskType {
	case crv1.PgtaskMinorUpgrade:
		log.Debug("minor upgrade task added")
		clusteroperator.AddUpgrade(c.PgtaskClientset, c.PgtaskClient, &tmpTask, keyNamespace)
	case crv1.PgtaskDeletePgbouncer:
		log.Debug("delete pgbouncer task added")
		clusteroperator.DeletePgbouncerFromPgTask(c.PgtaskClientset, c.PgtaskClient, c.PgtaskConfig, &tmpTask)
	case crv1.PgtaskAddPgbouncer:
		log.Debug("add pgbouncer task added")
		clusteroperator.AddPgbouncerFromPgTask(c.PgtaskClientset, c.PgtaskClient, c.PgtaskConfig, &tmpTask)
	case crv1.PgtaskUpdatePgbouncer:
		log.Debug("update pgbouncer task added")
		clusteroperator.UpdatePgbouncerFromPgTask(c.PgtaskClientset, c.PgtaskClient, c.PgtaskConfig, &tmpTask)
	case crv1.PgtaskFailover:
		log.Debug("failover task added")
		if !dupeFailover(c.PgtaskClient, &tmpTask, keyNamespace) {
			clusteroperator.FailoverBase(keyNamespace, c.PgtaskClientset, c.PgtaskClient, &tmpTask, c.PgtaskConfig)
		} else {
			log.Debugf("skipping duplicate onAdd failover task %s/%s", keyNamespace, keyResourceName)
		}
	case crv1.PgtaskDeleteData:
		log.Debug("delete data task added")
		if !dupeDeleteData(c.PgtaskClient, &tmpTask, keyNamespace) {
			taskoperator.RemoveData(keyNamespace, c.PgtaskClientset, c.PgtaskClient, &tmpTask)
		} else {
			log.Debugf("skipping duplicate onAdd delete data task %s/%s", keyNamespace, keyResourceName)
		}
	case crv1.PgtaskDeleteBackups:
		log.Debug("delete backups task added")
		taskoperator.RemoveBackups(keyNamespace, c.PgtaskClientset, &tmpTask)
	case crv1.PgtaskBackrest:
		log.Debug("backrest task added")
		backrestoperator.Backrest(keyNamespace, c.PgtaskClientset, &tmpTask)
	case crv1.PgtaskBackrestRestore:
		log.Debug("backrest restore task added")
		backrestoperator.Restore(c.PgtaskClient, keyNamespace, c.PgtaskClientset, &tmpTask)
	case crv1.PgtaskpgDump:
		log.Debug("pgDump task added")
		pgdumpoperator.Dump(keyNamespace, c.PgtaskClientset, c.PgtaskClient, &tmpTask)
	case crv1.PgtaskpgRestore:
		log.Debug("pgDump restore task added")
		pgdumpoperator.Restore(keyNamespace, c.PgtaskClientset, c.PgtaskClient, &tmpTask)
	case crv1.PgtaskAutoFailover:
		// Only logged here; no operator call is made for this task type.
		log.Debugf("autofailover task added %s", keyResourceName)
	case crv1.PgtaskWorkflow:
		// Only logged here; no operator call is made for this task type.
		log.Debugf("workflow task added [%s] ID [%s]", keyResourceName, tmpTask.Spec.Parameters[crv1.PgtaskWorkflowID])
	case crv1.PgtaskCloneStep1, crv1.PgtaskCloneStep2, crv1.PgtaskCloneStep3:
		log.Debugf("clone task added [%s]", keyResourceName)
		clusteroperator.Clone(c.PgtaskClientset, c.PgtaskClient, keyNamespace, &tmpTask)
	default:
		log.Debugf("unknown task type on pgtask added [%s]", tmpTask.Spec.TaskType)
	}

	return true
}
// onAdd is called when a pgtask is added. It puts the task's key on the work
// queue unless the task is already in the processed state. Objects that are
// not a *crv1.Pgtask, or for which no key can be built, are logged and
// skipped.
func (c *Controller) onAdd(obj interface{}) {
	task, ok := obj.(*crv1.Pgtask)
	if !ok {
		// Log instead of panicking on an unexpected object type.
		log.Errorf("pgtask onAdd received unexpected object type %T", obj)
		return
	}

	// handle the case of when the operator restarts, we do not want
	// to process pgtasks already processed
	if task.Status.State == crv1.PgtaskStateProcessed {
		log.Debug("pgtask " + task.ObjectMeta.Name + " already processed")
		return
	}

	key, err := cache.MetaNamespaceKeyFunc(obj)
	if err != nil {
		// Previously this failure was dropped silently and the task was
		// never queued; make it visible in the log.
		log.Errorf("pgtask %s not queued, could not build key: %v", task.ObjectMeta.Name, err)
		return
	}

	log.Debugf("task putting key in queue %s", key)
	c.Queue.Add(key)
}
// onUpdate is called when a pgtask is updated. It currently does nothing:
// both the old and the new object are ignored.
func (c *Controller) onUpdate(oldObj, newObj interface{}) {
	// Debug logging of the updated task is disabled:
	//task := newObj.(*crv1.Pgtask)
	//	log.Debugf("[Controller] onUpdate ns=%s %s", task.ObjectMeta.Namespace, task.ObjectMeta.SelfLink)
}
// onDelete is called when a pgtask is deleted. It currently does nothing:
// the deleted object is ignored.
func (c *Controller) onDelete(obj interface{}) {
}
// SetupWatch creates a pgtask informer for the given namespace and starts it
// in its own goroutine, unless an informer has already been created for that
// namespace. The informer is stopped through the Done channel of c.Ctx.
//
// Note that inside this function the parameter ns shadows the imported ns
// package.
func (c *Controller) SetupWatch(ns string) {
	// The mutex is held for the whole call, which guards the check and the
	// update of InformerNamespaces.
	c.informerNsMutex.Lock()
	defer c.informerNsMutex.Unlock()

	// don't create informer for namespace if one has already been created
	if _, ok := c.InformerNamespaces[ns]; ok {
		return
	}

	// Writing to a nil map panics, so allocate the set on first use in case
	// the Controller was built without initialising it.
	if c.InformerNamespaces == nil {
		c.InformerNamespaces = make(map[string]struct{})
	}
	c.InformerNamespaces[ns] = struct{}{}

	source := cache.NewListWatchFromClient(
		c.PgtaskClient,
		crv1.PgtaskResourcePlural,
		ns,
		fields.Everything())

	_, controller := cache.NewInformer(
		source,

		// The object type.
		&crv1.Pgtask{},

		// resyncPeriod
		// Every resyncPeriod, all resources in the cache will retrigger events.
		// Set to 0 to disable the resync.
		0,

		// Your custom resource event handlers.
		cache.ResourceEventHandlerFuncs{
			AddFunc:    c.onAdd,
			UpdateFunc: c.onUpdate,
			DeleteFunc: c.onDelete,
		})

	go controller.Run(c.Ctx.Done())
	log.Debugf("pgtask Controller created informer for namespace %s", ns)
}
// dupeFailover is the de-dupe check for failover tasks. It fetches the pgtask
// named task.Spec.Name in namespace ns and reports whether its
// config.LABEL_FAILOVER_STARTED parameter is set to a non-empty value, which
// means a failover has already been started. A pgtask that cannot be fetched
// is reported as not being a duplicate.
func dupeFailover(restClient *rest.RESTClient, task *crv1.Pgtask, ns string) bool {
	var current crv1.Pgtask
	if found, _ := kubeapi.Getpgtask(restClient, &current, task.Spec.Name, ns); !found {
		// a big time error if this occurs
		return false
	}
	return current.Spec.Parameters[config.LABEL_FAILOVER_STARTED] != ""
}
// dupeDeleteData is the de-dupe check for delete data tasks. It fetches the
// pgtask named task.Spec.Name in namespace ns and reports whether its
// config.LABEL_DELETE_DATA_STARTED parameter is set to a non-empty value,
// which means a delete data job has already been started. A pgtask that
// cannot be fetched is reported as not being a duplicate.
func dupeDeleteData(restClient *rest.RESTClient, task *crv1.Pgtask, ns string) bool {
	var current crv1.Pgtask
	if found, _ := kubeapi.Getpgtask(restClient, &current, task.Spec.Name, ns); !found {
		// a big time error if this occurs
		return false
	}
	return current.Spec.Parameters[config.LABEL_DELETE_DATA_STARTED] != ""
}