This repository has been archived by the owner on Oct 9, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 60
/
handler.go
54 lines (44 loc) · 1.68 KB
/
handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
package end
import (
"context"
"github.com/flyteorg/flytestdlib/logger"
"github.com/flyteorg/flytestdlib/storage"
"github.com/flyteorg/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
"github.com/flyteorg/flytepropeller/pkg/controller/nodes/errors"
"github.com/flyteorg/flytepropeller/pkg/controller/nodes/handler"
)
type endHandler struct {
}
func (e endHandler) FinalizeRequired() bool {
return false
}
func (e endHandler) Setup(ctx context.Context, setupContext handler.SetupContext) error {
return nil
}
func (e endHandler) Handle(ctx context.Context, executionContext handler.NodeExecutionContext) (handler.Transition, error) {
inputs, err := executionContext.InputReader().Get(ctx)
if err != nil {
return handler.UnknownTransition, err
}
if inputs != nil {
logger.Debugf(ctx, "Workflow has outputs. Storing them.")
// TODO we should use OutputWriter here
o := v1alpha1.GetOutputsFile(executionContext.NodeStatus().GetOutputDir())
so := storage.Options{}
if err := executionContext.DataStore().WriteProtobuf(ctx, o, so, inputs); err != nil {
logger.Errorf(ctx, "Failed to store workflow outputs. Error [%s]", err)
return handler.UnknownTransition, errors.Wrapf(errors.CausedByError, executionContext.NodeID(), err, "Failed to store workflow outputs, as end-node")
}
}
logger.Debugf(ctx, "End node success")
return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoSuccess(nil)), nil
}
func (e endHandler) Abort(_ context.Context, _ handler.NodeExecutionContext, _ string) error {
return nil
}
func (e endHandler) Finalize(_ context.Context, _ handler.NodeExecutionContext) error {
return nil
}
func New() handler.Node {
return &endHandler{}
}