/
blob.go
191 lines (155 loc) · 4.84 KB
/
blob.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
/*
Copyright © 2023 Daniel Chalef
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cloud
import (
"context"
"errors"
"github.com/danielchalef/mrfparse/pkg/mrfparse/utils"
"io"
"net/url"
"os"
"path/filepath"
"strings"
"gocloud.dev/blob"
_ "gocloud.dev/blob/fileblob" // required by CDK as blob driver
_ "gocloud.dev/blob/gcsblob" // required by CDK as blob driver
_ "gocloud.dev/blob/s3blob" // required by CDK as blob driver
)
// log is the package-level logger obtained from utils.GetLogger; Glob uses it for debug output.
var log = utils.GetLogger()
// OpenBucket opens the blob storage bucket addressed by uri and returns it.
// The context is handed to the Go CDK opener and can be used to cancel the operation.
// The storage provider is selected by the URI scheme (for example gs:// or s3://).
// On failure the bucket is nil and the opener's error is returned unchanged.
func OpenBucket(ctx context.Context, uri string) (*blob.Bucket, error) {
	bucket, err := blob.OpenBucket(ctx, uri)
	if err != nil {
		return nil, err
	}

	return bucket, nil
}
// NewWriter creates a new io.WriteCloser for the given URI. Context can be used to cancel any operations.
// Google Cloud Storage, AWS S3, and local filesystem URIs are supported. Use the correct URI scheme for
// the storage provider (gs://, s3://) or no scheme for local filesystem.
//
// Local files are created if missing and truncated if they already exist, so the result never
// carries stale trailing bytes from a previous, longer file. For cloud URIs, the bucket opened
// here is closed when the returned writer is closed. On error the returned writer is nil.
func NewWriter(ctx context.Context, uri string) (io.WriteCloser, error) {
	const (
		flags = os.O_CREATE | os.O_WRONLY | os.O_TRUNC
		perms = 0o644
	)

	if !IsCloudURI(uri) {
		f, err := os.OpenFile(uri, flags, perms)
		if err != nil {
			// Return an untyped nil so callers never receive a non-nil interface
			// wrapping a nil *os.File.
			return nil, err
		}

		return f, nil
	}

	_, _, key, err := ParseBlobURI(uri)
	if err != nil {
		return nil, err
	}

	bucket, err := OpenBucket(ctx, uri)
	if err != nil {
		return nil, err
	}

	w, err := bucket.NewWriter(ctx, key, nil)
	if err != nil {
		// The writer was never created, so nobody else will release the bucket.
		_ = bucket.Close()
		return nil, err
	}

	return &bucketClosingWriter{WriteCloser: w, bucket: bucket}, nil
}

// bucketClosingWriter ties the lifetime of a bucket to the writer created from it.
type bucketClosingWriter struct {
	io.WriteCloser
	bucket io.Closer
}

// Close closes the underlying writer first and then the bucket. The writer's error takes
// precedence; the bucket's close error is returned only if the writer closed cleanly.
func (w *bucketClosingWriter) Close() error {
	err := w.WriteCloser.Close()
	if cerr := w.bucket.Close(); err == nil {
		err = cerr
	}

	return err
}
// NewReader creates a new io.ReadCloser for the given URI. Context can be used to cancel any operations.
// Google Cloud Storage, AWS S3, and local filesystem URIs are supported. Use the correct URI scheme for
// the storage provider (gs://, s3://) or no scheme for local filesystem.
// The URI must be a file, not a directory.
//
// For cloud URIs, the bucket opened here is closed when the returned reader is closed.
// On error the returned reader is nil.
func NewReader(ctx context.Context, uri string) (io.ReadCloser, error) {
	if !IsCloudURI(uri) {
		f, err := os.Open(uri)
		if err != nil {
			// Return an untyped nil so callers never receive a non-nil interface
			// wrapping a nil *os.File.
			return nil, err
		}

		return f, nil
	}

	_, _, key, err := ParseBlobURI(uri)
	if err != nil {
		return nil, err
	}

	bucket, err := OpenBucket(ctx, uri)
	if err != nil {
		return nil, err
	}

	r, err := bucket.NewReader(ctx, key, nil)
	if err != nil {
		// The reader was never created, so nobody else will release the bucket.
		_ = bucket.Close()
		return nil, err
	}

	return &bucketClosingReader{ReadCloser: r, bucket: bucket}, nil
}

// bucketClosingReader ties the lifetime of a bucket to the reader created from it.
type bucketClosingReader struct {
	io.ReadCloser
	bucket io.Closer
}

// Close closes the underlying reader first and then the bucket. The reader's error takes
// precedence; the bucket's close error is returned only if the reader closed cleanly.
func (r *bucketClosingReader) Close() error {
	err := r.ReadCloser.Close()
	if cerr := r.bucket.Close(); err == nil {
		err = cerr
	}

	return err
}
// JoinURI concatenates left and right with exactly one "/" between them: every trailing
// slash is stripped from left and every leading slash is stripped from right before joining.
func JoinURI(left, right string) string {
	head := strings.TrimRight(left, "/")
	tail := strings.TrimLeft(right, "/")

	return strings.Join([]string{head, tail}, "/")
}
// Glob enumerates cloud storage objects or local files at a URI and returns the URIs/paths of
// those whose base name matches the given pattern.
// Context can be used to cancel any cloud operations.
// Google Cloud Storage, AWS S3, and local filesystem URIs are supported. Use the correct URI scheme for
// the storage provider (gs://, s3://) or no scheme for local filesystem.
// The pattern is a glob pattern (filepath.Match syntax), not a regular expression.
//
// For local paths the result is filepath.Glob(filepath.Join(uri, pattern)). For cloud URIs the
// URI path (without its leading slash) is used as the key prefix, each listed key's base name is
// matched against pattern, and matches are returned as "<scheme>://<bucket>/<key>". A malformed
// pattern yields an error in both cases.
func Glob(ctx context.Context, uri, pattern string) ([]string, error) {
	// The path is a local filesystem path.
	if !IsCloudURI(uri) {
		return filepath.Glob(filepath.Join(uri, pattern))
	}

	u, err := url.Parse(uri)
	if err != nil {
		return nil, err
	}

	// Use the caller's context so opening the bucket is cancellable too.
	b, err := OpenBucket(ctx, uri)
	if err != nil {
		return nil, err
	}
	// The bucket is only needed while listing; release it on every return path.
	defer func() { _ = b.Close() }()

	// Object keys do not start with "/", whereas a parsed URL path does; strip it the same
	// way ParseBlobURI does, otherwise the prefix never matches any key.
	prefix := strings.TrimLeft(u.Path, "/")

	var matches []string

	iter := b.List(&blob.ListOptions{Prefix: prefix})

	for {
		obj, err := iter.Next(ctx)
		if errors.Is(err, io.EOF) {
			break
		}

		if err != nil {
			return nil, err
		}

		k := filepath.Base(obj.Key)
		log.Debugf("Key is %s", k)

		// Surface malformed patterns instead of silently returning no matches.
		matched, err := filepath.Match(pattern, k)
		if err != nil {
			return nil, err
		}

		if matched {
			// Rebuild a full URI that NewReader/NewWriter recognise as a cloud URI.
			matches = append(matches, u.Scheme+"://"+u.Host+"/"+obj.Key)
		}
	}

	log.Debugf("Found %d matches for %s", len(matches), pattern)

	return matches, nil
}
// ParseBlobURI splits uri into its scheme, bucket (the URL host), and key (the URL path with
// all leading slashes removed). If uri cannot be parsed as a URL, the three strings are empty
// and the parse error is returned.
func ParseBlobURI(uri string) (scheme, bucket, key string, err error) {
	parsed, parseErr := url.Parse(uri)
	if parseErr != nil {
		return "", "", "", parseErr
	}

	scheme = parsed.Scheme
	bucket = parsed.Host
	key = strings.TrimLeft(parsed.Path, "/")

	return scheme, bucket, key, nil
}
// IsCloudURI reports whether uri should be treated as a cloud storage URI (such as gs:// or s3://)
// rather than a local path. The check is made by parsing uri with ParseBlobURI: the result is
// true only when parsing succeeds and the scheme is non-empty.
func IsCloudURI(uri string) bool {
	scheme, _, _, parseErr := ParseBlobURI(uri)

	return parseErr == nil && scheme != ""
}