Skip to content

Commit

Permalink
fix: resync of wfr (#1276)
Browse files Browse the repository at this point in the history
  • Loading branch information
cd1989 authored and caicloud-bot committed Aug 30, 2019
1 parent 47fda02 commit 967a268
Show file tree
Hide file tree
Showing 12 changed files with 25 additions and 18 deletions.
1 change: 1 addition & 0 deletions pkg/workflow/controller/controllers/configmap.go
Expand Up @@ -53,6 +53,7 @@ func NewConfigMapController(client clientset.Interface, namespace string, cm str
Key: key,
EventType: UPDATE,
Object: new,
OldObject: old,
})
},
})
Expand Down
3 changes: 2 additions & 1 deletion pkg/workflow/controller/controllers/controller.go
Expand Up @@ -42,6 +42,7 @@ type Event struct {
Key string
EventType EventType
Object interface{}
OldObject interface{}
}

// Run ...
Expand Down Expand Up @@ -96,7 +97,7 @@ func (c *Controller) doWork(e Event) error {
case CREATE:
c.eventHandler.ObjectCreated(e.Object)
case UPDATE:
c.eventHandler.ObjectUpdated(e.Object)
c.eventHandler.ObjectUpdated(e.OldObject, e.Object)
case DELETE:
c.eventHandler.ObjectDeleted(e.Object)
}
Expand Down
1 change: 1 addition & 0 deletions pkg/workflow/controller/controllers/exucution_cluster.go
Expand Up @@ -45,6 +45,7 @@ func NewExecutionClusterController(client clientset.Interface) *Controller {
Key: key,
EventType: UPDATE,
Object: new,
OldObject: old,
})
},
DeleteFunc: func(obj interface{}) {
Expand Down
1 change: 1 addition & 0 deletions pkg/workflow/controller/controllers/pod.go
Expand Up @@ -51,6 +51,7 @@ func NewPodController(clusterClient kubernetes.Interface, client clientset.Inter
Key: key,
EventType: UPDATE,
Object: new,
OldObject: old,
})
},
DeleteFunc: func(obj interface{}) {
Expand Down
1 change: 1 addition & 0 deletions pkg/workflow/controller/controllers/workflow_trigger.go
Expand Up @@ -53,6 +53,7 @@ func NewWorkflowTriggerController(client clientset.Interface) *Controller {
Key: key,
EventType: UPDATE,
Object: new,
OldObject: old,
})
},
DeleteFunc: func(obj interface{}) {
Expand Down
6 changes: 1 addition & 5 deletions pkg/workflow/controller/controllers/workflowrun.go
@@ -1,8 +1,6 @@
package controllers

import (
"reflect"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
Expand Down Expand Up @@ -41,9 +39,6 @@ func NewWorkflowRunController(client clientset.Interface) *Controller {
})
},
UpdateFunc: func(old, new interface{}) {
if reflect.DeepEqual(old, new) {
return
}
key, err := cache.MetaNamespaceKeyFunc(new)
if err != nil {
return
Expand All @@ -52,6 +47,7 @@ func NewWorkflowRunController(client clientset.Interface) *Controller {
Key: key,
EventType: UPDATE,
Object: new,
OldObject: old,
})
},
DeleteFunc: func(obj interface{}) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/workflow/controller/handlers/configmap/handler.go
Expand Up @@ -21,8 +21,8 @@ func (h *Handler) ObjectCreated(obj interface{}) {
}

// ObjectUpdated ...
func (h *Handler) ObjectUpdated(obj interface{}) {
h.process(obj)
func (h *Handler) ObjectUpdated(old, new interface{}) {
h.process(new)
}

// ObjectDeleted ...
Expand Down
4 changes: 2 additions & 2 deletions pkg/workflow/controller/handlers/executioncluster/handler.go
Expand Up @@ -33,8 +33,8 @@ func (h *Handler) ObjectCreated(obj interface{}) {
}

// ObjectUpdated ...
func (h *Handler) ObjectUpdated(obj interface{}) {
cluster, ok := obj.(*v1alpha1.ExecutionCluster)
func (h *Handler) ObjectUpdated(old, new interface{}) {
cluster, ok := new.(*v1alpha1.ExecutionCluster)
if !ok {
log.Warning("unknown resource type")
return
Expand Down
2 changes: 1 addition & 1 deletion pkg/workflow/controller/handlers/interface.go
Expand Up @@ -5,7 +5,7 @@ type Interface interface {
// ObjectCreated handles object creation
ObjectCreated(obj interface{})
// ObjectUpdated handles object update
ObjectUpdated(new interface{})
ObjectUpdated(old, new interface{})
// ObjectDeleted handles object deletion
ObjectDeleted(obj interface{})
}
4 changes: 2 additions & 2 deletions pkg/workflow/controller/handlers/pod/handler.go
Expand Up @@ -27,8 +27,8 @@ func (h *Handler) ObjectCreated(obj interface{}) {
}

// ObjectUpdated ...
func (h *Handler) ObjectUpdated(obj interface{}) {
h.onUpdate(obj)
func (h *Handler) ObjectUpdated(old, new interface{}) {
h.onUpdate(new)
}

// ObjectDeleted ...
Expand Down
12 changes: 9 additions & 3 deletions pkg/workflow/controller/handlers/workflowrun/handler.go
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"encoding/json"
"net/http"
"reflect"

log "github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -85,21 +86,26 @@ func (h *Handler) ObjectCreated(obj interface{}) {
}

// ObjectUpdated handles a updated WorkflowRun
func (h *Handler) ObjectUpdated(obj interface{}) {
originWfr, ok := obj.(*v1alpha1.WorkflowRun)
func (h *Handler) ObjectUpdated(old, new interface{}) {
originWfr, ok := new.(*v1alpha1.WorkflowRun)
if !ok {
log.Warning("unknown resource type")
return
}
log.WithField("name", originWfr.Name).Debug("Start to process WorkflowRun update")

if !validate(originWfr) {
log.WithField("wfr", originWfr.Name).Warning("Invalid wfr")
return
}

// Refresh updates 'refresh' time field of the WorkflowRun in the queue.
h.LimitedQueues.Refresh(originWfr)

if reflect.DeepEqual(old, new) {
return
}
log.WithField("name", originWfr.Name).Debug("Start to process WorkflowRun update")

// Add the WorkflowRun object to GC processor, it will be checked before actually added to
// the GC queue.
h.GCProcessor.Add(originWfr)
Expand Down
4 changes: 2 additions & 2 deletions pkg/workflow/controller/handlers/workflowtrigger/handler.go
Expand Up @@ -26,8 +26,8 @@ func (h *Handler) ObjectCreated(obj interface{}) {
}

// ObjectUpdated ...
func (h *Handler) ObjectUpdated(obj interface{}) {
if wft, err := ToWorkflowTrigger(obj); err != nil {
func (h *Handler) ObjectUpdated(old, new interface{}) {
if wft, err := ToWorkflowTrigger(new); err != nil {
log.Warn("Convert to WorkflowTrigger error: ", err)
} else {
h.CronManager.UpdateCron(wft)
Expand Down

0 comments on commit 967a268

Please sign in to comment.