Permalink
Browse files

Bug when leader grants vote to candidate with lesser commitIndex is f…

…ixed.
  • Loading branch information...
Yuriy Shelyag
Yuriy Shelyag committed Apr 27, 2016
1 parent 5930adb commit afdd449cee33d64a2a111f49c4eb0a86aef9143f
@@ -1,5 +1,6 @@
{-# LANGUAGE GeneralizedNewtypeDeriving,
FlexibleInstances,
FlexibleContexts,
MultiParamTypeClasses,
TypeFamilies,
StandaloneDeriving #-}
@@ -21,6 +22,8 @@ import Prelude hiding (log)
import Control.Applicative
import Control.Monad.RWS
import Control.Monad.Writer (MonadWriter)
import Control.Monad.Reader (MonadReader)
import Data.ByteString (ByteString)
import Data.ByteString.Lazy.Builder (Builder, byteString)
@@ -61,7 +64,7 @@ send :: (Monad m, IsMessage t a) => NodeId -> t -> TransitionT a f m ()
send n m = tell [CSend n (toMessage m)]
-- | Resets the election timeout.
resetElectionTimeout :: Monad m => TransitionT a f m ()
resetElectionTimeout :: (Monad m , MonadWriter [Command a] m , MonadReader Config m) => m ()
resetElectionTimeout = do
t <- view configElectionTimeout
tell [CResetElectionTimeout t (2 * t)]
@@ -73,11 +76,11 @@ resetHeartbeatTimeout = do
tell [CResetHeartbeatTimeout t]
-- | Logs a message from this `Builder' `b'.
log :: Monad m => Builder -> TransitionT a f m ()
log :: (Monad m , MonadWriter [Command a] m) => Builder -> m ()
log b = tell [CLog b]
-- | Logs a message from this `ByteString'.
logS :: Monad m => ByteString -> TransitionT a f m ()
logS :: Monad m => MonadWriter [Command a] m => ByteString -> m ()
logS = log . byteString
-- | Truncates the log of events to `Index' `i'.
@@ -1,5 +1,6 @@
{-# LANGUAGE GADTs,
RankNTypes #-}
RankNTypes,
OverloadedStrings #-}
-----------------------------------------------------------------------------
-- |
-- Module : Network.Kontiki.Raft
@@ -33,6 +34,11 @@ import Network.Kontiki.Monad
import qualified Network.Kontiki.Raft.Follower as Follower
import qualified Network.Kontiki.Raft.Candidate as Candidate
import qualified Network.Kontiki.Raft.Leader as Leader
import Network.Kontiki.Raft.Utils
import Data.Maybe (fromJust)
import Control.Monad.Writer (runWriterT)
import Control.Monad.Reader (runReader)
-- | Main handler function which, given the `config' of the cluster
-- and `state' of the node, runs the Raft protocol and
@@ -43,15 +49,49 @@ handle :: (Functor m, Monad m, MonadLog m a, GetNewNodeSet a)
-> SomeState -- ^ current state of the node
-> Event a -- ^ incoming event
-> m (SomeState, [Command a]) -- ^ new state and list of commands
handle config state event = case state of
WrapState(Follower s') -> select `fmap` runTransitionT (Follower.handle event) config s'
WrapState(Candidate s') -> select `fmap` runTransitionT (Candidate.handle event) config s'
WrapState(Leader s') -> select `fmap` runTransitionT (Leader.handle event) config s'
handle config state event = do
let (state' , cmd) = updateTerm config state event
(state'', cmd') <- case state' of
WrapState(Follower s') -> select `fmap` runTransitionT (Follower.handle event) config s'
WrapState(Candidate s') -> select `fmap` runTransitionT (Candidate.handle event) config s'
WrapState(Leader s') -> select `fmap` runTransitionT (Leader.handle event) config s'
return (state'', cmd ++ cmd')
where
-- | Drops the middle value from a three-tuple
select :: (a, b, c) -> (a, c)
select (a, _, c) = (a, c)
updateTerm :: Config
-> SomeState
-> Event a
-> (SomeState, [Command a])
updateTerm config state event =
if maybe False (> currentTerm state) newTerm
then (`runReader` config) $ runWriterT $ do
logS "Update current term"
stepDown (fromJust newTerm) (currentIndex state)
else (state,[])
where
newTerm = messageTerm event
messageTerm :: Event a -> Maybe Term
messageTerm (EMessage _ (MRequestVote m)) = Just $ rvTerm m
messageTerm (EMessage _ (MRequestVoteResponse m)) = Just $ rvrTerm m
messageTerm (EMessage _ (MAppendEntries m)) = Just $ aeTerm m
messageTerm (EMessage _ (MAppendEntriesResponse m)) = Just $ aerTerm m
messageTerm EElectionTimeout = Nothing
messageTerm EHeartbeatTimeout = Nothing
currentTerm :: SomeState -> Term
currentTerm (WrapState (Follower s)) = _fCurrentTerm s
currentTerm (WrapState (Leader s)) = _lCurrentTerm s
currentTerm (WrapState (Candidate s)) = _cCurrentTerm s
currentIndex :: SomeState -> Network.Kontiki.Types.Index
currentIndex (WrapState (Follower s)) = _fCommitIndex s
currentIndex (WrapState (Leader s)) = _lCommitIndex s
currentIndex (WrapState (Candidate s)) = _cCommitIndex s
-- | Initial state of all nodes.
initialState :: SomeState
initialState = wrap FollowerState { _fCurrentTerm = term0
@@ -30,16 +30,12 @@ import qualified Network.Kontiki.Raft.Leader as Leader
handleRequestVote :: (Functor m, Monad m) => MessageHandler RequestVote a Candidate m
handleRequestVote sender RequestVote{..} = do
currentTerm <- use cCurrentTerm
commitIndex <- use cCommitIndex
if rvTerm > currentTerm
then stepDown sender rvTerm commitIndex
else do
logS "Not granting vote"
send sender $ RequestVoteResponse { rvrTerm = currentTerm
, rvrVoteGranted = False
}
currentState
logS "Not granting vote"
send sender $ RequestVoteResponse { rvrTerm = currentTerm
, rvrVoteGranted = False
}
currentState
-- | Handles `RequestVoteResponse'.
handleRequestVoteResponse :: (Functor m, Monad m, MonadLog m a)
@@ -52,7 +48,6 @@ handleRequestVoteResponse sender RequestVoteResponse{..} = do
if | rvrTerm < currentTerm -> do
logS "Ignoring RequestVoteResponse for old term"
currentState
| rvrTerm > currentTerm -> stepDown sender rvrTerm commitIndex
| not rvrVoteGranted -> do
logS "Ignoring RequestVoteResponse since vote wasn't granted"
currentState
@@ -83,7 +78,7 @@ handleAppendEntries sender AppendEntries{..} = do
if currentTerm <= aeTerm
then do
logS "Received AppendEntries for current or newer term"
stepDown sender aeTerm commitIndex
stepDown aeTerm commitIndex
else do
logS "Ignoring AppendEntries for old term"
currentState
@@ -89,13 +89,8 @@ handleRequestVote sender RequestVote{..} = do
-- | Handles `RequestVoteResponse'.
handleRequestVoteResponse :: (Functor m, Monad m)
=> MessageHandler RequestVoteResponse a Follower m
handleRequestVoteResponse sender RequestVoteResponse{..} = do
currentTerm <- use fCurrentTerm
commitIndex <- use fCommitIndex
if rvrTerm > currentTerm
then stepDown sender rvrTerm commitIndex
else currentState
handleRequestVoteResponse sender RequestVoteResponse{..} =
currentState
-- | Handles `AppendEntries'.
handleAppendEntries :: (Functor m, Monad m, MonadLog m a, GetNewNodeSet a)
@@ -106,8 +101,7 @@ handleAppendEntries sender AppendEntries{..} = do
e <- logLastEntry
let lastIndex = maybe index0 eIndex e
if | aeTerm > currentTerm -> stepDown sender aeTerm commitIndex
| aeTerm < currentTerm -> do
if | aeTerm < currentTerm -> do
send sender $ AppendEntriesResponse { aerTerm = currentTerm
, aerSuccess = False
, aerLastIndex = lastIndex
@@ -39,38 +39,26 @@ handleRequestVote :: (Functor m, Monad m)
=> MessageHandler RequestVote a Leader m
handleRequestVote sender RequestVote{..} = do
currentTerm <- use lCurrentTerm
commitIndex <- use lCommitIndex
if rvTerm > currentTerm
then stepDown sender rvTerm commitIndex
else do
logS "Not granting vote"
send sender $ RequestVoteResponse { rvrTerm = currentTerm
, rvrVoteGranted = False
}
currentState
-- | TODO: Check that leader can't get not up-to-date log in the same
-- term
logS "Not granting vote"
send sender $ RequestVoteResponse { rvrTerm = currentTerm
, rvrVoteGranted = False
}
currentState
-- | Handle `RequestVoteResponse'.
handleRequestVoteResponse :: (Functor m, Monad m)
=> MessageHandler RequestVoteResponse a Leader m
handleRequestVoteResponse sender RequestVoteResponse{..} = do
currentTerm <- use lCurrentTerm
commitIndex <- use lCommitIndex
if rvrTerm > currentTerm
then stepDown sender rvrTerm commitIndex
else currentState
handleRequestVoteResponse sender RequestVoteResponse{..} =
currentState
-- | Handles `AppendEntries'.
handleAppendEntries :: (Functor m, Monad m)
=> MessageHandler (AppendEntries a) a Leader m
handleAppendEntries sender AppendEntries{..} = do
currentTerm <- use lCurrentTerm
commitIndex <- use lCommitIndex
if aeTerm > currentTerm
then stepDown sender aeTerm commitIndex
else currentState
handleAppendEntries sender AppendEntries{..} =
currentState
-- | Handles `AppendEntriesResponse'.
handleAppendEntriesResponse :: (Functor m, Monad m)
@@ -82,7 +70,6 @@ handleAppendEntriesResponse sender AppendEntriesResponse{..} = do
if | aerTerm < currentTerm -> do
logS "Ignoring old AppendEntriesResponse"
currentState
| aerTerm > currentTerm -> stepDown sender aerTerm commitIndex
| not aerSuccess -> do
lNextIndex %= Map.alter (\i -> Just $ maybe index0 prevIndex i) sender
currentState
@@ -20,6 +20,8 @@ import qualified Data.Set as Set
import Data.ByteString.Char8 ()
import Control.Monad.State.Class (get)
import Control.Monad.Writer (MonadWriter)
import Control.Monad.Reader (MonadReader)
import Control.Lens (view)
@@ -92,17 +94,14 @@ isMajority votes = do
-- `MFollower' mode.
--
-- Can't have this in Follower due to recursive imports, bummer
stepDown :: Monad m => NodeId -> Term -> Index -> TransitionT a f m SomeState
stepDown sender term commitIndex = do
stepDown :: (Monad m , MonadWriter [Command a] m , MonadReader Config m)
=> Term -> Index -> m SomeState
stepDown term commitIndex = do
logS "Stepping down to Follower state"
resetElectionTimeout
send sender RequestVoteResponse { rvrTerm = term
, rvrVoteGranted = True
}
return $ wrap FollowerState { _fCurrentTerm = term
, _fCommitIndex = commitIndex
, _fVotedFor = Just sender
, _fVotedFor = Nothing
}

0 comments on commit afdd449

Please sign in to comment.