Browse files

Refactored CouchChangeTracker.

Change-Id: I39a7c4da61c9990ded6059650b73a32b37a6835e
  • Loading branch information...
1 parent 7262cd1 commit 871364253f3b97d2e6c45bd59687b001db268a1a @snej snej committed Dec 14, 2011
View
50 Couch/CouchChangeTracker.h
@@ -14,30 +14,54 @@
// and limitations under the License.
#import <Foundation/Foundation.h>
-@class CouchDatabase;
+@class CouchChangeTracker;
-/** Reads the continuous-mode _changes feed of a database, and sends the individual lines to -[CouchDatabase receivedChangeChunk:].
+@protocol CouchChangeTrackerClient <NSObject>
+- (void) changeTrackerReceivedChange: (NSDictionary*)change;
+@optional
+- (NSURLCredential*) authCredential;
+- (void) changeTrackerStopped: (CouchChangeTracker*)tracker;
+@end
+
+
+typedef enum CouchChangeTrackerMode {
+ kOneShot,
+ kLongPoll,
+ kContinuous
+} CouchChangeTrackerMode;
+
+
+/** Reads the continuous-mode _changes feed of a database, and sends the individual change entries to its client's -changeTrackerReceivedChange:.
This class is used internally by CouchDatabase and you shouldn't need to use it yourself. */
@interface CouchChangeTracker : NSObject <NSStreamDelegate>
{
- @private
- CouchDatabase* _database;
+ @protected
+ NSURL* _databaseURL;
+ id<CouchChangeTrackerClient> _client;
+ CouchChangeTrackerMode _mode;
NSUInteger _lastSequenceNumber;
- NSInputStream* _trackingInput;
- NSOutputStream* _trackingOutput;
- NSString* _trackingRequest;
- int _retryCount;
-
- NSMutableData* _inputBuffer;
- int _state;
}
-- (id)initWithDatabase: (CouchDatabase*)database;
+- (id)initWithDatabaseURL: (NSURL*)databaseURL
+ mode: (CouchChangeTrackerMode)mode
+ lastSequence: (NSUInteger)lastSequence
+ client: (id<CouchChangeTrackerClient>)client;
-@property (nonatomic) NSUInteger lastSequenceNumber;
+@property (readonly, nonatomic) NSURL* databaseURL;
+@property (readonly, nonatomic) NSString* databaseName;
+@property (readonly, nonatomic) CouchChangeTrackerMode mode;
+@property (readonly, nonatomic) NSUInteger lastSequenceNumber;
- (BOOL) start;
- (void) stop;
+// Protected
+@property (readonly) NSURLCredential* authCredential;
+@property (readonly) NSURL* changesFeedURL;
+@property (readonly) NSString* changesFeedPath;
+- (void) receivedChunk: (NSData*)chunk;
+- (BOOL) receivedPollResponse: (NSData*)body;
+- (void) stopped; // override this
+
@end
View
293 Couch/CouchChangeTracker.m
@@ -12,238 +12,133 @@
// License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
// either express or implied. See the License for the specific language governing permissions
// and limitations under the License.
+//
+// <http://wiki.apache.org/couchdb/HTTP_database_API#Changes>
#import "CouchChangeTracker.h"
-
-#import "CouchDatabase.h"
+#import "CouchConnectionChangeTracker.h"
+#import "CouchSocketChangeTracker.h"
#import "CouchInternal.h"
-#if TARGET_OS_IPHONE
-#import <UIKit/UIApplication.h>
-#endif
-
-// <http://wiki.apache.org/couchdb/HTTP_database_API#Changes>
-
-
-enum {
- kStateStatus,
- kStateHeaders,
- kStateChunks
-};
-
-#define kMaxRetries 7
-
@implementation CouchChangeTracker
+@synthesize lastSequenceNumber=_lastSequenceNumber, databaseURL=_databaseURL, mode=_mode;
-@synthesize lastSequenceNumber = _lastSequenceNumber;
-
-
-- (id)initWithDatabase: (CouchDatabase*)database {
- NSParameterAssert(database);
+- (id)initWithDatabaseURL: (NSURL*)databaseURL
+ mode: (CouchChangeTrackerMode)mode
+ lastSequence: (NSUInteger)lastSequence
+ client: (id<CouchChangeTrackerClient>)client {
+ NSParameterAssert(databaseURL);
+ NSParameterAssert(client);
self = [super init];
if (self) {
- _database = [database retain];
+ if ([self class] == [CouchChangeTracker class]) {
+ [self release];
+ if (mode == kContinuous && [databaseURL.scheme.lowercaseString hasPrefix: @"http"]) {
+ return (id) [[CouchSocketChangeTracker alloc] initWithDatabaseURL: databaseURL
+ mode: mode
+ lastSequence: lastSequence
+ client: client];
+ } else {
+ return (id) [[CouchConnectionChangeTracker alloc] initWithDatabaseURL: databaseURL
+ mode: mode
+ lastSequence: lastSequence
+ client: client];
+ }
+ }
+
+ _databaseURL = [databaseURL retain];
+ _client = client;
+ _mode = mode;
+ _lastSequenceNumber = lastSequence;
}
return self;
}
+- (NSString*) databaseName {
+ return _databaseURL.lastPathComponent;
+}
+
+- (NSString*) changesFeedPath {
+ static NSString* const kModeNames[3] = {@"normal", @"longpoll", @"continuous"};
+ return [NSString stringWithFormat: @"_changes?feed=%@&heartbeat=300000&since=%u",
+ kModeNames[_mode],
+ _lastSequenceNumber];
+}
+
+- (NSURL*) changesFeedURL {
+ return [NSURL URLWithString: [NSString stringWithFormat: @"%@/%@",
+ _databaseURL.absoluteString, self.changesFeedPath]];
+}
- (NSString*) description {
- return [NSString stringWithFormat: @"%@[%@]", [self class], _database.relativePath];
+ return [NSString stringWithFormat: @"%@[%@]", [self class], self.databaseName];
}
+- (void)dealloc {
+ [self stop];
+ [_databaseURL release];
+ [super dealloc];
+}
-- (BOOL) start {
- NSAssert(!_trackingInput, @"Already started");
-
- NSURL* url = _database.URL;
- NSMutableString* request = [NSMutableString stringWithFormat:
- @"GET /%@/_changes?feed=continuous&heartbeat=300000&since=%u HTTP/1.1\r\n"
- @"Host: %@\r\n",
- _database.relativePath, _lastSequenceNumber, url.host];
- NSURLCredential* credential = [_database credentialForOperation: nil];
- if (credential) {
- NSString* auth = [NSString stringWithFormat: @"%@:%@",
- credential.user, credential.password];
- auth = [RESTBody base64WithData: [auth dataUsingEncoding: NSUTF8StringEncoding]];
- [request appendFormat: @"Authorization: Basic %@\r\n", auth];
- }
- COUCHLOG2(@"%@: Starting with request:\n%@", self, request);
- [request appendString: @"\r\n"];
- _trackingRequest = [request copy];
-
- /* Why are we using raw TCP streams rather than NSURLConnection? Good question.
- NSURLConnection seems to have some kind of bug with reading the output of _changes, maybe
- because it's chunked and the stream doesn't close afterwards. At any rate, at least on
- OS X 10.6.7, the delegate never receives any notification of a response. The workaround
- is to act as a dumb HTTP parser and do the job ourselves. */
-
-#if TARGET_OS_IPHONE
- CFReadStreamRef cfInputStream = NULL;
- CFWriteStreamRef cfOutputStream = NULL;
- CFStreamCreatePairWithSocketToHost(NULL, (CFStringRef)url.host, url.port.intValue,
- &cfInputStream, &cfOutputStream);
- if (!cfInputStream)
- return NO;
- _trackingInput = (NSInputStream*)cfInputStream;
- _trackingOutput = (NSOutputStream*)cfOutputStream;
-#else
- [NSStream getStreamsToHost: [NSHost hostWithName: url.host]
- port: url.port.intValue
- inputStream: &_trackingInput outputStream: &_trackingOutput];
- if (!_trackingOutput)
- return NO;
- [_trackingInput retain];
- [_trackingOutput retain];
-#endif
-
- _state = kStateStatus;
-
- _inputBuffer = [[NSMutableData alloc] initWithCapacity: 1024];
-
- [_trackingOutput setDelegate: self];
- [_trackingOutput scheduleInRunLoop: [NSRunLoop currentRunLoop] forMode: NSRunLoopCommonModes];
- [_trackingOutput open];
- [_trackingInput setDelegate: self];
- [_trackingInput scheduleInRunLoop: [NSRunLoop currentRunLoop] forMode: NSRunLoopCommonModes];
- [_trackingInput open];
- return YES;
+- (NSURLCredential*) authCredential {
+ if ([_client respondsToSelector: @selector(authCredential)])
+ return _client.authCredential;
+ else
+ return nil;
}
+- (BOOL) start {
+ return NO;
+}
- (void) stop {
- COUCHLOG2(@"%@: stop", self);
- [_trackingInput close];
- [_trackingInput release];
- _trackingInput = nil;
-
- [_trackingOutput close];
- [_trackingOutput release];
- _trackingOutput = nil;
-
- [_inputBuffer release];
- _inputBuffer = nil;
+ [self stopped];
}
+- (void) stopped {
+ if ([_client respondsToSelector: @selector(changeTrackerStopped:)])
+ [_client changeTrackerStopped: self];
+}
-- (BOOL) readLine {
- const char* start = _inputBuffer.bytes;
- const char* crlf = strnstr(start, "\r\n", _inputBuffer.length);
- if (!crlf)
- return NO; // Wait till we have a complete line
- ptrdiff_t lineLength = crlf - start;
- NSString* line = [[[NSString alloc] initWithBytes: start
- length: lineLength
- encoding: NSUTF8StringEncoding] autorelease];
- COUCHLOG3(@"%@: LINE: \"%@\"", self, line);
- if (line) {
- switch (_state) {
- case kStateStatus: {
- // Read the HTTP response status line:
- if (![line hasPrefix: @"HTTP/1.1 200 "]) {
- Warn(@"_changes response: %@", line);
- [self stop];
- return NO;
- }
- _state = kStateHeaders;
- break;
- }
- case kStateHeaders:
- if (line.length == 0) {
- _state = kStateChunks;
- _retryCount = 0; // successful connection
- }
- break;
- case kStateChunks: {
- if (line.length == 0)
- break; // There's an empty line between chunks
- NSScanner* scanner = [NSScanner scannerWithString: line];
- unsigned chunkLength;
- if (![scanner scanHexInt: &chunkLength]) {
- Warn(@"Failed to parse _changes chunk length '%@'", line);
- [self stop];
- return NO;
- }
- if (_inputBuffer.length < lineLength + 2 + chunkLength)
- return NO; // Don't read the chunk till it's complete
-
- NSData* chunk = [_inputBuffer subdataWithRange: NSMakeRange(lineLength + 2,
- chunkLength)];
- [_inputBuffer replaceBytesInRange: NSMakeRange(0, lineLength + 2 + chunkLength)
- withBytes: NULL length: 0];
- // Finally! Send the line to the database to parse:
- [_database receivedChangeLine: chunk];
- return YES;
- }
- }
- } else {
- Warn(@"Couldn't read line from _changes");
- }
-
- // Remove the parsed line:
- [_inputBuffer replaceBytesInRange: NSMakeRange(0, lineLength + 2) withBytes: NULL length: 0];
+- (BOOL) receivedChange: (NSDictionary*)change {
+ if (![change isKindOfClass: [NSDictionary class]])
+ return NO;
+ id seq = [change objectForKey: @"seq"];
+ if (!seq)
+ return NO;
+ [_client changeTrackerReceivedChange: change];
+ _lastSequenceNumber = [seq intValue];
return YES;
}
-
-- (void) errorOccurred: (NSError*)error {
- [self stop];
- if (++_retryCount <= kMaxRetries) {
- NSTimeInterval retryDelay = 0.2 * (1 << (_retryCount-1));
- [self performSelector: @selector(start) withObject: nil afterDelay: retryDelay];
- } else {
- Warn(@"%@: Can't connect, giving up: %@", error);
+- (void) receivedChunk: (NSData*)chunk {
+ NSString* line = [[[NSString alloc] initWithData: chunk encoding:NSUTF8StringEncoding]
+ autorelease];
+ if (!line) {
+ Warn(@"Couldn't parse UTF-8 from _changes");
+ return;
}
+ if (line.length == 0 || [line isEqualToString: @"\n"])
+ return;
+ if (![self receivedChange: [RESTBody JSONObjectWithString: line]])
+ Warn(@"Received unparseable change line from server: %@", line);
}
-
-- (void) stream: (NSInputStream*)stream handleEvent: (NSStreamEvent)eventCode {
- switch (eventCode) {
- case NSStreamEventHasSpaceAvailable: {
- COUCHLOG3(@"%@: HasSpaceAvailable %@", self, stream);
- if (_trackingRequest) {
- const char* buffer = [_trackingRequest UTF8String];
- NSUInteger written = [(NSOutputStream*)stream write: (void*)buffer maxLength: strlen(buffer)];
- NSAssert(written == strlen(buffer), @"Output stream didn't write entire request");
- // FIX: It's unlikely but possible that the stream won't take the entire request; need to
- // write the rest later.
- [_trackingRequest release];
- _trackingRequest = nil;
- }
- break;
- }
- case NSStreamEventHasBytesAvailable: {
- COUCHLOG3(@"%@: HasBytesAvailable %@", self, stream);
- while ([stream hasBytesAvailable]) {
- uint8_t buffer[1024];
- NSInteger bytesRead = [stream read: buffer maxLength: sizeof(buffer)];
- if (bytesRead > 0) {
- [_inputBuffer appendBytes: buffer length: bytesRead];
- COUCHLOG3(@"%@: read %ld bytes", self, (long)bytesRead);
- }
- }
- while (_inputBuffer && [self readLine])
- ;
- break;
- }
- case NSStreamEventEndEncountered:
- COUCHLOG(@"%@: EndEncountered %@", self, stream);
- if (_inputBuffer.length > 0)
- Warn(@"%@ connection closed with unparsed data in buffer", self);
- [self stop];
- break;
- case NSStreamEventErrorOccurred:
- COUCHLOG(@"%@: ErrorOccurred %@: %@", self, stream, stream.streamError);
- [self errorOccurred: stream.streamError];
- break;
-
- default:
- COUCHLOG3(@"%@: Event %lx on %@", self, (long)eventCode, stream);
- break;
+- (BOOL) receivedPollResponse: (NSData*)body {
+ if (!body)
+ return NO;
+ NSDictionary* changeDict = $castIf(NSDictionary,
+ [RESTBody JSONObjectWithData: body]);
+ NSArray* changes = $castIf(NSArray, [changeDict objectForKey: @"results"]);
+ if (!changes)
+ return NO;
+ for (NSDictionary* change in changes) {
+ if (![self receivedChange: change])
+ return NO;
}
+ return YES;
}
-
@end
View
22 Couch/CouchConnectionChangeTracker.h
@@ -0,0 +1,22 @@
+//
+// CouchConnectionChangeTracker.h
+// CouchCocoa
+//
+// Created by Jens Alfke on 12/1/11.
+// Copyright (c) 2011 Couchbase, Inc. All rights reserved.
+//
+
+#import "CouchChangeTracker.h"
+
+
+/** CouchChangeTracker that uses a regular NSURLConnection.
+ This unfortunately doesn't work with regular CouchDB in continuous mode, apparently due to some bug in CFNetwork. */
+@interface CouchConnectionChangeTracker : CouchChangeTracker
+{
+ @private
+ NSURLConnection* _connection;
+ int _status;
+ NSMutableData* _inputBuffer;
+}
+
+@end
View
125 Couch/CouchConnectionChangeTracker.m
@@ -0,0 +1,125 @@
+//
+// CouchConnectionChangeTracker.m
+// CouchCocoa
+//
+// Created by Jens Alfke on 12/1/11.
+// Copyright (c) 2011 Couchbase, Inc. All rights reserved.
+//
+// <http://wiki.apache.org/couchdb/HTTP_database_API#Changes>
+
+#import "CouchConnectionChangeTracker.h"
+#import "CouchInternal.h"
+
+
+@implementation CouchConnectionChangeTracker
+
+- (BOOL) start {
+ // For some reason continuous mode doesn't work with CFNetwork.
+ if (_mode == kContinuous)
+ _mode = kLongPoll;
+
+ _inputBuffer = [[NSMutableData alloc] init];
+
+ NSMutableURLRequest* request = [NSMutableURLRequest requestWithURL: self.changesFeedURL];
+ request.cachePolicy = NSURLRequestReloadIgnoringCacheData;
+ request.timeoutInterval = 6.02e23;
+
+ _connection = [[NSURLConnection connectionWithRequest: request delegate: self] retain];
+ [_connection start];
+ COUCHLOG(@"%@: Started... <%@>", self, request.URL);
+ return YES;
+}
+
+
+- (void) clearConnection {
+ [_connection autorelease];
+ _connection = nil;
+ [_inputBuffer release];
+ _inputBuffer = nil;
+ _status = 0;
+}
+
+
+- (void) stopped {
+ COUCHLOG2(@"%@: Stopped", self);
+ [self clearConnection];
+ [super stopped];
+}
+
+
+- (void) stop {
+ [_connection cancel];
+ [super stop];
+}
+
+
+- (void)connection:(NSURLConnection *)connection
+ didReceiveAuthenticationChallenge:(NSURLAuthenticationChallenge *)challenge
+{
+ COUCHLOG2(@"%@: didReceiveAuthenticationChallenge", self);
+ if (challenge.previousFailureCount == 0) {
+ NSURLCredential* credential = self.authCredential;
+ if (credential) {
+ [challenge.sender useCredential: credential forAuthenticationChallenge: challenge];
+ return;
+ }
+ }
+ // give up
+ [challenge.sender cancelAuthenticationChallenge: challenge];
+}
+
+- (void)connection:(NSURLConnection *)connection didReceiveResponse:(NSURLResponse *)response {
+ _status = (int) ((NSHTTPURLResponse*)response).statusCode;
+ COUCHLOG3(@"%@: Got response, status %d", self, _status);
+ if (_status >= 300) {
+ Warn(@"%@: Got status %i", self, _status);
+ [self stop];
+ }
+}
+
+- (void)connection:(NSURLConnection *)connection didReceiveData:(NSData *)data {
+ COUCHLOG3(@"%@: Got %lu bytes", self, (unsigned long)data.length);
+ [_inputBuffer appendData: data];
+
+ if (_mode == kContinuous) {
+ // In continuous mode, break input into lines and parse each as JSON:
+ for (;;) {
+ const char* start = _inputBuffer.bytes;
+ const char* eol = strnstr(start, "\n", _inputBuffer.length);
+ if (!eol)
+ break; // Wait till we have a complete line
+ ptrdiff_t lineLength = eol - start;
+ NSData* chunk = [[[NSData alloc] initWithBytes: start
+ length: lineLength] autorelease];
+ [_inputBuffer replaceBytesInRange: NSMakeRange(0, lineLength + 1)
+ withBytes: NULL length: 0];
+ // Finally! Send the line to the database to parse:
+ [self receivedChunk: chunk];
+ }
+ }
+}
+
+- (void)connection:(NSURLConnection *)connection didFailWithError:(NSError *)error {
+ Warn(@"%@: Got error %@", self, error);
+ [self stopped];
+}
+
+- (void)connectionDidFinishLoading:(NSURLConnection *)connection {
+ if (_mode != kContinuous) {
+ int status = _status;
+ NSData* input = [_inputBuffer retain];
+ COUCHLOG3(@"%@: Got entire body, %u bytes", self, (unsigned)input.length);
+ BOOL responseOK = [self receivedPollResponse: input];
+ [input release];
+
+ [self clearConnection];
+ if (_mode == kLongPoll && status == 200 && responseOK)
+ [self start]; // Next poll...
+ else
+ [self stopped];
+ } else {
+ [self stopped];
+ }
+}
+
+@end
View
33 Couch/CouchDatabase.m
@@ -26,7 +26,7 @@
static const NSUInteger kDocRetainLimit = 50;
-@interface CouchDatabase ()
+@interface CouchDatabase () <CouchChangeTrackerClient>
- (void) processDeferredChanges;
@end
@@ -352,7 +352,6 @@ - (NSUInteger) lastSequenceNumber {
- (void) setLastSequenceNumber:(NSUInteger)lastSequenceNumber {
_lastSequenceNumber = lastSequenceNumber;
_lastSequenceNumberKnown = YES;
- _tracker.lastSequenceNumber = _lastSequenceNumber;
}
@@ -366,8 +365,8 @@ - (void) onChange: (OnDatabaseChangeBlock)block {
}
-- (void) receivedChange: (NSDictionary*)change
-{
+// Part of <CouchChangeTrackerClient> protocol
+- (void) changeTrackerReceivedChange: (NSDictionary*)change {
// Get & check sequence number:
NSNumber* sequenceObj = $castIf(NSNumber, [change objectForKey: @"seq"]);
if (!sequenceObj)
@@ -425,26 +424,14 @@ - (void) processDeferredChanges {
_deferredChanges = nil;
for (NSDictionary* change in changes) {
- [self receivedChange: change];
+ [self changeTrackerReceivedChange: change];
}
}
-- (void) receivedChangeLine: (NSData*)chunk {
- NSString* line = [[[NSString alloc] initWithData: chunk encoding:NSUTF8StringEncoding]
- autorelease];
- if (!line) {
- Warn(@"Couldn't parse UTF-8 from _changes");
- return;
- }
- if (line.length == 0 || [line isEqualToString: @"\n"])
- return;
- NSDictionary* change = $castIf(NSDictionary, [RESTBody JSONObjectWithString: line]);
- if (change) {
- [self receivedChange: change];
- } else {
- Warn(@"Received unparseable change line from server: %@", line);
- }
+// Part of <CouchChangeTrackerClient> protocol
+- (NSURLCredential*) authCredential {
+ return [self credentialForOperation: nil];
}
@@ -455,8 +442,10 @@ - (BOOL) tracksChanges {
- (void) setTracksChanges: (BOOL)track {
if (track && !_tracker) {
- _tracker = [[CouchChangeTracker alloc] initWithDatabase: self];
- _tracker.lastSequenceNumber = self.lastSequenceNumber;
+ _tracker = [[CouchChangeTracker alloc] initWithDatabaseURL: self.URL
+ mode: kContinuous
+ lastSequence: self.lastSequenceNumber
+ client: self];
[_tracker start];
} else if (!track && _tracker) {
[_tracker stop];
View
1 Couch/CouchInternal.h
@@ -35,7 +35,6 @@ typedef void (^OnDatabaseChangeBlock)(CouchDocument*, BOOL externalChange);
@interface CouchDatabase ()
- (void) documentAssignedID: (CouchDocument*)document;
-- (void) receivedChangeLine: (NSData*)chunk;
- (void) beginDocumentOperation: (CouchResource*)resource;
- (void) endDocumentOperation: (CouchResource*)resource;
- (void) onChange: (OnDatabaseChangeBlock)block; // convenience for unit tests
View
24 Couch/CouchSocketChangeTracker.h
@@ -0,0 +1,24 @@
+//
+// CouchSocketChangeTracker.h
+// CouchCocoa
+//
+// Created by Jens Alfke on 12/2/11.
+// Copyright (c) 2011 Couchbase, Inc. All rights reserved.
+//
+
+#import "CouchChangeTracker.h"
+
+
+/** CouchChangeTracker implementation that uses a raw TCP socket to read the chunk-mode HTTP response. */
+@interface CouchSocketChangeTracker : CouchChangeTracker
+{
+ @private
+ NSInputStream* _trackingInput;
+ NSOutputStream* _trackingOutput;
+ NSString* _trackingRequest;
+ int _retryCount;
+
+ NSMutableData* _inputBuffer;
+ int _state;
+}
+@end
View
221 Couch/CouchSocketChangeTracker.m
@@ -0,0 +1,221 @@
+//
+// CouchSocketChangeTracker.m
+// CouchCocoa
+//
+// Created by Jens Alfke on 12/2/11.
+// Copyright (c) 2011 Couchbase, Inc. All rights reserved.
+//
+// <http://wiki.apache.org/couchdb/HTTP_database_API#Changes>
+
+#import "CouchSocketChangeTracker.h"
+#import "RESTBody.h"
+#import "CouchInternal.h"
+
+
+enum {
+ kStateStatus,
+ kStateHeaders,
+ kStateChunks
+};
+
+#define kMaxRetries 7
+
+
+@implementation CouchSocketChangeTracker
+
+- (BOOL) start {
+ NSAssert(!_trackingInput, @"Already started");
+ NSAssert(_mode == kContinuous, @"CouchSocketChangeTracker only supports continuous mode");
+
+ NSMutableString* request = [NSMutableString stringWithFormat:
+ @"GET /%@/%@ HTTP/1.1\r\n"
+ @"Host: %@\r\n",
+ self.databaseName, self.changesFeedPath, _databaseURL.host];
+ NSURLCredential* credential = self.authCredential;
+ if (credential) {
+ NSString* auth = [NSString stringWithFormat: @"%@:%@",
+ credential.user, credential.password];
+ auth = [RESTBody base64WithData: [auth dataUsingEncoding: NSUTF8StringEncoding]];
+ [request appendFormat: @"Authorization: Basic %@\r\n", auth];
+ }
+ COUCHLOG2(@"%@: Starting with request:\n%@", self, request);
+ [request appendString: @"\r\n"];
+ _trackingRequest = [request copy];
+
+ /* Why are we using raw TCP streams rather than NSURLConnection? Good question.
+ NSURLConnection seems to have some kind of bug with reading the output of _changes, maybe
+ because it's chunked and the stream doesn't close afterwards. At any rate, at least on
+ OS X 10.6.7, the delegate never receives any notification of a response. The workaround
+ is to act as a dumb HTTP parser and do the job ourselves. */
+
+#if TARGET_OS_IPHONE
+ CFReadStreamRef cfInputStream = NULL;
+ CFWriteStreamRef cfOutputStream = NULL;
+ CFStreamCreatePairWithSocketToHost(NULL,
+ (CFStringRef)_databaseURL.host,
+ _databaseURL.port.intValue,
+ &cfInputStream, &cfOutputStream);
+ if (!cfInputStream)
+ return NO;
+ _trackingInput = (NSInputStream*)cfInputStream;
+ _trackingOutput = (NSOutputStream*)cfOutputStream;
+#else
+ [NSStream getStreamsToHost: [NSHost hostWithName: _databaseURL.host]
+ port: _databaseURL.port.intValue
+ inputStream: &_trackingInput outputStream: &_trackingOutput];
+ if (!_trackingOutput)
+ return NO;
+ [_trackingInput retain];
+ [_trackingOutput retain];
+#endif
+
+ _state = kStateStatus;
+
+ _inputBuffer = [[NSMutableData alloc] initWithCapacity: 1024];
+
+ [_trackingOutput setDelegate: self];
+ [_trackingOutput scheduleInRunLoop: [NSRunLoop currentRunLoop] forMode: NSRunLoopCommonModes];
+ [_trackingOutput open];
+ [_trackingInput setDelegate: self];
+ [_trackingInput scheduleInRunLoop: [NSRunLoop currentRunLoop] forMode: NSRunLoopCommonModes];
+ [_trackingInput open];
+ return YES;
+}
+
+
+- (void) stop {
+ COUCHLOG2(@"%@: stop", self);
+ [_trackingInput close];
+ [_trackingInput release];
+ _trackingInput = nil;
+
+ [_trackingOutput close];
+ [_trackingOutput release];
+ _trackingOutput = nil;
+
+ [_inputBuffer release];
+ _inputBuffer = nil;
+
+ [super stop];
+}
+
+
+- (BOOL) readLine {
+ const char* start = _inputBuffer.bytes;
+ const char* crlf = strnstr(start, "\r\n", _inputBuffer.length);
+ if (!crlf)
+ return NO; // Wait till we have a complete line
+ ptrdiff_t lineLength = crlf - start;
+ NSString* line = [[[NSString alloc] initWithBytes: start
+ length: lineLength
+ encoding: NSUTF8StringEncoding] autorelease];
+ COUCHLOG3(@"%@: LINE: \"%@\"", self, line);
+ if (line) {
+ switch (_state) {
+ case kStateStatus: {
+ // Read the HTTP response status line:
+ if (![line hasPrefix: @"HTTP/1.1 200 "]) {
+ Warn(@"_changes response: %@", line);
+ [self stop];
+ return NO;
+ }
+ _state = kStateHeaders;
+ break;
+ }
+ case kStateHeaders:
+ if (line.length == 0) {
+ _state = kStateChunks;
+ _retryCount = 0; // successful connection
+ }
+ break;
+ case kStateChunks: {
+ if (line.length == 0)
+ break; // There's an empty line between chunks
+ NSScanner* scanner = [NSScanner scannerWithString: line];
+ unsigned chunkLength;
+ if (![scanner scanHexInt: &chunkLength]) {
+ Warn(@"Failed to parse _changes chunk length '%@'", line);
+ [self stop];
+ return NO;
+ }
+ if (_inputBuffer.length < (size_t)lineLength + 2 + chunkLength)
+ return NO; // Don't read the chunk till it's complete
+
+ NSData* chunk = [_inputBuffer subdataWithRange: NSMakeRange(lineLength + 2,
+ chunkLength)];
+ [_inputBuffer replaceBytesInRange: NSMakeRange(0, lineLength + 2 + chunkLength)
+ withBytes: NULL length: 0];
+ // Finally! Send the line to the database to parse:
+ [self receivedChunk: chunk];
+ return YES;
+ }
+ }
+ } else {
+ Warn(@"Couldn't read line from _changes");
+ }
+
+ // Remove the parsed line:
+ [_inputBuffer replaceBytesInRange: NSMakeRange(0, lineLength + 2) withBytes: NULL length: 0];
+ return YES;
+}
+
+
+- (void) errorOccurred: (NSError*)error {
+ [self stop];
+ if (++_retryCount <= kMaxRetries) {
+ NSTimeInterval retryDelay = 0.2 * (1 << (_retryCount-1));
+ [self performSelector: @selector(start) withObject: nil afterDelay: retryDelay];
+ } else {
+ Warn(@"%@: Can't connect, giving up: %@", self, error);
+ }
+}
+
+
+- (void) stream: (NSInputStream*)stream handleEvent: (NSStreamEvent)eventCode {
+ switch (eventCode) {
+ case NSStreamEventHasSpaceAvailable: {
+ COUCHLOG3(@"%@: HasSpaceAvailable %@", self, stream);
+ if (_trackingRequest) {
+ const char* buffer = [_trackingRequest UTF8String];
+ NSUInteger written = [(NSOutputStream*)stream write: (void*)buffer maxLength: strlen(buffer)];
+ NSAssert(written == strlen(buffer), @"Output stream didn't write entire request");
+ // FIX: It's unlikely but possible that the stream won't take the entire request; need to
+ // write the rest later.
+ [_trackingRequest release];
+ _trackingRequest = nil;
+ }
+ break;
+ }
+ case NSStreamEventHasBytesAvailable: {
+ COUCHLOG3(@"%@: HasBytesAvailable %@", self, stream);
+ while ([stream hasBytesAvailable]) {
+ uint8_t buffer[1024];
+ NSInteger bytesRead = [stream read: buffer maxLength: sizeof(buffer)];
+ if (bytesRead > 0) {
+ [_inputBuffer appendBytes: buffer length: bytesRead];
+ COUCHLOG3(@"%@: read %ld bytes", self, (long)bytesRead);
+ }
+ }
+ while (_inputBuffer && [self readLine])
+ ;
+ break;
+ }
+ case NSStreamEventEndEncountered:
+ COUCHLOG(@"%@: EndEncountered %@", self, stream);
+ if (_inputBuffer.length > 0)
+ Warn(@"%@ connection closed with unparsed data in buffer", self);
+ [self stop];
+ break;
+ case NSStreamEventErrorOccurred:
+ COUCHLOG(@"%@: ErrorOccurred %@: %@", self, stream, stream.streamError);
+ [self errorOccurred: stream.streamError];
+ break;
+
+ default:
+ COUCHLOG3(@"%@: Event %lx on %@", self, (long)eventCode, stream);
+ break;
+ }
+}
+
+
+@end
View
36 CouchCocoa.xcodeproj/project.pbxproj
@@ -55,6 +55,14 @@
27938C63140C01DC00117675 /* CouchModel.h in Headers */ = {isa = PBXBuildFile; fileRef = 27DB82231408225300E57444 /* CouchModel.h */; settings = {ATTRIBUTES = (Public, ); }; };
2795994F140A02DB001C168A /* Test_Model.m in Sources */ = {isa = PBXBuildFile; fileRef = 2795994E140A02DB001C168A /* Test_Model.m */; };
27959950140A02DB001C168A /* Test_Model.m in Sources */ = {isa = PBXBuildFile; fileRef = 2795994E140A02DB001C168A /* Test_Model.m */; };
+ 279906D2149930DA003D4338 /* CouchConnectionChangeTracker.h in Headers */ = {isa = PBXBuildFile; fileRef = 279906CE149930DA003D4338 /* CouchConnectionChangeTracker.h */; };
+ 279906D3149930DA003D4338 /* CouchConnectionChangeTracker.h in Headers */ = {isa = PBXBuildFile; fileRef = 279906CE149930DA003D4338 /* CouchConnectionChangeTracker.h */; };
+ 279906D4149930DA003D4338 /* CouchConnectionChangeTracker.m in Sources */ = {isa = PBXBuildFile; fileRef = 279906CF149930DA003D4338 /* CouchConnectionChangeTracker.m */; };
+ 279906D5149930DA003D4338 /* CouchConnectionChangeTracker.m in Sources */ = {isa = PBXBuildFile; fileRef = 279906CF149930DA003D4338 /* CouchConnectionChangeTracker.m */; };
+ 279906D6149930DA003D4338 /* CouchSocketChangeTracker.h in Headers */ = {isa = PBXBuildFile; fileRef = 279906D0149930DA003D4338 /* CouchSocketChangeTracker.h */; };
+ 279906D7149930DA003D4338 /* CouchSocketChangeTracker.h in Headers */ = {isa = PBXBuildFile; fileRef = 279906D0149930DA003D4338 /* CouchSocketChangeTracker.h */; };
+ 279906D8149930DA003D4338 /* CouchSocketChangeTracker.m in Sources */ = {isa = PBXBuildFile; fileRef = 279906D1149930DA003D4338 /* CouchSocketChangeTracker.m */; };
+ 279906D9149930DA003D4338 /* CouchSocketChangeTracker.m in Sources */ = {isa = PBXBuildFile; fileRef = 279906D1149930DA003D4338 /* CouchSocketChangeTracker.m */; };
279CCBF213F9823F00C38C82 /* CouchReplication.h in Headers */ = {isa = PBXBuildFile; fileRef = 279CCBF013F9823F00C38C82 /* CouchReplication.h */; settings = {ATTRIBUTES = (Public, ); }; };
279CCBF313F9823F00C38C82 /* CouchReplication.h in Headers */ = {isa = PBXBuildFile; fileRef = 279CCBF013F9823F00C38C82 /* CouchReplication.h */; };
279CCBF413F9823F00C38C82 /* CouchReplication.m in Sources */ = {isa = PBXBuildFile; fileRef = 279CCBF113F9823F00C38C82 /* CouchReplication.m */; };
@@ -242,6 +250,10 @@
279276EC14215D5600002958 /* RESTBase64.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = RESTBase64.h; sourceTree = "<group>"; };
279276ED14215D5600002958 /* RESTBase64.m */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.objc; path = RESTBase64.m; sourceTree = "<group>"; };
2795994E140A02DB001C168A /* Test_Model.m */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.objc; path = Test_Model.m; sourceTree = "<group>"; };
+ 279906CE149930DA003D4338 /* CouchConnectionChangeTracker.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = CouchConnectionChangeTracker.h; sourceTree = "<group>"; };
+ 279906CF149930DA003D4338 /* CouchConnectionChangeTracker.m */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.objc; path = CouchConnectionChangeTracker.m; sourceTree = "<group>"; };
+ 279906D0149930DA003D4338 /* CouchSocketChangeTracker.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = CouchSocketChangeTracker.h; sourceTree = "<group>"; };
+ 279906D1149930DA003D4338 /* CouchSocketChangeTracker.m */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.objc; path = CouchSocketChangeTracker.m; sourceTree = "<group>"; };
279CCBF013F9823F00C38C82 /* CouchReplication.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = CouchReplication.h; sourceTree = "<group>"; };
279CCBF113F9823F00C38C82 /* CouchReplication.m */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.objc; path = CouchReplication.m; sourceTree = "<group>"; };
27A577A613970959002776DB /* DemoQuery.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = DemoQuery.h; sourceTree = "<group>"; };
@@ -403,6 +415,7 @@
27E4DD0A141921E000A3D8F6 /* CouchPersistentReplication.h */,
27E4DD0B141921E000A3D8F6 /* CouchPersistentReplication.m */,
270A663C13A5B3DF00791F4A /* CouchCocoa.h */,
+ 279906DB149930E3003D4338 /* ChangeTracker */,
27333BCB13B7E70100EF5A10 /* Internal */,
);
path = Couch;
@@ -456,8 +469,6 @@
isa = PBXGroup;
children = (
27333BCA13B7E61700EF5A10 /* CouchInternal.h */,
- 2781244413AFA6CD0051A99D /* CouchChangeTracker.h */,
- 2781244513AFA6CD0051A99D /* CouchChangeTracker.m */,
274EB8AA14479E7B001B7DD0 /* CouchbaseMobile.h */,
27D083B7143FBEEA0067702F /* CouchbaseCallbacks.h */,
27CDEC3913C6806400C979BB /* CouchPrefix.pch */,
@@ -493,6 +504,19 @@
path = REST;
sourceTree = "<group>";
};
+ 279906DB149930E3003D4338 /* ChangeTracker */ = {
+ isa = PBXGroup;
+ children = (
+ 2781244413AFA6CD0051A99D /* CouchChangeTracker.h */,
+ 2781244513AFA6CD0051A99D /* CouchChangeTracker.m */,
+ 279906CE149930DA003D4338 /* CouchConnectionChangeTracker.h */,
+ 279906CF149930DA003D4338 /* CouchConnectionChangeTracker.m */,
+ 279906D0149930DA003D4338 /* CouchSocketChangeTracker.h */,
+ 279906D1149930DA003D4338 /* CouchSocketChangeTracker.m */,
+ );
+ name = ChangeTracker;
+ sourceTree = "<group>";
+ };
27A57A51139751B2002776DB /* JSONKit */ = {
isa = PBXGroup;
children = (
@@ -591,6 +615,8 @@
27CB654D143A746700EEA1F2 /* CouchDesignDocument_Embedded.h in Headers */,
27D083B9143FBEEA0067702F /* CouchbaseCallbacks.h in Headers */,
274EB8A514479D72001B7DD0 /* CouchEmbeddedServer.h in Headers */,
+ 279906D3149930DA003D4338 /* CouchConnectionChangeTracker.h in Headers */,
+ 279906D7149930DA003D4338 /* CouchSocketChangeTracker.h in Headers */,
);
runOnlyForDeploymentPostprocessing = 0;
};
@@ -620,6 +646,8 @@
27CDEC3613C67D3500C979BB /* CouchCocoa.h in Headers */,
279276EE14215D5600002958 /* RESTBase64.h in Headers */,
27D083B8143FBEEA0067702F /* CouchbaseCallbacks.h in Headers */,
+ 279906D2149930DA003D4338 /* CouchConnectionChangeTracker.h in Headers */,
+ 279906D6149930DA003D4338 /* CouchSocketChangeTracker.h in Headers */,
);
runOnlyForDeploymentPostprocessing = 0;
};
@@ -968,6 +996,8 @@
279276F114215D5600002958 /* RESTBase64.m in Sources */,
27CB654F143A746700EEA1F2 /* CouchDesignDocument_Embedded.m in Sources */,
274EB8A714479D72001B7DD0 /* CouchEmbeddedServer.m in Sources */,
+ 279906D5149930DA003D4338 /* CouchConnectionChangeTracker.m in Sources */,
+ 279906D9149930DA003D4338 /* CouchSocketChangeTracker.m in Sources */,
);
runOnlyForDeploymentPostprocessing = 0;
};
@@ -1019,6 +1049,8 @@
279276F014215D5600002958 /* RESTBase64.m in Sources */,
27CB654E143A746700EEA1F2 /* CouchDesignDocument_Embedded.m in Sources */,
274EB8A614479D72001B7DD0 /* CouchEmbeddedServer.m in Sources */,
+ 279906D4149930DA003D4338 /* CouchConnectionChangeTracker.m in Sources */,
+ 279906D8149930DA003D4338 /* CouchSocketChangeTracker.m in Sources */,
);
runOnlyForDeploymentPostprocessing = 0;
};

0 comments on commit 8713642

Please sign in to comment.