From 4c78be60e0622bba1a3dc3a45140a39b4734897a Mon Sep 17 00:00:00 2001 From: Carvel Baus Date: Fri, 16 Aug 2019 16:30:49 -0400 Subject: [PATCH 1/4] Backport of Queue pgbackup PR#915 to operator 3.5 --- apis/cr/v1/backup.go | 1 + controller/backupcontroller.go | 194 ++++++++++++++++++++++++++++---- controller/jobcontroller.go | 1 + controller/replicacontroller.go | 122 ++++++++++++++++---- kubeapi/pgbackup.go | 79 +++++++++++++ kubeapi/pgreplica.go | 42 +++++++ operator/cluster/cluster.go | 4 + postgres-operator.go | 8 +- 8 files changed, 409 insertions(+), 42 deletions(-) diff --git a/apis/cr/v1/backup.go b/apis/cr/v1/backup.go index 646c620f66..47d6ef7ede 100644 --- a/apis/cr/v1/backup.go +++ b/apis/cr/v1/backup.go @@ -23,6 +23,7 @@ import ( const PgbackupResourcePlural = "pgbackups" // Backup job processing states - used by pgdump +const PgBackupJobReSubmitted = "Backup Job Re-Submitted" const PgBackupJobSubmitted = "Backup Job Submitted" const PgBackupJobInProgress = "Backup Job In Progress" const PgBackupJobCompleted = "Backup Job Completed" diff --git a/controller/backupcontroller.go b/controller/backupcontroller.go index bb06582408..b19d9fdc7d 100644 --- a/controller/backupcontroller.go +++ b/controller/backupcontroller.go @@ -24,9 +24,13 @@ import ( "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" + "strings" crv1 "github.com/crunchydata/postgres-operator/apis/cr/v1" + "github.com/crunchydata/postgres-operator/kubeapi" + backupoperator "github.com/crunchydata/postgres-operator/operator/backup" + "k8s.io/client-go/util/workqueue" ) // PgbackupController holds connections required by the controller @@ -35,12 +39,18 @@ type PgbackupController struct { PgbackupScheme *runtime.Scheme PgbackupClientset *kubernetes.Clientset Namespace string + Queue workqueue.RateLimitingInterface + UpdateQueue workqueue.RateLimitingInterface } // Run starts controller func (c *PgbackupController) Run(ctx context.Context) error { log.Debugf("Watch Pgbackup objects") + //shut down the work queue to cause workers to end + defer c.Queue.ShutDown() + defer c.UpdateQueue.ShutDown() + _, err := c.watchPgbackups(ctx) if err != nil { log.Errorf("Failed to register watch for Pgbackup resource: %v", err) @@ -48,6 +58,9 @@ func (c *PgbackupController) Run(ctx context.Context) error { } <-ctx.Done() + + log.Debugf("Watch Pgbackup ending") + return ctx.Err() } @@ -81,6 +94,148 @@ func (c *PgbackupController) watchPgbackups(ctx context.Context) (cache.Controll return controller, nil } +func (c *PgbackupController) RunWorker() { + + //process the 'add' work queue forever + for c.processNextItem() { + } +} + +func (c *PgbackupController) RunUpdateWorker() { + + //process the 'add' work queue forever + for c.processNextUpdateItem() { + } +} + +func (c *PgbackupController) processNextUpdateItem() bool { + // Wait until there is a new item in the working queue + key, quit := c.UpdateQueue.Get() + if quit { + return false + } + + log.Debugf("working on %s", key.(string)) + keyParts := strings.Split(key.(string), "/") + keyNamespace := keyParts[0] + keyResourceName := keyParts[1] + + log.Debugf("update queue got key ns=[%s] resource=[%s]", keyNamespace, keyResourceName) + + // Tell the queue that we are done with processing this key. This unblocks the key for other workers + // This allows safe parallel processing because two pods with the same key are never processed in + // parallel. 
+ defer c.UpdateQueue.Done(key) + + // Invoke the method containing the business logic + // for pgbackups, the convention is the CRD name is always + // the same as the pg-cluster label value + + // in this case, the de-dupe logic is to test whether a backup + // job is already running, if so, then we don't create another + // backup job + selector := "pg-cluster=" + keyResourceName + ",pgbackup=true" + jobs, err := kubeapi.GetJobs(c.PgbackupClientset, selector, keyNamespace) + if err != nil { + log.Errorf("update working...error found " + err.Error()) + return true + } + + jobRunning := false + for _, j := range jobs.Items { + if j.Status.Succeeded <= 0 { + jobRunning = true + } + } + + if jobRunning { + log.Debugf("update working...found job already, would do nothing") + } else { + log.Debugf("update working...no job found, means we process") + b := crv1.Pgbackup{} + found, err := kubeapi.Getpgbackup(c.PgbackupClient, &b, keyResourceName, keyNamespace) + if found { + state := crv1.PgbackupStateProcessed + message := "Successfully processed Pgbackup by controller" + err = kubeapi.PatchpgbackupStatus(c.PgbackupClient, state, message, &b, b.ObjectMeta.Namespace) + if err != nil { + log.Errorf("ERROR updating pgbackup status: %s", err.Error()) + } + + backupoperator.AddBackupBase(c.PgbackupClientset, c.PgbackupClient, &b, b.ObjectMeta.Namespace) + + //no error, tell the queue to stop tracking history + c.UpdateQueue.Forget(key) + } + } + + return true +} + +func (c *PgbackupController) processNextItem() bool { + // Wait until there is a new item in the working queue + key, quit := c.Queue.Get() + + if quit { + return false + } + + log.Debugf("working on %s", key.(string)) + keyParts := strings.Split(key.(string), "/") + keyNamespace := keyParts[0] + keyResourceName := keyParts[1] + + log.Debugf("queue got key ns=[%s] resource=[%s]", keyNamespace, keyResourceName) + + // Tell the queue that we are done with processing this key. This unblocks the key for other workers + // This allows safe parallel processing because two pods with the same key are never processed in + // parallel. 
+ defer c.Queue.Done(key) + + // Invoke the method containing the business logic + // for pgbackups, the convention is the CRD name is always + // the same as the pg-cluster label value + + // in this case, the de-dupe logic is to test whether a backup + // job is already running, if so, then we don't create another + // backup job + selector := "pg-cluster=" + keyResourceName + ",pgbackup=true" + jobs, err := kubeapi.GetJobs(c.PgbackupClientset, selector, keyNamespace) + if err != nil { + log.Errorf("working...error found " + err.Error()) + return true + } + + jobRunning := false + for _, j := range jobs.Items { + if j.Status.Succeeded <= 0 { + jobRunning = true + } + } + + if jobRunning { + log.Debugf("working...found job already, would do nothing") + } else { + log.Debugf("working...no job found, means we process") + b := crv1.Pgbackup{} + found, err := kubeapi.Getpgbackup(c.PgbackupClient, &b, keyResourceName, keyNamespace) + if found { + state := crv1.PgbackupStateProcessed + message := "Successfully processed Pgbackup by controller" + err = kubeapi.PatchpgbackupStatus(c.PgbackupClient, state, message, &b, b.ObjectMeta.Namespace) + if err != nil { + log.Errorf("ERROR updating pgbackup status: %s", err.Error()) + } + backupoperator.AddBackupBase(c.PgbackupClientset, c.PgbackupClient, &b, b.ObjectMeta.Namespace) + + //no error, tell the queue to stop tracking history + c.Queue.Forget(key) + } + } + return true +} + + // onAdd is called when a pgbackup is added func (c *PgbackupController) onAdd(obj interface{}) { backup := obj.(*crv1.Pgbackup) @@ -93,37 +248,32 @@ func (c *PgbackupController) onAdd(obj interface{}) { return } - // NEVER modify objects from the store. It's a read-only, local cache. - // You can use backupScheme.Copy() to make a deep copy of original object and modify this copy - // Or create a copy manually for better performance - copyObj := backup.DeepCopyObject() - - backupCopy := copyObj.(*crv1.Pgbackup) - backupCopy.Status = crv1.PgbackupStatus{ - State: crv1.PgbackupStateProcessed, - Message: "Successfully processed Pgbackup by controller", - } - - err := c.PgbackupClient.Put(). - Name(backup.ObjectMeta.Name). - Namespace(backup.ObjectMeta.Namespace). - Resource(crv1.PgbackupResourcePlural). - Body(backupCopy). - Do(). 
- Error() - - if err != nil { - log.Errorf("ERROR updating pgbackup status: %s", err.Error()) + key, err := cache.MetaNamespaceKeyFunc(obj) + if err == nil { + log.Debugf("[PgbackupController] putting key in queue %s", key) + c.Queue.Add(key) } //handle new pgbackups - backupoperator.AddBackupBase(c.PgbackupClientset, c.PgbackupClient, backupCopy, backup.ObjectMeta.Namespace) + // backupoperator.AddBackupBase(c.PgbackupClientset, c.PgbackupClient, backupCopy, backup.ObjectMeta.Namespace) } // onUpdate is called when a pgbackup is updated func (c *PgbackupController) onUpdate(oldObj, newObj interface{}) { + oldBackup := oldObj.(*crv1.Pgbackup) backup := newObj.(*crv1.Pgbackup) log.Debugf("[PgbackupController] ns=%s onUpdate %s", backup.ObjectMeta.Namespace, backup.ObjectMeta.SelfLink) + + // check for re-submission of backup + if oldBackup.Spec.BackupStatus != crv1.PgBackupJobReSubmitted && backup.Spec.BackupStatus == crv1.PgBackupJobReSubmitted { + log.Debugf("[PgbackupController] ns=%s onUpdate %s re-submitted", backup.ObjectMeta.Namespace, backup.ObjectMeta.SelfLink) + key, err := cache.MetaNamespaceKeyFunc(oldObj) + if err == nil { + log.Debugf("[PgbackupController] putting key in update queue %s", key) + c.UpdateQueue.Add(key) + } +} + } // onDelete is called when a pgbackup is deleted diff --git a/controller/jobcontroller.go b/controller/jobcontroller.go index 7f8f986f1c..bf25f91f2e 100644 --- a/controller/jobcontroller.go +++ b/controller/jobcontroller.go @@ -117,6 +117,7 @@ func (c *JobController) onUpdate(oldObj, newObj interface{}) { status = crv1.JobErrorStatus } + // change this to patchpgbackupbackupstatus ?? if labels[util.LABEL_BACKREST] != "true" { err = util.Patch(c.JobClient, "/spec/backupstatus", status, "pgbackups", dbname, job.ObjectMeta.Namespace) if err != nil { diff --git a/controller/replicacontroller.go b/controller/replicacontroller.go index 3de6db1a86..4011090e11 100644 --- a/controller/replicacontroller.go +++ b/controller/replicacontroller.go @@ -25,7 +25,10 @@ import ( "k8s.io/client-go/tools/cache" crv1 "github.com/crunchydata/postgres-operator/apis/cr/v1" + "github.com/crunchydata/postgres-operator/kubeapi" clusteroperator "github.com/crunchydata/postgres-operator/operator/cluster" + "k8s.io/client-go/util/workqueue" + "strings" ) // PgreplicaController holds the connections for the controller @@ -34,11 +37,15 @@ type PgreplicaController struct { PgreplicaScheme *runtime.Scheme PgreplicaClientset *kubernetes.Clientset Namespace string + Ctx context.Context + Queue workqueue.RateLimitingInterface } // Run starts an pgreplica resource controller func (c *PgreplicaController) Run(ctx context.Context) error { + defer c.Queue.ShutDown() + _, err := c.watchPgreplicas(ctx) if err != nil { log.Errorf("Failed to register watch for Pgreplica resource: %v", err) @@ -79,6 +86,75 @@ func (c *PgreplicaController) watchPgreplicas(ctx context.Context) (cache.Contro return controller, nil } +func (c *PgreplicaController) RunWorker() { + + //process the 'add' work queue forever + for c.processNextItem() { + } +} + +func (c *PgreplicaController) processNextItem() bool { + // Wait until there is a new item in the working queue + key, quit := c.Queue.Get() + if quit { + return false + } + + log.Debugf("working on %s", key.(string)) + keyParts := strings.Split(key.(string), "/") + keyNamespace := keyParts[0] + keyResourceName := keyParts[1] + + log.Debugf("pgreplica queue got key ns=[%s] resource=[%s]", keyNamespace, keyResourceName) + + // Tell the queue that we are done with 
processing this key. This unblocks the key for other workers + // This allows safe parallel processing because two pods with the same key are never processed in + // parallel. + defer c.Queue.Done(key) + // Invoke the method containing the business logic + // for pgbackups, the convention is the CRD name is always + // the same as the pg-cluster label value + + // in this case, the de-dupe logic is to test whether a replica + // deployment exists already , if so, then we don't create another + // backup job + _, found, _ := kubeapi.GetDeployment(c.PgreplicaClientset, keyResourceName, keyNamespace) + + depRunning := false + if found { + depRunning = true + } + + if depRunning { + log.Debugf("working...found replica already, would do nothing") + } else { + log.Debugf("working...no replica found, means we process") + + //handle the case of when a pgreplica is added which is + //scaling up a cluster + replica := crv1.Pgreplica{} + found, err := kubeapi.Getpgreplica(c.PgreplicaClient, &replica, keyResourceName, keyNamespace) + if !found { + log.Error(err) + return false + } + clusteroperator.ScaleBase(c.PgreplicaClientset, c.PgreplicaClient, &replica, replica.ObjectMeta.Namespace) + + state := crv1.PgreplicaStateProcessed + message := "Successfully processed Pgreplica by controller" + err = kubeapi.PatchpgreplicaStatus(c.PgreplicaClient, state, message, &replica, replica.ObjectMeta.Namespace) + if err != nil { + log.Errorf("ERROR updating pgreplica status: %s", err.Error()) + } + + //no error, tell the queue to stop tracking history + c.Queue.Forget(key) + } + return true +} + + + // onAdd is called when a pgreplica is added func (c *PgreplicaController) onAdd(obj interface{}) { replica := obj.(*crv1.Pgreplica) @@ -94,29 +170,37 @@ func (c *PgreplicaController) onAdd(obj interface{}) { // NEVER modify objects from the store. It's a read-only, local cache. // You can use clusterScheme.Copy() to make a deep copy of original object and modify this copy // Or create a copy manually for better performance - copyObj := replica.DeepCopyObject() - replicaCopy := copyObj.(*crv1.Pgreplica) - - replicaCopy.Status = crv1.PgreplicaStatus{ - State: crv1.PgreplicaStateProcessed, - Message: "Successfully processed Pgreplica by controller", - } - - err := c.PgreplicaClient.Put(). - Name(replica.ObjectMeta.Name). - Namespace(replica.ObjectMeta.Namespace). - Resource(crv1.PgreplicaResourcePlural). - Body(replicaCopy). - Do(). - Error() - - if err != nil { - log.Errorf("ERROR updating pgreplica status: %s", err.Error()) + // copyObj := replica.DeepCopyObject() + // replicaCopy := copyObj.(*crv1.Pgreplica) + + // replicaCopy.Status = crv1.PgreplicaStatus{ + // State: crv1.PgreplicaStateProcessed, + // Message: "Successfully processed Pgreplica by controller", + // } + + // err := c.PgreplicaClient.Put(). + // Name(replica.ObjectMeta.Name). + // Namespace(replica.ObjectMeta.Namespace). + // Resource(crv1.PgreplicaResourcePlural). + // Body(replicaCopy). + // Do(). 
+ // Error() + + // if err != nil { + // log.Errorf("ERROR updating pgreplica status: %s", err.Error()) + // } + + key, err := cache.MetaNamespaceKeyFunc(obj) + if err == nil { + log.Debugf("onAdd putting key in queue %s", key) + c.Queue.Add(key) + } else { + log.Errorf("replicacontroller: error acquiring key: %s", err.Error()) } //handle the case of when a pgreplica is added which is //scaling up a cluster - clusteroperator.ScaleBase(c.PgreplicaClientset, c.PgreplicaClient, replicaCopy, replicaCopy.ObjectMeta.Namespace) + // clusteroperator.ScaleBase(c.PgreplicaClientset, c.PgreplicaClient, replicaCopy, replicaCopy.ObjectMeta.Namespace) } diff --git a/kubeapi/pgbackup.go b/kubeapi/pgbackup.go index ce43e98398..674d52ef7e 100644 --- a/kubeapi/pgbackup.go +++ b/kubeapi/pgbackup.go @@ -16,9 +16,12 @@ package kubeapi */ import ( + "encoding/json" log "github.com/sirupsen/logrus" crv1 "github.com/crunchydata/postgres-operator/apis/cr/v1" + jsonpatch "github.com/evanphx/json-patch" kerrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/rest" ) @@ -106,3 +109,79 @@ func Createpgbackup(client *rest.RESTClient, backup *crv1.Pgbackup, namespace st return err } + +func PatchpgbackupStatus(restclient *rest.RESTClient, state crv1.PgbackupState, message string, oldCrd *crv1.Pgbackup, namespace string) error { + + oldData, err := json.Marshal(oldCrd) + if err != nil { + return err + } + + //change it + oldCrd.Status = crv1.PgbackupStatus{ + State: state, + Message: message, + } + + //create the patch + var newData, patchBytes []byte + newData, err = json.Marshal(oldCrd) + if err != nil { + return err + } + patchBytes, err = jsonpatch.CreateMergePatch(oldData, newData) + if err != nil { + return err + } + + log.Debug(string(patchBytes)) + + //apply patch + _, err6 := restclient.Patch(types.MergePatchType). + Namespace(namespace). + Resource(crv1.PgbackupResourcePlural). + Name(oldCrd.Spec.Name). + Body(patchBytes). + Do(). + Get() + + return err6 + +} + +// PatchpgbacupBackupStatus - patch the backup status +func PatchpgbackupBackupStatus(restclient *rest.RESTClient, status string, oldCrd *crv1.Pgbackup, namespace string) error { + + oldData, err := json.Marshal(oldCrd) + if err != nil { + return err + } + + //change it + oldCrd.Spec.BackupStatus = status + + //create the patch + var newData, patchBytes []byte + newData, err = json.Marshal(oldCrd) + if err != nil { + return err + } + patchBytes, err = jsonpatch.CreateMergePatch(oldData, newData) + if err != nil { + return err + } + + log.Debug(string(patchBytes)) + + //apply patch + _, err6 := restclient.Patch(types.MergePatchType). + Namespace(namespace). + Resource(crv1.PgbackupResourcePlural). + Name(oldCrd.Spec.Name). + Body(patchBytes). + Do(). 
+ Get() + + return err6 + +} \ No newline at end of file diff --git a/kubeapi/pgreplica.go b/kubeapi/pgreplica.go index aea1e72706..ddd8a88f53 100644 --- a/kubeapi/pgreplica.go +++ b/kubeapi/pgreplica.go @@ -16,11 +16,14 @@ package kubeapi */ import ( + "encoding/json" log "github.com/sirupsen/logrus" + jsonpatch "github.com/evanphx/json-patch" crv1 "github.com/crunchydata/postgres-operator/apis/cr/v1" kerrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/rest" + "k8s.io/apimachinery/pkg/types" ) // GetpgreplicasBySelector gets a list of pgreplicas by selector @@ -141,3 +144,42 @@ func Updatepgreplica(client *rest.RESTClient, replica *crv1.Pgreplica, name, nam log.Debugf("updated pgreplica %s", replica.Name) return err } + +func PatchpgreplicaStatus(restclient *rest.RESTClient, state crv1.PgreplicaState, message string, oldCrd *crv1.Pgreplica, namespace string) error { + + oldData, err := json.Marshal(oldCrd) + if err != nil { + return err + } + + //change it + oldCrd.Status = crv1.PgreplicaStatus{ + State: state, + Message: message, + } + + //create the patch + var newData, patchBytes []byte + newData, err = json.Marshal(oldCrd) + if err != nil { + return err + } + patchBytes, err = jsonpatch.CreateMergePatch(oldData, newData) + if err != nil { + return err + } + + log.Debug(string(patchBytes)) + + //apply patch + _, err6 := restclient.Patch(types.MergePatchType). + Namespace(namespace). + Resource(crv1.PgreplicaResourcePlural). + Name(oldCrd.Spec.Name). + Body(patchBytes). + Do(). + Get() + + return err6 + +} \ No newline at end of file diff --git a/operator/cluster/cluster.go b/operator/cluster/cluster.go index 1e782aa983..b073c70f23 100644 --- a/operator/cluster/cluster.go +++ b/operator/cluster/cluster.go @@ -69,6 +69,7 @@ func init() { func AddClusterBase(clientset *kubernetes.Clientset, client *rest.RESTClient, cl *crv1.Pgcluster, namespace string) { var err error + log.Debugf("*** Entering AddClusterBase *** %s", cl.Spec.ClusterName) if cl.Spec.Status == crv1.UpgradeCompletedStatus { log.Warn("crv1 pgcluster " + cl.Spec.ClusterName + " is already marked complete, will not recreate") return @@ -165,6 +166,9 @@ func AddClusterBase(clientset *kubernetes.Clientset, client *rest.RESTClient, cl log.Error("error in replicas value " + err.Error()) return } + + log.Debugf("Cluster ReplicaCount: %s", cl.Spec.Replicas) + //create a CRD for each replica for i := 0; i < replicaCount; i++ { spec := crv1.PgreplicaSpec{} diff --git a/postgres-operator.go b/postgres-operator.go index 52e3df3ae5..8af5292bee 100644 --- a/postgres-operator.go +++ b/postgres-operator.go @@ -27,7 +27,7 @@ import ( "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" - + "k8s.io/client-go/util/workqueue" // Uncomment the following line to load the gcp plugin (only required to authenticate against GKE clusters). 
// _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" @@ -103,6 +103,7 @@ func main() { PgreplicaClient: crdClient, PgreplicaScheme: crdScheme, PgreplicaClientset: Clientset, + Queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), Namespace: Namespace, } pgUpgradecontroller := controller.PgupgradeController{ @@ -115,6 +116,8 @@ func main() { PgbackupClient: crdClient, PgbackupScheme: crdScheme, PgbackupClientset: Clientset, + Queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), + UpdateQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), Namespace: Namespace, } pgPolicycontroller := controller.PgpolicyController{ @@ -139,7 +142,10 @@ func main() { go pgTaskcontroller.Run(ctx) go pgClustercontroller.Run(ctx) go pgReplicacontroller.Run(ctx) + go pgReplicacontroller.RunWorker() go pgBackupcontroller.Run(ctx) + go pgBackupcontroller.RunWorker() + go pgBackupcontroller.RunUpdateWorker() go pgUpgradecontroller.Run(ctx) go pgPolicycontroller.Run(ctx) go podcontroller.Run(ctx) From 488743f666c74b821c98c75d39cd8e3c6b4a72ed Mon Sep 17 00:00:00 2001 From: Carvel Baus Date: Mon, 19 Aug 2019 12:08:16 -0400 Subject: [PATCH 2/4] add queue to cluster controller for onAdd events --- controller/clustercontroller.go | 116 +++++++++++++++++++++++++++----- kubeapi/pgcluster.go | 44 ++++++++++++ postgres-operator.go | 2 + util/labels.go | 1 + 4 files changed, 147 insertions(+), 16 deletions(-) diff --git a/controller/clustercontroller.go b/controller/clustercontroller.go index 2c1ab09929..3e91c961d9 100644 --- a/controller/clustercontroller.go +++ b/controller/clustercontroller.go @@ -18,6 +18,8 @@ limitations under the License. import ( "context" "fmt" + "strings" + "io/ioutil" log "github.com/sirupsen/logrus" crv1 "github.com/crunchydata/postgres-operator/apis/cr/v1" "github.com/crunchydata/postgres-operator/kubeapi" @@ -29,6 +31,7 @@ import ( "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" ) // PgclusterController holds the connections for the controller @@ -36,6 +39,7 @@ type PgclusterController struct { PgclusterClient *rest.RESTClient PgclusterScheme *runtime.Scheme PgclusterClientset *kubernetes.Clientset + Queue workqueue.RateLimitingInterface Namespace string } @@ -43,6 +47,9 @@ type PgclusterController struct { func (c *PgclusterController) Run(ctx context.Context) error { log.Debug("Watch Pgcluster objects") + //shut down the work queue to cause workers to end + defer c.Queue.ShutDown() + _, err := c.watchPgclusters(ctx) if err != nil { log.Errorf("Failed to register watch for Pgcluster resource: %v", err) @@ -95,34 +102,102 @@ func (c *PgclusterController) onAdd(obj interface{}) { return } - // NEVER modify objects from the store. It's a read-only, local cache. - // You can use clusterScheme.Copy() to make a deep copy of original object and modify this copy - // Or create a copy manually for better performance - copyObj := cluster.DeepCopyObject() - clusterCopy := copyObj.(*crv1.Pgcluster) + key, err := cache.MetaNamespaceKeyFunc(obj) + if err == nil { + log.Debugf("cluster putting key in queue %s", key) + c.Queue.Add(key) + } + + // clusterCopy.Status = crv1.PgclusterStatus{ + // State: crv1.PgclusterStateProcessed, + // Message: "Successfully processed Pgcluster by controller", + // } + + // err := c.PgclusterClient.Put(). + // Name(cluster.ObjectMeta.Name). + // Namespace(cluster.ObjectMeta.Namespace). + // Resource(crv1.PgclusterResourcePlural). 
+ // Body(clusterCopy). + // Do(). + // Error() + + // if err != nil { + // log.Errorf("ERROR updating pgcluster status on add: %s", err.Error()) + // } + + // log.Debugf("pgcluster added: %s", cluster.ObjectMeta.Name) + + // clusteroperator.AddClusterBase(c.PgclusterClientset, c.PgclusterClient, clusterCopy, cluster.ObjectMeta.Namespace) +} + + +func (c *PgclusterController) RunWorker() { + + //process the 'add' work queue forever + for c.processNextItem() { + } +} - clusterCopy.Status = crv1.PgclusterStatus{ - State: crv1.PgclusterStateProcessed, - Message: "Successfully processed Pgcluster by controller", +func (c *PgclusterController) processNextItem() bool { + // Wait until there is a new item in the working queue + key, quit := c.Queue.Get() + if quit { + return false } - err := c.PgclusterClient.Put(). - Name(cluster.ObjectMeta.Name). - Namespace(cluster.ObjectMeta.Namespace). - Resource(crv1.PgclusterResourcePlural). - Body(clusterCopy). - Do(). - Error() + log.Debugf("working on %s", key.(string)) + keyParts := strings.Split(key.(string), "/") + keyNamespace := keyParts[0] + keyResourceName := keyParts[1] + + log.Debugf("cluster add queue got key ns=[%s] resource=[%s]", keyNamespace, keyResourceName) + + // Tell the queue that we are done with processing this key. This unblocks the key for other workers + // This allows safe parallel processing because two pods with the same key are never processed in + // parallel. + defer c.Queue.Done(key) + + // Invoke the method containing the business logic + // for pgbackups, the convention is the CRD name is always + // the same as the pg-cluster label value + // in this case, the de-dupe logic is to test whether a cluster + // deployment exists , if so, then we don't create another + _, found, err := kubeapi.GetDeployment(c.PgclusterClientset, keyResourceName, keyNamespace) + + if found { + log.Debugf("cluster add - dep already found, not creating again") + return true + } + + //get the pgcluster + cluster := crv1.Pgcluster{} + found, err = kubeapi.Getpgcluster(c.PgclusterClient, &cluster, keyResourceName, keyNamespace) + if !found { + log.Debugf("cluster add - pgcluster not found, this is invalid") + return false + } + + + addIdentifier(&cluster) + + state := crv1.PgclusterStateProcessed + message := "Successfully processed Pgcluster by controller" + + err = kubeapi.PatchpgclusterStatus(c.PgclusterClient, state, message, &cluster, keyNamespace) if err != nil { log.Errorf("ERROR updating pgcluster status on add: %s", err.Error()) + return false } log.Debugf("pgcluster added: %s", cluster.ObjectMeta.Name) - clusteroperator.AddClusterBase(c.PgclusterClientset, c.PgclusterClient, clusterCopy, cluster.ObjectMeta.Namespace) + clusteroperator.AddClusterBase(c.PgclusterClientset, c.PgclusterClient, &cluster, cluster.ObjectMeta.Namespace) + + return true } + // onUpdate is called when a pgcluster is updated func (c *PgclusterController) onUpdate(oldObj, newObj interface{}) { oldcluster := oldObj.(*crv1.Pgcluster) @@ -212,3 +287,12 @@ func getReadyStatus(pod *v1.Pod) (string, bool) { return fmt.Sprintf("%d/%d", readyCount, containerCount), equal } + +func addIdentifier(clusterCopy *crv1.Pgcluster) { + u, err := ioutil.ReadFile("/proc/sys/kernel/random/uuid") + if err != nil { + log.Error(err) + } + + clusterCopy.ObjectMeta.Labels[util.LABEL_PG_CLUSTER_IDENTIFIER] = string(u[:len(u)-1]) +} diff --git a/kubeapi/pgcluster.go b/kubeapi/pgcluster.go index ea0c247d70..3357826dad 100644 --- a/kubeapi/pgcluster.go +++ b/kubeapi/pgcluster.go @@ -16,10 
+16,13 @@ package kubeapi */ import ( + "encoding/json" log "github.com/sirupsen/logrus" crv1 "github.com/crunchydata/postgres-operator/apis/cr/v1" + jsonpatch "github.com/evanphx/json-patch" kerrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/rest" ) @@ -145,3 +148,44 @@ func Updatepgcluster(client *rest.RESTClient, cluster *crv1.Pgcluster, name, nam log.Debugf("updated pgcluster %s", cluster.Name) return err } + + +// PatchpgclusterStatus - patches a pgcluster resource +func PatchpgclusterStatus(restclient *rest.RESTClient, state crv1.PgclusterState, message string, oldCrd *crv1.Pgcluster, namespace string) error { + + oldData, err := json.Marshal(oldCrd) + if err != nil { + return err + } + + //change it + oldCrd.Status = crv1.PgclusterStatus{ + State: state, + Message: message, + } + + //create the patch + var newData, patchBytes []byte + newData, err = json.Marshal(oldCrd) + if err != nil { + return err + } + patchBytes, err = jsonpatch.CreateMergePatch(oldData, newData) + if err != nil { + return err + } + + log.Debug(string(patchBytes)) + + //apply patch + _, err6 := restclient.Patch(types.MergePatchType). + Namespace(namespace). + Resource(crv1.PgclusterResourcePlural). + Name(oldCrd.Spec.Name). + Body(patchBytes). + Do(). + Get() + + return err6 + +} \ No newline at end of file diff --git a/postgres-operator.go b/postgres-operator.go index 8af5292bee..274e6faa37 100644 --- a/postgres-operator.go +++ b/postgres-operator.go @@ -97,6 +97,7 @@ func main() { PgclusterClient: crdClient, PgclusterScheme: crdScheme, PgclusterClientset: Clientset, + Queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), Namespace: Namespace, } pgReplicacontroller := controller.PgreplicaController{ @@ -141,6 +142,7 @@ func main() { defer cancelFunc() go pgTaskcontroller.Run(ctx) go pgClustercontroller.Run(ctx) + go pgClustercontroller.RunWorker() go pgReplicacontroller.Run(ctx) go pgReplicacontroller.RunWorker() go pgBackupcontroller.Run(ctx) diff --git a/util/labels.go b/util/labels.go index 583ef43d95..2d28e32f56 100644 --- a/util/labels.go +++ b/util/labels.go @@ -20,6 +20,7 @@ const LABEL_NAME = "name" const LABEL_SELECTOR = "selector" const LABEL_OPERATOR = "postgres-operator" const LABEL_PG_CLUSTER = "pg-cluster" +const LABEL_PG_CLUSTER_IDENTIFIER = "pg-cluster-id" const LABEL_PG_DATABASE = "pg-database" const LABEL_PGBACKUP = "pgbackup" const LABEL_PGTASK = "pg-task" From 5f75e00bbed31c3a532a5a455a34632d66d432b1 Mon Sep 17 00:00:00 2001 From: Carvel Baus Date: Mon, 19 Aug 2019 15:26:29 -0400 Subject: [PATCH 3/4] Remove commented code after testing. --- controller/clustercontroller.go | 20 -------------------- 1 file changed, 20 deletions(-) diff --git a/controller/clustercontroller.go b/controller/clustercontroller.go index 3e91c961d9..b249e8deb2 100644 --- a/controller/clustercontroller.go +++ b/controller/clustercontroller.go @@ -108,26 +108,6 @@ func (c *PgclusterController) onAdd(obj interface{}) { c.Queue.Add(key) } - // clusterCopy.Status = crv1.PgclusterStatus{ - // State: crv1.PgclusterStateProcessed, - // Message: "Successfully processed Pgcluster by controller", - // } - - // err := c.PgclusterClient.Put(). - // Name(cluster.ObjectMeta.Name). - // Namespace(cluster.ObjectMeta.Namespace). - // Resource(crv1.PgclusterResourcePlural). - // Body(clusterCopy). - // Do(). 
- // Error() - - // if err != nil { - // log.Errorf("ERROR updating pgcluster status on add: %s", err.Error()) - // } - - // log.Debugf("pgcluster added: %s", cluster.ObjectMeta.Name) - - // clusteroperator.AddClusterBase(c.PgclusterClientset, c.PgclusterClient, clusterCopy, cluster.ObjectMeta.Namespace) } From d26eb01649bfdbba710f69b8a39b0e261fb76260 Mon Sep 17 00:00:00 2001 From: Carvel Baus Date: Tue, 20 Aug 2019 15:19:31 -0400 Subject: [PATCH 4/4] backport of pgtask queue --- controller/replicacontroller.go | 42 +++---- controller/taskcontroller.go | 189 +++++++++++++++++++++----------- kubeapi/pgtask.go | 42 +++++++ operator/cluster/failover.go | 48 ++++++++ postgres-operator.go | 2 + util/labels.go | 2 + 6 files changed, 232 insertions(+), 93 deletions(-) diff --git a/controller/replicacontroller.go b/controller/replicacontroller.go index 4011090e11..55be682c64 100644 --- a/controller/replicacontroller.go +++ b/controller/replicacontroller.go @@ -26,6 +26,7 @@ import ( crv1 "github.com/crunchydata/postgres-operator/apis/cr/v1" "github.com/crunchydata/postgres-operator/kubeapi" + "github.com/crunchydata/postgres-operator/util" clusteroperator "github.com/crunchydata/postgres-operator/operator/cluster" "k8s.io/client-go/util/workqueue" "strings" @@ -167,29 +168,6 @@ func (c *PgreplicaController) onAdd(obj interface{}) { return } - // NEVER modify objects from the store. It's a read-only, local cache. - // You can use clusterScheme.Copy() to make a deep copy of original object and modify this copy - // Or create a copy manually for better performance - // copyObj := replica.DeepCopyObject() - // replicaCopy := copyObj.(*crv1.Pgreplica) - - // replicaCopy.Status = crv1.PgreplicaStatus{ - // State: crv1.PgreplicaStateProcessed, - // Message: "Successfully processed Pgreplica by controller", - // } - - // err := c.PgreplicaClient.Put(). - // Name(replica.ObjectMeta.Name). - // Namespace(replica.ObjectMeta.Namespace). - // Resource(crv1.PgreplicaResourcePlural). - // Body(replicaCopy). - // Do(). 
- // Error() - - // if err != nil { - // log.Errorf("ERROR updating pgreplica status: %s", err.Error()) - // } - key, err := cache.MetaNamespaceKeyFunc(obj) if err == nil { log.Debugf("onAdd putting key in queue %s", key) @@ -198,10 +176,6 @@ func (c *PgreplicaController) onAdd(obj interface{}) { log.Errorf("replicacontroller: error acquiring key: %s", err.Error()) } - //handle the case of when a pgreplica is added which is - //scaling up a cluster - // clusteroperator.ScaleBase(c.PgreplicaClientset, c.PgreplicaClient, replicaCopy, replicaCopy.ObjectMeta.Namespace) - } // onUpdate is called when a pgreplica is updated @@ -215,4 +189,18 @@ func (c *PgreplicaController) onUpdate(oldObj, newObj interface{}) { func (c *PgreplicaController) onDelete(obj interface{}) { replica := obj.(*crv1.Pgreplica) log.Debugf("[PgreplicaController] OnDelete ns=%s %s", replica.ObjectMeta.Namespace, replica.ObjectMeta.SelfLink) + + //make sure we are not removing a replica deployment + //that is now the primary after a failover + dep, found, _ := kubeapi.GetDeployment(c.PgreplicaClientset, replica.Spec.Name, replica.ObjectMeta.Namespace) + if found { + if dep.ObjectMeta.Labels[util.LABEL_SERVICE_NAME] == dep.ObjectMeta.Labels[util.LABEL_PG_CLUSTER] { + //the replica was made a primary at some point + //we will not scale down the deployment + log.Debugf("[PgreplicaController] OnDelete not scaling down the replica since it is acting as a primary") + } else { + clusteroperator.ScaleDownBase(c.PgreplicaClientset, c.PgreplicaClient, replica, replica.ObjectMeta.Namespace) + } + } + } diff --git a/controller/taskcontroller.go b/controller/taskcontroller.go index f5eb874250..6238bcf6e2 100644 --- a/controller/taskcontroller.go +++ b/controller/taskcontroller.go @@ -17,16 +17,17 @@ limitations under the License. 
import ( "context" - // "time" - + "strings" log "github.com/sirupsen/logrus" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" crv1 "github.com/crunchydata/postgres-operator/apis/cr/v1" + "github.com/crunchydata/postgres-operator/util" "github.com/crunchydata/postgres-operator/kubeapi" backrestoperator "github.com/crunchydata/postgres-operator/operator/backrest" benchmarkoperator "github.com/crunchydata/postgres-operator/operator/benchmark" @@ -41,6 +42,7 @@ type PgtaskController struct { PgtaskClient *rest.RESTClient PgtaskScheme *runtime.Scheme PgtaskClientset *kubernetes.Clientset + Queue workqueue.RateLimitingInterface Namespace string } @@ -48,6 +50,9 @@ type PgtaskController struct { func (c *PgtaskController) Run(ctx context.Context) error { log.Debug("Watch Pgtask objects") + //shut down the work queue to cause workers to end + defer c.Queue.ShutDown() + // Watch Example objects _, err := c.watchPgtasks(ctx) if err != nil { @@ -89,129 +94,143 @@ func (c *PgtaskController) watchPgtasks(ctx context.Context) (cache.Controller, return controller, nil } -// onAdd is called when a pgtask is added -func (c *PgtaskController) onAdd(obj interface{}) { - task := obj.(*crv1.Pgtask) - log.Debugf("[PgtaskController] onAdd ns=%s %s", task.ObjectMeta.Namespace, task.ObjectMeta.SelfLink) - - //handle the case of when the operator restarts, we do not want - //to process pgtasks already processed - if task.Status.State == crv1.PgtaskStateProcessed { - log.Debug("pgtask " + task.ObjectMeta.Name + " already processed") - return - } - //time.Sleep(time.Second * time.Duration(2)) - // NEVER modify objects from the store. It's a read-only, local cache. - // You can use taskScheme.Copy() to make a deep copy of original object and modify this copy - // Or create a copy manually for better performance - copyObj := task.DeepCopyObject() - taskCopy := copyObj.(*crv1.Pgtask) +func (c *PgtaskController) RunWorker() { - //update the status of the task as processed to prevent reprocessing - taskCopy.Status = crv1.PgtaskStatus{ - State: crv1.PgtaskStateProcessed, - Message: "Successfully processed Pgtask by controller", + //process the 'add' work queue forever + for c.processNextItem() { } - task.Status = crv1.PgtaskStatus{ - State: crv1.PgtaskStateProcessed, - Message: "Successfully processed Pgtask by controller", +} + +func (c *PgtaskController) processNextItem() bool { + // Wait until there is a new item in the working queue + key, quit := c.Queue.Get() + if quit { + return false } - //Body(taskCopy). + log.Debugf("working on %s", key.(string)) + keyParts := strings.Split(key.(string), "/") + keyNamespace := keyParts[0] + keyResourceName := keyParts[1] - //get pgtask + log.Debugf("queue got key ns=[%s] resource=[%s]", keyNamespace, keyResourceName) + + // Tell the queue that we are done with processing this key. This unblocks the key for other workers + // This allows safe parallel processing because two pods with the same key are never processed in + // parallel. 
+ defer c.Queue.Done(key) tmpTask := crv1.Pgtask{} - found, err := kubeapi.Getpgtask(c.PgtaskClient, &tmpTask, task.ObjectMeta.Name, task.ObjectMeta.Namespace) + found, err := kubeapi.Getpgtask(c.PgtaskClient, &tmpTask, keyResourceName, keyNamespace) if !found { log.Errorf("ERROR onAdd getting pgtask : %s", err.Error()) - return + return false } //update pgtask - tmpTask.Status = crv1.PgtaskStatus{ - State: crv1.PgtaskStateProcessed, - Message: "Successfully processed Pgtask by controller", - } - - err = kubeapi.Updatepgtask(c.PgtaskClient, &tmpTask, task.ObjectMeta.Name, task.ObjectMeta.Namespace) - - /** - err = c.PgtaskClient.Put(). - Name(tmpTask.ObjectMeta.Name). - Namespace(tmpTask.ObjectMeta.Namespace). - Resource(crv1.PgtaskResourcePlural). - Body(tmpTask). - Do(). - Error() - - */ + state := crv1.PgtaskStateProcessed + message := "Successfully processed Pgtask by controller" + err = kubeapi.PatchpgtaskStatus(c.PgtaskClient, state, message, &tmpTask, keyNamespace) if err != nil { log.Errorf("ERROR onAdd updating pgtask status: %s", err.Error()) - return + return false } //process the incoming task - switch task.Spec.TaskType { + switch tmpTask.Spec.TaskType { + // case crv1.PgtaskMinorUpgrade: + // log.Debug("delete minor upgrade task added") + // clusteroperator.AddUpgrade(c.PgtaskClientset, c.PgtaskClient, &tmpTask, keyNamespace) case crv1.PgtaskDeletePgbouncer: log.Debug("delete pgbouncer task added") - clusteroperator.DeletePgbouncerFromTask(c.PgtaskClientset, c.PgtaskClient, task, task.ObjectMeta.Namespace) + clusteroperator.DeletePgbouncerFromTask(c.PgtaskClientset, c.PgtaskClient, &tmpTask, keyNamespace) case crv1.PgtaskReconfigurePgbouncer: log.Debug("Reconfiguredelete pgbouncer task added") - clusteroperator.ReconfigurePgbouncerFromTask(c.PgtaskClientset, c.PgtaskClient, task, task.ObjectMeta.Namespace) + clusteroperator.ReconfigurePgbouncerFromTask(c.PgtaskClientset, c.PgtaskClient, &tmpTask, keyNamespace) case crv1.PgtaskAddPgbouncer: log.Debug("add pgbouncer task added") - clusteroperator.AddPgbouncerFromTask(c.PgtaskClientset, c.PgtaskClient, task, task.ObjectMeta.Namespace) + clusteroperator.AddPgbouncerFromTask(c.PgtaskClientset, c.PgtaskClient, &tmpTask, keyNamespace) case crv1.PgtaskDeletePgpool: log.Debug("delete pgpool task added") - clusteroperator.DeletePgpoolFromTask(c.PgtaskClientset, c.PgtaskClient, task, task.ObjectMeta.Namespace) + clusteroperator.DeletePgpoolFromTask(c.PgtaskClientset, c.PgtaskClient, &tmpTask, keyNamespace) case crv1.PgtaskReconfigurePgpool: log.Debug("Reconfiguredelete pgpool task added") - clusteroperator.ReconfigurePgpoolFromTask(c.PgtaskClientset, c.PgtaskClient, task, task.ObjectMeta.Namespace) + clusteroperator.ReconfigurePgpoolFromTask(c.PgtaskClientset, c.PgtaskClient, &tmpTask, keyNamespace) case crv1.PgtaskAddPgpool: log.Debug("add pgpool task added") - clusteroperator.AddPgpoolFromTask(c.PgtaskClientset, c.PgtaskClient, task, task.ObjectMeta.Namespace) + clusteroperator.AddPgpoolFromTask(c.PgtaskClientset, c.PgtaskClient, &tmpTask, keyNamespace) case crv1.PgtaskFailover: log.Debug("failover task added") - clusteroperator.FailoverBase(task.ObjectMeta.Namespace, c.PgtaskClientset, c.PgtaskClient, task, c.PgtaskConfig) + if !dupeFailover(c.PgtaskClient, &tmpTask, keyNamespace) { + clusteroperator.FailoverBase(keyNamespace, c.PgtaskClientset, c.PgtaskClient, &tmpTask, c.PgtaskConfig) + } else { + log.Debug("skipping duplicate onAdd failover task %s/%s", keyNamespace, keyResourceName) + } case crv1.PgtaskDeleteData: 
log.Debug("delete data task added") - taskoperator.RemoveData(task.ObjectMeta.Namespace, c.PgtaskClientset, task) + if !dupeDeleteData(c.PgtaskClient, &tmpTask, keyNamespace) { + taskoperator.RemoveData(keyNamespace, c.PgtaskClientset, &tmpTask) + } else { + log.Debug("skipping duplicate onAdd delete data task %s/%s", keyNamespace, keyResourceName) + } case crv1.PgtaskDeleteBackups: log.Debug("delete backups task added") - taskoperator.RemoveBackups(task.ObjectMeta.Namespace, c.PgtaskClientset, task) + taskoperator.RemoveBackups(keyNamespace, c.PgtaskClientset, &tmpTask) case crv1.PgtaskBackrest: log.Debug("backrest task added") - backrestoperator.Backrest(task.ObjectMeta.Namespace, c.PgtaskClientset, task) + backrestoperator.Backrest(keyNamespace, c.PgtaskClientset, &tmpTask) case crv1.PgtaskBackrestRestore: log.Debug("backrest restore task added") - backrestoperator.Restore(c.PgtaskClient, task.ObjectMeta.Namespace, c.PgtaskClientset, task) + backrestoperator.Restore(c.PgtaskClient, keyNamespace, c.PgtaskClientset, &tmpTask) case crv1.PgtaskpgDump: log.Debug("pgDump task added") - pgdumpoperator.Dump(task.ObjectMeta.Namespace, c.PgtaskClientset, c.PgtaskClient, task) + pgdumpoperator.Dump(keyNamespace, c.PgtaskClientset, c.PgtaskClient, &tmpTask) case crv1.PgtaskpgRestore: log.Debug("pgDump restore task added") - pgdumpoperator.Restore(task.ObjectMeta.Namespace, c.PgtaskClientset, c.PgtaskClient, task) + pgdumpoperator.Restore(keyNamespace, c.PgtaskClientset, c.PgtaskClient, &tmpTask) case crv1.PgtaskAutoFailover: - log.Debugf("autofailover task added %s", task.ObjectMeta.Name) + log.Debugf("autofailover task added %s", keyResourceName) case crv1.PgtaskWorkflow: - log.Debugf("workflow task added [%s] ID [%s]", task.ObjectMeta.Name, task.Spec.Parameters[crv1.PgtaskWorkflowID]) + log.Debugf("workflow task added [%s] ID [%s]", keyResourceName, tmpTask.Spec.Parameters[crv1.PgtaskWorkflowID]) case crv1.PgtaskBenchmark: log.Debug("benchmark task added") - benchmarkoperator.Create(task.ObjectMeta.Namespace, c.PgtaskClientset, c.PgtaskClient, task) + benchmarkoperator.Create(keyNamespace, c.PgtaskClientset, c.PgtaskClient, &tmpTask) default: - log.Debugf("unknown task type on pgtask added [%s]", task.Spec.TaskType) + log.Debugf("unknown task type on pgtask added [%s]", tmpTask.Spec.TaskType) } + return true + } +// onAdd is called when a pgtask is added +func (c *PgtaskController) onAdd(obj interface{}) { + task := obj.(*crv1.Pgtask) + // log.Debugf("[PgtaskController] onAdd ns=%s %s", task.ObjectMeta.Namespace, task.ObjectMeta.SelfLink) + + //handle the case of when the operator restarts, we do not want + //to process pgtasks already processed + if task.Status.State == crv1.PgtaskStateProcessed { + log.Debug("pgtask " + task.ObjectMeta.Name + " already processed") + return + } + + key, err := cache.MetaNamespaceKeyFunc(obj) + if err == nil { + log.Debugf("task putting key in queue %s", key) + c.Queue.Add(key) + } + +} + + // onUpdate is called when a pgtask is updated func (c *PgtaskController) onUpdate(oldObj, newObj interface{}) { task := newObj.(*crv1.Pgtask) @@ -223,3 +242,41 @@ func (c *PgtaskController) onDelete(obj interface{}) { task := obj.(*crv1.Pgtask) log.Debugf("[PgtaskController] onDelete ns=%s %s", task.ObjectMeta.Namespace, task.ObjectMeta.SelfLink) } + +//de-dupe logic for a failover, if the failover started +//parameter is set, it means a failover has already been +//started on this +func dupeFailover(restClient *rest.RESTClient, task *crv1.Pgtask, ns string) bool { + tmp := 
crv1.Pgtask{} + + found, _ := kubeapi.Getpgtask(restClient, &tmp, task.Spec.Name, ns) + if !found { + //a big time error if this occurs + return false + } + + if tmp.Spec.Parameters[util.LABEL_FAILOVER_STARTED] == "" { + return false + } + + return true +} + +//de-dupe logic for a delete data, if the delete data job started +//parameter is set, it means a delete data job has already been +//started on this +func dupeDeleteData(restClient *rest.RESTClient, task *crv1.Pgtask, ns string) bool { + tmp := crv1.Pgtask{} + + found, _ := kubeapi.Getpgtask(restClient, &tmp, task.Spec.Name, ns) + if !found { + //a big time error if this occurs + return false + } + + if tmp.Spec.Parameters[util.LABEL_DELETE_DATA_STARTED] == "" { + return false + } + + return true +} \ No newline at end of file diff --git a/kubeapi/pgtask.go b/kubeapi/pgtask.go index 06aa933057..442e9cf21b 100644 --- a/kubeapi/pgtask.go +++ b/kubeapi/pgtask.go @@ -16,10 +16,13 @@ package kubeapi */ import ( + "encoding/json" crv1 "github.com/crunchydata/postgres-operator/apis/cr/v1" + jsonpatch "github.com/evanphx/json-patch" log "github.com/sirupsen/logrus" kerrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/rest" ) @@ -159,3 +162,42 @@ func Deletepgtasks(client *rest.RESTClient, selector, namespace string) error { } return err } + +// PatchpgtaskStatus - update status of pgtask +func PatchpgtaskStatus(restclient *rest.RESTClient, state crv1.PgtaskState, message string, oldCrd *crv1.Pgtask, namespace string) error { + + oldData, err := json.Marshal(oldCrd) + if err != nil { + return err + } + + //change it + oldCrd.Status = crv1.PgtaskStatus{ + State: state, + Message: message, + } + + //create the patch + var newData, patchBytes []byte + newData, err = json.Marshal(oldCrd) + if err != nil { + return err + } + patchBytes, err = jsonpatch.CreateMergePatch(oldData, newData) + if err != nil { + return err + } + log.Debug(string(patchBytes)) + + //apply patch + _, err6 := restclient.Patch(types.MergePatchType). + Namespace(namespace). + Resource(crv1.PgtaskResourcePlural). + Name(oldCrd.Spec.Name). + Body(patchBytes). + Do(). + Get() + + return err6 + +} \ No newline at end of file diff --git a/operator/cluster/failover.go b/operator/cluster/failover.go index 47626fd7c3..f4cc45eecd 100644 --- a/operator/cluster/failover.go +++ b/operator/cluster/failover.go @@ -19,18 +19,23 @@ package cluster */ import ( + "encoding/json" + "time" log "github.com/sirupsen/logrus" crv1 "github.com/crunchydata/postgres-operator/apis/cr/v1" "github.com/crunchydata/postgres-operator/kubeapi" "github.com/crunchydata/postgres-operator/operator" "github.com/crunchydata/postgres-operator/operator/backrest" "github.com/crunchydata/postgres-operator/util" + jsonpatch "github.com/evanphx/json-patch" meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" ) // FailoverBase ... 
+// gets called first on a failover func FailoverBase(namespace string, clientset *kubernetes.Clientset, client *rest.RESTClient, task *crv1.Pgtask, restconfig *rest.Config) { var err error @@ -49,6 +54,13 @@ func FailoverBase(namespace string, clientset *kubernetes.Clientset, client *res return } + //create marker (clustername, namespace) + err = PatchpgtaskFailoverStatus(client, task, namespace) + if err != nil { + log.Error("could not set failover started marker for task %s cluster %s", task.Spec.Name, clusterName) + return + } + if cluster.Spec.Strategy == "" { cluster.Spec.Strategy = "1" log.Info("using default strategy") @@ -158,3 +170,39 @@ func replaceReplica(client *rest.RESTClient, cluster *crv1.Pgcluster, ns string) kubeapi.Createpgreplica(client, newInstance, ns) } + +// PatchpgtaskFailoverStatus - patch the pgtask with failover status +func PatchpgtaskFailoverStatus(restclient *rest.RESTClient, oldCrd *crv1.Pgtask, namespace string) error { + + oldData, err := json.Marshal(oldCrd) + if err != nil { + return err + } + + //change it + oldCrd.Spec.Parameters[util.LABEL_FAILOVER_STARTED] = time.Now().Format("2006-01-02.15.04.05") + + //create the patch + var newData, patchBytes []byte + newData, err = json.Marshal(oldCrd) + if err != nil { + return err + } + patchBytes, err = jsonpatch.CreateMergePatch(oldData, newData) + if err != nil { + return err + } + log.Debug(string(patchBytes)) + + //apply patch + _, err6 := restclient.Patch(types.MergePatchType). + Namespace(namespace). + Resource(crv1.PgtaskResourcePlural). + Name(oldCrd.Spec.Name). + Body(patchBytes). + Do(). + Get() + + return err6 + +} \ No newline at end of file diff --git a/postgres-operator.go b/postgres-operator.go index 274e6faa37..fb55c23888 100644 --- a/postgres-operator.go +++ b/postgres-operator.go @@ -90,6 +90,7 @@ func main() { PgtaskClient: crdClient, PgtaskScheme: crdScheme, PgtaskClientset: Clientset, + Queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), Namespace: Namespace, } @@ -141,6 +142,7 @@ func main() { ctx, cancelFunc := context.WithCancel(context.Background()) defer cancelFunc() go pgTaskcontroller.Run(ctx) + go pgTaskcontroller.RunWorker() go pgClustercontroller.Run(ctx) go pgClustercontroller.RunWorker() go pgReplicacontroller.Run(ctx) diff --git a/util/labels.go b/util/labels.go index 2d28e32f56..0986922be3 100644 --- a/util/labels.go +++ b/util/labels.go @@ -31,6 +31,7 @@ const LABEL_FAILOVER = "failover" const LABEL_PRIMARY = "primary" const LABEL_TARGET = "target" const LABEL_RMDATA = "pgrmdata" +const LABEL_FAILOVER_STARTED = "failover-started" const LABEL_PGPOLICY = "pgpolicy" const LABEL_INGEST = "ingest" @@ -55,6 +56,7 @@ const LABEL_VERSION = "version" const LABEL_PGO_VERSION = "pgo-version" const LABEL_UPGRADE_DATE = "operator-upgrade-date" const LABEL_DELETE_DATA = "delete-data" +const LABEL_DELETE_DATA_STARTED = "delete-data-started" const LABEL_BACKREST = "pgo-backrest" const LABEL_BACKREST_JOB = "pgo-backrest-job"
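
The common pattern introduced across these controllers is the client-go workqueue consumer: onAdd no longer does the work itself, it only enqueues a namespace/name key, and a RunWorker goroutine dequeues the key, de-dupes against existing Jobs or Deployments, patches the CR status, and then calls the operator logic. Below is a minimal, self-contained sketch of that loop using the same client-go workqueue API that appears in the patches; the controller type, field names, and the placeholder handler body are illustrative only, not the operator's actual code.

package main

import (
	"fmt"
	"strings"

	"k8s.io/client-go/util/workqueue"
)

// controller carries only the rate-limited queue, mirroring the Queue fields
// added to the Pgbackup/Pgreplica/Pgcluster/Pgtask controllers in this series.
type controller struct {
	queue workqueue.RateLimitingInterface
}

// runWorker drains the queue until ShutDown is called (each controller's Run
// method defers Queue.ShutDown so workers exit when the context is done).
func (c *controller) runWorker() {
	for c.processNextItem() {
	}
}

func (c *controller) processNextItem() bool {
	key, quit := c.queue.Get()
	if quit {
		return false
	}
	// Done unblocks this key; two workers never process the same key in parallel.
	defer c.queue.Done(key)

	parts := strings.Split(key.(string), "/")
	namespace, name := parts[0], parts[1]

	// Placeholder for the real de-dupe + PatchXStatus + operator call
	// (e.g. skip creating a backup Job if one is already running).
	fmt.Printf("processing ns=%s resource=%s\n", namespace, name)

	// Forget clears any rate-limiter history for this key after success.
	c.queue.Forget(key)
	return true
}

func main() {
	c := &controller{queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())}
	c.queue.Add("pgo/mycluster")
	c.queue.ShutDown() // the worker loop below drains the remaining key, then exits
	c.runWorker()
}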