From 338156072f399dc27b134c25c29a641ae3435bd2 Mon Sep 17 00:00:00 2001 From: Anand Swaminathan Date: Fri, 4 Dec 2020 14:38:13 -0800 Subject: [PATCH] Event Version Change for Launch plan Handler (#211) * Event Version Change for Launch plan Handler * comment --- .../nodes/subworkflow/handler_test.go | 86 ++++++++++++++++++- .../nodes/subworkflow/launchplan.go | 40 +++++++-- .../nodes/subworkflow/launchplan_test.go | 3 - 3 files changed, 117 insertions(+), 12 deletions(-) diff --git a/flytepropeller/pkg/controller/nodes/subworkflow/handler_test.go b/flytepropeller/pkg/controller/nodes/subworkflow/handler_test.go index 7a1c871147..71c6599459 100644 --- a/flytepropeller/pkg/controller/nodes/subworkflow/handler_test.go +++ b/flytepropeller/pkg/controller/nodes/subworkflow/handler_test.go @@ -54,7 +54,7 @@ var wfExecID = &core.WorkflowExecutionIdentifier{ Name: "name", } -func createNodeContext(phase v1alpha1.WorkflowNodePhase, n v1alpha1.ExecutableNode, s v1alpha1.ExecutableNodeStatus) *mocks3.NodeExecutionContext { +func createNodeContextWithVersion(phase v1alpha1.WorkflowNodePhase, n v1alpha1.ExecutableNode, s v1alpha1.ExecutableNodeStatus, version v1alpha1.EventVersion) *mocks3.NodeExecutionContext { wfNodeState := handler.WorkflowNodeState{} state := &workflowNodeStateHolder{s: wfNodeState} @@ -94,9 +94,25 @@ func createNodeContext(phase v1alpha1.WorkflowNodePhase, n v1alpha1.ExecutableNo }) nCtx.OnNodeStateReader().Return(nr) nCtx.OnNodeStateWriter().Return(state) + + ex := &execMocks.ExecutionContext{} + ex.OnGetEventVersion().Return(version) + ex.OnGetParentInfo().Return(nil) + ex.OnGetName().Return("name") + + nCtx.OnExecutionContext().Return(ex) + return nCtx } +func createNodeContextV1(phase v1alpha1.WorkflowNodePhase, n v1alpha1.ExecutableNode, s v1alpha1.ExecutableNodeStatus) *mocks3.NodeExecutionContext { + return createNodeContextWithVersion(phase, n, s, v1alpha1.EventVersion1) +} + +func createNodeContext(phase v1alpha1.WorkflowNodePhase, n v1alpha1.ExecutableNode, s v1alpha1.ExecutableNodeStatus) *mocks3.NodeExecutionContext { + return createNodeContextWithVersion(phase, n, s, v1alpha1.EventVersion0) +} + func TestWorkflowNodeHandler_StartNode_Launchplan(t *testing.T) { ctx := context.TODO() @@ -124,7 +140,7 @@ func TestWorkflowNodeHandler_StartNode_Launchplan(t *testing.T) { wfStatus := &mocks2.MutableWorkflowNodeStatus{} mockNodeStatus.OnGetOrCreateWorkflowStatus().Return(wfStatus) - t.Run("happy", func(t *testing.T) { + t.Run("happy v0", func(t *testing.T) { mockLPExec := &mocks.Executor{} h := New(nil, mockLPExec, promutils.NewTestScope()) @@ -146,6 +162,29 @@ func TestWorkflowNodeHandler_StartNode_Launchplan(t *testing.T) { assert.NoError(t, err) assert.Equal(t, handler.EPhaseRunning, s.Info().GetPhase()) }) + + t.Run("happy v1", func(t *testing.T) { + + mockLPExec := &mocks.Executor{} + h := New(nil, mockLPExec, promutils.NewTestScope()) + mockLPExec.OnLaunchMatch( + ctx, + mock.MatchedBy(func(o launchplan.LaunchContext) bool { + return o.ParentNodeExecution.NodeId == mockNode.GetID() && + o.ParentNodeExecution.ExecutionId == wfExecID + }), + mock.MatchedBy(func(o *core.WorkflowExecutionIdentifier) bool { + return assert.Equal(t, wfExecID.Project, o.Project) && assert.Equal(t, wfExecID.Domain, o.Domain) + }), + mock.MatchedBy(func(o *core.Identifier) bool { return lpID == o }), + mock.MatchedBy(func(o *core.LiteralMap) bool { return o.Literals == nil }), + ).Return(nil) + + nCtx := createNodeContextV1(v1alpha1.WorkflowNodePhaseUndefined, mockNode, mockNodeStatus) + s, err := h.Handle(ctx, nCtx) + assert.NoError(t, err) + assert.Equal(t, handler.EPhaseRunning, s.Info().GetPhase()) + }) } func TestWorkflowNodeHandler_CheckNodeStatus(t *testing.T) { @@ -175,7 +214,7 @@ func TestWorkflowNodeHandler_CheckNodeStatus(t *testing.T) { mockNodeStatus.OnGetAttempts().Return(attempts) mockNodeStatus.OnGetDataDir().Return(dataDir) - t.Run("stillRunning", func(t *testing.T) { + t.Run("stillRunning V0", func(t *testing.T) { mockLPExec := &mocks.Executor{} @@ -194,6 +233,25 @@ func TestWorkflowNodeHandler_CheckNodeStatus(t *testing.T) { assert.NoError(t, err) assert.Equal(t, handler.EPhaseRunning, s.Info().GetPhase()) }) + t.Run("stillRunning V1", func(t *testing.T) { + + mockLPExec := &mocks.Executor{} + + h := New(nil, mockLPExec, promutils.NewTestScope()) + mockLPExec.OnGetStatusMatch( + ctx, + mock.MatchedBy(func(o *core.WorkflowExecutionIdentifier) bool { + return assert.Equal(t, wfExecID.Project, o.Project) && assert.Equal(t, wfExecID.Domain, o.Domain) + }), + ).Return(&admin.ExecutionClosure{ + Phase: core.WorkflowExecution_RUNNING, + }, nil) + + nCtx := createNodeContextV1(v1alpha1.WorkflowNodePhaseExecuting, mockNode, mockNodeStatus) + s, err := h.Handle(ctx, nCtx) + assert.NoError(t, err) + assert.Equal(t, handler.EPhaseRunning, s.Info().GetPhase()) + }) } func TestWorkflowNodeHandler_AbortNode(t *testing.T) { @@ -223,7 +281,7 @@ func TestWorkflowNodeHandler_AbortNode(t *testing.T) { mockNodeStatus.OnGetAttempts().Return(attempts) mockNodeStatus.OnGetDataDir().Return(dataDir) - t.Run("abort", func(t *testing.T) { + t.Run("abort v0", func(t *testing.T) { mockLPExec := &mocks.Executor{} nCtx := createNodeContext(v1alpha1.WorkflowNodePhaseExecuting, mockNode, mockNodeStatus) @@ -244,6 +302,26 @@ func TestWorkflowNodeHandler_AbortNode(t *testing.T) { assert.NoError(t, err) }) + t.Run("abort v1", func(t *testing.T) { + + mockLPExec := &mocks.Executor{} + nCtx := createNodeContextV1(v1alpha1.WorkflowNodePhaseExecuting, mockNode, mockNodeStatus) + + h := New(nil, mockLPExec, promutils.NewTestScope()) + mockLPExec.OnKillMatch( + ctx, + mock.MatchedBy(func(o *core.WorkflowExecutionIdentifier) bool { + return assert.Equal(t, wfExecID.Project, o.Project) && assert.Equal(t, wfExecID.Domain, o.Domain) + }), + mock.AnythingOfType(reflect.String.String()), + ).Return(nil) + + eCtx := &execMocks.ExecutionContext{} + nCtx.OnExecutionContext().Return(eCtx) + eCtx.OnGetName().Return("test") + err := h.Abort(ctx, nCtx, "test") + assert.NoError(t, err) + }) t.Run("abort-fail", func(t *testing.T) { mockLPExec := &mocks.Executor{} diff --git a/flytepropeller/pkg/controller/nodes/subworkflow/launchplan.go b/flytepropeller/pkg/controller/nodes/subworkflow/launchplan.go index d2853c26f9..7dfff76c62 100644 --- a/flytepropeller/pkg/controller/nodes/subworkflow/launchplan.go +++ b/flytepropeller/pkg/controller/nodes/subworkflow/launchplan.go @@ -4,6 +4,8 @@ import ( "context" "fmt" + "github.com/lyft/flytepropeller/pkg/controller/nodes/common" + "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" "github.com/lyft/flytestdlib/logger" "github.com/lyft/flytestdlib/storage" @@ -18,6 +20,23 @@ type launchPlanHandler struct { launchPlan launchplan.Executor } +func getParentNodeExecutionID(nCtx handler.NodeExecutionContext) (*core.NodeExecutionIdentifier, error) { + nodeExecID := &core.NodeExecutionIdentifier{ + ExecutionId: nCtx.NodeExecutionMetadata().GetNodeExecutionID().ExecutionId, + } + if nCtx.ExecutionContext().GetEventVersion() != v1alpha1.EventVersion0 { + var err error + currentNodeUniqueID, err := common.GenerateUniqueID(nCtx.ExecutionContext().GetParentInfo(), nCtx.NodeExecutionMetadata().GetNodeExecutionID().NodeId) + if err != nil { + return nil, err + } + nodeExecID.NodeId = currentNodeUniqueID + } else { + nodeExecID.NodeId = nCtx.NodeExecutionMetadata().GetNodeExecutionID().NodeId + } + return nodeExecID, nil +} + func (l *launchPlanHandler) StartLaunchPlan(ctx context.Context, nCtx handler.NodeExecutionContext) (handler.Transition, error) { nodeInputs, err := nCtx.InputReader().Get(ctx) if err != nil { @@ -25,8 +44,12 @@ func (l *launchPlanHandler) StartLaunchPlan(ctx context.Context, nCtx handler.No return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoFailure(core.ExecutionError_SYSTEM, errors.RuntimeExecutionError, errMsg, nil)), nil } + parentNodeExecutionID, err := getParentNodeExecutionID(nCtx) + if err != nil { + return handler.UnknownTransition, err + } childID, err := GetChildWorkflowExecutionID( - nCtx.NodeExecutionMetadata().GetNodeExecutionID(), + parentNodeExecutionID, nCtx.CurrentAttempt(), ) if err != nil { @@ -37,7 +60,7 @@ func (l *launchPlanHandler) StartLaunchPlan(ctx context.Context, nCtx handler.No // TODO we need to add principal and nestinglevel as annotations or labels? Principal: "unknown", NestingLevel: 0, - ParentNodeExecution: nCtx.NodeExecutionMetadata().GetNodeExecutionID(), + ParentNodeExecution: parentNodeExecutionID, } err = l.launchPlan.Launch(ctx, launchCtx, childID, nCtx.Node().GetWorkflowNode().GetLaunchPlanRefID().Identifier, nodeInputs) if err != nil { @@ -60,10 +83,13 @@ func (l *launchPlanHandler) StartLaunchPlan(ctx context.Context, nCtx handler.No } func (l *launchPlanHandler) CheckLaunchPlanStatus(ctx context.Context, nCtx handler.NodeExecutionContext) (handler.Transition, error) { - + parentNodeExecutionID, err := getParentNodeExecutionID(nCtx) + if err != nil { + return handler.UnknownTransition, err + } // Handle launch plan childID, err := GetChildWorkflowExecutionID( - nCtx.NodeExecutionMetadata().GetNodeExecutionID(), + parentNodeExecutionID, nCtx.CurrentAttempt(), ) @@ -141,8 +167,12 @@ func (l *launchPlanHandler) CheckLaunchPlanStatus(ctx context.Context, nCtx hand } func (l *launchPlanHandler) HandleAbort(ctx context.Context, nCtx handler.NodeExecutionContext, reason string) error { + parentNodeExecutionID, err := getParentNodeExecutionID(nCtx) + if err != nil { + return err + } childID, err := GetChildWorkflowExecutionID( - nCtx.NodeExecutionMetadata().GetNodeExecutionID(), + parentNodeExecutionID, nCtx.CurrentAttempt(), ) if err != nil { diff --git a/flytepropeller/pkg/controller/nodes/subworkflow/launchplan_test.go b/flytepropeller/pkg/controller/nodes/subworkflow/launchplan_test.go index 861518de3f..c47ec12190 100644 --- a/flytepropeller/pkg/controller/nodes/subworkflow/launchplan_test.go +++ b/flytepropeller/pkg/controller/nodes/subworkflow/launchplan_test.go @@ -613,9 +613,6 @@ func TestLaunchPlanHandler_HandleAbort(t *testing.T) { launchPlan: mockLPExec, } nCtx := createNodeContext(v1alpha1.WorkflowNodePhaseExecuting, mockNode, mockNodeStatus) - eCtx := &execMocks.ExecutionContext{} - eCtx.OnGetName().Return("name") - nCtx.OnExecutionContext().Return(eCtx) err := h.HandleAbort(ctx, nCtx, "reason") assert.Error(t, err) assert.Equal(t, err, expectedErr)