Permalink
Browse files

The return of TDSocketChangeTracker

* Fixes #177: Using it instead of TDConnectionChangeTracker to get around running out of sockets with 5+ replications active at once. (Also fixes #124 since the hostname-munging workaround is no longer needed.)
* It now uses CFReadStreamCreateForHTTPRequest.
* It now supports one-shot and long-poll modes, not just continuous.
  • Loading branch information...
1 parent a4e35e5 commit c2fc5fea62e56247f60b94aeaf7a51a26bf96658 @snej snej committed Oct 26, 2012
@@ -28,12 +28,12 @@
typedef enum TDChangeTrackerMode {
kOneShot,
- kLongPoll
+ kLongPoll,
+ kContinuous
} TDChangeTrackerMode;
-/** Reads the continuous-mode _changes feed of a database, and sends the individual change entries to its client's -changeTrackerReceivedChange:.
- This class is abstract. Instantiate TDConnectionChangeTracker instead. */
+/** Reads the continuous-mode _changes feed of a database, and sends the individual change entries to its client. */
@interface TDChangeTracker : NSObject <NSStreamDelegate>
{
@protected
@@ -86,6 +86,8 @@ typedef enum TDChangeTrackerMode {
- (void) setUpstreamError: (NSString*)message;
- (void) failedWithError: (NSError*)error;
- (NSInteger) receivedPollResponse: (NSData*)body errorMessage: (NSString**)errorMessage;
+- (BOOL) receivedChanges: (NSArray*)changes errorMessage: (NSString**)errorMessage;
+- (BOOL) receivedChange: (NSDictionary*)change;
- (void) stopped; // override this
@end
@@ -16,7 +16,7 @@
// <http://wiki.apache.org/couchdb/HTTP_database_API#Changes>
#import "TDChangeTracker.h"
-#import "TDConnectionChangeTracker.h"
+#import "TDSocketChangeTracker.h"
#import "TDAuthorizer.h"
#import "TDMisc.h"
#import "TDStatus.h"
@@ -28,9 +28,6 @@
#define kMaxRetryDelay 300.0 // ...but will never get longer than this
-static NSURL* AddDotToURLHost( NSURL* url );
-
-
@interface TDChangeTracker ()
@property (readwrite, copy, nonatomic) id lastSequenceID;
@end
@@ -50,9 +47,17 @@ - (id)initWithDatabaseURL: (NSURL*)databaseURL
client: (id<TDChangeTrackerClient>)client {
NSParameterAssert(databaseURL);
NSParameterAssert(client);
- Assert([self class] != [TDChangeTracker class]); // abstract!
self = [super init];
if (self) {
+ if([self class] == [TDChangeTracker class]) {
+ // TDChangeTracker is abstract; instantiate a concrete subclass instead.
+ [self release];
+ return [[TDSocketChangeTracker alloc] initWithDatabaseURL: databaseURL
+ mode: mode
+ conflicts: includeConflicts
+ lastSequence: lastSequenceID
+ client: client];
+ }
_databaseURL = [databaseURL retain];
_client = client;
_mode = mode;
@@ -91,13 +96,7 @@ - (NSString*) changesFeedPath {
}
- (NSURL*) changesFeedURL {
- // Really ugly workaround for CFNetwork, to make sure that long-running connections like these
- // don't end up using the same socket pool as regular connections to the same host; otherwise
- // the regular connections can get stuck indefinitely behind a long-running one.
- // (This substitution appends a "." to the host name, if it didn't already end with one.)
- NSURL* url = AddDotToURLHost(_databaseURL);
-
- NSMutableString* urlStr = [[url.absoluteString mutableCopy] autorelease];
+ NSMutableString* urlStr = [[_databaseURL.absoluteString mutableCopy] autorelease];
if (![urlStr hasSuffix: @"/"])
[urlStr appendString: @"/"];
[urlStr appendString: self.changesFeedPath];
@@ -185,6 +184,27 @@ - (BOOL) receivedChange: (NSDictionary*)change {
return YES;
}
+- (BOOL) receivedChanges: (NSArray*)changes errorMessage: (NSString**)errorMessage {
+ if ([_client respondsToSelector: @selector(changeTrackerReceivedChanges:)]) {
+ [_client changeTrackerReceivedChanges: changes];
+ if (changes.count > 0)
+ self.lastSequenceID = [[changes lastObject] objectForKey: @"seq"];
+ } else {
+ for (NSDictionary* change in changes) {
+ if (![self receivedChange: change]) {
+ if (errorMessage) {
+ *errorMessage = $sprintf(@"Invalid change object: %@",
+ [TDJSON stringWithJSONObject: change
+ options:TDJSONWritingAllowFragments
+ error: nil]);
+ }
+ return NO;
+ }
+ }
+ }
+ return YES;
+}
+
- (NSInteger) receivedPollResponse: (NSData*)body errorMessage: (NSString**)errorMessage {
if (!body) {
*errorMessage = @"No body in response";
@@ -202,71 +222,9 @@ - (NSInteger) receivedPollResponse: (NSData*)body errorMessage: (NSString**)erro
*errorMessage = @"No 'changes' array in response";
return -1;
}
-
- if ([_client respondsToSelector: @selector(changeTrackerReceivedChanges:)]) {
- [_client changeTrackerReceivedChanges: changes];
- if (changes.count > 0)
- self.lastSequenceID = [[changes lastObject] objectForKey: @"seq"];
- } else {
- for (NSDictionary* change in changes) {
- if (![self receivedChange: change]) {
- *errorMessage = $sprintf(@"Invalid change object: %@",
- [TDJSON stringWithJSONObject: change
- options:TDJSONWritingAllowFragments
- error: nil]);
- return -1;
- }
- }
- }
+ if (![self receivedChanges: changes errorMessage: errorMessage])
+ return -1;
return changes.count;
}
@end
-
-
-static NSURL* AddDotToURLHost( NSURL* url ) {
- CAssert(url);
- UInt8 urlBytes[1024];
- CFIndex nBytes = CFURLGetBytes((CFURLRef)url, urlBytes, sizeof(urlBytes) - 1);
- if (nBytes > 0) {
- CFRange range;
- CFURLGetByteRangeForComponent((CFURLRef)url, kCFURLComponentHost, &range);
- if (range.length >= 2) {
- CFIndex end = range.location + range.length - 1;
- if (urlBytes[end] == '/' || urlBytes[end] == ':')
- --end;
- if (isalpha(urlBytes[end])) {
- // Alright, insert the '.' after end:
- memmove(&urlBytes[end+2], &urlBytes[end+1], nBytes - end);
- urlBytes[end+1] = '.';
- NSURL* newURL = (id)(CFURLCreateWithBytes(NULL, urlBytes, nBytes + 1,
- kCFStringEncodingUTF8, NULL));
- if (newURL)
- url = [newURL autorelease];
- else
- Warn(@"AddDotToURLHost: Failed to add dot to <%@> -- result is <%.*s>",
- url, (int)nBytes+1, urlBytes);
- }
- }
- }
- return url;
-}
-
-
-#if DEBUG
-static NSString* addDot( NSString* urlStr ) {
- return AddDotToURLHost([NSURL URLWithString: urlStr]).absoluteString;
-}
-
-TestCase(AddDotToURLHost) {
- CAssertEqual(addDot(@"http://x/y"), @"http://x./y");
- CAssertEqual(addDot(@"http://foo.com"), @"http://foo.com.");
- CAssertEqual(addDot(@"http://foo.com/"), @"http://foo.com./");
- CAssertEqual(addDot(@"http://foo.com/bar"), @"http://foo.com./bar");
- CAssertEqual(addDot(@"http://foo.com:123/"), @"http://foo.com.:123/");
- CAssertEqual(addDot(@"http://user:pass@foo.com/"), @"http://user:pass@foo.com./");
- CAssertEqual(addDot(@"http://foo.com./"), @"http://foo.com./");
- CAssertEqual(addDot(@"http://localhost/"), @"http://localhost./");
- CAssertEqual(addDot(@"http://10.0.1.12/"), @"http://10.0.1.12/");
-}
-#endif
@@ -23,17 +23,26 @@
#import "MYURLUtils.h"
+static NSURL* AddDotToURLHost( NSURL* url );
static SecTrustRef CopyTrustWithPolicy(SecTrustRef trust, SecPolicyRef policy);
@implementation TDConnectionChangeTracker
+- (NSURL*) changesFeedURL {
+ // Really ugly workaround for CFNetwork, to make sure that long-running connections like these
+ // don't end up using the same socket pool as regular connections to the same host; otherwise
+ // the regular connections can get stuck indefinitely behind a long-running one.
+ // (This substitution appends a "." to the host name, if it didn't already end with one.)
+ return AddDotToURLHost([super changesFeedURL]);
+}
+
- (BOOL) start {
if(_connection)
return NO;
[super start];
_inputBuffer = [[NSMutableData alloc] init];
-
+
NSMutableURLRequest* request = [NSMutableURLRequest requestWithURL: self.changesFeedURL];
request.cachePolicy = NSURLRequestReloadIgnoringCacheData;
request.timeoutInterval = 6.02e23;
@@ -291,3 +300,51 @@ static SecTrustRef CopyTrustWithPolicy(SecTrustRef trust, SecPolicyRef policy) {
return trust;
#endif
}
+
+
+static NSURL* AddDotToURLHost( NSURL* url ) {
+ CAssert(url);
+ UInt8 urlBytes[1024];
+ CFIndex nBytes = CFURLGetBytes((CFURLRef)url, urlBytes, sizeof(urlBytes) - 1);
+ if (nBytes > 0) {
+ CFRange range;
+ CFURLGetByteRangeForComponent((CFURLRef)url, kCFURLComponentHost, &range);
+ if (range.length >= 2) {
+ CFIndex end = range.location + range.length - 1;
+ if (urlBytes[end] == '/' || urlBytes[end] == ':')
+ --end;
+ if (isalpha(urlBytes[end])) {
+ // Alright, insert the '.' after end:
+ memmove(&urlBytes[end+2], &urlBytes[end+1], nBytes - end);
+ urlBytes[end+1] = '.';
+ NSURL* newURL = (id)(CFURLCreateWithBytes(NULL, urlBytes, nBytes + 1,
+ kCFStringEncodingUTF8, NULL));
+ if (newURL)
+ url = [newURL autorelease];
+ else
+ Warn(@"AddDotToURLHost: Failed to add dot to <%@> -- result is <%.*s>",
+ url, (int)nBytes+1, urlBytes);
+ }
+ }
+ }
+ return url;
+}
+
+
+#if DEBUG
+static NSString* addDot( NSString* urlStr ) {
+ return AddDotToURLHost([NSURL URLWithString: urlStr]).absoluteString;
+}
+
+TestCase(AddDotToURLHost) {
+ CAssertEqual(addDot(@"http://x/y"), @"http://x./y");
+ CAssertEqual(addDot(@"http://foo.com"), @"http://foo.com.");
+ CAssertEqual(addDot(@"http://foo.com/"), @"http://foo.com./");
+ CAssertEqual(addDot(@"http://foo.com/bar"), @"http://foo.com./bar");
+ CAssertEqual(addDot(@"http://foo.com:123/"), @"http://foo.com.:123/");
+ CAssertEqual(addDot(@"http://user:pass@foo.com/"), @"http://user:pass@foo.com./");
+ CAssertEqual(addDot(@"http://foo.com./"), @"http://foo.com./");
+ CAssertEqual(addDot(@"http://localhost/"), @"http://localhost./");
+ CAssertEqual(addDot(@"http://10.0.1.12/"), @"http://10.0.1.12/");
+}
+#endif
@@ -0,0 +1,28 @@
+//
+// TDSocketChangeTracker.h
+// TouchDB
+//
+// Created by Jens Alfke on 12/2/11.
+// Copyright (c) 2011 Couchbase, Inc. All rights reserved.
+//
+
+#import "TDChangeTracker.h"
+
+
+/** TDChangeTracker implementation that uses a raw TCP socket to read the chunk-mode HTTP response. */
+@interface TDSocketChangeTracker : TDChangeTracker
+{
+ @private
+ NSInputStream* _trackingInput;
+
+ NSMutableData* _inputBuffer;
+ NSMutableData* _changeBuffer;
+ CFHTTPMessageRef _unauthResponse;
+ NSURLCredential* _credential;
+ CFAbsoluteTime _startTime;
+ bool _gotResponseHeaders;
+ bool _parsing;
+ bool _inputAvailable;
+ bool _atEOF;
+}
+@end
Oops, something went wrong.

0 comments on commit c2fc5fe

Please sign in to comment.