Permalink
Browse files

Use EventBuffer in buffered Request object.

  • Loading branch information...
1 parent 53ce889 commit 6c33c2c6087ed6abf7d925386e761398c5890438 @mde mde committed Feb 6, 2012
Showing with 75 additions and 77 deletions.
  1. +6 −13 lib/app.js
  2. +27 −0 lib/request.js
  3. +35 −52 lib/utils/event_buffer.js
  4. +7 −12 test/event_buffer.js
View
@@ -32,8 +32,7 @@ var fs = require('fs')
, BaseController = require('./base_controller').BaseController
, sessions = require('./sessions')
, CookieCollection = require('./cookies').CookieCollection
- , EventEmitter = require('events').EventEmitter
- , EventBuffer = require('./utils/event_buffer').EventBuffer
+ , Request = require('./request').Request
, InFlight = require('./in_flight').InFlight;
// Set up a bunch of aliases
@@ -220,9 +219,7 @@ var App = function () {
// Handle the requests
// ==================
geddy.server.addListener('request', function (req, resp) {
- var reqBuffer
- , reqObj
- , reqProperties = ['method', 'url', 'headers', 'trailers', 'httpVersion', 'connection']
+ var reqObj
, params
, urlParams
, method
@@ -244,12 +241,9 @@ var App = function () {
, accessTime = (new Date()).getTime()
, inFlighId;
- // Buffer the request data, we'll pass this proxy object to the controller
- reqBuffer = new EventBuffer(req);
- reqObj = new EventEmitter();
- for(var i in reqProperties) {
- reqObj[reqProperties[i]] = req[reqProperties[i]];
- }
+ // Buffered request-obj -- buffer the request data,
+ // and pass this proxy object to the controller
+ reqObj = new Request(req);
finish = function (step) {
steps[step] = true;
@@ -258,9 +252,8 @@ var App = function () {
return false;
}
}
-
controller._handleAction.call(controller, params.action);
- reqBuffer.flush(reqObj);
+ reqObj.sync(); // Flush buffered events and begin emitting
};
if (router) {
View
@@ -0,0 +1,27 @@
+var EventBuffer = require('./utils/event_buffer').EventBuffer
+ , EventEmitter = require('events').EventEmitter;
+
+var Request = function (httpReq) {
+ var self = this
+ , reqProperties = [
+ 'method'
+ , 'url'
+ , 'headers'
+ , 'trailers'
+ , 'httpVersion'
+ , 'connection'
+ ];
+ this.buffer = new EventBuffer(httpReq);
+ reqProperties.forEach(function (prop) {
+ self[prop] = httpReq[prop];
+ });
+};
+
+Request.prototype = new EventEmitter();
+Request.prototype.constructor = Request;
+
+Request.prototype.sync = function () {
+ this.buffer.sync(this);
+};
+
+module.exports.Request = Request;
View
@@ -14,83 +14,66 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*
-*/
-
-/*
-This is a very simple buffer for a predetermined set of events. It is unbounded.
-It forwards all arguments to any outlet emitters attached with pipe().
+This is a very simple buffer for a predetermined set of events. It is unbounded.
+It forwards all arguments to any outlet emitter attached with sync().
Example:
-
var source = new Stream()
- , dest1 = new EventEmitter()
- , dest2 = new EventEmitter();
+ , dest = new EventEmitter()
+ , buff = new EventBuffer(source)
+ , data = '';
+ dest.on('data', function(d) { data += d; });
source.writeable = true;
source.readable = true;
- var buff = new EventBuffer(source)
- , data1 = ''
- , data2 = '';
source.emit('data', 'abcdef');
source.emit('data', '123456');
- dest1.on('data', function(d) { data1 += d; });
- dest2.on('data', function(d) { data2 += d; });
- buff.addOutlet(dest1);
- buff.flush(dest2);
-
+ buff.sync(dest);
*/
-var EventBuffer = function(srcEmitter, events) {
+var EventBuffer = function (src, events) {
// By default, we service the default stream events
var self = this
, streamEvents = ['data', 'end', 'error', 'close', 'fd', 'drain', 'pipe'];
this.events = events || streamEvents;
- this.emitter = srcEmitter;
+ this.emitter = src;
this.eventBuffer = [];
- this.outlets = [];
- for(var i = 0; i < this.events.length; i++) {
- (function(){
- var name = self.events[i]; // Close over the event name
- self.emitter.addListener(name, function() {
- self.addEvent(name, Array.prototype.slice.call(arguments));
- });
- })();
- }
+ this.outlet = null;
+ this.events.forEach(function (name) {
+ self.emitter.addListener(name, function () {
+ self.proxyEmit(name, arguments);
+ });
+ });
};
-EventBuffer.prototype = new (function() {
- this.addEvent = function(name, args) {
- if (this.outlets.length > 0) {
- this.emit(name, args)
- } else {
+EventBuffer.prototype = new (function () {
+ this.proxyEmit = function (name, args) {
+ if (this.outlet) {
+ this.emit(name, args);
+ }
+ else {
this.eventBuffer.push({name: name, args: args});
}
};
- this.emit = function(name, args) {
- args.unshift(name);
- for(var i = 0; i < this.outlets.length; i++) {
- this.outlets[i].emit.apply(this.outlets[i], args);
- }
+ this.emit = function (name, args) {
+ // Prepend name to args
+ var outlet = this.outlet;
+ Array.prototype.splice.call(args, 0, 0, name);
+ outlet.emit.apply(outlet, args);
};
- // Hook up as many output streams as you want
- this.addOutlet = function(destEmitter) {
- this.outlets.push(destEmitter);
- }
-
- // Flush the buffer and continue piping new events
- // destEmitter is optional and provided as a convenience to avoid unnecessary addOutlet calls
- this.flush = function(destEmitter) {
- if (destEmitter) {
- this.addOutlet(destEmitter);
- }
- for(var i = 0; i < this.eventBuffer.length; i++) {
- this.emit(this.eventBuffer[i].name, this.eventBuffer[i].args);
+ // Flush the buffer and continue piping new events to the outlet
+ this.sync = function (outlet) {
+ var buffer = this.eventBuffer
+ , bufferItem;
+ this.outlet = outlet;
+ while ((bufferItem = buffer.shift())) {
+ this.emit(bufferItem.name, bufferItem.args);
}
- this.eventBuffer = [];
};
-});
+})();
+EventBuffer.prototype.constructor = EventBuffer;
module.exports.EventBuffer = EventBuffer;
View
@@ -11,23 +11,18 @@ tests = new (function () {
this.testEventBuffer = function () {
var source = new Stream()
- , dest1 = new EventEmitter()
- , dest2 = new EventEmitter();
+ , dest = new EventEmitter()
+ , buff = new EventBuffer(source)
+ , data = '';
+ dest.on('data', function(d) { data += d; });
source.writeable = true;
source.readable = true;
- var buff = new EventBuffer(source)
- , data1 = ''
- , data2 = '';
source.emit('data', 'abcdef');
source.emit('data', '123456');
- dest1.on('data', function(d) { data1 += d; });
- dest2.on('data', function(d) { data2 += d; });
- buff.addOutlet(dest1);
- buff.flush(dest2);
- assert.equal('abcdef123456', data1);
- assert.equal('abcdef123456', data2);
+ buff.sync(dest);
+ assert.equal('abcdef123456', data);
source.emit('data', '---');
- assert.equal('abcdef123456---', data2);
+ assert.equal('abcdef123456---', data);
};
})();

0 comments on commit 6c33c2c

Please sign in to comment.