diff --git a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/execution_config.go b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/execution_config.go index bb6547ba2f..378f7099e9 100644 --- a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/execution_config.go +++ b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/execution_config.go @@ -2,6 +2,7 @@ package v1alpha1 import ( "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" + "k8s.io/apimachinery/pkg/api/resource" ) // This contains an OutputLocationPrefix. When running against AWS, this should be something of the form @@ -24,9 +25,29 @@ type ExecutionConfig struct { MaxParallelism uint32 // Defines execution behavior for processing nodes. RecoveryExecution WorkflowExecutionIdentifier + // Defines the resource requests and limits specified for tasks run as part of this execution that ought to be + // applied at execution time. + TaskResources TaskResources } type TaskPluginOverride struct { PluginIDs []string MissingPluginBehavior admin.PluginOverride_MissingPluginBehavior } + +// Defines a set of configurable resources of different types that a task can request or apply as limits. +type TaskResourceSpec struct { + CPU resource.Quantity + Memory resource.Quantity + EphemeralStorage resource.Quantity + Storage resource.Quantity +} + +// Defines the complete closure of compute resources a task can request and apply as limits. +type TaskResources struct { + // If the node where a task is running has enough of a resource available, a + // container may use more resources than its request for that resource specifies. + Requests TaskResourceSpec + // A hard limit, a task cannot consume resources greater than the limit specifies. + Limits TaskResourceSpec +} diff --git a/flytepropeller/pkg/compiler/test/testdata/app-workflows-work-one-python-task-w-f.pb b/flytepropeller/pkg/compiler/test/testdata/app-workflows-work-one-python-task-w-f.pb index c36fdb9d4f..175ff1e659 100755 Binary files a/flytepropeller/pkg/compiler/test/testdata/app-workflows-work-one-python-task-w-f.pb and b/flytepropeller/pkg/compiler/test/testdata/app-workflows-work-one-python-task-w-f.pb differ diff --git a/flytepropeller/pkg/compiler/test/testdata/app-workflows-work-one-python-task-w-f.yaml b/flytepropeller/pkg/compiler/test/testdata/app-workflows-work-one-python-task-w-f.yaml index 7a66bd6461..3398b20ca8 100644 --- a/flytepropeller/pkg/compiler/test/testdata/app-workflows-work-one-python-task-w-f.yaml +++ b/flytepropeller/pkg/compiler/test/testdata/app-workflows-work-one-python-task-w-f.yaml @@ -30,6 +30,12 @@ tasks: value: testValue2 - key: testKey3 value: testValue3 + - key: testKey1 + value: testValue1 + - key: testKey2 + value: testValue2 + - key: testKey3 + value: testValue3 image: myflytecontainer:abc123 resources: {} id: diff --git a/flytepropeller/pkg/compiler/test/testdata/branch/k8s/5_myapp.workflows.cereal.mycereal_2.json b/flytepropeller/pkg/compiler/test/testdata/branch/k8s/5_myapp.workflows.cereal.mycereal_2.json index 0928fa163e..d6151c513d 100755 --- a/flytepropeller/pkg/compiler/test/testdata/branch/k8s/5_myapp.workflows.cereal.mycereal_2.json +++ b/flytepropeller/pkg/compiler/test/testdata/branch/k8s/5_myapp.workflows.cereal.mycereal_2.json @@ -647,6 +647,20 @@ "executionConfig": { "TaskPluginImpls": null, "MaxParallelism": 0, - "RecoveryExecution": {} + "RecoveryExecution": {}, + "TaskResources": { + "Requests": { + "CPU": "0", + "Memory": "0", + "EphemeralStorage": "0", + "Storage": "0" + }, + "Limits": { + "CPU": "0", + "Memory": "0", + "EphemeralStorage": "0", + "Storage": "0" + } + } } } \ No newline at end of file diff --git a/flytepropeller/pkg/compiler/test/testdata/branch/k8s/mycereal_condition_has_no_deps.json b/flytepropeller/pkg/compiler/test/testdata/branch/k8s/mycereal_condition_has_no_deps.json index eb8d7830a5..b590ddedfc 100755 --- a/flytepropeller/pkg/compiler/test/testdata/branch/k8s/mycereal_condition_has_no_deps.json +++ b/flytepropeller/pkg/compiler/test/testdata/branch/k8s/mycereal_condition_has_no_deps.json @@ -651,6 +651,20 @@ "executionConfig": { "TaskPluginImpls": null, "MaxParallelism": 0, - "RecoveryExecution": {} + "RecoveryExecution": {}, + "TaskResources": { + "Requests": { + "CPU": "0", + "Memory": "0", + "EphemeralStorage": "0", + "Storage": "0" + }, + "Limits": { + "CPU": "0", + "Memory": "0", + "EphemeralStorage": "0", + "Storage": "0" + } + } } } \ No newline at end of file diff --git a/flytepropeller/pkg/compiler/test/testdata/branch/k8s/success_1.json b/flytepropeller/pkg/compiler/test/testdata/branch/k8s/success_1.json index 5969c44f76..74ebf8602b 100755 --- a/flytepropeller/pkg/compiler/test/testdata/branch/k8s/success_1.json +++ b/flytepropeller/pkg/compiler/test/testdata/branch/k8s/success_1.json @@ -388,6 +388,20 @@ "executionConfig": { "TaskPluginImpls": null, "MaxParallelism": 0, - "RecoveryExecution": {} + "RecoveryExecution": {}, + "TaskResources": { + "Requests": { + "CPU": "0", + "Memory": "0", + "EphemeralStorage": "0", + "Storage": "0" + }, + "Limits": { + "CPU": "0", + "Memory": "0", + "EphemeralStorage": "0", + "Storage": "0" + } + } } } \ No newline at end of file diff --git a/flytepropeller/pkg/compiler/test/testdata/branch/k8s/success_10_simple.json b/flytepropeller/pkg/compiler/test/testdata/branch/k8s/success_10_simple.json index 787cd4ae43..d0d599c0c4 100755 --- a/flytepropeller/pkg/compiler/test/testdata/branch/k8s/success_10_simple.json +++ b/flytepropeller/pkg/compiler/test/testdata/branch/k8s/success_10_simple.json @@ -594,6 +594,20 @@ "executionConfig": { "TaskPluginImpls": null, "MaxParallelism": 0, - "RecoveryExecution": {} + "RecoveryExecution": {}, + "TaskResources": { + "Requests": { + "CPU": "0", + "Memory": "0", + "EphemeralStorage": "0", + "Storage": "0" + }, + "Limits": { + "CPU": "0", + "Memory": "0", + "EphemeralStorage": "0", + "Storage": "0" + } + } } } \ No newline at end of file diff --git a/flytepropeller/pkg/compiler/test/testdata/branch/k8s/success_2.json b/flytepropeller/pkg/compiler/test/testdata/branch/k8s/success_2.json index 17affe4a1d..14b0f328af 100755 --- a/flytepropeller/pkg/compiler/test/testdata/branch/k8s/success_2.json +++ b/flytepropeller/pkg/compiler/test/testdata/branch/k8s/success_2.json @@ -429,6 +429,20 @@ "executionConfig": { "TaskPluginImpls": null, "MaxParallelism": 0, - "RecoveryExecution": {} + "RecoveryExecution": {}, + "TaskResources": { + "Requests": { + "CPU": "0", + "Memory": "0", + "EphemeralStorage": "0", + "Storage": "0" + }, + "Limits": { + "CPU": "0", + "Memory": "0", + "EphemeralStorage": "0", + "Storage": "0" + } + } } } \ No newline at end of file diff --git a/flytepropeller/pkg/compiler/test/testdata/branch/k8s/success_3.json b/flytepropeller/pkg/compiler/test/testdata/branch/k8s/success_3.json index 841e2ef2ee..2f7de760f1 100755 --- a/flytepropeller/pkg/compiler/test/testdata/branch/k8s/success_3.json +++ b/flytepropeller/pkg/compiler/test/testdata/branch/k8s/success_3.json @@ -412,6 +412,20 @@ "executionConfig": { "TaskPluginImpls": null, "MaxParallelism": 0, - "RecoveryExecution": {} + "RecoveryExecution": {}, + "TaskResources": { + "Requests": { + "CPU": "0", + "Memory": "0", + "EphemeralStorage": "0", + "Storage": "0" + }, + "Limits": { + "CPU": "0", + "Memory": "0", + "EphemeralStorage": "0", + "Storage": "0" + } + } } } \ No newline at end of file diff --git a/flytepropeller/pkg/compiler/test/testdata/branch/k8s/success_4.json b/flytepropeller/pkg/compiler/test/testdata/branch/k8s/success_4.json index 3c96d00aa6..7119e7cb08 100755 --- a/flytepropeller/pkg/compiler/test/testdata/branch/k8s/success_4.json +++ b/flytepropeller/pkg/compiler/test/testdata/branch/k8s/success_4.json @@ -498,6 +498,20 @@ "executionConfig": { "TaskPluginImpls": null, "MaxParallelism": 0, - "RecoveryExecution": {} + "RecoveryExecution": {}, + "TaskResources": { + "Requests": { + "CPU": "0", + "Memory": "0", + "EphemeralStorage": "0", + "Storage": "0" + }, + "Limits": { + "CPU": "0", + "Memory": "0", + "EphemeralStorage": "0", + "Storage": "0" + } + } } } \ No newline at end of file diff --git a/flytepropeller/pkg/compiler/test/testdata/branch/k8s/success_5.json b/flytepropeller/pkg/compiler/test/testdata/branch/k8s/success_5.json index a7b46f188f..cb7f96246c 100755 --- a/flytepropeller/pkg/compiler/test/testdata/branch/k8s/success_5.json +++ b/flytepropeller/pkg/compiler/test/testdata/branch/k8s/success_5.json @@ -525,6 +525,20 @@ "executionConfig": { "TaskPluginImpls": null, "MaxParallelism": 0, - "RecoveryExecution": {} + "RecoveryExecution": {}, + "TaskResources": { + "Requests": { + "CPU": "0", + "Memory": "0", + "EphemeralStorage": "0", + "Storage": "0" + }, + "Limits": { + "CPU": "0", + "Memory": "0", + "EphemeralStorage": "0", + "Storage": "0" + } + } } } \ No newline at end of file diff --git a/flytepropeller/pkg/compiler/test/testdata/branch/k8s/success_6.json b/flytepropeller/pkg/compiler/test/testdata/branch/k8s/success_6.json index 6d5588cc88..c1accfe76b 100755 --- a/flytepropeller/pkg/compiler/test/testdata/branch/k8s/success_6.json +++ b/flytepropeller/pkg/compiler/test/testdata/branch/k8s/success_6.json @@ -357,6 +357,20 @@ "executionConfig": { "TaskPluginImpls": null, "MaxParallelism": 0, - "RecoveryExecution": {} + "RecoveryExecution": {}, + "TaskResources": { + "Requests": { + "CPU": "0", + "Memory": "0", + "EphemeralStorage": "0", + "Storage": "0" + }, + "Limits": { + "CPU": "0", + "Memory": "0", + "EphemeralStorage": "0", + "Storage": "0" + } + } } } \ No newline at end of file diff --git a/flytepropeller/pkg/compiler/test/testdata/branch/k8s/success_7_nested.json b/flytepropeller/pkg/compiler/test/testdata/branch/k8s/success_7_nested.json index d931758fbd..c58a545bbf 100755 --- a/flytepropeller/pkg/compiler/test/testdata/branch/k8s/success_7_nested.json +++ b/flytepropeller/pkg/compiler/test/testdata/branch/k8s/success_7_nested.json @@ -441,6 +441,20 @@ "executionConfig": { "TaskPluginImpls": null, "MaxParallelism": 0, - "RecoveryExecution": {} + "RecoveryExecution": {}, + "TaskResources": { + "Requests": { + "CPU": "0", + "Memory": "0", + "EphemeralStorage": "0", + "Storage": "0" + }, + "Limits": { + "CPU": "0", + "Memory": "0", + "EphemeralStorage": "0", + "Storage": "0" + } + } } } \ No newline at end of file diff --git a/flytepropeller/pkg/compiler/test/testdata/branch/k8s/success_8_nested.json b/flytepropeller/pkg/compiler/test/testdata/branch/k8s/success_8_nested.json index a3ee4dcbe9..749305c1ea 100755 --- a/flytepropeller/pkg/compiler/test/testdata/branch/k8s/success_8_nested.json +++ b/flytepropeller/pkg/compiler/test/testdata/branch/k8s/success_8_nested.json @@ -523,6 +523,20 @@ "executionConfig": { "TaskPluginImpls": null, "MaxParallelism": 0, - "RecoveryExecution": {} + "RecoveryExecution": {}, + "TaskResources": { + "Requests": { + "CPU": "0", + "Memory": "0", + "EphemeralStorage": "0", + "Storage": "0" + }, + "Limits": { + "CPU": "0", + "Memory": "0", + "EphemeralStorage": "0", + "Storage": "0" + } + } } } \ No newline at end of file diff --git a/flytepropeller/pkg/compiler/test/testdata/branch/k8s/success_9_nested.json b/flytepropeller/pkg/compiler/test/testdata/branch/k8s/success_9_nested.json index 0a9bc398f2..859d19f50c 100755 --- a/flytepropeller/pkg/compiler/test/testdata/branch/k8s/success_9_nested.json +++ b/flytepropeller/pkg/compiler/test/testdata/branch/k8s/success_9_nested.json @@ -546,6 +546,20 @@ "executionConfig": { "TaskPluginImpls": null, "MaxParallelism": 0, - "RecoveryExecution": {} + "RecoveryExecution": {}, + "TaskResources": { + "Requests": { + "CPU": "0", + "Memory": "0", + "EphemeralStorage": "0", + "Storage": "0" + }, + "Limits": { + "CPU": "0", + "Memory": "0", + "EphemeralStorage": "0", + "Storage": "0" + } + } } } \ No newline at end of file diff --git a/flytepropeller/pkg/controller/nodes/dynamic/dynamic_workflow_test.go b/flytepropeller/pkg/controller/nodes/dynamic/dynamic_workflow_test.go index 74a72396be..2784e62aeb 100644 --- a/flytepropeller/pkg/controller/nodes/dynamic/dynamic_workflow_test.go +++ b/flytepropeller/pkg/controller/nodes/dynamic/dynamic_workflow_test.go @@ -499,7 +499,7 @@ func Test_dynamicNodeHandler_buildContextualDynamicWorkflow_withLaunchPlans(t *t composedPBStore.OnWriteRawMatch( mock.MatchedBy(func(ctx context.Context) bool { return true }), storage.DataReference("s3://my-s3-bucket/foo/bar/futures_compiled.pb"), - int64(1192), + int64(1354), storage.Options{}, mock.MatchedBy(func(rdr *bytes.Reader) bool { return true })).Return(errors.New("foo")) diff --git a/flytepropeller/pkg/controller/nodes/task/remote_workflow_store_test.go b/flytepropeller/pkg/controller/nodes/task/remote_workflow_store_test.go index 9ab5b25c6d..103afc0dfc 100644 --- a/flytepropeller/pkg/controller/nodes/task/remote_workflow_store_test.go +++ b/flytepropeller/pkg/controller/nodes/task/remote_workflow_store_test.go @@ -4,6 +4,8 @@ import ( "context" "testing" + "k8s.io/apimachinery/pkg/api/resource" + "github.com/go-test/deep" "github.com/golang/protobuf/proto" @@ -70,6 +72,22 @@ func Test_cacheFlyteWorkflow(t *testing.T) { UpstreamEdges: map[v1alpha1.NodeID][]v1alpha1.NodeID{}, }, }, + ExecutionConfig: v1alpha1.ExecutionConfig{ + TaskResources: v1alpha1.TaskResources{ + Requests: v1alpha1.TaskResourceSpec{ + CPU: resource.MustParse("1"), + Memory: resource.MustParse("1"), + Storage: resource.MustParse("1"), + EphemeralStorage: resource.MustParse("1"), + }, + Limits: v1alpha1.TaskResourceSpec{ + CPU: resource.MustParse("1"), + Memory: resource.MustParse("1"), + Storage: resource.MustParse("1"), + EphemeralStorage: resource.MustParse("1"), + }, + }, + }, } ctx := context.TODO() diff --git a/flytepropeller/pkg/controller/nodes/task/taskexec_context.go b/flytepropeller/pkg/controller/nodes/task/taskexec_context.go index eca6001373..36e65860a5 100644 --- a/flytepropeller/pkg/controller/nodes/task/taskexec_context.go +++ b/flytepropeller/pkg/controller/nodes/task/taskexec_context.go @@ -5,6 +5,9 @@ import ( "context" "strconv" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + "github.com/flyteorg/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" "github.com/flyteorg/flytepropeller/pkg/controller/nodes/common" @@ -45,6 +48,22 @@ func (te taskExecutionID) GetGeneratedName() string { return te.execName } +type taskOverrides struct { + pluginCore.TaskOverrides + resourceRequirements *v1.ResourceRequirements +} + +func (t taskOverrides) GetResources() *v1.ResourceRequirements { + return t.resourceRequirements +} + +func newTaskOverrides(overrides pluginCore.TaskOverrides, resourceRequirements *v1.ResourceRequirements) pluginCore.TaskOverrides { + return &taskOverrides{ + TaskOverrides: overrides, + resourceRequirements: resourceRequirements, + } +} + type taskExecutionMetadata struct { handler.NodeExecutionMetadata taskExecID taskExecutionID @@ -124,6 +143,69 @@ func (t taskExecutionContext) SecretManager() pluginCore.SecretManager { return t.sm } +// Validates and assigns a single resource by examining the default requests and max limit with the static resource value +// defined by this task and node execution context. +func assignResource(resourceName v1.ResourceName, execConfigRequest, execConfigLimit resource.Quantity, requests, limits v1.ResourceList) { + maxLimit := execConfigLimit + request, ok := requests[resourceName] + if !ok { + // Requests aren't required so we glean it from the execution config value (when possible) + if !execConfigRequest.IsZero() { + request = execConfigRequest + } + } else { + if request.Cmp(maxLimit) == 1 && !maxLimit.IsZero() { + // Adjust the request downwards to not exceed the max limit if it's set. + request = maxLimit + } + } + + limit, ok := limits[resourceName] + if !ok { + limit = request + } else { + if limit.Cmp(maxLimit) == 1 && !maxLimit.IsZero() { + // Adjust the limit downwards to not exceed the max limit if it's set. + limit = maxLimit + } + } + if request.Cmp(limit) == 1 { + // The limit should always be greater than or equal to the request + request = limit + } + + if !request.IsZero() { + requests[resourceName] = request + } + if !limit.IsZero() { + limits[resourceName] = limit + } +} + +// Reconciles platform-specific resource defaults requests and max limits with the static resource values +// defined by this task and node execution context. +func determineResourceRequirements(nCtx handler.NodeExecutionContext, taskResources v1alpha1.TaskResources) *v1.ResourceRequirements { + var requests = make(v1.ResourceList) + var limits = make(v1.ResourceList) + if nCtx.Node().GetResources() != nil { + if nCtx.Node().GetResources().Requests != nil { + requests = nCtx.Node().GetResources().Requests + } + if nCtx.Node().GetResources().Limits != nil { + limits = nCtx.Node().GetResources().Limits + } + } + + assignResource(v1.ResourceCPU, taskResources.Requests.CPU, taskResources.Limits.CPU, requests, limits) + assignResource(v1.ResourceMemory, taskResources.Requests.Memory, taskResources.Limits.Memory, requests, limits) + assignResource(v1.ResourceEphemeralStorage, taskResources.Requests.EphemeralStorage, taskResources.Limits.EphemeralStorage, requests, limits) + assignResource(v1.ResourceStorage, taskResources.Requests.Storage, taskResources.Limits.Storage, requests, limits) + return &v1.ResourceRequirements{ + Requests: requests, + Limits: limits, + } +} + func (t *Handler) newTaskExecutionContext(ctx context.Context, nCtx handler.NodeExecutionContext, plugin pluginCore.Plugin) (*taskExecutionContext, error) { id := GetTaskExecutionIdentifier(nCtx) @@ -178,7 +260,7 @@ func (t *Handler) newTaskExecutionContext(ctx context.Context, nCtx handler.Node tm: taskExecutionMetadata{ NodeExecutionMetadata: nCtx.NodeExecutionMetadata(), taskExecID: taskExecutionID{execName: uniqueID, id: id}, - o: nCtx.Node(), + o: newTaskOverrides(nCtx.Node(), determineResourceRequirements(nCtx, nCtx.ExecutionContext().GetExecutionConfig().TaskResources)), maxAttempts: maxAttempts, }, rm: resourcemanager.GetTaskResourceManager( diff --git a/flytepropeller/pkg/controller/nodes/task/taskexec_context_test.go b/flytepropeller/pkg/controller/nodes/task/taskexec_context_test.go index 943bf521a7..7329606a72 100644 --- a/flytepropeller/pkg/controller/nodes/task/taskexec_context_test.go +++ b/flytepropeller/pkg/controller/nodes/task/taskexec_context_test.go @@ -5,6 +5,8 @@ import ( "context" "testing" + "k8s.io/apimachinery/pkg/api/resource" + "github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/resourcemanager" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event" @@ -20,7 +22,7 @@ import ( "github.com/flyteorg/flytestdlib/promutils" "github.com/flyteorg/flytestdlib/storage" "github.com/stretchr/testify/assert" - v12 "k8s.io/api/core/v1" + corev1 "k8s.io/api/core/v1" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -64,7 +66,10 @@ func TestHandler_newTaskExecutionContext(t *testing.T) { ns.OnGetDataDir().Return(storage.DataReference("data-dir")) ns.OnGetOutputDir().Return(storage.DataReference("output-dir")) - res := &v12.ResourceRequirements{} + res := &corev1.ResourceRequirements{ + Requests: make(corev1.ResourceList), + Limits: make(corev1.ResourceList), + } n := &flyteMocks.ExecutableNode{} n.OnGetResources().Return(res) ma := 5 @@ -181,3 +186,194 @@ func TestHandler_newTaskExecutionContext(t *testing.T) { anotherTaskExecCtx, _ := tk.newTaskExecutionContext(context.TODO(), nCtx, anotherPlugin) assert.Equal(t, anotherTaskExecCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName(), "fpmmhh6q") } + +func TestAssignResource(t *testing.T) { + type testCase struct { + name string + execConfigRequest resource.Quantity + execConfigLimit resource.Quantity + requests corev1.ResourceList + limits corev1.ResourceList + expectedRequests corev1.ResourceList + expectedLimits corev1.ResourceList + } + var testCases = []testCase{ + { + name: "nothing to do", + execConfigRequest: resource.MustParse("10"), + execConfigLimit: resource.MustParse("100"), + requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("1"), + }, + limits: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("1"), + }, + expectedRequests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("1"), + }, + expectedLimits: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("1"), + }, + }, + { + name: "assign request", + execConfigRequest: resource.MustParse("10"), + execConfigLimit: resource.MustParse("100"), + requests: corev1.ResourceList{}, + limits: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("100"), + }, + expectedRequests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("10"), + }, + expectedLimits: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("100"), + }, + }, + { + name: "adjust request", + execConfigRequest: resource.MustParse("10"), + execConfigLimit: resource.MustParse("100"), + requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("1000"), + }, + limits: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("100"), + }, + expectedRequests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("100"), + }, + expectedLimits: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("100"), + }, + }, + { + name: "assign limit", + execConfigRequest: resource.MustParse("10"), + execConfigLimit: resource.MustParse("100"), + requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("10"), + }, + limits: corev1.ResourceList{}, + expectedRequests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("10"), + }, + expectedLimits: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("10"), + }, + }, + { + name: "adjust limit based on exec config", + execConfigRequest: resource.MustParse("10"), + execConfigLimit: resource.MustParse("100"), + requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("10"), + }, + limits: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("1000"), + }, + expectedRequests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("10"), + }, + expectedLimits: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("100"), + }, + }, + { + name: "assigned request should not exceed limit", + execConfigRequest: resource.MustParse("10"), + execConfigLimit: resource.MustParse("100"), + requests: corev1.ResourceList{}, + limits: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("1"), + }, + expectedRequests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("1"), + }, + expectedLimits: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("1"), + }, + }, + } + for _, test := range testCases { + t.Run(test.name, func(t *testing.T) { + assignResource(corev1.ResourceCPU, test.execConfigRequest, test.execConfigLimit, test.requests, test.limits) + assert.EqualValues(t, test.requests, test.expectedRequests) + assert.EqualValues(t, test.limits, test.expectedLimits) + }) + } +} + +func TestDetermineResourceRequirements(t *testing.T) { + node := &flyteMocks.ExecutableNode{} + node.OnGetResources().Return(&corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("1"), + corev1.ResourceMemory: resource.MustParse("10"), + }, + Limits: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("1"), + corev1.ResourceMemory: resource.MustParse("100"), + corev1.ResourceEphemeralStorage: resource.MustParse("10"), + corev1.ResourceStorage: resource.MustParse("20"), + }, + }) + nodeExecutionContext := &nodeMocks.NodeExecutionContext{} + nodeExecutionContext.OnNode().Return(node) + + taskResources := v1alpha1.TaskResources{ + Requests: v1alpha1.TaskResourceSpec{ + CPU: resource.MustParse("1"), + Memory: resource.MustParse("20"), + EphemeralStorage: resource.MustParse("50"), + }, + Limits: v1alpha1.TaskResourceSpec{ + CPU: resource.MustParse("2"), + Memory: resource.MustParse("50"), + EphemeralStorage: resource.MustParse("100"), + Storage: resource.MustParse("15"), + }, + } + resources := determineResourceRequirements(nodeExecutionContext, taskResources) + assert.EqualValues(t, resources.Requests, corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("1"), + corev1.ResourceMemory: resource.MustParse("10"), + corev1.ResourceEphemeralStorage: resource.MustParse("10"), + }) + assert.EqualValues(t, resources.Limits, corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("1"), + corev1.ResourceMemory: resource.MustParse("50"), + corev1.ResourceEphemeralStorage: resource.MustParse("10"), + corev1.ResourceStorage: resource.MustParse("15"), + }) +} + +func TestGetResources(t *testing.T) { + node := &flyteMocks.ExecutableNode{} + node.OnGetResources().Return(&corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("1"), + corev1.ResourceMemory: resource.MustParse("10"), + }, + Limits: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("1"), + corev1.ResourceMemory: resource.MustParse("100"), + corev1.ResourceEphemeralStorage: resource.MustParse("10"), + }, + }) + + computedRequirements := &corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("2"), + corev1.ResourceMemory: resource.MustParse("20"), + }, + Limits: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("2"), + corev1.ResourceMemory: resource.MustParse("200"), + corev1.ResourceEphemeralStorage: resource.MustParse("20"), + }, + } + + overrides := newTaskOverrides(node, computedRequirements) + assert.EqualValues(t, overrides.GetResources(), computedRequirements) +}