Skip to content

Commit

Permalink
fix(controller): dehydrate workflow before deleting offloaded node st…
Browse files Browse the repository at this point in the history
…atus (#6112)
  • Loading branch information
copierrj committed Jun 21, 2021
1 parent 510b4a8 commit 9fe8c10
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 32 deletions.
2 changes: 1 addition & 1 deletion persist/sqldb/explosive_offload_node_status_repo.go
Expand Up @@ -33,6 +33,6 @@ func (n *explosiveOffloadNodeStatusRepo) Delete(string, string) error {
return OffloadNotSupportedError
}

func (n *explosiveOffloadNodeStatusRepo) ListOldOffloads(string) ([]UUIDVersion, error) {
func (n *explosiveOffloadNodeStatusRepo) ListOldOffloads(string) (map[string][]string, error) {
return nil, OffloadNotSupportedError
}
8 changes: 4 additions & 4 deletions persist/sqldb/mocks/OffloadNodeStatusRepo.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 7 additions & 3 deletions persist/sqldb/offload_node_status_repo.go
Expand Up @@ -26,7 +26,7 @@ type OffloadNodeStatusRepo interface {
Save(uid, namespace string, nodes wfv1.Nodes) (string, error)
Get(uid, version string) (wfv1.Nodes, error)
List(namespace string) (map[UUIDVersion]wfv1.Nodes, error)
ListOldOffloads(namespace string) ([]UUIDVersion, error)
ListOldOffloads(namespace string) (map[string][]string, error)
Delete(uid, version string) error
IsEnabled() bool
}
Expand Down Expand Up @@ -178,7 +178,7 @@ func (wdc *nodeOffloadRepo) List(namespace string) (map[UUIDVersion]wfv1.Nodes,
return res, nil
}

func (wdc *nodeOffloadRepo) ListOldOffloads(namespace string) ([]UUIDVersion, error) {
func (wdc *nodeOffloadRepo) ListOldOffloads(namespace string) (map[string][]string, error) {
log.WithFields(log.Fields{"namespace": namespace}).Debug("Listing old offloaded nodes")
var records []UUIDVersion
err := wdc.session.
Expand All @@ -191,7 +191,11 @@ func (wdc *nodeOffloadRepo) ListOldOffloads(namespace string) ([]UUIDVersion, er
if err != nil {
return nil, err
}
return records, nil
x := make(map[string][]string)
for _, r := range records {
x[r.UID] = append(x[r.UID], r.Version)
}
return x, nil
}

func (wdc *nodeOffloadRepo) Delete(uid, version string) error {
Expand Down
71 changes: 47 additions & 24 deletions workflow/controller/controller.go
Expand Up @@ -187,6 +187,7 @@ var indexers = cache.Indexers{
indexes.SemaphoreConfigIndexName: indexes.WorkflowSemaphoreKeysIndexFunc(),
indexes.WorkflowPhaseIndex: indexes.MetaWorkflowPhaseIndexFunc(),
indexes.ConditionsIndex: indexes.ConditionsIndexFunc,
indexes.UIDIndex: indexes.MetaUIDFunc,
}

// Run starts an Workflow resource controller
Expand Down Expand Up @@ -553,37 +554,59 @@ func (wfc *WorkflowController) workflowGarbageCollector(stopCh <-chan struct{})
log.WithField("err", err).Error("Failed to list old offloaded nodes")
continue
}
if len(oldRecords) == 0 {
log.Info("Zero old offloads, nothing to do")
continue
}
// get every lives workflow (1000s) into a map
liveOffloadNodeStatusVersions := make(map[types.UID]string)
workflows, err := util.NewWorkflowLister(wfc.wfInformer).List()
if err != nil {
log.WithField("err", err).Error("Failed to list incomplete workflows")
continue
}
for _, wf := range workflows {
// this could be the empty string - as it is no longer offloaded
liveOffloadNodeStatusVersions[wf.UID] = wf.Status.OffloadNodeStatusVersion
}
log.WithFields(log.Fields{"len_wfs": len(liveOffloadNodeStatusVersions), "len_old_offloads": len(oldRecords)}).Info("Deleting old offloads that are not live")
for _, record := range oldRecords {
// this could be empty string
nodeStatusVersion, ok := liveOffloadNodeStatusVersions[types.UID(record.UID)]
if !ok || nodeStatusVersion != record.Version {
err := wfc.offloadNodeStatusRepo.Delete(record.UID, record.Version)
if err != nil {
log.WithField("err", err).Error("Failed to delete offloaded nodes")
}
log.WithField("len_wfs", len(oldRecords)).Info("Deleting old offloads that are not live")
for uid, versions := range oldRecords {
if err := wfc.deleteOffloadedNodesForWorkflow(uid, versions); err != nil {
log.WithError(err).WithField("uid", uid).Error("Failed to delete old offloaded nodes")
}
}
log.Info("Workflow GC finished")
}
}
}
}

func (wfc *WorkflowController) deleteOffloadedNodesForWorkflow(uid string, versions []string) error {
workflows, err := wfc.wfInformer.GetIndexer().ByIndex(indexes.UIDIndex, uid)
if err != nil {
return err
}
var wf *wfv1.Workflow
switch l := len(workflows); l {
case 0:
log.WithField("uid", uid).Info("Workflow missing, probably deleted")
case 1:
un := workflows[0].(*unstructured.Unstructured)
wf, err = util.FromUnstructured(un)
if err != nil {
return err
}
key := wf.ObjectMeta.Namespace + "/" + wf.ObjectMeta.Name
wfc.workflowKeyLock.Lock(key)
defer wfc.workflowKeyLock.Unlock(key)
// workflow might still be hydrated
if wfc.hydrator.IsHydrated(wf) {
log.WithField("uid", wf.UID).Info("Hydrated workflow encountered")
err = wfc.hydrator.Dehydrate(wf)
if err != nil {
return err
}
}
default:
return fmt.Errorf("expected no more than 1 workflow, got %d", l)
}
for _, version := range versions {
// skip delete if offload is live
if wf != nil && wf.Status.OffloadNodeStatusVersion == version {
continue
}
if err := wfc.offloadNodeStatusRepo.Delete(uid, version); err != nil {
return err
}
}
return nil
}

func (wfc *WorkflowController) archivedWorkflowGarbageCollector(stopCh <-chan struct{}) {
defer runtimeutil.HandleCrash(runtimeutil.PanicHandlers...)

Expand Down
1 change: 1 addition & 0 deletions workflow/controller/indexes/indexes.go
Expand Up @@ -15,4 +15,5 @@ const (
PodPhaseIndex = "pod.phase"
ConditionsIndex = "status.conditions"
SemaphoreConfigIndexName = "bySemaphoreConfigMap"
UIDIndex = "uid"
)
14 changes: 14 additions & 0 deletions workflow/controller/indexes/uid_index.go
@@ -0,0 +1,14 @@
package indexes

import (
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/client-go/tools/cache"
)

var MetaUIDFunc cache.IndexFunc = func(obj interface{}) ([]string, error) {
v, err := meta.Accessor(obj)
if err != nil {
return nil, nil
}
return []string{string(v.GetUID())}, nil
}

0 comments on commit 9fe8c10

Please sign in to comment.