Skip to content
This repository has been archived by the owner on Mar 9, 2022. It is now read-only.

Commit

Permalink
Make TDPusher more careful abut bumping the checkpoint
Browse files Browse the repository at this point in the history
It now works similarly to the puller, using an NSIndexSet to keep track of which sequences haven't
yet been uploaded, and only advancing the checkpoint to just before the first sequence in the set.
This will fix possible edge cases/race conditions where the checkpoint may have been advanced too
optimistically, causing a doc that failed to be pushed once to never be retried. (Fixes #246)
  • Loading branch information
snej committed Mar 29, 2013
1 parent 9dbed86 commit 9c9c730
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 19 deletions.
2 changes: 2 additions & 0 deletions Source/TDPusher.h
Expand Up @@ -19,6 +19,8 @@
BOOL _uploading;
NSMutableArray* _uploaderQueue;
BOOL _dontSendMultipart;
NSMutableIndexSet* _pendingSequences;
SequenceNumber _maxPendingSequence;
}

@property BOOL createTarget;
Expand Down
100 changes: 81 additions & 19 deletions Source/TDPusher.m
Expand Up @@ -89,6 +89,9 @@ - (void) beginReplicating {
// re-invoked after that request finishes; see -maybeCreateRemoteDB above.)
if (_creatingTarget)
return;

_pendingSequences = [NSMutableIndexSet indexSet];
_maxPendingSequence = self.lastSequence.longLongValue;

TD_FilterBlock filter = NULL;
if (_filterName) {
Expand Down Expand Up @@ -155,6 +158,33 @@ - (void) stop {
}


// Adds a local revision to the "pending" set that are awaiting upload:
- (void) addPending: (TD_Revision*)rev {
SequenceNumber seq = rev.sequence;
[_pendingSequences addIndex: (NSUInteger)seq];
_maxPendingSequence = MAX(_maxPendingSequence, seq);
}

// Removes a revision from the "pending" set after it's been uploaded. Advances checkpoint.
- (void) removePending: (TD_Revision*)rev {
SequenceNumber seq = rev.sequence;
bool wasFirst = (seq == (SequenceNumber)_pendingSequences.firstIndex);
if (![_pendingSequences containsIndex: (NSUInteger)seq])
Warn(@"%@ removePending: sequence %lld not in set, for rev %@", self, seq, rev);
[_pendingSequences removeIndex: (NSUInteger)seq];

if (wasFirst) {
// If I removed the first pending sequence, can advance the checkpoint:
SequenceNumber maxCompleted = _pendingSequences.firstIndex;
if (maxCompleted == NSNotFound)
maxCompleted = _maxPendingSequence;
else
--maxCompleted;
self.lastSequence = $sprintf(@"%lld", maxCompleted);
}
}


- (void) dbChanged: (NSNotification*)n {
NSDictionary* userInfo = n.userInfo;
// Skip revisions that originally came from the database I'm syncing to:
Expand Down Expand Up @@ -185,6 +215,7 @@ - (void) processInbox: (TD_RevisionList*)changes {
diffs[docID] = revs;
}
[revs addObject: rev.revID];
[self addPending: rev];
}

// Call _revs_diff on the target db:
Expand All @@ -197,15 +228,16 @@ - (void) processInbox: (TD_RevisionList*)changes {
} else if (results.count) {
// Go through the list of local changes again, selecting the ones the destination server
// said were missing and mapping them to a JSON dictionary in the form _bulk_docs wants:
__block SequenceNumber lastInboxSequence = 0;
NSArray* docsToSend = [changes.allRevisions my_map: ^id(TD_Revision* rev) {
NSDictionary* properties;
@autoreleasepool {
// Is this revision in the server's 'missing' list?
NSDictionary* revResults = results[rev.docID];
NSArray* missing = revResults[@"missing"];
if (![missing containsObject: [rev revID]])
if (![missing containsObject: [rev revID]]) {
[self removePending: rev];
return nil;
}

// Get the revision's properties:
TDContentOptions options = kTDIncludeAttachments | kTDIncludeRevs;
Expand All @@ -231,19 +263,18 @@ - (void) processInbox: (TD_RevisionList*)changes {
if (!_dontSendMultipart && [self uploadMultipartRevision: rev])
return nil;
}
// (to survive impending autorelease-pool drain)
}
lastInboxSequence = rev.sequence;
Assert(properties[@"_id"]);
return properties;
}];

// Post the revisions to the destination:
[self uploadBulkDocs: docsToSend changes: changes lastSequence: lastInboxSequence];
[self uploadBulkDocs: docsToSend changes: changes];

} else {
// If none of the revisions are new to the remote, just bump the lastSequence:
self.lastSequence = $sprintf(@"%lld", [changes.allRevisions.lastObject sequence]);
// None of the revisions are new to the remote
for (TD_Revision* rev in changes.allRevisions)
[self removePending: rev];
}
[self asyncTasksFinished: 1];
}];
Expand All @@ -254,8 +285,8 @@ - (void) processInbox: (TD_RevisionList*)changes {
// use the given _rev IDs instead of making up new ones.
- (void) uploadBulkDocs: (NSArray*)docsToSend
changes: (TD_RevisionList*)changes
lastSequence: (SequenceNumber)lastInboxSequence
{
// http://wiki.apache.org/couchdb/HTTP_Bulk_Document_API
NSUInteger numDocsToSend = docsToSend.count;
if (numDocsToSend == 0)
return;
Expand All @@ -269,26 +300,37 @@ - (void) uploadBulkDocs: (NSArray*)docsToSend
{@"new_edits", $false})
onCompletion: ^(NSDictionary* response, NSError *error) {
if (!error) {
NSMutableSet* failedIDs = [NSMutableSet set];
// _bulk_docs response is really an array, not a dictionary!
for (NSDictionary* item in $castIf(NSArray, response)) {
if (item[@"error"]) {
// One of the docs failed to save:
TDStatus status = statusFromBulkDocsResponseItem(item);
if (TDStatusIsError(status)) {
// One of the docs failed to save.
Warn(@"%@: _bulk_docs got an error: %@", self, item);
TDStatus status = kTDStatusUpstreamError;
if ($equal(item[@"error"], @"unauthorized"))
status = kTDStatusUnauthorized;
NSString* docID = item[@"id"];
NSURL* url = docID ? [_remote URLByAppendingPathComponent: docID] : nil;
error = TDStatusToNSError(status, url);
// 403/Forbidden means validation failed; don't treat it as an error
// because I did my job in sending the revision. Other statuses are
// actual replication errors.
if (status != kTDStatusForbidden) {
NSString* docID = item[@"id"];
[failedIDs addObject: docID];
NSURL* url = docID ? [_remote URLByAppendingPathComponent: docID]
: nil;
error = TDStatusToNSError(status, url);
}
}
}

// Remove from the pending list all the revs that didn't fail:
for (TD_Revision* rev in changes.allRevisions) {
if (![failedIDs containsObject: rev.docID])
[self removePending: rev];
}
}
if (error) {
self.error = error;
[self revisionFailed];
} else {
LogTo(SyncVerbose, @"%@: Sent %@", self, changes.allRevisions);
self.lastSequence = $sprintf(@"%lld", lastInboxSequence);
}
self.changesProcessed += numDocsToSend;
[self asyncTasksFinished: 1];
Expand All @@ -297,6 +339,26 @@ - (void) uploadBulkDocs: (NSArray*)docsToSend
}


static TDStatus statusFromBulkDocsResponseItem(NSDictionary* item) {
NSString* errorStr = item[@"error"];
if (!errorStr)
return kTDStatusOK;
// 'status' property is nonstandard; TouchDB returns it, others don't.
TDStatus status = $castIf(NSNumber, item[@"status"]).intValue;
if (status >= 400)
return status;
// If no 'status' present, interpret magic hardcoded CouchDB error strings:
if ($equal(errorStr, @"unauthorized"))
return kTDStatusUnauthorized;
else if ($equal(errorStr, @"forbidden"))
return kTDStatusForbidden;
else if ($equal(errorStr, @"conflict"))
return kTDStatusConflict;
else
return kTDStatusUpstreamError;
}


- (BOOL) uploadMultipartRevision: (TD_Revision*)rev {
// Find all the attachments with "follows" instead of a body, and put 'em in a multipart stream.
// It's important to scan the _attachments entries in the same order in which they will appear
Expand Down Expand Up @@ -350,7 +412,7 @@ - (BOOL) uploadMultipartRevision: (TD_Revision*)rev {
}
} else {
LogTo(SyncVerbose, @"%@: Sent %@, response=%@", self, rev, response);
self.lastSequence = $sprintf(@"%lld", rev.sequence);
[self removePending: rev];
}
self.changesProcessed++;
[self asyncTasksFinished: 1];
Expand Down Expand Up @@ -393,7 +455,7 @@ - (void) uploadJSONRevision: (TD_Revision*)rev {
[self revisionFailed];
} else {
LogTo(SyncVerbose, @"%@: Sent %@ (JSON), response=%@", self, rev, response);
self.lastSequence = $sprintf(@"%lld", rev.sequence);
[self removePending: rev];
}
[self asyncTasksFinished: 1];
}];
Expand Down

0 comments on commit 9c9c730

Please sign in to comment.