diff --git a/pkg/manager/impl/execution_manager.go b/pkg/manager/impl/execution_manager.go index f0155a3af..a16e4bda0 100644 --- a/pkg/manager/impl/execution_manager.go +++ b/pkg/manager/impl/execution_manager.go @@ -490,15 +490,19 @@ func (m *ExecutionManager) getExecutionConfig(ctx context.Context, request *admi return workflowExecConfig, nil } + var workflowName string if launchPlan != nil && launchPlan.Spec != nil { // merge the launch plan spec into workflowExecConfig if isChanged := mergeIntoExecConfig(workflowExecConfig, launchPlan.Spec); isChanged { return workflowExecConfig, nil } + if launchPlan.Spec.WorkflowId != nil { + workflowName = launchPlan.Spec.WorkflowId.Name + } } matchableResource, err := util.GetMatchableResource(ctx, m.resourceManager, - admin.MatchableResource_WORKFLOW_EXECUTION_CONFIG, request.Project, request.Domain) + admin.MatchableResource_WORKFLOW_EXECUTION_CONFIG, request.Project, request.Domain, workflowName) if err != nil { return nil, err } diff --git a/pkg/manager/impl/execution_manager_test.go b/pkg/manager/impl/execution_manager_test.go index 18af19870..afb30d1da 100644 --- a/pkg/manager/impl/execution_manager_test.go +++ b/pkg/manager/impl/execution_manager_test.go @@ -3961,6 +3961,50 @@ func TestGetExecutionConfigOverrides(t *testing.T) { assert.Nil(t, execConfig.GetLabels()) assert.Nil(t, execConfig.GetAnnotations()) }) + t.Run("matchable resource workflow resource", func(t *testing.T) { + resourceManager.GetResourceFunc = func(ctx context.Context, + request managerInterfaces.ResourceRequest) (*managerInterfaces.ResourceResponse, error) { + assert.EqualValues(t, request, managerInterfaces.ResourceRequest{ + Project: workflowIdentifier.Project, + Domain: workflowIdentifier.Domain, + Workflow: workflowIdentifier.Name, + ResourceType: admin.MatchableResource_WORKFLOW_EXECUTION_CONFIG, + }) + return &managerInterfaces.ResourceResponse{ + Attributes: &admin.MatchingAttributes{ + Target: &admin.MatchingAttributes_WorkflowExecutionConfig{ + WorkflowExecutionConfig: &admin.WorkflowExecutionConfig{ + MaxParallelism: 300, + SecurityContext: &core.SecurityContext{ + RunAs: &core.Identity{ + K8SServiceAccount: "workflowDefault", + }, + }, + }, + }, + }, + }, nil + } + request := &admin.ExecutionCreateRequest{ + Project: workflowIdentifier.Project, + Domain: workflowIdentifier.Domain, + Spec: &admin.ExecutionSpec{}, + } + launchPlan := &admin.LaunchPlan{ + Spec: &admin.LaunchPlanSpec{ + WorkflowId: &core.Identifier{ + Name: workflowIdentifier.Name, + }, + }, + } + execConfig, err := executionManager.getExecutionConfig(context.TODO(), request, launchPlan) + assert.NoError(t, err) + assert.Equal(t, int32(300), execConfig.MaxParallelism) + assert.Equal(t, "workflowDefault", execConfig.SecurityContext.RunAs.K8SServiceAccount) + assert.Nil(t, execConfig.GetRawOutputDataConfig()) + assert.Nil(t, execConfig.GetLabels()) + assert.Nil(t, execConfig.GetAnnotations()) + }) t.Run("matchable resource failure", func(t *testing.T) { resourceManager.GetResourceFunc = func(ctx context.Context, request managerInterfaces.ResourceRequest) (*managerInterfaces.ResourceResponse, error) { diff --git a/pkg/manager/impl/util/shared.go b/pkg/manager/impl/util/shared.go index bed18fdd1..3758775fa 100644 --- a/pkg/manager/impl/util/shared.go +++ b/pkg/manager/impl/util/shared.go @@ -235,19 +235,20 @@ func GetTaskExecutionModel( return &taskExecutionModel, nil } -// GetMatchableResource gets matchable resource for resourceType and project - domain combination. +// GetMatchableResource gets matchable resource for resourceType and project - domain - workflow combination. // Returns nil with nothing is found or return an error func GetMatchableResource(ctx context.Context, resourceManager interfaces.ResourceInterface, resourceType admin.MatchableResource, - project, domain string) (*interfaces.ResourceResponse, error) { + project, domain, workflowName string) (*interfaces.ResourceResponse, error) { matchableResource, err := resourceManager.GetResource(ctx, interfaces.ResourceRequest{ Project: project, Domain: domain, + Workflow: workflowName, ResourceType: resourceType, }) if err != nil { if flyteAdminError, ok := err.(errors.FlyteAdminError); !ok || flyteAdminError.Code() != codes.NotFound { - logger.Errorf(ctx, "Failed to get %v overrides in %s project %s domain with error: %v", resourceType, - project, domain, err) + logger.Errorf(ctx, "Failed to get %v overrides in %s project %s domain %s workflow with error: %v", resourceType, + project, domain, workflowName, err) return nil, err } } diff --git a/pkg/manager/impl/util/shared_test.go b/pkg/manager/impl/util/shared_test.go index fce7b682b..6885c9ee3 100644 --- a/pkg/manager/impl/util/shared_test.go +++ b/pkg/manager/impl/util/shared_test.go @@ -481,6 +481,7 @@ func TestGetMatchableResource(t *testing.T) { resourceType := admin.MatchableResource_WORKFLOW_EXECUTION_CONFIG project := "dummyProject" domain := "dummyDomain" + workflow := "dummyWorkflow" t.Run("successful fetch", func(t *testing.T) { resourceManager := &managerMocks.MockResourceManager{} resourceManager.GetResourceFunc = func(ctx context.Context, @@ -501,7 +502,32 @@ func TestGetMatchableResource(t *testing.T) { }, nil } - mr, err := GetMatchableResource(context.Background(), resourceManager, resourceType, project, domain) + mr, err := GetMatchableResource(context.Background(), resourceManager, resourceType, project, domain, "") + assert.Equal(t, int32(12), mr.Attributes.GetWorkflowExecutionConfig().MaxParallelism) + assert.Nil(t, err) + }) + t.Run("successful fetch workflow matchable", func(t *testing.T) { + resourceManager := &managerMocks.MockResourceManager{} + resourceManager.GetResourceFunc = func(ctx context.Context, + request managerInterfaces.ResourceRequest) (*managerInterfaces.ResourceResponse, error) { + assert.EqualValues(t, request, managerInterfaces.ResourceRequest{ + Project: project, + Domain: domain, + Workflow: workflow, + ResourceType: resourceType, + }) + return &managerInterfaces.ResourceResponse{ + Attributes: &admin.MatchingAttributes{ + Target: &admin.MatchingAttributes_WorkflowExecutionConfig{ + WorkflowExecutionConfig: &admin.WorkflowExecutionConfig{ + MaxParallelism: 12, + }, + }, + }, + }, nil + } + + mr, err := GetMatchableResource(context.Background(), resourceManager, resourceType, project, domain, workflow) assert.Equal(t, int32(12), mr.Attributes.GetWorkflowExecutionConfig().MaxParallelism) assert.Nil(t, err) }) @@ -518,7 +544,7 @@ func TestGetMatchableResource(t *testing.T) { return nil, flyteAdminErrors.NewFlyteAdminError(codes.NotFound, "resource not found") } - _, err := GetMatchableResource(context.Background(), resourceManager, resourceType, project, domain) + _, err := GetMatchableResource(context.Background(), resourceManager, resourceType, project, domain, "") assert.Nil(t, err) }) @@ -534,7 +560,7 @@ func TestGetMatchableResource(t *testing.T) { return nil, flyteAdminErrors.NewFlyteAdminError(codes.Internal, "internal error") } - _, err := GetMatchableResource(context.Background(), resourceManager, resourceType, project, domain) + _, err := GetMatchableResource(context.Background(), resourceManager, resourceType, project, domain, "") assert.NotNil(t, err) }) }