Skip to content

Commit

Permalink
feat(kafka): validate topic name when create (#1788)
Browse files Browse the repository at this point in the history
  • Loading branch information
YangKian committed Apr 15, 2024
1 parent c6b56d7 commit aa525df
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 8 deletions.
17 changes: 11 additions & 6 deletions hstream-kafka/HStream/Kafka/Server/Handler/Basic.hs
Expand Up @@ -36,6 +36,7 @@ import qualified HStream.Kafka.Common.Utils as Utils
import qualified HStream.Kafka.Server.Config.KafkaConfig as KC
import qualified HStream.Kafka.Server.Config.KafkaConfigManager as KCM
import HStream.Kafka.Server.Core.Topic (createTopic)
import qualified HStream.Kafka.Server.Handler.Topic as K
import HStream.Kafka.Server.Types (ServerContext (..))
import qualified HStream.Logger as Log
import qualified HStream.Server.HStreamApi as A
Expand Down Expand Up @@ -121,11 +122,15 @@ handleMetadata ctx reqCtx req = do
((K.simpleAuthorize (K.toAuthorizableReqCtx reqCtx) ctx.authorizer K.Res_TOPIC topic K.AclOp_DESCRIBE) &&^
(K.simpleAuthorize (K.toAuthorizableReqCtx reqCtx) ctx.authorizer K.Res_TOPIC topic K.AclOp_CREATE)) >>= \case
False -> return $ makeErrorTopicResp topic K.TOPIC_AUTHORIZATION_FAILED
True -> do
((code, _), shards) <- createTopic ctx topic (fromIntegral defaultReplicas) (fromIntegral defaultNumPartitions) Map.empty
if code /= K.NONE
then return $ makeErrorTopicResp topic code
else mkResponse topic (V.fromList shards)
True -> case K.validateTopicName topic of
Left (code, msg) -> do
Log.warning $ "Auto create topic " <> Log.build topic <> " failed, error: " <> Log.build (show msg)
return $ makeErrorTopicResp topic code
Right _ -> do
((code, _), shards) <- createTopic ctx topic (fromIntegral defaultReplicas) (fromIntegral defaultNumPartitions) Map.empty
if code /= K.NONE
then return $ makeErrorTopicResp topic code
else mkResponse topic (V.fromList shards)
return $ V.fromList resp
else do
let f topic acc = makeErrorTopicResp topic K.UNKNOWN_TOPIC_OR_PARTITION : acc
Expand Down Expand Up @@ -302,7 +307,7 @@ handleFindCoordinator ServerContext{..} reqCtx req = do
K.simpleAuthorize (K.toAuthorizableReqCtx reqCtx) authorizer K.Res_GROUP req.key K.AclOp_DESCRIBE >>= \case
True -> do
A.ServerNode{..} <- lookupKafkaPersist metaHandle gossipContext loadBalanceHashRing scAdvertisedListenersKey (KafkaResGroup req.key)
Log.info $ "findCoordinator for group:" <> Log.buildString' req.key <> ", result:" <> Log.buildString' serverNodeId
Log.info $ "findCoordinator for group:" <> Log.buildString' req.key <> ", assign to node " <> Log.buildString' serverNodeId
return $ K.FindCoordinatorResponse {
errorMessage=Nothing
, nodeId=fromIntegral serverNodeId
Expand Down
36 changes: 34 additions & 2 deletions hstream-kafka/HStream/Kafka/Server/Handler/Topic.hs
Expand Up @@ -7,6 +7,7 @@
module HStream.Kafka.Server.Handler.Topic
( -- 19: CreateTopics
handleCreateTopics
, validateTopicName
-- 20: DeleteTopics
, handleDeleteTopics
-- 37: CreatePartitions
Expand Down Expand Up @@ -55,7 +56,7 @@ handleCreateTopics ctx@ServerContext{scLDClient} reqCtx K.CreateTopicsRequest{..
| V.null topics_ -> return $ K.CreateTopicsResponse {topics = K.KaArray $ Just V.empty, throttleTimeMs = 0}
| otherwise -> do
(errRes, topics') <- mapM (\tp -> mapErr tp.name
<$> liftM2 (*>) (authorizeTopic tp) (pure . validateTopic $ tp)
<$> liftM2 (*>) (authorizeTopic tp) (doValidate tp)
) topics_ <&> V.partitionWith id
if | null topics' ->
-- all topics validate failed, return directly
Expand Down Expand Up @@ -93,6 +94,12 @@ handleCreateTopics ctx@ServerContext{scLDClient} reqCtx K.CreateTopicsRequest{..
mapErr name (Left (errorCode, msg)) = Left $ K.CreatableTopicResult name errorCode msg
mapErr _ (Right tp) = Right tp

doValidate tp = case validateTopic tp of
Left err'@(_, msg) -> do
Log.warning $ "Topic " <> Log.build tp.name <> " validate failed: " <> Log.build (show msg)
return $ Left err'
Right tp' -> return $ Right tp'

createTopic :: K.CreatableTopic -> IO K.CreatableTopicResult
createTopic topic@K.CreatableTopic{..} = do
authorizeTopic topic >>= \case
Expand All @@ -104,7 +111,8 @@ handleCreateTopics ctx@ServerContext{scLDClient} reqCtx K.CreateTopicsRequest{..

validateTopic :: K.CreatableTopic -> Either (ErrorCode, NullableString) K.CreatableTopic
validateTopic topic@K.CreatableTopic{..} = do
validateNullConfig configs
validateName name
*> validateNullConfig configs
*> validateAssignments assignments
*> validateReplica replicationFactor
*> validateNumPartitions numPartitions
Expand All @@ -113,6 +121,8 @@ validateTopic topic@K.CreatableTopic{..} = do
invalidNumPartitionsMsg = Just . T.pack $ "Number of partitions must be larger than 0, or -1 to use the default value."
unsuportedPartitionAssignments = Just . T.pack $ "Partition assignments is not supported now."

validateName n = topic <$ validateTopicName n

validateNullConfig (K.unKaArray -> Just configs') =
let nullConfigs = V.filter (\K.CreateableTopicConfig{value} -> isNothing value) configs'
in if V.null nullConfigs
Expand All @@ -133,6 +143,28 @@ validateTopic topic@K.CreatableTopic{..} = do
| partitions < -1 || partitions == 0 = Left (K.INVALID_PARTITIONS, invalidNumPartitionsMsg)
| otherwise = Right topic

validateTopicName :: T.Text -> Either (ErrorCode, Maybe T.Text) ()
validateTopicName name
| T.null name = Left (K.INVALID_TOPIC_EXCEPTION, Just "Topic name should not be empty.")
| name == "." = Left (K.INVALID_TOPIC_EXCEPTION, Just "Topic name should not be '.'")
| name == ".." = Left (K.INVALID_TOPIC_EXCEPTION, Just "Topic name should not be '..'")
| T.length name > maxNameLength = Left (K.INVALID_TOPIC_EXCEPTION, topicNameTooLong name)
| not (containsValidChars name) = Left (K.INVALID_TOPIC_EXCEPTION, invalidChars name)
| otherwise = Right ()
where
maxNameLength = 249

containsValidChars = T.all isValidChar
isValidChar c = (c >= 'a' && c <= 'z')
|| (c >= 'A' && c <= 'Z')
|| (c >= '0' && c <= '9')
|| c == '.'
|| c == '_'
|| c == '-'

topicNameTooLong n = Just $ "the lenght of " <> n <> " is longer than the max allowd length " <> (T.pack . show $ maxNameLength)
invalidChars n = Just $ n <> " contains one or more characters other than ASCII alphanumeric, '.', '_', and '-'"

--------------------
-- 20: DeleteTopics
--------------------
Expand Down

0 comments on commit aa525df

Please sign in to comment.