-
Notifications
You must be signed in to change notification settings - Fork 23
/
loader.go
136 lines (109 loc) · 3.07 KB
/
loader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
package rate
import (
"sync"
"time"
"github.com/Conflux-Chain/confura/util"
"github.com/sirupsen/logrus"
)
const (
LimitKeyCacheSize = 5000
LimitKeyExpirationTTL = 60 * time.Second
)
type KeyInfo struct {
SID uint32 // bound strategy ID
AclID uint32 // bound allowlist ID
Key string // limit key
Type LimitType // limit type
}
type KeysetFilter struct {
SIDs []uint32 // strategy IDs
KeySet []string // limit key set
Limit int // result limit size (<= 0 means none)
}
// ksLoadFunc loads limit keyset with specific filter from wherever eg., store
type ksLoadFunc func(filter *KeysetFilter) ([]*KeyInfo, error)
type KeyLoader struct {
mu sync.Mutex
// raw keyset load function
ksload ksLoadFunc
// limit key cache: limit key => *KeyInfo (nil if missing)
keyCache *util.ExpirableLruCache
}
func NewKeyLoader(ksload ksLoadFunc) *KeyLoader {
kl := &KeyLoader{
ksload: ksload,
keyCache: util.NewExpirableLruCache(
LimitKeyCacheSize, LimitKeyExpirationTTL,
),
}
// warm up limit key cache for better performance
kl.warmUpKeyCache()
return kl
}
// Load loads key info from cache or raw loading from somewhere else
// if cache missed.
func (l *KeyLoader) Load(key string) (*KeyInfo, bool) {
// load from cache at first
if ki, ok := l.cacheLoad(key); ok { // cache hit
return ki, ok
}
// otherwise, populate the cache
return l.populateCache(key)
}
func (l *KeyLoader) cacheLoad(key string) (*KeyInfo, bool) {
cv, expired, found := l.keyCache.GetNoExp(key)
if found && !expired { // found in cache
return cv.(*KeyInfo), true
}
l.mu.Lock()
defer l.mu.Unlock()
cv, expired, found = l.keyCache.GetNoExp(key)
if found && !expired { // double check
return cv.(*KeyInfo), true
}
if found && expired {
// extend lifespan for expired cache kv temporarliy for performance
l.keyCache.Add(key, cv.(*KeyInfo))
}
return nil, false
}
func (l *KeyLoader) populateCache(key string) (*KeyInfo, bool) {
// load key info from db
ki, err := l.rawLoad(key)
if err != nil {
l.mu.Lock()
defer l.mu.Unlock()
// for db error, we cache nil for the key by which no expiry cache value existed
// so that db pressure can be mitigrated by reducing too many subsequential queries.
if _, _, found := l.keyCache.GetNoExp(key); !found {
l.keyCache.Add(key, nil)
}
logrus.WithField("key", key).
WithError(err).
Error("Key loader failed to load limit key info")
return nil, false
}
l.mu.Lock()
defer l.mu.Unlock()
// cache limit key
l.keyCache.Add(key, ki)
return ki, true
}
func (kl *KeyLoader) rawLoad(key string) (*KeyInfo, error) {
kinfos, err := kl.ksload(&KeysetFilter{KeySet: []string{key}})
if err == nil && len(kinfos) > 0 {
return kinfos[0], nil
}
return nil, err
}
func (kl *KeyLoader) warmUpKeyCache() {
kis, err := kl.ksload(&KeysetFilter{Limit: (LimitKeyCacheSize * 3 / 4)})
if err != nil {
logrus.WithError(err).Warn("Failed to load limit keyset to warm up cache")
return
}
for i := range kis {
kl.keyCache.Add(kis[i].Key, kis[i])
}
logrus.WithField("totalKeys", len(kis)).Info("Limit keyset loaded to cache")
}