Skip to content

Commit

Permalink
Use NSOperationQueue to single-thread access to TDServer/Database.
Browse files Browse the repository at this point in the history
We no longer run everything on the NSURLConnection thread. Instead the TDServer creates an NSOperationQueue and everything else that talks to the server or its database does so via blocks posted to the queue.
This required changing a bunch of code (URL loading, stream, delayed perform) that depended on runloops to use operation or dispatch queues instead.
  • Loading branch information
snej committed Mar 19, 2012
1 parent c251be3 commit d111a9f
Show file tree
Hide file tree
Showing 17 changed files with 134 additions and 96 deletions.
11 changes: 7 additions & 4 deletions Demo-Mac/DemoAppController.m
Expand Up @@ -102,11 +102,14 @@ - (void) applicationDidFinishLaunching: (NSNotification*)n {

#ifdef FOR_TESTING_PURPOSES
// Start a listener socket:
sListener = [[TDListener alloc] initWithTDServer: server.touchServer port: 8888];
[sListener start];
[server tellTDServer: ^(TDServer* tdServer) {
// Register support for handling certain JS functions used in the CouchDB unit tests:
[TDView setCompiler: self];

sListener = [[TDListener alloc] initWithTDServer: tdServer port: 8888];
[sListener start];
}];

// Register support for handling certain JS functions used in the CouchDB unit tests:
[TDView setCompiler: self];
#endif
}

Expand Down
2 changes: 1 addition & 1 deletion Listener/TDHTTPResponse.m
Expand Up @@ -50,7 +50,7 @@ - (id) initWithRouter: (TDRouter*)router forConnection:(TDHTTPConnection*)connec

// Run the router, synchronously:
LogTo(TDListenerVerbose, @"%@: Starting...", self);
[_connection.listener onServerThread: ^{[router start];}];
[router startAsync];
_chunked = !_finished;
}
return self;
Expand Down
1 change: 1 addition & 0 deletions Source/ChangeTracker/TDChangeTracker.h
Expand Up @@ -37,6 +37,7 @@ typedef enum TDChangeTrackerMode {
{
@protected
NSURL* _databaseURL;
NSOperationQueue* _operationQueue;
id<TDChangeTrackerClient> _client;
TDChangeTrackerMode _mode;
id _lastSequenceID;
Expand Down
3 changes: 3 additions & 0 deletions Source/ChangeTracker/TDChangeTracker.m
Expand Up @@ -121,6 +121,7 @@ - (void) setUpstreamError: (NSString*)message {

- (BOOL) start {
self.error = nil;
_operationQueue = [[NSOperationQueue currentQueue] retain];
return NO;
}

Expand All @@ -129,6 +130,8 @@ - (void) stop {
}

- (void) stopped {
[_operationQueue release];
_operationQueue = nil;
if ([_client respondsToSelector: @selector(changeTrackerStopped:)])
[_client changeTrackerStopped: self];
_client = nil; // don't call client anymore even if -stopped is called again (i.e. on dealloc)
Expand Down
6 changes: 5 additions & 1 deletion Source/ChangeTracker/TDConnectionChangeTracker.m
Expand Up @@ -21,6 +21,7 @@

@implementation TDConnectionChangeTracker


- (BOOL) start {
[super start];
// For some reason continuous mode doesn't work with CFNetwork.
Expand All @@ -33,7 +34,10 @@ - (BOOL) start {
request.cachePolicy = NSURLRequestReloadIgnoringCacheData;
request.timeoutInterval = 6.02e23;

_connection = [[NSURLConnection connectionWithRequest: request delegate: self] retain];
_connection = [[NSURLConnection alloc] initWithRequest: request delegate: self
startImmediately: NO];
if (_operationQueue)
[_connection setDelegateQueue: _operationQueue];
[_connection start];
LogTo(ChangeTracker, @"%@: Started... <%@>", self, request.URL);
return YES;
Expand Down
4 changes: 3 additions & 1 deletion Source/ChangeTracker/TDRemoteRequest.m
Expand Up @@ -49,7 +49,9 @@ - (void) setupRequest: (NSMutableURLRequest*)request withBody: (id)body {

- (void) start {
Assert(!_connection);
_connection = [[NSURLConnection connectionWithRequest: _request delegate: self] retain];
_connection = [[NSURLConnection alloc] initWithRequest: _request delegate: self
startImmediately: NO];
[_connection setDelegateQueue: [NSOperationQueue currentQueue]];
[_connection start];
}

Expand Down
43 changes: 32 additions & 11 deletions Source/ChangeTracker/TDSocketChangeTracker.m
Expand Up @@ -83,11 +83,14 @@ - (BOOL) start {

_inputBuffer = [[NSMutableData alloc] initWithCapacity: 1024];

// Schedule the delegate calls. If we're using an operation queue we shouldn't assume the
// current thread has any runloop, so hijack the main one (ugh!)
NSRunLoop* runLoop = _operationQueue ? [NSRunLoop mainRunLoop] : [NSRunLoop currentRunLoop];
[_trackingOutput setDelegate: self];
[_trackingOutput scheduleInRunLoop: [NSRunLoop currentRunLoop] forMode: NSRunLoopCommonModes];
[_trackingOutput scheduleInRunLoop: runLoop forMode: NSRunLoopCommonModes];
[_trackingOutput open];
[_trackingInput setDelegate: self];
[_trackingInput scheduleInRunLoop: [NSRunLoop currentRunLoop] forMode: NSRunLoopCommonModes];
[_trackingInput scheduleInRunLoop: runLoop forMode: NSRunLoopCommonModes];
[_trackingInput open];
return YES;
}
Expand Down Expand Up @@ -189,11 +192,22 @@ - (void) errorOccurred: (NSError*)error {
}


- (void) queueBlock: (void(^)())block {
if (_operationQueue)
[_operationQueue addOperationWithBlock: block];
else
block();
}


// Careful: This is called on the stream's scheduled runloop, and if I'm using an NSOperationQueue,
// this will be on the main thread, not the thread on which I was originally called.
- (void) stream: (NSInputStream*)stream handleEvent: (NSStreamEvent)eventCode {
switch (eventCode) {
case NSStreamEventHasSpaceAvailable: {
LogTo(ChangeTracker, @"%@: HasSpaceAvailable %@", self, stream);
if (_trackingRequest) {
// Don't queue this block. The stream needs us to write before returning.
const char* buffer = [_trackingRequest UTF8String];
NSUInteger written = [(NSOutputStream*)stream write: (void*)buffer maxLength: strlen(buffer)];
NSAssert(written == strlen(buffer), @"Output stream didn't write entire request");
Expand All @@ -207,26 +221,33 @@ - (void) stream: (NSInputStream*)stream handleEvent: (NSStreamEvent)eventCode {
case NSStreamEventHasBytesAvailable: {
LogTo(ChangeTracker, @"%@: HasBytesAvailable %@", self, stream);
while ([stream hasBytesAvailable]) {
uint8_t buffer[1024];
NSInteger bytesRead = [stream read: buffer maxLength: sizeof(buffer)];
NSMutableData* buffer = [NSMutableData dataWithLength: 1024];
NSInteger bytesRead = [stream read: buffer.mutableBytes maxLength: buffer.length];
if (bytesRead > 0) {
[_inputBuffer appendBytes: buffer length: bytesRead];
buffer.length = bytesRead;
LogTo(ChangeTracker, @"%@: read %ld bytes", self, (long)bytesRead);
[self queueBlock: ^{
[_inputBuffer appendData: buffer];
while (_inputBuffer && [self readLine])
;
}];
}
}
while (_inputBuffer && [self readLine])
;
break;
}
case NSStreamEventEndEncountered:
LogTo(ChangeTracker, @"%@: EndEncountered %@", self, stream);
if (_inputBuffer.length > 0)
Warn(@"%@ connection closed with unparsed data in buffer", self);
[self stop];
[self queueBlock: ^{
if (_inputBuffer.length > 0)
Warn(@"%@ connection closed with unparsed data in buffer", self);
[self stop];
}];
break;
case NSStreamEventErrorOccurred:
LogTo(ChangeTracker, @"%@: ErrorOccurred %@: %@", self, stream, stream.streamError);
[self errorOccurred: stream.streamError];
[self queueBlock: ^{
[self errorOccurred: stream.streamError];
}];
break;

default:
Expand Down
9 changes: 4 additions & 5 deletions Source/TDBatcher.m
Expand Up @@ -14,6 +14,8 @@
// and limitations under the License.

#import "TDBatcher.h"
#import "MYBlockUtils.h"


@implementation TDBatcher

Expand Down Expand Up @@ -53,18 +55,15 @@ - (void) queueObject: (id)object {
[self flush];
if (!_inbox) {
_inbox = [[NSMutableArray alloc] init];
[self performSelector: @selector(processNow) withObject: nil afterDelay: _delay];
MYAfterDelay(_delay, ^{[self processNow];});
}
[_inbox addObject: object];
}


- (void) flush {
if (_inbox) {
[NSObject cancelPreviousPerformRequestsWithTarget: self
selector: @selector(processNow) object:nil];
if (_inbox)
[self processNow];
}
}


Expand Down
1 change: 1 addition & 0 deletions Source/TDMultiStreamWriter.h
Expand Up @@ -14,6 +14,7 @@
@interface TDMultiStreamWriter : NSObject
{
@private
NSOperationQueue* _operationQueue;
NSMutableArray* _inputs;
NSInputStream* _currentInput;
uint8_t* _buffer;
Expand Down
33 changes: 26 additions & 7 deletions Source/TDMultiStreamWriter.m
Expand Up @@ -54,6 +54,7 @@ - (void)dealloc {
[self close];
[_output release];
[_input release];
[_operationQueue release];
[super dealloc];
}

Expand Down Expand Up @@ -85,8 +86,12 @@ - (BOOL) isOpen {


- (void) opened {
_operationQueue = [[NSOperationQueue currentQueue] retain];
// Schedule the delegate calls. If we're using an operation queue we shouldn't assume the
// current thread has any runloop, so hijack the main one (ugh!)
NSRunLoop* runLoop = _operationQueue ? [NSRunLoop mainRunLoop] : [NSRunLoop currentRunLoop];
_output.delegate = self;
[_output scheduleInRunLoop: [NSRunLoop currentRunLoop] forMode: NSDefaultRunLoopMode];
[_output scheduleInRunLoop: runLoop forMode: NSRunLoopCommonModes];
[_output open];
}

Expand Down Expand Up @@ -205,21 +210,35 @@ - (BOOL) writeToOutput {
}


- (void) runBlock: (void(^)())block {
if (_operationQueue && _operationQueue != [NSOperationQueue currentQueue]) {
NSOperation* op = [NSBlockOperation blockOperationWithBlock: block];
[_operationQueue addOperations: $array(op) waitUntilFinished: YES];
} else {
block();
}
}


- (void)stream:(NSStream *)stream handleEvent:(NSStreamEvent)event {
if (stream != _output)
return;
LogTo(TDMultiStreamWriter, @"%@: Received event 0x%x", self, event);
switch (event) {
case NSStreamEventOpenCompleted:
[self openNextInput];
[self refillBuffer];
[self runBlock: ^{
[self openNextInput];
[self refillBuffer];
}];
break;

case NSStreamEventHasSpaceAvailable:
if (![self writeToOutput]) {
LogTo(TDMultiStreamWriter, @"%@: At end -- closing _output!", self);
[self close];
}
[self runBlock: ^{
if (![self writeToOutput]) {
LogTo(TDMultiStreamWriter, @"%@: At end -- closing _output!", self);
[self close];
}
}];
break;
}
}
Expand Down
3 changes: 3 additions & 0 deletions Source/TDRouter.h
Expand Up @@ -46,6 +46,9 @@ typedef void (^OnFinishedBlock)();
- (void) start;
- (void) stop;

/** Starts the router on the server thread. Returns immediately. */
- (void) startAsync;

+ (NSString*) versionString;

@end
Expand Down
11 changes: 6 additions & 5 deletions Source/TDRouter.m
Expand Up @@ -325,10 +325,6 @@ - (TDStatus) route {


- (void) start {
// Before handling the request, run any blocks that have been queued with the server,
// because this code is running on the official server thread:
[_server performQueuedBlocks];

// Call the appropriate handler method:
TDStatus status = [self route];

Expand Down Expand Up @@ -375,6 +371,11 @@ - (void) stop {
}


- (void) startAsync {
[_server queue:^{ [self start]; }];
}


- (TDStatus) do_UNKNOWN {
return 400;
}
Expand Down Expand Up @@ -429,7 +430,7 @@ - (void) setBodyObject:(id)bodyObject {

- (void) setMultipartBody: (NSArray*)parts type: (NSString*)type {
TDMultipartWriter* mp = [[TDMultipartWriter alloc] initWithContentType: type
boundary: nil];
boundary: nil];
for (id part in parts) {
if (![part isKindOfClass: [NSData class]]) {
part = [NSJSONSerialization dataWithJSONObject: part options: 0 error: nil];
Expand Down
3 changes: 1 addition & 2 deletions Source/TDServer.h
Expand Up @@ -17,7 +17,7 @@
NSString* _dir;
NSMutableDictionary* _databases;
TDReplicatorManager* _replicatorManager;
NSMutableArray* _queue;
NSOperationQueue* _dispatchQueue;
}

+ (BOOL) isValidDatabaseName: (NSString*)name;
Expand All @@ -38,6 +38,5 @@

- (void) queue: (void(^)())block;
- (void) tellDatabaseNamed: (NSString*)dbName to: (void (^)(TDDatabase*))block;
- (void) performQueuedBlocks;

@end
28 changes: 4 additions & 24 deletions Source/TDServer.m
Expand Up @@ -58,7 +58,8 @@ - (id) initWithDirectory: (NSString*)dirPath error: (NSError**)outError {
if (self) {
_dir = [dirPath copy];
_databases = [[NSMutableDictionary alloc] init];
_queue = [[NSMutableArray alloc] init];
_dispatchQueue = [[NSOperationQueue alloc] init];
_dispatchQueue.maxConcurrentOperationCount = 1; // serial

// Create the directory but don't fail if it already exists:
NSError* error;
Expand Down Expand Up @@ -86,7 +87,7 @@ - (void)dealloc {
[self close];
[_dir release];
[_databases release];
[_queue release];
[_dispatchQueue release];
[super dealloc];
}

Expand Down Expand Up @@ -180,11 +181,7 @@ - (void) close {


- (void) queue: (void(^)())block {
block = [block copy];
@synchronized(_queue) {
[_queue addObject: block];
}
[block release];
[_dispatchQueue addOperationWithBlock: block];
}

- (void) tellDatabaseNamed: (NSString*)dbName to: (void (^)(TDDatabase*))block {
Expand All @@ -193,23 +190,6 @@ - (void) tellDatabaseNamed: (NSString*)dbName to: (void (^)(TDDatabase*))block {
}];
}

- (void) performQueuedBlocks {
while(true) {
void (^block)();
@synchronized(_queue) {
if (_queue.count == 0)
return;
block = [[_queue objectAtIndex: 0] retain];
[_queue removeObjectAtIndex: 0];
}

Log(@"TDServer: Performing queued block...");
block();
[block release];
}
}



@end

Expand Down

0 comments on commit d111a9f

Please sign in to comment.