From 9e37c74527425510f130e31f24b43c6e9f2066d2 Mon Sep 17 00:00:00 2001 From: Mike Lewis Date: Sun, 22 Apr 2012 16:25:49 -0700 Subject: [PATCH] Fixing missing last messages when connection closed immediately after --- SocketRocket/SRWebSocket.m | 71 ++++++++++++++++++++++---------------- 1 file changed, 41 insertions(+), 30 deletions(-) diff --git a/SocketRocket/SRWebSocket.m b/SocketRocket/SRWebSocket.m index 2d411b9a8..c48519bfc 100644 --- a/SocketRocket/SRWebSocket.m +++ b/SocketRocket/SRWebSocket.m @@ -646,6 +646,7 @@ - (void)handlePong; - (void)_handleMessage:(id)message { + SRFastLog(@"Received message"); dispatch_async(_callbackQueue, ^{ [self.delegate webSocket:self didReceiveMessage:message]; }); @@ -1004,28 +1005,27 @@ - (void)_pumpWriting; - (void)_addConsumerWithScanner:(stream_scanner)consumer callback:(data_callback)callback; { + assert(dispatch_get_current_queue() == _workQueue); [self _addConsumerWithScanner:consumer callback:callback dataLength:0]; } - (void)_addConsumerWithDataLength:(size_t)dataLength callback:(data_callback)callback readToCurrentFrame:(BOOL)readToCurrentFrame unmaskBytes:(BOOL)unmaskBytes; { + assert(dispatch_get_current_queue() == _workQueue); assert(dataLength); - dispatch_async(_workQueue, ^{ - [_consumers addObject:[[SRIOConsumer alloc] initWithScanner:nil handler:callback bytesNeeded:dataLength readToCurrentFrame:readToCurrentFrame unmaskBytes:unmaskBytes]]; - [self _pumpScanner]; - }); + [_consumers addObject:[[SRIOConsumer alloc] initWithScanner:nil handler:callback bytesNeeded:dataLength readToCurrentFrame:readToCurrentFrame unmaskBytes:unmaskBytes]]; + [self _pumpScanner]; } - (void)_addConsumerWithScanner:(stream_scanner)consumer callback:(data_callback)callback dataLength:(size_t)dataLength; { - dispatch_async(_workQueue, ^{ - [_consumers addObject:[[SRIOConsumer alloc] initWithScanner:consumer handler:callback bytesNeeded:dataLength readToCurrentFrame:NO unmaskBytes:NO]]; - [self _pumpScanner]; - }); + assert(dispatch_get_current_queue() == _workQueue); + [_consumers addObject:[[SRIOConsumer alloc] initWithScanner:consumer handler:callback bytesNeeded:dataLength readToCurrentFrame:NO unmaskBytes:NO]]; + [self _pumpScanner]; } - + static const char CRLFCRLFBytes[] = {'\r', '\n', '\r', '\n'}; - (void)_readUntilHeaderCompleteWithCallback:(data_callback)dataHandler; @@ -1058,27 +1058,29 @@ - (void)_readUntilBytes:(const void *)bytes length:(size_t)length callback:(data [self _addConsumerWithScanner:consumer callback:dataHandler]; } --(void)_pumpScanner; -{ - assert(dispatch_get_current_queue() == _workQueue); +// Returns true if did work +- (BOOL)_innerPumpScanner { + + BOOL didWork = NO; + if (self.readyState >= SR_CLOSING) { - return; + return didWork; } if (!_consumers.count) { - return; + return didWork; } size_t curSize = _readBuffer.length - _readBufferOffset; if (!curSize) { - return; + return didWork; } - + SRIOConsumer *consumer = [_consumers objectAtIndex:0]; size_t bytesNeeded = consumer.bytesNeeded; - + size_t foundSize = 0; if (consumer.consumer) { NSData *tempView = [NSData dataWithBytesNoCopy:(char *)_readBuffer.bytes + _readBufferOffset length:_readBuffer.length - _readBufferOffset freeWhenDone:NO]; @@ -1091,24 +1093,24 @@ -(void)_pumpScanner; foundSize = curSize; } } - + NSData *slice = nil; if (consumer.readToCurrentFrame || foundSize) { NSRange sliceRange = NSMakeRange(_readBufferOffset, foundSize); slice = [_readBuffer subdataWithRange:sliceRange]; _readBufferOffset += foundSize; - + if (_readBufferOffset > 4096 && _readBufferOffset > (_readBuffer.length >> 1)) { _readBuffer = [[NSMutableData alloc] initWithBytes:(char *)_readBuffer.bytes + _readBufferOffset length:_readBuffer.length - _readBufferOffset]; _readBufferOffset = 0; } if (consumer.unmaskBytes) { NSMutableData *mutableSlice = [slice mutableCopy]; - + NSUInteger len = mutableSlice.length; uint8_t *bytes = mutableSlice.mutableBytes; - + for (int i = 0; i < len; i++) { bytes[i] = bytes[i] ^ _currentReadMaskKey[_currentReadMaskOffset % sizeof(_currentReadMaskKey)]; _currentReadMaskOffset += 1; @@ -1116,12 +1118,12 @@ -(void)_pumpScanner; slice = mutableSlice; } - + if (consumer.readToCurrentFrame) { [_currentFrameData appendData:slice]; _readOpCount += 1; - + if (_currentFrameOpcode == SROpCodeTextFrame) { // Validate UTF8 stuff. size_t currentDataSize = _currentFrameData.length; @@ -1138,28 +1140,36 @@ -(void)_pumpScanner; dispatch_async(_workQueue, ^{ [self _disconnect]; }); - return; + return didWork; } else { _currentStringScanPosition += valid_utf8_size; } } - + } consumer.bytesNeeded -= foundSize; if (consumer.bytesNeeded == 0) { - consumer.handler(self, nil); [_consumers removeObjectAtIndex:0]; + consumer.handler(self, nil); + didWork = YES; } } else if (foundSize) { - consumer.handler(self, slice); [_consumers removeObjectAtIndex:0]; + consumer.handler(self, slice); + didWork = YES; } + } + return didWork; +} + +-(void)_pumpScanner; +{ + assert(dispatch_get_current_queue() == _workQueue); + + while ([self _innerPumpScanner]) { - dispatch_async(_workQueue, ^{ - [self _pumpScanner]; - }); } } @@ -1306,6 +1316,7 @@ - (void)stream:(NSStream *)aStream handleEvent:(NSStreamEvent)eventCode; } case NSStreamEventEndEncountered: { + [self _pumpScanner]; SRFastLog(@"NSStreamEventEndEncountered %@", aStream); if (aStream.streamError) { [self _failWithError:aStream.streamError];