/
gs.go
114 lines (99 loc) · 3.04 KB
/
gs.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package storage
import (
"context"
"errors"
"io"
"io/fs"
"path/filepath"
"cloud.google.com/go/storage"
"github.com/rotisserie/eris"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
"github.com/G-Research/fasttrackml/pkg/api/mlflow/config"
)
// GSStorageName is the name used to identify the Google Storage artifact storage.
const GSStorageName = "gs"
// GS represents adapter to work with GS storage artifacts.
// Its methods issue every request through the wrapped Google Cloud Storage client.
type GS struct {
// client is the Google Cloud Storage SDK client; it is set by NewGS and used by
// List and Get to access buckets and objects.
client *storage.Client
}
// NewGS creates new Google Storage instance.
//
// When config.GSEndpointURI is set, the client is pointed at that endpoint and
// authentication is disabled; otherwise the SDK defaults are used. An error is
// returned when the underlying storage client cannot be created.
func NewGS(ctx context.Context, config *config.ServiceConfig) (*GS, error) {
	var clientOptions []option.ClientOption
	if endpoint := config.GSEndpointURI; endpoint != "" {
		// option.WithoutAuthentication() is required to make the GCS SDK work with our fake server.
		// revisit this if an alternative GCS implementation is ever needed in a production setting.
		clientOptions = append(
			clientOptions,
			option.WithEndpoint(endpoint),
			option.WithoutAuthentication(),
		)
	}

	gsClient, err := storage.NewClient(ctx, clientOptions...)
	if err != nil {
		return nil, eris.Wrap(err, "error creating GS storage client")
	}
	return &GS{client: gsClient}, nil
}
// List implements ArtifactStorageProvider interface.
//
// It lists the entries located directly under path inside the bucket/prefix extracted
// from artifactURI. Returned paths are relative to that extracted prefix and always use
// "/" as separator. Entries that represent the listed directory itself are skipped.
//
// An error is returned when artifactURI cannot be parsed, when iterating over the bucket
// fails, or when a relative path cannot be computed for an object.
func (s GS) List(ctx context.Context, artifactURI, path string) ([]ArtifactObject, error) {
	// 1. process input parameters.
	bucket, rootPrefix, err := ExtractBucketAndPrefix(artifactURI)
	if err != nil {
		return nil, eris.Wrap(err, "error extracting bucket and prefix from provided uri")
	}
	// object names always use "/" as separator, so normalize whatever the OS-specific
	// filepath functions produce (this is a no-op on Unix-like systems).
	prefix := filepath.ToSlash(filepath.Join(rootPrefix, path))
	if prefix != "" {
		prefix += "/"
	}
	// filepath.Rel returns cleaned paths ("." for the base itself), so compare against
	// the cleaned form of path; otherwise "" or "dir/" would never match.
	currentDir := filepath.ToSlash(filepath.Clean(path))

	// 2. read data from gs storage.
	var artifactList []ArtifactObject
	it := s.client.Bucket(bucket).Objects(ctx, &storage.Query{
		Prefix:    prefix,
		Delimiter: "/",
	})
	for {
		object, err := it.Next()
		if errors.Is(err, iterator.Done) {
			break
		}
		if err != nil {
			return nil, eris.Wrap(err, "error getting object information")
		}

		// when a delimiter is used, "directories" are returned as synthetic entries
		// which have only the Prefix field populated.
		objectName := object.Name
		if object.Name == "" {
			objectName = object.Prefix
		}

		relPath, err := filepath.Rel(rootPrefix, objectName)
		if err != nil {
			return nil, eris.Wrapf(err, "error getting relative path for object: %s", objectName)
		}
		relPath = filepath.ToSlash(relPath)

		// filter current directory from the result set.
		if relPath == currentDir {
			continue
		}

		artifactList = append(artifactList, ArtifactObject{
			Path: relPath,
			Size: object.Size,
			// a zero size does not imply a directory (empty files exist), so rely on
			// the synthetic directory marker instead.
			IsDir: object.Prefix != "",
		})
	}
	return artifactList, nil
}
// Get returns file content at the storage location.
//
// The object name is built by joining the prefix extracted from artifactURI with path.
// The caller is responsible for closing the returned reader.
//
// When the object does not exist, the returned error wraps fs.ErrNotExist; any other
// failure (invalid uri, storage error) is returned wrapped with additional context.
func (s GS) Get(ctx context.Context, artifactURI, path string) (io.ReadCloser, error) {
	// 1. process input parameters.
	bucketName, prefix, err := ExtractBucketAndPrefix(artifactURI)
	if err != nil {
		return nil, eris.Wrap(err, "error extracting bucket and prefix from provided uri")
	}

	// 2. get object from gs storage.
	// object names always use "/" as separator, so normalize the OS-specific result of
	// filepath.Join (this is a no-op on Unix-like systems).
	objectName := filepath.ToSlash(filepath.Join(prefix, path))
	reader, err := s.client.Bucket(bucketName).Object(objectName).NewReader(ctx)
	if err != nil {
		if errors.Is(err, storage.ErrObjectNotExist) {
			return nil, eris.Wrap(fs.ErrNotExist, "object does not exist")
		}
		return nil, eris.Wrap(err, "error getting object")
	}
	return reader, nil
}