Skip to content

Commit

Permalink
feat(events): add event ack functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
Ľubomír Jesze committed Jul 24, 2018
1 parent 27143fd commit 647282e
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 8 deletions.
72 changes: 66 additions & 6 deletions src/Connection/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,14 @@ class Connection extends Emittery {
return
}

/**
* Ack packet
*/
if (msp.isAckPacket(packet)) {
this._processAck(packet)
return
}

/**
* Ping from client
*/
Expand Down Expand Up @@ -247,12 +255,46 @@ class Connection extends Emittery {
return
}

if (!this.hasSubscription(packet.d.topic)) {
const { topic, id } = packet.d

if (!this.hasSubscription(topic)) {
this._notifyPacketDropped('_processEvent', 'dropping event since there are no subscription %j', packet)
return
}

this.getSubscription(packet.d.topic).serverMessage(packet.d)
const result = this.getSubscription(topic).serverMessage(packet.d)

if (typeof (id) !== 'undefined') {
result.then((responses) => {
const data = responses.find((response) => typeof (response) !== 'undefined')

this.sendAckPacket(topic, id, data)
})
}
}

/**
* Processes the ack by ensuring the packet is valid and there
* is a subscription for the given topic.
*
* @method _processAck
*
* @param {Object} packet
*
* @return {void}
*/
_processAck (packet) {
if (!msp.isValidAckPacket(packet)) {
this._notifyPacketDropped('_processAck', 'dropping ack since packet is invalid %j', packet)
return
}

if (!this.hasSubscription(packet.d.topic)) {
this._notifyPacketDropped('_processAck', 'dropping ack since there are no subscription %j', packet)
return
}

this.getSubscription(packet.d.topic).serverAck(packet.d)
}

/**
Expand Down Expand Up @@ -605,6 +647,22 @@ class Connection extends Emittery {
this.sendPacket(msp.leavePacket(topic))
}

/**
* Sends the ack packet, when the client requested the
* response for given event.
*
* @method sendAckPacket
*
* @param {String} topic
* @param {Number} id
* @param {Mixed} data
*
* @return {void}
*/
sendAckPacket (topic, id, data) {
this.sendPacket(msp.ackPacket(topic, id, data))
}

/**
* Makes the event packet from the topic and the
* body
Expand All @@ -614,10 +672,11 @@ class Connection extends Emittery {
* @param {String} topic
* @param {String} event
* @param {Mixed} data
* @param {Number} [id]
*
* @return {Object}
*/
makeEventPacket (topic, event, data) {
makeEventPacket (topic, event, data, id) {
if (!topic) {
throw new Error('Cannot send event without a topic')
}
Expand All @@ -626,7 +685,7 @@ class Connection extends Emittery {
throw new Error(`Topic ${topic} doesn't have any active subscriptions`)
}

return msp.eventPacket(topic, event, data)
return msp.eventPacket(topic, event, data, id)
}

/**
Expand All @@ -638,11 +697,12 @@ class Connection extends Emittery {
* @param {String} event
* @param {Mixed} data
* @param {Function} [ack]
* @param {Number} [id]
*
* @return {void}
*/
sendEvent (topic, event, data, ack) {
this.sendPacket(this.makeEventPacket(topic, event, data), {}, ack)
sendEvent (topic, event, data, ack, id) {
this.sendPacket(this.makeEventPacket(topic, event, data, id), {}, ack)
}

/**
Expand Down
52 changes: 50 additions & 2 deletions src/Socket/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ const ClusterHop = require('../ClusterHop')
*/
class Socket {
constructor (topic, connection) {
this._acks = new Map()
this._nextAckId = 0

this.channel = null

/**
Expand Down Expand Up @@ -116,6 +119,29 @@ class Socket {
this.connection.sendEvent(this.topic, event, data, ack)
}

/**
* Emit message to the client and get the response.
*
* @method send
*
* @param {String} event
* @param {Object} data
*
* @return {void}
*/
send (event, data) {
const id = this._nextAckId++

return new Promise((resolve, reject) => {
this.connection.sendEvent(this.topic, event, data, (err) => {
if (err) {
return reject(err)
}
this._acks.set(id, resolve)
}, id)
})
}

/**
* Broadcast event to everyone except the current socket.
*
Expand Down Expand Up @@ -222,10 +248,30 @@ class Socket {
* @param {String} options.event
* @param {Mixed} options.data
*
* @return {void}
* @return {Promise}
*/
serverMessage ({ event, data }) {
this.emitter.emit(event, data)
return this.emitter.emit(event, data)
}

/**
* A new ack received
*
* @method serverAck
*
* @param {Number} options.id
* @param {Mixed} options.data
*
* @return {void}
*/
serverAck ({ id, data }) {
if (this._acks.has(id)) {
const ack = this._acks.get(id)
ack(data)
this._acks.delete(id)
} else {
debug('bad ack %s for %s topic', id, this.topic)
}
}

/**
Expand All @@ -241,9 +287,11 @@ class Socket {
.emit('close', this)
.then(() => {
this.emitter.clearListeners()
this._acks.clear()
})
.catch(() => {
this.emitter.clearListeners()
this._acks.clear()
})
}

Expand Down

0 comments on commit 647282e

Please sign in to comment.