Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
Implement signal service for human-in-the-loop workflow orchestration (
Browse files Browse the repository at this point in the history
…#423)


Signed-off-by: Daniel Rammer <daniel@union.ai>
  • Loading branch information
hamersaw committed Dec 13, 2022
1 parent fce143d commit a1804d5
Show file tree
Hide file tree
Showing 25 changed files with 2,050 additions and 9 deletions.
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ require (
github.com/coreos/go-oidc v2.2.1+incompatible
github.com/evanphx/json-patch v4.12.0+incompatible
github.com/flyteorg/flyteidl v1.2.5
github.com/flyteorg/flyteplugins v1.0.18
github.com/flyteorg/flytepropeller v1.1.47
github.com/flyteorg/flytestdlib v1.0.12
github.com/flyteorg/flyteplugins v1.0.20
github.com/flyteorg/flytepropeller v1.1.51
github.com/flyteorg/flytestdlib v1.0.14
github.com/flyteorg/stow v0.3.6
github.com/ghodss/yaml v1.0.0
github.com/go-gormigrate/gormigrate/v2 v2.0.0
Expand Down
12 changes: 6 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -354,13 +354,13 @@ github.com/felixge/httpsnoop v1.0.1 h1:lvB5Jl89CsZtGIWuTcDM1E/vkVs49/Ml7JJe07l8S
github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/flyteorg/flyteidl v1.2.5 h1:oPs0PX9opR9JtWjP5ZH2YMChkbGGL45PIy+90FlaxYc=
github.com/flyteorg/flyteidl v1.2.5/go.mod h1:OJAq333OpInPnMhvVz93AlEjmlQ+t0FAD4aakIYE4OU=
github.com/flyteorg/flyteplugins v1.0.18 h1:DOyxAFaS4luv7H9XRKUpHbO09imsG4LP8Du515FGXyM=
github.com/flyteorg/flyteplugins v1.0.18/go.mod h1:ZbZVBxEWh8Icj1AgfNKg0uPzHHGd9twa4eWcY2Yt6xE=
github.com/flyteorg/flytepropeller v1.1.47 h1:k+moR+YGOyKJnYHDZjBBXvwnuZJ7IhK/PRv/9Ak/QIs=
github.com/flyteorg/flytepropeller v1.1.47/go.mod h1:vZlQTBOsddrNGxmA0To+B2ld3VFg6sRWwcC4KU7+g9A=
github.com/flyteorg/flyteplugins v1.0.20 h1:8ZGN2c0iaZa3d/UmN2VYozLBRhthAIO48aD5g8Wly7s=
github.com/flyteorg/flyteplugins v1.0.20/go.mod h1:ZbZVBxEWh8Icj1AgfNKg0uPzHHGd9twa4eWcY2Yt6xE=
github.com/flyteorg/flytepropeller v1.1.51 h1:ITPH2Fqx+/1hKBFnfb6Rawws3VbEJ3tQ/1tQXSIXvcQ=
github.com/flyteorg/flytepropeller v1.1.51/go.mod h1:zstMUz30mIskZB4uMkObzOj3CjsGfXIV/+nVxlOmI7I=
github.com/flyteorg/flytestdlib v1.0.0/go.mod h1:QSVN5wIM1lM9d60eAEbX7NwweQXW96t5x4jbyftn89c=
github.com/flyteorg/flytestdlib v1.0.12 h1:A+yN5TX/SezjCjzv/JV29SzlBAyKGeLDOfAiYqzrKcw=
github.com/flyteorg/flytestdlib v1.0.12/go.mod h1:nIBmBHtjTJvhZEn3e/EwVC/iMkR2tUX8hEiXjRBpH/s=
github.com/flyteorg/flytestdlib v1.0.14 h1:P6hy9yVrIEUxp4JaxV7/KwTSTYjHGizQu1fKXYkq9Y8=
github.com/flyteorg/flytestdlib v1.0.14/go.mod h1:nIBmBHtjTJvhZEn3e/EwVC/iMkR2tUX8hEiXjRBpH/s=
github.com/flyteorg/stow v0.3.3/go.mod h1:HBld7ud0i4khMHwJjkO8v+NSP7ddKa/ruhf4I8fliaA=
github.com/flyteorg/stow v0.3.6 h1:jt50ciM14qhKBaIrB+ppXXY+SXB59FNREFgTJqCyqIk=
github.com/flyteorg/stow v0.3.6/go.mod h1:5dfBitPM004dwaZdoVylVjxFT4GWAgI0ghAndhNUzCo=
Expand Down
1 change: 1 addition & 0 deletions pkg/common/entity.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ const (
NamedEntity = "nen"
NamedEntityMetadata = "nem"
Project = "p"
Signal = "s"
)

// ResourceTypeToEntity maps a resource type to an entity suitable for use with Database filters
Expand Down
160 changes: 160 additions & 0 deletions pkg/manager/impl/signal_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
package impl

import (
"context"
"strconv"

"github.com/flyteorg/flytestdlib/contextutils"

"github.com/flyteorg/flyteadmin/pkg/common"
"github.com/flyteorg/flyteadmin/pkg/errors"
"github.com/flyteorg/flyteadmin/pkg/manager/impl/util"
"github.com/flyteorg/flyteadmin/pkg/manager/impl/validation"
"github.com/flyteorg/flyteadmin/pkg/manager/interfaces"
repoInterfaces "github.com/flyteorg/flyteadmin/pkg/repositories/interfaces"
"github.com/flyteorg/flyteadmin/pkg/repositories/transformers"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"

"github.com/flyteorg/flytestdlib/logger"
"github.com/flyteorg/flytestdlib/promutils"
"github.com/flyteorg/flytestdlib/promutils/labeled"

"google.golang.org/grpc/codes"
)

type signalMetrics struct {
Scope promutils.Scope
Set labeled.Counter
}

type SignalManager struct {
db repoInterfaces.Repository
metrics signalMetrics
}

func getSignalContext(ctx context.Context, identifier *core.SignalIdentifier) context.Context {
ctx = contextutils.WithProjectDomain(ctx, identifier.ExecutionId.Project, identifier.ExecutionId.Domain)
ctx = contextutils.WithWorkflowID(ctx, identifier.ExecutionId.Name)
return contextutils.WithSignalID(ctx, identifier.SignalId)
}

func (s *SignalManager) GetOrCreateSignal(ctx context.Context, request admin.SignalGetOrCreateRequest) (*admin.Signal, error) {
if err := validation.ValidateSignalGetOrCreateRequest(ctx, request); err != nil {
logger.Debugf(ctx, "invalid request [%+v]: %v", request, err)
return nil, err
}
ctx = getSignalContext(ctx, request.Id)

signalModel, err := transformers.CreateSignalModel(request.Id, request.Type, nil)
if err != nil {
logger.Errorf(ctx, "Failed to transform signal with id [%+v] and type [+%v] with err: %v", request.Id, request.Type, err)
return nil, err
}

err = s.db.SignalRepo().GetOrCreate(ctx, &signalModel)
if err != nil {
return nil, err
}

signal, err := transformers.FromSignalModel(signalModel)
if err != nil {
logger.Errorf(ctx, "Failed to transform signal model [%+v] with err: %v", signalModel, err)
return nil, err
}

return &signal, nil
}

func (s *SignalManager) ListSignals(ctx context.Context, request admin.SignalListRequest) (*admin.SignalList, error) {
if err := validation.ValidateSignalListRequest(ctx, request); err != nil {
logger.Debugf(ctx, "ListSignals request [%+v] is invalid: %v", request, err)
return nil, err
}
ctx = getExecutionContext(ctx, request.WorkflowExecutionId)

identifierFilters, err := util.GetWorkflowExecutionIdentifierFilters(ctx, *request.WorkflowExecutionId)
if err != nil {
return nil, err
}

filters, err := util.AddRequestFilters(request.Filters, common.Signal, identifierFilters)
if err != nil {
return nil, err
}
var sortParameter common.SortParameter
if request.SortBy != nil {
sortParameter, err = common.NewSortParameter(*request.SortBy)
if err != nil {
return nil, err
}
}

offset, err := validation.ValidateToken(request.Token)
if err != nil {
return nil, errors.NewFlyteAdminErrorf(codes.InvalidArgument,
"invalid pagination token %s for ListSignals", request.Token)
}

signalModelList, err := s.db.SignalRepo().List(ctx, repoInterfaces.ListResourceInput{
InlineFilters: filters,
Offset: offset,
Limit: int(request.Limit),
SortParameter: sortParameter,
})
if err != nil {
logger.Debugf(ctx, "Failed to list signals with request [%+v] with err %v",
request, err)
return nil, err
}

signalList, err := transformers.FromSignalModels(signalModelList)
if err != nil {
logger.Debugf(ctx, "failed to transform signal models for request [%+v] with err: %v", request, err)
return nil, err
}
var token string
if len(signalList) == int(request.Limit) {
token = strconv.Itoa(offset + len(signalList))
}
return &admin.SignalList{
Signals: signalList,
Token: token,
}, nil
}

func (s *SignalManager) SetSignal(ctx context.Context, request admin.SignalSetRequest) (*admin.SignalSetResponse, error) {
if err := validation.ValidateSignalSetRequest(ctx, s.db, request); err != nil {
return nil, err
}
ctx = getSignalContext(ctx, request.Id)

signalModel, err := transformers.CreateSignalModel(request.Id, nil, request.Value)
if err != nil {
logger.Errorf(ctx, "Failed to transform signal with id [%+v] and value [+%v] with err: %v", request.Id, request.Value, err)
return nil, err
}

err = s.db.SignalRepo().Update(ctx, signalModel.SignalKey, signalModel.Value)
if err != nil {
return nil, err
}

s.metrics.Set.Inc(ctx)
return &admin.SignalSetResponse{}, nil
}

func NewSignalManager(
db repoInterfaces.Repository,
scope promutils.Scope) interfaces.SignalInterface {
metrics := signalMetrics{
Scope: scope,
Set: labeled.NewCounter("num_set", "count of set signals", scope),
}

return &SignalManager{
db: db,
metrics: metrics,
}
}
Loading

0 comments on commit a1804d5

Please sign in to comment.