-
Notifications
You must be signed in to change notification settings - Fork 2
/
ppcache.go
264 lines (238 loc) · 7.93 KB
/
ppcache.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
// Copyright (c) 2013 Couchbase, Inc.
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
// except in compliance with the License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed under the
// License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
// either express or implied. See the License for the specific language governing permissions
// and limitations under the License.
// The following is the general idea on cache structure.
//
// |
// *------------* WRITE | READ *------------*
// | inode | ^ | ^ | inode |
// | ping-cache | | | | | pong-cache |
// | | *<-----*-----------* | |
// | knode | | | | *------->| knode |
// | ping-cache | | | | ncache() | pong-cache |
// *------------* | commitQ | *------------*
// ^ V ^ | (Locked access using sync.Mutex)
// | *------* | |
// commits*-----------| MVCC |<-* |
// recyles *------* |
// reclaims | |
// *----->ping2Pong() (atomic, no locking)
// |
// |
//
// The cycle of ping-pong,
//
// - reads will always refer to the pong-cache.
// - reads will populate the cache from disk, when ever cache lookup fails.
// - writes will refer to the commitQ maintained by MVCC, if node is not in
// commitQ it will refer to pong-cache.
//
// - ping-cache is operated only by the MVCC controller.
// - MVCC controller will _populate_ the ping-cache when new nodes are
// generated due to index mutations.
// - MVCC controller will _evict_ the pong-cache as and when nodes become stale
// due to index mutations.
//
// - ping2Pong() happens when snapshot is flushed to disk.
// - pong becomes ping, and MVCC controller will _populate_ and _evict_ the
// newly flipped ping-cache based on commited, recycled and reclaimed node,
// before allowing further mutations.
//
package btree
import (
"log"
"os"
"sync"
"sync/atomic"
"unsafe"
)
// In-memory data structure to cache intermediate nodes.
type pingPong struct {
// pong map for intermediate nodes and leaf nodes
ncpong unsafe.Pointer
lcpong unsafe.Pointer
// ping map for intermediate nodes and leaf nodes
ncping unsafe.Pointer
lcping unsafe.Pointer
// pong map for keys and docids
kdping unsafe.Pointer
kdpong unsafe.Pointer
sync.RWMutex
}
func (wstore *WStore) ncacheLookup(fpos int64) Node {
wstore.RLock()
defer wstore.RUnlock()
var node Node
nc := (*map[int64]Node)(atomic.LoadPointer(&wstore.ncpong))
if node = (*nc)[fpos]; node == nil {
lc := (*map[int64]Node)(atomic.LoadPointer(&wstore.lcpong))
if node = (*lc)[fpos]; node != nil {
wstore.lcHits += 1
}
} else {
wstore.ncHits += 1
}
return node
}
func (wstore *WStore) ncache(node Node) {
wstore.Lock()
defer wstore.Unlock()
fpos := node.getKnode().fpos
if node.isLeaf() {
lc := (*map[int64]Node)(atomic.LoadPointer(&wstore.lcpong))
if len(*lc) < wstore.MaxLeafCache {
(*lc)[fpos] = node
}
wstore.maxlenLC = max(wstore.maxlenLC, int64(len(*lc)))
} else {
nc := (*map[int64]Node)(atomic.LoadPointer(&wstore.ncpong))
(*nc)[fpos] = node
wstore.maxlenNC = max(wstore.maxlenNC, int64(len(*nc)))
}
}
func (wstore *WStore) ncacheEvict(fposs []int64) {
wstore.Lock()
defer wstore.Unlock()
nc := (*map[int64]Node)(atomic.LoadPointer(&wstore.ncpong))
lc := (*map[int64]Node)(atomic.LoadPointer(&wstore.lcpong))
for _, fpos := range fposs {
delete(*nc, fpos)
delete(*lc, fpos)
}
}
func (wstore *WStore) _pingCache(fpos int64, node Node) {
nc := (*map[int64]Node)(atomic.LoadPointer(&wstore.ncping))
lc := (*map[int64]Node)(atomic.LoadPointer(&wstore.lcping))
if node.isLeaf() {
(*lc)[fpos] = node
} else {
(*nc)[fpos] = node
}
}
func (wstore *WStore) _pingCacheEvict(fpos int64) {
nc := (*map[int64]Node)(atomic.LoadPointer(&wstore.ncping))
lc := (*map[int64]Node)(atomic.LoadPointer(&wstore.lcping))
delete(*nc, fpos)
delete(*lc, fpos)
}
func (wstore *WStore) cacheKey(fpos int64, key []byte) {
wstore.pingKey(DEFER_ADD, fpos, key)
}
func (wstore *WStore) cacheDocid(fpos int64, docid []byte) {
wstore.pingDocid(DEFER_ADD, fpos, docid)
}
func (wstore *WStore) lookupKey(rfd *os.File, fpos int64) []byte {
var key []byte
kdpong := (*map[int64][]byte)(atomic.LoadPointer(&wstore.kdpong))
if key = (*kdpong)[fpos]; key == nil {
return wstore.readKV(rfd, fpos)
} else {
wstore.keyHits += 1
}
return key
}
func (wstore *WStore) lookupDocid(rfd *os.File, fpos int64) []byte {
var docid []byte
kdpong := (*map[int64][]byte)(atomic.LoadPointer(&wstore.kdpong))
if docid = (*kdpong)[fpos]; docid == nil {
return wstore.readKV(rfd, fpos)
} else {
wstore.docidHits += 1
}
return docid
}
func (wstore *WStore) assertNotMemberCache(offsets []int64) {
if wstore.Debug {
nc := (*map[int64]Node)(atomic.LoadPointer(&wstore.ncping))
lc := (*map[int64]Node)(atomic.LoadPointer(&wstore.lcping))
for _, fpos := range offsets {
if (*nc)[fpos] != nil {
log.Panicln("to be freed fpos is in ncping-cache", fpos)
} else if (*lc)[fpos] != nil {
log.Panicln("to be freed fpos is in ncping-cache", fpos)
}
}
}
}
func (wstore *WStore) ping2Pong() {
wstore.Lock()
// Swap nodecache
ncping := atomic.LoadPointer(&wstore.ncping)
ncpong := atomic.LoadPointer(&wstore.ncpong)
atomic.StorePointer(&wstore.ncpong, ncping)
atomic.StorePointer(&wstore.ncping, ncpong)
// Swap leafcache
lcping := atomic.LoadPointer(&wstore.lcping)
lcpong := atomic.LoadPointer(&wstore.lcpong)
atomic.StorePointer(&wstore.lcpong, lcping)
atomic.StorePointer(&wstore.lcping, lcpong)
// Swap keycache
kdping := atomic.LoadPointer(&wstore.kdping)
kdpong := atomic.LoadPointer(&wstore.kdpong)
atomic.StorePointer(&wstore.kdpong, kdping)
atomic.StorePointer(&wstore.kdping, kdpong)
defer wstore.Unlock()
// Trim leaf cache
lc := (*map[int64]Node)(atomic.LoadPointer(&wstore.lcping))
nc := (*map[int64]Node)(atomic.LoadPointer(&wstore.ncping))
wstore.maxlenLC = max(wstore.maxlenLC, int64(len(*lc)))
wstore.maxlenNC = max(wstore.maxlenNC, int64(len(*nc)))
if len(*lc) > wstore.MaxLeafCache {
i := len(*lc)
for x := range *lc {
if i < wstore.MaxLeafCache {
break
}
delete(*lc, x)
}
}
}
func (wstore *WStore) displayPing() {
ncping := (*map[int64]Node)(atomic.LoadPointer(&wstore.ncping))
fposs := make([]int64, 0, 100)
for fpos, _ := range *ncping {
fposs = append(fposs, fpos)
}
lcping := (*map[int64]Node)(atomic.LoadPointer(&wstore.lcping))
fposs = make([]int64, 0, 100)
for fpos, _ := range *lcping {
fposs = append(fposs, fpos)
}
}
func (wstore *WStore) checkPingPong() {
ncping := (*map[int64]Node)(atomic.LoadPointer(&wstore.ncping))
ncpong := (*map[int64]Node)(atomic.LoadPointer(&wstore.ncpong))
if len(*ncping) != len(*ncpong) {
panic("Mismatch in nc ping-pong lengths")
}
for fpos := range *ncping {
if (*ncpong)[fpos] == nil {
panic("fpos not found in nc ping-pong")
}
}
//lcping := (*map[int64]Node)(atomic.LoadPointer(&wstore.lcping))
//lcpong := (*map[int64]Node)(atomic.LoadPointer(&wstore.lcpong))
//if len(*lcping) != len(*lcpong) {
// panic("Mismatch in lc ping-pong lengths")
//}
//for fpos := range *lcping {
// if (*lcpong)[fpos] == nil {
// panic("fpos not found in lc ping-pong")
// }
//}
kdping := (*map[int64][]byte)(atomic.LoadPointer(&wstore.kdping))
kdpong := (*map[int64][]byte)(atomic.LoadPointer(&wstore.kdpong))
if len(*kdping) != len(*kdpong) {
panic("Mismatch in kd ping-pong lengths")
}
for fpos := range *kdping {
if (*kdpong)[fpos] == nil {
panic("fpos not found in kd ping-pong")
}
}
}