Skip to content

Commit

Permalink
Merge branch 'develop' into kafkaconnect
Browse files Browse the repository at this point in the history
  • Loading branch information
an0r0c committed Mar 13, 2021
2 parents 8a0c6e6 + 6a9b95a commit 9740199
Show file tree
Hide file tree
Showing 16 changed files with 478 additions and 201 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Expand Up @@ -66,7 +66,7 @@ dependencies {
}

group = 'at.esque.kafka'
version = '1.2.0-SNAPSHOT'
version = '1.3.0-SNAPSHOT'
sourceCompatibility = '1.8'

publishing {
Expand Down
21 changes: 13 additions & 8 deletions src/main/java/at/esque/kafka/Controller.java
Expand Up @@ -108,9 +108,9 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
Expand Down Expand Up @@ -460,7 +460,9 @@ private void getTopicsForCluster(ClusterConfig clusterConfig) {
stopWatch.start();
LOGGER.info("Started getting topics for cluster");
backGroundTaskHolder.setIsInProgress(true);
Platform.runLater(() -> topicListView.setItems(adminClient.getTopics()));
Set<String> topics = adminClient.getTopics();
Platform.runLater(() -> topicListView.setItems(topics));

} finally {
stopWatch.stop();
LOGGER.info("Finished getting topics for cluster [{}]", stopWatch);
Expand Down Expand Up @@ -668,9 +670,7 @@ public void aclViewer(ActionEvent actionEvent) {
fxmlLoader.setLocation(getClass().getResource("/fxml/aclViewer.fxml"));
Parent root1 = fxmlLoader.load();
AclViewerController controller = fxmlLoader.getController();
UUID consumerId = consumerHandler.registerConsumer(selectedCluster(), new TopicMessageTypeConfig(), new HashMap<>());
KafkaConsumer consumer = consumerHandler.getConsumer(consumerId).orElseThrow(() -> new RuntimeException("Error getting consumer"));
controller.setup(adminClient, consumer);
controller.setup(adminClient);
Stage stage = new Stage();
stage.getIcons().add(new Image(getClass().getResourceAsStream("/icons/kafkaesque.png")));
stage.initOwner(controlledStage);
Expand All @@ -681,7 +681,6 @@ public void aclViewer(ActionEvent actionEvent) {
centerStageOnControlledStage(stage);
stage.setOnCloseRequest(windowEvent -> {
controller.stop();
consumerHandler.deregisterConsumer(consumerId);
});
} catch (Exception e) {
ErrorAlert.show(e);
Expand Down Expand Up @@ -994,7 +993,11 @@ public void deleteClusterConfigsClick(ActionEvent event) {

@FXML
public void editClusterConfigsClick(ActionEvent actionEvent) {
ClusterConfigDialog.show(selectedCluster()).ifPresent(clusterConfig -> configHandler.saveConfigs());
ClusterConfig existingConfig = selectedCluster();
ClusterConfigDialog.show(existingConfig).ifPresent(clusterConfig -> {
existingConfig.update(clusterConfig);
configHandler.saveConfigs();
});
}

@FXML
Expand Down Expand Up @@ -1279,4 +1282,6 @@ private PinTab createTab(ClusterConfig clusterConfig, String name) {
private void updateTabName(PinTab tab, ClusterConfig clusterConfig, String name) {
Platform.runLater(() -> tab.setText(clusterConfig.getIdentifier() + " - " + name));
}


}
11 changes: 10 additions & 1 deletion src/main/java/at/esque/kafka/MessageType.java
Expand Up @@ -2,5 +2,14 @@

public enum MessageType {
STRING,
AVRO
AVRO,
SHORT,
INTEGER,
LONG,
FLOAT,
DOUBLE,
BYTEARRAY,
BYTEBUFFER,
BYTES,
UUID
}
Expand Up @@ -27,7 +27,6 @@

import javax.net.ssl.SSLSocketFactory;
import java.util.Collections;
import java.util.function.ToDoubleBiFunction;


public class SchemaRegistryBrowserController {
Expand Down Expand Up @@ -132,9 +131,9 @@ private ListCell<String> subjectListCellFactory() {

MenuItem configItem = new MenuItem();
configItem.setGraphic(new FontIcon(FontAwesome.COG));
configItem.textProperty().set("config");
configItem.textProperty().set("Configure Compatibility Level");
configItem.setOnAction(event -> {
SubjectConfigDialog.show(schemaRegistryRestService,subjectListView.getListView().getSelectionModel().getSelectedItem());
SubjectConfigDialog.show(schemaRegistryRestService, subjectListView.getListView().getSelectionModel().getSelectedItem());
});

contextMenu.getItems().add(configItem);
Expand Down
10 changes: 2 additions & 8 deletions src/main/java/at/esque/kafka/acl/viewer/Acl.java
Expand Up @@ -3,10 +3,6 @@
import javafx.beans.property.SimpleStringProperty;
import javafx.beans.property.StringProperty;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourceType;

public class Acl {
private StringProperty resourceType = new SimpleStringProperty();
Expand All @@ -18,8 +14,7 @@ public class Acl {
private StringProperty host = new SimpleStringProperty();
private AclBinding aclBinding;

public Acl(AclBinding aclBinding)
{
public Acl(AclBinding aclBinding) {
this.aclBinding = aclBinding;
this.resourceType.set(aclBinding.pattern().resourceType().toString());
this.resourceName.set(aclBinding.pattern().name());
Expand Down Expand Up @@ -86,8 +81,7 @@ public StringProperty hostProperty() {
return host;
}

public AclBinding getAclBinding()
{
public AclBinding getAclBinding() {
return aclBinding;
}
}
Expand Down
76 changes: 37 additions & 39 deletions src/main/java/at/esque/kafka/acl/viewer/AclViewerController.java
@@ -1,34 +1,30 @@
package at.esque.kafka.acl.viewer;

import at.esque.kafka.alerts.ErrorAlert;
import at.esque.kafka.cluster.KafkaesqueAdminClient;
import at.esque.kafka.controls.FilterableListView;
import at.esque.kafka.controls.LagViewerCellContent;
import at.esque.kafka.lag.viewer.Lag;
import at.esque.kafka.dialogs.CreateACLDialog;
import javafx.application.Platform;
import javafx.beans.binding.Bindings;
import javafx.beans.property.BooleanProperty;
import javafx.beans.property.SimpleBooleanProperty;
import javafx.collections.FXCollections;
import javafx.event.ActionEvent;
import javafx.event.EventHandler;
import javafx.fxml.FXML;
import javafx.scene.control.*;
import javafx.scene.control.ComboBox;
import javafx.scene.control.ContextMenu;
import javafx.scene.control.MenuItem;
import javafx.scene.control.TableColumn;
import javafx.scene.control.TableRow;
import javafx.scene.control.TableView;
import javafx.scene.control.TextField;
import javafx.scene.control.cell.PropertyValueFactory;
import javafx.util.Callback;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;
import org.kordamp.ikonli.fontawesome.FontAwesome;
import org.kordamp.ikonli.javafx.FontIcon;

import javax.swing.*;
import java.awt.event.MouseEvent;
import java.util.*;
import java.util.ArrayList;
import java.util.List;

public class AclViewerController {

Expand All @@ -45,29 +41,27 @@ public class AclViewerController {
TableView<Acl> resultView;

@FXML
TableColumn<Acl,String> resourceTypeColumn;
TableColumn<Acl, String> resourceTypeColumn;
@FXML
TableColumn<Acl,String> resourceNameColumn;
TableColumn<Acl, String> resourceNameColumn;
@FXML
TableColumn<Acl,String> patternTypeColumn;
TableColumn<Acl, String> patternTypeColumn;
@FXML
TableColumn<Acl,String> principalColumn;
TableColumn<Acl, String> principalColumn;
@FXML
TableColumn<Acl,String> operationColumn;
TableColumn<Acl, String> operationColumn;
@FXML
TableColumn<Acl,String> permissionTypeColumn;
TableColumn<Acl, String> permissionTypeColumn;
@FXML
TableColumn<Acl,String> hostColumn;
TableColumn<Acl, String> hostColumn;

private KafkaesqueAdminClient adminClient;

private KafkaConsumer kafkaConsumer;

private BooleanProperty refreshRunning = new SimpleBooleanProperty(false);

@FXML
public void initialize() {
resourceTypeCombo.setItems(FXCollections.observableArrayList( ResourceType.values()));
resourceTypeCombo.setItems(FXCollections.observableArrayList(ResourceType.values()));
resourcePatternCombo.setItems(FXCollections.observableArrayList(PatternType.values()));

resourceTypeCombo.getSelectionModel().select(ResourceType.ANY);
Expand Down Expand Up @@ -102,8 +96,7 @@ public TableRow<Acl> call(TableView<Acl> tableView) {
public void handle(ActionEvent event) {
Object item = resultView.getSelectionModel().getSelectedItem();

if (item instanceof Acl)
{
if (item instanceof Acl) {
Acl selectedAcl = (Acl) item;
adminClient.deleteAcl(selectedAcl.getAclBinding());
}
Expand All @@ -121,30 +114,35 @@ public void handle(ActionEvent event) {

}

public void setup(KafkaesqueAdminClient adminClient, KafkaConsumer kafkaConsumer) {
public void setup(KafkaesqueAdminClient adminClient) {
this.adminClient = adminClient;
this.kafkaConsumer = kafkaConsumer;
}

@FXML
private void startSearch(ActionEvent actionEvent) {
runInDaemonThread(() -> {
Platform.runLater(() -> {
refreshRunning.setValue(true);
List<Acl> aclList = new ArrayList<>();

adminClient.getACLs(resourceTypeCombo.getValue(), resourcePatternCombo.getValue(), resourceName.getText())
.forEach(acl -> aclList.add(new Acl(acl)));

resultView.setItems(FXCollections.observableArrayList(aclList));
});
List<Acl> aclList = new ArrayList<>();
Platform.runLater(() -> refreshRunning.setValue(true));
adminClient.getACLs(resourceTypeCombo.getValue(), resourcePatternCombo.getValue(), resourceName.getText())
.forEach(acl -> Platform.runLater(() -> aclList.add(new Acl(acl))));
Platform.runLater(() -> {
refreshRunning.setValue(true);
});
adminClient.getACLs(resourceTypeCombo.getValue(), resourcePatternCombo.getValue(), resourceName.getText())
.forEach(acl -> Platform.runLater(() -> aclList.add(new Acl(acl))));
Platform.runLater(() -> refreshRunning.setValue(false));
Platform.runLater(() -> resultView.setItems(FXCollections.observableArrayList(aclList)));
});
}

@FXML
private void addACL(ActionEvent actionEvent)
{
CreateACLDialog.show(adminClient);
}

public void stop(){
kafkaConsumer = null;
public void stop() {
adminClient = null;
}

private void runInDaemonThread(Runnable runnable) {
Expand Down
36 changes: 20 additions & 16 deletions src/main/java/at/esque/kafka/cluster/ClusterConfig.java
Expand Up @@ -30,22 +30,26 @@ public ClusterConfig() {
}

public ClusterConfig(ClusterConfig existingConfig) {
if (existingConfig != null) {
this.identifier = existingConfig.identifier;
this.bootStrapServers = existingConfig.bootStrapServers;
this.schemaRegistry = existingConfig.schemaRegistry;
this.schemaRegistryBasicAuthUserInfo = existingConfig.schemaRegistryBasicAuthUserInfo;
this.sslEnabled = existingConfig.sslEnabled;
this.keyStoreLocation = existingConfig.keyStoreLocation;
this.keyStorePassword = existingConfig.keyStorePassword;
this.trustStoreLocation = existingConfig.trustStoreLocation;
this.trustStorePassword = existingConfig.trustStorePassword;
this.saslSecurityProtocol = existingConfig.saslSecurityProtocol;
this.saslMechanism = existingConfig.saslMechanism;
this.saslJaasConfig = existingConfig.saslJaasConfig;
this.kafkaConnectUrl = existingConfig.kafkaConnectUrl;
this.kafkaConnectBasicAuthUser = existingConfig.kafkaConnectBasicAuthUser;
this.kafkaConnectBasicAuthPassword = existingConfig.kafkaConnectBasicAuthPassword;
update(existingConfig);
}

public void update(ClusterConfig existingConfig) {
if(existingConfig != null) {
this.setIdentifier(existingConfig.getIdentifier());
this.setBootStrapServers(existingConfig.getBootStrapServers());
this.setSchemaRegistry(existingConfig.getSchemaRegistry());
this.setSchemaRegistryBasicAuthUserInfo(existingConfig.getSchemaRegistryBasicAuthUserInfo());
this.setSslEnabled(existingConfig.isSslEnabled());
this.setKeyStoreLocation(existingConfig.getKeyStoreLocation());
this.setKeyStorePassword(existingConfig.getKeyStorePassword());
this.setTrustStoreLocation(existingConfig.getTrustStoreLocation());
this.setTrustStorePassword(existingConfig.getTrustStorePassword());
this.setSaslSecurityProtocol(existingConfig.getSaslSecurityProtocol());
this.setSaslMechanism(existingConfig.getSaslMechanism());
this.setSaslJaasConfig(existingConfig.getSaslJaasConfig());
this.setKafkaConnectUrl(existingConfig.getKafkaConnectUrl());
this.setKafkaConnectBasicAuthUser(existingConfig.getKafkaConnectBasicAuthUser());
this.setKafkaConnectBasicAuthPassword(existingConfig.getKafkaConnectBasicAuthPassword());
}
}

Expand Down

0 comments on commit 9740199

Please sign in to comment.