-
Notifications
You must be signed in to change notification settings - Fork 487
/
store.go
135 lines (110 loc) · 2.63 KB
/
store.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package servicegraphprocessor
import (
"container/list"
"errors"
"sync"
"time"
)
var (
// errTooManyItems is returned by upsertEdge when the store is at capacity
// (maxItems) and a new edge cannot be added.
errTooManyItems = errors.New("too many items")
)
// storeCallback is a function that receives an edge held by the store. It is
// used both as the eviction hook (store.evictCallback) and as the update
// function passed to upsertEdge.
type storeCallback func(e *edge)
// store is a bounded, mutex-guarded collection of edges indexed by key.
// Edges live in a linked list in insertion order (new edges are pushed to the
// back), so the front of the list is always the oldest edge still stored.
type store struct {
// l holds the edges; every element's Value is an *edge.
l *list.List
// mtx guards l and m.
mtx *sync.RWMutex
// m maps an edge key to that edge's element in l.
m map[string]*list.Element
// evictCallback is invoked with an edge right before it is removed.
evictCallback storeCallback
// ttl is handed to newEdge whenever the store creates an edge.
ttl time.Duration
// maxItems caps the number of edges held; upsertEdge returns
// errTooManyItems instead of growing past it.
maxItems int
}
// newStore builds an empty store.
//
// ttl is the lifetime handed to every edge the store creates, maxItems bounds
// how many edges may be held at once, and evictCallback is run on each edge
// just before it is removed from the store.
func newStore(ttl time.Duration, maxItems int, evictCallback storeCallback) *store {
	return &store{
		evictCallback: evictCallback,
		ttl:           ttl,
		maxItems:      maxItems,
		l:             list.New(),
		m:             map[string]*list.Element{},
		mtx:           new(sync.RWMutex),
	}
}
// len returns the number of edges currently held in the store.
// It acquires the read lock for the duration of the call.
func (s *store) len() int {
s.mtx.RLock()
defer s.mtx.RUnlock()
return s.l.Len()
}
// shouldEvictHead reports whether the edge at the front of the list (the
// oldest one in the store) carries an expiration timestamp that is earlier
// than the current Unix time in seconds. An empty store yields false.
//
// The caller must hold the store lock.
func (s *store) shouldEvictHead() bool {
	if oldest := s.l.Front(); oldest != nil {
		// TODO: This should use a steady monotonic clock? Otherwise if the time is adjusted this will not work properly
		return oldest.Value.(*edge).expiration < time.Now().Unix()
	}
	return false
}
// evictHead removes the edge at the front of the list (the oldest one) from
// the store. Removal goes through evictEdge, so the store's evictCallback is
// invoked for the edge before it is dropped from the map and the list.
//
// Calling it on an empty store is a no-op.
//
// Must be called under lock.
func (s *store) evictHead() {
	front := s.l.Front()
	if front == nil {
		// Front returns nil for an empty list; there is nothing to evict.
		return
	}
	s.evictEdge(front.Value.(*edge).key)
}
// evictEdgeWithLock acquires the write lock and then evicts the edge stored
// under key via evictEdge. It is the variant for callers that do not already
// hold the lock.
func (s *store) evictEdgeWithLock(key string) {
s.mtx.Lock()
defer s.mtx.Unlock()
s.evictEdge(key)
}
// evictEdge drops the edge stored under key, if there is one. The store's
// evictCallback is handed the edge first; only afterwards is the key deleted
// from the map and the element unlinked from the list. An unknown key (for
// example an edge that was already processed) is silently ignored.
//
// Must be called under lock.
func (s *store) evictEdge(key string) {
	if ele, ok := s.m[key]; ok && ele != nil {
		s.evictCallback(ele.Value.(*edge))
		delete(s.m, key)
		s.l.Remove(ele)
	}
}
// upsertEdge fetches the edge stored under k, creating it when absent, and
// runs cb on it while holding the write lock.
//
// If k is already present, cb is applied to the existing edge and that edge is
// returned. Otherwise a new edge is created with the store's TTL, appended to
// the back of the list, passed to cb and returned.
//
// When the store already holds maxItems edges, expired edges at the head of
// the list are evicted first (invoking the evict callback for each) to make
// room. If the store is still full afterwards, no edge is created, cb is not
// called, and (nil, errTooManyItems) is returned.
func (s *store) upsertEdge(k string, cb storeCallback) (*edge, error) {
	s.mtx.Lock()
	defer s.mtx.Unlock()

	if ele, ok := s.m[k]; ok {
		existing := ele.Value.(*edge)
		cb(existing)
		return existing, nil
	}

	// Reclaim space from already-expired edges before rejecting the new one;
	// otherwise a full store keeps refusing keys until the next expire() run.
	for s.l.Len() >= s.maxItems && s.shouldEvictHead() {
		before := s.l.Len()
		s.evictHead()
		if s.l.Len() == before {
			// Nothing was removed; stop rather than spin while holding the lock.
			break
		}
	}
	if s.l.Len() >= s.maxItems {
		return nil, errTooManyItems
	}

	created := newEdge(k, s.ttl)
	s.m[k] = s.l.PushBack(created)
	cb(created)
	return created, nil
}
// expire evicts every expired edge from the front of the store.
//
// It first probes the head under the read lock and returns right away when
// nothing has expired. Only when there is work to do does it take the write
// lock; the head is then re-checked on every iteration, so edges evicted by
// someone else between the two locks are handled correctly.
func (s *store) expire() {
	s.mtx.RLock()
	headExpired := s.shouldEvictHead()
	s.mtx.RUnlock()

	if !headExpired {
		return
	}

	s.mtx.Lock()
	defer s.mtx.Unlock()

	for s.shouldEvictHead() {
		s.evictHead()
	}
}