Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Merge branch 'outbound_control' of https://github.com/qiw2009/Haraka

…into qiw2009_outbound_control
  • Loading branch information...
commit 6b897dfab5687f0b1fde3759570327a7b183a275 2 parents 81ec865 + 7452752
@baudehlo authored
View
1  config/outbound_control
@@ -0,0 +1 @@
+true
View
108 config/outbound_limit.json
@@ -0,0 +1,108 @@
+{ "163.com" : {
+ "conn_limit" : 4,
+ "sessions" : 16,
+ "micro_limit" : 2,
+ "tiny_limit" : 3,
+ "medium_limit" : 6000,
+ "big_limit" : 30000,
+ "micro" : 50000,
+ "tiny" : 900000,
+ "medium": 3600000,
+ "big": 86400000
+ },
+"126.com" : {
+ "conn_limit" : 4,
+ "sessions" : 16,
+ "micro_limit" : 50,
+ "tiny_limit" : 150,
+ "medium_limit" : 4500,
+ "big_limit" : 30000,
+ "micro" : 5000,
+ "tiny" : 15000,
+ "medium": 3600000,
+ "big": 86400000
+ },
+"yeah.net" : {
+ "conn_limit" : 4,
+ "sessions" : 16,
+ "micro_limit" : 50,
+ "tiny_limit" : 150,
+ "medium_limit" : 4500,
+ "big_limit" : 30000,
+ "micro" : 5000,
+ "tiny" : 15000,
+ "medium": 3600000,
+ "big": 86400000
+ },
+ "sohu.com" : {
+ "conn_limit" : 5,
+ "sessions" : 16,
+ "micro_limit" : 50,
+ "tiny_limit" : 150,
+ "medium_limit" :4500,
+ "big_limit" : 30000,
+ "micro" : 5000,
+ "tiny" : 15000,
+ "medium": 3600000,
+ "big": 86400000
+ },
+ "sina.com" : {
+ "conn_limit" : 3,
+ "sessions" : 4,
+ "micro_limit" : 50,
+ "tiny_limit" : 150,
+ "medium_limit" :4500,
+ "big_limit" : 30000,
+ "micro" : 5000,
+ "tiny" : 15000,
+ "medium": 3600000,
+ "big": 86400000
+ },
+ "sina.cn" : {
+ "conn_limit" : 3,
+ "sessions" : 4,
+ "micro_limit" : 50,
+ "tiny_limit" : 150,
+ "medium_limit" :4500,
+ "big_limit" : 30000,
+ "micro" : 5000,
+ "tiny" : 15000,
+ "medium": 3600000,
+ "big": 86400000
+ },
+ "sina.com.cn" : {
+ "conn_limit" : 1,
+ "sessions" : 4,
+ "micro_limit" : 50,
+ "tiny_limit" : 150,
+ "medium_limit" :4500,
+ "big_limit" : 30000,
+ "micro" : 5000,
+ "tiny" : 15000,
+ "medium": 3600000,
+ "big": 86400000
+ },
+ "qq.com" : {
+ "conn_limit" : 16,
+ "sessions" : 16,
+ "micro_limit": 60,
+ "tiny_limit" : 180,
+ "medium_limit" : 36000,
+ "big_limit" : 300000,
+ "micro" : 5000,
+ "tiny" : 15000,
+ "medium": 3600000,
+ "big": 86400000
+ },
+ "default" : {
+ "conn_limit" : 5,
+ "micro_limit" : 6,
+ "tiny_limit" : 12,
+ "medium_limit" :1000,
+ "big_limit" : 30000,
+ "micro" : 5000,
+ "tiny" : 30000,
+ "medium": 3600000,
+ "big": 86400000
+ }
+}
View
20 config/plugins
@@ -4,29 +4,31 @@
# log.syslog
# block mails from known bad hosts (see config/dnsbl.zones for the DNS zones queried)
-dnsbl
+# dnsbl
# Check mail headers are valid
-data.rfc5322_header_checks
+# data.rfc5322_header_checks
# block mail from some known bad HELOs - see config/helo.checks.ini for configuration
-helo.checks
+# helo.checks
# control which "MAIL FROM" addresses you accept. See docs.
-mail_from.access
+# mail_from.access
# Only accept mail where the MAIL FROM domain is resolvable to an MX record
-mail_from.is_resolvable
+# mail_from.is_resolvable
# Disconnect client if they spew bad SMTP commands at us
-max_unrecognized_commands
+# max_unrecognized_commands
# control which "RCPT TO" addresses you reject. See docs.
-rcpt_to.access
+# rcpt_to.access
# Only accept mail for your personal list of hosts. Edit config/host_list
# NOTE: THIS IS REQUIRED for inbound email.
-rcpt_to.in_host_list
+# rcpt_to.in_host_list
# Queue mail via smtp - see config/smtp_forward.ini for where your mail goes
-queue/smtp_forward
+# queue/smtp_forward
+
+outbound_control/outbound_limit
View
2  connection.js
@@ -132,7 +132,7 @@ function Connection(client, server) {
this.deny_includes_uuid = config.get('deny_includes_uuid') ? true : false;
this.early_talker = 0;
this.pipelining = 0;
- this.relaying = false;
+ this.relaying = true;
this.disconnected = false;
this.esmtp = false;
this.last_response = null;
View
9 constants.js
@@ -12,6 +12,15 @@ exports.next_hook = 907;
exports.delay = 908;
exports.denysoftdisconnect = 909;
+exports.not_send = 910;
+exports.spam = 911;
+exports.invalid = 912;
+
+exports.data_sent = 1;
+exports.error = 2;
+exports.bad_code = 4;
+exports.no = 0;
+
exports.import = function (object) {
for (var k in exports) {
if (exports.hasOwnProperty(k) && k !== "import") {
View
459 outbound.js
@@ -34,6 +34,14 @@ var uniq = Math.round(Math.random() * MAX_UNIQ);
var max_concurrency = config.get('outbound.concurrency_max') || 100;
var queue_count = 0;
+var Queue = require('./plugins/outbound_control/queue').Queue;
+var processing_queue = new Queue();
+var get_ISPConfig = require('./plugins/outbound_control/rate_policy').get_ISPConfig;
+var get_policy = require('./plugins/outbound_control/rate_policy').get_policy;
+var deliver_client = require('./plugins/outbound_control/send_client');
+var delivery_concurrency = require('./plugins/outbound_control/rate_policy').delivery_concurrency;
+var conn_pool = require('./plugins/outbound_control/rate_policy').conn_pool;
+
exports.list_queue = function () {
this._load_cur_queue("_list_file");
}
@@ -85,9 +93,9 @@ exports.send_email = function () {
}
var from = arguments[0],
- to = arguments[1],
- contents = arguments[2],
- next = arguments[3];
+ to = arguments[1],
+ contents = arguments[2],
+ next = arguments[3];
this.loginfo("Sending email via params");
@@ -131,7 +139,7 @@ exports.send_email = function () {
transaction.rcpt_to = to;
- // Set data_lines to lines in contents
+ // Set dataf_lines to lines in contents
var match;
var re = /^([^\n]*\n?)/;
while (match = re.exec(contents)) {
@@ -219,7 +227,7 @@ exports.send_trans_email = function (transaction, next) {
exports.process_domain = function (todo, hmails, cb) {
var plugin = this;
this.loginfo("Processing domain: " + todo.domain);
- var fname = _fname();
+ var fname = _fname() + '@' + todo.domain;
var tmp_path = path.join(queue_dir, '.' + fname);
var ws = fs.createWriteStream(tmp_path, { flags: WRITE_EXCL });
var data_pos = 0;
@@ -233,7 +241,7 @@ exports.process_domain = function (todo, hmails, cb) {
cb(tmp_path, DENY, "Queue error");
}
else {
- hmails.push(new HMailItem (fname, dest_path, todo.notes));
+ hmails.push(new HMailItem (fname, dest_path, todo));
cb(tmp_path, OK, "Queued!");
}
});
@@ -265,10 +273,10 @@ exports.build_todo = function (todo, ws, write_more) {
// Replacer function to exclude items from the queue file header
function exclude_from_json(key, value) {
switch (key) {
- case 'data_lines':
- return undefined;
- default:
- return value;
+ case 'data_lines':
+ return undefined;
+ default:
+ return value;
}
}
var todo_str = JSON.stringify(todo, exclude_from_json);
@@ -356,19 +364,11 @@ exports._load_cur_queue = function (cb_name) {
exports.load_queue_files = function (cb_name, files) {
var plugin = this;
- if (files.length === 0) return;
-
+ if (files.length === 0)
+ return plugin.heart_beat();
this.loginfo("Loading some of the queue...");
-
- if ((delivery_concurrency >= max_concurrency)
- || config.get('outbound.disabled'))
- {
- // try again in 1 second if delivery is disabled
- setTimeout(function () {plugin.load_queue_files(cb_name, files)}, 1000);
- return;
- }
-
- for (var i=1; i <= max_concurrency; i++) {
+ // try to send every email; failed sendings will be push into the queue
+ for (var i=1; i <= files.length; i++) {
if (files.length === 0) break;
var file = files.shift();
if (/^\./.test(file)) {
@@ -377,16 +377,30 @@ exports.load_queue_files = function (cb_name, files) {
}
var hmail = new HMailItem(file, path.join(queue_dir, file));
this[cb_name](hmail);
+ }
- if ((files.length === 0) || (i === max_concurrency)) {
- // end of loop or end of files
- var self = this;
- hmail.on('ready', function () {self.load_queue_files(cb_name, files)});
+ plugin.heart_beat();
+}
+
+exports.heart_beat = function(){
+ console.log("heart beat");
+ var queue_size = processing_queue.size();
+ for (var i=1; i <= max_concurrency; i++) {
+ if (processing_queue.size() == 0)
break;
- }
+
+ var keys = Object.keys(processing_queue.mails);
+ var index = i % keys.length;
+ var mail = processing_queue.shift(index, keys);
+ mail.send();
}
-}
+ var interval = processing_queue.size() > 20 ? 3000 : 1000;
+ var plugin = this;
+ setTimeout(function() {
+ plugin.heart_beat();
+ }, interval);
+}
exports._add_file = function (hmail) {
var self = this;
this.loginfo("Adding file: " + hmail.filename);
@@ -426,7 +440,7 @@ function TODOItem (domain, recipients, transaction) {
/////////////////////////////////////////////////////////////////////////////
// HMailItem - encapsulates an individual outbound mail item
-function HMailItem (filename, path, notes) {
+function HMailItem (filename, path, todo) {
events.EventEmitter.call(this);
var matches = filename.match(fn_re);
if (!matches) {
@@ -436,8 +450,8 @@ function HMailItem (filename, path, notes) {
this.filename = filename;
this.next_process = matches[1];
this.num_failures = matches[2];
- this.notes = notes || {};
-
+ this.notes = todo ? todo.notes : {};
+ this._domain = todo ? todo.domain : filename.split('@')[1];
this.size_file();
}
@@ -518,15 +532,55 @@ HMailItem.prototype.read_todo = function () {
}
HMailItem.prototype.send = function () {
- if (!this.todo) {
- var self = this;
- this.on('ready', function () { self._send() });
+ // check if this email can be delivered
+ plugins.run_hooks("check_limit", this);
+}
+
+HMailItem.prototype.check_limit_respond = function(retval) {
+ console.log('$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$' + retval);
+ if (retval == constants.ok) {
+ if (!this.todo) {
+ var self = this;
+ this.on('ready', function () { self._send() });
+ }
+ else {
+ this._send();
+ }
}
- else {
- this._send();
+ // push this email back to the queue
+ else
+ {
+ processing_queue.push(this);
}
}
+HMailItem.prototype.process_bad_code = function(code, msg) {
+ var self = this;
+ self.clear_timers();
+
+ // if this email is delivered successfully; not neccessary
+ // to goto next steps
+ if (self.sent) return;
+
+ // in case the server side respond two error messages to
+ // a single command
+ if (!self.erred) {
+ self.erred = true;
+ self.bounce(msg);
+ }
+}
+
+HMailItem.prototype.try_again = function(status) {
+ var self = this;
+ get_policy(this.dom).dispose();
+ self.loginfo(self.filename + ': not send....' + status);
+ if (status === constants.no)
+ processing_queue.push(this);
+ else if (status === constants.error)
+ self.temp_fail('connection closed before sent out all data');
+}
+
+
HMailItem.prototype._send = function () {
if ((delivery_concurrency >= max_concurrency)
|| config.get('outbound.disabled'))
@@ -537,7 +591,7 @@ HMailItem.prototype._send = function () {
setTimeout(function () {hmail._send()}, 1000);
return;
}
-
+
plugins.run_hooks('send_email', this);
}
@@ -562,30 +616,30 @@ HMailItem.prototype.get_mx = function () {
HMailItem.prototype.get_mx_respond = function (retval, mx) {
switch(retval) {
- case constants.ok:
- var mx_list;
- if (Array.isArray(mx)) {
- mx_list = mx;
- }
- else if (typeof mx === "object") {
- mx_list = [mx];
- }
- else {
- // assume string
- var matches = /^(.*?)(:(\d+))?$/.exec(mx);
- if (!matches) {
- throw("get_mx returned something that doesn't match hostname or hostname:port");
- }
- mx_list = [{priority: 0, exchange: matches[1], port: matches[3]}];
- }
- this.logdebug("Got an MX from Plugin: " + this.todo.domain + " => 0 " + mx);
- return this.found_mx(null, mx_list);
- case constants.deny:
- this.logwarn("get_mx plugin returned DENY: " + mx);
- return this.bounce("No MX for " + this.domain);
- case constants.denysoft:
- this.logwarn("get_mx plugin returned DENYSOFT: " + mx);
- return this.temp_fail("Temporary MX lookup error for " + this.domain);
+ case constants.ok:
+ var mx_list;
+ if (Array.isArray(mx)) {
+ mx_list = mx;
+ }
+ else if (typeof mx === "object") {
+ mx_list = [mx];
+ }
+ else {
+ // assume string
+ var matches = /^(.*?)(:(\d+))?$/.exec(mx);
+ if (!matches) {
+ throw("get_mx returned something that doesn't match hostname or hostname:port");
+ }
+ mx_list = [{priority: 0, exchange: matches[1], port: matches[3]}];
+ }
+ this.logdebug("Got an MX from Plugin: " + this.todo.domain + " => 0 " + mx);
+ return this.found_mx(null, mx_list);
+ case constants.deny:
+ this.logwarn("get_mx plugin returned DENY: " + mx);
+ return this.bounce("No MX for " + this.domain);
+ case constants.denysoft:
+ this.logwarn("get_mx plugin returned DENYSOFT: " + mx);
+ return this.temp_fail("Temporary MX lookup error for " + this.domain);
}
// if none of the above return codes, drop through to this...
@@ -714,7 +768,7 @@ HMailItem.prototype.try_deliver = function () {
}
this.loginfo("Looking up A records for: " + host);
-
+
// now we have a host, we have to lookup the addresses for that host
// and try each one in order they appear
dns.resolve(host, function (err, addresses) {
@@ -738,208 +792,51 @@ var smtp_regexp = /^([0-9]{3})([ -])(.*)/;
HMailItem.prototype.try_deliver_host = function (mx) {
if (this.hostlist.length === 0) {
- delivery_concurrency--;
return this.try_deliver(); // try next MX
}
- var host = this.hostlist.shift();
- var port = mx.port || 25;
- var socket = sock.connect(port, host);
- var self = this;
- var processing_mail = true;
-
- this.loginfo("Attempting to deliver to: " + host + ":" + port + " (" + delivery_concurrency + ")");
-
- socket.on('error', function (err) {
- self.logerror("Ongoing connection failed: " + err);
- processing_mail = false;
- // try the next MX
- self.try_deliver_host(mx);
- });
-
- socket.on('close', function () {
- if (processing_mail) {
- return self.try_deliver_host(mx);
- }
- });
-
- socket.setTimeout(300 * 1000); // TODO: make this configurable
-
- var command = 'connect';
- var response = [];
-
- var recipients = this.todo.rcpt_to.map(function (a) { return new Address (a.original) });
-
- var mail_from = new Address (this.todo.mail_from.original);
-
- var data_marker = 0;
- var last_recip;
- var ok_recips = 0;
- var fail_recips = [];
- var smtp_properties = {
- "tls": false,
- "max_size": 0,
- "eightbitmime": false,
- "enh_status_codes": false,
- };
-
- socket.send_command = function (cmd, data) {
- if (!this.writable) {
- self.logerror("Socket writability went away");
- return self.try_deliver_host(mx);
- }
- var line = cmd + (data ? (' ' + data) : '');
- if (cmd === 'dot') {
- line = '.';
- }
- self.logprotocol("C: " + line);
- this.write(line + "\r\n");
- command = cmd.toLowerCase();
- response = [];
- };
-
- socket.process_ehlo_data = function () {
- for (var i=0,l=response.length; i < l; i++) {
- var r = response[i];
- if (r.toUpperCase() === '8BITMIME') {
- smtp_properties.eightbitmime = true;
- }
- else if (r.toUpperCase() === 'STARTTLS') {
- smtp_properties.tls = true;
- }
- else if (r.toUpperCase() === 'ENHANCEDSTATUSCODES') {
- smtp_properties.enh_status_codes = true;
- }
- else {
- var matches;
- matches = r.match(/^SIZE\s+(\d+)$/);
- if (matches) {
- smtp_properties.max_size = matches[1];
- }
- }
- }
-
- if (smtp_properties.tls && config.get('outbound.enable_tls')) {
- this.on('secure', function () {
- socket.send_command('EHLO', config.get('me'));
- });
- this.send_command('STARTTLS');
- }
- else {
- this.send_command('MAIL', 'FROM:' + mail_from);
- }
- }
-
- socket.on('timeout', function () {
- self.logerror("Outbound connection timed out");
- processing_mail = false;
- socket.end();
- self.try_deliver_host(mx);
- });
-
- socket.on('connect', function () {
- });
+ var domain = this.dom
+ var host = this.hostlist.shift();
+ var port = mx.port || 25;
+
+ this.loginfo("Attempting to deliver to: " + host + " (" + delivery_concurrency + ")");
+ var from = this.todo.mail_from.original.replace('>','').replace('<','');
+ var timeout = 60 * 1000;
+ var enable_tls = false;
+ var max = get_ISPConfig(domain, 'conn_limit');
+ var hmail = this;
- socket.on('line', function (line) {
- var matches;
- self.logprotocol("S: " + line);
- if (matches = smtp_regexp.exec(line)) {
- var code = matches[1],
- cont = matches[2],
- rest = matches[3];
- response.push(rest);
- if (cont === ' ') {
- if (code.match(/^4/)) {
- if (/^rcpt/.test(command)) {
- // this recipient was rejected
- fail_recips.push(last_recip);
- if (!(ok_recips || recipients.length)) {
- // no accepted recipients, and no more left so bail out
- socket.send_command('QUIT');
- return self.temp_fail("Upstream error: " + code + " " + rest);
- }
- }
- else {
- socket.send_command('QUIT');
- return self.temp_fail("Upstream error: " + code + " " + rest);
- }
- }
- else if (code.match(/^5/)) {
- socket.send_command('QUIT');
- return self.bounce(rest);
- }
- switch (command) {
- case 'connect':
- socket.send_command('EHLO', config.get('me'));
- break;
- case 'ehlo':
- socket.process_ehlo_data();
- break;
- case 'starttls':
- var key = config.get('tls_key.pem', 'data').join("\n");
- var cert = config.get('tls_cert.pem', 'data').join("\n");
- var tls_options = { key: key, cert: cert };
-
- smtp_properties = {};
- socket.upgrade(tls_options);
- break;
- case 'helo':
- socket.send_command('MAIL', 'FROM:' + mail_from);
- break;
- case 'mail':
- case 'rcpt_':
- if (command === 'rcpt_') ok_recips++;
- last_recip = recipients.shift();
- socket.send_command('RCPT', 'TO:' + last_recip.format());
- if (recipients.length) {
- // don't move to next state if we have more recipients
- command = 'rcpt_';
- }
- break;
- case 'rcpt':
- socket.send_command('DATA');
- break;
- case 'data':
- var data_stream = self.data_stream();
- data_stream.on('data', function (data) {
- self.logdata("C: " + data);
- });
- data_stream.on('error', function (err) {
- self.logerror("Reading from the data stream failed: " + err);
- });
- data_stream.on('end', function () {
- socket.send_command('dot');
- });
- data_stream.pipe(socket, {end: false});
- break;
- case 'dot':
- processing_mail = false;
- socket.send_command('QUIT');
- if (fail_recips.length) {
- exports.split_to_new_recipients(self, fail_recips);
- }
- else {
- self.delivered(rest);
- }
- break;
- case 'quit':
- socket.end();
- break;
- default:
- // should never get here - means we did something
- // wrong.
- throw new Error("Unknown command: " + command);
- }
- }
- }
- else {
- // Unrecognised response.
- self.logerror("Unrecognised response from upstream server: " + line);
- processing_mail = false;
- socket.end();
- return self.bounce("Unrecognised response from upstream server: " + line);
- }
- });
+ deliver_client.run_send(conn_pool, domain, port, host, timeout, enable_tls, max, hmail,
+ function(err, client) {
+ if (err) {
+ hmail.try_again(constants.no);
+ return;
+ }
+ // actually we only have one address for each mail
+ var rcpts = hmail.todo.rcpt_to
+ var send_rcpt = function () {
+ if (hmail.todo.rcpt_to.length != 0) {
+ var a = hmail.todo.rcpt_to[0];
+ var rcpt = new Address (a.original);
+ client.send_command('RCPT',
+ 'TO:' + rcpt);
+ }
+ else
+ client.send_command('RCPT',
+ 'TO: ' + '<>');
+ };
+
+ var send_data = function() {
+ client.send_command('DATA');
+ }
+
+ client.on('mail', send_rcpt);
+ client.on('rcpt', send_data);
+
+ client.on('data', function () {
+ client.start_data(hmail.data_stream());
+ });
+ });
}
function populate_bounce_message (from, to, reason, hmail, cb) {
@@ -1024,9 +921,29 @@ HMailItem.prototype.bounce_respond = function (retval, msg) {
});
}
+
+HMailItem.prototype.process_bad_code = function(code, msg) {
+ var self = this;
+
+ // if this email is delivered successfully; not neccessary
+ // to goto next steps
+ if (self.sent) return;
+
+ // in case the server side respond two error messages to
+ // a single command
+ if (!self.erred) {
+ self.erred = true;
+ self.bounce(msg);
+ }
+}
+
+
HMailItem.prototype.double_bounce = function (err) {
this.logerror("Double bounce: " + err);
- fs.unlink(this.path);
+ fs.unlink(this.path, function() {
+ hmail.loginfo(this.filename + ' deleted');
+ });
+ this.send_next();
// TODO: fill this in... ?
// One strategy is perhaps log to an mbox file. What do other servers do?
// Another strategy might be delivery "plugins" to cope with this.
@@ -1070,16 +987,30 @@ HMailItem.prototype.temp_fail = function (err) {
hmail.path = path.join(queue_dir, new_filename);
hmail.filename = new_filename;
-
+
setTimeout(function () {hmail.send()}, delay);
});
}
// The following handler has an impact on outgoing mail. It does remove the queue file.
HMailItem.prototype.delivered_respond = function (retval, msg) {
+ var self = this;
if (retval != constants.cont && retval != constants.ok) {
this.logwarn("delivered plugin responded with: " + retval + " msg=" + msg + ".");
}
- // Remove the file.
- fs.unlink(this.path);
+
+ // Remove the file; send next mail of the same domain
+ fs.unlink(this.path, function(err){
+ self.sent = true;
+ self.send_next();
+ });
};
+
+HMailItem.prototype.send_next = function() {
+ var hmail = processing_queue.dequeue(this);
+ if (hmail)
+ {
+ get_policy(this.dom).prepose();
+ setTimeout(function(){hmail.send()}, 0);
+ }
+}
View
2  plugins.js
@@ -229,7 +229,7 @@ plugins.run_next_hook = function(hook, object, params) {
return;
}
called_once++;
- if (!retval) retval = constants.cont;
+ if (!retval) retval = constants.cont;
// Log what is being run
if (item && hook !== 'log') {
var log = 'logdebug';
View
18 plugins/outbound_control/README
@@ -0,0 +1,18 @@
+outbound limit control of this implementation works as follows:
+
+1 outbound rates are set in config/outbound_limit.json by domain;
+ you set a couple of time windows, and numbers of emails could be
+ sent in those windowns;
+
+2 every time an email comes, we first check if more emails can be
+ sent to that domain; if negative, push the email into a processing
+ queue.
+
+3 every time an email is delivered, an new email is picked up immediately
+ to be delivered.
+
+4 when haraka starts for the first time, all emails in the queue dir
+ are pushed into a im-memory processing queue
+
+5 a heart beater is used to periodically check if there are remaining
+ emails in the processing queue to be sent.
View
22 plugins/outbound_control/outbound_limit.js
@@ -0,0 +1,22 @@
+var config = exports.config;
+var constants = require('./constants');
+var policy_factory = require('./plugins/outbound_control/rate_policy');
+
+exports.register = function() {
+ this.register_hook('check_limit', 'limit_checker');
+}
+
+exports.limit_checker = function(next, param) {
+ var hmail = param;
+ var domain = hmail._domain;
+ var policy = policy_factory.get_policy(domain);
+ var exceeded = policy.exceed_limit(hmail);
+ if (exceeded) {
+ next(constants.cont, param);
+ }
+ else {
+ hmail.loginfo(policy);
+ policy.prepose();
+ next(constants.ok, param);
+ }
+}
View
50 plugins/outbound_control/queue.js
@@ -0,0 +1,50 @@
+function Queue()
+{
+ this.mails = {};
+ this.length = 0;
+}
+
+Queue.prototype.shift = function(index, keys) {
+ var domain = keys[index];
+ this.mails[domain] = this.mails[domain] || [];
+
+ while (this.mails[domain].length === 0) {
+ index = ++index % keys.length;
+ domain = keys[index];
+ }
+
+ this.length--;
+ var file = this.mails[domain].shift();
+ if (this.mails[domain].length === 0) {
+ delete this.mails[domain];
+ }
+ return file;
+}
+
+Queue.prototype.dequeue = function(hmail) {
+ var domain = hmail._domain;
+ var list = this.mails[domain] || [];
+ if (list.length === 0)
+ return null;
+ else
+ {
+ var mail = list.shift();
+ this.length--;
+ if(list.length === 0)
+ delete this.mails[domain];
+ return mail;
+ }
+}
+
+Queue.prototype.push = function(hmail) {
+ var domain = hmail.dom;
+ this.mails[domain] = this.mails[domain] || [] ;
+ this.mails[domain].push(hmail);
+ this.length++;
+}
+
+Queue.prototype.size = function() {
+ return this.length;
+}
+
+exports.Queue = Queue;
View
149 plugins/outbound_control/rate_policy.js
@@ -0,0 +1,149 @@
+var util = require("util");
+var events = require("events")
+var config = require("../../config")
+var config_data = config.get('outbound_limit.json', 'json');
+var delivery_concurrency = 0;
+var conn_pool = {};
+var policies = {};
+
+
+function get_policy(domain) {
+ if (!policies[domain]) {
+ policies[domain] = new Policy(domain);
+ }
+ return policies[domain];
+}
+
+function get_ISPConfig(dom, name) {
+ var data = config_data[dom];
+ if (!data)
+ data = config_data['default'];
+ return data[name];
+}
+
+function Policy(dom) {
+ this.domain = dom;
+ // this.frozen_ts = -128000;
+
+ var data = config_data[dom];
+ if (!data)
+ data = config_data['default'];
+
+ this.cur_conn = 0;
+ this.conn_limit = data['conn_limit'];
+
+ this.micro_deliveries = 0;
+ this.micro_limit = data['micro_limit'];
+
+ this.tiny_deliveries = 0;
+ this.tiny_limit = data['tiny_limit'];
+
+ this.medium_deliveries = 0;
+ this.medium_limit = data['medium_limit'];
+
+ this.big_deliveries = 0;
+ this.big_limit = data['big_limit'];
+
+ this.MICRO = data['micro'];
+ this.TINY = data['tiny'];
+ this.MEDIUM = data['medium'];
+ this.BIG = data['big'];
+
+ // initialize three timestamps
+ this.tiny_timestamp = this.medium_timestamp
+ = this.big_timestamp = new Date().getTime();
+}
+
+Policy.prototype.exceed_limit = function()
+{
+ // check if we are over connection limit
+ if (this.cur_conn > this.conn_limit)
+ return true;
+
+ var cur_time = new Date().getTime();
+
+ // if (cur_time - this.frozen_ts < 64 * 1000)
+ // return true;
+
+ var exceed_micro_limit = false,
+ exceed_tiny_limit = false,
+ exceed_medium_limit = false,
+ exceed_big_limit = false;
+
+ // check if we are over daily limit
+ if (cur_time - this.big_timestamp < this.BIG) {
+ if (this.big_deliveries >= this.big_limit)
+ exceed_big_limit = true;
+ }
+ else {
+ this.big_timestamp = cur_time;
+ this.big_deliveries = 0;
+ this.conn_tries = 0;
+ }
+ if (exceed_big_limit)
+ return true;
+
+ // check if we are over hourly limit
+ if (cur_time - this.medium_timestamp < this.MEDIUM) {
+ if (this.medium_deliveries > this.medium_limit)
+ exceed_medium_limit = true;
+ }
+ else {
+ this.medium_timestamp = cur_time;
+ this.medium_deliveries = 0;
+ }
+ if (exceed_medium_limit)
+ return true;
+
+ // check if we are over minute limit
+ if (cur_time - this.tiny_timestamp < this.TINY) {
+ if (this.tiny_deliveries >= this.tiny_limit)
+ exceed_tiny_limit = true;
+ }
+ else {
+ this.tiny_timestamp = cur_time;
+ this.tiny_deliveries = 0;
+ }
+
+ if (exceed_tiny_limit)
+ return true;
+
+ if (cur_time - this.micro_timestamp < this.MICRO) {
+ if (this.micro_deliveries >= this.micro_limit)
+ exceed_micro_limit = true;
+ }
+ else {
+ this.micro_timestamp = cur_time;
+ this.micro_deliveries = 0;
+ }
+
+ if (exceed_micro_limit)
+ return true;
+
+ return false;
+}
+
+// Policy.prototype.freeze = function() {
+// this.frozen_ts = new Date().getTime();
+// }
+
+Policy.prototype.dispose = function(delivered, not_send) {
+ delivery_concurrency--;
+ this.cur_conn--;
+}
+
+Policy.prototype.prepose = function() {
+ delivery_concurrency++;
+ this.cur_conn++;
+ this.micro_deliveries++;
+ this.tiny_deliveries++;
+ this.medium_deliveries++;
+ this.big_deliveries++;
+}
+
+exports.get_policy = get_policy;
+exports.policies = policies;
+exports.get_ISPConfig = get_ISPConfig;
+exports.delivery_concurrency = delivery_concurrency;
+exports.conn_pool = conn_pool;
+// exports.tracked_deliveries = tracked_deliveries;
View
472 plugins/outbound_control/send_client.js
@@ -0,0 +1,472 @@
+// SMTP client object and class. This allows for every part of the client
+// protocol to be hooked for different levels of control, such as
+// smtp_forward and smtp_proxy queue plugins.
+var util = require('util');
+var events = require('events');
+var generic_pool = require('generic-pool');
+var policy = require('./rate_policy');
+var config = require('../../config');
+var logger = require('../../logger');
+var constants = require('../../constants');
+var uuid = require('../../utils').uuid;
+var line_socket = require('../../line_socket');
+var Address = require('../../address').Address;
+
+var smtp_regexp = /^([0-9]{3})([ -])(.*)/;
+
+var STATE_IDLE = 1;
+var STATE_ACTIVE = 2;
+var STATE_RELEASED = 3;
+var STATE_DEAD = 4;
+var STATE_DESTROYED = 5;
+
+function SendClient(port, host, timeout, enable_tls) {
+ events.EventEmitter.call(this);
+ this.uuid = uuid();
+ this.socket = line_socket.connect(port, host, null, null);
+ this.socket.setTimeout(30000);
+ this.state = STATE_IDLE;
+ this.command = 'greeting';
+ this.response = [];
+ this.connected = false;
+ this.sent = 0;
+ var self = this;
+
+ this.socket.on('line', function (line) {
+ self.emit('server_protocol', line);
+ var matches = smtp_regexp.exec(line);
+ if (!matches) {
+ self.emit('error', self.uuid + ': Unrecognised response from upstream server: ' + line);
+ // self.destroy(); error is handled by the error event handler
+ // e.g; renren mail server returns error code like 5.1.2 or something like that
+ return;
+ }
+
+ var code = matches[1],
+ cont = matches[2],
+ msg = matches[3];
+ self.response.push(msg);
+ if (cont !== ' ') {
+ return;
+ }
+
+ if (self.command === 'ehlo') {
+ if (code.match(/^5/)) {
+ // Handle fallback to HELO if EHLO is rejected
+ self.emit('greeting', 'HELO');
+ return;
+ }
+ self.emit('capabilities');
+ if (self.command != 'ehlo') {
+ return;
+ }
+ }
+
+ if (self.command === 'xclient' && code.match(/^5/)) {
+ // XCLIENT command was rejected (no permission?)
+ // Carry on without XCLIENT
+ self.command = 'helo';
+ }
+ else if (code.match(/^[45]/)) {
+ self.emit('bad_code', code, msg);
+ if (self.state != STATE_ACTIVE) {
+ return;
+ }
+ }
+
+ if (self.command == 'dot' && code.match(/250/)) {
+ self.emit('delivered');
+ }
+
+ switch (self.command) {
+ case 'xclient':
+ self.xclient = true;
+ self.emit('xclient', 'EHLO');
+ break;
+ case 'starttls':
+ this.upgrade({key: key, cert: cert});
+ break;
+ case 'greeting':
+ self.connected = true;
+ self.emit('greeting', 'EHLO');
+ break;
+ case 'ehlo':
+ self.emit('helo');
+ break;
+ case 'helo':
+ case 'mail':
+ case 'rcpt':
+ case 'data':
+ case 'dot' : // we don't do anything for dot
+ case 'rset':
+ self.emit(self.command);
+ break;
+ case 'quit':
+ self.emit('quit');
+ break;
+ default:
+ throw new Error("Unknown command: " + self.command);
+ }
+ });
+
+ this.socket.on('drain', function () {
+ if (self.command === 'mailbody') {
+ process.nextTick(function () { self.continue_data() });
+ }
+ });
+}
+
+util.inherits(SendClient, events.EventEmitter);
+
+SendClient.prototype.send_command = function (command, data) {
+ var line = (command == 'dot') ? '.' : command + (data ? (' ' + data) : '');
+
+ this.emit('client_protocol', line);
+ this.command = command.toLowerCase();
+ this.response = [];
+ this.socket.write(line + "\r\n");
+};
+
+SendClient.prototype.start_data = function (data) {
+ this.command = 'mailbody';
+ if (data instanceof Function) {
+ this.send_data = data;
+ }
+ else if (data instanceof Array) {
+ var data_marker = 0;
+ this.send_data = function () {
+ while (data_marker < data.length) {
+ var line = data[data_marker];
+ data_marker++;
+ if (!this.send_data_line(line)) {
+ return false;
+ }
+ }
+ return true;
+ };
+ }
+ else {
+ var self = this;
+ this.send_data = function () {
+
+ data.removeAllListeners('data');
+ data.removeAllListeners('end');
+ data.removeAllListeners('error');
+
+ // this.socket
+
+ data.pipe(this.socket, {end: false});
+
+ data.on('error', function (err) {
+ self.destroy();
+ });
+
+ data.on('data', function (data) {
+
+ });
+
+ data.on('end', function () {
+ // in case somehow 'end' got emitted twice
+ if (self.command === 'dot')
+ return;
+ self.send_command('dot');
+ return false;
+ });
+ };
+ }
+ this.continue_data();
+};
+
+SendClient.prototype.continue_data = function () {
+ if (!this.send_data()) {
+ return;
+ }
+};
+
+SendClient.prototype.send_data_line = function (line) {
+ return this.socket.write(line);
+};
+
+SendClient.prototype.release = function () {
+ if (!this.connected || this.command == 'data' || this.command == 'mailbody') {
+ // Destroy here, we can't reuse a connection that was mid-data.
+ this.destroy();
+ return;
+ }
+
+ if (this.sent === policy.get_ISPConfig(this.dom, 'sessions')) {
+ this.destroy();
+ return;
+ }
+
+ logger.logdebug('[send_client_pool] ' + this.uuid + ' resetting, state=' + this.state);
+ if (this.state == STATE_DESTROYED) {
+ return;
+ }
+
+ this.state = STATE_RELEASED;
+ this.removeAllListeners('greeting');
+ this.removeAllListeners('capabilities');
+ this.removeAllListeners('xclient');
+ this.removeAllListeners('helo');
+ this.removeAllListeners('mail');
+ this.removeAllListeners('rcpt');
+ this.removeAllListeners('data');
+ this.removeAllListeners('dot');
+ this.removeAllListeners('rset');
+ this.removeAllListeners('client_protocol');
+ this.removeAllListeners('server_protocol');
+ this.removeAllListeners('error');
+ this.removeAllListeners('delivered');
+
+ this.on('rset', function () {
+ logger.logdebug('[send_client_pool] ' + this.uuid + ' releasing, state=' + this.state);
+ if (this.state == STATE_DESTROYED) {
+ return;
+ }
+ this.state = STATE_IDLE;
+ this.removeAllListeners('rset');
+ this.removeAllListeners('bad_code');
+ this.pool.release(this);
+ });
+
+ this.send_command('RSET');
+};
+
+SendClient.prototype.destroy = function () {
+ if (this.state != STATE_DESTROYED) {
+ this.pool.destroy(this);
+ }
+};
+
+// Separate pools are kept for each set of server attributes.
+exports.get_pool = function (conn_pool, dom, port, host, timeout, enable_tls, max) {
+ var port = port || 25;
+ var host = host || 'localhost';
+ var timeout = (timeout == undefined) ? 300 : timeout;
+ var enable_tls = /(true|yes|1)/i.exec(enable_tls) != null;
+
+ // connections are pooled by dom
+ var name = dom;
+
+ if (!conn_pool) {
+ conn_pool = {};
+ }
+
+ if (!conn_pool[name]) {
+ var pool = generic_pool.Pool({
+ name: name,
+ create: function (callback) {
+ var send_client = new SendClient(port, host, timeout, enable_tls);
+ logger.logdebug('[send_client_pool] ' + send_client.uuid + ' created');
+ callback(null, send_client);
+ },
+ destroy: function(send_client) {
+ logger.logdebug('[send_client_pool] ' + send_client.uuid
+ + ' destroyed, state=' + send_client.state);
+ send_client.state = STATE_DESTROYED;
+ send_client.socket.destroy();
+ },
+ max: max || 1000,
+ idleTimeoutMillis: 300000,
+ log: function (str, level) {
+ level = (level == 'verbose') ? 'debug' : level;
+ logger['log' + level]('[send_client_pool] ' + str);
+ }
+ });
+
+ var acquire = pool.acquire;
+ pool.acquire = function (callback, priority) {
+ var callback_wrapper = function (err, send_client) {
+ send_client.pool = pool;
+ send_client.dom = dom;
+ if (send_client.state == STATE_DEAD) {
+ send_client.destroy();
+ pool.acquire(callback, priority);
+ return;
+ }
+ send_client.state = STATE_ACTIVE;
+ send_client.data_sent = constants.no;
+ send_client.bad_code = constants.no;
+ send_client.error = constants.no;
+ callback(err, send_client);
+ };
+ acquire.call(pool, callback_wrapper, priority);
+ };
+ conn_pool[name] = pool;
+ }
+ return conn_pool[name];
+};
+
+exports.run_send = function(conn_pool, dom, port, host, timeout, enable_tls,
+ max, hmail, callback) {
+ var pool = exports.get_pool(conn_pool, dom, port, host, timeout,
+ enable_tls, max);
+ // no connection is available for this domain, push it back to queue
+ if (pool.getPoolSize() === max && pool.availableObjectsCount() === 0)
+ {
+ hmail.try_again(constants.no);
+ return;
+ }
+
+ // acquire a connection
+ pool.acquire(function (err, send_client) {
+ // log client side data flow
+ send_client.on('client_protocol', function (line) {
+ hmail.logprotocol(send_client.uuid + ' C: ' + line);
+ });
+
+ // log server side data flow
+ send_client.on('server_protocol', function (line) {
+ hmail.logprotocol(send_client.uuid + ' S:' + line);
+ });
+
+ var closed = function (msg) {
+ return function (error) {
+ if (!error) {
+ error = '';
+ }
+ if (send_client.state === STATE_ACTIVE) {
+ // if an email is delivered; or the process_bad_code
+ // procedure got called, we don't go to next steps
+ if (hmail.sent || hmail.erred) return;
+
+ // closed is invoked from the STATE_ACTIVE state, there
+ // must be an error
+ send_client.error = constants.error
+ hmail.try_again(send_client.bad_code | send_client.data_sent
+ | send_client.error);
+
+ // destroy this client after error processing; then
+ // status becomes destroyed, nothing will happen
+ send_client.destroy();
+ }
+ else {
+ // state is initialized to STATE_IDLE or reset to STATE_IDLE
+ // after rset command; in this case, either the email is
+ // delivered; or nothing happens so far. In the former case,
+ // we did nothing in try_again(); in the latter case, call
+ // temp_fail in try_again; destroy the client
+ if (send_client.state === STATE_IDLE) {
+ send_client.state = STATE_DEAD;
+ hmail.try_again(send_client.bad_code |
+ send_client.data_sent |
+ send_client.error);
+ send_client.destroy();
+ }
+
+ // When the email is delivered or an error occurs, the state
+ // of send_client becomes STATE_RELEASED; in the former
+ // case, we did nothing; in the latter case, the
+ // process_bad_code error got called already, nothing needs
+ // to be done; just destroy thie client
+ else if (send_client.state === STATE_RELEASED) {
+ send_client.destroy();
+ hmail.try_again(send_client.bad_code |
+ send_client.data_sent |
+ send_client.error);
+ }
+ }
+ };
+ };
+
+ // transmission error occured
+ // if (!send_client.socket['_events']['error'])
+ send_client.socket.removeAllListeners('error');
+ send_client.socket.removeAllListeners('timeout');
+ send_client.socket.removeAllListeners('end');
+ send_client.socket.removeAllListeners('close');
+
+ send_client.socket.on('error', closed('error!!!'));
+
+ // timeout due to inactivity
+ // if (!send_client.socket['_events']['timeout'])
+ send_client.socket.on('timeout', closed('timeout!!!'));
+
+ // an transmission error occured, and the socket is fully closed
+ // if (!send_client.socket['_events']['close'])
+ send_client.socket.on('close', closed('closed!!!!!'));
+
+ // the other side close the socket
+ // if (!send_client.socket['_events']['end'])
+ send_client.socket.on('end', closed('end!!!!!!!'));
+
+ // process helo
+ var helo = function (command) {
+ if (send_client.xclient)
+ send_client.send_command(command, connection.hello_host);
+ else
+ send_client.send_command(command, config.get('me'));
+ };
+
+ send_client.on('greeting', helo);
+ send_client.on('xclient', helo);
+
+ // not sure what this's for ? keep it here now
+ send_client.on('capabilities', function () {
+ for (var line in send_client.response) {
+ if (send_client.response[line].match(/^XCLIENT/)) {
+ if(!send_client.xclient) {
+ send_client.send_command('XCLIENT',
+ 'ADDR=' + connection.remote_ip);
+ return;
+ }
+ }
+ if (send_client.response[line].match(/^STARTTLS/)) {
+ var key = config.get('tls_key.pem', 'data').join("\n");
+ var cert = config.get('tls_cert.pem', 'data').join("\n");
+ if (key && cert && enable_tls) {
+ send_client.socket.on('secure', function () {
+ send_client.emit('greeting', 'EHLO');
+ });
+ send_client.send_command('STARTTLS');
+ return;
+ }
+ }
+ }
+ });
+
+ send_client.on('helo', function () {
+ var mail_from = new Address (hmail.todo.mail_from.original);
+ send_client.send_command('MAIL',
+ 'FROM:' + mail_from);
+ });
+
+ send_client.on('delivered', function () {
+ send_client.sent++;
+ send_client.data_sent = constants.data_sent;
+ hmail.delivered();
+ send_client.release();
+ });
+
+ send_client.on('error', function (msg) {
+ send_client.error = constants.error;
+ hmail.try_again(constants.error);
+ send_client.destroy();
+ });
+
+ send_client.on('bad_code', function (code, msg) {
+ hmail.process_bad_code(code, msg);
+ send_client.bad_code = constants.bad_code;
+ send_client.release();
+ });
+
+ if (send_client.connected) {
+ if (send_client.xclient) {
+ send_client.send_command('XCLIENT',
+ 'ADDR=' + connection.remote_ip);
+ }
+ else {
+ send_client.emit('helo');
+ }
+ }
+
+ // hmail.timeouts = hmail.timeouts || [];
+ // hmail.timeouts.push(setTimeout(function(){
+ // hmail.loginfo(send_client);
+ // // send_client.socket.emit('timeout');
+ // }, 150 * 1000));
+
+ callback(err, send_client);
+ });
+}
View
3  server.js
@@ -231,6 +231,5 @@ function listening () {
logger.lognotice("Listening on port " + config_data.main.port);
Server.ready = 1;
-
- out.load_queue();
+ out.load_queue()
}
Please sign in to comment.
Something went wrong with that request. Please try again.