Skip to content

Commit

Permalink
Resolve resource requirements in propeller at execution time (flyteor…
Browse files Browse the repository at this point in the history
  • Loading branch information
Katrina Rogan committed Sep 1, 2021
1 parent 8c854d5 commit 6ae7428
Show file tree
Hide file tree
Showing 19 changed files with 507 additions and 16 deletions.
21 changes: 21 additions & 0 deletions flytepropeller/pkg/apis/flyteworkflow/v1alpha1/execution_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package v1alpha1

import (
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
"k8s.io/apimachinery/pkg/api/resource"
)

// This contains an OutputLocationPrefix. When running against AWS, this should be something of the form
Expand All @@ -24,9 +25,29 @@ type ExecutionConfig struct {
MaxParallelism uint32
// Defines execution behavior for processing nodes.
RecoveryExecution WorkflowExecutionIdentifier
// Defines the resource requests and limits specified for tasks run as part of this execution that ought to be
// applied at execution time.
TaskResources TaskResources
}

type TaskPluginOverride struct {
PluginIDs []string
MissingPluginBehavior admin.PluginOverride_MissingPluginBehavior
}

// Defines a set of configurable resources of different types that a task can request or apply as limits.
type TaskResourceSpec struct {
CPU resource.Quantity
Memory resource.Quantity
EphemeralStorage resource.Quantity
Storage resource.Quantity
}

// Defines the complete closure of compute resources a task can request and apply as limits.
type TaskResources struct {
// If the node where a task is running has enough of a resource available, a
// container may use more resources than its request for that resource specifies.
Requests TaskResourceSpec
// A hard limit, a task cannot consume resources greater than the limit specifies.
Limits TaskResourceSpec
}
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ tasks:
value: testValue2
- key: testKey3
value: testValue3
- key: testKey1
value: testValue1
- key: testKey2
value: testValue2
- key: testKey3
value: testValue3
image: myflytecontainer:abc123
resources: {}
id:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -647,6 +647,20 @@
"executionConfig": {
"TaskPluginImpls": null,
"MaxParallelism": 0,
"RecoveryExecution": {}
"RecoveryExecution": {},
"TaskResources": {
"Requests": {
"CPU": "0",
"Memory": "0",
"EphemeralStorage": "0",
"Storage": "0"
},
"Limits": {
"CPU": "0",
"Memory": "0",
"EphemeralStorage": "0",
"Storage": "0"
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -651,6 +651,20 @@
"executionConfig": {
"TaskPluginImpls": null,
"MaxParallelism": 0,
"RecoveryExecution": {}
"RecoveryExecution": {},
"TaskResources": {
"Requests": {
"CPU": "0",
"Memory": "0",
"EphemeralStorage": "0",
"Storage": "0"
},
"Limits": {
"CPU": "0",
"Memory": "0",
"EphemeralStorage": "0",
"Storage": "0"
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,20 @@
"executionConfig": {
"TaskPluginImpls": null,
"MaxParallelism": 0,
"RecoveryExecution": {}
"RecoveryExecution": {},
"TaskResources": {
"Requests": {
"CPU": "0",
"Memory": "0",
"EphemeralStorage": "0",
"Storage": "0"
},
"Limits": {
"CPU": "0",
"Memory": "0",
"EphemeralStorage": "0",
"Storage": "0"
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -594,6 +594,20 @@
"executionConfig": {
"TaskPluginImpls": null,
"MaxParallelism": 0,
"RecoveryExecution": {}
"RecoveryExecution": {},
"TaskResources": {
"Requests": {
"CPU": "0",
"Memory": "0",
"EphemeralStorage": "0",
"Storage": "0"
},
"Limits": {
"CPU": "0",
"Memory": "0",
"EphemeralStorage": "0",
"Storage": "0"
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,20 @@
"executionConfig": {
"TaskPluginImpls": null,
"MaxParallelism": 0,
"RecoveryExecution": {}
"RecoveryExecution": {},
"TaskResources": {
"Requests": {
"CPU": "0",
"Memory": "0",
"EphemeralStorage": "0",
"Storage": "0"
},
"Limits": {
"CPU": "0",
"Memory": "0",
"EphemeralStorage": "0",
"Storage": "0"
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,20 @@
"executionConfig": {
"TaskPluginImpls": null,
"MaxParallelism": 0,
"RecoveryExecution": {}
"RecoveryExecution": {},
"TaskResources": {
"Requests": {
"CPU": "0",
"Memory": "0",
"EphemeralStorage": "0",
"Storage": "0"
},
"Limits": {
"CPU": "0",
"Memory": "0",
"EphemeralStorage": "0",
"Storage": "0"
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,20 @@
"executionConfig": {
"TaskPluginImpls": null,
"MaxParallelism": 0,
"RecoveryExecution": {}
"RecoveryExecution": {},
"TaskResources": {
"Requests": {
"CPU": "0",
"Memory": "0",
"EphemeralStorage": "0",
"Storage": "0"
},
"Limits": {
"CPU": "0",
"Memory": "0",
"EphemeralStorage": "0",
"Storage": "0"
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,20 @@
"executionConfig": {
"TaskPluginImpls": null,
"MaxParallelism": 0,
"RecoveryExecution": {}
"RecoveryExecution": {},
"TaskResources": {
"Requests": {
"CPU": "0",
"Memory": "0",
"EphemeralStorage": "0",
"Storage": "0"
},
"Limits": {
"CPU": "0",
"Memory": "0",
"EphemeralStorage": "0",
"Storage": "0"
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,20 @@
"executionConfig": {
"TaskPluginImpls": null,
"MaxParallelism": 0,
"RecoveryExecution": {}
"RecoveryExecution": {},
"TaskResources": {
"Requests": {
"CPU": "0",
"Memory": "0",
"EphemeralStorage": "0",
"Storage": "0"
},
"Limits": {
"CPU": "0",
"Memory": "0",
"EphemeralStorage": "0",
"Storage": "0"
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,20 @@
"executionConfig": {
"TaskPluginImpls": null,
"MaxParallelism": 0,
"RecoveryExecution": {}
"RecoveryExecution": {},
"TaskResources": {
"Requests": {
"CPU": "0",
"Memory": "0",
"EphemeralStorage": "0",
"Storage": "0"
},
"Limits": {
"CPU": "0",
"Memory": "0",
"EphemeralStorage": "0",
"Storage": "0"
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,20 @@
"executionConfig": {
"TaskPluginImpls": null,
"MaxParallelism": 0,
"RecoveryExecution": {}
"RecoveryExecution": {},
"TaskResources": {
"Requests": {
"CPU": "0",
"Memory": "0",
"EphemeralStorage": "0",
"Storage": "0"
},
"Limits": {
"CPU": "0",
"Memory": "0",
"EphemeralStorage": "0",
"Storage": "0"
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,20 @@
"executionConfig": {
"TaskPluginImpls": null,
"MaxParallelism": 0,
"RecoveryExecution": {}
"RecoveryExecution": {},
"TaskResources": {
"Requests": {
"CPU": "0",
"Memory": "0",
"EphemeralStorage": "0",
"Storage": "0"
},
"Limits": {
"CPU": "0",
"Memory": "0",
"EphemeralStorage": "0",
"Storage": "0"
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,7 @@ func Test_dynamicNodeHandler_buildContextualDynamicWorkflow_withLaunchPlans(t *t
composedPBStore.OnWriteRawMatch(
mock.MatchedBy(func(ctx context.Context) bool { return true }),
storage.DataReference("s3://my-s3-bucket/foo/bar/futures_compiled.pb"),
int64(1192),
int64(1354),
storage.Options{},
mock.MatchedBy(func(rdr *bytes.Reader) bool { return true })).Return(errors.New("foo"))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"testing"

"k8s.io/apimachinery/pkg/api/resource"

"github.com/go-test/deep"

"github.com/golang/protobuf/proto"
Expand Down Expand Up @@ -70,6 +72,22 @@ func Test_cacheFlyteWorkflow(t *testing.T) {
UpstreamEdges: map[v1alpha1.NodeID][]v1alpha1.NodeID{},
},
},
ExecutionConfig: v1alpha1.ExecutionConfig{
TaskResources: v1alpha1.TaskResources{
Requests: v1alpha1.TaskResourceSpec{
CPU: resource.MustParse("1"),
Memory: resource.MustParse("1"),
Storage: resource.MustParse("1"),
EphemeralStorage: resource.MustParse("1"),
},
Limits: v1alpha1.TaskResourceSpec{
CPU: resource.MustParse("1"),
Memory: resource.MustParse("1"),
Storage: resource.MustParse("1"),
EphemeralStorage: resource.MustParse("1"),
},
},
},
}

ctx := context.TODO()
Expand Down

0 comments on commit 6ae7428

Please sign in to comment.