Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kafka: fix strategy on choosing partition assignment protocol #1816

Merged
merged 1 commit into from
May 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
30 changes: 20 additions & 10 deletions hstream-kafka/HStream/Kafka/Group/Group.hs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ import qualified Data.ByteString as BS
import qualified Data.HashTable.IO as H
import Data.Int (Int32)
import qualified Data.IORef as IO
import qualified Data.List as List
import qualified Data.List as L
import qualified Data.List.Extra as L
import qualified Data.Map as Map
import Data.Maybe (fromMaybe, isJust,
listToMaybe)
Expand Down Expand Up @@ -560,7 +561,7 @@ getJoinResponseMember protocol m = do
getMemberMetadata :: Member -> T.Text -> IO BS.ByteString
getMemberMetadata member protocol = do
memberProtocols <- IO.readIORef member.supportedProtocols
return $ snd . fromMaybe ("", "") $ List.find (\(n, _) -> n == protocol) memberProtocols
return $ snd . fromMaybe ("", "") $ L.find (\(n, _) -> n == protocol) memberProtocols

computeProtocolName :: Group -> IO T.Text
computeProtocolName group@Group{..} = do
Expand All @@ -571,15 +572,24 @@ computeProtocolName group@Group{..} = do
pure pn
Just pn -> pure pn

-- choose protocol name from supportedProtocols
-- | Select the protocol for this group which is supported by all members.
-- This is done by letting each member vote for one of the protocols
-- and choose the one with the most votes. See Member.hs#voteForProtocol.
-- Also see kafka.coordinator.group.GroupMetadata$selectProtocol.
-- FIXME: How about protocols with the same occurrence?
chooseProtocolName :: Group -> IO T.Text
chooseProtocolName Group {..} = do
ps <- IO.readIORef supportedProtocols
let pn = head $ Set.toList ps
Log.info $ "choose protocol:" <> Log.build pn
<> ", current supported protocols:" <> Log.buildString' ps
<> ", group:" <> Log.build groupId
return pn
chooseProtocolName group = do
candicateProtocols <- IO.readIORef group.supportedProtocols
members_ <- (L.map snd) <$> H.toList group.members
chosenProtocol <-
mostOccur <$> mapM (voteForProtocol candicateProtocols) members_
Log.info $ "choose protocol:" <> Log.build chosenProtocol
<> ", current supported protocols:" <> Log.buildString' candicateProtocols
<> ", group:" <> Log.build group.groupId
return chosenProtocol
where
mostOccur :: [T.Text] -> T.Text
mostOccur = L.head . L.maximumOn L.length . L.group . L.sort

updateSupportedProtocols :: Group -> [(T.Text, BS.ByteString)] -> IO ()
updateSupportedProtocols Group{..} protocols = do
Expand Down
40 changes: 29 additions & 11 deletions hstream-kafka/HStream/Kafka/Group/Member.hs
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,22 @@

module HStream.Kafka.Group.Member where

import qualified Control.Concurrent as C
import Control.Monad (join)
import qualified Data.ByteString as BS
import Data.Int (Int32, Int64)
import qualified Data.IORef as IO
import Data.Maybe (fromMaybe)
import qualified Data.Text as T
import qualified HStream.Common.Server.MetaData as CM
import qualified HStream.Kafka.Common.Utils as Utils
import qualified Kafka.Protocol as K
import qualified Kafka.Protocol.Service as K
import qualified Control.Concurrent as C
import Control.Exception (throw)
import Control.Monad (join)
import qualified Data.ByteString as BS
import Data.Int (Int32, Int64)
import qualified Data.IORef as IO
import qualified Data.List as L
import Data.Maybe (fromMaybe)
import qualified Data.Set as Set
import qualified Data.Text as T
import qualified HStream.Common.Server.MetaData as CM
import HStream.Kafka.Common.KafkaException (ErrorCodeException (..))
import qualified HStream.Kafka.Common.Utils as Utils
import qualified Kafka.Protocol as K
import qualified Kafka.Protocol.Error as K
import qualified Kafka.Protocol.Service as K

data Member
= Member
Expand Down Expand Up @@ -92,3 +97,16 @@ newMemberFromValue groupValue value = do
, clientId=value.clientId
, clientHost=value.clientHost
}

-- | Vote for a protocol the member prefers from a set of candidates,
-- which **all members support**. "prefer" means following the order
-- of protocols the member supports.
-- Throw an exception if no protocol is found. **This should not happen**
-- because it is caller's responsibility to ensure the 'candidates'
-- argument is the common subset of all members' supported protocols!
voteForProtocol :: Set.Set T.Text -> Member -> IO T.Text
voteForProtocol candidates member = do
supportedProtocols' <- (L.map fst) <$> IO.readIORef member.supportedProtocols
case L.find (`Set.member` candidates) supportedProtocols' of
Nothing -> throw (ErrorCodeException K.INCONSISTENT_GROUP_PROTOCOL)
Just protocol -> return protocol