-
-
Notifications
You must be signed in to change notification settings - Fork 12
/
role.go
106 lines (98 loc) · 2.4 KB
/
role.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
package backup
import (
"context"
"net/url"
"os"
"strings"
"time"
"beryju.io/gravity/pkg/extconfig"
"beryju.io/gravity/pkg/roles"
"beryju.io/gravity/pkg/roles/api/types"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
"github.com/robfig/cron/v3"
"github.com/swaggest/rest/web"
"go.uber.org/zap"
)
const (
EventTopicBackupRun = "roles.backup.run"
)
type Role struct {
mc *minio.Client
cfg *RoleConfig
c *cron.Cron
log *zap.Logger
i roles.Instance
ctx context.Context
}
func New(instance roles.Instance) *Role {
r := &Role{
log: instance.Log(),
i: instance,
ctx: instance.Context(),
}
r.i.AddEventListener(types.EventTopicAPIMuxSetup, func(ev *roles.Event) {
svc := ev.Payload.Data["svc"].(*web.Service)
svc.Post("/api/v1/backup/start", r.APIBackupStart())
svc.Get("/api/v1/backup/status", r.APIBackupStatus())
svc.Get("/api/v1/roles/backup", r.APIRoleConfigGet())
svc.Post("/api/v1/roles/backup", r.APIRoleConfigPut())
})
r.i.AddEventListener(EventTopicBackupRun, func(ev *roles.Event) {
r.SaveSnapshot(ev.Context)
})
return r
}
func (r *Role) Start(ctx context.Context, config []byte) error {
os.MkdirAll(extconfig.Get().Dirs().BackupDir, os.ModeSticky|os.ModePerm)
r.cfg = r.decodeRoleConfig(config)
if r.cfg.Endpoint == "" {
return roles.ErrRoleNotConfigured
}
endpoint, err := url.Parse(r.cfg.Endpoint)
if err != nil {
return err
}
opts := &minio.Options{
Secure: strings.EqualFold(endpoint.Scheme, "https"),
Transport: extconfig.Transport(),
}
if r.cfg.AccessKey != "" {
opts.Creds = credentials.NewStaticV4(r.cfg.AccessKey, r.cfg.SecretKey, "")
} else {
opts.Creds = credentials.NewChainCredentials(
[]credentials.Provider{
&credentials.EnvAWS{},
&credentials.EnvMinio{},
&credentials.FileAWSCredentials{},
&credentials.FileMinioClient{},
&credentials.IAM{},
},
)
}
r.c = cron.New()
if r.cfg.CronExpr != "" {
ei, err := r.c.AddFunc(r.cfg.CronExpr, func() {
ctx, cancel := context.WithTimeout(r.ctx, 60*time.Minute)
defer cancel()
r.SaveSnapshot(ctx)
})
if err != nil {
return err
}
r.log.Info("next backup run", zap.String("next", r.c.Entry(ei).Next.String()))
r.c.Start()
}
minioClient, err := minio.New(endpoint.Host, opts)
if err != nil {
return err
}
r.mc = minioClient
r.ensureBucket()
return nil
}
func (r *Role) Stop() {
if r.c != nil {
<-r.c.Stop().Done()
}
}