Skip to content

Commit

Permalink
Fix scan range write skew
Browse files Browse the repository at this point in the history
  • Loading branch information
al8n committed Apr 26, 2024
1 parent fece30f commit 38a8dbb
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 5 deletions.
35 changes: 30 additions & 5 deletions txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,9 @@ type oracle struct {
type committedTxn struct {
ts uint64
// conflictKeys keeps track of the entries written at timestamp ts.
conflictKeys map[uint64]struct{}
// The map key is the hash of the written key. The map value records
// how far the reads slice had grown when this write was inserted.
conflictKeys map[uint64]int
}

func newOracle(opt Options) *oracle {
Expand Down Expand Up @@ -151,11 +153,33 @@ func (o *oracle) hasConflict(txn *Txn) bool {
continue
}

// check if there is any direct conflict
for _, ro := range txn.reads {
if _, has := committedTxn.conflictKeys[ro]; has {
return true
}
}

// check if there is any indirect conflict (logical conflict)
for _, i := range txn.conflictKeys {
if i == -1 {
continue
}

happensBeforeReads := txn.reads[:i]

for _, j := range committedTxn.conflictKeys {
otherHappensBeforeReads := txn.reads[:j]

for fp := range happensBeforeReads {
for otherFp := range otherHappensBeforeReads {
if fp == otherFp {
return true
}
}
}
}
}
}

return false
Expand Down Expand Up @@ -256,7 +280,8 @@ type Txn struct {

reads []uint64 // contains fingerprints of keys read.
// contains fingerprints of keys written. This is used for conflict detection.
conflictKeys map[uint64]struct{}
// The int value is len(reads) - 1 at the moment the write was inserted,
// i.e. the index of the most recent read (-1 if nothing had been read yet).
conflictKeys map[uint64]int
readsLock sync.Mutex // guards the reads slice. See addReadKey.

pendingWrites map[string]*Entry // cache stores any writes done by txn.
Expand Down Expand Up @@ -393,8 +418,8 @@ func (txn *Txn) modify(e *Entry) error {
// The txn.conflictKeys is used for conflict detection. If conflict detection
// is disabled, we don't need to store key hashes in this map.
if txn.db.opt.DetectConflicts {
fp := z.MemHash(e.Key) // Avoid dealing with byte arrays.
txn.conflictKeys[fp] = struct{}{}
fp := z.MemHash(e.Key) // Avoid dealing with byte arrays.
txn.conflictKeys[fp] = len(txn.reads) - 1 // store this write happens after how many reads
}
// If a duplicate entry was inserted in managed mode, move it to the duplicate writes slice.
// Add the entry to duplicateWrites only if both the entries have different versions. For
Expand Down Expand Up @@ -783,7 +808,7 @@ func (db *DB) newTransaction(update, isManaged bool) *Txn {
}
if update {
if db.opt.DetectConflicts {
txn.conflictKeys = make(map[uint64]struct{})
txn.conflictKeys = make(map[uint64]int)
}
txn.pendingWrites = make(map[string]*Entry)
}
Expand Down
58 changes: 58 additions & 0 deletions txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,64 @@ func TestTxnWriteSkew(t *testing.T) {
})
}

// https://wiki.postgresql.org/wiki/SSI#Intersecting_Data
func TestTxnWriteSkew2(t *testing.T) {
runBadgerTest(t, nil, func(t *testing.T, db *DB) {
// Setup
db.opt.managedTxns = false
txn := db.NewTransaction(true)
defer txn.Discard()
txn.SetEntry(NewEntry([]byte("a1"), []byte("10")))

Check failure on line 338 in txn_test.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `txn.SetEntry` is not checked (errcheck)
txn.SetEntry(NewEntry([]byte("a2"), []byte("20")))

Check failure on line 339 in txn_test.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `txn.SetEntry` is not checked (errcheck)
txn.SetEntry(NewEntry([]byte("b1"), []byte("100")))

Check failure on line 340 in txn_test.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `txn.SetEntry` is not checked (errcheck)
txn.SetEntry(NewEntry([]byte("b2"), []byte("200")))
require.NoError(t, txn.Commit())

txn1 := db.NewTransaction(true)
defer txn1.Discard()

itr := txn1.NewIterator(DefaultIteratorOptions)
sum := 0
{
for itr.Rewind(); itr.Valid(); itr.Next() {
if itr.Item().Key()[0] == 'a' {
a, _ := itr.Item().ValueCopy(nil)
val, _ := strconv.ParseUint(string(a), 10, 64)
sum += int(val)
}
}
itr.Close()
}

txn1.SetEntry(NewEntry([]byte("b3"), []byte("30")))

Check failure on line 360 in txn_test.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `txn1.SetEntry` is not checked (errcheck)

txn2 := db.NewTransaction(true)
defer txn2.Discard()
{
itr = txn2.NewIterator(DefaultIteratorOptions)
sum = 0
for itr.Rewind(); itr.Valid(); itr.Next() {
if itr.Item().Key()[0] == 'b' {
a, _ := itr.Item().ValueCopy(nil)
val, _ := strconv.ParseUint(string(a), 10, 64)
sum += int(val)
}
}
itr.Close()
}

txn2.SetEntry(NewEntry([]byte("a3"), []byte("300")))

Check failure on line 377 in txn_test.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `txn2.SetEntry` is not checked (errcheck)
require.NoError(t, txn2.Commit())

// Each transaction has modified what the other transaction would have read.
// If both were allowed to commit, this would break serializable behavior,
// because if they were run one at a time, one of the transactions would have seen the INSERT the other committed.
// We wait for a successful COMMIT of one of the transactions before we roll anything back,
// though, to ensure progress and prevent thrashing.
require.Error(t, txn1.Commit())
})
}

// a3, a2, b4 (del), b3, c2, c1
// Read at ts=4 -> a3, c2
// Read at ts=4(Uncommitted) -> a3, b4
Expand Down

0 comments on commit 38a8dbb

Please sign in to comment.