kafka protocol: code refactoring (#1787)
* kafka protocol: code refactoring

- Support getting Attributes
- Drop support for old message formats v0 and v1

* Kafka produce: handle DecodeError
4eUeP committed Apr 15, 2024
1 parent 39ccb34 commit 3efa02e
Showing 8 changed files with 883 additions and 652 deletions.
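
The thread running through the three source files below is that `DecodeError` now carries a Kafka error code alongside the message, so handlers such as `handleProduce` can surface decoding failures as wire-level codes like `CORRUPT_MESSAGE`. Reconstructed from the constructor applications and the `K.DecodeError (ec, msg)` handler pattern in this diff, the refactored type plausibly has a shape like the following sketch (not necessarily the verbatim definition in the kafka-protocol library):

```haskell
import Control.Exception (Exception)
import Kafka.Protocol.Error (ErrorCode)  -- e.g. CORRUPT_MESSAGE

-- Sketch: DecodeError now pairs a protocol error code with the message,
-- matching `DecodeError (CORRUPT_MESSAGE, "Fail, " <> err)` and the
-- handler pattern `K.DecodeError (ec, msg)` seen in this diff.
newtype DecodeError = DecodeError (ErrorCode, String)
  deriving (Show)

instance Exception DecodeError
```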
3 changes: 2 additions & 1 deletion hstream-kafka/HStream/Kafka/Network.hs
@@ -55,6 +55,7 @@ import HStream.Kafka.Server.Types (ServerContext (..),
initConnectionContext)
import qualified HStream.Logger as Log
import Kafka.Protocol.Encoding
import Kafka.Protocol.Error
import Kafka.Protocol.Message
import Kafka.Protocol.Service

@@ -387,7 +388,7 @@ runParseIO more parser = more >>= go Nothing
Done l r -> pure (r, l)
More f -> do msg <- more
go (Just f) msg
Fail _ err -> E.throwIO $ DecodeError $ "Fail, " <> err
Fail _ err -> E.throwIO $ DecodeError $ (CORRUPT_MESSAGE, "Fail, " <> err)

showSockAddrHost :: N.SockAddr -> String
showSockAddrHost (N.SockAddrUnix str) = str
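
`runParseIO` above and `recvKafkaMsgBS` in `Network/IO.hs` below scrutinize the same three-way parse result from `Kafka.Protocol.Encoding`. Inferred purely from the match sites (`Done l r`, `More f`, `Fail _ err`), it plausibly has a shape like the sketch below; the constructor names come from the diff, but the type name, strictness, and the continuation's exact type are assumptions:

```haskell
import Data.ByteString (ByteString)

-- Plausible shape of the incremental parse result, reconstructed from
-- the pattern matches in this diff (not the library's verbatim definition):
data ParseResult a
  = Done ByteString a                       -- leftover input and parsed value
  | More (ByteString -> IO (ParseResult a)) -- parser suspended, wants more input
  | Fail ByteString String                  -- leftover input and error message
```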
11 changes: 6 additions & 5 deletions hstream-kafka/HStream/Kafka/Network/IO.hs
@@ -22,6 +22,7 @@ import qualified Network.Socket.ByteString as N
import qualified Network.Socket.ByteString.Lazy as NL

import Kafka.Protocol.Encoding
import Kafka.Protocol.Error
import Kafka.Protocol.Message

-- | Receive a kafka message with its request header from socket.
@@ -54,20 +55,20 @@ recvKafkaMsgBS peer m_more s = do
headerResult <- liftIO $ runParser @RequestHeader get reqBs
case headerResult of
Done l h -> return $ Just (h, l)
Fail _ err -> E.throw $ DecodeError $ "Fail, " <> err
More _ -> E.throw $ DecodeError $ "More"
Fail _ err -> E.throw $ DecodeError $ (CORRUPT_MESSAGE, "Fail, " <> err)
More _ -> E.throw $ DecodeError $ (CORRUPT_MESSAGE, "More")
Done l reqBs -> do
State.put l
headerResult <- liftIO $ runParser @RequestHeader get reqBs
case headerResult of
Done l' h -> return $ Just (h, l')
Fail _ err -> E.throw $ DecodeError $ "Fail, " <> err
More _ -> E.throw $ DecodeError $ "More"
Fail _ err -> E.throw $ DecodeError $ (CORRUPT_MESSAGE, "Fail, " <> err)
More _ -> E.throw $ DecodeError $ (CORRUPT_MESSAGE, "More")
More f -> do
i_new <- liftIO $ N.recv s 1024
State.put i_new
recvKafkaMsgBS peer (Just f) s
Fail _ err -> liftIO . E.throwIO $ DecodeError $ "Fail, " <> err
Fail _ err -> liftIO . E.throwIO $ DecodeError $ (CORRUPT_MESSAGE, "Fail, " <> err)

-- | Send a kafka message to socket. Note the message should be packed
-- with its response header.
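
For context on `recvKafkaMsgBS` above: Kafka frames every request as a 4-byte big-endian size followed by the payload, and the leftover bytes threaded through `State.put` are whatever part of the next frame arrived in the same `recv`. A minimal, self-contained sketch of that framing, independent of hstream-kafka's parser types (`recvExact` and `recvKafkaFrame` are hypothetical helpers, not part of the codebase):

```haskell
import qualified Data.ByteString as BS
import qualified Network.Socket as N
import qualified Network.Socket.ByteString as NBS

-- Hypothetical helper: read exactly n bytes, looping over short reads.
recvExact :: N.Socket -> Int -> IO BS.ByteString
recvExact s = go []
  where
    go acc 0 = pure (BS.concat (reverse acc))
    go acc k = do
      chunk <- NBS.recv s k
      if BS.null chunk
        then ioError (userError "peer closed connection")
        else go (chunk : acc) (k - BS.length chunk)

-- One Kafka frame: 4-byte big-endian length prefix, then the payload.
recvKafkaFrame :: N.Socket -> IO BS.ByteString
recvKafkaFrame s = do
  lenBs <- recvExact s 4
  let len = BS.foldl' (\acc w -> acc * 256 + fromIntegral w) 0 lenBs
  recvExact s len
```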
83 changes: 47 additions & 36 deletions hstream-kafka/HStream/Kafka/Server/Handler/Produce.hs
@@ -4,6 +4,7 @@ module HStream.Kafka.Server.Handler.Produce
) where

import qualified Control.Concurrent.Async as Async
import Control.Exception
import Control.Monad
import Data.ByteString (ByteString)
import qualified Data.ByteString as BS
@@ -85,33 +86,6 @@ handleProduce ServerContext{..} _reqCtx req = do
, logStartOffset = -1
}
True -> do
-- FIXME: Block is too deep. Extract to a standalone function.
-- Write appends
(S.AppendCompletion{..}, offset) <-
appendRecords True scLDClient scOffsetManager
(topic.name, partition.index) logid recordBytes

Log.debug1 $ "Append done " <> Log.build appendCompLogID
<> ", lsn: " <> Log.build appendCompLSN
<> ", start offset: " <> Log.build offset

-- TODO: performance improvements
--
-- For each append request after version 5, we need to read the oldest
-- offset of the log. This will cause critical performance problems.
--
--logStartOffset <-
-- if reqCtx.apiVersion >= 5
-- then do m_logStartOffset <- K.getOldestOffset scOffsetManager logid
-- case m_logStartOffset of
-- Just logStartOffset -> pure logStartOffset
-- Nothing -> do
-- Log.fatal $ "Cannot get log start offset for logid "
-- <> Log.build logid
-- pure (-1)
-- else pure (-1)
let logStartOffset = (-1)

-- TODO: PartitionProduceResponse.logAppendTimeMs
--
-- The timestamp returned by broker after appending the messages. If
@@ -120,13 +94,31 @@ handleProduce ServerContext{..} _reqCtx req = do
-- local time when the messages are appended.
--
-- Currently, only support LogAppendTime
pure $ K.PartitionProduceResponse
{ index = partition.index
, errorCode = K.NONE
, baseOffset = offset
, logAppendTimeMs = appendCompTimestamp
, logStartOffset = logStartOffset
}
catches (do
(S.AppendCompletion{..}, offset) <-
appendRecords True scLDClient scOffsetManager
(topic.name, partition.index) logid recordBytes
Log.debug1 $ "Append done " <> Log.build appendCompLogID
<> ", lsn: " <> Log.build appendCompLSN
<> ", start offset: " <> Log.build offset
pure $ K.PartitionProduceResponse
{ index = partition.index
, errorCode = K.NONE
, baseOffset = offset
, logAppendTimeMs = appendCompTimestamp
, logStartOffset = (-1) -- TODO: getLogStartOffset
})
[ Handler (\(K.DecodeError (ec, msg)) -> do
Log.debug1 $ "Append DecodeError " <> Log.buildString' ec
<> ", " <> Log.buildString' msg
pure $ K.PartitionProduceResponse
{ index = partition.index
, errorCode = ec
, baseOffset = (-1)
, logAppendTimeMs = (-1)
, logStartOffset = (-1)
})
]

pure $ K.TopicProduceResponse topic.name (K.KaArray $ Just partitionResponses)

@@ -158,7 +150,8 @@ appendRecords
-> ByteString
-> IO (S.AppendCompletion, Int64)
appendRecords shouldValidateCrc ldclient om (streamName, partition) logid bs = do
(batchLength, offsetOffsets) <- K.decodeBatchRecordsForProduce shouldValidateCrc bs
batch <- K.decodeRecordBatch shouldValidateCrc bs
let batchLength = batch.recordsCount
when (batchLength < 1) $ error "Invalid batch length"

-- Offset written into storage is the max key in the batch, but return the min
@@ -185,7 +178,7 @@ appendRecords shouldValidateCrc ldclient om (streamName, partition) logid bs = d
appendKey = U.intToCBytesWithPadding o
appendAttrs = Just [(S.KeyTypeFindKey, appendKey)]

K.unsafeAlterBatchRecordsBsForProduce (+ startOffset) offsetOffsets bs
K.unsafeUpdateRecordBatchBaseOffset bs (+ startOffset)

-- FIXME unlikely overflow: convert batchLength from Int to Int32
let storedRecord = K.runPut $ K.RecordFormat 0{- version -}
@@ -204,3 +197,21 @@ appendRecords shouldValidateCrc ldclient om (streamName, partition) logid bs = d
M.withLabel M.topicTotalAppendMessages partLabel $ \counter ->
void $ M.addCounter counter (fromIntegral batchLength)
pure (r, startOffset)
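
`unsafeUpdateRecordBatchBaseOffset` above rewrites the batch's base offset in place before the bytes are stored. In the Kafka RecordBatch wire format the baseOffset is the first field, an 8-byte big-endian Int64, so the rebase amounts to a read-modify-write of those leading bytes. A pure, copying sketch of the same idea using the `binary` package (the real function mutates the buffer in place):

```haskell
import Data.Binary.Get (getInt64be, runGet)
import Data.Binary.Put (putInt64be, runPut)
import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as BL
import Data.Int (Int64)

-- Copying sketch of the rebase: baseOffset is the first 8 bytes
-- (big-endian Int64) of a RecordBatch, so apply f to that field and
-- splice the rest of the batch back on unchanged.
rebaseRecordBatch :: (Int64 -> Int64) -> BS.ByteString -> BS.ByteString
rebaseRecordBatch f bs =
  let old = runGet getInt64be (BL.fromStrict (BS.take 8 bs))
      new = BL.toStrict (runPut (putInt64be (f old)))
  in new <> BS.drop 8 bs
```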

-- TODO: performance improvements
--
-- For each append request after version 5, we need to read the oldest
-- offset of the log. This will cause critical performance problems.
--
--logStartOffset <-
-- if reqCtx.apiVersion >= 5
-- then do m_logStartOffset <- K.getOldestOffset scOffsetManager logid
-- case m_logStartOffset of
-- Just logStartOffset -> pure logStartOffset
-- Nothing -> do
-- Log.fatal $ "Cannot get log start offset for logid "
-- <> Log.build logid
-- pure (-1)
-- else pure (-1)
getLogStartOffset :: IO Int64
getLogStartOffset = pure (-1)
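
Following the commented-out draft above, a version-aware implementation would consult the offset manager only for produce requests at API v5 or later, roughly as below. This is a sketch reusing the module's existing qualified imports (`K`, `Log`); the `Int16`/`Word64` types and the exact signature of `K.getOldestOffset` are assumptions, not taken from the API:

```haskell
-- Sketch of the version-aware lookup drafted in the comment above; the
-- apiVersion, offset manager, and logid would come from the enclosing
-- handler, and the concrete types here are assumed.
getLogStartOffsetV :: Int16 -> K.OffsetManager -> Word64 -> IO Int64
getLogStartOffsetV apiVersion om logid
  | apiVersion >= 5 = do
      m_logStartOffset <- K.getOldestOffset om logid
      case m_logStartOffset of
        Just logStartOffset -> pure logStartOffset
        Nothing -> do
          Log.fatal $ "Cannot get log start offset for logid " <> Log.build logid
          pure (-1)
  | otherwise = pure (-1)
```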
1 change: 1 addition & 0 deletions hstream-kafka/hstream-kafka.cabal
@@ -57,6 +57,7 @@ library kafka-protocol
Kafka.Protocol.Encoding.Encode
Kafka.Protocol.Encoding.Internal
Kafka.Protocol.Encoding.Parser
Kafka.Protocol.Encoding.Types
Kafka.Protocol.Message.Struct
Kafka.Protocol.Message.Total

