42 changes: 15 additions & 27 deletions controller/replicacontroller.go
@@ -26,6 +26,7 @@ import (

crv1 "github.com/crunchydata/postgres-operator/apis/cr/v1"
"github.com/crunchydata/postgres-operator/kubeapi"
"github.com/crunchydata/postgres-operator/util"
clusteroperator "github.com/crunchydata/postgres-operator/operator/cluster"
"k8s.io/client-go/util/workqueue"
"strings"
@@ -167,29 +168,6 @@ func (c *PgreplicaController) onAdd(obj interface{}) {
return
}

// NEVER modify objects from the store. It's a read-only, local cache.
// You can use clusterScheme.Copy() to make a deep copy of original object and modify this copy
// Or create a copy manually for better performance
// copyObj := replica.DeepCopyObject()
// replicaCopy := copyObj.(*crv1.Pgreplica)

// replicaCopy.Status = crv1.PgreplicaStatus{
// State: crv1.PgreplicaStateProcessed,
// Message: "Successfully processed Pgreplica by controller",
// }

// err := c.PgreplicaClient.Put().
// Name(replica.ObjectMeta.Name).
// Namespace(replica.ObjectMeta.Namespace).
// Resource(crv1.PgreplicaResourcePlural).
// Body(replicaCopy).
// Do().
// Error()

// if err != nil {
// log.Errorf("ERROR updating pgreplica status: %s", err.Error())
// }

key, err := cache.MetaNamespaceKeyFunc(obj)
if err == nil {
log.Debugf("onAdd putting key in queue %s", key)
@@ -198,10 +176,6 @@ func (c *PgreplicaController) onAdd(obj interface{}) {
log.Errorf("replicacontroller: error acquiring key: %s", err.Error())
}

//handle the case of when a pgreplica is added which is
//scaling up a cluster
// clusteroperator.ScaleBase(c.PgreplicaClientset, c.PgreplicaClient, replicaCopy, replicaCopy.ObjectMeta.Namespace)

}

// onUpdate is called when a pgreplica is updated
@@ -215,4 +189,18 @@ func (c *PgreplicaController) onUpdate(oldObj, newObj interface{}) {
func (c *PgreplicaController) onDelete(obj interface{}) {
replica := obj.(*crv1.Pgreplica)
log.Debugf("[PgreplicaController] OnDelete ns=%s %s", replica.ObjectMeta.Namespace, replica.ObjectMeta.SelfLink)

//make sure we are not removing a replica deployment
//that is now the primary after a failover
dep, found, _ := kubeapi.GetDeployment(c.PgreplicaClientset, replica.Spec.Name, replica.ObjectMeta.Namespace)
if found {
if dep.ObjectMeta.Labels[util.LABEL_SERVICE_NAME] == dep.ObjectMeta.Labels[util.LABEL_PG_CLUSTER] {
//the replica was made a primary at some point
//we will not scale down the deployment
log.Debugf("[PgreplicaController] OnDelete not scaling down the replica since it is acting as a primary")
} else {
clusteroperator.ScaleDownBase(c.PgreplicaClientset, c.PgreplicaClient, replica, replica.ObjectMeta.Namespace)
}
}

}
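
The onDelete guard above relies on the operator's label convention: a deployment whose service-name label equals its pg-cluster label is serving as the primary, i.e. a replica that was promoted during a failover. A minimal sketch of that predicate, assuming the conventional "service-name" and "pg-cluster" key strings behind util.LABEL_SERVICE_NAME and util.LABEL_PG_CLUSTER; the isPromotedReplica helper is illustrative and not part of this PR:

package main

import "fmt"

// Label keys assumed to match util.LABEL_SERVICE_NAME and
// util.LABEL_PG_CLUSTER in the operator's util package.
const (
	labelServiceName = "service-name"
	labelPGCluster   = "pg-cluster"
)

// isPromotedReplica reports whether a replica deployment is acting as
// the primary after a failover: in that case its service-name label
// matches its cluster label and the deployment must not be scaled down.
func isPromotedReplica(labels map[string]string) bool {
	return labels[labelServiceName] == labels[labelPGCluster]
}

func main() {
	promoted := map[string]string{labelServiceName: "mycluster", labelPGCluster: "mycluster"}
	replica := map[string]string{labelServiceName: "mycluster-repl", labelPGCluster: "mycluster"}
	fmt.Println(isPromotedReplica(promoted)) // true: skip the scale-down
	fmt.Println(isPromotedReplica(replica))  // false: safe to scale down
}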
189 changes: 123 additions & 66 deletions controller/taskcontroller.go
@@ -17,16 +17,17 @@ limitations under the License.

import (
"context"
// "time"

"strings"
log "github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"

crv1 "github.com/crunchydata/postgres-operator/apis/cr/v1"
"github.com/crunchydata/postgres-operator/util"
"github.com/crunchydata/postgres-operator/kubeapi"
backrestoperator "github.com/crunchydata/postgres-operator/operator/backrest"
benchmarkoperator "github.com/crunchydata/postgres-operator/operator/benchmark"
@@ -41,13 +42,17 @@ type PgtaskController struct {
PgtaskClient *rest.RESTClient
PgtaskScheme *runtime.Scheme
PgtaskClientset *kubernetes.Clientset
Queue workqueue.RateLimitingInterface
Namespace string
}

// Run starts a pgtask resource controller
func (c *PgtaskController) Run(ctx context.Context) error {
log.Debug("Watch Pgtask objects")

//shut down the work queue to cause workers to end
defer c.Queue.ShutDown()

// Watch Pgtask objects
_, err := c.watchPgtasks(ctx)
if err != nil {
@@ -89,129 +94,143 @@ func (c *PgtaskController) watchPgtasks(ctx context.Context) (cache.Controller,
return controller, nil
}

// onAdd is called when a pgtask is added
func (c *PgtaskController) onAdd(obj interface{}) {
task := obj.(*crv1.Pgtask)
log.Debugf("[PgtaskController] onAdd ns=%s %s", task.ObjectMeta.Namespace, task.ObjectMeta.SelfLink)

//handle the case of when the operator restarts, we do not want
//to process pgtasks already processed
if task.Status.State == crv1.PgtaskStateProcessed {
log.Debug("pgtask " + task.ObjectMeta.Name + " already processed")
return
}

//time.Sleep(time.Second * time.Duration(2))

// NEVER modify objects from the store. It's a read-only, local cache.
// You can use taskScheme.Copy() to make a deep copy of original object and modify this copy
// Or create a copy manually for better performance
copyObj := task.DeepCopyObject()
taskCopy := copyObj.(*crv1.Pgtask)
func (c *PgtaskController) RunWorker() {

//update the status of the task as processed to prevent reprocessing
taskCopy.Status = crv1.PgtaskStatus{
State: crv1.PgtaskStateProcessed,
Message: "Successfully processed Pgtask by controller",
//process the 'add' work queue forever
for c.processNextItem() {
}
task.Status = crv1.PgtaskStatus{
State: crv1.PgtaskStateProcessed,
Message: "Successfully processed Pgtask by controller",
}

func (c *PgtaskController) processNextItem() bool {
// Wait until there is a new item in the working queue
key, quit := c.Queue.Get()
if quit {
return false
}

//Body(taskCopy).
log.Debugf("working on %s", key.(string))
keyParts := strings.Split(key.(string), "/")
keyNamespace := keyParts[0]
keyResourceName := keyParts[1]

//get pgtask
log.Debugf("queue got key ns=[%s] resource=[%s]", keyNamespace, keyResourceName)

// Tell the queue that we are done with processing this key. This unblocks the key for other workers
// This allows safe parallel processing because two events with the same key are never processed in
// parallel.
defer c.Queue.Done(key)

tmpTask := crv1.Pgtask{}
found, err := kubeapi.Getpgtask(c.PgtaskClient, &tmpTask, task.ObjectMeta.Name, task.ObjectMeta.Namespace)
found, err := kubeapi.Getpgtask(c.PgtaskClient, &tmpTask, keyResourceName, keyNamespace)
if !found {
log.Errorf("ERROR onAdd getting pgtask : %s", err.Error())
return
return false
}

//update pgtask
tmpTask.Status = crv1.PgtaskStatus{
State: crv1.PgtaskStateProcessed,
Message: "Successfully processed Pgtask by controller",
}

err = kubeapi.Updatepgtask(c.PgtaskClient, &tmpTask, task.ObjectMeta.Name, task.ObjectMeta.Namespace)

/**
err = c.PgtaskClient.Put().
Name(tmpTask.ObjectMeta.Name).
Namespace(tmpTask.ObjectMeta.Namespace).
Resource(crv1.PgtaskResourcePlural).
Body(tmpTask).
Do().
Error()

*/
state := crv1.PgtaskStateProcessed
message := "Successfully processed Pgtask by controller"
err = kubeapi.PatchpgtaskStatus(c.PgtaskClient, state, message, &tmpTask, keyNamespace)
if err != nil {
log.Errorf("ERROR onAdd updating pgtask status: %s", err.Error())
return
return false
}

//process the incoming task
switch task.Spec.TaskType {
switch tmpTask.Spec.TaskType {
// case crv1.PgtaskMinorUpgrade:
// log.Debug("delete minor upgrade task added")
// clusteroperator.AddUpgrade(c.PgtaskClientset, c.PgtaskClient, &tmpTask, keyNamespace)
case crv1.PgtaskDeletePgbouncer:
log.Debug("delete pgbouncer task added")
clusteroperator.DeletePgbouncerFromTask(c.PgtaskClientset, c.PgtaskClient, task, task.ObjectMeta.Namespace)
clusteroperator.DeletePgbouncerFromTask(c.PgtaskClientset, c.PgtaskClient, &tmpTask, keyNamespace)
case crv1.PgtaskReconfigurePgbouncer:
log.Debug("Reconfiguredelete pgbouncer task added")
clusteroperator.ReconfigurePgbouncerFromTask(c.PgtaskClientset, c.PgtaskClient, task, task.ObjectMeta.Namespace)
clusteroperator.ReconfigurePgbouncerFromTask(c.PgtaskClientset, c.PgtaskClient, &tmpTask, keyNamespace)
case crv1.PgtaskAddPgbouncer:
log.Debug("add pgbouncer task added")
clusteroperator.AddPgbouncerFromTask(c.PgtaskClientset, c.PgtaskClient, task, task.ObjectMeta.Namespace)
clusteroperator.AddPgbouncerFromTask(c.PgtaskClientset, c.PgtaskClient, &tmpTask, keyNamespace)
case crv1.PgtaskDeletePgpool:
log.Debug("delete pgpool task added")
clusteroperator.DeletePgpoolFromTask(c.PgtaskClientset, c.PgtaskClient, task, task.ObjectMeta.Namespace)
clusteroperator.DeletePgpoolFromTask(c.PgtaskClientset, c.PgtaskClient, &tmpTask, keyNamespace)
case crv1.PgtaskReconfigurePgpool:
log.Debug("Reconfiguredelete pgpool task added")
clusteroperator.ReconfigurePgpoolFromTask(c.PgtaskClientset, c.PgtaskClient, task, task.ObjectMeta.Namespace)
clusteroperator.ReconfigurePgpoolFromTask(c.PgtaskClientset, c.PgtaskClient, &tmpTask, keyNamespace)
case crv1.PgtaskAddPgpool:
log.Debug("add pgpool task added")
clusteroperator.AddPgpoolFromTask(c.PgtaskClientset, c.PgtaskClient, task, task.ObjectMeta.Namespace)
clusteroperator.AddPgpoolFromTask(c.PgtaskClientset, c.PgtaskClient, &tmpTask, keyNamespace)
case crv1.PgtaskFailover:
log.Debug("failover task added")
clusteroperator.FailoverBase(task.ObjectMeta.Namespace, c.PgtaskClientset, c.PgtaskClient, task, c.PgtaskConfig)
if !dupeFailover(c.PgtaskClient, &tmpTask, keyNamespace) {
clusteroperator.FailoverBase(keyNamespace, c.PgtaskClientset, c.PgtaskClient, &tmpTask, c.PgtaskConfig)
} else {
log.Debug("skipping duplicate onAdd failover task %s/%s", keyNamespace, keyResourceName)
}

case crv1.PgtaskDeleteData:
log.Debug("delete data task added")
taskoperator.RemoveData(task.ObjectMeta.Namespace, c.PgtaskClientset, task)
if !dupeDeleteData(c.PgtaskClient, &tmpTask, keyNamespace) {
taskoperator.RemoveData(keyNamespace, c.PgtaskClientset, &tmpTask)
} else {
log.Debug("skipping duplicate onAdd delete data task %s/%s", keyNamespace, keyResourceName)
}
case crv1.PgtaskDeleteBackups:
log.Debug("delete backups task added")
taskoperator.RemoveBackups(task.ObjectMeta.Namespace, c.PgtaskClientset, task)
taskoperator.RemoveBackups(keyNamespace, c.PgtaskClientset, &tmpTask)
case crv1.PgtaskBackrest:
log.Debug("backrest task added")
backrestoperator.Backrest(task.ObjectMeta.Namespace, c.PgtaskClientset, task)
backrestoperator.Backrest(keyNamespace, c.PgtaskClientset, &tmpTask)
case crv1.PgtaskBackrestRestore:
log.Debug("backrest restore task added")
backrestoperator.Restore(c.PgtaskClient, task.ObjectMeta.Namespace, c.PgtaskClientset, task)
backrestoperator.Restore(c.PgtaskClient, keyNamespace, c.PgtaskClientset, &tmpTask)

case crv1.PgtaskpgDump:
log.Debug("pgDump task added")
pgdumpoperator.Dump(task.ObjectMeta.Namespace, c.PgtaskClientset, c.PgtaskClient, task)
pgdumpoperator.Dump(keyNamespace, c.PgtaskClientset, c.PgtaskClient, &tmpTask)
case crv1.PgtaskpgRestore:
log.Debug("pgDump restore task added")
pgdumpoperator.Restore(task.ObjectMeta.Namespace, c.PgtaskClientset, c.PgtaskClient, task)
pgdumpoperator.Restore(keyNamespace, c.PgtaskClientset, c.PgtaskClient, &tmpTask)

case crv1.PgtaskAutoFailover:
log.Debugf("autofailover task added %s", task.ObjectMeta.Name)
log.Debugf("autofailover task added %s", keyResourceName)
case crv1.PgtaskWorkflow:
log.Debugf("workflow task added [%s] ID [%s]", task.ObjectMeta.Name, task.Spec.Parameters[crv1.PgtaskWorkflowID])
log.Debugf("workflow task added [%s] ID [%s]", keyResourceName, tmpTask.Spec.Parameters[crv1.PgtaskWorkflowID])

case crv1.PgtaskBenchmark:
log.Debug("benchmark task added")
benchmarkoperator.Create(task.ObjectMeta.Namespace, c.PgtaskClientset, c.PgtaskClient, task)
benchmarkoperator.Create(keyNamespace, c.PgtaskClientset, c.PgtaskClient, &tmpTask)

default:
log.Debugf("unknown task type on pgtask added [%s]", task.Spec.TaskType)
log.Debugf("unknown task type on pgtask added [%s]", tmpTask.Spec.TaskType)
}

return true

}

// onAdd is called when a pgtask is added
func (c *PgtaskController) onAdd(obj interface{}) {
task := obj.(*crv1.Pgtask)
// log.Debugf("[PgtaskController] onAdd ns=%s %s", task.ObjectMeta.Namespace, task.ObjectMeta.SelfLink)

//handle the case of when the operator restarts, we do not want
//to process pgtasks already processed
if task.Status.State == crv1.PgtaskStateProcessed {
log.Debug("pgtask " + task.ObjectMeta.Name + " already processed")
return
}

key, err := cache.MetaNamespaceKeyFunc(obj)
if err == nil {
log.Debugf("task putting key in queue %s", key)
c.Queue.Add(key)
}

}


// onUpdate is called when a pgtask is updated
func (c *PgtaskController) onUpdate(oldObj, newObj interface{}) {
task := newObj.(*crv1.Pgtask)
@@ -223,3 +242,41 @@ func (c *PgtaskController) onDelete(obj interface{}) {
task := obj.(*crv1.Pgtask)
log.Debugf("[PgtaskController] onDelete ns=%s %s", task.ObjectMeta.Namespace, task.ObjectMeta.SelfLink)
}

//de-dupe logic for a failover: if the failover-started
//parameter is set, a failover has already been
//started for this cluster
func dupeFailover(restClient *rest.RESTClient, task *crv1.Pgtask, ns string) bool {
tmp := crv1.Pgtask{}

found, _ := kubeapi.Getpgtask(restClient, &tmp, task.Spec.Name, ns)
if !found {
//this should never happen and indicates a serious error
return false
}

if tmp.Spec.Parameters[util.LABEL_FAILOVER_STARTED] == "" {
return false
}

return true
}

//de-dupe logic for a delete data task: if the delete-data-started
//parameter is set, a delete data job has already been
//started for this cluster
func dupeDeleteData(restClient *rest.RESTClient, task *crv1.Pgtask, ns string) bool {
tmp := crv1.Pgtask{}

found, _ := kubeapi.Getpgtask(restClient, &tmp, task.Spec.Name, ns)
if !found {
//this should never happen and indicates a serious error
return false
}

if tmp.Spec.Parameters[util.LABEL_DELETE_DATA_STARTED] == "" {
return false
}

return true
}
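
The new Queue field and RunWorker loop assume the caller builds a rate-limiting work queue and starts one or more workers alongside Run. That wiring is not shown in this diff; a minimal sketch under those assumptions (restClient, clientset, ns, ctx, and workerCount are placeholders):

// Hypothetical wiring, not part of this PR: construct the controller with
// a rate-limiting queue, start the watch, then launch workers that drain
// the queue until Run's deferred Queue.ShutDown() stops them.
ctrl := &controller.PgtaskController{
	PgtaskClient:    restClient,
	PgtaskClientset: clientset,
	Queue:           workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
	Namespace:       ns,
}

go func() {
	_ = ctrl.Run(ctx) // blocks until ctx is cancelled
}()

for i := 0; i < workerCount; i++ {
	go ctrl.RunWorker() // each worker loops over processNextItem
}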