Skip to content

Commit

Permalink
Add test
Browse files Browse the repository at this point in the history
  • Loading branch information
bahung1221 committed Sep 23, 2019
1 parent ad36a0e commit a591d71
Show file tree
Hide file tree
Showing 7 changed files with 220 additions and 25 deletions.
2 changes: 2 additions & 0 deletions example/gateway/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,5 @@ expressApp.get('/api/bar', async (req, res) => {
})

expressApp.listen(port, () => console.log(`Example app listening on port ${port}!`))

module.exports = microApp
2 changes: 2 additions & 0 deletions example/services/bar.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,5 @@ setTimeout(() => {
app.call('foo.foo', { data: 'secret' })
.then((data) => console.log(data))
}, 1000)

module.exports = app
2 changes: 2 additions & 0 deletions example/services/foo.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,5 @@ app.subscribe('jihaa', (req, res) => {
msg: 'SERVICE foo: jihaa',
})
})

module.exports = app
7 changes: 4 additions & 3 deletions lib/request/incoming.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@
*/
class IncomingRequest {
constructor(payload) {
this.body = payload.body || {}
this.headers = payload.headers || {}
this.meta = payload.meta || {}
const input = payload ? { ...payload } : {}
this.body = input.body || {}
this.headers = input.headers || {}
this.meta = input.meta || {}
}
}

Expand Down
14 changes: 8 additions & 6 deletions lib/transporter/base.js
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ class Transporter {
/**
* Local transporter doesn't need connect to anywhere
*
* @return void
* @return {Promise<void>}
*/
disconnect() {
// Do nothing
Expand All @@ -158,14 +158,16 @@ class Transporter {
* Each message received will be run one by one in functions chain (middleware)
*
* @param {String} subject
* @param {Function|Array} func
* @param {Array} func
*/
subscribe(subject, func) {
this.eventBus.on(subject, (payload) => {
const request = Transporter.makeIncomingRequest(payload.request)
const response = Transporter.makeOutgoingResponse((data) => {
this.eventBus.emit(payload.replyTo, data)
})
const response = payload.replyTo
? Transporter.makeOutgoingResponse((data) => {
this.eventBus.emit(payload.replyTo, data)
})
: () => {}

this.handleImcomingRequest(request, response, func)
})
Expand All @@ -180,7 +182,7 @@ class Transporter {
*/
publish(subject, body, caller) {
const request = Transporter.makeOutgoingRequest(subject, body, caller)
this.eventBus.emit(subject, request)
this.eventBus.emit(subject, { request })
}

/**
Expand Down
21 changes: 13 additions & 8 deletions lib/transporter/nats.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,15 @@ class NatsTransporter extends Transporter {
/**
* Disconnect with nats server
*
* @return void
* @return {Promise<void>}
*/
disconnect() {
this.client.flush(() => {
this.client.close()
this.client = null
return new Promise((resolve) => {
this.client.flush(() => {
this.client.close()
this.client = null
resolve()
})
})
}

Expand All @@ -52,14 +55,16 @@ class NatsTransporter extends Transporter {
* Each message received from nats server will be run one by one in functions chain (middleware)
*
* @param {String} subject
* @param {Function|Array} func
* @param {Array} func
*/
subscribe(subject, func) {
this.client.subscribe(subject, async (payload, replyTo) => {
const request = Transporter.makeIncomingRequest(payload)
const response = Transporter.makeOutgoingResponse((data) => {
this.client.publish(replyTo, data)
})
const response = replyTo
? Transporter.makeOutgoingResponse((data) => {
this.client.publish(replyTo, data)
})
: () => {}

super.handleImcomingRequest(request, response, func)
})
Expand Down
197 changes: 189 additions & 8 deletions test/index.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
/* eslint-disable */
const assert = require('assert')
const Transporter = require('../lib/transporter/base')
const broker = require('../broker')({
const NatsTransporter = require('../lib/transporter/nats')

const configs = {
transporter: {
name: 'nats',
options: {
Expand All @@ -14,10 +16,12 @@ const broker = require('../broker')({
maxRequestRetryAttempts: 3,
},
},
})
const app = broker.createService({ name: 'foo' })
}
const broker = require('../broker')(configs)
const foo = broker.createService({ name: 'foo' })
const bar = broker.createService({ name: 'bar' })

describe('Main methods', function() {
describe('#Main methods and init', function() {
it('Broker should have main methods :)', function() {
assert.ok(broker.start)
assert.ok(broker.createService)
Expand All @@ -27,10 +31,10 @@ describe('Main methods', function() {
})

it('Service should have main methods :)', function() {
assert.ok(app.use)
assert.ok(app.subscribe)
assert.ok(app.publish)
assert.ok(app.call)
assert.ok(foo.use)
assert.ok(foo.subscribe)
assert.ok(foo.publish)
assert.ok(foo.call)
})

it('Local transporter should have main methods :)', function() {
Expand All @@ -51,4 +55,181 @@ describe('Main methods', function() {
assert.ok(broker.transporter.subscribe)
assert.ok(broker.transporter.publish)
})

it('Broker should throw "duplicate name" error', async function() {
try {
broker.createService({ name: 'foo' })
return Promise.reject('Incorrect')
} catch (e) {
return Promise.resolve('OK')
}
})
})

describe('#Local communicate', function() {
it('foo should receive message that was published from bar', async function() {
return new Promise((resolve, reject) => {
foo.subscribe('test', function (req, res) {
resolve('OK')
})
bar.publish('foo.test')

setTimeout(() => {
reject('Timed out')
}, 3000)
})
})

it('foo should receive message that was called from bar', async function() {
foo.subscribe('test', function (req, res) {
res.send({ msg: 'this is foo!' })
})
const res = await bar.call('foo.test')
if (res.msg) {
return Promise.resolve('OK')
}
return Promise.reject('Incorrect')
})

it('foo should receive modified message that was called from bar and modified by middleware', async function() {
return new Promise((resolve, reject) => {
function middleware(req, res, next) {
req.body.msg = null
next()
}
foo.subscribe('test', middleware, function (req, res) {
if (req.body.msg === null) {
return resolve('OK')
}
return reject('Incorrect')
})

bar.call('foo.test', { msg: 'this is bar' })
})
})

it('foo should receive modified message that was called from bar and modified by middleware (app.use)', async function() {
return new Promise((resolve, reject) => {
function middleware(req, res, next) {
req.body.msg = null
next()
}
foo.use(middleware)
foo.subscribe('test', function (req, res) {
if (req.body.msg === null) {
return resolve('OK')
}
return reject('Incorrect')
})

bar.call('foo.test', { msg: 'this is bar' })
})
})
})

describe('#base transporter', function() {
const trans = new Transporter(configs.transporter.options)

it('Should received local message', async function() {
return new Promise((resolve, reject) => {
const handler = (req, res) => {
if (req.body.msg) {
return resolve('OK')
}
return reject('Incorrect')
}
trans.subscribe('base.test', [handler])
trans.publish('base.test', { msg: 'test'}, {})
})
})

it('Should received local message and sent response back to caller', async function() {
return new Promise((resolve, reject) => {
const handler = (req, res) => {
if (req.body.msg) {
return res.send({ msg: 'OK' })
}
res.send({ msg: null })
}
trans.subscribe('base.test', [handler])
trans.request('base.test', { msg: 'test'}, {})
.then(res => {
if (res.msg) {
return resolve('OK')
}
return reject('Incorrect')
})
})
})

it('Should throw error when call to incorrect subject', async function() {
this.timeout(13000) // Retry 3 time => 4 * 3000 = 12000

try {
await trans.request('base.noone', { msg: 'test'}, {})
return Promise.reject('Incorrect')
} catch (e) {
return Promise.resolve('OK')
}
})
})

describe('#Nats transporter', function() {
const nats = new NatsTransporter(configs.transporter.options)
nats.connect()

it('Should received nats message', async function() {
return new Promise((resolve, reject) => {
const handler = (req, res) => {
if (req.body.msg) {
return resolve('OK')
}
return reject('Incorrect')
}
nats.subscribe('nats.test', [handler])
nats.publish('nats.test', { msg: 'test'}, {})
})
})

it('Should received nats message and sent response back to caller', async function() {
return new Promise((resolve, reject) => {
const handler = (req, res) => {
if (req.body.msg) {
return res.send({ msg: 'OK' })
}
res.send({ msg: null })
}
nats.subscribe('nats.test', [handler])
nats.request('nats.test', { msg: 'test'}, {})
.then(res => {
if (res.msg) {
return resolve('OK')
}
return reject('Incorrect')
})
})
})

it('Should throw error when call to incorrect subject', async function() {
this.timeout(13000) // Retry 3 time => 4 * 3000 = 12000

try {
await nats.request('nats.noone', { msg: 'test'}, {})
return Promise.reject('Incorrect')
} catch (e) {
return Promise.resolve('OK')
}
})

it('Should disconnect without error', async function() {
try {
await nats.disconnect()
if (!nats.client) {
return Promise.resolve('OK')
}
return Promise.reject('Incorrect')
} catch (e) {
return Promise.reject('Incorrect')
}
})
})

0 comments on commit a591d71

Please sign in to comment.