From 45f7e57e035ef69885fb912e33d63d7eb52f729d Mon Sep 17 00:00:00 2001 From: s12f <97083380+s12f@users.noreply.github.com> Date: Fri, 13 Oct 2023 11:19:47 +0800 Subject: [PATCH] feat(kafka): naive consumer group (#1629) --- common/server/HStream/Common/Server/Lookup.hs | 18 +- .../HStream/Kafka/Common/KafkaException.hs | 11 + hstream-kafka/HStream/Kafka/Common/Utils.hs | 45 ++ hstream-kafka/HStream/Kafka/Group/Group.hs | 607 ++++++++++++++++++ .../HStream/Kafka/Group/GroupCoordinator.hs | 119 ++++ .../Kafka/Group/GroupMetadataManager.hs | 102 +-- hstream-kafka/HStream/Kafka/Group/Member.hs | 33 + .../HStream/Kafka/Group/OffsetsStore.hs | 9 +- hstream-kafka/HStream/Kafka/Server/Handler.hs | 8 + .../HStream/Kafka/Server/Handler/Group.hs | 40 ++ .../HStream/Kafka/Server/Handler/Offset.hs | 59 +- hstream-kafka/HStream/Kafka/Server/Types.hs | 36 +- hstream-kafka/hstream-kafka.cabal | 8 + 13 files changed, 980 insertions(+), 115 deletions(-) create mode 100644 hstream-kafka/HStream/Kafka/Common/KafkaException.hs create mode 100644 hstream-kafka/HStream/Kafka/Common/Utils.hs create mode 100644 hstream-kafka/HStream/Kafka/Group/Group.hs create mode 100644 hstream-kafka/HStream/Kafka/Group/GroupCoordinator.hs create mode 100644 hstream-kafka/HStream/Kafka/Group/Member.hs create mode 100644 hstream-kafka/HStream/Kafka/Server/Handler/Group.hs diff --git a/common/server/HStream/Common/Server/Lookup.hs b/common/server/HStream/Common/Server/Lookup.hs index fec89772f..98c9f19b1 100644 --- a/common/server/HStream/Common/Server/Lookup.hs +++ b/common/server/HStream/Common/Server/Lookup.hs @@ -85,9 +85,18 @@ lookupNodePersist metaHandle gossipContext loadBalanceHashRing data KafkaResource = KafkaResTopic Text + | KafkaResGroup Text + +kafkaResourceKey :: KafkaResource -> Text +kafkaResourceKey (KafkaResTopic name) = name +kafkaResourceKey (KafkaResGroup name) = name + +kafkaResourceMetaId :: KafkaResource -> Text +kafkaResourceMetaId (KafkaResTopic name) = "KafkaResTopic_" <> name 
+kafkaResourceMetaId (KafkaResGroup name) = "KafkaResGroup_" <> name lookupKafka :: LoadBalanceHashRing -> Maybe Text -> KafkaResource -> IO A.ServerNode -lookupKafka lbhr alk (KafkaResTopic topicId) = lookupNode lbhr topicId alk +lookupKafka lbhr alk res = lookupNode lbhr (kafkaResourceKey res) alk lookupKafkaPersist :: M.MetaHandle @@ -96,6 +105,7 @@ lookupKafkaPersist -> Maybe Text -> KafkaResource -> IO A.ServerNode -lookupKafkaPersist mh gc lbhr alk (KafkaResTopic topicId) = - let metaId = "KafkaResTopic_" <> topicId - in lookupNodePersist mh gc lbhr topicId metaId alk +lookupKafkaPersist mh gc lbhr alk kafkaResource = + let key = kafkaResourceKey kafkaResource + metaId = kafkaResourceMetaId kafkaResource + in lookupNodePersist mh gc lbhr key metaId alk diff --git a/hstream-kafka/HStream/Kafka/Common/KafkaException.hs b/hstream-kafka/HStream/Kafka/Common/KafkaException.hs new file mode 100644 index 000000000..2958f7a8e --- /dev/null +++ b/hstream-kafka/HStream/Kafka/Common/KafkaException.hs @@ -0,0 +1,11 @@ +module HStream.Kafka.Common.KafkaException + ( ErrorCodeException (..) 
+ ) where + +import qualified Control.Exception as E +import qualified Kafka.Protocol.Error as K + +------------------------------------------------------------------------------- + +newtype ErrorCodeException = ErrorCodeException K.ErrorCode deriving Show +instance E.Exception ErrorCodeException diff --git a/hstream-kafka/HStream/Kafka/Common/Utils.hs b/hstream-kafka/HStream/Kafka/Common/Utils.hs new file mode 100644 index 000000000..06c7d7ded --- /dev/null +++ b/hstream-kafka/HStream/Kafka/Common/Utils.hs @@ -0,0 +1,45 @@ +{-# LANGUAGE CPP #-} +{-# LANGUAGE DuplicateRecordFields #-} + +module HStream.Kafka.Common.Utils where + +import Control.Exception (throw) +import qualified Control.Monad as M +import qualified Data.HashTable.IO as H +import Data.Maybe (fromMaybe) +import qualified Data.Vector as V +import HStream.Kafka.Common.KafkaException (ErrorCodeException (ErrorCodeException)) +import qualified Kafka.Protocol.Encoding as K + +type HashTable k v = H.BasicHashTable k v + +hashtableGet hashTable key errorCode = H.lookup hashTable key >>= \case + Nothing -> throw (ErrorCodeException errorCode) + Just v -> return v + +hashtableDeleteAll hashTable = do + lst <- H.toList hashTable + M.forM_ lst $ \(key, _) -> H.delete hashTable key + +kaArrayToList :: K.KaArray a -> [a] +kaArrayToList = undefined + +listToKaArray :: [a] -> K.KaArray a +listToKaArray = undefined + +kaArrayToVector :: K.KaArray a -> V.Vector a +kaArrayToVector kaArray = fromMaybe V.empty (K.unKaArray kaArray) + +vectorToKaArray :: V.Vector a -> K.KaArray a +vectorToKaArray vec = K.KaArray (Just vec) + +mapKaArray :: (a -> b) -> K.KaArray a -> K.KaArray b +mapKaArray f arr = K.KaArray (fmap (V.map f) (K.unKaArray arr)) + +mapKaArrayM :: (a -> IO b) -> K.KaArray a -> IO (K.KaArray b) +mapKaArrayM f arr = case K.unKaArray arr of + Nothing -> return (K.KaArray Nothing) + Just vec -> K.KaArray . 
Just <$> V.mapM f vec + +forKaArrayM :: K.KaArray a -> (a -> IO b) -> IO (K.KaArray b) +forKaArrayM = flip mapKaArrayM diff --git a/hstream-kafka/HStream/Kafka/Group/Group.hs b/hstream-kafka/HStream/Kafka/Group/Group.hs new file mode 100644 index 000000000..f40df9ca9 --- /dev/null +++ b/hstream-kafka/HStream/Kafka/Group/Group.hs @@ -0,0 +1,607 @@ +{-# LANGUAGE CPP #-} +{-# LANGUAGE DuplicateRecordFields #-} +{-# LANGUAGE OverloadedRecordDot #-} +{-# LANGUAGE OverloadedStrings #-} + +module HStream.Kafka.Group.Group where + +import qualified Control.Concurrent as C +import Control.Exception (throw) +import Control.Monad (when) +import qualified Control.Monad as M +import qualified Data.ByteString as BS +import qualified Data.HashTable.IO as H +import Data.Int (Int32) +import qualified Data.IORef as IO +import qualified Data.List as List +import Data.Maybe (fromMaybe) +import qualified Data.Set as Set +import qualified Data.Text as T +import qualified Data.UUID as UUID +import qualified Data.UUID.V4 as UUID +import qualified Data.Vector as V +import qualified HStream.Base.Time as Time +import HStream.Kafka.Common.KafkaException (ErrorCodeException (ErrorCodeException)) +import qualified HStream.Kafka.Common.Utils as Utils +import HStream.Kafka.Group.GroupMetadataManager (GroupMetadataManager) +import qualified HStream.Kafka.Group.GroupMetadataManager as GMM +import HStream.Kafka.Group.Member +import qualified HStream.Logger as Log +import qualified Kafka.Protocol.Encoding as K +import qualified Kafka.Protocol.Error as K +import qualified Kafka.Protocol.Message as K + +-- TODO: +-- * kafka/group config +-- * configurable +-- * group metadata manager +-- * store group information + +type HashTable k v = H.BasicHashTable k v + +hashtableGet hashTable key errorCode = H.lookup hashTable key >>= \case + Nothing -> throw (ErrorCodeException errorCode) + Just v -> return v + +hashtableDeleteAll hashTable = do + lst <- H.toList hashTable + M.forM_ lst $ \(key, _) -> H.delete 
hashTable key + +data GroupState + -- Group is preparing to rebalance + -- + -- action: respond to heartbeats with REBALANCE_IN_PROGRESS + -- respond to sync group with REBALANCE_IN_PROGRESS + -- remove member on leave group request + -- park join group requests from new or existing members until all expected members have joined + -- allow offset commits from previous generation + -- allow offset fetch requests + -- transition: some members have joined by the timeout => CompletingRebalance + -- all members have left the group => Empty + -- group is removed by partition emigration => Dead + = PreparingRebalance + + -- Group is awaiting state assignment from the leader + -- + -- action: respond to heartbeats with REBALANCE_IN_PROGRESS + -- respond to offset commits with REBALANCE_IN_PROGRESS + -- park sync group requests from followers until transition to Stable + -- allow offset fetch requests + -- transition: sync group with state assignment received from leader => Stable + -- join group from new member or existing member with updated metadata => PreparingRebalance + -- leave group from existing member => PreparingRebalance + -- member failure detected => PreparingRebalance + -- group is removed by partition emigration => Dead + | CompletingRebalance + + -- Group is stable + -- + -- action: respond to member heartbeats normally + -- respond to sync group from any member with current assignment + -- respond to join group from followers with matching metadata with current group metadata + -- allow offset commits from member of current generation + -- allow offset fetch requests + -- transition: member failure detected via heartbeat => PreparingRebalance + -- leave group from existing member => PreparingRebalance + -- leader join-group received => PreparingRebalance + -- follower join-group with new metadata => PreparingRebalance + -- group is removed by partition emigration => Dead + | Stable + + -- Group has no more members and its metadata is being removed + -- + 
-- action: respond to join group with UNKNOWN_MEMBER_ID + -- respond to sync group with UNKNOWN_MEMBER_ID + -- respond to heartbeat with UNKNOWN_MEMBER_ID + -- respond to leave group with UNKNOWN_MEMBER_ID + -- respond to offset commit with UNKNOWN_MEMBER_ID + -- allow offset fetch requests + -- transition: Dead is a final state before group metadata is cleaned up, so there are no transitions + | Dead + + -- Group has no more members, but lingers until all offsets have expired. This state + -- also represents groups which use Kafka only for offset commits and have no members. + -- + -- action: respond normally to join group from new members + -- respond to sync group with UNKNOWN_MEMBER_ID + -- respond to heartbeat with UNKNOWN_MEMBER_ID + -- respond to leave group with UNKNOWN_MEMBER_ID + -- respond to offset commit with UNKNOWN_MEMBER_ID + -- allow offset fetch requests + -- transition: last offsets removed in periodic expiration task => Dead + -- join group from a new member => PreparingRebalance + -- group is removed by partition emigration => Dead + -- group is removed by expiration => Dead + | Empty + deriving (Show, Eq) + +data GroupConfig + = GroupConfig + { + } + +data Group + = Group + { lock :: C.MVar () + , groupId :: T.Text + , groupGenerationId :: IO.IORef Int32 + , state :: IO.IORef GroupState + , config :: GroupConfig + , leader :: IO.IORef (Maybe T.Text) + , members :: HashTable T.Text Member + -- , pendingMembers :: HashTable T.Text () + , delayedJoinResponses :: HashTable T.Text (C.MVar K.JoinGroupResponseV0) + -- , pendingSyncMembers :: HashTable T.Text () + -- , newMemberAdded :: IO.IORef Bool + , delayedRebalance :: IO.IORef (Maybe C.ThreadId) + + , delayedSyncResponses :: HashTable T.Text (C.MVar K.SyncGroupResponseV0) + + , metadataManager :: GroupMetadataManager + + -- protocols + , protocolType :: IO.IORef (Maybe T.Text) + , protocolName :: IO.IORef (Maybe T.Text) + , supportedProtcols :: IO.IORef (Set.Set T.Text) + } + +newGroup :: T.Text 
-> GroupMetadataManager -> IO Group +newGroup group metadataManager = do + lock <- C.newMVar () + state <- IO.newIORef Empty + groupGenerationId <- IO.newIORef 0 + leader <- IO.newIORef Nothing + members <- H.new + -- pendingMembers <- H.new + delayedJoinResponses <- H.new + -- pendingSyncMembers <- H.new + -- newMemberAdded <- IO.newIORef False + delayedRebalance <- IO.newIORef Nothing + + delayedSyncResponses <- H.new + + protocolType <- IO.newIORef Nothing + protocolName <- IO.newIORef Nothing + supportedProtcols <- IO.newIORef Set.empty + + return $ Group + { lock = lock + , groupId = group + , groupGenerationId = groupGenerationId + , state = state + , config = GroupConfig + , leader = leader + , members = members + -- , pendingMembers = pendingMembers + , delayedJoinResponses = delayedJoinResponses + -- , pendingSyncMembers = pendingSyncMembers + -- , newMemberAdded = newMemberAdded + , delayedRebalance = delayedRebalance + + , delayedSyncResponses = delayedSyncResponses + + , metadataManager = metadataManager + + , protocolType = protocolType + , protocolName = protocolName + , supportedProtcols = supportedProtcols + } + +------------------------------------------------------------------------ + +joinGroup :: Group -> K.JoinGroupRequestV0 -> IO K.JoinGroupResponseV0 +joinGroup group@Group{..} req = do + -- delayed response(join barrier) + Log.debug $ "received joinGroup" + delayedResponse <- C.newEmptyMVar + C.withMVar lock $ \_ -> do + -- TODO: GROUP MAX SIZE + + checkSupportedProtocols group req + Log.debug $ "checked protocols" + + -- check state + IO.readIORef group.state >>= \case + CompletingRebalance -> resetGroup group + Stable -> resetGroup group + PreparingRebalance -> pure () + Empty -> pure () + Dead -> throw (ErrorCodeException K.UNKNOWN_MEMBER_ID) + + Log.debug $ "checked state" + newMemberId <- if T.null req.memberId + then doNewMemberJoinGoup group req + else doCurrentMemeberJoinGroup group req + + Log.debug $ "add delayed response into 
response list for member:" <> Log.buildString' newMemberId + H.insert delayedJoinResponses newMemberId delayedResponse + + -- waiting other consumers + resp <- C.takeMVar delayedResponse + Log.info $ "joinGroup: received delayed response:" <> Log.buildString' resp + return resp + +checkSupportedProtocols :: Group -> K.JoinGroupRequestV0 -> IO () +checkSupportedProtocols Group{..} req = do + Log.debug $ "checking protocols" + IO.readIORef protocolType >>= \case + Nothing -> pure () + Just pt -> do + when (pt /= req.protocolType) $ do + throw (ErrorCodeException K.INCONSISTENT_GROUP_PROTOCOL) + ps <- IO.readIORef supportedProtcols + let refinedRequestProtocols = (plainProtocols (refineProtocols req.protocols)) + M.when (Set.null (Set.intersection ps refinedRequestProtocols)) $ do + throw (ErrorCodeException K.INCONSISTENT_GROUP_PROTOCOL) + +-- reset group: make it to logical Empty state +resetGroup :: Group -> IO () +resetGroup group@Group{..} = do + Log.info "reseting group" + + -- cancel all delayedSyncResponses + cancelDelayedSyncResponses group + + -- reset leader + IO.writeIORef leader Nothing + + -- cancelDelayedCheckHeartbeats + cancelDelayedCheckHeartbeats group + + -- remove all members + hashtableDeleteAll members + + -- update protocols + IO.writeIORef protocolType Nothing + IO.writeIORef protocolName Nothing + IO.writeIORef supportedProtcols (Set.empty) + +cancelDelayedSyncResponses :: Group -> IO () +cancelDelayedSyncResponses Group{..} = do + lst <- H.toList delayedSyncResponses + M.forM_ lst $ \(memberId, delayed) -> do + Log.info $ "cancel delayed sync response for " <> Log.buildString' memberId + _ <- C.tryPutMVar delayed $ K.SyncGroupResponseV0 K.REBALANCE_IN_PROGRESS BS.empty + H.delete delayedSyncResponses memberId + +doNewMemberJoinGoup :: Group -> K.JoinGroupRequestV0 -> IO T.Text +doNewMemberJoinGoup group req = do + newMemberId <- generateMemberId + Log.debug $ "generated member id:" <> Log.buildString' newMemberId + 
doDynamicNewMemberJoinGroup group req newMemberId + return newMemberId + +-- TODO: kafka memberId format: clientId(from request context)/group_instance_id + "-" + UUID +generateMemberId :: IO T.Text +generateMemberId = UUID.toText <$> UUID.nextRandom + +doCurrentMemeberJoinGroup :: Group -> K.JoinGroupRequestV0 -> IO T.Text +doCurrentMemeberJoinGroup group req = do + doDynamicNewMemberJoinGroup group req req.memberId + return req.memberId + +doDynamicNewMemberJoinGroup :: Group -> K.JoinGroupRequestV0 -> T.Text -> IO () +doDynamicNewMemberJoinGroup group req newMemberId = do + addMemberAndRebalance group req newMemberId + +addMemberAndRebalance :: Group -> K.JoinGroupRequestV0 -> T.Text -> IO () +addMemberAndRebalance group req newMemberId = do + member <- newMember newMemberId req.sessionTimeoutMs req.protocolType (refineProtocols req.protocols) + addMember group member + -- TODO: check state + prepareRebalance group + +prepareRebalance :: Group -> IO () +prepareRebalance group@Group{..} = do + -- TODO: check state CompletingRebalance + -- TODO: remoe sync expiration + -- isEmptyState <- (Empty ==) <$> IO.readIORef state + + -- setup delayed rebalance if delayedRebalance is Nothing + -- TODO: configurable rebalanceDelayMs + IO.readIORef delayedRebalance >>= \case + Nothing -> do + delayed <- makeDelayedRebalance group 5000 + Log.info $ "created delayed rebalance thread:" <> Log.buildString' delayed + IO.writeIORef delayedRebalance (Just delayed) + IO.writeIORef state PreparingRebalance + _ -> pure () + +-- TODO: dynamically delay +makeDelayedRebalance :: Group -> Int32 -> IO C.ThreadId +makeDelayedRebalance group rebalanceDelayMs = do + C.forkIO $ do + C.threadDelay (1000 * fromIntegral rebalanceDelayMs) + rebalance group + +rebalance :: Group -> IO () +rebalance group@Group{..} = do + C.withMVar lock $ \() -> do + Log.info "rebalancing is starting" + IO.readIORef leader >>= \case + Nothing -> do + Log.info "cancel rebalance without any join request" + 
IO.writeIORef delayedRebalance Nothing + Log.info "removed delayedRebalance" + IO.writeIORef state Empty + Log.info "state changed: PreparingRebalance -> Empty" + Just leaderMemberId -> do + doRelance group leaderMemberId + +doRelance :: Group -> T.Text -> IO () +doRelance group@Group{..} leaderMemberId = do + -- next generation id + nextGenerationId <- IO.atomicModifyIORef' groupGenerationId (\ggid -> (ggid + 1, ggid + 1)) + Log.info $ "next generation id:" <> Log.buildString' nextGenerationId + <> ", leader:" <> Log.buildString' leaderMemberId + + -- compute and update protocolName + selectedProtocolName <- computeProtocolName group + Log.info $ "selected protocolName:" <> Log.buildString' selectedProtocolName + + leaderMembersInResponse <- map (\(_, m) -> getJoinResponseMember selectedProtocolName m) <$> H.toList members + Log.debug $ "members in join responses" <> Log.buildString' leaderMembersInResponse + + delayedJoinResponseList <- H.toList delayedJoinResponses + + Log.info $ "set all delayed responses, response list size:" <> Log.buildString' (length delayedJoinResponseList) + -- response all delayedJoinResponses + M.forM_ delayedJoinResponseList $ \(memberId, delayed) -> do + let memebersInResponse = if leaderMemberId == memberId then leaderMembersInResponse else [] + resp = K.JoinGroupResponseV0 { + errorCode = 0 + , generationId = nextGenerationId + , protocolName = selectedProtocolName + , leader = leaderMemberId + , memberId = memberId + , members = K.KaArray (Just $ V.fromList memebersInResponse) + } + Log.debug $ "set delayed response:" <> Log.buildString' resp + <> " for " <> Log.buildString' memberId + _ <- C.tryPutMVar delayed resp + H.delete delayedJoinResponses memberId + IO.writeIORef state CompletingRebalance + Log.info "state changed: PreparingRebalance -> CompletingRebalance" + IO.writeIORef delayedRebalance Nothing + Log.info "rebalancing is finished" + +getJoinResponseMember :: T.Text -> Member -> K.JoinGroupResponseMemberV0 
+getJoinResponseMember protocol m = + let metadata = snd. fromMaybe ("", "") $ List.find (\(n, _) -> n == protocol) m.supportedProtcols + in K.JoinGroupResponseMemberV0 m.memberId metadata + +computeProtocolName :: Group -> IO T.Text +computeProtocolName group@Group{..} = do + IO.readIORef protocolName >>= \case + Nothing -> do + pn <- chooseProtocolName group + Log.debug $ "choosed protocolName" <> Log.buildString' pn + IO.writeIORef protocolName (Just pn) + pure pn + Just pn -> pure pn + +-- choose protocol name from supportedProtcols +chooseProtocolName :: Group -> IO T.Text +chooseProtocolName Group {..} = do + ps <- IO.readIORef supportedProtcols + Log.debug $ "protocols:" <> Log.buildString' ps + return . head $ Set.toList ps + +addMember :: Group -> Member -> IO () +addMember Group{..} member = do + -- leaderIsEmpty <- IO.readIORef leader + IO.readIORef leader >>= \case + Nothing -> do + IO.writeIORef leader (Just member.memberId) + IO.writeIORef protocolType (Just member.protocolType) + Log.debug $ "init supportedProtcols:" <> Log.buildString' member.supportedProtcols + Log.debug $ "plain supportedProtcols:" <> Log.buildString' (plainProtocols member.supportedProtcols) + IO.writeIORef supportedProtcols (plainProtocols member.supportedProtcols) + _ -> pure () + H.insert members member.memberId member + +plainProtocols :: [(T.Text, BS.ByteString)] -> Set.Set T.Text +plainProtocols = Set.fromList . 
(map fst) + +-- should return a non-null protocol list +refineProtocols :: K.KaArray K.JoinGroupRequestProtocolV0 -> [(T.Text, BS.ByteString)] +refineProtocols protocols = case K.unKaArray protocols of + Nothing -> throw (ErrorCodeException K.INCONSISTENT_GROUP_PROTOCOL) + Just ps -> if (V.null ps) + then throw (ErrorCodeException K.INCONSISTENT_GROUP_PROTOCOL) + else map (\p -> (p.name, p.metadata)) (V.toList ps) + +------------------- Sync Group ---------------------- + +syncGroup :: Group -> K.SyncGroupRequestV0 -> IO K.SyncGroupResponseV0 +syncGroup group req@K.SyncGroupRequestV0{..} = do + delayed <- C.newEmptyMVar + C.withMVar (group.lock) $ \() -> do + -- check member id + member <- hashtableGet group.members memberId K.UNKNOWN_MEMBER_ID + + -- TODO: check generation id + IO.readIORef group.state >>= \case + CompletingRebalance -> doSyncGroup group req delayed + Stable -> do + assignment <- IO.readIORef member.assignment + M.void $ C.tryPutMVar delayed (K.SyncGroupResponseV0 0 assignment) + PreparingRebalance -> throw (ErrorCodeException K.REBALANCE_IN_PROGRESS) + _ -> throw (ErrorCodeException K.UNKNOWN_MEMBER_ID) + C.readMVar delayed + +doSyncGroup :: Group -> K.SyncGroupRequestV0 -> C.MVar K.SyncGroupResponseV0 -> IO () +doSyncGroup group@Group{..} req@K.SyncGroupRequestV0{memberId=memberId} delayedResponse = do + -- set delayed response + H.lookup delayedSyncResponses memberId >>= \case + Nothing -> H.insert delayedSyncResponses memberId delayedResponse + _ -> do + Log.warning $ "received duplicated sync group request:" <> Log.buildString' req <> ", rejected" + throw (ErrorCodeException K.UNKNOWN_SERVER_ERROR) + + -- set assignments if this req from leader + (Just leaderMemberId) <- IO.readIORef leader + Log.info $ "sync group leaderMemberId: " <> Log.buildString' leaderMemberId + <> " memberId:" <> Log.buildString' memberId + when (memberId == leaderMemberId) $ do + Log.info $ "received leader SyncGroup request, " <> Log.buildString' memberId + 
setAndPropagateAssignment group req + + -- setup delayedCheckHeart + setupDelayedCheckHeartbeat group + + -- set state + IO.writeIORef state Stable + +setAndPropagateAssignment :: Group -> K.SyncGroupRequestV0 -> IO () +setAndPropagateAssignment Group{..} req = do + -- set assignments + let assignments = fromMaybe V.empty (K.unKaArray req.assignments) + Log.info $ "setting assignments:" <> Log.buildString' assignments + V.forM_ assignments $ \assignment -> do + Log.info $ "set member assignment, member:" <> Log.buildString' assignment.memberId + <> ", assignment:" <> Log.buildString' assignment.assignment + Just member <- H.lookup members assignment.memberId + -- set assignments + IO.writeIORef member.assignment assignment.assignment + -- propagate assignments + H.lookup delayedSyncResponses assignment.memberId >>= \case + Nothing -> pure () + Just delayed -> do + M.void $ C.tryPutMVar delayed (K.SyncGroupResponseV0 0 assignment.assignment) + -- delete all pending delayedSyncResponses + hashtableDeleteAll delayedSyncResponses + Log.info $ "setAndPropagateAssignment completed" + +leaveGroup :: Group -> K.LeaveGroupRequestV0 -> IO K.LeaveGroupResponseV0 +leaveGroup group@Group{..} req = do + C.withMVar lock $ \() -> do + -- get member + H.lookup members req.memberId >>= \case + Nothing -> throw (ErrorCodeException K.UNKNOWN_MEMBER_ID) + _ -> pure () + + -- check state + IO.readIORef state >>= \case + Dead -> throw (ErrorCodeException K.UNKNOWN_MEMBER_ID) + Empty -> throw (ErrorCodeException K.UNKNOWN_MEMBER_ID) + CompletingRebalance -> resetGroupAndRebalance group + Stable -> resetGroupAndRebalance group + PreparingRebalance -> do + -- TODO: should NOT BE PASSIBLE in this version + Log.warning $ "received a leave group in PreparingRebalance state, ignored it" + <> ", groupId:" <> Log.buildString' req.groupId + <> ", memberId:" <> Log.buildString' req.memberId + throw (ErrorCodeException K.UNKNOWN_MEMBER_ID) + + return $ K.LeaveGroupResponseV0 0 + + +-- default 
heartbeat interval: 3s +heartbeat :: Group -> K.HeartbeatRequestV0 -> IO K.HeartbeatResponseV0 +heartbeat group@Group{..} req = do + C.withMVar lock $ \() -> do + -- check generation id + checkGroupGenerationId group req.generationId + + -- check state + IO.readIORef state >>= \case + PreparingRebalance -> throw (ErrorCodeException K.REBALANCE_IN_PROGRESS) + CompletingRebalance -> throw (ErrorCodeException K.REBALANCE_IN_PROGRESS) + Dead -> throw (ErrorCodeException K.UNKNOWN_MEMBER_ID) + Empty -> throw (ErrorCodeException K.UNKNOWN_MEMBER_ID) + Stable -> pure () + + H.lookup members req.memberId >>= \case + Nothing -> throw (ErrorCodeException K.UNKNOWN_MEMBER_ID) + Just member -> updateLatestHeartbeat member + return $ K.HeartbeatResponseV0 0 + +checkGroupGenerationId :: Group -> Int32 -> IO () +checkGroupGenerationId Group{..} generationId = do + currentGenerationId <- IO.readIORef groupGenerationId + M.unless (currentGenerationId == generationId) $ do + Log.debug $ "invalid generation id" + <> ", current generationId:" <> Log.buildString' currentGenerationId + <> ", expected generationId" <> Log.buildString' generationId + throw (ErrorCodeException K.ILLEGAL_GENERATION) + +updateLatestHeartbeat :: Member -> IO () +updateLatestHeartbeat Member{..} = do + newLastHeartbeat <- Time.getSystemMsTimestamp + IO.writeIORef lastHeartbeat newLastHeartbeat + Log.debug $ "lastHeartbeat updated, memeber:" <> Log.buildString' memberId + <> ", newLastHeartbeat:" <> Log.buildString' newLastHeartbeat + +setupDelayedCheckHeartbeat :: Group -> IO () +setupDelayedCheckHeartbeat group@Group{..} = do + (flip H.mapM_) members $ \(_, member) -> do + updateLatestHeartbeat member + threadId <- C.forkIO $ delayedCheckHeart group member member.sessionTimeoutMs + Log.debug $ "setup delayed heartbeat check, threadId:" <> Log.buildString' threadId + <> ", member:" <> Log.buildString' member.memberId + IO.writeIORef member.heartbeatThread (Just threadId) + +-- cancel all delayedCheckHearts 
+cancelDelayedCheckHeartbeats :: Group -> IO () +cancelDelayedCheckHeartbeats Group{..} = do + (flip H.mapM_) members $ \(mid, member)-> do + IO.readIORef member.heartbeatThread >>= \case + Nothing -> pure () + Just tid -> do + Log.info $ "cancel delayedCheckHeart, member:" <> Log.buildString' mid + C.killThread tid + IO.writeIORef member.heartbeatThread Nothing + +delayedCheckHeart :: Group -> Member -> Int32 -> IO () +delayedCheckHeart group member delayMs = do + C.threadDelay (1000 * fromIntegral delayMs) + nextDelayMs <- checkHeartbeatAndMaybeRebalance group member + M.when (nextDelayMs > 0) $ do + delayedCheckHeart group member nextDelayMs + +resetGroupAndRebalance :: Group -> IO () +resetGroupAndRebalance group = do + Log.info $ "starting reset group and prepare rebalance" + resetGroup group + prepareRebalance group + +-- return: nextDelayMs +-- 0 or <0: timeout +-- >0: nextDelayMs +checkHeartbeatAndMaybeRebalance :: Group -> Member -> IO Int32 +checkHeartbeatAndMaybeRebalance group Member{..} = do + C.withMVar group.lock $ \() -> do + now <- Time.getSystemMsTimestamp + lastUpdated <- IO.readIORef lastHeartbeat + let nextDelayMs = sessionTimeoutMs - (fromIntegral (now - lastUpdated)) + M.when (nextDelayMs <= 0) $ do + Log.info $ "heartbeat timeout, memberId:" <> Log.buildString' memberId + <> ", lastHeartbeat:" <> Log.buildString' lastUpdated + <> ", now:" <> Log.buildString' now + <> ", sessionTimeoutMs:" <> Log.buildString' sessionTimeoutMs + -- remove itself (to avoid kill itself in resetGroupAndRebalance) + IO.writeIORef heartbeatThread Nothing + resetGroupAndRebalance group + return nextDelayMs + +------------------- Commit Offsets ------------------------- +commitOffsets :: Group -> K.OffsetCommitRequestV0 -> IO K.OffsetCommitResponseV0 +commitOffsets Group{..} req = do + C.withMVar lock $ \() -> do + IO.readIORef state >>= \case + CompletingRebalance -> throw (ErrorCodeException K.REBALANCE_IN_PROGRESS) + Dead -> throw (ErrorCodeException 
K.UNKNOWN_MEMBER_ID) + _ -> do + topics <- Utils.forKaArrayM req.topics $ \K.OffsetCommitRequestTopicV0{..} -> do + res <- GMM.storeOffsets metadataManager name partitions + return $ K.OffsetCommitResponseTopicV0 {partitions = res, name = name} + return K.OffsetCommitResponseV0 {topics=topics} + +------------------- Fetch Offsets ------------------------- +fetchOffsets :: Group -> K.OffsetFetchRequestV0 -> IO K.OffsetFetchResponseV0 +fetchOffsets Group{..} req = do + topics <- Utils.forKaArrayM req.topics $ \K.OffsetFetchRequestTopicV0{..} -> do + res <- GMM.fetchOffsets metadataManager name partitionIndexes + return $ K.OffsetFetchResponseTopicV0 {partitions = res, name = name} + return K.OffsetFetchResponseV0 {topics=topics} diff --git a/hstream-kafka/HStream/Kafka/Group/GroupCoordinator.hs b/hstream-kafka/HStream/Kafka/Group/GroupCoordinator.hs new file mode 100644 index 000000000..a6a9ad290 --- /dev/null +++ b/hstream-kafka/HStream/Kafka/Group/GroupCoordinator.hs @@ -0,0 +1,119 @@ +{-# LANGUAGE CPP #-} +{-# LANGUAGE DuplicateRecordFields #-} +{-# LANGUAGE OverloadedRecordDot #-} + +module HStream.Kafka.Group.GroupCoordinator where + +import qualified Control.Concurrent as C +import Control.Exception (handle, throw) +import qualified Data.HashTable.IO as H +import Data.Int (Int32) +import qualified Data.Text as T +import HStream.Kafka.Common.KafkaException (ErrorCodeException (ErrorCodeException)) +import qualified HStream.Kafka.Common.Utils as Utils +import HStream.Kafka.Group.Group (Group) +import qualified HStream.Kafka.Group.Group as G +import HStream.Kafka.Group.GroupMetadataManager (mkGroupMetadataManager) +import HStream.Kafka.Group.OffsetsStore (mkCkpOffsetStorage) +import HStream.Store (LDClient) +import qualified Kafka.Protocol.Encoding as K +import qualified Kafka.Protocol.Error as K +import qualified Kafka.Protocol.Message as K + +type HashTable k v = H.BasicHashTable k v + +data GroupCoordinator = GroupCoordinator + { groups :: C.MVar (HashTable 
T.Text Group) + } + +-- TODO: setup from metadata +mkGroupCoordinator :: IO GroupCoordinator +mkGroupCoordinator = do + groups <- H.new >>= C.newMVar + return $ GroupCoordinator {..} + +joinGroup :: GroupCoordinator -> LDClient -> Int32 -> K.JoinGroupRequestV0 -> IO K.JoinGroupResponseV0 +joinGroup coordinator ldClient serverId req = do + handle (\((ErrorCodeException code)) -> makeErrorResponse code) $ do + -- get or create group + group <- getOrMaybeCreateGroup coordinator ldClient serverId req.groupId req.memberId + + -- join group + G.joinGroup group req + where + makeErrorResponse code = return $ K.JoinGroupResponseV0 { + errorCode = code + , generationId = -1 + , protocolName = "" + , leader = "" + , memberId = req.memberId + , members = K.KaArray Nothing + } + +getOrMaybeCreateGroup :: GroupCoordinator -> LDClient -> Int32 -> T.Text -> T.Text -> IO Group +getOrMaybeCreateGroup GroupCoordinator{..} ldClient serverId groupId memberId = do + C.withMVar groups $ \gs -> do + H.lookup gs groupId >>= \case + Nothing -> if T.null memberId + then do + metadataManager <- mkGroupMetadataManager ldClient serverId groupId + ng <- G.newGroup groupId metadataManager + H.insert gs groupId ng + return ng + else throw (ErrorCodeException K.UNKNOWN_MEMBER_ID) + Just g -> return g + +getGroup :: GroupCoordinator -> T.Text -> IO Group +getGroup GroupCoordinator{..} groupId = do + C.withMVar groups $ \gs -> do + H.lookup gs groupId >>= \case + Nothing -> throw (ErrorCodeException K.GROUP_ID_NOT_FOUND) + Just g -> return g + +getGroupM :: GroupCoordinator -> T.Text -> IO (Maybe Group) +getGroupM GroupCoordinator{..} groupId = do + C.withMVar groups $ \gs -> H.lookup gs groupId + +syncGroup :: GroupCoordinator -> K.SyncGroupRequestV0 -> IO K.SyncGroupResponseV0 +syncGroup coordinator req@K.SyncGroupRequestV0{..} = do + handle (\(ErrorCodeException code) -> makeErrorResponse code) $ do + group <- getGroup coordinator groupId + G.syncGroup group req + where makeErrorResponse code = 
return $ K.SyncGroupResponseV0 { + errorCode = code, + assignment = "" + } + +leaveGroup :: GroupCoordinator -> K.LeaveGroupRequestV0 -> IO K.LeaveGroupResponseV0 +leaveGroup coordinator req = do + handle (\(ErrorCodeException code) -> makeErrorResponse code) $ do + group <- getGroup coordinator req.groupId + G.leaveGroup group req + where makeErrorResponse code = return $ K.LeaveGroupResponseV0 {errorCode=code} + +heartbeat :: GroupCoordinator -> K.HeartbeatRequestV0 -> IO K.HeartbeatResponseV0 +heartbeat coordinator req = do + handle (\(ErrorCodeException code) -> makeErrorResponse code) $ do + group <- getGroup coordinator req.groupId + G.heartbeat group req + where makeErrorResponse code = return $ K.HeartbeatResponseV0 {errorCode=code} + +------------------- Commit Offsets ------------------------- +commitOffsets :: GroupCoordinator -> K.OffsetCommitRequestV0 -> IO K.OffsetCommitResponseV0 +commitOffsets coordinator req = do + handle (\(ErrorCodeException code) -> makeErrorResponse code) $ do + -- TODO: check group and generation id(and if generationId < 0 then add self-management offsets strategy support) + group <- getGroup coordinator req.groupId + G.commitOffsets group req + where makeErrorResponse code = return $ K.OffsetCommitResponseV0 {topics = Utils.mapKaArray (mapTopic code) req.topics} + mapTopic code topic = K.OffsetCommitResponseTopicV0 {partitions=Utils.mapKaArray (mapPartition code) topic.partitions, name=topic.name} + mapPartition code partition = K.OffsetCommitResponsePartitionV0 {errorCode=code, partitionIndex=partition.partitionIndex} + +------------------- Fetch Offsets ------------------------- +-- TODO: improve error report +fetchOffsets :: GroupCoordinator -> K.OffsetFetchRequestV0 -> IO K.OffsetFetchResponseV0 +fetchOffsets coordinator req = do + handle (\(ErrorCodeException code) -> makeErrorResponse code) $ do + group <- getGroup coordinator req.groupId + G.fetchOffsets group req + where makeErrorResponse _ = return $ 
K.OffsetFetchResponseV0 {topics=K.KaArray Nothing} diff --git a/hstream-kafka/HStream/Kafka/Group/GroupMetadataManager.hs b/hstream-kafka/HStream/Kafka/Group/GroupMetadataManager.hs index 89986fa64..d3bd1c839 100644 --- a/hstream-kafka/HStream/Kafka/Group/GroupMetadataManager.hs +++ b/hstream-kafka/HStream/Kafka/Group/GroupMetadataManager.hs @@ -5,41 +5,47 @@ module HStream.Kafka.Group.GroupMetadataManager , fetchOffsets ) where -import Control.Concurrent (MVar, modifyMVar_, newMVar, - withMVar) -import Control.Concurrent.MVar (readMVar) -import Control.Monad (unless) +import Control.Concurrent (MVar, modifyMVar_, + newMVar, withMVar) +import Control.Exception (throw) +import qualified Control.Monad as M import Data.Hashable -import qualified Data.HashMap.Strict as HM -import Data.Int (Int32, Int64) -import qualified Data.Map.Strict as Map -import Data.Maybe (fromMaybe) -import qualified Data.Text as T -import qualified Data.Vector as V -import Data.Word (Word64) -import GHC.Generics (Generic) -import HStream.Kafka.Group.OffsetsStore (OffsetStorage (..), - OffsetStore) -import qualified HStream.Logger as Log -import Kafka.Protocol.Encoding (KaArray (KaArray, unKaArray)) -import qualified Kafka.Protocol.Error as K -import Kafka.Protocol.Message (OffsetCommitRequestPartitionV0 (..), - OffsetCommitResponsePartitionV0 (..), - OffsetFetchResponsePartitionV0 (..)) - -data GroupMetadataManager = GroupMetadataManager - { serverId :: Int +import qualified Data.HashTable.IO as H +import Data.Int (Int32, Int64) +import qualified Data.Map.Strict as Map +import Data.Maybe (fromMaybe) +import qualified Data.Text as T +import qualified Data.Vector as V +import Data.Word (Word64) +import GHC.Generics (Generic) +import HStream.Kafka.Common.KafkaException (ErrorCodeException (ErrorCodeException)) +import qualified HStream.Kafka.Common.Utils as Utils +import HStream.Kafka.Group.OffsetsStore (OffsetStorage (..), + mkCkpOffsetStorage) +import qualified HStream.Logger as Log +import 
qualified HStream.Store as S +import qualified Kafka.Protocol as K +import Kafka.Protocol.Encoding (KaArray (KaArray, unKaArray)) +import qualified Kafka.Protocol.Error as K +import Kafka.Protocol.Message (OffsetCommitRequestPartitionV0 (..), + OffsetCommitResponsePartitionV0 (..), + OffsetFetchResponsePartitionV0 (..)) + +data GroupMetadataManager = forall os. OffsetStorage os => GroupMetadataManager + { serverId :: Int32 + , ldClient :: S.LDClient , groupName :: T.Text - , offsetsStore :: OffsetStore + , offsetStorage :: os , offsetsCache :: MVar (Map.Map TopicPartition Int64) - , partitionsMap :: MVar (HM.HashMap TopicPartition Word64) + , partitionsMap :: Utils.HashTable TopicPartition Word64 -- ^ partitionsMap maps TopicPartition to the underlying logID } -mkGroupMetadataManager :: OffsetStore -> Int -> T.Text -> IO GroupMetadataManager -mkGroupMetadataManager offsetsStore serverId groupName = do +mkGroupMetadataManager :: S.LDClient -> Int32 -> T.Text -> IO GroupMetadataManager +mkGroupMetadataManager ldClient serverId groupName = do offsetsCache <- newMVar Map.empty - partitionsMap <- newMVar HM.empty + partitionsMap <- H.new + offsetStorage <- mkCkpOffsetStorage ldClient groupName return GroupMetadataManager{..} where @@ -51,28 +57,17 @@ storeOffsets -> T.Text -> KaArray OffsetCommitRequestPartitionV0 -> IO (KaArray OffsetCommitResponsePartitionV0) -storeOffsets GroupMetadataManager{..} topicName arrayOffsets = do +storeOffsets gmm@GroupMetadataManager{..} topicName arrayOffsets = do let offsets = fromMaybe V.empty (unKaArray arrayOffsets) -- check if a TopicPartition that has an offset to be committed is contained in current -- consumer group's partitionsMap. 
If not, server will return an UNKNOWN_TOPIC_OR_PARTITION
 -- error, and that error will be converted to COORDINATOR_NOT_AVAILABLE error finally
-  partitionsInfo <- readMVar partitionsMap
-  let (notFoundErrs, offsets') = V.partitionWith
-        ( \OffsetCommitRequestPartitionV0{..} ->
-            let key = mkTopicPartition topicName partitionIndex
-             in case HM.lookup key partitionsInfo of
-                  Just logId -> Right $ (key, logId, fromIntegral committedOffset)
-                  Nothing -> Left $ (partitionIndex, K.COORDINATOR_NOT_AVAILABLE)
-        ) offsets
-  unless (V.null notFoundErrs) $ do
-    Log.info $ "consumer group " <> Log.build groupName <> " receive OffsetCommitRequestPartition with unknown topic or partion"
-      <> ", topic name: " <> Log.build topicName
-      <> ", partitions: " <> Log.build (show $ V.map fst notFoundErrs)
+  offsets' <- computeCheckpointOffsets gmm topicName offsets
 
   -- write checkpoints
   let checkPoints = V.foldl' (\acc (_, logId, offset) -> Map.insert logId offset acc) Map.empty offsets'
-  commitOffsets offsetsStore topicName checkPoints
+  commitOffsets offsetStorage topicName checkPoints
   Log.debug $ "consumer group " <> Log.build groupName <> " commit offsets {" <> Log.build (show checkPoints)
     <> "} to topic " <> Log.build topicName
@@ -82,9 +77,30 @@ storeOffsets GroupMetadataManager{..} topicName arrayOffsets = do
       return $ Map.union updates cache
 
   let suc = V.map (\(TopicPartition{topicPartitionIdx}, _, _) -> (topicPartitionIdx, K.NONE)) offsets'
-      res = V.map (\(partitionIndex, errorCode) -> OffsetCommitResponsePartitionV0{..}) (suc <> notFoundErrs)
+      res = V.map (\(partitionIndex, errorCode) -> OffsetCommitResponsePartitionV0{..}) suc
   return KaArray {unKaArray = Just res}
 
+computeCheckpointOffsets :: GroupMetadataManager -> T.Text -> V.Vector K.OffsetCommitRequestPartitionV0
+  -> IO (V.Vector (TopicPartition, Word64, Word64))
+computeCheckpointOffsets GroupMetadataManager{..} topicName requestOffsets = do
+  V.forM requestOffsets $ \OffsetCommitRequestPartitionV0{..} -> do
+    let tp =
mkTopicPartition topicName partitionIndex + H.lookup partitionsMap tp >>= \case + Nothing -> do + -- read partitions and build partitionsMap + partitions <- S.listStreamPartitionsOrdered ldClient (S.transToTopicStreamName topicName) + M.forM_ (zip [0..] (V.toList partitions)) $ \(idx, (_, logId)) -> do + H.insert partitionsMap (TopicPartition topicName idx) logId + case partitions V.!? (fromIntegral partitionIndex) of + Nothing -> do + Log.info $ "consumer group " <> Log.build groupName <> " receive OffsetCommitRequestPartition with unknown topic or partion" + <> ", topic name: " <> Log.build topicName + <> ", partition: " <> Log.build partitionIndex + throw (ErrorCodeException K.UNKNOWN_TOPIC_OR_PARTITION) + -- ^ TODO: better response(and exception) + Just (_, logId) -> return (tp, logId, fromIntegral committedOffset) + Just logId -> return (tp, logId, fromIntegral committedOffset) + fetchOffsets :: GroupMetadataManager -> T.Text diff --git a/hstream-kafka/HStream/Kafka/Group/Member.hs b/hstream-kafka/HStream/Kafka/Group/Member.hs new file mode 100644 index 000000000..f2b38b268 --- /dev/null +++ b/hstream-kafka/HStream/Kafka/Group/Member.hs @@ -0,0 +1,33 @@ +{-# LANGUAGE CPP #-} +{-# LANGUAGE DuplicateRecordFields #-} +{-# LANGUAGE OverloadedRecordDot #-} + +module HStream.Kafka.Group.Member where + +import qualified Control.Concurrent as C +import qualified Data.ByteString as BS +import Data.Int (Int32, Int64) +import qualified Data.IORef as IO +import qualified Data.Text as T + +data Member + = Member + { memberId :: T.Text + , sessionTimeoutMs :: Int32 + , assignment :: IO.IORef BS.ByteString + , lastHeartbeat :: IO.IORef Int64 + , heartbeatThread :: IO.IORef (Maybe C.ThreadId) + + -- protocols + , protocolType :: T.Text + , supportedProtcols :: [(T.Text, BS.ByteString)] + } + +newMember :: T.Text -> Int32 -> T.Text -> [(T.Text, BS.ByteString)] -> IO Member +newMember memberId sessionTimeoutMs protocolType supportedProtcols = do + assignment <- IO.newIORef 
BS.empty + lastHeartbeat <- IO.newIORef 0 + heartbeatThread <- IO.newIORef Nothing + + -- TODO: check request + return $ Member {..} diff --git a/hstream-kafka/HStream/Kafka/Group/OffsetsStore.hs b/hstream-kafka/HStream/Kafka/Group/OffsetsStore.hs index 4cba63b37..159394c5a 100644 --- a/hstream-kafka/HStream/Kafka/Group/OffsetsStore.hs +++ b/hstream-kafka/HStream/Kafka/Group/OffsetsStore.hs @@ -1,6 +1,5 @@ module HStream.Kafka.Group.OffsetsStore ( OffsetStorage(..) - , OffsetStore(..) , mkCkpOffsetStorage ) where @@ -17,11 +16,6 @@ import HStream.Utils (textToCBytes) class OffsetStorage s where commitOffsets :: s -> T.Text -> Map Word64 Word64 -> IO () -data OffsetStore = Ckp CkpOffsetStorage - -instance OffsetStorage OffsetStore where - commitOffsets (Ckp s) = commitOffsets s - -------------------------------------------------------------------------------- data CkpOffsetStorage = CkpOffsetStorage @@ -34,8 +28,9 @@ data CkpOffsetStorage = CkpOffsetStorage mkCkpOffsetStorage :: S.LDClient -> T.Text -> IO CkpOffsetStorage mkCkpOffsetStorage client ckpStoreName = do let cbGroupName = textToCBytes ckpStoreName + logAttrs = S.def{S.logReplicationFactor = S.defAttr1 1} -- FIXME: need to get log attributes from somewhere - S.initOffsetCheckpointDir client S.def + S.initOffsetCheckpointDir client logAttrs ckpStoreId <- S.allocOffsetCheckpointId client cbGroupName ckpStore <- S.newRSMBasedCheckpointStore client ckpStoreId 5000 Log.info $ "mkCkpOffsetStorage with name: " <> Log.build ckpStoreName <> ", storeId: " <> Log.build ckpStoreId diff --git a/hstream-kafka/HStream/Kafka/Server/Handler.hs b/hstream-kafka/HStream/Kafka/Server/Handler.hs index e0ad08d2b..c308c8ac8 100644 --- a/hstream-kafka/HStream/Kafka/Server/Handler.hs +++ b/hstream-kafka/HStream/Kafka/Server/Handler.hs @@ -5,6 +5,7 @@ module HStream.Kafka.Server.Handler (handlers) where import HStream.Kafka.Server.Handler.Basic import HStream.Kafka.Server.Handler.Consume +import HStream.Kafka.Server.Handler.Group 
import HStream.Kafka.Server.Handler.Offset
 import HStream.Kafka.Server.Handler.Produce
 import HStream.Kafka.Server.Handler.Topic
@@ -33,4 +34,11 @@ handlers sc =
   , K.hd (K.RPC :: K.RPC K.HStreamKafkaV0 "listOffsets") (handleListOffsetsV0 sc)
   , K.hd (K.RPC :: K.RPC K.HStreamKafkaV0 "offsetCommit") (handleOffsetCommitV0 sc)
   , K.hd (K.RPC :: K.RPC K.HStreamKafkaV0 "offsetFetch") (handleOffsetFetchV0 sc)
+
+  , K.hd (K.RPC :: K.RPC K.HStreamKafkaV0 "findCoordinator") (handleFindCoordinatorV0 sc)
+
+  , K.hd (K.RPC :: K.RPC K.HStreamKafkaV0 "joinGroup") (handleJoinGroupV0 sc)
+  , K.hd (K.RPC :: K.RPC K.HStreamKafkaV0 "syncGroup") (handleSyncGroupV0 sc)
+  , K.hd (K.RPC :: K.RPC K.HStreamKafkaV0 "leaveGroup") (handleLeaveGroupV0 sc)
+  , K.hd (K.RPC :: K.RPC K.HStreamKafkaV0 "heartbeat") (handleHeartbeatV0 sc)
   ]
diff --git a/hstream-kafka/HStream/Kafka/Server/Handler/Group.hs b/hstream-kafka/HStream/Kafka/Server/Handler/Group.hs
new file mode 100644
index 000000000..47fe54118
--- /dev/null
+++ b/hstream-kafka/HStream/Kafka/Server/Handler/Group.hs
@@ -0,0 +1,40 @@
+{-# LANGUAGE CPP #-}
+{-# LANGUAGE DuplicateRecordFields #-}
+{-# LANGUAGE OverloadedRecordDot #-}
+
+module HStream.Kafka.Server.Handler.Group
+  ( -- 10: FindCoordinator
+    handleFindCoordinatorV0
+  , handleJoinGroupV0
+  , handleSyncGroupV0
+  , handleHeartbeatV0
+  , handleLeaveGroupV0
+  ) where
+
+import           HStream.Common.Server.Lookup         (KafkaResource (..),
+                                                       lookupKafkaPersist)
+import qualified HStream.Kafka.Group.GroupCoordinator as GC
+import           HStream.Kafka.Server.Types           (ServerContext (..))
+import qualified HStream.Logger                       as Log
+import qualified HStream.Server.HStreamApi            as A
+import qualified Kafka.Protocol.Message               as K
+import qualified Kafka.Protocol.Service               as K
+
+-- FIXME: move to a separated Coordinator module
+handleFindCoordinatorV0 :: ServerContext -> K.RequestContext -> K.FindCoordinatorRequestV0 -> IO K.FindCoordinatorResponseV0
+handleFindCoordinatorV0 ServerContext{..} _ req = do
+  A.ServerNode{..} <-
lookupKafkaPersist metaHandle gossipContext loadBalanceHashRing scAdvertisedListenersKey (KafkaResGroup req.key) + Log.info $ "findCoordinator for group:" <> Log.buildString' req.key <> ", result:" <> Log.buildString' serverNodeId + return $ K.FindCoordinatorResponseV0 0 (fromIntegral serverNodeId) serverNodeHost (fromIntegral serverNodePort) + +handleJoinGroupV0 :: ServerContext -> K.RequestContext -> K.JoinGroupRequestV0 -> IO K.JoinGroupResponseV0 +handleJoinGroupV0 ServerContext{..} _ = GC.joinGroup scGroupCoordinator scLDClient (fromIntegral serverID) + +handleSyncGroupV0 :: ServerContext -> K.RequestContext -> K.SyncGroupRequestV0 -> IO K.SyncGroupResponseV0 +handleSyncGroupV0 ServerContext{..} _ = GC.syncGroup scGroupCoordinator + +handleHeartbeatV0 :: ServerContext -> K.RequestContext -> K.HeartbeatRequestV0 -> IO K.HeartbeatResponseV0 +handleHeartbeatV0 ServerContext{..} _ = GC.heartbeat scGroupCoordinator + +handleLeaveGroupV0 :: ServerContext -> K.RequestContext -> K.LeaveGroupRequestV0 -> IO K.LeaveGroupResponseV0 +handleLeaveGroupV0 ServerContext{..} _ = GC.leaveGroup scGroupCoordinator diff --git a/hstream-kafka/HStream/Kafka/Server/Handler/Offset.hs b/hstream-kafka/HStream/Kafka/Server/Handler/Offset.hs index 79e3f7a00..ae6118223 100644 --- a/hstream-kafka/HStream/Kafka/Server/Handler/Offset.hs +++ b/hstream-kafka/HStream/Kafka/Server/Handler/Offset.hs @@ -7,24 +7,21 @@ module HStream.Kafka.Server.Handler.Offset ) where -import Control.Concurrent (withMVar) -import qualified Data.HashMap.Strict as HM -import Data.Int (Int64) -import Data.Maybe (fromMaybe) -import Data.Text (Text) -import Data.Vector (Vector) -import qualified Data.Vector as V +import Data.Int (Int64) +import Data.Maybe (fromMaybe) +import Data.Text (Text) +import Data.Vector (Vector) +import qualified Data.Vector as V -import HStream.Kafka.Common.OffsetManager (getLatestOffset, - getOffsetByTimestamp, - getOldestOffset) -import HStream.Kafka.Group.GroupMetadataManager (fetchOffsets, 
- storeOffsets) -import HStream.Kafka.Server.Types (ServerContext (..)) -import qualified HStream.Store as S -import qualified Kafka.Protocol as K -import qualified Kafka.Protocol.Error as K -import qualified Kafka.Protocol.Service as K +import HStream.Kafka.Common.OffsetManager (getLatestOffset, + getOffsetByTimestamp, + getOldestOffset) +import qualified HStream.Kafka.Group.GroupCoordinator as GC +import HStream.Kafka.Server.Types (ServerContext (..)) +import qualified HStream.Store as S +import qualified Kafka.Protocol as K +import qualified Kafka.Protocol.Error as K +import qualified Kafka.Protocol.Service as K -------------------- -- 2: ListOffsets @@ -79,33 +76,13 @@ listOffsetTopicPartitions ServerContext{..} topicName (Just offsetsPartitions) = -------------------- handleOffsetCommitV0 :: ServerContext -> K.RequestContext -> K.OffsetCommitRequestV0 -> IO K.OffsetCommitResponseV0 -handleOffsetCommitV0 ServerContext{..} _ K.OffsetCommitRequestV0{..} = do - case K.unKaArray topics of - Nothing -> undefined - Just topics' -> do - mgr <- withMVar scGroupMetadataManagers $ return . HM.lookup groupId - case mgr of - Nothing -> undefined - Just groupMgr -> do - response <- V.forM topics' $ \K.OffsetCommitRequestTopicV0{..} -> do - res <- storeOffsets groupMgr name partitions - return $ K.OffsetCommitResponseTopicV0 {partitions = res, name = name} - return . K.OffsetCommitResponseV0 $ K.KaArray {unKaArray = Just response} +handleOffsetCommitV0 ServerContext{..} _ req = do + GC.commitOffsets scGroupCoordinator req -------------------- -- 9: OffsetFetch -------------------- handleOffsetFetchV0 :: ServerContext -> K.RequestContext -> K.OffsetFetchRequestV0 -> IO K.OffsetFetchResponseV0 -handleOffsetFetchV0 ServerContext{..} _ K.OffsetFetchRequestV0{..} = do - case K.unKaArray topics of - Nothing -> undefined - Just topics' -> do - mgr <- withMVar scGroupMetadataManagers $ return . 
HM.lookup groupId - case mgr of - Nothing -> undefined - Just groupMgr -> do - response <- V.forM topics' $ \K.OffsetFetchRequestTopicV0{..} -> do - res <- fetchOffsets groupMgr name partitionIndexes - return $ K.OffsetFetchResponseTopicV0 {partitions = res, name = name} - return . K.OffsetFetchResponseV0 $ K.KaArray {unKaArray = Just response} +handleOffsetFetchV0 ServerContext{..} _ req = do + GC.fetchOffsets scGroupCoordinator req diff --git a/hstream-kafka/HStream/Kafka/Server/Types.hs b/hstream-kafka/HStream/Kafka/Server/Types.hs index fbc83fd57..9ee856952 100644 --- a/hstream-kafka/HStream/Kafka/Server/Types.hs +++ b/hstream-kafka/HStream/Kafka/Server/Types.hs @@ -3,24 +3,21 @@ module HStream.Kafka.Server.Types , initServerContext ) where -import Data.Text (Text) +import Data.Text (Text) import Data.Word -import Control.Concurrent (MVar, newMVar) -import Data.HashMap.Strict (HashMap) -import qualified Data.HashMap.Strict as HM -import qualified Data.Text as T -import HStream.Common.Server.HashRing (LoadBalanceHashRing, - initializeHashRing) -import HStream.Gossip.Types (GossipContext) -import HStream.Kafka.Common.OffsetManager (OffsetManager, - newOffsetManager) -import HStream.Kafka.Group.GroupMetadataManager (GroupMetadataManager) -import HStream.Kafka.Server.Config (ServerOpts (..)) -import HStream.MetaStore.Types (MetaHandle) -import HStream.Stats (newServerStatsHolder) -import qualified HStream.Stats as Stats -import qualified HStream.Store as S +import HStream.Common.Server.HashRing (LoadBalanceHashRing, + initializeHashRing) +import HStream.Gossip.Types (GossipContext) +import HStream.Kafka.Common.OffsetManager (OffsetManager, + newOffsetManager) +import HStream.Kafka.Group.GroupCoordinator (GroupCoordinator, + mkGroupCoordinator) +import HStream.Kafka.Server.Config (ServerOpts (..)) +import HStream.MetaStore.Types (MetaHandle) +import HStream.Stats (newServerStatsHolder) +import qualified HStream.Stats as Stats +import qualified HStream.Store as S 
data ServerContext = ServerContext { serverID :: !Word32 @@ -35,8 +32,7 @@ data ServerContext = ServerContext , loadBalanceHashRing :: !LoadBalanceHashRing , gossipContext :: !GossipContext , scOffsetManager :: !OffsetManager - , scGroupMetadataManagers :: MVar (HashMap T.Text GroupMetadataManager) - -- ^ {groupID: GroupMetadataManager} + , scGroupCoordinator :: GroupCoordinator } initServerContext @@ -50,7 +46,7 @@ initServerContext opts@ServerOpts{..} gossipContext mh = do statsHolder <- newServerStatsHolder epochHashRing <- initializeHashRing gossipContext offsetManager <- newOffsetManager ldclient - groupMetadataManager <- newMVar HM.empty + scGroupCoordinator <- mkGroupCoordinator return ServerContext @@ -66,5 +62,5 @@ initServerContext opts@ServerOpts{..} gossipContext mh = do , loadBalanceHashRing = epochHashRing , gossipContext = gossipContext , scOffsetManager = offsetManager - , scGroupMetadataManagers = groupMetadataManager + , scGroupCoordinator = scGroupCoordinator } diff --git a/hstream-kafka/hstream-kafka.cabal b/hstream-kafka/hstream-kafka.cabal index 79fa9b014..e2033d2b3 100644 --- a/hstream-kafka/hstream-kafka.cabal +++ b/hstream-kafka/hstream-kafka.cabal @@ -109,6 +109,11 @@ library HStream.Kafka.Common.OffsetManager HStream.Kafka.Common.Read HStream.Kafka.Common.RecordFormat + HStream.Kafka.Common.KafkaException + HStream.Kafka.Common.Utils + HStream.Kafka.Group.Group + HStream.Kafka.Group.Member + HStream.Kafka.Group.GroupCoordinator HStream.Kafka.Group.GroupMetadataManager HStream.Kafka.Group.OffsetsStore HStream.Kafka.Network @@ -125,6 +130,8 @@ library HStream.Kafka.Server.Handler.Offset HStream.Kafka.Server.Handler.Produce HStream.Kafka.Server.Handler.Topic + HStream.Kafka.Server.Handler.Offset + HStream.Kafka.Server.Handler.Group hs-source-dirs: . build-depends: @@ -153,6 +160,7 @@ library , unordered-containers , vector , yaml + , uuid , Z-Data default-language: GHC2021