From 754079c0358b83b39708482454d3b6e0c5acb760 Mon Sep 17 00:00:00 2001 From: Haytham AbuelFutuh Date: Thu, 14 Jan 2021 15:05:31 -0800 Subject: [PATCH] [Compiler] Add execution edges only for top level nodes (#216) * Fix branch compiler * cleanup * fix tests * clean up * lint * docs * cleanup * revert config change * avoid recompiling subworkflows * Also limit execution edges from start node * PR Comments * revert config.yaml * Use a smaller test file --- pkg/compiler/builders.go | 16 +- pkg/compiler/common/builder.go | 2 +- pkg/compiler/common/mocks/workflow.go | 41 + pkg/compiler/common/mocks/workflow_builder.go | 45 +- pkg/compiler/common/reader.go | 1 + pkg/compiler/test/compiler_test.go | 82 +- .../test/testdata/dynamic/success_1.json | 767 ++++++++++++++++++ .../transformers/k8s/builder_mock_test.go | 4 + pkg/compiler/validators/node.go | 21 +- pkg/compiler/workflow_compiler.go | 44 +- 10 files changed, 993 insertions(+), 30 deletions(-) create mode 100644 pkg/compiler/test/testdata/dynamic/success_1.json diff --git a/pkg/compiler/builders.go b/pkg/compiler/builders.go index c34ee1d114..89e63f321e 100755 --- a/pkg/compiler/builders.go +++ b/pkg/compiler/builders.go @@ -22,9 +22,10 @@ type workflowBuilder struct { // These are references to all subgraphs and tasks passed to CompileWorkflow. They will be passed around but will // not show in their entirety in the final Graph. The required subset of these will be added to each subgraph as // the compile traverses them. - allLaunchPlans map[string]c.InterfaceProvider - allTasks c.TaskIndex - allSubWorkflows c.WorkflowIndex + allLaunchPlans map[string]c.InterfaceProvider + allTasks c.TaskIndex + allSubWorkflows c.WorkflowIndex + allCompiledSubWorkflows c.WorkflowIndex } func (w workflowBuilder) GetFailureNode() c.Node { @@ -70,8 +71,13 @@ func (w workflowBuilder) GetLaunchPlan(id c.LaunchPlanID) (wf c.InterfaceProvide return } -func (w workflowBuilder) UpdateSubWorkflow(id c.WorkflowID, compiledWorkflow *core.CompiledWorkflow) { - w.allSubWorkflows[id.String()] = compiledWorkflow +func (w workflowBuilder) StoreCompiledSubWorkflow(id c.WorkflowID, compiledWorkflow *core.CompiledWorkflow) { + w.allCompiledSubWorkflows[id.String()] = compiledWorkflow +} + +func (w workflowBuilder) GetCompiledSubWorkflow(id c.WorkflowID) (wf *core.CompiledWorkflow, found bool) { + wf, found = w.allCompiledSubWorkflows[id.String()] + return } func (w workflowBuilder) GetSubWorkflow(id c.WorkflowID) (wf *core.CompiledWorkflow, found bool) { diff --git a/pkg/compiler/common/builder.go b/pkg/compiler/common/builder.go index daf6e063cc..e338756053 100644 --- a/pkg/compiler/common/builder.go +++ b/pkg/compiler/common/builder.go @@ -16,7 +16,7 @@ const ( // A mutable workflow used during the build of the intermediate layer. type WorkflowBuilder interface { Workflow - UpdateSubWorkflow(id WorkflowID, compiledWorkflow *core.CompiledWorkflow) + StoreCompiledSubWorkflow(id WorkflowID, compiledWorkflow *core.CompiledWorkflow) AddExecutionEdge(nodeFrom, nodeTo NodeID) AddNode(n NodeBuilder, errs errors.CompileErrors) (node NodeBuilder, ok bool) ValidateWorkflow(fg *core.CompiledWorkflow, errs errors.CompileErrors) (Workflow, bool) diff --git a/pkg/compiler/common/mocks/workflow.go b/pkg/compiler/common/mocks/workflow.go index 3bec3a59b7..410ecd5b7e 100644 --- a/pkg/compiler/common/mocks/workflow.go +++ b/pkg/compiler/common/mocks/workflow.go @@ -14,6 +14,47 @@ type Workflow struct { mock.Mock } +type Workflow_GetCompiledSubWorkflow struct { + *mock.Call +} + +func (_m Workflow_GetCompiledSubWorkflow) Return(wf *core.CompiledWorkflow, found bool) *Workflow_GetCompiledSubWorkflow { + return &Workflow_GetCompiledSubWorkflow{Call: _m.Call.Return(wf, found)} +} + +func (_m *Workflow) OnGetCompiledSubWorkflow(id core.Identifier) *Workflow_GetCompiledSubWorkflow { + c := _m.On("GetCompiledSubWorkflow", id) + return &Workflow_GetCompiledSubWorkflow{Call: c} +} + +func (_m *Workflow) OnGetCompiledSubWorkflowMatch(matchers ...interface{}) *Workflow_GetCompiledSubWorkflow { + c := _m.On("GetCompiledSubWorkflow", matchers...) + return &Workflow_GetCompiledSubWorkflow{Call: c} +} + +// GetCompiledSubWorkflow provides a mock function with given fields: id +func (_m *Workflow) GetCompiledSubWorkflow(id core.Identifier) (*core.CompiledWorkflow, bool) { + ret := _m.Called(id) + + var r0 *core.CompiledWorkflow + if rf, ok := ret.Get(0).(func(core.Identifier) *core.CompiledWorkflow); ok { + r0 = rf(id) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*core.CompiledWorkflow) + } + } + + var r1 bool + if rf, ok := ret.Get(1).(func(core.Identifier) bool); ok { + r1 = rf(id) + } else { + r1 = ret.Get(1).(bool) + } + + return r0, r1 +} + type Workflow_GetCoreWorkflow struct { *mock.Call } diff --git a/pkg/compiler/common/mocks/workflow_builder.go b/pkg/compiler/common/mocks/workflow_builder.go index d2e93ffde5..9da6ad15db 100644 --- a/pkg/compiler/common/mocks/workflow_builder.go +++ b/pkg/compiler/common/mocks/workflow_builder.go @@ -62,6 +62,47 @@ func (_m *WorkflowBuilder) AddNode(n common.NodeBuilder, errs errors.CompileErro return r0, r1 } +type WorkflowBuilder_GetCompiledSubWorkflow struct { + *mock.Call +} + +func (_m WorkflowBuilder_GetCompiledSubWorkflow) Return(wf *core.CompiledWorkflow, found bool) *WorkflowBuilder_GetCompiledSubWorkflow { + return &WorkflowBuilder_GetCompiledSubWorkflow{Call: _m.Call.Return(wf, found)} +} + +func (_m *WorkflowBuilder) OnGetCompiledSubWorkflow(id core.Identifier) *WorkflowBuilder_GetCompiledSubWorkflow { + c := _m.On("GetCompiledSubWorkflow", id) + return &WorkflowBuilder_GetCompiledSubWorkflow{Call: c} +} + +func (_m *WorkflowBuilder) OnGetCompiledSubWorkflowMatch(matchers ...interface{}) *WorkflowBuilder_GetCompiledSubWorkflow { + c := _m.On("GetCompiledSubWorkflow", matchers...) + return &WorkflowBuilder_GetCompiledSubWorkflow{Call: c} +} + +// GetCompiledSubWorkflow provides a mock function with given fields: id +func (_m *WorkflowBuilder) GetCompiledSubWorkflow(id core.Identifier) (*core.CompiledWorkflow, bool) { + ret := _m.Called(id) + + var r0 *core.CompiledWorkflow + if rf, ok := ret.Get(0).(func(core.Identifier) *core.CompiledWorkflow); ok { + r0 = rf(id) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*core.CompiledWorkflow) + } + } + + var r1 bool + if rf, ok := ret.Get(1).(func(core.Identifier) bool); ok { + r1 = rf(id) + } else { + r1 = ret.Get(1).(bool) + } + + return r0, r1 +} + type WorkflowBuilder_GetCoreWorkflow struct { *mock.Call } @@ -464,8 +505,8 @@ func (_m *WorkflowBuilder) NewNodeBuilder(n *core.Node) common.NodeBuilder { return r0 } -// UpdateSubWorkflow provides a mock function with given fields: id, compiledWorkflow -func (_m *WorkflowBuilder) UpdateSubWorkflow(id core.Identifier, compiledWorkflow *core.CompiledWorkflow) { +// StoreCompiledSubWorkflow provides a mock function with given fields: id, compiledWorkflow +func (_m *WorkflowBuilder) StoreCompiledSubWorkflow(id core.Identifier, compiledWorkflow *core.CompiledWorkflow) { _m.Called(id, compiledWorkflow) } diff --git a/pkg/compiler/common/reader.go b/pkg/compiler/common/reader.go index 2edd098da9..11f3010cf3 100644 --- a/pkg/compiler/common/reader.go +++ b/pkg/compiler/common/reader.go @@ -17,6 +17,7 @@ type Workflow interface { GetTask(id TaskID) (task Task, found bool) GetLaunchPlan(id LaunchPlanID) (wf InterfaceProvider, found bool) GetSubWorkflow(id WorkflowID) (wf *core.CompiledWorkflow, found bool) + GetCompiledSubWorkflow(id WorkflowID) (wf *core.CompiledWorkflow, found bool) GetCoreWorkflow() *core.CompiledWorkflow GetFailureNode() Node GetNodes() NodeIndex diff --git a/pkg/compiler/test/compiler_test.go b/pkg/compiler/test/compiler_test.go index e582896b35..70f84d7a58 100644 --- a/pkg/compiler/test/compiler_test.go +++ b/pkg/compiler/test/compiler_test.go @@ -101,6 +101,85 @@ func marshalProto(t *testing.T, filename string, p proto.Message) { assert.NoError(t, ioutil.WriteFile(strings.Replace(filename, filepath.Ext(filename), ".yaml", 1), b, os.ModePerm)) } +func TestDynamic(t *testing.T) { + errors.SetConfig(errors.Config{IncludeSource: true}) + assert.NoError(t, filepath.Walk("testdata/dynamic", func(path string, info os.FileInfo, err error) error { + if info.IsDir() { + return nil + } + + t.Run(path, func(t *testing.T) { + // If you want to debug a single use-case. Uncomment this line. + //if !strings.HasSuffix(path, "success_1.json") { + // t.SkipNow() + //} + + raw, err := ioutil.ReadFile(path) + assert.NoError(t, err) + wf := &core.DynamicJobSpec{} + err = jsonpb.UnmarshalString(string(raw), wf) + if !assert.NoError(t, err) { + t.FailNow() + } + + t.Log("Compiling Workflow") + compiledTasks := mustCompileTasks(t, wf.Tasks) + wfTemplate := &core.WorkflowTemplate{ + Id: &core.Identifier{ + Domain: "domain", + Name: "name", + Version: "version", + }, + Interface: &core.TypedInterface{ + Inputs: &core.VariableMap{Variables: map[string]*core.Variable{}}, + Outputs: &core.VariableMap{Variables: map[string]*core.Variable{ + "o0": { + Type: &core.LiteralType{ + Type: &core.LiteralType_CollectionType{ + CollectionType: &core.LiteralType{ + Type: &core.LiteralType_Simple{ + Simple: core.SimpleType_INTEGER, + }, + }, + }, + }, + }, + }}, + }, + Nodes: wf.Nodes, + Outputs: wf.Outputs, + } + compiledWfc, err := compiler.CompileWorkflow(wfTemplate, wf.Subworkflows, compiledTasks, + []common.InterfaceProvider{}) + if !assert.NoError(t, err) { + t.FailNow() + } + + inputs := map[string]interface{}{} + for varName, v := range compiledWfc.Primary.Template.Interface.Inputs.Variables { + inputs[varName] = utils.MustMakeDefaultLiteralForType(v.Type) + } + + flyteWf, err := k8s.BuildFlyteWorkflow(compiledWfc, + utils.MustMakeLiteral(inputs).GetMap(), + &core.WorkflowExecutionIdentifier{ + Project: "hello", + Domain: "domain", + Name: "name", + }, + "namespace") + if assert.NoError(t, err) { + raw, err := json.Marshal(flyteWf) + if assert.NoError(t, err) { + assert.NotEmpty(t, raw) + } + } + }) + + return nil + })) +} + func TestBranches(t *testing.T) { errors.SetConfig(errors.Config{IncludeSource: true}) assert.NoError(t, filepath.Walk("testdata/branch", func(path string, info os.FileInfo, err error) error { @@ -109,7 +188,8 @@ func TestBranches(t *testing.T) { } t.Run(path, func(t *testing.T) { - //if !strings.HasSuffix(path, "success_6.json") { + // If you want to debug a single use-case. Uncomment this line. + //if !strings.HasSuffix(path, "success_1.json") { // t.SkipNow() //} diff --git a/pkg/compiler/test/testdata/dynamic/success_1.json b/pkg/compiler/test/testdata/dynamic/success_1.json new file mode 100644 index 0000000000..0d0eb1ed47 --- /dev/null +++ b/pkg/compiler/test/testdata/dynamic/success_1.json @@ -0,0 +1,767 @@ +{ + "nodes": [ + { + "id": "dynamic-merge-sort-remotely-n0", + "metadata": { + "name": "flytekit.annotated.python_function_task.recipes.03_advanced.advanced_lang_features_copy.split", + "retries": {}, + "interruptible": false + }, + "inputs": [ + { + "var": "input", + "binding": { + "collection": { + "bindings": [ + { + "scalar": { + "primitive": { + "integer": "934" + } + } + }, + { + "scalar": { + "primitive": { + "integer": "3465" + } + } + }, + { + "scalar": { + "primitive": { + "integer": "4552" + } + } + }, + { + "scalar": { + "primitive": { + "integer": "8436" + } + } + }, + { + "scalar": { + "primitive": { + "integer": "6044" + } + } + }, + { + "scalar": { + "primitive": { + "integer": "466" + } + } + }, + { + "scalar": { + "primitive": { + "integer": "9706" + } + } + }, + { + "scalar": { + "primitive": { + "integer": "8815" + } + } + }, + { + "scalar": { + "primitive": { + "integer": "327" + } + } + }, + { + "scalar": { + "primitive": { + "integer": "7458" + } + } + }, + { + "scalar": { + "primitive": { + "integer": "5040" + } + } + }, + { + "scalar": { + "primitive": { + "integer": "9129" + } + } + }, + { + "scalar": { + "primitive": { + "integer": "4348" + } + } + }, + { + "scalar": { + "primitive": { + "integer": "459" + } + } + }, + { + "scalar": { + "primitive": { + "integer": "1752" + } + } + }, + { + "scalar": { + "primitive": { + "integer": "4103" + } + } + }, + { + "scalar": { + "primitive": { + "integer": "2634" + } + } + }, + { + "scalar": { + "primitive": { + "integer": "222" + } + } + }, + { + "scalar": { + "primitive": { + "integer": "4416" + } + } + }, + { + "scalar": { + "primitive": { + "integer": "5188" + } + } + } + ] + } + } + } + ], + "taskNode": { + "referenceId": { + "resourceType": "TASK", + "project": "flytetester", + "domain": "development", + "name": "recipes.03_advanced.advanced_lang_features_copy.split", + "version": "f6e892be276205a3cb11c058f389f69e02008b65" + } + } + }, + { + "id": "dynamic-merge-sort-remotely-n1", + "metadata": { + "name": "flytekit.annotated.workflow.recipes.03_advanced.advanced_lang_features_copy.merge_sort", + "retries": {}, + "interruptible": false + }, + "inputs": [ + { + "var": "count", + "binding": { + "promise": { + "nodeId": "dynamic-merge-sort-remotely-n0", + "var": "o2" + } + } + }, + { + "var": "input", + "binding": { + "promise": { + "nodeId": "dynamic-merge-sort-remotely-n0", + "var": "o0" + } + } + } + ], + "upstreamNodeIds": [ + "dynamic-merge-sort-remotely-n0" + ], + "workflowNode": { + "subWorkflowRef": { + "resourceType": "WORKFLOW", + "project": "flytetester", + "domain": "development", + "name": "recipes.03_advanced.advanced_lang_features_copy.merge_sort", + "version": "f6e892be276205a3cb11c058f389f69e02008b65" + } + } + }, + { + "id": "dynamic-merge-sort-remotely-n2", + "metadata": { + "name": "flytekit.annotated.workflow.recipes.03_advanced.advanced_lang_features_copy.merge_sort", + "retries": {}, + "interruptible": false + }, + "inputs": [ + { + "var": "count", + "binding": { + "promise": { + "nodeId": "dynamic-merge-sort-remotely-n0", + "var": "o2" + } + } + }, + { + "var": "input", + "binding": { + "promise": { + "nodeId": "dynamic-merge-sort-remotely-n0", + "var": "o1" + } + } + } + ], + "upstreamNodeIds": [ + "dynamic-merge-sort-remotely-n0" + ], + "workflowNode": { + "subWorkflowRef": { + "resourceType": "WORKFLOW", + "project": "flytetester", + "domain": "development", + "name": "recipes.03_advanced.advanced_lang_features_copy.merge_sort", + "version": "f6e892be276205a3cb11c058f389f69e02008b65" + } + } + }, + { + "id": "dynamic-merge-sort-remotely-n3", + "metadata": { + "name": "flytekit.annotated.python_function_task.recipes.03_advanced.advanced_lang_features_copy.merge", + "retries": {}, + "interruptible": false + }, + "inputs": [ + { + "var": "x", + "binding": { + "promise": { + "nodeId": "dynamic-merge-sort-remotely-n1", + "var": "o0" + } + } + }, + { + "var": "y", + "binding": { + "promise": { + "nodeId": "dynamic-merge-sort-remotely-n2", + "var": "o0" + } + } + } + ], + "upstreamNodeIds": [ + "dynamic-merge-sort-remotely-n1", + "dynamic-merge-sort-remotely-n2" + ], + "taskNode": { + "referenceId": { + "resourceType": "TASK", + "project": "flytetester", + "domain": "development", + "name": "recipes.03_advanced.advanced_lang_features_copy.merge", + "version": "f6e892be276205a3cb11c058f389f69e02008b65" + } + } + } + ], + "minSuccesses": "4", + "outputs": [ + { + "var": "o0", + "binding": { + "promise": { + "nodeId": "dynamic-merge-sort-remotely-n3", + "var": "o0" + } + } + } + ], + "tasks": [ + { + "id": { + "resourceType": "TASK", + "project": "flytetester", + "domain": "development", + "name": "recipes.03_advanced.advanced_lang_features_copy.split", + "version": "f6e892be276205a3cb11c058f389f69e02008b65" + }, + "type": "python-task", + "metadata": { + "runtime": { + "type": "FLYTE_SDK", + "version": "0.16.0", + "flavor": "python" + }, + "retries": {}, + "interruptible": false + }, + "interface": { + "inputs": { + "variables": { + "input": { + "type": { + "collectionType": { + "simple": "INTEGER" + } + }, + "description": "input" + } + } + }, + "outputs": { + "variables": { + "o1": { + "type": { + "collectionType": { + "simple": "INTEGER" + } + }, + "description": "o1" + }, + "o2": { + "type": { + "simple": "INTEGER" + }, + "description": "o2" + }, + "o0": { + "type": { + "collectionType": { + "simple": "INTEGER" + } + }, + "description": "o0" + } + } + } + }, + "container": { + "image": "flytecookbook:f6e892be276205a3cb11c058f389f69e02008b65", + "args": [ + "pyflyte-execute", + "--task-module", + "recipes.03_advanced.advanced_lang_features_copy", + "--task-name", + "split", + "--inputs", + "{{.input}}", + "--output-prefix", + "{{.outputPrefix}}", + "--raw-output-data-prefix", + "{{.rawOutputDataPrefix}}" + ], + "resources": {}, + "env": [ + { + "key": "FLYTE_INTERNAL_CONFIGURATION_PATH", + "value": "/root/sandbox.config" + }, + { + "key": "FLYTE_INTERNAL_IMAGE", + "value": "flytecookbook:f6e892be276205a3cb11c058f389f69e02008b65" + } + ] + } + }, + { + "id": { + "resourceType": "TASK", + "project": "flytetester", + "domain": "development", + "name": "recipes.03_advanced.advanced_lang_features_copy.merge", + "version": "f6e892be276205a3cb11c058f389f69e02008b65" + }, + "type": "python-task", + "metadata": { + "runtime": { + "type": "FLYTE_SDK", + "version": "0.16.0", + "flavor": "python" + }, + "retries": {}, + "interruptible": false + }, + "interface": { + "inputs": { + "variables": { + "x": { + "type": { + "collectionType": { + "simple": "INTEGER" + } + }, + "description": "x" + }, + "y": { + "type": { + "collectionType": { + "simple": "INTEGER" + } + }, + "description": "y" + } + } + }, + "outputs": { + "variables": { + "o0": { + "type": { + "collectionType": { + "simple": "INTEGER" + } + }, + "description": "o0" + } + } + } + }, + "container": { + "image": "flytecookbook:f6e892be276205a3cb11c058f389f69e02008b65", + "args": [ + "pyflyte-execute", + "--task-module", + "recipes.03_advanced.advanced_lang_features_copy", + "--task-name", + "merge", + "--inputs", + "{{.input}}", + "--output-prefix", + "{{.outputPrefix}}", + "--raw-output-data-prefix", + "{{.rawOutputDataPrefix}}" + ], + "resources": {}, + "env": [ + { + "key": "FLYTE_INTERNAL_CONFIGURATION_PATH", + "value": "/root/sandbox.config" + }, + { + "key": "FLYTE_INTERNAL_IMAGE", + "value": "flytecookbook:f6e892be276205a3cb11c058f389f69e02008b65" + } + ] + } + }, + { + "id": { + "resourceType": "TASK", + "project": "flytetester", + "domain": "development", + "name": "recipes.03_advanced.advanced_lang_features_copy.merge_sort_remotely", + "version": "f6e892be276205a3cb11c058f389f69e02008b65" + }, + "type": "dynamic-task", + "metadata": { + "runtime": { + "type": "FLYTE_SDK", + "version": "0.16.0", + "flavor": "python" + }, + "retries": {}, + "interruptible": false + }, + "interface": { + "inputs": { + "variables": { + "input": { + "type": { + "collectionType": { + "simple": "INTEGER" + } + }, + "description": "input" + } + } + }, + "outputs": { + "variables": { + "o0": { + "type": { + "collectionType": { + "simple": "INTEGER" + } + }, + "description": "o0" + } + } + } + }, + "container": { + "image": "flytecookbook:f6e892be276205a3cb11c058f389f69e02008b65", + "args": [ + "pyflyte-execute", + "--task-module", + "recipes.03_advanced.advanced_lang_features_copy", + "--task-name", + "merge_sort_remotely", + "--inputs", + "{{.input}}", + "--output-prefix", + "{{.outputPrefix}}", + "--raw-output-data-prefix", + "{{.rawOutputDataPrefix}}" + ], + "resources": {}, + "env": [ + { + "key": "FLYTE_INTERNAL_CONFIGURATION_PATH", + "value": "/root/sandbox.config" + }, + { + "key": "FLYTE_INTERNAL_IMAGE", + "value": "flytecookbook:f6e892be276205a3cb11c058f389f69e02008b65" + } + ] + } + }, + { + "id": { + "resourceType": "TASK", + "project": "flytetester", + "domain": "development", + "name": "recipes.03_advanced.advanced_lang_features_copy.merge_sort_locally", + "version": "f6e892be276205a3cb11c058f389f69e02008b65" + }, + "type": "python-task", + "metadata": { + "runtime": { + "type": "FLYTE_SDK", + "version": "0.16.0", + "flavor": "python" + }, + "retries": {}, + "interruptible": false + }, + "interface": { + "inputs": { + "variables": { + "input": { + "type": { + "collectionType": { + "simple": "INTEGER" + } + }, + "description": "input" + } + } + }, + "outputs": { + "variables": { + "o0": { + "type": { + "collectionType": { + "simple": "INTEGER" + } + }, + "description": "o0" + } + } + } + }, + "container": { + "image": "flytecookbook:f6e892be276205a3cb11c058f389f69e02008b65", + "args": [ + "pyflyte-execute", + "--task-module", + "recipes.03_advanced.advanced_lang_features_copy", + "--task-name", + "merge_sort_locally", + "--inputs", + "{{.input}}", + "--output-prefix", + "{{.outputPrefix}}", + "--raw-output-data-prefix", + "{{.rawOutputDataPrefix}}" + ], + "resources": {}, + "env": [ + { + "key": "FLYTE_INTERNAL_CONFIGURATION_PATH", + "value": "/root/sandbox.config" + }, + { + "key": "FLYTE_INTERNAL_IMAGE", + "value": "flytecookbook:f6e892be276205a3cb11c058f389f69e02008b65" + } + ] + } + } + ], + "subworkflows": [ + { + "id": { + "resourceType": "WORKFLOW", + "project": "flytetester", + "domain": "development", + "name": "recipes.03_advanced.advanced_lang_features_copy.merge_sort", + "version": "f6e892be276205a3cb11c058f389f69e02008b65" + }, + "metadata": {}, + "interface": { + "inputs": { + "variables": { + "count": { + "type": { + "simple": "INTEGER" + }, + "description": "count" + }, + "input": { + "type": { + "collectionType": { + "simple": "INTEGER" + } + }, + "description": "input" + } + } + }, + "outputs": { + "variables": { + "o0": { + "type": { + "collectionType": { + "simple": "INTEGER" + } + }, + "description": "o0" + } + } + } + }, + "nodes": [ + { + "id": "node-0", + "metadata": { + "name": "terminal_case", + "retries": {}, + "interruptible": false + }, + "inputs": [ + { + "var": ".count", + "binding": { + "promise": { + "var": "count" + } + } + } + ], + "branchNode": { + "ifElse": { + "case": { + "condition": { + "comparison": { + "operator": "LT", + "leftValue": { + "var": ".count" + }, + "rightValue": { + "primitive": { + "integer": "10" + } + } + } + }, + "thenNode": { + "id": "branchn0", + "metadata": { + "name": "flytekit.annotated.python_function_task.recipes.03_advanced.advanced_lang_features_copy.merge_sort_locally", + "retries": {}, + "interruptible": false + }, + "inputs": [ + { + "var": "input", + "binding": { + "promise": { + "var": "input" + } + } + } + ], + "taskNode": { + "referenceId": { + "resourceType": "TASK", + "project": "flytetester", + "domain": "development", + "name": "recipes.03_advanced.advanced_lang_features_copy.merge_sort_locally", + "version": "f6e892be276205a3cb11c058f389f69e02008b65" + } + } + } + }, + "elseNode": { + "id": "branchn1", + "metadata": { + "name": "flytekit.annotated.dynamic_workflow_task.recipes.03_advanced.advanced_lang_features_copy.merge_sort_remotely", + "retries": {}, + "interruptible": false + }, + "inputs": [ + { + "var": "input", + "binding": { + "promise": { + "var": "input" + } + } + } + ], + "taskNode": { + "referenceId": { + "resourceType": "TASK", + "project": "flytetester", + "domain": "development", + "name": "recipes.03_advanced.advanced_lang_features_copy.merge_sort_remotely", + "version": "f6e892be276205a3cb11c058f389f69e02008b65" + } + } + } + } + } + } + ], + "outputs": [ + { + "var": "o0", + "binding": { + "promise": { + "nodeId": "node-0", + "var": "o0" + } + } + } + ], + "metadataDefaults": {} + } + ] +} + diff --git a/pkg/compiler/transformers/k8s/builder_mock_test.go b/pkg/compiler/transformers/k8s/builder_mock_test.go index a14a44e3c2..9c8f7acdcc 100644 --- a/pkg/compiler/transformers/k8s/builder_mock_test.go +++ b/pkg/compiler/transformers/k8s/builder_mock_test.go @@ -15,6 +15,10 @@ type mockWorkflow struct { upstream common.StringAdjacencyList } +func (m mockWorkflow) GetCompiledSubWorkflow(id common.WorkflowID) (wf *core.CompiledWorkflow, found bool) { + panic("method invocation not expected") +} + func (m mockWorkflow) GetSubWorkflow(id common.WorkflowID) (wf *core.CompiledWorkflow, found bool) { panic("method invocation not expected") } diff --git a/pkg/compiler/validators/node.go b/pkg/compiler/validators/node.go index c328ba2b1b..2738b6a54b 100644 --- a/pkg/compiler/validators/node.go +++ b/pkg/compiler/validators/node.go @@ -154,15 +154,20 @@ func ValidateNode(w c.WorkflowBuilder, n c.NodeBuilder, validateConditionTypes b } } else if workflowN := n.GetWorkflowNode(); workflowN != nil && workflowN.GetSubWorkflowRef() != nil { workflowID := *workflowN.GetSubWorkflowRef() - if wf, wfOk := w.GetSubWorkflow(workflowID); wfOk { - // This might lead to redundant errors if the same subWorkflow is invoked from multiple nodes in the main - // workflow. - if subWorkflow, workflowOk := w.ValidateWorkflow(wf, errs.NewScope()); workflowOk { - n.SetSubWorkflow(subWorkflow) - w.UpdateSubWorkflow(workflowID, subWorkflow.GetCoreWorkflow()) + // Only compile the subworkflow if it has not been error-free compiled before. + if _, wfOk := w.GetCompiledSubWorkflow(workflowID); !wfOk { + if wf, wfOk := w.GetSubWorkflow(workflowID); wfOk { + // This might lead to redundant errors if the same subWorkflow is invoked from multiple nodes in the main + // workflow. + if n.GetSubWorkflow() == nil { + if subWorkflow, workflowOk := w.ValidateWorkflow(wf, errs.NewScope()); workflowOk { + n.SetSubWorkflow(subWorkflow) + w.StoreCompiledSubWorkflow(workflowID, subWorkflow.GetCoreWorkflow()) + } + } + } else { + errs.Collect(errors.NewWorkflowReferenceNotFoundErr(n.GetId(), workflowN.GetSubWorkflowRef().String())) } - } else { - errs.Collect(errors.NewWorkflowReferenceNotFoundErr(n.GetId(), workflowN.GetSubWorkflowRef().String())) } } else if taskN := n.GetTaskNode(); taskN != nil && taskN.GetReferenceId() != nil { if task, found := w.GetTask(*taskN.GetReferenceId()); found { diff --git a/pkg/compiler/workflow_compiler.go b/pkg/compiler/workflow_compiler.go index 3376ee8ac7..265e2221d2 100755 --- a/pkg/compiler/workflow_compiler.go +++ b/pkg/compiler/workflow_compiler.go @@ -186,8 +186,14 @@ func (w workflowBuilder) ValidateWorkflow(fg *flyteWorkflow, errs errors.Compile globalOutputNode.SetInterface(&core.TypedInterface{Inputs: wf.CoreWorkflow.Template.Interface.Outputs}) globalOutputNode.SetInputs(wf.CoreWorkflow.Template.Outputs) + // Track top level nodes (a branch in a branch node is NOT a top level node). The final graph should ensure that all + // top level nodes are executed before the end node. We do that by adding execution edges from leaf nodes that do not + // contribute to the final outputs to the end node. + topLevelNodes := sets.NewString() + // Add and validate all other nodes for _, n := range checkpoint { + topLevelNodes.Insert(n.Id) if node, addOk := wf.AddNode(wf.NewNodeBuilder(n), errs.NewScope()); addOk { v.ValidateNode(&wf, node, false /* validateConditionTypes */, errs.NewScope()) } @@ -230,12 +236,23 @@ func (w workflowBuilder) ValidateWorkflow(fg *flyteWorkflow, errs errors.Compile continue } - if _, foundUpStream := wf.upstreamNodes[nodeID]; !foundUpStream { - wf.AddExecutionEdge(c.StartNodeID, nodeID) + // Nodes that do not have a upstream dependencies means they do not rely on the workflow inputs to execute. + // This is a rare but possible occurrence, by explicitly adding an execution edge from the start node to these + // nodes, we ensure that propeller starts executing the workflow by running all such nodes and then their + // downstream dependencies. + if topLevelNodes.Has(nodeID) { + if _, foundUpStream := wf.upstreamNodes[nodeID]; !foundUpStream { + wf.AddExecutionEdge(c.StartNodeID, nodeID) + } } - if _, foundDownStream := wf.downstreamNodes[nodeID]; !foundDownStream { - wf.AddExecutionEdge(nodeID, c.EndNodeID) + // When propeller executes nodes it'll ensure that any node does not start executing until all of its upstream + // dependencies have finished successfully. By explicitly adding execution edges from such nodes to end-node, we + // ensure that execution continues until all nodes successfully finish. + if topLevelNodes.Has(nodeID) { + if _, foundDownStream := wf.downstreamNodes[nodeID]; !foundDownStream { + wf.AddExecutionEdge(nodeID, c.EndNodeID) + } } } @@ -348,14 +365,15 @@ func newWorkflowBuilder(fg *flyteWorkflow, wfIndex c.WorkflowIndex, tasks c.Task workflows map[string]c.InterfaceProvider) workflowBuilder { return workflowBuilder{ - CoreWorkflow: fg, - LaunchPlans: map[string]c.InterfaceProvider{}, - Nodes: c.NewNodeIndex(), - Tasks: c.NewTaskIndex(), - downstreamNodes: c.StringAdjacencyList{}, - upstreamNodes: c.StringAdjacencyList{}, - allSubWorkflows: wfIndex, - allLaunchPlans: workflows, - allTasks: tasks, + CoreWorkflow: fg, + LaunchPlans: map[string]c.InterfaceProvider{}, + Nodes: c.NewNodeIndex(), + Tasks: c.NewTaskIndex(), + downstreamNodes: c.StringAdjacencyList{}, + upstreamNodes: c.StringAdjacencyList{}, + allSubWorkflows: wfIndex, + allCompiledSubWorkflows: c.WorkflowIndex{}, + allLaunchPlans: workflows, + allTasks: tasks, } }