Skip to content

Commit

Permalink
kafka: support set broker property from cli (#1799)
Browse files Browse the repository at this point in the history
* support merge kafkaConfig

* support parse OffsetsTopicReplicationFactor

* create offset topic with OffsetsTopicReplication

* pretty print server opts when start

---------

Co-authored-by: mu <59917266+4eUeP@users.noreply.github.com>
  • Loading branch information
YangKian and 4eUeP committed Apr 26, 2024
1 parent a94e55d commit eb5f4e2
Show file tree
Hide file tree
Showing 11 changed files with 129 additions and 68 deletions.
2 changes: 2 additions & 0 deletions conf/hstream.yaml
Expand Up @@ -295,6 +295,8 @@ kafka:
#
#num.partitions: 1
#default.replication.factor: 1
#auto.create.topic.enable: true
#offsets.topic.replication.factor: 1

# Internal storage options
#
Expand Down
20 changes: 9 additions & 11 deletions hstream-kafka/HStream/Kafka/Group/GroupCoordinator.hs
Expand Up @@ -19,23 +19,22 @@ import HStream.Kafka.Common.KafkaException (ErrorCodeException (Err
import qualified HStream.Kafka.Common.Utils as Utils
import HStream.Kafka.Group.Group (Group)
import qualified HStream.Kafka.Group.Group as G
import HStream.Kafka.Group.GroupOffsetManager (mkGroupOffsetManager)
import qualified HStream.Kafka.Group.GroupOffsetManager as GOM
import qualified HStream.Logger as Log
import qualified HStream.MetaStore.Types as Meta
import HStream.Store (LDClient)
import qualified Kafka.Protocol.Error as K

data GroupCoordinator = GroupCoordinator
{ groups :: C.MVar (Utils.HashTable T.Text Group)

, metaHandle :: Meta.MetaHandle
, serverId :: Word32
, ldClient :: LDClient
{ groups :: C.MVar (Utils.HashTable T.Text Group)
, metaHandle :: Meta.MetaHandle
, serverId :: Word32
, ldClient :: LDClient
, offsetTopicReplica :: Int
}

mkGroupCoordinator :: Meta.MetaHandle -> LDClient -> Word32 -> IO GroupCoordinator
mkGroupCoordinator metaHandle ldClient serverId = do
mkGroupCoordinator :: Meta.MetaHandle -> LDClient -> Word32 -> Int -> IO GroupCoordinator
mkGroupCoordinator metaHandle ldClient serverId offsetTopicReplica = do
groups <- H.new >>= C.newMVar
return $ GroupCoordinator {..}

Expand All @@ -54,14 +53,13 @@ instance TM.TaskManager GroupCoordinator where

unloadTaskAsync = unloadGroup


getOrMaybeCreateGroup :: GroupCoordinator -> T.Text -> T.Text -> IO Group
getOrMaybeCreateGroup GroupCoordinator{..} groupId memberId = do
C.withMVar groups $ \gs -> do
H.lookup gs groupId >>= \case
Nothing -> if T.null memberId
then do
metadataManager <- mkGroupOffsetManager ldClient (fromIntegral serverId) groupId
metadataManager <- GOM.mkGroupOffsetManager ldClient (fromIntegral serverId) groupId offsetTopicReplica
ng <- G.newGroup groupId metadataManager metaHandle
H.insert gs groupId ng
return ng
Expand Down Expand Up @@ -92,7 +90,7 @@ getGroupM GroupCoordinator{..} groupId = do
-- load group from meta store
loadGroupAndOffsets :: GroupCoordinator -> T.Text -> IO ()
loadGroupAndOffsets gc groupId = do
offsetManager <- mkGroupOffsetManager gc.ldClient (fromIntegral gc.serverId) groupId
offsetManager <- GOM.mkGroupOffsetManager gc.ldClient (fromIntegral gc.serverId) groupId gc.offsetTopicReplica
GOM.loadOffsetsFromStorage offsetManager
Meta.getMeta @CM.GroupMetadataValue groupId gc.metaHandle >>= \case
Nothing -> do
Expand Down
6 changes: 3 additions & 3 deletions hstream-kafka/HStream/Kafka/Group/GroupOffsetManager.hs
Expand Up @@ -54,11 +54,11 @@ data GroupOffsetManager = forall os. OffsetStorage os => GroupOffsetManager

-- FIXME: if we create a consumer group with groupName haven been used, call
-- mkCkpOffsetStorage with groupName may lead us to a un-clean ckp-store
mkGroupOffsetManager :: S.LDClient -> Int32 -> T.Text -> IO GroupOffsetManager
mkGroupOffsetManager ldClient serverId groupName = do
mkGroupOffsetManager :: S.LDClient -> Int32 -> T.Text -> Int -> IO GroupOffsetManager
mkGroupOffsetManager ldClient serverId groupName offsetReplica = do
offsetsCache <- newIORef Map.empty
partitionsMap <- newIORef Map.empty
offsetStorage <- mkCkpOffsetStorage ldClient groupName
offsetStorage <- mkCkpOffsetStorage ldClient groupName offsetReplica
return GroupOffsetManager{..}

loadOffsetsFromStorage :: GroupOffsetManager -> IO ()
Expand Down
6 changes: 3 additions & 3 deletions hstream-kafka/HStream/Kafka/Group/OffsetsStore.hs
Expand Up @@ -34,10 +34,10 @@ data CkpOffsetStorage = CkpOffsetStorage
, trimCkpWorker :: !CompactedWorker
}

mkCkpOffsetStorage :: S.LDClient -> T.Text -> IO CkpOffsetStorage
mkCkpOffsetStorage client ckpStoreName = do
mkCkpOffsetStorage :: S.LDClient -> T.Text -> Int -> IO CkpOffsetStorage
mkCkpOffsetStorage client ckpStoreName replica = do
let cbGroupName = textToCBytes ckpStoreName
logAttrs = S.def{S.logReplicationFactor = S.defAttr1 1}
logAttrs = S.def{S.logReplicationFactor = S.defAttr1 replica}
-- FIXME: need to get log attributes from somewhere
S.initOffsetCheckpointDir client logAttrs
ckpStoreId <- S.allocOffsetCheckpointId client cbGroupName
Expand Down
81 changes: 51 additions & 30 deletions hstream-kafka/HStream/Kafka/Server/Config/FromCli.hs
Expand Up @@ -14,27 +14,32 @@ module HStream.Kafka.Server.Config.FromCli
, parseMetaStoreAddr
) where

import qualified Data.Attoparsec.Text as AP
import Data.ByteString (ByteString)
import qualified Data.Map.Strict as Map
import qualified Data.Set as Set
import Data.Text (Text)
import qualified Data.Text as T
import Data.Word (Word16, Word32)
import Options.Applicative as O (auto, flag, help, long,
maybeReader, metavar,
option, optional,
short, strOption,
value, (<**>), (<|>))
import qualified Options.Applicative as O
import System.Environment (getProgName)
import System.Exit (exitSuccess)
import Z.Data.CBytes (CBytes)

import qualified Data.Attoparsec.Text as AP
import Data.Bifunctor (second)
import Data.ByteString (ByteString)
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as Map
import qualified Data.Set as Set
import Data.Text (Text)
import qualified Data.Text as T
import Data.Word (Word16, Word32)
import Options.Applicative as O (auto, flag, help,
long,
maybeReader,
metavar, option,
optional, short,
strOption, value,
(<**>), (<|>))
import qualified Options.Applicative as O
import System.Environment (getProgName)
import System.Exit (exitSuccess)
import Z.Data.CBytes (CBytes)

import qualified HStream.Kafka.Server.Config.KafkaConfig as KC
import HStream.Kafka.Server.Config.Types
import qualified HStream.Logger as Log
import HStream.Store (Compression (..))
import HStream.Store.Logger (LDLogLevel (..))
import qualified HStream.Logger as Log
import HStream.Store (Compression (..))
import HStream.Store.Logger (LDLogLevel (..))

-------------------------------------------------------------------------------

Expand Down Expand Up @@ -78,19 +83,19 @@ cliOptionsParser = do

cliServerLogLevel <- optional logLevelParser
cliServerLogWithColor <- logWithColorParser
cliServerFileLog <- optional fileLoggerSettingsParser
cliServerLogFlushImmediately <- logFlushImmediatelyParser
cliServerFileLog <- optional fileLoggerSettingsParser

cliServerGossipAddress <- optional serverGossipAddressParser
cliServerGossipPort <- optional serverGossipPortParser
cliSeedNodes <- optional seedNodesParser
cliSeedNodes <- optional seedNodesParser

cliServerAdvertisedAddress <- optional advertisedAddressParser
cliServerAdvertisedListeners <- advertisedListenersParser
cliListenersSecurityProtocolMap <- listenersSecurityProtocolMapParser

cliLdLogLevel <- optional ldLogLevelParser
cliStoreConfigPath <- storeConfigPathParser
cliLdLogLevel <- optional ldLogLevelParser
cliStoreConfigPath <- storeConfigPathParser
cliEnableTls <- enableTlsParser
cliTlsKeyPath <- optional tlsKeyPathParser
cliTlsCertPath <- optional tlsCertPathParser
Expand All @@ -101,7 +106,7 @@ cliOptionsParser = do
cliEnableSaslAuth <- enableSaslAuthParser
cliEnableAcl <- enableAclParser

cliDisableAutoCreateTopic <- disableAutoCreateTopicParser
cliBrokerConfigs <- brokerConfigsParser

cliExperimentalFeatures <- O.many experimentalFeatureParser

Expand Down Expand Up @@ -291,15 +296,31 @@ enableAclParser = flag False True
$ long "enable-acl"
<> help "Enable ACL authorization"

disableAutoCreateTopicParser :: O.Parser Bool
disableAutoCreateTopicParser = flag False True
$ long "disable-auto-create-topic"
<> help "Disable auto create topic"

experimentalFeatureParser :: O.Parser ExperimentalFeature
experimentalFeatureParser = option parseExperimentalFeature $
long "experimental" <> metavar "ExperimentalFeature"

brokerConfigsParser :: O.Parser KC.KafkaBrokerConfigs
brokerConfigsParser = toKafkaBrokerConfigs . Map.fromList
<$> O.many
( O.option propertyReader
( O.long "prop"
<> metavar "KEY=VALUE"
<> help "Broker property"
)
)
where
propertyReader :: O.ReadM (Text, Text)
propertyReader = O.eitherReader $ \kv ->
let (k, v) = second tail $ span (/= '=') kv
in Right (T.pack k, T.pack v)

toKafkaBrokerConfigs :: Map Text Text -> KC.KafkaBrokerConfigs
toKafkaBrokerConfigs mp =
case KC.mkConfigs (mp Map.!?) of
Left msg -> errorWithoutStackTrace (T.unpack msg)
Right v -> v

-------------------------------------------------------------------------------

parserOpt :: (Text -> Either String a) -> O.Mod O.OptionFields a -> O.Parser a
Expand Down
6 changes: 2 additions & 4 deletions hstream-kafka/HStream/Kafka/Server/Config/FromJson.hs
Expand Up @@ -40,10 +40,8 @@ parseJSONToOptions CliOptions{..} obj = do
nodeLogWithColor <- nodeCfgObj .:? "log-with-color" .!= True

-- Kafka config
-- TODO: generate Parser from KafkaBrokerConfigs
let !_disableAutoCreateTopic = cliDisableAutoCreateTopic
let updateBrokerConfigs cfg = cfg {KC.autoCreateTopicsEnable=KC.AutoCreateTopicsEnable $ not _disableAutoCreateTopic}
!_kafkaBrokerConfigs <- updateBrokerConfigs <$> KC.parseBrokerConfigs nodeCfgObj
!_kafkaBrokerConfigs <- KC.mergeBrokerConfigs cliBrokerConfigs <$> KC.parseBrokerConfigs nodeCfgObj

metricsPort <- nodeCfgObj .:? "metrics-port" .!= 9700
let !_metricsPort = fromMaybe metricsPort cliMetricsPort

Expand Down

0 comments on commit eb5f4e2

Please sign in to comment.