/
pgtaskcontroller.go
248 lines (207 loc) · 8.24 KB
/
pgtaskcontroller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
package pgtask
/*
Copyright 2017 - 2022 Crunchy Data Solutions, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
import (
"encoding/json"
"strings"
"github.com/crunchydata/postgres-operator/internal/config"
"github.com/crunchydata/postgres-operator/internal/kubeapi"
backrestoperator "github.com/crunchydata/postgres-operator/internal/operator/backrest"
clusteroperator "github.com/crunchydata/postgres-operator/internal/operator/cluster"
pgdumpoperator "github.com/crunchydata/postgres-operator/internal/operator/pgdump"
taskoperator "github.com/crunchydata/postgres-operator/internal/operator/task"
crv1 "github.com/crunchydata/postgres-operator/pkg/apis/crunchydata.com/v1"
pgo "github.com/crunchydata/postgres-operator/pkg/generated/clientset/versioned"
informers "github.com/crunchydata/postgres-operator/pkg/generated/informers/externalversions/crunchydata.com/v1"
log "github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
)
// Controller holds the connections and settings used by the pgtask
// controller: the API client, the work queue of pgtask keys, the informer that
// feeds that queue and the configured worker count.
type Controller struct {
	// Client is used to read and patch pgtasks and is handed to the operator
	// routines that carry out each task.
	Client *kubeapi.Client
	// Queue holds the keys of pgtasks waiting to be processed.
	Queue workqueue.RateLimitingInterface
	// Informer is the pgtask informer that the event handlers are registered on.
	Informer informers.PgtaskInformer
	// PgtaskWorkerCount is the value reported by WorkerCount.
	PgtaskWorkerCount int
}
// RunWorker is a long-running function that drains the work queue by calling
// processNextItem over and over until it reports that the queue has been shut
// down.
//
// A helper goroutine is started first; it shuts the queue down once stopCh
// fires. After the processing loop ends, a single value is written to doneCh,
// which blocks until that value is received (or the channel has buffer space).
func (c *Controller) RunWorker(stopCh <-chan struct{}, doneCh chan<- struct{}) {
	go c.waitForShutdown(stopCh)

	for {
		if more := c.processNextItem(); !more {
			break
		}
	}

	log.Debug("pgtask Contoller: worker queue has been shutdown, writing to the done channel")
	doneCh <- struct{}{}
}
// waitForShutdown blocks until a value can be received from stopCh (a sent
// message or the channel being closed) and then shuts down the work queue, so
// that workers waiting in processNextItem can see the shutdown and exit.
func (c *Controller) waitForShutdown(stopCh <-chan struct{}) {
	// Nothing happens here until the stop signal arrives.
	<-stopCh
	c.Queue.ShutDown()
	log.Debug("pgtask Contoller: received stop signal, worker queue told to shutdown")
}
// processNextItem takes a single key off the work queue, marks the matching
// pgtask as processed and then dispatches it to the operator routine for its
// task type.
//
// It returns false only when the queue reports that it has been shut down. In
// every other case, including malformed keys and API failures, it returns true
// so that the calling worker keeps polling the queue. Failed items are
// forgotten rather than retried.
func (c *Controller) processNextItem() bool {
	// Wait until there is a new item in the working queue
	key, quit := c.Queue.Get()
	if quit {
		return false
	}

	// Tell the queue that we are done with processing this key. This unblocks the key for other workers
	// This allows safe parallel processing because two pods with the same key are never processed in
	// parallel. Registered immediately after Get so that it runs on every exit path.
	defer c.Queue.Done(key)

	// Only string keys are expected in this queue; anything else cannot be
	// resolved to a pgtask, so drop it instead of panicking on the assertion.
	keyString, ok := key.(string)
	if !ok {
		log.Errorf("ERROR onAdd unexpected key type %T in pgtask queue", key)
		c.Queue.Forget(key)
		return true
	}

	log.Debugf("working on %s", keyString)

	// Keys are expected to look like "<namespace>/<name>". A key without a
	// separator would otherwise cause an index-out-of-range panic below.
	keyParts := strings.Split(keyString, "/")
	if len(keyParts) < 2 {
		log.Errorf("ERROR onAdd invalid pgtask queue key [%s]", keyString)
		c.Queue.Forget(key)
		return true
	}
	keyNamespace := keyParts[0]
	keyResourceName := keyParts[1]

	log.Debugf("queue got key ns=[%s] resource=[%s]", keyNamespace, keyResourceName)

	tmpTask, err := c.Client.CrunchydataV1().Pgtasks(keyNamespace).Get(keyResourceName, metav1.GetOptions{})
	if err != nil {
		log.Errorf("ERROR onAdd getting pgtask : %s", err.Error())
		c.Queue.Forget(key) // NB(cbandy): This should probably be a retry.
		return true
	}

	// update pgtask: the status is patched to "processed" before the task is
	// dispatched; onAdd skips pgtasks that already carry this state.
	patch, err := json.Marshal(map[string]interface{}{
		"status": crv1.PgtaskStatus{
			State:   crv1.PgtaskStateProcessed,
			Message: "Successfully processed Pgtask by controller",
		},
	})
	if err == nil {
		_, err = c.Client.CrunchydataV1().Pgtasks(keyNamespace).Patch(tmpTask.Name, types.MergePatchType, patch)
	}
	if err != nil {
		log.Errorf("ERROR onAdd updating pgtask status: %s", err.Error())
		c.Queue.Forget(key) // NB(cbandy): This should probably be a retry.
		return true
	}

	// process the incoming task
	switch tmpTask.Spec.TaskType {
	case crv1.PgtaskPgAdminAdd:
		log.Debug("add pgadmin task added")
		clusteroperator.AddPgAdminFromPgTask(c.Client, c.Client.Config, tmpTask)
	case crv1.PgtaskPgAdminDelete:
		log.Debug("delete pgadmin task added")
		clusteroperator.DeletePgAdminFromPgTask(c.Client, c.Client.Config, tmpTask)
	case crv1.PgtaskUpgrade:
		log.Debug("upgrade task added")
		clusteroperator.AddUpgrade(c.Client, tmpTask, keyNamespace)
	case crv1.PgtaskFailover:
		log.Debug("failover task added")
		// skip the failover if one has already been started for this task
		if dupeFailover(c.Client, tmpTask, keyNamespace) {
			log.Debugf("skipping duplicate onAdd failover task %s/%s", keyNamespace, keyResourceName)
		} else {
			clusteroperator.FailoverBase(keyNamespace, c.Client, tmpTask, c.Client.Config)
		}
	case crv1.PgtaskDeleteData:
		log.Debug("delete data task added")
		// skip the removal if a delete data job has already been started
		if dupeDeleteData(c.Client, tmpTask, keyNamespace) {
			log.Debugf("skipping duplicate onAdd delete data task %s/%s", keyNamespace, keyResourceName)
		} else {
			taskoperator.RemoveData(keyNamespace, c.Client, tmpTask)
		}
	case crv1.PgtaskDeleteBackups:
		log.Debug("delete backups task added")
		taskoperator.RemoveBackups(keyNamespace, c.Client, tmpTask)
	case crv1.PgtaskBackrest:
		log.Debug("backrest task added")
		backrestoperator.Backrest(keyNamespace, c.Client, tmpTask)
	case crv1.PgtaskBackrestRestore:
		log.Debug("backrest restore task added")
		c.handleBackrestRestore(tmpTask)
	case crv1.PgtaskpgDump:
		log.Debug("pgDump task added")
		pgdumpoperator.Dump(keyNamespace, c.Client, tmpTask)
	case crv1.PgtaskpgRestore:
		log.Debug("pgDump restore task added")
		pgdumpoperator.Restore(keyNamespace, c.Client, tmpTask)
	case crv1.PgtaskAutoFailover:
		// only logged; no action is taken here for this task type
		log.Debugf("autofailover task added %s", keyResourceName)
	case crv1.PgtaskWorkflow:
		// only logged; no action is taken here for this task type
		log.Debugf("workflow task added [%s] ID [%s]", keyResourceName, tmpTask.Spec.Parameters[crv1.PgtaskWorkflowID])
	case crv1.PgtaskCloneStep1, crv1.PgtaskCloneStep2, crv1.PgtaskCloneStep3:
		log.Debugf("clone task added [%s]", keyResourceName)
		clusteroperator.Clone(c.Client, c.Client.Config, keyNamespace, tmpTask)
	default:
		log.Debugf("unknown task type on pgtask added [%s]", tmpTask.Spec.TaskType)
	}

	c.Queue.Forget(key)
	return true
}
// onAdd is called when a pgtask is added. It places the key of the pgtask on
// the work queue unless the pgtask has already been processed.
//
// Objects that are not a *crv1.Pgtask, and pgtasks for which no queue key can
// be built, are logged and skipped.
func (c *Controller) onAdd(obj interface{}) {
	// Guard the assertion so an unexpected object cannot panic the handler.
	task, ok := obj.(*crv1.Pgtask)
	if !ok {
		log.Errorf("pgtask Controller: onAdd received unexpected object type %T", obj)
		return
	}

	// handle the case of when the operator restarts, we do not want
	// to process pgtasks already processed
	if task.Status.State == crv1.PgtaskStateProcessed {
		log.Debug("pgtask " + task.ObjectMeta.Name + " already processed")
		return
	}

	key, err := cache.MetaNamespaceKeyFunc(obj)
	if err != nil {
		// Without a key the task can never be queued, so make the failure visible.
		log.Errorf("pgtask Controller: unable to build queue key for pgtask %s: %s",
			task.ObjectMeta.Name, err.Error())
		return
	}

	log.Debugf("task putting key in queue %s", key)
	c.Queue.Add(key)
}
// onUpdate is called when a pgtask is updated. It is currently a no-op:
// neither the old nor the new object is inspected and nothing is queued.
func (c *Controller) onUpdate(oldObj, newObj interface{}) {
	// Nothing to do on update. (A debug log of the updated task's namespace
	// and self link used to live here, commented out.)
}
// onDelete is called when a pgtask is deleted. It is currently a no-op: the
// deleted object is ignored.
func (c *Controller) onDelete(obj interface{}) {
	// Nothing to do on delete.
}
// AddPGTaskEventHandler registers the controller's add, update and delete
// callbacks (onAdd, onUpdate, onDelete) with the pgtask informer.
func (c *Controller) AddPGTaskEventHandler() {
	// Bundle the three callbacks into a single handler set.
	handlers := cache.ResourceEventHandlerFuncs{
		AddFunc:    c.onAdd,
		UpdateFunc: c.onUpdate,
		DeleteFunc: c.onDelete,
	}

	c.Informer.Informer().AddEventHandler(handlers)

	log.Debugf("pgtask Controller: added event handler to informer")
}
// dupeFailover is the de-dupe check for a failover. It fetches the pgtask
// named by task.Spec.Name in namespace ns and reports true when its
// config.LABEL_FAILOVER_STARTED parameter is non-empty, which means a failover
// has already been started for it.
//
// If the pgtask cannot be fetched the error is not reported and the task is
// treated as not being a duplicate.
func dupeFailover(clientset pgo.Interface, task *crv1.Pgtask, ns string) bool {
	current, err := clientset.CrunchydataV1().Pgtasks(ns).Get(task.Spec.Name, metav1.GetOptions{})
	if err != nil {
		// a big time error if this occurs
		return false
	}

	// a non-empty marker means the failover was already started
	return current.Spec.Parameters[config.LABEL_FAILOVER_STARTED] != ""
}
// dupeDeleteData is the de-dupe check for a delete data task. It fetches the
// pgtask named by task.Spec.Name in namespace ns and reports true when its
// config.LABEL_DELETE_DATA_STARTED parameter is non-empty, which means a
// delete data job has already been started for it.
//
// If the pgtask cannot be fetched the error is not reported and the task is
// treated as not being a duplicate.
func dupeDeleteData(clientset pgo.Interface, task *crv1.Pgtask, ns string) bool {
	current, err := clientset.CrunchydataV1().Pgtasks(ns).Get(task.Spec.Name, metav1.GetOptions{})
	if err != nil {
		// a big time error if this occurs
		return false
	}

	// a non-empty marker means the delete data job was already started
	return current.Spec.Parameters[config.LABEL_DELETE_DATA_STARTED] != ""
}
// WorkerCount returns the worker count for the controller, i.e. the value of
// the PgtaskWorkerCount field.
func (c *Controller) WorkerCount() int {
	return c.PgtaskWorkerCount
}