Skip to content
Browse files

initial

  • Loading branch information...
0 parents commit 6094a20e7d6505b7280836c86019413679935cd1 @dominictarr committed
Showing with 537 additions and 0 deletions.
  1. +96 −0 gset.js
  2. +4 −0 index.js
  3. +92 −0 obj.js
  4. +23 −0 package.json
  5. +76 −0 readme.markdown
  6. +66 −0 stream.js
  7. +41 −0 test/gset.js
  8. +34 −0 test/object.js
  9. +105 −0 test/streaming.js
96 gset.js
@@ -0,0 +1,96 @@
+module.exports = GSet
+
+var EventEmitter = require('events').EventEmitter
+var Obj = require('./obj')
+/*
+ GSet -- grow only set.
+
+ base class for other collection types.
+*/
+
+function GSet(id) {
+ this.id = id
+ this.state = {}
+ this.objects = {}
+ this.queue = []
+}
+
+GSet.prototype = new EventEmitter()
+/*
+ this should apply to any
+*/
+GSet.prototype.add = function (key, oKey, val) {
+ if(!this.objects[key]) {
+ var self = this
+ var obj = this.objects[key] = new Obj(key)
+ this.state[key] = obj.get()
+ obj.on('queue', function () {
+ //prepare to flush the changes to this object
+ self.enqueue(obj)
+ })
+ }
+ this.objects[key].set(oKey, val)
+
+}
+
+GSet.prototype.enqueue = function (obj) {
+ if(!~this.queue.indexOf(obj))
+ this.queue.push(obj)
+ this.emit('queue')
+}
+
+/*
+ this can probably be used as the flush implementation for any
+ collection Obj
+*/
+
+GSet.prototype.flush = function (obj) {
+ var id = this.id
+ var updates = []
+ this.queue.forEach(function (e) {
+ var update = e.flush()
+ if(!update) return
+ update = update.slice()
+ update[0].unshift(id)
+ updates.push(update)
+ })
+ return updates
+}
+
+GSet.prototype.update = function (update) {
+ console.log(':::::', update)
+ var key = update[0].shift()
+ var array = this.array
+ if(!this.objects[key]) {
+ this.state[key] = (this.objects[key] = new Obj(key)).state
+ }
+ var obj = this.objects[key]
+ //does this need histroy at this level?
+ //all that can happen is creation.
+ obj.update(update)
+
+/*
+// DELETES. move this to Set.
+//
+//
+
+ if(obj.get('__destroy')) {
+ var i = array.indexOf(obj)
+ if(~i)
+ array.splice(i, 1) //emit splice?
+ } else if(obj.__destroy === false || obj.__destroy === null) {
+ if(!~array.indexOf(obj))
+ array.push(obj) //emit splice?
+ }
+*/
+
+}
+
+GSet.prototype.toArray =
+GSet.prototype.get = function (path) {
+ if(!arguments.length)
+ return this.state
+ //if path is defined, pass to members...
+}
+
+
4 index.js
@@ -0,0 +1,4 @@
+exports.Set =
+exports.GSet = require('./gset')
+exports.Obj = require('./obj')
+exports.createStream = require('./stream')
92 obj.js
@@ -0,0 +1,92 @@
+
+module.exports = Obj
+
+var EventEmitter = require('events').EventEmitter
+
+//this will be injectable,
+//to support different types of models.
+//i.e. using backbone or knockout.
+
+function set (obj, key, val) {
+ obj[key] = val
+}
+
+function merge(to, from) {
+ for(var k in from)
+ set(to, k, from[k])
+ return to
+}
+
+function Obj (id) {
+ this.id = id
+ this.state = {}
+ this.history = []
+ this.changes = null
+}
+
+Obj.prototype = new EventEmitter()
+
+Obj.prototype.update = function (update) {
+ update = update.slice()
+ var path = update.shift()
+ var hist = this.history
+ var last = hist[hist.length - 1]
+ var state = this.state
+
+ if(path.length)
+ throw new Error('should not have path here:' + path)
+
+ //if update is newer than any previous update.
+ if(!last || update[1] > last[1]) { //also use sequence number
+ merge(state, update[0]) //this will be injectable
+ hist.push(update)
+ //if the update has arrived out of order.
+ } else {
+ hist.push(update)
+ hist.sort(function (a, b) {
+ if(a[1] == b[1])
+ console.log(a, b)
+ return (a[1] - b[1]) || (a[2] - b[2])
+ })
+ hist.forEach(function (up) {
+ merge(state, up[0])
+ })
+ }
+}
+
+Obj.prototype.set = function (key, value) {
+ this.changes = this.changes || {}
+ var changed = false
+ if('string' === typeof key) {
+ if(this.changes[key] != value) {
+ changed = true
+ this.changes[key] = value
+ set(this.state, key, value)
+ }
+ } else {
+ for (var k in key) {
+ if(this.changes[k] != key[k]) {
+ changed = true
+ this.changes[k] = key[k]
+ //set(this.state, k, key[k])
+ }
+ }
+ }
+ if(changed)
+ this.emit('queue')
+}
+
+Obj.prototype.get = function () {
+ return this.state
+}
+
+Obj.prototype.flush = function () {
+ if(!this.changes) return
+ var changes = this.changes
+ this.changes = null
+ var update = [[], changes, Date.now()]
+ this.update(update)
+ update[0].unshift(this.id)
+ this.emit('update', changes)
+ return update
+}
23 package.json
@@ -0,0 +1,23 @@
+{
+ "author": "Dominic <dominic.tarr@gmail.com> (http://dominictarr.com)",
+ "name": "crdt",
+ "description": "Commutative Replicated Data Types for easy distributed/collaborative apps",
+ "version": "0.0.0",
+ "homepage": "http://github.com/dominictarr/crdt",
+ "repository": {
+ "url": ""
+ },
+ "scripts": {
+ "test": "tap test/*.js"
+ },
+ "engines": {
+ "node": "*"
+ },
+ "dependencies": {
+ },
+ "devDependencies": {
+ "event-stream": "0.9",
+ "tap": "~0.2.4"
+ },
+ "optionalDependencies": {}
+}
76 readme.markdown
@@ -0,0 +1,76 @@
+#CRDT - Commutative Replicated Data Types
+
+a CRDT is a data type designed so that opperations on it commute - give the same result
+indepent of the order in which they are applied.
+
+CRDTs give you eventual consistancy for free.
+
+## basic
+
+A normal `{key: value, ...}` javascript object. Updates have a sequence number and may
+update the values for multiple keys.
+
+
+``` js
+
+ ['up', path, timestamp, {key: value}]
+//update object at `path` at `timestamp`
+```
+
+Applying each update MUST be idempotent.
+If an update arrives out of order,
+updates to this path with later timestamps can be reapplied,
+to give the correct current value.
+
+### example
+
+```
+['up', 'crdt:example', 0, {type: 'example', foo: 'bar'}]
+['up', 'crdt:example', 1, {qux: 'tun'}]
+['up', 'crdt:example', 2, {qux: 'blerg', foo: null}]
+```
+
+At each update the properties in the update are merged with the properties at `path`.
+applying these updates in order gives the result:
+
+``` js
+{ type: 'example',
+, foo: null,
+, qux: 'blerg' }
+```
+
+note that the information from update 1 is completely overwritten by update 2.
+if update 1 arrives at a given node after time 2, it is necessary to check that
+`qux` already has a value from a later timestamp (2) and so need not be applied.
+
+since updates are idempotent, that is equivalent to reapplying all updates to the given path.
+however, it has the benefit that updates that are totally superceded may be discarded.
+
+## sets
+
+Sets can be implemented by tracking every object at a sub-path.
+items can be deleted by adding a property `__destroy: true` or similar.
+
+Sets are suitable when the order of data is not very important. For example,
+in a chat room, it is sufficant to order each item as they arrive, or in their timestamp order.
+
+## sequences
+
+sometimes the order of a sequence is important, or needs to be changed this can be acomplished by
+sorting members by a certain property. then if you need to move a member, or insert a new member
+give it a sort value between two adjacent keys.
+
+### example
+
+``` js
+['up', 'crdt:seq1:X', 0, {value: 'A', seq: 'B'}] // insert at B
+['up', 'crdt:seq1:Y', 1, {value: 'X', seq: 'A'}] // insert ahead of crdt:seq1:X
+['up', 'crdt:seq1:Z', 2, {value: 'B', seq: 'AL'}] // insert between crdt:seq1:X, and crdt:seq1:Y
+```
+here X, and Y are inserted with near by `seq` values. however, it is always possible to create
+another string that is ordered between any two given strings so it is always possible to move or insert
+items.
+
+## validation.
+
+validation is very simple to apply
66 stream.js
@@ -0,0 +1,66 @@
+
+var Stream = require('stream').Stream
+var crdt = require('./index')
+module.exports =
+function create () {
+ return addStreaming(new crdt.GSet('set'))
+}
+
+function addStreaming(set) {
+
+ s = set
+ var sequence = 1
+ s.pipe = Stream.prototype.pipe
+ //s.set = set
+ var queued = false
+ s.readable = s.writable = true
+
+ s._flush = function () {
+ if(!queued) return
+
+ var updates = set.flush()
+ if(!updates.length)
+ throw new Error('NO UPDATES?')
+ while(updates.length) {
+ var update = updates.shift()
+ if(update) {
+ update.push(sequence++)
+ s.emit('data', update)
+ }
+ }
+ queued = false
+ }
+
+ set.on('queue', function () {
+ if(queued) return
+ queued = true
+ // process.nextTick(s.flush)
+ })
+
+
+ s.write = function (update) {
+ // [path, time, update]
+ // hard code only one Set right now.
+
+ update[0].shift()
+
+ set.update(update)
+
+ // now is when it's time to emit events?
+ /*
+ apply local update with set(key, value)
+ or set(obj)
+ queue changes, then call flush()
+ which adds the update to histroy and returns it.
+
+ */
+ return true
+ }
+
+ //need to know if an event has come from inside
+ //or outside...
+ //should it be sent, or not?
+ //indeed, how to apply a local change?
+
+ return s
+}
41 test/gset.js
@@ -0,0 +1,41 @@
+var test = require('tap').test
+var crdt = require('..')
+
+
+test('GSet', function (t) {
+
+ var s = new crdt.Set()
+
+ s.update([['key'], {initial: true}, 0, 1])
+
+ var ary = s.get()
+
+ t.deepEqual(
+ ary,
+ {key: {initial: true}}
+ )
+ t.end()
+})
+
+test('GSet - 2', function (t) {
+
+ var s = new crdt.GSet()
+
+ s.update([['A'], {initial: true} , 0, 1])
+ s.update([['B'], {initial: false}, 0, 2])
+ s.update([['C'], {initial: 6} , 0, 3])
+
+ s.update([['C'], {initial: 3} , 2, 5])
+ s.update([['B'], {initial: 6}], 1, 4)
+
+ var ary = s.get()
+
+ t.deepEqual(
+ ary,
+ { A: {initial: true}
+ , B: {initial: 6}
+ , C: {initial: 3} }
+ )
+ t.end()
+
+})
34 test/object.js
@@ -0,0 +1,34 @@
+
+//var a = require('assertions')
+var test = require('tap').test
+var crdt = require('..')
+
+
+test('trivial', function (t) {
+ var o = new crdt.Obj()
+ t.type(o, 'object')
+ t.end()
+})
+
+test('idempotent order', function (t) {
+ var updates = [
+ [[], {hello: 'world', name: null}, 0, 1]
+ , [[], {name: 'jim'}, 1, 2]
+ , [[], {hello: 'earth'}, 2, 3]
+ ]
+
+ function rand() {
+ return 0.5 - Math.random()
+ }
+
+ var l = 10
+ while (l--) {
+ var o = new crdt.Obj()
+ updates.sort(rand).forEach(function (update){
+ o.update(update.slice())
+ })
+ t.deepEqual(o.state, {hello: 'earth', name: 'jim'})
+ }
+ t.end()
+})
+
105 test/streaming.js
@@ -0,0 +1,105 @@
+var crdt = require('..')
+var test = require('tap').test
+var es = require('event-stream')
+var assert = require('assert')
+/*
+ here I will figure out some CRDT communication.
+*/
+
+test('random', function (t) {
+
+ var a = crdt.createStream()
+ var b = crdt.createStream()
+ var lastTime
+ var lastSeq
+ var updates = []
+ b.pipe(es.mapSync(function (update) {
+ t.ok(Array.isArray(update[0]))
+ console.log(update)
+ t.type(update[1], 'object')
+ t.type(update[2], 'number')
+ t.type(update[3], 'number')
+ if(lastTime)
+ t.ok(update[2] >= lastTime)
+ if(lastSeq)
+ t.equal(update[3], lastSeq + 1)
+ lastTime = update[2]
+ lastSeq = update[3]
+ updates.push(update)
+ return update
+ })).pipe(a)
+
+ var sets = ['a', 'b', 'c']
+ var keys = ['x', 'y', 'z']
+
+ function rand(of) {
+ var i = Math.floor(Math.random() * (of.length || of))
+ return of[i] || i
+ }
+
+ var l = 100
+
+ while (l--) {
+ b.add(rand(sets), rand(keys), rand(10))
+ }
+
+ b._flush() //this would be called in next tick
+
+ console.log('B:', b.get())
+ console.log('A:', a.get())
+
+ console.log('B:', b.get())
+ console.log('A:', a.get())
+
+
+ //use regular deep equal because tap
+ //fails on key ordering.
+ //which is probably going to be different
+ //because messages may not be in order
+
+ console.log(updates)
+ var A = a.get()
+ var B = b.get()
+ assert.deepEqual(b.get(), a.get())
+
+ t.end()
+})
+
+/*
+OBSERVATIONS.
+
+collection do not maintain any histroy.
+they only hold buckets of objects.
+
+set properties like order
+are maintained by properties of the objects.
+
+each level emits when it has changed.
+and the next level is responsible for deciding when to
+flush the changes.
+
+OBJECT
+ set //set a value
+ get //retrive a value, or the cur state
+ flush //apply queued local changes
+ update //apply an remote update
+
+ 'queue' //when a local change has happened
+ 'update' //when changes have been applied. (local or remote)
+
+SET
+ set/add //set a inner object.
+ get //retrive a inner object/or state
+ flush //apply queued local changes.
+ update //apply an update
+
+ 'queue' //when a local change has been made.
+ 'update' //when changes are applied. (local or remote)
+
+each type implements these methods...
+.. but differently.
+
+*/
+
+
+

0 comments on commit 6094a20

Please sign in to comment.
Something went wrong with that request. Please try again.