Skip to content

Commit

Permalink
bump new version, wait for accept and continue responses before sendi…
Browse files Browse the repository at this point in the history
…ng email
  • Loading branch information
indutny committed Jan 21, 2011
1 parent d34106f commit eace7c1
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 50 deletions.
152 changes: 103 additions & 49 deletions lib/node_mailer.js
Expand Up @@ -72,35 +72,75 @@ Connection.prototype = {
console.log(err); console.log(err);
return; return;
} }
self._process; doProcess();
}); });
return; return;
} } else doProcess();

var email = self._queue.shift();


if (email) { function doProcess() {
// TODO: incremental sending.. (think drain + pause/resume) var email = self._queue.shift();
var m = email.options.from.match(/.*<([^>]+)>\s*$/);
var from = m ? m[1] : email.options.from;

self._stream.write("mail from: <" + from + ">\r\n");
self._stream.write("rcpt to: " + email.options.to + "\r\n");
self._stream.write("data\r\n");
self._stream.write("From: " + email.options.from + "\r\n");
self._stream.write("To: " + email.options.to + "\r\n");
self._stream.write("Subject: " + email.options.subject + "\r\n");
self._stream.write("Content-Type: " + email.options['content-type'] + "\r\n");
self._stream.write(email.options.body + "\r\n");
self._stream.write(".\r\n");
self._stream.write("RSET\r\n");
}


if (self._queue.length > 0) { if (!email) return;
process.nextTick(dequeue); var from = email.options.from,
} else { msg;
self._processing = false;
self.disconnect(); function _cb_tpl(fn) {
return function(err) {
if (err) {
try {
self.disconnect();
} catch(e) {
}
return;
}
fn();
};
}

writeFrom();

function writeFrom() {
self._stream.write(
'mail from: ' + (/<.+>/.test(from) ? from : '<' + from + '>') +
'\r\n'
);

self._stream.promise.wait('accepted', _cb_tpl(writeTo));
}

function writeTo() {
self._stream.write('rcpt to: ' + email.options.to + '\r\n');
self._stream.promise.wait('accepted', _cb_tpl(writeDataStart));
}

function writeDataStart() {
self._stream.write('data\r\n');
self._stream.promise.wait('continue', _cb_tpl(writeData));
}

function writeData() {
self._stream.write([
'From: ' + email.options.from,
'To: ' + email.options.to,
'Subject: ' + email.options.subject,
'Content-Type: ' + email.options['content-type'],
'',
email.options.body,
'.',
'RSET',
''
].join('\r\n'));

onEnd();
}

function onEnd() {
if (self._queue.length > 0) {
process.nextTick(dequeue);
} else {
self.disconnect();
}
}
} }
}; };


Expand All @@ -116,15 +156,41 @@ Connection.prototype = {
stream = tcp.createConnection(this.options.port, this.options.host), stream = tcp.createConnection(this.options.port, this.options.host),
stream_promise = new process.EventEmitter; stream_promise = new process.EventEmitter;


stream.promise = stream_promise;

// Parse reply lines // Parse reply lines
carrier.carry(stream, function(line) { carrier.carry(stream, function(line) {

// If we got successfull auth // If we got successfull auth
if (/^235\s/.test(line) ) { if (/^235\s/.test(line)) {
// Emit 'auth' event
stream_promise.emit('auth'); stream_promise.emit('auth');
} else
// If server has accepted something
if (/^250\s/.test(line)) {
stream_promise.emit('accepted');
} else
// If server says Continue
if (/^354\s/.test(line)) {
stream_promise.emit('continue');
} }
}); });


// Wait for event with timeout
stream_promise.wait = function(event, callback, timeout) {
stream_promise.on(event, onEvent);

function onEvent() {
stream_promise.removeListener(event, onEvent);
clearTimeout(timeout);
callback(null);
}

var timeout = setTimeout(function() {
stream_promise.removeListener(event, onEvent);
callback('timeout');
}, timeout || 5000);
};

stream.setEncoding("utf8"); stream.setEncoding("utf8");
stream.on("connect", function() { stream.on("connect", function() {
self._stream = stream; self._stream = stream;
Expand All @@ -138,29 +204,15 @@ Connection.prototype = {


if (typeof fn === 'function') { if (typeof fn === 'function') {
// Set auth callback // Set auth callback
stream_promise.on('auth', onAuth); stream_promise.wait('auth', function(err) {
function onAuth() { if (err) {
// Reset timeout counter try {
clearTimeout(auth_timeout); self.disconnect();

} catch (e) {
// Call callback }
fn(null, self._stream);
};

// Remove callback after timeout
var auth_timeout = setTimeout(function() {
// Remove listener
stream_promise.removeListener('auth', onAuth);

// Do not leave connection
try {
self.disconnect();
} catch (e) {
} }

fn(err, self._stream);
// Call callback }, self.options.auth_timeout);
fn('auth_timeout');
}, self.options.auth_timeout || 3000);
} }
} else { } else {
if (typeof fn === 'function') { if (typeof fn === 'function') {
Expand All @@ -171,6 +223,7 @@ Connection.prototype = {


stream.on("error", function() { stream.on("error", function() {
self.callback(new Error("could not connect")); self.callback(new Error("could not connect"));
console.log(arguments);
stream.destroy(); stream.destroy();
self._connecting = false; self._connecting = false;
stream = null; stream = null;
Expand Down Expand Up @@ -204,6 +257,7 @@ Connection.prototype = {
}); });
}, },
disconnect : function() { disconnect : function() {
this._processing = false;
if (this._stream) { if (this._stream) {
this._stream.end('quit\r\n'); this._stream.end('quit\r\n');
this._stream = null; this._stream = null;
Expand Down
2 changes: 1 addition & 1 deletion package.json
@@ -1,7 +1,7 @@
{ {
"name": "mailer", "name": "mailer",
"description": "send emails from node.js to a smtp server, simple as cake", "description": "send emails from node.js to a smtp server, simple as cake",
"version": "0.4.4", "version": "0.4.5",
"author": "Marak Squires", "author": "Marak Squires",
"contributors" : [ "contributors" : [
"Elijah Insua <tmpvar@gmail.com> (http://tmvpar.com/)" "Elijah Insua <tmpvar@gmail.com> (http://tmvpar.com/)"
Expand Down

0 comments on commit eace7c1

Please sign in to comment.