Skip to content

Commit

Permalink
Buffer the request data with new EventBuffer class
Browse files Browse the repository at this point in the history
  • Loading branch information
Mike Ihbe committed Feb 1, 2012
1 parent e1afc40 commit a9d2ed7
Show file tree
Hide file tree
Showing 3 changed files with 154 additions and 2 deletions.
17 changes: 15 additions & 2 deletions lib/app.js
Expand Up @@ -32,6 +32,8 @@ var fs = require('fs')
, BaseController = require('./base_controller').BaseController
, sessions = require('./sessions')
, CookieCollection = require('./cookies').CookieCollection
, EventEmitter = require('events').EventEmitter
, EventBuffer = require('./utils/event_buffer').EventBuffer
, InFlight = require('./in_flight').InFlight;

// Set up a bunch of aliases
Expand Down Expand Up @@ -217,7 +219,10 @@ var App = function () {
// Handle the requests
// ==================
geddy.server.addListener('request', function (req, resp) {
var params
var reqBuffer
, reqObj
, reqProperties = ['method', 'url', 'headers', 'trailers', 'httpVersion', 'connection']
, params
, urlParams
, method
, ctor
Expand All @@ -238,6 +243,13 @@ var App = function () {
, accessTime = (new Date()).getTime()
, inFlighId;

// Buffer the request data, we'll pass this proxy object to the controller
reqBuffer = new EventBuffer(req);
reqObj = new EventEmitter();
for(var i in reqProperties) {
reqObj[reqProperties[i]] = req[reqProperties[i]];
}

finish = function (step) {
steps[step] = true;
for (var p in steps) {
Expand All @@ -247,6 +259,7 @@ var App = function () {
}

controller._handleAction.call(controller, params.action);
reqBuffer.flush(reqObj);
};

if (router) {
Expand Down Expand Up @@ -348,7 +361,7 @@ var App = function () {
controller.accessTime = accessTime;

if (typeof controller[params.action] == 'function') {
controller.request = req;
controller.request = reqObj;
controller.response = resp;
controller.method = method;
controller.params = params;
Expand Down
98 changes: 98 additions & 0 deletions lib/utils/event_buffer.js
@@ -0,0 +1,98 @@
/*
* Geddy JavaScript Web development framework
* Copyright 2112 Matthew Eernisse (mde@fleegix.org)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

/*
This is a very simple buffer for a predetermined set of events. It is unbounded.
It forwards all arguments to any outlet emitters attached with pipe().
Example:
var source = new Stream()
, dest1 = new EventEmitter()
, dest2 = new EventEmitter();
source.writeable = true;
source.readable = true;
var buff = new EventBuffer(source)
, data1 = ''
, data2 = '';
source.emit('data', 'abcdef');
source.emit('data', '123456');
dest1.on('data', function(d) { data1 += d; });
dest2.on('data', function(d) { data2 += d; });
buff.addOutlet(dest1);
buff.flush(dest2);
*/

var EventBuffer = function(srcEmitter, events) {
// By default, we service the default stream events
var self = this
, streamEvents = ['data', 'end', 'error', 'close', 'fd', 'drain', 'pipe'];
this.events = events || streamEvents;
this.emitter = srcEmitter;
this.eventBuffer = [];
this.outlets = [];
for(var i = 0; i < this.events.length; i++) {
(function(){
var name = self.events[i]; // Close over the event name
self.emitter.addListener(name, function() {
self.addEvent(name, Array.prototype.slice.call(arguments));
});
})();
}
};

EventBuffer.prototype = new (function() {
this.addEvent = function(name, args) {
if (this.outlets.length > 0) {
this.emit(name, args)
} else {
this.eventBuffer.push({name: name, args: args});
}
};

this.emit = function(name, args) {
args.unshift(name);
for(var i = 0; i < this.outlets.length; i++) {
geddy.log.notice(name);
geddy.log.notice(args);
this.outlets[i].emit.apply(this.outlets[i], args);
}
};

// Hook up as many output streams as you want
this.addOutlet = function(destEmitter) {
this.outlets.push(destEmitter);
}

// Flush the buffer and continue piping new events
// destEmitter is optional and provided as a convenience to avoid unnecessary addOutlet calls
this.flush = function(destEmitter) {
if (destEmitter) {
this.addOutlet(destEmitter);
}
for(var i = 0; i < this.eventBuffer.length; i++) {
this.emit(this.eventBuffer[i].name, this.eventBuffer[i].args);
}
this.eventBuffer = [];
};
});

module.exports.EventBuffer = EventBuffer;
41 changes: 41 additions & 0 deletions test/event_buffer.js
@@ -0,0 +1,41 @@
// Load the basic Geddy toolkit
require('../lib/geddy');

var Stream = require('stream').Stream
, EventEmitter = require('events').EventEmitter
, EventBuffer = require('../lib/utils/event_buffer.js').EventBuffer
, assert = require('assert')
, tests;

tests = new (function () {

this.testEventBuffer = function () {
var source = new Stream()
, dest1 = new EventEmitter()
, dest2 = new EventEmitter();
source.writeable = true;
source.readable = true;
var buff = new EventBuffer(source)
, data1 = ''
, data2 = '';
source.emit('data', 'abcdef');
source.emit('data', '123456');
dest1.on('data', function(d) { data1 += d; });
dest2.on('data', function(d) { data2 += d; });
buff.addOutlet(dest1);
buff.flush(dest2);
assert.equal('abcdef123456', data1);
assert.equal('abcdef123456', data2);
source.emit('data', '---');
assert.equal('abcdef123456---', data2);
};

})();

for (var p in tests) {
if (typeof tests[p] == 'function') {
tests[p]();
}
}


0 comments on commit a9d2ed7

Please sign in to comment.