Permalink
Browse files

Log replication w/ partitions.

  • Loading branch information...
1 parent 64e8440 commit 51fd364bf9347c1d1f8e9a1e79c33b8527fdd32d @benbjohnson committed Jan 6, 2014
@@ -191,6 +191,10 @@ define([], function () {
})
.after(1, wait).indefinite()
+ .then(function() {
+ player.next();
+ })
+
player.play();
};
@@ -12,7 +12,9 @@ define([], function () {
node = function(id) { return frame.model().nodes.find(id); },
cluster = function(value) { model().nodes.toArray().forEach(function(node) { node.cluster(value); }); },
wait = function() { var self = this; model().controls.show(function() { self.stop(); }); },
- subtitle = function(s, pause) { model().subtitle = s + model().controls.html(); layout.invalidate(); if (pause === undefined) { model().controls.show() }; };
+ subtitle = function(s, pause) { model().subtitle = s + model().controls.html(); layout.invalidate(); if (pause === undefined) { model().controls.show() }; },
+ clear = function() { subtitle('', false); },
+ removeAllNodes = function() { model().nodes.toArray().forEach(function(node) { node.state("stopped"); }); model().nodes.removeAll(); };
//------------------------------
// Title
@@ -68,142 +70,116 @@ define([], function () {
//------------------------------
.then(function () {
model().clients.create("X");
- subtitle('<h2>When a client sends a change to the leader the change is added as an entry to the log.</h2>', false);
+ subtitle('<h2>First a client sends a change to the leader.</h2>', false);
})
.then(wait).indefinite()
.then(function () {
client("X").send(model().leader(), "SET 5");
})
- .then(wait).indefinite()
-
-
- //------------------------------
- // Candidacy
- //------------------------------
- .at(model(), "stateChange", function(event) {
- return (event.target.state() === "candidate");
+ .after(model().defaultNetworkLatency, function() {
+ subtitle('<h2>The change is appended to the leader\'s log...</h2>');
})
- .after(1, function () {
- subtitle('<h2>After the election timeout the follower becomes a candidate and starts a new <em>election term</em>...</h2>');
+ .at(model(), "appendEntriesRequestsSent", function () {})
+ .after(model().defaultNetworkLatency * 0.25, function(event) {
+ subtitle('<h2>...then the change is sent to the followers on the next heartbeat.</h2>');
})
- .after(1, function () {
- subtitle('<h2>...votes for itself...</h2>');
+ .after(1, clear)
+ .at(model(), "commitIndexChange", function (event) {
+ if(event.target === model().leader()) {
+ subtitle('<h2>An entry is committed once a majority of followers acknowledge it...</h2>');
+ }
})
- .after(model().defaultNetworkLatency * 0.25, function () {
- subtitle('<h2>...and sends out <em>Request Vote</em> messages to other nodes.</h2>');
+ .after(model().defaultNetworkLatency * 0.25, function(event) {
+ subtitle('<h2>...and a response is sent to the client.</h2>');
})
- .after(model().defaultNetworkLatency, function () {
- subtitle('<h2>If the receiving node hasn\'t voted yet in this term then it votes for the candidate...</h2>');
+ .after(1, clear)
+ .after(model().defaultNetworkLatency, function(event) {
+ subtitle('<h2>Now let\'s send a command to increment the value by "2".</h2>');
+ client("X").send(model().leader(), "ADD 2");
})
- .after(1, function () {
- subtitle('<h2>...and the node resets its election timeout.</h2>');
+ .after(1, clear)
+ .at(model(), "recv", function (event) {
+ subtitle('<h2>Our system value is now updated to "7".</h2>', false);
})
+ .after(1, wait).indefinite()
//------------------------------
- // Leadership & heartbeat timeout.
+ // Network Partition
//------------------------------
- .at(model(), "stateChange", function(event) {
- return (event.target.state() === "leader");
- })
.after(1, function () {
- subtitle('<h2>Once a candidate has a majority of votes it becomes leader.</h2>');
- })
- .after(model().defaultNetworkLatency * 0.25, function () {
- subtitle('<h2>The leader begins sending out <em>Append Entries</em> messages to its followers.</h2>');
- })
- .after(1, function () {
- subtitle('<h2>These messages are sent in intervals specified by the <span style="color:red">heartbeat timeout</span>.</h2>');
- })
- .after(model().defaultNetworkLatency, function () {
- subtitle('<h2>Followers then respond to each <em>Append Entries</em> message.</h2>');
- })
- .after(1, function () {
- subtitle('', false);
+ removeAllNodes();
+ model().nodes.create("A");
+ model().nodes.create("B");
+ model().nodes.create("C");
+ model().nodes.create("D");
+ model().nodes.create("E");
+ layout.invalidate();
})
- .after(model().heartbeatTimeout * 2, function () {
- subtitle('<h2>This election term will continue until a follower stops receiving heartbeats and becomes a candidate.</h2>', false);
+ .after(500, function () {
+ node("A").init();
+ node("B").init();
+ node("C").init();
+ node("D").init();
+ node("E").init();
+ cluster(["A", "B", "C", "D", "E"]);
+ model().resetToNextTerm();
+ node("B").state("leader");
})
- .after(100, wait).indefinite()
.after(1, function () {
- subtitle('', false);
+ subtitle('<h2>Raft can even stay consistent in the face of network partitions.</h2>', false);
})
-
- //------------------------------
- // Leader re-election
- //------------------------------
- .after(model().heartbeatTimeout * 2, function () {
- subtitle('<h2>Let\'s stop the leader and watch a re-election happen.</h2>', false);
- })
- .after(100, wait).indefinite()
+ .after(1, wait).indefinite()
.after(1, function () {
- subtitle('', false);
- model().leader().state("stopped")
- })
- .after(model().defaultNetworkLatency, function () {
- model().ensureSingleCandidate()
+ model().latency("A", "C", 0).latency("A", "D", 0).latency("A", "E", 0);
+ model().latency("B", "C", 0).latency("B", "D", 0).latency("B", "E", 0);
+ model().ensureExactCandidate("C");
+ subtitle('<h2>Let\'s separate A & B from C, D & E.</h2>', false);
})
.at(model(), "stateChange", function(event) {
return (event.target.state() === "leader");
})
.after(1, function () {
- subtitle('<h2>Node ' + model().leader().id + ' is now leader of term ' + model().leader().currentTerm() + '.</h2>', false);
+ subtitle('<h2>Because of our partition we now have two leaders in different terms.</h2>', false);
})
.after(1, wait).indefinite()
-
- //------------------------------
- // Split Vote
- //------------------------------
.after(1, function () {
- subtitle('<h2>Requiring a marjority of votes guarantees that only one leader can be elected per term.</h2>', false);
+ model().clients.create("Y");
+ subtitle('<h2>Let\s add another client and try to update both leaders.</h2>', false);
})
.after(1, wait).indefinite()
.after(1, function () {
- subtitle('<h2>If two nodes become candidates at the same time then a split vote can occur.</h2>', false);
+ client("Y").send(node("B"), "SET 3");
+ subtitle('<h2>One client will try to set the value of node B to "3".</h2>', false);
})
.after(1, wait).indefinite()
.after(1, function () {
- subtitle('<h2>Let\'s take a look at a split vote example...</h2>', false);
+ subtitle('<h2>Node B cannot replicate to a majority so its log entry stays uncommitted.</h2>', false);
})
.after(1, wait).indefinite()
.after(1, function () {
- subtitle('', false);
- model().nodes.create("D").init().currentTerm(node("A").currentTerm());
- cluster(["A", "B", "C", "D"]);
-
- // Make sure two nodes become candidates at the same time.
- model().resetToNextTerm();
- var nodes = model().ensureSplitVote();
-
- // Increase latency to some nodes to ensure obvious split.
- model().latency(nodes[0].id, nodes[2].id, model().defaultNetworkLatency * 1.25);
- model().latency(nodes[1].id, nodes[3].id, model().defaultNetworkLatency * 1.25);
+ client("X").send(node("C"), "SET 8");
+ subtitle('<h2>The other client will try to set the value of node C to "8".</h2>', false);
})
- .at(model(), "stateChange", function(event) {
- return (event.target.state() === "candidate");
- })
- .after(model().defaultNetworkLatency * 0.25, function () {
- subtitle('<h2>Two nodes both start an election for the same term...</h2>');
- })
- .after(model().defaultNetworkLatency * 0.75, function () {
- subtitle('<h2>...and each reaches a single follower node before the other.</h2>');
- })
- .after(model().defaultNetworkLatency, function () {
- subtitle('<h2>Now each candidate has 2 votes and can receive no more for this term.</h2>');
+ .after(1, wait).indefinite()
+ .after(1, function () {
+ subtitle('<h2>This will succeed because it can replicate to a majority.</h2>');
})
.after(1, function () {
- subtitle('<h2>The nodes will wait for a new election and try again.</h2>', false);
+ model().resetLatencies();
+ subtitle('<h2>Now let\'s heal the network partition.</h2>');
})
.at(model(), "stateChange", function(event) {
- return (event.target.state() === "leader");
+ return (event.target.id === "B" && event.target.state() === "follower");
})
.after(1, function () {
- model().resetLatencies();
- subtitle('<h2>Node ' + model().leader().id + ' received a majority of votes in term ' + model().leader().currentTerm() + ' so it becomes leader.</h2>', false);
+ subtitle('<h2>Node B will see the higher election term and step down.</h2>');
+ })
+ .after(1, function () {
+ subtitle('<h2>Both nodes A & B will roll back their uncommitted entries and match the new leader\s log.</h2>');
})
.after(1, wait).indefinite()
-
player.play();
};
});
@@ -104,13 +104,17 @@ define([], function () {
for (i = 0; i < messages.length; i += 1) {
message = messages[i];
- source = model.find(message.source).g.transform.baseVal.getItem(0).matrix;
- target = model.find(message.target).g.transform.baseVal.getItem(0).matrix;
- pct = (this.parent().current().playhead() - message.sendTime) / (message.recvTime - message.sendTime);
-
- message.x_px = source.e + ((target.e - source.e) * pct);
- message.y_px = source.f + ((target.f - source.f) * pct);
- message.r = (TYPE[message.type()] ? TYPE[message.type()].size : 2);
+ try {
+ source = model.find(message.source).g.transform.baseVal.getItem(0).matrix;
+ target = model.find(message.target).g.transform.baseVal.getItem(0).matrix;
+ pct = (this.parent().current().playhead() - message.sendTime) / (message.recvTime - message.sendTime);
+
+ message.x_px = source.e + ((target.e - source.e) * pct);
+ message.y_px = source.f + ((target.f - source.f) * pct);
+ message.r = (TYPE[message.type()] ? TYPE[message.type()].size : 2);
+ } catch(e) {
+ // console.log("message layout error: ", e);
+ }
}
};
@@ -35,12 +35,24 @@ define([], function () {
* Sends a command to a node.
*/
Client.prototype.send = function (target, command) {
- var self = this;
- return this.model().send(this, target, command, function () {
+ var message, self = this;
+ message = this.model().send(this, target, command, function () {
self.model().find(target.id).execute(command, function() {
- self.model().send(target, self);
+ self.model().send(target, self, null, function() {
+ self.dispatchChangeEvent("recv");
+ });
});
});
+ self.dispatchChangeEvent("send");
+ return message;
+ };
+
+ /**
+ * Dispatches the event from the client and from the model.
+ */
+ Client.prototype.dispatchEvent = function (event) {
+ playback.DataObject.prototype.dispatchEvent.call(this, event);
+ this.model().dispatchEvent(event);
};
Client.prototype.clone = function (model) {
@@ -35,9 +35,15 @@ define([], function () {
LogEntry.prototype.applyTo = function (node) {
var m = this.command.match(/^(\w+) (\d+)$/);
switch (m[1]) {
+ case "ADD":
+ node._value += parseInt(m[2], 10);
+ break;
case "SET":
node._value = parseInt(m[2], 10);
break;
+ case "SUB":
+ node._value -= parseInt(m[2], 10);
+ break;
}
if (this.callback !== null) {
@@ -145,7 +145,7 @@ define(["./controls", "./client", "./message", "./node"], function (Controls, Cl
var ret,
x = (a < b ? a : b),
y = (a < b ? b : a),
- key = [a, b].join("|");
+ key = [x, y].join("|");
if (arguments.length === 2) {
ret = this.latencies[key];
return (ret !== undefined ? ret : this.defaultNetworkLatency);
@@ -203,6 +203,21 @@ define(["./controls", "./client", "./message", "./node"], function (Controls, Cl
};
/**
+ * Updates the election timers to ensure that a specific node will become candidate.
+ */
+ Model.prototype.ensureExactCandidate = function (candidateId) {
+ var self = this,
+ nodes = this.nodes.toArray().filter(function (node) { return node.id != candidateId && node.electionTimer() !== null; }),
+ minTime = this.nodes.find(candidateId).electionTimer().startTime();
+
+ // Extend other candidate timers.
+ nodes.forEach(function (node) {
+ var minStartTime = minTime + (self.defaultNetworkLatency * 1.25);
+ node.electionTimer().startTime(minStartTime);
+ });
+ };
+
+ /**
* Updates the election timers to ensure that two nodes will become candidates at the same time.
*/
Model.prototype.ensureSplitVote = function () {
@@ -595,18 +595,17 @@ define(["./log_entry"], function (LogEntry) {
Node.prototype.sendAppendEntriesRequest = function (target) {
var self = this,
frame = this.frame(),
- nextIndex = (this._nextIndex[target.id] !== undefined ? this._nextIndex[target.id] : 0),
- prevEntry = this._log[nextIndex-1],
+ nextIndex = (this._nextIndex[target.id] !== undefined ? this._nextIndex[target.id] : 1),
+ prevEntry = this._log[nextIndex-2],
req = {
type: "AEREQ",
term: this.currentTerm(),
leaderId: this.id,
prevLogIndex: (prevEntry !== undefined ? prevEntry.index : 0),
prevLogTerm: (prevEntry !== undefined ? prevEntry.term : 0),
- log: this._log.slice(nextIndex).map(function (entry) { var clone = entry.clone(); clone.callback = null; return clone; }),
+ log: this._log.slice(nextIndex - 1).map(function (entry) { var clone = entry.clone(); clone.callback = null; return clone; }),
leaderCommit: this.commitIndex(),
};
-
this.dispatchChangeEvent("appendEntriesRequestSent", req);
return this.model().send(this, target, req, function() {
@@ -660,6 +659,8 @@ define(["./log_entry"], function (LogEntry) {
// Reset election timeout.
this.resetElectionTimer();
+
+ this.dispatchChangeEvent("logChange");
}
// Send response.
@@ -752,7 +752,7 @@ EventDispatcher.prototype.removeEventListener = function (type, listener) {
if (this._eventListeners[type] !== undefined) {
index = this._eventListeners[type].indexOf(listener);
if (index !== -1) {
- this._eventListeners[type].splice(index, 0);
+ this._eventListeners[type].splice(index, 1);
}
}
return this;
@@ -2142,14 +2142,15 @@ Timer.prototype.after = function (delay, fn) {
*/
Timer.prototype.at = function (target, eventType, fn) {
return this.then(function () {
- var timer = this;
- target.addEventListener(eventType, function (event) {
+ var listener, timer = this;
+ listener = function (event) {
var ret = fn(event);
if (ret !== false) {
- target.removeEventListener(eventType, fn);
+ target.removeEventListener(eventType, listener);
timer.stop();
}
- });
+ };
+ target.addEventListener(eventType, listener);
}).indefinite();
};

0 comments on commit 51fd364

Please sign in to comment.