Skip to content

Commit

Permalink
fix(controller): Workflow stop and resume by node didn't properly support offloaded nodes. Fixes #2543 (#2548)
Browse files Browse the repository at this point in the history
  • Loading branch information
markterm authored and alexec committed May 1, 2020
1 parent db4cfc7 commit ad28a9c
Show file tree
Hide file tree
Showing 6 changed files with 204 additions and 75 deletions.
2 changes: 1 addition & 1 deletion server/workflow/workflow_server.go
Expand Up @@ -323,7 +323,7 @@ func (s *workflowServer) TerminateWorkflow(ctx context.Context, req *workflowpkg

func (s *workflowServer) StopWorkflow(ctx context.Context, req *workflowpkg.WorkflowStopRequest) (*v1alpha1.Workflow, error) {
wfClient := auth.GetWfClient(ctx)
err := util.StopWorkflow(wfClient.ArgoprojV1alpha1().Workflows(req.Namespace), req.Name, req.NodeFieldSelector, req.Message)
err := util.StopWorkflow(wfClient.ArgoprojV1alpha1().Workflows(req.Namespace), s.offloadNodeStatusRepo, req.Name, req.NodeFieldSelector, req.Message)
if err != nil {
return nil, err
}
Expand Down
12 changes: 9 additions & 3 deletions test/e2e/cli_test.go
Expand Up @@ -310,9 +310,15 @@ func (s *CLISuite) TestWorkflowSuspendResume() {
})
}

func (s *CLISuite) TestNodeSuspendResume() {
// https://github.com/argoproj/argo/issues/2621
s.T().SkipNow()
// TestNodeSuspendResumeNoPersistence runs the shared node suspend/resume
// scenario when node-status persistence (offloading) is not enabled.
func (s *CLISuite) TestNodeSuspendResumeNoPersistence() {
	if !s.Persistence.IsEnabled() {
		NodeSuspendResumeCommon(s.E2ESuite)
		return
	}
	// Persistence is enabled for this test, but it is not enabled for the Argo Server in this test suite.
	s.T().SkipNow()
}

func NodeSuspendResumeCommon(s fixtures.E2ESuite) {
s.Given().
Workflow("@testdata/node-suspend.yaml").
When().
Expand Down
37 changes: 37 additions & 0 deletions test/e2e/cli_with_server_test.go
Expand Up @@ -165,6 +165,43 @@ func (s *CLIWithServerSuite) TestWorkflowRetryPersistence() {
})
}

// TestWorkflowSuspendResumePersistence suspends and then resumes a whole
// workflow via the CLI while node-status persistence (offloading) is enabled,
// and verifies the workflow still runs to completion.
func (s *CLIWithServerSuite) TestWorkflowSuspendResumePersistence() {
	if !s.Persistence.IsEnabled() {
		// Persistence is disabled for this test, but it is enabled for the Argo Server in this test suite.
		// When this is the case, this behavior is tested in cli_test.go
		s.T().SkipNow()
	}
	startTimeout := 10 * time.Second
	completionTimeout := 20 * time.Second
	s.Given().
		Workflow("@testdata/sleep-3s.yaml").
		When().
		SubmitWorkflow().
		WaitForWorkflowToStart(startTimeout).
		RunCli([]string{"suspend", "sleep-3s"}, func(t *testing.T, out string, cliErr error) {
			if assert.NoError(t, cliErr) {
				assert.Contains(t, out, "workflow sleep-3s suspended")
			}
		}).
		RunCli([]string{"resume", "sleep-3s"}, func(t *testing.T, out string, cliErr error) {
			if assert.NoError(t, cliErr) {
				assert.Contains(t, out, "workflow sleep-3s resumed")
			}
		}).
		WaitForWorkflow(completionTimeout).
		Then().
		ExpectWorkflow(func(t *testing.T, _ *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
			// The suspended-then-resumed workflow must finish successfully.
			assert.Equal(t, wfv1.NodeSucceeded, status.Phase)
		})
}

// TestNodeSuspendResumePersistence runs the shared node suspend/resume
// scenario when node-status persistence (offloading) is enabled.
func (s *CLIWithServerSuite) TestNodeSuspendResumePersistence() {
	if s.Persistence.IsEnabled() {
		NodeSuspendResumeCommon(s.E2ESuite)
		return
	}
	// Persistence is disabled for this test, but it is enabled for the Argo Server in this test suite.
	// When this is the case, this behavior is tested in cli_test.go
	s.T().SkipNow()
}

// TestCLIWithServerSuite is the go-test entry point that runs every test
// registered on CLIWithServerSuite.
func TestCLIWithServerSuite(t *testing.T) {
	suite.Run(t, &CLIWithServerSuite{})
}
2 changes: 1 addition & 1 deletion workflow/controller/operator_test.go
Expand Up @@ -1228,7 +1228,7 @@ func TestSuspendTemplateWithFailedResume(t *testing.T) {
assert.Equal(t, 0, len(pods.Items))

// resume the workflow. verify resume workflow edits nodestatus correctly
err = util.StopWorkflow(wfcset, wf.ObjectMeta.Name, "inputs.parameters.param1.value=value1", "Step failed!")
err = util.StopWorkflow(wfcset, sqldb.ExplosiveOffloadNodeStatusRepo, wf.ObjectMeta.Name, "inputs.parameters.param1.value=value1", "Step failed!")
assert.NoError(t, err)
wf, err = wfcset.Get(wf.ObjectMeta.Name, metav1.GetOptions{})
assert.NoError(t, err)
Expand Down
146 changes: 76 additions & 70 deletions workflow/util/util.go
Expand Up @@ -329,17 +329,17 @@ func SuspendWorkflow(wfIf v1alpha1.WorkflowInterface, workflowName string) error
// Retries conflict errors
func ResumeWorkflow(wfIf v1alpha1.WorkflowInterface, repo sqldb.OffloadNodeStatusRepo, workflowName string, nodeFieldSelector string) error {
if len(nodeFieldSelector) > 0 {
return updateWorkflowNodeByKey(wfIf, workflowName, nodeFieldSelector, wfv1.NodeSucceeded, "")
return updateWorkflowNodeByKey(wfIf, repo, workflowName, nodeFieldSelector, wfv1.NodeSucceeded, "")
} else {
err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
wf, err := wfIf.Get(workflowName, metav1.GetOptions{})
if err != nil {
return false, err
}

err = packer.DecompressWorkflow(wf)
err = decompressAndFetchOffloadedNodes(wf, repo)
if err != nil {
return false, fmt.Errorf("unable to decompress workflow: %s", err)
return false, err
}

workflowUpdated := false
Expand All @@ -348,21 +348,10 @@ func ResumeWorkflow(wfIf v1alpha1.WorkflowInterface, repo sqldb.OffloadNodeStatu
workflowUpdated = true
}

nodes := wf.Status.Nodes
if wf.Status.IsOffloadNodeStatus() {
if !repo.IsEnabled() {
return false, fmt.Errorf(sqldb.OffloadNodeStatusDisabled)
}
var err error
nodes, err = repo.Get(string(wf.UID), wf.GetOffloadNodeStatusVersion())
if err != nil {
return false, fmt.Errorf("unable to retrieve offloaded nodes: %s", err)
}
}
newNodes := nodes.DeepCopy()
newNodes := wf.Status.Nodes.DeepCopy()

// To resume a workflow with a suspended node we simply mark the node as Successful
for nodeID, node := range nodes {
for nodeID, node := range wf.Status.Nodes {
if node.IsActiveSuspendNode() {
node.Phase = wfv1.NodeSucceeded
node.FinishedAt = metav1.Time{Time: time.Now().UTC()}
Expand All @@ -372,24 +361,9 @@ func ResumeWorkflow(wfIf v1alpha1.WorkflowInterface, repo sqldb.OffloadNodeStatu
}

if workflowUpdated {
if wf.Status.IsOffloadNodeStatus() {
if !repo.IsEnabled() {
return false, fmt.Errorf(sqldb.OffloadNodeStatusDisabled)
}
offloadVersion, err := repo.Save(string(wf.UID), wf.Namespace, newNodes)
if err != nil {
return false, fmt.Errorf("unable to save offloaded nodes: %s", err)
}
wf.Status.OffloadNodeStatusVersion = offloadVersion
wf.Status.CompressedNodes = ""
wf.Status.Nodes = nil
} else {
wf.Status.Nodes = newNodes
}

err = packer.CompressWorkflowIfNeeded(wf)
err = compressAndOffloadNodes(wf, repo, newNodes)
if err != nil {
return false, fmt.Errorf("unable to compress workflow: %s", err)
return false, fmt.Errorf("unable to compress or offload workflow nodes: %s", err)
}

_, err = wfIf.Update(wf)
Expand All @@ -406,6 +380,57 @@ func ResumeWorkflow(wfIf v1alpha1.WorkflowInterface, repo sqldb.OffloadNodeStatu
}
}

// decompressAndFetchOffloadedNodes hydrates wf.Status.Nodes in place: it first
// decompresses any compressed node status, then — if the node status has been
// offloaded to the persistence repo — fetches the offloaded nodes and installs
// them on the workflow. Returns an error if decompression fails, if offloading
// is in effect but the repo is disabled, or if the repo lookup fails.
func decompressAndFetchOffloadedNodes(wf *wfv1.Workflow, repo sqldb.OffloadNodeStatusRepo) error {
	if err := packer.DecompressWorkflow(wf); err != nil {
		return err
	}

	// Nothing more to do unless the node status lives in the offload repo.
	if !wf.Status.IsOffloadNodeStatus() {
		return nil
	}
	if !repo.IsEnabled() {
		return fmt.Errorf(sqldb.OffloadNodeStatusDisabled)
	}
	nodes, err := repo.Get(string(wf.UID), wf.GetOffloadNodeStatusVersion())
	if err != nil {
		return fmt.Errorf("unable to retrieve offloaded nodes: %s", err)
	}
	wf.Status.Nodes = nodes
	return nil
}

// compressAndOffloadNodes stores newNodes on the workflow, then either
// compresses them in place or offloads them to the persistence repo.
// Offloading is used when the workflow was already offloaded, when
// ALWAYS_OFFLOAD_NODE_STATUS=true, or when compression cannot shrink the
// status enough (too-large error) and the repo is available. On a successful
// offload the in-object node status is cleared and the offload version
// recorded. Mirrors the inverse of decompressAndFetchOffloadedNodes.
func compressAndOffloadNodes(wf *wfv1.Workflow, repo sqldb.OffloadNodeStatusRepo, newNodes wfv1.Nodes) error {
	wf.Status.Nodes = newNodes
	doOffload := wf.Status.IsOffloadNodeStatus() || os.Getenv("ALWAYS_OFFLOAD_NODE_STATUS") == "true"

	if !doOffload {
		err := packer.CompressWorkflowIfNeeded(wf)
		if packer.IsTooLargeError(err) && repo.IsEnabled() {
			// Compression was insufficient; fall through to offloading instead.
			doOffload = true
		} else if err != nil {
			return err
		}
	}

	if !doOffload {
		return nil
	}
	if !repo.IsEnabled() {
		return fmt.Errorf(sqldb.OffloadNodeStatusDisabled)
	}
	offloadVersion, err := repo.Save(string(wf.UID), wf.Namespace, wf.Status.Nodes)
	if err != nil {
		return fmt.Errorf("unable to save offloaded nodes: %s", err)
	}
	// Nodes now live in the repo; keep only the version pointer on the object.
	wf.Status.Nodes = nil
	wf.Status.CompressedNodes = ""
	wf.Status.OffloadNodeStatusVersion = offloadVersion
	return nil
}

func selectorMatchesNode(selector fields.Selector, node wfv1.NodeStatus) bool {
nodeFields := fields.Set{
"displayName": node.DisplayName,
Expand All @@ -425,7 +450,7 @@ func selectorMatchesNode(selector fields.Selector, node wfv1.NodeStatus) bool {
return selector.Matches(nodeFields)
}

func updateWorkflowNodeByKey(wfIf v1alpha1.WorkflowInterface, workflowName string, nodeFieldSelector string, phase wfv1.NodePhase, message string) error {
func updateWorkflowNodeByKey(wfIf v1alpha1.WorkflowInterface, repo sqldb.OffloadNodeStatusRepo, workflowName string, nodeFieldSelector string, phase wfv1.NodePhase, message string) error {
selector, err := fields.ParseSelector(nodeFieldSelector)

if err != nil {
Expand All @@ -437,26 +462,32 @@ func updateWorkflowNodeByKey(wfIf v1alpha1.WorkflowInterface, workflowName strin
return false, err
}

err = packer.DecompressWorkflow(wf)
err = decompressAndFetchOffloadedNodes(wf, repo)
if err != nil {
log.Fatal(err)
return false, err
}

nodeUpdated := false
for nodeID, node := range wf.Status.Nodes {
nodes := wf.Status.Nodes
for nodeID, node := range nodes {
if node.IsActiveSuspendNode() {
if selectorMatchesNode(selector, node) {
node.Phase = phase
node.FinishedAt = metav1.Time{Time: time.Now().UTC()}
if len(message) > 0 {
node.Message = message
}
wf.Status.Nodes[nodeID] = node
nodes[nodeID] = node
nodeUpdated = true
}
}
}
if nodeUpdated {
err = compressAndOffloadNodes(wf, repo, nodes)
if err != nil {
return false, fmt.Errorf("unable to compress or offload workflow nodes: %s", err)
}

_, err = wfIf.Update(wf)
if err != nil {
if apierr.IsConflict(err) {
Expand Down Expand Up @@ -614,9 +645,9 @@ func RetryWorkflow(kubeClient kubernetes.Interface, repo sqldb.OffloadNodeStatus
return nil, errors.Errorf(errors.CodeBadRequest, "workflow must be Failed/Error to retry")
}

err := packer.DecompressWorkflow(wf)
err := decompressAndFetchOffloadedNodes(wf, repo)
if err != nil {
return nil, fmt.Errorf("unable to decompress workflow: %s", err)
return nil, err
}

newWF := wf.DeepCopy()
Expand All @@ -638,16 +669,6 @@ func RetryWorkflow(kubeClient kubernetes.Interface, repo sqldb.OffloadNodeStatus
newNodes := make(map[string]wfv1.NodeStatus)
onExitNodeName := wf.ObjectMeta.Name + ".onExit"
nodes := wf.Status.Nodes
if wf.Status.IsOffloadNodeStatus() {
if !repo.IsEnabled() {
return nil, fmt.Errorf(sqldb.OffloadNodeStatusDisabled)
}
var err error
nodes, err = repo.Get(string(wf.UID), wf.GetOffloadNodeStatusVersion())
if err != nil {
return nil, fmt.Errorf("unable to retrieve offloaded nodes: %s", err)
}
}

// Get all children of nodes that match filter
nodeIDsToReset, err := getNodeIDsToReset(restartSuccessful, nodeFieldSelector, nodes)
Expand Down Expand Up @@ -698,31 +719,16 @@ func RetryWorkflow(kubeClient kubernetes.Interface, repo sqldb.OffloadNodeStatus
}
}

if wf.Status.IsOffloadNodeStatus() {
if !repo.IsEnabled() {
return nil, fmt.Errorf(sqldb.OffloadNodeStatusDisabled)
}
offloadVersion, err := repo.Save(string(newWF.UID), newWF.Namespace, newNodes)
if err != nil {
return nil, fmt.Errorf("unable to save offloaded nodes: %s", err)
}
newWF.Status.OffloadNodeStatusVersion = offloadVersion
newWF.Status.CompressedNodes = ""
newWF.Status.Nodes = nil
} else {
newWF.Status.Nodes = newNodes
err = compressAndOffloadNodes(newWF, repo, newNodes)
if err != nil {
return nil, fmt.Errorf("unable to compress or offload workflow nodes: %s", err)
}

newWF.Status.StoredTemplates = make(map[string]wfv1.Template)
for id, tmpl := range wf.Status.StoredTemplates {
newWF.Status.StoredTemplates[id] = tmpl
}

err = packer.CompressWorkflowIfNeeded(newWF)
if err != nil {
return nil, fmt.Errorf("unable to compress workflow: %s", err)
}

return wfClient.Update(newWF)
}

Expand Down Expand Up @@ -799,9 +805,9 @@ func TerminateWorkflow(wfClient v1alpha1.WorkflowInterface, name string) error {

// StopWorkflow terminates a workflow by setting its spec.shutdown to ShutdownStrategyStop
// Or terminates a single resume step referenced by nodeFieldSelector
func StopWorkflow(wfClient v1alpha1.WorkflowInterface, name string, nodeFieldSelector string, message string) error {
func StopWorkflow(wfClient v1alpha1.WorkflowInterface, repo sqldb.OffloadNodeStatusRepo, name string, nodeFieldSelector string, message string) error {
if len(nodeFieldSelector) > 0 {
return updateWorkflowNodeByKey(wfClient, name, nodeFieldSelector, wfv1.NodeFailed, message)
return updateWorkflowNodeByKey(wfClient, repo, name, nodeFieldSelector, wfv1.NodeFailed, message)
} else {
patchObj := map[string]interface{}{
"spec": map[string]interface{}{
Expand Down

0 comments on commit ad28a9c

Please sign in to comment.