Skip to content

Commit

Permalink
Consolidate and reorganize project, domain & workflow resource matchi…
Browse files Browse the repository at this point in the history
…ng (flyteorg#49)
  • Loading branch information
katrogan committed Jan 3, 2020
1 parent 84f181a commit e0ac32e
Show file tree
Hide file tree
Showing 66 changed files with 2,163 additions and 711 deletions.
14 changes: 10 additions & 4 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions boilerplate/lyft/golang_test_targets/Makefile
Expand Up @@ -10,8 +10,8 @@ lint: #lints the package for common code smells
# However, that call seem to have some effects (e.g. https://github.com/golang/go/issues/29452) which, for some
# reason, allows the subsequent calls to succeed.
# TODO: Evaluate whether this is still a problem after moving admin dependency system to go modules.
GO111MODULE=off GL_DEBUG=linters_output,loader,env golangci-lint run --exclude deprecated -v || true
GO111MODULE=off GL_DEBUG=linters_output,loader,env golangci-lint run --deadline=5m --exclude deprecated -v
GO111MODULE=off golangci-lint run --exclude deprecated -v || true
GO111MODULE=off golangci-lint run --deadline=5m --exclude deprecated -v

# If code is failing goimports linter, this will fix.
# skips 'vendor'
Expand Down
23 changes: 10 additions & 13 deletions pkg/clusterresource/controller.go
Expand Up @@ -11,7 +11,7 @@ import (
"strings"
"time"

"github.com/lyft/flyteadmin/pkg/repositories/transformers"
"github.com/lyft/flyteadmin/pkg/resourcematching"

"github.com/lyft/flyteadmin/pkg/executioncluster/interfaces"

Expand Down Expand Up @@ -174,21 +174,18 @@ func (c *controller) getCustomTemplateValues(
customTemplateValues[key] = value
}
collectedErrs := make([]error, 0)
// All project-domain defaults saved in the database take precedence over the domain-specific defaults.
projectDomainModel, err := c.db.ProjectDomainRepo().Get(ctx, project, domain)
if err != nil {
if err.(errors.FlyteAdminError).Code() != codes.NotFound {
// Not found is fine because not every project-domain combination will have specific custom resource
// attributes.
collectedErrs = append(collectedErrs, err)
}
}
projectDomain, err := transformers.FromProjectDomainModel(projectDomainModel)
// All override values saved in the database take precedence over the domain-specific defaults.
attributes, err := resourcematching.GetOverrideValuesToApply(ctx, resourcematching.GetOverrideValuesInput{
Db: c.db,
Project: project,
Domain: domain,
Resource: admin.MatchableResource_CLUSTER_RESOURCE,
})
if err != nil {
collectedErrs = append(collectedErrs, err)
}
if len(projectDomain.Attributes) > 0 {
for templateKey, templateValue := range projectDomain.Attributes {
if attributes != nil && attributes.GetClusterResourceAttributes() != nil {
for templateKey, templateValue := range attributes.GetClusterResourceAttributes().Attributes {
customTemplateValues[fmt.Sprintf(templateVariableFormat, templateKey)] = templateValue
}
}
Expand Down
30 changes: 14 additions & 16 deletions pkg/clusterresource/controller_test.go
Expand Up @@ -7,9 +7,6 @@ import (
"testing"
"time"

"github.com/lyft/flyteadmin/pkg/errors"
"google.golang.org/grpc/codes"

"github.com/lyft/flyteadmin/pkg/repositories/transformers"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin"

Expand Down Expand Up @@ -154,15 +151,20 @@ func TestGetCustomTemplateValues(t *testing.T) {
projectDomainAttributes := admin.ProjectDomainAttributes{
Project: "project-foo",
Domain: "domain-bar",
Attributes: map[string]string{
"var1": "val1",
"var2": "val2",
MatchingAttributes: &admin.MatchingAttributes{
Target: &admin.MatchingAttributes_ClusterResourceAttributes{ClusterResourceAttributes: &admin.ClusterResourceAttributes{
Attributes: map[string]string{
"var1": "val1",
"var2": "val2",
},
},
},
},
}
projectDomainModel, err := transformers.ToProjectDomainModel(projectDomainAttributes)
projectDomainModel, err := transformers.ToProjectDomainAttributesModel(projectDomainAttributes, admin.MatchableResource_CLUSTER_RESOURCE)
assert.Nil(t, err)
mockRepository.ProjectDomainRepo().(*repositoryMocks.MockProjectDomainRepo).GetFunction = func(
ctx context.Context, project, domain string) (models.ProjectDomain, error) {
mockRepository.ProjectDomainAttributesRepo().(*repositoryMocks.MockProjectDomainAttributesRepo).GetFunction = func(
ctx context.Context, project, domain, resource string) (models.ProjectDomainAttributes, error) {
assert.Equal(t, "project-foo", project)
assert.Equal(t, "domain-bar", domain)
return projectDomainModel, nil
Expand All @@ -188,10 +190,6 @@ func TestGetCustomTemplateValues(t *testing.T) {

func TestGetCustomTemplateValues_NothingToOverride(t *testing.T) {
mockRepository := repositoryMocks.NewMockRepository()
mockRepository.ProjectDomainRepo().(*repositoryMocks.MockProjectDomainRepo).GetFunction = func(
ctx context.Context, project, domain string) (models.ProjectDomain, error) {
return models.ProjectDomain{}, errors.NewFlyteAdminError(codes.NotFound, "not found")
}
testController := controller{
db: mockRepository,
}
Expand All @@ -209,9 +207,9 @@ func TestGetCustomTemplateValues_NothingToOverride(t *testing.T) {

func TestGetCustomTemplateValues_InvalidDBModel(t *testing.T) {
mockRepository := repositoryMocks.NewMockRepository()
mockRepository.ProjectDomainRepo().(*repositoryMocks.MockProjectDomainRepo).GetFunction = func(
ctx context.Context, project, domain string) (models.ProjectDomain, error) {
return models.ProjectDomain{
mockRepository.ProjectDomainAttributesRepo().(*repositoryMocks.MockProjectDomainAttributesRepo).GetFunction = func(
ctx context.Context, project, domain, resource string) (models.ProjectDomainAttributes, error) {
return models.ProjectDomainAttributes{
Attributes: []byte("i'm invalid"),
}, nil
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/manager/impl/execution_manager.go
Expand Up @@ -246,7 +246,7 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(

// Dynamically assign task resource defaults.
for _, task := range workflow.Closure.CompiledWorkflow.Tasks {
validation.SetDefaults(ctx, m.config.TaskResourceConfiguration(), task)
validation.SetDefaults(ctx, m.config.TaskResourceConfiguration(), task, m.db, name)
}

// Dynamically assign execution queues.
Expand Down Expand Up @@ -926,7 +926,7 @@ func NewExecutionManager(
userScope promutils.Scope,
publisher notificationInterfaces.Publisher,
urlData dataInterfaces.RemoteURLInterface) interfaces.ExecutionInterface {
queueAllocator := executions.NewQueueAllocator(config)
queueAllocator := executions.NewQueueAllocator(config, db)
systemMetrics := newExecutionSystemMetrics(systemScope)

userMetrics := executionUserMetrics{
Expand Down
14 changes: 6 additions & 8 deletions pkg/manager/impl/execution_manager_test.go
Expand Up @@ -337,10 +337,8 @@ func TestCreateExecution_TaggedQueue(t *testing.T) {
},
}, []runtimeInterfaces.WorkflowConfig{
{
Project: "project",
Domain: "domain",
WorkflowName: "name",
Tags: []string{"tag"},
Domain: "domain",
Tags: []string{"tag"},
},
}),
nil, nil, nil, nil)
Expand Down Expand Up @@ -1551,7 +1549,7 @@ func TestListExecutions_TransformerError(t *testing.T) {

func TestExecutionManager_PublishNotifications(t *testing.T) {
repository := repositoryMocks.NewMockRepository()
queue := executions.NewQueueAllocator(getMockExecutionsConfigProvider())
queue := executions.NewQueueAllocator(getMockExecutionsConfigProvider(), repository)

mockApplicationConfig := runtimeMocks.MockApplicationProvider{}
mockApplicationConfig.SetNotificationsConfig(runtimeInterfaces.NotificationsConfig{
Expand Down Expand Up @@ -1647,7 +1645,7 @@ func TestExecutionManager_PublishNotifications(t *testing.T) {

func TestExecutionManager_PublishNotificationsTransformError(t *testing.T) {
repository := repositoryMocks.NewMockRepository()
queue := executions.NewQueueAllocator(getMockExecutionsConfigProvider())
queue := executions.NewQueueAllocator(getMockExecutionsConfigProvider(), repository)
var execManager = &ExecutionManager{
db: repository,
config: getMockExecutionsConfigProvider(),
Expand Down Expand Up @@ -1688,7 +1686,7 @@ func TestExecutionManager_PublishNotificationsTransformError(t *testing.T) {

func TestExecutionManager_TestExecutionManager_PublishNotificationsTransformError(t *testing.T) {
repository := repositoryMocks.NewMockRepository()
queue := executions.NewQueueAllocator(getMockExecutionsConfigProvider())
queue := executions.NewQueueAllocator(getMockExecutionsConfigProvider(), repository)
publishFunc := func(ctx context.Context, key string, msg proto.Message) error {
return errors.New("error publishing message")
}
Expand Down Expand Up @@ -1759,7 +1757,7 @@ func TestExecutionManager_TestExecutionManager_PublishNotificationsTransformErro

func TestExecutionManager_PublishNotificationsNoPhaseMatch(t *testing.T) {
repository := repositoryMocks.NewMockRepository()
queue := executions.NewQueueAllocator(getMockExecutionsConfigProvider())
queue := executions.NewQueueAllocator(getMockExecutionsConfigProvider(), repository)

var myExecManager = &ExecutionManager{
db: repository,
Expand Down

0 comments on commit e0ac32e

Please sign in to comment.