This repository has been archived by the owner on Oct 9, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 63
/
Copy pathdata_store.go
51 lines (43 loc) · 1.8 KB
/
data_store.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package common
import (
"context"
"time"
"github.com/flyteorg/flyteadmin/pkg/async"
"github.com/flyteorg/flyteadmin/pkg/errors"
"github.com/flyteorg/flyteadmin/pkg/manager/impl/shared"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flytestdlib/storage"
errrs "github.com/pkg/errors"
"google.golang.org/api/googleapi"
"google.golang.org/grpc/codes"
)
func OffloadLiteralMap(ctx context.Context, storageClient *storage.DataStore, literalMap *core.LiteralMap, nestedKeys ...string) (storage.DataReference, error) {
return OffloadLiteralMapWithRetryDelayAndAttempts(ctx, storageClient, literalMap, async.RetryDelay, 5, nestedKeys...)
}
func OffloadLiteralMapWithRetryDelayAndAttempts(ctx context.Context, storageClient *storage.DataStore, literalMap *core.LiteralMap, retryDelay time.Duration, attempts int, nestedKeys ...string) (storage.DataReference, error) {
if literalMap == nil {
literalMap = &core.LiteralMap{}
}
nestedKeyReference := []string{
shared.Metadata,
}
nestedKeyReference = append(nestedKeyReference, nestedKeys...)
uri, err := storageClient.ConstructReference(ctx, storageClient.GetBaseContainerFQN(ctx), nestedKeyReference...)
if err != nil {
return "", errors.NewFlyteAdminErrorf(codes.Internal, "Failed to construct data reference for [%+v] with err: %v", nestedKeys, err)
}
err = async.RetryOnSpecificErrors(attempts, retryDelay, func() error {
err = storageClient.WriteProtobuf(ctx, uri, storage.Options{}, literalMap)
return err
}, isRetryableError)
if err != nil {
return "", errors.NewFlyteAdminErrorf(codes.Internal, "Failed to write protobuf for [%+v] with err: %v", nestedKeys, err)
}
return uri, nil
}
func isRetryableError(err error) bool {
if e, ok := errrs.Cause(err).(*googleapi.Error); ok && e.Code == 409 {
return true
}
return false
}