Upgrade Produce to v3 & Fetch to v4 (#1706)
* add apiVersion in RequestContext

* breaking change: add version in on-disk format
4eUeP committed Dec 7, 2023
1 parent 88dc6af commit fa52319
Showing 10 changed files with 384 additions and 62 deletions.
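The breaking change is a leading version byte in the on-disk record format. A minimal sketch of the layout before and after this commit, inferred from the RecordFormat and seekBatch diffs below (annotation only, not code from the repository):

-- Old on-disk entry:
--   offset      : Int64         -- 8 bytes
--   batchLength : Int32         -- 4 bytes
--   recordBytes : CompactBytes  -- length-prefixed batch payload
--
-- New on-disk entry (this commit):
--   version     : Int8          -- 1 byte, currently written as 0
--   offset      : Int64         -- 8 bytes
--   batchLength : Int32         -- 4 bytes
--   recordBytes : CompactBytes  -- length-prefixed batch payload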
9 changes: 5 additions & 4 deletions hstream-kafka/HStream/Kafka/Common/RecordFormat.hs
@@ -14,17 +14,18 @@ import qualified Kafka.Protocol.Encoding as K

-- on-disk format
data RecordFormat = RecordFormat
{ offset :: Int64
, batchLength :: Int32
, recordBytes :: K.CompactBytes
{ version :: {-# UNPACK #-} !Int8
, offset :: {-# UNPACK #-} !Int64
, batchLength :: {-# UNPACK #-} !Int32
, recordBytes :: !K.CompactBytes
} deriving (Generic, Show)

instance K.Serializable RecordFormat

seekBatch :: Int32 -> ByteString -> IO ByteString
seekBatch i bs =
let parser = replicateM_ (fromIntegral i) $ do
void $ K.takeBytes 8{- baseOffset: Int64 -}
void $ K.takeBytes 9{- version(1) + offset(8) -}
len <- K.get @Int32
void $ K.takeBytes (fromIntegral len)
in snd <$> K.runParser' parser bs
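
A hedged usage sketch of the updated parser (the helper name is hypothetical): skipping a batch now consumes 9 header bytes (version + offset) plus 4 length bytes plus the payload, matching the new layout:

-- Resume reading inside one stored entry by dropping its first n batches.
skipBatches :: Int32 -> ByteString -> IO ByteString
skipBatches n entryBs
  | n <= 0    = pure entryBs
  | otherwise = seekBatch n entryBs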
3 changes: 2 additions & 1 deletion hstream-kafka/HStream/Kafka/Network.hs
@@ -119,8 +119,8 @@ runServer opts sc mkPreAuthedHandlers mkAuthedHandlers =
talk (peer, hds) s

runHandler peer handlers reqHeader@RequestHeader{..} reqBs = do
Log.debug $ "Received request header: " <> Log.buildString' reqHeader
P.incCounter totalRequests
Log.debug $ "receive requestHeader: " <> Log.build (show reqHeader)
let ServiceHandler{..} = findHandler handlers requestApiKey requestApiVersion
case rpcHandler of
UnaryHandler rpcHandler' -> do
@@ -139,6 +139,7 @@ runServer opts sc mkPreAuthedHandlers mkAuthedHandlers =
RequestContext
{ clientId = requestClientId
, clientHost = showSockAddrHost peer
, apiVersion = requestApiVersion
}
resp <- rpcHandler' reqContext req
Log.debug $ "Server response: " <> Log.buildString' resp
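With apiVersion threaded through RequestContext, a unary handler can branch on the version the client negotiated. A hedged sketch (the request/response types and helper are hypothetical; only the three RequestContext fields shown above are assumed):

-- Fill a field that only exists in newer protocol versions.
handleSomething :: RequestContext -> SomeRequest -> IO SomeResponse
handleSomething ctx req = do
  let resp = mkBaseResponse req              -- hypothetical helper
  if ctx.apiVersion >= 4
     then pure resp{lastStableOffset = -1}   -- v4-only field, for example
     else pure resp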
8 changes: 4 additions & 4 deletions hstream-kafka/HStream/Kafka/Server/Handler.hsc
@@ -56,8 +56,8 @@ import qualified Kafka.Protocol.Service as K
-------------------------------------------------------------------------------

#cv_handler ApiVersions, 0, 3
#cv_handler Produce, 0, 2
#cv_handler Fetch, 0, 3
#cv_handler Produce, 0, 3
#cv_handler Fetch, 0, 4
#cv_handler DescribeConfigs, 0, 0

#cv_handler SaslHandshake, 0, 1
@@ -78,9 +78,9 @@ handlers :: ServerContext -> [K.ServiceHandler]
handlers sc =
[ #mk_handler ApiVersions, 0, 3
-- Write
, #mk_handler Produce, 0, 2
, #mk_handler Produce, 0, 3
-- Read
, #mk_handler Fetch, 0, 3
, #mk_handler Fetch, 0, 4

, #mk_handler FindCoordinator, 0, 0

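The #cv_handler / #mk_handler pairs register each API with a [minVersion, maxVersion] range, which is what the ApiVersions response advertises and what findHandler dispatches on. A hedged, standalone sketch of the range check this implies (names are hypothetical; the apiKey numbers are Kafka's):

import Data.Int (Int16)

data ApiRange = ApiRange { minVer :: !Int16, maxVer :: !Int16 }

-- Produce is apiKey 0 and Fetch is apiKey 1 in the Kafka protocol.
supportedApis :: [(Int16, ApiRange)]
supportedApis =
  [ (0, ApiRange 0 3)  -- Produce: v0..v3 after this commit
  , (1, ApiRange 0 4)  -- Fetch:   v0..v4 after this commit
  ]

accepts :: Int16 -> Int16 -> Bool
accepts apiKey ver =
  case lookup apiKey supportedApis of
    Just (ApiRange lo hi) -> ver >= lo && ver <= hi
    Nothing               -> False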
76 changes: 57 additions & 19 deletions hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs
@@ -133,28 +133,24 @@ handleFetch ServerContext{..} _ r = K.catchFetchResponseEx $ do
Right (_startlsn, _endlsn, hioffset) -> do
mgv <- HT.lookup readRecords logid
case mgv of
Nothing -> pure $ K.PartitionData p.partition K.NONE hioffset (Just "")
Nothing ->
pure $ K.PartitionData p.partition K.NONE hioffset (Just "")
(-1){- TODO: lastStableOffset -}
(K.NonNullKaArray V.empty){- TODO: abortedTransactions -}
Just gv -> do
v <- GV.unsafeFreeze gv
-- This should not be Nothing, because if we found the key in
-- `readRecords`, it means we have at least one record in this log.
let (rf :: K.RecordFormat, vs) =
fromMaybe (error "LogicError: got empty vector value")
(V.uncons v)
let absStartOffset = rf.offset + 1 - fromIntegral rf.batchLength
bytesOnDisk = K.unCompactBytes rf.recordBytes
fstRecordBytes <-
if (absStartOffset < p.fetchOffset)
-- only the first batch needs to do this seek
then K.seekBatch (fromIntegral $ p.fetchOffset - absStartOffset) bytesOnDisk
else pure bytesOnDisk
let b = V.foldl (<>) (BB.byteString fstRecordBytes)
(V.map (BB.byteString . K.unCompactBytes . (.recordBytes)) vs)
bs = BS.toStrict $ BB.toLazyByteString b
totalRecords = V.sum $ V.map (\K.RecordFormat{..} -> batchLength) v
P.withLabel topicTotalSendBytes (topic, T.pack . show $ p.partition) $ \counter -> void $ P.addCounter counter (fromIntegral $ BS.length bs)
P.withLabel topicTotalSendMessages (topic, T.pack . show $ p.partition) $ \counter -> void $ P.addCounter counter (fromIntegral totalRecords)
bs <- encodePartition p v

let partLabel = (topic, T.pack . show $ p.partition)
P.withLabel topicTotalSendBytes partLabel $ \counter -> void $
P.addCounter counter (fromIntegral $ BS.length bs)
P.withLabel topicTotalSendMessages partLabel $ \counter -> void $ do
let totalRecords = V.sum $ V.map (\K.RecordFormat{..} -> batchLength) v
P.addCounter counter (fromIntegral totalRecords)

pure $ K.PartitionData p.partition K.NONE hioffset (Just bs)
(-1){- TODO: lastStableOffset -}
(K.NonNullKaArray V.empty){- TODO: abortedTransactions -}
pure $ K.FetchableTopicResponse topic (K.NonNullKaArray respPartitionDatas)
pure $ K.FetchResponse (K.NonNullKaArray respTopics) 0{- TODO: throttleTimeMs -}

@@ -199,6 +195,46 @@ handleFetch ServerContext{..} _ r = K.catchFetchResponseEx $ do
S.readerSetTimeout reader r.maxWaitMs
S.readerRead reader storageOpts.fetchMaxLen

-- Note: this function's behaviour is not the same as the Kafka broker's.
--
-- Regarding the format on disk, the Kafka broker returns the message
-- format according to the fetch API version, which means:
--
-- * if the fetch API version is less than 4, the broker always returns
--   MessageSet, even if the format on disk is RecordBatch;
-- * if the fetch API version is 4+, the broker always returns
--   RecordBatch.
--
-- Here we do not handle the fetch API version; we just return the
-- message format as it is on disk.
--
-- However, if you always use RecordBatch for appending and reading,
-- this won't be a problem.
encodePartition
:: K.FetchPartition -> V.Vector K.RecordFormat -> IO ByteString
encodePartition p v = do
let (rf :: K.RecordFormat, vs) =
-- This should not be Nothing, because if we found the key
-- in `readRecords`, it means we have at least one record
-- in this log.
fromMaybe (error "LogicError: got empty vector value")
(V.uncons v)
bytesOnDisk = K.unCompactBytes rf.recordBytes
-- only the first MessageSet needs this seek
magic <- K.decodeRecordMagic bytesOnDisk
fstRecordBytes <-
if | magic >= 2 -> pure bytesOnDisk
| otherwise -> do
let absStartOffset = rf.offset + 1 - fromIntegral rf.batchLength
offset = p.fetchOffset - absStartOffset
if offset > 0
then K.seekBatch (fromIntegral offset) bytesOnDisk
else pure bytesOnDisk

let v' = V.map (BB.byteString . K.unCompactBytes . (.recordBytes)) vs
b = V.foldl (<>) (BB.byteString fstRecordBytes) v'
pure $ BS.toStrict $ BB.toLazyByteString b
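
-- A hedged sketch of the seek decision above, extracted for clarity
-- (needsSeek is a hypothetical name, not part of this module):
-- a v2 RecordBatch is returned whole because consumers position
-- themselves inside a RecordBatch; legacy MessageSets (magic 0/1)
-- are trimmed broker-side to the requested offset.
needsSeek :: Int8 -> Int64 -> Int64 -> Bool
needsSeek magic fetchOffset absStartOffset =
  magic < 2 && fetchOffset > absStartOffset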

-------------------------------------------------------------------------------

-- Return tuple of (startLsn, tailLsn, highwaterOffset)
@@ -240,6 +276,8 @@ getPartitionLsn ldclient om logid partition offset = do
errorPartitionResponse :: Int32 -> K.ErrorCode -> K.PartitionData
errorPartitionResponse partitionIndex ec =
K.PartitionData partitionIndex ec (-1) (Just "")
(-1){- TODO: lastStableOffset -}
(K.NonNullKaArray V.empty){- TODO: abortedTransactions -}
{-# INLINE errorPartitionResponse #-}

foldWhileM :: Monad m => a -> (a -> m (a, Bool)) -> m a
21 changes: 14 additions & 7 deletions hstream-kafka/HStream/Kafka/Server/Handler/Produce.hs
@@ -67,7 +67,7 @@ handleProduce ServerContext{..} _ req = do
let Just (_, logid) = partitions V.!? (fromIntegral partition.index) -- TODO: handle Nothing
P.withLabel totalProduceRequest (topic.name, T.pack . show $ partition.index) $ \counter -> void $ P.addCounter counter 1
let Just recordBytes = partition.recordBytes -- TODO: handle Nothing
Log.debug1 $ "Append to logid " <> Log.build logid
Log.debug1 $ "Try to append to logid " <> Log.build logid
<> "(" <> Log.build partition.index <> ")"

-- Write appends
@@ -76,6 +76,7 @@

Log.debug1 $ "Append done " <> Log.build appendCompLogID
<> ", lsn: " <> Log.build appendCompLSN
<> ", start offset: " <> Log.build offset

-- TODO: logAppendTimeMs, only support LogAppendTime now
pure $ K.PartitionProduceResponse partition.index K.NONE offset appendCompTimestamp
@@ -95,8 +96,7 @@ appendRecords
-> ByteString
-> IO (S.AppendCompletion, Int64)
appendRecords shouldValidateCrc ldclient om (streamName, partition) logid bs = do
records <- K.decodeBatchRecords shouldValidateCrc bs
let batchLength = V.length records
(records, batchLength) <- K.decodeBatchRecords' shouldValidateCrc bs
when (batchLength < 1) $ error "Invalid batch length"

-- Offset written into storage is the max key in the batch, but return the min
@@ -125,9 +125,16 @@ appendRecords shouldValidateCrc ldclient om (streamName, partition) logid bs = d
appendAttrs = Just [(S.KeyTypeFindKey, appendKey)]
storedBs = K.encodeBatchRecords records'
-- FIXME unlikely overflow: convert batchLength from Int to Int32
storedRecord = K.runPut $ K.RecordFormat o (fromIntegral batchLength) (K.CompactBytes storedBs)
storedRecord = K.runPut $ K.RecordFormat 0{- version -}
o (fromIntegral batchLength)
(K.CompactBytes storedBs)
Log.debug1 $ "Append key " <> Log.buildString' appendKey
r <- observeWithLabel appendLatencySnd streamName $
S.appendCompressedBS ldclient logid storedRecord S.CompressionNone appendAttrs
P.withLabel topicTotalAppendBytes (streamName, T.pack . show $ partition) $ \counter -> void $ P.addCounter counter (fromIntegral $ BS.length storedRecord)
P.withLabel topicTotalAppendMessages (streamName, T.pack . show $ partition) $ \counter -> void $ P.addCounter counter (fromIntegral batchLength)
S.appendCompressedBS ldclient logid storedRecord S.CompressionNone
appendAttrs
let !partLabel = (streamName, T.pack . show $ partition)
P.withLabel topicTotalAppendBytes partLabel $ \counter ->
void $ P.addCounter counter (fromIntegral $ BS.length storedRecord)
P.withLabel topicTotalAppendMessages partLabel $ \counter ->
void $ P.addCounter counter (fromIntegral batchLength)
pure (r, startOffset)
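
Tying the produce side together: every appended entry is now the on-disk RecordFormat with a leading version byte of 0. A hedged sketch of the payload assembly (mirrors the code above; the standalone helper is hypothetical):

-- Wrap an encoded batch into the versioned on-disk format.
mkStoredRecord :: Int64 -> Int32 -> ByteString -> ByteString
mkStoredRecord maxOffsetInBatch numRecords batchBs =
  K.runPut $ K.RecordFormat 0{- version -}
                            maxOffsetInBatch
                            numRecords
                            (K.CompactBytes batchBs)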
35 changes: 28 additions & 7 deletions hstream-kafka/protocol/Kafka/Protocol/Encoding.hs
Original file line number Diff line number Diff line change
@@ -32,6 +32,7 @@ module Kafka.Protocol.Encoding
-- * Records
, BatchRecord (..)
, decodeBatchRecords
, decodeBatchRecords'
, encodeBatchRecords
, encodeBatchRecordsLazy
, modifyBatchRecordsOffset
@@ -47,6 +48,7 @@ module Kafka.Protocol.Encoding
, RecordHeaderValue (..)
-- ** Helpers
, decodeLegacyRecordBatch
, decodeRecordMagic
-- ** Misc
, pattern NonNullKaArray
, unNonNullKaArray
@@ -354,10 +356,19 @@ data BatchRecord
deriving (Show, Eq)

decodeBatchRecords :: Bool -> ByteString -> IO (Vector BatchRecord)
decodeBatchRecords shouldValidateCrc batchBs = Growing.new >>= decode batchBs
decodeBatchRecords shouldValidateCrc batchBs =
fst <$> decodeBatchRecords' shouldValidateCrc batchBs
{-# INLINABLE decodeBatchRecords #-}

-- | The same as 'decodeBatchRecords', but with some extra information for
-- convenience.
--
-- Currently, only the real total number of records is returned.
decodeBatchRecords' :: Bool -> ByteString -> IO (Vector BatchRecord, Int)
decodeBatchRecords' shouldValidateCrc batchBs = Growing.new >>= decode 0 batchBs
where
decode "" !v = Growing.unsafeFreeze v
decode !bs !v = do
decode len "" !v = (, len) <$> Growing.unsafeFreeze v
decode !len !bs !v = do
(RecordBase{..}, bs') <- runGet' @RecordBase bs
case magic of
0 -> do let crc = partitionLeaderEpochOrCrc
@@ -372,7 +383,7 @@ decodeBatchRecords shouldValidateCrc batchBs = Growing.new >>= decode batchBs
validLegacyCrc (fromIntegral batchLength) crc bs
(RecordBodyV0{..}, remainder) <- runGet' @RecordBodyV0 bs'
!v' <- Growing.append v (BatchRecordV0 RecordV0{..})
decode remainder v'
decode (len + 1) remainder v'
1 -> do let crc = partitionLeaderEpochOrCrc
messageSize = batchLength
when (messageSize < fromIntegral minRecordSizeV1) $
@@ -385,7 +396,7 @@ decodeBatchRecords shouldValidateCrc batchBs = Growing.new >>= decode batchBs
validLegacyCrc (fromIntegral batchLength) crc bs
(RecordBodyV1{..}, remainder) <- runGet' @RecordBodyV1 bs'
!v' <- Growing.append v (BatchRecordV1 RecordV1{..})
decode remainder v'
decode (len + 1) remainder v'
2 -> do let partitionLeaderEpoch = partitionLeaderEpochOrCrc
-- The CRC covers the data from the attributes to the end of
-- the batch (i.e. all the bytes that follow the CRC).
@@ -397,9 +408,10 @@ decodeBatchRecords shouldValidateCrc batchBs = Growing.new >>= decode batchBs
throwIO $ DecodeError "Invalid CRC32"
(RecordBodyV2{..}, remainder) <- runGet' @RecordBodyV2 bs''
!v' <- Growing.append v (BatchRecordV2 RecordBatch{..})
decode remainder v'
let !batchLen = maybe 0 V.length (unKaArray records)
decode (len + batchLen) remainder v'
_ -> throwIO $ DecodeError $ "Invalid magic " <> show magic
{-# INLINABLE decodeBatchRecords #-}
{-# INLINABLE decodeBatchRecords' #-}
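
-- A hedged usage sketch (the helper is hypothetical): callers such as
-- the produce handler rely on the returned count rather than the
-- length of the vector, which would undercount a v2 RecordBatch that
-- holds many records.
countRecords :: Bool -> ByteString -> IO Int
countRecords shouldValidateCrc bs = do
  (_batches, total) <- decodeBatchRecords' shouldValidateCrc bs
  pure total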

encodeBatchRecordsLazy :: Vector BatchRecord -> BL.ByteString
encodeBatchRecordsLazy rs =
@@ -427,6 +439,15 @@ putBatchRecord (BatchRecordV0 r) = put r
putBatchRecord (BatchRecordV1 r) = put r
putBatchRecord (BatchRecordV2 r) = put r

decodeRecordMagic :: ByteString -> IO Int8
decodeRecordMagic bs =
let parser = do
-- baseOffset(8) + batchLength(4) + partitionLeaderEpochOrCrc(4)
void $ takeBytes (8 + 4 + 4)
get @Int8
in fst <$> runParser' parser bs
{-# INLINE decodeRecordMagic #-}

-- Internal type to help parse all Record versions.
--
-- Common Record base for all versions.