Skip to content

Commit

Permalink
Replicator shouldn't go to Busy on disconnect if it never connected (#…
Browse files Browse the repository at this point in the history
…799)

On a connection failure, the replicator will sometimes go from
Connecting to Busy state before going to Stopped. This confuses the
platform code that handles retries of failed connections, because it
looks like the connection initially succeeded.

I changed Replicator::computeActivityLevel() so that, when the
connection is closed but the replicator has more work do do, it will
report the status Connecting instead of Busy if its previous status
was Connecting.

I updated the "API Connection Failure" test to assert that the
replicator status never goes to busy -- this new assertion then
failed until I implemented the above fix.

Also added some test assertions to make sure that Stopped is always
the final state.

Fixes CBL-45
  • Loading branch information
snej authored and borrrden committed Jun 18, 2019
1 parent 22220fa commit 8801641
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 11 deletions.
9 changes: 6 additions & 3 deletions Replicator/Replicator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,8 @@ namespace litecore { namespace repl {

Worker::ActivityLevel Replicator::computeActivityLevel() const {
// Once I've announced I've stopped, don't return any other status again:
if (status().level == kC4Stopped)
auto currentLevel = status().level;
if (currentLevel == kC4Stopped)
return kC4Stopped;

ActivityLevel level;
Expand Down Expand Up @@ -221,11 +222,13 @@ namespace litecore { namespace repl {
break;
case Connection::kDisconnected:
case Connection::kClosed:
// After connection closes, remain busy while I wait for db to finish writes
// and for myself to process any pending messages:
// After connection closes, remain Busy (or Connecting) while I wait for db to
// finish writes and for myself to process any pending messages; then go to Stopped.
level = Worker::computeActivityLevel();
if (level < kC4Busy)
level = kC4Stopped;
else if (currentLevel == kC4Connecting)
level = kC4Connecting;
break;
}
if (SyncBusyLog.effectiveLevel() <= LogLevel::Info) {
Expand Down
4 changes: 4 additions & 0 deletions Replicator/tests/ReplicatorAPITest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,8 @@ TEST_CASE_METHOD(ReplicatorAPITest, "API Connection Failure", "[Push]") {
CHECK(_callbackStatus.error.code == ECONNREFUSED);
CHECK(_callbackStatus.progress.unitsCompleted == 0);
CHECK(_callbackStatus.progress.unitsTotal == 0);
CHECK(_numCallbacksWithLevel[kC4Busy] == 0);
CHECK(_numCallbacksWithLevel[kC4Idle] == 0);
}


Expand All @@ -166,6 +168,8 @@ TEST_CASE_METHOD(ReplicatorAPITest, "API DNS Lookup Failure", "[Push]") {
CHECK(_callbackStatus.error.code == kC4NetErrUnknownHost);
CHECK(_callbackStatus.progress.unitsCompleted == 0);
CHECK(_callbackStatus.progress.unitsTotal == 0);
CHECK(_numCallbacksWithLevel[kC4Busy] == 0);
CHECK(_numCallbacksWithLevel[kC4Idle] == 0);
}


Expand Down
7 changes: 5 additions & 2 deletions Replicator/tests/ReplicatorAPITest.hh
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,13 @@ public:
lock_guard<mutex> lock(_mutex);

Assert(r == _repl); // can't call REQUIRE on a background thread
logState(s);
_callbackStatus = s;
++_numCallbacks;
Assert(_numCallbacksWithLevel[(int)kC4Stopped] == 0); // Stopped must be the final state
_numCallbacksWithLevel[(int)s.level]++;
logState(_callbackStatus);
if (s.level == kC4Busy)
Assert(s.error.code == 0); // Busy state shouldn't have error

if (!_headers) {
_headers = AllocedDict(alloc_slice(c4repl_getResponseHeaders(_repl)));
Expand Down Expand Up @@ -213,7 +216,7 @@ public:
if (!db2)
CHECK(_headers);
}
CHECK(_numCallbacksWithLevel[kC4Stopped] > 0);
CHECK(_numCallbacksWithLevel[kC4Stopped] == 1);
CHECK(_callbackStatus.level == status.level);
CHECK(_callbackStatus.error.domain == status.error.domain);
CHECK(_callbackStatus.error.code == status.error.code);
Expand Down
15 changes: 9 additions & 6 deletions Replicator/tests/ReplicatorLoopbackTest.hh
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,8 @@ public:
const Replicator::Status &status) override
{
// Note: Can't use Catch (CHECK, REQUIRE) on a background thread
unique_lock<mutex> lock(_mutex);

if (repl == _replClient) {
Assert(_gotResponse);
++_statusChangedCalls;
Expand All @@ -187,18 +189,15 @@ public:
}

{
lock_guard<mutex> lock(_mutex);
_statusReceived = status;
_checkStopWhenIdle();
}
}

bool &finished = (repl == _replClient) ? _replicatorClientFinished : _replicatorServerFinished;
Assert(!finished);
if (status.level == kC4Stopped) {
unique_lock<mutex> lock(_mutex);
if (repl == _replClient)
_replicatorClientFinished = true;
else
_replicatorServerFinished = true;
finished = true;
if (_replicatorClientFinished && _replicatorServerFinished)
_cond.notify_all();
}
Expand All @@ -209,6 +208,8 @@ public:
{
if (repl == _replClient) {
// Note: Can't use Catch (CHECK, REQUIRE) on a background thread
unique_lock<mutex> lock(_mutex);
Assert(!_replicatorClientFinished);
for (auto &rev : revs) {
auto dir = rev->dir();
if (rev->error.code) {
Expand Down Expand Up @@ -241,6 +242,7 @@ public:
virtual void replicatorBlobProgress(Replicator *repl,
const Replicator::BlobProgress &p) override
{
unique_lock<mutex> lock(_mutex);
if (p.dir == Dir::kPushing) {
++_blobPushProgressCallbacks;
_lastBlobPushProgress = p;
Expand All @@ -257,6 +259,7 @@ public:

virtual void replicatorConnectionClosed(Replicator* repl, const CloseStatus &status) override {
// Note: Can't use Catch (CHECK, REQUIRE) on a background thread
unique_lock<mutex> lock(_mutex);
if (repl == _replClient) {
Log(">> Replicator closed with code=%d/%d, message=%.*s",
status.reason, status.code, SPLAT(status.message));
Expand Down

0 comments on commit 8801641

Please sign in to comment.