Skip to content

Commit

Permalink
kafka: add FetchMaxBytes config (#1808)
Browse files Browse the repository at this point in the history
  • Loading branch information
4eUeP committed May 7, 2024
1 parent 1060655 commit afcae0f
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 64 deletions.
2 changes: 2 additions & 0 deletions conf/hstream.yaml
Expand Up @@ -297,6 +297,8 @@ kafka:
#default.replication.factor: 1
#auto.create.topic.enable: true
#offsets.topic.replication.factor: 1
# --- Fetch Configuration ---
#fetch.max.bytes: 57671680 # 55 * 1024 * 1024

# Internal storage options
#
Expand Down
12 changes: 6 additions & 6 deletions hstream-kafka/HStream/Kafka/Server/Config/FromJson.hs
Expand Up @@ -113,13 +113,13 @@ parseJSONToOptions CliOptions{..} obj = do
-- SASL config
nodeEnableSaslAuth <- nodeCfgObj .:? "enable-sasl" .!= False
let !_enableSaslAuth = cliEnableSaslAuth || nodeEnableSaslAuth
let parsePlainTuple obj = do
username <- obj .: "username"
password <- obj .: "password"
let parsePlainTuple o = do
username <- o .: "username"
password <- o .: "password"
return (username, password)
let parseMechanisms obj = do
mech <- obj .: "mechanism"
auth_list <- obj .: "auth-list"
let parseMechanisms o = do
mech <- o .: "mechanism"
auth_list <- o .: "auth-list"
-- FIXME: more mechanisms
if (toUpper mech) == "PLAIN" then do
tups <- Y.withArray "auth-list" (
Expand Down
130 changes: 72 additions & 58 deletions hstream-kafka/HStream/Kafka/Server/Config/KafkaConfig.hs
Expand Up @@ -4,6 +4,7 @@
{-# LANGUAGE OverloadedStrings #-}

module HStream.Kafka.Server.Config.KafkaConfig where

import qualified Control.Monad as M
import qualified Data.Aeson.Key as Y
import qualified Data.Aeson.Text as Y
Expand Down Expand Up @@ -64,44 +65,12 @@ instance KafkaConfig KafkaConfigInstance where
instance Show KafkaConfigInstance where
show (KafkaConfigInstance kc) = T.unpack . showConfig $ kc

---------------------------------------------------------------------------
-- Kafka Topic Config
---------------------------------------------------------------------------
data CleanupPolicy = CleanupPolicyDelete | CleanupPolicyCompact deriving (Eq)
instance KafkaConfig CleanupPolicy where
name = const "cleanup.policy"
value CleanupPolicyDelete = "delete"
value CleanupPolicyCompact = "compact"
isSentitive = const False
fromText "delete" = Right CleanupPolicyDelete
fromText "compact" = Right CleanupPolicyCompact
fromText _ = Left $ "invalid cleanup.policy value"
defaultConfig = CleanupPolicyDelete

newtype RetentionMs = RetentionMs Int32 deriving (Eq)
instance KafkaConfig RetentionMs where
name = const "retention.ms"
value (RetentionMs v) = T.pack $ show v
isSentitive = const False
fromText textVal = case (T.signed T.decimal) textVal of
Left msg -> Left (T.pack msg)
Right (intVal, _) -> Right (RetentionMs intVal)
defaultConfig = RetentionMs 604800000

data KafkaTopicConfigs
= KafkaTopicConfigs
{ cleanupPolicy :: CleanupPolicy
, retentionMs :: RetentionMs
} deriving (G.Generic)
instance KafkaConfigs KafkaTopicConfigs

mkKafkaTopicConfigs :: Map.Map T.Text (Maybe T.Text) -> Either T.Text KafkaTopicConfigs
mkKafkaTopicConfigs configs = mkConfigs @KafkaTopicConfigs lk
where lk x = M.join (Map.lookup x configs)

---------------------------------------------------------------------------
-------------------------------------------------------------------------------
-- Kafka Broker Config
---------------------------------------------------------------------------
--
-- https://kafka.apache.org/documentation/#brokerconfigs
-------------------------------------------------------------------------------

showConfig :: KafkaConfig a => a -> T.Text
showConfig c = name c <> "=" <> value c

Expand Down Expand Up @@ -156,12 +125,22 @@ instance KafkaConfig GroupInitialRebalanceDelayMs where
defaultConfig = GroupInitialRebalanceDelayMs 3000
SHOWCONFIG(GroupInitialRebalanceDelayMs)

newtype FetchMaxBytes = FetchMaxBytes { _value :: Int } deriving (Eq)
instance KafkaConfig FetchMaxBytes where
name = const "fetch.max.bytes"
value (FetchMaxBytes v) = T.pack $ show v
isSentitive = const False
fromText t = FetchMaxBytes <$> textToIntE t
defaultConfig = FetchMaxBytes 57671680{- 55*1024*1024 -}
SHOWCONFIG(FetchMaxBytes)

data KafkaBrokerConfigs = KafkaBrokerConfigs
{ autoCreateTopicsEnable :: AutoCreateTopicsEnable
, numPartitions :: NumPartitions
, defaultReplicationFactor :: DefaultReplicationFactor
, offsetsTopicReplication :: OffsetsTopicReplicationFactor
, groupInitialRebalanceDelay :: GroupInitialRebalanceDelayMs
{ autoCreateTopicsEnable :: !AutoCreateTopicsEnable
, numPartitions :: !NumPartitions
, defaultReplicationFactor :: !DefaultReplicationFactor
, offsetsTopicReplication :: !OffsetsTopicReplicationFactor
, groupInitialRebalanceDelay :: !GroupInitialRebalanceDelayMs
, fetchMaxBytes :: !FetchMaxBytes
} deriving (Eq, G.Generic)
instance KafkaConfigs KafkaBrokerConfigs

Expand Down Expand Up @@ -193,9 +172,57 @@ mkKafkaBrokerConfigs mp =
Left msg -> errorWithoutStackTrace (T.unpack msg)
Right v -> v

-------------------------------------------------------------------------------
-- Kafka Topic Config
-------------------------------------------------------------------------------

data CleanupPolicy = CleanupPolicyDelete | CleanupPolicyCompact deriving (Eq)
instance KafkaConfig CleanupPolicy where
name = const "cleanup.policy"
value CleanupPolicyDelete = "delete"
value CleanupPolicyCompact = "compact"
isSentitive = const False
fromText "delete" = Right CleanupPolicyDelete
fromText "compact" = Right CleanupPolicyCompact
fromText _ = Left $ "invalid cleanup.policy value"
defaultConfig = CleanupPolicyDelete

newtype RetentionMs = RetentionMs Int32 deriving (Eq)
instance KafkaConfig RetentionMs where
name = const "retention.ms"
value (RetentionMs v) = T.pack $ show v
isSentitive = const False
fromText textVal = case (T.signed T.decimal) textVal of
Left msg -> Left (T.pack msg)
Right (intVal, _) -> Right (RetentionMs intVal)
defaultConfig = RetentionMs 604800000

data KafkaTopicConfigs
= KafkaTopicConfigs
{ cleanupPolicy :: CleanupPolicy
, retentionMs :: RetentionMs
} deriving (G.Generic)
instance KafkaConfigs KafkaTopicConfigs

mkKafkaTopicConfigs :: Map.Map T.Text (Maybe T.Text) -> Either T.Text KafkaTopicConfigs
mkKafkaTopicConfigs configs = mkConfigs @KafkaTopicConfigs lk
where lk x = M.join (Map.lookup x configs)

allTopicConfigs :: ConfigMap
allTopicConfigs = dumpConfigs (defaultConfigs @KafkaTopicConfigs)

getTopicConfig :: T.Text -> Map.Map T.Text (Maybe T.Text) -> Either T.Text KafkaConfigInstance
getTopicConfig configName configValues = do
let lk x = M.join (Map.lookup x configValues)
computedMap <- dumpConfigs <$> mkConfigs @KafkaTopicConfigs lk
case Map.lookup configName computedMap of
Nothing -> Left $ "unsupported config name:" <> configName
Just cfg -> Right cfg

---------------------------------------------------------------------------
-- Config Helpers
---------------------------------------------------------------------------

type Lookup = T.Text -> Maybe T.Text
type ConfigMap = Map.Map T.Text KafkaConfigInstance

Expand Down Expand Up @@ -247,23 +274,10 @@ instance (GKafkaConfigs a, GKafkaConfigs b) => GKafkaConfigs (a G.:*: b) where
gdefaultConfigs = gdefaultConfigs G.:*: gdefaultConfigs
gupdateConfigs (x G.:*: y) mp = (G.:*:) <$> gupdateConfigs x mp <*> gupdateConfigs y mp

#define MK_CONFIG_PAIR(configType) \
let dc = defaultConfig @configType in (name dc, (KafkaConfigInstance dc, fmap KafkaConfigInstance . fromText @configType))

allTopicConfigs :: ConfigMap
allTopicConfigs = dumpConfigs (defaultConfigs @KafkaTopicConfigs)

getTopicConfig :: T.Text -> Map.Map T.Text (Maybe T.Text) -> Either T.Text KafkaConfigInstance
getTopicConfig configName configValues = do
let lk x = M.join (Map.lookup x configValues)
computedMap <- dumpConfigs <$> mkConfigs @KafkaTopicConfigs lk
case Map.lookup configName computedMap of
Nothing -> Left $ "unsupported config name:" <> configName
Just cfg -> Right cfg

---------------------------------------------------------------------------
-------------------------------------------------------------------------------
-- Utils
---------------------------------------------------------------------------
-------------------------------------------------------------------------------

textToIntE :: T.Text -> Either T.Text Int
textToIntE v =
case (T.signed T.decimal) v of
Expand Down

0 comments on commit afcae0f

Please sign in to comment.