Skip to content

Commit

Permalink
kafka: fix response for OffsetFetch (#1812)
Browse files Browse the repository at this point in the history
  • Loading branch information
Commelina committed May 8, 2024
1 parent 55ce01d commit 1f6ac0f
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 19 deletions.
3 changes: 0 additions & 3 deletions hstream-kafka/HStream/Kafka/Common/Utils.hs
Expand Up @@ -46,9 +46,6 @@ listToKaArray = K.KaArray . Just . V.fromList
kaArrayToVector :: K.KaArray a -> V.Vector a
kaArrayToVector kaArray = fromMaybe V.empty (K.unKaArray kaArray)

vectorToKaArray :: V.Vector a -> K.KaArray a
vectorToKaArray vec = K.KaArray (Just vec)

mapKaArray :: (a -> b) -> K.KaArray a -> K.KaArray b
mapKaArray f arr = K.KaArray (fmap (V.map f) (K.unKaArray arr))

Expand Down
3 changes: 2 additions & 1 deletion hstream-kafka/HStream/Kafka/Group/Group.hs
Expand Up @@ -916,11 +916,12 @@ fetchOffsets Group{..} reqTopic validateReqTopic = validateReqTopic reqTopic >>=
, partitions = K.KaArray (Just $ (makeErrorPartition code) <$> partitions')
}
where
-- FIXME: hardcoded constants
makeErrorPartition code idx =
K.OffsetFetchResponsePartition
{ partitionIndex = idx
, committedOffset = -1
, metadata = Nothing
, metadata = Just ""
, errorCode = code
}

Expand Down
8 changes: 5 additions & 3 deletions hstream-kafka/HStream/Kafka/Group/GroupOffsetManager.hs
Expand Up @@ -189,18 +189,19 @@ fetchOffsets GroupOffsetManager{..} topicName partitions = do
, partitions = KaArray {unKaArray = Just res}
}
where
-- FIXME: hardcoded constants
getOffset cache partitionIdx = do
let key = mkTopicPartition topicName partitionIdx
in case Map.lookup key cache of
Just offset -> return $ OffsetFetchResponsePartition
{ committedOffset = offset
, metadata = Nothing
, metadata = Just ""
, partitionIndex= partitionIdx
, errorCode = K.NONE
}
Nothing -> return $ OffsetFetchResponsePartition
{ committedOffset = -1
, metadata = Nothing
, metadata = Just ""
, partitionIndex= partitionIdx
, errorCode = K.NONE
-- TODO: check the error code here
Expand All @@ -214,9 +215,10 @@ fetchAllOffsets GroupOffsetManager{..} = do
-- group offsets by TopicName
cachedOffset <- Map.foldrWithKey foldF Map.empty <$> readIORef offsetsCache
return . KaArray . Just . V.map makeTopic . V.fromList . Map.toList $ cachedOffset
-- FIXME: hardcoded constants
where makePartition partition offset = OffsetFetchResponsePartition
{ committedOffset = offset
, metadata = Nothing
, metadata = Just ""
, partitionIndex=partition
, errorCode = K.NONE
}
Expand Down
30 changes: 18 additions & 12 deletions hstream-kafka/HStream/Kafka/Server/Handler/Offset.hs
Expand Up @@ -192,7 +192,7 @@ handleOffsetFetch ServerContext{..} reqCtx req = E.handle (\(K.ErrorCodeExceptio
simpleAuthorize (toAuthorizableReqCtx reqCtx) authorizer Res_GROUP req.groupId AclOp_DESCRIBE >>= \case
False -> return $ makeErrorResponse K.TOPIC_AUTHORIZATION_FAILED
True -> do
group <- GC.getGroup scGroupCoordinator req.groupId
group_m <- GC.getGroupM scGroupCoordinator req.groupId
case K.unKaArray req.topics of
-- 'Nothing' means fetch offsets of ALL topics.
-- WARNING: Offsets of unauthzed topics should not be leaked
Expand All @@ -201,11 +201,13 @@ handleOffsetFetch ServerContext{..} reqCtx req = E.handle (\(K.ErrorCodeExceptio
-- FIXME: Better method than passing a "validate" function?
Nothing -> do
Log.debug $ "fetching all offsets in group:" <> Log.build req.groupId
topicResps <- G.fetchAllOffsets group $ \reqTopic -> do
-- [ACL] check [DESCRIBE TOPIC] for each topic
simpleAuthorize (toAuthorizableReqCtx reqCtx) authorizer Res_TOPIC reqTopic.name AclOp_DESCRIBE >>= \case
False -> return K.TOPIC_AUTHORIZATION_FAILED
True -> return K.NONE
topicResps <- case group_m of
Nothing -> return (K.NonNullKaArray V.empty)
Just group -> G.fetchAllOffsets group $ \reqTopic -> do
-- [ACL] check [DESCRIBE TOPIC] for each topic
simpleAuthorize (toAuthorizableReqCtx reqCtx) authorizer Res_TOPIC reqTopic.name AclOp_DESCRIBE >>= \case
False -> return K.TOPIC_AUTHORIZATION_FAILED
True -> return K.NONE
return $ K.OffsetFetchResponse
{ throttleTimeMs = 0
, errorCode = K.NONE
Expand All @@ -217,11 +219,15 @@ handleOffsetFetch ServerContext{..} reqCtx req = E.handle (\(K.ErrorCodeExceptio
-- on each topic. That is why we pass a "validate"
-- function to it.
-- FIXME: Better method than passing a "validate" function?
G.fetchOffsets group reqTopic $ \reqTopic_ -> do
-- [ACL] check [DESCRIBE TOPIC] for each topic
simpleAuthorize (toAuthorizableReqCtx reqCtx) authorizer Res_TOPIC reqTopic_.name AclOp_DESCRIBE >>= \case
False -> return K.TOPIC_AUTHORIZATION_FAILED
True -> return K.NONE
case group_m of
-- FIXME: what error code should it return? 'K.UNKNOWN_TOPIC_OR_PARTITION'
-- crashes some tests...
Nothing -> return (makeErrorTopicResponse K.NONE reqTopic)
Just group -> G.fetchOffsets group reqTopic $ \reqTopic_ -> do
-- [ACL] check [DESCRIBE TOPIC] for each topic
simpleAuthorize (toAuthorizableReqCtx reqCtx) authorizer Res_TOPIC reqTopic_.name AclOp_DESCRIBE >>= \case
False -> return K.TOPIC_AUTHORIZATION_FAILED
True -> return K.NONE
return $ K.OffsetFetchResponse
{ throttleTimeMs = 0
, errorCode = K.NONE
Expand All @@ -240,7 +246,7 @@ handleOffsetFetch ServerContext{..} reqCtx req = E.handle (\(K.ErrorCodeExceptio
K.OffsetFetchResponsePartition
{ partitionIndex = idx
, errorCode = code
, metadata = Nothing
, metadata = Just ""
, committedOffset = -1
}
}
Expand Down

0 comments on commit 1f6ac0f

Please sign in to comment.