Skip to content

Commit

Permalink
Changes as per pull request comments
Browse files Browse the repository at this point in the history
  • Loading branch information
smfreegard committed Nov 12, 2012
1 parent cc17f78 commit 2316647
Show file tree
Hide file tree
Showing 13 changed files with 31 additions and 41 deletions.
12 changes: 1 addition & 11 deletions chunkemitter.js
Expand Up @@ -16,17 +16,7 @@ ChunkEmitter.prototype.fill = function (input) {
if (typeof input === 'string') {
input = new Buffer(input);
}
/*
this.bufs.push(input);
this.bufs_size += input.length;

if (this.bufs_size >= this.buffer_size) {
var new_buf = Buffer.concat(this.bufs, this.bufs_size);
this.emit('data', new_buf);
this.bufs = [];
this.bufs_size = 0;
}
*/
// Optimization: don't allocate a new buffer until
// the input we've had so far is bigger than our
// buffer size.
Expand Down Expand Up @@ -81,4 +71,4 @@ ChunkEmitter.prototype.end = function (cb) {
return emitted;
}

exports.ChunkEmitter = ChunkEmitter;
module.exports = ChunkEmitter;
10 changes: 5 additions & 5 deletions connection.js
Expand Up @@ -442,17 +442,17 @@ Connection.prototype.tran_uuid = function () {

Connection.prototype.reset_transaction = function() {
if (this.transaction) {
this.transaction.messageStream.destroy();
this.transaction.message_stream.destroy();
}
delete this.transaction;
};

Connection.prototype.init_transaction = function() {
this.transaction = trans.createTransaction(this.tran_uuid());
// Catch any errors from the messageStream
// Catch any errors from the message_stream
var self = this;
this.transaction.messageStream.on('error', function (err) {
self.logcrit('messageStream error: ' + err.message);
this.transaction.message_stream.on('error', function (err) {
self.logcrit('message_stream error: ' + err.message);
self.respond('421', 'Internal Server Error', function () {
self.disconnect();
});
Expand Down Expand Up @@ -1160,7 +1160,7 @@ Connection.prototype.accumulate_data = function(line) {
line[1] === 0x0d &&
line[2] === 0x0a)
{
this.transaction.messageStream.add_line_end(function () {
this.transaction.message_stream.add_line_end(function () {
self.data_done();
});
return;
Expand Down
10 changes: 5 additions & 5 deletions docs/Transaction.md
Expand Up @@ -19,13 +19,13 @@ The value of the MAIL FROM command as an `Address` object.

An Array of `Address` objects of recipients from the RCPT TO command.

* transaction.messageStream
* transaction.message_stream

A node.js Readable Stream object for the message.

You use it like this:

transaction.messageStream.pipe(WritableStream, options)
transaction.message_stream.pipe(WritableStream, options)

Where WritableStream is a node.js Writable Stream object such as a
net.socket, fs.writableStream, process.stdout/stderr or custom stream.
Expand All @@ -36,13 +36,13 @@ properties:
* line_endings (default: "\r\n")
* dot_stuffing (default: false)
* ending_dot (default: false)
* emit_end (default: true)
* end (default: true)
* buffer_size (default: 65535)
* clamd_style (default: false)

e.g.

transaction.messageStream.pipe(socket, { dot_stuffing: true, ending_dot: true });
transaction.message_stream.pipe(socket, { dot_stuffing: true, ending_dot: true });

* transaction.data\_bytes

Expand All @@ -55,7 +55,7 @@ for adding banners to the email.

* transaction.add_line_end(cb)

Notifies the messageStream that all the data has been received.
Notifies the message_stream that all the data has been received.
Supply an optional callback function that will be run once any inflight data
is finished being written.

Expand Down
4 changes: 2 additions & 2 deletions messagestream.js
Expand Up @@ -3,7 +3,7 @@
var fs = require('fs');
var util = require('util');
var Stream = require('stream').Stream;
var ChunkEmitter = require('./chunkemitter').ChunkEmitter;
var ChunkEmitter = require('./chunkemitter');
var indexOfLF = require('./utils').indexOfLF;

var STATE_HEADERS = 1;
Expand Down Expand Up @@ -388,4 +388,4 @@ MessageStream.prototype.destroy = function () {
}
}

exports.MessageStream = MessageStream;
module.exports = MessageStream;
14 changes: 7 additions & 7 deletions outbound.js
Expand Up @@ -88,8 +88,8 @@ exports.send_email = function () {

var from = arguments[0],
to = arguments[1],
contents = arguments[2],
next = arguments[3];
contents = arguments[2];
var next = arguments[3];

this.loginfo("Sending email via params");

Expand Down Expand Up @@ -141,14 +141,14 @@ exports.send_email = function () {
var re = /^([^\n]*\n?)/;
while (match = re.exec(contents)) {
var line = match[1];
line = line.replace(/\n?$/, '\n'); // make sure it ends in \n
line = line.replace(/\n?$/, '\r\n'); // make sure it ends in \r\n
transaction.add_data(line);
contents = contents.substr(match[1].length);
if (contents.length === 0) {
break;
}
}
transaction.messageStream.add_line_end();
transaction.message_stream.add_line_end();
this.send_trans_email(transaction, next);
}

Expand Down Expand Up @@ -252,7 +252,7 @@ exports.build_todo = function (todo, ws) {
// Replacer function to exclude items from the queue file header
function exclude_from_json(key, value) {
switch (key) {
case 'messageStream':
case 'message_stream':
return undefined;
default:
return value;
Expand All @@ -271,7 +271,7 @@ exports.build_todo = function (todo, ws) {
var buf = Buffer.concat([todo_length, todo_str], todo_str.length + 4);

ws.write(buf);
todo.messageStream.pipe(ws, { line_endings: '\r\n', dot_stuffing: true, ending_dot: false });
todo.message_stream.pipe(ws, { line_endings: '\r\n', dot_stuffing: true, ending_dot: false });
}

exports.split_to_new_recipients = function (hmail, recipients) {
Expand Down Expand Up @@ -402,7 +402,7 @@ function TODOItem (domain, recipients, transaction) {
this.domain = domain;
this.rcpt_to = recipients;
this.mail_from = transaction.mail_from;
this.messageStream = transaction.messageStream;
this.message_stream = transaction.message_stream;
this.notes = transaction.notes;
this.uuid = transaction.uuid;
return this;
Expand Down
2 changes: 1 addition & 1 deletion plugins/clamd.js
Expand Up @@ -79,7 +79,7 @@ exports.hook_data_post = function (next, connection) {
addressInfo = hp === null ? '' : ' ' + hp.address + ':' + hp.port;
connection.logdebug(plugin, 'connected to host' + addressInfo);
socket.write("zINSTREAM\0", function () {
transaction.messageStream.pipe(socket, { clamd_style: true });
transaction.message_stream.pipe(socket, { clamd_style: true });
});
});

Expand Down
2 changes: 1 addition & 1 deletion plugins/dkim_sign.js
Expand Up @@ -180,5 +180,5 @@ exports.hook_queue_outbound = function (next, connection) {
}
return next();
});
transaction.messageStream.pipe(dkim_sign);
transaction.message_stream.pipe(dkim_sign);
}
2 changes: 1 addition & 1 deletion plugins/queue/quarantine.js
Expand Up @@ -110,7 +110,7 @@ transaction.notes.quarantine = true;
}
);
});
transaction.messageStream.pipe(ws, { line_endings: '\n' });
transaction.message_stream.pipe(ws, { line_endings: '\n' });
});
});

Expand Down
2 changes: 1 addition & 1 deletion plugins/queue/smtp_forward.js
Expand Up @@ -30,7 +30,7 @@ exports.hook_queue = function (next, connection) {
}

smtp_client.on('data', function () {
smtp_client.start_data(connection.transaction.messageStream);
smtp_client.start_data(connection.transaction.message_stream);
});

smtp_client.on('dot', function () {
Expand Down
2 changes: 1 addition & 1 deletion plugins/queue/smtp_proxy.js
Expand Up @@ -61,7 +61,7 @@ exports.hook_queue = function (next, connection) {
var smtp_client = connection.notes.smtp_client;
if (!smtp_client) return next();
smtp_client.next = next;
smtp_client.start_data(connection.transaction.messageStream);
smtp_client.start_data(connection.transaction.message_stream);
};

exports.hook_rset = function (next, connection) {
Expand Down
2 changes: 1 addition & 1 deletion plugins/spamassassin.js
Expand Up @@ -73,7 +73,7 @@ exports.hook_data_post = function (next, connection) {
}

socket.write(headers.join("\r\n"));
connection.transaction.messageStream.pipe(socket);
connection.transaction.message_stream.pipe(socket);
});

var spamd_response = {};
Expand Down
2 changes: 1 addition & 1 deletion plugins/test_queue.js
Expand Up @@ -6,5 +6,5 @@ exports.hook_queue = function(next, connection) {
ws.once('end', function () {
return next(OK);
}
connection.transaction.messageStream.pipe(ws);
connection.transaction.message_stream.pipe(ws);
};
8 changes: 4 additions & 4 deletions transaction.js
Expand Up @@ -6,7 +6,7 @@ var logger = require('./logger');
var Header = require('./mailheader').Header;
var body = require('./mailbody');
var utils = require('./utils');
var MessageStream = require('./messagestream').MessageStream;
var MessageStream = require('./messagestream');

var trans = exports;

Expand All @@ -22,7 +22,7 @@ function Transaction() {
this.parse_body = false;
this.notes = {};
this.header = new Header();
this.messageStream = null;
this.message_stream = null;
this.rcpt_count = {
accept: 0,
tempfail: 0,
Expand All @@ -36,12 +36,12 @@ exports.createTransaction = function(uuid) {
var t = new Transaction();
t.uuid = uuid || utils.uuid();
// Initialize MessageStream here to pass in the UUID
t.messageStream = new MessageStream(config.get('smtp.ini'), t.uuid, t.header.header_list);
t.message_stream = new MessageStream(config.get('smtp.ini'), t.uuid, t.header.header_list);
return t;
};

Transaction.prototype.add_data = function(line) {
this.messageStream.add_line(line);
this.message_stream.add_line(line);
if (typeof line !== 'string') {
line = line.toString('binary').replace(/^\.\./, '.').replace(/\r\n$/, '\n');
}
Expand Down

0 comments on commit 2316647

Please sign in to comment.