Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Initial import

  • Loading branch information...
commit 00d278d7e25db597477febdb9acd89fb89ef1590 0 parents
Andris Reinman authored
34 README.md
@@ -0,0 +1,34 @@
+# DirectMail
+
+Sendmail alternative to send e-mails directly to recipients without a relaying service.
+
+## Usage
+
+Require *directmail* object
+
+ var directmail = require("directmail");
+
+And push a message to the outgoing queue
+
+ directmail({
+ from: "sender@example.com",
+ recipients: ["receiver1@example.com", "receiver2@example.com"],
+ message: "Subject: test\r\n\r\nHello world!",
+ debug: false
+ });
+
+Where
+
+ * **from** (string) is the e-mail address of the sender
+ * **recipients** (array) is an array of recipient e-mails. Put all `to`, `cc` and `bcc` addresses here.
+ * **message** (string|buffer) is the RFC2822 message to be sent
+ * **debug** (boolean) if set to true, all data about queue processing is printed to console
+
+*Directmail* is very inefficient as it queues all e-mails to be sent into memory. Additionally, if a message is not yet sent and the process is closed, all data about queued messages are lost. Thus *directmail* is only suitable for low throughput systems, like password remainders and such, where the message can be processed immediatelly. *Directmail* is not suitable for spamming.
+
+While being not 100% reliable, *directmail* can still handle sending errors, graylisting and such. If a message can not be sent, it is requeued and retried later.
+
+## License
+
+**MIT**
+
4 index.js
@@ -0,0 +1,4 @@
+"use strict";
+
+// expose to the world
+module.exports = require("./lib/mailer");
198 lib/mailer.js
@@ -0,0 +1,198 @@
+"use strict";
+
+var createQueue = require("./queue"),
+ simplesmtp = require("simplesmtp"),
+ dns = require("dns");
+
+var directMailer = {
+
+ _queue: createQueue(),
+ _started: false,
+ _lastId: 0,
+
+ send: function(options){
+ options = options || {};
+
+ var from = [].concat(options.from || []).shift() || "",
+ recipients = [].concat(options.recipients || []),
+ message = options.message || "",
+ domainGroups = {};
+
+ if(!from){
+ throw new Error("'From' address missing");
+ }
+
+ if(!recipients.length){
+ throw new Error("'Recipients' addresses missing");
+ }
+
+ if(!message){
+ throw new Error("Nothing to send, 'message' empty");
+ }
+
+ recipients.forEach(function(recipient){
+ recipient = (recipient || "").toString();
+
+ var domain = (recipient.split("@").pop() || "").toLowerCase().trim();
+
+ if(!domainGroups[domain]){
+ domainGroups[domain] = [recipient];
+ }else if(domainGroups[domain].indexOf(recipient) < 0){
+ domainGroups[domain].push(recipient);
+ }
+ });
+
+ Object.keys(domainGroups).forEach((function(domain){
+ this._queue.insert({
+ from: from,
+ to: domainGroups[domain],
+ domain: domain,
+ message: message,
+ debug: !!options.debug,
+ id: ++this._lastId
+ });
+ if(options.debug){
+ console.log("Queued message #%s from %s, to %s", this._lastId, from, domainGroups[domain].join(", "));
+ }
+ }).bind(this));
+
+ // start send loop if needed
+ if(!this._started){
+ this._started = true;
+ this._loop();
+ }
+
+ return true;
+ },
+
+ _loop: function(){
+ this._queue.get((function(data){
+
+ if(data.debug){
+ console.log("Retrieved message #%s from the queue, reolving %s", data.id, data.domain);
+ }
+
+ dns.resolveMx(data.domain, (function(err, list){
+ if(data.debug){
+ if(err){
+ console.log("Resolving %s for #%s failed", data.domain, data.id);
+ console.log(err);
+ }else if(!list || !list.length){
+ console.log("Could not resolve any MX servers for %s", data.domain);
+ }
+ }
+ if(err || !list || !list.length){
+ data.replies = (data.replies || 0) + 1;
+ if(data.replies <= 5){
+ this._queue.insert(data, data.replies * 60 * 1000);
+ if(data.debug){
+ console.log("Message #%s requeued for %s minutes", data.id, data.replies);
+ }
+ }
+ if(typeof setImmediate == "function"){
+ setImmediate(this._loop.bind(this));
+ }else{
+ process.nextTick(this._loop.bind(this));
+ }
+ return;
+ }
+
+ list.sort(function(a, b){
+ return (a && a.priority || 0) - (b && b.priority || 0);
+ });
+
+ var exchange = list[0] && list[0].exchange;
+
+ if(data.debug){
+ console.log("%s resolved to %s for #%s", data.domain, exchange, data.id);
+ }
+
+ this._process(exchange, data, (function(err, response){
+ if(data.debug){
+ if(err){
+ console.log("Failed processing message #");
+ }else{
+ console.log("Server responded for #%s:", data.id);
+ console.log(response);
+ }
+ }
+ if(err){
+ data.replies = (data.replies || 0) + 1;
+ if(data.replies <= 5){
+ this._queue.insert(data, data.replies * 15 * 60 * 1000);
+ if(data.debug){
+ console.log("Message #%s requeued for %s minutes", data.id, data.replies * 15);
+ }
+ }
+ }
+ if(typeof setImmediate == "function"){
+ setImmediate(this._loop.bind(this));
+ }else{
+ process.nextTick(this._loop.bind(this));
+ }
+ }).bind(this));
+
+ }).bind(this));
+
+ }).bind(this));
+ },
+
+ _process: function(exchange, data, callback){
+ if(data.debug){
+ console.log("Connecting to %s:25 for message #%s", exchange, data.id);
+ }
+
+ var client = simplesmtp.connect(25, exchange, {ignoreTLS: true, debug: data.debug}),
+ response = {},
+ ready = false;
+
+ client.once("idle", function(){
+ client.useEnvelope({
+ from: data.from,
+ to: data.to
+ });
+ });
+
+ client.once("rcptFailed", function(addresses){
+ if(data.debug){
+ console.log("The following addresses were rejected for #%s: %s", data.id, addresses.join(", "));
+ }
+ });
+
+ client.once("message", function(){
+ if(data.debug){
+ console.log("Transmitting message #%s", data.id);
+ }
+ client.end(data.message);
+ });
+
+ client.once("ready", function(success, message){
+ response.success = !!success;
+ response.message = message;
+ client.quit();
+ });
+
+ client.once("error", function(err){
+ if(ready){
+ return;
+ }
+ ready = true;
+ callback(err);
+ });
+
+ client.once("end", function(){
+ if(ready){
+ return;
+ }
+ ready = true;
+
+ if(!response.success){
+ callback(new Error("Sending failed with error " + (response.message || "").substr(0, 3)));
+ }else{
+ callback(null, response.message);
+ }
+ });
+ }
+};
+
+module.exports = directMailer.send.bind(directMailer);
118 lib/queue.js
@@ -0,0 +1,118 @@
+"use strict";
+
+// expose to the world
+module.exports = function(){
+ return new Queue();
+};
+
+/**
+ * Creates a queue object
+ *
+ * @constructor
+ */
+function Queue(){
+ this._instantQueue = [];
+ this._sortedQueue = [];
+ this._shiftTimer = null;
+ this._callbackQueue = [];
+}
+
+/**
+ * Sets a callback to be run when something comes available from the queue
+ *
+ * @param {Function} callback Callback function to run with queue element as an argument
+ */
+Queue.prototype.get = function(callback){
+ if(this._instantQueue.length){
+ callback(this._instantQueue.pop());
+ }else{
+ this._callbackQueue.unshift(callback);
+ }
+};
+
+/**
+ * Adds an element to the queue. If delay (ms) is set, the data will not be available before
+ * specified delay has passed. Otherwise the data will be available for processing immediatelly.
+ *
+ * @param {Mixed} data Value to be queued
+ * @param {Number} [delay] If set, delay the availability of the data by {delay} milliseconds
+ */
+Queue.prototype.insert = function(data, delay){
+ var container, added = -1;
+ if(typeof delay != "number"){
+ this._instantQueue.unshift(data);
+ this._processInsert();
+ return true;
+ }else{
+ container = {
+ data: data,
+ available: Date.now() + delay
+ };
+ for(var i = 0, len = this._sortedQueue.length; i < len; i++){
+ if(this._sortedQueue[i].available >= container.available){
+ this._sortedQueue.splice(i, 0, container);
+ added = i;
+ break;
+ }
+ }
+ if(added < 0){
+ this._sortedQueue.push(container);
+ added = 0;
+ }
+
+ if(added === 0){
+ this._updateShiftTimer();
+ }
+ }
+};
+
+/**
+ * Clears previous timer and creates a new one (if needed) to process the element
+ * in the queue that needs to be processed first.
+ */
+Queue.prototype._updateShiftTimer = function(){
+ var nextShift, now = Date.now();
+ clearTimeout(this._shiftTimer);
+
+ if(!this._sortedQueue.length){
+ return;
+ }
+
+ nextShift = this._sortedQueue[0].available;
+
+ if(nextShift <= now){
+ this._shiftSorted();
+ }else{
+ setTimeout(this._shiftSorted.bind(this),
+ // add +15ms to ensure that data is already available when the timer is fired
+ this._sortedQueue[0].available - Date.now() + 15);
+ }
+};
+
+/**
+ * Moves an element from the delayed queue to the immediate queue if an elmenet
+ * becomes avilable
+ */
+Queue.prototype._shiftSorted = function(){
+ var container;
+ if(!this._sortedQueue.length){
+ return;
+ }
+
+ if(this._sortedQueue[0].available <= Date.now()){
+ container = this._sortedQueue.shift();
+ this.insert(container.data);
+ }
+
+ this._updateShiftTimer();
+};
+
+/**
+ * If data from a queue is available and a callback is set, run the callback
+ * with available data
+ */
+Queue.prototype._processInsert = function(){
+ if(this._instantQueue.length && this._callbackQueue.length){
+ this._callbackQueue.pop()(this._instantQueue.pop());
+ }
+};
24 package.json
@@ -0,0 +1,24 @@
+{
+ "name": "directmail",
+ "version": "0.1.0",
+ "description": "Sendmail replacement, sends mail directly to recipients SMTP server",
+ "main": "index.js",
+ "scripts": {
+ "test": "nodeunit tests"
+ },
+ "keywords": [
+ "SMTP"
+ ],
+ "author": "Andris Reinman",
+ "license": "MIT",
+ "dependencies": {
+ "simplesmtp": "~0.3.15"
+ },
+ "devDependencies": {
+ "nodeunit": "~0.8.2"
+ },
+ "repository": {
+ "type": "git",
+ "url": "http://github.com/andris9/directmail.git"
+ }
+}
59 tests/queue.js
@@ -0,0 +1,59 @@
+"use strict";
+
+var createQueue = require("../lib/queue");
+
+exports["General tests"] = {
+
+ setUp: function(done){
+ this.queue = createQueue();
+ done();
+ },
+
+ "Add item to queue": function(test){
+
+ test.deepEqual(this.queue._instantQueue, []);
+ this.queue.insert("value1");
+ test.deepEqual(this.queue._instantQueue, ["value1"]);
+ this.queue.insert("value2");
+ test.deepEqual(this.queue._instantQueue, ["value2", "value1"]);
+
+ test.done();
+ },
+
+ "Pull items from a queue": function(test){
+ var queue = this.queue;
+ queue.insert("value1");
+ queue.insert("value2");
+
+ queue.get(function(value){
+ test.equal(value, "value1");
+
+ queue.get(function(value){
+ test.equal(value, "value2");
+ test.deepEqual(queue._instantQueue, []);
+ test.done();
+ });
+ });
+ },
+
+ "Add delayed items": function(test){
+ var queue = this.queue;
+
+ queue.insert("value1", 300);
+ queue.insert("value2", 100);
+ queue.insert("value3");
+
+ queue.get(function(value){
+ test.equal(value, "value3");
+
+ queue.get(function(value){
+ test.equal(value, "value2");
+
+ queue.get(function(value){
+ test.equal(value, "value1");
+ test.done();
+ });
+ });
+ });
+ }
+};
Please sign in to comment.
Something went wrong with that request. Please try again.