Skip to content

Commit

Permalink
Interpret rollback response.
Browse files Browse the repository at this point in the history
  • Loading branch information
prataprc committed Feb 18, 2014
1 parent d926823 commit 2ca1112
Showing 1 changed file with 37 additions and 11 deletions.
48 changes: 37 additions & 11 deletions upr.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,23 @@ func (feed *UprFeed) GetStream(vbno uint16) *UprStream {
return feed.streams[vbno]
}

// CalculateVector will return a 3-element tuple of
// (vbucket-uuid, startSeq, highSeq) for a give failover-log and last known
// sequence number.
func CalculateVector(lastSeq uint64, flog FailoverLog) (
vuuid uint64, startseq uint64, highseq uint64) {

if lastSeq != 0 {
for _, log := range flog {
if lastSeq >= log[1] {
vuuid, startseq, highseq = log[0], lastSeq, log[1]
}
}
vuuid, startseq, highseq = flog[0][0], flog[0][1], flog[0][1]
}
return vuuid, startseq, highseq
}

func freshStreams(vbmaps map[uint16]*uprConnection) map[uint16]*UprStream {
streams := make(map[uint16]*UprStream)
for vbno := range vbmaps {
Expand Down Expand Up @@ -273,17 +290,26 @@ func handleUprMessage(feed *UprFeed, req *mcd.MCRequest) (err error) {
case uprStreamREQ:
rollb, flog, err := handleStreamResponse(request2Response(req))
if err == nil {
if flog != nil {
stream.Flog = flog
} else if rollb > 0 {
uprconn := feed.vbmap[vb]
log.Println("Requesting a rollback for %v to sequence %v",
vb, rollb)
flags := uint32(0)
err = requestStream(
uprconn.conn, flags, req.Opaque, vb, stream.Vuuid,
rollb, stream.Endseq, stream.Highseq)
if flog == nil {
log.Println("ERROR: flog cannot be empty")
}
stream.Flog = flog
} else if err.Error() == "ROLLBACK" {
uprconn := feed.vbmap[vb]
flags := uint32(0)
if stream.Startseq == 0 {
err := fmt.Errorf("ERROR: Unexpected rollback for start-sequence 0")
log.Println(err)
panic(err) // TODO: Can be removed
}
stream.Vuuid, stream.Startseq, stream.Highseq =
CalculateVector(rollb, stream.Flog)
log.Println("Requesting a rollback for %v to sequence %v", vb, rollb)
err = requestStream(
uprconn.conn, flags, req.Opaque, vb, stream.Vuuid,
stream.Startseq, stream.Endseq, stream.Highseq)
} else if err != nil {
log.Println(err)
}
case uprMUTATION, uprDELETION:
e := feed.makeUprEvent(req)
Expand Down Expand Up @@ -312,7 +338,7 @@ func handleStreamResponse(res *mcd.MCResponse) (uint64, FailoverLog, error) {
case res.Status == rollBack:
rollback = binary.BigEndian.Uint64(res.Extras)
log.Printf("Rollback %v for vb %v\n", rollback, res.Opaque /*vb*/)
return rollback, flog, err
return rollback, flog, fmt.Errorf("ROLLBACK")
case res.Status != mcd.SUCCESS:
err = fmt.Errorf("Unexpected status %v", res.Status)
}
Expand Down

0 comments on commit 2ca1112

Please sign in to comment.