Skip to content

Commit

Permalink
kcctl#65 added version check for topics api
Browse files Browse the repository at this point in the history
  • Loading branch information
SiddiqueAhmad committed Sep 27, 2021
1 parent 44791eb commit 8f0f518
Showing 1 changed file with 12 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,16 @@ public class DescribeConnectorCommand implements Callable<Integer> {
boolean includeTasksConfig;

private final Version requiredVersionForTasksConfig = new Version(2, 8);
private final Version requiredVersionForTopicsApi = new Version(2, 5);

@Override
public Integer call() {
KafkaConnectApi kafkaConnectApi = RestClientBuilder.newBuilder()
.baseUri(context.getContext().getCluster())
.build(KafkaConnectApi.class);
Version currentVersion = new Version(kafkaConnectApi.getWorkerInfo().version);

if (includeTasksConfig) {
Version currentVersion = new Version(kafkaConnectApi.getWorkerInfo().version);
if (!currentVersion.greaterOrEquals(requiredVersionForTasksConfig)) {
System.out.println("--tasks-config requires at least Kafka Connect 2.8. Current version: " + currentVersion);
return 1;
Expand All @@ -78,7 +79,6 @@ public Integer call() {
ConnectorInfo connector = kafkaConnectApi.getConnector(name);
ConnectorStatusInfo connectorStatus = kafkaConnectApi.getConnectorStatus(name);
Map<String, String> connectorConfig = kafkaConnectApi.getConnectorConfig(name);
Map<String, TopicsInfo> connectorTopics = kafkaConnectApi.getConnectorTopics(name);

List<Tuple> connectorInfo = Arrays.asList(
new Tuple("Name", connector.name),
Expand Down Expand Up @@ -129,14 +129,19 @@ public Integer call() {
Tuple.print(tuples);
}

Tuple.print(Arrays.asList(new Tuple(ANSI_WHITE_BOLD + "Topics" + ANSI_RESET, "")));
if (currentVersion.greaterOrEquals(requiredVersionForTopicsApi)) {

List<Tuple> topics = new ArrayList<>();
Map<String, TopicsInfo> connectorTopics = kafkaConnectApi.getConnectorTopics(name);

for (String topic : connectorTopics.entrySet().iterator().next().getValue().topics) {
topics.add(new Tuple("", " " + topic));
Tuple.print(Arrays.asList(new Tuple(ANSI_WHITE_BOLD + "Topics" + ANSI_RESET, "")));

List<Tuple> topics = new ArrayList<>();

for (String topic : connectorTopics.entrySet().iterator().next().getValue().topics) {
topics.add(new Tuple("", " " + topic));
}
Tuple.print(topics);
}
Tuple.print(topics);
}
catch (Exception e) {
if (!e.getMessage().contains("not found")) {
Expand Down

0 comments on commit 8f0f518

Please sign in to comment.