Skip to content
This repository has been archived by the owner on Sep 27, 2021. It is now read-only.

Commit

Permalink
feat: 🎸 implement event ack rejection handling
Browse files Browse the repository at this point in the history
  • Loading branch information
Ľubomír Jesze committed Sep 27, 2018
1 parent f8fb161 commit f106349
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 6 deletions.
61 changes: 56 additions & 5 deletions src/Connection/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,14 @@ class Connection extends Emittery {
return
}

/**
* Ack error packet
*/
if (msp.isAckErrorPacket(packet)) {
this._processAckError(packet)
return
}

/**
* Ping from client
*/
Expand Down Expand Up @@ -265,11 +273,14 @@ class Connection extends Emittery {
const result = this.getSubscription(topic).serverMessage(packet.d)

if (typeof (id) !== 'undefined') {
result.then((responses) => {
const data = responses.find((response) => typeof (response) !== 'undefined')

this.sendAckPacket(topic, id, data)
})
result
.then((responses) => {
const data = responses.find((response) => typeof (response) !== 'undefined')
this.sendAckPacket(topic, id, data)
})
.catch((error) => {
this.sendAckErrorPacket(topic, id, error.message)
})
}
}

Expand Down Expand Up @@ -297,6 +308,30 @@ class Connection extends Emittery {
this.getSubscription(packet.d.topic).serverAck(packet.d)
}

/**
* Processes the ack error by ensuring the packet is valid and there
* is a subscription for the given topic.
*
* @method _processAckError
*
* @param {Object} packet
*
* @return {void}
*/
_processAckError (packet) {
if (!msp.isValidAckErrorPacket(packet)) {
this._notifyPacketDropped('_processAckError', 'dropping ack error since packet is invalid %j', packet)
return
}

if (!this.hasSubscription(packet.d.topic)) {
this._notifyPacketDropped('_processAckError', 'dropping ack error since there are no subscription %j', packet)
return
}

this.getSubscription(packet.d.topic).serverAckError(packet.d)
}

/**
* Process the subscription packets, one at a time in
* sequence.
Expand Down Expand Up @@ -663,6 +698,22 @@ class Connection extends Emittery {
this.sendPacket(msp.ackPacket(topic, id, data))
}

/**
* Sends the ack error packet, when the client requested the
* response for given event.
*
* @method sendAckErrorPacket
*
* @param {String} topic
* @param {Number} id
* @param {String} message
*
* @return {void}
*/
sendAckErrorPacket (topic, id, message) {
this.sendPacket(msp.ackErrorPacket(topic, id, message))
}

/**
* Makes the event packet from the topic and the
* body
Expand Down
22 changes: 21 additions & 1 deletion src/Socket/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,27 @@ class Socket {
serverAck ({ id, data }) {
if (this._acks.has(id)) {
const ack = this._acks.get(id)
ack(data)
ack(null, data)
this._acks.delete(id)
} else {
debug('bad ack %s for %s topic', id, this.topic)
}
}

/**
* A new ack error received
*
* @method serverAckError
*
* @param {Number} options.id
* @param {String} options.message
*
* @return {void}
*/
serverAckError ({ id, message }) {
if (this._acks.has(id)) {
const ack = this._acks.get(id)
ack(new Error(message))
this._acks.delete(id)
} else {
debug('bad ack %s for %s topic', id, this.topic)
Expand Down

0 comments on commit f106349

Please sign in to comment.