Skip to content

Commit

Permalink
kafka: support set metrics port (#1693)
Browse files Browse the repository at this point in the history
  • Loading branch information
YangKian committed Dec 6, 2023
1 parent 3dd154f commit 88dc6af
Show file tree
Hide file tree
Showing 6 changed files with 15 additions and 2 deletions.
3 changes: 3 additions & 0 deletions conf/hstream.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,9 @@ kafka:
# and can be overwritten by cli option `--gossip-port`
gossip-port: 6571

# Server port value for export metrics and can be overwritten by cli option `--metrics-port`
metrics-port: 9700

bind-address: 0.0.0.0

# Server listener address value, the value must be given.
Expand Down
7 changes: 7 additions & 0 deletions hstream-kafka/HStream/Kafka/Server/Config/FromCli.hs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ cliOptionsParser = do

cliServerBindAddress <- optional bindAddressParser
cliServerPort <- optional serverPortParser
cliMetricsPort <- optional metricsPortParser

cliServerID <- optional serverIDParser
cliMetaStore <- optional metaStoreAddrParser
Expand Down Expand Up @@ -191,6 +192,12 @@ serverPortParser = option auto
<> metavar "INT"
<> help "server port value"

metricsPortParser :: O.Parser Word16
metricsPortParser = option auto
$ long "metrics-port"
<> metavar "INT"
<> help "metrics port value"

serverGossipAddressParser :: O.Parser String
serverGossipAddressParser = strOption
$ long "gossip-address"
Expand Down
2 changes: 2 additions & 0 deletions hstream-kafka/HStream/Kafka/Server/Config/FromJson.hs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ parseJSONToOptions CliOptions{..} obj = do
let !_disableAutoCreateTopic = cliDisableAutoCreateTopic
let updateBrokerConfigs cfg = cfg {KC.autoCreateTopicsEnable=KC.AutoCreateTopicsEnable $ not _disableAutoCreateTopic}
!_kafkaBrokerConfigs <- updateBrokerConfigs <$> KC.parseBrokerConfigs nodeCfgObj
metricsPort <- nodeCfgObj .:? "metrics-port" .!= 9700
let !_metricsPort = fromMaybe metricsPort cliMetricsPort

-- TODO: For the max_record_size to work properly, we should also tell user
-- to set payload size for gRPC and LD.
Expand Down
2 changes: 2 additions & 0 deletions hstream-kafka/HStream/Kafka/Server/Config/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import HStream.Store.Logger (LDLogLevel)
data ServerOpts = ServerOpts
{ _serverHost :: !ByteString
, _serverPort :: !Word16
, _metricsPort :: !Word16

, _advertisedAddress :: !String
, _serverAdvertisedListeners :: !AdvertisedListeners
Expand Down Expand Up @@ -95,6 +96,7 @@ data CliOptions = CliOptions

, cliServerPort :: !(Maybe Word16)
, cliServerBindAddress :: !(Maybe ByteString)
, cliMetricsPort :: !(Maybe Word16)

, cliServerID :: !(Maybe Word32)
, cliMetaStore :: !(Maybe MetaStoreAddr)
Expand Down
1 change: 0 additions & 1 deletion hstream-kafka/HStream/Kafka/Server/Handler/Basic.hs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,6 @@ handleMetadataV4 ctx@ServerContext{..} _ req@K.MetadataRequestV4{..} = do
allStreamNames <- S.findStreams scLDClient S.StreamTypeTopic <&> S.fromList . L.map (Utils.cBytesToText . S.streamName)
let needCreate = S.toList $ topicNames S.\\ allStreamNames
let alreadyExist = V.fromList . S.toList $ topicNames `S.intersection` allStreamNames
Log.debug $ "enableAutoCreateTopic: " <> Log.build (show kafkaBrokerConfigs.autoCreateTopicsEnable._value)

createResp <-
if kafkaBrokerConfigs.autoCreateTopicsEnable._value && allowAutoTopicCreation
Expand Down
2 changes: 1 addition & 1 deletion hstream/app/lib/KafkaServer.hs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ app config@ServerOpts{..} = do
-- wait the default server
waitGossipBoot gossipContext
-- start prometheus app to export metrics
a2 <- Async.async $ Warp.run 9260 $ P.prometheus P.def {P.prometheusInstrumentPrometheus = False} P.metricsApp
a2 <- Async.async $ Warp.run (fromIntegral _metricsPort) $ P.prometheus P.def {P.prometheusInstrumentPrometheus = False} P.metricsApp
Async.link2Only (const True) a a2
Async.wait a

Expand Down

0 comments on commit 88dc6af

Please sign in to comment.