Skip to content

Commit

Permalink
kafka: upgrade CreateTopics to version 2 (#1728)
Browse files Browse the repository at this point in the history
  • Loading branch information
YangKian committed Jan 5, 2024
1 parent c9f41ae commit bd7ea77
Show file tree
Hide file tree
Showing 7 changed files with 258 additions and 43 deletions.
28 changes: 17 additions & 11 deletions hstream-kafka/HStream/Kafka/Server/Core/Topic.hs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ module HStream.Kafka.Server.Core.Topic
)
where

import Control.Exception (Exception (fromException),
import Control.Exception (Exception (displayException, fromException),
SomeException, try)
import Control.Monad (forM)
import Data.Int (Int16, Int32)
Expand All @@ -27,7 +27,13 @@ import qualified HStream.Store as S
import qualified HStream.Utils as Utils
import qualified Kafka.Protocol.Error as K

createTopic :: ServerContext -> Text -> Int16 -> Int32 -> Map.Map T.Text (Maybe T.Text) -> IO (K.ErrorCode, [Shard.Shard])
createTopic
:: ServerContext
-> Text
-> Int16
-> Int32
-> Map.Map T.Text (Maybe T.Text)
-> IO ((K.ErrorCode, T.Text), [Shard.Shard])
createTopic ServerContext{..} name replicationFactor numPartitions configs = do
let streamId = S.transToTopicStreamName name
timeStamp <- BaseTime.getSystemNsTimestamp
Expand All @@ -36,8 +42,8 @@ createTopic ServerContext{..} name replicationFactor numPartitions configs = do
else fromIntegral replicationFactor
case KC.mkKafkaTopicConfigs configs of
Left msg -> do
Log.info $ "create topic failed, invaid config:" <> Log.build msg
return (K.INVALID_CONFIG, [])
Log.info $ "create topic failed, invalid config:" <> Log.build msg
return ((K.INVALID_CONFIG, "Create topic with invalid config: " <> T.pack (show msg)), [])
Right topicConfigs -> do
-- FIXME: Is there any other attrs to store?
-- FIXME: Should we parse any other attr from `confs` of `CreateableTopicV0`?
Expand All @@ -50,11 +56,11 @@ createTopic ServerContext{..} name replicationFactor numPartitions configs = do
try (S.createStream scLDClient streamId attrs) >>= \case
Left (e :: SomeException)
| isJust (fromException @S.EXISTS e) -> do
Log.warning $ "Stream already exists: " <> Log.build (show streamId)
return (K.TOPIC_ALREADY_EXISTS, [])
Log.warning $ "Topic already exists: " <> Log.build name
return ((K.TOPIC_ALREADY_EXISTS, "Topic " <> name <> " already exists"), [])
| otherwise -> do
Log.warning $ "Exception occurs when creating stream " <> Log.build (show streamId) <> ": " <> Log.build (show e)
return (K.UNKNOWN_SERVER_ERROR, [])
return ((K.UNKNOWN_SERVER_ERROR, "Unexpected Server error"), [])
Right _ -> do
let partitions = if numPartitions == -1
then kafkaBrokerConfigs.numPartitions._value
Expand All @@ -66,11 +72,11 @@ createTopic ServerContext{..} name replicationFactor numPartitions configs = do
Shard.createShard scLDClient shard
case shards_e of
Left (e :: SomeException) -> do
Log.warning $ "Exception occurs when creating shards of stream " <> Log.build (show streamId) <> ": " <> Log.build (show e)
return (K.INVALID_PARTITIONS, [])
Log.warning $ "Exception occurs when creating shards of topic " <> Log.build name <> ": " <> Log.build (show e)
return ((K.INVALID_PARTITIONS, "Create shard for topic " <> name <> " error: " <> T.pack (displayException e)), [])
Right shards -> do
Log.info $ "Created " <> Log.build (show (length shards)) <> " shards for stream " <> Log.build name <> ": " <> Log.build (show (Shard.shardId <$> shards))
return (K.NONE, shards)
Log.info $ "Created " <> Log.build (show (length shards)) <> " shards for topic " <> Log.build name <> ": " <> Log.build (show (Shard.shardId <$> shards))
return ((K.NONE, T.empty), shards)
where
getBacklogDuration KC.KafkaTopicConfigs{cleanupPolicy=cleanupPolicy, retentionMs=KC.RetentionMs retentionMs}
| cleanupPolicy == KC.CleanupPolicyCompact = Nothing
Expand Down
4 changes: 2 additions & 2 deletions hstream-kafka/HStream/Kafka/Server/Handler.hsc
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ import qualified Kafka.Protocol.Service as K
#cv_handler Fetch, 0, 4
#cv_handler DescribeConfigs, 0, 0

#cv_handler CreateTopics, 0, 2
#cv_handler DeleteTopics, 0, 1

#cv_handler SaslHandshake, 0, 1
Expand Down Expand Up @@ -96,6 +97,7 @@ handlers sc =

, #mk_handler FindCoordinator, 0, 1

, #mk_handler CreateTopics, 0, 2
, #mk_handler DeleteTopics, 0, 1

-- Group
Expand All @@ -109,8 +111,6 @@ handlers sc =
, #mk_handler OffsetCommit, 0, 3
, #mk_handler OffsetFetch, 0, 3

, K.hd (K.RPC :: K.RPC K.HStreamKafkaV0 "createTopics") (handleCreateTopicsV0 sc)

, K.hd (K.RPC :: K.RPC K.HStreamKafkaV0 "saslHandshake") (handleAfterAuthSaslHandshakeV0 sc)
, K.hd (K.RPC :: K.RPC K.HStreamKafkaV1 "saslHandshake") (handleAfterAuthSaslHandshakeV1 sc)

Expand Down
2 changes: 1 addition & 1 deletion hstream-kafka/HStream/Kafka/Server/Handler/Basic.hs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ handleMetadata ctx reqCtx req = do
let defaultReplicas = kafkaBrokerConfigs.defaultReplicationFactor._value
defaultNumPartitions = kafkaBrokerConfigs.numPartitions._value
resp <- forM needCreate $ \topic -> do
(code, shards) <- createTopic ctx topic (fromIntegral defaultReplicas) (fromIntegral defaultNumPartitions) Map.empty
((code, _), shards) <- createTopic ctx topic (fromIntegral defaultReplicas) (fromIntegral defaultNumPartitions) Map.empty
if code /= K.NONE
then
return $ K.MetadataResponseTopic
Expand Down
94 changes: 73 additions & 21 deletions hstream-kafka/HStream/Kafka/Server/Handler/Topic.hs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
{-# LANGUAGE CPP #-}
{-# LANGUAGE MultiWayIf #-}
{-# LANGUAGE OverloadedRecordDot #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ViewPatterns #-}

module HStream.Kafka.Server.Handler.Topic
( -- 19: CreateTopics
handleCreateTopicsV0
handleCreateTopics
-- 20: DeleteTopics
, handleDeleteTopics
) where
Expand All @@ -15,13 +17,16 @@ import qualified Data.Text as T
import qualified Data.Vector as V

import qualified Data.Map as Map
import Data.Maybe (isNothing)
import HStream.Kafka.Common.OffsetManager (cleanOffsetCache)
import qualified HStream.Kafka.Common.Utils as Utils
import qualified HStream.Kafka.Server.Core.Topic as Core
import HStream.Kafka.Server.Types (ServerContext (..))
import qualified HStream.Logger as Log
import qualified HStream.Store as S
import Kafka.Protocol (NullableString)
import qualified Kafka.Protocol.Encoding as K
import Kafka.Protocol.Error (ErrorCode)
import qualified Kafka.Protocol.Error as K
import qualified Kafka.Protocol.Message as K
import qualified Kafka.Protocol.Service as K
Expand All @@ -30,32 +35,79 @@ import qualified Kafka.Protocol.Service as K
-- 19: CreateTopics
--------------------
-- FIXME: The `timeoutMs` field of request is omitted.
handleCreateTopicsV0
:: ServerContext -> K.RequestContext -> K.CreateTopicsRequestV0 -> IO K.CreateTopicsResponseV0
handleCreateTopicsV0 ctx _ K.CreateTopicsRequestV0{..} =
handleCreateTopics
:: ServerContext -> K.RequestContext -> K.CreateTopicsRequest -> IO K.CreateTopicsResponse
handleCreateTopics ctx@ServerContext{scLDClient} _ K.CreateTopicsRequest{..} =
case topics of
K.KaArray Nothing ->
-- FIXME: We return `[]` when topics is `Nothing`.
-- Is this proper?
return $ K.CreateTopicsResponseV0 (K.KaArray $ Just V.empty)
return $ K.CreateTopicsResponse {topics = K.KaArray $ Just V.empty, throttleTimeMs = 0}
K.KaArray (Just topics_)
| V.null topics_ -> return $ K.CreateTopicsResponseV0 (K.KaArray $ Just V.empty)
| otherwise -> do
respTopics <- forM topics_ $ createTopic
return $ K.CreateTopicsResponseV0 (K.KaArray $ Just respTopics)
| V.null topics_ -> return $ K.CreateTopicsResponse {topics = K.KaArray $ Just V.empty, throttleTimeMs = 0}
| otherwise -> do
let (errRes, topics') = V.partitionWith (\tp -> mapErr tp.name . validateTopic $ tp) topics_
if | null topics' ->
-- all topics validate failed, return directly
return $ K.CreateTopicsResponse {topics = K.KaArray $ Just errRes, throttleTimeMs = 0}
| validateOnly -> do
res <- V.forM topics' $ \K.CreatableTopic{..} -> do
let streamId = S.transToTopicStreamName name
exist <- S.doesStreamExist scLDClient streamId
if exist
then do
Log.info $ "Topic " <> Log.build name <> " already exist."
return K.CreatableTopicResult
{ errorMessage=Just $ "Topic " <> name <> " already exist."
, errorCode=K.TOPIC_ALREADY_EXISTS
, name=name
}
else return K.CreatableTopicResult {errorMessage=Nothing, errorCode=K.NONE, name=name}

return $ K.CreateTopicsResponse {topics = K.KaArray . Just $ res <> errRes, throttleTimeMs = 0}
| otherwise -> do
respTopics <- forM topics' $ createTopic
return $ K.CreateTopicsResponse {topics = K.KaArray . Just $ respTopics <> errRes, throttleTimeMs = 0}
where
createTopic :: K.CreatableTopicV0 -> IO K.CreatableTopicResultV0
createTopic K.CreatableTopicV0{..}
| replicationFactor < -1 || replicationFactor == 0 = do
Log.warning $ "Expect a positive replicationFactor but got " <> Log.build replicationFactor
return $ K.CreatableTopicResultV0 name K.INVALID_REPLICATION_FACTOR
| numPartitions < -1 || numPartitions == 0 = do
Log.warning $ "Expect a positive numPartitions but got " <> Log.build numPartitions
return $ K.CreatableTopicResultV0 name K.INVALID_PARTITIONS
| otherwise = do
let configMap = Map.fromList . map (\c -> (c.name, c.value)) . Utils.kaArrayToList $ configs
(errorCode, _) <- Core.createTopic ctx name replicationFactor numPartitions configMap
return $ K.CreatableTopicResultV0 name errorCode
mapErr name (Left (errorCode, msg)) = Left $ K.CreatableTopicResult name errorCode msg
mapErr _ (Right tp) = Right tp

createTopic :: K.CreatableTopic -> IO K.CreatableTopicResult
createTopic K.CreatableTopic{..} = do
let configMap = Map.fromList . map (\c -> (c.name, c.value)) . Utils.kaArrayToList $ configs
((errorCode, msg), _) <- Core.createTopic ctx name replicationFactor numPartitions configMap
return $ K.CreatableTopicResult name errorCode (Just msg)

validateTopic :: K.CreatableTopic -> Either (ErrorCode, NullableString) K.CreatableTopic
validateTopic topic@K.CreatableTopic{..} = do
validateNullConfig configs
*> validateAssignments assignments
*> validateReplica replicationFactor
*> validateNumPartitions numPartitions
where
invalidReplicaMsg = Just . T.pack $ "Replication factor must be larger than 0, or -1 to use the default value."
invalidNumPartitionsMsg = Just . T.pack $ "Number of partitions must be larger than 0, or -1 to use the default value."
unsuportedPartitionAssignments = Just . T.pack $ "Partition assignments is not supported now."

validateNullConfig (K.unKaArray -> Just configs') =
let nullConfigs = V.filter (\K.CreateableTopicConfig{value} -> isNothing value) configs'
in if V.null nullConfigs
then Right topic
else Left (K.INVALID_CONFIG, Just $ T.pack ("Null value not supported for topic configs: " <> show nullConfigs))
validateNullConfig _ = Right topic

validateAssignments (K.unKaArray -> Nothing) = Right topic
validateAssignments (K.unKaArray -> Just as)
| V.null as = Right topic
validateAssignments _ = Left (K.INVALID_REQUEST, unsuportedPartitionAssignments)

validateReplica replica
| replica < -1 || replica == 0 = Left (K.INVALID_REPLICATION_FACTOR, invalidReplicaMsg)
| otherwise = Right topic

validateNumPartitions partitions
| partitions < -1 || partitions == 0 = Left (K.INVALID_PARTITIONS, invalidNumPartitionsMsg)
| otherwise = Right topic

--------------------
-- 20: DeleteTopics
Expand Down
71 changes: 70 additions & 1 deletion hstream-kafka/protocol/Kafka/Protocol/Message/Struct.hs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,18 @@ data CreatableTopicV0 = CreatableTopicV0
} deriving (Show, Eq, Generic)
instance Serializable CreatableTopicV0

type CreatableReplicaAssignmentV1 = CreatableReplicaAssignmentV0

type CreateableTopicConfigV1 = CreateableTopicConfigV0

type CreatableTopicV1 = CreatableTopicV0

type CreatableReplicaAssignmentV2 = CreatableReplicaAssignmentV0

type CreateableTopicConfigV2 = CreateableTopicConfigV0

type CreatableTopicV2 = CreatableTopicV0

data CreatableTopicResultV0 = CreatableTopicResultV0
{ name :: !Text
-- ^ The topic name.
Expand All @@ -112,6 +124,18 @@ data CreatableTopicResultV0 = CreatableTopicResultV0
} deriving (Show, Eq, Generic)
instance Serializable CreatableTopicResultV0

data CreatableTopicResultV1 = CreatableTopicResultV1
{ name :: !Text
-- ^ The topic name.
, errorCode :: {-# UNPACK #-} !ErrorCode
-- ^ The error code, or 0 if there was no error.
, errorMessage :: !NullableString
-- ^ The error message, or null if there was no error.
} deriving (Show, Eq, Generic)
instance Serializable CreatableTopicResultV1

type CreatableTopicResultV2 = CreatableTopicResultV1

data DeletableTopicResultV0 = DeletableTopicResultV0
{ name :: !Text
-- ^ The topic name
Expand Down Expand Up @@ -802,11 +826,38 @@ data CreateTopicsRequestV0 = CreateTopicsRequestV0
} deriving (Show, Eq, Generic)
instance Serializable CreateTopicsRequestV0

data CreateTopicsRequestV1 = CreateTopicsRequestV1
{ topics :: !(KaArray CreatableTopicV0)
-- ^ The topics to create.
, timeoutMs :: {-# UNPACK #-} !Int32
-- ^ How long to wait in milliseconds before timing out the request.
, validateOnly :: Bool
-- ^ If true, check that the topics can be created as specified, but don't
-- create anything.
} deriving (Show, Eq, Generic)
instance Serializable CreateTopicsRequestV1

type CreateTopicsRequestV2 = CreateTopicsRequestV1

newtype CreateTopicsResponseV0 = CreateTopicsResponseV0
{ topics :: (KaArray CreatableTopicResultV0)
} deriving (Show, Eq, Generic)
instance Serializable CreateTopicsResponseV0

newtype CreateTopicsResponseV1 = CreateTopicsResponseV1
{ topics :: (KaArray CreatableTopicResultV1)
} deriving (Show, Eq, Generic)
instance Serializable CreateTopicsResponseV1

data CreateTopicsResponseV2 = CreateTopicsResponseV2
{ throttleTimeMs :: {-# UNPACK #-} !Int32
-- ^ The duration in milliseconds for which the request was throttled due
-- to a quota violation, or zero if the request did not violate any quota.
, topics :: !(KaArray CreatableTopicResultV1)
-- ^ Results for each topic we tried to create.
} deriving (Show, Eq, Generic)
instance Serializable CreateTopicsResponseV2

data DeleteTopicsRequestV0 = DeleteTopicsRequestV0
{ topicNames :: !(KaArray Text)
-- ^ The names of the topics to delete
Expand Down Expand Up @@ -1729,6 +1780,7 @@ instance Service HStreamKafkaV1 where
, "listGroups"
, "saslHandshake"
, "apiVersions"
, "createTopics"
, "deleteTopics"
]

Expand Down Expand Up @@ -1837,6 +1889,13 @@ instance HasMethodImpl HStreamKafkaV1 "apiVersions" where
type MethodInput HStreamKafkaV1 "apiVersions" = ApiVersionsRequestV1
type MethodOutput HStreamKafkaV1 "apiVersions" = ApiVersionsResponseV1

instance HasMethodImpl HStreamKafkaV1 "createTopics" where
type MethodName HStreamKafkaV1 "createTopics" = "createTopics"
type MethodKey HStreamKafkaV1 "createTopics" = 19
type MethodVersion HStreamKafkaV1 "createTopics" = 1
type MethodInput HStreamKafkaV1 "createTopics" = CreateTopicsRequestV1
type MethodOutput HStreamKafkaV1 "createTopics" = CreateTopicsResponseV1

instance HasMethodImpl HStreamKafkaV1 "deleteTopics" where
type MethodName HStreamKafkaV1 "deleteTopics" = "deleteTopics"
type MethodKey HStreamKafkaV1 "deleteTopics" = 20
Expand All @@ -1857,6 +1916,7 @@ instance Service HStreamKafkaV2 where
, "offsetFetch"
, "joinGroup"
, "apiVersions"
, "createTopics"
]

instance HasMethodImpl HStreamKafkaV2 "produce" where
Expand Down Expand Up @@ -1915,6 +1975,13 @@ instance HasMethodImpl HStreamKafkaV2 "apiVersions" where
type MethodInput HStreamKafkaV2 "apiVersions" = ApiVersionsRequestV2
type MethodOutput HStreamKafkaV2 "apiVersions" = ApiVersionsResponseV2

instance HasMethodImpl HStreamKafkaV2 "createTopics" where
type MethodName HStreamKafkaV2 "createTopics" = "createTopics"
type MethodKey HStreamKafkaV2 "createTopics" = 19
type MethodVersion HStreamKafkaV2 "createTopics" = 2
type MethodInput HStreamKafkaV2 "createTopics" = CreateTopicsRequestV2
type MethodOutput HStreamKafkaV2 "createTopics" = CreateTopicsResponseV2

data HStreamKafkaV3

instance Service HStreamKafkaV3 where
Expand Down Expand Up @@ -2054,7 +2121,7 @@ supportedApiVersions =
, ApiVersionV0 (ApiKey 16) 0 1
, ApiVersionV0 (ApiKey 17) 0 1
, ApiVersionV0 (ApiKey 18) 0 3
, ApiVersionV0 (ApiKey 19) 0 0
, ApiVersionV0 (ApiKey 19) 0 2
, ApiVersionV0 (ApiKey 20) 0 1
, ApiVersionV0 (ApiKey 22) 0 0
, ApiVersionV0 (ApiKey 32) 0 0
Expand Down Expand Up @@ -2111,6 +2178,8 @@ getHeaderVersion (ApiKey (18)) 1 = (1, 0)
getHeaderVersion (ApiKey (18)) 2 = (1, 0)
getHeaderVersion (ApiKey (18)) 3 = (2, 0)
getHeaderVersion (ApiKey (19)) 0 = (1, 0)
getHeaderVersion (ApiKey (19)) 1 = (1, 0)
getHeaderVersion (ApiKey (19)) 2 = (1, 0)
getHeaderVersion (ApiKey (20)) 0 = (1, 0)
getHeaderVersion (ApiKey (20)) 1 = (1, 0)
getHeaderVersion (ApiKey (22)) 0 = (1, 0)
Expand Down
Loading

0 comments on commit bd7ea77

Please sign in to comment.