/
manage.go
80 lines (74 loc) · 1.79 KB
/
manage.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
package lakemanage
import (
"context"
"errors"
"syscall"
"time"
"github.com/brimdata/zed/api/client"
lakeapi "github.com/brimdata/zed/lake/api"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
func Update(ctx context.Context, lk lakeapi.Interface, conf Config, logger *zap.Logger) error {
if logger == nil {
logger = zap.NewNop()
}
branches, err := getBranches(ctx, conf, lk, logger)
if err != nil {
return err
}
group, ctx := errgroup.WithContext(ctx)
for _, branch := range branches {
branch.logger.Info("updating pool")
if err := branch.run(ctx); err != nil {
branch.logger.Error("update error", zap.Error(err))
}
}
return group.Wait()
}
func Monitor(ctx context.Context, conn *client.Connection, conf Config, logger *zap.Logger) error {
if logger == nil {
logger = zap.NewNop()
}
logger.Info("monitoring")
lk := lakeapi.NewRemoteLake(conn)
for {
err := monitor(ctx, lk, conf, logger)
if errors.Is(err, syscall.ECONNREFUSED) {
logger.Info("cannot connect to lake, retrying in 5 seconds")
} else if err != nil {
return err
}
select {
case <-time.After(5 * time.Second):
case <-ctx.Done():
return ctx.Err()
}
}
}
func monitor(ctx context.Context, lk lakeapi.Interface, conf Config, logger *zap.Logger) error {
for {
select {
case <-time.After(conf.interval()):
case <-ctx.Done():
return ctx.Err()
}
err := Update(ctx, lk, conf, logger)
if err != nil {
return err
}
}
}
func getBranches(ctx context.Context, conf Config, lk lakeapi.Interface, logger *zap.Logger) ([]*branch, error) {
pools, err := lakeapi.GetPools(ctx, lk)
if err != nil {
return nil, err
}
var branches []*branch
for _, pool := range pools {
if b := newBranch(conf, pool, lk, logger); b != nil {
branches = append(branches, b)
}
}
return branches, nil
}