Skip to content

Commit

Permalink
Elections work (for #1)
Browse files Browse the repository at this point in the history
  • Loading branch information
Andrew Hobden committed Feb 21, 2015
1 parent 4f2dda5 commit cf08db1
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 18 deletions.
76 changes: 60 additions & 16 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@ pub mod types;
use std::old_io::net::ip::SocketAddr;
use std::old_io::net::udp::UdpSocket;
use std::old_io::timer::Timer;
use std::time::Duration;
use std::old_io::IoError;
use std::io;
use std::time::Duration;
use std::thread::Thread;
use std::num::Float;
use std::sync::mpsc::{channel, Sender, Receiver};
use std::str;
use std::collections::{HashMap, VecDeque};
Expand Down Expand Up @@ -231,20 +232,22 @@ impl<T: Encodable + Decodable + Send + 'static + Clone> RaftNode<T> {
self.persistent_state.inc_current_term();
self.persistent_state.set_voted_for(Some(self.own_id)); // TODO: Is this correct?
self.reset_timer();
let mut status = Vec::with_capacity(self.id_to_addr.len());
println!("{}", status.len());

for (&id, &addr) in self.id_to_addr.clone().iter() {
let mut status = vec![0; self.id_to_addr.len()].into_iter().enumerate().map(|(id, v)| {
// Do it in the loop so we different Uuids.
let (uuid, request) = RemoteProcedureCall::request_vote(
self.persistent_state.get_current_term(),
self.persistent_state.get_voted_for().unwrap(), // TODO: Safe because we just set it. But correct
self.volatile_state.last_applied,
0); // TODO: Get this.
status.push(Transaction { uuid: uuid, state: TransactionState::Polling });
self.send(addr, request);
}
// The old map
if id as u64 == self.own_id {
// Don't request of self.
Transaction { uuid: uuid, state: TransactionState::Accepted }
} else {
let addr = self.id_to_addr[id as u64];
self.send(addr, request);
Transaction { uuid: uuid, state: TransactionState::Polling }
}
}).collect();
self.state = Candidate(status);
// We rely on the loop to handle incoming responses regarding `RequestVote`, don't worry
// about that here.
Expand All @@ -263,6 +266,7 @@ impl<T: Encodable + Decodable + Send + 'static + Clone> RaftNode<T> {
Some(id) => *id,
None => unimplemented!(),
};
println!("Node {} handle_request_vote: {:?}", self.own_id, call);
match self.state {
Leader(_) => {
// Re-assert leadership.
Expand All @@ -281,19 +285,22 @@ impl<T: Encodable + Decodable + Send + 'static + Clone> RaftNode<T> {
let checks = [
self.persistent_state.get_current_term() < call.term,
self.persistent_state.get_voted_for().is_none(),
self.volatile_state.last_applied < call.last_log_index,
self.volatile_state.last_applied <= call.last_log_index,
true, // TODO: Is the last log term the same?
];
println!("Node {} checks {:?}", self.own_id, checks);
let last_index = self.persistent_state.get_last_index();
match checks.iter().all(|&x| x) {
true => {
println!("Node {} accepts request_vote", self.own_id);
self.leader_id = Some(call.candidate_id);
RemoteProcedureResponse::accept(call.uuid, call.term,
self.persistent_state.get_last_index(), self.volatile_state.commit_index)
},
false => {
// TODO: Handle various error cases.
// Decrement next_index
println!("Node {} rejects request_vote", self.own_id);
let prev = {
let idx = self.persistent_state.get_last_index();
if idx == 0 { 0 } else { idx - 1 }
Expand Down Expand Up @@ -332,9 +339,27 @@ impl<T: Encodable + Decodable + Send + 'static + Clone> RaftNode<T> {
Some(id) => *id,
None => unimplemented!(),
};
println!("Node {} handle_append_entries", self.own_id);
match self.state {
Leader(ref state) => {
unimplemented!();
Leader(_) => {
// **This is a non-standard implementation detail.
// If a follower recieves an append_request it will forward it to the leader.
// The leader will treat this no differently than an append_request from it's client.
// TODO: The terms get updated, not sure if that's the right approach.
let updated_terms = call.entries.into_iter().map(|(t, v)| v).collect();
match ClientRequest::append_request(call.prev_log_index, call.prev_log_term, updated_terms) {
ClientRequest::AppendRequest(transformed) => {
match self.handle_append_request(transformed) {
Ok(_) => RemoteProcedureResponse::accept(call.uuid, self.persistent_state.get_current_term(),
self.persistent_state.get_last_index(), // TODO Maybe wrong.
self.volatile_state.commit_index),
Err(_) => RemoteProcedureResponse::reject(call.uuid, self.persistent_state.get_current_term(),
self.leader_id, self.persistent_state.get_last_index(), // TODO Maybe wrong
self.volatile_state.commit_index),
}
},
_ => unreachable!()
}
},
Follower => {
// We need to append the entries to our log and respond.
Expand Down Expand Up @@ -406,6 +431,7 @@ impl<T: Encodable + Decodable + Send + 'static + Clone> RaftNode<T> {
Some(id) => *id,
None => return,
};
println!("Node {} handle_accepted", self.own_id);
match self.state {
Leader(ref mut state) => {
// Should be an AppendEntries request response.
Expand All @@ -415,6 +441,8 @@ impl<T: Encodable + Decodable + Send + 'static + Clone> RaftNode<T> {
},
Follower => {
// Must have been a response to our last AppendEntries request?
// We need to notify the client. We should ~not~ act on it.
// TODO: Is it possible that a request_vote might fool us? Check here~
unimplemented!();
},
Candidate(_) => {
Expand All @@ -423,20 +451,27 @@ impl<T: Encodable + Decodable + Send + 'static + Clone> RaftNode<T> {
if let Candidate(ref mut status) = self.state {
if status[source_id as usize].uuid == response.uuid {
// Set it.
println!("Node {} accepted from {}, {} == {}", self.own_id, source_id, status[source_id as usize].uuid, response.uuid);
status[source_id as usize].state = TransactionState::Accepted;
check_polls = true;
};
} else {
println!("Node {} accepted from {}, {} != {}", self.own_id, source_id, status[source_id as usize].uuid, response.uuid);
}
}
// Clone state because we'll replace it.
if let (true, Candidate(status)) = (check_polls, self.state.clone()) {
// Do we have a majority?
let number_of_votes = status.iter().filter(|&transaction| {
transaction.state == TransactionState::Accepted
}).count();
if number_of_votes > self.addr_to_id.len() / 2 {
let goal = (self.addr_to_id.len() as f64 / 2.0).ceil() as usize;
println!("Node {} has {} votes, needs {}", self.own_id, number_of_votes, goal);
if number_of_votes > goal { // +1 for itself.
// Won election.
self.candidate_to_leader();
}
} else {
println!("Node {} doesn't check polls", self.own_id);
}
}
}
Expand All @@ -448,6 +483,7 @@ impl<T: Encodable + Decodable + Send + 'static + Clone> RaftNode<T> {
Some(id) => *id,
None => return,
};
println!("Node {} handle_rejected: {:?}", self.own_id, response);
match self.state {
Leader(ref state) => {
// Should be an AppendEntries request response.
Expand All @@ -471,6 +507,7 @@ impl<T: Encodable + Decodable + Send + 'static + Clone> RaftNode<T> {
}
/// This is called when the consuming application issues an append request on it's channel.
fn handle_append_request(&mut self, request: AppendRequest<T>) -> io::Result<Vec<T>> {
println!("Node {} handle_append_request", self.own_id);
match self.state {
Leader(ref state) => {
// Handle the request appropriately.
Expand Down Expand Up @@ -523,6 +560,7 @@ impl<T: Encodable + Decodable + Send + 'static + Clone> RaftNode<T> {
// Timers //
////////////
fn handle_timer(&mut self) {
println!("Node {} handle_timer", self.own_id);
match self.state {
Leader(_) => {
// Send heartbeats.
Expand All @@ -546,6 +584,7 @@ impl<T: Encodable + Decodable + Send + 'static + Clone> RaftNode<T> {
}
}
fn reset_timer(&mut self) {
println!("Node {} timer RESET", self.own_id);
self.heartbeat = self.timer.oneshot(Duration::milliseconds(self.rng.gen_range::<i64>(HEARTBEAT_MIN, HEARTBEAT_MAX)));
}
//////////////////
Expand All @@ -570,6 +609,7 @@ impl<T: Encodable + Decodable + Send + 'static + Clone> RaftNode<T> {
/// Called on heartbeat timeout.
pub fn follower_to_candidate(&mut self) {
// Need to increase term.
println!("Node {} Follower -> Candidate", self.own_id);
self.persistent_state.inc_current_term();
self.state = match self.state {
Follower => Candidate(Vec::with_capacity(self.id_to_addr.len())),
Expand All @@ -579,24 +619,27 @@ impl<T: Encodable + Decodable + Send + 'static + Clone> RaftNode<T> {
}
/// Called when the Leader recieves information that they are not the leader.
pub fn leader_to_follower(&mut self) {
println!("Node {} Leader -> Follower", self.own_id);
self.state = match self.state {
Leader(_) => Follower,
_ => panic!("Called leader_to_follower() but was not Leader.")
};
}
/// Called when a Candidate successfully gets elected.
pub fn candidate_to_leader(&mut self) {
println!("Node {} Candidate -> Leader", self.own_id);
self.state = match self.state {
Candidate(_) => Leader(LeaderState {
next_index: Vec::with_capacity(self.id_to_addr.len()),
match_index: Vec::with_capacity(self.id_to_addr.len())
next_index: vec![0u64; self.id_to_addr.len()],
match_index: vec![0u64; self.id_to_addr.len()],
}),
_ => panic!("Called candidate_to_leader() but was not Candidate.")
};
self.leader_id = Some(self.own_id);
}
/// Called when a candidate fails an election. Takes the new leader's ID.
pub fn candidate_to_follower(&mut self, leader_id: u64) {
println!("Node {} Candidate -> Follower", self.own_id);
self.state = match self.state {
Candidate(_) => Follower,
_ => panic!("Called candidate_to_follower() but was not Candidate.")
Expand All @@ -607,6 +650,7 @@ impl<T: Encodable + Decodable + Send + 'static + Clone> RaftNode<T> {
/// TODO: This is currently pointless, but will be meaningful when Candidates
/// have data as part of their variant.
pub fn reset_candidate(&mut self) {
println!("Node {} Candidate RESET", self.own_id);
self.state = match self.state {
Candidate(_) => Candidate(Vec::with_capacity(self.id_to_addr.len())),
_ => panic!("Called reset_candidate() but was not Candidate.")
Expand Down
11 changes: 9 additions & 2 deletions tests/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ use raft::interchange::{ClientRequest, AppendRequest, IndexRange};
use raft::interchange::{RemoteProcedureResponse};
use raft::RaftNode;

use std::old_io::timer::Timer;
use std::time::Duration;
use std::old_io::net::ip::SocketAddr;
use std::old_io::net::udp::UdpSocket;
use std::old_io::net::ip::IpAddr::Ipv4Addr;
Expand All @@ -26,12 +28,12 @@ fn basic_test() {
let (log_0_sender, log_0_reciever) = RaftNode::<String>::start(
0,
nodes.clone(),
Path::new("/tmp/test1")
Path::new("/tmp/test0")
);
let (log_1_sender, log_1_reciever) = RaftNode::<String>::start(
1,
nodes.clone(),
Path::new("/tmp/test2")
Path::new("/tmp/test1")
);
let (log_2_sender, log_2_reciever) = RaftNode::<String>::start(
2,
Expand All @@ -48,5 +50,10 @@ fn basic_test() {
log_0_sender.send(test_command.clone()).unwrap();
// Get the result.
let event = log_0_reciever.recv().unwrap();
let mut timer = Timer::new().unwrap();
let clock = timer.oneshot(Duration::milliseconds(5000)); // If this fails we're in trouble.
let _ = clock.recv();


assert!(event.is_ok());
}

0 comments on commit cf08db1

Please sign in to comment.