This repository has been archived by the owner on Mar 9, 2022. It is now read-only.
/
CouchPersistentReplication.m
193 lines (156 loc) · 6.33 KB
/
CouchPersistentReplication.m
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
//
// CouchPersistentReplication.m
// CouchCocoa
//
// Created by Jens Alfke on 9/8/11.
// Copyright (c) 2011 Couchbase, Inc. All rights reserved.
//
// REFERENCES:
// http://docs.couchbase.org/couchdb-release-1.1/index.html#couchb-release-1.1-replicatordb
// https://gist.github.com/832610
#import "CouchPersistentReplication.h"
#import "CouchInternal.h"
// Polling interval constant (presumably in seconds — TODO confirm).
// NOTE(review): not referenced anywhere in the visible part of this file; verify whether it is
// still needed.
#define kProgressPollInterval 0.5
// Class extension: members used internally by this implementation file.
@interface CouchPersistentReplication ()
// Declared readwrite here (NOTE(review): presumably readonly in the public header — confirm).
// The hand-written setter in this file also adds/removes this object as a KVO observer of the
// server's "activeTasks".
@property (readwrite) CouchReplicationState state;
// Stores a copy of the server-reported status text and derives `completed`/`total` from it.
- (void) setStatusString: (NSString*)status;
@end
@implementation CouchPersistentReplication
// @dynamic: the compiler generates no accessors for these properties; they must be supplied
// elsewhere at runtime (presumably by the model superclass, mapping them to document
// properties — TODO confirm).
@dynamic source, target, create_target, continuous, filter, query_params, doc_ids;
// These properties are backed by the instance variables _state, _completed and _total.
@synthesize state=_state, completed=_completed, total=_total;
// Creates a replication model backed by a new document in `replicatorDB`, turns on autosave,
// and stores `source` and `target` as the document's "source" and "target" properties.
// Returns an autoreleased instance.
+ (CouchPersistentReplication*) createWithReplicatorDatabase: (CouchDatabase*)replicatorDB
                                                      source: (NSString*)source
                                                      target: (NSString*)target
{
    CouchPersistentReplication* replication =
        [[[self alloc] initWithNewDocumentInDatabase: replicatorDB] autorelease];
    // Autosave is switched on first; the endpoints are then written as document properties.
    replication.autosaves = YES;
    [replication setValue: source ofProperty: @"source"];
    [replication setValue: target ofProperty: @"target"];
    return replication;
}
// Tears the object down: stops observing the server and frees the retained status string.
- (void)dealloc {
    // Going through the state setter unregisters the "activeTasks" observer if this object
    // was still in the triggered state.
    [self setState: kReplicationIdle];
    [_statusString release];
    [super dealloc];
}
// Sets the document's "user_ctx" property from a user name and/or a list of roles.
// When both arguments are nil the property is set to nil instead.
- (void) actAsUser: (NSString*)username withRoles: (NSArray*)roles {
    // See https://gist.github.com/832610 (Section 8)
    if (!username && !roles) {
        [self setValue: nil ofProperty: @"user_ctx"];
        return;
    }

    NSMutableDictionary* context = [NSMutableDictionary dictionary];
    // -setValue:forKey: (unlike -setObject:forKey:) tolerates a nil value, so whichever of the
    // two arguments is nil is simply left out of the dictionary.
    [context setValue: username forKey: @"name"];
    [context setValue: roles forKey: @"roles"];
    [self setValue: context ofProperty: @"user_ctx"];
}
// Convenience wrapper: sets a user context with no user name and the single role "_admin".
- (void) actAsAdmin {
    NSArray* adminRoles = [NSArray arrayWithObjects: @"_admin", nil];
    [self actAsUser: nil withRoles: adminRoles];
}
// Returns the most recently stored replication state.
- (CouchReplicationState) state {
    return self->_state;
}
// Stores the new state. On a transition into or out of kReplicationTriggered it also
// registers or unregisters this object as a KVO observer of the server's "activeTasks".
- (void) setState:(CouchReplicationState)state {
    CouchServer* server = self.database.server;
    BOOL wasTriggered = (_state == kReplicationTriggered);
    BOOL nowTriggered = (state == kReplicationTriggered);

    if (nowTriggered && !wasTriggered) {
        // Entering the triggered state: start watching the server's task list.
        [server addObserver: self forKeyPath: @"activeTasks"
                    options:0 context: NULL];
    } else if (wasTriggered && !nowTriggered) {
        // Leaving the triggered state: stop watching and clear the cached status/progress.
        [server removeObserver: self forKeyPath: @"activeTasks"];
        [self setStatusString: nil];
    }

    _state = state;
}
// Re-derives the replication state from the document's "_replication_state" property and
// applies it through the state setter when it differs from the stored state.
- (void) didLoadFromDocument {
    // Lookup table, created on first use. An entry's index is cast directly to
    // CouchReplicationState below, so the order of the names matters.
    static NSArray* sStateNames;
    if (sStateNames == nil) {
        sStateNames = [[NSArray alloc] initWithObjects: @"", @"triggered", @"completed", @"error",
                                                        nil];
    }

    NSString* stateName = [self getValueOfProperty: @"_replication_state"];
    NSUInteger newState = NSNotFound;
    if (stateName) {
        newState = [sStateNames indexOfObject: stateName];
    }
    // A missing or unrecognized state name is treated as idle.
    if (newState == NSNotFound) {
        newState = kReplicationIdle;
    }

    if (newState == _state) {
        return;
    }
    COUCHLOG(@"%@: state := %@", self, stateName);
    self.state = (CouchReplicationState) newState;
}
// Issues the superclass's document delete and, when it completes with HTTP 409 and `retries`
// is still positive, issues another attempt with one fewer retry. Any other failure is only
// logged. The returned operation is the first attempt; operations created by retries are not
// returned to the caller.
- (RESTOperation*) deleteWithRetries: (int)retries {
    RESTOperation* deleteOp = [super deleteDocument];
    [deleteOp onCompletion: ^{
        BOOL shouldRetry = (deleteOp.httpStatus == 409 && retries > 0);
        if (shouldRetry) {
            COUCHLOG(@"%@: retrying DELETE (%i tries left)", self, retries);
            [self deleteWithRetries: retries - 1];
        } else if (deleteOp.error) {
            Warn(@"%@: DELETE failed, %@", self, deleteOp.error);
        }
    }];
    return deleteOp;
}
// Deletes the replication document, retrying up to 10 times on conflict.
- (RESTOperation*) deleteDocument {
    // Reset to idle first; the state setter stops observing the server's activeTasks.
    [self setState: kReplicationIdle];

    // Deleting a replication document is racy: the CouchDB replicator process keeps updating
    // it with status information, so the app may send a DELETE with the latest revision it
    // knows of before the _changes feed has told it that the replicator added a newer one.
    // For now the only workaround is to retry when that happens.
    return [self deleteWithRetries: 10];
}
#pragma mark - STATUS TRACKING
// Replaces the cached status text with a copy of `status` and re-derives the progress
// counters from it. A nil or unparseable status yields completed = total = 0. KVO change
// notifications for "completed" and "total" are sent only when either value changes.
- (void) setStatusString: (NSString*)status {
    COUCHLOG(@"%@ = %@", self, status);
    [_statusString autorelease];
    _statusString = [status copy];

    int parsedCompleted = 0;
    int parsedTotal = 0;
    if (status) {
        // Current format of status is "Processed \d+ / \d+ changes".
        NSScanner* scanner = [NSScanner scannerWithString: status];
        BOOL parsed = [scanner scanString: @"Processed" intoString:NULL]
                   && [scanner scanInt: &parsedCompleted]
                   && [scanner scanString: @"/" intoString:NULL]
                   && [scanner scanInt: &parsedTotal]
                   && [scanner scanString: @"changes" intoString:NULL];
        if (!parsed) {
            // A partial match may already have written one of the counters; discard it.
            parsedCompleted = 0;
            parsedTotal = 0;
            Warn(@"CouchReplication: Unable to parse status string \"%@\"", _statusString);
        }
    }

    BOOL progressChanged = (parsedCompleted != _completed || parsedTotal != _total);
    if (!progressChanged) {
        return;
    }

    // Both ivars are updated inside one nested will/did pair.
    [self willChangeValueForKey: @"completed"];
    [self willChangeValueForKey: @"total"];
    _completed = parsedCompleted;
    _total = parsedTotal;
    [self didChangeValueForKey: @"total"];
    [self didChangeValueForKey: @"completed"];
}
// KVO callback. When the server's "activeTasks" changes, finds the replication task whose
// "task" string contains this document's "_replication_id" and stores its "status" text via
// -setStatusString: (nil when no matching task is found). Every other notification is
// forwarded to the superclass.
- (void) observeValueForKeyPath: (NSString*)keyPath ofObject: (id)object
                         change: (NSDictionary*)change context: (void*)context
{
    CouchServer* server = self.database.server;
    if ([keyPath isEqualToString: @"activeTasks"] && object == server) {
        // Server's activeTasks changed:
        NSString* myReplicationID = [self getValueOfProperty: @"_replication_id"];
        NSString* status = nil;
        // -rangeOfString: raises NSInvalidArgumentException when given nil, so only search the
        // task list if the document really has a non-empty string ID. An empty ID could never
        // match anyway, because -rangeOfString: returns a zero-length range for @"".
        if ([myReplicationID isKindOfClass: [NSString class]] && [myReplicationID length] > 0) {
            for (NSDictionary* task in server.activeTasks) {
                if ([[task objectForKey:@"type"] isEqualToString:@"Replication"]) {
                    // Can't look up the task ID directly because it's part of a longer string
                    // like "`6390525ac52bd8b5437ab0a118993d0a+continuous`: ..."
                    if ([[task objectForKey: @"task"] rangeOfString: myReplicationID].length > 0) {
                        status = [task objectForKey: @"status"];
                        break;
                    }
                }
            }
        }
        // Only touch the cached status (and its KVO side effects) when it actually changed.
        if (!$equal(status, _statusString))
            [self setStatusString: status];
    } else {
        [super observeValueForKeyPath: keyPath ofObject: object change: change context: context];
    }
}
@end