Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kafka: support set broker property from cli #1799

Merged
merged 9 commits into from
Apr 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions conf/hstream.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,8 @@ kafka:
#
#num.partitions: 1
#default.replication.factor: 1
#auto.create.topic.enable: true
#offsets.topic.replication.factor: 1

# Internal storage options
#
Expand Down
20 changes: 9 additions & 11 deletions hstream-kafka/HStream/Kafka/Group/GroupCoordinator.hs
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,22 @@ import HStream.Kafka.Common.KafkaException (ErrorCodeException (Err
import qualified HStream.Kafka.Common.Utils as Utils
import HStream.Kafka.Group.Group (Group)
import qualified HStream.Kafka.Group.Group as G
import HStream.Kafka.Group.GroupOffsetManager (mkGroupOffsetManager)
import qualified HStream.Kafka.Group.GroupOffsetManager as GOM
import qualified HStream.Logger as Log
import qualified HStream.MetaStore.Types as Meta
import HStream.Store (LDClient)
import qualified Kafka.Protocol.Error as K

data GroupCoordinator = GroupCoordinator
{ groups :: C.MVar (Utils.HashTable T.Text Group)

, metaHandle :: Meta.MetaHandle
, serverId :: Word32
, ldClient :: LDClient
{ groups :: C.MVar (Utils.HashTable T.Text Group)
, metaHandle :: Meta.MetaHandle
, serverId :: Word32
, ldClient :: LDClient
, offsetTopicReplica :: Int
}

mkGroupCoordinator :: Meta.MetaHandle -> LDClient -> Word32 -> IO GroupCoordinator
mkGroupCoordinator metaHandle ldClient serverId = do
mkGroupCoordinator :: Meta.MetaHandle -> LDClient -> Word32 -> Int -> IO GroupCoordinator
mkGroupCoordinator metaHandle ldClient serverId offsetTopicReplica = do
groups <- H.new >>= C.newMVar
return $ GroupCoordinator {..}

Expand All @@ -54,14 +53,13 @@ instance TM.TaskManager GroupCoordinator where

unloadTaskAsync = unloadGroup


getOrMaybeCreateGroup :: GroupCoordinator -> T.Text -> T.Text -> IO Group
getOrMaybeCreateGroup GroupCoordinator{..} groupId memberId = do
C.withMVar groups $ \gs -> do
H.lookup gs groupId >>= \case
Nothing -> if T.null memberId
then do
metadataManager <- mkGroupOffsetManager ldClient (fromIntegral serverId) groupId
metadataManager <- GOM.mkGroupOffsetManager ldClient (fromIntegral serverId) groupId offsetTopicReplica
ng <- G.newGroup groupId metadataManager metaHandle
H.insert gs groupId ng
return ng
Expand Down Expand Up @@ -92,7 +90,7 @@ getGroupM GroupCoordinator{..} groupId = do
-- load group from meta store
loadGroupAndOffsets :: GroupCoordinator -> T.Text -> IO ()
loadGroupAndOffsets gc groupId = do
offsetManager <- mkGroupOffsetManager gc.ldClient (fromIntegral gc.serverId) groupId
offsetManager <- GOM.mkGroupOffsetManager gc.ldClient (fromIntegral gc.serverId) groupId gc.offsetTopicReplica
GOM.loadOffsetsFromStorage offsetManager
Meta.getMeta @CM.GroupMetadataValue groupId gc.metaHandle >>= \case
Nothing -> do
Expand Down
6 changes: 3 additions & 3 deletions hstream-kafka/HStream/Kafka/Group/GroupOffsetManager.hs
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,11 @@ data GroupOffsetManager = forall os. OffsetStorage os => GroupOffsetManager

-- FIXME: if we create a consumer group with groupName haven been used, call
-- mkCkpOffsetStorage with groupName may lead us to a un-clean ckp-store
mkGroupOffsetManager :: S.LDClient -> Int32 -> T.Text -> IO GroupOffsetManager
mkGroupOffsetManager ldClient serverId groupName = do
mkGroupOffsetManager :: S.LDClient -> Int32 -> T.Text -> Int -> IO GroupOffsetManager
mkGroupOffsetManager ldClient serverId groupName offsetReplica = do
offsetsCache <- newIORef Map.empty
partitionsMap <- newIORef Map.empty
offsetStorage <- mkCkpOffsetStorage ldClient groupName
offsetStorage <- mkCkpOffsetStorage ldClient groupName offsetReplica
return GroupOffsetManager{..}

loadOffsetsFromStorage :: GroupOffsetManager -> IO ()
Expand Down
6 changes: 3 additions & 3 deletions hstream-kafka/HStream/Kafka/Group/OffsetsStore.hs
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ data CkpOffsetStorage = CkpOffsetStorage
, trimCkpWorker :: !CompactedWorker
}

mkCkpOffsetStorage :: S.LDClient -> T.Text -> IO CkpOffsetStorage
mkCkpOffsetStorage client ckpStoreName = do
mkCkpOffsetStorage :: S.LDClient -> T.Text -> Int -> IO CkpOffsetStorage
mkCkpOffsetStorage client ckpStoreName replica = do
let cbGroupName = textToCBytes ckpStoreName
logAttrs = S.def{S.logReplicationFactor = S.defAttr1 1}
logAttrs = S.def{S.logReplicationFactor = S.defAttr1 replica}
-- FIXME: need to get log attributes from somewhere
S.initOffsetCheckpointDir client logAttrs
ckpStoreId <- S.allocOffsetCheckpointId client cbGroupName
Expand Down
81 changes: 51 additions & 30 deletions hstream-kafka/HStream/Kafka/Server/Config/FromCli.hs
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,32 @@ module HStream.Kafka.Server.Config.FromCli
, parseMetaStoreAddr
) where

import qualified Data.Attoparsec.Text as AP
import Data.ByteString (ByteString)
import qualified Data.Map.Strict as Map
import qualified Data.Set as Set
import Data.Text (Text)
import qualified Data.Text as T
import Data.Word (Word16, Word32)
import Options.Applicative as O (auto, flag, help, long,
maybeReader, metavar,
option, optional,
short, strOption,
value, (<**>), (<|>))
import qualified Options.Applicative as O
import System.Environment (getProgName)
import System.Exit (exitSuccess)
import Z.Data.CBytes (CBytes)

import qualified Data.Attoparsec.Text as AP
import Data.Bifunctor (second)
import Data.ByteString (ByteString)
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as Map
import qualified Data.Set as Set
import Data.Text (Text)
import qualified Data.Text as T
import Data.Word (Word16, Word32)
import Options.Applicative as O (auto, flag, help,
long,
maybeReader,
metavar, option,
optional, short,
strOption, value,
(<**>), (<|>))
import qualified Options.Applicative as O
import System.Environment (getProgName)
import System.Exit (exitSuccess)
import Z.Data.CBytes (CBytes)

import qualified HStream.Kafka.Server.Config.KafkaConfig as KC
import HStream.Kafka.Server.Config.Types
import qualified HStream.Logger as Log
import HStream.Store (Compression (..))
import HStream.Store.Logger (LDLogLevel (..))
import qualified HStream.Logger as Log
import HStream.Store (Compression (..))
import HStream.Store.Logger (LDLogLevel (..))

-------------------------------------------------------------------------------

Expand Down Expand Up @@ -78,19 +83,19 @@ cliOptionsParser = do

cliServerLogLevel <- optional logLevelParser
cliServerLogWithColor <- logWithColorParser
cliServerFileLog <- optional fileLoggerSettingsParser
cliServerLogFlushImmediately <- logFlushImmediatelyParser
cliServerFileLog <- optional fileLoggerSettingsParser

cliServerGossipAddress <- optional serverGossipAddressParser
cliServerGossipPort <- optional serverGossipPortParser
cliSeedNodes <- optional seedNodesParser
cliSeedNodes <- optional seedNodesParser

cliServerAdvertisedAddress <- optional advertisedAddressParser
cliServerAdvertisedListeners <- advertisedListenersParser
cliListenersSecurityProtocolMap <- listenersSecurityProtocolMapParser

cliLdLogLevel <- optional ldLogLevelParser
cliStoreConfigPath <- storeConfigPathParser
cliLdLogLevel <- optional ldLogLevelParser
cliStoreConfigPath <- storeConfigPathParser
cliEnableTls <- enableTlsParser
cliTlsKeyPath <- optional tlsKeyPathParser
cliTlsCertPath <- optional tlsCertPathParser
Expand All @@ -101,7 +106,7 @@ cliOptionsParser = do
cliEnableSaslAuth <- enableSaslAuthParser
cliEnableAcl <- enableAclParser

cliDisableAutoCreateTopic <- disableAutoCreateTopicParser
cliBrokerConfigs <- brokerConfigsParser

cliExperimentalFeatures <- O.many experimentalFeatureParser

Expand Down Expand Up @@ -291,15 +296,31 @@ enableAclParser = flag False True
$ long "enable-acl"
<> help "Enable ACL authorization"

disableAutoCreateTopicParser :: O.Parser Bool
disableAutoCreateTopicParser = flag False True
$ long "disable-auto-create-topic"
<> help "Disable auto create topic"

experimentalFeatureParser :: O.Parser ExperimentalFeature
experimentalFeatureParser = option parseExperimentalFeature $
long "experimental" <> metavar "ExperimentalFeature"

brokerConfigsParser :: O.Parser KC.KafkaBrokerConfigs
brokerConfigsParser = toKafkaBrokerConfigs . Map.fromList
<$> O.many
( O.option propertyReader
( O.long "prop"
<> metavar "KEY=VALUE"
<> help "Broker property"
)
)
where
propertyReader :: O.ReadM (Text, Text)
propertyReader = O.eitherReader $ \kv ->
let (k, v) = second tail $ span (/= '=') kv
in Right (T.pack k, T.pack v)

toKafkaBrokerConfigs :: Map Text Text -> KC.KafkaBrokerConfigs
toKafkaBrokerConfigs mp =
case KC.mkConfigs (mp Map.!?) of
Left msg -> errorWithoutStackTrace (T.unpack msg)
Right v -> v

-------------------------------------------------------------------------------

parserOpt :: (Text -> Either String a) -> O.Mod O.OptionFields a -> O.Parser a
Expand Down
6 changes: 2 additions & 4 deletions hstream-kafka/HStream/Kafka/Server/Config/FromJson.hs
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,8 @@ parseJSONToOptions CliOptions{..} obj = do
nodeLogWithColor <- nodeCfgObj .:? "log-with-color" .!= True

-- Kafka config
-- TODO: generate Parser from KafkaBrokerConfigs
let !_disableAutoCreateTopic = cliDisableAutoCreateTopic
let updateBrokerConfigs cfg = cfg {KC.autoCreateTopicsEnable=KC.AutoCreateTopicsEnable $ not _disableAutoCreateTopic}
!_kafkaBrokerConfigs <- updateBrokerConfigs <$> KC.parseBrokerConfigs nodeCfgObj
!_kafkaBrokerConfigs <- KC.mergeBrokerConfigs cliBrokerConfigs <$> KC.parseBrokerConfigs nodeCfgObj

metricsPort <- nodeCfgObj .:? "metrics-port" .!= 9700
let !_metricsPort = fromMaybe metricsPort cliMetricsPort

Expand Down
Loading
Loading