diff --git a/txn.go b/txn.go index 438af8d5d..8187f1d70 100644 --- a/txn.go +++ b/txn.go @@ -62,7 +62,9 @@ type oracle struct { type committedTxn struct { ts uint64 // ConflictKeys Keeps track of the entries written at timestamp ts. - conflictKeys map[uint64]struct{} + // The map key is the hash of the written key. The map value is presumably + // len(reads)-1 of the writing txn at write time (see Txn.modify) - TODO confirm. + conflictKeys map[uint64]int } func newOracle(opt Options) *oracle { @@ -151,11 +153,33 @@ func (o *oracle) hasConflict(txn *Txn) bool { continue } + // Direct conflict: a key fingerprint this txn read was written by the committed txn. for _, ro := range txn.reads { if _, has := committedTxn.conflictKeys[ro]; has { return true } } + + // Indirect (logical) conflict check. NOTE(review): both inner range loops yield slice indices, not fingerprints; txn.reads[:j] slices this txn's reads with the committed txn's value j, which is not guarded against -1 or j > len(txn.reads) and can panic; txn.reads[:i] also excludes the read at index i - confirm intent. + for _, i := range txn.conflictKeys { + if i == -1 { + continue + } + + happensBeforeReads := txn.reads[:i] + + for _, j := range committedTxn.conflictKeys { + otherHappensBeforeReads := txn.reads[:j] + + for fp := range happensBeforeReads { + for otherFp := range otherHappensBeforeReads { + if fp == otherFp { + return true + } + } + } + } + } } return false @@ -256,7 +280,8 @@ type Txn struct { reads []uint64 // contains fingerprints of keys read. // contains fingerprints of keys written. This is used for conflict detection. - conflictKeys map[uint64]struct{} + // The value is len(txn.reads)-1 at the time modify records the write (-1 if nothing was read yet). + conflictKeys map[uint64]int readsLock sync.Mutex // guards the reads slice. See addReadKey. pendingWrites map[string]*Entry // cache stores any writes done by txn. @@ -393,8 +418,8 @@ func (txn *Txn) modify(e *Entry) error { // The txn.conflictKeys is used for conflict detection. If conflict detection // is disabled, we don't need to store key hashes in this map. if txn.db.opt.DetectConflicts { - fp := z.MemHash(e.Key) // Avoid dealing with byte arrays. - txn.conflictKeys[fp] = struct{}{} + fp := z.MemHash(e.Key) // Avoid dealing with byte arrays. 
+ txn.conflictKeys[fp] = len(txn.reads) - 1 // index of the last read recorded before this write; -1 if there were none } // If a duplicate entry was inserted in managed mode, move it to the duplicate writes slice. // Add the entry to duplicateWrites only if both the entries have different versions. For @@ -783,7 +808,7 @@ func (db *DB) newTransaction(update, isManaged bool) *Txn { } if update { if db.opt.DetectConflicts { - txn.conflictKeys = make(map[uint64]struct{}) + txn.conflictKeys = make(map[uint64]int) } txn.pendingWrites = make(map[string]*Entry) } diff --git a/txn_test.go b/txn_test.go index 3830855fe..1529a9cad 100644 --- a/txn_test.go +++ b/txn_test.go @@ -328,6 +328,64 @@ func TestTxnWriteSkew(t *testing.T) { }) } +// TestTxnWriteSkew2 expects that when two transactions each iterate the store and then insert a key with the prefix the other one summed, the second commit fails. See https://wiki.postgresql.org/wiki/SSI#Intersecting_Data +func TestTxnWriteSkew2(t *testing.T) { + runBadgerTest(t, nil, func(t *testing.T, db *DB) { + // Setup: seed a1, a2, b1 and b2 in a single committed transaction. + db.opt.managedTxns = false + txn := db.NewTransaction(true) + defer txn.Discard() + txn.SetEntry(NewEntry([]byte("a1"), []byte("10"))) + txn.SetEntry(NewEntry([]byte("a2"), []byte("20"))) + txn.SetEntry(NewEntry([]byte("b1"), []byte("100"))) + txn.SetEntry(NewEntry([]byte("b2"), []byte("200"))) + require.NoError(t, txn.Commit()) + + txn1 := db.NewTransaction(true) + defer txn1.Discard() + + itr := txn1.NewIterator(DefaultIteratorOptions) + sum := 0 + { + for itr.Rewind(); itr.Valid(); itr.Next() { + if itr.Item().Key()[0] == 'a' { + a, _ := itr.Item().ValueCopy(nil) + val, _ := strconv.ParseUint(string(a), 10, 64) + sum += int(val) + } + } + itr.Close() + } + + txn1.SetEntry(NewEntry([]byte("b3"), []byte("30"))) + + txn2 := db.NewTransaction(true) + defer txn2.Discard() + { + itr = txn2.NewIterator(DefaultIteratorOptions) + sum = 0 + for itr.Rewind(); itr.Valid(); itr.Next() { + if itr.Item().Key()[0] == 'b' { + a, _ := itr.Item().ValueCopy(nil) + val, _ := strconv.ParseUint(string(a), 10, 64) + sum += int(val) + } + } + itr.Close() + } + + txn2.SetEntry(NewEntry([]byte("a3"), []byte("300"))) + 
require.NoError(t, txn2.Commit()) + + // Each transaction has modified what the other transaction would have read. + // If both were allowed to commit, this would break serializable behavior, + // because if they were run one at a time, one of the transactions would have seen the INSERT the other committed. + // We wait for a successful COMMIT of one of the transactions before we roll anything back, + // though, to ensure progress and prevent thrashing. + require.Error(t, txn1.Commit()) + }) +} + // a3, a2, b4 (del), b3, c2, c1 // Read at ts=4 -> a3, c2 // Read at ts=4(Uncommitted) -> a3, b4