This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

@@ -1,14 +1,28 @@
package edu.duke.raft;

import java.util.Timer;

public class CandidateMode extends RaftMode {
public void go () {

//generate a randomize timeout in the range
private final static int ELECTION_ROUND =(int)(((double)ELECTION_TIMEOUT_MAX-(double)ELECTION_TIMEOUT_MIN)*Math.random())+ELECTION_TIMEOUT_MIN;
private Timer mTimer;

public void go () {
synchronized (mLock) {
int term = 0;
//int term = 0;
int term = mConfig.getCurrentTerm()+1;
System.out.println ("S" +
mID +
"." +
term +
": switched to candidate mode.");
mConfig.setCurrentTerm(term, mID);
RaftResponses.setTerm(term);
RaftResponses.clearAppendResponses(term);
RaftResponses.clearVotes(term);
//part 2应该改成commitIndex和对应的term
this.requestVote(term, mID, mLog.getLastIndex(), mLog.getLastTerm());
}
}

@@ -24,6 +38,19 @@ public int requestVote (int candidateTerm,
int lastLogTerm) {
synchronized (mLock) {
int term = mConfig.getCurrentTerm ();
int num = mConfig.getNumServers();
int lastIndex = mLog.getLastIndex();
int lastTerm = mLog.getLastTerm();
for (int i = 1; i<=num;i++)
{
if (i == mID)
{
continue;
}
this.remoteRequestVote(i, term, mID, lastIndex, lastTerm);
}
//start timer
mTimer = this.scheduleTimer(ELECTION_ROUND, mID);
int result = term;
return result;
}
@@ -47,13 +74,40 @@ public int appendEntries (int leaderTerm,
synchronized (mLock) {
int term = mConfig.getCurrentTerm ();
int result = term;
//do not consider the case when client send requestion to candidate
//always say no
return result;
}
}

// @param id of the timer that timed out
public void handleTimeout (int timerID) {
synchronized (mLock) {
mTimer.cancel();
//check response
int[] vResponses = RaftResponses.getVotes(mConfig.getCurrentTerm());
int yes = 0;
for(int i = 0; i<vResponses.length;i++)
{
yes += (vResponses[i]==0?1:0);
}
int majority = (mConfig.getNumServers()+1)/2;
if (yes>=majority)
{
//switch to leader
RaftMode mode = new LeaderMode();
mode.go();
}
else
{
//switch back to follower mode
int term = mConfig.getCurrentTerm()-1;
mConfig.setCurrentTerm(term, 0); //0 means no vote history
RaftResponses.clearAppendResponses(term);
RaftResponses.clearVotes(term);
RaftMode mode = new FollowerMode();
mode.go();
}
}
}
}
@@ -1,15 +1,26 @@
package edu.duke.raft;

import java.util.Timer;


public class FollowerMode extends RaftMode {
public void go () {
private Timer mTimer;
private int ELECTION_TIMEOUT;

public void go () {
synchronized (mLock) {
int term = 0;
System.out.println ("S" +
mID +
"." +
term +
": switched to follower mode.");
//set a time to handle timeout
ELECTION_TIMEOUT = (int)(((double)ELECTION_TIMEOUT_MAX-(double)ELECTION_TIMEOUT_MIN)*Math.random())+ELECTION_TIMEOUT_MIN;
mTimer = this.scheduleTimer(ELECTION_TIMEOUT,mID);

}
//got heartbeat or appendrequests
}

// @param candidate’s term
@@ -21,18 +32,26 @@ public void go () {
public int requestVote (int candidateTerm,
int candidateID,
int lastLogIndex,
int lastLogTerm) { //知道candidate的信息,对比自己并且set response
int lastLogTerm) {
synchronized (mLock) {
mTimer.cancel();
int term = mConfig.getCurrentTerm ();
int vote = term;
if (lastLogTerm>term || (lastLogTerm == term && lastLogTerm>=mLog.getLastIndex()))

if (candidateTerm<=term || mConfig.getVotedFor() != 0)
{
//vote
return vote;
}
else
//candidateTerm>term

if (lastLogTerm>term || (lastLogTerm == term && lastLogIndex>=mLog.getLastIndex()))
{
//say no
//say yes, update local term
vote = 0;
mConfig.setCurrentTerm(candidateTerm, candidateID); //set current term and voted for
}
//default say no
mTimer = this.scheduleTimer(ELECTION_TIMEOUT,mID);
return vote;
}
}
@@ -53,16 +72,56 @@ public int appendEntries (int leaderTerm,
Entry[] entries,
int leaderCommit) {
synchronized (mLock) {
//cancel local timer
mTimer.cancel();
int term = mConfig.getCurrentTerm ();
int result = term;

if (entries == null) //is heartbeat, no append, just update term and lastApplied
{
mTimer = this.scheduleTimer(ELECTION_TIMEOUT,mID);
mConfig.setCurrentTerm(Math.max(term, prevLogTerm), 0);
mLastApplied = Math.max(mLastApplied, mCommitIndex);
return term;
}
//true append
if (prevLogIndex == -1) //should append from start
{
mLog.append(entries);
result =0;
if (leaderCommit>mCommitIndex) //only effecive when we append something
{
mCommitIndex = Math.min(leaderCommit, mLog.getLastIndex());
}
}
else
{
Entry testEntry = mLog.getEntry(prevLogIndex);
if (testEntry != null &&testEntry.term == prevLogTerm)
{
//append, say yes and setCurrentTerm
mLog.insert(entries, prevLogIndex, prevLogTerm);
result = 0;
if (leaderCommit>mCommitIndex)
{
mCommitIndex = Math.min(leaderCommit, mLog.getLastIndex());
}
}
}
mConfig.setCurrentTerm(Math.max(term, prevLogTerm), 0);
mLastApplied = Math.max(mLastApplied, mCommitIndex);

//set a new timer
mTimer = this.scheduleTimer(ELECTION_TIMEOUT,mID);
return result;
}
}

// @param id of the timer that timed out
public void handleTimeout (int timerID) {
synchronized (mLock) {
RaftMode mode = new CandidateMode();
RaftServerImpl.setMode (mode);
}
}
}

}
@@ -1,14 +1,25 @@
package edu.duke.raft;

import java.rmi.Naming;
import java.util.Timer;

public class LeaderMode extends RaftMode {
public void go () {

private Timer heartTimer,mTimer; //heartbeat and appendrequest

public void go () {
synchronized (mLock) {
int term = 0;
//int term = 0;
int term = mConfig.getCurrentTerm(); //already +1 in candidate mode
System.out.println ("S" +
mID +
"." +
term +
": switched to leader mode.");
RaftResponses.setTerm(term); //should be modified in candidate
RaftResponses.clearVotes(term);
RaftResponses.clearAppendResponses(term);
heartTimer = scheduleTimer (HEARTBEAT_INTERVAL, mID); //start send heartbeat
}
}

@@ -25,7 +36,10 @@ public int requestVote (int candidateTerm,
synchronized (mLock) {
int term = mConfig.getCurrentTerm ();
int vote = term;
return vote;
//if leader recover from failure, it is in follower mode
//weird to receive such request in leader mode
System.out.println("Leader get requeste vote");
return vote; //always say no
}
}

@@ -45,15 +59,65 @@ public int appendEntries (int leaderTerm,
Entry[] entries,
int leaderCommit) {
synchronized (mLock) {
//set append round time
mTimer = scheduleTimer (HEARTBEAT_INTERVAL, mID+1);
int num = mConfig.getNumServers();
int prevIndex = mLog.getLastIndex()-1;
int lastTerm = mLog.getLastTerm();
int currTerm = mConfig.getCurrentTerm();
int prevTerm = mLog.getEntry(prevIndex).term;
//如何维持与不同follower的entries,index记录?
for (int i = 1; i<=num;i++)
{
this.remoteAppendEntries(i,currTerm, mID, prevIndex, prevTerm, entries, mCommitIndex);
}
int term = mConfig.getCurrentTerm ();
int result = term;
return result;
return result; //meaningless for leader response
}
}

// @param id of the timer that timed out
public void handleTimeout (int timerID) {
synchronized (mLock) {
int num = mConfig.getNumServers(); //number of servers
if (timerID == mID) //heartbeat time out
{
for (int i = 0; i<=num;i++)
{
if (num != mID)
{
this.appendEntries(mConfig.getCurrentTerm(), mID, 0, 0, null, 0); //send heartbeat to every follower
}
}
heartTimer = scheduleTimer (HEARTBEAT_INTERVAL, mID);
}
else if (timerID == mID+1) //append time out
{
mTimer.cancel();
//check response
int[] tResponses = RaftResponses.getAppendResponses(mConfig.getCurrentTerm());
foreach (int r in tResponses)
{
if (r == -1) //follower did not response(fail?)
{
continue;
}
else if (r == 0) //successful
{
continue;
}
else //reject append
{
//如何send不同的entries,previndex给follower
}
}
mTimer = scheduleTimer (HEARTBEAT_INTERVAL, mID+1);
}
else
{
System.out.println("Unknown time out");
}
}
}
}