Skip to content

Commit

Permalink
Backport CBG-259 to 2.1.3.1 (#4059)
Browse files Browse the repository at this point in the history
* Remove fallthrough from blipSyncContext.Logf (Backport part of #3845)

* CBG-259 Handle errors from sender.Send to prevent starting goroutines to wait for unsent message responses
  • Loading branch information
bbrks authored and adamcfraser committed Apr 6, 2019
1 parent e95701b commit 60ebdbe
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 36 deletions.
11 changes: 3 additions & 8 deletions rest/blip_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"github.com/couchbase/go-blip"
"github.com/couchbase/sync_gateway/base"
"github.com/couchbase/sync_gateway/db"
"github.com/couchbaselabs/go.assert"
assert "github.com/couchbaselabs/go.assert"
)

// This test performs the following steps against the Sync Gateway passive blip replicator:
Expand Down Expand Up @@ -1308,10 +1308,8 @@ func TestMultipleOustandingChangesSubscriptions(t *testing.T) {
log.Printf("errorCode: %v", errorCode3)
assert.True(t, errorCode == "")


}


// Reproduce issue SG #3738
//
// - Add 5 docs to channel ABC
Expand All @@ -1326,7 +1324,7 @@ func TestMissingNoRev(t *testing.T) {

rt := RestTester{}
btSpec := BlipTesterSpec{
restTester: &rt,
restTester: &rt,
}
bt, err := NewBlipTesterFromSpec(btSpec)
assertNoError(t, err, "Unexpected error creating BlipTester")
Expand All @@ -1341,7 +1339,6 @@ func TestMissingNoRev(t *testing.T) {
log.Printf("resp: %v, err: %v", resp, err)
}


// Get a reference to the database
targetDbContext, err := rt.ServerContext().GetDatabase("db")
assertNoError(t, err, "failed")
Expand All @@ -1360,6 +1357,4 @@ func TestMissingNoRev(t *testing.T) {
docs := bt.WaitForNumDocsViaChanges(4)
assert.True(t, len(docs) == 4)



}
}
98 changes: 70 additions & 28 deletions rest/blip_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package rest
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"net/http"
"regexp"
Expand Down Expand Up @@ -33,6 +34,8 @@ const (
BlipCBMobileReplication = "CBMobile_2"
)

var ErrClosedBLIPSender = errors.New("use of closed BLIP sender")

// Represents one BLIP connection (socket) opened by a client.
// This connection remains open until the client closes it, and can receive any number of requests.
type blipSyncContext struct {
Expand Down Expand Up @@ -190,13 +193,10 @@ func (ctx *blipSyncContext) Logf(logLevel base.LogLevel, logKey base.LogKey, for
switch logLevel {
case base.LevelError:
base.Errorf(logKey, formatWithContextID, paramsWithContextID...)
fallthrough
case base.LevelWarn:
base.Warnf(logKey, formatWithContextID, paramsWithContextID...)
fallthrough
case base.LevelInfo:
base.Infof(logKey, formatWithContextID, paramsWithContextID...)
fallthrough
case base.LevelDebug:
base.Debugf(logKey, formatWithContextID, paramsWithContextID...)
}
Expand Down Expand Up @@ -350,11 +350,14 @@ func (bh *blipHandler) sendChanges(sender *blip.Sender, params *subChangesParams

caughtUp := false
pendingChanges := make([][]interface{}, 0, bh.batchSize)
sendPendingChangesAt := func(minChanges int) {
sendPendingChangesAt := func(minChanges int) error {
if len(pendingChanges) >= minChanges {
bh.sendBatchOfChanges(sender, pendingChanges)
if err := bh.sendBatchOfChanges(sender, pendingChanges); err != nil {
return err
}
pendingChanges = make([][]interface{}, 0, bh.batchSize)
}
return nil
}

_, forceClose := generateBlipSyncChanges(bh.db, channelSet, options, params.docIDs(), func(changes []*db.ChangeEntry) error {
Expand All @@ -368,15 +371,22 @@ func (bh *blipHandler) sendChanges(sender *blip.Sender, params *subChangesParams
changeRow = changeRow[0:3]
}
pendingChanges = append(pendingChanges, changeRow)
sendPendingChangesAt(bh.batchSize)
if err := sendPendingChangesAt(bh.batchSize); err != nil {
return err
}
}
}
}
if caughtUp || len(changes) == 0 {
sendPendingChangesAt(1)
if err := sendPendingChangesAt(1); err != nil {
return err
}
if !caughtUp {
caughtUp = true
bh.sendBatchOfChanges(sender, nil) // Signal to client that it's caught up
// Signal to client that it's caught up
if err := bh.sendBatchOfChanges(sender, nil); err != nil {
return err
}
}
}
return nil
Expand All @@ -389,28 +399,40 @@ func (bh *blipHandler) sendChanges(sender *blip.Sender, params *subChangesParams

}

func (bh *blipHandler) sendBatchOfChanges(sender *blip.Sender, changeArray [][]interface{}) {
func (bh *blipHandler) sendBatchOfChanges(sender *blip.Sender, changeArray [][]interface{}) error {
outrq := blip.NewRequest()
outrq.SetProfile("changes")
outrq.SetJSONBody(changeArray)

if len(changeArray) > 0 {
if !sender.Send(outrq) {
return ErrClosedBLIPSender
}
// Spawn a goroutine to await the client's response:
sender.Send(outrq)
go bh.handleChangesResponse(sender, outrq.Response(), changeArray)
go func(bh *blipHandler, sender *blip.Sender, response *blip.Message, changeArray [][]interface{}) {
if err := bh.handleChangesResponse(sender, response, changeArray); err != nil {
bh.Logf(base.LevelError, base.KeyAll, "Error from bh.handleChangesResponse: %v", err)
}
}(bh, sender, outrq.Response(), changeArray)
} else {
outrq.SetNoReply(true)
sender.Send(outrq)
if !sender.Send(outrq) {
return ErrClosedBLIPSender
}
}

if len(changeArray) > 0 {
sequence := changeArray[0][0].(db.SequenceID)
bh.Logf(base.LevelInfo, base.KeySync, "Sent %d changes to client, from seq %s. User:%s", len(changeArray), sequence.String(), base.UD(bh.effectiveUsername))
} else {
bh.Logf(base.LevelInfo, base.KeySync, "Sent all changes to client. User:%s", base.UD(bh.effectiveUsername))
}

return nil
}

// Handles the response to a pushed "changes" message, i.e. the list of revisions the client wants
func (bh *blipHandler) handleChangesResponse(sender *blip.Sender, response *blip.Message, changeArray [][]interface{}) {
func (bh *blipHandler) handleChangesResponse(sender *blip.Sender, response *blip.Message, changeArray [][]interface{}) error {
defer func() {
if panicked := recover(); panicked != nil {
base.Warnf(base.KeyAll, "[%s] PANIC handling 'changes' response: %v\n%s", bh.blipContext.ID, panicked, debug.Stack())
Expand All @@ -419,8 +441,8 @@ func (bh *blipHandler) handleChangesResponse(sender *blip.Sender, response *blip

var answer []interface{}
if err := response.ReadJSONBody(&answer); err != nil {
bh.Logf(base.LevelInfo, base.KeySync, "Invalid response to 'changes' message: %s -- %s. User:%s", response, err, base.UD(bh.effectiveUsername))
return
bh.Logf(base.LevelError, base.KeyAll, "Invalid response to 'changes' message: %s -- %s. User:%s", response, err, base.UD(bh.effectiveUsername))
return errors.New("Invalid response to 'changes' message")
}

maxHistory := 0
Expand Down Expand Up @@ -448,13 +470,17 @@ func (bh *blipHandler) handleChangesResponse(sender *blip.Sender, response *blip
if revID, ok := rev.(string); ok {
knownRevs[revID] = true
} else {
bh.Logf(base.LevelInfo, base.KeySync, "Invalid response to 'changes' message. User:%s", base.UD(bh.effectiveUsername))
return
bh.Logf(base.LevelError, base.KeyAll, "Invalid response to 'changes' message. User:%s", base.UD(bh.effectiveUsername))
return errors.New("Invalid response to 'changes' message")
}
}
bh.sendRevOrNorev(sender, seq, docID, revID, knownRevs, maxHistory)
if err := bh.sendRevOrNorev(sender, seq, docID, revID, knownRevs, maxHistory); err != nil {
return err
}
}
}

return nil
}

// Handles a "changes" request, i.e. a set of changes pushed by the client
Expand Down Expand Up @@ -541,17 +567,19 @@ func (bh *blipHandler) handleProposedChanges(rq *blip.Message) error {

//////// DOCUMENTS:

func (bh *blipHandler) sendRevOrNorev(sender *blip.Sender, seq db.SequenceID, docID string, revID string, knownRevs map[string]bool, maxHistory int) {
func (bh *blipHandler) sendRevOrNorev(sender *blip.Sender, seq db.SequenceID, docID string, revID string, knownRevs map[string]bool, maxHistory int) error {

body, err := bh.db.GetRev(docID, revID, true, nil)
if err != nil {
bh.sendNoRev(err, sender, seq, docID, revID)
err = bh.sendNoRev(err, sender, seq, docID, revID)
} else {
bh.sendRevision(body, sender, seq, docID, revID, knownRevs, maxHistory)
err = bh.sendRevision(body, sender, seq, docID, revID, knownRevs, maxHistory)
}

return err
}

func (bh *blipHandler) sendNoRev(err error, sender *blip.Sender, seq db.SequenceID, docID string, revID string) {
func (bh *blipHandler) sendNoRev(err error, sender *blip.Sender, seq db.SequenceID, docID string, revID string) error {

bh.Logf(base.LevelDebug, base.KeySync, "Sending norev %q %s due to error: %v. User:%s", base.UD(docID), revID, err, base.UD(bh.effectiveUsername))

Expand All @@ -571,12 +599,16 @@ func (bh *blipHandler) sendNoRev(err error, sender *blip.Sender, seq db.Sequence
outrq.Properties["reason"] = fmt.Sprintf("%s", reason)

outrq.SetNoReply(true)
sender.Send(outrq)
if !sender.Send(outrq) {
return ErrClosedBLIPSender
}

return nil

}

// Pushes a revision body to the client
func (bh *blipHandler) sendRevision(body db.Body, sender *blip.Sender, seq db.SequenceID, docID string, revID string, knownRevs map[string]bool, maxHistory int) {
func (bh *blipHandler) sendRevision(body db.Body, sender *blip.Sender, seq db.SequenceID, docID string, revID string, knownRevs map[string]bool, maxHistory int) error {

bh.Logf(base.LevelDebug, base.KeySync, "Sending rev %q %s based on %d known. User:%s", base.UD(docID), revID, len(knownRevs), base.UD(bh.effectiveUsername))

Expand Down Expand Up @@ -613,7 +645,9 @@ func (bh *blipHandler) sendRevision(body db.Body, sender *blip.Sender, seq db.Se
if atts := db.BodyAttachments(body); atts != nil {
// Allow client to download attachments in 'atts', but only while pulling this rev
bh.addAllowedAttachments(atts)
sender.Send(outrq)
if !sender.Send(outrq) {
return ErrClosedBLIPSender
}
go func() {
defer func() {
if panicked := recover(); panicked != nil {
Expand All @@ -626,8 +660,12 @@ func (bh *blipHandler) sendRevision(body db.Body, sender *blip.Sender, seq db.Se
}()
} else {
outrq.SetNoReply(true)
sender.Send(outrq)
if !sender.Send(outrq) {
return ErrClosedBLIPSender
}
}

return nil
}

// Received a "rev" request, i.e. client is pushing a revision body
Expand Down Expand Up @@ -725,7 +763,9 @@ func (bh *blipHandler) downloadOrVerifyAttachments(body db.Body, minRevpos int,
outrq := blip.NewRequest()
outrq.Properties = map[string]string{"Profile": "proveAttachment", "digest": digest}
outrq.SetBody(nonce)
sender.Send(outrq)
if !sender.Send(outrq) {
return nil, ErrClosedBLIPSender
}
if body, err := outrq.Response().Body(); err != nil {
return nil, err
} else if string(body) != proof {
Expand All @@ -741,7 +781,9 @@ func (bh *blipHandler) downloadOrVerifyAttachments(body db.Body, minRevpos int,
if isCompressible(name, meta) {
outrq.Properties["compress"] = "true"
}
sender.Send(outrq)
if !sender.Send(outrq) {
return nil, ErrClosedBLIPSender
}
return outrq.Response().Body()
}
})
Expand Down

0 comments on commit 60ebdbe

Please sign in to comment.