Skip to content

Commit

Permalink
refactor build functions
Browse files Browse the repository at this point in the history
  • Loading branch information
podviaznikov committed Sep 29, 2016
1 parent 1975053 commit fd2b2cd
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 88 deletions.
48 changes: 23 additions & 25 deletions src/rabbitmq.js
Original file line number Diff line number Diff line change
Expand Up @@ -271,11 +271,12 @@ class RabbitMQ {
return Promise.try(() => {
const queueName = `${this.name}.${queue}`
this._validatePublish(queue, content, 'tasks')
const payload = RabbitMQ.buildPayload(content)
this.log.info({ queue: queueName, job: content, jobMeta: payload.jobMeta }, 'Publishing task')
const payload = RabbitMQ.buildJobPayload(content)
const meta = RabbitMQ.buildJobMeta(this.name)
this.log.info({ queue: queueName, job: content, jobMeta: meta }, 'Publishing task')
this._incMonitor('task', queueName)
return Promise.resolve(
this.publishChannel.sendToQueue(queueName, payload.jobBuffer, payload.jobMeta)
this.publishChannel.sendToQueue(queueName, payload, meta)
)
})
}
Expand All @@ -291,12 +292,13 @@ class RabbitMQ {
publishEvent (exchange: string, content: Object): Bluebird$Promise<void> {
return Promise.try(() => {
this._validatePublish(exchange, content, 'events')
const payload = RabbitMQ.buildPayload(content)
this.log.info({ event: exchange, job: content, jobMeta: payload.jobMeta }, 'Publishing event')
const payload = RabbitMQ.buildJobPayload(content)
const meta = RabbitMQ.buildJobMeta(this.name)
this.log.info({ event: exchange, job: content, jobMeta: meta }, 'Publishing event')
// events do not need a routing key (so we send '')
this._incMonitor('event', exchange)
return Promise.resolve(
this.publishChannel.publish(exchange, '', payload.jobBuffer, payload.jobMeta)
this.publishChannel.publish(exchange, '', payload, meta)
)
})
}
Expand Down Expand Up @@ -665,31 +667,27 @@ class RabbitMQ {
this.subscribed = this.subscribed.add(subscribedKey)
})
}
/**
* Create payload object with `jobBuffer` and `jobMeta` props
* @private
* @param {Object} content Content to send.
* @return {Object} payload with `jobBuffer` and `jobMeta` props
*/
static buildPayload (content: Object) {
const jobMeta = {
appId: this.name,
timestamp: Date.now(),
headers: {}
}

static getKeyFromClsNamespace (key) {
const ns = getNamespace('ponos')
jobMeta.headers.previousEvent = ns && ns.get('currentWorkerName')
// add tid to message if one does not exist
return ns && ns.get(key)
}
static buildJobPayload (content: Object) {
if (!content.tid) {
const tid = ns && ns.get('tid')
const tid = RabbitMQ.getKeyFromClsNamespace('tid')
content.tid = tid || uuid()
}
const stringContent = JSON.stringify(content)
const jobBuffer = new Buffer(stringContent)
return {
jobBuffer: jobBuffer,
jobMeta: jobMeta
return new Buffer(stringContent)
}
static buildJobMeta (name) {
const jobMeta = {
appId: name,
timestamp: Date.now(),
headers: {}
}
jobMeta.headers.previousEvent = RabbitMQ.getKeyFromClsNamespace('currentWorkerName')
return jobMeta
}
/**
* Validate publish params. Adds a TID to the job it does not already have
Expand Down
128 changes: 65 additions & 63 deletions test/unit/rabbitmq.js
Original file line number Diff line number Diff line change
Expand Up @@ -547,13 +547,15 @@ describe('rabbitmq', () => {
rabbitmq.publishChannel.sendToQueue = sinon.stub().resolves()
sinon.stub(RabbitMQ.prototype, '_validatePublish')
sinon.stub(RabbitMQ.prototype, '_incMonitor')
sinon.stub(RabbitMQ, 'buildPayload')
sinon.stub(RabbitMQ, 'buildJobPayload')
sinon.stub(RabbitMQ, 'buildJobMeta')
})

afterEach(() => {
RabbitMQ.prototype._validatePublish.restore()
RabbitMQ.prototype._incMonitor.restore()
RabbitMQ.buildPayload.restore()
RabbitMQ.buildJobPayload.restore()
RabbitMQ.buildJobMeta.restore()
})

it('should reject if _validatePublish throws', () => {
Expand All @@ -567,22 +569,21 @@ describe('rabbitmq', () => {

it('should publish with a buffer of the content', () => {
const payloadWithTid = Object.assign({ tid: 'test-tid' }, mockJob)
const payload = {
jobBuffer: new Buffer(JSON.stringify(payloadWithTid)),
jobMeta: {
appId: testName,
timestamp: Date.now()
}
const jobBuffer = new Buffer(JSON.stringify(payloadWithTid))
const jobMeta = {
appId: testName,
timestamp: Date.now()
}
RabbitMQ.buildPayload.returns(payload)
RabbitMQ.buildJobPayload.returns(jobBuffer)
RabbitMQ.buildJobMeta.returns(jobMeta)
return assert.isFulfilled(rabbitmq.publishTask(mockQueue, mockJob))
.then(() => {
sinon.assert.calledOnce(rabbitmq.publishChannel.sendToQueue)
sinon.assert.calledWith(
rabbitmq.publishChannel.sendToQueue,
`test-client.${mockQueue}`,
payload.jobBuffer,
payload.jobMeta
jobBuffer,
jobMeta
)
const contentCall = rabbitmq.publishChannel.sendToQueue.firstCall
contentCall.args.pop()
Expand All @@ -594,14 +595,13 @@ describe('rabbitmq', () => {

it('should call _incMonitor before publish', () => {
const payloadWithTid = Object.assign({ tid: 'test-tid' }, mockJob)
const payload = {
jobBuffer: new Buffer(JSON.stringify(payloadWithTid)),
jobMeta: {
appId: testName,
timestamp: Date.now()
}
const jobBuffer = new Buffer(JSON.stringify(payloadWithTid))
const jobMeta = {
appId: testName,
timestamp: Date.now()
}
RabbitMQ.buildPayload.returns(payload)
RabbitMQ.buildJobPayload.returns(jobBuffer)
RabbitMQ.buildJobMeta.returns(jobMeta)
return assert.isFulfilled(rabbitmq.publishTask(mockQueue, mockJob))
.then(() => {
sinon.assert.calledOnce(RabbitMQ.prototype._incMonitor)
Expand All @@ -622,13 +622,15 @@ describe('rabbitmq', () => {
rabbitmq.publishChannel.publish = sinon.stub().resolves()
sinon.stub(RabbitMQ.prototype, '_validatePublish').returns(true)
sinon.stub(RabbitMQ.prototype, '_incMonitor')
sinon.stub(RabbitMQ, 'buildPayload')
sinon.stub(RabbitMQ, 'buildJobPayload')
sinon.stub(RabbitMQ, 'buildJobMeta')
})

afterEach(() => {
RabbitMQ.prototype._validatePublish.restore()
RabbitMQ.prototype._incMonitor.restore()
RabbitMQ.buildPayload.restore()
RabbitMQ.buildJobPayload.restore()
RabbitMQ.buildJobMeta.restore()
})

it('should reject if _validatePublish throws', () => {
Expand All @@ -642,14 +644,13 @@ describe('rabbitmq', () => {

it('should publish with a buffer of the content', () => {
const payloadWithTid = Object.assign({ tid: 'test-tid' }, mockJob)
const payload = {
jobBuffer: new Buffer(JSON.stringify(payloadWithTid)),
jobMeta: {
appId: testName,
timestamp: Date.now()
}
const jobBuffer = new Buffer(JSON.stringify(payloadWithTid))
const jobMeta = {
appId: testName,
timestamp: Date.now()
}
RabbitMQ.buildPayload.returns(payload)
RabbitMQ.buildJobPayload.returns(jobBuffer)
RabbitMQ.buildJobMeta.returns(jobMeta)
return assert.isFulfilled(
rabbitmq.publishEvent(mockExchange, mockJob)
)
Expand All @@ -659,8 +660,8 @@ describe('rabbitmq', () => {
rabbitmq.publishChannel.publish,
mockExchange,
'',
payload.jobBuffer,
payload.jobMeta
jobBuffer,
jobMeta
)
rabbitmq.publishChannel.publish.firstCall.args.pop()
const content = rabbitmq.publishChannel.publish.firstCall.args.pop()
Expand All @@ -670,14 +671,13 @@ describe('rabbitmq', () => {
})
it('should call _incMonitor before publish', () => {
const payloadWithTid = Object.assign({ tid: 'test-tid' }, mockJob)
const payload = {
jobBuffer: new Buffer(JSON.stringify(payloadWithTid)),
jobMeta: {
appId: testName,
timestamp: Date.now()
}
const jobBuffer = new Buffer(JSON.stringify(payloadWithTid))
const jobMeta = {
appId: testName,
timestamp: Date.now()
}
RabbitMQ.buildPayload.returns(payload)
RabbitMQ.buildJobPayload.returns(jobBuffer)
RabbitMQ.buildJobMeta.returns(jobMeta)
return assert.isFulfilled(
rabbitmq.publishEvent(mockExchange, mockJob)
)
Expand Down Expand Up @@ -711,14 +711,14 @@ describe('rabbitmq', () => {
})
}) // end _formatJobs

describe('buildPayload', function () {
describe('buildJobPayload', function () {
it('should add tid if not in namespace', () => {
const ns = cls.createNamespace('other')

return Promise.fromCallback((cb) => {
ns.run(() => {
const payload = RabbitMQ.buildPayload({})
const outObject = JSON.parse(payload.jobBuffer.toString())
const payload = RabbitMQ.buildJobPayload({})
const outObject = JSON.parse(payload.toString())
assert.isString(outObject.tid)
cb()
})
Expand All @@ -727,8 +727,8 @@ describe('rabbitmq', () => {

it('should use tid if passed', () => {
const testTid = '123-2134-234-234-235'
const payload = RabbitMQ.buildPayload({ tid: testTid })
const outObject = JSON.parse(payload.jobBuffer.toString())
const payload = RabbitMQ.buildJobPayload({ tid: testTid })
const outObject = JSON.parse(payload.toString())
return assert.isString(outObject.tid, testTid)
})

Expand All @@ -739,33 +739,14 @@ describe('rabbitmq', () => {
return Promise.fromCallback((cb) => {
ns.run(() => {
ns.set('tid', this.tid)
const payload = RabbitMQ.buildPayload({})
const outObject = JSON.parse(payload.jobBuffer.toString())
const payload = RabbitMQ.buildJobPayload({})
const outObject = JSON.parse(payload.toString())
assert.isString(outObject.tid, testTid)
cb()
})
})
})

it('should use currentWorkerName if in namespace', () => {
const testEvent = 'app.started'
const ns = cls.createNamespace('ponos')

return Promise.fromCallback((cb) => {
ns.run(() => {
ns.set('currentWorkerName', testEvent)
const payload = RabbitMQ.buildPayload({})
assert.isString(payload.jobMeta.headers.previousEvent, testEvent)
cb()
})
})
})

it('should not set previousEvent if namespace does not exist', () => {
const payload = RabbitMQ.buildPayload({})
assert.isUndefined(payload.jobMeta.headers.previousEvent)
})

describe('stringify error', function () {
beforeEach(() => {
sinon.stub(JSON, 'stringify').throws(new Error('custom json error'))
Expand All @@ -777,12 +758,33 @@ describe('rabbitmq', () => {

it('should throw if content fails to be stringified', () => {
return assert.throws(() => {
RabbitMQ.buildPayload({})
RabbitMQ.buildJobPayload({})
}, Error, /custom json error/)
})
}) // end stringify error
})

describe('buildJobMeta', function () {
it('should use currentWorkerName if in namespace', () => {
const testEvent = 'app.started'
const ns = cls.createNamespace('ponos')

return Promise.fromCallback((cb) => {
ns.run(() => {
ns.set('currentWorkerName', testEvent)
const jobMeta = RabbitMQ.buildJobMeta('api')
assert.isString(jobMeta.headers.previousEvent, testEvent)
cb()
})
})
})

it('should not set previousEvent if namespace does not exist', () => {
const jobMeta = RabbitMQ.buildJobMeta('api')
assert.isUndefined(jobMeta.headers.previousEvent)
})
})

describe('_validatePublish', function () {
const mockExchangeName = 'some-exchange'
const mockJob = { hello: 'world' }
Expand Down

0 comments on commit fd2b2cd

Please sign in to comment.