Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 34 additions & 12 deletions src/Kafka/Consumer.hs
Original file line number Diff line number Diff line change
Expand Up @@ -125,22 +125,44 @@ newConsumer props (Subscription ts tp) = liftIO $ do
_ <- setDefaultTopicConf kc tp'
rdk <- newRdKafkaT RdKafkaConsumer kc'
case rdk of
Left err -> return . Left $ KafkaError err
Left err ->
return $
Left (KafkaError err)

Right rdk' -> do
let
kafka =
KafkaConsumer (Kafka rdk') kc

when (cpCallbackPollMode props == CallbackPollModeAsync) $ do
msgq <- rdKafkaQueueNew rdk'
writeIORef qref (Just msgq)
let kafka = KafkaConsumer (Kafka rdk') kc
messageQueue <- rdKafkaQueueNew rdk'
writeIORef qref (Just messageQueue)

redErr <- redirectCallbacksPoll kafka
case redErr of
Just err -> closeConsumer kafka >> return (Left err)
Nothing -> do
forM_ (cpLogLevel cp) (setConsumerLogLevel kafka)
sub <- subscribe kafka ts
case sub of
Nothing -> (when (cpCallbackPollMode props == CallbackPollModeAsync) $
runConsumerLoop kafka (Just $ Timeout 100)) >> return (Right kafka)
Just err -> closeConsumer kafka >> return (Left err)
Just err -> do
_ <- closeConsumer kafka
return (Left err)

Nothing -> do
forM_ (cpLogLevel cp) $
setConsumerLogLevel kafka

subscribeError <-
if Set.null ts then
pure Nothing
else
subscribe kafka ts

case subscribeError of
Nothing -> do
when (cpCallbackPollMode props == CallbackPollModeAsync) $
runConsumerLoop kafka (Just $ Timeout 100)
return (Right kafka)

Just err -> do
_ <- closeConsumer kafka
return (Left err)

-- | Polls a single message
pollMessage :: MonadIO m
Expand Down
Loading