/
AdminClient.java
511 lines (473 loc) · 23 KB
/
AdminClient.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.TopicPartitionReplica;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.config.ConfigResource;
import java.util.Collection;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
/**
* The administrative client for Kafka, which supports managing and inspecting topics, brokers, configurations and ACLs.
*
* The minimum broker version required is 0.10.0.0. Methods with stricter requirements will specify the minimum broker
* version required.
*
* This client was introduced in 0.11.0.0 and the API is still evolving. We will try to evolve the API in a compatible
* manner, but we reserve the right to make breaking changes in minor releases, if necessary. We will update the
* {@code InterfaceStability} annotation and this notice once the API is considered stable.
*/
@InterfaceStability.Evolving
public abstract class AdminClient implements AutoCloseable {
/**
* Create a new AdminClient with the given configuration.
*
* @param props The configuration.
* @return The new KafkaAdminClient.
*/
public static AdminClient create(Properties props) {
return KafkaAdminClient.createInternal(new AdminClientConfig(props), null);
}
/**
* Create a new AdminClient with the given configuration.
*
* @param conf The configuration.
* @return The new KafkaAdminClient.
*/
public static AdminClient create(Map<String, Object> conf) {
return KafkaAdminClient.createInternal(new AdminClientConfig(conf), null);
}
/**
* Close the AdminClient and release all associated resources.
*
* See {@link AdminClient#close(long, TimeUnit)}
*/
@Override
public void close() {
close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
}
/**
* Close the AdminClient and release all associated resources.
*
* The close operation has a grace period during which current operations will be allowed to
* complete, specified by the given duration and time unit.
* New operations will not be accepted during the grace period. Once the grace period is over,
* all operations that have not yet been completed will be aborted with a TimeoutException.
*
* @param duration The duration to use for the wait time.
* @param unit The time unit to use for the wait time.
*/
public abstract void close(long duration, TimeUnit unit);
/**
* Create a batch of new topics with the default options.
*
* This is a convenience method for #{@link #createTopics(Collection, CreateTopicsOptions)} with default options.
* See the overload for more details.
*
* This operation is supported by brokers with version 0.10.1.0 or higher.
*
* @param newTopics The new topics to create.
* @return The CreateTopicsResult.
*/
public CreateTopicsResult createTopics(Collection<NewTopic> newTopics) {
return createTopics(newTopics, new CreateTopicsOptions());
}
/**
* Create a batch of new topics.
*
* This operation is not transactional so it may succeed for some topics while fail for others.
*
* It may take several seconds after {@code CreateTopicsResult} returns
* success for all the brokers to become aware that the topics have been created.
* During this time, {@link AdminClient#listTopics()} and {@link AdminClient#describeTopics(Collection)}
* may not return information about the new topics.
*
* This operation is supported by brokers with version 0.10.1.0 or higher. The validateOnly option is supported
* from version 0.10.2.0.
*
* @param newTopics The new topics to create.
* @param options The options to use when creating the new topics.
* @return The CreateTopicsResult.
*/
public abstract CreateTopicsResult createTopics(Collection<NewTopic> newTopics,
CreateTopicsOptions options);
/**
* This is a convenience method for #{@link AdminClient#deleteTopics(Collection, DeleteTopicsOptions)}
* with default options. See the overload for more details.
*
* This operation is supported by brokers with version 0.10.1.0 or higher.
*
* @param topics The topic names to delete.
* @return The DeleteTopicsResult.
*/
public DeleteTopicsResult deleteTopics(Collection<String> topics) {
return deleteTopics(topics, new DeleteTopicsOptions());
}
/**
* Delete a batch of topics.
*
* This operation is not transactional so it may succeed for some topics while fail for others.
*
* It may take several seconds after the {@code DeleteTopicsResult} returns
* success for all the brokers to become aware that the topics are gone.
* During this time, AdminClient#listTopics and AdminClient#describeTopics
* may continue to return information about the deleted topics.
*
* If delete.topic.enable is false on the brokers, deleteTopics will mark
* the topics for deletion, but not actually delete them. The futures will
* return successfully in this case.
*
* This operation is supported by brokers with version 0.10.1.0 or higher.
*
* @param topics The topic names to delete.
* @param options The options to use when deleting the topics.
* @return The DeleteTopicsResult.
*/
public abstract DeleteTopicsResult deleteTopics(Collection<String> topics, DeleteTopicsOptions options);
/**
* List the topics available in the cluster with the default options.
*
* This is a convenience method for #{@link AdminClient#listTopics(ListTopicsOptions)} with default options.
* See the overload for more details.
*
* @return The ListTopicsResult.
*/
public ListTopicsResult listTopics() {
return listTopics(new ListTopicsOptions());
}
/**
* List the topics available in the cluster.
*
* @param options The options to use when listing the topics.
* @return The ListTopicsResult.
*/
public abstract ListTopicsResult listTopics(ListTopicsOptions options);
/**
* Describe some topics in the cluster, with the default options.
*
* This is a convenience method for #{@link AdminClient#describeTopics(Collection, DescribeTopicsOptions)} with
* default options. See the overload for more details.
*
* @param topicNames The names of the topics to describe.
*
* @return The DescribeTopicsResult.
*/
public DescribeTopicsResult describeTopics(Collection<String> topicNames) {
return describeTopics(topicNames, new DescribeTopicsOptions());
}
/**
* Describe some topics in the cluster.
*
* @param topicNames The names of the topics to describe.
* @param options The options to use when describing the topic.
*
* @return The DescribeTopicsResult.
*/
public abstract DescribeTopicsResult describeTopics(Collection<String> topicNames,
DescribeTopicsOptions options);
/**
* Get information about the nodes in the cluster, using the default options.
*
* This is a convenience method for #{@link AdminClient#describeCluster(DescribeClusterOptions)} with default options.
* See the overload for more details.
*
* @return The DescribeClusterResult.
*/
public DescribeClusterResult describeCluster() {
return describeCluster(new DescribeClusterOptions());
}
/**
* Get information about the nodes in the cluster.
*
* @param options The options to use when getting information about the cluster.
* @return The DescribeClusterResult.
*/
public abstract DescribeClusterResult describeCluster(DescribeClusterOptions options);
/**
* This is a convenience method for #{@link AdminClient#describeAcls(AclBindingFilter, DescribeAclsOptions)} with
* default options. See the overload for more details.
*
* This operation is supported by brokers with version 0.11.0.0 or higher.
*
* @param filter The filter to use.
* @return The DeleteAclsResult.
*/
public DescribeAclsResult describeAcls(AclBindingFilter filter) {
return describeAcls(filter, new DescribeAclsOptions());
}
/**
* Lists access control lists (ACLs) according to the supplied filter.
*
* Note: it may take some time for changes made by createAcls or deleteAcls to be reflected
* in the output of describeAcls.
*
* This operation is supported by brokers with version 0.11.0.0 or higher.
*
* @param filter The filter to use.
* @param options The options to use when listing the ACLs.
* @return The DeleteAclsResult.
*/
public abstract DescribeAclsResult describeAcls(AclBindingFilter filter, DescribeAclsOptions options);
/**
* This is a convenience method for #{@link AdminClient#createAcls(Collection, CreateAclsOptions)} with
* default options. See the overload for more details.
*
* This operation is supported by brokers with version 0.11.0.0 or higher.
*
* @param acls The ACLs to create
* @return The CreateAclsResult.
*/
public CreateAclsResult createAcls(Collection<AclBinding> acls) {
return createAcls(acls, new CreateAclsOptions());
}
/**
* Creates access control lists (ACLs) which are bound to specific resources.
*
* This operation is not transactional so it may succeed for some ACLs while fail for others.
*
* If you attempt to add an ACL that duplicates an existing ACL, no error will be raised, but
* no changes will be made.
*
* This operation is supported by brokers with version 0.11.0.0 or higher.
*
* @param acls The ACLs to create
* @param options The options to use when creating the ACLs.
* @return The CreateAclsResult.
*/
public abstract CreateAclsResult createAcls(Collection<AclBinding> acls, CreateAclsOptions options);
/**
* This is a convenience method for #{@link AdminClient#deleteAcls(Collection, DeleteAclsOptions)} with default options.
* See the overload for more details.
*
* This operation is supported by brokers with version 0.11.0.0 or higher.
*
* @param filters The filters to use.
* @return The DeleteAclsResult.
*/
public DeleteAclsResult deleteAcls(Collection<AclBindingFilter> filters) {
return deleteAcls(filters, new DeleteAclsOptions());
}
/**
* Deletes access control lists (ACLs) according to the supplied filters.
*
* This operation is not transactional so it may succeed for some ACLs while fail for others.
*
* This operation is supported by brokers with version 0.11.0.0 or higher.
*
* @param filters The filters to use.
* @param options The options to use when deleting the ACLs.
* @return The DeleteAclsResult.
*/
public abstract DeleteAclsResult deleteAcls(Collection<AclBindingFilter> filters, DeleteAclsOptions options);
/**
* Get the configuration for the specified resources with the default options.
*
* This is a convenience method for #{@link AdminClient#describeConfigs(Collection, DescribeConfigsOptions)} with default options.
* See the overload for more details.
*
* This operation is supported by brokers with version 0.11.0.0 or higher.
*
* @param resources The resources (topic and broker resource types are currently supported)
* @return The DescribeConfigsResult
*/
public DescribeConfigsResult describeConfigs(Collection<ConfigResource> resources) {
return describeConfigs(resources, new DescribeConfigsOptions());
}
/**
* Get the configuration for the specified resources.
*
* The returned configuration includes default values and the isDefault() method can be used to distinguish them
* from user supplied values.
*
* The value of config entries where isSensitive() is true is always {@code null} so that sensitive information
* is not disclosed.
*
* Config entries where isReadOnly() is true cannot be updated.
*
* This operation is supported by brokers with version 0.11.0.0 or higher.
*
* @param resources The resources (topic and broker resource types are currently supported)
* @param options The options to use when describing configs
* @return The DescribeConfigsResult
*/
public abstract DescribeConfigsResult describeConfigs(Collection<ConfigResource> resources,
DescribeConfigsOptions options);
/**
* Update the configuration for the specified resources with the default options.
*
* This is a convenience method for #{@link AdminClient#alterConfigs(Map, AlterConfigsOptions)} with default options.
* See the overload for more details.
*
* This operation is supported by brokers with version 0.11.0.0 or higher.
*
* @param configs The resources with their configs (topic is the only resource type with configs that can
* be updated currently)
* @return The AlterConfigsResult
*/
public AlterConfigsResult alterConfigs(Map<ConfigResource, Config> configs) {
return alterConfigs(configs, new AlterConfigsOptions());
}
/**
* Update the configuration for the specified resources with the default options.
*
* Updates are not transactional so they may succeed for some resources while fail for others. The configs for
* a particular resource are updated atomically.
*
* This operation is supported by brokers with version 0.11.0.0 or higher.
*
* @param configs The resources with their configs (topic is the only resource type with configs that can
* be updated currently)
* @param options The options to use when describing configs
* @return The AlterConfigsResult
*/
public abstract AlterConfigsResult alterConfigs(Map<ConfigResource, Config> configs, AlterConfigsOptions options);
/**
* Change the log directory for the specified replicas. This API is currently only useful if it is used
* before the replica has been created on the broker. It will support moving replicas that have already been created after
* KIP-113 is fully implemented.
*
* This is a convenience method for #{@link AdminClient#alterReplicaLogDirs(Map, AlterReplicaLogDirsOptions)} with default options.
* See the overload for more details.
*
* This operation is supported by brokers with version 1.0.0 or higher.
*
* @param replicaAssignment The replicas with their log directory absolute path
* @return The AlterReplicaLogDirsResult
*/
public AlterReplicaLogDirsResult alterReplicaLogDirs(Map<TopicPartitionReplica, String> replicaAssignment) {
return alterReplicaLogDirs(replicaAssignment, new AlterReplicaLogDirsOptions());
}
/**
* Change the log directory for the specified replicas. This API is currently only useful if it is used
* before the replica has been created on the broker. It will support moving replicas that have already been created after
* KIP-113 is fully implemented.
*
* This operation is not transactional so it may succeed for some replicas while fail for others.
*
* This operation is supported by brokers with version 1.0.0 or higher.
*
* @param replicaAssignment The replicas with their log directory absolute path
* @param options The options to use when changing replica dir
* @return The AlterReplicaLogDirsResult
*/
public abstract AlterReplicaLogDirsResult alterReplicaLogDirs(Map<TopicPartitionReplica, String> replicaAssignment, AlterReplicaLogDirsOptions options);
/**
* Query the information of all log directories on the given set of brokers
*
* This is a convenience method for #{@link AdminClient#describeLogDirs(Collection, DescribeLogDirsOptions)} with default options.
* See the overload for more details.
*
* This operation is supported by brokers with version 1.0.0 or higher.
*
* @param brokers A list of brokers
* @return The DescribeLogDirsResult
*/
public DescribeLogDirsResult describeLogDirs(Collection<Integer> brokers) {
return describeLogDirs(brokers, new DescribeLogDirsOptions());
}
/**
* Query the information of all log directories on the given set of brokers
*
* This operation is supported by brokers with version 1.0.0 or higher.
*
* @param brokers A list of brokers
* @param options The options to use when querying log dir info
* @return The DescribeLogDirsResult
*/
public abstract DescribeLogDirsResult describeLogDirs(Collection<Integer> brokers, DescribeLogDirsOptions options);
/**
* Query the replica log directory information for the specified replicas.
*
* This is a convenience method for #{@link AdminClient#describeReplicaLogDirs(Collection, DescribeReplicaLogDirsOptions)}
* with default options. See the overload for more details.
*
* This operation is supported by brokers with version 1.0.0 or higher.
*
* @param replicas The replicas to query
* @return The DescribeReplicaLogDirsResult
*/
public DescribeReplicaLogDirsResult describeReplicaLogDirs(Collection<TopicPartitionReplica> replicas) {
return describeReplicaLogDirs(replicas, new DescribeReplicaLogDirsOptions());
}
/**
* Query the replica log directory information for the specified replicas.
*
* This operation is supported by brokers with version 1.0.0 or higher.
*
* @param replicas The replicas to query
* @param options The options to use when querying replica log dir info
* @return The DescribeReplicaLogDirsResult
*/
public abstract DescribeReplicaLogDirsResult describeReplicaLogDirs(Collection<TopicPartitionReplica> replicas, DescribeReplicaLogDirsOptions options);
/**
* <p>Increase the number of partitions of the topics given as the keys of {@code newPartitions}
* according to the corresponding values. <strong>If partitions are increased for a topic that has a key,
* the partition logic or ordering of the messages will be affected.</strong></p>
*
* <p>This is a convenience method for {@link #createPartitions(Map, CreatePartitionsOptions)} with default options.
* See the overload for more details.</p>
*
* @param newPartitions The topics which should have new partitions created, and corresponding parameters
* for the created partitions.
* @return The CreatePartitionsResult.
*/
public CreatePartitionsResult createPartitions(Map<String, NewPartitions> newPartitions) {
return createPartitions(newPartitions, new CreatePartitionsOptions());
}
/**
* <p>Increase the number of partitions of the topics given as the keys of {@code newPartitions}
* according to the corresponding values. <strong>If partitions are increased for a topic that has a key,
* the partition logic or ordering of the messages will be affected.</strong></p>
*
* <p>This operation is not transactional so it may succeed for some topics while fail for others.</p>
*
* <p>It may take several seconds after this method returns
* success for all the brokers to become aware that the partitions have been created.
* During this time, {@link AdminClient#describeTopics(Collection)}
* may not return information about the new partitions.</p>
*
* <p>This operation is supported by brokers with version 1.0.0 or higher.</p>
*
* <p>The following exceptions can be anticipated when calling {@code get()} on the futures obtained from the
* {@link CreatePartitionsResult#values() values()} method of the returned {@code CreatePartitionsResult}</p>
* <ul>
* <li>{@link org.apache.kafka.common.errors.AuthorizationException}
* if the authenticated user is not authorized to alter the topic</li>
* <li>{@link org.apache.kafka.common.errors.TimeoutException}
* if the request was not completed in within the given {@link CreatePartitionsOptions#timeoutMs()}.</li>
* <li>{@link org.apache.kafka.common.errors.ReassignmentInProgressException}
* if a partition reassignment is currently in progress</li>
* <li>{@link org.apache.kafka.common.errors.BrokerNotAvailableException}
* if the requested {@link NewPartitions#assignments()} contain a broker that is currently unavailable.</li>
* <li>{@link org.apache.kafka.common.errors.InvalidReplicationFactorException}
* if no {@link NewPartitions#assignments()} are given and it is impossible for the broker to assign
* replicas with the topics replication factor.</li>
* <li>Subclasses of {@link org.apache.kafka.common.KafkaException}
* if the request is invalid in some way.</li>
* </ul>
*
* @param newPartitions The topics which should have new partitions created, and corresponding parameters
* for the created partitions.
* @param options The options to use when creating the new paritions.
* @return The CreatePartitionsResult.
*/
public abstract CreatePartitionsResult createPartitions(Map<String, NewPartitions> newPartitions,
CreatePartitionsOptions options);
}