-
Notifications
You must be signed in to change notification settings - Fork 57
/
key.go
134 lines (110 loc) · 2.76 KB
/
key.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package memdb
import (
"sync"
"time"
"github.com/bricks-cloud/bricksllm/internal/key"
"github.com/bricks-cloud/bricksllm/internal/stats"
"go.uber.org/zap"
)
type Storage interface {
GetAllKeys() ([]*key.ResponseKey, error)
GetUpdatedKeys(updatedAt int64) ([]*key.ResponseKey, error)
}
type MemDb struct {
external Storage
lastUpdated int64
hashToKeys map[string]*key.ResponseKey
hashToKeysLock sync.RWMutex
done chan bool
interval time.Duration
log *zap.Logger
}
func NewMemDb(ex Storage, log *zap.Logger, interval time.Duration) (*MemDb, error) {
hashToKeys := map[string]*key.ResponseKey{}
keys, err := ex.GetAllKeys()
if err != nil {
return nil, err
}
numberOfKeys := 0
var latetest int64 = -1
for _, k := range keys {
hashToKeys[k.Key] = k
numberOfKeys++
if k.UpdatedAt > latetest {
latetest = k.UpdatedAt
}
}
if numberOfKeys != 0 {
log.Sugar().Infof("key settings memdb updated at %d with %d keys settings", latetest, numberOfKeys)
}
return &MemDb{
external: ex,
hashToKeys: hashToKeys,
log: log,
lastUpdated: latetest,
interval: interval,
done: make(chan bool),
}, nil
}
func (mdb *MemDb) GetKey(hash string) *key.ResponseKey {
k, ok := mdb.hashToKeys[hash]
if ok {
return k
}
return nil
}
func (mdb *MemDb) SetKey(k *key.ResponseKey) {
mdb.hashToKeysLock.RLock()
defer mdb.hashToKeysLock.RUnlock()
mdb.hashToKeys[k.Key] = k
}
func (mdb *MemDb) RemoveKey(k *key.ResponseKey) {
mdb.hashToKeysLock.RLock()
defer mdb.hashToKeysLock.RUnlock()
delete(mdb.hashToKeys, k.Key)
}
func (mdb *MemDb) Listen() {
ticker := time.NewTicker(mdb.interval)
mdb.log.Info("memdb started listening for key updates")
go func() {
lastUpdated := mdb.lastUpdated
for {
select {
case <-mdb.done:
mdb.log.Info("memdb stopped")
return
case <-ticker.C:
keys, err := mdb.external.GetUpdatedKeys(lastUpdated)
if err != nil {
stats.Incr("bricksllm.memdb.memdb.listen.get_updated_keys_error", nil, 1)
mdb.log.Sugar().Debugf("memdb failed to update keys: %v", err)
continue
}
if len(keys) == 0 {
continue
}
any := false
numberOfUpdated := 0
for _, k := range keys {
if k.UpdatedAt > lastUpdated {
lastUpdated = k.UpdatedAt
}
existing := mdb.GetKey(k.Key)
if existing == nil || k.UpdatedAt > existing.UpdatedAt {
mdb.log.Sugar().Infof("key settings memdb updated a key: %s", k.KeyId)
numberOfUpdated += 1
any = true
mdb.SetKey(k)
}
}
if any {
mdb.log.Sugar().Infof("key settings memdb updated at %d with %d keys settings", lastUpdated, numberOfUpdated)
}
}
}
}()
}
func (mdb *MemDb) Stop() {
mdb.log.Info("shutting down memdb...")
mdb.done <- true
}