kafka: upgrade ListOffsets to version 2 (#1727)
YangKian committed Jan 3, 2024
1 parent 2ce2e5a commit a958435
Showing 5 changed files with 144 additions and 41 deletions.
5 changes: 2 additions & 3 deletions hstream-kafka/HStream/Kafka/Server/Handler.hsc
@@ -57,6 +57,7 @@ import qualified Kafka.Protocol.Service as K
-------------------------------------------------------------------------------

#cv_handler ApiVersions, 0, 3
#cv_handler ListOffsets, 0, 2
#cv_handler Metadata, 0, 5
#cv_handler Produce, 0, 3
#cv_handler InitProducerId, 0, 0
@@ -83,6 +84,7 @@ import qualified Kafka.Protocol.Service as K
handlers :: ServerContext -> [K.ServiceHandler]
handlers sc =
[ #mk_handler ApiVersions, 0, 3
, #mk_handler ListOffsets, 0, 2
, #mk_handler Metadata, 0, 5
-- Write
, #mk_handler Produce, 0, 3
@@ -106,9 +108,6 @@ handlers sc =
, K.hd (K.RPC :: K.RPC K.HStreamKafkaV0 "createTopics") (handleCreateTopicsV0 sc)
, K.hd (K.RPC :: K.RPC K.HStreamKafkaV0 "deleteTopics") (handleDeleteTopicsV0 sc)

, K.hd (K.RPC :: K.RPC K.HStreamKafkaV0 "listOffsets") (handleListOffsetsV0 sc)
, K.hd (K.RPC :: K.RPC K.HStreamKafkaV1 "listOffsets") (handleListOffsetsV1 sc)

, K.hd (K.RPC :: K.RPC K.HStreamKafkaV0 "saslHandshake") (handleAfterAuthSaslHandshakeV0 sc)
, K.hd (K.RPC :: K.RPC K.HStreamKafkaV1 "saslHandshake") (handleAfterAuthSaslHandshakeV1 sc)

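The explicit per-version `K.hd` registrations for `listOffsets` removed above are replaced by the `#cv_handler ListOffsets, 0, 2` / `#mk_handler ListOffsets, 0, 2` macro invocations, which cover protocol versions 0 through 2 with the single unified `handleListOffsets`. Conceptually the registration behaves like a table keyed by (api key, api version); the toy sketch below models that dispatch (the Map-based table and `lookupHandler` are purely illustrative, not HStream code):

import Data.Int (Int16)
import qualified Data.Map.Strict as Map

-- Toy dispatch table: ListOffsets (api key 2) versions 0..2 all resolve to
-- the same unified handler, which is what the 0-2 registration range gives us.
handlerTable :: Map.Map (Int16, Int16) String
handlerTable = Map.fromList [ ((2, v), "handleListOffsets") | v <- [0 .. 2] ]

lookupHandler :: Int16 -> Int16 -> Either String String
lookupHandler apiKey apiVersion =
  maybe (Left "UNSUPPORTED_VERSION") Right (Map.lookup (apiKey, apiVersion) handlerTable)

main :: IO ()
main = do
  print (lookupHandler 2 2)  -- Right "handleListOffsets"
  print (lookupHandler 2 3)  -- Left "UNSUPPORTED_VERSION": v3 is not registered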
69 changes: 37 additions & 32 deletions hstream-kafka/HStream/Kafka/Server/Handler/Offset.hs
@@ -3,11 +3,8 @@

module HStream.Kafka.Server.Handler.Offset
( handleOffsetCommit

, handleOffsetFetch

, handleListOffsetsV0
, handleListOffsetsV1
, handleListOffsets
)
where

@@ -20,7 +17,7 @@ import qualified Data.Vector as V
import HStream.Kafka.Common.OffsetManager (getLatestOffset,
getOffsetByTimestamp,
getOldestOffset)
import qualified HStream.Kafka.Common.Utils as Utils
import HStream.Kafka.Common.Utils (mapKaArray)
import qualified HStream.Kafka.Group.GroupCoordinator as GC
import HStream.Kafka.Server.Types (ServerContext (..))
import qualified HStream.Store as S
@@ -37,52 +34,60 @@ pattern LatestTimestamp = (-1)
pattern EarliestTimestamp :: Int64
pattern EarliestTimestamp = (-2)

handleListOffsetsV0
:: ServerContext -> K.RequestContext -> K.ListOffsetsRequestV0 -> IO K.ListOffsetsResponseV0
handleListOffsetsV0 sc _ K.ListOffsetsRequestV0{..} = do
case K.unKaArray topics of
Nothing -> undefined
-- ^ check kafka
Just topics' -> do
res <- V.forM topics' $ \K.ListOffsetsTopicV0 {..} -> do
listOffsetTopicPartitions sc name (K.unKaArray (Utils.mapKaArray convertRequestPartition partitions))
return $ K.ListOffsetsResponseV0 {topics = K.NonNullKaArray (V.map convertTopic res)}
where convertRequestPartition p = K.ListOffsetsPartitionV1 {timestamp=p.timestamp, partitionIndex=p.partitionIndex}
convertTopic topic = K.ListOffsetsTopicResponseV0 {partitions=Utils.mapKaArray convertResponsePartition topic.partitions, name=topic.name}
convertResponsePartition p = K.ListOffsetsPartitionResponseV0
{ errorCode=0
, oldStyleOffsets=K.NonNullKaArray (V.singleton p.offset)
, partitionIndex=p.partitionIndex
}
handleListOffsets
:: ServerContext -> K.RequestContext -> K.ListOffsetsRequest -> IO K.ListOffsetsResponse
handleListOffsets sc reqCtx req
| reqCtx.apiVersion >= 2 && req.isolationLevel /= 0 = return $ mkErrResponse req
| otherwise = listOffsets sc reqCtx req
where
mkErrResponse K.ListOffsetsRequest{..} =
let topicsRsp = mapKaArray (\K.ListOffsetsTopic{..} ->
let partitionsRsp = mapKaArray (\K.ListOffsetsPartition{..} ->
K.ListOffsetsPartitionResponse
{ offset = -1
, timestamp = -1
-- ^ FIXME: read record timestamp ?
, partitionIndex = partitionIndex
, errorCode = K.INVALID_REQUEST
, oldStyleOffsets = K.KaArray Nothing
}) partitions
in K.ListOffsetsTopicResponse {partitions=partitionsRsp, name=name}
) topics
in K.ListOffsetsResponse {topics=topicsRsp, throttleTimeMs=0}

handleListOffsetsV1
:: ServerContext -> K.RequestContext -> K.ListOffsetsRequestV1 -> IO K.ListOffsetsResponseV1
handleListOffsetsV1 sc _ K.ListOffsetsRequestV1{..} = do
listOffsets
:: ServerContext -> K.RequestContext -> K.ListOffsetsRequest -> IO K.ListOffsetsResponse
listOffsets sc _ K.ListOffsetsRequest{..} = do
case K.unKaArray topics of
Nothing -> undefined
-- ^ check kafka
Just topics' -> do
res <- V.forM topics' $ \K.ListOffsetsTopicV1 {..} -> do
res <- V.forM topics' $ \K.ListOffsetsTopic {..} -> do
listOffsetTopicPartitions sc name (K.unKaArray partitions)
return $ K.ListOffsetsResponseV1 {topics = K.KaArray {unKaArray = Just res}}
return $ K.ListOffsetsResponse {topics = K.KaArray {unKaArray = Just res}, throttleTimeMs = 0}

listOffsetTopicPartitions :: ServerContext -> Text -> Maybe (Vector K.ListOffsetsPartitionV1) -> IO K.ListOffsetsTopicResponseV1
listOffsetTopicPartitions
:: ServerContext
-> Text
-> Maybe (Vector K.ListOffsetsPartition)
-> IO K.ListOffsetsTopicResponse
listOffsetTopicPartitions _ topicName Nothing = do
return $ K.ListOffsetsTopicResponseV1 {partitions = K.KaArray {unKaArray = Nothing}, name = topicName}
return $ K.ListOffsetsTopicResponse {partitions = K.KaArray {unKaArray = Nothing}, name = topicName}
listOffsetTopicPartitions ServerContext{..} topicName (Just offsetsPartitions) = do
orderedParts <- S.listStreamPartitionsOrdered scLDClient (S.transToTopicStreamName topicName)
res <- V.forM offsetsPartitions $ \K.ListOffsetsPartitionV1{..} -> do
res <- V.forM offsetsPartitions $ \K.ListOffsetsPartition{..} -> do
-- TODO: handle Nothing
let partition = orderedParts V.! (fromIntegral partitionIndex)
offset <- getOffset (snd partition) timestamp
return $ K.ListOffsetsPartitionResponseV1
return $ K.ListOffsetsPartitionResponse
{ offset = offset
, timestamp = timestamp
-- ^ FIXME: read record timestamp ?
, partitionIndex = partitionIndex
, errorCode = K.NONE
, oldStyleOffsets = K.NonNullKaArray (V.singleton offset)
}
return $ K.ListOffsetsTopicResponseV1 {partitions = K.KaArray {unKaArray = Just res}, name = topicName}
return $ K.ListOffsetsTopicResponse {partitions = K.KaArray {unKaArray = Just res}, name = topicName}
where
-- NOTE: The last offset of a partition is the offset of the upcoming
-- message, i.e. the offset of the last available message + 1.
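The `getOffset` helper used above is collapsed in this view, but the pattern synonyms and the NOTE pin down its semantics: a timestamp of -1 (LatestTimestamp) asks for the offset of the upcoming message (last available offset + 1), -2 (EarliestTimestamp) asks for the oldest available offset, and any other value asks for the first offset whose record timestamp is at or after the requested one, the usual Kafka ListOffsets convention (the real lookups go through `getLatestOffset`, `getOldestOffset` and `getOffsetByTimestamp`). Below is a self-contained toy illustration of those semantics; the in-memory record list and `lookupOffset` are illustrative only, not HStream code:

{-# LANGUAGE PatternSynonyms #-}
import Data.Int (Int64)

pattern LatestTimestamp :: Int64
pattern LatestTimestamp = (-1)

pattern EarliestTimestamp :: Int64
pattern EarliestTimestamp = (-2)

-- Each record is an (offset, timestamp) pair, in offset order.
lookupOffset :: [(Int64, Int64)] -> Int64 -> Int64
lookupOffset records ts = case ts of
  LatestTimestamp   -> if null records then 0 else fst (last records) + 1
  -- NOTE: the "latest" offset is the offset of the upcoming message,
  -- i.e. the last available message's offset + 1.
  EarliestTimestamp -> if null records then 0 else fst (head records)
  _                 -> case [o | (o, t) <- records, t >= ts] of
                         (o : _) -> o
                         []      -> -1

main :: IO ()
main = do
  let recs = [(0, 100), (1, 105), (2, 200)]
  print (lookupOffset recs LatestTimestamp)    -- 3: last offset + 1
  print (lookupOffset recs EarliestTimestamp)  -- 0: oldest offset
  print (lookupOffset recs 150)                -- 2: first record at/after ts 150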
46 changes: 45 additions & 1 deletion hstream-kafka/protocol/Kafka/Protocol/Message/Struct.hs
@@ -364,6 +364,10 @@ data ListOffsetsTopicV1 = ListOffsetsTopicV1
} deriving (Show, Eq, Generic)
instance Serializable ListOffsetsTopicV1

type ListOffsetsPartitionV2 = ListOffsetsPartitionV1

type ListOffsetsTopicV2 = ListOffsetsTopicV1

data ListOffsetsPartitionResponseV0 = ListOffsetsPartitionResponseV0
{ partitionIndex :: {-# UNPACK #-} !Int32
-- ^ The partition index.
@@ -402,6 +406,10 @@ data ListOffsetsTopicResponseV1 = ListOffsetsTopicResponseV1
} deriving (Show, Eq, Generic)
instance Serializable ListOffsetsTopicResponseV1

type ListOffsetsPartitionResponseV2 = ListOffsetsPartitionResponseV1

type ListOffsetsTopicResponseV2 = ListOffsetsTopicResponseV1

newtype MetadataRequestTopicV0 = MetadataRequestTopicV0
{ name :: Text
} deriving (Show, Eq, Generic)
@@ -1160,6 +1168,24 @@ data ListOffsetsRequestV1 = ListOffsetsRequestV1
} deriving (Show, Eq, Generic)
instance Serializable ListOffsetsRequestV1

data ListOffsetsRequestV2 = ListOffsetsRequestV2
{ replicaId :: {-# UNPACK #-} !Int32
-- ^ The broker ID of the requestor, or -1 if this request is being made by
-- a normal consumer.
, isolationLevel :: {-# UNPACK #-} !Int8
-- ^ This setting controls the visibility of transactional records. Using
-- READ_UNCOMMITTED (isolation_level = 0) makes all records visible. With
-- READ_COMMITTED (isolation_level = 1), non-transactional and COMMITTED
-- transactional records are visible. To be more concrete, READ_COMMITTED
-- returns all data from offsets smaller than the current LSO (last stable
-- offset), and enables the inclusion of the list of aborted transactions
-- in the result, which allows consumers to discard ABORTED transactional
-- records
, topics :: !(KaArray ListOffsetsTopicV1)
-- ^ Each topic in the request.
} deriving (Show, Eq, Generic)
instance Serializable ListOffsetsRequestV2

newtype ListOffsetsResponseV0 = ListOffsetsResponseV0
{ topics :: (KaArray ListOffsetsTopicResponseV0)
} deriving (Show, Eq, Generic)
@@ -1170,6 +1196,15 @@ newtype ListOffsetsResponseV1 = ListOffsetsResponseV1
} deriving (Show, Eq, Generic)
instance Serializable ListOffsetsResponseV1

data ListOffsetsResponseV2 = ListOffsetsResponseV2
{ throttleTimeMs :: {-# UNPACK #-} !Int32
-- ^ The duration in milliseconds for which the request was throttled due
-- to a quota violation, or zero if the request did not violate any quota.
, topics :: !(KaArray ListOffsetsTopicResponseV1)
-- ^ Each topic in the response.
} deriving (Show, Eq, Generic)
instance Serializable ListOffsetsResponseV2

newtype MetadataRequestV0 = MetadataRequestV0
{ topics :: (KaArray MetadataRequestTopicV0)
} deriving (Show, Eq, Generic)
@@ -1795,6 +1830,7 @@ instance Service HStreamKafkaV2 where
type ServiceMethods HStreamKafkaV2 =
'[ "produce"
, "fetch"
, "listOffsets"
, "metadata"
, "offsetCommit"
, "offsetFetch"
@@ -1816,6 +1852,13 @@ instance HasMethodImpl HStreamKafkaV2 "fetch" where
type MethodInput HStreamKafkaV2 "fetch" = FetchRequestV2
type MethodOutput HStreamKafkaV2 "fetch" = FetchResponseV2

instance HasMethodImpl HStreamKafkaV2 "listOffsets" where
type MethodName HStreamKafkaV2 "listOffsets" = "listOffsets"
type MethodKey HStreamKafkaV2 "listOffsets" = 2
type MethodVersion HStreamKafkaV2 "listOffsets" = 2
type MethodInput HStreamKafkaV2 "listOffsets" = ListOffsetsRequestV2
type MethodOutput HStreamKafkaV2 "listOffsets" = ListOffsetsResponseV2

instance HasMethodImpl HStreamKafkaV2 "metadata" where
type MethodName HStreamKafkaV2 "metadata" = "metadata"
type MethodKey HStreamKafkaV2 "metadata" = 3
@@ -1977,7 +2020,7 @@ supportedApiVersions :: [ApiVersionV0]
supportedApiVersions =
[ ApiVersionV0 (ApiKey 0) 0 3
, ApiVersionV0 (ApiKey 1) 0 4
, ApiVersionV0 (ApiKey 2) 0 1
, ApiVersionV0 (ApiKey 2) 0 2
, ApiVersionV0 (ApiKey 3) 0 5
, ApiVersionV0 (ApiKey 8) 0 3
, ApiVersionV0 (ApiKey 9) 0 3
@@ -2010,6 +2053,7 @@ getHeaderVersion (ApiKey (1)) 3 = (1, 0)
getHeaderVersion (ApiKey (1)) 4 = (1, 0)
getHeaderVersion (ApiKey (2)) 0 = (1, 0)
getHeaderVersion (ApiKey (2)) 1 = (1, 0)
getHeaderVersion (ApiKey (2)) 2 = (1, 0)
getHeaderVersion (ApiKey (3)) 0 = (1, 0)
getHeaderVersion (ApiKey (3)) 1 = (1, 0)
getHeaderVersion (ApiKey (3)) 2 = (1, 0)
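With `supportedApiVersions` now advertising ApiKey 2 (ListOffsets) as versions 0-2 and `getHeaderVersion` handling (ApiKey 2, version 2), clients discover and use the new version the usual way: fetch ApiVersions, intersect the broker's advertised range with their own, and pick the highest common version. A minimal sketch of that negotiation step (`negotiateVersion` is illustrative, not a function from this codebase):

import Data.Int (Int16)

-- Intersect the broker's advertised (min, max) range with the client's
-- supported range and pick the highest common version, if any.
negotiateVersion :: (Int16, Int16) -> (Int16, Int16) -> Maybe Int16
negotiateVersion (brokerMin, brokerMax) (clientMin, clientMax)
  | hi >= lo  = Just hi
  | otherwise = Nothing
  where
    lo = max brokerMin clientMin
    hi = min brokerMax clientMax

main :: IO ()
main = do
  -- Broker now advertises (0, 2) for ListOffsets: a client capable of (0, 5)
  -- settles on version 2, while an older (0, 1) client keeps using version 1.
  print (negotiateVersion (0, 2) (0, 5))  -- Just 2
  print (negotiateVersion (0, 2) (0, 1))  -- Just 1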