// KafkaesqueAdminClient.java
package at.esque.kafka.cluster;
import at.esque.kafka.alerts.ErrorAlert;
import at.esque.kafka.lag.viewer.Lag;
import at.esque.kafka.topics.DescribeTopicWrapper;
import javafx.application.Platform;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.acl.AccessControlEntryFilter;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePatternFilter;
import org.apache.kafka.common.resource.ResourceType;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
 * Thin convenience wrapper around a Kafka {@link AdminClient}.
 *
 * <p>Most query methods block on the underlying Kafka futures and, instead of
 * propagating failures, report them through {@link ErrorAlert} and return a
 * fallback value. The mutating topic operations ({@link #deleteTopic} and
 * {@link #createTopic}) propagate failures to the caller.
 */
public class KafkaesqueAdminClient {

    private final AdminClient adminClient;

    /**
     * Creates the underlying admin client.
     *
     * @param bootstrapServers value for {@code bootstrap.servers}
     * @param sslProps         additional SSL related client properties; {@code null} is treated as empty
     * @param saslProps        additional SASL related client properties; {@code null} is treated as empty
     */
    public KafkaesqueAdminClient(String bootstrapServers, Map<String, String> sslProps, Map<String, String> saslProps) {
        Properties props = new Properties();
        props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // A random suffix keeps the client id unique per instance.
        props.setProperty(AdminClientConfig.CLIENT_ID_CONFIG, String.format("kafkaesque-%s", UUID.randomUUID()));
        if (sslProps != null) {
            props.putAll(sslProps);
        }
        if (saslProps != null) {
            props.putAll(saslProps);
        }
        this.adminClient = AdminClient.create(props);
    }

    /**
     * Lists the names of all topics, including internal ones.
     *
     * <p>Waits up to 15 seconds for the result.
     *
     * @return the topic names, or a new empty set if the lookup failed (the error
     *         is reported via {@link ErrorAlert} on the JavaFX application thread)
     */
    public Set<String> getTopics() {
        ListTopicsOptions options = new ListTopicsOptions();
        options.listInternal(true);
        ListTopicsResult result = adminClient.listTopics(options);
        try {
            return result.names().get(15, TimeUnit.SECONDS);
        } catch (Exception e) {
            showErrorLater(e);
        }
        return new HashSet<>();
    }

    /**
     * Returns the partition ids of the given topic.
     *
     * <p>Blocks without a timeout until the description is available.
     *
     * @param topic name of the topic to describe
     * @return the partition ids, or {@code null} if the lookup failed (the error is
     *         shown via {@link ErrorAlert} directly on the calling thread)
     */
    public List<Integer> getTopicPatitions(String topic) {
        DescribeTopicsResult result = adminClient.describeTopics(Collections.singletonList(topic));
        try {
            return result.values().get(topic).get().partitions().stream()
                    .map(TopicPartitionInfo::partition)
                    .collect(Collectors.toList());
        } catch (Exception e) {
            restoreInterruptIfNeeded(e);
            ErrorAlert.show(e);
        }
        return null;
    }

    /**
     * Deletes a topic and waits for the deletion request to complete.
     *
     * @param name name of the topic to delete
     * @throws ExecutionException   if the deletion failed
     * @throws InterruptedException if the calling thread was interrupted while waiting
     */
    public void deleteTopic(String name) throws ExecutionException, InterruptedException {
        DeleteTopicsResult result = adminClient.deleteTopics(Collections.singletonList(name));
        result.values().get(name).get();
    }

    /**
     * Creates a topic and waits for the creation request to complete.
     *
     * @param name              name of the new topic
     * @param partitions        number of partitions
     * @param replicationFactor replication factor
     * @param configs           topic level configuration entries
     * @throws ExecutionException   if the creation failed
     * @throws InterruptedException if the calling thread was interrupted while waiting
     */
    public void createTopic(String name, int partitions, short replicationFactor, Map<String, String> configs) throws ExecutionException, InterruptedException {
        NewTopic topic = new NewTopic(name, partitions, replicationFactor);
        topic.configs(configs);
        CreateTopicsResult result = adminClient.createTopics(Collections.singletonList(topic));
        // get() already throws ExecutionException when the creation failed, so no
        // further check of the future's state is needed afterwards.
        result.all().get();
    }

    /**
     * Fetches both the description and the configuration of a topic.
     *
     * <p>Each of the two lookups waits up to 10 seconds.
     *
     * @param topic name of the topic
     * @return the combined description and config, or {@code null} if either lookup
     *         failed (the error is shown via {@link ErrorAlert} directly on the
     *         calling thread)
     */
    public DescribeTopicWrapper describeTopic(String topic) {
        // Both requests are issued before waiting on either of them.
        DescribeTopicsResult describeResult = adminClient.describeTopics(Collections.singletonList(topic));
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
        DescribeConfigsResult configsResult = adminClient.describeConfigs(Collections.singletonList(configResource));
        try {
            TopicDescription topicDescription = describeResult.values().get(topic).get(10, TimeUnit.SECONDS);
            Config config = configsResult.values().get(configResource).get(10, TimeUnit.SECONDS);
            return new DescribeTopicWrapper(topicDescription, config);
        } catch (Exception e) {
            restoreInterruptIfNeeded(e);
            ErrorAlert.show(e);
        }
        return null;
    }

    /**
     * Lists all consumer groups as {@link Lag} entries that only carry the group
     * id as their title.
     *
     * @return one {@link Lag} per consumer group, or an empty immutable list if the
     *         lookup failed (the error is reported via {@link ErrorAlert} on the
     *         JavaFX application thread)
     */
    public List<Lag> getConsumerGroups() {
        ListConsumerGroupsResult result = adminClient.listConsumerGroups();
        try {
            Collection<ConsumerGroupListing> consumerGroupListings = result.all().get();
            return consumerGroupListings.stream().map(consumerGroupListing -> {
                Lag lag = new Lag();
                lag.setTitle(consumerGroupListing.groupId());
                return lag;
            }).collect(Collectors.toList());
        } catch (Exception e) {
            showErrorLater(e);
        }
        return Collections.emptyList();
    }

    /**
     * Queries the ACL bindings matching the given resource filter, for any
     * principal, host, operation and permission type.
     *
     * @param resourceType    resource type to match
     * @param resourcePattern pattern type to match
     * @param resourceName    resource name to match; an empty string is treated
     *                        like {@code null}
     * @return the matching bindings, or an empty immutable list if the lookup
     *         failed (the error is reported via {@link ErrorAlert} on the JavaFX
     *         application thread)
     */
    public List<AclBinding> getACLs(ResourceType resourceType, PatternType resourcePattern, String resourceName) {
        try {
            String nameFilter = "".equals(resourceName) ? null : resourceName;
            AclBindingFilter aclBindingFilter = new AclBindingFilter(
                    new ResourcePatternFilter(resourceType, nameFilter, resourcePattern),
                    new AccessControlEntryFilter(null, null, AclOperation.ANY, AclPermissionType.ANY));
            DescribeAclsResult describeAclsResult = adminClient.describeAcls(aclBindingFilter);
            return new ArrayList<>(describeAclsResult.values().get());
        } catch (Exception e) {
            showErrorLater(e);
        }
        return Collections.emptyList();
    }

    /**
     * Deletes exactly the given ACL binding and waits for the request to complete.
     *
     * <p>Failures are reported via {@link ErrorAlert} on the JavaFX application
     * thread and are not propagated to the caller.
     *
     * @param aclBinding the binding to delete
     */
    public void deleteAcl(AclBinding aclBinding) {
        try {
            DeleteAclsResult result = adminClient.deleteAcls(Collections.singletonList(aclBinding.toFilter()));
            // Wait for completion so that a failed deletion actually reaches the
            // catch block instead of being lost with the discarded result.
            result.all().get();
        } catch (Exception e) {
            showErrorLater(e);
        }
    }

    /**
     * Creates the given ACL binding and waits for the request to complete.
     *
     * <p>Failures are reported via {@link ErrorAlert} on the JavaFX application
     * thread and are not propagated to the caller.
     *
     * @param aclBinding the binding to create
     */
    public void addAcl(AclBinding aclBinding) {
        try {
            CreateAclsResult result = adminClient.createAcls(Collections.singletonList(aclBinding));
            result.all().get();
        } catch (Exception e) {
            showErrorLater(e);
        }
    }

    /**
     * Starts an offset lookup for a consumer group.
     *
     * @param groupId id of the consumer group
     * @return the pending result exactly as returned by the admin client
     */
    public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId) {
        return adminClient.listConsumerGroupOffsets(groupId);
    }

    /** Closes the underlying admin client. */
    public void close() {
        adminClient.close();
    }

    /** Restores the interrupt flag and schedules an error alert on the JavaFX application thread. */
    private static void showErrorLater(Exception e) {
        restoreInterruptIfNeeded(e);
        Platform.runLater(() -> ErrorAlert.show(e));
    }

    /** Re-sets the thread's interrupt flag when a broad catch consumed an {@link InterruptedException}. */
    private static void restoreInterruptIfNeeded(Exception e) {
        if (e instanceof InterruptedException) {
            Thread.currentThread().interrupt();
        }
    }
}