Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
Added workflow level matchable attribute get for execution config (#383)
Browse files Browse the repository at this point in the history
* Added workflow level matchable attribute get for execution config

Signed-off-by: Prafulla Mahindrakar <prafulla.mahindrakar@gmail.com>

* Fixed test

Signed-off-by: Prafulla Mahindrakar <prafulla.mahindrakar@gmail.com>

* Feedback and rebase

Signed-off-by: Prafulla Mahindrakar <prafulla.mahindrakar@gmail.com>
  • Loading branch information
pmahindrakar-oss committed Mar 30, 2022
1 parent f01a822 commit d9b1c03
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 8 deletions.
6 changes: 5 additions & 1 deletion pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,15 +490,19 @@ func (m *ExecutionManager) getExecutionConfig(ctx context.Context, request *admi
return workflowExecConfig, nil
}

var workflowName string
if launchPlan != nil && launchPlan.Spec != nil {
// merge the launch plan spec into workflowExecConfig
if isChanged := mergeIntoExecConfig(workflowExecConfig, launchPlan.Spec); isChanged {
return workflowExecConfig, nil
}
if launchPlan.Spec.WorkflowId != nil {
workflowName = launchPlan.Spec.WorkflowId.Name
}
}

matchableResource, err := util.GetMatchableResource(ctx, m.resourceManager,
admin.MatchableResource_WORKFLOW_EXECUTION_CONFIG, request.Project, request.Domain)
admin.MatchableResource_WORKFLOW_EXECUTION_CONFIG, request.Project, request.Domain, workflowName)
if err != nil {
return nil, err
}
Expand Down
44 changes: 44 additions & 0 deletions pkg/manager/impl/execution_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3961,6 +3961,50 @@ func TestGetExecutionConfigOverrides(t *testing.T) {
assert.Nil(t, execConfig.GetLabels())
assert.Nil(t, execConfig.GetAnnotations())
})
t.Run("matchable resource workflow resource", func(t *testing.T) {
resourceManager.GetResourceFunc = func(ctx context.Context,
request managerInterfaces.ResourceRequest) (*managerInterfaces.ResourceResponse, error) {
assert.EqualValues(t, request, managerInterfaces.ResourceRequest{
Project: workflowIdentifier.Project,
Domain: workflowIdentifier.Domain,
Workflow: workflowIdentifier.Name,
ResourceType: admin.MatchableResource_WORKFLOW_EXECUTION_CONFIG,
})
return &managerInterfaces.ResourceResponse{
Attributes: &admin.MatchingAttributes{
Target: &admin.MatchingAttributes_WorkflowExecutionConfig{
WorkflowExecutionConfig: &admin.WorkflowExecutionConfig{
MaxParallelism: 300,
SecurityContext: &core.SecurityContext{
RunAs: &core.Identity{
K8SServiceAccount: "workflowDefault",
},
},
},
},
},
}, nil
}
request := &admin.ExecutionCreateRequest{
Project: workflowIdentifier.Project,
Domain: workflowIdentifier.Domain,
Spec: &admin.ExecutionSpec{},
}
launchPlan := &admin.LaunchPlan{
Spec: &admin.LaunchPlanSpec{
WorkflowId: &core.Identifier{
Name: workflowIdentifier.Name,
},
},
}
execConfig, err := executionManager.getExecutionConfig(context.TODO(), request, launchPlan)
assert.NoError(t, err)
assert.Equal(t, int32(300), execConfig.MaxParallelism)
assert.Equal(t, "workflowDefault", execConfig.SecurityContext.RunAs.K8SServiceAccount)
assert.Nil(t, execConfig.GetRawOutputDataConfig())
assert.Nil(t, execConfig.GetLabels())
assert.Nil(t, execConfig.GetAnnotations())
})
t.Run("matchable resource failure", func(t *testing.T) {
resourceManager.GetResourceFunc = func(ctx context.Context,
request managerInterfaces.ResourceRequest) (*managerInterfaces.ResourceResponse, error) {
Expand Down
9 changes: 5 additions & 4 deletions pkg/manager/impl/util/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,19 +235,20 @@ func GetTaskExecutionModel(
return &taskExecutionModel, nil
}

// GetMatchableResource gets matchable resource for resourceType and project - domain combination.
// GetMatchableResource gets matchable resource for resourceType and project - domain - workflow combination.
// Returns nil with nothing is found or return an error
func GetMatchableResource(ctx context.Context, resourceManager interfaces.ResourceInterface, resourceType admin.MatchableResource,
project, domain string) (*interfaces.ResourceResponse, error) {
project, domain, workflowName string) (*interfaces.ResourceResponse, error) {
matchableResource, err := resourceManager.GetResource(ctx, interfaces.ResourceRequest{
Project: project,
Domain: domain,
Workflow: workflowName,
ResourceType: resourceType,
})
if err != nil {
if flyteAdminError, ok := err.(errors.FlyteAdminError); !ok || flyteAdminError.Code() != codes.NotFound {
logger.Errorf(ctx, "Failed to get %v overrides in %s project %s domain with error: %v", resourceType,
project, domain, err)
logger.Errorf(ctx, "Failed to get %v overrides in %s project %s domain %s workflow with error: %v", resourceType,
project, domain, workflowName, err)
return nil, err
}
}
Expand Down
32 changes: 29 additions & 3 deletions pkg/manager/impl/util/shared_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,7 @@ func TestGetMatchableResource(t *testing.T) {
resourceType := admin.MatchableResource_WORKFLOW_EXECUTION_CONFIG
project := "dummyProject"
domain := "dummyDomain"
workflow := "dummyWorkflow"
t.Run("successful fetch", func(t *testing.T) {
resourceManager := &managerMocks.MockResourceManager{}
resourceManager.GetResourceFunc = func(ctx context.Context,
Expand All @@ -501,7 +502,32 @@ func TestGetMatchableResource(t *testing.T) {
}, nil
}

mr, err := GetMatchableResource(context.Background(), resourceManager, resourceType, project, domain)
mr, err := GetMatchableResource(context.Background(), resourceManager, resourceType, project, domain, "")
assert.Equal(t, int32(12), mr.Attributes.GetWorkflowExecutionConfig().MaxParallelism)
assert.Nil(t, err)
})
t.Run("successful fetch workflow matchable", func(t *testing.T) {
resourceManager := &managerMocks.MockResourceManager{}
resourceManager.GetResourceFunc = func(ctx context.Context,
request managerInterfaces.ResourceRequest) (*managerInterfaces.ResourceResponse, error) {
assert.EqualValues(t, request, managerInterfaces.ResourceRequest{
Project: project,
Domain: domain,
Workflow: workflow,
ResourceType: resourceType,
})
return &managerInterfaces.ResourceResponse{
Attributes: &admin.MatchingAttributes{
Target: &admin.MatchingAttributes_WorkflowExecutionConfig{
WorkflowExecutionConfig: &admin.WorkflowExecutionConfig{
MaxParallelism: 12,
},
},
},
}, nil
}

mr, err := GetMatchableResource(context.Background(), resourceManager, resourceType, project, domain, workflow)
assert.Equal(t, int32(12), mr.Attributes.GetWorkflowExecutionConfig().MaxParallelism)
assert.Nil(t, err)
})
Expand All @@ -518,7 +544,7 @@ func TestGetMatchableResource(t *testing.T) {
return nil, flyteAdminErrors.NewFlyteAdminError(codes.NotFound, "resource not found")
}

_, err := GetMatchableResource(context.Background(), resourceManager, resourceType, project, domain)
_, err := GetMatchableResource(context.Background(), resourceManager, resourceType, project, domain, "")
assert.Nil(t, err)
})

Expand All @@ -534,7 +560,7 @@ func TestGetMatchableResource(t *testing.T) {
return nil, flyteAdminErrors.NewFlyteAdminError(codes.Internal, "internal error")
}

_, err := GetMatchableResource(context.Background(), resourceManager, resourceType, project, domain)
_, err := GetMatchableResource(context.Background(), resourceManager, resourceType, project, domain, "")
assert.NotNil(t, err)
})
}

0 comments on commit d9b1c03

Please sign in to comment.