Skip to content

Commit

Permalink
kafka api: upgrade Metadata to v5 (#1721)
Browse files Browse the repository at this point in the history
  • Loading branch information
4eUeP committed Dec 28, 2023
1 parent 9131db5 commit 53f406c
Show file tree
Hide file tree
Showing 5 changed files with 231 additions and 131 deletions.
8 changes: 2 additions & 6 deletions hstream-kafka/HStream/Kafka/Server/Handler.hsc
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ import qualified Kafka.Protocol.Service as K
-------------------------------------------------------------------------------

#cv_handler ApiVersions, 0, 3
#cv_handler Metadata, 0, 5
#cv_handler Produce, 0, 3
#cv_handler Fetch, 0, 4
#cv_handler DescribeConfigs, 0, 0
Expand All @@ -81,6 +82,7 @@ import qualified Kafka.Protocol.Service as K
handlers :: ServerContext -> [K.ServiceHandler]
handlers sc =
[ #mk_handler ApiVersions, 0, 3
, #mk_handler Metadata, 0, 5
-- Write
, #mk_handler Produce, 0, 3
-- Read
Expand All @@ -99,12 +101,6 @@ handlers sc =
, #mk_handler OffsetCommit, 0, 3
, #mk_handler OffsetFetch, 0, 3

, K.hd (K.RPC :: K.RPC K.HStreamKafkaV0 "metadata") (handleMetadataV0 sc)
, K.hd (K.RPC :: K.RPC K.HStreamKafkaV1 "metadata") (handleMetadataV1 sc)
, K.hd (K.RPC :: K.RPC K.HStreamKafkaV2 "metadata") (handleMetadataV2 sc)
, K.hd (K.RPC :: K.RPC K.HStreamKafkaV3 "metadata") (handleMetadataV3 sc)
, K.hd (K.RPC :: K.RPC K.HStreamKafkaV4 "metadata") (handleMetadataV4 sc)

, K.hd (K.RPC :: K.RPC K.HStreamKafkaV0 "createTopics") (handleCreateTopicsV0 sc)
, K.hd (K.RPC :: K.RPC K.HStreamKafkaV0 "deleteTopics") (handleDeleteTopicsV0 sc)

Expand Down
212 changes: 94 additions & 118 deletions hstream-kafka/HStream/Kafka/Server/Handler/Basic.hs
Original file line number Diff line number Diff line change
@@ -1,17 +1,11 @@
{-# LANGUAGE OverloadedRecordDot #-}
{-# LANGUAGE OverloadedStrings #-}

module HStream.Kafka.Server.Handler.Basic
( -- 18: ApiVersions
handleApiVersions
-- 3: Metadata
, handleMetadataV0
, handleMetadataV1
, handleMetadataV2
, handleMetadataV3
, handleMetadataV4

--
, handleMetadata
-- 32: DescribeConfigs
, handleDescribeConfigs
) where

Expand Down Expand Up @@ -64,184 +58,167 @@ handleApiVersions _ _ _ = do
pure $ K.ApiVersionsResponse K.NONE apiKeys 0{- throttle_time_ms -}
K.EmptyTaggedFields

apiVersionV0To :: K.ApiVersionV0 -> K.ApiVersion
apiVersionV0To K.ApiVersionV0{..} =
let taggedFields = K.EmptyTaggedFields in K.ApiVersion{..}

--------------------
-- 3: Metadata
--------------------
handleMetadataV0
:: ServerContext -> K.RequestContext -> K.MetadataRequestV0 -> IO K.MetadataResponseV0
handleMetadataV0 ctx reqCtx req = do
-- In version 0, an empty array indicates "request metadata for all topics." In version 1 and
-- higher, an empty array indicates "request metadata for no topics," and a null array is used to
-- indiate "request metadata for all topics."
let K.NonNullKaArray topicVec = req.topics
v1Topics = if (V.null topicVec) then K.KaArray Nothing else K.NonNullKaArray topicVec
v1Req = K.MetadataRequestV0 {topics=v1Topics}
(K.MetadataResponseV1 (K.KaArray brokers) _ (K.KaArray topics)) <- handleMetadataV1 ctx reqCtx v1Req
return $ K.MetadataResponseV0 (K.KaArray $ V.map respBrokerV1toV0 <$> brokers)
(K.KaArray $ V.map respTopicV1toV0 <$> topics)

where
respBrokerV1toV0 :: K.MetadataResponseBrokerV1 -> K.MetadataResponseBrokerV0
respBrokerV1toV0 K.MetadataResponseBrokerV1{..} =
K.MetadataResponseBrokerV0 nodeId host port

respTopicV1toV0 :: K.MetadataResponseTopicV1 -> K.MetadataResponseTopicV0
respTopicV1toV0 K.MetadataResponseTopicV1{..} =
K.MetadataResponseTopicV0 errorCode name partitions

handleMetadataV1
:: ServerContext -> K.RequestContext -> K.MetadataRequestV1 -> IO K.MetadataResponseV1
handleMetadataV1 ctx reqCtx req = do
respV3 <- handleMetadataV3 ctx reqCtx req
return $ K.MetadataResponseV1 {
controllerId=respV3.controllerId
, topics=respV3.topics
, brokers=respV3.brokers}

handleMetadataV2
:: ServerContext -> K.RequestContext -> K.MetadataRequestV2 -> IO K.MetadataResponseV2
handleMetadataV2 ctx reqCtx req = do
respV3 <- handleMetadataV3 ctx reqCtx req
return $ K.MetadataResponseV2 {
controllerId=respV3.controllerId
, clusterId=respV3.clusterId
, topics=respV3.topics
, brokers=respV3.brokers}

handleMetadataV3
:: ServerContext -> K.RequestContext -> K.MetadataRequestV3 -> IO K.MetadataResponseV3
handleMetadataV3 ctx reqCtx req = do
let reqV4 = K.MetadataRequestV4 {allowAutoTopicCreation=False, topics=req.topics}
handleMetadataV4 ctx reqCtx reqV4

handleMetadataV4
:: ServerContext -> K.RequestContext -> K.MetadataRequestV4 -> IO K.MetadataResponseV4
handleMetadataV4 ctx@ServerContext{..} _ req@K.MetadataRequestV4{..} = do
handleMetadata
:: ServerContext -> K.RequestContext
-> K.MetadataRequest -> IO K.MetadataResponse
handleMetadata ctx reqCtx req = do
-- In version 0,
-- an empty array indicates "request metadata for all topics."
-- a null array will cause an error.
--
-- In version 1 and higher,
-- an empty array indicates "request metadata for no topics,", and
-- a null array is used to indiate "request metadata for all topics."
let reqTopics = if reqCtx.apiVersion >= 1
then req.topics
else let K.NonNullKaArray topicVec = req.topics
in if V.null topicVec then K.KaArray Nothing
else K.NonNullKaArray topicVec
-- FIXME: `serverID` is a `Word32` but kafka expects an `Int32`,
-- causing a potential overflow.
ctlId = fromIntegral ctx.serverID
respBrokers <- getBrokers
-- FIXME: `serverID` is a `Word32` but kafka expects an `Int32`,
-- causing a potential overflow.
let ctlId = fromIntegral serverID
let reqTopics = req.topics
case reqTopics of
K.KaArray Nothing -> returnAllTopics respBrokers ctlId
K.KaArray (Just v)
| V.null v -> return $ K.MetadataResponseV3 {
throttleTimeMs=0
, clusterId=Nothing
, controllerId=ctlId
, topics=K.NonNullKaArray V.empty
, brokers=K.NonNullKaArray respBrokers
| V.null v -> return $ K.MetadataResponse
{ throttleTimeMs = 0
, clusterId = Nothing
, controllerId = ctlId
, topics = K.NonNullKaArray V.empty
, brokers = K.NonNullKaArray respBrokers
}
| otherwise -> do
let topicNames = S.fromList . V.toList $ V.map (\K.MetadataRequestTopicV0{..} -> name) v
allStreamNames <- S.findStreams scLDClient S.StreamTypeTopic <&> S.fromList . L.map (Utils.cBytesToText . S.streamName)
let topicNames = S.fromList . V.toList $
V.map (\K.MetadataRequestTopic{..} -> name) v
allStreamNames <- S.findStreams ctx.scLDClient S.StreamTypeTopic <&> S.fromList . L.map (Utils.cBytesToText . S.streamName)
let needCreate = S.toList $ topicNames S.\\ allStreamNames
let alreadyExist = V.fromList . S.toList $ topicNames `S.intersection` allStreamNames
alreadyExist = V.fromList . S.toList $ topicNames `S.intersection` allStreamNames
kafkaBrokerConfigs = ctx.kafkaBrokerConfigs

createResp <-
if kafkaBrokerConfigs.autoCreateTopicsEnable._value && allowAutoTopicCreation
if kafkaBrokerConfigs.autoCreateTopicsEnable._value && req.allowAutoTopicCreation
then do
let defaultReplicas = kafkaBrokerConfigs.defaultReplicationFactor._value
defaultNumPartitions = kafkaBrokerConfigs.numPartitions._value
resp <- forM needCreate $ \topic -> do
(code, shards) <- createTopic ctx topic (fromIntegral defaultReplicas) (fromIntegral defaultNumPartitions) Map.empty
if code /= K.NONE
then do
return $ K.MetadataResponseTopicV1 code topic False K.emptyKaArray
then
return $ K.MetadataResponseTopic
{ errorCode = code
, name = topic
, partitions = K.emptyKaArray
, isInternal = False
}
else mkResponse topic (V.fromList shards)
return $ V.fromList resp
else do
let f topic acc = K.MetadataResponseTopicV1 K.UNKNOWN_TOPIC_OR_PARTITION topic False K.emptyKaArray : acc
let f topic acc = K.MetadataResponseTopic K.UNKNOWN_TOPIC_OR_PARTITION topic K.emptyKaArray False : acc
return . V.fromList $ FD.foldr' f [] needCreate
unless (V.null createResp) $ Log.info $ "auto create topic response: " <> Log.build (show createResp)

respTopics <- forM alreadyExist getRespTopic
let respTopics' = respTopics <> createResp
-- return $ K.MetadataResponseV4 (K.KaArray $ Just respBrokers) ctlId (K.KaArray $ Just respTopics)
-- TODO: implement read cluster id
return $ K.MetadataResponseV3 {
clusterId=Nothing
, controllerId=ctlId
, throttleTimeMs=0
, topics=K.NonNullKaArray respTopics'
, brokers=K.NonNullKaArray respBrokers
return $ K.MetadataResponse
{ clusterId = Nothing
, controllerId = ctlId
, throttleTimeMs = 0
, topics = K.NonNullKaArray respTopics'
, brokers = K.NonNullKaArray respBrokers
}
where
returnAllTopics :: V.Vector K.MetadataResponseBrokerV1
returnAllTopics :: V.Vector K.MetadataResponseBroker
-> Int32
-> IO K.MetadataResponseV3
-> IO K.MetadataResponse
returnAllTopics respBrokers_ ctlId_ = do
-- FIXME: `serverID` is a `Word32` but kafka expects an `Int32`,
-- causing a potential overflow.
allStreamNames <- S.findStreams scLDClient S.StreamTypeTopic <&> (fmap (Utils.cBytesToText . S.streamName))
allStreamNames <- S.findStreams ctx.scLDClient S.StreamTypeTopic <&> (fmap (Utils.cBytesToText . S.streamName))
respTopics <- forM allStreamNames getRespTopic <&> V.fromList
-- return $ K.MetadataResponseV1 (K.KaArray $ Just respBrokers_) ctlId_ (K.KaArray $ Just respTopics)
return $ K.MetadataResponseV3 {
clusterId=Nothing
, controllerId=ctlId_
, throttleTimeMs=0
, topics=K.NonNullKaArray respTopics
, brokers=K.NonNullKaArray respBrokers_
return $ K.MetadataResponse
{ clusterId = Nothing
, controllerId = ctlId_
, throttleTimeMs = 0
, topics = K.NonNullKaArray respTopics
, brokers = K.NonNullKaArray respBrokers_
}

getBrokers :: IO (V.Vector K.MetadataResponseBrokerV1)
getBrokers :: IO (V.Vector K.MetadataResponseBroker)
getBrokers = do
(nodes, _) <- Gossip.describeCluster gossipContext scAdvertisedListenersKey
(nodes, _) <- Gossip.describeCluster ctx.gossipContext ctx.scAdvertisedListenersKey
let brokers = V.map (\A.ServerNode{..} ->
K.MetadataResponseBrokerV1
{ nodeId = fromIntegral serverNodeId
, host = serverNodeHost
, port = fromIntegral serverNodePort
, rack = Nothing
K.MetadataResponseBroker
{ nodeId = fromIntegral serverNodeId
, host = serverNodeHost
, port = fromIntegral serverNodePort
, rack = Nothing
}
) nodes
return brokers

getRespTopic :: Text -> IO K.MetadataResponseTopicV1
getRespTopic :: Text -> IO K.MetadataResponseTopic
getRespTopic topicName = do
let streamId = S.transToTopicStreamName topicName
shards_e <- try ((V.map snd) <$> S.listStreamPartitionsOrdered scLDClient streamId)
errTopicResp code = K.MetadataResponseTopic
{ errorCode = code
, name = topicName
, partitions = K.emptyKaArray
, isInternal = False
}
shards_e <- try ((V.map snd) <$> S.listStreamPartitionsOrdered ctx.scLDClient streamId)
case shards_e of
-- FIXME: Are the following error codes proper?
-- FIXME: We passed `Nothing` as partitions when an error occurs. Is this proper?
Left (e :: SomeException)
| Just (_ :: S.NOTFOUND) <- fromException e ->
return $ K.MetadataResponseTopicV1 K.UNKNOWN_TOPIC_OR_PARTITION topicName False K.emptyKaArray
return $ errTopicResp K.UNKNOWN_TOPIC_OR_PARTITION
| otherwise ->
return $ K.MetadataResponseTopicV1 K.UNKNOWN_SERVER_ERROR topicName False K.emptyKaArray
return $ errTopicResp K.UNKNOWN_SERVER_ERROR
Right shards
| V.null shards ->
return $ K.MetadataResponseTopicV1 K.INVALID_TOPIC_EXCEPTION topicName False K.emptyKaArray
return $ errTopicResp K.INVALID_TOPIC_EXCEPTION
| V.length shards > fromIntegral (maxBound :: Int32) ->
return $ K.MetadataResponseTopicV1 K.INVALID_PARTITIONS topicName False K.emptyKaArray
return $ errTopicResp K.INVALID_PARTITIONS
| otherwise -> mkResponse topicName shards

mkResponse topicName shards = do
respPartitions <-
V.iforM shards $ \idx shardId -> do
theNode <- lookupKafkaPersist metaHandle gossipContext
loadBalanceHashRing scAdvertisedListenersKey
theNode <- lookupKafkaPersist ctx.metaHandle ctx.gossipContext
ctx.loadBalanceHashRing ctx.scAdvertisedListenersKey
(KafkaResTopic $ Text.pack $ show shardId)
-- FIXME: Convert from `Word32` to `Int32`, possible overflow!
when ((A.serverNodeId theNode) > fromIntegral (maxBound :: Int32)) $
Log.warning $ "ServerID " <> Log.build (A.serverNodeId theNode)
<> " is too large, it should be less than "
<> Log.build (maxBound :: Int32)
let (theNodeId :: Int32) = fromIntegral (A.serverNodeId theNode)
pure $ K.MetadataResponsePartitionV0
{ errorCode = K.NONE
, partitionIndex = (fromIntegral idx)
, leaderId = theNodeId
, replicaNodes = K.KaArray $ Just (V.singleton theNodeId) -- FIXME: what should it be?
, isrNodes = K.KaArray $ Just (V.singleton theNodeId) -- FIXME: what should it be?
pure $ K.MetadataResponsePartition
{ errorCode = K.NONE
, partitionIndex = (fromIntegral idx)
, leaderId = theNodeId
, replicaNodes = K.KaArray $ Just (V.singleton theNodeId) -- FIXME: what should it be?
, isrNodes = K.KaArray $ Just (V.singleton theNodeId) -- FIXME: what should it be?
, offlineReplicas = K.KaArray $ Just V.empty -- TODO
}
return $
K.MetadataResponseTopicV1 K.NONE topicName False (K.KaArray $ Just respPartitions)
-------------------------------------------------------------------------------

apiVersionV0To :: K.ApiVersionV0 -> K.ApiVersion
apiVersionV0To K.ApiVersionV0{..} =
let taggedFields = K.EmptyTaggedFields in K.ApiVersion{..}
K.MetadataResponseTopic
{ errorCode = K.NONE
, name = topicName
, partitions = (K.KaArray $ Just respPartitions)
, isInternal = False
}

---------------------------------------------------------------------------
-- 32: DescribeConfigs
Expand All @@ -262,4 +239,3 @@ handleDescribeConfigs serverCtx _ req = do
else return $ KCM.getErrorResponse KC.BROKER resource.resourceName ("invalid broker id:" <> resource.resourceName)
rt -> return $ KCM.getErrorResponse rt resource.resourceName ("unsupported resource type:" <> T.pack (show rt))
return $ K.DescribeConfigsResponse {results=K.NonNullKaArray results, throttleTimeMs=0}

0 comments on commit 53f406c

Please sign in to comment.