Skip to content

Commit

Permalink
Handle Exceptions in describeTopic differently to prevent NPE
Browse files Browse the repository at this point in the history
  • Loading branch information
patschuh committed Jun 22, 2022
1 parent 59d6f3a commit 0333804
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 24 deletions.
33 changes: 20 additions & 13 deletions src/main/java/at/esque/kafka/Controller.java
Expand Up @@ -50,7 +50,6 @@
import javafx.collections.FXCollections;
import javafx.collections.ObservableList;
import javafx.event.ActionEvent;
import javafx.event.EventHandler;
import javafx.fxml.FXML;
import javafx.fxml.FXMLLoader;
import javafx.scene.Parent;
Expand All @@ -73,7 +72,6 @@
import javafx.scene.control.ToggleButton;
import javafx.scene.image.Image;
import javafx.scene.input.MouseButton;
import javafx.scene.input.MouseEvent;
import javafx.stage.DirectoryChooser;
import javafx.stage.FileChooser;
import javafx.stage.Modality;
Expand Down Expand Up @@ -486,7 +484,7 @@ private ListCell<String> topicListCellFactory() {
});

cell.setOnMouseClicked(event -> {
if(event.getButton() == MouseButton.PRIMARY && event.getClickCount() == 2) {
if (event.getButton() == MouseButton.PRIMARY && event.getClickCount() == 2) {
this.getMessagesClick(new ActionEvent(event.getSource(), event.getTarget()));
}
});
Expand Down Expand Up @@ -934,6 +932,9 @@ private <KT, VT> void trace(TopicMessageTypeConfig topic, Map<String, String> co

private int getPartitionForKey(String topic, String key) {
DescribeTopicWrapper topicDescription = adminClient.describeTopic(topic);
if (topicDescription.isFailed()) {
throw new RuntimeException("Failed to determine number of partitions!", topicDescription.getException());
}
int numberOfPartitions = topicDescription.getTopicDescription().partitions().size();
try {
return Utils.toPositive(Utils.murmur2(key.getBytes("UTF-8"))) % numberOfPartitions;
Expand Down Expand Up @@ -1126,16 +1127,22 @@ private void showDescribeTopicDialog(String topic) {
FXMLLoader fxmlLoader = new FXMLLoader(getClass().getResource("/fxml/describeTopic.fxml"));
Parent root1 = fxmlLoader.load();
DescribeTopicController controller = fxmlLoader.getController();
controller.setup(adminClient.describeTopic(topic));
Stage stage = new Stage();
stage.getIcons().add(new Image(getClass().getResourceAsStream("/icons/kafkaesque.png")));
stage.initOwner(controlledStage);
stage.initModality(Modality.APPLICATION_MODAL);
stage.setTitle("Topic Description");
Scene styledScene = Main.createStyledScene(root1, -1, -1);
stage.setScene(styledScene);
stage.show();
centerStageOnControlledStage(stage);
final DescribeTopicWrapper describeTopicWrapper = adminClient.describeTopic(topic);
if (!describeTopicWrapper.isFailed()) {
controller.setup(describeTopicWrapper);
Stage stage = new Stage();
stage.getIcons().add(new Image(getClass().getResourceAsStream("/icons/kafkaesque.png")));
stage.initOwner(controlledStage);
stage.initModality(Modality.APPLICATION_MODAL);
stage.setTitle("Topic Description");
Scene styledScene = Main.createStyledScene(root1, -1, -1);
stage.setScene(styledScene);
stage.show();
centerStageOnControlledStage(stage);
} else {
final Exception exception = describeTopicWrapper.getException();
ErrorAlert.show("Failed to describe topic", "Topic description failed: " + exception.getClass().getName(), exception.getMessage(), exception, controlledStage, true);
}
} catch (Exception e) {
ErrorAlert.show(e, controlledStage);
}
Expand Down
10 changes: 7 additions & 3 deletions src/main/java/at/esque/kafka/alerts/ErrorAlert.java
Expand Up @@ -21,10 +21,14 @@ public static void show(Throwable ex, Window owner){
}

public static void show(Throwable ex, Window owner, boolean expandable) {
show(ex.getClass().getSimpleName(), ex.getClass().getName(), ex.getMessage(), ex, owner, expandable);
}

public static void show(String title, String headerText, String content, Throwable ex, Window owner, boolean expandable) {
Alert alert = new Alert(Alert.AlertType.ERROR);
alert.setTitle(ex.getClass().getSimpleName());
alert.setHeaderText(ex.getClass().getName());
alert.setContentText(ex.getMessage());
alert.setTitle(title);
alert.setHeaderText(headerText);
alert.setContentText(content);
Main.applyIcon(alert);
Main.applyStylesheet(alert.getDialogPane().getScene());

Expand Down
35 changes: 28 additions & 7 deletions src/main/java/at/esque/kafka/cluster/KafkaesqueAdminClient.java
Expand Up @@ -4,7 +4,22 @@
import at.esque.kafka.lag.viewer.Lag;
import at.esque.kafka.topics.DescribeTopicWrapper;
import javafx.application.Platform;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.clients.admin.CreateAclsResult;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.DeleteTopicsResult;
import org.apache.kafka.clients.admin.DescribeAclsResult;
import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
import org.apache.kafka.clients.admin.ListConsumerGroupsResult;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.acl.AccessControlEntryFilter;
import org.apache.kafka.common.acl.AclBinding;
Expand All @@ -16,7 +31,15 @@
import org.apache.kafka.common.resource.ResourcePatternFilter;
import org.apache.kafka.common.resource.ResourceType;

import java.util.*;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
Expand All @@ -41,7 +64,7 @@ public Set<String> getTopics() {
ListTopicsResult result = adminClient.listTopics(options);
try {
return result.names().get(15, TimeUnit.SECONDS);
} catch (Exception e) {
} catch (Exception e) {
Platform.runLater(() -> ErrorAlert.show(e));
}
return new HashSet<>();
Expand Down Expand Up @@ -84,9 +107,8 @@ public DescribeTopicWrapper describeTopic(String topic) {

return new DescribeTopicWrapper(topicDescription, config);
} catch (Exception e) {
ErrorAlert.show(e);
return new DescribeTopicWrapper(e);
}
return null;
}

public List<Lag> getConsumerGroups() {
Expand Down Expand Up @@ -155,8 +177,7 @@ public void deleteAcl(AclBinding aclBinding) {
}
}

public void addAcl(AclBinding aclBinding)
{
public void addAcl(AclBinding aclBinding) {
try {


Expand Down
3 changes: 2 additions & 1 deletion src/main/java/at/esque/kafka/handlers/ConsumerHandler.java
Expand Up @@ -8,6 +8,7 @@
import at.esque.kafka.serialization.KafkaEsqueDeserializer;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import javafx.application.Platform;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
Expand Down Expand Up @@ -145,7 +146,7 @@ public void seekToOffset(KafkaConsumer<String, String> consumer, long offset) {
try {
consumer.assignment().forEach(topicPartition -> LOGGER.info("Set position for topicPartition[{}/{}] to [{}]", topicPartition.topic(), topicPartition.partition(), consumer.position(topicPartition, Duration.ofSeconds(10))));
} catch (Exception e) {
ErrorAlert.show(e);
Platform.runLater(() -> ErrorAlert.show(e));
}
}

Expand Down
13 changes: 13 additions & 0 deletions src/main/java/at/esque/kafka/topics/DescribeTopicWrapper.java
Expand Up @@ -9,12 +9,17 @@
public class DescribeTopicWrapper {
private TopicDescription topicDescription;
private Config configurations;
private Exception exception;

public DescribeTopicWrapper(TopicDescription topicDescription, Config configurations) {
this.topicDescription = topicDescription;
this.configurations = configurations;
}

public DescribeTopicWrapper(Exception e) {
this.exception = e;
}

public Collection<ConfigEntry> getConfigurations() {
return configurations.entries();
}
Expand All @@ -30,4 +35,12 @@ public void setTopicDescription(TopicDescription topicDescription) {
public void setConfigurations(Config configurations) {
this.configurations = configurations;
}

public Exception getException() {
return exception;
}

public boolean isFailed() {
return exception != null;
}
}

0 comments on commit 0333804

Please sign in to comment.