diff --git a/rest/blip_api_test.go b/rest/blip_api_test.go index 1a53cffd45..23be09ab66 100644 --- a/rest/blip_api_test.go +++ b/rest/blip_api_test.go @@ -1425,3 +1425,52 @@ func TestMultipleOustandingChangesSubscriptions(t *testing.T) { assert.True(t, errorCode == "") } + +// Reproduce issue SG #3738 +// +// - Add 5 docs to channel ABC +// - Purge one doc via _purge REST API +// - Flush rev cache +// - Send subChanges request +// - Reply to all changes saying all docs are wanted +// - Wait to receive rev messages for all 5 docs +// - Expected: receive all 5 docs (4 revs and 1 norev) +// - Actual: only recieve 4 docs (4 revs) +func TestMissingNoRev(t *testing.T) { + + rt := RestTester{} + btSpec := BlipTesterSpec{ + restTester: &rt, + } + bt, err := NewBlipTesterFromSpec(btSpec) + assertNoError(t, err, "Unexpected error creating BlipTester") + defer bt.Close() + + // Create 5 docs + for i := 0; i < 5; i++ { + docId := fmt.Sprintf("doc-%d", i) + docRev := fmt.Sprintf("1-abc%d", i) + sent, _, resp, err := bt.SendRev(docId, docRev, []byte(`{"key": "val", "channels": ["ABC"]}`), blip.Properties{}) + assert.True(t, sent) + log.Printf("resp: %v, err: %v", resp, err) + } + + // Get a reference to the database + targetDbContext, err := rt.ServerContext().GetDatabase("db") + assertNoError(t, err, "failed") + targetDb, err := db.GetDatabase(targetDbContext, nil) + assertNoError(t, err, "failed") + + // Purge one doc + doc0Id := fmt.Sprintf("doc-%d", 0) + err = targetDb.Purge(doc0Id) + assertNoError(t, err, "failed") + + // Flush rev cache + targetDb.FlushRevisionCache() + + // Pull docs, expect to pull 4 since one was purged. (also expect to NOT get stuck) + docs := bt.WaitForNumDocsViaChanges(4) + assert.True(t, len(docs) == 4) + +} diff --git a/rest/blip_sync.go b/rest/blip_sync.go index a6b9bbc15f..56dd7823c0 100644 --- a/rest/blip_sync.go +++ b/rest/blip_sync.go @@ -451,7 +451,7 @@ func (bh *blipHandler) handleChangesResponse(sender *blip.Sender, response *blip return } } - bh.sendRevision(sender, seq, docID, revID, knownRevs, maxHistory) + bh.sendRevOrNorev(sender, seq, docID, revID, knownRevs, maxHistory) } } } @@ -540,15 +540,45 @@ func (bh *blipHandler) handleProposedChanges(rq *blip.Message) error { //////// DOCUMENTS: -// Pushes a revision body to the client -func (bh *blipHandler) sendRevision(sender *blip.Sender, seq db.SequenceID, docID string, revID string, knownRevs map[string]bool, maxHistory int) { - bh.Logf(base.LevelDebug, base.KeySync, "Sending rev %q %s based on %d known. User:%s", base.UD(docID), revID, len(knownRevs), base.UD(bh.effectiveUsername)) +func (bh *blipHandler) sendRevOrNorev(sender *blip.Sender, seq db.SequenceID, docID string, revID string, knownRevs map[string]bool, maxHistory int) { + body, err := bh.db.GetRev(docID, revID, true, nil) if err != nil { - base.Warnf(base.KeyAll, "[%s] blipHandler can't get doc %q/%s: %v", bh.blipContext.ID, base.UD(docID), revID, err) - return + bh.sendNoRev(err, sender, seq, docID, revID) + } else { + bh.sendRevision(body, sender, seq, docID, revID, knownRevs, maxHistory) + } +} + +func (bh *blipHandler) sendNoRev(err error, sender *blip.Sender, seq db.SequenceID, docID string, revID string) { + + bh.Logf(base.LevelDebug, base.KeySync, "Sending norev %q %s due to error: %v. User:%s", base.UD(docID), revID, err, base.UD(bh.effectiveUsername)) + + outrq := blip.NewRequest() + outrq.SetProfile("norev") + outrq.Properties["id"] = docID + outrq.Properties["rev"] = revID + seqJSON, marshalErr := json.Marshal(seq) + if marshalErr == nil { + outrq.Properties["sequence"] = string(seqJSON) } + status, reason := base.ErrorAsHTTPStatus(err) + outrq.Properties["error"] = fmt.Sprintf("%s", status) + + // Add a "reason" field that gives more detailed explanation on the cause of the error. + outrq.Properties["reason"] = fmt.Sprintf("%s", reason) + + outrq.SetNoReply(true) + sender.Send(outrq) + +} + +// Pushes a revision body to the client +func (bh *blipHandler) sendRevision(body db.Body, sender *blip.Sender, seq db.SequenceID, docID string, revID string, knownRevs map[string]bool, maxHistory int) { + + bh.Logf(base.LevelDebug, base.KeySync, "Sending rev %q %s based on %d known. User:%s", base.UD(docID), revID, len(knownRevs), base.UD(bh.effectiveUsername)) + // Get the revision's history as a descending array of ancestor revIDs: history := db.ParseRevisions(body)[1:] delete(body, "_revisions") diff --git a/rest/utilities_testing.go b/rest/utilities_testing.go index 08a9417ff4..cabe52e710 100644 --- a/rest/utilities_testing.go +++ b/rest/utilities_testing.go @@ -1190,6 +1190,14 @@ func (bt *BlipTester) PullDocs() (docs map[string]RestDocument) { } + // -------- Norev handler callback -------- + bt.blipContext.HandlerForProfile["norev"] = func(request *blip.Message) { + // If a norev is received, then don't bother waiting for one of the expected revisions, since it will never come. + // The norev could be added to the returned docs map, but so far there is no need for that. The ability + // to assert on the number of actually received revisions (which norevs won't affect) meets current test requirements. + defer revsFinishedWg.Done() + } + // Send subChanges to subscribe to changes, which will cause the "changes" profile handler above to be called back changesFinishedWg.Add(1) subChangesRequest := blip.NewRequest()