Skip to content

Commit

Permalink
SERVER-5685 add waitForMore() helper to bgsync
Browse files Browse the repository at this point in the history
  • Loading branch information
milkie committed Jun 13, 2012
1 parent 39031a8 commit 89fc01b
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 50 deletions.
30 changes: 17 additions & 13 deletions src/mongo/db/repl/bgsync.cpp
Expand Up @@ -85,7 +85,7 @@ namespace replset {
try {
{
boost::unique_lock<boost::mutex> lock(_lastOpMutex);
if (_consumedOpTime == theReplSet->lastOpTimeWritten) {
while (_consumedOpTime == theReplSet->lastOpTimeWritten) {
_lastOpCond.wait(lock);
}
}
Expand Down Expand Up @@ -198,8 +198,6 @@ namespace replset {
sethbmsg(str::stream() << "exception in producer: " << e2.what());
sleepsecs(60);
}

sleepsecs(1);
}

cc().shutdown();
Expand Down Expand Up @@ -249,6 +247,7 @@ namespace replset {
boost::unique_lock<boost::mutex> lock(_mutex);

if (_currentSyncTarget == NULL) {
lock.unlock();
sleepsecs(1);
// if there is no one to sync from
return;
Expand Down Expand Up @@ -294,10 +293,9 @@ namespace replset {
if (!r.more())
break;

BSONObj o = r.nextSafe();

BSONObj o = r.nextSafe().getOwned();
// the blocking queue will wait (forever) until there's room for us to push
_buffer.push(o.getOwned());
_buffer.push(o);

{
boost::unique_lock<boost::mutex> lock(_mutex);
Expand All @@ -323,7 +321,7 @@ namespace replset {
}
}

BSONObj* BackgroundSync::peek() {
bool BackgroundSync::peek(BSONObj* op) {
{
boost::unique_lock<boost::mutex> lock(_mutex);

Expand All @@ -332,16 +330,22 @@ namespace replset {
_oplogMarkerTarget = NULL;
}
}
// block for up to 1 second, waiting for an op
// to appear off the network
waitForMore();
return _buffer.peek(*op);
}

if (!_buffer.blockingPeek(_currentOp, 1)) {
return NULL;
}

return &_currentOp;
void BackgroundSync::waitForMore() {
BSONObj op;
// Block for one second before timing out.
// Ignore the value of the op we peeked at.
_buffer.blockingPeek(op, 1);
}

void BackgroundSync::consume() {
// this is just to get the op off the queue, it's been peeked at and applied already
// this is just to get the op off the queue, it's been peeked at
// and queued for application already
_buffer.blockingPop();
}

Expand Down
28 changes: 16 additions & 12 deletions src/mongo/db/repl/bgsync.h
Expand Up @@ -26,11 +26,23 @@
namespace mongo {
namespace replset {


// This interface exists to facilitate easier testing;
// the test infrastructure implements these functions with stubs.
class BackgroundSyncInterface {
public:
virtual ~BackgroundSyncInterface();
virtual BSONObj* peek() = 0;

// Gets the head of the buffer, but does not remove it.
// Returns true if an element was present at the head;
// false if the queue was empty.
virtual bool peek(BSONObj* op) = 0;

// Deletes objects in the queue;
// called by sync thread after it has applied an op
virtual void consume() = 0;

// Returns the member we're currently syncing from (or NULL)
virtual Member* getSyncTarget() = 0;
};

Expand All @@ -55,7 +67,6 @@ namespace replset {
// Production thread
BlockingQueue<BSONObj> _buffer;

BSONObj _currentOp;
OpTime _lastOpTimeFetched;
long long _lastH;
// if produce thread should be running
Expand Down Expand Up @@ -91,7 +102,8 @@ namespace replset {
void stop();
// restart syncing
void start();

// wait up to 1 second for more ops to appear
void waitForMore();

// Tracker thread
// tells the sync target where this member is synced to
Expand All @@ -109,17 +121,9 @@ namespace replset {
// starts the sync target notifying thread
void notifierThread();


// Interface implementation

// Gets the head of the buffer, but does not remove it. Returns a pointer to the list
// element.
virtual BSONObj* peek();

// called by sync thread when it has applied an op
virtual bool peek(BSONObj* op);
virtual void consume();

// return the member we're currently syncing from (or NULL)
virtual Member* getSyncTarget();
};

Expand Down
34 changes: 15 additions & 19 deletions src/mongo/db/repl/rs_sync.cpp
Expand Up @@ -32,8 +32,8 @@ namespace mongo {

replset::SyncTail::~SyncTail() {}

BSONObj* replset::SyncTail::peek() {
return _queue->peek();
bool replset::SyncTail::peek(BSONObj* obj) {
return _queue->peek(obj);
}

void replset::SyncTail::consume() {
Expand Down Expand Up @@ -96,9 +96,10 @@ namespace mongo {
int fails = 0;
while( ts < minValid ) {
try {
BSONObj* o = peek();
BSONObj obj;
bool peekStatus = peek(&obj);

if (!o) {
if (!peekStatus) {
OCCASIONALLY log() << "replSet initial sync oplog: no more records" << endl;
if (fails++ > 30) {
log() << "replSet initial sync couldn't get records for 30 seconds, giving up" << endl;
Expand All @@ -111,8 +112,8 @@ namespace mongo {
}
fails = 0;

ts = (*o)["ts"]._opTime();
applyOp(*o);
ts = obj["ts"]._opTime();
applyOp(obj);
consume();

if ( ++n % 1000 == 0 ) {
Expand Down Expand Up @@ -273,9 +274,10 @@ namespace mongo {
}

{
const BSONObj *next = peek();
BSONObj nextobj;
bool peekStatus(peek(&nextobj));

if (next == NULL) {
if (!peekStatus) {
bool golive = false;

if (!theReplSet->isSecondary()) {
Expand All @@ -292,14 +294,12 @@ namespace mongo {
break;
}

const BSONObj& o = *next;

int sd = theReplSet->myConfig().slaveDelay;

// ignore slaveDelay if the box is still initializing. once
// it becomes secondary we can worry about it.
if( sd && theReplSet->isSecondary() ) {
const OpTime ts = o["ts"]._opTime();
const OpTime ts = nextobj["ts"]._opTime();
long long a = ts.getSecs();
long long b = time(0);
long long lag = b - a;
Expand All @@ -326,7 +326,7 @@ namespace mongo {
}
} // endif slaveDelay

const char *ns = o.getStringField("ns");
const char *ns = nextobj.getStringField("ns");
if( ns ) {
if ( strlen(ns) == 0 ) {
// this is ugly
Expand Down Expand Up @@ -360,19 +360,15 @@ namespace mongo {
return;
}

syncApply(o);
_logOpObjRS(o); // with repl sets we write the ops to our oplog too
syncApply(nextobj);
_logOpObjRS(nextobj); // with repl sets we write the ops to our oplog too
getDur().commitIfNeeded();

// we don't want the catch to reference next after it's been freed
next = NULL;
consume();
}
catch (DBException& e) {
sethbmsg(str::stream() << "syncTail: " << e.toString());
if (next) {
log() << "syncing: " << next->toString() << endl;
}
log() << "syncing: " << nextobj.toString() << endl;
lk.reset();
sleepsecs(30);
return;
Expand Down
2 changes: 1 addition & 1 deletion src/mongo/db/repl/rs_sync.h
Expand Up @@ -34,7 +34,7 @@ namespace replset {
SyncTail(BackgroundSyncInterface *q);
virtual bool syncApply(const BSONObj &o);
void oplogApplication();
BSONObj* peek();
bool peek(BSONObj* obj);
void consume();
};

Expand Down
13 changes: 8 additions & 5 deletions src/mongo/dbtests/replsettests.cpp
Expand Up @@ -324,11 +324,12 @@ namespace ReplSetTests {
public:
BackgroundSyncTest() {}
virtual ~BackgroundSyncTest() {}
virtual BSONObj* peek() {
virtual bool peek(BSONObj* op) {
if (_queue.empty()) {
return NULL;
return false;
}
return &_queue.front();
*op = _queue.front();
return true;
}
virtual void consume() {
_queue.pop();
Expand Down Expand Up @@ -362,7 +363,8 @@ namespace ReplSetTests {
return true;
}
virtual bool isPrimary() {
return _syncTail->peek() == 0;
BSONObj obj;
return _syncTail->peek(&obj) == false;
}
virtual bool tryToGoLiveAsASecondary(OpTime& minvalid) {
return false;
Expand Down Expand Up @@ -488,7 +490,8 @@ namespace ReplSetTests {
applyOplog();

ASSERT_EQUALS(1, static_cast<int>(client()->count(ns())));
ASSERT(_bgsync->peek() != NULL);
BSONObj obj2;
ASSERT(_bgsync->peek(&obj2));
}
};

Expand Down

0 comments on commit 89fc01b

Please sign in to comment.