Skip to content

Commit

Permalink
Queuing mechanism implemented for post req
Browse files Browse the repository at this point in the history
  • Loading branch information
Balaji committed Oct 18, 2012
1 parent d7c731f commit 6acb9f3
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 9 deletions.
Binary file modified external/wami/Wami.swf
Binary file not shown.
8 changes: 8 additions & 0 deletions lib/NodeIcecast.js
Expand Up @@ -8,6 +8,8 @@ var fs = require('fs.extra'),
util = require('util'),
EventEmitter = require('events').EventEmitter ;

var ReqQueue= require('./ReqQueue.js');

var DEFAULT_INACTIVITY_TIMEOUT= 5*60; // 5 minutes

var NodeIcecast= function(id, config_dir, options){
Expand Down Expand Up @@ -67,6 +69,7 @@ NodeIcecast.prototype._spawnProcesses= function(){
self._ogg_header= data;
});
this._encoder= encoder;
this._reqq= new ReqQueue(encoder.stdin);
if (this._savefile){
var f= fs.createWriteStream(this._savefile, { flags: 'w', encoding: "binary", mode: 0666 });
this._encoder.stdout.pipe(f, {end: false});
Expand All @@ -79,6 +82,11 @@ NodeIcecast.prototype.pipe= function(stream){
stream.pipe(this._encoder.stdin, {end: false});
};

NodeIcecast.prototype.addReq= function(req, index){
this._last_activity_ts= Date.now();
this._reqq.add(req, index);
};

NodeIcecast.prototype.write= function(buffer){
this._last_activity_ts= Date.now();
this._encoder.stdin.write(buffer);
Expand Down
46 changes: 46 additions & 0 deletions lib/ReqQueue.js
@@ -0,0 +1,46 @@
/**
* @description: Implements a Post Request Queue. Buffers the post data and outputs in order.
*
*/

var util = require('util'),
EventEmitter = require('events').EventEmitter ;

var ReqQueue= function(writestream){
EventEmitter.call(this);
this._q= [];
this._writestream= writestream;
};

util.inherits(ReqQueue, EventEmitter);

ReqQueue.prototype.add= function(req, index){
var self= this;
var pkt= {complete: false, index: index, data: []};
this._q.push(pkt);
req.on("data", function(data){
pkt.data.push(data);
});
req.on("end", function(){
pkt.complete= true;
self.flush();
});
};

ReqQueue.prototype.flush= function(){
while(this._q.length){
var pkt= this._q[0];
if (pkt.complete){
//console.log("Writing packet -- "+pkt.index);
for (var i=0, count= pkt.data.length; i< count; i++){
this._writestream.write(pkt.data[i]);
}
this._q.shift();
}
else {
break;
}
}
};

module.exports= ReqQueue;
11 changes: 3 additions & 8 deletions lib/js-cast.js
Expand Up @@ -92,16 +92,11 @@ JSCast.prototype.configure= function(app, config){
require('bufferjs/add-chunk');
app.post('/js-cast/stream', function(req, res, next){
var client= self._clients[req.query.id];
var buffer= new Buffer(8820, 'binary');
var packet_no= req.query.chunk;
//console.log("packet no :"+ packet_no+" received at " + new Date());
if (client){
//console.log("START WAMI post at "+new Date());
req.on("data", function(data){
buffer.addChunk(data);
})
//client.pipe(req);
client.addReq(req, packet_no);
req.on("end", function() {
//console.log("END WAMI post at "+ new Date());
client.write(buffer);
self._send200(res);
});
}
Expand Down
2 changes: 1 addition & 1 deletion package.json
@@ -1,7 +1,7 @@
{
"name": "js-cast",
"description": "Voice Streaming from client Browser using icecast server",
"version": "0.0.12",
"version": "0.0.13",
"author": {
"name": "D Balaji",
"email": "balaji@ariemtech.com"
Expand Down

0 comments on commit 6acb9f3

Please sign in to comment.