Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kafka: add FetchMaxBytes config #1808

Merged
merged 1 commit into from
May 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions conf/hstream.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,8 @@ kafka:
#default.replication.factor: 1
#auto.create.topic.enable: true
#offsets.topic.replication.factor: 1
# --- Fetch Configuration ---
#fetch.max.bytes: 57671680 # 55 * 1024 * 1024

# Internal storage options
#
Expand Down
12 changes: 6 additions & 6 deletions hstream-kafka/HStream/Kafka/Server/Config/FromJson.hs
Original file line number Diff line number Diff line change
Expand Up @@ -113,13 +113,13 @@ parseJSONToOptions CliOptions{..} obj = do
-- SASL config
nodeEnableSaslAuth <- nodeCfgObj .:? "enable-sasl" .!= False
let !_enableSaslAuth = cliEnableSaslAuth || nodeEnableSaslAuth
let parsePlainTuple obj = do
username <- obj .: "username"
password <- obj .: "password"
let parsePlainTuple o = do
username <- o .: "username"
password <- o .: "password"
return (username, password)
let parseMechanisms obj = do
mech <- obj .: "mechanism"
auth_list <- obj .: "auth-list"
let parseMechanisms o = do
mech <- o .: "mechanism"
auth_list <- o .: "auth-list"
-- FIXME: more mechanisms
if (toUpper mech) == "PLAIN" then do
tups <- Y.withArray "auth-list" (
Expand Down
130 changes: 72 additions & 58 deletions hstream-kafka/HStream/Kafka/Server/Config/KafkaConfig.hs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
{-# LANGUAGE OverloadedStrings #-}

module HStream.Kafka.Server.Config.KafkaConfig where

import qualified Control.Monad as M
import qualified Data.Aeson.Key as Y
import qualified Data.Aeson.Text as Y
Expand Down Expand Up @@ -64,44 +65,12 @@ instance KafkaConfig KafkaConfigInstance where
instance Show KafkaConfigInstance where
show (KafkaConfigInstance kc) = T.unpack . showConfig $ kc

---------------------------------------------------------------------------
-- Kafka Topic Config
---------------------------------------------------------------------------
data CleanupPolicy = CleanupPolicyDelete | CleanupPolicyCompact deriving (Eq)
instance KafkaConfig CleanupPolicy where
name = const "cleanup.policy"
value CleanupPolicyDelete = "delete"
value CleanupPolicyCompact = "compact"
isSentitive = const False
fromText "delete" = Right CleanupPolicyDelete
fromText "compact" = Right CleanupPolicyCompact
fromText _ = Left $ "invalid cleanup.policy value"
defaultConfig = CleanupPolicyDelete

newtype RetentionMs = RetentionMs Int32 deriving (Eq)
instance KafkaConfig RetentionMs where
name = const "retention.ms"
value (RetentionMs v) = T.pack $ show v
isSentitive = const False
fromText textVal = case (T.signed T.decimal) textVal of
Left msg -> Left (T.pack msg)
Right (intVal, _) -> Right (RetentionMs intVal)
defaultConfig = RetentionMs 604800000

data KafkaTopicConfigs
= KafkaTopicConfigs
{ cleanupPolicy :: CleanupPolicy
, retentionMs :: RetentionMs
} deriving (G.Generic)
instance KafkaConfigs KafkaTopicConfigs

mkKafkaTopicConfigs :: Map.Map T.Text (Maybe T.Text) -> Either T.Text KafkaTopicConfigs
mkKafkaTopicConfigs configs = mkConfigs @KafkaTopicConfigs lk
where lk x = M.join (Map.lookup x configs)

---------------------------------------------------------------------------
-------------------------------------------------------------------------------
-- Kafka Broker Config
---------------------------------------------------------------------------
--
-- https://kafka.apache.org/documentation/#brokerconfigs
-------------------------------------------------------------------------------

showConfig :: KafkaConfig a => a -> T.Text
showConfig c = name c <> "=" <> value c

Expand Down Expand Up @@ -156,12 +125,22 @@ instance KafkaConfig GroupInitialRebalanceDelayMs where
defaultConfig = GroupInitialRebalanceDelayMs 3000
SHOWCONFIG(GroupInitialRebalanceDelayMs)

newtype FetchMaxBytes = FetchMaxBytes { _value :: Int } deriving (Eq)
instance KafkaConfig FetchMaxBytes where
name = const "fetch.max.bytes"
value (FetchMaxBytes v) = T.pack $ show v
isSentitive = const False
fromText t = FetchMaxBytes <$> textToIntE t
defaultConfig = FetchMaxBytes 57671680{- 55*1024*1024 -}
SHOWCONFIG(FetchMaxBytes)

data KafkaBrokerConfigs = KafkaBrokerConfigs
{ autoCreateTopicsEnable :: AutoCreateTopicsEnable
, numPartitions :: NumPartitions
, defaultReplicationFactor :: DefaultReplicationFactor
, offsetsTopicReplication :: OffsetsTopicReplicationFactor
, groupInitialRebalanceDelay :: GroupInitialRebalanceDelayMs
{ autoCreateTopicsEnable :: !AutoCreateTopicsEnable
, numPartitions :: !NumPartitions
, defaultReplicationFactor :: !DefaultReplicationFactor
, offsetsTopicReplication :: !OffsetsTopicReplicationFactor
, groupInitialRebalanceDelay :: !GroupInitialRebalanceDelayMs
, fetchMaxBytes :: !FetchMaxBytes
} deriving (Eq, G.Generic)
instance KafkaConfigs KafkaBrokerConfigs

Expand Down Expand Up @@ -193,9 +172,57 @@ mkKafkaBrokerConfigs mp =
Left msg -> errorWithoutStackTrace (T.unpack msg)
Right v -> v

-------------------------------------------------------------------------------
-- Kafka Topic Config
-------------------------------------------------------------------------------

data CleanupPolicy = CleanupPolicyDelete | CleanupPolicyCompact deriving (Eq)
instance KafkaConfig CleanupPolicy where
name = const "cleanup.policy"
value CleanupPolicyDelete = "delete"
value CleanupPolicyCompact = "compact"
isSentitive = const False
fromText "delete" = Right CleanupPolicyDelete
fromText "compact" = Right CleanupPolicyCompact
fromText _ = Left $ "invalid cleanup.policy value"
defaultConfig = CleanupPolicyDelete

newtype RetentionMs = RetentionMs Int32 deriving (Eq)
instance KafkaConfig RetentionMs where
name = const "retention.ms"
value (RetentionMs v) = T.pack $ show v
isSentitive = const False
fromText textVal = case (T.signed T.decimal) textVal of
Left msg -> Left (T.pack msg)
Right (intVal, _) -> Right (RetentionMs intVal)
defaultConfig = RetentionMs 604800000

data KafkaTopicConfigs
= KafkaTopicConfigs
{ cleanupPolicy :: CleanupPolicy
, retentionMs :: RetentionMs
} deriving (G.Generic)
instance KafkaConfigs KafkaTopicConfigs

mkKafkaTopicConfigs :: Map.Map T.Text (Maybe T.Text) -> Either T.Text KafkaTopicConfigs
mkKafkaTopicConfigs configs = mkConfigs @KafkaTopicConfigs lk
where lk x = M.join (Map.lookup x configs)

allTopicConfigs :: ConfigMap
allTopicConfigs = dumpConfigs (defaultConfigs @KafkaTopicConfigs)

getTopicConfig :: T.Text -> Map.Map T.Text (Maybe T.Text) -> Either T.Text KafkaConfigInstance
getTopicConfig configName configValues = do
let lk x = M.join (Map.lookup x configValues)
computedMap <- dumpConfigs <$> mkConfigs @KafkaTopicConfigs lk
case Map.lookup configName computedMap of
Nothing -> Left $ "unsupported config name:" <> configName
Just cfg -> Right cfg

---------------------------------------------------------------------------
-- Config Helpers
---------------------------------------------------------------------------

type Lookup = T.Text -> Maybe T.Text
type ConfigMap = Map.Map T.Text KafkaConfigInstance

Expand Down Expand Up @@ -247,23 +274,10 @@ instance (GKafkaConfigs a, GKafkaConfigs b) => GKafkaConfigs (a G.:*: b) where
gdefaultConfigs = gdefaultConfigs G.:*: gdefaultConfigs
gupdateConfigs (x G.:*: y) mp = (G.:*:) <$> gupdateConfigs x mp <*> gupdateConfigs y mp

#define MK_CONFIG_PAIR(configType) \
let dc = defaultConfig @configType in (name dc, (KafkaConfigInstance dc, fmap KafkaConfigInstance . fromText @configType))

allTopicConfigs :: ConfigMap
allTopicConfigs = dumpConfigs (defaultConfigs @KafkaTopicConfigs)

getTopicConfig :: T.Text -> Map.Map T.Text (Maybe T.Text) -> Either T.Text KafkaConfigInstance
getTopicConfig configName configValues = do
let lk x = M.join (Map.lookup x configValues)
computedMap <- dumpConfigs <$> mkConfigs @KafkaTopicConfigs lk
case Map.lookup configName computedMap of
Nothing -> Left $ "unsupported config name:" <> configName
Just cfg -> Right cfg

---------------------------------------------------------------------------
-------------------------------------------------------------------------------
-- Utils
---------------------------------------------------------------------------
-------------------------------------------------------------------------------

textToIntE :: T.Text -> Either T.Text Int
textToIntE v =
case (T.signed T.decimal) v of
Expand Down
Loading