-
Notifications
You must be signed in to change notification settings - Fork 1
/
supervisor.go
124 lines (105 loc) · 2.1 KB
/
supervisor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package backend
import (
"bufio"
"github.com/prometheus/common/log"
"github.com/zeebo/errs"
"go.uber.org/zap"
"gopkg.in/yaml.v3"
"os"
"os/exec"
"sync"
)
// Supervisor launches the external "registry" command as a child process,
// can kill it again, and reports whether it is currently running.
type Supervisor struct {
// log is a zap logger slot; the methods in this file log through the
// package-level prometheus logger and do not use it.
log *zap.Logger
// cmd is the command most recently created by Start.
cmd *exec.Cmd
// mu is held by Start, Stop and Configure for their whole duration.
mu sync.Mutex
// lastErr holds the result of cmd.Wait for the child process.
lastErr error
// running is set to true once the child has been started and back to false
// when it exits or when Stop is called.
running bool
}
// RunCfg is the configuration that Configure serializes to storj.yaml and
// readConfig loads back. Start hands both values to the registry process
// through environment variables.
type RunCfg struct {
// Bucket is exported to the child as REGISTRY_STORAGE_STORJ_BUCKET.
Bucket string
// Grant is exported to the child as REGISTRY_STORAGE_STORJ_ACCESSGRANT.
Grant string
}
// Start launches "registry serve config.yml" as a child process, using the
// bucket and access grant stored in storj.yaml as its only environment
// variables, and forwards every line the child writes to stdout or stderr to
// the package logger.
//
// Calling Start while a child is already running is a no-op that returns nil
// (mirroring Stop, which is a no-op when nothing is running); launching a
// second child would replace s.cmd and leave the first one unmanaged.
//
// It returns an error when storj.yaml cannot be read or parsed, when the
// output pipes cannot be created, or when the process fails to start. The
// child's exit status is collected in the background and stored in lastErr.
func (s *Supervisor) Start() (err error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	if s.running {
		return nil
	}

	cfg, err := s.readConfig()
	if err != nil {
		return errs.New("Storj gateway is not yet configured")
	}

	cmd := exec.Command("registry", "serve", "config.yml")
	cmd.Env = []string{
		"REGISTRY_STORAGE_STORJ_BUCKET=" + cfg.Bucket,
		"REGISTRY_STORAGE_STORJ_ACCESSGRANT=" + cfg.Grant,
	}

	pipeOut, err := cmd.StdoutPipe()
	if err != nil {
		return errs.Wrap(err)
	}
	pipeErr, err := cmd.StderrPipe()
	if err != nil {
		return errs.Wrap(err)
	}

	if err = cmd.Start(); err != nil {
		return errs.Wrap(err)
	}
	s.cmd = cmd
	s.running = true

	// Both output streams must be read to EOF before cmd.Wait is called:
	// Wait closes the pipes as soon as the process exits, so waiting earlier
	// can discard the child's final lines.
	var readers sync.WaitGroup
	forward := func(scanner *bufio.Scanner) {
		defer readers.Done()
		for scanner.Scan() {
			log.Info(scanner.Text())
		}
		if scanErr := scanner.Err(); scanErr != nil {
			log.Error("reading registry output: ", scanErr)
		}
	}
	readers.Add(2)
	go forward(bufio.NewScanner(pipeOut))
	go forward(bufio.NewScanner(pipeErr))

	go func() {
		readers.Wait()
		waitErr := cmd.Wait()
		log.Info("Registry process has been stopped")

		// The supervisor state is shared with Start, Stop and Status, so it is
		// only touched while holding the mutex.
		s.mu.Lock()
		defer s.mu.Unlock()
		// A later Start may already have replaced s.cmd; in that case this
		// exit belongs to an old process and must not mark the new one as
		// stopped.
		if s.cmd == cmd {
			s.lastErr = waitErr
			s.running = false
		}
	}()
	return nil
}
// Stop kills the child process if the supervisor believes one is running and
// marks the supervisor as stopped. When nothing is running it does nothing
// and returns nil; otherwise it returns the result of killing the process.
func (s *Supervisor) Stop() error {
	s.mu.Lock()
	defer s.mu.Unlock()

	if s.running {
		// The flag is cleared before the kill so the supervisor reports
		// "not running" regardless of whether the kill itself succeeds.
		s.running = false
		return s.cmd.Process.Kill()
	}
	return nil
}
// Configure stores the given bucket name and access grant as YAML in
// storj.yaml (created with mode 0600), replacing any previous content. It
// returns the marshalling or file-writing error, if any.
func (s *Supervisor) Configure(bucket string, grant string) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	settings := RunCfg{Bucket: bucket, Grant: grant}
	encoded, err := yaml.Marshal(&settings)
	if err != nil {
		return err
	}
	if err := os.WriteFile("storj.yaml", encoded, 0600); err != nil {
		return err
	}
	return nil
}
// readConfig loads storj.yaml and decodes it into a RunCfg. It returns the
// read error when the file cannot be read, or the decode error when the
// content is not valid YAML for RunCfg.
func (s *Supervisor) readConfig() (cfg RunCfg, err error) {
	raw, readErr := os.ReadFile("storj.yaml")
	if readErr != nil {
		return cfg, readErr
	}

	err = yaml.Unmarshal(raw, &cfg)
	return cfg, err
}
// Status reports the supervisor state as one of three strings:
//
//   - "running" when a child process is currently marked as running,
//   - "missing" when nothing is running and storj.yaml cannot be read or parsed,
//   - "stopped" when nothing is running but a valid configuration exists.
//
// The returned error is always nil.
func (s *Supervisor) Status() (string, error) {
	// Take the same mutex as Start, Stop and Configure so the running flag is
	// not read while it is being changed and the config file is not read
	// while Configure is rewriting it.
	s.mu.Lock()
	defer s.mu.Unlock()

	if s.running {
		return "running", nil
	}
	if _, err := s.readConfig(); err != nil {
		return "missing", nil
	}
	return "stopped", nil
}