Skip to content

Commit

Permalink
Merge 35a434f into 640d9ef
Browse files Browse the repository at this point in the history
  • Loading branch information
cakuki committed Oct 1, 2018
2 parents 640d9ef + 35a434f commit 8162f6f
Show file tree
Hide file tree
Showing 2 changed files with 145 additions and 9 deletions.
38 changes: 29 additions & 9 deletions src/components/requester.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ module.exports = class Requester extends Monitorable(Configurable(Component)) {

this.sock = new axon.types[this.type]();
this.sock.set('retry timeout', 0);
this.timeout = advertisement.timeout || process.env.COTE_REQUEST_TIMEOUT;

this.startDiscovery();
}
Expand All @@ -28,16 +29,17 @@ module.exports = class Requester extends Monitorable(Configurable(Component)) {
}

send(...args) {
if (args.length == 1 || typeof args[args.length - 1] != 'function') {
return new Promise((resolve, reject) => {
this.sock.send(...args, (err, res) => {
if (err) return reject(err);
resolve(res);
});
});
}
const hasCallback = typeof args[args.length - 1] == 'function';
const timeout = args[0].__timeout || this.timeout;

if (hasCallback) return sendOverSocket(this.sock, timeout, ...args);

this.sock.send(...args);
return new Promise((resolve, reject) => {
sendOverSocket(this.sock, timeout, ...args, (err, res) => {
if (err) return reject(err);
resolve(res);
});
});
}

get type() {
Expand All @@ -47,3 +49,21 @@ module.exports = class Requester extends Monitorable(Configurable(Component)) {
return 'rep';
}
};

function sendOverSocket(sock, timeout, ...args) {
if (!timeout) return sock.send(...args);

const cb = args.pop();

const timeoutHandle = setTimeout(() => {
delete sock.callbacks[messageCallback.id];
cb(new Error('Request timed out.'));
}, timeout);

const messageCallback = (...args) => {
clearTimeout(timeoutHandle);
cb(...args);
};

sock.send(...args, messageCallback);
}
116 changes: 116 additions & 0 deletions test/request-response-timeout.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
import test from 'ava';
import LogSuppress from '../lib/log-suppress';
import r from 'randomstring';

const { Requester, Responder } = require('../')();

LogSuppress.init(console);

test.cb(`Use request data for setting requester timeout`, (t) => {
const key = r.generate();

const requester = new Requester({ name: `${t.title}: timeout requester`, key });
const responder = new Responder({ name: `${t.title}: timeout responder`, key });

responder.on('Be quick or be dead', (req, cb) => {
setTimeout(() => cb(null, { message: 'You should not see me!' }), 200);
});

requester.send({ type: 'Be quick or be dead', __timeout: 100 }, (err, res) => {
t.is(res, undefined);
t.is(err instanceof Error, true);
t.is(err.message, 'Request timed out.');
t.end();
});
});

test.cb(`Use advertisement for setting requester timeout`, (t) => {
const key = r.generate();

const requester = new Requester({ name: `${t.title}: timeout requester`, key, timeout: 100 });
const responder = new Responder({ name: `${t.title}: timeout responder`, key });

responder.on('Be quick or be dead', (req, cb) => {
setTimeout(() => cb(null, { message: 'You should not see me!' }), 200);
});

requester.send({ type: 'Be quick or be dead' }, (err, res) => {
t.is(res, undefined);
t.is(err instanceof Error, true);
t.is(err.message, 'Request timed out.');
t.end();
});
});

test.cb(`Use environment var for setting requester timeout`, (t) => {
const key = r.generate();

process.env.COTE_REQUEST_TIMEOUT = 100;

const requester = new Requester({ name: `${t.title}: timeout requester`, key });
const responder = new Responder({ name: `${t.title}: timeout responder`, key });

responder.on('Be quick or be dead', (req, cb) => {
setTimeout(() => cb(null, { message: 'You should not see me!' }), 200);
});

requester.send({ type: 'Be quick or be dead' }, (err, res) => {
t.is(res, undefined);
t.is(err instanceof Error, true);
t.is(err.message, 'Request timed out.');
t.end();
});
});

test.cb(`Use request data for setting requester timeout (response before timeout)`, (t) => {
const key = r.generate();

const requester = new Requester({ name: `${t.title}: timeout requester`, key });
const responder = new Responder({ name: `${t.title}: timeout responder`, key });

responder.on('Be quick or be dead', (req, cb) => {
setTimeout(() => cb(null, { message: 'Faster!' }), 100);
});

requester.send({ type: 'Be quick or be dead', __timeout: 200 }, (err, res) => {
t.is(err, null);
t.is(res.message, 'Faster!');
t.end();
});
});

test.cb(`Use advertisement for setting requester timeout (response before timeout)`, (t) => {
const key = r.generate();

const requester = new Requester({ name: `${t.title}: timeout requester`, key, timeout: 200 });
const responder = new Responder({ name: `${t.title}: timeout responder`, key });

responder.on('Be quick or be dead', (req, cb) => {
setTimeout(() => cb(null, { message: 'Faster!' }), 100);
});

requester.send({ type: 'Be quick or be dead' }, (err, res) => {
t.is(err, null);
t.is(res.message, 'Faster!');
t.end();
});
});

test.cb(`Use environment var for setting requester timeout (response before timeout)`, (t) => {
const key = r.generate();

process.env.COTE_REQUEST_TIMEOUT = 200;

const requester = new Requester({ name: `${t.title}: timeout requester`, key });
const responder = new Responder({ name: `${t.title}: timeout responder`, key });

responder.on('Be quick or be dead', (req, cb) => {
setTimeout(() => cb(null, { message: 'Faster!' }), 100);
});

requester.send({ type: 'Be quick or be dead' }, (err, res) => {
t.is(err, null);
t.is(res.message, 'Faster!');
t.end();
});
});

0 comments on commit 8162f6f

Please sign in to comment.