Skip to content

Commit

Permalink
Adds endpoints for NamedEntities (#20)
Browse files Browse the repository at this point in the history
* Adding endpoints for named entities

part of the wiring for new endpoints/manager

Getting most of the plumbing implemented

Getting the GET code path to work

Implementing update

Adding tests for metadata

Refactoring to support listing named entities

Making the endpoints work correctly

Fixing tests

* Remove unused mock field

* Fixing lint errors

* Updating GET endpoint to 404 if passed an invalid project/domain/name

* Fixing lint error

* PR feedback

* Updating package lock

* Removing stub implementation

* Fixing test

* remove misleading comment

* PR feedback and tests

* Adding integration tests and fixing a bug with getting single entities

* fixing a test

* Fixing lockfile

* Relaxing idl constraint

* Fixing lockfile again

* Changing serve command to use local config file again

* PR feedback
  • Loading branch information
schottra committed Nov 14, 2019
1 parent 5cd1bfd commit 0134a1b
Show file tree
Hide file tree
Showing 40 changed files with 1,836 additions and 180 deletions.
4 changes: 2 additions & 2 deletions Makefile
Expand Up @@ -9,7 +9,7 @@ update_boilerplate:

.PHONY: integration
integration:
GOCACHE=off go test -v -tags=integration ./tests/...
GOFLAGS="-count=1" go test -v -tags=integration ./tests/...

.PHONY: k8s_integration
k8s_integration:
Expand All @@ -25,7 +25,7 @@ linux_compile:

.PHONY: server
server:
go run cmd/main.go --logtostderr --server.kube-config ~/.kube/config --config ~/flyteadmin_config.yaml serve
go run cmd/main.go --logtostderr --server.kube-config ~/.kube/config --config flyteadmin_config.yaml serve

.PHONY: migrate
migrate:
Expand Down
11 changes: 11 additions & 0 deletions pkg/common/entity.go
@@ -1,5 +1,9 @@
package common

import (
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"
)

type Entity = string

const (
Expand All @@ -11,3 +15,10 @@ const (
TaskExecution = "te"
Workflow = "w"
)

// ResourceTypeToEntity maps a resource type to an entity suitable for use with Database filters
var ResourceTypeToEntity = map[core.ResourceType]Entity{
core.ResourceType_LAUNCH_PLAN: LaunchPlan,
core.ResourceType_TASK: Task,
core.ResourceType_WORKFLOW: Workflow,
}
2 changes: 0 additions & 2 deletions pkg/manager/impl/launch_plan_manager_test.go
Expand Up @@ -33,8 +33,6 @@ import (
"google.golang.org/grpc/codes"
)

const project, domain, name, version = "project", "domain", "name", "version"

var active = int32(admin.LaunchPlanState_ACTIVE)
var inactive = int32(admin.LaunchPlanState_INACTIVE)
var mockScheduler = mocks.NewMockEventScheduler()
Expand Down
129 changes: 129 additions & 0 deletions pkg/manager/impl/named_entity_manager.go
@@ -0,0 +1,129 @@
package impl

import (
"context"
"strconv"

"github.com/lyft/flyteadmin/pkg/common"
"github.com/lyft/flyteadmin/pkg/errors"
"google.golang.org/grpc/codes"

"github.com/lyft/flyteadmin/pkg/manager/impl/util"
"github.com/lyft/flyteadmin/pkg/manager/impl/validation"
"github.com/lyft/flyteadmin/pkg/manager/interfaces"
"github.com/lyft/flyteadmin/pkg/repositories"
repoInterfaces "github.com/lyft/flyteadmin/pkg/repositories/interfaces"
"github.com/lyft/flyteadmin/pkg/repositories/transformers"
runtimeInterfaces "github.com/lyft/flyteadmin/pkg/runtime/interfaces"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/lyft/flytestdlib/logger"
"github.com/lyft/flytestdlib/promutils"
)

type NamedEntityMetrics struct {
Scope promutils.Scope
}

type NamedEntityManager struct {
db repositories.RepositoryInterface
config runtimeInterfaces.Configuration
metrics NamedEntityMetrics
}

func (m *NamedEntityManager) UpdateNamedEntity(ctx context.Context, request admin.NamedEntityUpdateRequest) (
*admin.NamedEntityUpdateResponse, error) {
if err := validation.ValidateNamedEntityUpdateRequest(request); err != nil {
logger.Debugf(ctx, "invalid request [%+v]: %v", request, err)
return nil, err
}

// Ensure entity exists before trying to update it
_, err := util.GetNamedEntity(ctx, m.db, request.ResourceType, *request.Id)
if err != nil {
return nil, err
}

metadataModel := transformers.CreateNamedEntityModel(&request)
err = m.db.NamedEntityRepo().Update(ctx, metadataModel)
if err != nil {
logger.Debugf(ctx, "Failed to update named_entity for [%+v] with err %v", request.Id, err)
return nil, err
}
return &admin.NamedEntityUpdateResponse{}, nil
}

func (m *NamedEntityManager) GetNamedEntity(ctx context.Context, request admin.NamedEntityGetRequest) (
*admin.NamedEntity, error) {
if err := validation.ValidateNamedEntityGetRequest(request); err != nil {
logger.Debugf(ctx, "invalid request [%+v]: %v", request, err)
return nil, err
}
return util.GetNamedEntity(ctx, m.db, request.ResourceType, *request.Id)
}

func (m *NamedEntityManager) ListNamedEntities(ctx context.Context, request admin.NamedEntityListRequest) (
*admin.NamedEntityList, error) {
if err := validation.ValidateNamedEntityListRequest(request); err != nil {
logger.Debugf(ctx, "invalid request [%+v]: %v", request, err)
return nil, err
}

filters, err := util.GetDbFilters(util.FilterSpec{
Project: request.Project,
Domain: request.Domain,
}, common.ResourceTypeToEntity[request.ResourceType])
if err != nil {
return nil, err
}
var sortParameter common.SortParameter
if request.SortBy != nil {
sortParameter, err = common.NewSortParameter(*request.SortBy)
if err != nil {
return nil, err
}
}
offset, err := validation.ValidateToken(request.Token)
if err != nil {
return nil, errors.NewFlyteAdminErrorf(codes.InvalidArgument,
"invalid pagination token %s for ListNamedEntities", request.Token)
}
listInput := repoInterfaces.ListResourceInput{
Limit: int(request.Limit),
Offset: offset,
InlineFilters: filters,
SortParameter: sortParameter,
}

output, err := m.db.NamedEntityRepo().List(ctx, request.ResourceType, listInput)
if err != nil {
logger.Debugf(ctx, "Failed to list named entities of type: %s with project: %s, domain: %s. Returned error was: %v",
request.ResourceType, request.Project, request.Domain, err)
return nil, err
}

var token string
if len(output.Entities) == int(request.Limit) {
token = strconv.Itoa(offset + len(output.Entities))
}
entities := transformers.FromNamedEntityModels(output.Entities)
return &admin.NamedEntityList{
Entities: entities,
Token: token,
}, nil

}

func NewNamedEntityManager(
db repositories.RepositoryInterface,
config runtimeInterfaces.Configuration,
scope promutils.Scope) interfaces.NamedEntityInterface {

metrics := NamedEntityMetrics{
Scope: scope,
}
return &NamedEntityManager{
db: db,
config: config,
metrics: metrics,
}
}
139 changes: 139 additions & 0 deletions pkg/manager/impl/named_entity_manager_test.go
@@ -0,0 +1,139 @@
package impl

import (
"context"
"testing"

"github.com/lyft/flyteadmin/pkg/manager/impl/testutils"
"github.com/lyft/flyteadmin/pkg/repositories"
"github.com/lyft/flyteadmin/pkg/repositories/interfaces"
repositoryMocks "github.com/lyft/flyteadmin/pkg/repositories/mocks"
"github.com/lyft/flyteadmin/pkg/repositories/models"
runtimeInterfaces "github.com/lyft/flyteadmin/pkg/runtime/interfaces"
runtimeMocks "github.com/lyft/flyteadmin/pkg/runtime/mocks"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"
mockScope "github.com/lyft/flytestdlib/promutils"
"github.com/stretchr/testify/assert"
)

var namedEntityIdentifier = admin.NamedEntityIdentifier{
Project: project,
Domain: domain,
Name: name,
}

var badIdentifier = admin.NamedEntityIdentifier{
Project: project,
Domain: domain,
Name: "",
}

func getMockRepositoryForNETest() repositories.RepositoryInterface {
return repositoryMocks.NewMockRepository()
}

func getMockConfigForNETest() runtimeInterfaces.Configuration {
mockConfig := runtimeMocks.NewMockConfigurationProvider(
testutils.GetApplicationConfigWithDefaultProjects(), nil, nil, nil, nil, nil)
return mockConfig
}

func TestNamedEntityManager_Get(t *testing.T) {
repository := getMockRepositoryForNETest()
manager := NewNamedEntityManager(repository, getMockConfigForNETest(), mockScope.NewTestScope())

getFunction := func(input interfaces.GetNamedEntityInput) (models.NamedEntity, error) {
return models.NamedEntity{
NamedEntityKey: models.NamedEntityKey{
ResourceType: input.ResourceType,
Project: input.Project,
Domain: input.Domain,
Name: input.Name,
},
NamedEntityMetadataFields: models.NamedEntityMetadataFields{
Description: description,
},
}, nil
}
repository.NamedEntityRepo().(*repositoryMocks.MockNamedEntityRepo).SetGetCallback(getFunction)
response, err := manager.GetNamedEntity(context.Background(), admin.NamedEntityGetRequest{
ResourceType: resourceType,
Id: &namedEntityIdentifier,
})
assert.NoError(t, err)
assert.NotNil(t, response)
}

func TestNamedEntityManager_Get_BadRequest(t *testing.T) {
repository := getMockRepositoryForNETest()
manager := NewNamedEntityManager(repository, getMockConfigForNETest(), mockScope.NewTestScope())

response, err := manager.GetNamedEntity(context.Background(), admin.NamedEntityGetRequest{
ResourceType: core.ResourceType_UNSPECIFIED,
Id: &namedEntityIdentifier,
})
assert.Error(t, err)
assert.Nil(t, response)

response, err = manager.GetNamedEntity(context.Background(), admin.NamedEntityGetRequest{
ResourceType: resourceType,
Id: &badIdentifier,
})
assert.Error(t, err)
assert.Nil(t, response)
}

func TestNamedEntityManager_Update(t *testing.T) {
repository := getMockRepositoryForNETest()
manager := NewNamedEntityManager(repository, getMockConfigForNETest(), mockScope.NewTestScope())
updatedDescription := "updated description"
var updateCalled bool

updateFunction := func(input models.NamedEntity) error {
updateCalled = true
assert.Equal(t, input.Description, updatedDescription)
assert.Equal(t, input.ResourceType, resourceType)
assert.Equal(t, input.Project, project)
assert.Equal(t, input.Domain, domain)
assert.Equal(t, input.Name, name)
return nil
}
repository.NamedEntityRepo().(*repositoryMocks.MockNamedEntityRepo).SetUpdateCallback(updateFunction)
updatedMetadata := admin.NamedEntityMetadata{
Description: updatedDescription,
}
response, err := manager.UpdateNamedEntity(context.Background(), admin.NamedEntityUpdateRequest{
Metadata: &updatedMetadata,
ResourceType: resourceType,
Id: &namedEntityIdentifier,
})
assert.True(t, updateCalled)
assert.NoError(t, err)
assert.NotNil(t, response)
}

func TestNamedEntityManager_Update_BadRequest(t *testing.T) {
repository := getMockRepositoryForNETest()
manager := NewNamedEntityManager(repository, getMockConfigForNETest(), mockScope.NewTestScope())
updatedDescription := "updated description"

updatedMetadata := admin.NamedEntityMetadata{
Description: updatedDescription,
}
response, err := manager.UpdateNamedEntity(context.Background(), admin.NamedEntityUpdateRequest{
Metadata: &updatedMetadata,
ResourceType: core.ResourceType_UNSPECIFIED,
Id: &namedEntityIdentifier,
})
assert.Error(t, err)
assert.Nil(t, response)

response, err = manager.UpdateNamedEntity(context.Background(), admin.NamedEntityUpdateRequest{
Metadata: &updatedMetadata,
ResourceType: resourceType,
Id: &badIdentifier,
})
assert.Error(t, err)
assert.Nil(t, response)
}
1 change: 1 addition & 0 deletions pkg/manager/impl/shared/constants.go
Expand Up @@ -8,6 +8,7 @@ const (
Name = "name"
ID = "id"
Version = "version"
ResourceType = "resource_type"
Spec = "spec"
Type = "type"
RuntimeVersion = "runtime version"
Expand Down
12 changes: 12 additions & 0 deletions pkg/manager/impl/test_constants.go
@@ -0,0 +1,12 @@
package impl

import (
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"
)

const project = "project"
const domain = "domain"
const name = "name"
const version = "version"
const description = "description"
const resourceType = core.ResourceType_WORKFLOW
33 changes: 27 additions & 6 deletions pkg/manager/impl/util/shared.go
Expand Up @@ -5,20 +5,17 @@ import (
"context"
"time"

"github.com/lyft/flyteadmin/pkg/manager/impl/validation"

"github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"

"github.com/lyft/flytestdlib/logger"

"github.com/lyft/flyteadmin/pkg/common"
"github.com/lyft/flyteadmin/pkg/errors"
"github.com/lyft/flyteadmin/pkg/manager/impl/shared"
"github.com/lyft/flyteadmin/pkg/manager/impl/validation"
"github.com/lyft/flyteadmin/pkg/repositories"
repoInterfaces "github.com/lyft/flyteadmin/pkg/repositories/interfaces"
"github.com/lyft/flyteadmin/pkg/repositories/models"
"github.com/lyft/flyteadmin/pkg/repositories/transformers"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"
"github.com/lyft/flytestdlib/logger"
"github.com/lyft/flytestdlib/storage"
"google.golang.org/grpc/codes"
)
Expand Down Expand Up @@ -117,6 +114,30 @@ func GetLaunchPlan(
return transformers.FromLaunchPlanModel(launchPlanModel)
}

func GetNamedEntityModel(
ctx context.Context, repo repositories.RepositoryInterface, resourceType core.ResourceType, identifier admin.NamedEntityIdentifier) (models.NamedEntity, error) {
metadataModel, err := (repo).NamedEntityRepo().Get(ctx, repoInterfaces.GetNamedEntityInput{
ResourceType: resourceType,
Project: identifier.Project,
Domain: identifier.Domain,
Name: identifier.Name,
})
if err != nil {
return models.NamedEntity{}, err
}
return metadataModel, nil
}

func GetNamedEntity(
ctx context.Context, repo repositories.RepositoryInterface, resourceType core.ResourceType, identifier admin.NamedEntityIdentifier) (*admin.NamedEntity, error) {
metadataModel, err := GetNamedEntityModel(ctx, repo, resourceType, identifier)
if err != nil {
return nil, err
}
metadata := transformers.FromNamedEntityModel(metadataModel)
return &metadata, nil
}

// Returns the set of filters necessary to query launch plan models to find the active version of a launch plan
func GetActiveLaunchPlanVersionFilters(project, domain, name string) ([]common.InlineFilter, error) {
projectFilter, err := common.NewSingleValueFilter(common.LaunchPlan, common.Equal, shared.Project, project)
Expand Down

0 comments on commit 0134a1b

Please sign in to comment.