-
Notifications
You must be signed in to change notification settings - Fork 539
/
client.go
154 lines (128 loc) · 5.05 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
package catalog
import (
"context"
"fmt"
"time"
"google.golang.org/grpc/codes"
grpcStatus "google.golang.org/grpc/status"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/datacatalog"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/io"
)
//go:generate mockery -all -case=underscore
// Metadata holds the execution identifiers to be associated with a catalog object.
// All three fields are pointers and may therefore be nil.
type Metadata struct {
// WorkflowExecutionIdentifier is the workflow execution identifier to associate with the object.
WorkflowExecutionIdentifier *core.WorkflowExecutionIdentifier
// NodeExecutionIdentifier is the node execution identifier to associate with the object.
NodeExecutionIdentifier *core.NodeExecutionIdentifier
// TaskExecutionIdentifier is the task execution identifier to associate with the object.
TaskExecutionIdentifier *core.TaskExecutionIdentifier
}
// Key is an identifier for a catalog object.
type Key struct {
// Identifier is the core identifier of the object; it is the first component printed by String.
Identifier core.Identifier
// CacheVersion is the cache version string; it is the second component printed by String.
CacheVersion string
// CacheIgnoreInputVars presumably names input variables to leave out when matching
// cached entries - TODO confirm against the catalog implementation.
CacheIgnoreInputVars []string
// TypedInterface is not read anywhere in this file; presumably the typed interface of
// the cached task - verify against callers.
TypedInterface core.TypedInterface
// InputReader is not read anywhere in this file; presumably it supplies the inputs the
// cached result depends on - verify against callers.
InputReader io.InputReader
}
// String renders the key as "<identifier>:<cache version>", formatting both parts with
// fmt's default %v verb.
func (k Key) String() string {
	const layout = "%v:%v"
	return fmt.Sprintf(layout, k.Identifier, k.CacheVersion)
}
// Status indicates the status of a query to Catalog. It can be returned for both Get and Put calls.
type Status struct {
// cacheStatus is the value exposed through GetCacheStatus.
cacheStatus core.CatalogCacheStatus
// metadata is the value exposed through GetMetadata; being a pointer, it may be nil.
metadata *core.CatalogMetadata
}
// GetCacheStatus returns the cache status stored in s.
func (s Status) GetCacheStatus() core.CatalogCacheStatus { return s.cacheStatus }
// GetMetadata returns the catalog metadata pointer stored in s; the result may be nil.
func (s Status) GetMetadata() *core.CatalogMetadata { return s.metadata }
// NewPutFailureStatus builds a Status whose cache status is CACHE_PUT_FAILURE for the given key.
//
// The returned metadata's DatasetId points at key.Identifier itself (no copy is made), so
// later changes to that identifier are visible through the returned Status.
//
// A nil key yields a CACHE_PUT_FAILURE Status with nil metadata instead of panicking on
// the dereference.
func NewPutFailureStatus(key *Key) Status {
	if key == nil {
		return Status{cacheStatus: core.CatalogCacheStatus_CACHE_PUT_FAILURE}
	}
	return Status{
		cacheStatus: core.CatalogCacheStatus_CACHE_PUT_FAILURE,
		metadata:    &core.CatalogMetadata{DatasetId: &key.Identifier},
	}
}
// NewStatus builds a Status from the given cache status and metadata.
// md is stored as passed, without copying, and may be nil.
func NewStatus(cacheStatus core.CatalogCacheStatus, md *core.CatalogMetadata) Status {
	var st Status
	st.cacheStatus = cacheStatus
	st.metadata = md
	return st
}
// Entry indicates the entry in Catalog that was populated.
type Entry struct {
// outputs is the reader exposed through GetOutputs; NewFailedCatalogEntry leaves it unset.
outputs io.OutputReader
// status is the catalog Status exposed through GetStatus.
status Status
}
// GetOutputs returns the output reader stored in e.
func (e Entry) GetOutputs() io.OutputReader { return e.outputs }
// GetStatus returns the catalog Status stored in e.
func (e Entry) GetStatus() Status { return e.status }
// NewFailedCatalogEntry builds an Entry that carries only the given status;
// its outputs field is left at the zero value.
func NewFailedCatalogEntry(status Status) Entry {
	var failed Entry
	failed.status = status
	return failed
}
// NewCatalogEntry builds an Entry holding both the given output reader and status.
func NewCatalogEntry(outputs io.OutputReader, status Status) Entry {
	var entry Entry
	entry.outputs, entry.status = outputs, status
	return entry
}
// ReservationEntry encapsulates the current state of an artifact reservation within the catalog.
type ReservationEntry struct {
// expiresAt is the timestamp at which the reservation will no longer be valid.
expiresAt time.Time
// heartbeatInterval denotes how often the catalog expects a reservation extension request.
heartbeatInterval time.Duration
// ownerID is the ID of the current reservation owner.
ownerID string
// status is the status of the attempted reservation operation.
status core.CatalogReservation_Status
}
// GetExpiresAt returns the expiration timestamp at which the reservation will no longer be valid.
func (r ReservationEntry) GetExpiresAt() time.Time { return r.expiresAt }
// GetHeartbeatInterval returns the heartbeat interval, which denotes how often the catalog
// expects a reservation extension request.
func (r ReservationEntry) GetHeartbeatInterval() time.Duration { return r.heartbeatInterval }
// GetOwnerID returns the ID of the current reservation owner.
func (r ReservationEntry) GetOwnerID() string { return r.ownerID }
// GetStatus returns the status of the attempted reservation operation.
func (r ReservationEntry) GetStatus() core.CatalogReservation_Status { return r.status }
// NewReservationEntryStatus creates a ReservationEntry that carries only the given status.
// The expiry time, heartbeat interval and owner ID are left at their zero values
// (the zero time.Time, a zero duration and the empty string).
func NewReservationEntryStatus(status core.CatalogReservation_Status) ReservationEntry {
	return ReservationEntry{status: status}
}
// NewReservationEntry creates a ReservationEntry populated with the given expiry time,
// heartbeat interval, owner ID and status.
func NewReservationEntry(expiresAt time.Time, heartbeatInterval time.Duration, ownerID string, status core.CatalogReservation_Status) ReservationEntry {
	var entry ReservationEntry
	entry.expiresAt = expiresAt
	entry.heartbeatInterval = heartbeatInterval
	entry.ownerID = ownerID
	entry.status = status
	return entry
}
// Client represents the default Catalog client, which allows memoization and indexing of
// intermediate data in Flyte.
type Client interface {
	// Artifact operations.

	// Get returns the artifact associated with the given key.
	Get(ctx context.Context, key Key) (Entry, error)
	// Put stores the given data under the specified key, creating artifact entries as required.
	// It is meant for new artifacts; to change an existing artifact, use Update instead.
	Put(ctx context.Context, key Key, reader io.OutputReader, metadata Metadata) (Status, error)
	// Update overwrites the artifact entries already stored at the specified key with the new
	// data provided. It is meant for existing artifacts; to create a new one, use Put instead.
	Update(ctx context.Context, key Key, reader io.OutputReader, metadata Metadata) (Status, error)

	// Reservation operations.

	// GetOrExtendReservation tries to retrieve a (valid) reservation for the given key. If none
	// was found, a new one is created using the specified owner ID; if an existing one has
	// expired, it is updated.
	GetOrExtendReservation(ctx context.Context, key Key, ownerID string, heartbeatInterval time.Duration) (*datacatalog.Reservation, error)
	// ReleaseReservation releases an acquired reservation for the given key and owner ID.
	ReleaseReservation(ctx context.Context, key Key, ownerID string) error
}
// IsNotFound reports whether err can be converted to a gRPC status (via grpcStatus.FromError)
// and that status carries the code codes.NotFound.
func IsNotFound(err error) bool {
	if st, ok := grpcStatus.FromError(err); ok {
		return st.Code() == codes.NotFound
	}
	return false
}