diff --git a/hstream-kafka/HStream/Kafka/Server/Handler.hsc b/hstream-kafka/HStream/Kafka/Server/Handler.hsc index f028d7228..1ad6d4b70 100644 --- a/hstream-kafka/HStream/Kafka/Server/Handler.hsc +++ b/hstream-kafka/HStream/Kafka/Server/Handler.hsc @@ -57,6 +57,7 @@ import qualified Kafka.Protocol.Service as K ------------------------------------------------------------------------------- #cv_handler ApiVersions, 0, 3 +#cv_handler Metadata, 0, 5 #cv_handler Produce, 0, 3 #cv_handler Fetch, 0, 4 #cv_handler DescribeConfigs, 0, 0 @@ -81,6 +82,7 @@ import qualified Kafka.Protocol.Service as K handlers :: ServerContext -> [K.ServiceHandler] handlers sc = [ #mk_handler ApiVersions, 0, 3 + , #mk_handler Metadata, 0, 5 -- Write , #mk_handler Produce, 0, 3 -- Read @@ -99,12 +101,6 @@ handlers sc = , #mk_handler OffsetCommit, 0, 3 , #mk_handler OffsetFetch, 0, 3 - , K.hd (K.RPC :: K.RPC K.HStreamKafkaV0 "metadata") (handleMetadataV0 sc) - , K.hd (K.RPC :: K.RPC K.HStreamKafkaV1 "metadata") (handleMetadataV1 sc) - , K.hd (K.RPC :: K.RPC K.HStreamKafkaV2 "metadata") (handleMetadataV2 sc) - , K.hd (K.RPC :: K.RPC K.HStreamKafkaV3 "metadata") (handleMetadataV3 sc) - , K.hd (K.RPC :: K.RPC K.HStreamKafkaV4 "metadata") (handleMetadataV4 sc) - , K.hd (K.RPC :: K.RPC K.HStreamKafkaV0 "createTopics") (handleCreateTopicsV0 sc) , K.hd (K.RPC :: K.RPC K.HStreamKafkaV0 "deleteTopics") (handleDeleteTopicsV0 sc) diff --git a/hstream-kafka/HStream/Kafka/Server/Handler/Basic.hs b/hstream-kafka/HStream/Kafka/Server/Handler/Basic.hs index 831080389..644c67fdd 100644 --- a/hstream-kafka/HStream/Kafka/Server/Handler/Basic.hs +++ b/hstream-kafka/HStream/Kafka/Server/Handler/Basic.hs @@ -1,17 +1,11 @@ {-# LANGUAGE OverloadedRecordDot #-} -{-# LANGUAGE OverloadedStrings #-} module HStream.Kafka.Server.Handler.Basic ( -- 18: ApiVersions handleApiVersions -- 3: Metadata - , handleMetadataV0 - , handleMetadataV1 - , handleMetadataV2 - , handleMetadataV3 - , handleMetadataV4 - - -- + , handleMetadata + -- 32: DescribeConfigs , handleDescribeConfigs ) where @@ -64,94 +58,70 @@ handleApiVersions _ _ _ = do pure $ K.ApiVersionsResponse K.NONE apiKeys 0{- throttle_time_ms -} K.EmptyTaggedFields +apiVersionV0To :: K.ApiVersionV0 -> K.ApiVersion +apiVersionV0To K.ApiVersionV0{..} = + let taggedFields = K.EmptyTaggedFields in K.ApiVersion{..} + -------------------- -- 3: Metadata -------------------- -handleMetadataV0 - :: ServerContext -> K.RequestContext -> K.MetadataRequestV0 -> IO K.MetadataResponseV0 -handleMetadataV0 ctx reqCtx req = do - -- In version 0, an empty array indicates "request metadata for all topics." In version 1 and - -- higher, an empty array indicates "request metadata for no topics," and a null array is used to - -- indiate "request metadata for all topics." 
- let K.NonNullKaArray topicVec = req.topics - v1Topics = if (V.null topicVec) then K.KaArray Nothing else K.NonNullKaArray topicVec - v1Req = K.MetadataRequestV0 {topics=v1Topics} - (K.MetadataResponseV1 (K.KaArray brokers) _ (K.KaArray topics)) <- handleMetadataV1 ctx reqCtx v1Req - return $ K.MetadataResponseV0 (K.KaArray $ V.map respBrokerV1toV0 <$> brokers) - (K.KaArray $ V.map respTopicV1toV0 <$> topics) - - where - respBrokerV1toV0 :: K.MetadataResponseBrokerV1 -> K.MetadataResponseBrokerV0 - respBrokerV1toV0 K.MetadataResponseBrokerV1{..} = - K.MetadataResponseBrokerV0 nodeId host port - - respTopicV1toV0 :: K.MetadataResponseTopicV1 -> K.MetadataResponseTopicV0 - respTopicV1toV0 K.MetadataResponseTopicV1{..} = - K.MetadataResponseTopicV0 errorCode name partitions - -handleMetadataV1 - :: ServerContext -> K.RequestContext -> K.MetadataRequestV1 -> IO K.MetadataResponseV1 -handleMetadataV1 ctx reqCtx req = do - respV3 <- handleMetadataV3 ctx reqCtx req - return $ K.MetadataResponseV1 { - controllerId=respV3.controllerId - , topics=respV3.topics - , brokers=respV3.brokers} - -handleMetadataV2 - :: ServerContext -> K.RequestContext -> K.MetadataRequestV2 -> IO K.MetadataResponseV2 -handleMetadataV2 ctx reqCtx req = do - respV3 <- handleMetadataV3 ctx reqCtx req - return $ K.MetadataResponseV2 { - controllerId=respV3.controllerId - , clusterId=respV3.clusterId - , topics=respV3.topics - , brokers=respV3.brokers} - -handleMetadataV3 - :: ServerContext -> K.RequestContext -> K.MetadataRequestV3 -> IO K.MetadataResponseV3 -handleMetadataV3 ctx reqCtx req = do - let reqV4 = K.MetadataRequestV4 {allowAutoTopicCreation=False, topics=req.topics} - handleMetadataV4 ctx reqCtx reqV4 - -handleMetadataV4 - :: ServerContext -> K.RequestContext -> K.MetadataRequestV4 -> IO K.MetadataResponseV4 -handleMetadataV4 ctx@ServerContext{..} _ req@K.MetadataRequestV4{..} = do +handleMetadata + :: ServerContext -> K.RequestContext + -> K.MetadataRequest -> IO K.MetadataResponse +handleMetadata ctx reqCtx req = do + -- In version 0, + -- an empty array indicates "request metadata for all topics." + -- a null array will cause an error. + -- + -- In version 1 and higher, + -- an empty array indicates "request metadata for no topics,", and + -- a null array is used to indiate "request metadata for all topics." + let reqTopics = if reqCtx.apiVersion >= 1 + then req.topics + else let K.NonNullKaArray topicVec = req.topics + in if V.null topicVec then K.KaArray Nothing + else K.NonNullKaArray topicVec + -- FIXME: `serverID` is a `Word32` but kafka expects an `Int32`, + -- causing a potential overflow. + ctlId = fromIntegral ctx.serverID respBrokers <- getBrokers - -- FIXME: `serverID` is a `Word32` but kafka expects an `Int32`, - -- causing a potential overflow. - let ctlId = fromIntegral serverID - let reqTopics = req.topics case reqTopics of K.KaArray Nothing -> returnAllTopics respBrokers ctlId K.KaArray (Just v) - | V.null v -> return $ K.MetadataResponseV3 { - throttleTimeMs=0 - , clusterId=Nothing - , controllerId=ctlId - , topics=K.NonNullKaArray V.empty - , brokers=K.NonNullKaArray respBrokers + | V.null v -> return $ K.MetadataResponse + { throttleTimeMs = 0 + , clusterId = Nothing + , controllerId = ctlId + , topics = K.NonNullKaArray V.empty + , brokers = K.NonNullKaArray respBrokers } | otherwise -> do - let topicNames = S.fromList . V.toList $ V.map (\K.MetadataRequestTopicV0{..} -> name) v - allStreamNames <- S.findStreams scLDClient S.StreamTypeTopic <&> S.fromList . L.map (Utils.cBytesToText . 
S.streamName) + let topicNames = S.fromList . V.toList $ + V.map (\K.MetadataRequestTopic{..} -> name) v + allStreamNames <- S.findStreams ctx.scLDClient S.StreamTypeTopic <&> S.fromList . L.map (Utils.cBytesToText . S.streamName) let needCreate = S.toList $ topicNames S.\\ allStreamNames - let alreadyExist = V.fromList . S.toList $ topicNames `S.intersection` allStreamNames + alreadyExist = V.fromList . S.toList $ topicNames `S.intersection` allStreamNames + kafkaBrokerConfigs = ctx.kafkaBrokerConfigs createResp <- - if kafkaBrokerConfigs.autoCreateTopicsEnable._value && allowAutoTopicCreation + if kafkaBrokerConfigs.autoCreateTopicsEnable._value && req.allowAutoTopicCreation then do let defaultReplicas = kafkaBrokerConfigs.defaultReplicationFactor._value defaultNumPartitions = kafkaBrokerConfigs.numPartitions._value resp <- forM needCreate $ \topic -> do (code, shards) <- createTopic ctx topic (fromIntegral defaultReplicas) (fromIntegral defaultNumPartitions) Map.empty if code /= K.NONE - then do - return $ K.MetadataResponseTopicV1 code topic False K.emptyKaArray + then + return $ K.MetadataResponseTopic + { errorCode = code + , name = topic + , partitions = K.emptyKaArray + , isInternal = False + } else mkResponse topic (V.fromList shards) return $ V.fromList resp else do - let f topic acc = K.MetadataResponseTopicV1 K.UNKNOWN_TOPIC_OR_PARTITION topic False K.emptyKaArray : acc + let f topic acc = K.MetadataResponseTopic K.UNKNOWN_TOPIC_OR_PARTITION topic K.emptyKaArray False : acc return . V.fromList $ FD.foldr' f [] needCreate unless (V.null createResp) $ Log.info $ "auto create topic response: " <> Log.build (show createResp) @@ -159,68 +129,74 @@ handleMetadataV4 ctx@ServerContext{..} _ req@K.MetadataRequestV4{..} = do let respTopics' = respTopics <> createResp -- return $ K.MetadataResponseV4 (K.KaArray $ Just respBrokers) ctlId (K.KaArray $ Just respTopics) -- TODO: implement read cluster id - return $ K.MetadataResponseV3 { - clusterId=Nothing - , controllerId=ctlId - , throttleTimeMs=0 - , topics=K.NonNullKaArray respTopics' - , brokers=K.NonNullKaArray respBrokers + return $ K.MetadataResponse + { clusterId = Nothing + , controllerId = ctlId + , throttleTimeMs = 0 + , topics = K.NonNullKaArray respTopics' + , brokers = K.NonNullKaArray respBrokers } where - returnAllTopics :: V.Vector K.MetadataResponseBrokerV1 + returnAllTopics :: V.Vector K.MetadataResponseBroker -> Int32 - -> IO K.MetadataResponseV3 + -> IO K.MetadataResponse returnAllTopics respBrokers_ ctlId_ = do -- FIXME: `serverID` is a `Word32` but kafka expects an `Int32`, -- causing a potential overflow. - allStreamNames <- S.findStreams scLDClient S.StreamTypeTopic <&> (fmap (Utils.cBytesToText . S.streamName)) + allStreamNames <- S.findStreams ctx.scLDClient S.StreamTypeTopic <&> (fmap (Utils.cBytesToText . 
S.streamName)) respTopics <- forM allStreamNames getRespTopic <&> V.fromList -- return $ K.MetadataResponseV1 (K.KaArray $ Just respBrokers_) ctlId_ (K.KaArray $ Just respTopics) - return $ K.MetadataResponseV3 { - clusterId=Nothing - , controllerId=ctlId_ - , throttleTimeMs=0 - , topics=K.NonNullKaArray respTopics - , brokers=K.NonNullKaArray respBrokers_ + return $ K.MetadataResponse + { clusterId = Nothing + , controllerId = ctlId_ + , throttleTimeMs = 0 + , topics = K.NonNullKaArray respTopics + , brokers = K.NonNullKaArray respBrokers_ } - getBrokers :: IO (V.Vector K.MetadataResponseBrokerV1) + getBrokers :: IO (V.Vector K.MetadataResponseBroker) getBrokers = do - (nodes, _) <- Gossip.describeCluster gossipContext scAdvertisedListenersKey + (nodes, _) <- Gossip.describeCluster ctx.gossipContext ctx.scAdvertisedListenersKey let brokers = V.map (\A.ServerNode{..} -> - K.MetadataResponseBrokerV1 - { nodeId = fromIntegral serverNodeId - , host = serverNodeHost - , port = fromIntegral serverNodePort - , rack = Nothing + K.MetadataResponseBroker + { nodeId = fromIntegral serverNodeId + , host = serverNodeHost + , port = fromIntegral serverNodePort + , rack = Nothing } ) nodes return brokers - getRespTopic :: Text -> IO K.MetadataResponseTopicV1 + getRespTopic :: Text -> IO K.MetadataResponseTopic getRespTopic topicName = do let streamId = S.transToTopicStreamName topicName - shards_e <- try ((V.map snd) <$> S.listStreamPartitionsOrdered scLDClient streamId) + errTopicResp code = K.MetadataResponseTopic + { errorCode = code + , name = topicName + , partitions = K.emptyKaArray + , isInternal = False + } + shards_e <- try ((V.map snd) <$> S.listStreamPartitionsOrdered ctx.scLDClient streamId) case shards_e of -- FIXME: Are the following error codes proper? -- FIXME: We passed `Nothing` as partitions when an error occurs. Is this proper? Left (e :: SomeException) | Just (_ :: S.NOTFOUND) <- fromException e -> - return $ K.MetadataResponseTopicV1 K.UNKNOWN_TOPIC_OR_PARTITION topicName False K.emptyKaArray + return $ errTopicResp K.UNKNOWN_TOPIC_OR_PARTITION | otherwise -> - return $ K.MetadataResponseTopicV1 K.UNKNOWN_SERVER_ERROR topicName False K.emptyKaArray + return $ errTopicResp K.UNKNOWN_SERVER_ERROR Right shards | V.null shards -> - return $ K.MetadataResponseTopicV1 K.INVALID_TOPIC_EXCEPTION topicName False K.emptyKaArray + return $ errTopicResp K.INVALID_TOPIC_EXCEPTION | V.length shards > fromIntegral (maxBound :: Int32) -> - return $ K.MetadataResponseTopicV1 K.INVALID_PARTITIONS topicName False K.emptyKaArray + return $ errTopicResp K.INVALID_PARTITIONS | otherwise -> mkResponse topicName shards mkResponse topicName shards = do respPartitions <- V.iforM shards $ \idx shardId -> do - theNode <- lookupKafkaPersist metaHandle gossipContext - loadBalanceHashRing scAdvertisedListenersKey + theNode <- lookupKafkaPersist ctx.metaHandle ctx.gossipContext + ctx.loadBalanceHashRing ctx.scAdvertisedListenersKey (KafkaResTopic $ Text.pack $ show shardId) -- FIXME: Convert from `Word32` to `Int32`, possible overflow! 
when ((A.serverNodeId theNode) > fromIntegral (maxBound :: Int32)) $ @@ -228,20 +204,21 @@ handleMetadataV4 ctx@ServerContext{..} _ req@K.MetadataRequestV4{..} = do <> " is too large, it should be less than " <> Log.build (maxBound :: Int32) let (theNodeId :: Int32) = fromIntegral (A.serverNodeId theNode) - pure $ K.MetadataResponsePartitionV0 - { errorCode = K.NONE - , partitionIndex = (fromIntegral idx) - , leaderId = theNodeId - , replicaNodes = K.KaArray $ Just (V.singleton theNodeId) -- FIXME: what should it be? - , isrNodes = K.KaArray $ Just (V.singleton theNodeId) -- FIXME: what should it be? + pure $ K.MetadataResponsePartition + { errorCode = K.NONE + , partitionIndex = (fromIntegral idx) + , leaderId = theNodeId + , replicaNodes = K.KaArray $ Just (V.singleton theNodeId) -- FIXME: what should it be? + , isrNodes = K.KaArray $ Just (V.singleton theNodeId) -- FIXME: what should it be? + , offlineReplicas = K.KaArray $ Just V.empty -- TODO } return $ - K.MetadataResponseTopicV1 K.NONE topicName False (K.KaArray $ Just respPartitions) -------------------------------------------------------------------------------- - -apiVersionV0To :: K.ApiVersionV0 -> K.ApiVersion -apiVersionV0To K.ApiVersionV0{..} = - let taggedFields = K.EmptyTaggedFields in K.ApiVersion{..} + K.MetadataResponseTopic + { errorCode = K.NONE + , name = topicName + , partitions = (K.KaArray $ Just respPartitions) + , isInternal = False + } --------------------------------------------------------------------------- -- 32: DescribeConfigs @@ -262,4 +239,3 @@ handleDescribeConfigs serverCtx _ req = do else return $ KCM.getErrorResponse KC.BROKER resource.resourceName ("invalid broker id:" <> resource.resourceName) rt -> return $ KCM.getErrorResponse rt resource.resourceName ("unsupported resource type:" <> T.pack (show rt)) return $ K.DescribeConfigsResponse {results=K.NonNullKaArray results, throttleTimeMs=0} - diff --git a/hstream-kafka/protocol/Kafka/Protocol/Message/Struct.hs b/hstream-kafka/protocol/Kafka/Protocol/Message/Struct.hs index 9c7832839..2ed877fe4 100644 --- a/hstream-kafka/protocol/Kafka/Protocol/Message/Struct.hs +++ b/hstream-kafka/protocol/Kafka/Protocol/Message/Struct.hs @@ -415,6 +415,8 @@ type MetadataRequestTopicV3 = MetadataRequestTopicV0 type MetadataRequestTopicV4 = MetadataRequestTopicV0 +type MetadataRequestTopicV5 = MetadataRequestTopicV0 + data MetadataResponseBrokerV0 = MetadataResponseBrokerV0 { nodeId :: {-# UNPACK #-} !Int32 -- ^ The broker ID. @@ -493,6 +495,36 @@ type MetadataResponsePartitionV4 = MetadataResponsePartitionV0 type MetadataResponseTopicV4 = MetadataResponseTopicV1 +type MetadataResponseBrokerV5 = MetadataResponseBrokerV1 + +data MetadataResponsePartitionV5 = MetadataResponsePartitionV5 + { errorCode :: {-# UNPACK #-} !ErrorCode + -- ^ The partition error, or 0 if there was no error. + , partitionIndex :: {-# UNPACK #-} !Int32 + -- ^ The partition index. + , leaderId :: {-# UNPACK #-} !Int32 + -- ^ The ID of the leader broker. + , replicaNodes :: !(KaArray Int32) + -- ^ The set of all nodes that host this partition. + , isrNodes :: !(KaArray Int32) + -- ^ The set of nodes that are in sync with the leader for this partition. + , offlineReplicas :: !(KaArray Int32) + -- ^ The set of offline replicas of this partition. + } deriving (Show, Eq, Generic) +instance Serializable MetadataResponsePartitionV5 + +data MetadataResponseTopicV5 = MetadataResponseTopicV5 + { errorCode :: {-# UNPACK #-} !ErrorCode + -- ^ The topic error, or 0 if there was no error. 
+ , name :: !Text + -- ^ The topic name. + , isInternal :: Bool + -- ^ True if the topic is internal. + , partitions :: !(KaArray MetadataResponsePartitionV5) + -- ^ Each partition in the topic. + } deriving (Show, Eq, Generic) +instance Serializable MetadataResponseTopicV5 + data OffsetCommitRequestPartitionV0 = OffsetCommitRequestPartitionV0 { partitionIndex :: {-# UNPACK #-} !Int32 -- ^ The partition index. @@ -1111,6 +1143,8 @@ data MetadataRequestV4 = MetadataRequestV4 } deriving (Show, Eq, Generic) instance Serializable MetadataRequestV4 +type MetadataRequestV5 = MetadataRequestV4 + data MetadataResponseV0 = MetadataResponseV0 { brokers :: !(KaArray MetadataResponseBrokerV0) -- ^ Each broker in the response. @@ -1158,6 +1192,21 @@ instance Serializable MetadataResponseV3 type MetadataResponseV4 = MetadataResponseV3 +data MetadataResponseV5 = MetadataResponseV5 + { throttleTimeMs :: {-# UNPACK #-} !Int32 + -- ^ The duration in milliseconds for which the request was throttled due + -- to a quota violation, or zero if the request did not violate any quota. + , brokers :: !(KaArray MetadataResponseBrokerV1) + -- ^ Each broker in the response. + , clusterId :: !NullableString + -- ^ The cluster ID that responding broker belongs to. + , controllerId :: {-# UNPACK #-} !Int32 + -- ^ The ID of the controller broker. + , topics :: !(KaArray MetadataResponseTopicV5) + -- ^ Each topic in the response. + } deriving (Show, Eq, Generic) +instance Serializable MetadataResponseV5 + data OffsetCommitRequestV0 = OffsetCommitRequestV0 { groupId :: !Text -- ^ The unique group identifier. @@ -1817,6 +1866,21 @@ instance HasMethodImpl HStreamKafkaV4 "metadata" where type MethodInput HStreamKafkaV4 "metadata" = MetadataRequestV4 type MethodOutput HStreamKafkaV4 "metadata" = MetadataResponseV4 +data HStreamKafkaV5 + +instance Service HStreamKafkaV5 where + type ServiceName HStreamKafkaV5 = "HStreamKafkaV5" + type ServiceMethods HStreamKafkaV5 = + '[ "metadata" + ] + +instance HasMethodImpl HStreamKafkaV5 "metadata" where + type MethodName HStreamKafkaV5 "metadata" = "metadata" + type MethodKey HStreamKafkaV5 "metadata" = 3 + type MethodVersion HStreamKafkaV5 "metadata" = 5 + type MethodInput HStreamKafkaV5 "metadata" = MetadataRequestV5 + type MethodOutput HStreamKafkaV5 "metadata" = MetadataResponseV5 + ------------------------------------------------------------------------------- newtype ApiKey = ApiKey Int16 @@ -1850,7 +1914,7 @@ supportedApiVersions = [ ApiVersionV0 (ApiKey 0) 0 3 , ApiVersionV0 (ApiKey 1) 0 4 , ApiVersionV0 (ApiKey 2) 0 1 - , ApiVersionV0 (ApiKey 3) 0 4 + , ApiVersionV0 (ApiKey 3) 0 5 , ApiVersionV0 (ApiKey 8) 0 3 , ApiVersionV0 (ApiKey 9) 0 3 , ApiVersionV0 (ApiKey 10) 0 0 @@ -1886,6 +1950,7 @@ getHeaderVersion (ApiKey (3)) 1 = (1, 0) getHeaderVersion (ApiKey (3)) 2 = (1, 0) getHeaderVersion (ApiKey (3)) 3 = (1, 0) getHeaderVersion (ApiKey (3)) 4 = (1, 0) +getHeaderVersion (ApiKey (3)) 5 = (1, 0) getHeaderVersion (ApiKey (8)) 0 = (1, 0) getHeaderVersion (ApiKey (8)) 1 = (1, 0) getHeaderVersion (ApiKey (8)) 2 = (1, 0) diff --git a/hstream-kafka/protocol/Kafka/Protocol/Message/Total.hs b/hstream-kafka/protocol/Kafka/Protocol/Message/Total.hs index 9f8e51059..d4db50630 100644 --- a/hstream-kafka/protocol/Kafka/Protocol/Message/Total.hs +++ b/hstream-kafka/protocol/Kafka/Protocol/Message/Total.hs @@ -760,6 +760,8 @@ metadataRequestTopicToV3 :: MetadataRequestTopic -> MetadataRequestTopicV3 metadataRequestTopicToV3 = metadataRequestTopicToV0 metadataRequestTopicToV4 :: MetadataRequestTopic 
-> MetadataRequestTopicV4 metadataRequestTopicToV4 = metadataRequestTopicToV0 +metadataRequestTopicToV5 :: MetadataRequestTopic -> MetadataRequestTopicV5 +metadataRequestTopicToV5 = metadataRequestTopicToV0 metadataRequestTopicFromV0 :: MetadataRequestTopicV0 -> MetadataRequestTopic metadataRequestTopicFromV0 x = MetadataRequestTopic @@ -773,6 +775,8 @@ metadataRequestTopicFromV3 :: MetadataRequestTopicV3 -> MetadataRequestTopic metadataRequestTopicFromV3 = metadataRequestTopicFromV0 metadataRequestTopicFromV4 :: MetadataRequestTopicV4 -> MetadataRequestTopic metadataRequestTopicFromV4 = metadataRequestTopicFromV0 +metadataRequestTopicFromV5 :: MetadataRequestTopicV5 -> MetadataRequestTopic +metadataRequestTopicFromV5 = metadataRequestTopicFromV0 data MetadataResponseBroker = MetadataResponseBroker { nodeId :: {-# UNPACK #-} !Int32 @@ -805,6 +809,8 @@ metadataResponseBrokerToV3 :: MetadataResponseBroker -> MetadataResponseBrokerV3 metadataResponseBrokerToV3 = metadataResponseBrokerToV1 metadataResponseBrokerToV4 :: MetadataResponseBroker -> MetadataResponseBrokerV4 metadataResponseBrokerToV4 = metadataResponseBrokerToV1 +metadataResponseBrokerToV5 :: MetadataResponseBroker -> MetadataResponseBrokerV5 +metadataResponseBrokerToV5 = metadataResponseBrokerToV1 metadataResponseBrokerFromV0 :: MetadataResponseBrokerV0 -> MetadataResponseBroker metadataResponseBrokerFromV0 x = MetadataResponseBroker @@ -826,18 +832,22 @@ metadataResponseBrokerFromV3 :: MetadataResponseBrokerV3 -> MetadataResponseBrok metadataResponseBrokerFromV3 = metadataResponseBrokerFromV1 metadataResponseBrokerFromV4 :: MetadataResponseBrokerV4 -> MetadataResponseBroker metadataResponseBrokerFromV4 = metadataResponseBrokerFromV1 +metadataResponseBrokerFromV5 :: MetadataResponseBrokerV5 -> MetadataResponseBroker +metadataResponseBrokerFromV5 = metadataResponseBrokerFromV1 data MetadataResponsePartition = MetadataResponsePartition - { errorCode :: {-# UNPACK #-} !ErrorCode + { errorCode :: {-# UNPACK #-} !ErrorCode -- ^ The partition error, or 0 if there was no error. - , partitionIndex :: {-# UNPACK #-} !Int32 + , partitionIndex :: {-# UNPACK #-} !Int32 -- ^ The partition index. - , leaderId :: {-# UNPACK #-} !Int32 + , leaderId :: {-# UNPACK #-} !Int32 -- ^ The ID of the leader broker. - , replicaNodes :: !(KaArray Int32) + , replicaNodes :: !(KaArray Int32) -- ^ The set of all nodes that host this partition. - , isrNodes :: !(KaArray Int32) + , isrNodes :: !(KaArray Int32) -- ^ The set of nodes that are in sync with the leader for this partition. + , offlineReplicas :: !(KaArray Int32) + -- ^ The set of offline replicas of this partition. 
} deriving (Show, Eq, Generic) instance Serializable MetadataResponsePartition @@ -857,6 +867,15 @@ metadataResponsePartitionToV3 :: MetadataResponsePartition -> MetadataResponsePa metadataResponsePartitionToV3 = metadataResponsePartitionToV0 metadataResponsePartitionToV4 :: MetadataResponsePartition -> MetadataResponsePartitionV4 metadataResponsePartitionToV4 = metadataResponsePartitionToV0 +metadataResponsePartitionToV5 :: MetadataResponsePartition -> MetadataResponsePartitionV5 +metadataResponsePartitionToV5 x = MetadataResponsePartitionV5 + { errorCode = x.errorCode + , partitionIndex = x.partitionIndex + , leaderId = x.leaderId + , replicaNodes = x.replicaNodes + , isrNodes = x.isrNodes + , offlineReplicas = x.offlineReplicas + } metadataResponsePartitionFromV0 :: MetadataResponsePartitionV0 -> MetadataResponsePartition metadataResponsePartitionFromV0 x = MetadataResponsePartition @@ -865,6 +884,7 @@ metadataResponsePartitionFromV0 x = MetadataResponsePartition , leaderId = x.leaderId , replicaNodes = x.replicaNodes , isrNodes = x.isrNodes + , offlineReplicas = KaArray (Just V.empty) } metadataResponsePartitionFromV1 :: MetadataResponsePartitionV1 -> MetadataResponsePartition metadataResponsePartitionFromV1 = metadataResponsePartitionFromV0 @@ -874,6 +894,15 @@ metadataResponsePartitionFromV3 :: MetadataResponsePartitionV3 -> MetadataRespon metadataResponsePartitionFromV3 = metadataResponsePartitionFromV0 metadataResponsePartitionFromV4 :: MetadataResponsePartitionV4 -> MetadataResponsePartition metadataResponsePartitionFromV4 = metadataResponsePartitionFromV0 +metadataResponsePartitionFromV5 :: MetadataResponsePartitionV5 -> MetadataResponsePartition +metadataResponsePartitionFromV5 x = MetadataResponsePartition + { errorCode = x.errorCode + , partitionIndex = x.partitionIndex + , leaderId = x.leaderId + , replicaNodes = x.replicaNodes + , isrNodes = x.isrNodes + , offlineReplicas = x.offlineReplicas + } data MetadataResponseTopic = MetadataResponseTopic { errorCode :: {-# UNPACK #-} !ErrorCode @@ -906,6 +935,13 @@ metadataResponseTopicToV3 :: MetadataResponseTopic -> MetadataResponseTopicV3 metadataResponseTopicToV3 = metadataResponseTopicToV1 metadataResponseTopicToV4 :: MetadataResponseTopic -> MetadataResponseTopicV4 metadataResponseTopicToV4 = metadataResponseTopicToV1 +metadataResponseTopicToV5 :: MetadataResponseTopic -> MetadataResponseTopicV5 +metadataResponseTopicToV5 x = MetadataResponseTopicV5 + { errorCode = x.errorCode + , name = x.name + , isInternal = x.isInternal + , partitions = fmap metadataResponsePartitionToV5 x.partitions + } metadataResponseTopicFromV0 :: MetadataResponseTopicV0 -> MetadataResponseTopic metadataResponseTopicFromV0 x = MetadataResponseTopic @@ -927,6 +963,13 @@ metadataResponseTopicFromV3 :: MetadataResponseTopicV3 -> MetadataResponseTopic metadataResponseTopicFromV3 = metadataResponseTopicFromV1 metadataResponseTopicFromV4 :: MetadataResponseTopicV4 -> MetadataResponseTopic metadataResponseTopicFromV4 = metadataResponseTopicFromV1 +metadataResponseTopicFromV5 :: MetadataResponseTopicV5 -> MetadataResponseTopic +metadataResponseTopicFromV5 x = MetadataResponseTopic + { errorCode = x.errorCode + , name = x.name + , partitions = fmap metadataResponsePartitionFromV5 x.partitions + , isInternal = x.isInternal + } data OffsetCommitRequestPartition = OffsetCommitRequestPartition { partitionIndex :: {-# UNPACK #-} !Int32 @@ -2286,6 +2329,8 @@ metadataRequestToV4 x = MetadataRequestV4 { topics = fmap metadataRequestTopicToV4 x.topics , 
allowAutoTopicCreation = x.allowAutoTopicCreation } +metadataRequestToV5 :: MetadataRequest -> MetadataRequestV5 +metadataRequestToV5 = metadataRequestToV4 metadataRequestFromV0 :: MetadataRequestV0 -> MetadataRequest metadataRequestFromV0 x = MetadataRequest @@ -2303,6 +2348,8 @@ metadataRequestFromV4 x = MetadataRequest { topics = fmap metadataRequestTopicFromV4 x.topics , allowAutoTopicCreation = x.allowAutoTopicCreation } +metadataRequestFromV5 :: MetadataRequestV5 -> MetadataRequest +metadataRequestFromV5 = metadataRequestFromV4 data MetadataResponse = MetadataResponse { brokers :: !(KaArray MetadataResponseBroker) @@ -2347,6 +2394,14 @@ metadataResponseToV3 x = MetadataResponseV3 } metadataResponseToV4 :: MetadataResponse -> MetadataResponseV4 metadataResponseToV4 = metadataResponseToV3 +metadataResponseToV5 :: MetadataResponse -> MetadataResponseV5 +metadataResponseToV5 x = MetadataResponseV5 + { throttleTimeMs = x.throttleTimeMs + , brokers = fmap metadataResponseBrokerToV5 x.brokers + , clusterId = x.clusterId + , controllerId = x.controllerId + , topics = fmap metadataResponseTopicToV5 x.topics + } metadataResponseFromV0 :: MetadataResponseV0 -> MetadataResponse metadataResponseFromV0 x = MetadataResponse @@ -2382,6 +2437,14 @@ metadataResponseFromV3 x = MetadataResponse } metadataResponseFromV4 :: MetadataResponseV4 -> MetadataResponse metadataResponseFromV4 = metadataResponseFromV3 +metadataResponseFromV5 :: MetadataResponseV5 -> MetadataResponse +metadataResponseFromV5 x = MetadataResponse + { brokers = fmap metadataResponseBrokerFromV5 x.brokers + , topics = fmap metadataResponseTopicFromV5 x.topics + , controllerId = x.controllerId + , clusterId = x.clusterId + , throttleTimeMs = x.throttleTimeMs + } data OffsetCommitRequest = OffsetCommitRequest { groupId :: !Text diff --git a/script/kafka_gen.py b/script/kafka_gen.py index 7a672959e..58b1c9168 100755 --- a/script/kafka_gen.py +++ b/script/kafka_gen.py @@ -109,7 +109,7 @@ def get_field_default(field_type, default=None): GLOBAL_API_VERSION_PATCH = (0, 0) API_VERSION_PATCHES = { "ApiVersions": (0, 3), - "Metadata": (0, 4), + "Metadata": (0, 5), "Produce": (0, 3), "Fetch": (0, 4), "OffsetFetch": (0, 3),
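
The unified handler's up-front treatment of the `topics` array (v0: empty means "all topics"; v1 and higher: null means "all topics", empty means "no topics") can be summarized in a small standalone sketch. The `KaArray` and `normalizeTopics` names below are illustrative stand-ins rather than the project's actual exports; only `Data.Vector` is assumed.

module Main where

import           Data.Int    (Int16)
import           Data.Maybe  (fromMaybe)
import qualified Data.Vector as V

-- Mirrors the protocol's nullable-array encoding: Nothing is a null array.
newtype KaArray a = KaArray (Maybe (V.Vector a)) deriving (Show)

-- Normalize a Metadata `topics` array to the v1+ convention before dispatch.
-- The real handler pattern-matches K.NonNullKaArray for v0, so a null v0
-- array is rejected there; this sketch coalesces it to empty for simplicity.
normalizeTopics :: Int16 -> KaArray a -> KaArray a
normalizeTopics apiVersion req@(KaArray mv)
  | apiVersion >= 1 = req                 -- v1+: semantics already explicit
  | V.null v        = KaArray Nothing     -- v0: empty means "all topics"
  | otherwise       = KaArray (Just v)    -- v0: explicit topic list
  where v = fromMaybe V.empty mv

main :: IO ()
main = do
  -- A v0 empty array is mapped to a null array ("return all topics").
  print (normalizeTopics 0 (KaArray (Just (V.empty :: V.Vector String))))
  -- A v5 non-empty array passes through: metadata only for the listed topics.
  print (normalizeTopics 5 (KaArray (Just (V.fromList ["orders", "payments"] :: V.Vector String))))

The per-version entry points deleted from Handler.hsc are now produced by the `#cv_handler Metadata, 0, 5` and `#mk_handler Metadata, 0, 5` macros. The generated code is not shown in this patch, but given the total `handleMetadata` and the `metadataRequestFromV5` / `metadataResponseToV5` converters added in Message/Total.hs, each versioned handler is presumably a thin shim of the following shape (the `K.` qualification of the converters is an assumption):

-- Hedged sketch, not the real hsc expansion: the presumed v5 shim over the
-- total handler, using the converters added in Kafka.Protocol.Message.Total.
handleMetadataV5
  :: ServerContext -> K.RequestContext -> K.MetadataRequestV5 -> IO K.MetadataResponseV5
handleMetadataV5 sc reqCtx req =
  K.metadataResponseToV5 <$> handleMetadata sc reqCtx (K.metadataRequestFromV5 req)

Relative to v4, v5 only adds `offlineReplicas` to the partition record, which is why `metadataResponsePartitionFromV0` defaults that field to an empty array when lifting v0-v4 responses into the total type.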