-
Notifications
You must be signed in to change notification settings - Fork 39
/
resources.go
464 lines (401 loc) · 12.3 KB
/
resources.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
// Package resources contains objects that are used to gather information about Kusto resources that are
// used during various ingestion methods.
package resources
import (
"context"
"errors"
"fmt"
"net/url"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/Azure/azure-kusto-go/kusto"
kustoErrors "github.com/Azure/azure-kusto-go/kusto/data/errors"
"github.com/Azure/azure-kusto-go/kusto/data/table"
"github.com/Azure/azure-kusto-go/kusto/kql"
"github.com/cenkalti/backoff/v4"
)
// Retry and refresh tuning used by the Manager.
const (
// defaultInitialInterval is assigned to the exponential backoff's InitialInterval in initBackoff.
defaultInitialInterval = 1 * time.Second
// defaultMultiplier is assigned to the exponential backoff's Multiplier in initBackoff.
defaultMultiplier = 2
// retryCount caps the retries of the backoff built by initBackoff and the number of
// failed attempts fetchRetry tolerates before giving up.
retryCount = 4
// fetchInterval is the amount of ticker time renewResources accumulates between refreshes.
// getResources treats cached resources older than twice this value as stale.
fetchInterval = 1 * time.Hour
)
// mgmter is a private interface that allows us to write hermetic tests against the kusto.Client.Mgmt() method.
// The Manager issues all of its management commands through this interface.
type mgmter interface {
// Mgmt runs the management statement query against the database db and returns an iterator over the rows.
Mgmt(ctx context.Context, db string, query kusto.Statement, options ...kusto.MgmtOption) (*kusto.RowIterator, error)
}
// objectTypes is the set of storage object kinds that may appear in the host name of a resource URI.
var objectTypes = map[string]bool{
	"blob":  true,
	"queue": true,
	"table": true,
}

// URI represents a resource URI for an ingestion command.
type URI struct {
	u          *url.URL
	account    string
	objectType string
	objectName string
	sas        url.Values
}

// Parse parses a string representing a Kusto resource URI.
//
// The URI must use the https scheme and its host must consist of at least five dot-separated
// labels, for example:
//
//	https://<account>.<blob|queue|table>.core.windows.net/<object name>?<sas>
//
// When the host has exactly five labels, the first label is the account and the second is the
// object type. With more labels, the account is the first two labels joined by a dot and the
// object type is the third label. The object name is the escaped path without leading slashes and
// the SAS is the parsed query string.
func Parse(uri string) (*URI, error) {
	parsed, err := url.Parse(uri)
	if err != nil {
		return nil, err
	}
	if parsed.Scheme != "https" {
		return nil, fmt.Errorf("URI scheme must be 'https', was '%s'", parsed.Scheme)
	}

	labels := strings.Split(parsed.Hostname(), ".")
	if len(labels) < 5 {
		return nil, fmt.Errorf("error: Storage URI (%s) is invalid'", uri)
	}

	// Five labels: <account>.<type>.…; more than five: <account part>.<account part>.<type>.…
	account, kind := labels[0], labels[1]
	if len(labels) != 5 {
		account, kind = labels[0]+"."+labels[1], labels[2]
	}

	res := &URI{
		u:          parsed,
		account:    account,
		objectType: kind,
		objectName: strings.TrimLeft(parsed.EscapedPath(), "/"),
		sas:        parsed.Query(),
	}
	if err := res.validate(); err != nil {
		return nil, err
	}
	return res, nil
}

// validate reports an error when the account is empty, the object type is not one of
// queue, blob or table, or the object name is empty.
// TODO(Daniel): You could add deep validation of each value we have split to give better diagnostic info on an error.
// I put in the most basic evaluation, but you might want to put checks for the account format or objectName format.
func (u *URI) validate() error {
	switch {
	case u.account == "":
		return fmt.Errorf("account name was not provided")
	case !objectTypes[u.objectType]:
		return fmt.Errorf("object type was not valid(queue|blob|table), was: %q", u.objectType)
	case u.objectName == "":
		return fmt.Errorf("object name was not provided")
	}
	return nil
}

// Account is the Azure storage account that will be used.
func (u *URI) Account() string {
	return u.account
}

// ObjectType returns the type of object that will be ingested: queue, blob or table.
func (u *URI) ObjectType() string {
	return u.objectType
}

// ObjectName returns the object name of the resource, i.e. the container name.
func (u *URI) ObjectName() string {
	return u.objectName
}

// SAS returns the query parameters of the URI, which carry the shared access signature used to
// access Azure storage.
// https://docs.microsoft.com/en-us/azure/storage/common/storage-sas-overview
func (u *URI) SAS() url.Values {
	return u.sas
}

// String implements fmt.Stringer by rendering the underlying URL.
func (u *URI) String() string {
	return u.u.String()
}

// URL returns the internal *url.URL object.
func (u *URI) URL() *url.URL {
	return u.u
}
// token represents a Kusto identity token as returned by the ".get kusto identity token" command.
type token struct {
// AuthContext is filled from the "AuthorizationContext" column of the result row.
AuthContext string `kusto:"AuthorizationContext"`
}
// Manager manages Kusto resources: it caches the ingestion resources and the identity token
// retrieved through the management client.
type Manager struct {
// client is used to issue the management commands.
client mgmter
// done is closed by Close; renewResources and fetchRetry stop when it is closed.
done chan struct{}
// resources holds the result of the last successful fetch.
resources atomic.Value // Stores Ingestion
// lastFetchTime holds the UTC time of the last successful fetch.
lastFetchTime atomic.Value // Stores time.Time
// kustoToken is the identity token cached by AuthContext.
kustoToken token
// authTokenCacheExpiration is the time after which AuthContext requests a new token.
authTokenCacheExpiration time.Time
// authLock is held for the duration of AuthContext.
authLock sync.Mutex
// fetchLock is held for the duration of fetch.
fetchLock sync.Mutex
// rankedStorageAccount receives account registrations during fetch and usage results from
// ReportStorageResourceResult, and supplies the account ordering for the ranked getters.
rankedStorageAccount *RankedStorageAccountSet
}
// New is the constructor for Manager. It stores client, creates the done channel and the default
// ranked storage account set, marks the cached identity token as already expired, and starts the
// background renewResources goroutine, which runs until Close is called.
// The returned error is always nil.
func New(client mgmter) (*Manager, error) {
	// The mutex fields need no initialization: their zero value is an unlocked mutex.
	mgr := &Manager{
		client:                   client,
		done:                     make(chan struct{}),
		rankedStorageAccount:     newDefaultRankedStorageAccountSet(),
		authTokenCacheExpiration: time.Now().UTC(),
	}
	go mgr.renewResources()
	return mgr, nil
}
// Close closes the manager by closing its done channel, which stops the background resource
// renewal. Calling Close on an already closed manager does nothing.
// NOTE(review): the check and the close are not one atomic step, so calls racing from several
// goroutines could both attempt the close — confirm callers do not call Close concurrently.
func (m *Manager) Close() {
	select {
	case <-m.done:
		// Already closed; a second close of the channel would panic.
	default:
		close(m.done)
	}
}
// renewResources periodically refreshes the ingestion resources until the manager is closed.
// It wakes up every 30 seconds and calls fetchRetry once the accumulated tick time reaches
// fetchInterval.
func (m *Manager) renewResources() {
	const period = 30 * time.Second

	ticker := time.NewTicker(period)
	defer ticker.Stop()

	// Seeding the elapsed time with a full fetchInterval makes the first tick (one period after
	// start) trigger a fetch instead of waiting a whole interval.
	elapsed := fetchInterval
	for {
		select {
		case <-m.done:
			return
		case <-ticker.C:
			elapsed += period
			if elapsed < fetchInterval {
				continue
			}
			elapsed = 0
			// The result is deliberately discarded: nothing can act on it here, and the next
			// refresh is attempted again after another fetchInterval of ticks.
			_ = m.fetchRetry(context.Background())
		}
	}
}
// AuthContext returns a string representing the authorization context. This auth token is a temporary token
// that can be used to write a message via ingestion. This is different than the ADAL token.
//
// The token is cached for one hour; while the cache is valid no request is made. Otherwise the
// ".get kusto identity token" command is run against "NetDefaultDB", retrying with backoff only
// when the failure is an HTTP throttling error. An error is returned when the command fails, when
// it yields more than one row, or when it yields no row at all (so that an empty token is never
// cached). Errors from the command are wrapped with %w so callers can inspect the cause.
func (m *Manager) AuthContext(ctx context.Context) (string, error) {
	m.authLock.Lock()
	defer m.authLock.Unlock()

	if m.authTokenCacheExpiration.After(time.Now().UTC()) {
		return m.kustoToken.AuthContext, nil
	}

	var rows *kusto.RowIterator
	query := func() error {
		var err error
		rows, err = m.client.Mgmt(ctx, "NetDefaultDB", kql.New(".get kusto identity token"), kusto.IngestionEndpoint())
		if err == nil {
			return nil
		}
		// Only throttling is worth retrying; any other failure ends the retry loop at once.
		var httpErr *kustoErrors.HttpError
		if errors.As(err, &httpErr) && httpErr.IsThrottled() {
			return err
		}
		return backoff.Permanent(err)
	}
	if err := backoff.Retry(query, backoff.WithContext(initBackoff(), ctx)); err != nil {
		return "", fmt.Errorf("problem getting authorization context from Kusto via Mgmt: %w", err)
	}

	rowCount := 0
	tok := token{}
	err := rows.DoOnRowOrError(func(r *table.Row, e *kustoErrors.Error) error {
		if e != nil {
			return e
		}
		if rowCount != 0 {
			return fmt.Errorf("call for AuthContext returned more than 1 Row")
		}
		rowCount++
		return r.ToStruct(&tok)
	})
	if err != nil {
		return "", err
	}
	if rowCount == 0 {
		// Without this check an empty token would be cached and handed out for the next hour.
		return "", errors.New("call for AuthContext returned no rows")
	}

	m.kustoToken = tok
	m.authTokenCacheExpiration = time.Now().UTC().Add(time.Hour)
	return tok.AuthContext, nil
}
// ingestResc represents a kusto Mgmt() record about a resource, as returned by
// the ".get ingestion resources" command.
type ingestResc struct {
// Type is the type of resource. The types that are imported are "TempStorage",
// "SecuredReadyForAggregationQueue" and "IngestionsStatusTable"; other types are ignored.
Type string `kusto:"ResourceTypeName"`
// Root is the storage root URI, which should conform to the local URI type.
Root string `kusto:"StorageRoot"`
}
// Ingestion holds information about Ingestion resources.
type Ingestion struct {
// Queues contains URIs for Queue resources.
Queues []*URI
// Containers has URIs for blob resources.
Containers []*URI
// Tables contains URIs for table resources.
Tables []*URI
}
// errDoNotCare is returned by importRec for resource types that are not tracked; fetch skips
// such records instead of failing.
var errDoNotCare = errors.New("don't care about this")
// importRec adds the resource described by rec to the matching list of i:
// "TempStorage" goes to Containers, "SecuredReadyForAggregationQueue" to Queues and
// "IngestionsStatusTable" to Tables. The storage accounts of containers and queues are also
// registered with rankedStorageAccounts.
//
// Any other resource type yields errDoNotCare without looking at its StorageRoot, so an
// unparseable root on a resource that is ignored anyway cannot fail the import. For the tracked
// types, a StorageRoot that does not parse yields an error wrapping the parse failure.
func (i *Ingestion) importRec(rec ingestResc, rankedStorageAccounts *RankedStorageAccountSet) error {
	var (
		dest   *[]*URI
		ranked bool
	)
	switch rec.Type {
	case "TempStorage":
		dest, ranked = &i.Containers, true
	case "SecuredReadyForAggregationQueue":
		dest, ranked = &i.Queues, true
	case "IngestionsStatusTable":
		dest = &i.Tables
	default:
		return errDoNotCare
	}

	u, err := Parse(rec.Root)
	if err != nil {
		return fmt.Errorf("the StorageRoot URI received(%s) has an error: %w", rec.Root, err)
	}

	*dest = append(*dest, u)
	if ranked {
		rankedStorageAccounts.registerStorageAccount(u.Account())
	}
	return nil
}
// groupResourcesByStorageAccount returns the resources ordered by the rank of their storage
// account: for each account in rankedStorageAccount, in order, all resources of that account are
// emitted in their original relative order. Resources whose account is not listed are dropped.
// The result is nil when nothing is emitted.
// NOTE(review): earlier documentation described the result as distributed by round robin, but
// resources of different accounts are not interleaved — confirm which behavior is intended.
func groupResourcesByStorageAccount(resources []*URI, rankedStorageAccount []RankedStorageAccount) []*URI {
	// Bucket the resources by the account they live in.
	byAccount := make(map[string][]*URI)
	for _, res := range resources {
		name := res.Account()
		byAccount[name] = append(byAccount[name], res)
	}

	// Emit the buckets in rank order; an account without resources contributes nothing.
	var ordered []*URI
	for _, acct := range rankedStorageAccount {
		ordered = append(ordered, byAccount[acct.getAccountName()]...)
	}
	return ordered
}
// getRankedStorageContainers returns i.Containers ordered by groupResourcesByStorageAccount
// according to rankedStorageAccounts.
func (i *Ingestion) getRankedStorageContainers(rankedStorageAccounts []RankedStorageAccount) []*URI {
return groupResourcesByStorageAccount(i.Containers, rankedStorageAccounts)
}
// getRankedStorageQueues returns i.Queues ordered by groupResourcesByStorageAccount
// according to rankedStorageAccounts.
func (i *Ingestion) getRankedStorageQueues(rankedStorageAccounts []RankedStorageAccount) []*URI {
return groupResourcesByStorageAccount(i.Queues, rankedStorageAccounts)
}
// fetch makes a kusto.Client.Mgmt() call to retrieve the resources used for Ingestion.
//
// It runs ".get ingestion resources" against "NetDefaultDB", retrying with backoff only when the
// failure is an HTTP throttling error. Every returned row is imported into a fresh Ingestion;
// rows of resource types that are not tracked are skipped. On success the Ingestion and the
// current UTC time are stored on the manager. Calls are serialized by fetchLock.
// Returned errors wrap the underlying cause with %w.
func (m *Manager) fetch(ctx context.Context) error {
	m.fetchLock.Lock()
	defer m.fetchLock.Unlock()

	var rows *kusto.RowIterator
	query := func() error {
		var err error
		rows, err = m.client.Mgmt(ctx, "NetDefaultDB", kql.New(".get ingestion resources"), kusto.IngestionEndpoint())
		if err == nil {
			return nil
		}
		// Only throttling is worth retrying; any other failure ends the retry loop at once.
		var httpErr *kustoErrors.HttpError
		if errors.As(err, &httpErr) && httpErr.IsThrottled() {
			return err
		}
		return backoff.Permanent(err)
	}
	if err := backoff.Retry(query, backoff.WithContext(initBackoff(), ctx)); err != nil {
		return fmt.Errorf("problem getting ingestion resources from Kusto: %w", err)
	}

	ingest := Ingestion{}
	err := rows.DoOnRowOrError(func(r *table.Row, e *kustoErrors.Error) error {
		if e != nil {
			return e
		}
		rec := ingestResc{}
		if err := r.ToStruct(&rec); err != nil {
			return err
		}
		if err := ingest.importRec(rec, m.rankedStorageAccount); err != nil && !errors.Is(err, errDoNotCare) {
			return err
		}
		return nil
	})
	if err != nil {
		return fmt.Errorf("problem reading ingestion resources from Kusto: %w", err)
	}

	m.resources.Store(ingest)
	m.lastFetchTime.Store(time.Now().UTC())
	return nil
}
// fetchRetry calls fetch until it succeeds, giving each attempt a 30 second timeout and waiting
// 10 seconds between attempts. It gives up once more than retryCount attempts have failed and
// returns an error wrapping the last fetch failure.
//
// If the manager is closed, before an attempt or while waiting between attempts, fetchRetry
// returns nil without fetching. If ctx is done while waiting, it returns an error wrapping
// ctx.Err() instead of sleeping through the remaining delay.
func (m *Manager) fetchRetry(ctx context.Context) error {
	const (
		attemptTimeout = 30 * time.Second
		retryDelay     = 10 * time.Second
	)

	attempts := 0
	for {
		select {
		case <-m.done:
			return nil
		default:
		}

		attemptCtx, cancel := context.WithTimeout(ctx, attemptTimeout)
		err := m.fetch(attemptCtx)
		cancel()
		if err == nil {
			return nil
		}

		attempts++
		if attempts > retryCount {
			return fmt.Errorf("failed to fetch ingestion resources: %w", err)
		}

		// Wait before the next attempt, but wake up early on Close or cancellation.
		timer := time.NewTimer(retryDelay)
		select {
		case <-timer.C:
		case <-m.done:
			timer.Stop()
			return nil
		case <-ctx.Done():
			timer.Stop()
			return fmt.Errorf("failed to fetch ingestion resources: %w", ctx.Err())
		}
	}
}
// initBackoff builds the retry policy used for management calls: an exponential backoff that
// starts at defaultInitialInterval, grows by defaultMultiplier and is limited to retryCount retries.
func initBackoff() backoff.BackOff {
	policy := backoff.NewExponentialBackOff()
	policy.InitialInterval, policy.Multiplier = defaultInitialInterval, defaultMultiplier
	return backoff.WithMaxRetries(policy, retryCount)
}
// getResources returns information about the ingestion resources. Cached information is used
// unless nothing has been fetched yet or the last successful fetch is older than twice
// fetchInterval; in those cases the resources are fetched (with retries) first.
// An error is returned when that fetch fails or when no Ingestion has been stored yet.
func (m *Manager) getResources() (Ingestion, error) {
	last, fetched := m.lastFetchTime.Load().(time.Time)
	stale := !fetched || last.Add(2*fetchInterval).Before(time.Now().UTC())
	if stale {
		if err := m.fetchRetry(context.Background()); err != nil {
			return Ingestion{}, err
		}
	}

	if cached, ok := m.resources.Load().(Ingestion); ok {
		return cached, nil
	}
	return Ingestion{}, fmt.Errorf("manager has not retrieved an Ingestion object yet")
}
// ReportStorageResourceResult records whether using the storage account accountName succeeded
// by forwarding the result to the manager's ranked storage account set.
func (m *Manager) ReportStorageResourceResult(accountName string, success bool) {
m.rankedStorageAccount.addAccountResult(accountName, success)
}
// GetRankedStorageContainers returns the blob container URIs of the current ingestion resources,
// ordered by the account ranking obtained from the ranked storage account set.
// It returns an error when the ingestion resources cannot be obtained.
func (m *Manager) GetRankedStorageContainers() ([]*URI, error) {
	res, err := m.getResources()
	if err != nil {
		return nil, err
	}
	ranking := m.rankedStorageAccount.getRankedShuffledAccounts()
	return res.getRankedStorageContainers(ranking), nil
}
// GetRankedStorageQueues returns the queue URIs of the current ingestion resources, ordered by
// the account ranking obtained from the ranked storage account set.
// It returns an error when the ingestion resources cannot be obtained.
func (m *Manager) GetRankedStorageQueues() ([]*URI, error) {
	res, err := m.getResources()
	if err != nil {
		return nil, err
	}
	ranking := m.rankedStorageAccount.getRankedShuffledAccounts()
	return res.getRankedStorageQueues(ranking), nil
}
// GetTables returns the table URIs of the current ingestion resources.
// It returns an error when the ingestion resources cannot be obtained.
func (m *Manager) GetTables() ([]*URI, error) {
	res, err := m.getResources()
	if err == nil {
		return res.Tables, nil
	}
	return nil, err
}