-
Notifications
You must be signed in to change notification settings - Fork 63
Implement signal service for human-in-the-loop workflow orchestration #423
Conversation
Signed-off-by: Daniel Rammer <daniel@union.ai>
Signed-off-by: Daniel Rammer <daniel@union.ai>
Signed-off-by: Daniel Rammer <daniel@union.ai>
Signed-off-by: Daniel Rammer <daniel@union.ai>
Signed-off-by: Daniel Rammer <daniel@union.ai>
Signed-off-by: Daniel Rammer <daniel@union.ai>
Signed-off-by: Daniel Rammer <daniel@union.ai>
Signed-off-by: Daniel Rammer <daniel@union.ai>
Signed-off-by: Daniel Rammer <daniel@union.ai>
Signed-off-by: Daniel Rammer <daniel@union.ai>
Signed-off-by: Daniel Rammer <daniel@union.ai>
Signed-off-by: Daniel Rammer <daniel@union.ai>
Signed-off-by: Daniel Rammer <daniel@union.ai>
Signed-off-by: Daniel Rammer <daniel@union.ai>
Signed-off-by: Daniel Rammer <daniel@union.ai>
Signed-off-by: Daniel Rammer <daniel@union.ai>
Signed-off-by: Daniel Rammer <daniel@union.ai>
Signed-off-by: Daniel Rammer <daniel@union.ai>
|
||
func CreateSignalModel(signalID *core.SignalIdentifier, signalType *core.LiteralType, signalValue *core.Literal) (models.Signal, error) { | ||
signalModel := models.Signal{} | ||
if signalID != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
when would this ever be nil?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It should never be, a signal without an ID is invalid. Included it for completeness. Perhaps throwing an error would be better here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure that sounds good
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder since many of these fields are already validated at this point if we can reduce all the nil/empty checks in the body of this method? Your call of course!
Signed-off-by: Daniel Rammer <daniel@union.ai>
Signed-off-by: Daniel Rammer <daniel@union.ai>
Signed-off-by: Daniel Rammer <daniel@union.ai>
Signed-off-by: Daniel Rammer <daniel@union.ai>
Codecov Report
@@ Coverage Diff @@
## master #423 +/- ##
==========================================
+ Coverage 60.16% 60.52% +0.36%
==========================================
Files 158 163 +5
Lines 14074 14492 +418
==========================================
+ Hits 8467 8771 +304
- Misses 4866 4955 +89
- Partials 741 766 +25
Flags with carried forward coverage won't be shown. Click here to find out more.
Help us with your feedback. Take ten seconds to tell us how you rate us. Have a feature suggestion? Share it here. |
Signed-off-by: Daniel Rammer <daniel@union.ai>
Signed-off-by: Daniel Rammer <daniel@union.ai>
Signed-off-by: Daniel Rammer <daniel@union.ai>
Signed-off-by: Daniel Rammer <daniel@union.ai>
Signed-off-by: Dan Rammer <daniel@union.ai>
Signed-off-by: Dan Rammer <daniel@union.ai>
@@ -0,0 +1,147 @@ | |||
package rpc |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry for all this cruft, this rpc layer is admittedly pretty useless boilerplate, we could move the basic nil request validation, panic deferral and error transformation to just the manager layer (or get crafty with request middleware) not worth changing up in yourPR since you've already implemented this. Thank you for adding
|
||
func CreateSignalModel(signalID *core.SignalIdentifier, signalType *core.LiteralType, signalValue *core.Literal) (models.Signal, error) { | ||
signalModel := models.Signal{} | ||
if signalID != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder since many of these fields are already validated at this point if we can reduce all the nil/empty checks in the body of this method? Your call of course!
func FromSignalModel(signalModel models.Signal) (admin.Signal, error) { | ||
signal := admin.Signal{} | ||
|
||
var executionID *core.WorkflowExecutionIdentifier |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: maybe just init this once?
SignalKey: input, | ||
}).Take(&signal) | ||
timer.Stop() | ||
if errors.Is(tx.Error, gorm.ErrRecordNotFound) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe this should no longer be necessary after our gorm v2 upgrade, the error transformer call below should catch this: https://github.com/flyteorg/flyteadmin/blob/master/pkg/repositories/errors/postgres.go#L56
@@ -390,6 +390,16 @@ var Migrations = []*gormigrate.Migration{ | |||
return tx.Model(&models.Execution{}).Migrator().DropIndex(&models.Execution{}, "idx_executions_created_at") | |||
}, | |||
}, | |||
// Create signals table. | |||
{ | |||
ID: "2022-04-11-signals", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
:D
"failed to validate that signal [%v] exists, err: [%+v]", | ||
signalModel.SignalKey, err) | ||
} | ||
valueType := propellervalidators.LiteralTypeForLiteral(request.Value) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh this is tedious, it's too bad we can't store the valueType in the db as a column but I'm guessing it's not possible to statically capture the list of castable types so we we can bake in the validation into the UpdateSignal database call as a where clause?
Co-authored-by: Katrina Rogan <katroganGH@gmail.com> Signed-off-by: Yee Hing Tong <wild-endeavor@users.noreply.github.com>
Signed-off-by: Eduardo Apolinario <eapolinario@users.noreply.github.com>
…#423) Signed-off-by: Daniel Rammer <daniel@union.ai>
TL;DR
Adds SignalService implementation to support manual signals to trigger GateNodes in workflows.
Type
Are all requirements met?
Complete description
^^^
Tracking Issue
flyteorg/flyte#208
Follow-up issue
NA