Skip to content

Commit

Permalink
Get pool (#1868)
Browse files Browse the repository at this point in the history
* pass cfg object instead, simplifying get_pool
* more consistency with variable naming and passing
* add integration test: send message w/smtp_client
  • Loading branch information
msimerson committed Mar 28, 2017
1 parent dbac739 commit d16a9fa
Show file tree
Hide file tree
Showing 5 changed files with 161 additions and 103 deletions.
27 changes: 14 additions & 13 deletions host_pool.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"use strict";
'use strict';

var net = require('net');
var net = require('net');
var utils = require('haraka-utils');

/* HostPool:
*
Expand All @@ -15,17 +16,16 @@ var net = require('net');
* If failed() is called with one of the hosts, we mark it down for retry_secs
* and don't give it out again until that period has passed.
*
* If *all* the hosts have been marked down, we ignore the marks and just give
* out the next host. That's too keep some random short-lived but widespread
* If *all* the hosts have been marked down, ignore the marks and give
* out the next host. That's to keep a random short-lived but widespread
* network failure from taking the whole system down.
*/

var logger = require('./logger');
var utils = require('haraka-utils');

// takes a comma/space-separated list of ip:ports
// 1.1.1.1:22, 3.3.3.3:44
function HostPool (hostports_str, retry_secs){
function HostPool (hostports_str, retry_secs) {
var self = this;

var hosts = (hostports_str || '')
Expand Down Expand Up @@ -65,14 +65,14 @@ HostPool.prototype.failed = function (host, port) {
logger.logwarn("host " + key + " is still dead, will retry in " +
self.retry_secs + " secs");
self.dead_hosts[key] = true;
console.log(1);
// console.log(1);
setTimeout(function () {
self.probe_dead_host(host, port, cb_if_still_dead, cb_if_alive);
}, retry_msecs);
};

var cb_if_alive = function (){
console.log(2);
// console.log(2);
logger.loginfo("host " + key + " is back! adding back into pool");
delete self.dead_hosts[key];
};
Expand Down Expand Up @@ -115,7 +115,8 @@ HostPool.prototype.probe_dead_host = function (
cb_if_alive();
s.destroy(); // will this conflict with setTimeout's s.destroy?
});
} catch (e){
}
catch (e) {
// only way to catch run-time javascript errors in here;
console.log("ERROR in probe_dead_host, got error " + e);
throw e;
Expand All @@ -140,7 +141,7 @@ HostPool.prototype.get_socket = function () {
* anyway. That should make it more forgiving about transient but widespread
* network problems that make all the hosts look dead.
*/
HostPool.prototype.get_host = function (){
HostPool.prototype.get_host = function () {
var host;
var found;

Expand All @@ -151,19 +152,19 @@ HostPool.prototype.get_host = function (){

for (var i = 0; i < this.hosts.length; ++i){
var j = i + first_i;
if (j >= this.hosts.length){
if (j >= this.hosts.length) {
j = j - this.hosts.length;
}
host = this.hosts[j];
var key = host.host + ':' + host.port;
if (this.dead_hosts[key]){
if (this.dead_hosts[key]) {
continue;
}
this.last_i = j;
found = true;
break;
}
if (found){
if (found) {
return host;
}
else {
Expand Down
98 changes: 50 additions & 48 deletions outbound.js
Original file line number Diff line number Diff line change
Expand Up @@ -1244,19 +1244,19 @@ var cram_md5_response = function (username, password, challenge) {
return utils.base64(username + ' ' + digest);
}

function _create_socket (port, host, local_addr, is_unix_socket, connect_timeout, pool_timeout, callback) {
function _create_socket (port, host, local_addr, is_unix_socket, callback) {
var socket = is_unix_socket ? sock.connect({path: host}) :
sock.connect({port: port, host: host, localAddress: local_addr});
socket.setTimeout(connect_timeout * 1000);
socket.setTimeout(cfg.connect_timeout * 1000);
logger.logdebug('[outbound] host=' +
host + ' port=' + port + ' pool_timeout=' + pool_timeout + ' created');
host + ' port=' + port + ' pool_timeout=' + cfg.pool_timeout + ' created');
socket.once('connect', function () {
socket.removeAllListeners('error'); // these get added after callback
callback(null, socket);
});
socket.once('error', function (err) {
socket.end();
var name = 'outbound::' + port + ':' + host + ':' + local_addr + ':' + pool_timeout;
var name = 'outbound::' + port + ':' + host + ':' + local_addr + ':' + cfg.pool_timeout;
if (server.notes.pool[name]) {
delete server.notes.pool[name];
}
Expand All @@ -1269,64 +1269,66 @@ function _create_socket (port, host, local_addr, is_unix_socket, connect_timeout
}

// Separate pools are kept for each set of server attributes.
function get_pool (port, host, local_addr, is_unix_socket, connect_timeout, pool_timeout, max) {
function get_pool (port, host, local_addr, is_unix_socket) {
port = port || 25;
host = host || 'localhost';
connect_timeout = (connect_timeout === undefined) ? 30 : connect_timeout;
var pool_timeout = cfg.pool_timeout || 300;
var name = 'outbound::' + port + ':' + host + ':' + local_addr + ':' + pool_timeout;
if (!server.notes.pool) {
server.notes.pool = {};
}
if (!server.notes.pool[name]) {
var pool = generic_pool.Pool({
name: name,
create: function (done) {
_create_socket(port, host, local_addr, is_unix_socket, connect_timeout, pool_timeout, done);
},
validate: function (socket) {
return socket.writable;
},
destroy: function (socket) {
logger.logdebug('[outbound] destroying pool entry for ' + host + ':' + port);
// Remove pool object from server notes once empty
var size = pool.getPoolSize();
if (size === 0) {
delete server.notes.pool[name];
}
socket.removeAllListeners();
socket.once('error', function (err) {
logger.logwarn("[outbound] Socket got an error while shutting down: " + err);
});
if (!socket.writable) return;
logger.logprotocol("[outbound] C: QUIT");
socket.write("QUIT\r\n");
socket.end(); // half close
socket.once('line', function (line) {
// Just assume this is a valid response
logger.logprotocol("[outbound] S: " + line);
socket.destroy();
});
},
max: max || 10,
idleTimeoutMillis: pool_timeout * 1000,
log: function (str, level) {
if (/this._availableObjects.length=/.test(str)) return;
level = (level === 'verbose') ? 'debug' : level;
logger['log' + level]('[outbound] [' + name + '] ' + str);
}
});
server.notes.pool[name] = pool;
if (server.notes.pool[name]) {
return server.notes.pool[name];
}
return server.notes.pool[name];

var pool = generic_pool.Pool({
name: name,
create: function (done) {
_create_socket(port, host, local_addr, is_unix_socket, done);
},
validate: function (socket) {
return socket.writable;
},
destroy: function (socket) {
logger.logdebug('[outbound] destroying pool entry for ' + host + ':' + port);
// Remove pool object from server notes once empty
var size = pool.getPoolSize();
if (size === 0) {
delete server.notes.pool[name];
}
socket.removeAllListeners();
socket.once('error', function (err) {
logger.logwarn("[outbound] Socket got an error while shutting down: " + err);
});
if (!socket.writable) return;
logger.logprotocol("[outbound] C: QUIT");
socket.write("QUIT\r\n");
socket.end(); // half close
socket.once('line', function (line) {
// Just assume this is a valid response
logger.logprotocol("[outbound] S: " + line);
socket.destroy();
});
},
max: cfg.pool_concurrency_max || 10,
idleTimeoutMillis: pool_timeout * 1000,
log: function (str, level) {
if (/this._availableObjects.length=/.test(str)) return;
level = (level === 'verbose') ? 'debug' : level;
logger['log' + level]('[outbound] [' + name + '] ' + str);
}
});
server.notes.pool[name] = pool;
return pool;
}

// Get a socket for the given attributes.
function get_client (port, host, local_addr, is_unix_socket, callback) {
if (cfg.pool_concurrency_max == 0) {
return _create_socket(port, host, local_addr, is_unix_socket, cfg.connect_timeout, cfg.pool_timeout, callback);
return _create_socket(port, host, local_addr, is_unix_socket, callback);
}

var pool = get_pool(port, host, local_addr, is_unix_socket, cfg.connect_timeout, cfg.pool_timeout, cfg.pool_concurrency_max);
var pool = get_pool(port, host, local_addr, is_unix_socket);
if (pool.waitingClientsCount() >= cfg.pool_concurrency_max) {
return callback("Too many waiting clients for pool", null);
}
Expand Down
65 changes: 30 additions & 35 deletions smtp_client.js
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,7 @@ class SMTPClient extends events.EventEmitter {
var msg = matches[3];

client.response.push(msg);
if (cont !== ' ') {
return;
}
if (cont !== ' ') return;

if (client.command === 'auth' || client.authenticating) {
logger.loginfo('SERVER RESPONSE, CLIENT ' + client.command + ", authenticating=" + client.authenticating + ",code="+code + ",cont="+cont+",msg=" +msg);
Expand Down Expand Up @@ -194,7 +192,7 @@ SMTPClient.prototype.load_tls_config = function (plugin) {
var tls_options = {};
this.tls_config = net_utils.tls_ini_section_with_defaults(plugin.name);

if (this.host) { tls_options.servername = this.host }
if (this.host) tls_options.servername = this.host;

this.tls_options = tls_options;
}
Expand Down Expand Up @@ -274,11 +272,12 @@ SMTPClient.prototype.is_dead_sender = function (plugin, connection) {
};

// Separate pools are kept for each set of server attributes.
exports.get_pool = function (server, port, host, connect_timeout, pool_timeout, max) {
exports.get_pool = function (server, port, host, cfg) {
port = port || 25;
host = host || 'localhost';
if (connect_timeout === undefined) connect_timeout = 30;
if (pool_timeout === undefined) pool_timeout = 300;
if (cfg === undefined) cfg = {};
var connect_timeout = cfg.connect_timeout || 30;
var pool_timeout = cfg.pool_timeout || cfg.timeout || 300;
var name = port + ':' + host + ':' + pool_timeout;
if (!server.notes.pool) {
server.notes.pool = {};
Expand All @@ -305,7 +304,7 @@ exports.get_pool = function (server, port, host, connect_timeout, pool_timeout,
delete server.notes.pool[name];
}
},
max: max || 1000,
max: cfg.max_connections || 1000,
idleTimeoutMillis: (pool_timeout -1) * 1000,
log: function (str, level) {
level = (level === 'verbose') ? 'debug' : level;
Expand All @@ -327,8 +326,8 @@ exports.get_pool = function (server, port, host, connect_timeout, pool_timeout,
};

// Get a smtp_client for the given attributes.
exports.get_client = function (server, callback, port, host, connect_timeout, pool_timeout, max) {
var pool = exports.get_pool(server, port, host, connect_timeout, pool_timeout, max);
exports.get_client = function (server, callback, port, host, cfg) {
var pool = exports.get_pool(server, port, host, cfg);
pool.acquire(callback);
};

Expand All @@ -347,10 +346,9 @@ exports.get_client_plugin = function (plugin, connection, c, callback) {
}
}

var hostport = get_hostport(connection, connection.server.notes, c);
var hostport = get_hostport(connection, connection.server, c);

var pool = exports.get_pool(connection.server, hostport.port, hostport.host,
c.connect_timeout, c.timeout, c.max_connections);
var pool = exports.get_pool(connection.server, hostport.port, hostport.host, c);

pool.acquire(function (err, smtp_client) {
connection.logdebug(plugin, 'Got smtp_client: ' + smtp_client.uuid);
Expand Down Expand Up @@ -496,34 +494,31 @@ exports.get_client_plugin = function (plugin, connection, c, callback) {
});
};

function get_hostport (connection, server_notes, config_arg) {
function get_hostport (connection, server, cfg) {

var c = config_arg;
if (c.forwarding_host_pool){
if (! server_notes.host_pool){
connection.logwarn("creating a new host_pool from " + c.forwarding_host_pool);
server_notes.host_pool =
if (cfg.forwarding_host_pool) {
if (! server.notes.host_pool) {
connection.logwarn("creating host_pool from " + cfg.forwarding_host_pool);
server.notes.host_pool =
new HostPool(
c.forwarding_host_pool, // 1.2.3.4:420, 5.6.7.8:420
c.dead_forwarding_host_retry_secs
cfg.forwarding_host_pool, // 1.2.3.4:420, 5.6.7.8:420
cfg.dead_forwarding_host_retry_secs
);
}
var host_pool = server_notes.host_pool;

var host = host_pool.get_host();
if (! host){
logger.logerror('[smtp_client_pool] no backend hosts in pool!');
throw new Error("no backend hosts found in pool!");
var host = server.notes.host_pool.get_host();
if (host) {
return host; // { host: 1.2.3.4, port: 567 }
}

return host; // { host: 1.2.3.4, port: 567 }
logger.logerror('[smtp_client_pool] no backend hosts in pool!');
throw new Error("no backend hosts found in pool!");
}
else if (c.host && c.port){
return { host: c.host, port: c.port };
}
else {
logger.logwarn("[smtp_client_pool] forwarding_host_pool or host and port " +
"were not found in config file");
throw new Error("You must specify either forwarding_host_pool or host and port");

if (cfg.host && cfg.port) {
return { host: cfg.host, port: cfg.port };
}

logger.logwarn("[smtp_client_pool] forwarding_host_pool or host and port " +
"were not found in config file");
throw new Error("You must specify either forwarding_host_pool or host and port");
}
12 changes: 6 additions & 6 deletions tests/host_pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ exports.HostPool = {

test.done();
},
"default port 25 ": function (test) {
"default port 25": function (test) {
test.expect(2);

var pool = new HostPool('1.1.1.1, 2.2.2.2');
Expand Down Expand Up @@ -100,7 +100,7 @@ exports.HostPool = {
// after .01 secs the timer to retry the dead host will fire, and then
// we connect using this mock socket, whose "connect" always succeeds
// so the code brings the dead host back to life
"host dead checking timer": function (test){
"host dead checking timer": function (test) {
test.expect(2);

var num_reqs = 0;
Expand All @@ -110,21 +110,21 @@ exports.HostPool = {
// these are the methods called from probe_dead_host

// setTimeout on the socket
self.pretendTimeout = function (){};
self.pretendTimeout = function () {};
self.setTimeout = function (ms, cb){
self.pretendTimeout = cb;
};
// handle socket.on('error', ....
self.listeners = {};
self.on = function (eventname, cb){
self.on = function (eventname, cb) {
self.listeners[eventname] = cb;
};
self.emit = function (eventname){
self.emit = function (eventname) {
self.listeners[eventname]();
};
// handle socket.connect(...
self.connected = function () {};
self.connect = function (port, host, cb){
self.connect = function (port, host, cb) {
switch (++num_reqs){
case 1:
// the first time through we pretend it timed out
Expand Down
Loading

0 comments on commit d16a9fa

Please sign in to comment.