From 812be051e7da845c268a36d0a729b9f22ca957cf Mon Sep 17 00:00:00 2001 From: JoranVanBelle Date: Tue, 17 Dec 2024 20:28:14 +0100 Subject: [PATCH 1/4] feat(admin): create topics --- README.md | 2 +- docker-compose.yml | 73 +++++++++---- hw-kafka-client.cabal | 5 +- src/Kafka/Admin.hs | 83 +++++++++++++++ src/Kafka/Admin/AdminProperties.hs | 43 ++++++++ src/Kafka/Admin/Types.hs | 34 ++++++ src/Kafka/Internal/RdKafka.chs | 160 +++++++++++++++++++++++++---- src/Kafka/Producer.hs | 2 +- tests-it/Kafka/IntegrationSpec.hs | 23 +++++ tests-it/Kafka/TestEnv.hs | 12 +++ 10 files changed, 395 insertions(+), 42 deletions(-) create mode 100644 src/Kafka/Admin.hs create mode 100644 src/Kafka/Admin/AdminProperties.hs create mode 100644 src/Kafka/Admin/Types.hs diff --git a/README.md b/README.md index 115c3d4..12d9216 100644 --- a/README.md +++ b/README.md @@ -223,7 +223,7 @@ To be able to run tests locally, `$KAFKA_TEST_BROKER` environment variable is ex `$KAFKA_TEST_BROKER` should contain an IP address of an accessible Kafka broker that will be used to run integration tests against. -With [Docker Compose](./docker-compose.yml) this variable is used to configure Kafka broker to listen on this address: +With [Docker Compose](./docker-compose.yml) this variable is used to configure a Kafka broker with a UI on localhost:8080 to listen on this address: ``` $ docker-compose up diff --git a/docker-compose.yml b/docker-compose.yml index 9c79ea9..fec8e00 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,24 +1,57 @@ -version: "3.8" - +version: '3.7' services: - zookeeper: - image: confluentinc/cp-zookeeper - hostname: zookeeper - ports: - - 2182:2181 - environment: - SERVICE_NAME: zookeeper - ZOOKEEPER_CLIENT_PORT: 2181 - - kafka: - image: confluentinc/cp-kafka:latest - hostname: localhost + # Redpanda cluster + redpanda-1: + image: docker.redpanda.com/redpandadata/redpanda:v23.1.1 + container_name: redpanda-1 + command: + - redpanda + - start + - --smp + - '1' + - --reserve-memory + - 0M + - --overprovisioned + - --node-id + - '1' + - --kafka-addr + - PLAINTEXT://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092 + - --advertise-kafka-addr + - PLAINTEXT://redpanda-1:29092,OUTSIDE://localhost:9092 + - --pandaproxy-addr + - PLAINTEXT://0.0.0.0:28082,OUTSIDE://0.0.0.0:8082 + - --advertise-pandaproxy-addr + - PLAINTEXT://redpanda-1:28082,OUTSIDE://localhost:8082 + - --rpc-addr + - 0.0.0.0:33145 + - --advertise-rpc-addr + - redpanda-1:33145 ports: + - 8082:8082 - 9092:9092 - links: - - zookeeper:zookeeper + - 9644:9644 + - 28082:28082 + - 29092:29092 + + redpanda-console: + image: docker.redpanda.com/redpandadata/console:v2.2.2 + container_name: redpanda-console + entrypoint: /bin/sh + command: -c "echo \"$$CONSOLE_CONFIG_FILE\" > /tmp/config.yml; /app/console" environment: - KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181" - KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://$KAFKA_TEST_BROKER:9092" - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 - KAFKA_CREATE_TOPICS: + CONFIG_FILEPATH: /tmp/config.yml + CONSOLE_CONFIG_FILE: | + kafka: + brokers: ["redpanda-1:29092"] + schemaRegistry: + enabled: false + redpanda: + adminApi: + enabled: true + urls: ["http://redpanda-1:9644"] + connect: + enabled: false + ports: + - 8080:8080 + depends_on: + - redpanda-1 diff --git a/hw-kafka-client.cabal b/hw-kafka-client.cabal index e3e6729..474cacf 100644 --- a/hw-kafka-client.cabal +++ b/hw-kafka-client.cabal @@ -55,7 +55,10 @@ library build-tool-depends: c2hs:c2hs if impl(ghc <8.0) build-depends: semigroups - exposed-modules: Kafka.Consumer + exposed-modules: Kafka.Admin + Kafka.Admin.AdminProperties + Kafka.Admin.Types + Kafka.Consumer Kafka.Consumer.ConsumerProperties Kafka.Consumer.Subscription Kafka.Consumer.Types diff --git a/src/Kafka/Admin.hs b/src/Kafka/Admin.hs new file mode 100644 index 0000000..2f2baa0 --- /dev/null +++ b/src/Kafka/Admin.hs @@ -0,0 +1,83 @@ +module Kafka.Admin( +module X +, newKAdmin +, createTopic +, closeKAdmin +) where + +import Control.Monad +import Control.Monad.Trans.Class +import Control.Monad.Trans.Maybe +import Control.Monad.IO.Class +import Data.Text +import Data.Maybe +import Data.Bifunctor +import Data.List.NonEmpty +import qualified Data.List.NonEmpty as NEL +import qualified Data.Text as T +import qualified Data.Set as S + +import Kafka.Internal.RdKafka +import Kafka.Internal.Setup + +import Kafka.Types as X +import Kafka.Admin.AdminProperties as X +import Kafka.Admin.Types as X + +newKAdmin ::( MonadIO m ) + => AdminProperties + -> m (Either KafkaError KAdmin) +newKAdmin properties = liftIO $ do + kafkaConfig@(KafkaConf kafkaConf' _ _) <- kafkaConf ( KafkaProps $ adminProps properties) + maybeKafka <- newRdKafkaT RdKafkaConsumer kafkaConf' + case maybeKafka of + Left err -> pure $ Left $ KafkaError err + Right kafka -> pure $ Right $ KAdmin (Kafka kafka) kafkaConfig + +closeKAdmin :: KAdmin + -> IO () +closeKAdmin ka = void $ rdKafkaConsumerClose (getRdKafka ka) +--- CREATE TOPIC --- +createTopic :: KAdmin + -> NewTopic + -> IO (Either KafkaError TopicName) +createTopic kAdmin topic = liftIO $ do + let kafkaPtr = getRdKafka kAdmin + queue <- newRdKafkaQueue kafkaPtr + opts <- newRdKAdminOptions kafkaPtr RdKafkaAdminOpAny + + topicRes <- withNewTopic topic $ \topic' -> rdKafkaCreateTopic kafkaPtr topic' opts queue + case topicRes of + Left err -> do + pure $ Left (NEL.head err) + Right _ -> do + pure $ Right $ topicName topic + +withNewTopic :: NewTopic + -> (RdKafkaNewTopicTPtr -> IO a) + -> IO (Either (NonEmpty KafkaError) a) +withNewTopic t transform = do + mkNewTopicRes <- mkNewTopic t topicPtr + case mkNewTopicRes of + Left err -> do + return $ Left err + Right topic -> do + res <- transform topic + return $ Right res + +topicPtr :: NewTopic -> IO (Either KafkaError RdKafkaNewTopicTPtr) +topicPtr topic = do + ptrRes <- newRdKafkaNewTopic (unpack $ unTopicName $ topicName topic) (unPartitionCount $ topicPartitionCount topic) (unReplicationFactor $ topicReplicationFactor topic) + case ptrRes of + Left str -> pure $ Left (KafkaError $ T.pack str) + Right ptr -> pure $ Right ptr + +mkNewTopic :: NewTopic + -> (NewTopic -> IO (Either KafkaError a)) + -> IO (Either (NonEmpty KafkaError) a) +mkNewTopic topic create = do + res <- create topic + case res of + Left err -> pure $ Left (Data.List.NonEmpty.singleton err) + Right resource -> pure $ Right resource + diff --git a/src/Kafka/Admin/AdminProperties.hs b/src/Kafka/Admin/AdminProperties.hs new file mode 100644 index 0000000..30d858f --- /dev/null +++ b/src/Kafka/Admin/AdminProperties.hs @@ -0,0 +1,43 @@ +{-# LANGUAGE OverloadedStrings #-} + +module Kafka.Admin.AdminProperties where + +import Data.Map +import qualified Data.Map as M +import Data.Text + +import Kafka.Types + +newtype AdminProperties = AdminProperties { + adminProps :: Map Text Text +} + +instance Semigroup AdminProperties where + ( AdminProperties props1 ) <> ( AdminProperties props2 ) = + AdminProperties ( props2 `union` props1 ) + {-# INLINE (<>) #-} + +instance Monoid AdminProperties where + mempty = AdminProperties { + adminProps = M.empty + } + {-# INLINE mempty #-} + mappend = (<>) + {-# INLINE mappend #-} + +brokers :: [BrokerAddress] -> AdminProperties +brokers b = + let b' = intercalate "," ((\( BrokerAddress i ) -> i ) <$> b ) + in extraProps $ fromList [("bootstrap.servers", b')] + +clientId :: ClientId -> AdminProperties +clientId (ClientId cid) = + extraProps $ M.fromList [("client.id", cid)] + +timeOut :: Timeout -> AdminProperties +timeOut (Timeout to) = + let to' = ( pack $ show to ) + in extraProps $ fromList [("request.timeout.ms", to')] + +extraProps :: Map Text Text -> AdminProperties +extraProps m = mempty { adminProps = m } diff --git a/src/Kafka/Admin/Types.hs b/src/Kafka/Admin/Types.hs new file mode 100644 index 0000000..54f606e --- /dev/null +++ b/src/Kafka/Admin/Types.hs @@ -0,0 +1,34 @@ +module Kafka.Admin.Types ( +KAdmin(..) +, PartitionCount (..) +, ReplicationFactor (..) +, NewTopic (..) +) where + +import Data.Map + +import Kafka.Types +import Kafka.Internal.Setup + +data KAdmin = KAdmin { + kaKafkaPtr :: !Kafka + , kaKafkaConf :: !KafkaConf +} + +instance HasKafka KAdmin where + getKafka = kaKafkaPtr + {-# INLINE getKafka #-} + +instance HasKafkaConf KAdmin where + getKafkaConf = kaKafkaConf + {-# INLINE getKafkaConf #-} + +newtype PartitionCount = PartitionCount { unPartitionCount :: Int } deriving (Show, Eq) +newtype ReplicationFactor = ReplicationFactor { unReplicationFactor :: Int } deriving (Show, Eq) + +data NewTopic = NewTopic { + topicName :: TopicName + , topicPartitionCount :: PartitionCount + , topicReplicationFactor :: ReplicationFactor + , topicConfig :: Map String String +} deriving (Show) diff --git a/src/Kafka/Internal/RdKafka.chs b/src/Kafka/Internal/RdKafka.chs index 86ed69a..3c2bb11 100644 --- a/src/Kafka/Internal/RdKafka.chs +++ b/src/Kafka/Internal/RdKafka.chs @@ -10,13 +10,12 @@ import qualified Data.Text as Text import Control.Monad (liftM) import Data.Int (Int32, Int64) import Data.Word (Word8) -import Foreign.Concurrent (newForeignPtr) import qualified Foreign.Concurrent as Concurrent import Foreign.Marshal.Alloc (alloca, allocaBytes) import Foreign.Marshal.Array (peekArray, allocaArray, withArrayLen) import Foreign.Storable (Storable(..)) import Foreign.Ptr (Ptr, FunPtr, castPtr, nullPtr) -import Foreign.ForeignPtr (FinalizerPtr, addForeignPtrFinalizer, newForeignPtr_, withForeignPtr) +import Foreign.ForeignPtr (FinalizerPtr, addForeignPtrFinalizer, newForeignPtr_, withForeignPtr, ForeignPtr, newForeignPtr) import Foreign.C.Error (Errno(..), getErrno) import Foreign.C.String (CString, newCString, withCAString, peekCAString, peekCString) import Foreign.C.Types (CFile, CInt(..), CSize, CChar, CLong) @@ -271,14 +270,12 @@ instance Storable RdKafkaMetadataT where {`Int'} -> `RdKafkaTopicPartitionListTPtr' #} foreign import ccall unsafe "rdkafka.h &rd_kafka_topic_partition_list_destroy" - rdKafkaTopicPartitionListDestroyF :: FinalizerPtr RdKafkaTopicPartitionListT -foreign import ccall unsafe "rdkafka.h rd_kafka_topic_partition_list_destroy" - rdKafkaTopicPartitionListDestroy :: Ptr RdKafkaTopicPartitionListT -> IO () + rdKafkaTopicPartitionListDestroy :: FinalizerPtr RdKafkaTopicPartitionListT newRdKafkaTopicPartitionListT :: Int -> IO RdKafkaTopicPartitionListTPtr newRdKafkaTopicPartitionListT size = do ret <- rdKafkaTopicPartitionListNew size - addForeignPtrFinalizer rdKafkaTopicPartitionListDestroyF ret + addForeignPtrFinalizer rdKafkaTopicPartitionListDestroy ret return ret {# fun rd_kafka_topic_partition_list_add as ^ @@ -293,7 +290,7 @@ newRdKafkaTopicPartitionListT size = do copyRdKafkaTopicPartitionList :: RdKafkaTopicPartitionListTPtr -> IO RdKafkaTopicPartitionListTPtr copyRdKafkaTopicPartitionList pl = do cp <- rdKafkaTopicPartitionListCopy pl - addForeignPtrFinalizer rdKafkaTopicPartitionListDestroyF cp + addForeignPtrFinalizer rdKafkaTopicPartitionListDestroy cp return cp {# fun rd_kafka_topic_partition_list_set_offset as ^ @@ -521,6 +518,16 @@ rdKafkaTopicConfSetPartitionerCb conf cb = do {#fun rd_kafka_resume_partitions as ^ {`RdKafkaTPtr', `RdKafkaTopicPartitionListTPtr'} -> `RdKafkaRespErrT' cIntToEnum #} +---- EVENT +foreign import ccall unsafe "rdkafka.h &rd_kafka_event_destroy" + rdKafkaEventDestroyF :: FinalizerPtr RdKafkaEventT + +data RdKafkaEventT +{#pointer *rd_kafka_event_t as RdKafkaEventTPtr foreign -> RdKafkaEventT #} + +{#fun rd_kafka_event_destroy as ^ + {`RdKafkaEventTPtr'} -> `()'#} + ---- QUEUE data RdKafkaQueueT {#pointer *rd_kafka_queue_t as RdKafkaQueueTPtr foreign -> RdKafkaQueueT #} @@ -528,18 +535,26 @@ data RdKafkaQueueT {#fun rd_kafka_queue_new as ^ {`RdKafkaTPtr'} -> `RdKafkaQueueTPtr' #} -foreign import ccall unsafe "rdkafka.h &rd_kafka_queue_destroy" - rdKafkaQueueDestroyF :: FinalizerPtr RdKafkaQueueT - {#fun rd_kafka_queue_destroy as ^ {`RdKafkaQueueTPtr'} -> `()'#} +foreign import ccall unsafe "rdkafka.h &rd_kafka_queue_destroy" + rdKafkaQueueDestroyF :: FinalizerPtr RdKafkaQueueT + newRdKafkaQueue :: RdKafkaTPtr -> IO RdKafkaQueueTPtr newRdKafkaQueue k = do q <- rdKafkaQueueNew k addForeignPtrFinalizer rdKafkaQueueDestroyF q return q +rdKafkaQueuePoll :: RdKafkaQueueTPtr -> Int -> IO (Maybe RdKafkaEventTPtr) +rdKafkaQueuePoll qPtr timeout = + withForeignPtr qPtr $ \qPtr' -> do + res <- {#call rd_kafka_queue_poll#} qPtr' (fromIntegral timeout) + if res == nullPtr + then pure Nothing + else Just <$> newForeignPtr rdKafkaEventDestroyF res + {#fun rd_kafka_consume_queue as ^ {`RdKafkaQueueTPtr', `Int'} -> `RdKafkaMessageTPtr' #} @@ -566,7 +581,7 @@ rdKafkaConsumeBatchQueue :: RdKafkaQueueTPtr -> Int -> Int -> IO [RdKafkaMessage rdKafkaConsumeBatchQueue qptr timeout batchSize = do allocaArray batchSize $ \pArr -> do rSize <- rdKafkaConsumeBatchQueue' qptr timeout pArr (fromIntegral batchSize) - peekArray (fromIntegral rSize) pArr >>= traverse (flip newForeignPtr (return ())) + peekArray (fromIntegral rSize) pArr >>= traverse newForeignPtr_ ------------------------------------------------------------------------------------------------- ---- High-level KafkaConsumer @@ -588,7 +603,7 @@ rdKafkaSubscription k = do (err, sub) <- rdKafkaSubscription' k case err of RdKafkaRespErrNoError -> - Right <$> newForeignPtr sub (rdKafkaTopicPartitionListDestroy sub) + Right <$> newForeignPtr rdKafkaTopicPartitionListDestroy sub e -> return (Left e) {#fun rd_kafka_consumer_poll as ^ @@ -623,7 +638,7 @@ rdKafkaAssignment k = do (err, ass) <- rdKafkaAssignment' k case err of RdKafkaRespErrNoError -> - Right <$> newForeignPtr ass (rdKafkaTopicPartitionListDestroy ass) + Right <$> newForeignPtr rdKafkaTopicPartitionListDestroy ass e -> return (Left e) {#fun rd_kafka_commit as ^ @@ -732,8 +747,8 @@ instance Storable RdKafkaGroupListT where foreign import ccall "rdkafka.h &rd_kafka_group_list_destroy" rdKafkaGroupListDestroyF :: FinalizerPtr RdKafkaGroupListT -foreign import ccall "rdkafka.h rd_kafka_group_list_destroy" - rdKafkaGroupListDestroy :: Ptr RdKafkaGroupListT -> IO () +foreign import ccall "rdkafka.h &rd_kafka_group_list_destroy" + rdKafkaGroupListDestroy :: FinalizerPtr RdKafkaGroupListT rdKafkaListGroups :: RdKafkaTPtr -> Maybe String -> Int -> IO (Either RdKafkaRespErrT RdKafkaGroupListTPtr) rdKafkaListGroups k g t = case g of @@ -743,7 +758,7 @@ rdKafkaListGroups k g t = case g of listGroups grp = do (err, res) <- rdKafkaListGroups' k grp t case err of - RdKafkaRespErrNoError -> Right <$> newForeignPtr res (rdKafkaGroupListDestroy res) + RdKafkaRespErrNoError -> Right <$> newForeignPtr rdKafkaGroupListDestroy res e -> return $ Left e ------------------------------------------------------------------------------------------------- @@ -924,15 +939,15 @@ rdKafkaConsumeStop topicPtr partition = do alloca- `Ptr RdKafkaMetadataT' peekPtr*, `Int'} -> `RdKafkaRespErrT' cIntToEnum #} -foreign import ccall unsafe "rdkafka.h rd_kafka_metadata_destroy" - rdKafkaMetadataDestroy :: Ptr RdKafkaMetadataT -> IO () +foreign import ccall unsafe "rdkafka.h &rd_kafka_metadata_destroy" + rdKafkaMetadataDestroy :: FinalizerPtr RdKafkaMetadataT rdKafkaMetadata :: RdKafkaTPtr -> Bool -> Maybe RdKafkaTopicTPtr -> Int -> IO (Either RdKafkaRespErrT RdKafkaMetadataTPtr) rdKafkaMetadata k allTopics mt timeout = do tptr <- maybe (newForeignPtr_ nullPtr) pure mt (err, res) <- rdKafkaMetadata' k allTopics tptr timeout case err of - RdKafkaRespErrNoError -> Right <$> newForeignPtr res (rdKafkaMetadataDestroy res) + RdKafkaRespErrNoError -> Right <$> newForeignPtr rdKafkaMetadataDestroy res e -> return (Left e) {#fun rd_kafka_poll as ^ @@ -1146,6 +1161,87 @@ rdKafkaErrorIsRetriable ptr = boolFromCInt <$> rdKafkaErrorIsRetriable' ptr rdKafkaErrorTxnRequiresAbort :: RdKafkaErrorTPtr -> IO Bool rdKafkaErrorTxnRequiresAbort ptr = boolFromCInt <$> rdKafkaErrorTxnRequiresAbort' ptr +-- Admin API +{#enum rd_kafka_admin_op_t as ^ {underscoreToCase} deriving (Show, Eq) #} + +data RdKafkaTopicResultT +{#pointer *rd_kafka_topic_result_t as RdKafkaTopicResultTPtr foreign -> RdKafkaTopicResultT #} + +data RdKafkaAdminOptionsT +{#pointer *rd_kafka_AdminOptions_t as RdKafkaAdminOptionsTPtr foreign -> RdKafkaAdminOptionsT #} + +{#fun rd_kafka_AdminOptions_new as ^ + {`RdKafkaTPtr', enumToCInt `RdKafkaAdminOpT'} -> `RdKafkaAdminOptionsTPtr' #} + +data RdKafkaNewTopicT +{#pointer *rd_kafka_NewTopic_t as RdKafkaNewTopicTPtr foreign -> RdKafkaNewTopicT #} + +{#fun rd_kafka_NewTopic_new as ^ {`String', `Int', `Int', id `Ptr CChar', cIntConv `CSize'} -> `RdKafkaNewTopicTPtr' #} + +foreign import ccall unsafe "rdkafka.h &rd_kafka_AdminOptions_destroy" -- prevent memory leak + finalRdKafkaAdminOptionsDestroy :: FinalizerPtr RdKafkaAdminOptionsT + +newRdKAdminOptions :: RdKafkaTPtr -> RdKafkaAdminOpT -> IO RdKafkaAdminOptionsTPtr +newRdKAdminOptions kafkaPtr opt = do + res <- rdKafkaAdminOptionsNew kafkaPtr opt + addForeignPtrFinalizer finalRdKafkaAdminOptionsDestroy res + pure res + +rdKafkaNewTopicDestroy :: RdKafkaNewTopicTPtr -> IO () -- prevent memory leak +rdKafkaNewTopicDestroy tPtr = do + withForeignPtr tPtr {#call rd_kafka_NewTopic_destroy#} + +foreign import ccall "&rd_kafka_NewTopic_destroy" + rdKafkaNewTopicDestroyFinalizer :: FinalizerPtr RdKafkaNewTopicT + +newRdKafkaNewTopic :: String -> Int -> Int -> IO (Either String RdKafkaNewTopicTPtr) +newRdKafkaNewTopic topicName topicPartitions topicReplicationFactor = do + allocaBytes nErrorBytes $ \ptr -> do + res <- rdKafkaNewTopicNew topicName topicPartitions topicReplicationFactor ptr (fromIntegral nErrorBytes) + withForeignPtr res $ \realPtr -> do + if realPtr == nullPtr + then peekCString ptr >>= pure . Left + else addForeignPtrFinalizer rdKafkaNewTopicDestroyFinalizer res >> pure (Right res) + +rdKafkaCreateTopic :: RdKafkaTPtr + -> RdKafkaNewTopicTPtr + -> RdKafkaAdminOptionsTPtr + -> RdKafkaQueueTPtr + -> IO () +rdKafkaCreateTopic kafkaPtr topic opts queue = do + let topics = [topic] + withForeignPtrs kafkaPtr opts queue $ \kPtr oPtr qPtr -> + withForeignPtrsArrayLen topics $ \tLen tPtr -> do + {#call rd_kafka_CreateTopics#} kPtr tPtr (fromIntegral tLen) oPtr qPtr + +rdKafkaEventCreateTopicsResult :: RdKafkaEventTPtr -> IO (Maybe RdKafkaTopicResultTPtr) +rdKafkaEventCreateTopicsResult evtPtr = + withForeignPtr evtPtr $ \evtPtr' -> do + res <- {#call rd_kafka_event_CreateTopics_result#} (castPtr evtPtr') + if (res == nullPtr) + then pure Nothing + else Just <$> newForeignPtr_ (castPtr res) + +rdKafkaCreateTopicsResultTopics :: RdKafkaTopicResultTPtr + -> IO [Either (String, RdKafkaRespErrT, String) String] +rdKafkaCreateTopicsResultTopics tRes = + withForeignPtr tRes $ \tRes' -> + alloca $ \sPtr -> do + res <- {#call rd_kafka_CreateTopics_result_topics#} (castPtr tRes') sPtr + size <- peekIntConv sPtr + array <- peekArray size res + traverse unpackRdKafkaTopicResult array + +unpackRdKafkaTopicResult :: Ptr RdKafkaTopicResultT + -> IO (Either (String, RdKafkaRespErrT, String) String) +unpackRdKafkaTopicResult resPtr = do + name <- {#call rd_kafka_topic_result_name#} resPtr >>= peekCString + err <- {#call rd_kafka_topic_result_error#} resPtr + case cIntToEnum err of + respErr -> do + errMsg <- {#call rd_kafka_topic_result_error_string#} resPtr >>= peekCString + pure $ Left (name, respErr, errMsg) + RdKafkaRespErrNoError -> pure $ Right name -- Marshall / Unmarshall enumToCInt :: Enum a => a -> CInt @@ -1170,6 +1266,9 @@ boolFromCInt (CInt 0) = False boolFromCInt (CInt _) = True {-# INLINE boolFromCInt #-} +peekIntConv :: (Storable a, Integral a, Integral b) => Ptr a -> IO b +peekIntConv = liftM fromIntegral . peek + peekInt64Conv :: (Storable a, Integral a) => Ptr a -> IO Int64 peekInt64Conv = liftM cIntConv . peek {-# INLINE peekInt64Conv #-} @@ -1194,3 +1293,26 @@ c_stdout :: IO CFilePtr c_stdout = handleToCFile stdout "w" c_stderr :: IO CFilePtr c_stderr = handleToCFile stderr "w" + + +withForeignPtrs :: ForeignPtr kafkaPtr + -> ForeignPtr optPtr + -> ForeignPtr queuePtr + -> (Ptr kafkaPtr -> Ptr optPtr -> Ptr queuePtr -> IO x) + -> IO x +withForeignPtrs kafkaPtr optPtr queuePtr f = + withForeignPtr kafkaPtr $ \kafkaPtr' -> + withForeignPtr optPtr $ \optPtr' -> + withForeignPtr queuePtr $ \queuePtr' -> f kafkaPtr' optPtr' queuePtr' + +withForeignPtrsArrayLen :: [ForeignPtr a] + -> (Int -> Ptr (Ptr a) -> IO b) + -> IO b +withForeignPtrsArrayLen as f = + let withForeignPtrsList [] g = g [] + withForeignPtrsList (x:xs) g = + withForeignPtr x $ \x' -> + withForeignPtrsList xs $ \xs' -> + g (x' : xs') + in withForeignPtrsList as $ \ptrs -> + withArrayLen ptrs $ \llen pptrs -> f llen pptrs diff --git a/src/Kafka/Producer.hs b/src/Kafka/Producer.hs index f07815e..35f052e 100644 --- a/src/Kafka/Producer.hs +++ b/src/Kafka/Producer.hs @@ -209,4 +209,4 @@ withBS (Just bs) f = in withForeignPtr d $ \p -> f (p `plusPtr` o) l outboundQueueLength :: Kafka -> IO Int -outboundQueueLength (Kafka k) = rdKafkaOutqLen k \ No newline at end of file +outboundQueueLength (Kafka k) = rdKafkaOutqLen k diff --git a/tests-it/Kafka/IntegrationSpec.hs b/tests-it/Kafka/IntegrationSpec.hs index 601b29a..1c21d64 100644 --- a/tests-it/Kafka/IntegrationSpec.hs +++ b/tests-it/Kafka/IntegrationSpec.hs @@ -5,6 +5,8 @@ module Kafka.IntegrationSpec where +import System.Random (randomRIO) +import Control.Concurrent import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar) import Control.Monad (forM, forM_, void) import Control.Monad.Loops @@ -13,8 +15,10 @@ import Data.Map (fromList) import qualified Data.Set as Set import Data.Monoid ((<>)) import Kafka.Consumer +import qualified Data.Text as T import Kafka.Metadata import Kafka.Producer +import Kafka.Admin import Kafka.TestEnv import Test.Hspec @@ -170,6 +174,16 @@ spec = do forM_ res $ \rcs -> forM_ rcs ((`shouldBe` Set.fromList (headersToList testHeaders)) . Set.fromList . headersToList . crHeaders) + describe "Kafka.Admin.Spec" $ do + let topicName = addRandomChars "admin.topic.created." 5 + + specWithAdmin "Create topic" $ do + + it "should create a new topic" $ \(admin :: KAdmin) -> do + tName <- topicName + let newTopic = mkNewTopic (TopicName ( T.pack(tName) )) + result <- createTopic admin newTopic + result `shouldSatisfy` isRight ---------------------------------------------------------------------------------------------------------------- data ReadState = Skip | Read @@ -311,3 +325,12 @@ runConsumerSpec = do res `shouldBe` Nothing msg <- pollMessage k (Timeout 2000) crOffset <$> msg `shouldBe` Right (Offset 0) + +mkNewTopic :: TopicName -> NewTopic +mkNewTopic name = NewTopic name (PartitionCount 1) (ReplicationFactor 1) mempty + + +addRandomChars :: String -> Int -> IO String +addRandomChars baseStr n = do + randomChars <- mapM (\_ -> randomRIO ('a', 'z')) [1..n] + return $ baseStr ++ randomChars diff --git a/tests-it/Kafka/TestEnv.hs b/tests-it/Kafka/TestEnv.hs index d31b405..e588137 100644 --- a/tests-it/Kafka/TestEnv.hs +++ b/tests-it/Kafka/TestEnv.hs @@ -15,6 +15,7 @@ import qualified System.Random as Rnd import Control.Concurrent import Kafka.Consumer as C import Kafka.Producer as P +import Kafka.Admin as A import Test.Hspec @@ -57,6 +58,9 @@ producerProps = P.brokersList [brokerAddress] <> P.setCallback (logCallback (\l s1 s2 -> print $ "[Producer] " <> show l <> ": " <> s1 <> ", " <> s2)) <> P.setCallback (errorCallback (\e r -> print $ "[Producer] " <> show e <> ": " <> r)) +adminProperties :: AdminProperties +adminProperties = A.brokers [brokerAddress] + testSubscription :: TopicName -> Subscription testSubscription t = topics [t] <> offsetReset Earliest @@ -76,6 +80,8 @@ mkConsumerWith props = do (RebalanceAssign _) -> putMVar var True _ -> pure () +mkAdmin :: IO KAdmin +mkAdmin = newKAdmin adminProperties >>= \(Right k) -> pure k specWithConsumer :: String -> ConsumerProperties -> SpecWith KafkaConsumer -> Spec specWithConsumer s p f = @@ -91,3 +97,9 @@ specWithKafka s p f = beforeAll ((,) <$> mkConsumerWith p <*> mkProducer) $ afterAll (\(consumer, producer) -> void $ closeProducer producer >> closeConsumer consumer) $ describe s f + +specWithAdmin :: String -> SpecWith KAdmin -> Spec +specWithAdmin s f = + beforeAll mkAdmin + $ afterAll (void . closeKAdmin) + $ describe s f From ba286d396ec1c3a9f1a599c743199213d3ec822b Mon Sep 17 00:00:00 2001 From: JoranVanBelle Date: Wed, 8 Jan 2025 10:15:11 +0100 Subject: [PATCH 2/4] feat(admin): remove topics --- src/Kafka/Admin.hs | 57 ++++++++++++++++++++++++++----- src/Kafka/Internal/RdKafka.chs | 56 +++++++++++++++--------------- tests-it/Kafka/IntegrationSpec.hs | 28 +++++++++++++++ 3 files changed, 104 insertions(+), 37 deletions(-) diff --git a/src/Kafka/Admin.hs b/src/Kafka/Admin.hs index 2f2baa0..50f1ab6 100644 --- a/src/Kafka/Admin.hs +++ b/src/Kafka/Admin.hs @@ -2,20 +2,16 @@ module Kafka.Admin( module X , newKAdmin , createTopic +, deleteTopic , closeKAdmin ) where import Control.Monad -import Control.Monad.Trans.Class -import Control.Monad.Trans.Maybe import Control.Monad.IO.Class import Data.Text -import Data.Maybe -import Data.Bifunctor import Data.List.NonEmpty import qualified Data.List.NonEmpty as NEL import qualified Data.Text as T -import qualified Data.Set as S import Kafka.Internal.RdKafka import Kafka.Internal.Setup @@ -53,11 +49,27 @@ createTopic kAdmin topic = liftIO $ do Right _ -> do pure $ Right $ topicName topic +--- DELETE TOPIC --- +deleteTopic :: KAdmin + -> TopicName + -> IO (Either KafkaError TopicName) +deleteTopic kAdmin topic = liftIO $ do + let kafkaPtr = getRdKafka kAdmin + queue <- newRdKafkaQueue kafkaPtr + opts <- newRdKAdminOptions kafkaPtr RdKafkaAdminOpAny + + topicRes <- withOldTopic topic $ \topic' -> rdKafkaDeleteTopics kafkaPtr [topic'] opts queue + case topicRes of + Left err -> do + pure $ Left (NEL.head err) + Right _ -> do + pure $ Right topic + withNewTopic :: NewTopic -> (RdKafkaNewTopicTPtr -> IO a) -> IO (Either (NonEmpty KafkaError) a) withNewTopic t transform = do - mkNewTopicRes <- mkNewTopic t topicPtr + mkNewTopicRes <- mkNewTopic t newTopicPtr case mkNewTopicRes of Left err -> do return $ Left err @@ -65,19 +77,46 @@ withNewTopic t transform = do res <- transform topic return $ Right res -topicPtr :: NewTopic -> IO (Either KafkaError RdKafkaNewTopicTPtr) -topicPtr topic = do +withOldTopic :: TopicName + -> (RdKafkaDeleteTopicTPtr -> IO a) + -> IO (Either (NonEmpty KafkaError) a) +withOldTopic tName transform = do + rmOldTopicRes <- rmOldTopic tName oldTopicPtr + case rmOldTopicRes of + Left err -> do + return $ Left err + Right topic -> do + res <- transform topic + return $ Right res + +newTopicPtr :: NewTopic -> IO (Either KafkaError RdKafkaNewTopicTPtr) +newTopicPtr topic = do ptrRes <- newRdKafkaNewTopic (unpack $ unTopicName $ topicName topic) (unPartitionCount $ topicPartitionCount topic) (unReplicationFactor $ topicReplicationFactor topic) case ptrRes of Left str -> pure $ Left (KafkaError $ T.pack str) Right ptr -> pure $ Right ptr +oldTopicPtr :: TopicName -> IO (Either KafkaError RdKafkaDeleteTopicTPtr) +oldTopicPtr tName = do + res <- newRdKafkaDeleteTopic $ unpack . unTopicName $ tName + case res of + Left str -> pure $ Left (KafkaError $ T.pack str) + Right ptr -> pure $ Right ptr + mkNewTopic :: NewTopic -> (NewTopic -> IO (Either KafkaError a)) -> IO (Either (NonEmpty KafkaError) a) mkNewTopic topic create = do res <- create topic case res of - Left err -> pure $ Left (Data.List.NonEmpty.singleton err) + Left err -> pure $ Left (NEL.singleton err) Right resource -> pure $ Right resource +rmOldTopic :: TopicName + -> (TopicName -> IO (Either KafkaError a)) + -> IO (Either (NonEmpty KafkaError) a) +rmOldTopic tName remove = do + res <- remove tName + case res of + Left err -> pure $ Left (NEL.singleton err) + Right resource -> pure $ Right resource diff --git a/src/Kafka/Internal/RdKafka.chs b/src/Kafka/Internal/RdKafka.chs index 3c2bb11..9ec9f52 100644 --- a/src/Kafka/Internal/RdKafka.chs +++ b/src/Kafka/Internal/RdKafka.chs @@ -17,7 +17,7 @@ import Foreign.Storable (Storable(..)) import Foreign.Ptr (Ptr, FunPtr, castPtr, nullPtr) import Foreign.ForeignPtr (FinalizerPtr, addForeignPtrFinalizer, newForeignPtr_, withForeignPtr, ForeignPtr, newForeignPtr) import Foreign.C.Error (Errno(..), getErrno) -import Foreign.C.String (CString, newCString, withCAString, peekCAString, peekCString) +import Foreign.C.String (CString, newCString, withCString, withCAString, peekCAString, peekCString) import Foreign.C.Types (CFile, CInt(..), CSize, CChar, CLong) import System.IO (Handle, stdin, stdout, stderr) import System.Posix.IO (handleToFd) @@ -1203,6 +1203,7 @@ newRdKafkaNewTopic topicName topicPartitions topicReplicationFactor = do then peekCString ptr >>= pure . Left else addForeignPtrFinalizer rdKafkaNewTopicDestroyFinalizer res >> pure (Right res) +--- Create topic rdKafkaCreateTopic :: RdKafkaTPtr -> RdKafkaNewTopicTPtr -> RdKafkaAdminOptionsTPtr @@ -1214,34 +1215,33 @@ rdKafkaCreateTopic kafkaPtr topic opts queue = do withForeignPtrsArrayLen topics $ \tLen tPtr -> do {#call rd_kafka_CreateTopics#} kPtr tPtr (fromIntegral tLen) oPtr qPtr -rdKafkaEventCreateTopicsResult :: RdKafkaEventTPtr -> IO (Maybe RdKafkaTopicResultTPtr) -rdKafkaEventCreateTopicsResult evtPtr = - withForeignPtr evtPtr $ \evtPtr' -> do - res <- {#call rd_kafka_event_CreateTopics_result#} (castPtr evtPtr') +--- Delete topic +foreign import ccall unsafe "rdkafka.h &rd_kafka_DeleteTopic_destroy" + rdKafkaDeleteTopicDestroy :: FinalizerPtr RdKafkaDeleteTopicT + +data RdKafkaDeleteTopicT +{#pointer *rd_kafka_DeleteTopic_t as RdKafkaDeleteTopicTPtr foreign -> RdKafkaDeleteTopicT #} + +data RdKafkaDeleteTopicsResultT +{#pointer *rd_kafka_DeleteTopics_result_t as RdKafkaDeleteTopicResultTPtr foreign -> RdKafkaDeleteTopicsResultT #} + +newRdKafkaDeleteTopic :: String -> IO (Either String RdKafkaDeleteTopicTPtr) +newRdKafkaDeleteTopic topicNameStr = + withCString topicNameStr $ \topicNameStrPtr -> do + res <- {#call rd_kafka_DeleteTopic_new#} topicNameStrPtr if (res == nullPtr) - then pure Nothing - else Just <$> newForeignPtr_ (castPtr res) - -rdKafkaCreateTopicsResultTopics :: RdKafkaTopicResultTPtr - -> IO [Either (String, RdKafkaRespErrT, String) String] -rdKafkaCreateTopicsResultTopics tRes = - withForeignPtr tRes $ \tRes' -> - alloca $ \sPtr -> do - res <- {#call rd_kafka_CreateTopics_result_topics#} (castPtr tRes') sPtr - size <- peekIntConv sPtr - array <- peekArray size res - traverse unpackRdKafkaTopicResult array - -unpackRdKafkaTopicResult :: Ptr RdKafkaTopicResultT - -> IO (Either (String, RdKafkaRespErrT, String) String) -unpackRdKafkaTopicResult resPtr = do - name <- {#call rd_kafka_topic_result_name#} resPtr >>= peekCString - err <- {#call rd_kafka_topic_result_error#} resPtr - case cIntToEnum err of - respErr -> do - errMsg <- {#call rd_kafka_topic_result_error_string#} resPtr >>= peekCString - pure $ Left (name, respErr, errMsg) - RdKafkaRespErrNoError -> pure $ Right name + then return $ Left $ "Something went wrong while deleting topic " ++ topicNameStr + else Right <$> newForeignPtr rdKafkaDeleteTopicDestroy res + +rdKafkaDeleteTopics :: RdKafkaTPtr + -> [RdKafkaDeleteTopicTPtr] + -> RdKafkaAdminOptionsTPtr + -> RdKafkaQueueTPtr + -> IO () +rdKafkaDeleteTopics kafkaPtr topics opts queue = do + withForeignPtrs kafkaPtr opts queue $ \kPtr oPtr qPtr -> + withForeignPtrsArrayLen topics $ \tLen tPtr -> do + {#call rd_kafka_DeleteTopics#} kPtr tPtr (fromIntegral tLen) oPtr qPtr -- Marshall / Unmarshall enumToCInt :: Enum a => a -> CInt diff --git a/tests-it/Kafka/IntegrationSpec.hs b/tests-it/Kafka/IntegrationSpec.hs index 1c21d64..5fc2ea0 100644 --- a/tests-it/Kafka/IntegrationSpec.hs +++ b/tests-it/Kafka/IntegrationSpec.hs @@ -177,6 +177,8 @@ spec = do describe "Kafka.Admin.Spec" $ do let topicName = addRandomChars "admin.topic.created." 5 + topicsMVar <- runIO newEmptyMVar + specWithAdmin "Create topic" $ do it "should create a new topic" $ \(admin :: KAdmin) -> do @@ -184,6 +186,32 @@ spec = do let newTopic = mkNewTopic (TopicName ( T.pack(tName) )) result <- createTopic admin newTopic result `shouldSatisfy` isRight + + specWithConsumer "Read all topics" consumerProps $ do + + it "should return all the topics" $ \(consumer :: KafkaConsumer) -> do + res <- allTopicsMetadata consumer (Timeout 1000) + res `shouldSatisfy` isRight + let filterUserTopics m = m { kmTopics = filter (\t -> topicType (tmTopicName t) == User) (kmTopics m) } + let res' = fmap filterUserTopics res + length . kmBrokers <$> res' `shouldBe` Right 1 + + let topics = either (const []) (map tmTopicName . kmTopics) res' + putMVar topicsMVar topics + + let topicsLen = either (const 0) (length . kmTopics) res' + let hasTopic = either (const False) (any (\t -> tmTopicName t == testTopic) . kmTopics) res' + + topicsLen `shouldSatisfy` (>0) + hasTopic `shouldBe` True + + specWithAdmin "Remove topics" $ do + + it "should delete all the topics currently existing" $ \(admin ::KAdmin) -> do + topics <- takeMVar topicsMVar + forM_ topics $ \topic -> do + result <- deleteTopic admin topic + result `shouldSatisfy` isRight ---------------------------------------------------------------------------------------------------------------- data ReadState = Skip | Read From f0383f4f28bd2565878099c9f91aee501ca5b3f4 Mon Sep 17 00:00:00 2001 From: JoranVanBelle Date: Tue, 14 Jan 2025 13:22:07 +0100 Subject: [PATCH 3/4] chore: use HasKafka in favor of KAdmin --- hw-kafka-client.cabal | 5 +-- src/Kafka/Admin/AdminProperties.hs | 43 ------------------ src/Kafka/Internal/RdKafka.chs | 5 +-- src/Kafka/{Admin.hs => Topic.hs} | 69 +++++++++++------------------ src/Kafka/{Admin => Topic}/Types.hs | 19 +------- tests-it/Kafka/IntegrationSpec.hs | 20 ++++----- tests-it/Kafka/TestEnv.hs | 13 ------ 7 files changed, 42 insertions(+), 132 deletions(-) delete mode 100644 src/Kafka/Admin/AdminProperties.hs rename src/Kafka/{Admin.hs => Topic.hs} (62%) rename src/Kafka/{Admin => Topic}/Types.hs (57%) diff --git a/hw-kafka-client.cabal b/hw-kafka-client.cabal index 474cacf..4f43ef3 100644 --- a/hw-kafka-client.cabal +++ b/hw-kafka-client.cabal @@ -55,9 +55,8 @@ library build-tool-depends: c2hs:c2hs if impl(ghc <8.0) build-depends: semigroups - exposed-modules: Kafka.Admin - Kafka.Admin.AdminProperties - Kafka.Admin.Types + exposed-modules: Kafka.Topic + Kafka.Topic.Types Kafka.Consumer Kafka.Consumer.ConsumerProperties Kafka.Consumer.Subscription diff --git a/src/Kafka/Admin/AdminProperties.hs b/src/Kafka/Admin/AdminProperties.hs deleted file mode 100644 index 30d858f..0000000 --- a/src/Kafka/Admin/AdminProperties.hs +++ /dev/null @@ -1,43 +0,0 @@ -{-# LANGUAGE OverloadedStrings #-} - -module Kafka.Admin.AdminProperties where - -import Data.Map -import qualified Data.Map as M -import Data.Text - -import Kafka.Types - -newtype AdminProperties = AdminProperties { - adminProps :: Map Text Text -} - -instance Semigroup AdminProperties where - ( AdminProperties props1 ) <> ( AdminProperties props2 ) = - AdminProperties ( props2 `union` props1 ) - {-# INLINE (<>) #-} - -instance Monoid AdminProperties where - mempty = AdminProperties { - adminProps = M.empty - } - {-# INLINE mempty #-} - mappend = (<>) - {-# INLINE mappend #-} - -brokers :: [BrokerAddress] -> AdminProperties -brokers b = - let b' = intercalate "," ((\( BrokerAddress i ) -> i ) <$> b ) - in extraProps $ fromList [("bootstrap.servers", b')] - -clientId :: ClientId -> AdminProperties -clientId (ClientId cid) = - extraProps $ M.fromList [("client.id", cid)] - -timeOut :: Timeout -> AdminProperties -timeOut (Timeout to) = - let to' = ( pack $ show to ) - in extraProps $ fromList [("request.timeout.ms", to')] - -extraProps :: Map Text Text -> AdminProperties -extraProps m = mempty { adminProps = m } diff --git a/src/Kafka/Internal/RdKafka.chs b/src/Kafka/Internal/RdKafka.chs index 9ec9f52..e408a6d 100644 --- a/src/Kafka/Internal/RdKafka.chs +++ b/src/Kafka/Internal/RdKafka.chs @@ -1161,11 +1161,10 @@ rdKafkaErrorIsRetriable ptr = boolFromCInt <$> rdKafkaErrorIsRetriable' ptr rdKafkaErrorTxnRequiresAbort :: RdKafkaErrorTPtr -> IO Bool rdKafkaErrorTxnRequiresAbort ptr = boolFromCInt <$> rdKafkaErrorTxnRequiresAbort' ptr --- Admin API +-- Topics {#enum rd_kafka_admin_op_t as ^ {underscoreToCase} deriving (Show, Eq) #} -data RdKafkaTopicResultT -{#pointer *rd_kafka_topic_result_t as RdKafkaTopicResultTPtr foreign -> RdKafkaTopicResultT #} + data RdKafkaAdminOptionsT {#pointer *rd_kafka_AdminOptions_t as RdKafkaAdminOptionsTPtr foreign -> RdKafkaAdminOptionsT #} diff --git a/src/Kafka/Admin.hs b/src/Kafka/Topic.hs similarity index 62% rename from src/Kafka/Admin.hs rename to src/Kafka/Topic.hs index 50f1ab6..d03cb98 100644 --- a/src/Kafka/Admin.hs +++ b/src/Kafka/Topic.hs @@ -1,60 +1,42 @@ -module Kafka.Admin( +module Kafka.Topic( module X -, newKAdmin , createTopic , deleteTopic -, closeKAdmin ) where -import Control.Monad -import Control.Monad.IO.Class -import Data.Text -import Data.List.NonEmpty -import qualified Data.List.NonEmpty as NEL -import qualified Data.Text as T +import Control.Monad.IO.Class +import Data.List.NonEmpty +import qualified Data.List.NonEmpty as NEL +import Data.Text +import qualified Data.Text as T import Kafka.Internal.RdKafka import Kafka.Internal.Setup -import Kafka.Types as X -import Kafka.Admin.AdminProperties as X -import Kafka.Admin.Types as X +import Kafka.Topic.Types as X +import Kafka.Types as X -newKAdmin ::( MonadIO m ) - => AdminProperties - -> m (Either KafkaError KAdmin) -newKAdmin properties = liftIO $ do - kafkaConfig@(KafkaConf kafkaConf' _ _) <- kafkaConf ( KafkaProps $ adminProps properties) - maybeKafka <- newRdKafkaT RdKafkaConsumer kafkaConf' - case maybeKafka of - Left err -> pure $ Left $ KafkaError err - Right kafka -> pure $ Right $ KAdmin (Kafka kafka) kafkaConfig - -closeKAdmin :: KAdmin - -> IO () -closeKAdmin ka = void $ rdKafkaConsumerClose (getRdKafka ka) --- CREATE TOPIC --- -createTopic :: KAdmin - -> NewTopic - -> IO (Either KafkaError TopicName) -createTopic kAdmin topic = liftIO $ do - let kafkaPtr = getRdKafka kAdmin +createTopic :: HasKafka k => k -> NewTopic -> IO (Either KafkaError TopicName) +createTopic k topic = do + let kafkaPtr = getRdKafka k queue <- newRdKafkaQueue kafkaPtr - opts <- newRdKAdminOptions kafkaPtr RdKafkaAdminOpAny + opts <- newRdKAdminOptions kafkaPtr RdKafkaAdminOpAny topicRes <- withNewTopic topic $ \topic' -> rdKafkaCreateTopic kafkaPtr topic' opts queue case topicRes of - Left err -> do + Left err -> do pure $ Left (NEL.head err) Right _ -> do pure $ Right $ topicName topic --- DELETE TOPIC --- -deleteTopic :: KAdmin +deleteTopic :: HasKafka k + => k -> TopicName -> IO (Either KafkaError TopicName) -deleteTopic kAdmin topic = liftIO $ do - let kafkaPtr = getRdKafka kAdmin +deleteTopic k topic = liftIO $ do + let kafkaPtr = getRdKafka k queue <- newRdKafkaQueue kafkaPtr opts <- newRdKAdminOptions kafkaPtr RdKafkaAdminOpAny @@ -65,7 +47,7 @@ deleteTopic kAdmin topic = liftIO $ do Right _ -> do pure $ Right topic -withNewTopic :: NewTopic +withNewTopic :: NewTopic -> (RdKafkaNewTopicTPtr -> IO a) -> IO (Either (NonEmpty KafkaError) a) withNewTopic t transform = do @@ -73,7 +55,7 @@ withNewTopic t transform = do case mkNewTopicRes of Left err -> do return $ Left err - Right topic -> do + Right topic -> do res <- transform topic return $ Right res @@ -93,23 +75,23 @@ newTopicPtr :: NewTopic -> IO (Either KafkaError RdKafkaNewTopicTPtr) newTopicPtr topic = do ptrRes <- newRdKafkaNewTopic (unpack $ unTopicName $ topicName topic) (unPartitionCount $ topicPartitionCount topic) (unReplicationFactor $ topicReplicationFactor topic) case ptrRes of - Left str -> pure $ Left (KafkaError $ T.pack str) + Left str -> pure $ Left (KafkaError $ T.pack str) Right ptr -> pure $ Right ptr oldTopicPtr :: TopicName -> IO (Either KafkaError RdKafkaDeleteTopicTPtr) oldTopicPtr tName = do res <- newRdKafkaDeleteTopic $ unpack . unTopicName $ tName case res of - Left str -> pure $ Left (KafkaError $ T.pack str) + Left str -> pure $ Left (KafkaError $ T.pack str) Right ptr -> pure $ Right ptr -mkNewTopic :: NewTopic +mkNewTopic :: NewTopic -> (NewTopic -> IO (Either KafkaError a)) -> IO (Either (NonEmpty KafkaError) a) mkNewTopic topic create = do res <- create topic case res of - Left err -> pure $ Left (NEL.singleton err) + Left err -> pure $ Left (singletonList err) Right resource -> pure $ Right resource rmOldTopic :: TopicName @@ -118,5 +100,8 @@ rmOldTopic :: TopicName rmOldTopic tName remove = do res <- remove tName case res of - Left err -> pure $ Left (NEL.singleton err) + Left err -> pure $ Left (singletonList err) Right resource -> pure $ Right resource + +singletonList :: a -> NonEmpty a +singletonList x = x :| [] diff --git a/src/Kafka/Admin/Types.hs b/src/Kafka/Topic/Types.hs similarity index 57% rename from src/Kafka/Admin/Types.hs rename to src/Kafka/Topic/Types.hs index 54f606e..e42e2c3 100644 --- a/src/Kafka/Admin/Types.hs +++ b/src/Kafka/Topic/Types.hs @@ -1,6 +1,5 @@ -module Kafka.Admin.Types ( -KAdmin(..) -, PartitionCount (..) +module Kafka.Topic.Types ( +PartitionCount (..) , ReplicationFactor (..) , NewTopic (..) ) where @@ -8,20 +7,6 @@ KAdmin(..) import Data.Map import Kafka.Types -import Kafka.Internal.Setup - -data KAdmin = KAdmin { - kaKafkaPtr :: !Kafka - , kaKafkaConf :: !KafkaConf -} - -instance HasKafka KAdmin where - getKafka = kaKafkaPtr - {-# INLINE getKafka #-} - -instance HasKafkaConf KAdmin where - getKafkaConf = kaKafkaConf - {-# INLINE getKafkaConf #-} newtype PartitionCount = PartitionCount { unPartitionCount :: Int } deriving (Show, Eq) newtype ReplicationFactor = ReplicationFactor { unReplicationFactor :: Int } deriving (Show, Eq) diff --git a/tests-it/Kafka/IntegrationSpec.hs b/tests-it/Kafka/IntegrationSpec.hs index 5fc2ea0..6ed1853 100644 --- a/tests-it/Kafka/IntegrationSpec.hs +++ b/tests-it/Kafka/IntegrationSpec.hs @@ -18,7 +18,7 @@ import Kafka.Consumer import qualified Data.Text as T import Kafka.Metadata import Kafka.Producer -import Kafka.Admin +import Kafka.Topic import Kafka.TestEnv import Test.Hspec @@ -174,20 +174,19 @@ spec = do forM_ res $ \rcs -> forM_ rcs ((`shouldBe` Set.fromList (headersToList testHeaders)) . Set.fromList . headersToList . crHeaders) - describe "Kafka.Admin.Spec" $ do + describe "Kafka.Topic.Spec" $ do let topicName = addRandomChars "admin.topic.created." 5 topicsMVar <- runIO newEmptyMVar - specWithAdmin "Create topic" $ do - - it "should create a new topic" $ \(admin :: KAdmin) -> do + specWithConsumer "Read all topics" consumerProps $ do + + it "should create a topic" $ \(consumer :: KafkaConsumer) -> do tName <- topicName let newTopic = mkNewTopic (TopicName ( T.pack(tName) )) - result <- createTopic admin newTopic + result <- createTopic consumer newTopic result `shouldSatisfy` isRight - specWithConsumer "Read all topics" consumerProps $ do it "should return all the topics" $ \(consumer :: KafkaConsumer) -> do res <- allTopicsMetadata consumer (Timeout 1000) @@ -205,13 +204,12 @@ spec = do topicsLen `shouldSatisfy` (>0) hasTopic `shouldBe` True - specWithAdmin "Remove topics" $ do - - it "should delete all the topics currently existing" $ \(admin ::KAdmin) -> do + it "should delete all the topics currently existing" $ \(consumer :: KafkaConsumer) -> do topics <- takeMVar topicsMVar forM_ topics $ \topic -> do - result <- deleteTopic admin topic + result <- deleteTopic consumer topic result `shouldSatisfy` isRight + ---------------------------------------------------------------------------------------------------------------- data ReadState = Skip | Read diff --git a/tests-it/Kafka/TestEnv.hs b/tests-it/Kafka/TestEnv.hs index e588137..daee4a2 100644 --- a/tests-it/Kafka/TestEnv.hs +++ b/tests-it/Kafka/TestEnv.hs @@ -15,7 +15,6 @@ import qualified System.Random as Rnd import Control.Concurrent import Kafka.Consumer as C import Kafka.Producer as P -import Kafka.Admin as A import Test.Hspec @@ -58,9 +57,6 @@ producerProps = P.brokersList [brokerAddress] <> P.setCallback (logCallback (\l s1 s2 -> print $ "[Producer] " <> show l <> ": " <> s1 <> ", " <> s2)) <> P.setCallback (errorCallback (\e r -> print $ "[Producer] " <> show e <> ": " <> r)) -adminProperties :: AdminProperties -adminProperties = A.brokers [brokerAddress] - testSubscription :: TopicName -> Subscription testSubscription t = topics [t] <> offsetReset Earliest @@ -80,9 +76,6 @@ mkConsumerWith props = do (RebalanceAssign _) -> putMVar var True _ -> pure () -mkAdmin :: IO KAdmin -mkAdmin = newKAdmin adminProperties >>= \(Right k) -> pure k - specWithConsumer :: String -> ConsumerProperties -> SpecWith KafkaConsumer -> Spec specWithConsumer s p f = beforeAll (mkConsumerWith p) @@ -97,9 +90,3 @@ specWithKafka s p f = beforeAll ((,) <$> mkConsumerWith p <*> mkProducer) $ afterAll (\(consumer, producer) -> void $ closeProducer producer >> closeConsumer consumer) $ describe s f - -specWithAdmin :: String -> SpecWith KAdmin -> Spec -specWithAdmin s f = - beforeAll mkAdmin - $ afterAll (void . closeKAdmin) - $ describe s f From 035a3f73bc4a50775561a8f62b76424fc183bea4 Mon Sep 17 00:00:00 2001 From: JoranVanBelle Date: Wed, 29 Jan 2025 07:49:17 +0100 Subject: [PATCH 4/4] chore: update location of project from git:// to https:// --- hw-kafka-client.cabal | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hw-kafka-client.cabal b/hw-kafka-client.cabal index 4f43ef3..083a0bd 100644 --- a/hw-kafka-client.cabal +++ b/hw-kafka-client.cabal @@ -25,7 +25,7 @@ extra-source-files: README.md source-repository head type: git - location: git://github.com/haskell-works/hw-kafka-client.git + location: https://github.com/haskell-works/hw-kafka-client.git flag examples description: Also compile examples