Skip to content

Commit

Permalink
Fixing missing last messages when connection closed immediately after
Browse files Browse the repository at this point in the history
  • Loading branch information
mikelikespie committed Apr 22, 2012
1 parent 3cd883b commit 9e37c74
Showing 1 changed file with 41 additions and 30 deletions.
71 changes: 41 additions & 30 deletions SocketRocket/SRWebSocket.m
Expand Up @@ -646,6 +646,7 @@ - (void)handlePong;

- (void)_handleMessage:(id)message
{
SRFastLog(@"Received message");
dispatch_async(_callbackQueue, ^{
[self.delegate webSocket:self didReceiveMessage:message];
});
Expand Down Expand Up @@ -1004,28 +1005,27 @@ - (void)_pumpWriting;

- (void)_addConsumerWithScanner:(stream_scanner)consumer callback:(data_callback)callback;
{
assert(dispatch_get_current_queue() == _workQueue);
[self _addConsumerWithScanner:consumer callback:callback dataLength:0];
}

- (void)_addConsumerWithDataLength:(size_t)dataLength callback:(data_callback)callback readToCurrentFrame:(BOOL)readToCurrentFrame unmaskBytes:(BOOL)unmaskBytes;
{
assert(dispatch_get_current_queue() == _workQueue);
assert(dataLength);

dispatch_async(_workQueue, ^{
[_consumers addObject:[[SRIOConsumer alloc] initWithScanner:nil handler:callback bytesNeeded:dataLength readToCurrentFrame:readToCurrentFrame unmaskBytes:unmaskBytes]];
[self _pumpScanner];
});
[_consumers addObject:[[SRIOConsumer alloc] initWithScanner:nil handler:callback bytesNeeded:dataLength readToCurrentFrame:readToCurrentFrame unmaskBytes:unmaskBytes]];
[self _pumpScanner];
}

- (void)_addConsumerWithScanner:(stream_scanner)consumer callback:(data_callback)callback dataLength:(size_t)dataLength;
{
dispatch_async(_workQueue, ^{
[_consumers addObject:[[SRIOConsumer alloc] initWithScanner:consumer handler:callback bytesNeeded:dataLength readToCurrentFrame:NO unmaskBytes:NO]];
[self _pumpScanner];
});
assert(dispatch_get_current_queue() == _workQueue);
[_consumers addObject:[[SRIOConsumer alloc] initWithScanner:consumer handler:callback bytesNeeded:dataLength readToCurrentFrame:NO unmaskBytes:NO]];
[self _pumpScanner];
}


static const char CRLFCRLFBytes[] = {'\r', '\n', '\r', '\n'};

- (void)_readUntilHeaderCompleteWithCallback:(data_callback)dataHandler;
Expand Down Expand Up @@ -1058,27 +1058,29 @@ - (void)_readUntilBytes:(const void *)bytes length:(size_t)length callback:(data
[self _addConsumerWithScanner:consumer callback:dataHandler];
}

-(void)_pumpScanner;
{
assert(dispatch_get_current_queue() == _workQueue);

// Returns true if did work
- (BOOL)_innerPumpScanner {

BOOL didWork = NO;

if (self.readyState >= SR_CLOSING) {
return;
return didWork;
}

if (!_consumers.count) {
return;
return didWork;
}

size_t curSize = _readBuffer.length - _readBufferOffset;
if (!curSize) {
return;
return didWork;
}

SRIOConsumer *consumer = [_consumers objectAtIndex:0];

size_t bytesNeeded = consumer.bytesNeeded;

size_t foundSize = 0;
if (consumer.consumer) {
NSData *tempView = [NSData dataWithBytesNoCopy:(char *)_readBuffer.bytes + _readBufferOffset length:_readBuffer.length - _readBufferOffset freeWhenDone:NO];
Expand All @@ -1091,37 +1093,37 @@ -(void)_pumpScanner;
foundSize = curSize;
}
}

NSData *slice = nil;
if (consumer.readToCurrentFrame || foundSize) {
NSRange sliceRange = NSMakeRange(_readBufferOffset, foundSize);
slice = [_readBuffer subdataWithRange:sliceRange];

_readBufferOffset += foundSize;

if (_readBufferOffset > 4096 && _readBufferOffset > (_readBuffer.length >> 1)) {
_readBuffer = [[NSMutableData alloc] initWithBytes:(char *)_readBuffer.bytes + _readBufferOffset length:_readBuffer.length - _readBufferOffset]; _readBufferOffset = 0;
}

if (consumer.unmaskBytes) {
NSMutableData *mutableSlice = [slice mutableCopy];

NSUInteger len = mutableSlice.length;
uint8_t *bytes = mutableSlice.mutableBytes;

for (int i = 0; i < len; i++) {
bytes[i] = bytes[i] ^ _currentReadMaskKey[_currentReadMaskOffset % sizeof(_currentReadMaskKey)];
_currentReadMaskOffset += 1;
}

slice = mutableSlice;
}

if (consumer.readToCurrentFrame) {
[_currentFrameData appendData:slice];

_readOpCount += 1;

if (_currentFrameOpcode == SROpCodeTextFrame) {
// Validate UTF8 stuff.
size_t currentDataSize = _currentFrameData.length;
Expand All @@ -1138,28 +1140,36 @@ -(void)_pumpScanner;
dispatch_async(_workQueue, ^{
[self _disconnect];
});
return;
return didWork;
} else {
_currentStringScanPosition += valid_utf8_size;
}
}

}

consumer.bytesNeeded -= foundSize;

if (consumer.bytesNeeded == 0) {
consumer.handler(self, nil);
[_consumers removeObjectAtIndex:0];
consumer.handler(self, nil);
didWork = YES;
}
} else if (foundSize) {
consumer.handler(self, slice);
[_consumers removeObjectAtIndex:0];
consumer.handler(self, slice);
didWork = YES;
}
}
return didWork;
}

-(void)_pumpScanner;
{
assert(dispatch_get_current_queue() == _workQueue);

while ([self _innerPumpScanner]) {

dispatch_async(_workQueue, ^{
[self _pumpScanner];
});
}
}

Expand Down Expand Up @@ -1306,6 +1316,7 @@ - (void)stream:(NSStream *)aStream handleEvent:(NSStreamEvent)eventCode;
}

case NSStreamEventEndEncountered: {
[self _pumpScanner];
SRFastLog(@"NSStreamEventEndEncountered %@", aStream);
if (aStream.streamError) {
[self _failWithError:aStream.streamError];
Expand Down

0 comments on commit 9e37c74

Please sign in to comment.