Skip to content

Commit

Permalink
v0.0.4
Browse files Browse the repository at this point in the history
  • Loading branch information
diversario committed Aug 10, 2013
2 parents c6bd00d + f4038c5 commit e00cea4
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 6 deletions.
6 changes: 4 additions & 2 deletions lib/Eventcast.js
Expand Up @@ -33,12 +33,14 @@ var defaultConfig = {

replPort: null, // random
replHost: 'localhost',
replEnabled: false,

encrypt: false,

maxPayloadSize: 1024,

messageTtl: 3000
messageTtl: 3000,
messageRetransmitAttemps: 3
}


Expand All @@ -57,7 +59,7 @@ function Eventcast(opts) {
EE.call(this)

this._init(opts)
this._startRepl()
if (this.config.replEnabled) this._startRepl()
}

util.inherits(Eventcast, EE)
Expand Down
101 changes: 99 additions & 2 deletions lib/MessageBuffer.js
Expand Up @@ -5,21 +5,40 @@ var EE = require('events').EventEmitter

module.exports = MessageBuffer



/**
* Message buffering mechanism.
*
* @param config
* @constructor
*/
function MessageBuffer(config) {
this._timeout = config && config.messageTtl
this._timeout = config.messageTtl
this._messageRetransmitAttemps = config.messageRetransmitAttemps
this._timers = {
receive: {},
send: {},
nack: {},
message: {}
}

this._timesMissed = {}

this._incomingMessages = {}
this._outgoingMessages = {}
this._expired = []
}

util.inherits(MessageBuffer, EE)



/**
* Buffers outgoing message chunks.
*
* @param messageArray
*/
MessageBuffer.prototype.bufferOutgoing = function bufferOutgoing(messageArray) {
var self = this
, seqId = messageArray[0].meta().seqId
Expand All @@ -35,6 +54,15 @@ MessageBuffer.prototype.bufferOutgoing = function bufferOutgoing(messageArray) {
}, this._timeout * 3)
}



/**
* Returns an array of message packets specified in `seqs` from outgoing buffer.
*
* @param {String} seqId
* @param {Array} seqs
* @returns {Array}
*/
MessageBuffer.prototype.getOutgoingPackets = function getOutgoingPackets(seqId, seqs) {
var self = this
, packets = []
Expand All @@ -48,10 +76,23 @@ MessageBuffer.prototype.getOutgoingPackets = function getOutgoingPackets(seqId,
return packets
}



/**
* Buffers incoming message chunks.
*
* @param {Message} msg Message
* @param {String} seqId Sequence ID
* @param {Number} seq Packet number
*/
MessageBuffer.prototype.bufferIncoming = function bufferIncoming(msg, seqId, seq) {
// start TTL timer when first packet for `seqId` is received
seq = seq.toString()

if (!this._timesMissed[seqId]) {
this._timesMissed[seqId] = 0
}

if (this._incomingMessages[seqId]) {
clearTimeout(this._timers.receive[seqId])
} else {
Expand All @@ -68,13 +109,30 @@ MessageBuffer.prototype.bufferIncoming = function bufferIncoming(msg, seqId, seq
)
}



/**
* Returns buffered incoming message packets from `seqId` sequence.
* Buffer is destroyed.
*
* @param {String} seqId
* @returns {*}
*/
MessageBuffer.prototype.getIncomingBuffer = function getIncomingBuffer(seqId) {
this._clearTimers(seqId)
var m = this._incomingMessages[seqId]
delete this._incomingMessages[seqId]
return m
}



/**
* Checks if sequence `seqId` is fully received.
*
* @param {String} seqId
* @returns {Boolean}
*/
MessageBuffer.prototype.isComplete = function isComplete(seqId) {
if (!this._incomingMessages[seqId]) return false

Expand All @@ -84,6 +142,13 @@ MessageBuffer.prototype.isComplete = function isComplete(seqId) {
return lastSeq.meta().seqEnd && lastSeq.meta().seq === parts.length - 1
}



/**
* Returns array of missed packets from `seqId` sequence.
* @param {String} seqId
* @returns {Array}
*/
MessageBuffer.prototype.getMissingSeq = function getMissingSeq(seqId) {
var parts = Object.keys(this._incomingMessages[seqId])
, missingSeq = []
Expand All @@ -100,18 +165,35 @@ MessageBuffer.prototype.getMissingSeq = function getMissingSeq(seqId) {
return missingSeq
}



/**
* Returns metadata from `seqId` sequence.
* @param {String} seqId
* @returns {Object}
*/
MessageBuffer.prototype.getSenderMeta = function getSenderMeta(seqId) {
if (!this._incomingMessages[seqId]) return null

var parts = Object.keys(this._incomingMessages[seqId])

return this._incomingMessages[seqId][parts[0]].meta()
}




/**
* Notifies listeners that `seqId` has missing packets.
*
* @param {String} seqId
*/
MessageBuffer.prototype.miss = function miss(seqId) {
delete this._timers.receive[seqId]

if (this._timesMissed[seqId] > this._messageRetransmitAttemps) {
return this.expire(seqId)
}

this.emit('miss', seqId, this.getMissingSeq(seqId))

this._timers.nack[seqId] = setTimeout(
Expand All @@ -120,13 +202,28 @@ MessageBuffer.prototype.miss = function miss(seqId) {
)
}



/**
* Expires `seqId` buffer.
*
* @param seqId
*/
MessageBuffer.prototype.expire = function expire(seqId) {
this._clearTimers(seqId)
this._expired.push(seqId)
delete this._incomingMessages[seqId]
delete this._timesMissed[seqId]
this.emit('expired', seqId)
}



/**
* Clears all timers for `seqId`.
* @param seqId
* @private
*/
MessageBuffer.prototype._clearTimers = function (seqId) {
clearTimeout(this._timers.receive[seqId])
clearTimeout(this._timers.nack[seqId])
Expand Down
2 changes: 1 addition & 1 deletion package.json
@@ -1,6 +1,6 @@
{
"name": "eventcast",
"version": "0.0.3",
"version": "0.0.4",
"description": "Events over multicast",
"main": "index.js",
"scripts": {
Expand Down
3 changes: 2 additions & 1 deletion test/eventcast.test.js
Expand Up @@ -861,7 +861,8 @@ describe('REPL', function() {
var counter = 0

var server1 = new Eventcast({
replPort: 33333
replPort: 33333,
replEnabled: true
})

setTimeout(function() {
Expand Down

0 comments on commit e00cea4

Please sign in to comment.