Skip to content

Commit ff737ac

Browse files
committed
cache: mark cache entry as referenced when multiple readers
1 parent 6665d61 commit ff737ac

File tree

5 files changed

+80
-16
lines changed

5 files changed

+80
-16
lines changed

internal/cache/cache.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -322,9 +322,14 @@ func (c *Handle) GetWithReadHandle(
322322
// existing value if present. The value must have been allocated by Cache.Alloc.
323323
//
324324
// The cache takes a reference on the Value and holds it until it gets evicted.
325+
//
326+
// REQUIRES: value.refs() == 1
325327
func (c *Handle) Set(fileNum base.DiskFileNum, offset uint64, value *Value) {
328+
if n := value.refs(); n != 1 {
329+
panic(fmt.Sprintf("pebble: Value has already been added to the cache: refs=%d", n))
330+
}
326331
k := makeKey(c.id, fileNum, offset)
327-
c.cache.getShard(k).set(k, value)
332+
c.cache.getShard(k).set(k, value, false /*markAccessed*/)
328333
}
329334

330335
// Delete deletes the cached value for the specified file and offset.

internal/cache/clockpro.go

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -158,11 +158,7 @@ func (c *shard) getWithMaybeReadEntry(k key, desireReadEntry bool) (*Value, *rea
158158
return value, re
159159
}
160160

161-
func (c *shard) set(k key, value *Value) {
162-
if n := value.refs(); n != 1 {
163-
panic(fmt.Sprintf("pebble: Value has already been added to the cache: refs=%d", n))
164-
}
165-
161+
func (c *shard) set(k key, value *Value, markAccessed bool) {
166162
c.mu.Lock()
167163
defer c.mu.Unlock()
168164

@@ -173,6 +169,9 @@ func (c *shard) set(k key, value *Value) {
173169
// no cache entry? add it
174170
e = newEntry(k, int64(len(value.buf)))
175171
e.setValue(value)
172+
if markAccessed {
173+
e.referenced.Store(true)
174+
}
176175
if c.metaAdd(k, e) {
177176
value.ref.trace("add-cold")
178177
c.sizeCold += e.size
@@ -219,7 +218,7 @@ func (c *shard) set(k key, value *Value) {
219218
c.coldTarget = c.targetSize()
220219
}
221220

222-
e.referenced.Store(false)
221+
e.referenced.Store(markAccessed)
223222
e.setValue(value)
224223
e.ptype = etHot
225224
if c.metaAdd(k, e) {

internal/cache/read_shard.go

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package cache
66

77
import (
88
"context"
9+
"fmt"
910
"sync"
1011
"time"
1112

@@ -293,15 +294,10 @@ func (e *readEntry) unrefAndTryRemoveFromMap() {
293294
}
294295

295296
func (e *readEntry) setReadValue(v *Value) {
296-
// Add to the cache before taking another ref for readEntry, since the cache
297-
// expects ref=1 when it is called.
298-
//
299-
// TODO(sumeer): if e.refCount > 1, we should consider overriding to ensure
300-
// that it is added as etHot. The common case will be e.refCount = 1, and we
301-
// don't want to acquire e.mu twice, so one way to do this would be relax
302-
// the invariant in shard.Set that requires Value.refs() == 1. Then we can
303-
// do the work under e.mu before calling shard.Set.
304-
e.readShard.shard.set(e.key, v)
297+
if n := v.refs(); n != 1 {
298+
panic(fmt.Sprintf("pebble: Value has already been added to the cache: refs=%d", n))
299+
}
300+
concurrentRequesters := false
305301
e.mu.Lock()
306302
// Acquire a ref for readEntry, since we are going to remember it in e.mu.v.
307303
v.acquire()
@@ -318,8 +314,12 @@ func (e *readEntry) setReadValue(v *Value) {
318314
// readEntry.waitForReadPermissionOrHandle, and those will also use
319315
// e.mu.v.
320316
close(e.mu.ch)
317+
// e.mu.ch is non-nil only when there were concurrent requesters. NB: we
318+
// can't read e.refCount here since it is protected by e.readShard.mu.
319+
concurrentRequesters = true
321320
}
322321
e.mu.Unlock()
322+
e.readShard.shard.set(e.key, v, concurrentRequesters)
323323
e.unrefAndTryRemoveFromMap()
324324
}
325325

@@ -368,6 +368,8 @@ func (rh ReadHandle) Valid() bool {
368368
//
369369
// The cache takes a reference on the Value and holds it until it is evicted and
370370
// no longer needed by other readers.
371+
//
372+
// REQUIRES: v.refs() == 1
371373
func (rh ReadHandle) SetReadValue(v *Value) {
372374
rh.entry.setReadValue(v)
373375
}

internal/cache/read_shard_test.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,13 @@
55
package cache
66

77
import (
8+
"cmp"
89
"context"
910
crand "crypto/rand"
1011
"fmt"
1112
"math/rand"
13+
"slices"
14+
"strings"
1215
"sync"
1316
"testing"
1417
"time"
@@ -190,6 +193,34 @@ func TestReadShard(t *testing.T) {
190193
time.Sleep(10 * time.Millisecond)
191194
return fmt.Sprintf("map-len: %d", c.readShard.lenForTesting())
192195

196+
case "print-shard":
197+
return func() string {
198+
c.mu.RLock()
199+
defer c.mu.RUnlock()
200+
type shardEntry struct {
201+
k key
202+
hasValue bool
203+
referenced bool
204+
}
205+
var entries []shardEntry
206+
c.blocks.All(func(k key, e *entry) bool {
207+
entries = append(entries,
208+
shardEntry{k: k, hasValue: e.val != nil, referenced: e.referenced.Load()})
209+
return true
210+
})
211+
slices.SortFunc(entries, func(a, b shardEntry) int {
212+
return cmp.Or(
213+
cmp.Compare(a.k.id, b.k.id), cmp.Compare(a.k.fileNum, b.k.fileNum),
214+
cmp.Compare(a.k.offset, b.k.offset))
215+
})
216+
var b strings.Builder
217+
for _, e := range entries {
218+
fmt.Fprintf(&b, "id=%d file=%d offset=%d hasValue=%t referenced=%t\n",
219+
e.k.id, e.k.fileNum, e.k.offset, e.hasValue, e.referenced)
220+
}
221+
return b.String()
222+
}()
223+
193224
default:
194225
return fmt.Sprintf("unknown command: %s", td.Cmd)
195226

internal/cache/testdata/read_shard

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,12 @@ set-read-value name=s1_f2_off3 val=brinjal
153153
----
154154
map-len: 0
155155

156+
# The added entry is marked as referenced, since there is a concurrent reader
157+
# (that is still waiting).
158+
print-shard
159+
----
160+
id=1 file=2 offset=3 hasValue=true referenced=true
161+
156162
wait name=s1_f2_off3_2
157163
----
158164
val: brinjal
@@ -161,3 +167,24 @@ map-len: 0
161167
get name=s1_f2_off3_3 id=1 file-num=2 offset=3
162168
----
163169
val: brinjal
170+
171+
# Get misses the cache and gets the turn to read.
172+
get name=s1_f2_off20 id=1 file-num=2 offset=20
173+
----
174+
waiting
175+
map-len: 1
176+
177+
wait name=s1_f2_off20
178+
----
179+
turn to read
180+
map-len: 1
181+
182+
set-read-value name=s1_f2_off20 val=okra
183+
----
184+
map-len: 0
185+
186+
# The added entry is not referenced, since there was not a concurrent reader.
187+
print-shard
188+
----
189+
id=1 file=2 offset=3 hasValue=true referenced=true
190+
id=1 file=2 offset=20 hasValue=true referenced=false

0 commit comments

Comments
 (0)