Skip to content

Commit

Permalink
Feature/issue 3738 norev (#3763)
Browse files Browse the repository at this point in the history
* Repro attempt for SG #3738

* /_flush_rev_cache endpoint for easy manual testing

* TODOs on fix

* Clean up test

* First pass at implmenting norev

* Update test helper to deal with norev messages

* Translate error to status code

* Avoid panic if there is a marshal err for the sequence

* Revert "/_flush_rev_cache endpoint for easy manual testing"

This reverts commit 9e54548.

* Update test comment

* PR feedback
  • Loading branch information
tleyden authored and adamcfraser committed Oct 19, 2018
1 parent 502b866 commit 3510112
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 6 deletions.
49 changes: 49 additions & 0 deletions rest/blip_api_test.go
Expand Up @@ -1425,3 +1425,52 @@ func TestMultipleOustandingChangesSubscriptions(t *testing.T) {
assert.True(t, errorCode == "")

}

// Reproduce issue SG #3738
//
// - Add 5 docs to channel ABC
// - Purge one doc via _purge REST API
// - Flush rev cache
// - Send subChanges request
// - Reply to all changes saying all docs are wanted
// - Wait to receive rev messages for all 5 docs
// - Expected: receive all 5 docs (4 revs and 1 norev)
// - Actual: only recieve 4 docs (4 revs)
func TestMissingNoRev(t *testing.T) {

rt := RestTester{}
btSpec := BlipTesterSpec{
restTester: &rt,
}
bt, err := NewBlipTesterFromSpec(btSpec)
assertNoError(t, err, "Unexpected error creating BlipTester")
defer bt.Close()

// Create 5 docs
for i := 0; i < 5; i++ {
docId := fmt.Sprintf("doc-%d", i)
docRev := fmt.Sprintf("1-abc%d", i)
sent, _, resp, err := bt.SendRev(docId, docRev, []byte(`{"key": "val", "channels": ["ABC"]}`), blip.Properties{})
assert.True(t, sent)
log.Printf("resp: %v, err: %v", resp, err)
}

// Get a reference to the database
targetDbContext, err := rt.ServerContext().GetDatabase("db")
assertNoError(t, err, "failed")
targetDb, err := db.GetDatabase(targetDbContext, nil)
assertNoError(t, err, "failed")

// Purge one doc
doc0Id := fmt.Sprintf("doc-%d", 0)
err = targetDb.Purge(doc0Id)
assertNoError(t, err, "failed")

// Flush rev cache
targetDb.FlushRevisionCache()

// Pull docs, expect to pull 4 since one was purged. (also expect to NOT get stuck)
docs := bt.WaitForNumDocsViaChanges(4)
assert.True(t, len(docs) == 4)

}
42 changes: 36 additions & 6 deletions rest/blip_sync.go
Expand Up @@ -451,7 +451,7 @@ func (bh *blipHandler) handleChangesResponse(sender *blip.Sender, response *blip
return
}
}
bh.sendRevision(sender, seq, docID, revID, knownRevs, maxHistory)
bh.sendRevOrNorev(sender, seq, docID, revID, knownRevs, maxHistory)
}
}
}
Expand Down Expand Up @@ -540,15 +540,45 @@ func (bh *blipHandler) handleProposedChanges(rq *blip.Message) error {

//////// DOCUMENTS:

// Pushes a revision body to the client
func (bh *blipHandler) sendRevision(sender *blip.Sender, seq db.SequenceID, docID string, revID string, knownRevs map[string]bool, maxHistory int) {
bh.Logf(base.LevelDebug, base.KeySync, "Sending rev %q %s based on %d known. User:%s", base.UD(docID), revID, len(knownRevs), base.UD(bh.effectiveUsername))
func (bh *blipHandler) sendRevOrNorev(sender *blip.Sender, seq db.SequenceID, docID string, revID string, knownRevs map[string]bool, maxHistory int) {

body, err := bh.db.GetRev(docID, revID, true, nil)
if err != nil {
base.Warnf(base.KeyAll, "[%s] blipHandler can't get doc %q/%s: %v", bh.blipContext.ID, base.UD(docID), revID, err)
return
bh.sendNoRev(err, sender, seq, docID, revID)
} else {
bh.sendRevision(body, sender, seq, docID, revID, knownRevs, maxHistory)
}
}

func (bh *blipHandler) sendNoRev(err error, sender *blip.Sender, seq db.SequenceID, docID string, revID string) {

bh.Logf(base.LevelDebug, base.KeySync, "Sending norev %q %s due to error: %v. User:%s", base.UD(docID), revID, err, base.UD(bh.effectiveUsername))

outrq := blip.NewRequest()
outrq.SetProfile("norev")
outrq.Properties["id"] = docID
outrq.Properties["rev"] = revID
seqJSON, marshalErr := json.Marshal(seq)
if marshalErr == nil {
outrq.Properties["sequence"] = string(seqJSON)
}

status, reason := base.ErrorAsHTTPStatus(err)
outrq.Properties["error"] = fmt.Sprintf("%s", status)

// Add a "reason" field that gives more detailed explanation on the cause of the error.
outrq.Properties["reason"] = fmt.Sprintf("%s", reason)

outrq.SetNoReply(true)
sender.Send(outrq)

}

// Pushes a revision body to the client
func (bh *blipHandler) sendRevision(body db.Body, sender *blip.Sender, seq db.SequenceID, docID string, revID string, knownRevs map[string]bool, maxHistory int) {

bh.Logf(base.LevelDebug, base.KeySync, "Sending rev %q %s based on %d known. User:%s", base.UD(docID), revID, len(knownRevs), base.UD(bh.effectiveUsername))

// Get the revision's history as a descending array of ancestor revIDs:
history := db.ParseRevisions(body)[1:]
delete(body, "_revisions")
Expand Down
8 changes: 8 additions & 0 deletions rest/utilities_testing.go
Expand Up @@ -1190,6 +1190,14 @@ func (bt *BlipTester) PullDocs() (docs map[string]RestDocument) {

}

// -------- Norev handler callback --------
bt.blipContext.HandlerForProfile["norev"] = func(request *blip.Message) {
// If a norev is received, then don't bother waiting for one of the expected revisions, since it will never come.
// The norev could be added to the returned docs map, but so far there is no need for that. The ability
// to assert on the number of actually received revisions (which norevs won't affect) meets current test requirements.
defer revsFinishedWg.Done()
}

// Send subChanges to subscribe to changes, which will cause the "changes" profile handler above to be called back
changesFinishedWg.Add(1)
subChangesRequest := blip.NewRequest()
Expand Down

0 comments on commit 3510112

Please sign in to comment.