Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cache Transaction Ids #454

Open
wants to merge 3 commits into
base: v2
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1477,7 +1477,6 @@ func (s *S) TestSecondaryModeWithMongosInsert(c *C) {
c.Assert(result.A, Equals, 1)
}


func (s *S) TestRemovalOfClusterMember(c *C) {
if *fast {
c.Skip("-fast")
Expand Down
47 changes: 37 additions & 10 deletions txn/flusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ func flush(r *Runner, t *transaction) error {
Runner: r,
goal: t,
goalKeys: make(map[docKey]bool),
queue: make(map[docKey][]token),
queue: make(map[docKey][]tokenAndId),
debugId: debugPrefix(),
}
for _, dkey := range f.goal.docKeys() {
Expand All @@ -25,10 +25,36 @@ type flusher struct {
*Runner
goal *transaction
goalKeys map[docKey]bool
queue map[docKey][]token
queue map[docKey][]tokenAndId
debugId string
}

type tokenAndId struct {
tt token
bid bson.ObjectId
}

func (ti tokenAndId) id() bson.ObjectId {
return ti.bid
}

func (ti tokenAndId) nonce() string {
return ti.tt.nonce()
}

func (ti tokenAndId) String() string {
return string(ti.tt)
}

func tokensWithIds(q []token) []tokenAndId {
out := make([]tokenAndId, len(q))
for i, tt := range q {
out[i].tt = tt
out[i].bid = tt.id()
}
return out
}

func (f *flusher) run() (err error) {
if chaosEnabled {
defer f.handleChaos(&err)
Expand Down Expand Up @@ -247,7 +273,7 @@ NextDoc:
if info.Remove == "" {
// Fast path, unless workload is insert/remove heavy.
revno[dkey] = info.Revno
f.queue[dkey] = info.Queue
f.queue[dkey] = tokensWithIds(info.Queue)
f.debugf("[A] Prepared document %v with revno %d and queue: %v", dkey, info.Revno, info.Queue)
continue NextDoc
} else {
Expand Down Expand Up @@ -309,7 +335,7 @@ NextDoc:
f.debugf("[B] Prepared document %v with revno %d and queue: %v", dkey, info.Revno, info.Queue)
}
revno[dkey] = info.Revno
f.queue[dkey] = info.Queue
f.queue[dkey] = tokensWithIds(info.Queue)
continue NextDoc
}
}
Expand Down Expand Up @@ -451,7 +477,7 @@ func (f *flusher) rescan(t *transaction, force bool) (revnos []int64, err error)
break
}
}
f.queue[dkey] = info.Queue
f.queue[dkey] = tokensWithIds(info.Queue)
if !found {
// Rescanned transaction id was not in the queue. This could mean one
// of three things:
Expand Down Expand Up @@ -515,12 +541,13 @@ func assembledRevnos(ops []Op, revno map[docKey]int64) []int64 {

func (f *flusher) hasPreReqs(tt token, dkeys docKeys) (prereqs, found bool) {
found = true
ttId := tt.id()
NextDoc:
for _, dkey := range dkeys {
for _, dtt := range f.queue[dkey] {
if dtt == tt {
if dtt.tt == tt {
continue NextDoc
} else if dtt.id() != tt.id() {
} else if dtt.id() != ttId {
prereqs = true
}
}
Expand Down Expand Up @@ -908,17 +935,17 @@ func (f *flusher) apply(t *transaction, pull map[bson.ObjectId]*transaction) err
return nil
}

func tokensToPull(dqueue []token, pull map[bson.ObjectId]*transaction, dontPull token) []token {
func tokensToPull(dqueue []tokenAndId, pull map[bson.ObjectId]*transaction, dontPull token) []token {
var result []token
for j := len(dqueue) - 1; j >= 0; j-- {
dtt := dqueue[j]
if dtt == dontPull {
if dtt.tt == dontPull {
continue
}
if _, ok := pull[dtt.id()]; ok {
// It was handled before and this is a leftover invalid
// nonce in the queue. Cherry-pick it out.
result = append(result, dtt)
result = append(result, dtt.tt)
}
}
return result
Expand Down
2 changes: 1 addition & 1 deletion txn/sim_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ package txn_test

import (
"flag"
. "gopkg.in/check.v1"
"gopkg.in/mgo.v2"
"gopkg.in/mgo.v2/bson"
"gopkg.in/mgo.v2/dbtest"
"gopkg.in/mgo.v2/txn"
. "gopkg.in/check.v1"
"math/rand"
"time"
)
Expand Down
2 changes: 1 addition & 1 deletion txn/tarjan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package txn

import (
"fmt"
"gopkg.in/mgo.v2/bson"
. "gopkg.in/check.v1"
"gopkg.in/mgo.v2/bson"
)

type TarjanSuite struct{}
Expand Down
116 changes: 116 additions & 0 deletions txn/txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -703,6 +703,8 @@ func (s *S) TestPurgeMissingPipelineSizeLimit(c *C) {
}

var flaky = flag.Bool("flaky", false, "Include flaky tests")
var txnQueueLength = flag.Int("qlength", 100, "txn-queue length for tests")


func (s *S) TestTxnQueueStressTest(c *C) {
// This fails about 20% of the time on Mongo 3.2 (I haven't tried
Expand Down Expand Up @@ -776,3 +778,117 @@ func (s *S) TestTxnQueueStressTest(c *C) {
}
}
}

type txnQueue struct {
Queue []string `bson:"txn-queue"`
}

func (s *S) TestTxnQueueAssertionGrowth(c *C) {
txn.SetDebug(false) // too much spam
err := s.accounts.Insert(M{"_id": 0, "balance": 0})
c.Assert(err, IsNil)
// Create many assertion only transactions.
t := time.Now()
ops := []txn.Op{{
C: "accounts",
Id: 0,
Assert: M{"balance": 0},
}}
for n := 0; n < *txnQueueLength; n++ {
err = s.runner.Run(ops, "", nil)
c.Assert(err, IsNil)
}
var qdoc txnQueue
err = s.accounts.FindId(0).One(&qdoc)
c.Assert(err, IsNil)
c.Check(len(qdoc.Queue), Equals, *txnQueueLength)
c.Logf("%8.3fs to set up %d assertions", time.Since(t).Seconds(), *txnQueueLength)
t = time.Now()
txn.SetChaos(txn.Chaos{})
ops = []txn.Op{{
C: "accounts",
Id: 0,
Update: M{"$inc": M{"balance": 100}},
}}
err = s.runner.Run(ops, "", nil)
c.Logf("%8.3fs to clear N=%d assertions and add one more txn",
time.Since(t).Seconds(), *txnQueueLength)
err = s.accounts.FindId(0).One(&qdoc)
c.Assert(err, IsNil)
c.Check(len(qdoc.Queue), Equals, 1)
}

func (s *S) TestTxnQueueBrokenPrepared(c *C) {
txn.SetDebug(false) // too much spam
badTxnToken := "123456789012345678901234_deadbeef"
err := s.accounts.Insert(M{"_id": 0, "balance": 0, "txn-queue": []string{badTxnToken}})
c.Assert(err, IsNil)
t := time.Now()
ops := []txn.Op{{
C: "accounts",
Id: 0,
Update: M{"$set": M{"balance": 0}},
}}
errString := `cannot find transaction ObjectIdHex("123456789012345678901234")`
for n := 0; n < *txnQueueLength; n++ {
err = s.runner.Run(ops, "", nil)
c.Assert(err.Error(), Equals, errString)
}
var qdoc txnQueue
err = s.accounts.FindId(0).One(&qdoc)
c.Assert(err, IsNil)
c.Check(len(qdoc.Queue), Equals, *txnQueueLength+1)
c.Logf("%8.3fs to set up %d 'prepared' txns", time.Since(t).Seconds(), *txnQueueLength)
t = time.Now()
s.accounts.UpdateId(0, bson.M{"$pullAll": bson.M{"txn-queue": []string{badTxnToken}}})
ops = []txn.Op{{
C: "accounts",
Id: 0,
Update: M{"$inc": M{"balance": 100}},
}}
err = s.runner.ResumeAll()
c.Assert(err, IsNil)
c.Logf("%8.3fs to ResumeAll N=%d 'prepared' txns",
time.Since(t).Seconds(), *txnQueueLength)
err = s.accounts.FindId(0).One(&qdoc)
c.Assert(err, IsNil)
c.Check(len(qdoc.Queue), Equals, 1)
}

func (s *S) TestTxnQueuePreparing(c *C) {
txn.SetDebug(false) // too much spam
err := s.accounts.Insert(M{"_id": 0, "balance": 0, "txn-queue": []string{}})
c.Assert(err, IsNil)
t := time.Now()
txn.SetChaos(txn.Chaos{
KillChance: 1.0,
Breakpoint: "set-prepared",
})
ops := []txn.Op{{
C: "accounts",
Id: 0,
Update: M{"$set": M{"balance": 0}},
}}
for n := 0; n < *txnQueueLength; n++ {
err = s.runner.Run(ops, "", nil)
c.Assert(err, Equals, txn.ErrChaos)
}
var qdoc txnQueue
err = s.accounts.FindId(0).One(&qdoc)
c.Assert(err, IsNil)
c.Check(len(qdoc.Queue), Equals, *txnQueueLength)
c.Logf("%8.3fs to set up %d 'preparing' txns", time.Since(t).Seconds(), *txnQueueLength)
txn.SetChaos(txn.Chaos{})
t = time.Now()
err = s.runner.ResumeAll()
c.Logf("%8.3fs to ResumeAll N=%d 'preparing' txns",
time.Since(t).Seconds(), *txnQueueLength)
err = s.accounts.FindId(0).One(&qdoc)
c.Assert(err, IsNil)
expectedCount := 100
if *txnQueueLength <= expectedCount {
expectedCount = *txnQueueLength - 1
}
c.Check(len(qdoc.Queue), Equals, expectedCount)
}