forked from influxdata/kapacitor
-
Notifications
You must be signed in to change notification settings - Fork 0
/
service.go
117 lines (96 loc) · 2.07 KB
/
service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
package storage
import (
"os"
"path"
"sync"
"github.com/boltdb/bolt"
"github.com/influxdata/kapacitor/services/httpd"
"github.com/pkg/errors"
)
type Diagnostic interface {
Error(msg string, err error)
}
type Service struct {
dbpath string
boltdb *bolt.DB
stores map[string]Interface
mu sync.Mutex
registrar StoreActionerRegistrar
apiServer *APIServer
versions Versions
HTTPDService interface {
AddRoutes([]httpd.Route) error
DelRoutes([]httpd.Route)
}
diag Diagnostic
}
func NewService(conf Config, d Diagnostic) *Service {
return &Service{
dbpath: conf.BoltDBPath,
diag: d,
stores: make(map[string]Interface),
}
}
const (
versionsNamespace = "versions"
)
func (s *Service) Open() error {
s.mu.Lock()
defer s.mu.Unlock()
err := os.MkdirAll(path.Dir(s.dbpath), 0755)
if err != nil {
return errors.Wrapf(err, "mkdir dirs %q", s.dbpath)
}
db, err := bolt.Open(s.dbpath, 0600, nil)
if err != nil {
return errors.Wrapf(err, "open boltdb @ %q", s.dbpath)
}
s.boltdb = db
s.registrar = NewStorageResitrar()
s.apiServer = &APIServer{
DB: s.boltdb,
Registrar: s.registrar,
HTTPDService: s.HTTPDService,
diag: s.diag,
}
if err := s.apiServer.Open(); err != nil {
return err
}
s.versions = NewVersions(s.store(versionsNamespace))
return nil
}
func (s *Service) Close() error {
s.mu.Lock()
defer s.mu.Unlock()
if s.apiServer != nil {
if err := s.apiServer.Close(); err != nil {
return err
}
}
if s.boltdb != nil {
return s.boltdb.Close()
}
return nil
}
// Return a namespaced store.
// Calling Store with the same namespace returns the same Store.
func (s *Service) Store(name string) Interface {
s.mu.Lock()
defer s.mu.Unlock()
return s.store(name)
}
func (s *Service) store(name string) Interface {
if store, ok := s.stores[name]; ok {
return store
} else {
store = NewBolt(s.boltdb, name)
s.stores[name] = store
return store
}
}
func (s *Service) Versions() Versions {
return s.versions
}
func (s *Service) Register(name string, store StoreActioner) {
s.registrar.Register(name, store)
}