Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Overhauled checkpointing for pull replications
Added TDSequenceMap to keep track of the latest sequence ID we can checkpoint.
(Should be used for push replications too, but I haven't done that work yet.)
  • Loading branch information
snej committed Feb 21, 2012
1 parent 56af126 commit 22db43e
Show file tree
Hide file tree
Showing 5 changed files with 198 additions and 23 deletions.
5 changes: 2 additions & 3 deletions Source/TDPuller.h
Expand Up @@ -8,16 +8,15 @@

#import "TDReplicator.h"
#import "TDRevision.h"
@class TDChangeTracker;
@class TDChangeTracker, TDSequenceMap;


/** Replicator that pulls from a remote CouchDB. */
@interface TDPuller : TDReplicator
{
@private
TDChangeTracker* _changeTracker;
unsigned _nextFakeSequence;
unsigned _maxInsertedFakeSequence;
TDSequenceMap* _pendingSequences;
NSMutableArray* _revsToPull;
NSUInteger _httpConnectionCount;
TDBatcher* _downloadsToInsert;
Expand Down
43 changes: 23 additions & 20 deletions Source/TDPuller.m
Expand Up @@ -20,6 +20,7 @@
#import "TDChangeTracker.h"
#import "TDBatcher.h"
#import "TDMultipartDownloader.h"
#import "TDSequenceMap.h"
#import "TDInternal.h"
#import "TDMisc.h"
#import "ExceptionUtils.h"
Expand All @@ -46,6 +47,7 @@ - (void)dealloc {
[_changeTracker release];
[_revsToPull release];
[_downloadsToInsert release];
[_pendingSequences release];
[super dealloc];
}

Expand All @@ -59,7 +61,8 @@ - (void) beginReplicating {
}];
}

_nextFakeSequence = _maxInsertedFakeSequence = 0;
[_pendingSequences release];
_pendingSequences = [[TDSequenceMap alloc] init];
LogTo(SyncVerbose, @"%@ starting ChangeTracker with since=%@", self, _lastSequence);
_changeTracker = [[TDChangeTracker alloc]
initWithDatabaseURL: _remote
Expand Down Expand Up @@ -97,7 +100,7 @@ - (BOOL) goOffline {

// Got a _changes feed entry from the TDChangeTracker.
- (void) changeTrackerReceivedChange: (NSDictionary*)change {
NSString* lastSequence = [[change objectForKey: @"seq"] description];
NSString* lastSequenceID = [[change objectForKey: @"seq"] description];
NSString* docID = [change objectForKey: @"id"];
if (!docID)
return;
Expand All @@ -115,8 +118,9 @@ - (void) changeTrackerReceivedChange: (NSDictionary*)change {
continue;
TDPulledRevision* rev = [[TDPulledRevision alloc] initWithDocID: docID revID: revID
deleted: deleted];
rev.remoteSequenceID = lastSequence;
rev.sequence = ++_nextFakeSequence;
// Remember its remote sequence ID (opaque), and make up a numeric sequence based
// on the order in which it appeared in the _changes feed:
rev.remoteSequenceID = lastSequenceID;
[self addToInbox: rev];
[rev release];
}
Expand Down Expand Up @@ -158,9 +162,13 @@ - (void) processInbox: (TDRevisionList*)inbox {
self.changesTotal = total + inbox.count;

if (inbox.count == 0) {
// Nothing to do. Just bump the lastSequence.
LogTo(SyncVerbose, @"%@ no new remote revisions to fetch", self);
self.lastSequence = lastInboxSequence;
// Nothing to do; just count all the revisions as processed.
// Instead of adding and immediately removing the revs to _pendingSequences,
// just do the latest one (equivalent but faster):
LogTo(SyncVerbose, @"%@: no new remote revisions to fetch", self);
SequenceNumber seq = [_pendingSequences addValue: lastInboxSequence];
[_pendingSequences removeSequence: seq];
self.lastSequence = _pendingSequences.checkpointedValue;
return;
}

Expand Down Expand Up @@ -240,20 +248,18 @@ - (void) insertDownloads:(NSArray *)downloads {
downloads = [downloads sortedArrayUsingComparator: ^(id dl1, id dl2) {
return TDSequenceCompare( [[dl1 revision] sequence], [[dl2 revision] sequence]);
}];
BOOL allGood = YES;
TDPulledRevision* lastGoodRev = nil;

[_db beginTransaction];
BOOL success = NO;
@try{
for (TDMultipartDownloader* download in downloads) {
@autoreleasepool {
TDPulledRevision* rev = (TDPulledRevision*)download.revision;
SequenceNumber fakeSequence = rev.sequence;
NSArray* history = [TDDatabase parseCouchDBRevisionHistory: rev.properties];
if (!history) {
Warn(@"%@: Missing revision history in response for %@", self, rev);
self.error = TDHTTPError(502, nil);
allGood = NO;
continue;
}

Expand All @@ -265,23 +271,20 @@ - (void) insertDownloads:(NSArray *)downloads {
else {
Warn(@"%@ failed to write %@: status=%d", self, rev, status);
self.error = TDHTTPError(status, nil);
allGood = NO; // stop advancing lastGoodRev
continue;
}
}

if (allGood)
lastGoodRev = rev;
// Mark this revision's fake sequence as processed:
[_pendingSequences removeSequence: fakeSequence];
}
}

// Now update self.lastSequence from the latest consecutively inserted revision:
unsigned lastGoodFakeSequence = (unsigned) lastGoodRev.sequence;
if (lastGoodFakeSequence > _maxInsertedFakeSequence) {
_maxInsertedFakeSequence = lastGoodFakeSequence;
self.lastSequence = lastGoodRev.remoteSequenceID;
}

LogTo(Sync, @"%@ finished inserting %u revisions", self, downloads.count);

// Checkpoint:
self.lastSequence = _pendingSequences.checkpointedValue;

success = YES;
} @catch (NSException *x) {
MYReportException(x, @"%@: Exception inserting revisions", self);
Expand Down
39 changes: 39 additions & 0 deletions Source/TDSequenceMap.h
@@ -0,0 +1,39 @@
//
// TDSequenceMap.h
// TouchDB
//
// Created by Jens Alfke on 2/21/12.
// Copyright (c) 2012 Couchbase, Inc. All rights reserved.
//

#import "TDRevision.h"


/** A data structure representing a type of array that allows object values to be added to the end, and removed in arbitrary order; it's used by the replicator to keep track of which revisions have been transferred and what sequences to checkpoint. */
@interface TDSequenceMap : NSObject
{
NSMutableIndexSet* _sequences; // Sequence numbers currently in the map
NSUInteger _lastSequence; // last generated sequence
NSMutableArray* _values; // values of remaining sequences
NSUInteger _firstValueSequence; // sequence # of first item in _values
}

- (id) init;

/** Adds a value to the map, assigning it a sequence number and returning it.
Sequence numbers start at 1 and increment from there. */
- (SequenceNumber) addValue: (id)value;

/** Removes a sequence and its associated value. */
- (void) removeSequence: (SequenceNumber)sequence;

@property (readonly) BOOL isEmpty;

/** Returns the maximum consecutively-removed sequence number.
This is one less than the minimum remaining sequence number. */
- (SequenceNumber) checkpointedSequence;

/** Returns the value associated with the checkpointedSequence. */
- (id) checkpointedValue;

@end
124 changes: 124 additions & 0 deletions Source/TDSequenceMap.m
@@ -0,0 +1,124 @@
//
// TDSequenceMap.m
// TouchDB
//
// Created by Jens Alfke on 2/21/12.
// Copyright (c) 2012 Couchbase, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
// except in compliance with the License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed under the
// License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
// either express or implied. See the License for the specific language governing permissions
// and limitations under the License.

#import "TDSequenceMap.h"


@implementation TDSequenceMap


- (id)init
{
self = [super init];
if (self) {
_sequences = [[NSMutableIndexSet alloc] init];
_values = [[NSMutableArray alloc] initWithCapacity: 100];
_firstValueSequence = 1;
}
return self;
}


- (void)dealloc
{
[_sequences release];
[_values release];
[super dealloc];
}


- (SequenceNumber) addValue: (id)value {
[_sequences addIndex: ++_lastSequence];
[_values addObject: value];
return _lastSequence;
}


- (void) removeSequence: (SequenceNumber)sequence {
Assert(sequence > 0 && sequence <= (SequenceNumber)_lastSequence,
@"Invalid sequence %lld (latest is %u)", sequence, _lastSequence);
[_sequences removeIndex: (NSUInteger) sequence];
}


- (BOOL) isEmpty {
return _sequences.firstIndex == NSNotFound;
}


- (SequenceNumber) checkpointedSequence {
NSUInteger sequence = _sequences.firstIndex;
sequence = (sequence == NSNotFound) ? _lastSequence : sequence-1;

if (sequence > _firstValueSequence) {
// Garbage-collect inaccessible values:
NSUInteger numToRemove = sequence - _firstValueSequence;
[_values removeObjectsInRange: NSMakeRange(0, numToRemove)];
_firstValueSequence += numToRemove;
}
return sequence;
}


- (id) checkpointedValue {
NSInteger index = (NSInteger)([self checkpointedSequence] - _firstValueSequence);
return (index >= 0) ? [_values objectAtIndex: index] : nil;
}


@end



TestCase(TDSequenceMap) {
TDSequenceMap* map = [[[TDSequenceMap alloc] init] autorelease];
CAssertEq(map.checkpointedSequence, 0);
CAssertEqual(map.checkpointedValue, nil);
CAssert(map.isEmpty);

CAssertEq([map addValue: @"one"], 1);
CAssertEq(map.checkpointedSequence, 0);
CAssertEqual(map.checkpointedValue, nil);
CAssert(!map.isEmpty);

CAssertEq([map addValue: @"two"], 2);
CAssertEq(map.checkpointedSequence, 0);
CAssertEqual(map.checkpointedValue, nil);

CAssertEq([map addValue: @"three"], 3);
CAssertEq(map.checkpointedSequence, 0);
CAssertEqual(map.checkpointedValue, nil);

[map removeSequence: 2];
CAssertEq(map.checkpointedSequence, 0);
CAssertEqual(map.checkpointedValue, nil);

[map removeSequence: 1];
CAssertEq(map.checkpointedSequence, 2);
CAssertEqual(map.checkpointedValue, @"two");

CAssertEq([map addValue: @"four"], 4);
CAssertEq(map.checkpointedSequence, 2);
CAssertEqual(map.checkpointedValue, @"two");

[map removeSequence: 3];
CAssertEq(map.checkpointedSequence, 3);
CAssertEqual(map.checkpointedValue, @"three");

[map removeSequence: 4];
CAssertEq(map.checkpointedSequence, 4);
CAssertEqual(map.checkpointedValue, @"four");
CAssert(map.isEmpty);
}
10 changes: 10 additions & 0 deletions TouchDB.xcodeproj/project.pbxproj
Expand Up @@ -145,6 +145,9 @@
279906F1149ABFC2003D4338 /* TDBatcher.m in Sources */ = {isa = PBXBuildFile; fileRef = 279906ED149ABFC2003D4338 /* TDBatcher.m */; };
27990703149D2576003D4338 /* CouchCocoa.framework in Frameworks */ = {isa = PBXBuildFile; fileRef = 27990702149D2576003D4338 /* CouchCocoa.framework */; };
27990704149E58B0003D4338 /* CouchCocoa.framework in CopyFiles */ = {isa = PBXBuildFile; fileRef = 27990702149D2576003D4338 /* CouchCocoa.framework */; };
279C7E2E14F424090004A1E8 /* TDSequenceMap.h in Headers */ = {isa = PBXBuildFile; fileRef = 279C7E2C14F424090004A1E8 /* TDSequenceMap.h */; };
279C7E2F14F424090004A1E8 /* TDSequenceMap.m in Sources */ = {isa = PBXBuildFile; fileRef = 279C7E2D14F424090004A1E8 /* TDSequenceMap.m */; };
279C7E3014F424090004A1E8 /* TDSequenceMap.m in Sources */ = {isa = PBXBuildFile; fileRef = 279C7E2D14F424090004A1E8 /* TDSequenceMap.m */; };
279CE39014D1EDA0009F3FA6 /* Test.m in Sources */ = {isa = PBXBuildFile; fileRef = 27F0749E11CD5B4F00E9A2AB /* Test.m */; };
279CE3B814D4A885009F3FA6 /* MYBlockUtils.h in Headers */ = {isa = PBXBuildFile; fileRef = 279CE3B614D4A885009F3FA6 /* MYBlockUtils.h */; };
279CE3B914D4A885009F3FA6 /* MYBlockUtils.m in Sources */ = {isa = PBXBuildFile; fileRef = 279CE3B714D4A885009F3FA6 /* MYBlockUtils.m */; };
Expand Down Expand Up @@ -478,6 +481,8 @@
279906EC149ABFC1003D4338 /* TDBatcher.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = TDBatcher.h; sourceTree = "<group>"; };
279906ED149ABFC2003D4338 /* TDBatcher.m */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.objc; path = TDBatcher.m; sourceTree = "<group>"; };
27990702149D2576003D4338 /* CouchCocoa.framework */ = {isa = PBXFileReference; lastKnownFileType = wrapper.framework; name = CouchCocoa.framework; path = Frameworks/CouchCocoa.framework; sourceTree = "<group>"; };
279C7E2C14F424090004A1E8 /* TDSequenceMap.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = TDSequenceMap.h; sourceTree = "<group>"; };
279C7E2D14F424090004A1E8 /* TDSequenceMap.m */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.objc; path = TDSequenceMap.m; sourceTree = "<group>"; };
279CE3B614D4A885009F3FA6 /* MYBlockUtils.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = MYBlockUtils.h; sourceTree = "<group>"; };
279CE3B714D4A885009F3FA6 /* MYBlockUtils.m */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.objc; path = MYBlockUtils.m; sourceTree = "<group>"; };
279CE3FE14D749A7009F3FA6 /* TDMultipartReader.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = TDMultipartReader.h; sourceTree = "<group>"; };
Expand Down Expand Up @@ -980,6 +985,8 @@
2766EFFC14DC7B37009ECCA8 /* TDMultiStreamWriter.m */,
2766EFF614DB7F9F009ECCA8 /* TDMultipartWriter.h */,
2766EFF714DB7F9F009ECCA8 /* TDMultipartWriter.m */,
279C7E2C14F424090004A1E8 /* TDSequenceMap.h */,
279C7E2D14F424090004A1E8 /* TDSequenceMap.m */,
27A073EA14C0BB6200F52FE7 /* TDMisc.h */,
27A073EB14C0BB6200F52FE7 /* TDMisc.m */,
270B3DEE1489359000E0A926 /* TouchDB-Info.plist */,
Expand Down Expand Up @@ -1202,6 +1209,7 @@
27C5305414DF3A050078F886 /* TDMultipartUploader.h in Headers */,
27103F8314E9CE4400DF7209 /* TDReachability.h in Headers */,
27C40C7914EC58BC00994283 /* TDReplicatorManager.h in Headers */,
279C7E2E14F424090004A1E8 /* TDSequenceMap.h in Headers */,
);
runOnlyForDeploymentPostprocessing = 0;
};
Expand Down Expand Up @@ -1721,6 +1729,7 @@
27C5305514DF3A050078F886 /* TDMultipartUploader.m in Sources */,
27103F8414E9CE4400DF7209 /* TDReachability.m in Sources */,
27C40C7A14EC58BC00994283 /* TDReplicatorManager.m in Sources */,
279C7E2F14F424090004A1E8 /* TDSequenceMap.m in Sources */,
);
runOnlyForDeploymentPostprocessing = 0;
};
Expand Down Expand Up @@ -1821,6 +1830,7 @@
27C5305614DF3A050078F886 /* TDMultipartUploader.m in Sources */,
27103F8514E9CE4400DF7209 /* TDReachability.m in Sources */,
27C40C7B14EC58BC00994283 /* TDReplicatorManager.m in Sources */,
279C7E3014F424090004A1E8 /* TDSequenceMap.m in Sources */,
);
runOnlyForDeploymentPostprocessing = 0;
};
Expand Down

0 comments on commit 22db43e

Please sign in to comment.