From b123388347b87fab1a7e6f23f1b99c928ffaa155 Mon Sep 17 00:00:00 2001 From: Huw Campbell Date: Tue, 22 Jul 2025 12:52:45 +1000 Subject: [PATCH] Don't call subscribe when there's no topics to subscribe to. This is important if one wants to use Consumer.assign instead of a subscriptions. --- src/Kafka/Consumer.hs | 46 ++++++++++++++++++++++++++++++++----------- 1 file changed, 34 insertions(+), 12 deletions(-) diff --git a/src/Kafka/Consumer.hs b/src/Kafka/Consumer.hs index f84484b..eb81001 100644 --- a/src/Kafka/Consumer.hs +++ b/src/Kafka/Consumer.hs @@ -125,22 +125,44 @@ newConsumer props (Subscription ts tp) = liftIO $ do _ <- setDefaultTopicConf kc tp' rdk <- newRdKafkaT RdKafkaConsumer kc' case rdk of - Left err -> return . Left $ KafkaError err + Left err -> + return $ + Left (KafkaError err) + Right rdk' -> do + let + kafka = + KafkaConsumer (Kafka rdk') kc + when (cpCallbackPollMode props == CallbackPollModeAsync) $ do - msgq <- rdKafkaQueueNew rdk' - writeIORef qref (Just msgq) - let kafka = KafkaConsumer (Kafka rdk') kc + messageQueue <- rdKafkaQueueNew rdk' + writeIORef qref (Just messageQueue) + redErr <- redirectCallbacksPoll kafka case redErr of - Just err -> closeConsumer kafka >> return (Left err) - Nothing -> do - forM_ (cpLogLevel cp) (setConsumerLogLevel kafka) - sub <- subscribe kafka ts - case sub of - Nothing -> (when (cpCallbackPollMode props == CallbackPollModeAsync) $ - runConsumerLoop kafka (Just $ Timeout 100)) >> return (Right kafka) - Just err -> closeConsumer kafka >> return (Left err) + Just err -> do + _ <- closeConsumer kafka + return (Left err) + + Nothing -> do + forM_ (cpLogLevel cp) $ + setConsumerLogLevel kafka + + subscribeError <- + if Set.null ts then + pure Nothing + else + subscribe kafka ts + + case subscribeError of + Nothing -> do + when (cpCallbackPollMode props == CallbackPollModeAsync) $ + runConsumerLoop kafka (Just $ Timeout 100) + return (Right kafka) + + Just err -> do + _ <- closeConsumer kafka + return (Left err) -- | Polls a single message pollMessage :: MonadIO m