Permalink
Browse files

Change data_lines into a readable Stream

  • Loading branch information...
1 parent ed0218d commit a030aa187b026cf3bccae0c54fa91df2602f47cd @smfreegard smfreegard committed Nov 3, 2012
View
@@ -0,0 +1,70 @@
+var util = require('util');
+var EventEmitter = require('events').EventEmitter;
+
// Chunk size (64 KiB) used when no valid buffer_size is supplied.
var DEFAULT_BUFFER_SIZE = 64 * 1024;

/**
 * Re-chunks arbitrarily sized input into 'data' events of at most
 * buffer_size bytes.
 *
 * Input is first queued as-is; a fixed-size working buffer is only
 * allocated once the queued input exceeds buffer_size. From then on a
 * 'data' event carrying a full buffer_size Buffer is emitted each time the
 * working buffer fills up. Whatever is left over is emitted by end().
 *
 * @param {number|string} [buffer_size] Chunk size in bytes. Values that do
 *     not parse to a positive base-10 integer fall back to 64 KiB.
 */
function ChunkEmitter(buffer_size) {
    EventEmitter.call(this);
    var size = parseInt(buffer_size, 10);
    // NaN, 0 and negative sizes cannot be used to allocate a chunk buffer.
    this.buffer_size = (size > 0) ? size : DEFAULT_BUFFER_SIZE;
    this.buf = null;     // working chunk buffer; null until it is needed
    this.pos = 0;        // write offset into this.buf
    this.bufs = [];      // input queued before this.buf is allocated
    this.bufs_size = 0;  // total byte length of this.bufs
}

util.inherits(ChunkEmitter, EventEmitter);

/**
 * Feeds more input into the emitter.
 *
 * @param {Buffer|string} input Data to add; strings are converted to a
 *     Buffer using the default (UTF-8) encoding. Empty input is ignored.
 */
ChunkEmitter.prototype.fill = function (input) {
    if (typeof input === 'string') {
        input = Buffer.from(input);
    }
    if (input.length === 0) {
        // Nothing to add; avoids a spurious empty 'data' event from end().
        return;
    }

    // Optimization: don't allocate a working buffer until the input
    // received so far is bigger than our buffer size.
    if (!this.buf) {
        this.bufs.push(input);
        this.bufs_size += input.length;
        // bufs_size already includes this input, so compare it directly.
        if (this.bufs_size <= this.buffer_size) {
            return;
        }
        this.buf = Buffer.alloc(this.buffer_size);
        input = Buffer.concat(this.bufs, this.bufs_size);
        // Reset
        this.bufs = [];
        this.bufs_size = 0;
    }

    while (input.length > 0) {
        var remaining = this.buffer_size - this.pos;
        if (remaining === 0) {
            // Working buffer is full: hand it to the listeners and start a
            // fresh one, since they may keep a reference to the old one.
            this.emit('data', this.buf);
            this.buf = Buffer.alloc(this.buffer_size);
            this.pos = 0;
            remaining = this.buffer_size;
        }
        var to_write = Math.min(remaining, input.length);
        input.copy(this.buf, this.pos, 0, to_write);
        this.pos += to_write;
        input = input.subarray(to_write);
    }
};

/**
 * Flushes any pending data as a final 'data' event and resets the emitter
 * so that it can be reused.
 *
 * @param {Function} [cb] Called with no arguments after the flush.
 */
ChunkEmitter.prototype.end = function (cb) {
    if (this.bufs.length > 0) {
        // The working buffer was never allocated: emit the queued input.
        this.emit('data', Buffer.concat(this.bufs, this.bufs_size));
    }
    else if (this.pos > 0) {
        // Emit only the portion of the working buffer that was written.
        this.emit('data', this.buf.subarray(0, this.pos));
    }
    // Reset
    this.buf = null;
    this.pos = 0;
    this.bufs = [];
    this.bufs_size = 0;
    if (cb) cb();
};
+
+exports.ChunkEmitter = ChunkEmitter;
View
@@ -19,6 +19,15 @@ port=2525
;nodes=cpus
; Daemonize
-;daemonize=0
+;daemonize=true
;daemon_log_file=/var/log/haraka.log
;daemon_pid_file=/var/run/haraka.pid
+
+; Spooling
+; Save memory by spooling large messages to disk
+;spool_dir=/var/spool/haraka
+; Specify -1 to never spool to disk
+; Specify 0 to always spool to disk
+; Otherwise specify a size in bytes; once a message reaches this size
+; it will be spooled to disk to save memory.
+;spool_after=
View
@@ -13,6 +13,7 @@ var Address = require('./address').Address;
var uuid = require('./utils').uuid;
var outbound = require('./outbound');
var date_to_str = require('./utils').date_to_str;
+var indexOfLF = require('./utils').indexOfLF;
var ipaddr = require('ipaddr.js');
var version = JSON.parse(fs.readFileSync(path.join(__dirname, 'package.json'))).version;
@@ -166,15 +167,18 @@ exports.createConnection = function(client, server) {
Connection.prototype.process_line = function (line) {
var self = this;
if (this.state !== STATE_DATA) {
- this.logprotocol("C: " + line + ' state=' + this.state);
+ this.current_line = line.toString('binary').replace(/\r?\n/, '');
+ if (logger.would_log(logger.LOGPROTOCOL)) {
+ this.logprotocol("C: " + this.current_line + ' state=' + this.state);
+ }
// Check for non-ASCII characters
- if (/[^\x00-\x7F]/.test(line)) {
- return this.respond(501, 'Syntax error');
+ if (/[^\x00-\x7F]/.test(this.current_line)) {
+ return this.respond(501, 'Syntax error (8-bit characters not allowed)');
}
}
if (this.state === STATE_CMD) {
this.state = STATE_PAUSE_SMTP;
- this.current_line = line.replace(/\r?\n/, '');
+ //this.current_line = line.replace(/\r?\n/, '');
var matches = /^([^ ]*)( +(.*))?$/.exec(this.current_line);
if (!matches) {
return plugins.run_hooks('unrecognized_command', this, this.current_line);
@@ -208,15 +212,17 @@ Connection.prototype.process_line = function (line) {
}
else if (this.state === STATE_LOOP) {
// Allow QUIT
- if (line.replace(/\r?\n/, '').toUpperCase() === 'QUIT') {
+ if (this.current_line.toUpperCase() === 'QUIT') {
this.cmd_quit();
}
else {
this.respond(this.loop_code, this.loop_msg);
}
}
else if (this.state === STATE_DATA) {
- this.logdata("C: " + line);
+ if (logger.would_log(logger.LOGDATA)) {
+ this.logdata("C: " + line);
+ }
this.accumulate_data(line);
}
};
@@ -226,8 +232,19 @@ Connection.prototype.process_data = function (data) {
this.logwarn("data after disconnect from " + this.remote_ip);
return;
}
-
- this.current_data += data.toString('binary'); // TODO: change all this to use Buffers
+
+ if (!Buffer.isBuffer(this.current_data)) {
+ this.current_data = data;
+ }
+ else {
+ // Data left over in buffer
+ var buf = Buffer.concat(
+ [ this.current_data, data ],
+ (this.current_data.length + data.length)
+ );
+ this.current_data = buf;
+ }
+
this._process_data();
};
@@ -237,9 +254,10 @@ Connection.prototype._process_data = function() {
// Otherwise if multiple commands are pipelined and then the
// connection is dropped; we'll end up in the function forever.
if (this.disconnected) return;
- var results;
- while (results = line_regexp.exec(this.current_data)) {
- var this_line = results[1];
+
+ var offset;
+ while ((offset = indexOfLF(this.current_data)) !== -1) {
+ var this_line = this.current_data.slice(0, offset+1);
// Hack: bypass this code to allow HAProxy's PROXY extension
if (this.state === STATE_PAUSE && this.proxy && /^PROXY /.test(this_line)) {
if (this.proxy_timer) clearTimeout(this.proxy_timer);
@@ -250,6 +268,7 @@ Connection.prototype._process_data = function() {
// Detect early_talker but allow PIPELINING extension (ESMTP)
else if ((this.state === STATE_PAUSE || this.state === STATE_PAUSE_SMTP) && !this.esmtp) {
if (!this.early_talker) {
+ this_line = this_line.toString().replace(/\r?\n/,'');
this.logdebug('[early_talker] state=' + this.state + ' esmtp=' + this.esmtp + ' line="' + this_line + '"');
}
this.early_talker = 1;
@@ -260,7 +279,7 @@ Connection.prototype._process_data = function() {
}
else if ((this.state === STATE_PAUSE || this.state === STATE_PAUSE_SMTP) && this.esmtp) {
var valid = true;
- var cmd = this_line.slice(0,4).toUpperCase();
+ var cmd = this_line.toString('ascii').slice(0,4).toUpperCase();
switch (cmd) {
case 'RSET':
case 'MAIL':
@@ -419,11 +438,19 @@ Connection.prototype.tran_uuid = function () {
}
/**
 * Discards the current transaction, if there is one.
 *
 * The transaction's messageStream is destroyed before the transaction is
 * removed from the connection. Calling this when no transaction is active
 * is a no-op rather than a TypeError.
 */
Connection.prototype.reset_transaction = function() {
    if (this.transaction && this.transaction.messageStream) {
        this.transaction.messageStream.destroy();
    }
    delete this.transaction;
};
/**
 * Creates a new transaction for this connection, identified by
 * this.tran_uuid(), and installs an 'error' handler on its messageStream.
 *
 * On a messageStream error the failure is logged at crit level, the client
 * is sent a 421 response, and the connection is then disconnected.
 */
Connection.prototype.init_transaction = function() {
    // The error handler runs later with a different `this`; keep a
    // reference to the connection for it.
    var self = this;
    this.transaction = trans.createTransaction(this.tran_uuid());
    // Catch any errors from the messageStream
    this.transaction.messageStream.on('error', function (err) {
        self.logcrit('messageStream error: ' + err.message);
        self.respond('421', 'Internal Server Error', function () {
            self.disconnect();
        });
    });
};
Connection.prototype.loop_respond = function (code, msg) {
@@ -616,8 +643,8 @@ Connection.prototype.ehlo_respond = function(retval, msg) {
"8BITMIME",
];
- var databytes = config.get('databytes');
- response.push("SIZE " + databytes || 0);
+ var databytes = parseInt(config.get('databytes')) || 0;
+ response.push("SIZE " + databytes);
this.capabilities = response;
@@ -1118,24 +1145,57 @@ Connection.prototype.data_respond = function(retval, msg) {
Connection.prototype.accumulate_data = function(line) {
var self = this;
- if (line === ".\r\n")
- return this.data_done();
- // Bare LF checks
- if (line === ".\r" || line === ".\n") {
- this.logerror("Client sent bare line-feed - .\\r or .\\n rather than .\\r\\n");
- this.respond(451, "See http://haraka.github.com/barelf.html", function() {
+ this.transaction.data_bytes += line.length;
+
+ // Look for .\r\n
+ if (line.length === 3 &&
+ line[0] === 0x2e &&
+ line[1] === 0x0d &&
+ line[2] === 0x0a)
+ {
+ //return this.data_done();
+ this.transaction.messageStream.add_line_end(function () {
+ self.data_done();
+ });
+ return;
+ }
+
+ // Look for .\n
+ if (line.length === 2 &&
+ line[0] === 0x2e &&
+ line[1] === 0x0a)
+ {
+ this.logerror('Client sent bare line-feed - .\\n rather than .\\r\\n');
+ this.respond(451, "Bare line-feed; see http://haraka.github.com/barelf.html", function() {
self.reset_transaction();
});
return;
}
+ // Remove any dot stuffing
+ if (line.length >= 3 &&
+ line[0] === 0x2e &&
+ line[1] === 0x2e)
+ {
+ line = line.slice(1);
+ }
+
+ // Remove CR's
+ if (line.length >= 2 &&
+ line[line.length-1] === 0x0a &&
+ line[line.length-2] === 0x0d)
+ {
+ line[line.length-2] = 0x0a;
+ line = line.slice(0, line.length-1);
+ }
+
// Stop accumulating data as we're going to reject at dot.
if (this.max_bytes && this.transaction.data_bytes > this.max_bytes) {
return;
}
- this.transaction.add_data(line.replace(/^\.\./, '.').replace(/\r\n$/, "\n"));
+ this.transaction.add_data(line);
};
Connection.prototype.data_done = function() {
View
@@ -34,6 +34,9 @@ different levels available.
* daemonize - enable this to cause Haraka to fork into the background on start-up (default: 0)
* daemon_log_file - (default: /var/log/haraka.log) where to redirect stdout/stderr when daemonized
* daemon_pid_file - (default: /var/run/haraka.pid) where to write a PID file to
+ * spool_dir - (default: none) directory to create temporary spool files in
+ * spool_after - (default: -1) if a message exceeds this size in bytes, it is spooled to disk.
+ Specify -1 to disable spooling completely, or 0 to force all messages to be spooled to disk.
[1]: http://learnboost.github.com/cluster/ or node version >= 0.8
View
@@ -19,9 +19,30 @@ The value of the MAIL FROM command as an `Address` object.
An Array of `Address` objects of recipients from the RCPT TO command.
-* transaction.data\_lines
+* transaction.messageStream
-An Array of the lines of the email after DATA.
+A node.js Readable Stream object for the message.
+
+You use it like this:
+
+ transaction.messageStream.pipe(WritableStream, options)
+
+Where WritableStream is a node.js Writable Stream object such as a
+net.Socket, fs.WriteStream, process.stdout/stderr or custom stream.
+
+The options argument should be an object that overrides the following
+properties:
+
+ * line_endings (default: "\r\n")
+ * dot_stuffing (default: false)
+ * ending_dot (default: false)
+ * emit_end (default: true)
+ * buffer_size (default: 65535)
+ * clamd_style (default: false)
+
+e.g.
+
+ transaction.messageStream.pipe(socket, { dot_stuffing: true, ending_dot: true });
* transaction.data\_bytes
@@ -32,6 +53,12 @@ The number of bytes in the email after DATA.
Adds a line of data to the email. Note this is RAW email - it isn't useful
for adding banners to the email.
+* transaction.add\_line\_end(cb)
+
+Notifies the messageStream that all the data has been received.
+Optionally supply a callback function, which will be run once any in-flight
+data has finished being written.
+
* transaction.notes
A safe place to store transaction specific values.
Oops, something went wrong.

0 comments on commit a030aa1

Please sign in to comment.