This repository has been archived by the owner on Oct 9, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 63
/
data.go
125 lines (108 loc) · 4.84 KB
/
data.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package util
import (
"context"
"github.com/flyteorg/flyteadmin/pkg/common"
dataInterfaces "github.com/flyteorg/flyteadmin/pkg/data/interfaces"
runtimeInterfaces "github.com/flyteorg/flyteadmin/pkg/runtime/interfaces"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flytestdlib/logger"
"github.com/flyteorg/flytestdlib/storage"
"github.com/golang/protobuf/proto"
)
// shouldFetchData reports whether the data described by urlBlob may be read from
// storage and inlined into a response.
//
// Reading is always allowed when the configured scheme is common.Local or
// common.None, and when MaxSizeInBytes is zero (which this check treats as "no
// limit"). Otherwise the blob's reported size must be strictly below
// MaxSizeInBytes. Note that a zero-valued urlBlob (Bytes == 0) passes any
// positive limit.
func shouldFetchData(config *runtimeInterfaces.RemoteDataConfig, urlBlob admin.UrlBlob) bool {
	switch {
	case config.Scheme == common.Local, config.Scheme == common.None:
		return true
	case config.MaxSizeInBytes == 0:
		return true
	default:
		return urlBlob.Bytes < config.MaxSizeInBytes
	}
}
// shouldFetchOutputData applies shouldFetchData, but only when an output URI is
// actually present; with an empty outputURI there is nothing to read, so it
// reports false.
func shouldFetchOutputData(config *runtimeInterfaces.RemoteDataConfig, urlBlob admin.UrlBlob, outputURI string) bool {
	if outputURI == "" {
		return false
	}
	return shouldFetchData(config, urlBlob)
}
// GetInputs returns the inputs for an execution together with a URL blob
// describing where they are stored.
//
// If inputURI is empty, an empty LiteralMap and an empty UrlBlob are returned
// with a nil error. When remoteDataConfig.SignedURL.Enabled is set, the blob is
// populated via urlData.Get; an error from that call is returned to the caller.
// The inputs themselves are read from storageClient and inlined only when
// shouldFetchData permits it for the configured scheme and size limit.
//
// A failed storage read is logged but not returned: the request still succeeds
// with the URL blob (empty if signed URLs are disabled) and an empty LiteralMap,
// so the client can fall back to the blob to fetch the data itself.
func GetInputs(ctx context.Context, urlData dataInterfaces.RemoteURLInterface,
	remoteDataConfig *runtimeInterfaces.RemoteDataConfig, storageClient *storage.DataStore, inputURI string) (
	*core.LiteralMap, *admin.UrlBlob, error) {
	var inputsURLBlob admin.UrlBlob
	var fullInputs core.LiteralMap
	if len(inputURI) == 0 {
		return &fullInputs, &inputsURLBlob, nil
	}

	if remoteDataConfig.SignedURL.Enabled {
		var err error
		inputsURLBlob, err = urlData.Get(ctx, inputURI)
		if err != nil {
			return nil, nil, err
		}
	}

	if shouldFetchData(remoteDataConfig, inputsURLBlob) {
		if err := storageClient.ReadProtobuf(ctx, storage.DataReference(inputURI), &fullInputs); err != nil {
			// If we fail to read the protobuf from the remote store, we shouldn't fail the request altogether.
			// Instead we return the signed URL blob so that the client can use that to fetch the input data.
			logger.Warningf(ctx, "Failed to read inputs from URI [%s] with err: %v", inputURI, err)
			// A failed read may have partially decoded into fullInputs; discard it so
			// callers never mistake an incomplete map for the full set of inputs.
			fullInputs.Reset()
		}
	}
	return &fullInputs, &inputsURLBlob, nil
}
// ExecutionClosure is the minimal read-only view of an execution closure needed
// to produce execution output data. It captures the methods shared by
// admin.NodeExecutionClosure and admin.TaskExecutionClosure; an
// admin.ExecutionClosure is adapted to it via ToExecutionClosureInterface.
type ExecutionClosure interface {
	// GetOutputData returns output values carried inline in the closure.
	// GetOutputs treats a nil result as "no inline outputs".
	GetOutputData() *core.LiteralMap
	// GetOutputUri returns the storage location of offloaded outputs.
	// GetOutputs treats an empty string as "no offloaded outputs".
	GetOutputUri() string //nolint
}
// workflowExecutionClosure adapts an *admin.ExecutionClosure to the
// ExecutionClosure interface used for admin.NodeExecutionClosure and
// admin.TaskExecutionClosure. For historical reasons the workflow execution
// closure message is shaped differently: its outputs may appear in the Outputs
// blob (as a URI or as inline values) or in the OutputData field.
type workflowExecutionClosure struct {
	*admin.ExecutionClosure
}

// GetOutputUri returns the URI from the closure's Outputs blob, or "" when the
// wrapped closure is nil or carries no Outputs blob.
func (c workflowExecutionClosure) GetOutputUri() string { //nolint
	if c.ExecutionClosure == nil {
		return ""
	}
	outputs := c.ExecutionClosure.GetOutputs()
	if outputs == nil {
		return ""
	}
	return outputs.GetUri()
}

// GetOutputData returns inline output values. Values held in the Outputs blob
// take precedence; otherwise it falls back to the closure's own OutputData.
func (c workflowExecutionClosure) GetOutputData() *core.LiteralMap {
	if outputs := c.ExecutionClosure.GetOutputs(); outputs != nil {
		if values := outputs.GetValues(); values != nil {
			return values
		}
	}
	return c.ExecutionClosure.GetOutputData()
}
// ToExecutionClosureInterface wraps a workflow execution closure so it satisfies
// the ExecutionClosure interface and can be passed to GetOutputs like node and
// task execution closures.
//
// The result is always a non-nil interface value (a pointer to the wrapper),
// even when closure is nil, so callers cannot detect a missing closure by
// comparing the result against nil.
func ToExecutionClosureInterface(closure *admin.ExecutionClosure) ExecutionClosure {
	wrapped := workflowExecutionClosure{ExecutionClosure: closure}
	return &wrapped
}
// GetOutputs returns the outputs for an execution together with a URL blob
// describing where offloaded outputs are stored.
//
// A nil closure yields an empty LiteralMap and an empty UrlBlob. When the
// closure has an output URI and remoteDataConfig.SignedURL.Enabled is set, the
// blob is populated via urlData.Get; an error from that call is returned.
//
// Outputs are resolved in this order:
//   - Inline output data on the closure is returned when its serialized size is
//     below MaxSizeInBytes, or when MaxSizeInBytes is zero (treated as "no
//     limit", matching shouldFetchData). Oversized inline data is dropped.
//   - Otherwise, when shouldFetchOutputData permits, outputs are read from the
//     closure's output URI via storageClient.
//
// A failed storage read is logged but not returned: the request still succeeds
// with the URL blob and an empty LiteralMap.
func GetOutputs(ctx context.Context, urlData dataInterfaces.RemoteURLInterface,
	remoteDataConfig *runtimeInterfaces.RemoteDataConfig, storageClient *storage.DataStore, closure ExecutionClosure) (
	*core.LiteralMap, *admin.UrlBlob, error) {
	var outputsURLBlob admin.UrlBlob
	fullOutputs := &core.LiteralMap{}
	if closure == nil {
		return fullOutputs, &outputsURLBlob, nil
	}

	outputURI := closure.GetOutputUri()
	if len(outputURI) > 0 && remoteDataConfig.SignedURL.Enabled {
		var err error
		outputsURLBlob, err = urlData.Get(ctx, outputURI)
		if err != nil {
			return nil, nil, err
		}
	}

	if inlineOutputs := closure.GetOutputData(); inlineOutputs != nil {
		// A zero MaxSizeInBytes means "no limit" (see shouldFetchData); without this
		// check every inline output would be dropped because no size is < 0.
		if remoteDataConfig.MaxSizeInBytes == 0 ||
			int64(proto.Size(inlineOutputs)) < remoteDataConfig.MaxSizeInBytes {
			fullOutputs = inlineOutputs
		} else {
			logger.Debugf(ctx, "execution closure contains output data that exceeds max data size for responses")
		}
	} else if shouldFetchOutputData(remoteDataConfig, outputsURLBlob, outputURI) {
		if err := storageClient.ReadProtobuf(ctx, storage.DataReference(outputURI), fullOutputs); err != nil {
			// If we fail to read the protobuf from the remote store, we shouldn't fail the request altogether.
			// Instead we return the signed URL blob so that the client can use that to fetch the output data.
			logger.Warningf(ctx, "Failed to read outputs from URI [%s] with err: %v", outputURI, err)
			// A failed read may have partially decoded into fullOutputs; discard it so
			// callers never mistake an incomplete map for the full set of outputs.
			fullOutputs.Reset()
		}
	}
	return fullOutputs, &outputsURLBlob, nil
}