-
Notifications
You must be signed in to change notification settings - Fork 50
/
Admin.hs
174 lines (151 loc) · 6.83 KB
/
Admin.hs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
{-# LANGUAGE RecordWildCards #-}
module Kafka.Admin
( module X
, NewTopic (..)
, ReplicationFactor(..)
, PartitionsCount(..)
, KafkaAdmin
, newKafkaAdmin
, createTopics
, deleteTopics
, closeKafkaAdmin
)
where
import Control.Exception (bracket, displayException)
import Control.Monad (void)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Monad.Trans.Class (lift)
import Control.Monad.Trans.Except (ExceptT (..), runExceptT, withExceptT)
import Control.Monad.Trans.Maybe (MaybeT (..), runMaybeT)
import Data.Bifunctor (bimap, first)
import Data.Either (partitionEithers)
import Data.Foldable (traverse_)
import Data.List.NonEmpty (NonEmpty (..))
import qualified Data.List.NonEmpty as NEL
import qualified Data.Map as M
import Data.Maybe (fromMaybe)
import Data.Semigroup ((<>))
import qualified Data.Set as S
import qualified Data.Text as T
import Kafka.Internal.RdKafka
import Kafka.Internal.Setup
import Kafka.Types
import Kafka.Admin.AdminProperties as X
-- | Replication factor of a topic, wrapped in its own type so that it cannot
-- be mixed up with other plain 'Int' arguments.
newtype ReplicationFactor = ReplicationFactor
  { unReplicationFactor :: Int -- ^ The wrapped number
  }
  deriving (Show, Eq, Read, Ord)
-- | Number of partitions of a topic, wrapped in its own type so that it cannot
-- be mixed up with other plain 'Int' arguments.
newtype PartitionsCount = PartitionsCount
  { unPartitionsCount :: Int -- ^ The wrapped number
  }
  deriving (Show, Eq, Read, Ord)
{-# DEPRECATED KafkaAdmin "Do we even need a special KafkaAdmin now when all the functions accept HasKafka?" #-}
-- | Admin client handle: a 'Kafka' client paired with the 'KafkaConf' it was
-- created from. Both fields are strict.
data KafkaAdmin = KafkaAdmin
  { acKafka     :: !Kafka
    -- ^ Underlying client; this is what the 'HasKafka' instance returns
  , acKafkaConf :: !KafkaConf
    -- ^ Configuration the client was created with
  }
-- | A 'KafkaAdmin' hands out the 'Kafka' client stored in its first field.
instance HasKafka KafkaAdmin where
  getKafka (KafkaAdmin kafka _) = kafka
-- | Description of a topic to be created with 'createTopics'.
data NewTopic = NewTopic
  { ntName              :: TopicName
    -- ^ Name of the topic
  , ntPartitions        :: PartitionsCount
    -- ^ Number of partitions
  , ntReplicationFactor :: ReplicationFactor
    -- ^ Replication factor
  , ntConfig            :: M.Map String String
    -- ^ Topic configuration as key/value pairs
  }
  deriving Show
{-# DEPRECATED newKafkaAdmin "Do we even need a special KafkaAdmin now when all the functions accept HasKafka?" #-}
-- | Builds a 'KafkaAdmin' from the given 'AdminProperties'.
--
-- The configuration is derived from 'apKafkaProps', and the client is created
-- with 'newRdKafkaT' using the 'RdKafkaConsumer' client type. A failure
-- reported by 'newRdKafkaT' is returned as a 'KafkaError'.
newKafkaAdmin :: (MonadIO m)
              => AdminProperties
              -> m (Either KafkaError KafkaAdmin)
newKafkaAdmin props = liftIO $ do
  conf@(KafkaConf rawConf _ _) <- kafkaConf . KafkaProps $ apKafkaProps props
  let failed    = Left . KafkaError
      ready ptr = Right (KafkaAdmin (Kafka ptr) conf)
  either failed ready <$> newRdKafkaT RdKafkaConsumer rawConf
-- | Closes the admin client by calling 'rdKafkaConsumerClose' on its
-- underlying handle; whatever that call returns is discarded.
closeKafkaAdmin :: KafkaAdmin -> IO ()
closeKafkaAdmin = void . rdKafkaConsumerClose . getRdKafka
----------------------------- CREATE ------------------------------------------
-- | Requests creation of the given topics and collects the outcome.
--
-- If any topic descriptor cannot be prepared, nothing is waited for and every
-- preparation error is returned as a @Left@ paired with its
-- 'displayException' text. Otherwise the responses are gathered with
-- 'waitForAllResponses'; a successful topic is reported as @Right name@, a
-- failed one as @Left (error, message)@.
createTopics :: (HasKafka k)
             => k
             -> [NewTopic]
             -> IO [Either (KafkaError, String) TopicName]
createTopics client ts =
  withAdminOperation client $ \(kafkaPtr, opts, queue) ->
    withNewTopics ts (\ptrs -> rdKafkaCreateTopics kafkaPtr ptrs opts queue)
      >>= either (pure . preparationFailures) (const (awaitResults queue))
  where
    -- One entry per descriptor that could not be prepared.
    preparationFailures errs =
      [Left (err, displayException err) | err <- NEL.toList errs]

    -- Wait for the responses and drop the topic name from each failure.
    awaitResults queue =
      map (first dropTopicName)
        <$> waitForAllResponses
              (map ntName ts)
              rdKafkaEventCreateTopicsResult
              rdKafkaCreateTopicsResultTopics
              queue

    dropTopicName (_, err, msg) = (err, msg)
-- | Prepares a native descriptor for every 'NewTopic' via 'mkNewTopicUnsafe',
-- runs the handler on them, and releases them afterwards with
-- 'rdKafkaNewTopicDestroyArray'. See 'withUnsafe' for the error behaviour.
withNewTopics :: [NewTopic] -> ([RdKafkaNewTopicTPtr] -> IO a) -> IO (Either (NonEmpty KafkaError) a)
withNewTopics ts handler =
  withUnsafe ts mkNewTopicUnsafe rdKafkaNewTopicDestroyArray handler
-- | Allocates a native descriptor for the given 'NewTopic' and applies every
-- entry of 'ntConfig' to it.
--
-- Returns @Left@ when the descriptor cannot be allocated (the message is
-- wrapped in 'KafkaError') or as soon as one config entry is rejected (the
-- code is wrapped in 'KafkaResponseError'); later entries are not applied.
--
-- The returned pointer is \"unsafe\" in the sense that the caller is expected
-- to release it (see 'withNewTopics').
--
-- NOTE(review): when a config entry is rejected the already allocated
-- descriptor is not handed back to the caller, so 'withUnsafe' never passes it
-- to its cleanup function -- confirm whether it needs an explicit destroy.
mkNewTopicUnsafe :: NewTopic -> IO (Either KafkaError RdKafkaNewTopicTPtr)
mkNewTopicUnsafe NewTopic{..} = runExceptT $ do
  t <- withStrErr $
         newRdKafkaNewTopicUnsafe
           (T.unpack $ unTopicName ntName)
           (unPartitionsCount ntPartitions)
           (unReplicationFactor ntReplicationFactor)
  -- The config must be set on the descriptor that was just created; passing
  -- anything else (previously 'undefined') breaks for any non-empty config.
  withKafkaErr $ whileRight (uncurry $ rdKafkaNewTopicSetConfig t) (M.toList ntConfig)
  pure t
  where
    withStrErr   = withExceptT (KafkaError . T.pack) . ExceptT
    withKafkaErr = withExceptT KafkaResponseError . ExceptT
----------------------------- DELETE ------------------------------------------
-- | Requests deletion of the given topics and collects the outcome with
-- 'waitForAllResponses'.
--
-- A deleted topic is reported as @Right name@, a failed one as
-- @Left (name, error, message)@.
deleteTopics :: (HasKafka k)
             => k
             -> [TopicName]
             -> IO [Either (TopicName, KafkaError, String) TopicName]
deleteTopics client ts =
  withAdminOperation client $ \(kafkaPtr, opts, queue) -> do
    requests <- mapM (newRdKafkaDeleteTopic . rawName) ts
    rdKafkaDeleteTopics kafkaPtr requests opts queue
    waitForAllResponses
      ts
      rdKafkaEventDeleteTopicsResult
      rdKafkaDeleteTopicsResultTopics
      queue
  where
    -- Topic name as the plain 'String' the native binding takes.
    rawName = T.unpack . unTopicName
-------------------- Helper servicing functions
-- | Runs an admin operation against the client behind @k@.
--
-- A new queue and a new set of admin options (created for
-- 'RdKafkaAdminOpAny') are allocated, in that order, and handed to the
-- callback together with the native client pointer.
withAdminOperation :: HasKafka k
                   => k
                   -> ((RdKafkaTPtr, RdKafkaAdminOptionsTPtr, RdKafkaQueueTPtr) -> IO a)
                   -> IO a
withAdminOperation k f =
  newRdKafkaQueue kafkaPtr >>= \queue ->
    newRdKafkaAdminOptions kafkaPtr RdKafkaAdminOpAny >>= \opts ->
      f (kafkaPtr, opts, queue)
  where
    kafkaPtr = getRdKafka k
-- | Creates one unsafe element per input item, runs the handler on them and
-- always passes the successfully created elements to the cleanup function
-- (via 'bracket').
--
-- The handler only runs when every element was created successfully. If at
-- least one creation failed, all the failures are returned as @Left@ and the
-- handler is skipped.
withUnsafe :: [a]                              -- ^ Items to handle
           -> (a -> IO (Either KafkaError b))  -- ^ Create an unsafe element
           -> ([b] -> IO ())                   -- ^ Destroy all unsafe elements
           -> ([b] -> IO c)                    -- ^ Handler
           -> IO (Either (NonEmpty KafkaError) c)
withUnsafe as mkOne cleanup f =
  bracket acquire release use
  where
    -- Failures on the left, created elements on the right.
    acquire = partitionEithers <$> traverse mkOne as

    release = cleanup . snd

    use ([], items)      = Right <$> f items
    use (err : errs, _)  = pure (Left (err :| errs))
-- | Applies the action to each element in order and stops at the first
-- @Left@, which becomes the result; the remaining elements are not visited.
-- Yields @Right ()@ when every application succeeds (including for an empty
-- list).
--
-- Maybe it would be easier if internal helpers returned @ExceptT e m ()@?
whileRight :: Monad m
           => (a -> m (Either e ()))
           -> [a]
           -> m (Either e ())
whileRight f = go
  where
    go []          = pure (Right ())
    go (a : rest)  = f a >>= either (pure . Left) (const (go rest))
-- | Polls the provided queue until it gets the responses for all the
-- specified topics.
--
-- Every round polls the queue with a timeout argument of @1000@ (presumably
-- milliseconds -- confirm against the binding), converts the event with
-- @fromEvent@ and extracts the per-topic results with @toResults@. Topics
-- reported in a round are removed from the awaited set, and polling ends once
-- that set is empty.
--
-- Polling also ends when a poll yields no event or when @fromEvent@ cannot
-- convert the event. In that case the results gathered in earlier rounds are
-- returned (previously they were discarded and @[]@ was returned), so the
-- result may cover only a part of the requested topics.
--
-- Successful topics are reported as @Right name@, failed ones as
-- @Left (name, error, message)@. Results of later rounds come first.
waitForAllResponses :: [TopicName]
                    -> (RdKafkaEventTPtr -> IO (Maybe a))
                    -> (a -> IO [Either (String, RdKafkaRespErrT, String) String])
                    -> RdKafkaQueueTPtr
                    -> IO [Either (TopicName, KafkaError, String) TopicName]
waitForAllResponses ts fromEvent toResults q =
  go (S.fromList ts) []
  where
    -- Each round runs in its own 'MaybeT' so that a missing event only ends
    -- the loop and falls back to what has been accumulated so far.
    go awaited accRes =
      fmap (fromMaybe accRes) . runMaybeT $ do
        qRes <- MaybeT $ rdKafkaQueuePoll q 1000
        eRes <- MaybeT $ fromEvent qRes
        tRes <- lift $ toResults eRes
        let results   = wrapTopicName <$> tRes
            topics    = S.fromList $ getTopicName <$> results
            newRes    = results <> accRes
            remaining = S.difference awaited topics
        if S.null remaining
          then pure newRes
          else lift $ go remaining newRes

    -- The topic a result refers to, whether it succeeded or failed.
    getTopicName = either (\(t, _, _) -> t) id

    -- Lift the raw strings and error code into the library's types.
    wrapTopicName =
      bimap (\(t, e, s) -> (TopicName . T.pack $ t, KafkaResponseError e, s))
            (TopicName . T.pack)