Skip to content

Commit

Permalink
Merge pull request haraka#252 from smfreegard/stream_changes
Browse files Browse the repository at this point in the history
Change data_lines into a readable Stream
  • Loading branch information
baudehlo committed Nov 12, 2012
2 parents ed0218d + 359b110 commit 92dd16d
Show file tree
Hide file tree
Showing 17 changed files with 821 additions and 300 deletions.
74 changes: 74 additions & 0 deletions chunkemitter.js
@@ -0,0 +1,74 @@
var util = require('util');
var EventEmitter = require('events').EventEmitter;

function ChunkEmitter(buffer_size) {
EventEmitter.call(this);
this.buffer_size = parseInt(buffer_size) || (64 * 1024);
this.buf = null;
this.pos = 0;
this.bufs = [];
this.bufs_size = 0;
}

util.inherits(ChunkEmitter, EventEmitter);

ChunkEmitter.prototype.fill = function (input) {
if (typeof input === 'string') {
input = new Buffer(input);
}

// Optimization: don't allocate a new buffer until
// the input we've had so far is bigger than our
// buffer size.
if (!this.buf) {
// We haven't allocated a buffer yet
this.bufs.push(input);
this.bufs_size += input.length;
if ((input.length + this.bufs_size) > this.buffer_size) {
this.buf = new Buffer(this.buffer_size);
var in_new = Buffer.concat(this.bufs, this.bufs_size);
input = in_new;
// Reset
this.bufs = [];
this.bufs_size = 0;
}
else {
return;
}
}

while (input.length > 0) {
var remaining = this.buffer_size - this.pos;
if (remaining === 0) {
this.emit('data', this.buf); //.slice(0));
this.buf = new Buffer(this.buffer_size);
this.pos = 0;
remaining = this.buffer_size;
}
var to_write = ((remaining > input.length) ? input.length : remaining);
input.copy(this.buf, this.pos, 0, to_write);
this.pos += to_write;
input = input.slice(to_write);
}
}

ChunkEmitter.prototype.end = function (cb) {
var emitted = false;
if (this.bufs_size > 0) {
this.emit('data', Buffer.concat(this.bufs, this.bufs_size));
emitted = true;
}
else if (this.pos > 0) {
this.emit('data', this.buf.slice(0, this.pos));
emitted = true;
}
// Reset
this.buf = null;
this.pos = 0;
this.bufs = [];
this.bufs_size = 0;
if (cb && typeof cb === 'function') cb();
return emitted;
}

module.exports = ChunkEmitter;
11 changes: 10 additions & 1 deletion config/smtp.ini
Expand Up @@ -19,6 +19,15 @@ port=2525
;nodes=cpus

; Daemonize
;daemonize=0
;daemonize=true
;daemon_log_file=/var/log/haraka.log
;daemon_pid_file=/var/run/haraka.pid

; Spooling
; Save memory by spooling large messages to disk
;spool_dir=/var/spool/haraka
; Specify -1 to never spool to disk
; Specify 0 to always spool to disk
; Otherwise specify a size in bytes, once reached the
; message will be spooled to disk to save memory.
;spool_after=
98 changes: 73 additions & 25 deletions connection.js
Expand Up @@ -13,6 +13,7 @@ var Address = require('./address').Address;
var uuid = require('./utils').uuid;
var outbound = require('./outbound');
var date_to_str = require('./utils').date_to_str;
var indexOfLF = require('./utils').indexOfLF;
var ipaddr = require('ipaddr.js');

var version = JSON.parse(fs.readFileSync(path.join(__dirname, 'package.json'))).version;
Expand Down Expand Up @@ -121,7 +122,7 @@ function setupClient(self) {
function Connection(client, server) {
this.client = client;
this.server = server;
this.current_data = '';
this.current_data = null;
this.current_line = null;
this.state = STATE_PAUSE;
this.uuid = uuid();
Expand Down Expand Up @@ -165,16 +166,25 @@ exports.createConnection = function(client, server) {

Connection.prototype.process_line = function (line) {
var self = this;
if (this.state !== STATE_DATA) {
this.logprotocol("C: " + line + ' state=' + this.state);
if (this.state === STATE_DATA) {
if (logger.would_log(logger.LOGDATA)) {
this.logdata("C: " + line);
}
this.accumulate_data(line);
return;
}
else {
this.current_line = line.toString('binary').replace(/\r?\n/, '');
if (logger.would_log(logger.LOGPROTOCOL)) {
this.logprotocol("C: " + this.current_line + ' state=' + this.state);
}
// Check for non-ASCII characters
if (/[^\x00-\x7F]/.test(line)) {
return this.respond(501, 'Syntax error');
if (/[^\x00-\x7F]/.test(this.current_line)) {
return this.respond(501, 'Syntax error (8-bit characters not allowed)');
}
}
if (this.state === STATE_CMD) {
this.state = STATE_PAUSE_SMTP;
this.current_line = line.replace(/\r?\n/, '');
var matches = /^([^ ]*)( +(.*))?$/.exec(this.current_line);
if (!matches) {
return plugins.run_hooks('unrecognized_command', this, this.current_line);
Expand Down Expand Up @@ -208,16 +218,15 @@ Connection.prototype.process_line = function (line) {
}
else if (this.state === STATE_LOOP) {
// Allow QUIT
if (line.replace(/\r?\n/, '').toUpperCase() === 'QUIT') {
if (this.current_line.toUpperCase() === 'QUIT') {
this.cmd_quit();
}
else {
this.respond(this.loop_code, this.loop_msg);
}
}
else if (this.state === STATE_DATA) {
this.logdata("C: " + line);
this.accumulate_data(line);
else {
throw new Error('unknown state ' + this.state);
}
};

Expand All @@ -226,8 +235,19 @@ Connection.prototype.process_data = function (data) {
this.logwarn("data after disconnect from " + this.remote_ip);
return;
}

this.current_data += data.toString('binary'); // TODO: change all this to use Buffers

if (!this.current_data || !this.current_data.length) {
this.current_data = data;
}
else {
// Data left over in buffer
var buf = Buffer.concat(
[ this.current_data, data ],
(this.current_data.length + data.length)
);
this.current_data = buf;
}

this._process_data();
};

Expand All @@ -237,9 +257,10 @@ Connection.prototype._process_data = function() {
// Otherwise if multiple commands are pipelined and then the
// connection is dropped; we'll end up in the function forever.
if (this.disconnected) return;
var results;
while (results = line_regexp.exec(this.current_data)) {
var this_line = results[1];

var offset;
while (this.current_data && ((offset = indexOfLF(this.current_data)) !== -1)) {
var this_line = this.current_data.slice(0, offset+1);
// Hack: bypass this code to allow HAProxy's PROXY extension
if (this.state === STATE_PAUSE && this.proxy && /^PROXY /.test(this_line)) {
if (this.proxy_timer) clearTimeout(this.proxy_timer);
Expand All @@ -250,6 +271,7 @@ Connection.prototype._process_data = function() {
// Detect early_talker but allow PIPELINING extension (ESMTP)
else if ((this.state === STATE_PAUSE || this.state === STATE_PAUSE_SMTP) && !this.esmtp) {
if (!this.early_talker) {
this_line = this_line.toString().replace(/\r?\n/,'');
this.logdebug('[early_talker] state=' + this.state + ' esmtp=' + this.esmtp + ' line="' + this_line + '"');
}
this.early_talker = 1;
Expand All @@ -260,7 +282,7 @@ Connection.prototype._process_data = function() {
}
else if ((this.state === STATE_PAUSE || this.state === STATE_PAUSE_SMTP) && this.esmtp) {
var valid = true;
var cmd = this_line.slice(0,4).toUpperCase();
var cmd = this_line.toString('ascii').slice(0,4).toUpperCase();
switch (cmd) {
case 'RSET':
case 'MAIL':
Expand Down Expand Up @@ -419,11 +441,22 @@ Connection.prototype.tran_uuid = function () {
}

Connection.prototype.reset_transaction = function() {
if (this.transaction) {
this.transaction.message_stream.destroy();
}
delete this.transaction;
};

Connection.prototype.init_transaction = function() {
this.transaction = trans.createTransaction(this.tran_uuid());
// Catch any errors from the message_stream
var self = this;
this.transaction.message_stream.on('error', function (err) {
self.logcrit('message_stream error: ' + err.message);
self.respond('421', 'Internal Server Error', function () {
self.disconnect();
});
});
}

Connection.prototype.loop_respond = function (code, msg) {
Expand Down Expand Up @@ -616,8 +649,8 @@ Connection.prototype.ehlo_respond = function(retval, msg) {
"8BITMIME",
];

var databytes = config.get('databytes');
response.push("SIZE " + databytes || 0);
var databytes = parseInt(config.get('databytes')) || 0;
response.push("SIZE " + databytes);

this.capabilities = response;

Expand Down Expand Up @@ -1118,13 +1151,28 @@ Connection.prototype.data_respond = function(retval, msg) {

Connection.prototype.accumulate_data = function(line) {
var self = this;
if (line === ".\r\n")
return this.data_done();

// Bare LF checks
if (line === ".\r" || line === ".\n") {
this.logerror("Client sent bare line-feed - .\\r or .\\n rather than .\\r\\n");
this.respond(451, "See http://haraka.github.com/barelf.html", function() {
this.transaction.data_bytes += line.length;

// Look for .\r\n
if (line.length === 3 &&
line[0] === 0x2e &&
line[1] === 0x0d &&
line[2] === 0x0a)
{
this.transaction.message_stream.add_line_end(function () {
self.data_done();
});
return;
}

// Look for .\n
if (line.length === 2 &&
line[0] === 0x2e &&
line[1] === 0x0a)
{
this.logerror('Client sent bare line-feed - .\\n rather than .\\r\\n');
this.respond(451, "Bare line-feed; see http://haraka.github.com/barelf.html", function() {
self.reset_transaction();
});
return;
Expand All @@ -1135,7 +1183,7 @@ Connection.prototype.accumulate_data = function(line) {
return;
}

this.transaction.add_data(line.replace(/^\.\./, '.').replace(/\r\n$/, "\n"));
this.transaction.add_data(line);
};

Connection.prototype.data_done = function() {
Expand Down
3 changes: 3 additions & 0 deletions docs/CoreConfig.md
Expand Up @@ -34,6 +34,9 @@ different levels available.
* daemonize - enable this to cause Haraka to fork into the background on start-up (default: 0)
* daemon_log_file - (default: /var/log/haraka.log) where to redirect stdout/stderr when daemonized
* daemon_pid_file - (default: /var/run/haraka.pid) where to write a PID file to
* spool_dir - (default: none) directory to create temporary spool files in
* spool_after - (default: -1) if message exceeds this size in bytes, then spool the message to disk
specify -1 to disable spooling completely or 0 to force all messages to be spooled to disk.

[1]: http://learnboost.github.com/cluster/ or node version >= 0.8

Expand Down
31 changes: 29 additions & 2 deletions docs/Transaction.md
Expand Up @@ -19,9 +19,30 @@ The value of the MAIL FROM command as an `Address` object.

An Array of `Address` objects of recipients from the RCPT TO command.

* transaction.data\_lines
* transaction.message_stream

An Array of the lines of the email after DATA.
A node.js Readable Stream object for the message.

You use it like this:

transaction.message_stream.pipe(WritableStream, options)

Where WritableStream is a node.js Writable Stream object such as a
net.socket, fs.writableStream, process.stdout/stderr or custom stream.

The options argument should be an object that overrides the following
properties:

* line_endings (default: "\r\n")
* dot_stuffing (default: false)
* ending_dot (default: false)
* end (default: true)
* buffer_size (default: 65535)
* clamd_style (default: false)

e.g.

transaction.message_stream.pipe(socket, { dot_stuffing: true, ending_dot: true });

* transaction.data\_bytes

Expand All @@ -32,6 +53,12 @@ The number of bytes in the email after DATA.
Adds a line of data to the email. Note this is RAW email - it isn't useful
for adding banners to the email.

* transaction.add_line_end(cb)

Notifies the message_stream that all the data has been received.
Supply an optional callback function that will be run once any inflight data
is finished being written.

* transaction.notes

A safe place to store transaction specific values.
Expand Down

0 comments on commit 92dd16d

Please sign in to comment.