Skip to content
Browse files

Added Connection Pooling

  • Loading branch information...
1 parent 97f8244 commit 137a12f464952eb6f691571f4b7e65e7f80e9848 Andris Reinman committed Feb 8, 2012
Showing with 638 additions and 22 deletions.
  1. +66 −16 README.md
  2. +2 −1 index.js
  3. +0 −3 lib/client.js
  4. +291 −0 lib/pool.js
  5. +3 −2 package.json
  6. +276 −0 test/pool.js
View
82 README.md
@@ -6,13 +6,13 @@ This is a module to easily create custom SMTP servers and clients - use SMTP as
[Autogenerated docs](http://node.ee/smtpdoc/)
-# SMTP Server
+## SMTP Server
-## Usage
+### Usage
Create a new SMTP server instance with
- var smtp = new simplesmtp.createServer([options]);
+ var smtp = simplesmtp.createServer([options]);
And start listening on selected port
@@ -33,12 +33,12 @@ SMTP options can include the following:
* **authMethods** - allowed authentication methods, defaults to `["PLAIN", "LOGIN"]`
* **disableEHLO** - if set to true, support HELO command only
-## Example
+### Example
var simplesmtp = require("simplesmtp"),
fs = require("fs");
- var smtp = new simplesmtp.createServer();
+ var smtp = simplesmtp.createServer();
smtp.listen(25);
smtp.on("startData", function(envelope){
@@ -59,7 +59,7 @@ SMTP options can include the following:
});
-## Events
+### Events
* **startData** *(envelope)* - DATA stream is opened by the client (`envelope` is an object with `from`, `to`, `host` and `remoteAddress` properties)
* **data** *(envelope, chunk)* - e-mail data chunk is passed from the client
@@ -68,9 +68,9 @@ SMTP options can include the following:
* **validateSender** *(envelope, email, callback)* - will be emitted if `validateSender` option is set to true
* **validateRecipient** *(envelope, email, callback)* - will be emitted it `validataRecipients` option is set to true
-# SMTP Client
+## SMTP Client
-## Usage
+### Usage
SMTP client can be created with `simplesmptp.connect(port[,host][, options])`
where
@@ -79,7 +79,7 @@ where
* **host** is the hostname to connect to (defaults to "localhost")
* **options** is an optional options object (see below)
-### Connection options
+#### Connection options
The following connection options can be used with `simplesmtp.connect`:
@@ -90,7 +90,7 @@ The following connection options can be used with `simplesmtp.connect`:
* **debug** - output client and server messages to console
* **instanceId** - unique instance id for debugging (will be output console with the messages)
-### Connection events
+#### Connection events
Once a connection is set up the following events can be listened to:
@@ -101,7 +101,7 @@ Once a connection is set up the following events can be listened to:
* **'error'** `(err)` - An error occurred. The connection is closed and an 'end' event is emitted shortly
* **'end'** - connection to the client is closed
-### Sending an envelope
+#### Sending an envelope
When an `'idle'` event is emitted, an envelope object can be sent to the server.
This includes a string `from` and an array of strings `to` property.
@@ -128,7 +128,7 @@ event is emitted.
If the envelope is set up correctly a `'message'` event is emitted.
-### Sending a message
+#### Sending a message
When `'message'` event is emitted, it is possible to send mail. To do this
you can pipe directly a message source (for example an .eml file) to the client
@@ -151,11 +151,16 @@ parameter which indicates if the message was transmitted( (true) or not (false).
}
});
-### Error types
+#### Error types
-On errors
+Emitted errors include the reason for failing in the `name` property
-### About reusing the connection
+ * **UnknowAuthError** - the client tried to authenticate but the method was not supported
+ * **AuthError** - the username/password used were rejected
+ * **SenderError** - the sender e-mail address was rejected
+ * **RecipientError** - all recipients were rejected (if only some of the recipients are rejected, a `'rcptFailed'` event is raised instead
+
+#### About reusing the connection
You can reuse the same connection several times but you can't send a mail
through the same connection concurrently. So if you catch and `'idle'` event
@@ -164,14 +169,59 @@ lock the connection to a message process and unlock after `'ready'`.
On '`error'` events you should reschedule the message and on `'end'` events
you should recreate the connection.
-### Closing the client
+#### Closing the client
By default the client tries to keep the connection up. If you want to close it,
run `client.quit()` - this sends a `QUIT` command to the server and closes the
connection
client.quit();
+## SMTP Client Connection pool
+
+**simplesmtp** has the option for connection pooling if you want to reuse a bulk
+of connections.
+
+### Usage
+
+Create a connection pool of SMTP clients with
+
+ simplesmtp.createClientPool(port[,host][, options])
+
+where
+
+ * **port** is the port to connect to
+ * **host** is the hostname to connect to (defaults to "localhost")
+ * **options** is an optional options object (see below)
+
+#### Connection options
+
+The following connection options can be used with `simplesmtp.connect`:
+
+ * **secureConnection** - use SSL
+ * **name** - the name of the client server
+ * **auth** - authentication object `{user:"...", pass:"..."}`
+ * **ignoreTLS** - ignore server support for STARTTLS
+ * **debug** - output client and server messages to console
+ * **maxConnections** - how many connections to keep in the pool (defaults to 5)
+
+### Send an e-mail
+
+E-mails can be sent through the pool with
+
+ pool.sendMail(mail[, callback])
+
+where
+
+ * **mail** is a [MailComposer](/andris9/mailcomposer) compatible object
+ * **callback** `(error, failedRecipients)` - is the callback function to run after the message is delivered or an error occured. `failedRecipients` is an array with e-mail addresses that were rejected.
+
+### Errors
+
+In addition to SMTP client errors another error name is used
+
+ * **DeliveryError** - used if the message was not accepted by the SMTP server
+
## License
**MIT**
View
3 index.js
@@ -1,4 +1,5 @@
// expose the API to the world
module.exports.createServer = require("./lib/server.js");
-module.exports.connect = require("./lib/client.js");
+module.exports.connect = require("./lib/client.js");
+module.exports.createClientPool = require("./lib/pool.js");
View
3 lib/client.js
@@ -307,13 +307,10 @@ SMTPClient.prototype.end = function(chunk){
// indicate that the stream has ended by sending a single dot on its own line
// if the client already closed the data with \r\n no need to do it again
if(this._lastDataBytes[0] == 0x0D && this._lastDataBytes[1] == 0x0A){
- console.log(".\\r\\n");
this.socket.write(new Buffer(".\r\n", "utf-8"));
}else if(this._lastDataBytes[1] == 0x0D){
- console.log("\\n.\\r\\n");
this.socket.write(new Buffer("\n.\r\n"));
}else{
- console.log("\\r\\n.\\r\\n");
this.socket.write(new Buffer("\r\n.\r\n"));
}
View
291 lib/pool.js
@@ -0,0 +1,291 @@
+var simplesmtp = require("../index"),
+ EventEmitter = require('events').EventEmitter,
+ utillib = require("util");
+
+// expose to the world
+module.exports = function(port, host, options){
+ var pool = new SMTPConnectionPool(port, host, options);
+ return pool;
+};
+
+/**
+ * <p>Creates a SMTP connection pool</p>
+ *
+ * <p>Optional options object takes the following possible properties:</p>
+ * <ul>
+ * <li><b>secureConnection</b> - use SSL</li>
+ * <li><b>name</b> - the name of the client server</li>
+ * <li><b>auth</b> - authentication object <code>{user:"...", pass:"..."}</code>
+ * <li><b>ignoreTLS</b> - ignore server support for STARTTLS</li>
+ * <li><b>debug</b> - output client and server messages to console</li>
+ * <li><b>maxConnections</b> - how many connections to keep in the pool</li>
+ * </ul>
+ *
+ * @constructor
+ * @param {Number} [port=25] The port number to connecto to
+ * @param {String} [host="localhost"] THe hostname to connect to
+ * @param {Object} [options] optional options object
+ */
+function SMTPConnectionPool(port, host, options){
+ EventEmitter.call(this);
+
+ /**
+ * Port number to connect to
+ * @public
+ */
+ this.port = port || 25;
+
+ /**
+ * Hostname to connect to
+ * @public
+ */
+ this.host = host || "localhost";
+
+ /**
+ * Options object
+ * @public
+ */
+ this.options = options || {};
+ this.options.maxConections = this.options.maxConections || 5;
+
+ /**
+ * An array of connections that are currently idle
+ * @private
+ */
+ this._connectionsAvailable = [];
+
+ /**
+ * An array of connections that are currently in use
+ * @private
+ */
+ this._connectionsInUse = [];
+
+ /**
+ * Message queue (FIFO)
+ * @private
+ */
+ this._messageQueue = [];
+
+ /**
+ * Counter for generating ID values for debugging
+ * @private
+ */
+ this._idgen = 0;
+}
+utillib.inherits(SMTPConnectionPool, EventEmitter);
+
+/**
+ * <p>Sends a message. If there's any idling connections available
+ * use one to send the message immediatelly, otherwise add to queue.</p>
+ *
+ * @param {Object} message MailComposer object
+ * @param {Function} callback Callback function to run on finish, gets an
+ * <code>error</code> object as a parameter if the sending failed
+ * and on success a list of addresses that were rejected (if any)
+ */
+SMTPConnectionPool.prototype.sendMail = function(message, callback){
+ var connection;
+
+ message.returnCallback = callback;
+
+ if(this._connectionsAvailable.length){
+ // if available connections pick one
+ connection = this._connectionsAvailable.pop();
+ this._connectionsInUse.push(connection);
+ this._processMessage(message, connection);
+ }else{
+ this._messageQueue.push(message);
+
+ if(this._connectionsAvailable.length + this._connectionsInUse.length < this.options.maxConections){
+ this._createConnection();
+ }
+ }
+
+};
+
+/**
+ * <p>Closes all connections</p>
+ */
+SMTPConnectionPool.prototype.close = function(callback){
+ var connection;
+
+ // for some reason destroying the connections seem to be the only way :S
+ while(this._connectionsAvailable.length){
+ connection = this._connectionsAvailable.pop();
+ if(connection.socket){
+ connection.socket.destroy();
+ }
+ }
+
+ while(this._connectionsInUse.length){
+ connection = this._connectionsInUse.pop();
+ if(connection.socket){
+ connection.socket.destroy();
+ }
+ }
+
+ if(callback){
+ process.nextTick(callback);
+ }
+}
+
+/**
+ * <p>Initiates a connection to the SMTP server and adds it to the pool</p>
+ */
+SMTPConnectionPool.prototype._createConnection = function(){
+ var connectionOptions = {
+ instanceId: ++this._idgen,
+ debug: !!this.options.debug,
+ ignoreTLS: !!this.options.ignoreTLS,
+ auth: this.options.auth || false,
+ name: this.options.name || false,
+ secureConnection: !!this.options.secureConnection
+ },
+ connection = simplesmtp.connect(this.port, this.host, connectionOptions);
+
+ connection.on("idle", this._onConnectionIdle.bind(this, connection));
+ connection.on("message", this._onConnectionMessage.bind(this, connection));
+ connection.on("ready", this._onConnectionReady.bind(this, connection));
+ connection.on("error", this._onConnectionError.bind(this, connection));
+ connection.on("end", this._onConnectionEnd.bind(this, connection));
+ connection.on("rcptFailed", this._onConnectionRCPTFailed.bind(this, connection));
+
+ // as the connection is not ready yet, add to "in use" queue
+ this._connectionsInUse.push(connection);
+};
+
+/**
+ * <p>Processes a message by assigning it to a connection object and initiating
+ * the sending process by setting the envelope</p>
+ *
+ * @param {Object} message MailComposer message object
+ * @param {Object} connection <code>simplesmtp.connect</code> connection
+ */
+SMTPConnectionPool.prototype._processMessage = function(message, connection){
+ connection.currentMessage = message;
+ message.currentConnection = connection;
+
+ // send envelope
+ connection.useEnvelope(message.getEnvelope());
+};
+
+/**
+ * <p>Will be fired on <code>'idle'</code> events by the connection, if
+ * there's a message currently in queue</p>
+ *
+ * @event
+ * @param {Object} connection Connection object that fired the event
+ */
+SMTPConnectionPool.prototype._onConnectionIdle = function(connection){
+
+ var message = this._messageQueue.shift();
+
+ if(message){
+ this._processMessage(message, connection);
+ }else{
+ for(var i=0, len = this._connectionsInUse.length; i<len; i++){
+ if(this._connectionsInUse[i] == connection){
+ this._connectionsInUse.splice(i,1); // remove from list
+ break;
+ }
+ }
+ this._connectionsAvailable.push(connection);
+ }
+
+};
+
+/**
+ * @event
+ * @param {Object} connection Connection object that fired the event
+ * @param {Array} addresses Failed addresses as an array of strings
+ */
+SMTPConnectionPool.prototype._onConnectionRCPTFailed = function(connection, addresses){
+ if(connection.currentMessage){
+ connection.currentMessage.failedRecipients = addresses;
+ }
+};
+
+/**
+ * @event
+ * @param {Object} connection Connection object that fired the event
+ */
+SMTPConnectionPool.prototype._onConnectionMessage = function(connection){
+ if(connection.currentMessage){
+ connection.currentMessage.streamMessage();
+ connection.currentMessage.pipe(connection);
+ }
+};
+
+/**
+ * @event
+ * @param {Object} connection Connection object that fired the event
+ * @param {Boolean} success True if the message was queued by the SMTP server
+ */
+SMTPConnectionPool.prototype._onConnectionReady = function(connection, success){
+ var error;
+ if(connection.currentMessage && connection.currentMessage.returnCallback){
+ if(success){
+ connection.currentMessage.returnCallback(null,
+ connection.currentMessage.failedRecipients || false);
+ }else{
+ error = new Error("Message delivery failed");
+ error.name = "DeliveryError";
+ connection.currentMessage.returnCallback(error);
+ }
+ }
+ connection.currentMessage = false;
+};
+
+/**
+ * @event
+ * @param {Object} connection Connection object that fired the event
+ * @param {Object} error Error object
+ */
+SMTPConnectionPool.prototype._onConnectionError = function(connection, error){
+ var message = connection.currentMessage;
+ connection.currentMessage = false;
+
+ // clear a first message from the list, otherwise an infinite loop will emerge
+ if(!message){
+ message = this._messageQueue.shift();
+ }
+
+ if(message && message.returnCallback){
+ message.returnCallback(error);
+ }
+};
+
+/**
+ * @event
+ * @param {Object} connection Connection object that fired the event
+ */
+SMTPConnectionPool.prototype._onConnectionEnd = function(connection){
+ var removed = false;
+
+ // if in "available" list, remove
+ for(var i=0, len = this._connectionsAvailable.length; i<len; i++){
+ if(this._connectionsAvailable[i] == connection){
+ this._connectionsAvailable.splice(i,1); // remove from list
+ removed = true;
+ break;
+ }
+ }
+
+ if(!removed){
+ // if in "in use" list, remove
+ for(var i=0, len = this._connectionsInUse.length; i<len; i++){
+ if(this._connectionsInUse[i] == connection){
+ this._connectionsInUse.splice(i,1); // remove from list
+ removed = true;
+ break;
+ }
+ }
+ }
+
+ // if there's still unprocessed mail and available connection slots, create
+ // a new connection
+ if(this._messageQueue.length &&
+ this._connectionsInUse.length + this._connectionsAvailable.length < this.options.maxConections){
+ this._createConnection();
+ }
+};
View
5 package.json
@@ -1,7 +1,7 @@
{
"name": "simplesmtp",
"description": "Simple SMTP server module to create custom SMTP servers",
- "version": "0.1.7",
+ "version": "0.1.8",
"author" : "Andris Reinman",
"maintainers":[
{
@@ -27,7 +27,8 @@
"rai": "*"
},
"devDependencies": {
- "nodeunit": "*"
+ "nodeunit": "*",
+ "mailcomposer": "*"
},
"engines": ["node >=0.4.0"],
"keywords": ["servers", "text-based", "smtp", "email", "mail", "e-mail"]
View
276 test/pool.js
@@ -0,0 +1,276 @@
+var testCase = require('nodeunit').testCase,
+ runClientMockup = require("rai").runClientMockup,
+ simplesmtp = require("../index"),
+ netlib = require("net"),
+ MailComposer = require("mailcomposer").MailComposer,
+ fs = require("fs");
+
+var PORT_NUMBER = 8397;
+
+exports["General tests"] = {
+ setUp: function (callback) {
+ this.server = new simplesmtp.createServer({});
+ this.server.listen(PORT_NUMBER, function(err){
+ if(err){
+ throw err;
+ }else{
+ callback();
+ }
+ });
+
+ },
+
+ tearDown: function (callback) {
+ this.server.end(callback);
+ },
+
+ "Send single message": function(test){
+
+ var pool = simplesmtp.createClientPool(PORT_NUMBER),
+ mc = new MailComposer({escapeSMTP: true});
+
+ mc.setMessageOption({
+ from: "andmekala@hot.ee",
+ to: "andris@node.ee",
+ subject:"Hello!",
+ body: "Hello world!",
+ html: "<b>Hello world!</b>"
+ });
+
+ this.server.on("dataReady", function(envelope, callback){
+ test.ok(true);
+ callback();
+ });
+
+ pool.sendMail(mc, function(error){
+ test.ifError(error);
+ pool.close(function(){
+ test.ok(true);
+ test.done();
+ });
+ });
+ },
+
+ "Send several messages": function(test){
+ var total = 10;
+
+ test.expect(total*2);
+
+ var pool = simplesmtp.createClientPool(PORT_NUMBER),
+ mc;
+
+ this.server.on("dataReady", function(envelope, callback){
+ process.nextTick(callback);
+ });
+
+ var completed = 0;
+ for(var i=0; i<total; i++){
+ mc = new MailComposer({escapeSMTP: true});
+ mc.setMessageOption({
+ from: "andmekala@hot.ee",
+ to: "andris@node.ee",
+ subject:"Hello!",
+ body: "Hello world!",
+ html: "<b>Hello world!</b>"
+ });
+ pool.sendMail(mc, function(error){
+ test.ifError(error);
+ test.ok(true);
+ completed++;
+ if(completed >= total){
+ pool.close(function(){
+ test.done();
+ });
+ }
+ });
+ }
+ },
+
+ "Delivery error once": function(test){
+
+ var pool = simplesmtp.createClientPool(PORT_NUMBER),
+ mc = new MailComposer({escapeSMTP: true});
+
+ mc.setMessageOption({
+ from: "andmekala@hot.ee",
+ to: "andris@node.ee",
+ subject:"Hello!",
+ body: "Hello world!",
+ html: "<b>Hello world!</b>"
+ });
+
+ this.server.on("dataReady", function(envelope, callback){
+ test.ok(true);
+ callback(new Error("Spam!"));
+ });
+
+ pool.sendMail(mc, function(error){
+ test.equal(error && error.name, "DeliveryError");
+ pool.close(function(){
+ test.ok(true);
+ test.done();
+ });
+ });
+ },
+
+ "Delivery error several times": function(test){
+ var total = 10;
+
+ test.expect(total);
+
+ var pool = simplesmtp.createClientPool(PORT_NUMBER),
+ mc;
+
+ this.server.on("dataReady", function(envelope, callback){
+ process.nextTick(function(){callback(new Error("Spam!"))});
+ });
+
+ var completed = 0;
+ for(var i=0; i<total; i++){
+ mc = new MailComposer({escapeSMTP: true});
+ mc.setMessageOption({
+ from: "andmekala@hot.ee",
+ to: "andris@node.ee",
+ subject:"Hello!",
+ body: "Hello world!",
+ html: "<b>Hello world!</b>"
+ });
+
+ pool.sendMail(mc, function(error){
+ test.equal(error && error.name, "DeliveryError");
+ completed++;
+ if(completed >= total){
+ pool.close(function(){
+ test.done();
+ });
+ }
+ });
+ }
+ }
+};
+
+exports["Auth fail tests"] = {
+ setUp: function (callback) {
+ this.server = new simplesmtp.createServer({
+ requireAuthentication: true
+ });
+
+ this.server.listen(PORT_NUMBER, function(err){
+ if(err){
+ throw err;
+ }else{
+ callback();
+ }
+ });
+
+ this.server.on("authorizeUser", function(envelope, username, password, callback){
+ callback(null, username == password);
+ });
+ },
+
+ tearDown: function (callback) {
+ this.server.end(callback);
+ },
+
+ "Authentication passes once": function(test){
+ var pool = simplesmtp.createClientPool(PORT_NUMBER, false, {
+ auth: {
+ "user": "test",
+ "pass": "test"
+ }
+ }),
+ mc = new MailComposer({escapeSMTP: true});
+
+ mc.setMessageOption({
+ from: "andmekala2@hot.ee",
+ to: "andris2@node.ee",
+ subject:"Hello2!",
+ body: "Hello2 world!",
+ html: "<b>Hello2 world!</b>"
+ });
+
+ this.server.on("dataReady", function(envelope, callback){
+ test.ok(true);
+ callback();
+ });
+
+ pool.sendMail(mc, function(error){
+ test.ifError(error);
+ pool.close(function(){
+ test.ok(true);
+ test.done();
+ });
+ });
+
+ },
+
+ "Authentication error once": function(test){
+ var pool = simplesmtp.createClientPool(PORT_NUMBER, false, {
+ auth: {
+ "user": "test1",
+ "pass": "test2"
+ }
+ }),
+ mc = new MailComposer({escapeSMTP: true});
+
+ mc.setMessageOption({
+ from: "andmekala2@hot.ee",
+ to: "andris2@node.ee",
+ subject:"Hello2!",
+ body: "Hello2 world!",
+ html: "<b>Hello2 world!</b>"
+ });
+
+ this.server.on("dataReady", function(envelope, callback){
+ test.ok(true);
+ callback();
+ });
+
+ pool.sendMail(mc, function(error){
+ test.equal(error && error.name, "AuthError");
+ pool.close(function(){
+ test.ok(true);
+ test.done();
+ });
+ });
+
+ },
+
+ "Authentication error several times": function(test){
+ var total = 10;
+ test.expect(total);
+
+ var pool = simplesmtp.createClientPool(PORT_NUMBER, false, {
+ auth: {
+ "user": "test1",
+ "pass": "test2"
+ }
+ }),
+ mc;
+ this.server.on("dataReady", function(envelope, callback){
+ process.nextTick(function(){callback(new Error("Spam!"))});
+ });
+
+ var completed = 0;
+ for(var i=0; i<total; i++){
+ mc = new MailComposer({escapeSMTP: true});
+ mc.setMessageOption({
+ from: "andmekala@hot.ee",
+ to: "andris@node.ee",
+ subject:"Hello!",
+ body: "Hello world!",
+ html: "<b>Hello world!</b>"
+ });
+
+ pool.sendMail(mc, function(error){
+ test.equal(error && error.name, "AuthError");
+ completed++;
+ if(completed >= total){
+ pool.close(function(){
+ test.done();
+ });
+ }
+ });
+ }
+ }
+}

0 comments on commit 137a12f

Please sign in to comment.
Something went wrong with that request. Please try again.