Skip to content

Commit

Permalink
Adds stream to the request body
Browse files Browse the repository at this point in the history
  • Loading branch information
Daniel Neighman committed Jan 26, 2010
1 parent c546f0b commit b9b2827
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 49 deletions.
4 changes: 2 additions & 2 deletions examples/simple.js
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ builder


//sys.puts("Request: " + requestNumber); //sys.puts("Request: " + requestNumber);


env.request.addListener("body", function(data){ env.request.body.addListener("data", function(data){
buffer.push(data); buffer.push(data);
}) })


env.request.addListener("complete", function(){ env.request.body.addListener("finish", function(){
// sys.puts("Complete: " + requestNumber); // sys.puts("Complete: " + requestNumber);
env.response.headers['content-type'] = 'text/html'; env.response.headers['content-type'] = 'text/html';
env.response.body = "<h1>Hello jello world</h1>" + buffer.join(""); env.response.body = "<h1>Hello jello world</h1>" + buffer.join("");
Expand Down
67 changes: 20 additions & 47 deletions lib/chain/request.js
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ var http = require('http');
var events = require('events'); var events = require('events');
var sys = require('sys'); var sys = require('sys');
var Url = require('url'); var Url = require('url');
var Stream = require('./vendor/stream');


// parts of this based on the request object from http://github.com/kriszyp/jsgi-node // parts of this based on the request object from http://github.com/kriszyp/jsgi-node


Expand All @@ -11,12 +12,7 @@ function Request(request, chain){
url = Url.parse( request.url ), url = Url.parse( request.url ),
headers = request.headers, headers = request.headers,
namePort = headers.host.split( ":" ), namePort = headers.host.split( ":" ),
lowerCaseHeaders = {}, lowerCaseHeaders = {};
self = this,
bodyListener = false,
inputBuffer = [],
bodyComplete = false,
completeEmitted = false;


this.chain = chain; this.chain = chain;
this.method = request.method; this.method = request.method;
Expand All @@ -29,6 +25,24 @@ function Request(request, chain){
this.scheme = "http"; this.scheme = "http";
this.headers = request.headers; this.headers = request.headers;
this.Connection = request.Connection; this.Connection = request.Connection;
var body = this.body = new Stream();

body.pause();

request.addListener("body", function(chunk){ body.write(chunk) })
request.addListener("complete", function() { body.close() })

body.addListener("newListener", function(type, listener){
if(type == "data")
body.resume();
})

body.addListener("pause", function(){
sys.puts("Pausing the request");
request.pause()
});
body.addListener("resume", function(){request.resume()});
body.addListener("eof", function(){body.emit("finish")});


for(var i in headers){ for(var i in headers){
lowerCaseHeaders[i.toLowerCase()] = headers[i]; lowerCaseHeaders[i.toLowerCase()] = headers[i];
Expand All @@ -37,47 +51,6 @@ function Request(request, chain){
this.version = [ request.httpVersionMajor, request.httpVersionMinor ]; this.version = [ request.httpVersionMajor, request.httpVersionMinor ];


request.isComplete = false; request.isComplete = false;

this.addListener = function(event){
if(event == "body"){
if( self.bodyEmitStarted )
throw(new Error("Request Body Emitting Already Started"));

bodyListener = true;
}
Request.prototype.addListener.apply(this, arguments);

// Have to give the complete listener a chance to be added
process.nextTick(shiftAndEmit)
}

// From here we deal with buffering the request as it comes in
function shiftAndEmit(){
if(!bodyListener) return;

if(inputBuffer.length > 0){
var chunk = inputBuffer.shift();
self.bodyEmitStarted = true;
self.emit("body", chunk);
}

if(bodyComplete && !completeEmitted && inputBuffer.length == 0){
self.emit("complete");
completeEmitted = true;
} else if(inputBuffer.length > 0) {
process.nextTick(shiftAndEmit);
}
}

request
.addListener( "body", function( data ) {
inputBuffer.push( data );
shiftAndEmit();
})
.addListener("complete", function() {
bodyComplete = true;
shiftAndEmit();
});
} }


sys.inherits(Request, events.EventEmitter); sys.inherits(Request, events.EventEmitter);
Expand Down
50 changes: 50 additions & 0 deletions lib/chain/vendor/stream.js
Original file line number Original file line Diff line number Diff line change
@@ -0,0 +1,50 @@

module.exports = Stream;

var Emitter = require("events").EventEmitter;

function Stream () {
Emitter.call(this);

var buffer = [];
this.pause = function () {
this.emit("pause");
buffer.paused = true;
};
this.resume = function () {
this.emit("resume");
buffer.paused = false;
flow(this, buffer);
};
this.write = function (data) {
if (buffer.closed) throw new Error("Cannot write after EOF.");
buffer.push(data);
flow(this, buffer);
};
this.close = function () {
buffer.closed = true;
flow(this, buffer);
};
};

function flow (stream, buffer) {
if (buffer.flowing || buffer.paused) return;
buffer.flowing = true;
process.nextTick(function () {
buffer.flowing = false;
write(stream, buffer);
});
};
function write (stream, buffer) {
if (buffer.paused) return;
if (buffer.length === 0) {
stream.emit("drain");
if (buffer.closed) stream.emit("eof");
return;
}
var chunk = buffer.shift();
stream.emit("data", chunk);
flow(stream, buffer);
};

Stream.prototype.__proto__ = Emitter.prototype;

0 comments on commit b9b2827

Please sign in to comment.