
Commit 21735af

fix: Fix race condition in block.incRef (#1337)
Ibrahim Jarif authored
Fixes hypermodeinc/dgraph#5456. This PR fixes a crash that could occur when a block was read from the cache. There was a logical race condition; the following sequence of events could cause the crash:
1. An iterator makes a `t.Block(idx)` call.
2. The `t.Block` function finds the block in the cache. The newly found block has `ref=1`, which means it is held only by the cache.
3. While `t.Block` is holding the block, the block gets evicted from the cache. The existing ref of the block was `1`, so the cache eviction decrements the ref to `0`. When the ref becomes `0`, the block is added to the `sync.Pool` and is ready to be reused.
4. Even though the block was evicted from the cache, the iterator receives it, increments the ref from `0` to `1`, and starts using it. Since the block was inserted into the `sync.Pool` in step 3, it can be modified by anyone else while the iterator is using it.
1 parent 86b1db9 commit 21735af
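The fix replaces the blind `atomic.AddInt32` in `block.incrRef` with a load/compare-and-swap loop that refuses to take a reference once the count has hit zero. Before looking at the diff, here is a minimal, self-contained sketch of that try-acquire pattern; the `refCounted` type, the `tryIncrRef` name, and this `blockPool` are illustrative stand-ins, not Badger's actual code:

```go
package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

// blockPool stands in for Badger's sync.Pool of reusable block buffers.
var blockPool = sync.Pool{
    New: func() interface{} {
        b := make([]byte, 4096)
        return &b
    },
}

// refCounted is an illustrative stand-in for table.block; it is not Badger's type.
type refCounted struct {
    ref  int32
    data *[]byte
}

// tryIncrRef mirrors the fixed incrRef: it only takes a reference while the
// count is still above zero, so a caller can never "revive" an object whose
// buffer has already been handed back to the pool.
func (r *refCounted) tryIncrRef() bool {
    for {
        ref := atomic.LoadInt32(&r.ref)
        if ref == 0 {
            return false // already released; the buffer may be reused elsewhere
        }
        if atomic.CompareAndSwapInt32(&r.ref, ref, ref+1) {
            return true
        }
    }
}

// decrRef drops a reference and returns the buffer to the pool once the
// count reaches zero, like the fixed block.decrRef.
func (r *refCounted) decrRef() {
    if atomic.AddInt32(&r.ref, -1) == 0 {
        blockPool.Put(r.data)
    }
}

func main() {
    b := &refCounted{ref: 1, data: blockPool.Get().(*[]byte)}

    fmt.Println("acquired:", b.tryIncrRef()) // true: ref goes 1 -> 2
    b.decrRef()                              // reader is done: 2 -> 1
    b.decrRef()                              // cache eviction: 1 -> 0, buffer returned to pool
    fmt.Println("acquired:", b.tryIncrRef()) // false: too late, the buffer must not be used
}
```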

3 files changed (+70 −13 lines)

badger/cmd/bank.go
Lines changed: 26 additions & 3 deletions
@@ -125,7 +125,7 @@ func toSlice(bal uint64) []byte {
 }
 
 func getBalance(txn *badger.Txn, account int) (uint64, error) {
-    item, err := txn.Get(key(account))
+    item, err := get(txn, key(account))
     if err != nil {
         return 0, err
     }
@@ -197,14 +197,33 @@ func diff(a, b []account) string {
 
 var errFailure = errors.New("test failed due to balance mismatch")
 
+// get fetches the value for the key "k" either by using the
+// txn.Get API or the iterator.Seek API.
+func get(txn *badger.Txn, k []byte) (*badger.Item, error) {
+    if rand.Int()%2 == 0 {
+        return txn.Get(k)
+    }
+
+    iopt := badger.DefaultIteratorOptions
+    // PrefetchValues is expensive. We don't need it here.
+    iopt.PrefetchValues = false
+    it := txn.NewIterator(iopt)
+    defer it.Close()
+    it.Seek(k)
+    if it.Valid() {
+        return it.Item(), nil
+    }
+    return nil, badger.ErrKeyNotFound
+}
+
 // seekTotal retrieves the total of all accounts by seeking for each account key.
 func seekTotal(txn *badger.Txn) ([]account, error) {
     expected := uint64(numAccounts) * uint64(initialBal)
     var accounts []account
 
     var total uint64
     for i := 0; i < numAccounts; i++ {
-        item, err := txn.Get(key(i))
+        item, err := get(txn, key(i))
         if err != nil {
             log.Printf("Error for account: %d. err=%v. key=%q\n", i, err, key(i))
             return accounts, err
@@ -343,7 +362,11 @@ func runTest(cmd *cobra.Command, args []string) error {
         WithNumMemtables(2).
         // Do not GC any versions, because we need them for the dissect.
         WithNumVersionsToKeep(int(math.MaxInt32)).
-        WithValueThreshold(1) // Make all values go to value log
+        WithValueThreshold(1). // Make all values go to value log
+        WithCompression(options.ZSTD).
+        WithKeepL0InMemory(false).
+        WithMaxCacheSize(10 << 20)
+
     if mmap {
         opts = opts.WithTableLoadingMode(options.MemoryMap)
     }
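As a usage note, the randomized `get` helper above is intended to be called from inside a read-only transaction. A hedged sketch of such a caller, assumed to sit in the same file as `get` and `key`; the `readBalanceBytes` function is illustrative and not part of the commit:

```go
// readBalanceBytes is an illustrative caller of the get helper above; it is
// not part of this commit. ValueCopy copies the value out so the bytes stay
// valid after the transaction ends.
func readBalanceBytes(db *badger.DB, account int) ([]byte, error) {
    var out []byte
    err := db.View(func(txn *badger.Txn) error {
        item, err := get(txn, key(account))
        if err != nil {
            return err
        }
        out, err = item.ValueCopy(nil)
        return err
    })
    return out, err
}
```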

table/iterator.go
Lines changed: 0 additions & 2 deletions
@@ -44,8 +44,6 @@ func (itr *blockIterator) setBlock(b *block) {
     // Decrement the ref for the old block. If the old block was compressed, we
     // might be able to reuse it.
     itr.block.decrRef()
-    // Increment the ref for the new block.
-    b.incrRef()
 
     itr.block = b
     itr.err = nil
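The deletion works because `t.block` (see table.go below) now returns blocks with a reference already taken on the caller's behalf, so `setBlock` only needs to release the old block. A minimal, self-contained sketch of that ownership convention; the `handle` and `holder` types are illustrative, not Badger's:

```go
package main

import "fmt"

// handle is an illustrative refcounted resource, not Badger's block.
type handle struct {
    name string
    ref  int
}

func (h *handle) decrRef() {
    if h == nil {
        return
    }
    h.ref--
    fmt.Printf("%s ref -> %d\n", h.name, h.ref)
}

// holder mirrors the ownership rule behind the setBlock change: the new
// handle arrives with its reference already taken by the producer, so set
// only releases the old one instead of incrementing the new one.
type holder struct{ cur *handle }

func (o *holder) set(h *handle) {
    o.cur.decrRef() // drop our claim on the old handle
    o.cur = h       // adopt the ref the producer already gave us
}

func main() {
    o := &holder{}
    o.set(&handle{name: "block-0", ref: 1}) // producer handed us ref=1
    o.set(&handle{name: "block-1", ref: 1}) // block-0's ref drops to 0
}
```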

table/table.go
Lines changed: 44 additions & 8 deletions
@@ -187,25 +187,46 @@ type block struct {
     ref int32
 }
 
-func (b *block) incrRef() {
-    atomic.AddInt32(&b.ref, 1)
+// incrRef increments the ref of a block and returns a bool indicating whether
+// the increment was successful. A true value indicates that the block can be used.
+func (b *block) incrRef() bool {
+    for {
+        // We can't blindly add 1 to ref. We need to check whether it has
+        // reached zero first, because if it did, then we should absolutely not
+        // use this block.
+        ref := atomic.LoadInt32(&b.ref)
+        // The ref would not be equal to 0 unless the existing
+        // block got evicted before this line. If the ref is zero, it means that
+        // the block has already been added to the blockPool and cannot be used
+        // anymore. The ref of a new block is 1, so the following condition will
+        // be true only if the block got reused before we could increment its
+        // ref.
+        if ref == 0 {
+            return false
+        }
+        // Increment the ref only if it is not zero and has not changed between
+        // the time we read it and the time we update it. If either check
+        // fails, retry.
+        if atomic.CompareAndSwapInt32(&b.ref, ref, ref+1) {
+            return true
+        }
+    }
 }
 func (b *block) decrRef() {
     if b == nil {
         return
     }
 
-    p := atomic.AddInt32(&b.ref, -1)
     // Insert the []byte into pool only if the block is reusable. When a block
     // is reusable a new []byte is used for decompression and this []byte can
     // be reused.
     // In case of an uncompressed block, the []byte is a reference to the
     // table.mmap []byte slice. Any attempt to write data to the mmap []byte
     // will lead to SEGFAULT.
-    if p == 0 && b.isReusable {
+    if atomic.AddInt32(&b.ref, -1) == 0 && b.isReusable {
         blockPool.Put(&b.data)
     }
-    y.AssertTrue(p >= 0)
+    y.AssertTrue(atomic.LoadInt32(&b.ref) >= 0)
 }
 func (b *block) size() int64 {
     return int64(3*intSize /* Size of the offset, entriesIndexStart and chkLen */ +
@@ -419,6 +440,9 @@ func (t *Table) readIndex() error {
     return nil
 }
 
+// block returns a new block. Each block holds a ref and the byte
+// slice stored in the block will be reused when the ref becomes zero. The
+// caller should release the block by calling block.decrRef() on it.
 func (t *Table) block(idx int) (*block, error) {
     y.AssertTruef(idx >= 0, "idx=%d", idx)
     if idx >= len(t.blockIndex) {
@@ -428,12 +452,18 @@ func (t *Table) block(idx int) (*block, error) {
         key := t.blockCacheKey(idx)
         blk, ok := t.opt.Cache.Get(key)
         if ok && blk != nil {
-            return blk.(*block), nil
+            // Use the block only if the increment was successful. The block
+            // could get evicted from the cache between the Get() call and the
+            // incrRef() call.
+            if b := blk.(*block); b.incrRef() {
+                return b, nil
+            }
         }
     }
     ko := t.blockIndex[idx]
     blk := &block{
         offset: int(ko.Offset),
+        ref:    1,
     }
     var err error
     if blk.data, err = t.read(blk.offset, int(ko.Len)); err != nil {
@@ -490,8 +520,14 @@ func (t *Table) block(idx int) (*block, error) {
     }
     if t.opt.Cache != nil {
         key := t.blockCacheKey(idx)
-        blk.incrRef()
-        t.opt.Cache.Set(key, blk, blk.size())
+        // incrRef should never return false here because we're calling it on a
+        // new block with ref=1.
+        y.AssertTrue(blk.incrRef())
+
+        // Decrement the block ref if we could not insert it in the cache.
+        if !t.opt.Cache.Set(key, blk, blk.size()) {
+            blk.decrRef()
+        }
     }
     return blk, nil
 }
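The last hunk also covers the case of the cache declining the entry: Badger's block cache is a Ristretto cache, and `Cache.Set` returns `false` when it drops an item, so the ref taken on the cache's behalf has to be released. A standalone sketch of that handoff, assuming the `github.com/dgraph-io/ristretto` package; the `cachedBlock` type is illustrative, not Badger's `block`:

```go
package main

import (
    "fmt"
    "log"
    "sync/atomic"

    "github.com/dgraph-io/ristretto"
)

// cachedBlock is an illustrative stand-in for table.block: one ref belongs to
// the caller and one ref is taken on behalf of the cache while it holds the entry.
type cachedBlock struct{ ref int32 }

func (b *cachedBlock) incrRef() { atomic.AddInt32(&b.ref, 1) }
func (b *cachedBlock) decrRef() { atomic.AddInt32(&b.ref, -1) }

func main() {
    cache, err := ristretto.NewCache(&ristretto.Config{
        NumCounters: 1e4,     // number of keys to track access frequency for
        MaxCost:     1 << 20, // 1 MB cost budget
        BufferItems: 64,
    })
    if err != nil {
        log.Fatal(err)
    }

    // A freshly built block starts with ref=1 for the caller, as in t.block.
    blk := &cachedBlock{ref: 1}
    // Take an extra ref on behalf of the cache. The real block uses the
    // CAS-guarded incrRef; a plain add is acceptable here only because the
    // block is brand new and not visible to anyone else yet.
    blk.incrRef()

    // Ristretto may decline the entry; in that case the cache never owned a
    // ref, so release it immediately, exactly like the last hunk above.
    if !cache.Set("block-key", blk, 4096) {
        blk.decrRef()
    }
    fmt.Println("refs currently held:", atomic.LoadInt32(&blk.ref))
}
```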
