Skip to content

Commit

Permalink
kafka: support set group initialRebalaceDelay (#1806)
Browse files Browse the repository at this point in the history
* add group.initial.rebalance.delay.ms prop

* refactor: support set group initialRebalaceDelay
  • Loading branch information
YangKian committed Apr 30, 2024
1 parent 128261e commit 36fd8b3
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 40 deletions.
38 changes: 20 additions & 18 deletions hstream-kafka/HStream/Kafka/Group/Group.hs
Expand Up @@ -105,18 +105,12 @@ data GroupState
| Empty
deriving (Show, Eq)

data GroupConfig
= GroupConfig
{
}

data Group
= Group
data Group = Group
{ lock :: C.MVar ()
, groupId :: T.Text
, groupGenerationId :: IO.IORef Int32
, state :: IO.IORef GroupState
, config :: GroupConfig
, groupConfig :: GroupConfig
, leader :: IO.IORef (Maybe T.Text)
, members :: HashTable T.Text Member
-- , pendingMembers :: HashTable T.Text ()
Expand All @@ -142,8 +136,12 @@ data Group
, storedMetadata :: IO.IORef Bool
}

newGroup :: T.Text -> GroupOffsetManager -> Meta.MetaHandle -> IO Group
newGroup group metadataManager metaHandle = do
data GroupConfig = GroupConfig
{ groupInitialRebalanceDelay :: Int
} deriving (Show)

newGroup :: T.Text -> GroupOffsetManager -> Meta.MetaHandle -> GroupConfig -> IO Group
newGroup group metadataManager metaHandle config = do
lock <- C.newMVar ()
state <- IO.newIORef Empty
groupGenerationId <- IO.newIORef 0
Expand All @@ -169,7 +167,7 @@ newGroup group metadataManager metaHandle = do
, groupId = group
, groupGenerationId = groupGenerationId
, state = state
, config = GroupConfig
, groupConfig = config
, leader = leader
-- all members
, members = members
Expand All @@ -193,8 +191,13 @@ newGroup group metadataManager metaHandle = do
, storedMetadata = storedMetadata
}

newGroupFromValue :: CM.GroupMetadataValue -> GroupOffsetManager -> Meta.MetaHandle -> IO Group
newGroupFromValue value metadataManager metaHandle = do
newGroupFromValue
:: CM.GroupMetadataValue
-> GroupOffsetManager
-> Meta.MetaHandle
-> GroupConfig
-> IO Group
newGroupFromValue value metadataManager metaHandle config = do
lock <- C.newMVar ()

state <- IO.newIORef (if V.null value.members then Empty else Stable)
Expand All @@ -221,7 +224,7 @@ newGroupFromValue value metadataManager metaHandle = do
, groupId = value.groupId
, groupGenerationId = groupGenerationId
, state = state
, config = GroupConfig
, groupConfig = config
, leader = leader
-- all members
, members = members
Expand Down Expand Up @@ -371,21 +374,20 @@ prepareRebalance group@Group{..} reason = do
-- isEmptyState <- (Empty ==) <$> IO.readIORef state

-- setup delayed rebalance if delayedRebalance is Nothing
-- TODO: configurable initRebalanceDelayMs, 5000 by default
IO.readIORef delayedRebalance >>= \case
Nothing -> do
delayed <- makeDelayedRebalance group 5000
delayed <- makeDelayedRebalance group group.groupConfig.groupInitialRebalanceDelay
Log.info $ "created delayed rebalance thread:" <> Log.buildString' delayed
<> ", group:" <> Log.build groupId
IO.atomicWriteIORef delayedRebalance (Just delayed)
IO.atomicWriteIORef state PreparingRebalance
_ -> pure ()

-- TODO: dynamically delay with initTimeoutMs and RebalanceTimeoutMs
makeDelayedRebalance :: Group -> Int32 -> IO C.ThreadId
makeDelayedRebalance :: Group -> Int -> IO C.ThreadId
makeDelayedRebalance group rebalanceDelayMs = do
C.forkIO $ do
C.threadDelay (1000 * fromIntegral rebalanceDelayMs)
C.threadDelay (1000 * rebalanceDelayMs)
rebalance group

rebalance :: Group -> IO ()
Expand Down
30 changes: 19 additions & 11 deletions hstream-kafka/HStream/Kafka/Group/GroupCoordinator.hs
Expand Up @@ -26,15 +26,22 @@ import HStream.Store (LDClient)
import qualified Kafka.Protocol.Error as K

data GroupCoordinator = GroupCoordinator
{ groups :: C.MVar (Utils.HashTable T.Text Group)
, metaHandle :: Meta.MetaHandle
, serverId :: Word32
, ldClient :: LDClient
, offsetTopicReplica :: Int
{ groups :: C.MVar (Utils.HashTable T.Text Group)
, metaHandle :: Meta.MetaHandle
, serverId :: Word32
, ldClient :: LDClient
, groupConfig :: G.GroupConfig
, offsetConfig :: GOM.OffsetConfig
}

mkGroupCoordinator :: Meta.MetaHandle -> LDClient -> Word32 -> Int -> IO GroupCoordinator
mkGroupCoordinator metaHandle ldClient serverId offsetTopicReplica = do
mkGroupCoordinator
:: Meta.MetaHandle
-> LDClient
-> Word32
-> GOM.OffsetConfig
-> G.GroupConfig
-> IO GroupCoordinator
mkGroupCoordinator metaHandle ldClient serverId offsetConfig groupConfig = do
groups <- H.new >>= C.newMVar
return $ GroupCoordinator {..}

Expand All @@ -59,8 +66,8 @@ getOrMaybeCreateGroup GroupCoordinator{..} groupId memberId = do
H.lookup gs groupId >>= \case
Nothing -> if T.null memberId
then do
metadataManager <- GOM.mkGroupOffsetManager ldClient (fromIntegral serverId) groupId offsetTopicReplica
ng <- G.newGroup groupId metadataManager metaHandle
metadataManager <- GOM.mkGroupOffsetManager ldClient (fromIntegral serverId) groupId offsetConfig
ng <- G.newGroup groupId metadataManager metaHandle groupConfig
H.insert gs groupId ng
return ng
else throw (ErrorCodeException K.UNKNOWN_MEMBER_ID)
Expand Down Expand Up @@ -90,7 +97,7 @@ getGroupM GroupCoordinator{..} groupId = do
-- load group from meta store
loadGroupAndOffsets :: GroupCoordinator -> T.Text -> IO ()
loadGroupAndOffsets gc groupId = do
offsetManager <- GOM.mkGroupOffsetManager gc.ldClient (fromIntegral gc.serverId) groupId gc.offsetTopicReplica
offsetManager <- GOM.mkGroupOffsetManager gc.ldClient (fromIntegral gc.serverId) groupId gc.offsetConfig
GOM.loadOffsetsFromStorage offsetManager
Meta.getMeta @CM.GroupMetadataValue groupId gc.metaHandle >>= \case
Nothing -> do
Expand All @@ -105,7 +112,8 @@ addGroupByValue gc value offsetManager = do
C.withMVar gc.groups $ \gs -> do
H.lookup gs value.groupId >>= \case
Nothing -> do
ng <- G.newGroupFromValue value offsetManager gc.metaHandle
-- TODO: double check if persistence groupConfig in metastore is needed
ng <- G.newGroupFromValue value offsetManager gc.metaHandle gc.groupConfig
H.insert gs value.groupId ng
Just _ -> do
Log.warning $ "load group failed, group:" <> Log.build value.groupId <> " is loaded"
Expand Down
12 changes: 9 additions & 3 deletions hstream-kafka/HStream/Kafka/Group/GroupOffsetManager.hs
Expand Up @@ -8,6 +8,7 @@ module HStream.Kafka.Group.GroupOffsetManager
, fetchAllOffsets
, nullOffsets
, loadOffsetsFromStorage
, OffsetConfig (..)
) where

import Control.Exception (throw)
Expand Down Expand Up @@ -50,15 +51,20 @@ data GroupOffsetManager = forall os. OffsetStorage os => GroupOffsetManager
, offsetStorage :: os
, offsetsCache :: IORef (Map.Map TopicPartition Int64)
, partitionsMap :: IORef (Map.Map TopicPartition S.C_LogID)
, offsetConfig :: OffsetConfig
}

data OffsetConfig = OffsetConfig
{ offsetsTopicReplicationFactor :: Int
} deriving (Show)

-- FIXME: if we create a consumer group with groupName haven been used, call
-- mkCkpOffsetStorage with groupName may lead us to a un-clean ckp-store
mkGroupOffsetManager :: S.LDClient -> Int32 -> T.Text -> Int -> IO GroupOffsetManager
mkGroupOffsetManager ldClient serverId groupName offsetReplica = do
mkGroupOffsetManager :: S.LDClient -> Int32 -> T.Text -> OffsetConfig -> IO GroupOffsetManager
mkGroupOffsetManager ldClient serverId groupName offsetConfig = do
offsetsCache <- newIORef Map.empty
partitionsMap <- newIORef Map.empty
offsetStorage <- mkCkpOffsetStorage ldClient groupName offsetReplica
offsetStorage <- mkCkpOffsetStorage ldClient groupName offsetConfig.offsetsTopicReplicationFactor
return GroupOffsetManager{..}

loadOffsetsFromStorage :: GroupOffsetManager -> IO ()
Expand Down
21 changes: 15 additions & 6 deletions hstream-kafka/HStream/Kafka/Server/Config/KafkaConfig.hs
Expand Up @@ -146,12 +146,21 @@ instance KafkaConfig OffsetsTopicReplicationFactor where
defaultConfig = OffsetsTopicReplicationFactor 1
SHOWCONFIG(OffsetsTopicReplicationFactor)

data KafkaBrokerConfigs
= KafkaBrokerConfigs
{ autoCreateTopicsEnable :: AutoCreateTopicsEnable
, numPartitions :: NumPartitions
, defaultReplicationFactor :: DefaultReplicationFactor
, offsetsTopicReplication :: OffsetsTopicReplicationFactor
newtype GroupInitialRebalanceDelayMs = GroupInitialRebalanceDelayMs { _value :: Int } deriving (Eq)
instance KafkaConfig GroupInitialRebalanceDelayMs where
name = const "group.initial.rebalance.delay.ms"
value (GroupInitialRebalanceDelayMs v) = T.pack $ show v
isSentitive = const False
fromText t = GroupInitialRebalanceDelayMs <$> textToIntE t
defaultConfig = GroupInitialRebalanceDelayMs 3000
SHOWCONFIG(GroupInitialRebalanceDelayMs)

data KafkaBrokerConfigs = KafkaBrokerConfigs
{ autoCreateTopicsEnable :: AutoCreateTopicsEnable
, numPartitions :: NumPartitions
, defaultReplicationFactor :: DefaultReplicationFactor
, offsetsTopicReplication :: OffsetsTopicReplicationFactor
, groupInitialRebalanceDelay :: GroupInitialRebalanceDelayMs
} deriving (Eq, G.Generic)
instance KafkaConfigs KafkaBrokerConfigs

Expand Down
19 changes: 17 additions & 2 deletions hstream-kafka/HStream/Kafka/Server/Types.hs
Expand Up @@ -18,8 +18,10 @@ import HStream.Kafka.Common.FetchManager (FetchContext,
import HStream.Kafka.Common.OffsetManager (OffsetManager,
initOffsetReader,
newOffsetManager)
import qualified HStream.Kafka.Group.Group as G
import HStream.Kafka.Group.GroupCoordinator (GroupCoordinator,
mkGroupCoordinator)
import qualified HStream.Kafka.Group.GroupOffsetManager as GOM
import HStream.Kafka.Server.Config (ServerOpts (..))
import qualified HStream.Kafka.Server.Config.KafkaConfig as KC
import HStream.MetaStore.Types (MetaHandle (..))
Expand Down Expand Up @@ -62,8 +64,9 @@ initServerContext opts@ServerOpts{..} gossipContext mh = do
statsHolder <- newServerStatsHolder
epochHashRing <- initializeHashRing gossipContext

let replica = _kafkaBrokerConfigs.offsetsTopicReplication._value
scGroupCoordinator <- mkGroupCoordinator mh ldclient _serverID replica
let groupConfigs = brokerConfigToGroupConfig _kafkaBrokerConfigs
offsetConfigs = brokerConfigToOffsetConfig _kafkaBrokerConfigs
scGroupCoordinator <- mkGroupCoordinator mh ldclient _serverID offsetConfigs groupConfigs
-- must be initialized later
offsetManager <- newOffsetManager ldclient
-- Trick to avoid use maybe, must be initialized later
Expand Down Expand Up @@ -113,3 +116,15 @@ initConnectionContext sc = do
!fc <- initFetchContext (scLDClient sc)

pure sc{scOffsetManager = om, fetchCtx = fc}

brokerConfigToOffsetConfig :: KC.KafkaBrokerConfigs -> GOM.OffsetConfig
brokerConfigToOffsetConfig KC.KafkaBrokerConfigs{..} =
GOM.OffsetConfig {
offsetsTopicReplicationFactor = offsetsTopicReplication._value
}

brokerConfigToGroupConfig :: KC.KafkaBrokerConfigs -> G.GroupConfig
brokerConfigToGroupConfig KC.KafkaBrokerConfigs{..} =
G.GroupConfig {
groupInitialRebalanceDelay = groupInitialRebalanceDelay._value
}

0 comments on commit 36fd8b3

Please sign in to comment.