/
admin.clj
227 lines (195 loc) · 7.9 KB
/
admin.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
(ns jackdaw.admin
"Tools for administering or just interacting with a Kafka cluster.
Wraps the `AdminClient` API, replacing the Scala admin APIs.
Like the underlying `AdminClient` API, this namespace is subject to
change and should be considered of alpha stability."
{:license "BSD 3-Clause License <https://github.com/FundingCircle/jackdaw/blob/master/LICENSE>"}
(:require
[jackdaw.data :as jd]
[manifold.deferred :as d])
(:import [org.apache.kafka.clients.admin AdminClient
DescribeTopicsOptions DescribeClusterOptions DescribeConfigsOptions]))
;; Low-level admin operations used by the public functions in this
;; namespace. The protocol is extended to `AdminClient` further down in
;; this file using the `client-impl` map, whose functions each wrap the
;; underlying call in `d/future`; the public wrappers deref the result.
(defprotocol Client
;; `topics` is handed to `.alterConfigs` by `client-impl`.
(alter-topics* [this topics])
;; `topics` is handed to `.createTopics` by `client-impl`.
(create-topics* [this topics])
;; `topics` is handed to `.deleteTopics` by `client-impl`.
(delete-topics* [this topics])
;; `topics` is handed to `.describeTopics` by `client-impl`.
(describe-topics* [this topics])
;; `configs` is handed to `.describeConfigs` by `client-impl`.
(describe-configs* [this configs])
;; `client-impl` calls `.describeCluster` and datafies the result.
(describe-cluster* [this])
;; `client-impl` calls `.listTopics` and derefs its `.names`.
(list-topics* [this]))
(def client-impl
  "Map from `Client` protocol method keywords to their implementations
  for `AdminClient`.

  Every function runs its request inside `d/future` and returns the
  resulting deferred. All but `:describe-cluster*` block inside the
  future on the value produced by the Kafka call; `:describe-cluster*`
  instead passes the `.describeCluster` result to `jd/datafy`."
  {:alter-topics*
   (fn [admin topic-configs]
     (d/future
       (-> (.alterConfigs admin topic-configs) .all deref)))

   :create-topics*
   (fn [admin new-topics]
     (d/future
       (-> (.createTopics admin new-topics) .all deref)))

   :delete-topics*
   (fn [admin topic-names]
     (d/future
       (-> (.deleteTopics admin topic-names) .all deref)))

   :describe-topics*
   (fn [admin topic-names]
     (d/future
       (-> (.describeTopics admin topic-names (DescribeTopicsOptions.))
           .all
           deref)))

   :describe-configs*
   (fn [admin resources]
     (d/future
       (-> (.describeConfigs admin resources (DescribeConfigsOptions.))
           .all
           deref)))

   :describe-cluster*
   (fn [admin]
     (d/future
       (-> (.describeCluster admin (DescribeClusterOptions.))
           jd/datafy)))

   :list-topics*
   (fn [admin]
     (d/future
       (-> (.listTopics admin) .names deref)))})
;; Make every `AdminClient` satisfy the `Client` protocol, using the
;; function map defined in `client-impl` as the method implementations.
(extend AdminClient
Client
client-impl)
(defn ->AdminClient
  "Build an `AdminClient` from a Kafka properties map.

  `kafka-config` must have a truthy value under the key
  `\"bootstrap.servers\"`; otherwise the precondition fails. The map is
  converted with `jd/map->Properties` and handed to
  `AdminClient/create`."
  ^AdminClient [kafka-config]
  {:pre [(get kafka-config "bootstrap.servers")]}
  (let [props (jd/map->Properties kafka-config)]
    (AdminClient/create props)))
(defn client?
"Predicate: is `x` an instance of `AdminClient`?
Returns `true` when it is and `false` for anything else. Used as the
precondition check on `client` arguments throughout this namespace."
[x]
(instance? AdminClient x))
(defn list-topics
  "Given an `AdminClient`, return a lazy seq of topic descriptors, one
  `{:topic-name <name>}` map per topic name reported by `list-topics*`.

  The names are sorted before being wrapped."
  [^AdminClient client]
  {:pre [(client? client)]}
  ;; TODO: consider letting the caller decide whether the result should
  ;; be sorted.
  (let [topic-names (sort @(list-topics* client))]
    (map (fn [topic-name]
           (hash-map :topic-name topic-name))
         topic-names)))
(defn topic-exists?
  "Return `true` if a topic named `topic-name` is among the topics
  returned by `list-topics`, `false` otherwise.

  Only the name is checked; no configuration details or values are
  verified."
  [^AdminClient client {:keys [topic-name] :as topic}]
  {:pre [(client? client)
         (string? topic-name)]}
  (let [wanted {:topic-name topic-name}]
    (boolean (some (fn [listed] (= wanted listed))
                   (list-topics client)))))
(defn retry-exists?
  "Poll for the existence of `topic`.

  Returns `true` as soon as `topic-exists?` succeeds. Otherwise sleeps
  `wait-ms` milliseconds and tries again, up to `num-retries` further
  times, returning `false` once the retries are used up.

  The stop condition is `(zero? ...)` on a counter that is only ever
  decremented, so a negative `num-retries` never reaches it."
  [client topic num-retries wait-ms]
  (loop [attempts-left num-retries]
    (if (topic-exists? client topic)
      true
      (if (zero? attempts-left)
        false
        (do
          (Thread/sleep wait-ms)
          (recur (dec attempts-left)))))))
(defn create-topics!
  "Given an `AdminClient` and a sequential collection of topic
  descriptors, create the described topics with their configuration(s).

  Each descriptor is converted with `jd/map->NewTopic`. The call blocks
  until the creation request completes, but not until the created
  topics are ready: it may take some time for replicas and leaders to
  be chosen for newly created topics.

  See `#'topics-ready?`, `#'topic-exists?` and `#'retry-exists?` for
  tools with which to wait for topics to be ready."
  [^AdminClient client topics]
  {:pre [(client? client)
         (sequential? topics)]}
  (let [new-topics (map jd/map->NewTopic topics)]
    (deref (create-topics* client new-topics))))
(defn describe-topics
  "Given an `AdminClient` and an optional sequential collection of
  topic descriptors, return a map from topic names to datafied topic
  descriptions.

  With no `topics` argument, describes every topic returned by
  `list-topics`.

  Note that the topic description does NOT include the topic's
  configuration. See `#'describe-topics-configs` for that capability."
  ([^AdminClient client]
   {:pre [(client? client)]}
   (describe-topics client (list-topics client)))
  ([^AdminClient client topics]
   {:pre [(client? client)
          (sequential? topics)]}
   (let [topic-names  (map :topic-name topics)
         descriptions @(describe-topics* client topic-names)]
     (into {}
           (for [[topic-name description] descriptions]
             [topic-name (jd/datafy description)])))))
(defn describe-topics-configs
  "Given an `AdminClient` and a sequential collection of topic
  descriptors, return the live configuration of those topics.

  Each descriptor's `:topic-name` is turned into a resource with
  `jd/->topic-resource`; the result maps each datafied resource to its
  datafied configuration."
  [^AdminClient client topics]
  {:pre [(client? client)
         (sequential? topics)]}
  (let [resources   (map (fn [topic]
                           (jd/->topic-resource (:topic-name topic)))
                         topics)
        raw-configs (into {} @(describe-configs* client resources))]
    (reduce (fn [acc [resource config]]
              (assoc acc (jd/datafy resource) (jd/datafy config)))
            {}
            raw-configs)))
(defn topics-ready?
  "Given an `AdminClient` and a sequence of topic descriptors, return
  `true` if and only if every partition of every listed topic has a
  leader and at least one in-sync replica.

  This can be used to determine if some set of newly created topics
  are healthy yet, or detect whether leader re-election has finished
  following the demise of a Kafka broker.

  Topic descriptions are obtained through `describe-topics`, so any
  exception raised while describing the topics propagates to the
  caller."
  [^AdminClient client topics]
  {:pre [(client? client)
         (sequential? topics)]}
  ;; Use `describe-topics` (which applies `jd/datafy` to every
  ;; description) rather than the raw `describe-topics*` values.
  ;; Destructuring `:partition-info` out of an un-datafied description
  ;; yields nil, and `every?` over nil is vacuously true, which made
  ;; this check pass regardless of the actual partition state.
  (->> (describe-topics client topics)
       (every? (fn [[_topic-name {:keys [partition-info]}]]
                 (every? (fn [part-info]
                           (and (boolean (:leader part-info))
                                (seq (:isr part-info))))
                         partition-info)))))
(defn- topics->configs
  "Turn a collection of topic descriptors into a map from topic config
  resource (built with `jd/->ConfigResource`) to config (built with
  `jd/map->Config` from the descriptor's `:topic-config`).

  Every descriptor must have a string `:topic-name` and a map
  `:topic-config`; otherwise the per-descriptor precondition fails."
  ^java.util.Map [topics]
  (let [descriptor->entry
        (fn [{:keys [topic-name topic-config] :as t}]
          {:pre [(string? topic-name)
                 (map? topic-config)]}
          (let [resource (jd/->ConfigResource jd/+topic-config-resource-type+
                                              topic-name)
                config   (jd/map->Config topic-config)]
            [resource config]))]
    (into {} (map descriptor->entry) topics)))
(defn alter-topic-config!
  "Given an `AdminClient` and a sequential collection of topic
  descriptors having `:topic-config`, alter the live configuration of
  those topics to match the given `:topic-config`.

  Blocks until the alteration request completes."
  [^AdminClient client topics]
  {:pre [(client? client)
         (sequential? topics)]}
  (let [configs (topics->configs topics)]
    (deref (alter-topics* client configs))))
(defn delete-topics!
  "Given an `AdminClient` and a sequential collection of topic
  descriptors, mark the named topics for deletion.

  Does not block until the topics are deleted, just until the deletion
  request(s) are acknowledged."
  [^AdminClient client topics]
  {:pre [(client? client)
         (sequential? topics)]}
  (let [topic-names (map :topic-name topics)]
    (deref (delete-topics* client topic-names))))
(defn partition-ids-of-topics
  "Given an `AdminClient` and an optional sequential collection of
  topic descriptors, return a map from topic name to a vector of that
  topic's partition IDs.

  With no `topics` argument, enumerates the partition IDs of every
  topic returned by `list-topics`."
  ([^AdminClient client]
   {:pre [(client? client)]}
   (partition-ids-of-topics client (list-topics client)))
  ([^AdminClient client topics]
   {:pre [(client? client)
          (sequential? topics)]}
   (reduce-kv (fn [acc topic-name {:keys [partition-info]}]
                (assoc acc topic-name (mapv :partition partition-info)))
              {}
              (describe-topics client topics))))
(defn describe-cluster
  "Describe the cluster the given `AdminClient` is connected to.

  Returns the value delivered by `describe-cluster*` after passing it
  through `jd/datafy`. Note that the `AdminClient` implementation in
  `client-impl` already datafies the `.describeCluster` result before
  delivering it."
  [^AdminClient client]
  {:pre [(client? client)]}
  (let [cluster @(describe-cluster* client)]
    (jd/datafy cluster)))
(defn get-broker-config
  "Return the datafied configuration of the broker `broker-id`.

  `broker-id` is stringified and turned into a broker resource with
  `jd/->broker-resource`; the first value of the resulting config
  description is datafied and returned. Valid broker ids can be
  obtained using `describe-cluster`."
  [^AdminClient client broker-id]
  {:pre [(client? client)]}
  (let [resource (jd/->broker-resource (str broker-id))
        configs  @(describe-configs* client [resource])]
    (jd/datafy (first (vals configs)))))