fix(controller): Workflow stop and resume by node didn't properly support offloaded nodes. Fixes #2543 #2548

Merged: 9 commits, May 1, 2020
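
Summary of the change: stopping or resuming a single node previously read and wrote `wf.Status.Nodes` directly, which is empty when a workflow's node status has been offloaded to the database. The fix threads a `sqldb.OffloadNodeStatusRepo` through the stop/resume-by-node code paths (`ResumeWorkflow` already took one) and factors the hydrate/dehydrate logic into two new helpers, `decompressAndFetchOffloadedNodes` and `compressAndOffloadNodes`. The updated signatures, as they appear in the diff below:

```go
// workflow/util/util.go — entry points updated by this PR now take the offload repo.
func StopWorkflow(wfClient v1alpha1.WorkflowInterface, repo sqldb.OffloadNodeStatusRepo, name string, nodeFieldSelector string, message string) error

func updateWorkflowNodeByKey(wfIf v1alpha1.WorkflowInterface, repo sqldb.OffloadNodeStatusRepo, workflowName string, nodeFieldSelector string, phase wfv1.NodePhase, message string) error
```
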
2 changes: 1 addition & 1 deletion server/workflow/workflow_server.go
@@ -323,7 +323,7 @@ func (s *workflowServer) TerminateWorkflow(ctx context.Context, req *workflowpkg

 func (s *workflowServer) StopWorkflow(ctx context.Context, req *workflowpkg.WorkflowStopRequest) (*v1alpha1.Workflow, error) {
 	wfClient := auth.GetWfClient(ctx)
-	err := util.StopWorkflow(wfClient.ArgoprojV1alpha1().Workflows(req.Namespace), req.Name, req.NodeFieldSelector, req.Message)
+	err := util.StopWorkflow(wfClient.ArgoprojV1alpha1().Workflows(req.Namespace), s.offloadNodeStatusRepo, req.Name, req.NodeFieldSelector, req.Message)
 	if err != nil {
 		return nil, err
 	}
12 changes: 9 additions & 3 deletions test/e2e/cli_test.go
@@ -307,9 +307,15 @@ func (s *CLISuite) TestWorkflowSuspendResume() {
 	})
 }
 
-func (s *CLISuite) TestNodeSuspendResume() {
-	// https://github.com/argoproj/argo/issues/2621
-	s.T().SkipNow()
+func (s *CLISuite) TestNodeSuspendResumeNoPersistence() {
+	if s.Persistence.IsEnabled() {
+		// When persistence is enabled, this behavior is tested by TestNodeSuspendResumePersistence in cli_with_server_test.go.
+		s.T().SkipNow()
+	}
+	NodeSuspendResumeCommon(s.E2ESuite)
+}
+
+func NodeSuspendResumeCommon(s fixtures.E2ESuite) {
 	s.Given().
 		Workflow("@testdata/node-suspend.yaml").
 		When().
9 changes: 9 additions & 0 deletions test/e2e/cli_with_server_test.go
@@ -193,6 +193,15 @@ func (s *CLIWithServerSuite) TestWorkflowSuspendResumePersistence() {
 	})
 }
 
+func (s *CLIWithServerSuite) TestNodeSuspendResumePersistence() {
+	if !s.Persistence.IsEnabled() {
+		// When persistence is disabled, this behavior is tested by
+		// TestNodeSuspendResumeNoPersistence in cli_test.go instead.
+		s.T().SkipNow()
+	}
+	NodeSuspendResumeCommon(s.E2ESuite)
+}
+
 func TestCLIWithServerSuite(t *testing.T) {
 	suite.Run(t, new(CLIWithServerSuite))
 }
2 changes: 1 addition & 1 deletion workflow/controller/operator_test.go
@@ -1228,7 +1228,7 @@ func TestSuspendTemplateWithFailedResume(t *testing.T) {
 	assert.Equal(t, 0, len(pods.Items))
 
 	// resume the workflow. verify resume workflow edits nodestatus correctly
-	err = util.StopWorkflow(wfcset, wf.ObjectMeta.Name, "inputs.parameters.param1.value=value1", "Step failed!")
+	err = util.StopWorkflow(wfcset, sqldb.ExplosiveOffloadNodeStatusRepo, wf.ObjectMeta.Name, "inputs.parameters.param1.value=value1", "Step failed!")
 	assert.NoError(t, err)
 	wf, err = wfcset.Get(wf.ObjectMeta.Name, metav1.GetOptions{})
 	assert.NoError(t, err)
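
A note on the test above: it passes `sqldb.ExplosiveOffloadNodeStatusRepo` for the new repo parameter. Judging by the name, it is a stand-in whose methods fail if ever invoked, so the test doubles as an assertion that this code path never touches offload storage. A self-contained toy analogue of that pattern (all names here are invented for illustration, not the real sqldb API):

```go
package main

import (
	"errors"
	"fmt"
)

// offloadRepo is an invented stand-in for sqldb.OffloadNodeStatusRepo.
type offloadRepo interface {
	IsEnabled() bool
	Get(uid, version string) (map[string]string, error)
}

// explosiveRepo fails loudly if exercised, letting a test assert that a
// code path never reads from or writes to offload storage.
type explosiveRepo struct{}

func (explosiveRepo) IsEnabled() bool { return false }

func (explosiveRepo) Get(uid, version string) (map[string]string, error) {
	return nil, errors.New("offload repo must not be used in this test")
}

func main() {
	var repo offloadRepo = explosiveRepo{}
	// Production code checks IsEnabled() before calling Get/Save, so a
	// well-behaved caller never trips the error.
	fmt.Println("offload enabled:", repo.IsEnabled())
}
```
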
146 changes: 76 additions & 70 deletions workflow/util/util.go
@@ -329,17 +329,17 @@ func SuspendWorkflow(wfIf v1alpha1.WorkflowInterface, workflowName string) error
 // Retries conflict errors
 func ResumeWorkflow(wfIf v1alpha1.WorkflowInterface, repo sqldb.OffloadNodeStatusRepo, workflowName string, nodeFieldSelector string) error {
 	if len(nodeFieldSelector) > 0 {
-		return updateWorkflowNodeByKey(wfIf, workflowName, nodeFieldSelector, wfv1.NodeSucceeded, "")
+		return updateWorkflowNodeByKey(wfIf, repo, workflowName, nodeFieldSelector, wfv1.NodeSucceeded, "")
 	} else {
 		err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
 			wf, err := wfIf.Get(workflowName, metav1.GetOptions{})
 			if err != nil {
 				return false, err
 			}
 
-			err = packer.DecompressWorkflow(wf)
+			err = decompressAndFetchOffloadedNodes(wf, repo)
 			if err != nil {
-				return false, fmt.Errorf("unable to decompress workflow: %s", err)
+				return false, err
 			}
 
 			workflowUpdated := false
@@ -348,21 +348,10 @@ func ResumeWorkflow(wfIf v1alpha1.WorkflowInterface, repo sqldb.OffloadNodeStatusRepo, workflowName string, nodeFieldSelector string) error {
 				workflowUpdated = true
 			}
 
-			nodes := wf.Status.Nodes
-			if wf.Status.IsOffloadNodeStatus() {
-				if !repo.IsEnabled() {
-					return false, fmt.Errorf(sqldb.OffloadNodeStatusDisabled)
-				}
-				var err error
-				nodes, err = repo.Get(string(wf.UID), wf.GetOffloadNodeStatusVersion())
-				if err != nil {
-					return false, fmt.Errorf("unable to retrieve offloaded nodes: %s", err)
-				}
-			}
-			newNodes := nodes.DeepCopy()
+			newNodes := wf.Status.Nodes.DeepCopy()
 
 			// To resume a workflow with a suspended node we simply mark the node as Successful
-			for nodeID, node := range nodes {
+			for nodeID, node := range wf.Status.Nodes {
 				if node.IsActiveSuspendNode() {
 					node.Phase = wfv1.NodeSucceeded
 					node.FinishedAt = metav1.Time{Time: time.Now().UTC()}
@@ -372,24 +361,9 @@ func ResumeWorkflow(wfIf v1alpha1.WorkflowInterface, repo sqldb.OffloadNodeStatusRepo, workflowName string, nodeFieldSelector string) error {
 			}
 
 			if workflowUpdated {
-				if wf.Status.IsOffloadNodeStatus() {
-					if !repo.IsEnabled() {
-						return false, fmt.Errorf(sqldb.OffloadNodeStatusDisabled)
-					}
-					offloadVersion, err := repo.Save(string(wf.UID), wf.Namespace, newNodes)
-					if err != nil {
-						return false, fmt.Errorf("unable to save offloaded nodes: %s", err)
-					}
-					wf.Status.OffloadNodeStatusVersion = offloadVersion
-					wf.Status.CompressedNodes = ""
-					wf.Status.Nodes = nil
-				} else {
-					wf.Status.Nodes = newNodes
-				}
-
-				err = packer.CompressWorkflowIfNeeded(wf)
+				err = compressAndOffloadNodes(wf, repo, newNodes)
 				if err != nil {
-					return false, fmt.Errorf("unable to compress workflow: %s", err)
+					return false, fmt.Errorf("unable to compress or offload workflow nodes: %s", err)
 				}
 
 				_, err = wfIf.Update(wf)
@@ -406,6 +380,57 @@ func ResumeWorkflow(wfIf v1alpha1.WorkflowInterface, repo sqldb.OffloadNodeStatusRepo, workflowName string, nodeFieldSelector string) error {
 	}
 }
 
+func decompressAndFetchOffloadedNodes(wf *wfv1.Workflow, repo sqldb.OffloadNodeStatusRepo) error {
+	err := packer.DecompressWorkflow(wf)
+	if err != nil {
+		return err
+	}
+
+	if wf.Status.IsOffloadNodeStatus() {
+		if !repo.IsEnabled() {
+			return fmt.Errorf(sqldb.OffloadNodeStatusDisabled)
+		}
+		nodes, err := repo.Get(string(wf.UID), wf.GetOffloadNodeStatusVersion())
+		if err != nil {
+			return fmt.Errorf("unable to retrieve offloaded nodes: %s", err)
+		} else {
+			wf.Status.Nodes = nodes
+		}
+	}
+	return nil
+}
+
+func compressAndOffloadNodes(wf *wfv1.Workflow, repo sqldb.OffloadNodeStatusRepo, newNodes wfv1.Nodes) error {
+	wf.Status.Nodes = newNodes
+	doOffload := wf.Status.IsOffloadNodeStatus() || os.Getenv("ALWAYS_OFFLOAD_NODE_STATUS") == "true"
+
+	if !doOffload {
+		err := packer.CompressWorkflowIfNeeded(wf)
+		if packer.IsTooLargeError(err) && repo.IsEnabled() {
+			doOffload = true
+			err = nil
+		}
+		if err != nil {
+			return err
+		}
+	}
+
+	if doOffload {
+		if !repo.IsEnabled() {
+			return fmt.Errorf(sqldb.OffloadNodeStatusDisabled)
+		}
+		offloadVersion, err := repo.Save(string(wf.UID), wf.Namespace, wf.Status.Nodes)
+		if err == nil {
+			wf.Status.Nodes = nil
+			wf.Status.CompressedNodes = ""
+			wf.Status.OffloadNodeStatusVersion = offloadVersion
+		} else {
+			return err
+		}
+	}
+	return nil
+}
+
 func selectorMatchesNode(selector fields.Selector, node wfv1.NodeStatus) bool {
 	nodeFields := fields.Set{
 		"displayName": node.DisplayName,
@@ -425,7 +450,7 @@ func selectorMatchesNode(selector fields.Selector, node wfv1.NodeStatus) bool {
 	return selector.Matches(nodeFields)
 }
 
-func updateWorkflowNodeByKey(wfIf v1alpha1.WorkflowInterface, workflowName string, nodeFieldSelector string, phase wfv1.NodePhase, message string) error {
+func updateWorkflowNodeByKey(wfIf v1alpha1.WorkflowInterface, repo sqldb.OffloadNodeStatusRepo, workflowName string, nodeFieldSelector string, phase wfv1.NodePhase, message string) error {
 	selector, err := fields.ParseSelector(nodeFieldSelector)
 
 	if err != nil {
@@ -437,26 +462,32 @@ func updateWorkflowNodeByKey(wfIf v1alpha1.WorkflowInterface, workflowName string, nodeFieldSelector string, phase wfv1.NodePhase, message string) error {
 			return false, err
 		}
 
-		err = packer.DecompressWorkflow(wf)
+		err = decompressAndFetchOffloadedNodes(wf, repo)
 		if err != nil {
-			log.Fatal(err)
+			return false, err
 		}
 
 		nodeUpdated := false
-		for nodeID, node := range wf.Status.Nodes {
+		nodes := wf.Status.Nodes
+		for nodeID, node := range nodes {
 			if node.IsActiveSuspendNode() {
 				if selectorMatchesNode(selector, node) {
 					node.Phase = phase
 					node.FinishedAt = metav1.Time{Time: time.Now().UTC()}
 					if len(message) > 0 {
 						node.Message = message
 					}
-					wf.Status.Nodes[nodeID] = node
+					nodes[nodeID] = node
 					nodeUpdated = true
 				}
 			}
 		}
+		if nodeUpdated {
+			err = compressAndOffloadNodes(wf, repo, nodes)
+			if err != nil {
+				return false, fmt.Errorf("unable to compress or offload workflow nodes: %s", err)
+			}
+		}
 
 		_, err = wfIf.Update(wf)
 		if err != nil {
 			if apierr.IsConflict(err) {
@@ -614,9 +645,9 @@ func RetryWorkflow(kubeClient kubernetes.Interface, repo sqldb.OffloadNodeStatusRepo
 		return nil, errors.Errorf(errors.CodeBadRequest, "workflow must be Failed/Error to retry")
 	}
 
-	err := packer.DecompressWorkflow(wf)
+	err := decompressAndFetchOffloadedNodes(wf, repo)
 	if err != nil {
-		return nil, fmt.Errorf("unable to decompress workflow: %s", err)
+		return nil, err
 	}
 
 	newWF := wf.DeepCopy()
@@ -638,16 +669,6 @@ func RetryWorkflow(kubeClient kubernetes.Interface, repo sqldb.OffloadNodeStatusRepo
 	newNodes := make(map[string]wfv1.NodeStatus)
 	onExitNodeName := wf.ObjectMeta.Name + ".onExit"
 	nodes := wf.Status.Nodes
-	if wf.Status.IsOffloadNodeStatus() {
-		if !repo.IsEnabled() {
-			return nil, fmt.Errorf(sqldb.OffloadNodeStatusDisabled)
-		}
-		var err error
-		nodes, err = repo.Get(string(wf.UID), wf.GetOffloadNodeStatusVersion())
-		if err != nil {
-			return nil, fmt.Errorf("unable to retrieve offloaded nodes: %s", err)
-		}
-	}
 
 	// Get all children of nodes that match filter
 	nodeIDsToReset, err := getNodeIDsToReset(restartSuccessful, nodeFieldSelector, nodes)
@@ -698,31 +719,16 @@ func RetryWorkflow(kubeClient kubernetes.Interface, repo sqldb.OffloadNodeStatusRepo
 		}
 	}
 
-	if wf.Status.IsOffloadNodeStatus() {
-		if !repo.IsEnabled() {
-			return nil, fmt.Errorf(sqldb.OffloadNodeStatusDisabled)
-		}
-		offloadVersion, err := repo.Save(string(newWF.UID), newWF.Namespace, newNodes)
-		if err != nil {
-			return nil, fmt.Errorf("unable to save offloaded nodes: %s", err)
-		}
-		newWF.Status.OffloadNodeStatusVersion = offloadVersion
-		newWF.Status.CompressedNodes = ""
-		newWF.Status.Nodes = nil
-	} else {
-		newWF.Status.Nodes = newNodes
-	}
+	err = compressAndOffloadNodes(newWF, repo, newNodes)
+	if err != nil {
+		return nil, fmt.Errorf("unable to compress or offload workflow nodes: %s", err)
+	}
 
 	newWF.Status.StoredTemplates = make(map[string]wfv1.Template)
 	for id, tmpl := range wf.Status.StoredTemplates {
 		newWF.Status.StoredTemplates[id] = tmpl
 	}
 
-	err = packer.CompressWorkflowIfNeeded(newWF)
-	if err != nil {
-		return nil, fmt.Errorf("unable to compress workflow: %s", err)
-	}
-
 	return wfClient.Update(newWF)
 }

@@ -799,9 +805,9 @@ func TerminateWorkflow(wfClient v1alpha1.WorkflowInterface, name string) error {

 // StopWorkflow terminates a workflow by setting its spec.shutdown to ShutdownStrategyStop
 // Or terminates a single resume step referenced by nodeFieldSelector
-func StopWorkflow(wfClient v1alpha1.WorkflowInterface, name string, nodeFieldSelector string, message string) error {
+func StopWorkflow(wfClient v1alpha1.WorkflowInterface, repo sqldb.OffloadNodeStatusRepo, name string, nodeFieldSelector string, message string) error {
 	if len(nodeFieldSelector) > 0 {
-		return updateWorkflowNodeByKey(wfClient, name, nodeFieldSelector, wfv1.NodeFailed, message)
+		return updateWorkflowNodeByKey(wfClient, repo, name, nodeFieldSelector, wfv1.NodeFailed, message)
 	} else {
 		patchObj := map[string]interface{}{
 			"spec": map[string]interface{}{
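
Taken together, the util.go changes reduce every mutation path (resume, stop-by-node, retry) to the same round trip: hydrate node status (decompress, and fetch from the DB if offloaded), mutate it, then dehydrate (compress in-line, or offload when the workflow is already offloaded, when ALWAYS_OFFLOAD_NODE_STATUS is set, or when it is still too large after compression). A self-contained sketch of that round trip with stand-in types, invented for illustration only (the real types are wfv1.Workflow, wfv1.Nodes, and sqldb.OffloadNodeStatusRepo):

```go
package main

import "fmt"

type nodes map[string]string

type workflow struct {
	nodes          nodes  // in-line node status; nil when offloaded
	offloadVersion string // non-empty when node status lives in the DB
}

// repo is a toy in-memory version store standing in for the SQL-backed repo.
type repo struct {
	enabled bool
	store   map[string]nodes
}

func (r *repo) get(version string) (nodes, error) { return r.store[version], nil }

func (r *repo) save(n nodes) (string, error) {
	v := fmt.Sprintf("v%d", len(r.store)+1)
	r.store[v] = n
	return v, nil
}

// hydrate mirrors decompressAndFetchOffloadedNodes: make wf.nodes usable in memory.
func hydrate(wf *workflow, r *repo) error {
	if wf.offloadVersion != "" {
		if !r.enabled {
			return fmt.Errorf("workflow is offloaded but offloading is disabled")
		}
		n, err := r.get(wf.offloadVersion)
		if err != nil {
			return err
		}
		wf.nodes = n
	}
	return nil
}

// dehydrate mirrors compressAndOffloadNodes: put mutated nodes back where they belong.
func dehydrate(wf *workflow, r *repo, mutated nodes) error {
	wf.nodes = mutated
	if wf.offloadVersion == "" {
		return nil // small workflow: keep nodes in-line (compression elided here)
	}
	v, err := r.save(mutated)
	if err != nil {
		return err
	}
	wf.offloadVersion = v
	wf.nodes = nil // offloaded: never persist nodes in-line as well
	return nil
}

func main() {
	r := &repo{enabled: true, store: map[string]nodes{"v1": {"node-1": "Suspended"}}}
	wf := &workflow{offloadVersion: "v1"}

	_ = hydrate(wf, r)                   // fetch offloaded nodes
	mutated := nodes{"node-1": "Failed"} // e.g. stop-by-node marks the node Failed
	_ = dehydrate(wf, r, mutated)        // save a new version, clear in-line nodes

	fmt.Println(wf.offloadVersion, wf.nodes == nil) // v2 true
}
```

The bug this PR fixes is exactly the case where the pre-change code skipped the hydrate step: for an offloaded workflow, `wf.Status.Nodes` is nil, so stop/resume-by-node silently found no matching suspend node.
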