-
Notifications
You must be signed in to change notification settings - Fork 0
/
db.go
187 lines (157 loc) · 4.91 KB
/
db.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
package core
import (
"context"
"fmt"
"os"
"time"
"github.com/alexanderbez/titan/config"
"github.com/dgraph-io/badger"
)
const (
// Default BadgerDB discardRatio. It represents the discard ratio for the
// BadgerDB GC.
//
// Ref: https://godoc.org/github.com/dgraph-io/badger#DB.RunValueLogGC
badgerDiscardRatio = 0.5
// Default BadgerDB GC interval
badgerGCInterval = 10 * time.Minute
)
// Badger namespaces
var (
BadgerAlertsNamespace = []byte("alerts")
BadgerMonitorsNamespace = []byte("monitors")
)
type (
// DB defines an embedded key/value store database interface.
DB interface {
Get(namespace, key []byte) (value []byte, err error)
Set(namespace, key, value []byte) error
Has(namespace, key []byte) (bool, error)
SetWithTTL(namespace, key, value []byte, ttl time.Duration) error
Close() error
}
// BadgerDB is a wrapper around a BadgerDB backend database that implements
// the DB interface.
BadgerDB struct {
db *badger.DB
ctx context.Context
cancelFunc context.CancelFunc
logger Logger
}
)
// NewBadgerDB returns a new initialized BadgerDB database implementing the DB
// interface. If the database cannot be initialized, an error will be returned.
func NewBadgerDB(cfg config.Config, logger Logger) (DB, error) {
if err := os.MkdirAll(cfg.Database.DataDir, 0774); err != nil {
return nil, err
}
opts := badger.DefaultOptions
opts.SyncWrites = true
opts.Dir, opts.ValueDir = cfg.Database.DataDir, cfg.Database.DataDir
badgerDB, err := badger.Open(opts)
if err != nil {
return nil, err
}
bdb := &BadgerDB{
db: badgerDB,
logger: logger.With("module", "db"),
}
bdb.ctx, bdb.cancelFunc = context.WithCancel(context.Background())
go bdb.runGC()
return bdb, nil
}
// Get implements the DB interface. It attempts to get a value for a given key
// and namespace. If the key does not exist in the provided namespace, an error
// is returned, otherwise the retrieved value.
func (bdb *BadgerDB) Get(namespace, key []byte) (value []byte, err error) {
err = bdb.db.View(func(txn *badger.Txn) error {
item, err := txn.Get(badgerNamespaceKey(namespace, key))
if err != nil {
return err
}
tmpValue, err := item.Value()
if err != nil {
return err
}
// Copy the value as the value provided Badger is only valid while the
// transaction is open.
value = make([]byte, len(tmpValue))
copy(value, tmpValue)
return nil
})
if err != nil {
return nil, err
}
return value, nil
}
// SetWithTTL implements the DB interface. It attempts to store a value for a
// given key, namespace and TTL duration. If the key/value pair cannot be
// saved, an error is returned.
func (bdb *BadgerDB) SetWithTTL(namespace, key, value []byte, ttl time.Duration) error {
err := bdb.db.Update(func(txn *badger.Txn) error {
return txn.SetWithTTL(badgerNamespaceKey(namespace, key), value, ttl)
})
if err != nil {
bdb.logger.Debugf("failed to set key %s for namespace %s: %v", key, namespace, err)
return err
}
return nil
}
// Set implements the DB interface. It attempts to store a value for a given key
// and namespace. If the key/value pair cannot be saved, an error is returned.
func (bdb *BadgerDB) Set(namespace, key, value []byte) error {
err := bdb.db.Update(func(txn *badger.Txn) error {
return txn.Set(badgerNamespaceKey(namespace, key), value)
})
if err != nil {
bdb.logger.Debugf("failed to set key %s for namespace %s: %v", key, namespace, err)
return err
}
return nil
}
// Has implements the DB interface. It returns a boolean reflecting if the
// database has a given key for a namespace or not. An error is only returned if
// an error to Get would be returned that is not of type badger.ErrKeyNotFound.
func (bdb *BadgerDB) Has(namespace, key []byte) (ok bool, err error) {
_, err = bdb.Get(namespace, key)
switch err {
case badger.ErrKeyNotFound:
ok, err = false, nil
case nil:
ok, err = true, nil
}
return
}
// Close implements the DB interface. It closes the connection to the underlying
// BadgerDB database as well as invoking the context's cancel function.
func (bdb *BadgerDB) Close() error {
bdb.cancelFunc()
return bdb.db.Close()
}
// runGC triggers the garbage collection for the BadgerDB backend database. It
// should be run in a goroutine.
func (bdb *BadgerDB) runGC() {
ticker := time.NewTicker(badgerGCInterval)
for {
select {
case <-ticker.C:
err := bdb.db.RunValueLogGC(badgerDiscardRatio)
if err != nil {
// don't report error when GC didn't result in any cleanup
if err == badger.ErrNoRewrite {
bdb.logger.Debugf("no BadgerDB GC occurred: %v", err)
} else {
bdb.logger.Errorf("failed to GC BadgerDB: %v", err)
}
}
case <-bdb.ctx.Done():
return
}
}
}
// badgerNamespaceKey returns a composite key used for lookup and storage for a
// given namespace and key.
func badgerNamespaceKey(namespace, key []byte) []byte {
prefix := []byte(fmt.Sprintf("%s/", namespace))
return append(prefix, key...)
}