Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cache TXN ID #10

Merged
merged 4 commits into from
Jun 15, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 37 additions & 10 deletions txn/flusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ func flush(r *Runner, t *transaction) error {
Runner: r,
goal: t,
goalKeys: make(map[docKey]bool),
queue: make(map[docKey][]token),
queue: make(map[docKey][]tokenAndId),
debugId: debugPrefix(),
}
for _, dkey := range f.goal.docKeys() {
Expand All @@ -26,10 +26,36 @@ type flusher struct {
*Runner
goal *transaction
goalKeys map[docKey]bool
queue map[docKey][]token
queue map[docKey][]tokenAndId
debugId string
}

type tokenAndId struct {
tt token
bid bson.ObjectId
}

func (ti tokenAndId) id() bson.ObjectId {
return ti.bid
}

func (ti tokenAndId) nonce() string {
return ti.tt.nonce()
}

func (ti tokenAndId) String() string {
return string(ti.tt)
}

func tokensWithIds(q []token) []tokenAndId {
out := make([]tokenAndId, len(q))
for i, tt := range q {
out[i].tt = tt
out[i].bid = tt.id()
}
return out
}

func (f *flusher) run() (err error) {
if chaosEnabled {
defer f.handleChaos(&err)
Expand Down Expand Up @@ -248,7 +274,7 @@ NextDoc:
if info.Remove == "" {
// Fast path, unless workload is insert/remove heavy.
revno[dkey] = info.Revno
f.queue[dkey] = info.Queue
f.queue[dkey] = tokensWithIds(info.Queue)
f.debugf("[A] Prepared document %v with revno %d and queue: %v", dkey, info.Revno, info.Queue)
continue NextDoc
} else {
Expand Down Expand Up @@ -310,7 +336,7 @@ NextDoc:
f.debugf("[B] Prepared document %v with revno %d and queue: %v", dkey, info.Revno, info.Queue)
}
revno[dkey] = info.Revno
f.queue[dkey] = info.Queue
f.queue[dkey] = tokensWithIds(info.Queue)
continue NextDoc
}
}
Expand Down Expand Up @@ -452,7 +478,7 @@ func (f *flusher) rescan(t *transaction, force bool) (revnos []int64, err error)
break
}
}
f.queue[dkey] = info.Queue
f.queue[dkey] = tokensWithIds(info.Queue)
if !found {
// Rescanned transaction id was not in the queue. This could mean one
// of three things:
Expand Down Expand Up @@ -516,12 +542,13 @@ func assembledRevnos(ops []Op, revno map[docKey]int64) []int64 {

func (f *flusher) hasPreReqs(tt token, dkeys docKeys) (prereqs, found bool) {
found = true
ttId := tt.id()
NextDoc:
for _, dkey := range dkeys {
for _, dtt := range f.queue[dkey] {
if dtt == tt {
if dtt.tt == tt {
continue NextDoc
} else if dtt.id() != tt.id() {
} else if dtt.id() != ttId {
prereqs = true
}
}
Expand Down Expand Up @@ -909,17 +936,17 @@ func (f *flusher) apply(t *transaction, pull map[bson.ObjectId]*transaction) err
return nil
}

func tokensToPull(dqueue []token, pull map[bson.ObjectId]*transaction, dontPull token) []token {
func tokensToPull(dqueue []tokenAndId, pull map[bson.ObjectId]*transaction, dontPull token) []token {
var result []token
for j := len(dqueue) - 1; j >= 0; j-- {
dtt := dqueue[j]
if dtt == dontPull {
if dtt.tt == dontPull {
continue
}
if _, ok := pull[dtt.id()]; ok {
// It was handled before and this is a leftover invalid
// nonce in the queue. Cherry-pick it out.
result = append(result, dtt)
result = append(result, dtt.tt)
}
}
return result
Expand Down
116 changes: 116 additions & 0 deletions txn/txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -703,6 +703,8 @@ func (s *S) TestPurgeMissingPipelineSizeLimit(c *C) {
}

var flaky = flag.Bool("flaky", false, "Include flaky tests")
var txnQueueLength = flag.Int("qlength", 100, "txn-queue length for tests")


func (s *S) TestTxnQueueStressTest(c *C) {
// This fails about 20% of the time on Mongo 3.2 (I haven't tried
Expand Down Expand Up @@ -776,3 +778,117 @@ func (s *S) TestTxnQueueStressTest(c *C) {
}
}
}

type txnQueue struct {
Queue []string `bson:"txn-queue"`
}

func (s *S) TestTxnQueueAssertionGrowth(c *C) {
txn.SetDebug(false) // too much spam
err := s.accounts.Insert(M{"_id": 0, "balance": 0})
c.Assert(err, IsNil)
// Create many assertion only transactions.
t := time.Now()
ops := []txn.Op{{
C: "accounts",
Id: 0,
Assert: M{"balance": 0},
}}
for n := 0; n < *txnQueueLength; n++ {
err = s.runner.Run(ops, "", nil)
c.Assert(err, IsNil)
}
var qdoc txnQueue
err = s.accounts.FindId(0).One(&qdoc)
c.Assert(err, IsNil)
c.Check(len(qdoc.Queue), Equals, *txnQueueLength)
c.Logf("%8.3fs to set up %d assertions", time.Since(t).Seconds(), *txnQueueLength)
t = time.Now()
txn.SetChaos(txn.Chaos{})
ops = []txn.Op{{
C: "accounts",
Id: 0,
Update: M{"$inc": M{"balance": 100}},
}}
err = s.runner.Run(ops, "", nil)
c.Logf("%8.3fs to clear N=%d assertions and add one more txn",
time.Since(t).Seconds(), *txnQueueLength)
err = s.accounts.FindId(0).One(&qdoc)
c.Assert(err, IsNil)
c.Check(len(qdoc.Queue), Equals, 1)
}

func (s *S) TestTxnQueueBrokenPrepared(c *C) {
txn.SetDebug(false) // too much spam
badTxnToken := "123456789012345678901234_deadbeef"
err := s.accounts.Insert(M{"_id": 0, "balance": 0, "txn-queue": []string{badTxnToken}})
c.Assert(err, IsNil)
t := time.Now()
ops := []txn.Op{{
C: "accounts",
Id: 0,
Update: M{"$set": M{"balance": 0}},
}}
errString := `cannot find transaction ObjectIdHex("123456789012345678901234")`
for n := 0; n < *txnQueueLength; n++ {
err = s.runner.Run(ops, "", nil)
c.Assert(err.Error(), Equals, errString)
}
var qdoc txnQueue
err = s.accounts.FindId(0).One(&qdoc)
c.Assert(err, IsNil)
c.Check(len(qdoc.Queue), Equals, *txnQueueLength+1)
c.Logf("%8.3fs to set up %d 'prepared' txns", time.Since(t).Seconds(), *txnQueueLength)
t = time.Now()
s.accounts.UpdateId(0, bson.M{"$pullAll": bson.M{"txn-queue": []string{badTxnToken}}})
ops = []txn.Op{{
C: "accounts",
Id: 0,
Update: M{"$inc": M{"balance": 100}},
}}
err = s.runner.ResumeAll()
c.Assert(err, IsNil)
c.Logf("%8.3fs to ResumeAll N=%d 'prepared' txns",
time.Since(t).Seconds(), *txnQueueLength)
err = s.accounts.FindId(0).One(&qdoc)
c.Assert(err, IsNil)
c.Check(len(qdoc.Queue), Equals, 1)
}

func (s *S) TestTxnQueuePreparing(c *C) {
txn.SetDebug(false) // too much spam
err := s.accounts.Insert(M{"_id": 0, "balance": 0, "txn-queue": []string{}})
c.Assert(err, IsNil)
t := time.Now()
txn.SetChaos(txn.Chaos{
KillChance: 1.0,
Breakpoint: "set-prepared",
})
ops := []txn.Op{{
C: "accounts",
Id: 0,
Update: M{"$set": M{"balance": 0}},
}}
for n := 0; n < *txnQueueLength; n++ {
err = s.runner.Run(ops, "", nil)
c.Assert(err, Equals, txn.ErrChaos)
}
var qdoc txnQueue
err = s.accounts.FindId(0).One(&qdoc)
c.Assert(err, IsNil)
c.Check(len(qdoc.Queue), Equals, *txnQueueLength)
c.Logf("%8.3fs to set up %d 'preparing' txns", time.Since(t).Seconds(), *txnQueueLength)
txn.SetChaos(txn.Chaos{})
t = time.Now()
err = s.runner.ResumeAll()
c.Logf("%8.3fs to ResumeAll N=%d 'preparing' txns",
time.Since(t).Seconds(), *txnQueueLength)
err = s.accounts.FindId(0).One(&qdoc)
c.Assert(err, IsNil)
expectedCount := 100
if *txnQueueLength <= expectedCount {
expectedCount = *txnQueueLength - 1
}
c.Check(len(qdoc.Queue), Equals, expectedCount)
}