Permalink
Browse files

Allow multiple replications with the same (source, target) dbs

It can be useful to have multiple replications with the same source and target,
if their filters are different.
  • Loading branch information...
1 parent 7c5df1f commit 371a311aefbd9e49d92a0daaa5f0d1989e15471e @snej snej committed Jan 11, 2013
View
@@ -69,6 +69,9 @@ extern NSString* TDReplicatorStoppedNotification;
@property (strong) id<TDAuthorizer> authorizer;
+/** Do these two replicators have identical settings? */
+- (bool) hasSameSettingsAs: (TDReplicator*)other;
+
/** Starts the replicator.
Replicators run asynchronously so nothing will happen until later.
A replicator can only be started once; don't reuse it after it stops. */
View
@@ -16,7 +16,7 @@
#import "TDReplicator.h"
#import "TDPusher.h"
#import "TDPuller.h"
-#import <TouchDB/TD_Database.h>
+#import "TD_Database+Replication.h"
#import "TDRemoteRequest.h"
#import "TDAuthorizer.h"
#import "TDBatcher.h"
@@ -137,6 +137,14 @@ - (BOOL) isPush {
}
+- (bool) hasSameSettingsAs: (TDReplicator*)other {
+ return _db == other->_db && $equal(_remote, other->_remote) && self.isPush == other.isPush
+ && _continuous == other->_continuous && $equal(_filterName, other->_filterName)
+ && $equal(_filterParameters, other->_filterParameters) && $equal(_options, other->_options)
+ && $equal(_requestHeaders, other->_requestHeaders);
+}
+
+
- (NSString*) lastSequence {
return _lastSequence;
}
@@ -196,6 +204,8 @@ - (void) start {
Assert(_db, @"Can't restart an already stopped TDReplicator");
LogTo(Sync, @"%@ STARTING ...", self);
+ [_db addActiveReplicator: self];
+
// Did client request a reset (i.e. starting over from first sequence?)
if (_options[@"reset"] != nil) {
[_db setLastSequence: nil withCheckpointID: self.remoteCheckpointDocID];
@@ -340,14 +340,12 @@ - (void) processInsertion: (TD_Revision*)rev {
BOOL continuous = [$castIf(NSNumber, properties[@"continuous"]) boolValue];
LogTo(Sync, @"TDReplicatorManager creating (remote=%@, push=%d, create=%d, continuous=%d)",
remote, push, createTarget, continuous);
- TDReplicator* repl = [localDb replicatorWithRemoteURL: remote
+ TDReplicator* repl = [[TDReplicator alloc] initWithDB: localDb
+ remote: remote
push: push
continuous: continuous];
if (!repl)
return;
- if (!_replicatorsByDocID)
- _replicatorsByDocID = [[NSMutableDictionary alloc] init];
- _replicatorsByDocID[rev.docID] = repl;
NSString* replicationID = properties[@"_replication_id"] ?: TDCreateUUID();
repl.sessionID = replicationID;
repl.filterName = $castIf(NSString, properties[@"filter"]);;
@@ -358,6 +356,10 @@ - (void) processInsertion: (TD_Revision*)rev {
if (push)
((TDPusher*)repl).createTarget = createTarget;
+ if (!_replicatorsByDocID)
+ _replicatorsByDocID = [[NSMutableDictionary alloc] init];
+ _replicatorsByDocID[rev.docID] = repl;
+
[[NSNotificationCenter defaultCenter] addObserver: self
selector: @selector(replicatorChanged:)
name: nil
@@ -207,6 +207,15 @@ static void deleteRemoteDB(void) {
}
+static TDReplicator* findActiveReplicator(TD_Database* db, NSURL* remote, BOOL isPush) {
+ for (TDReplicator* repl in db.activeReplicators) {
+ if (repl.db == db && $equal(repl.remote, remote) && repl.isPush == isPush)
+ return repl;
+ }
+ return nil;
+}
+
+
TestCase(TDReplicatorManager) {
RequireTestCase(ParseReplicatorProperties);
TD_DatabaseManager* server = [TD_DatabaseManager createEmptyAtTemporaryPath: @"TDReplicatorManagerTest"];
@@ -249,7 +258,7 @@ static void deleteRemoteDB(void) {
CAssert([newRev[@"_replication_state_time"] longLongValue] >= 1000);
// Check that a TDReplicator exists:
- TDReplicator* repl = [sourceDB activeReplicatorWithRemoteURL: remote push: YES];
+ TDReplicator* repl = findActiveReplicator(sourceDB, remote, YES);
CAssert(repl);
CAssertEqual(repl.sessionID, sessionID);
CAssert(repl.running);
@@ -271,7 +280,7 @@ static void deleteRemoteDB(void) {
CAssert([newRev[@"_replication_state_time"] longLongValue] >= 1000);
// Check that this restarted the replicator:
- TDReplicator* newRepl = [sourceDB activeReplicatorWithRemoteURL: remote push: YES];
+ TDReplicator* newRepl = findActiveReplicator(sourceDB, remote, YES);
CAssert(newRepl);
CAssert(newRepl != repl);
CAssertEqual(newRepl.sessionID, sessionID);
View
@@ -76,27 +76,31 @@ - (TDStatus) do_POST_replicate {
return status;
BOOL continuous = [$castIf(NSNumber, body[@"continuous"]) boolValue];
- BOOL cancel = [$castIf(NSNumber, body[@"cancel"]) boolValue];
- if (!cancel) {
+
+ TDReplicator* repl = [[TDReplicator alloc] initWithDB: db
+ remote: remote
+ push: push
+ continuous: continuous];
+ if (!repl)
+ return kTDStatusServerError;
+ repl.filterName = $castIf(NSString, body[@"filter"]);
+ repl.filterParameters = $castIf(NSDictionary, body[@"query_params"]);
+ repl.options = body;
+ repl.requestHeaders = headers;
+ repl.authorizer = authorizer;
+ if (push)
+ ((TDPusher*)repl).createTarget = createTarget;
+
+ if ([$castIf(NSNumber, body[@"cancel"]) boolValue]) {
+ // Cancel replication:
+ TDReplicator* activeRepl = [db activeReplicatorLike: repl];
+ if (!activeRepl)
+ return kTDStatusNotFound;
+ [activeRepl stop];
+ } else {
// Start replication:
- TDReplicator* repl = [db replicatorWithRemoteURL: remote push: push continuous: continuous];
- if (!repl)
- return kTDStatusServerError;
- repl.filterName = $castIf(NSString, body[@"filter"]);;
- repl.filterParameters = $castIf(NSDictionary, body[@"query_params"]);
- repl.options = body;
- repl.requestHeaders = headers;
- repl.authorizer = authorizer;
- if (push)
- ((TDPusher*)repl).createTarget = createTarget;
[repl start];
_response.bodyObject = $dict({@"session_id", repl.sessionID});
- } else {
- // Cancel replication:
- TDReplicator* repl = [db activeReplicatorWithRemoteURL: remote push: push];
- if (!repl)
- return kTDStatusNotFound;
- [repl stop];
}
return kTDStatusOK;
}
@@ -14,12 +14,9 @@
@property (readonly) NSArray* activeReplicators;
-- (TDReplicator*) activeReplicatorWithRemoteURL: (NSURL*)remote
- push: (BOOL)push;
+- (TDReplicator*) activeReplicatorLike: (TDReplicator*)repl;
-- (TDReplicator*) replicatorWithRemoteURL: (NSURL*)remote
- push: (BOOL)push
- continuous: (BOOL)continuous;
+- (void) addActiveReplicator: (TDReplicator*)repl;
- (BOOL) findMissingRevisions: (TD_RevisionList*)revs;
@@ -32,37 +32,25 @@ - (NSArray*) activeReplicators {
return _activeReplicators;
}
-- (TDReplicator*) activeReplicatorWithRemoteURL: (NSURL*)remote
- push: (BOOL)push {
- TDReplicator* repl;
- for (repl in _activeReplicators) {
- if ($equal(repl.remote, remote) && repl.isPush == push && repl.running)
- return repl;
- }
- return nil;
-}
-
-- (TDReplicator*) replicatorWithRemoteURL: (NSURL*)remote
- push: (BOOL)push
- continuous: (BOOL)continuous {
- TDReplicator* repl = [self activeReplicatorWithRemoteURL: remote push: push];
- if (repl)
- return repl;
- repl = [[TDReplicator alloc] initWithDB: self
- remote: remote
- push: push
- continuous: continuous];
- if (!repl)
- return nil;
+- (void) addActiveReplicator: (TDReplicator*)repl {
if (!_activeReplicators) {
_activeReplicators = [[NSMutableArray alloc] init];
[[NSNotificationCenter defaultCenter] addObserver: self
selector: @selector(replicatorDidStop:)
name: TDReplicatorStoppedNotification
object: nil];
}
- [_activeReplicators addObject: repl];
- return repl;
+ if (![_activeReplicators containsObject: repl])
+ [_activeReplicators addObject: repl];
+}
+
+
+- (TDReplicator*) activeReplicatorLike: (TDReplicator*)repl {
+ for (TDReplicator* activeRepl in _activeReplicators) {
+ if ([activeRepl hasSameSettingsAs: repl])
+ return activeRepl;
+ }
+ return nil;
}

0 comments on commit 371a311

Please sign in to comment.