diff --git a/conf/hstream.yaml b/conf/hstream.yaml
index 5e6ec57b2..b06cae592 100644
--- a/conf/hstream.yaml
+++ b/conf/hstream.yaml
@@ -295,6 +295,8 @@ kafka:
   #
   #num.partitions: 1
   #default.replication.factor: 1
+  #auto.create.topics.enable: true
+  #offsets.topic.replication.factor: 1
 
   # Internal storage options
   #
diff --git a/hstream-kafka/HStream/Kafka/Group/GroupCoordinator.hs b/hstream-kafka/HStream/Kafka/Group/GroupCoordinator.hs
index 35e2a4280..2d3029b19 100644
--- a/hstream-kafka/HStream/Kafka/Group/GroupCoordinator.hs
+++ b/hstream-kafka/HStream/Kafka/Group/GroupCoordinator.hs
@@ -19,7 +19,6 @@ import           HStream.Kafka.Common.KafkaException (ErrorCodeException (Err
 import qualified HStream.Kafka.Common.Utils             as Utils
 import           HStream.Kafka.Group.Group              (Group)
 import qualified HStream.Kafka.Group.Group              as G
-import           HStream.Kafka.Group.GroupOffsetManager (mkGroupOffsetManager)
 import qualified HStream.Kafka.Group.GroupOffsetManager as GOM
 import qualified HStream.Logger                         as Log
 import qualified HStream.MetaStore.Types                as Meta
@@ -27,15 +26,15 @@ import           HStream.Store (LDClient)
 import qualified Kafka.Protocol.Error as K
 
 data GroupCoordinator = GroupCoordinator
-  { groups :: C.MVar (Utils.HashTable T.Text Group)
-
-  , metaHandle :: Meta.MetaHandle
-  , serverId :: Word32
-  , ldClient :: LDClient
+  { groups             :: C.MVar (Utils.HashTable T.Text Group)
+  , metaHandle         :: Meta.MetaHandle
+  , serverId           :: Word32
+  , ldClient           :: LDClient
+  , offsetTopicReplica :: Int
   }
 
-mkGroupCoordinator :: Meta.MetaHandle -> LDClient -> Word32 -> IO GroupCoordinator
-mkGroupCoordinator metaHandle ldClient serverId = do
+mkGroupCoordinator :: Meta.MetaHandle -> LDClient -> Word32 -> Int -> IO GroupCoordinator
+mkGroupCoordinator metaHandle ldClient serverId offsetTopicReplica = do
   groups <- H.new >>= C.newMVar
   return $ GroupCoordinator {..}
 
@@ -54,14 +53,13 @@ instance TM.TaskManager GroupCoordinator where
   unloadTaskAsync = unloadGroup
 
-
 getOrMaybeCreateGroup :: GroupCoordinator -> T.Text -> T.Text -> IO Group
 getOrMaybeCreateGroup GroupCoordinator{..} groupId memberId = do
   C.withMVar groups $ \gs -> do
     H.lookup gs groupId >>= \case
       Nothing -> if T.null memberId
         then do
-          metadataManager <- mkGroupOffsetManager ldClient (fromIntegral serverId) groupId
+          metadataManager <- GOM.mkGroupOffsetManager ldClient (fromIntegral serverId) groupId offsetTopicReplica
           ng <- G.newGroup groupId metadataManager metaHandle
           H.insert gs groupId ng
           return ng
@@ -92,7 +90,7 @@ getGroupM GroupCoordinator{..} groupId = do
 -- load group from meta store
 loadGroupAndOffsets :: GroupCoordinator -> T.Text -> IO ()
 loadGroupAndOffsets gc groupId = do
-  offsetManager <- mkGroupOffsetManager gc.ldClient (fromIntegral gc.serverId) groupId
+  offsetManager <- GOM.mkGroupOffsetManager gc.ldClient (fromIntegral gc.serverId) groupId gc.offsetTopicReplica
   GOM.loadOffsetsFromStorage offsetManager
   Meta.getMeta @CM.GroupMetadataValue groupId gc.metaHandle >>= \case
     Nothing -> do
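Note: `GroupCoordinator` now carries the configured offsets-topic replication factor in the new `offsetTopicReplica` field and threads it into every `GroupOffsetManager` it builds, on both the group-creation path (`getOrMaybeCreateGroup`) and the recovery path (`loadGroupAndOffsets`). A minimal sketch of the new construction path, assuming `GroupCoordinator` and `mkGroupCoordinator` are exported as this diff suggests:

```haskell
import           Data.Word                            (Word32)
import           HStream.Kafka.Group.GroupCoordinator (GroupCoordinator,
                                                       mkGroupCoordinator)
import qualified HStream.MetaStore.Types              as Meta
import           HStream.Store                        (LDClient)

-- The extra Int is offsets.topic.replication.factor; the coordinator
-- remembers it and hands it to GOM.mkGroupOffsetManager for each group.
bootCoordinator :: Meta.MetaHandle -> LDClient -> Word32 -> Int -> IO GroupCoordinator
bootCoordinator mh ld serverId offsetReplica =
  mkGroupCoordinator mh ld serverId offsetReplica
```

Passing the factor at construction time keeps the group modules free of any dependency on the server-config types.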
diff --git a/hstream-kafka/HStream/Kafka/Group/GroupOffsetManager.hs b/hstream-kafka/HStream/Kafka/Group/GroupOffsetManager.hs
index c9f689a57..90f8fb5ff 100644
--- a/hstream-kafka/HStream/Kafka/Group/GroupOffsetManager.hs
+++ b/hstream-kafka/HStream/Kafka/Group/GroupOffsetManager.hs
@@ -54,11 +54,11 @@ data GroupOffsetManager = forall os. OffsetStorage os => GroupOffsetManager
 -- FIXME: if we create a consumer group whose groupName has been used, calling
 -- mkCkpOffsetStorage with that groupName may leave us with an unclean ckp-store
-mkGroupOffsetManager :: S.LDClient -> Int32 -> T.Text -> IO GroupOffsetManager
-mkGroupOffsetManager ldClient serverId groupName = do
+mkGroupOffsetManager :: S.LDClient -> Int32 -> T.Text -> Int -> IO GroupOffsetManager
+mkGroupOffsetManager ldClient serverId groupName offsetReplica = do
   offsetsCache  <- newIORef Map.empty
   partitionsMap <- newIORef Map.empty
-  offsetStorage <- mkCkpOffsetStorage ldClient groupName
+  offsetStorage <- mkCkpOffsetStorage ldClient groupName offsetReplica
   return GroupOffsetManager{..}
 
 loadOffsetsFromStorage :: GroupOffsetManager -> IO ()
diff --git a/hstream-kafka/HStream/Kafka/Group/OffsetsStore.hs b/hstream-kafka/HStream/Kafka/Group/OffsetsStore.hs
index 6f48e8600..ae51bfdaa 100644
--- a/hstream-kafka/HStream/Kafka/Group/OffsetsStore.hs
+++ b/hstream-kafka/HStream/Kafka/Group/OffsetsStore.hs
@@ -34,10 +34,10 @@ data CkpOffsetStorage = CkpOffsetStorage
   , trimCkpWorker :: !CompactedWorker
   }
 
-mkCkpOffsetStorage :: S.LDClient -> T.Text -> IO CkpOffsetStorage
-mkCkpOffsetStorage client ckpStoreName = do
+mkCkpOffsetStorage :: S.LDClient -> T.Text -> Int -> IO CkpOffsetStorage
+mkCkpOffsetStorage client ckpStoreName replica = do
   let cbGroupName = textToCBytes ckpStoreName
-      logAttrs = S.def{S.logReplicationFactor = S.defAttr1 1}
+      logAttrs = S.def{S.logReplicationFactor = S.defAttr1 replica}
   -- FIXME: need to get log attributes from somewhere
   S.initOffsetCheckpointDir client logAttrs
   ckpStoreId <- S.allocOffsetCheckpointId client cbGroupName
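This hunk is what the rest of the patch feeds: the checkpoint log holding committed consumer offsets was previously created with a hard-coded replication factor of 1, so losing a single storage node could lose committed offsets. A sketch of the attribute construction, assuming the attributes type is `S.LogAttributes` and that `S.def`/`S.defAttr1` behave as used above:

```haskell
import qualified HStream.Store as S

-- Attributes for the offsets checkpoint log: a replication factor of r
-- lets committed offsets survive the loss of r - 1 storage nodes.
offsetsLogAttrs :: Int -> S.LogAttributes
offsetsLogAttrs replica =
  S.def { S.logReplicationFactor = S.defAttr1 replica }
```

As with Kafka's own `offsets.topic.replication.factor`, the configured value should not exceed the number of available storage nodes, or writes to the checkpoint log may fail.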
diff --git a/hstream-kafka/HStream/Kafka/Server/Config/FromCli.hs b/hstream-kafka/HStream/Kafka/Server/Config/FromCli.hs
index dc69341ad..7bd4f05c6 100644
--- a/hstream-kafka/HStream/Kafka/Server/Config/FromCli.hs
+++ b/hstream-kafka/HStream/Kafka/Server/Config/FromCli.hs
@@ -14,27 +14,32 @@ module HStream.Kafka.Server.Config.FromCli
   , parseMetaStoreAddr
   ) where
 
-import qualified Data.Attoparsec.Text as AP
-import           Data.ByteString      (ByteString)
-import qualified Data.Map.Strict      as Map
-import qualified Data.Set             as Set
-import           Data.Text            (Text)
-import qualified Data.Text            as T
-import           Data.Word            (Word16, Word32)
-import           Options.Applicative  as O (auto, flag, help, long,
-                                            maybeReader, metavar,
-                                            option, optional,
-                                            short, strOption,
-                                            value, (<**>), (<|>))
-import qualified Options.Applicative  as O
-import           System.Environment   (getProgName)
-import           System.Exit          (exitSuccess)
-import           Z.Data.CBytes        (CBytes)
-
+import qualified Data.Attoparsec.Text                    as AP
+import           Data.Bifunctor                          (second)
+import           Data.ByteString                         (ByteString)
+import           Data.Map.Strict                         (Map)
+import qualified Data.Map.Strict                         as Map
+import qualified Data.Set                                as Set
+import           Data.Text                               (Text)
+import qualified Data.Text                               as T
+import           Data.Word                               (Word16, Word32)
+import           Options.Applicative                     as O (auto, flag, help,
+                                                               long,
+                                                               maybeReader,
+                                                               metavar, option,
+                                                               optional, short,
+                                                               strOption, value,
+                                                               (<**>), (<|>))
+import qualified Options.Applicative                     as O
+import           System.Environment                      (getProgName)
+import           System.Exit                             (exitSuccess)
+import           Z.Data.CBytes                           (CBytes)
+
+import qualified HStream.Kafka.Server.Config.KafkaConfig as KC
 import           HStream.Kafka.Server.Config.Types
-import qualified HStream.Logger       as Log
-import           HStream.Store        (Compression (..))
-import           HStream.Store.Logger (LDLogLevel (..))
+import qualified HStream.Logger                          as Log
+import           HStream.Store                           (Compression (..))
+import           HStream.Store.Logger                    (LDLogLevel (..))
 
 -------------------------------------------------------------------------------
 
@@ -78,19 +83,19 @@ cliOptionsParser = do
   cliServerLogLevel     <- optional logLevelParser
   cliServerLogWithColor <- logWithColorParser
+  cliServerFileLog <- optional fileLoggerSettingsParser
   cliServerLogFlushImmediately <- logFlushImmediatelyParser
-  cliServerFileLog <- optional fileLoggerSettingsParser
 
   cliServerGossipAddress <- optional serverGossipAddressParser
   cliServerGossipPort    <- optional serverGossipPortParser
-  cliSeedNodes           <- optional seedNodesParser
+  cliSeedNodes <- optional seedNodesParser
 
   cliServerAdvertisedAddress      <- optional advertisedAddressParser
   cliServerAdvertisedListeners    <- advertisedListenersParser
   cliListenersSecurityProtocolMap <- listenersSecurityProtocolMapParser
 
-  cliLdLogLevel      <- optional ldLogLevelParser
-  cliStoreConfigPath <- storeConfigPathParser
+  cliLdLogLevel <- optional ldLogLevelParser
+  cliStoreConfigPath <- storeConfigPathParser
 
   cliEnableTls   <- enableTlsParser
   cliTlsKeyPath  <- optional tlsKeyPathParser
   cliTlsCertPath <- optional tlsCertPathParser
@@ -101,7 +106,7 @@ cliOptionsParser = do
   cliEnableSaslAuth <- enableSaslAuthParser
   cliEnableAcl      <- enableAclParser
 
-  cliDisableAutoCreateTopic <- disableAutoCreateTopicParser
+  cliBrokerConfigs <- brokerConfigsParser
 
   cliExperimentalFeatures <- O.many experimentalFeatureParser
 
@@ -291,15 +296,31 @@ enableAclParser = flag False True
   $  long "enable-acl"
   <> help "Enable ACL authorization"
 
-disableAutoCreateTopicParser :: O.Parser Bool
-disableAutoCreateTopicParser = flag False True
-  $  long "disable-auto-create-topic"
-  <> help "Disable auto create topic"
-
 experimentalFeatureParser :: O.Parser ExperimentalFeature
 experimentalFeatureParser = option parseExperimentalFeature $
   long "experimental" <> metavar "ExperimentalFeature"
 
+brokerConfigsParser :: O.Parser KC.KafkaBrokerConfigs
+brokerConfigsParser = toKafkaBrokerConfigs . Map.fromList
+  <$> O.many
+        ( O.option propertyReader
+            ( O.long "prop"
+           <> metavar "KEY=VALUE"
+           <> help "Broker property"
+            )
+        )
+  where
+    propertyReader :: O.ReadM (Text, Text)
+    propertyReader = O.eitherReader $ \kv ->
+      let (k, v) = second tail $ span (/= '=') kv
+       in Right (T.pack k, T.pack v)
+
+    toKafkaBrokerConfigs :: Map Text Text -> KC.KafkaBrokerConfigs
+    toKafkaBrokerConfigs mp =
+      case KC.mkConfigs (mp Map.!?) of
+        Left msg -> errorWithoutStackTrace (T.unpack msg)
+        Right v  -> v
+
 -------------------------------------------------------------------------------
 
 parserOpt :: (Text -> Either String a) -> O.Mod O.OptionFields a -> O.Parser a
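The one-off `--disable-auto-create-topic` flag is gone; any broker property can now be passed on the command line as a repeatable `--prop KEY=VALUE` option, for example `--prop auto.create.topics.enable=false --prop offsets.topic.replication.factor=3`. A standalone illustration of the `KEY=VALUE` split used by `propertyReader` (`parseProp` is an illustrative name, not part of the patch):

```haskell
import Data.Bifunctor (second)

-- Split at the first '=': span keeps the key, tail drops the '=' itself,
-- so values may contain further '=' characters.
parseProp :: String -> (String, String)
parseProp = second tail . span (/= '=')

-- parseProp "offsets.topic.replication.factor=3"
--   == ("offsets.topic.replication.factor", "3")
-- parseProp "a=b=c" == ("a", "b=c")
```

One caveat: an argument without any `=` makes `tail` fail on the empty string, so `--prop` inputs are assumed to be well-formed `KEY=VALUE` pairs.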
"metrics-port" .!= 9700 let !_metricsPort = fromMaybe metricsPort cliMetricsPort diff --git a/hstream-kafka/HStream/Kafka/Server/Config/KafkaConfig.hs b/hstream-kafka/HStream/Kafka/Server/Config/KafkaConfig.hs index 187a0ea77..1555632cb 100644 --- a/hstream-kafka/HStream/Kafka/Server/Config/KafkaConfig.hs +++ b/hstream-kafka/HStream/Kafka/Server/Config/KafkaConfig.hs @@ -8,6 +8,7 @@ import qualified Control.Monad as M import qualified Data.Aeson.Key as Y import qualified Data.Aeson.Text as Y import Data.Int (Int32) +import Data.List (intercalate) import qualified Data.Map as Map import qualified Data.Text as T import qualified Data.Text.Lazy as TL @@ -34,7 +35,7 @@ instance Enum KafkaConfigResource where class Eq kc => KafkaConfig kc where name :: kc -> T.Text - value :: kc -> Maybe T.Text + value :: kc -> T.Text isSentitive :: kc -> Bool -- in current implement, all configs should be read-only @@ -48,8 +49,10 @@ class Eq kc => KafkaConfig kc where isDefaultValue = (defaultConfig == ) data KafkaConfigInstance = forall kc. KafkaConfig kc => KafkaConfigInstance kc + instance Eq KafkaConfigInstance where (==) (KafkaConfigInstance x) (KafkaConfigInstance y) = value x == value y + instance KafkaConfig KafkaConfigInstance where name (KafkaConfigInstance x) = name x value (KafkaConfigInstance x) = value x @@ -57,14 +60,17 @@ instance KafkaConfig KafkaConfigInstance where fromText = fromText defaultConfig = defaultConfig +instance Show KafkaConfigInstance where + show (KafkaConfigInstance kc) = T.unpack . showConfig $ kc + --------------------------------------------------------------------------- -- Kafka Topic Config --------------------------------------------------------------------------- data CleanupPolicy = CleanupPolicyDelete | CleanupPolicyCompact deriving (Eq) instance KafkaConfig CleanupPolicy where name = const "cleanup.policy" - value CleanupPolicyDelete = Just "delete" - value CleanupPolicyCompact = Just "compact" + value CleanupPolicyDelete = "delete" + value CleanupPolicyCompact = "compact" isSentitive = const False fromText "delete" = Right CleanupPolicyDelete fromText "compact" = Right CleanupPolicyCompact @@ -74,7 +80,7 @@ instance KafkaConfig CleanupPolicy where newtype RetentionMs = RetentionMs Int32 deriving (Eq) instance KafkaConfig RetentionMs where name = const "retention.ms" - value (RetentionMs v) = Just . T.pack $ show v + value (RetentionMs v) = T.pack $ show v isSentitive = const False fromText textVal = case (T.signed T.decimal) textVal of Left msg -> Left (T.pack msg) @@ -95,41 +101,65 @@ mkKafkaTopicConfigs configs = mkConfigs @KafkaTopicConfigs lk --------------------------------------------------------------------------- -- Kafka Broker Config --------------------------------------------------------------------------- -newtype AutoCreateTopicsEnable = AutoCreateTopicsEnable { _value :: Bool } deriving (Eq, Show) +showConfig :: KafkaConfig a => a -> T.Text +showConfig c = name c <> "=" <> value c + +#define SHOWCONFIG(configType) \ +instance Show configType where { show = T.unpack . 
diff --git a/hstream-kafka/HStream/Kafka/Server/Config/KafkaConfig.hs b/hstream-kafka/HStream/Kafka/Server/Config/KafkaConfig.hs
index 187a0ea77..1555632cb 100644
--- a/hstream-kafka/HStream/Kafka/Server/Config/KafkaConfig.hs
+++ b/hstream-kafka/HStream/Kafka/Server/Config/KafkaConfig.hs
@@ -8,6 +8,7 @@ import qualified Control.Monad as M
 import qualified Data.Aeson.Key as Y
 import qualified Data.Aeson.Text as Y
 import           Data.Int (Int32)
+import           Data.List (intercalate)
 import qualified Data.Map as Map
 import qualified Data.Text as T
 import qualified Data.Text.Lazy as TL
@@ -34,7 +35,7 @@ instance Enum KafkaConfigResource where
 
 class Eq kc => KafkaConfig kc where
   name :: kc -> T.Text
-  value :: kc -> Maybe T.Text
+  value :: kc -> T.Text
   isSentitive :: kc -> Bool
 
   -- in the current implementation, all configs should be read-only
@@ -48,8 +49,10 @@ class Eq kc => KafkaConfig kc where
   isDefaultValue = (defaultConfig == )
 
 data KafkaConfigInstance = forall kc. KafkaConfig kc => KafkaConfigInstance kc
+
 instance Eq KafkaConfigInstance where
   (==) (KafkaConfigInstance x) (KafkaConfigInstance y) = value x == value y
+
 instance KafkaConfig KafkaConfigInstance where
   name (KafkaConfigInstance x) = name x
   value (KafkaConfigInstance x) = value x
@@ -57,14 +60,17 @@ instance KafkaConfig KafkaConfigInstance where
   fromText = fromText
   defaultConfig = defaultConfig
 
+instance Show KafkaConfigInstance where
+  show (KafkaConfigInstance kc) = T.unpack . showConfig $ kc
+
 ---------------------------------------------------------------------------
 -- Kafka Topic Config
 ---------------------------------------------------------------------------
 data CleanupPolicy = CleanupPolicyDelete | CleanupPolicyCompact deriving (Eq)
 instance KafkaConfig CleanupPolicy where
   name = const "cleanup.policy"
-  value CleanupPolicyDelete = Just "delete"
-  value CleanupPolicyCompact = Just "compact"
+  value CleanupPolicyDelete = "delete"
+  value CleanupPolicyCompact = "compact"
   isSentitive = const False
   fromText "delete" = Right CleanupPolicyDelete
   fromText "compact" = Right CleanupPolicyCompact
@@ -74,7 +80,7 @@ instance KafkaConfig CleanupPolicy where
 newtype RetentionMs = RetentionMs Int32 deriving (Eq)
 instance KafkaConfig RetentionMs where
   name = const "retention.ms"
-  value (RetentionMs v) = Just . T.pack $ show v
+  value (RetentionMs v) = T.pack $ show v
   isSentitive = const False
   fromText textVal = case (T.signed T.decimal) textVal of
     Left msg -> Left (T.pack msg)
@@ -95,41 +101,65 @@ mkKafkaTopicConfigs configs = mkConfigs @KafkaTopicConfigs lk
 ---------------------------------------------------------------------------
 -- Kafka Broker Config
 ---------------------------------------------------------------------------
-newtype AutoCreateTopicsEnable = AutoCreateTopicsEnable { _value :: Bool } deriving (Eq, Show)
+showConfig :: KafkaConfig a => a -> T.Text
+showConfig c = name c <> "=" <> value c
+
+#define SHOWCONFIG(configType) \
+instance Show configType where { show = T.unpack . showConfig }
+
+newtype AutoCreateTopicsEnable = AutoCreateTopicsEnable { _value :: Bool } deriving (Eq)
 instance KafkaConfig AutoCreateTopicsEnable where
   name = const "auto.create.topics.enable"
-  value (AutoCreateTopicsEnable True) = Just "true"
-  value (AutoCreateTopicsEnable False) = Just "false"
+  value (AutoCreateTopicsEnable True) = "true"
+  value (AutoCreateTopicsEnable False) = "false"
   isSentitive = const False
   fromText "true" = Right (AutoCreateTopicsEnable True)
   fromText "false" = Right (AutoCreateTopicsEnable False)
   fromText v = Left $ "invalid bool value:" <> v
   defaultConfig = AutoCreateTopicsEnable True
+SHOWCONFIG(AutoCreateTopicsEnable)
 
-newtype NumPartitions = NumPartitions { _value :: Int } deriving (Eq, Show)
+newtype NumPartitions = NumPartitions { _value :: Int } deriving (Eq)
 instance KafkaConfig NumPartitions where
   name = const "num.partitions"
-  value (NumPartitions v) = Just . T.pack $ show v
+  value (NumPartitions v) = T.pack $ show v
   isSentitive = const False
   fromText t = NumPartitions <$> textToIntE t
   defaultConfig = NumPartitions 1
+SHOWCONFIG(NumPartitions)
 
-newtype DefaultReplicationFactor = DefaultReplicationFactor { _value :: Int } deriving (Eq, Show)
+newtype DefaultReplicationFactor = DefaultReplicationFactor { _value :: Int } deriving (Eq)
 instance KafkaConfig DefaultReplicationFactor where
   name = const "default.replication.factor"
-  value (DefaultReplicationFactor v) = Just . T.pack $ show v
+  value (DefaultReplicationFactor v) = T.pack $ show v
   isSentitive = const False
   fromText t = DefaultReplicationFactor <$> textToIntE t
   defaultConfig = DefaultReplicationFactor 1
+SHOWCONFIG(DefaultReplicationFactor)
+
+newtype OffsetsTopicReplicationFactor = OffsetsTopicReplicationFactor { _value :: Int } deriving (Eq)
+instance KafkaConfig OffsetsTopicReplicationFactor where
+  name = const "offsets.topic.replication.factor"
+  value (OffsetsTopicReplicationFactor v) = T.pack $ show v
+  isSentitive = const False
+  fromText t = OffsetsTopicReplicationFactor <$> textToIntE t
+  defaultConfig = OffsetsTopicReplicationFactor 1
+SHOWCONFIG(OffsetsTopicReplicationFactor)
 
 data KafkaBrokerConfigs = KafkaBrokerConfigs
   { autoCreateTopicsEnable   :: AutoCreateTopicsEnable
   , numPartitions            :: NumPartitions
   , defaultReplicationFactor :: DefaultReplicationFactor
-  } deriving (Show, Eq, G.Generic)
+  , offsetsTopicReplication  :: OffsetsTopicReplicationFactor
+  } deriving (Eq, G.Generic)
 instance KafkaConfigs KafkaBrokerConfigs
 
+instance Show KafkaBrokerConfigs where
+  show cfgs = let props = Map.elems $ dumpConfigs cfgs
+                  fmtProps' = intercalate ", " $ map show props
+               in "{" <> fmtProps' <> "}"
+
 parseBrokerConfigs :: Y.Object -> Y.Parser KafkaBrokerConfigs
 parseBrokerConfigs obj =
   case mkConfigs @KafkaBrokerConfigs lk of
@@ -144,6 +174,9 @@ parseBrokerConfigs obj =
 
 allBrokerConfigs :: KafkaBrokerConfigs -> V.Vector KafkaConfigInstance
 allBrokerConfigs = V.fromList . Map.elems . dumpConfigs
 
+mergeBrokerConfigs :: KafkaBrokerConfigs -> KafkaBrokerConfigs -> KafkaBrokerConfigs
+mergeBrokerConfigs = mergeConfigs
+
 ---------------------------------------------------------------------------
 -- Config Helpers
 ---------------------------------------------------------------------------
@@ -154,6 +187,7 @@ class KafkaConfigs a where
   mkConfigs :: Lookup -> Either T.Text a
   dumpConfigs :: a -> ConfigMap
   defaultConfigs :: a
+  mergeConfigs :: a -> a -> a
 
   default mkConfigs :: (G.Generic a, GKafkaConfigs (G.Rep a)) => Lookup -> Either T.Text a
   mkConfigs lk = G.to <$> gmkConfigs lk
@@ -164,10 +198,14 @@ class KafkaConfigs a where
   default defaultConfigs :: (G.Generic a, GKafkaConfigs (G.Rep a)) => a
   defaultConfigs = G.to gdefaultConfigs
 
+  default mergeConfigs :: (G.Generic a, GKafkaConfigs (G.Rep a)) => a -> a -> a
+  mergeConfigs x y = G.to (gmergeConfigs (G.from x) (G.from y))
+
 class GKafkaConfigs f where
   gmkConfigs :: Lookup -> Either T.Text (f p)
   gdumpConfigs :: (f p) -> ConfigMap
   gdefaultConfigs :: f p
+  gmergeConfigs :: f p -> f p -> f p
 
 instance KafkaConfig c => GKafkaConfigs (G.K1 i c) where
   gmkConfigs lk = G.K1 <$> case lk (name @c defaultConfig) of
@@ -175,16 +213,19 @@ instance KafkaConfig c => GKafkaConfigs (G.K1 i c) where
     Just textValue -> fromText @c textValue
   gdumpConfigs (G.K1 x) = (Map.singleton (name x) (KafkaConfigInstance x))
   gdefaultConfigs = G.K1 (defaultConfig @c)
+  gmergeConfigs (G.K1 x) (G.K1 y) = G.K1 (if isDefaultValue x then y else x)
 
 instance GKafkaConfigs f => GKafkaConfigs (G.M1 i c f) where
   gmkConfigs lk = G.M1 <$> (gmkConfigs lk)
   gdumpConfigs (G.M1 x) = gdumpConfigs x
   gdefaultConfigs = G.M1 gdefaultConfigs
+  gmergeConfigs (G.M1 x) (G.M1 y) = G.M1 (gmergeConfigs x y)
 
 instance (GKafkaConfigs a, GKafkaConfigs b) => GKafkaConfigs (a G.:*: b) where
   gmkConfigs lk = (G.:*:) <$> (gmkConfigs lk) <*> (gmkConfigs lk)
   gdumpConfigs (x G.:*: y) = Map.union (gdumpConfigs x) (gdumpConfigs y)
   gdefaultConfigs = gdefaultConfigs G.:*: gdefaultConfigs
+  gmergeConfigs (x1 G.:*: y1) (x2 G.:*: y2) = (gmergeConfigs x1 x2) G.:*: (gmergeConfigs y1 y2)
 
 #define MK_CONFIG_PAIR(configType) \
   let dc = defaultConfig @configType in (name dc, (KafkaConfigInstance dc, fmap KafkaConfigInstance . fromText @configType))
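The new `mergeConfigs` method comes for free via `GHC.Generics`: products merge field by field, and each leaf keeps the left-hand value unless it still equals its default. A self-contained toy version of the same pattern (all names illustrative, independent of the patch), which compiles on its own:

```haskell
{-# LANGUAGE DeriveGeneric #-}
import GHC.Generics

class Eq c => HasDefault c where
  defVal :: c
instance HasDefault Int  where defVal = 1
instance HasDefault Bool where defVal = True

class GMerge f where
  gmerge :: f p -> f p -> f p

-- Leaf: keep the left value unless it is still the default.
instance HasDefault c => GMerge (K1 i c) where
  gmerge (K1 x) (K1 y) = K1 (if x == defVal then y else x)

-- Metadata wrappers and products just recurse field-wise.
instance GMerge f => GMerge (M1 i m f) where
  gmerge (M1 x) (M1 y) = M1 (gmerge x y)
instance (GMerge a, GMerge b) => GMerge (a :*: b) where
  gmerge (x1 :*: y1) (x2 :*: y2) = gmerge x1 x2 :*: gmerge y1 y2

data Cfg = Cfg { nParts :: Int, autoCreate :: Bool }
  deriving (Show, Generic)

mergeCfg :: Cfg -> Cfg -> Cfg
mergeCfg x y = to (gmerge (from x) (from y))

main :: IO ()
main = print (mergeCfg (Cfg 3 True) (Cfg 5 False))
-- Cfg {nParts = 3, autoCreate = False}: 3 beats 5 (non-default),
-- while True is the Bool default, so the right-hand False wins.
```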
diff --git a/hstream-kafka/HStream/Kafka/Server/Config/KafkaConfigManager.hs b/hstream-kafka/HStream/Kafka/Server/Config/KafkaConfigManager.hs
index ad693c024..3bddb2d8c 100644
--- a/hstream-kafka/HStream/Kafka/Server/Config/KafkaConfigManager.hs
+++ b/hstream-kafka/HStream/Kafka/Server/Config/KafkaConfigManager.hs
@@ -73,7 +73,7 @@ getResultFromInstance (KC.KafkaConfigInstance cfg) =
   , isDefault=KC.isDefaultValue cfg
   , readOnly=KC.readOnly cfg
   , name=KC.name cfg
-  , value=KC.value cfg
+  , value= Just $ KC.value cfg
   }
 
 listBrokerConfigs :: KafkaConfigManager -> T.Text -> K.KaArray T.Text -> IO K.DescribeConfigsResult
diff --git a/hstream-kafka/HStream/Kafka/Server/Config/Types.hs b/hstream-kafka/HStream/Kafka/Server/Config/Types.hs
index 2bc4af0c9..a3c0c642f 100644
--- a/hstream-kafka/HStream/Kafka/Server/Config/Types.hs
+++ b/hstream-kafka/HStream/Kafka/Server/Config/Types.hs
@@ -75,7 +75,6 @@ data ServerOpts = ServerOpts
   , _maxRecordSize :: !Int
   , _seedNodes :: ![(ByteString, Int)]
 
-  , _disableAutoCreateTopic :: !Bool
   , _enableSaslAuth :: !Bool
   , _enableAcl :: !Bool
 
@@ -142,8 +141,8 @@ data CliOptions = CliOptions
   -- ACL Authorization
   , cliEnableAcl :: !Bool
 
-  -- Kafka config
-  , cliDisableAutoCreateTopic :: !Bool
+  -- Kafka broker config
+  , cliBrokerConfigs :: !KC.KafkaBrokerConfigs
 
   -- HStream Experimental Features
   , cliExperimentalFeatures :: ![ExperimentalFeature]
diff --git a/hstream-kafka/HStream/Kafka/Server/Types.hs b/hstream-kafka/HStream/Kafka/Server/Types.hs
index b79a18b04..b61835ed2 100644
--- a/hstream-kafka/HStream/Kafka/Server/Types.hs
+++ b/hstream-kafka/HStream/Kafka/Server/Types.hs
@@ -61,8 +61,9 @@ initServerContext opts@ServerOpts{..} gossipContext mh = do
   -- XXX: Should we add a server option to toggle Stats?
   statsHolder <- newServerStatsHolder
   epochHashRing <- initializeHashRing gossipContext
-  scGroupCoordinator <- mkGroupCoordinator mh ldclient _serverID
+  let replica = _kafkaBrokerConfigs.offsetsTopicReplication._value
+  scGroupCoordinator <- mkGroupCoordinator mh ldclient _serverID replica
 
   -- must be initialized later
   offsetManager <- newOffsetManager ldclient
   -- Trick to avoid use maybe, must be initialized later
diff --git a/hstream/app/lib/KafkaServer.hs b/hstream/app/lib/KafkaServer.hs
index f786ceb5b..1e86f41e1 100644
--- a/hstream/app/lib/KafkaServer.hs
+++ b/hstream/app/lib/KafkaServer.hs
@@ -78,6 +78,7 @@ runApp = do
 
 app :: ServerOpts -> IO ()
 app config@ServerOpts{..} = do
+  Log.info $ "start server with opts: " <> Log.build (show config)
   setupFatalSignalHandler
   S.setLogDeviceDbgLevel' _ldLogLevel
   let logType = case config.serverFileLog of
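For completeness, the wiring in `Server/Types.hs` reads the factor with `OverloadedRecordDot` syntax: `_kafkaBrokerConfigs.offsetsTopicReplication._value` is nested field access ending at the newtype's `_value` selector. A toy illustration of that access pattern (toy types; the real ones live in `KafkaConfig.hs`):

```haskell
{-# LANGUAGE OverloadedRecordDot #-}

newtype OffsetsTopicReplicationFactor =
  OffsetsTopicReplicationFactor { _value :: Int }

newtype BrokerConfigs =
  BrokerConfigs { offsetsTopicReplication :: OffsetsTopicReplicationFactor }

-- cfgs.offsetsTopicReplication._value desugars to
-- _value (offsetsTopicReplication cfgs).
replicaOf :: BrokerConfigs -> Int
replicaOf cfgs = cfgs.offsetsTopicReplication._value

main :: IO ()
main = print (replicaOf (BrokerConfigs (OffsetsTopicReplicationFactor 3)))  -- 3
```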