Skip to content
Permalink
Browse files

Fixed C++ exception pushing huge numbers of revs

Pusher::revHistoryString() would sometimes write every revision to
the history, ignoring the maxRevTreeDepth limit. If there were
thousands of revs in the history, this could overflow the BLIP
message's properties and cause an exception on the receiving end.

I fixed this. (Also moved the method to RevToSend, since it makes
more sense there.)

On the receiving side, I changed RevTree::insertHistory so it won't
complain about truncated histories, as long as the omitted revs are
ones that would be pruned anyway.

Unit tests added for both loopback and SG replications.

Fixes CBL-106
  • Loading branch information...
snej committed Jun 24, 2019
1 parent 0960144 commit 71d4a674d9488dcab50766b26575c02836b4547a
@@ -387,16 +387,22 @@ string C4Test::createNewRev(C4Database *db, C4Slice docID, C4Slice body, C4Revis
auto curDoc = c4doc_get(db, docID, false, &error);
// REQUIRE(curDoc != nullptr); // can't use Catch on bg threads
Assert(curDoc != nullptr);
string revID = createNewRev(db, docID, curDoc->revID, body, flags);
c4doc_free(curDoc);
return revID;
}

C4Slice history[2] = {curDoc->revID};
string C4Test::createNewRev(C4Database *db, C4Slice docID, C4Slice curRevID, C4Slice body, C4RevisionFlags flags) {
C4Slice history[2] = {curRevID};

C4DocPutRequest rq = {};
rq.docID = docID;
rq.history = history;
rq.historyCount = (curDoc->revID.buf != nullptr);
rq.historyCount = (curRevID.buf != nullptr);
rq.body = body;
rq.revFlags = flags;
rq.save = true;
C4Error error;
auto doc = c4doc_put(db, &rq, nullptr, &error);
if (!doc) {
char buf[256];
@@ -406,7 +412,6 @@ string C4Test::createNewRev(C4Database *db, C4Slice docID, C4Slice body, C4Revis
Assert(doc != nullptr);
string revID((char*)doc->revID.buf, doc->revID.size);
c4doc_free(doc);
c4doc_free(curDoc);
return revID;
}

@@ -177,7 +177,10 @@ public:
void createRev(C4Slice docID, C4Slice revID, C4Slice body, C4RevisionFlags flags =0);
static void createRev(C4Database *db, C4Slice docID, C4Slice revID, C4Slice body, C4RevisionFlags flags =0);
static std::string createFleeceRev(C4Database *db, C4Slice docID, C4Slice revID, C4Slice jsonBody, C4RevisionFlags flags =0);
static std::string createNewRev(C4Database *db, C4Slice docID, C4Slice body, C4RevisionFlags flags =0);
static std::string createNewRev(C4Database *db, C4Slice docID, C4Slice curRevID,
C4Slice body, C4RevisionFlags flags =0);
static std::string createNewRev(C4Database *db, C4Slice docID,
C4Slice body, C4RevisionFlags flags =0);

static void createConflictingRev(C4Database *db,
C4Slice docID,
@@ -75,6 +75,7 @@ namespace c4Internal {

void init() {
_versionedDoc.owner = this;
_versionedDoc.setPruneDepth(_db->maxRevTreeDepth());
flags = (C4DocumentFlags)_versionedDoc.flags();
if (_versionedDoc.exists())
flags = (C4DocumentFlags)(flags | kDocExists);
@@ -232,11 +233,12 @@ namespace c4Internal {
return new Doc(_versionedDoc.scopeFor(body), body, Doc::kTrusted);
}

bool save(unsigned maxRevTreeDepth) override {
bool save(unsigned maxRevTreeDepth =0) override {
requireValidDocID();
if (maxRevTreeDepth == 0)
maxRevTreeDepth = _db->maxRevTreeDepth();
_versionedDoc.prune(maxRevTreeDepth);
if (maxRevTreeDepth > 0)
_versionedDoc.prune(maxRevTreeDepth);
else
_versionedDoc.prune();
switch (_versionedDoc.save(_db->transaction())) {
case litecore::VersionedDocument::kConflict:
return false;
@@ -353,6 +355,9 @@ namespace c4Internal {
if (!body)
return -1;

if (rq.maxRevTreeDepth > 0)
_versionedDoc.setPruneDepth(rq.maxRevTreeDepth);

auto priorCurrentRev = _versionedDoc.currentRevision();
int httpStatus;
commonAncestor = _versionedDoc.insertHistory(revIDBuffers,
@@ -415,6 +420,9 @@ namespace c4Internal {
error::_throw(error::InvalidParameter, "remoteDBID cannot be used when existing=false");
bool deletion = (rq.revFlags & kRevDeleted) != 0;

if (rq.maxRevTreeDepth > 0)
_versionedDoc.setPruneDepth(rq.maxRevTreeDepth);

C4Error err;
alloc_slice body = requestBody(rq, &err);
if (!body)
@@ -449,7 +457,7 @@ namespace c4Internal {
bool saveNewRev(const C4DocPutRequest &rq, const Rev *newRev NONNULL, bool reallySave =true) {
selectRevision(newRev);
if (rq.save && reallySave) {
if (!save(rq.maxRevTreeDepth))
if (!save())
return false;
if (_db->dataFile()->willLog(LogLevel::Verbose)) {
_db->dataFile()->_logVerbose( "%-s '%.*s' rev #%s as seq %" PRIu64,
@@ -357,8 +357,17 @@ namespace litecore {
for (i = 0; i < historyCount; i++) {
unsigned gen = history[i].generation();
if (lastGen > 0 && gen != lastGen - 1) {
httpStatus = 400;
return -1; // generation numbers not in sequence
// Generation numbers not in sequence:
if (gen < lastGen && i >= _pruneDepth - 1) {
// As a special case, allow this gap in the history as long as it's at a depth
// that's going to be pruned away anyway. This allows very long histories to
// be represented in short form by skipping revs in the middle.
;
} else {
// Otherwise this is an error.
httpStatus = 400;
return -1;
}
}
lastGen = gen;

@@ -140,7 +140,9 @@ namespace litecore {
// Sets/clears the kIsConflict flag for a Rev and its ancestors.
void markBranchAsConflict(const Rev*, bool);

// Stores the depth that the no-argument prune() will use (kept in _pruneDepth).
void setPruneDepth(unsigned depth) {
    _pruneDepth = depth;
}

// Prunes the tree using an explicit depth. Defined elsewhere.
// NOTE(review): the return value is presumably the number of revisions removed -- confirm.
unsigned prune(unsigned maxDepth);

// Convenience overload: prunes using the depth last given to setPruneDepth().
unsigned prune() {
    return prune(_pruneDepth);
}

void keepBody(const Rev* NONNULL);
void removeBody(const Rev* NONNULL);
@@ -197,6 +199,7 @@ namespace litecore {
std::deque<Rev> _revsStorage; // Actual storage of the Rev objects
std::vector<alloc_slice> _insertedData; // Storage for new revids
RemoteRevMap _remoteRevs; // Tracks current rev for a remote DB URL
unsigned _pruneDepth {UINT_MAX};// Tree depth to prune to
};

}
@@ -261,7 +261,7 @@ namespace litecore { namespace repl {
auto revisionFlags = doc->selectedRev.flags;
if (revisionFlags & kRevDeleted)
msg["deleted"_sl] = "1"_sl;
string history = revHistoryString(doc, *request);
string history = request->historyString(doc);
if (!history.empty())
msg["history"_sl] = history;

@@ -326,33 +326,6 @@ namespace litecore { namespace repl {
}


// Builds the comma-separated list of ancestor revision IDs for the revision described by
// `request`, newest first, starting with its parent.
//
// At most `request.maxHistory` IDs are written in total. That limit covers both real revision
// IDs and the made-up IDs that fill gaps between non-consecutive generations; previously only
// the number of parent steps was limited, so a single large gap could emit an unbounded number
// of fake IDs. The walk also stops right after writing a revision for which
// `request.hasRemoteAncestor()` is true, or when the document has no further parent.
//
// Side effect: changes `doc`'s selected revision.
string Pusher::revHistoryString(C4Document *doc, const RevToSend &request) {
    // NOTE(review): the selection is performed inside Assert(); this relies on Assert
    // evaluating its argument in every build configuration -- confirm.
    Assert(c4doc_selectRevision(doc, request.revID, true, nullptr));
    stringstream historyStream;
    int nWritten = 0;
    unsigned lastGen = c4rev_getGeneration(doc->selectedRev.revID);
    while (nWritten < request.maxHistory && c4doc_selectParentRevision(doc)) {
        slice revID = doc->selectedRev.revID;
        unsigned gen = c4rev_getGeneration(revID);
        // Fill any gap between the previous generation and this one with fake IDs,
        // but never beyond the overall limit.
        while (gen < --lastGen && nWritten < request.maxHistory) {
            char fakeID[50];
            snprintf(fakeID, sizeof(fakeID), "%u-faded000%.08x%.08x",
                     lastGen, RandomNumber(), RandomNumber());
            if (nWritten++ > 0)
                historyStream << ',';
            historyStream << fakeID;
        }
        lastGen = gen;      // resynchronize in case the gap-filling loop was cut short
        if (nWritten >= request.maxHistory)
            break;          // the fake IDs used up the remaining budget
        if (nWritten++ > 0)
            historyStream << ',';
        historyStream << revID.asString();
        if (request.hasRemoteAncestor(revID))
            break;
    }
    return historyStream.str();
}


alloc_slice Pusher::createRevisionDelta(C4Document *doc, RevToSend *request,
Dict root, size_t revisionSize,
bool sendLegacyAttachments)
@@ -86,7 +86,6 @@ namespace litecore { namespace repl {
fleece::Dict root, size_t revSize,
bool sendLegacyAttachments);
fleece::slice getRevToSend(C4Document*, const RevToSend&, C4Error *outError);
static std::string revHistoryString(C4Document*, const RevToSend&);

static constexpr unsigned kDefaultChangeBatchSize = 200; // # of changes to send in one msg
static const unsigned kDefaultMaxHistory = 20; // If "changes" response doesn't have one
@@ -18,7 +18,10 @@

#include "ReplicatorTypes.hh"
#include "IncomingRev.hh"
#include "SecureRandomize.hh"
#include "StringUtil.hh"
#include "make_unique.h"
#include <sstream>

using namespace std;

@@ -28,6 +31,9 @@ namespace litecore { namespace repl { namespace tuning {

namespace litecore { namespace repl {

#pragma mark - REVTOSEND:


RevToSend::RevToSend(const C4DocumentInfo &info)
:ReplicatedRev(info.docID, info.revID, info.sequence)
,bodySize(info.bodySize)
@@ -64,6 +70,62 @@ namespace litecore { namespace repl {
}


// Returns the comma-separated revision history of this rev, newest first, beginning with its
// parent in `doc`.
//
// The walk continues toward the root until it reaches a revision for which
// hasRemoteAncestor() is true, or the document runs out of parents. No more than `maxHistory`
// entries are written, but a remote ancestor, if found, is always written: when the list is
// already full, the most recent entry is dropped to make room for it.
// Where consecutive ancestors have non-consecutive generations, the missing generations are
// filled with made-up revision IDs.
//
// Side effect: changes `doc`'s selected revision.
string RevToSend::historyString(C4Document *doc) {
    string history;                 // Comma-separated entries collected so far
    size_t lastEntryStart = 0;      // Offset at which the newest entry (with its comma) begins
    int entryCount = 0;

    // Appends one entry, remembering where it starts so that it can be dropped again.
    auto addEntry = [&](slice id) {
        lastEntryStart = history.size();
        if (entryCount++ > 0)
            history += ',';
        history.append((const char*)id.buf, id.size);
    };

    // Drops the entry most recently added by addEntry, including its separator.
    auto dropLastEntry = [&]() {
        history.resize(lastEntryStart);
        --entryCount;
    };

    // NOTE(review): the selection is performed inside Assert(); this relies on Assert
    // evaluating its argument in every build configuration -- confirm.
    Assert(c4doc_selectRevision(doc, revID, true, nullptr));
    unsigned prevGen = c4rev_getGeneration(doc->selectedRev.revID);
    while (c4doc_selectParentRevision(doc)) {
        slice ancestorID = doc->selectedRev.revID;
        unsigned ancestorGen = c4rev_getGeneration(ancestorID);

        // Generations between the previous entry and this ancestor are not in the document;
        // stand in for each one with a random revID, as long as there is room.
        for (--prevGen; ancestorGen < prevGen && entryCount < maxHistory; --prevGen) {
            string fakeID = format("%u-faded000%.08x%.08x",
                                   prevGen, RandomNumber(), RandomNumber());
            addEntry(slice(fakeID));
        }
        prevGen = ancestorGen;

        if (hasRemoteAncestor(ancestorID)) {
            // The common ancestor must always be present, even at the cost of the newest entry.
            if (entryCount == maxHistory)
                dropLastEntry();
            addEntry(ancestorID);
            break;
        }
        if (entryCount < maxHistory)
            addEntry(ancestorID);
    }
    return history;
}


#pragma mark - REVTOINSERT:


// Out-of-line destructor with no custom behavior.
// NOTE(review): presumably defined here rather than in the header so that member types only
// need to be complete in this translation unit -- confirm.
RevToInsert::~RevToInsert() = default;

@@ -74,6 +74,8 @@ namespace litecore { namespace repl {

Dir dir() const override {return Dir::kPushing;}
void trim() override;

std::string historyString(C4Document*);

protected:
~RevToSend() =default;
@@ -150,7 +150,7 @@ public:
auto doc = docs[i];
if (doc->error.code) {
c4error_getDescriptionC(doc->error, message, sizeof(message));
C4Log(">> Replicator %serror %s '%.*s': %s",
C4Warn(">> Replicator %serror %s '%.*s': %s",
(doc->errorIsTransient ? "transient " : ""),
(pushing ? "pushing" : "pulling"),
SPLAT(doc->docID), message);
@@ -145,6 +145,28 @@ TEST_CASE_METHOD(ReplicatorLoopbackTest, "Incremental Push", "[Push]") {
}


// Pushes a one-revision document, then adds revisions 2 through 5000 to the same document and
// pushes again, so the second push must describe a very long revision history. Finishes by
// comparing the two databases.
TEST_CASE_METHOD(ReplicatorLoopbackTest, "Push 5000 Changes", "[Push]") {
    string latestRevID;

    // First revision, created in its own transaction and pushed on its own.
    {
        TransactionHelper txn(db);
        latestRevID = createNewRev(db, "Doc"_sl, nullslice, kFleeceBody);
    }
    _expectedDocumentCount = 1;
    runPushReplication();

    Log("-------- Mutations --------");
    {
        // All remaining revisions are added inside a single transaction, each one a child
        // of the revision created just before it.
        TransactionHelper txn(db);
        int generation = 2;
        while (generation <= 5000) {
            latestRevID = createNewRev(db, "Doc"_sl, slice(latestRevID), kFleeceBody);
            ++generation;
        }
    }

    Log("-------- Second Replication --------");
    runPushReplication();
    compareDatabases();
}


TEST_CASE_METHOD(ReplicatorLoopbackTest, "Pull Resetting Checkpoint", "[Pull]") {
createRev("eenie"_sl, kRevID, kFleeceBody);
createRev("meenie"_sl, kRevID, kFleeceBody);
@@ -106,6 +106,26 @@ TEST_CASE_METHOD(ReplicatorAPITest, "API Push Large-Docs DB", "[.SyncServer]") {
#endif


// Pushes a one-revision document through the replicator API, then adds revisions 2 through
// 5000 to the same document and pushes again, so the second push must describe a very long
// revision history.
//
// This test replicates with a remote server, so it carries the hidden "[.SyncServer]" tag
// used by the other server-dependent tests in this file; it is therefore not run by default.
// NOTE(review): replicate()'s arguments are presumably (push mode, pull mode) -- confirm.
TEST_CASE_METHOD(ReplicatorAPITest, "API Push 5000 Changes", "[Push][.SyncServer]") {
    string revID;
    {
        TransactionHelper t(db);
        revID = createNewRev(db, "Doc"_sl, nullslice, kFleeceBody);
    }
    replicate(kC4OneShot, kC4Disabled);

    C4Log("-------- Mutations --------");
    {
        // Each new revision is created as a child of the one created just before it.
        TransactionHelper t(db);
        for (int i = 2; i <= 5000; ++i)
            revID = createNewRev(db, "Doc"_sl, slice(revID), kFleeceBody);
    }

    C4Log("-------- Second Replication --------");
    replicate(kC4OneShot, kC4Disabled);
}


TEST_CASE_METHOD(ReplicatorAPITest, "API Pull", "[.SyncServer]") {
_remoteDBName = kITunesDBName;
replicate(kC4Disabled, kC4OneShot);

0 comments on commit 71d4a67

Please sign in to comment.
You can’t perform that action at this time.