diff --git a/controller/replicacontroller.go b/controller/replicacontroller.go
index 4011090e11..55be682c64 100644
--- a/controller/replicacontroller.go
+++ b/controller/replicacontroller.go
@@ -26,6 +26,7 @@ import (
 	crv1 "github.com/crunchydata/postgres-operator/apis/cr/v1"
 	"github.com/crunchydata/postgres-operator/kubeapi"
+	"github.com/crunchydata/postgres-operator/util"
 	clusteroperator "github.com/crunchydata/postgres-operator/operator/cluster"
 	"k8s.io/client-go/util/workqueue"
 	"strings"
@@ -167,29 +168,6 @@ func (c *PgreplicaController) onAdd(obj interface{}) {
 		return
 	}
 
-	// NEVER modify objects from the store. It's a read-only, local cache.
-	// You can use clusterScheme.Copy() to make a deep copy of original object and modify this copy
-	// Or create a copy manually for better performance
-	// copyObj := replica.DeepCopyObject()
-	// replicaCopy := copyObj.(*crv1.Pgreplica)
-
-	// replicaCopy.Status = crv1.PgreplicaStatus{
-	// 	State:   crv1.PgreplicaStateProcessed,
-	// 	Message: "Successfully processed Pgreplica by controller",
-	// }
-
-	// err := c.PgreplicaClient.Put().
-	// 	Name(replica.ObjectMeta.Name).
-	// 	Namespace(replica.ObjectMeta.Namespace).
-	// 	Resource(crv1.PgreplicaResourcePlural).
-	// 	Body(replicaCopy).
-	// 	Do().
-	// 	Error()
-
-	// if err != nil {
-	// 	log.Errorf("ERROR updating pgreplica status: %s", err.Error())
-	// }
-
 	key, err := cache.MetaNamespaceKeyFunc(obj)
 	if err == nil {
 		log.Debugf("onAdd putting key in queue %s", key)
@@ -198,10 +176,6 @@ func (c *PgreplicaController) onAdd(obj interface{}) {
 		log.Errorf("replicacontroller: error acquiring key: %s", err.Error())
 	}
 
-	//handle the case of when a pgreplica is added which is
-	//scaling up a cluster
-	//	clusteroperator.ScaleBase(c.PgreplicaClientset, c.PgreplicaClient, replicaCopy, replicaCopy.ObjectMeta.Namespace)
-
 }
 
 // onUpdate is called when a pgreplica is updated
@@ -215,4 +189,18 @@ func (c *PgreplicaController) onUpdate(oldObj, newObj interface{}) {
 func (c *PgreplicaController) onDelete(obj interface{}) {
 	replica := obj.(*crv1.Pgreplica)
 	log.Debugf("[PgreplicaController] OnDelete ns=%s %s", replica.ObjectMeta.Namespace, replica.ObjectMeta.SelfLink)
+
+	//make sure we are not removing a replica deployment
+	//that is now the primary after a failover
+	dep, found, _ := kubeapi.GetDeployment(c.PgreplicaClientset, replica.Spec.Name, replica.ObjectMeta.Namespace)
+	if found {
+		if dep.ObjectMeta.Labels[util.LABEL_SERVICE_NAME] == dep.ObjectMeta.Labels[util.LABEL_PG_CLUSTER] {
+			//the replica was made a primary at some point
+			//we will not scale down the deployment
+			log.Debugf("[PgreplicaController] OnDelete not scaling down the replica since it is acting as a primary")
+		} else {
+			clusteroperator.ScaleDownBase(c.PgreplicaClientset, c.PgreplicaClient, replica, replica.ObjectMeta.Namespace)
+		}
+	}
+
 }
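The onDelete guard in the hunk above relies on a labeling convention: after a failover the operator points the cluster's primary service at the promoted replica's Deployment, so that Deployment's service-name label equals its pg-cluster label. A minimal sketch of the check, assuming those are the literal values behind util.LABEL_SERVICE_NAME and util.LABEL_PG_CLUSTER:

```go
package main

import "fmt"

// isActingPrimary reports whether a replica Deployment has been promoted:
// a promoted replica's service-name label matches its cluster label, while
// an ordinary replica's service-name label points at the replica service.
func isActingPrimary(labels map[string]string) bool {
	return labels["service-name"] == labels["pg-cluster"]
}

func main() {
	promoted := map[string]string{"service-name": "mycluster", "pg-cluster": "mycluster"}
	ordinary := map[string]string{"service-name": "mycluster-replica", "pg-cluster": "mycluster"}
	fmt.Println(isActingPrimary(promoted), isActingPrimary(ordinary)) // true false
}
```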
diff --git a/controller/taskcontroller.go b/controller/taskcontroller.go
index f5eb874250..6238bcf6e2 100644
--- a/controller/taskcontroller.go
+++ b/controller/taskcontroller.go
@@ -17,16 +17,17 @@ limitations under the License.
 import (
 	"context"
-	//	"time"
-
+	"strings"
 	log "github.com/sirupsen/logrus"
 	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/rest"
 	"k8s.io/client-go/tools/cache"
+	"k8s.io/client-go/util/workqueue"
 
 	crv1 "github.com/crunchydata/postgres-operator/apis/cr/v1"
+	"github.com/crunchydata/postgres-operator/util"
 	"github.com/crunchydata/postgres-operator/kubeapi"
 	backrestoperator "github.com/crunchydata/postgres-operator/operator/backrest"
 	benchmarkoperator "github.com/crunchydata/postgres-operator/operator/benchmark"
@@ -41,6 +42,7 @@ type PgtaskController struct {
 	PgtaskClient    *rest.RESTClient
 	PgtaskScheme    *runtime.Scheme
 	PgtaskClientset *kubernetes.Clientset
+	Queue           workqueue.RateLimitingInterface
 	Namespace       string
 }
 
@@ -48,6 +50,9 @@ type PgtaskController struct {
 func (c *PgtaskController) Run(ctx context.Context) error {
 	log.Debug("Watch Pgtask objects")
 
+	//shut down the work queue to cause workers to end
+	defer c.Queue.ShutDown()
+
 	// Watch Example objects
 	_, err := c.watchPgtasks(ctx)
 	if err != nil {
@@ -89,129 +94,143 @@ func (c *PgtaskController) watchPgtasks(ctx context.Context) (cache.Controller,
 	return controller, nil
 }
 
-// onAdd is called when a pgtask is added
-func (c *PgtaskController) onAdd(obj interface{}) {
-	task := obj.(*crv1.Pgtask)
-	log.Debugf("[PgtaskController] onAdd ns=%s %s", task.ObjectMeta.Namespace, task.ObjectMeta.SelfLink)
-
-	//handle the case of when the operator restarts, we do not want
-	//to process pgtasks already processed
-	if task.Status.State == crv1.PgtaskStateProcessed {
-		log.Debug("pgtask " + task.ObjectMeta.Name + " already processed")
-		return
-	}
-	//time.Sleep(time.Second * time.Duration(2))
-	// NEVER modify objects from the store. It's a read-only, local cache.
-	// You can use taskScheme.Copy() to make a deep copy of original object and modify this copy
-	// Or create a copy manually for better performance
-	copyObj := task.DeepCopyObject()
-	taskCopy := copyObj.(*crv1.Pgtask)
+func (c *PgtaskController) RunWorker() {
 
-	//update the status of the task as processed to prevent reprocessing
-	taskCopy.Status = crv1.PgtaskStatus{
-		State:   crv1.PgtaskStateProcessed,
-		Message: "Successfully processed Pgtask by controller",
+	//process the 'add' work queue forever
+	for c.processNextItem() {
 	}
-	task.Status = crv1.PgtaskStatus{
-		State:   crv1.PgtaskStateProcessed,
-		Message: "Successfully processed Pgtask by controller",
+}
+
+func (c *PgtaskController) processNextItem() bool {
+	// Wait until there is a new item in the working queue
+	key, quit := c.Queue.Get()
+	if quit {
+		return false
 	}
-	//Body(taskCopy).
+	log.Debugf("working on %s", key.(string))
+	keyParts := strings.Split(key.(string), "/")
+	keyNamespace := keyParts[0]
+	keyResourceName := keyParts[1]
 
-	//get pgtask
+	log.Debugf("queue got key ns=[%s] resource=[%s]", keyNamespace, keyResourceName)
+
+	// Tell the queue that we are done with processing this key. This unblocks the key for other workers.
+	// This allows safe parallel processing because two items with the same key are never processed in
+	// parallel.
+	defer c.Queue.Done(key)
 
 	tmpTask := crv1.Pgtask{}
-	found, err := kubeapi.Getpgtask(c.PgtaskClient, &tmpTask, task.ObjectMeta.Name, task.ObjectMeta.Namespace)
+	found, err := kubeapi.Getpgtask(c.PgtaskClient, &tmpTask, keyResourceName, keyNamespace)
 	if !found {
 		log.Errorf("ERROR onAdd getting pgtask : %s", err.Error())
-		return
+		return true //keep the worker running; a missing task should not stop the queue
 	}
 
 	//update pgtask
-	tmpTask.Status = crv1.PgtaskStatus{
-		State:   crv1.PgtaskStateProcessed,
-		Message: "Successfully processed Pgtask by controller",
-	}
-
-	err = kubeapi.Updatepgtask(c.PgtaskClient, &tmpTask, task.ObjectMeta.Name, task.ObjectMeta.Namespace)
-
-	/**
-	err = c.PgtaskClient.Put().
-		Name(tmpTask.ObjectMeta.Name).
-		Namespace(tmpTask.ObjectMeta.Namespace).
-		Resource(crv1.PgtaskResourcePlural).
-		Body(tmpTask).
-		Do().
-		Error()
-
-	*/
+	state := crv1.PgtaskStateProcessed
+	message := "Successfully processed Pgtask by controller"
+	err = kubeapi.PatchpgtaskStatus(c.PgtaskClient, state, message, &tmpTask, keyNamespace)
 	if err != nil {
 		log.Errorf("ERROR onAdd updating pgtask status: %s", err.Error())
-		return
+		return true //keep the worker running; this task is simply skipped
 	}
 
 	//process the incoming task
-	switch task.Spec.TaskType {
+	switch tmpTask.Spec.TaskType {
+	// case crv1.PgtaskMinorUpgrade:
+	// 	log.Debug("delete minor upgrade task added")
+	// 	clusteroperator.AddUpgrade(c.PgtaskClientset, c.PgtaskClient, &tmpTask, keyNamespace)
 	case crv1.PgtaskDeletePgbouncer:
 		log.Debug("delete pgbouncer task added")
-		clusteroperator.DeletePgbouncerFromTask(c.PgtaskClientset, c.PgtaskClient, task, task.ObjectMeta.Namespace)
+		clusteroperator.DeletePgbouncerFromTask(c.PgtaskClientset, c.PgtaskClient, &tmpTask, keyNamespace)
 	case crv1.PgtaskReconfigurePgbouncer:
 		log.Debug("Reconfiguredelete pgbouncer task added")
-		clusteroperator.ReconfigurePgbouncerFromTask(c.PgtaskClientset, c.PgtaskClient, task, task.ObjectMeta.Namespace)
+		clusteroperator.ReconfigurePgbouncerFromTask(c.PgtaskClientset, c.PgtaskClient, &tmpTask, keyNamespace)
 	case crv1.PgtaskAddPgbouncer:
 		log.Debug("add pgbouncer task added")
-		clusteroperator.AddPgbouncerFromTask(c.PgtaskClientset, c.PgtaskClient, task, task.ObjectMeta.Namespace)
+		clusteroperator.AddPgbouncerFromTask(c.PgtaskClientset, c.PgtaskClient, &tmpTask, keyNamespace)
 	case crv1.PgtaskDeletePgpool:
 		log.Debug("delete pgpool task added")
-		clusteroperator.DeletePgpoolFromTask(c.PgtaskClientset, c.PgtaskClient, task, task.ObjectMeta.Namespace)
+		clusteroperator.DeletePgpoolFromTask(c.PgtaskClientset, c.PgtaskClient, &tmpTask, keyNamespace)
 	case crv1.PgtaskReconfigurePgpool:
 		log.Debug("Reconfiguredelete pgpool task added")
-		clusteroperator.ReconfigurePgpoolFromTask(c.PgtaskClientset, c.PgtaskClient, task, task.ObjectMeta.Namespace)
+		clusteroperator.ReconfigurePgpoolFromTask(c.PgtaskClientset, c.PgtaskClient, &tmpTask, keyNamespace)
 	case crv1.PgtaskAddPgpool:
 		log.Debug("add pgpool task added")
-		clusteroperator.AddPgpoolFromTask(c.PgtaskClientset, c.PgtaskClient, task, task.ObjectMeta.Namespace)
+		clusteroperator.AddPgpoolFromTask(c.PgtaskClientset, c.PgtaskClient, &tmpTask, keyNamespace)
 	case crv1.PgtaskFailover:
 		log.Debug("failover task added")
-		clusteroperator.FailoverBase(task.ObjectMeta.Namespace, c.PgtaskClientset, c.PgtaskClient, task, c.PgtaskConfig)
+		if !dupeFailover(c.PgtaskClient, &tmpTask, keyNamespace) {
+			clusteroperator.FailoverBase(keyNamespace, c.PgtaskClientset, c.PgtaskClient, &tmpTask, c.PgtaskConfig)
+		} else {
+			log.Debugf("skipping duplicate onAdd failover task %s/%s", keyNamespace, keyResourceName)
+		}
 	case crv1.PgtaskDeleteData:
log.Debug("delete data task added") - taskoperator.RemoveData(task.ObjectMeta.Namespace, c.PgtaskClientset, task) + if !dupeDeleteData(c.PgtaskClient, &tmpTask, keyNamespace) { + taskoperator.RemoveData(keyNamespace, c.PgtaskClientset, &tmpTask) + } else { + log.Debug("skipping duplicate onAdd delete data task %s/%s", keyNamespace, keyResourceName) + } case crv1.PgtaskDeleteBackups: log.Debug("delete backups task added") - taskoperator.RemoveBackups(task.ObjectMeta.Namespace, c.PgtaskClientset, task) + taskoperator.RemoveBackups(keyNamespace, c.PgtaskClientset, &tmpTask) case crv1.PgtaskBackrest: log.Debug("backrest task added") - backrestoperator.Backrest(task.ObjectMeta.Namespace, c.PgtaskClientset, task) + backrestoperator.Backrest(keyNamespace, c.PgtaskClientset, &tmpTask) case crv1.PgtaskBackrestRestore: log.Debug("backrest restore task added") - backrestoperator.Restore(c.PgtaskClient, task.ObjectMeta.Namespace, c.PgtaskClientset, task) + backrestoperator.Restore(c.PgtaskClient, keyNamespace, c.PgtaskClientset, &tmpTask) case crv1.PgtaskpgDump: log.Debug("pgDump task added") - pgdumpoperator.Dump(task.ObjectMeta.Namespace, c.PgtaskClientset, c.PgtaskClient, task) + pgdumpoperator.Dump(keyNamespace, c.PgtaskClientset, c.PgtaskClient, &tmpTask) case crv1.PgtaskpgRestore: log.Debug("pgDump restore task added") - pgdumpoperator.Restore(task.ObjectMeta.Namespace, c.PgtaskClientset, c.PgtaskClient, task) + pgdumpoperator.Restore(keyNamespace, c.PgtaskClientset, c.PgtaskClient, &tmpTask) case crv1.PgtaskAutoFailover: - log.Debugf("autofailover task added %s", task.ObjectMeta.Name) + log.Debugf("autofailover task added %s", keyResourceName) case crv1.PgtaskWorkflow: - log.Debugf("workflow task added [%s] ID [%s]", task.ObjectMeta.Name, task.Spec.Parameters[crv1.PgtaskWorkflowID]) + log.Debugf("workflow task added [%s] ID [%s]", keyResourceName, tmpTask.Spec.Parameters[crv1.PgtaskWorkflowID]) case crv1.PgtaskBenchmark: log.Debug("benchmark task added") - benchmarkoperator.Create(task.ObjectMeta.Namespace, c.PgtaskClientset, c.PgtaskClient, task) + benchmarkoperator.Create(keyNamespace, c.PgtaskClientset, c.PgtaskClient, &tmpTask) default: - log.Debugf("unknown task type on pgtask added [%s]", task.Spec.TaskType) + log.Debugf("unknown task type on pgtask added [%s]", tmpTask.Spec.TaskType) } + return true + } +// onAdd is called when a pgtask is added +func (c *PgtaskController) onAdd(obj interface{}) { + task := obj.(*crv1.Pgtask) + // log.Debugf("[PgtaskController] onAdd ns=%s %s", task.ObjectMeta.Namespace, task.ObjectMeta.SelfLink) + + //handle the case of when the operator restarts, we do not want + //to process pgtasks already processed + if task.Status.State == crv1.PgtaskStateProcessed { + log.Debug("pgtask " + task.ObjectMeta.Name + " already processed") + return + } + + key, err := cache.MetaNamespaceKeyFunc(obj) + if err == nil { + log.Debugf("task putting key in queue %s", key) + c.Queue.Add(key) + } + +} + + // onUpdate is called when a pgtask is updated func (c *PgtaskController) onUpdate(oldObj, newObj interface{}) { task := newObj.(*crv1.Pgtask) @@ -223,3 +242,41 @@ func (c *PgtaskController) onDelete(obj interface{}) { task := obj.(*crv1.Pgtask) log.Debugf("[PgtaskController] onDelete ns=%s %s", task.ObjectMeta.Namespace, task.ObjectMeta.SelfLink) } + +//de-dupe logic for a failover, if the failover started +//parameter is set, it means a failover has already been +//started on this +func dupeFailover(restClient *rest.RESTClient, task *crv1.Pgtask, ns string) bool { + tmp := 
diff --git a/kubeapi/pgtask.go b/kubeapi/pgtask.go
index 06aa933057..442e9cf21b 100644
--- a/kubeapi/pgtask.go
+++ b/kubeapi/pgtask.go
@@ -16,10 +16,13 @@ package kubeapi
 */
 
 import (
+	"encoding/json"
 	crv1 "github.com/crunchydata/postgres-operator/apis/cr/v1"
+	jsonpatch "github.com/evanphx/json-patch"
 	log "github.com/sirupsen/logrus"
 	kerrors "k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/client-go/rest"
 )
 
@@ -159,3 +162,42 @@ func Deletepgtasks(client *rest.RESTClient, selector, namespace string) error {
 	}
 	return err
 }
+
+// PatchpgtaskStatus - update the status of a pgtask using a JSON merge patch
+func PatchpgtaskStatus(restclient *rest.RESTClient, state crv1.PgtaskState, message string, oldCrd *crv1.Pgtask, namespace string) error {
+
+	//capture the current state of the object
+	oldData, err := json.Marshal(oldCrd)
+	if err != nil {
+		return err
+	}
+
+	//set the new status
+	oldCrd.Status = crv1.PgtaskStatus{
+		State:   state,
+		Message: message,
+	}
+
+	//create the merge patch from the before/after JSON
+	var newData, patchBytes []byte
+	newData, err = json.Marshal(oldCrd)
+	if err != nil {
+		return err
+	}
+	patchBytes, err = jsonpatch.CreateMergePatch(oldData, newData)
+	if err != nil {
+		return err
+	}
+	log.Debug(string(patchBytes))
+
+	//apply the patch
+	_, err = restclient.Patch(types.MergePatchType).
+		Namespace(namespace).
+		Resource(crv1.PgtaskResourcePlural).
+		Name(oldCrd.Spec.Name).
+		Body(patchBytes).
+		Do().
+		Get()
+
+	return err
+}
\ No newline at end of file
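PatchpgtaskStatus replaces the earlier read-modify-write Updatepgtask call: it marshals the object before and after the status change and submits only the difference as an RFC 7386 JSON merge patch, so fields the controller did not touch are left alone on the server. A sketch of what jsonpatch.CreateMergePatch produces, with illustrative stand-in types rather than the real CRD:

```go
package main

import (
	"encoding/json"
	"fmt"

	jsonpatch "github.com/evanphx/json-patch"
)

// stand-ins for crv1.Pgtask and crv1.PgtaskStatus
type taskStatus struct {
	State   string `json:"state,omitempty"`
	Message string `json:"message,omitempty"`
}

type task struct {
	Name   string     `json:"name"`
	Status taskStatus `json:"status"`
}

func main() {
	t := task{Name: "backrest-backup-mycluster"}
	oldData, _ := json.Marshal(t)

	// mutate only the status, exactly as PatchpgtaskStatus does
	t.Status = taskStatus{State: "Processed", Message: "Successfully processed Pgtask by controller"}
	newData, _ := json.Marshal(t)

	// the patch contains just the changed subtree, not the whole object
	patch, _ := jsonpatch.CreateMergePatch(oldData, newData)
	fmt.Println(string(patch))
	// {"status":{"message":"Successfully processed Pgtask by controller","state":"Processed"}}
}
```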
diff --git a/operator/cluster/failover.go b/operator/cluster/failover.go
index 47626fd7c3..f4cc45eecd 100644
--- a/operator/cluster/failover.go
+++ b/operator/cluster/failover.go
@@ -19,18 +19,23 @@ package cluster
 */
 
 import (
+	"encoding/json"
+	"time"
 	log "github.com/sirupsen/logrus"
 
 	crv1 "github.com/crunchydata/postgres-operator/apis/cr/v1"
 	"github.com/crunchydata/postgres-operator/kubeapi"
 	"github.com/crunchydata/postgres-operator/operator"
 	"github.com/crunchydata/postgres-operator/operator/backrest"
 	"github.com/crunchydata/postgres-operator/util"
+	jsonpatch "github.com/evanphx/json-patch"
 	meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/rest"
 )
 
 // FailoverBase ...
+// gets called first on a failover
 func FailoverBase(namespace string, clientset *kubernetes.Clientset, client *rest.RESTClient, task *crv1.Pgtask, restconfig *rest.Config) {
 	var err error
@@ -49,6 +54,13 @@ func FailoverBase(namespace string, clientset *kubernetes.Clientset, client *res
 		return
 	}
 
+	//set the failover-started marker so requeued copies of this task are skipped
+	err = PatchpgtaskFailoverStatus(client, task, namespace)
+	if err != nil {
+		log.Errorf("could not set failover started marker for task %s cluster %s", task.Spec.Name, clusterName)
+		return
+	}
+
 	if cluster.Spec.Strategy == "" {
 		cluster.Spec.Strategy = "1"
 		log.Info("using default strategy")
@@ -158,3 +170,39 @@ func replaceReplica(client *rest.RESTClient, cluster *crv1.Pgcluster, ns string)
 
 	kubeapi.Createpgreplica(client, newInstance, ns)
 }
+
+// PatchpgtaskFailoverStatus - patch the pgtask with failover status
+func PatchpgtaskFailoverStatus(restclient *rest.RESTClient, oldCrd *crv1.Pgtask, namespace string) error {
+
+	oldData, err := json.Marshal(oldCrd)
+	if err != nil {
+		return err
+	}
+
+	//set the failover-started marker to the current timestamp
+	oldCrd.Spec.Parameters[util.LABEL_FAILOVER_STARTED] = time.Now().Format("2006-01-02.15.04.05")
+
+	//create the merge patch from the before/after JSON
+	var newData, patchBytes []byte
+	newData, err = json.Marshal(oldCrd)
+	if err != nil {
+		return err
+	}
+	patchBytes, err = jsonpatch.CreateMergePatch(oldData, newData)
+	if err != nil {
+		return err
+	}
+	log.Debug(string(patchBytes))
+
+	//apply the patch
+	_, err = restclient.Patch(types.MergePatchType).
+		Namespace(namespace).
+		Resource(crv1.PgtaskResourcePlural).
+		Name(oldCrd.Spec.Name).
+		Body(patchBytes).
+		Do().
+		Get()
+
+	return err
+}
\ No newline at end of file
diff --git a/postgres-operator.go b/postgres-operator.go
index 274e6faa37..fb55c23888 100644
--- a/postgres-operator.go
+++ b/postgres-operator.go
@@ -90,6 +90,7 @@ func main() {
 		PgtaskClient:    crdClient,
 		PgtaskScheme:    crdScheme,
 		PgtaskClientset: Clientset,
+		Queue:           workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
 		Namespace:       Namespace,
 	}
 
@@ -141,6 +142,7 @@ func main() {
 	ctx, cancelFunc := context.WithCancel(context.Background())
 	defer cancelFunc()
 	go pgTaskcontroller.Run(ctx)
+	go pgTaskcontroller.RunWorker()
 	go pgClustercontroller.Run(ctx)
 	go pgClustercontroller.RunWorker()
 	go pgReplicacontroller.Run(ctx)
diff --git a/util/labels.go b/util/labels.go
index 2d28e32f56..0986922be3 100644
--- a/util/labels.go
+++ b/util/labels.go
@@ -31,6 +31,7 @@ const LABEL_FAILOVER = "failover"
 const LABEL_PRIMARY = "primary"
 const LABEL_TARGET = "target"
 const LABEL_RMDATA = "pgrmdata"
+const LABEL_FAILOVER_STARTED = "failover-started"
 const LABEL_PGPOLICY = "pgpolicy"
 const LABEL_INGEST = "ingest"
@@ -55,6 +56,7 @@ const LABEL_VERSION = "version"
 const LABEL_PGO_VERSION = "pgo-version"
 const LABEL_UPGRADE_DATE = "operator-upgrade-date"
 const LABEL_DELETE_DATA = "delete-data"
+const LABEL_DELETE_DATA_STARTED = "delete-data-started"
 const LABEL_BACKREST = "pgo-backrest"
 const LABEL_BACKREST_JOB = "pgo-backrest-job"
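Taken together, the two *_STARTED parameters make the worker idempotent under redelivery: FailoverBase stamps failover-started (via PatchpgtaskFailoverStatus) before doing any work, and dupeFailover re-reads the live task and skips it when the stamp is present; delete-data tasks follow the same scheme. A toy sketch of the marker round-trip, with a plain map standing in for the re-fetched task's Spec.Parameters:

```go
package main

import (
	"fmt"
	"time"
)

// mirrors util.LABEL_FAILOVER_STARTED
const labelFailoverStarted = "failover-started"

// stamp records that work has begun, using the same timestamp
// layout as PatchpgtaskFailoverStatus
func stamp(params map[string]string) {
	params[labelFailoverStarted] = time.Now().Format("2006-01-02.15.04.05")
}

// dupe mirrors dupeFailover: a non-empty marker means a failover
// has already been started for this task
func dupe(params map[string]string) bool {
	return params[labelFailoverStarted] != ""
}

func main() {
	params := map[string]string{}

	if !dupe(params) { // first delivery: stamp, then proceed
		stamp(params)
		fmt.Println("failover started at", params[labelFailoverStarted])
	}

	if dupe(params) { // redelivered key: skip
		fmt.Println("skipping duplicate failover task")
	}
}
```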