Skip to content

Commit

Permalink
refactor, in preparation for changing to base events model
Browse files Browse the repository at this point in the history
  • Loading branch information
dominictarr committed Aug 27, 2012
1 parent cade4c7 commit 8e78c6f
Show file tree
Hide file tree
Showing 7 changed files with 188 additions and 129 deletions.
9 changes: 9 additions & 0 deletions id.js
@@ -0,0 +1,9 @@
var map = require('iterate').map

module.exports =
function createID () {
return map(3, function (i) {
return Math.random().toString(16).substring(2).toUpperCase()
}).join('')
}

244 changes: 125 additions & 119 deletions index.js
Expand Up @@ -4,137 +4,143 @@ var EventEmitter = require('events').EventEmitter
var i = require('iterate')
var timestamp = require('./timestamp')
var duplex = require('duplex')
var inherits = require('util').inherits

function createID () {
return i.map(3, function (i) {
return Math.random().toString(16).substring(2).toUpperCase()
}).join('')
}
var createID = require('./id')

exports =
module.exports = Scuttlebutt

module.exports = Model
exports.createID = createID
exports.timestamp = timestamp

Model.createID = createID
Model.timestamp = timestamp
function validate (data) {
//must be an 4 element array
//string, *, string, number
//log a message and ignore if invalid.
function error () {
console.error('invalid update', data)
}
var key = data[0], source = data[2], ts = data[3]

/*console.log(!Array.isArray(data)
, data.length !== 4
, 'string' !== typeof key
, 'string' !== typeof source
, 'number' !== typeof ts
)*/

if( !Array.isArray(data)
|| data.length !== 4
|| 'string' !== typeof key
|| 'string' !== typeof source
|| 'number' !== typeof ts
)
return error(), false
return true
}

function Model (id) {
var emitter = new EventEmitter()
inherits (Scuttlebutt, EventEmitter)

function Scuttlebutt (id) {
if(!(this instanceof Scuttlebutt)) return new Scuttlebutt(id)
var emitter = this

emitter.store = {}
emitter.timestamps = {}
emitter.sources = {}
emitter.id = id = id || createID()
emitter.set = function (k, v) {
emitter._update(k, v, id, timestamp())
}
emitter.get = function (k) {
if(emitter.store[k])
return emitter.store[k][1]
}
emitter._update = function (key, value, source, ts) {
var cur = emitter.timestamps[key]
var latest = emitter.sources[source]
var update = [].slice.call(arguments)
//if this message is older for it's source,
//ignore it. it's out of order.
//each node must emit it's changes in order!
if(latest && latest >= ts)
return emitter.emit('old_data', update), false

emitter.sources[source] = ts

//check if this message is older than
//the value we already have.
//do nothing if so
//emit an 'old-data' event because i'll want to track how many
//unnecessary messages are sent.
if(cur && cur > ts)
return emitter.emit('old-data', [key, value, source, ts]), false

emitter.store[key] = update
//key, value,
emitter.emit('data', update)
emitter.emit('update', key, value, source, ts)

return true

}

function validate (data) {
//must be an 4 element array
//string, *, string, number
//log a message and ignore if invalid.
function error () {
console.error('invalid update', data)
}
var key = data[0], source = data[2], ts = data[3]
/*console.log(!Array.isArray(data)
, data.length !== 4
, 'string' !== typeof key
, 'string' !== typeof source
, 'number' !== typeof ts
)*/
if( !Array.isArray(data)
|| data.length !== 4
|| 'string' !== typeof key
|| 'string' !== typeof source
|| 'number' !== typeof ts
)
return error(), false

return true
}
}

emitter.createStream = function () {
//the sources for the remote end.
var sources = {}
var d = duplex()
.on('write', function (data) {
//if it's an array, it's an update.
//if it's an object, it's a scuttlebut digest.
if(Array.isArray(data) && validate(data))
return emitter._update.apply(emitter, data)
if('object' === typeof data && data) {
//when the digest is recieved from the other end,
//send the histroy.
//merge with the current list of sources.

sources = data
i.each(emitter.histroy(sources), d.emitData.bind(d))

this.emit('sync')
}
}).on('ended', function () { d.emitEnd() })
.on('close', function () {
emitter.removeListener('update', onUpdate)
})

function onUpdate (key, value, source, ts) {
if(sources[source] && sources[source] >= ts)
return //the other end has already seen this message.
d.emit('data', [key, value, source, ts])
//update source
sources[source] = ts
}
d.emitData(emitter.sources)

emitter.on('update', onUpdate)
return d
}
var sb = Scuttlebutt.prototype

emitter.filter = function (e, filter) {
var source = e[2]
var ts = e[3]
return (!filter || !filter[source] || filter[source] < ts)
}

emitter.histroy = function (filter) {
var h = []
i.each(emitter.store, function (e) {
if(emitter.filter(e, filter))
h.push(e)
sb.localUpdate = function (key, value) {
this._update(key, value, this.id, timestamp())
return this
}

//checks whether this update is valid.

sb._update = function (key, value, source, ts) {
var emitter = this
var cur = this.timestamps[key]
var latest = this.sources[source]
var update = [].slice.call(arguments)

//if this message is older for it's source,
//ignore it. it's out of order.
//each node must emit it's changes in order!

if(latest && latest >= ts)
return this.emit('old_data', update), false

this.sources[source] = ts

//check if this message is older than
//the value we already have.
//do nothing if so
//emit an 'old-data' event because i'll want to track how many
//unnecessary messages are sent.
if(cur && cur > ts)
return this.emit('old-data', [key, value, source, ts]), false

//move this out?
//this.store[key] = update

//key, value, timestamp, source
this.emit('data', update)
this.emit('update', key, value, source, ts)
return true
}

sb.createStream = function () {
var self = this
//the sources for the remote end.
var sources = {}
var d = duplex()
.on('write', function (data) {
//if it's an array, it's an update.
//if it's an object, it's a scuttlebut digest.
if(Array.isArray(data) && validate(data))
return self._update.apply(self, data)
if('object' === typeof data && data) {
//when the digest is recieved from the other end,
//send the histroy.
//merge with the current list of sources.
sources = data
i.each(self.histroy(sources), d.emitData.bind(d))

This comment has been minimized.

Copy link
@Raynos

Raynos Aug 30, 2012

Contributor

Don't have single letter variable names that are not function local. var d = duplex() is also bad but at least it's only 10 lines up.

this.emit('sync')
}
}).on('ended', function () { d.emitEnd() })
.on('close', function () {
self.removeListener('update', onUpdate)
})
return h

function onUpdate (key, value, source, ts) {
if(sources[source] && sources[source] >= ts)
return //the other end has already seen this message.
d.emit('data', [key, value, source, ts])
//update source
sources[source] = ts
}
d.emitData(self.sources)

self.on('update', onUpdate)
return d
}

sb.filter = function (e, filter) {
var source = e[2]
var ts = e[3]
return (!filter || !filter[source] || filter[source] < ts)
}

return emitter
sb.histroy = function (filter) {
var self = this
var h = []
i.each(this.store, function (e) {
if(self.filter(e, filter))
h.push(e)
})
return h
}
40 changes: 40 additions & 0 deletions model.js
@@ -0,0 +1,40 @@
var Scuttlebutt = require('./index')
var inherits = require('util').inherits
var each = require('iterate').each

module.exports = Model

inherits(Model, Scuttlebutt)

function Model (id) {
if(!(this instanceof Model)) return new Model(id)
Scuttlebutt.call(this, id)

var store = this.store = {}

this.on('data', function (update) {
store[update[0]] = update
})
}

var m = Model.prototype

m.set = function (k, v) {
return this.localUpdate(k, v)
}

m.get = function (k) {
if(this.store[k])
return this.store[k][1]
}

m.histroy = function (filter) {
var self = this
var h = []
each(this.store, function (e) {
if(self.filter(e, filter))
h.push(e)
})
return h
}

1 change: 1 addition & 0 deletions package.json
Expand Up @@ -12,6 +12,7 @@
"iterate": "0.0.2",
"duplex": "~0.1.2"
},
"main": "./model.js",
"scripts": {
"test": "node test/index.js && node test/integrate.js && node test/integrate2.js"
}
Expand Down
19 changes: 11 additions & 8 deletions test/index.js
@@ -1,7 +1,9 @@

var gossip = require('..')
var gossip = require('../model')
var i = require('iterate')
var assert = require('assert')
var timestamp = require('../timestamp')
var createID = require('../id')

function test(name, test) {
console.log('#', name)
Expand All @@ -11,8 +13,8 @@ function test(name, test) {
test('updates appear in histroy', function (g) {
var key = 'key'
var value = Math.random()
var source = gossip.createID()
var ts = gossip.timestamp()
var source = 'source' //gossip.createID()
var ts = timestamp()


assert.equal(g._update(key, value, source, ts)
Expand All @@ -36,14 +38,15 @@ test('updates appear in histroy', function (g) {
})

test('can filter histroy with {sources: timestamps}', function (g) {
var A = gossip.createID()
var B = gossip.createID()
var C = gossip.createID()
var ts = gossip.timestamp()
var A = createID()
var B = createID()
var C = createID()
var ts = timestamp()

g._update('A', 'aaa', A, ts)
g._update('B', 'bbb', B, ts)
g._update('C', 'ccc', C, ts)

//filter should only return timestamps that are after
//the given timestamps.
var filter = {}
Expand Down
2 changes: 1 addition & 1 deletion test/integrate.js
@@ -1,4 +1,4 @@
var gossip = require('..')
var gossip = require('../model')
var assert = require('assert')

var g1 = gossip()
Expand Down
2 changes: 1 addition & 1 deletion test/integrate2.js
@@ -1,4 +1,4 @@
var gossip = require('..')
var gossip = require('../model')
var assert = require('assert')

var g1 = gossip()
Expand Down

0 comments on commit 8e78c6f

Please sign in to comment.