diff --git a/rest/blip_api_test.go b/rest/blip_api_test.go index 3d8cbcb7f5..f1a9998de0 100644 --- a/rest/blip_api_test.go +++ b/rest/blip_api_test.go @@ -14,7 +14,7 @@ import ( "github.com/couchbase/go-blip" "github.com/couchbase/sync_gateway/base" "github.com/couchbase/sync_gateway/db" - "github.com/couchbaselabs/go.assert" + assert "github.com/couchbaselabs/go.assert" ) // This test performs the following steps against the Sync Gateway passive blip replicator: @@ -1308,10 +1308,8 @@ func TestMultipleOustandingChangesSubscriptions(t *testing.T) { log.Printf("errorCode: %v", errorCode3) assert.True(t, errorCode == "") - } - // Reproduce issue SG #3738 // // - Add 5 docs to channel ABC @@ -1326,7 +1324,7 @@ func TestMissingNoRev(t *testing.T) { rt := RestTester{} btSpec := BlipTesterSpec{ - restTester: &rt, + restTester: &rt, } bt, err := NewBlipTesterFromSpec(btSpec) assertNoError(t, err, "Unexpected error creating BlipTester") @@ -1341,7 +1339,6 @@ func TestMissingNoRev(t *testing.T) { log.Printf("resp: %v, err: %v", resp, err) } - // Get a reference to the database targetDbContext, err := rt.ServerContext().GetDatabase("db") assertNoError(t, err, "failed") @@ -1360,6 +1357,4 @@ func TestMissingNoRev(t *testing.T) { docs := bt.WaitForNumDocsViaChanges(4) assert.True(t, len(docs) == 4) - - -} \ No newline at end of file +} diff --git a/rest/blip_sync.go b/rest/blip_sync.go index 230a43c8f7..17fae54a3c 100644 --- a/rest/blip_sync.go +++ b/rest/blip_sync.go @@ -3,6 +3,7 @@ package rest import ( "bytes" "encoding/json" + "errors" "fmt" "net/http" "regexp" @@ -33,6 +34,8 @@ const ( BlipCBMobileReplication = "CBMobile_2" ) +var ErrClosedBLIPSender = errors.New("use of closed BLIP sender") + // Represents one BLIP connection (socket) opened by a client. // This connection remains open until the client closes it, and can receive any number of requests. type blipSyncContext struct { @@ -190,13 +193,10 @@ func (ctx *blipSyncContext) Logf(logLevel base.LogLevel, logKey base.LogKey, for switch logLevel { case base.LevelError: base.Errorf(logKey, formatWithContextID, paramsWithContextID...) - fallthrough case base.LevelWarn: base.Warnf(logKey, formatWithContextID, paramsWithContextID...) - fallthrough case base.LevelInfo: base.Infof(logKey, formatWithContextID, paramsWithContextID...) - fallthrough case base.LevelDebug: base.Debugf(logKey, formatWithContextID, paramsWithContextID...) } @@ -350,11 +350,14 @@ func (bh *blipHandler) sendChanges(sender *blip.Sender, params *subChangesParams caughtUp := false pendingChanges := make([][]interface{}, 0, bh.batchSize) - sendPendingChangesAt := func(minChanges int) { + sendPendingChangesAt := func(minChanges int) error { if len(pendingChanges) >= minChanges { - bh.sendBatchOfChanges(sender, pendingChanges) + if err := bh.sendBatchOfChanges(sender, pendingChanges); err != nil { + return err + } pendingChanges = make([][]interface{}, 0, bh.batchSize) } + return nil } _, forceClose := generateBlipSyncChanges(bh.db, channelSet, options, params.docIDs(), func(changes []*db.ChangeEntry) error { @@ -368,15 +371,22 @@ func (bh *blipHandler) sendChanges(sender *blip.Sender, params *subChangesParams changeRow = changeRow[0:3] } pendingChanges = append(pendingChanges, changeRow) - sendPendingChangesAt(bh.batchSize) + if err := sendPendingChangesAt(bh.batchSize); err != nil { + return err + } } } } if caughtUp || len(changes) == 0 { - sendPendingChangesAt(1) + if err := sendPendingChangesAt(1); err != nil { + return err + } if !caughtUp { caughtUp = true - bh.sendBatchOfChanges(sender, nil) // Signal to client that it's caught up + // Signal to client that it's caught up + if err := bh.sendBatchOfChanges(sender, nil); err != nil { + return err + } } } return nil @@ -389,28 +399,40 @@ func (bh *blipHandler) sendChanges(sender *blip.Sender, params *subChangesParams } -func (bh *blipHandler) sendBatchOfChanges(sender *blip.Sender, changeArray [][]interface{}) { +func (bh *blipHandler) sendBatchOfChanges(sender *blip.Sender, changeArray [][]interface{}) error { outrq := blip.NewRequest() outrq.SetProfile("changes") outrq.SetJSONBody(changeArray) + if len(changeArray) > 0 { + if !sender.Send(outrq) { + return ErrClosedBLIPSender + } // Spawn a goroutine to await the client's response: - sender.Send(outrq) - go bh.handleChangesResponse(sender, outrq.Response(), changeArray) + go func(bh *blipHandler, sender *blip.Sender, response *blip.Message, changeArray [][]interface{}) { + if err := bh.handleChangesResponse(sender, response, changeArray); err != nil { + bh.Logf(base.LevelError, base.KeyAll, "Error from bh.handleChangesResponse: %v", err) + } + }(bh, sender, outrq.Response(), changeArray) } else { outrq.SetNoReply(true) - sender.Send(outrq) + if !sender.Send(outrq) { + return ErrClosedBLIPSender + } } + if len(changeArray) > 0 { sequence := changeArray[0][0].(db.SequenceID) bh.Logf(base.LevelInfo, base.KeySync, "Sent %d changes to client, from seq %s. User:%s", len(changeArray), sequence.String(), base.UD(bh.effectiveUsername)) } else { bh.Logf(base.LevelInfo, base.KeySync, "Sent all changes to client. User:%s", base.UD(bh.effectiveUsername)) } + + return nil } // Handles the response to a pushed "changes" message, i.e. the list of revisions the client wants -func (bh *blipHandler) handleChangesResponse(sender *blip.Sender, response *blip.Message, changeArray [][]interface{}) { +func (bh *blipHandler) handleChangesResponse(sender *blip.Sender, response *blip.Message, changeArray [][]interface{}) error { defer func() { if panicked := recover(); panicked != nil { base.Warnf(base.KeyAll, "[%s] PANIC handling 'changes' response: %v\n%s", bh.blipContext.ID, panicked, debug.Stack()) @@ -419,8 +441,8 @@ func (bh *blipHandler) handleChangesResponse(sender *blip.Sender, response *blip var answer []interface{} if err := response.ReadJSONBody(&answer); err != nil { - bh.Logf(base.LevelInfo, base.KeySync, "Invalid response to 'changes' message: %s -- %s. User:%s", response, err, base.UD(bh.effectiveUsername)) - return + bh.Logf(base.LevelError, base.KeyAll, "Invalid response to 'changes' message: %s -- %s. User:%s", response, err, base.UD(bh.effectiveUsername)) + return errors.New("Invalid response to 'changes' message") } maxHistory := 0 @@ -448,13 +470,17 @@ func (bh *blipHandler) handleChangesResponse(sender *blip.Sender, response *blip if revID, ok := rev.(string); ok { knownRevs[revID] = true } else { - bh.Logf(base.LevelInfo, base.KeySync, "Invalid response to 'changes' message. User:%s", base.UD(bh.effectiveUsername)) - return + bh.Logf(base.LevelError, base.KeyAll, "Invalid response to 'changes' message. User:%s", base.UD(bh.effectiveUsername)) + return errors.New("Invalid response to 'changes' message") } } - bh.sendRevOrNorev(sender, seq, docID, revID, knownRevs, maxHistory) + if err := bh.sendRevOrNorev(sender, seq, docID, revID, knownRevs, maxHistory); err != nil { + return err + } } } + + return nil } // Handles a "changes" request, i.e. a set of changes pushed by the client @@ -541,17 +567,19 @@ func (bh *blipHandler) handleProposedChanges(rq *blip.Message) error { //////// DOCUMENTS: -func (bh *blipHandler) sendRevOrNorev(sender *blip.Sender, seq db.SequenceID, docID string, revID string, knownRevs map[string]bool, maxHistory int) { +func (bh *blipHandler) sendRevOrNorev(sender *blip.Sender, seq db.SequenceID, docID string, revID string, knownRevs map[string]bool, maxHistory int) error { body, err := bh.db.GetRev(docID, revID, true, nil) if err != nil { - bh.sendNoRev(err, sender, seq, docID, revID) + err = bh.sendNoRev(err, sender, seq, docID, revID) } else { - bh.sendRevision(body, sender, seq, docID, revID, knownRevs, maxHistory) + err = bh.sendRevision(body, sender, seq, docID, revID, knownRevs, maxHistory) } + + return err } -func (bh *blipHandler) sendNoRev(err error, sender *blip.Sender, seq db.SequenceID, docID string, revID string) { +func (bh *blipHandler) sendNoRev(err error, sender *blip.Sender, seq db.SequenceID, docID string, revID string) error { bh.Logf(base.LevelDebug, base.KeySync, "Sending norev %q %s due to error: %v. User:%s", base.UD(docID), revID, err, base.UD(bh.effectiveUsername)) @@ -571,12 +599,16 @@ func (bh *blipHandler) sendNoRev(err error, sender *blip.Sender, seq db.Sequence outrq.Properties["reason"] = fmt.Sprintf("%s", reason) outrq.SetNoReply(true) - sender.Send(outrq) + if !sender.Send(outrq) { + return ErrClosedBLIPSender + } + + return nil } // Pushes a revision body to the client -func (bh *blipHandler) sendRevision(body db.Body, sender *blip.Sender, seq db.SequenceID, docID string, revID string, knownRevs map[string]bool, maxHistory int) { +func (bh *blipHandler) sendRevision(body db.Body, sender *blip.Sender, seq db.SequenceID, docID string, revID string, knownRevs map[string]bool, maxHistory int) error { bh.Logf(base.LevelDebug, base.KeySync, "Sending rev %q %s based on %d known. User:%s", base.UD(docID), revID, len(knownRevs), base.UD(bh.effectiveUsername)) @@ -613,7 +645,9 @@ func (bh *blipHandler) sendRevision(body db.Body, sender *blip.Sender, seq db.Se if atts := db.BodyAttachments(body); atts != nil { // Allow client to download attachments in 'atts', but only while pulling this rev bh.addAllowedAttachments(atts) - sender.Send(outrq) + if !sender.Send(outrq) { + return ErrClosedBLIPSender + } go func() { defer func() { if panicked := recover(); panicked != nil { @@ -626,8 +660,12 @@ func (bh *blipHandler) sendRevision(body db.Body, sender *blip.Sender, seq db.Se }() } else { outrq.SetNoReply(true) - sender.Send(outrq) + if !sender.Send(outrq) { + return ErrClosedBLIPSender + } } + + return nil } // Received a "rev" request, i.e. client is pushing a revision body @@ -725,7 +763,9 @@ func (bh *blipHandler) downloadOrVerifyAttachments(body db.Body, minRevpos int, outrq := blip.NewRequest() outrq.Properties = map[string]string{"Profile": "proveAttachment", "digest": digest} outrq.SetBody(nonce) - sender.Send(outrq) + if !sender.Send(outrq) { + return nil, ErrClosedBLIPSender + } if body, err := outrq.Response().Body(); err != nil { return nil, err } else if string(body) != proof { @@ -741,7 +781,9 @@ func (bh *blipHandler) downloadOrVerifyAttachments(body db.Body, minRevpos int, if isCompressible(name, meta) { outrq.Properties["compress"] = "true" } - sender.Send(outrq) + if !sender.Send(outrq) { + return nil, ErrClosedBLIPSender + } return outrq.Response().Body() } })