Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Instrument ArrayNode #550

Merged
merged 68 commits into from
Jul 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
68 commits
Select commit Hold shift + click to select a range
4bb3138
updated flyteidl to local to get ArrayNode
hamersaw Apr 3, 2023
33deb42
added boilerplate to support ArrayNode
hamersaw Apr 3, 2023
bf0a1ff
pushing forward
hamersaw Apr 6, 2023
6dc7fb1
refactored node executor interfaces to fix dependency cycle
hamersaw Apr 6, 2023
188a675
refactoring almost complete
hamersaw Apr 6, 2023
6f379e7
refactor complete
hamersaw Apr 6, 2023
caa9b07
supporting environment variables
hamersaw Apr 7, 2023
03dcc56
minimum viable product
hamersaw Apr 13, 2023
658abc7
update print statements for debugging
hamersaw Apr 13, 2023
e33af40
massive refactor fixing NodeExecutionContext override for ArrayNode
hamersaw Apr 25, 2023
110e1ea
refactoring TODOs
hamersaw Apr 26, 2023
0a5c58a
subnode retries working
hamersaw Apr 26, 2023
7fee1e6
parallelism working
hamersaw Apr 27, 2023
dc0c164
cache and cache_serialize working - first new functionality in maptask
hamersaw Apr 27, 2023
0f880cd
adding implementation notes
hamersaw Apr 28, 2023
0aa773e
Merge branch 'master' into feature/array-node
hamersaw May 22, 2023
e740627
removed eventing from subtasks
hamersaw May 24, 2023
b8271ef
adding correct requirements
hamersaw May 25, 2023
4549591
working end-2-end with flytekit
hamersaw May 26, 2023
b4c6f3e
reporting output directory on success
hamersaw May 26, 2023
134f215
fixed output directory append
hamersaw May 30, 2023
d36cd44
mocking TaskTemplate interface to enable caching
hamersaw May 30, 2023
fa472d4
capture failure reasons
hamersaw May 30, 2023
a6e20c7
wrapped up abort and finalize functionality
hamersaw May 31, 2023
f5a46b9
mocking initialization events
hamersaw May 31, 2023
6dcbe53
sending all events
hamersaw May 31, 2023
59df842
minor refactoring of debug prints and formatting
hamersaw Jun 1, 2023
9a9d0f0
intratask checkpointing working
hamersaw Jun 2, 2023
377497e
support for and
hamersaw Jun 6, 2023
5cf3259
setting node log ids correctly
hamersaw Jun 6, 2023
437dc91
reporting cache status
hamersaw Jun 7, 2023
a6d31d7
Merge branch 'master' into feature/array-node
hamersaw Jun 8, 2023
d2abccc
correctly setting subnode abort phase
hamersaw Jun 12, 2023
1de3e16
removing dead code
hamersaw Jun 12, 2023
a66bb60
cleaned up most random TODO items
hamersaw Jun 12, 2023
3609dd7
refactored into new files
hamersaw Jun 12, 2023
fdf8d6a
refactoring for ArrayNode unit tests
hamersaw Jun 13, 2023
b150f1a
refactored for unit testing to allow creation of NodeExecutor in arra…
hamersaw Jun 13, 2023
b871b23
first unit test for handling ArrayNodePhaseNone
hamersaw Jun 14, 2023
49f4d32
most of executing unit tests completed
hamersaw Jun 14, 2023
7767668
finished executing unit tests
hamersaw Jun 14, 2023
73a26f4
finished succeeding unit tests
hamersaw Jun 15, 2023
575deea
wrote failing phase unit tests
hamersaw Jun 15, 2023
6dc6b88
moving towards complete unit_test success
hamersaw Jun 15, 2023
a38ec17
unit tests passing
hamersaw Jun 15, 2023
db1e361
fixed lint issues
hamersaw Jun 16, 2023
e0f156d
updated flyteidl dep
hamersaw Jun 16, 2023
509900b
Merge branch 'master' into feature/array-node
hamersaw Jun 29, 2023
211ff29
added unit tests for Abort
hamersaw Jun 29, 2023
c6df3fd
adding unit test for Finalize
hamersaw Jun 29, 2023
7c1931d
added utils unit tests
hamersaw Jun 30, 2023
0df9572
moved state structs to handler package
hamersaw Jun 30, 2023
4496efb
added docs
hamersaw Jul 5, 2023
b13ef67
cleaned up abort event reporting
hamersaw Jul 7, 2023
1c446b7
fixed RecordNodeEvent unit tests
hamersaw Jul 7, 2023
7782460
removed taskEventRecorder from nodes package
hamersaw Jul 7, 2023
837379d
adding interface checking for arraynode
hamersaw Jul 11, 2023
23d5312
added transform unit test
hamersaw Jul 11, 2023
5615a86
Merge branch 'master' into feature/array-node
hamersaw Jul 11, 2023
5dbc665
fixed input bindings issue
hamersaw Jul 13, 2023
7914641
fixed unit tests
hamersaw Jul 13, 2023
d5eb484
fixed unit tests
hamersaw Jul 14, 2023
b044277
go generate
hamersaw Jul 14, 2023
14fae22
addressing random TODO
hamersaw Jul 17, 2023
34937b3
Merge branch 'master' into feature/array-node
hamersaw Jul 17, 2023
bdbef61
fixed unit tests
hamersaw Jul 19, 2023
d0edb9a
Merge branch 'master' into feature/array-node
hamersaw Jul 28, 2023
70eda6a
addressing pr comments
hamersaw Jul 28, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
tasks:
- container:
args:
- "pyflyte-fast-execute"
- "--additional-distribution"
- "s3://my-s3-bucket/flytesnacks/development/SMJBJX7BQJ6MCOABLKQT5VZXVY======/script_mode.tar.gz"
- "--dest-dir"
- "/root"
- "--"
- "pyflyte-map-execute"
- "--inputs"
- "{{.input}}"
- "--output-prefix"
- "{{.outputPrefix}}"
- "--raw-output-data-prefix"
- "{{.rawOutputDataPrefix}}"
- "--checkpoint-path"
- "{{.checkpointOutputPrefix}}"
- "--prev-checkpoint"
- "{{.prevCheckpointPrefix}}"
- "--resolver"
- "MapTaskResolver"
- "--"
- "vars"
- ""
- "resolver"
- "flytekit.core.python_auto_container.default_task_resolver"
- "task-module"
- "map-task"
- "task-name"
- "a_mappable_task"
image: "array-node:ee1ba227aa95447d04bb1761691b4d97749642dc"
resources:
limits:
- name: 1
value: "1"
- name: 3
value: "500Mi"
requests:
- name: 1
value: "1"
- name: 3
value: "300Mi"
id:
name: task-1
project: flytesnacks
domain: development
metadata:
discoverable: true
discovery_version: "1.0"
cache_serializable: true
interface:
inputs:
variables:
a:
type:
simple: INTEGER
outputs:
variables:
o0:
type:
simple: STRING
workflow:
id:
name: workflow-with-array-node
interface:
inputs:
variables:
x:
type:
collectionType:
simple: INTEGER
nodes:
- id: node-1
inputs:
- binding:
promise:
node_id: start-node
var: x
var: a
arrayNode:
parallelism: 0
node:
metadata:
retries:
retries: 3
taskNode:
referenceId:
name: task-1
project: flytesnacks
domain: development
91 changes: 91 additions & 0 deletions cmd/kubectl-flyte/cmd/testdata/array-node-cache.yaml.golden
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
tasks:
- container:
args:
- "pyflyte-fast-execute"
- "--additional-distribution"
- "s3://my-s3-bucket/flytesnacks/development/SMJBJX7BQJ6MCOABLKQT5VZXVY======/script_mode.tar.gz"
- "--dest-dir"
- "/root"
- "--"
- "pyflyte-map-execute"
- "--inputs"
- "{{.input}}"
- "--output-prefix"
- "{{.outputPrefix}}"
- "--raw-output-data-prefix"
- "{{.rawOutputDataPrefix}}"
- "--checkpoint-path"
- "{{.checkpointOutputPrefix}}"
- "--prev-checkpoint"
- "{{.prevCheckpointPrefix}}"
- "--resolver"
- "MapTaskResolver"
- "--"
- "vars"
- ""
- "resolver"
- "flytekit.core.python_auto_container.default_task_resolver"
- "task-module"
- "map-task"
- "task-name"
- "a_mappable_task"
image: "array-node:ee1ba227aa95447d04bb1761691b4d97749642dc"
resources:
limits:
- name: 1
value: "1"
- name: 3
value: "500Mi"
requests:
- name: 1
value: "1"
- name: 3
value: "300Mi"
id:
name: task-1
project: flytesnacks
domain: development
metadata:
discoverable: true
discovery_version: "1.0"
cache_serializable: false
interface:
inputs:
variables:
a:
type:
simple: INTEGER
outputs:
variables:
o0:
type:
simple: STRING
workflow:
id:
name: workflow-with-array-node
interface:
inputs:
variables:
x:
type:
collectionType:
simple: INTEGER
nodes:
- id: node-1
inputs:
- binding:
promise:
node_id: start-node
var: x
var: a
arrayNode:
parallelism: 1
node:
metadata:
retries:
retries: 3
taskNode:
referenceId:
name: task-1
project: flytesnacks
domain: development
13 changes: 13 additions & 0 deletions cmd/kubectl-flyte/cmd/testdata/array-node-inputs.yaml.golden
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
literals:
"x":
collection:
literals:
- scalar:
primitive:
integer: "1"
- scalar:
primitive:
integer: "2"
- scalar:
primitive:
integer: "3"
86 changes: 86 additions & 0 deletions cmd/kubectl-flyte/cmd/testdata/array-node.yaml.golden
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
tasks:
- container:
args:
- "pyflyte-fast-execute"
- "--additional-distribution"
- "s3://my-s3-bucket/flytesnacks/development/SMJBJX7BQJ6MCOABLKQT5VZXVY======/script_mode.tar.gz"
- "--dest-dir"
- "/root"
- "--"
- "pyflyte-map-execute"
- "--inputs"
- "{{.input}}"
- "--output-prefix"
- "{{.outputPrefix}}"
- "--raw-output-data-prefix"
- "{{.rawOutputDataPrefix}}"
- "--checkpoint-path"
- "{{.checkpointOutputPrefix}}"
- "--prev-checkpoint"
- "{{.prevCheckpointPrefix}}"
- "--resolver"
- "MapTaskResolver"
- "--"
- "vars"
- ""
- "resolver"
- "flytekit.core.python_auto_container.default_task_resolver"
- "task-module"
- "map-task"
- "task-name"
- "a_mappable_task"
image: "array-node:ee1ba227aa95447d04bb1761691b4d97749642dc"
resources:
limits:
- name: 1
value: "1"
- name: 3
value: "500Mi"
requests:
- name: 1
value: "1"
- name: 3
value: "300Mi"
id:
name: task-1
metadata:
discoverable: false
cache_serializable: false
interface:
inputs:
variables:
a:
type:
simple: INTEGER
outputs:
variables:
x:
type:
simple: STRING
workflow:
id:
name: workflow-with-array-node
interface:
inputs:
variables:
x:
type:
collectionType:
simple: INTEGER
nodes:
- id: node-1
inputs:
- binding:
promise:
node_id: start-node
var: x
var: a
arrayNode:
parallelism: 1
node:
metadata:
retries:
retries: 3
taskNode:
referenceId:
name: task-1
8 changes: 4 additions & 4 deletions events/event_recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"github.com/golang/protobuf/proto"
)

const maxErrorMessageLength = 104857600 //100KB
const MaxErrorMessageLength = 104857600 //100KB
const truncationIndicator = "... <Message Truncated> ..."

type recordingMetrics struct {
Expand Down Expand Up @@ -60,23 +60,23 @@ func (r *eventRecorder) sinkEvent(ctx context.Context, event proto.Message) erro

func (r *eventRecorder) RecordNodeEvent(ctx context.Context, e *event.NodeExecutionEvent) error {
if err, ok := e.GetOutputResult().(*event.NodeExecutionEvent_Error); ok {
truncateErrorMessage(err.Error, maxErrorMessageLength)
truncateErrorMessage(err.Error, MaxErrorMessageLength)
}

return r.sinkEvent(ctx, e)
}

func (r *eventRecorder) RecordTaskEvent(ctx context.Context, e *event.TaskExecutionEvent) error {
if err, ok := e.GetOutputResult().(*event.TaskExecutionEvent_Error); ok {
truncateErrorMessage(err.Error, maxErrorMessageLength)
truncateErrorMessage(err.Error, MaxErrorMessageLength)
}

return r.sinkEvent(ctx, e)
}

func (r *eventRecorder) RecordWorkflowEvent(ctx context.Context, e *event.WorkflowExecutionEvent) error {
if err, ok := e.GetOutputResult().(*event.WorkflowExecutionEvent_Error); ok {
truncateErrorMessage(err.Error, maxErrorMessageLength)
truncateErrorMessage(err.Error, MaxErrorMessageLength)
}

return r.sinkEvent(ctx, e)
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -146,3 +146,5 @@ require (
)

replace github.com/aws/amazon-sagemaker-operator-for-k8s => github.com/aws/amazon-sagemaker-operator-for-k8s v1.0.1-0.20210303003444-0fb33b1fd49d

replace github.com/flyteorg/flyteidl => github.com/flyteorg/flyteidl v1.5.11-0.20230614183933-d56d4d37bf34
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -242,8 +242,8 @@ github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5Kwzbycv
github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w=
github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk=
github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/flyteorg/flyteidl v1.5.10 h1:SHeiaWRt8EAVuFsat+BJswtc07HTZ4DqhfTEYSm621k=
github.com/flyteorg/flyteidl v1.5.10/go.mod h1:EtE/muM2lHHgBabjYcxqe9TWeJSL0kXwbI0RgVwI4Og=
github.com/flyteorg/flyteidl v1.5.11-0.20230614183933-d56d4d37bf34 h1:Gj5UKqJU+ozeTeYAvDWHiF4HSVufHW1W1ecymFfbbis=
github.com/flyteorg/flyteidl v1.5.11-0.20230614183933-d56d4d37bf34/go.mod h1:EtE/muM2lHHgBabjYcxqe9TWeJSL0kXwbI0RgVwI4Og=
github.com/flyteorg/flyteplugins v1.1.8 h1:UVYdqDdcIqz2JIso+m3MsaPSsTZJZyZQ6Eg7nhX9r/Y=
github.com/flyteorg/flyteplugins v1.1.8/go.mod h1:sRxeatEOHq1b9bTxTRNcwoIkVTAVN9dTz8toXkfcz2E=
github.com/flyteorg/flytestdlib v1.0.20 h1:BrCQMlpdrFAPlADFJvCyn7gm+37df9WGYqLEB1mOlCQ=
Expand Down
24 changes: 24 additions & 0 deletions pkg/apis/flyteworkflow/v1alpha1/array.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package v1alpha1

type ArrayNodeSpec struct {
SubNodeSpec *NodeSpec
Parallelism uint32
MinSuccesses *uint32
MinSuccessRatio *float32
}

func (a *ArrayNodeSpec) GetSubNodeSpec() *NodeSpec {
return a.SubNodeSpec
}

func (a *ArrayNodeSpec) GetParallelism() uint32 {
return a.Parallelism
}

func (a *ArrayNodeSpec) GetMinSuccesses() *uint32 {
return a.MinSuccesses
}

func (a *ArrayNodeSpec) GetMinSuccessRatio() *float32 {
return a.MinSuccessRatio
}
Loading
Loading