Skip to content
Permalink
Browse files

Fixed chunk reconstruction and JSON parsing.

  • Loading branch information
Roach Roach
Roach authored and Roach committed May 22, 2014
1 parent fbaba55 commit be1cf97802a9a83b3efce015728ee51b6a79bcfd
Showing with 20 additions and 45 deletions.
  1. +20 −45 lib/user-stream.js
@@ -61,7 +61,7 @@ Stream.prototype.stream = function(params) {
}

//required params for lib
params.delimited = 'length';
// params.delimited = 'length';
params.stall_warnings = 'true';

var request = this.oauth.post(
@@ -82,70 +82,45 @@ Stream.prototype.stream = function(params) {
}

request.on('response', function(response) {

// Any response code greater then 200 from steam API is an error
if(response.statusCode > 200) {

stream.emit('error', {type: 'response', data: {code:response.statusCode}});

} else {

var buffer = '',
next_data_length = 0,
end = '\r\n';

//emit connected event
stream.emit('connected');

//set chunk encoding
response.setEncoding('utf8');

var data = '';

response.on('data', function(chunk) {

//is heartbeat?
if (chunk == end) {
data += chunk.toString('utf8');

//is heartbeat?
if (data == '\r\n') {
stream.emit('heartbeat');
return;
}

//check whether new incomming data set
if (!buffer.length) {
//get length of incomming data
var line_end_pos = chunk.indexOf(end);
next_data_length = parseInt(chunk.slice(0, line_end_pos));
//slice data length string from chunk
chunk = chunk.slice(line_end_pos+end.length);
}

if (buffer.length != next_data_length) {
//data set recieved
//first remove end and append to buffer
buffer+= chunk.slice(0, chunk.indexOf(end));
//parse json
var parsed = false;
try {
//try parse & emit
buffer = JSON.parse(buffer);
parsed = true;
} catch(e) {
stream.emit('garbage', buffer);
}

//don't emit into "try" and emit only if data formatted
if (parsed) {
stream.emit('data', buffer);
var index, json;

while((index = data.indexOf('\r\n')) > -1) {
json = data.slice(0, index);
data = data.slice(index + 2);
if(json.length > 0) {
try {
stream.emit('data', JSON.parse(json));
} catch(e) {
stream.emit('garbage', data);
}
}

//clean buffer
buffer = '';

} else {
//append to buffer
buffer+=chunk;
}

});

response.on('error', function(error) {

stream.emit('close', error);

0 comments on commit be1cf97

Please sign in to comment.
You can’t perform that action at this time.