-
Notifications
You must be signed in to change notification settings - Fork 51
/
store_azure.go
471 lines (410 loc) · 15.6 KB
/
store_azure.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
package fragment
import (
"context"
"errors"
"fmt"
"io"
"net/url"
"os"
"strings"
"sync"
"time"
"github.com/Azure/azure-pipeline-go/pipeline"
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/sas"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/service"
"github.com/Azure/azure-storage-blob-go/azblob"
log "github.com/sirupsen/logrus"
pb "go.gazette.dev/core/broker/protocol"
)
// azureStoreConfig configures a Fragment store of the "azure://" or "azure-ad://" scheme.
// It is initialized from parsed URL parametrs of the pb.FragmentStore
type azureStoreConfig struct {
accountTenantID string // The tenant ID that owns the storage account that we're writing into
// NOTE: This is not the tenant ID that owns the servie principal
storageAccountName string // Storage accounts in Azure are the equivalent to a "bucket" in S3
containerName string // In azure, blobs are stored inside of containers, which live inside accounts
prefix string // This is the path prefix for the blobs inside the container
RewriterConfig
}
func (cfg *azureStoreConfig) serviceUrl() string {
return fmt.Sprintf("https://%s.blob.core.windows.net", cfg.storageAccountName)
}
func (cfg *azureStoreConfig) containerURL() string {
return fmt.Sprintf("%s/%s", cfg.serviceUrl(), cfg.containerName)
}
type udcAndExp struct {
udc *service.UserDelegationCredential
exp *time.Time
}
type azureBackend struct {
// This is a cache of configured Pipelines for each tenant. These do not expire
pipelines map[string]pipeline.Pipeline
// This is a cache of Azure storage clients for each tenant. These do not expire
clients map[string]*service.Client
mu sync.Mutex
// This is a cache of URL-signing credentials for each tenant. These DO expire
udcs map[string]udcAndExp
}
func (a *azureBackend) Provider() string {
return "azure"
}
// See here for an example of how to use the Azure client libraries to create signatures:
// https://github.com/Azure/azure-sdk-for-go/blob/main/sdk/storage/azblob/service/examples_test.go#L285
func (a *azureBackend) SignGet(endpoint *url.URL, fragment pb.Fragment, d time.Duration) (string, error) {
cfg, err := parseAzureEndpoint(endpoint)
if err != nil {
return "", err
}
blobName := cfg.rewritePath(cfg.prefix, fragment.ContentPath())
udc, err := a.getUserDelegationCredential(endpoint)
if err != nil {
return "", err
}
sasQueryParams, err := sas.BlobSignatureValues{
Protocol: sas.ProtocolHTTPS, // Users MUST use HTTPS (not HTTP)
ExpiryTime: time.Now().UTC().Add(d), // Timestamps are expected in UTC https://docs.microsoft.com/en-us/rest/api/storageservices/create-service-sas#service-sas-example
ContainerName: cfg.containerName,
BlobName: blobName,
// These are the permissions granted to the signed URLs
Permissions: to.Ptr(sas.BlobPermissions{Read: true}).String(),
}.SignWithUserDelegation(udc)
if err != nil {
return "", err
}
log.WithFields(log.Fields{
"tenantId": cfg.accountTenantID,
"storageAccountName": cfg.storageAccountName,
"containerName": cfg.containerName,
"blobName": blobName,
"expiryTime": sasQueryParams.ExpiryTime(),
}).Debug("Signed get request")
return fmt.Sprintf("%s/%s?%s", cfg.containerURL(), blobName, sasQueryParams.Encode()), nil
}
func (a *azureBackend) Exists(ctx context.Context, ep *url.URL, fragment pb.Fragment) (bool, error) {
cfg, client, err := a.getAzurePipeline(ep)
if err != nil {
return false, err
}
blobURL, err := a.buildBlobURL(cfg, client, fragment.ContentPath())
if err != nil {
return false, err
}
if _, err = blobURL.GetProperties(ctx, azblob.BlobAccessConditions{}, azblob.ClientProvidedKeyOptions{}); err == nil {
return true, nil
}
storageErr, ok := err.(azblob.StorageError)
if !ok {
return false, err
}
if storageErr.ServiceCode() == azblob.ServiceCodeBlobNotFound {
return false, nil
}
return false, storageErr
}
func (a *azureBackend) Open(ctx context.Context, ep *url.URL, fragment pb.Fragment) (io.ReadCloser, error) {
cfg, client, err := a.getAzurePipeline(ep)
if err != nil {
return nil, err
}
blobURL, err := a.buildBlobURL(cfg, client, fragment.ContentPath())
if err != nil {
return nil, err
}
download, err := blobURL.Download(ctx, 0, azblob.CountToEnd, azblob.BlobAccessConditions{}, false, azblob.ClientProvidedKeyOptions{})
if err != nil {
return nil, err
}
return download.Body(azblob.RetryReaderOptions{}), nil
}
func (a *azureBackend) Persist(ctx context.Context, ep *url.URL, spool Spool) error {
cfg, client, err := a.getAzurePipeline(ep)
if err != nil {
return err
}
blobURL, err := a.buildBlobURL(cfg, client, spool.ContentPath())
if err != nil {
return err
}
headers := azblob.BlobHTTPHeaders{}
var body io.ReadSeeker
if spool.CompressionCodec == pb.CompressionCodec_GZIP_OFFLOAD_DECOMPRESSION {
headers.ContentEncoding = "gzip"
}
if spool.CompressionCodec != pb.CompressionCodec_NONE {
body = io.NewSectionReader(spool.compressedFile, 0, spool.compressedLength)
} else {
body = io.NewSectionReader(spool.File, 0, spool.ContentLength())
}
_, err = blobURL.Upload(ctx, body, headers, azblob.Metadata{}, azblob.BlobAccessConditions{}, azblob.DefaultAccessTier, azblob.BlobTagsMap{}, azblob.ClientProvidedKeyOptions{}, azblob.ImmutabilityPolicyOptions{})
return err
}
func (a *azureBackend) List(ctx context.Context, store pb.FragmentStore, ep *url.URL, journal pb.Journal, callback func(pb.Fragment)) error {
cfg, client, err := a.getAzurePipeline(ep)
if err != nil {
return err
}
u, err := url.Parse(cfg.containerURL())
if err != nil {
return err
}
containerURL := azblob.NewContainerURL(*u, client)
options := azblob.ListBlobsSegmentOptions{Prefix: cfg.rewritePath(cfg.prefix, journal.String()) + "/"}
for marker := (azblob.Marker{}); marker.NotDone(); {
segmentList, err := containerURL.ListBlobsFlatSegment(ctx, marker, options)
if err != nil {
return err
}
for _, blob := range segmentList.Segment.BlobItems {
if strings.HasSuffix(blob.Name, "/") {
//Ignore directory-like objects, usually created by mounting buckets with a FUSE driver.
} else if frag, err := pb.ParseFragmentFromRelativePath(journal, blob.Name[len(*segmentList.Prefix):]); err != nil {
log.WithFields(log.Fields{
"storageAccountName": cfg.storageAccountName,
"name": blob.Name,
"err": err,
}).Warning("parsing fragment")
} else if *(blob.Properties.ContentLength) == 0 && frag.ContentLength() > 0 {
log.WithFields(log.Fields{
"storageAccountName": cfg.storageAccountName,
"name": blob.Name,
}).Warning("zero-length fragment")
} else {
frag.ModTime = blob.Properties.LastModified.Unix()
frag.BackingStore = store
callback(frag)
}
}
marker = segmentList.NextMarker
}
return nil
}
func (a *azureBackend) Remove(ctx context.Context, fragment pb.Fragment) error {
cfg, client, err := a.getAzurePipeline(fragment.BackingStore.URL())
if err != nil {
return err
}
blobURL, err := a.buildBlobURL(cfg, client, fragment.ContentPath())
if err != nil {
return err
}
_, err = blobURL.Delete(ctx, azblob.DeleteSnapshotsOptionNone, azblob.BlobAccessConditions{})
return err
}
// Turns an `azcore.TokenCredential` into an auto-refreshing `azblob.TokenCredential`
// This is useful for turning credentials coming from e.g `azidentity` into
// credentials that can be used with `azblob` Pipelines.
func getAzureStorageCredential(coreCredential azcore.TokenCredential, tenant string) (azblob.TokenCredential, error) {
var tokenRefresher = func(credential azblob.TokenCredential) time.Duration {
accessToken, err := coreCredential.GetToken(context.Background(), policy.TokenRequestOptions{TenantID: tenant, Scopes: []string{"https://storage.azure.com/.default"}})
if err != nil {
panic(err)
}
credential.SetToken(accessToken.Token)
// Give 60s of padding in order to make sure we always have a non-expired token.
// If we didn't do this, we would *begin* the refresh process as the token expires,
// potentially leaving any consumer with an expired token while we fetch a new one.
exp := accessToken.ExpiresOn.Sub(time.Now().Add(time.Minute))
return exp
}
credential := azblob.NewTokenCredential("", tokenRefresher)
return credential, nil
}
func parseAzureEndpoint(endpoint *url.URL) (cfg azureStoreConfig, err error) {
if err = parseStoreArgs(endpoint, &cfg); err != nil {
return
}
// Omit leading slash from URI. Note that FragmentStore already
// enforces that URL Paths end in '/'.
var splitPath = strings.Split(endpoint.Path[1:], "/")
if endpoint.Scheme == "azure" {
// Since only one non-ad "Shared Key" credential can be injected via
// environment variables, we should only keep around one client for
// all `azure://` requests.
cfg.accountTenantID = "AZURE_SHARED_KEY"
cfg.storageAccountName = os.Getenv("AZURE_ACCOUNT_NAME")
cfg.containerName, cfg.prefix = endpoint.Host, endpoint.Path[1:]
} else if endpoint.Scheme == "azure-ad" {
cfg.accountTenantID, cfg.storageAccountName, cfg.containerName, cfg.prefix = endpoint.Host, splitPath[0], splitPath[1], strings.Join(splitPath[2:], "/")
}
return cfg, nil
}
func (a *azureBackend) getAzureServiceClient(endpoint *url.URL) (client *service.Client, err error) {
var cfg azureStoreConfig
if cfg, err = parseAzureEndpoint(endpoint); err != nil {
return nil, err
}
if endpoint.Scheme == "azure" {
var accountName = os.Getenv("AZURE_ACCOUNT_NAME")
var accountKey = os.Getenv("AZURE_ACCOUNT_KEY")
a.mu.Lock()
client, ok := a.clients[accountName]
a.mu.Unlock()
if ok {
log.WithFields(log.Fields{
"storageAccountName": accountName,
}).Info("Re-using cached azure:// service client")
return client, nil
}
sharedKeyCred, err := service.NewSharedKeyCredential(accountName, accountKey)
if err != nil {
return nil, err
}
serviceClient, err := service.NewClientWithSharedKeyCredential(cfg.serviceUrl(), sharedKeyCred, &service.ClientOptions{})
if err != nil {
return nil, err
}
a.mu.Lock()
a.clients[accountName] = serviceClient
a.mu.Lock()
return serviceClient, nil
} else if endpoint.Scheme == "azure-ad" {
// Link to the Azure docs describing what fields are required for active directory auth
// https://learn.microsoft.com/en-us/azure/developer/go/azure-sdk-authentication-service-principal?tabs=azure-cli#-option-1-authenticate-with-a-secret
var clientId = os.Getenv("AZURE_CLIENT_ID")
var clientSecret = os.Getenv("AZURE_CLIENT_SECRET")
a.mu.Lock()
client, ok := a.clients[cfg.accountTenantID]
a.mu.Unlock()
if ok {
log.WithFields(log.Fields{
"accountTenantId": cfg.accountTenantID,
}).Info("Re-using cached azure-ad:// service client")
return client, nil
}
identityCreds, err := azidentity.NewClientSecretCredential(
cfg.accountTenantID,
clientId,
clientSecret,
&azidentity.ClientSecretCredentialOptions{
AdditionallyAllowedTenants: []string{cfg.accountTenantID},
DisableInstanceDiscovery: true,
},
)
if err != nil {
return nil, err
}
serviceClient, err := service.NewClient(cfg.serviceUrl(), identityCreds, &service.ClientOptions{})
if err != nil {
return nil, err
}
a.mu.Lock()
a.clients[cfg.accountTenantID] = serviceClient
a.mu.Unlock()
return serviceClient, nil
}
return nil, errors.New("unrecognized URI scheme")
}
func (a *azureBackend) getAzurePipeline(ep *url.URL) (cfg azureStoreConfig, client pipeline.Pipeline, err error) {
if cfg, err = parseAzureEndpoint(ep); err != nil {
return
}
a.mu.Lock()
client = a.pipelines[cfg.accountTenantID]
a.mu.Unlock()
if client != nil {
return
}
var credentials azblob.Credential
if ep.Scheme == "azure" {
var accountName = os.Getenv("AZURE_ACCOUNT_NAME")
var accountKey = os.Getenv("AZURE_ACCOUNT_KEY")
// Create an azblob credential that we can pass to `NewPipeline`
credentials, err = azblob.NewSharedKeyCredential(accountName, accountKey)
if err != nil {
return cfg, nil, err
}
} else if ep.Scheme == "azure-ad" {
// Link to the Azure docs describing what fields are required for active directory auth
// https://learn.microsoft.com/en-us/azure/developer/go/azure-sdk-authentication-service-principal?tabs=azure-cli#-option-1-authenticate-with-a-secret
var clientId = os.Getenv("AZURE_CLIENT_ID")
var clientSecret = os.Getenv("AZURE_CLIENT_SECRET")
identityCreds, err := azidentity.NewClientSecretCredential(
cfg.accountTenantID,
clientId,
clientSecret,
&azidentity.ClientSecretCredentialOptions{
AdditionallyAllowedTenants: []string{cfg.accountTenantID},
DisableInstanceDiscovery: true,
},
)
if err != nil {
return cfg, nil, err
}
credentials, err = getAzureStorageCredential(identityCreds, cfg.accountTenantID)
if err != nil {
return cfg, nil, err
}
}
client = azblob.NewPipeline(credentials, azblob.PipelineOptions{})
a.mu.Lock()
a.pipelines[cfg.accountTenantID] = client
a.mu.Unlock()
log.WithFields(log.Fields{
"tenant": cfg.accountTenantID,
"storageAccountName": cfg.storageAccountName,
"storageContainerName": cfg.containerName,
"pathPrefix": cfg.prefix,
}).Info("constructed new Azure Storage pipeline client")
return cfg, client, nil
}
func (a *azureBackend) buildBlobURL(cfg azureStoreConfig, client pipeline.Pipeline, path string) (*azblob.BlockBlobURL, error) {
u, err := url.Parse(fmt.Sprint(cfg.containerURL(), "/", cfg.rewritePath(cfg.prefix, path)))
if err != nil {
return nil, err
}
blobURL := azblob.NewBlockBlobURL(*u, client)
return &blobURL, nil
}
// Cache UserDelegationCredentials and refresh them when needed
func (a *azureBackend) getUserDelegationCredential(endpoint *url.URL) (*service.UserDelegationCredential, error) {
var cfg, err = parseAzureEndpoint(endpoint)
if err != nil {
return nil, err
}
a.mu.Lock()
var udc, hasCachedUdc = a.udcs[cfg.accountTenantID]
a.mu.Unlock()
// https://learn.microsoft.com/en-us/azure/storage/blobs/storage-blob-user-delegation-sas-create-cli#use-azure-ad-credentials-to-secure-a-sas
// According to the above docs, signed URLs generated with a UDC are invalid after
// that UDC expires. In addition, a UDC can live up to 7 days. So let's ensure that
// we always sign URLs with a UDC that has at least 5 days of useful life left in it.
// ----| NOW |------|NOW+5Day|-----| udcExp |---- No need to refresh
// ----| NOW |-----| udcExp |-----|NOW+5Day|---- Need to refresh
// ----|udcExp|-----| NOW | ------------------ Need to refresh
if !hasCachedUdc || (udc.exp != nil && udc.exp.Before(time.Now().Add(time.Hour*24*5))) {
// Generate UDCs that expire 6 days from now, and refresh them after they
// have less than 5 days left until they expire.
var startTime = time.Now().Add(time.Second * -10)
var expTime = time.Now().Add(time.Hour * 24 * 6)
var info = service.KeyInfo{
Start: to.Ptr(startTime.UTC().Format(sas.TimeFormat)),
Expiry: to.Ptr(expTime.UTC().Format(sas.TimeFormat)),
}
var serviceClient, err = a.getAzureServiceClient(endpoint)
if err != nil {
return nil, err
}
cred, err := serviceClient.GetUserDelegationCredential(context.Background(), info, nil)
if err != nil {
return nil, err
}
log.WithFields(log.Fields{
"newExpiration": expTime,
"newStart": startTime.String(),
"service.KeyInfo": info,
"tenant": cfg.accountTenantID,
}).Info("Refreshing Azure Storage UDC")
udc = udcAndExp{
udc: cred,
exp: &expTime,
}
a.mu.Lock()
a.udcs[cfg.accountTenantID] = udc
a.mu.Unlock()
}
return udc.udc, nil
}