Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Synchronize scheduling backfill tasks.

As backfill can be additionally requested by TAP consumers because
of replication chain support in 1.7, synchronization of scheduling
backfill tasks is necessary.

Change-Id: Iedd71f7aa5fb81d72770404cc9adf5cd16e4fd22
Reviewed-on: http://review.couchbase.org/6804
Reviewed-by: Trond Norbye <trond.norbye@gmail.com>
Tested-by: Chiyoung Seo <chiyoung.seo@gmail.com>
  • Loading branch information...
commit 2a75ad0fca235c64321b08254da7e6e0fb70b746 1 parent 12bf464
@chiyoung chiyoung authored
Showing with 37 additions and 11 deletions.
  1. +11 −9 ep_engine.cc
  2. +1 −1  ep_engine.h
  3. +19 −1 tapconnection.hh
  4. +6 −0 vbucket.hh
View
20 ep_engine.cc
@@ -1029,10 +1029,10 @@ class BackfillDiskLoad : public DispatcherCallback, public Callback<GetValue> {
class BackFillVisitor : public VBucketVisitor {
public:
BackFillVisitor(EventuallyPersistentEngine *e, TapProducer *tc,
- const void *token):
+ const void *token, const VBucketFilter &backfillVBfilter):
VBucketVisitor(), engine(e), name(tc->getName()),
queue(new std::list<queued_item>),
- found(), filter(tc->backFillVBucketFilter), validityToken(token),
+ found(), filter(backfillVBfilter), validityToken(token),
maxBackfillSize(e->tapBacklogLimit), valid(true),
efficientVBDump(e->epstore->getStorageProperties().hasEfficientVBDump()),
residentRatioBelowThreshold(false) {
@@ -1216,8 +1216,9 @@ class BackFillThreadData {
public:
BackFillThreadData(EventuallyPersistentEngine *e, TapProducer *tc,
- EventuallyPersistentStore *s, const void *tok):
- bfv(e, tc, tok), engine(e), epstore(s) {
+ EventuallyPersistentStore *s, const void *tok,
+ const VBucketFilter &backfillVBFilter):
+ bfv(e, tc, tok, backfillVBFilter), engine(e), epstore(s) {
}
~BackFillThreadData() {
@@ -1987,7 +1988,8 @@ inline tap_event_t EventuallyPersistentEngine::doWalkTapQueue(const void *cookie
}
// Do not schedule the backfill for the registered TAP client (e.g., incremental backup client)
- if (connection->doRunBackfill && !(connection->registeredTAPClient)) {
+ VBucketFilter backFillVBFilter;
+ if (connection->runBackfill(backFillVBFilter) && !(connection->registeredTAPClient)) {
LockHolder holder(backfillThreads.sync);
if (backfillThreads.shutdown) {
return TAP_PAUSE;
@@ -2002,7 +2004,7 @@ inline tap_event_t EventuallyPersistentEngine::doWalkTapQueue(const void *cookie
return TAP_DISCONNECT;
}
- queueBackfill(connection, cookie);
+ queueBackfill(backFillVBFilter, connection, cookie);
}
if (connection->isTimeForNoop()) {
@@ -2696,9 +2698,9 @@ void EventuallyPersistentEngine::startEngineThreads(void)
startedEngineThreads = true;
}
-void EventuallyPersistentEngine::queueBackfill(TapProducer *tc, const void *tok) {
- tc->doRunBackfill = false;
- BackFillThreadData *bftd = new BackFillThreadData(this, tc, epstore, tok);
+void EventuallyPersistentEngine::queueBackfill(const VBucketFilter &backfillVBFilter,
+ TapProducer *tc, const void *tok) {
+ BackFillThreadData *bftd = new BackFillThreadData(this, tc, epstore, tok, backfillVBFilter);
pthread_attr_t attr;
if (pthread_attr_init(&attr) != 0 ||
View
2  ep_engine.h
@@ -390,7 +390,7 @@ class EventuallyPersistentEngine : public ENGINE_HANDLE_V1 {
* Visit the objects and add them to the tap connecitons queue.
* @todo this code should honor the backfill time!
*/
- void queueBackfill(TapProducer *tc, const void *tok);
+ void queueBackfill(const VBucketFilter &backfillVBFilter, TapProducer *tc, const void *tok);
void notifyIOComplete(const void *cookie, ENGINE_ERROR_CODE status) {
if (cookie == NULL) {
View
20 tapconnection.hh
@@ -839,10 +839,18 @@ private:
}
void scheduleBackfill_UNLOCKED(const std::vector<uint16_t> &vblist) {
+ if (doRunBackfill) {
+ std::vector<uint16_t>::const_iterator it = vblist.begin();
+ for(; it != vblist.end(); ++it) {
+ backFillVBucketFilter.addVBucket(*it);
+ }
+ } else {
+ backFillVBucketFilter.assign(vblist);
+ }
+
doRunBackfill = true;
pendingBackfill = true;
backfillCompleted = false;
- backFillVBucketFilter.assign(vblist);
// Send an initial_vbucket_stream message to the destination node so that it can
// delete the corresponding vbucket before receiving the backfill stream.
std::vector<uint16_t>::const_iterator it = vblist.begin();
@@ -858,6 +866,16 @@ private:
scheduleBackfill_UNLOCKED(vblist);
}
+ bool runBackfill(VBucketFilter &vbFilter) {
+ LockHolder lh(queueLock);
+ bool rv = doRunBackfill;
+ if (doRunBackfill) {
+ doRunBackfill = false;
+ vbFilter = backFillVBucketFilter;
+ }
+ return rv;
+ }
+
/**
* A TapProducer is complete when it has nothing to transmit and
* a disconnect was requested at the end.
View
6 vbucket.hh
@@ -74,6 +74,12 @@ public:
const std::vector<uint16_t> &getVector() const { return acceptable; }
+ void addVBucket(uint16_t vbucket) {
+ if (!operator ()(vbucket)) {
+ acceptable.push_back(vbucket);
+ }
+ }
+
/**
* Dump the filter in a human readable form ( "{ bucket, bucket, bucket }"
* to the specified output stream.
Please sign in to comment.
Something went wrong with that request. Please try again.