Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

[test] updated tests to fix a bug in pushing out data

  • Loading branch information...
commit b3ce53c29f5413e90074c33308b3ef9a62187518 1 parent 9da615e
@hij1nx authored
View
2  README.md
@@ -11,7 +11,7 @@ Eventual consistency allows peers who wish to replicate an opportunity to
update regardless of the state of synchronization.
This implementation is limited to the distribution of peer information and
-data structures in order to support heterogeneous computers and networks.
+data structures in order to support heterogeneous infrastructure.
# What is the Gossip Protocol
View
179 Vine.js
@@ -10,24 +10,6 @@ var BallotBox = require('./common/BallotBox'); // for voting
var timers = {};
-var dataStore = SHash();
-var ballotbox = BallotBox();
-
-var clearTimers = function() {
-
- for(var timer in timers) {
- clearTimeout(timers[timer].timer);
- delete timers[timer];
- }
-};
-
-//
-// a timer is used to keep track of how long its been
-// since we've heard from a peer. If the timer runs out
-// we stop trying to broadcast to that peer by marking
-// it as dead. If we hear from it again, it gets marked
-// as alive.
-//
var Timer = function Timer(timeout, uuid, callback) {
if(!(this instanceof Timer)) {
@@ -55,6 +37,14 @@ Timer.prototype.reset = function() {
this.start();
};
+//
+// a timer is used to keep track of how long its been
+// since we've heard from a peer. If the timer runs out
+// we stop trying to broadcast to that peer by marking
+// it as dead. If we hear from it again, it gets marked
+// as alive.
+//
+
var Vine = module.exports = function Vine(opts, callback) {
if(!(this instanceof Vine)) {
@@ -70,22 +60,15 @@ var Vine = module.exports = function Vine(opts, callback) {
var that = this;
+ this.dataStore = SHash();
+ this.ballotbox = BallotBox();
+
this.peers = opts.peers || {};
this.defaultTimeout = opts.timeout || 1e4;
var server = this.server = net.createServer(function(socket) {
- //
- // when we get data, decide if
- // we are interested in it or not.
- //
socket.on('data', function(data) {
-
- //
- // we pass in the socket so that
- // we can have a conversation if
- // the need arises.
- //
that.write(data, socket);
});
});
@@ -107,8 +90,8 @@ var Vine = module.exports = function Vine(opts, callback) {
lifetime: 0,
timeout: this.defaultTimeout,
heartbeatInterval: opts.heartbeatInterval || 100,
- listInterval: opts.listInterval || 300,
- hashInterval: opts.hashInterval || 300
+ listInterval: opts.listInterval || 500,
+ hashInterval: opts.hashInterval || 500
};
//
@@ -148,69 +131,60 @@ Vine.prototype.write = function(msg, socket) {
var type = msg.meta.type;
var data = msg.data;
- that.emit(type, data, socket);
-
if (type === 'gossip') {
var delta = [];
for (var key in data) {
- if (dataStore.interest(key, data[key].hash, data[key].ctime)) {
+ if (this.dataStore.interest(key, data[key].hash, data[key].ctime)) {
delta.push(key);
}
}
- if (delta > 0) {
-
- socket.write({
- meta: {
- type: 'gossip-request'
- },
- data: delta
- });
-
+ if (delta.length > 0) {
+ this.send('gossip-request', delta, socket);
}
else {
socket.end();
}
}
else if (type === 'gossip-request') {
-
//
// there has been a request for a value,
// in this case we can be sure its wanted.
//
var delta = {};
+ var key = '';
- for (var i = 0, l = data.length; i > l; i++) {
- delta[data[i]] = dataStore[data[i]].value;
+ for (var i=0, l=data.length; i < l; i++) {
+ key = data[i];
+ delta[key] = this.dataStore.get(key);
}
-
- socket.write({
- meta: {
- type: 'gossip-response'
- },
- data: delta
- });
+
+ this.send('gossip-response', delta, socket);
}
else if (type === 'gossip-response') {
socket.end();
for (var key in data) {
- dataStore.set(key, data[key]);
+
+ this.dataStore.set(key, data[key]);
+ that.emit('gossip', key, data[key]);
}
- }
+ }
else if (type === 'quorum') {
+ socket.end();
+
var data = msg.data;
var topic = data.topic;
//
// merge or create the election
//
- var election = ballotbox.merge(this.details.uuid, topic, data);
+ var election = this.ballotbox.merge(this.details.uuid, topic, data);
if (election.result === null) {
@@ -218,23 +192,22 @@ Vine.prototype.write = function(msg, socket) {
// we have not yet come to a quorum, we should end this
// socket and send the votes to another random peer.
//
- this.send('quorum', ballotbox.elections[topic]);
+ this.send('quorum', this.ballotbox.elections[topic]);
}
else {
var origin = this.peers[election.origin];
if (origin) {
+
this.send('quorum-request', topic, origin.port, origin.address);
}
}
-
- socket.end();
}
else if (type === 'quorum-request') {
var topic = msg.data;
- var election = ballotbox.elections[topic];
+ var election = this.ballotbox.elections[topic];
//
// if there is a request for the election, that means that
@@ -245,30 +218,24 @@ Vine.prototype.write = function(msg, socket) {
election.closed = true;
- socket.write({
- meta: {
- type: 'quorum-response',
- },
- data: election
- });
+ this.send('quorum-response', election, socket);
}
}
else if (type === 'quorum-response') {
socket.end();
-
- this.emit(
- 'quorum',
- msg.data
- );
+ this.emit('quorum', msg.data);
}
else if (type === 'list') {
- var peers = msg.data; // the message data is a list of peers.
+ that.emit(type, data, socket);
+ socket.end();
+
+ var peers = msg.data;
for (peerId in peers) {
- var knownPeer = that.peers[peerId]; // do we know this peer?
+ var knownPeer = that.peers[peerId];
if (knownPeer) {
@@ -311,8 +278,6 @@ Vine.prototype.write = function(msg, socket) {
timer.start();
}
}
-
- socket.end(); // we got the list, no need to have a conversation.
}
return this;
@@ -321,16 +286,16 @@ Vine.prototype.write = function(msg, socket) {
//
// send a message to a random peer.
//
-Vine.prototype.send = function(type, data, port, address) {
-
- ++this.details.lifetime;
+Vine.prototype.send = function(type, data) {
var that = this;
+ var port = arguments[2];
+ var address = arguments[3];
//
// get a random peer, or provide one
//
- if (!address && !port) {
+ if (!arguments[2] && !arguments[3]) {
var peer = this.randomPeer();
@@ -342,7 +307,7 @@ Vine.prototype.send = function(type, data, port, address) {
port = peer.port;
}
- else if (!address) {
+ else if (!arguments[3]) {
address = '127.0.0.1';
}
@@ -354,9 +319,15 @@ Vine.prototype.send = function(type, data, port, address) {
data: data
};
- that.emit('send', port, address, msg);
- var message = new Buffer(JSON.stringify(msg));
+ var message = JSON.stringify(msg);
+
+ if (typeof arguments[2] === 'object') {
+
+ var socket = arguments[2];
+ socket.write(message);
+ return this;
+ }
var client = net.connect({
port: port,
@@ -364,12 +335,14 @@ Vine.prototype.send = function(type, data, port, address) {
});
client.on('error', function(err) {
- // do nothing
+ // do nothing for now.
})
- client.on('connect', function() {
+ client.on('data', function(data) {
+ that.write(data, client);
+ })
- that.emit('sent', port, address, msg);
+ client.on('connect', function() {
client.write(message);
});
@@ -379,17 +352,9 @@ Vine.prototype.send = function(type, data, port, address) {
//
// set a local value on this peer.
//
-Vine.prototype.set = function(key, val) {
-
- dataStore.set(key, val);
-};
-
-//
-// get a local value from this peer.
-//
-Vine.prototype.get = function(key) {
+Vine.prototype.gossip = function(key, val) {
- return dataStore.get(key);
+ this.dataStore.set(key, val);
};
//
@@ -404,7 +369,7 @@ Vine.prototype.vote = function(topic, value) {
// we have reached a quorum, if not then send off the
// votes that we know about to the next random peer.
//
- var election = ballotbox.vote(this.details.uuid, topic, value);
+ var election = this.ballotbox.vote(this.details.uuid, topic, value);
if (election.closed) {
@@ -421,20 +386,19 @@ Vine.prototype.vote = function(topic, value) {
// and it has a result, we should attempt to
// request quorum.
//
- var origin = this.peers[result.origin];
+ var origin = this.peers[election.result.origin];
if (origin) {
this.send(
- 'quorum-request',
+ 'quorum-request',
topic,
- origin.port,
+ origin.port,
origin.address
);
}
}
}
else {
-
this.send('quorum', election);
}
return this;
@@ -448,7 +412,7 @@ Vine.prototype.election = function(opts) {
//
opts.origin = this.details.uuid;
- ballotbox.election(opts);
+ this.ballotbox.election(opts);
return this;
};
@@ -469,7 +433,9 @@ Vine.prototype.listen = function(port, address) {
// we want to send of the list at an interval.
//
that.listInterval = setInterval(function() {
- that.send('list', that.peers);
+ if (Object.keys(that.peers).length > 0) {
+ that.send('list', that.peers);
+ }
}, that.details.listInterval);
//
@@ -479,7 +445,9 @@ Vine.prototype.listen = function(port, address) {
// corresponding values for those keys.
//
that.hashInterval = setInterval(function() {
- that.send('gossip', dataStore.meta);
+ if (Object.keys(that.dataStore.meta).length > 0) {
+ that.send('gossip', that.dataStore.meta);
+ }
}, that.details.hashInterval);
//
@@ -493,13 +461,16 @@ Vine.prototype.listen = function(port, address) {
return this;
};
-Vine.prototype.close = function() {
+Vine.prototype.end = function() {
clearInterval(this.heartbeatInterval);
clearInterval(this.listInterval);
clearInterval(this.hashInterval);
- clearTimers();
+ for(var timer in timers) {
+ clearTimeout(timers[timer].timer);
+ delete timers[timer];
+ }
this.server.close();
View
10 common/SHash.js
@@ -25,16 +25,18 @@ SHash.prototype.randomPair = function() {
// if the key does not exist at all, we want it,
// also if the key exists and the hash is different.
//
-SHash.prototype.interest = function(key, sha1, ctime) {
+SHash.prototype.interest = function(key, hash, ctime) {
var meta = this.meta[key];
+ if (typeof meta === 'undefined') {
+ return true;
+ }
+
var thisCTime = new Date(meta.ctime);
var thatCTime = new Date(ctime);
- return (typeof meta === 'undefined' ||
- (meta && meta.hash !== sha1) ||
- (thisCTime > thatCTime));
+ return (meta.hash !== hash) && (thisCTime > thatCTime);
};
SHash.prototype.delete = function(key) {
View
11 common/ballotbox.js
@@ -178,13 +178,12 @@ BallotBox.prototype.decide = function(uuid, topic) {
var V = Vc + Va;
- //
- // ensure that a transaction cannot be committed and
- // aborted at the same time.
- //
-
- if (V >= total) {
+ if (V >= min) {
+ //
+ // ensure that a transaction cannot be committed and
+ // aborted at the same time.
+ //
if (Vc === Va) {
return election;
}
View
4 package.json
@@ -1,10 +1,10 @@
{
"name": "vines",
- "version": "0.0.3",
+ "version": "0.0.4",
"description": "an implementation of the gossip protocol with quorum-based voting machinery",
"main": "index.js",
"scripts": {
- "test": "node test/test.js"
+ "test": "./test/test"
},
"repository": {
"type": "git",
View
13 test/test.js → test/test 100644 → 100755
@@ -1,18 +1,21 @@
-//
-// test harness...
-//
+#!/usr/bin/env node
+
var fs = require('fs');
var path = require('path');
var tap = require('tap').test;
-var pathname = './test/tests';
+var pathname = path.join(__dirname, 'tests');
+
+var filter = process.argv[2] || null;
fs.readdirSync(pathname).forEach(function (name) {
+ if (filter && name.indexOf(filter) < 0) { return; }
+
if (path.extname(name) === '.js') {
var basename = path.basename(name, '.js');
- var rawfile = path.join(process.cwd(), pathname, basename);
+ var rawfile = path.join(pathname, basename);
var tests = require(rawfile);
View
27 test/tests/gossip.js
@@ -15,8 +15,8 @@ module.exports = {
vine1.on('list', function(data) {
test.ok(data, 'got the list')
- vine1.close()
- vine2.close()
+ vine1.end()
+ vine2.end()
})
vine2 = Vine().listen(8002).join(8001)
@@ -31,18 +31,25 @@ module.exports = {
var vine2
vine1 = Vine().listen(8003)
-
- vine1.set('foo', 'hello, world')
-
vine2 = Vine().listen(8004).join(8003)
- setTimeout(function() {
+ vine2.on('gossip', function(key, data) {
//need to get this from an emit
- test.equal('hello, world', vine2.get('foo'))
- vine1.close()
- vine2.close()
+ if (key === 'foo') {
+
+ test.ok(data, 'got the gossip with the correct key')
+
+ vine1.end()
+ vine2.end()
+ }
+
+ })
+
+ setTimeout(function() {
- }, 4000);
+ vine1.gossip('foo', 'hello, world')
+ }, 1000)
+
}
};
View
180 test/tests/quorum.js
@@ -4,147 +4,129 @@ var Vine = require('../../vine')
module.exports = {
- "A peer should commit to an action after a quorum is reached": function(test) {
+ "Create an instance of a peer": function(test) {
+
+ test.plan(1)
+
+ var v1 = Vine().listen(8000)
+ setTimeout(function() {
+ v1.end()
+ test.ok(true, 'done')
+ }, 500)
+
+ },
+
+ "Create two instances of a peer": function(test) {
+
test.plan(1)
- var vine1
- var vine2
- var vine3
+ var v1 = Vine().listen(7001)
+ var v2 = Vine().listen(7002)
- var now = new Date(Date.now())
+ setTimeout(function() {
+ v1.end()
+ v2.end()
+ test.ok(true, 'done')
+ }, 500)
- var electionCriteria = {
- topic: 'a',
- expire: String(new Date(now.setMinutes(now.getMinutes() + 1))),
- min: 2, // in the real world, this would be a percentage of the total
- total: 3 // in the real world, this would be discovered dynamically
- }
+ },
+
+ "Connect two peers that will initiate an election": function(test) {
+
+ test.plan(1)
+
+ var v1 = Vine().listen(7003)
+ var v2 = Vine().listen(7004).join(7003)
+
+ var now = new Date(Date.now())
var onQuorum = function(election) {
if (election.topic === 'a') {
- vine1.close()
- vine2.close()
- vine3.close()
+ v1.end()
+ v2.end()
- test.equal(election.topic, 'a', 'quorum has been reached')
+ test.equal(
+ election.topic,
+ 'a',
+ 'quorum has been reached, executed by only one peer'
+ )
}
}
- vine1 = Vine()
- .listen(8005)
- .election(electionCriteria)
- .on('quorum', onQuorum)
- ;
+ var electionCriteria = {
+ topic: 'a',
+ expire: String(new Date(now.setMinutes(now.getMinutes() + 5))),
+ min: 2,
+ total: 2
+ }
- vine2 = Vine()
- .listen(8006)
- .join(8005)
- .election(electionCriteria)
- .on('quorum', onQuorum)
- ;
+ v1.on('quorum', onQuorum).election(electionCriteria)
+ v2.on('quorum', onQuorum).election(electionCriteria)
- vine3 = Vine()
- .listen(8007)
- .join(8005)
- .election(electionCriteria)
- .on('quorum', onQuorum)
- ;
+ setTimeout(function() {
+ v1.vote('a', true);
+ }, 300)
- setTimeout(function() { vine1.vote('a', true) }, 100)
- setTimeout(function() { vine2.vote('a', true) }, 200)
- setTimeout(function() { vine3.vote('a', true) }, 300)
- },
+ setTimeout(function() {
+ v2.vote('a', true);
+ }, 800)
- "An expired election should never reach quorum": function(test) {
+ },
+ "An election should expire": function(test) {
+
test.plan(1)
- var vines = {}
- var expired = false
+ var v1 = Vine().listen(7003)
+ var v2 = Vine().listen(7004).join(7003)
var now = new Date(Date.now())
-
- var electionCriteria = {
- topic: 'b',
- expire: 60,
- min: 4,
- total: 5
- }
+ var counter = 0;
var onQuorum = function(election) {
- if (election.topic === 'b') {
-
- test.fail(election.topic, 'b', 'Should not reach quorum')
-
- }
}
var onExpire = function(election) {
- if (election.topic === 'b' && expired === false) {
-
- expired = true;
- test.ok(true, 'Should emit the expire event')
+ ++counter;
+
+ if (election.topic === 'a' && counter === 2) {
- //
- // close all of the open servers
- //
- Object.keys(vines).forEach(function(key) {
- vines[key].close()
- })
+ v1.end()
+ v2.end()
+
+ test.ok(true, 'Should emit the expire event')
}
}
- vines.vine1 = Vine()
- .listen(8008)
- .election(electionCriteria)
- .on('expire', onExpire)
- .on('quorum', onQuorum)
- ;
+ var electionCriteria = {
+ topic: 'a',
+ expire: String(new Date(now.setMinutes(now.getMinutes() + .1))),
+ min: 2,
+ total: 2
+ }
- vines.vine2 = Vine()
- .listen(8009)
- .election(electionCriteria)
- .join(8008)
- .on('expire', onExpire)
+ v1
.on('quorum', onQuorum)
- ;
-
- vines.vine3 = Vine()
- .listen(8010)
- .election(electionCriteria)
- .join(8008)
.on('expire', onExpire)
- .on('quorum', onQuorum)
- ;
-
- vines.vine4 = Vine()
- .listen(8011)
.election(electionCriteria)
- .join(8009)
- .on('expire', onExpire)
- .on('quorum', onQuorum)
- ;
- vines.vine5 = Vine()
- .listen(8012)
- .election(electionCriteria)
- .join(8009)
- .on('expire', onExpire)
+ v2
.on('quorum', onQuorum)
- ;
+ .on('expire', onExpire)
+ .election(electionCriteria)
- Object.keys(vines).forEach(function(key) {
+ setTimeout(function() {
+ v1.vote('a', true);
+ }, 300)
- setTimeout(
- function() { vines[key].vote('b', true) },
- Math.floor(Math.random() * 100)
- )
- })
+ setTimeout(function() {
+ v2.vote('a', true);
+ }, 800)
}
}
Please sign in to comment.
Something went wrong with that request. Please try again.