diff --git a/.github/workflows/haskell.yml b/.github/workflows/haskell.yml index 487225d..e4605f6 100644 --- a/.github/workflows/haskell.yml +++ b/.github/workflows/haskell.yml @@ -23,7 +23,7 @@ jobs: steps: - uses: actions/checkout@v4 - - uses: actions/cache@v2 + - uses: actions/cache@v3 name: Cache librdkafka with: path: .librdkafka @@ -32,7 +32,7 @@ jobs: - name: Build librdkafka run: ./scripts/build-librdkafka - - uses: haskell/actions/setup@v1 + - uses: haskell-actions/setup@v2 id: setup-haskell with: ghc-version: ${{ matrix.ghc }} @@ -45,10 +45,15 @@ jobs: - name: Configure project run: cabal configure --enable-tests --enable-benchmarks --write-ghc-environment-files=ghc8.4.4+ - - uses: action-works/cabal-cache@v1 - name: Cache cabal store + - name: Cache ~/.cabal/packages, ~/.cabal/store and dist-newstyle + uses: actions/cache@v4 with: - key-prefix: CwBTpnRd + path: | + ~/.cabal/packages + ~/.cabal/store + dist-newstyle + key: ${{ runner.os }}-${{ matrix.ghc }}-${{ hashFiles('**/*.cabal', '**/cabal.project', '**/cabal.project.freeze') }} + restore-keys: ${{ runner.os }}-${{ matrix.ghc }}- - name: Build # Try building it twice in case of flakey builds on Windows @@ -98,7 +103,8 @@ jobs: if git push origin "v$package_version"; then echo "Tagged with new version "v$package_version"" - echo "::set-output name=tag::v$package_version" + echo "tag=v$package_version" >> $GITHUB_OUTPUT + fi fi diff --git a/src/Kafka/Internal/RdKafka.chs b/src/Kafka/Internal/RdKafka.chs index e408a6d..e1cb54b 100644 --- a/src/Kafka/Internal/RdKafka.chs +++ b/src/Kafka/Internal/RdKafka.chs @@ -1164,7 +1164,8 @@ rdKafkaErrorTxnRequiresAbort ptr = boolFromCInt <$> rdKafkaErrorTxnRequiresAbort -- Topics {#enum rd_kafka_admin_op_t as ^ {underscoreToCase} deriving (Show, Eq) #} - +data RdKafkaTopicResultT +{#pointer *rd_kafka_topic_result_t as RdKafkaTopicResultTPtr foreign -> RdKafkaTopicResultT#} data RdKafkaAdminOptionsT {#pointer *rd_kafka_AdminOptions_t as RdKafkaAdminOptionsTPtr foreign -> RdKafkaAdminOptionsT #} @@ -1180,6 +1181,9 @@ data RdKafkaNewTopicT foreign import ccall unsafe "rdkafka.h &rd_kafka_AdminOptions_destroy" -- prevent memory leak finalRdKafkaAdminOptionsDestroy :: FinalizerPtr RdKafkaAdminOptionsT +{#fun rd_kafka_NewTopic_set_config as ^ + {`RdKafkaNewTopicTPtr', `String', `String'} -> `Either RdKafkaRespErrT ()' cIntToRespEither #} + newRdKAdminOptions :: RdKafkaTPtr -> RdKafkaAdminOpT -> IO RdKafkaAdminOptionsTPtr newRdKAdminOptions kafkaPtr opt = do res <- rdKafkaAdminOptionsNew kafkaPtr opt @@ -1193,6 +1197,9 @@ rdKafkaNewTopicDestroy tPtr = do foreign import ccall "&rd_kafka_NewTopic_destroy" rdKafkaNewTopicDestroyFinalizer :: FinalizerPtr RdKafkaNewTopicT +data RdKafkaCreateTopicsResultT +{#pointer *rd_kafka_CreateTopics_result_t as RdKafkaCreateTopicsResultTPtr foreign -> RdKafkaCreateTopicsResultT #} + newRdKafkaNewTopic :: String -> Int -> Int -> IO (Either String RdKafkaNewTopicTPtr) newRdKafkaNewTopic topicName topicPartitions topicReplicationFactor = do allocaBytes nErrorBytes $ \ptr -> do @@ -1202,6 +1209,46 @@ newRdKafkaNewTopic topicName topicPartitions topicReplicationFactor = do then peekCString ptr >>= pure . Left else addForeignPtrFinalizer rdKafkaNewTopicDestroyFinalizer res >> pure (Right res) +newRdKafkaNewTopicUnsafe :: String -> Int -> Int -> IO (Either String RdKafkaNewTopicTPtr) +newRdKafkaNewTopicUnsafe topicName topicPartition topicReplicationFactor = do + allocaBytes nErrorBytes $ \ptr -> do + res <- rdKafkaNewTopicNew topicName topicPartition topicReplicationFactor ptr (fromIntegral nErrorBytes) + withForeignPtr res $ \realPtr -> do + if realPtr == nullPtr + then peekCString ptr >>= pure . Left + else pure (Right res) + +rdKafkaEventCreateTopicsResult :: RdKafkaEventTPtr -> IO (Maybe RdKafkaCreateTopicsResultTPtr) +rdKafkaEventCreateTopicsResult evtPtr = + withForeignPtr evtPtr $ \evtPtr' -> do + res <- {#call rd_kafka_event_CreateTopics_result#} (castPtr evtPtr') + if (res == nullPtr) + then pure Nothing + else Just <$> newForeignPtr_ (castPtr res) + +rdKafkaCreateTopicsResultTopics :: RdKafkaCreateTopicsResultTPtr + -> IO [Either (String, RdKafkaRespErrT, String) String] +rdKafkaCreateTopicsResultTopics tRes = + withForeignPtr tRes $ \tRes' -> + alloca $ \sPtr -> do + res <- {#call rd_kafka_CreateTopics_result_topics#} (castPtr tRes') sPtr + size <- peekIntConv sPtr + arr <- peekArray size res + traverse unpackRdKafkaTopicResult arr + +-- | Unpacks raw result into +-- 'Either (topicName, errorType, errorMsg) topicName' +unpackRdKafkaTopicResult :: Ptr RdKafkaTopicResultT + -> IO (Either (String, RdKafkaRespErrT, String) String) +unpackRdKafkaTopicResult resPtr = do + name <- {#call rd_kafka_topic_result_name#} resPtr >>= peekCString + err <- {#call rd_kafka_topic_result_error#} resPtr + case cIntToEnum err of + RdKafkaRespErrNoError -> pure $ Right name + respErr -> do + errMsg <- {#call rd_kafka_topic_result_error_string#} resPtr >>= peekCString + pure $ Left (name, respErr, errMsg) + --- Create topic rdKafkaCreateTopic :: RdKafkaTPtr -> RdKafkaNewTopicTPtr @@ -1222,7 +1269,7 @@ data RdKafkaDeleteTopicT {#pointer *rd_kafka_DeleteTopic_t as RdKafkaDeleteTopicTPtr foreign -> RdKafkaDeleteTopicT #} data RdKafkaDeleteTopicsResultT -{#pointer *rd_kafka_DeleteTopics_result_t as RdKafkaDeleteTopicResultTPtr foreign -> RdKafkaDeleteTopicsResultT #} +{#pointer *rd_kafka_DeleteTopics_result_t as RdKafkaDeleteTopicsResultTPtr foreign -> RdKafkaDeleteTopicsResultT #} newRdKafkaDeleteTopic :: String -> IO (Either String RdKafkaDeleteTopicTPtr) newRdKafkaDeleteTopic topicNameStr = @@ -1232,6 +1279,14 @@ newRdKafkaDeleteTopic topicNameStr = then return $ Left $ "Something went wrong while deleting topic " ++ topicNameStr else Right <$> newForeignPtr rdKafkaDeleteTopicDestroy res +rdKafkaEventDeleteTopicsResult :: RdKafkaEventTPtr -> IO (Maybe RdKafkaDeleteTopicsResultTPtr) +rdKafkaEventDeleteTopicsResult evtPtr = + withForeignPtr evtPtr $ \evtPtr' -> do + res <- {#call rd_kafka_event_DeleteTopics_result#} (castPtr evtPtr') + if (res == nullPtr) + then pure Nothing + else Just <$> newForeignPtr_ (castPtr res) + rdKafkaDeleteTopics :: RdKafkaTPtr -> [RdKafkaDeleteTopicTPtr] -> RdKafkaAdminOptionsTPtr @@ -1242,6 +1297,16 @@ rdKafkaDeleteTopics kafkaPtr topics opts queue = do withForeignPtrsArrayLen topics $ \tLen tPtr -> do {#call rd_kafka_DeleteTopics#} kPtr tPtr (fromIntegral tLen) oPtr qPtr +rdKafkaDeleteTopicsResultTopics :: RdKafkaDeleteTopicsResultTPtr + -> IO [Either (String, RdKafkaRespErrT, String) String] +rdKafkaDeleteTopicsResultTopics tRes = + withForeignPtr tRes $ \tRes' -> + alloca $ \sPtr -> do + res <- {#call rd_kafka_DeleteTopics_result_topics#} (castPtr tRes') sPtr + size <- peekIntConv sPtr + arr <- peekArray size res + traverse unpackRdKafkaTopicResult arr + -- Marshall / Unmarshall enumToCInt :: Enum a => a -> CInt enumToCInt = fromIntegral . fromEnum @@ -1255,6 +1320,12 @@ cIntConv :: (Integral a, Num b) => a -> b cIntConv = fromIntegral {-# INLINE cIntConv #-} +cIntToRespEither err = + case cIntToEnum err of + RdKafkaRespErrNoError -> Right () + respErr -> Left respErr +{-# INLINE cIntToRespEither #-} + boolToCInt :: Bool -> CInt boolToCInt True = CInt 1 boolToCInt False = CInt 0 diff --git a/src/Kafka/Topic.hs b/src/Kafka/Topic.hs index d03cb98..6a3ebfe 100644 --- a/src/Kafka/Topic.hs +++ b/src/Kafka/Topic.hs @@ -4,14 +4,21 @@ module X , deleteTopic ) where +import Control.Exception import Control.Monad.IO.Class +import Control.Monad.Trans.Class +import Control.Monad.Trans.Except +import Control.Monad.Trans.Maybe +import Data.Bifunctor +import Data.Foldable import Data.List.NonEmpty -import qualified Data.List.NonEmpty as NEL -import Data.Text -import qualified Data.Text as T - -import Kafka.Internal.RdKafka -import Kafka.Internal.Setup +import qualified Data.List.NonEmpty as NEL +import qualified Data.Map as M +import Data.Maybe +import qualified Data.Set as S +import qualified Data.Text as T +import Kafka.Internal.RdKafka +import Kafka.Internal.Setup import Kafka.Topic.Types as X import Kafka.Types as X @@ -24,11 +31,17 @@ createTopic k topic = do opts <- newRdKAdminOptions kafkaPtr RdKafkaAdminOpAny topicRes <- withNewTopic topic $ \topic' -> rdKafkaCreateTopic kafkaPtr topic' opts queue + case topicRes of Left err -> do pure $ Left (NEL.head err) Right _ -> do - pure $ Right $ topicName topic + res <- waitForResponse (topicName topic) rdKafkaEventCreateTopicsResult rdKafkaCreateTopicsResultTopics queue + case listToMaybe res of + Nothing -> pure $ Left KafkaInvalidReturnValue + Just result -> pure $ case result of + Left (_, e, _) -> Left e + Right tName -> Right tName --- DELETE TOPIC --- deleteTopic :: HasKafka k @@ -45,19 +58,17 @@ deleteTopic k topic = liftIO $ do Left err -> do pure $ Left (NEL.head err) Right _ -> do - pure $ Right topic + res <- waitForResponse topic rdKafkaEventDeleteTopicsResult rdKafkaDeleteTopicsResultTopics queue + case listToMaybe res of + Nothing -> pure $ Left KafkaInvalidReturnValue + Just result -> pure $ case result of + Left (_, e, _) -> Left e + Right tName -> Right tName withNewTopic :: NewTopic -> (RdKafkaNewTopicTPtr -> IO a) -> IO (Either (NonEmpty KafkaError) a) -withNewTopic t transform = do - mkNewTopicRes <- mkNewTopic t newTopicPtr - case mkNewTopicRes of - Left err -> do - return $ Left err - Right topic -> do - res <- transform topic - return $ Right res +withNewTopic t = withUnsafeOne t mkNewTopicUnsafe rdKafkaNewTopicDestroy withOldTopic :: TopicName -> (RdKafkaDeleteTopicTPtr -> IO a) @@ -71,28 +82,21 @@ withOldTopic tName transform = do res <- transform topic return $ Right res -newTopicPtr :: NewTopic -> IO (Either KafkaError RdKafkaNewTopicTPtr) -newTopicPtr topic = do - ptrRes <- newRdKafkaNewTopic (unpack $ unTopicName $ topicName topic) (unPartitionCount $ topicPartitionCount topic) (unReplicationFactor $ topicReplicationFactor topic) - case ptrRes of - Left str -> pure $ Left (KafkaError $ T.pack str) - Right ptr -> pure $ Right ptr - oldTopicPtr :: TopicName -> IO (Either KafkaError RdKafkaDeleteTopicTPtr) oldTopicPtr tName = do - res <- newRdKafkaDeleteTopic $ unpack . unTopicName $ tName + res <- newRdKafkaDeleteTopic $ T.unpack . unTopicName $ tName case res of Left str -> pure $ Left (KafkaError $ T.pack str) Right ptr -> pure $ Right ptr -mkNewTopic :: NewTopic - -> (NewTopic -> IO (Either KafkaError a)) - -> IO (Either (NonEmpty KafkaError) a) -mkNewTopic topic create = do - res <- create topic - case res of - Left err -> pure $ Left (singletonList err) - Right resource -> pure $ Right resource +mkNewTopicUnsafe :: NewTopic -> IO (Either KafkaError RdKafkaNewTopicTPtr) +mkNewTopicUnsafe topic = runExceptT $ do + topic' <- withErrStr $ newRdKafkaNewTopicUnsafe (T.unpack $ unTopicName $ topicName topic) (unPartitionCount $ topicPartitionCount topic) (unReplicationFactor $ topicReplicationFactor topic) + _ <- withErrKafka $ whileRight (uncurry $ rdKafkaNewTopicSetConfig undefined) (M.toList $ topicConfig topic) + pure topic' + where + withErrStr = withExceptT (KafkaError . T.pack) . ExceptT + withErrKafka = withExceptT KafkaResponseError . ExceptT rmOldTopic :: TopicName -> (TopicName -> IO (Either KafkaError a)) @@ -103,5 +107,52 @@ rmOldTopic tName remove = do Left err -> pure $ Left (singletonList err) Right resource -> pure $ Right resource +withUnsafeOne :: a -- ^ Item to handle + -> (a -> IO (Either KafkaError b)) -- ^ Create an unsafe element + -> (b -> IO ()) -- ^ Destroy the unsafe element + -> (b -> IO c) -- ^ Handler + -> IO (Either (NonEmpty KafkaError) c) +withUnsafeOne a mkOne cleanup f = + bracket (mkOne a) cleanupOne processOne + where + cleanupOne (Right b) = cleanup b + cleanupOne (Left _) = pure () -- no resource to clean if creation failed + + processOne (Right b) = Right <$> f b + processOne (Left e) = pure (Left (singletonList e)) + +whileRight :: Monad m + => (a -> m (Either e ())) + -> [a] + -> m (Either e ()) +whileRight f as = runExceptT $ traverse_ (ExceptT . f) as + +waitForResponse :: TopicName + -> (RdKafkaEventTPtr -> IO (Maybe a)) + -> (a -> IO [Either (String, RdKafkaRespErrT, String) String]) + -> RdKafkaQueueTPtr + -> IO [Either (TopicName, KafkaError, String) TopicName] +waitForResponse topic fromEvent toResults q = + fromMaybe [] <$> runMaybeT (go []) + where + awaited = S.singleton topic + + go accRes = do + qRes <- MaybeT $ rdKafkaQueuePoll q 1000 + eRes <- MaybeT $ fromEvent qRes + tRes <- lift $ toResults eRes + let results = wrapTopicName <$> tRes + let topics = S.fromList $ getTopicName <$> results + let newRes = results <> accRes + let remaining = S.difference awaited topics + if S.null remaining + then pure newRes + else go newRes + + getTopicName = either (\(t,_,_) -> t) id + wrapTopicName = bimap (\(t,e,s) -> (TopicName (T.pack t), KafkaResponseError e, s)) + (TopicName . T.pack) + singletonList :: a -> NonEmpty a singletonList x = x :| [] +