Skip to content

Commit

Permalink
Merge pull request #4 from an0r0c/kafkaconnect
Browse files Browse the repository at this point in the history
Kafkaconnect
  • Loading branch information
an0r0c committed Mar 13, 2021
2 parents 6a9b95a + 9740199 commit e9b0474
Show file tree
Hide file tree
Showing 17 changed files with 1,467 additions and 11 deletions.
1 change: 1 addition & 0 deletions build.gradle
Expand Up @@ -57,6 +57,7 @@ dependencies {
compile 'org.fxmisc.richtext:richtextfx:0.10.5'
compile 'net.thisptr:jackson-jq:1.0.0-preview.20191208'
compile 'com.dlsc.formsfx:formsfx-core:8.4.0'
compile group: 'org.sourcelab', name: 'kafka-connect-client', version: '3.1.0'

testCompile 'junit:junit:4.12'
testCompile 'org.springframework.kafka:spring-kafka-test:2.2.0.RELEASE'
Expand Down
64 changes: 64 additions & 0 deletions src/main/java/at/esque/kafka/Controller.java
Expand Up @@ -552,6 +552,70 @@ public void schemaRegistryClick(ActionEvent event) {
}
}

@FXML
public void kafkaConnectClick(ActionEvent actionEvent)
{
try {
ClusterConfig selectedConfig = selectedCluster();
if (StringUtils.isEmpty(selectedConfig.getkafkaConnectUrl())) {
Optional<String> input = SystemUtils.showInputDialog("http://localhost:8083", "Add kafka connect url", "this cluster config is missing a kafka connect url please add it now", "kafka connect URL");
if (!input.isPresent()) {
return;
}
input.ifPresent(url -> {
selectedConfig.setkafkaConnectUrl(url);
configHandler.saveConfigs();
});
}

FXMLLoader fxmlLoader = new FXMLLoader(getClass().getResource("/fxml/kafkaConnectBrowser.fxml"));
Parent root1 = fxmlLoader.load();
KafkaConnectBrowserController controller = fxmlLoader.getController();
controller.setup(selectedConfig,configHandler);
Stage stage = new Stage();
stage.getIcons().add(new Image(getClass().getResourceAsStream("/icons/kafkaesque.png")));
stage.initModality(Modality.APPLICATION_MODAL);
stage.setTitle("Browse Kafka Connect");
stage.setScene(Main.createStyledScene(root1, -1, -1));
stage.show();
centerStageOnControlledStage(stage);
} catch (Exception e) {
ErrorAlert.show(e);
}
}

@FXML
public void kafkaConnectInstalledPluginClick(ActionEvent actionEvent)
{
try {
ClusterConfig selectedConfig = selectedCluster();
if (StringUtils.isEmpty(selectedConfig.getkafkaConnectUrl())) {
Optional<String> input = SystemUtils.showInputDialog("http://localhost:8083", "Add kafka connect url", "this cluster config is missing a kafka connect url please add it now", "kafka connect URL");
if (!input.isPresent()) {
return;
}
input.ifPresent(url -> {
selectedConfig.setkafkaConnectUrl(url);
configHandler.saveConfigs();
});
}

FXMLLoader fxmlLoader = new FXMLLoader(getClass().getResource("/fxml/installedConnectorPluginsBrowser.fxml"));
Parent root1 = fxmlLoader.load();
InstalledConnectorPluginsController controller = fxmlLoader.getController();
controller.setup(selectedConfig,configHandler);
Stage stage = new Stage();
stage.getIcons().add(new Image(getClass().getResourceAsStream("/icons/kafkaesque.png")));
stage.initModality(Modality.APPLICATION_MODAL);
stage.setTitle("Browse installed Kafka Connect plugins");
stage.setScene(Main.createStyledScene(root1, -1, -1));
stage.show();
centerStageOnControlledStage(stage);
} catch (Exception e) {
ErrorAlert.show(e);
}
}

@FXML
public void crossClusterClick(ActionEvent actionEvent) {
try {
Expand Down
223 changes: 223 additions & 0 deletions src/main/java/at/esque/kafka/CreateConnectorController.java
@@ -0,0 +1,223 @@
package at.esque.kafka;

import at.esque.kafka.acl.viewer.Acl;
import at.esque.kafka.alerts.ErrorAlert;
import at.esque.kafka.alerts.SuccessAlert;
import at.esque.kafka.alerts.WarningAlert;
import at.esque.kafka.connect.ConnectConfigParameter;
import at.esque.kafka.connect.KafkaesqueConnectClient;
import at.esque.kafka.connect.ValidationResult;
import at.esque.kafka.connect.utils.ConnectUtil;
import at.esque.kafka.controls.KafkaEsqueCodeArea;
import at.esque.kafka.dialogs.SubjectConfigDialog;
import io.confluent.kafka.schemaregistry.client.rest.RestService;
import javafx.beans.property.ListProperty;
import javafx.beans.property.SimpleListProperty;
import javafx.collections.FXCollections;
import javafx.event.ActionEvent;
import javafx.fxml.FXML;
import javafx.scene.control.*;
import javafx.scene.control.cell.PropertyValueFactory;
import javafx.stage.FileChooser;
import javafx.stage.Stage;
import org.apache.avro.Schema;
import org.apache.commons.lang3.StringUtils;
import org.sourcelab.kafka.connect.apiclient.request.dto.ConnectorPluginConfigValidationResults;

import javax.swing.text.TabableView;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.*;

public class CreateConnectorController {


@FXML
private ComboBox connectorClassCombo;

@FXML
private KafkaEsqueCodeArea connectorConfigTextArea;

@FXML
private TextField connectorNameField;

private boolean newConnectorMode;

private KafkaesqueConnectClient kafkaesqueConnectClient;

private Stage stage;

@FXML
private TableView paramHelpView;

@FXML
TableColumn<ConnectConfigParameter, String> paramName;
@FXML
TableColumn<ConnectConfigParameter, String> paramDisplayName;
@FXML
TableColumn<ConnectConfigParameter, String> paramType;
@FXML
TableColumn<ConnectConfigParameter, String> paramDefaultValue;
@FXML
TableColumn<ConnectConfigParameter, String> paramImportance;
@FXML
TableColumn<ConnectConfigParameter, String> paramDocumentation;
@FXML
TableColumn<ConnectConfigParameter, String> paramGroup;
@FXML
TableColumn<ConnectConfigParameter, Boolean> paramRequired;
@FXML
TableColumn<ConnectConfigParameter, Integer> paramOrder;

@FXML
Button saveButton;

private List<ConnectConfigParameter> currentConnectorClassParameters;

public void setup(String selectedConnector, KafkaesqueConnectClient kafkaesqueConnectClient, Stage stage) {
this.kafkaesqueConnectClient = kafkaesqueConnectClient;
this.stage = stage;
currentConnectorClassParameters = new ArrayList<>();


paramName.setCellValueFactory(new PropertyValueFactory<>("name"));
paramDisplayName.setCellValueFactory(new PropertyValueFactory<>("displayName"));
paramType.setCellValueFactory(new PropertyValueFactory<>("type"));
paramDefaultValue.setCellValueFactory(new PropertyValueFactory<>("defaultValue"));
paramImportance.setCellValueFactory(new PropertyValueFactory<>("importance"));
paramDocumentation.setCellValueFactory(new PropertyValueFactory<>("documentation"));
paramGroup.setCellValueFactory(new PropertyValueFactory<>("group"));
paramRequired.setCellValueFactory(new PropertyValueFactory<>("required"));
paramOrder.setCellValueFactory(new PropertyValueFactory<>("order"));

if (selectedConnector == null) {
newConnectorMode = true;

setupConnectorClassCombo();

}
else
{
newConnectorMode = false;

Map<String,String> currentConfig = kafkaesqueConnectClient.getConnectorConfig(selectedConnector);
String connectorClass = currentConfig.get("connector.class");

connectorConfigTextArea.setText(ConnectUtil.buildConfigString(currentConfig, ConnectUtil.PARAM_BLACK_LIST_EDIT));

connectorClassCombo.setDisable(true);
connectorClassCombo.setItems(FXCollections.observableArrayList(Arrays.asList(connectorClass)));
connectorClassCombo.getSelectionModel().select(connectorClass);

fetchAndShowPossibleConnectorConfigParams(connectorClass);

connectorNameField.setDisable(true);
connectorNameField.setText(selectedConnector);

saveButton.setText("Update");

}


}

public void cleanup() {
kafkaesqueConnectClient = null;
stage = null;
}

public void addConnector(ActionEvent actionEvent) {

try {
Map<String, String> paramMap = ConnectUtil.parseConfigMapFromJsonString(connectorConfigTextArea.getText());

String connectorClass = (String) connectorClassCombo.getValue();
String connectorName = connectorNameField.getText();

if(newConnectorMode == true) {
kafkaesqueConnectClient.createConnector(connectorName, connectorClass, paramMap);

SuccessAlert.show("Success", null, "Connector added successfully!");
}
else
{
kafkaesqueConnectClient.updateConnector(connectorName, connectorClass, paramMap);
SuccessAlert.show("Success", null, "Connector updated successfully!");
}

stage.close();
} catch (Exception e) {
ErrorAlert.show(e);
}

}

private void setupConnectorClassCombo() {
List<String> availPlugins = kafkaesqueConnectClient.getInstalledConnectorPlugins();
ListProperty<String> availPluginProperties = new SimpleListProperty<>(FXCollections.observableArrayList(availPlugins));
connectorClassCombo.setItems(availPluginProperties);

connectorClassCombo.getSelectionModel().selectedItemProperty().addListener((observable, oldValue, newValue) -> {
if (newValue != null && !newValue.equals(oldValue)) {

fetchAndShowPossibleConnectorConfigParams((String) newValue);
showInitialConnectorConfig();
}
});
}

private void fetchAndShowPossibleConnectorConfigParams(String connectorClass)
{
currentConnectorClassParameters.clear();
currentConnectorClassParameters.addAll(kafkaesqueConnectClient.getConnectorConfigParameters(connectorClass));

paramHelpView.setItems(FXCollections.observableArrayList(currentConnectorClassParameters));
}

private void showInitialConnectorConfig()
{
Map<String, String> initialMap = new HashMap<>();

currentConnectorClassParameters.stream()
.filter(c -> c.getImportance().equals("HIGH"))
.forEach(c -> initialMap.put(c.getName(), c.getValue() != null ? c.getValue() : c.getDefaultValue()));

connectorConfigTextArea.setText(ConnectUtil.buildConfigString(initialMap, ConnectUtil.PARAM_BLACK_LIST_EDIT));
}

public void validateConfig(ActionEvent actionEvent) {

try {
Map<String, String> paramMap = ConnectUtil.parseConfigMapFromJsonString(connectorConfigTextArea.getText());

String connectorClass = (String) connectorClassCombo.getValue();
String connectorName = connectorNameField.getText();

ValidationResult validationResult = kafkaesqueConnectClient.validateConnectorConfig(connectorName, connectorClass, paramMap);

if (validationResult.getErrorCount() == 0) {
SuccessAlert.show("Success", null, "No validation error found!");
} else {
StringBuilder errorProtocol = new StringBuilder();

for (String param : validationResult.getErrors().keySet()) {
errorProtocol.append(String.format("%s:", param));
errorProtocol.append(System.getProperty("line.separator"));

for (String error : validationResult.getErrors().get(param)) {
errorProtocol.append(String.format(" %s", error));
errorProtocol.append(System.getProperty("line.separator"));
}
}

WarningAlert.show(String.format("%d validation errors found!", validationResult.getErrorCount()), null, errorProtocol.toString());
}


} catch (Exception e) {
ErrorAlert.show(e);
}
}
}
@@ -0,0 +1,41 @@
package at.esque.kafka;

import at.esque.kafka.alerts.ErrorAlert;
import at.esque.kafka.cluster.ClusterConfig;
import at.esque.kafka.connect.KafkaesqueConnectClient;
import at.esque.kafka.controls.FilterableListView;
import at.esque.kafka.controls.KafkaEsqueCodeArea;
import at.esque.kafka.handlers.ConfigHandler;
import javafx.collections.FXCollections;
import javafx.event.ActionEvent;
import javafx.fxml.FXML;
import javafx.scene.control.ContextMenu;
import javafx.scene.control.ListCell;

import java.util.Map;


public class InstalledConnectorPluginsController {

@FXML
private FilterableListView<String> connectorPluginListView;

private KafkaesqueConnectClient kafkaesqueConnectClient;

public void setup(ClusterConfig selectedConfig, ConfigHandler configHandler) {
kafkaesqueConnectClient = new KafkaesqueConnectClient(selectedConfig.getkafkaConnectUrl(), selectedConfig.getkafkaConnectBasicAuthUser(), selectedConfig.getkafkaConnectBasicAuthPassword(), configHandler.getSslProperties(selectedConfig));
connectorPluginListView.setListComparator(String::compareTo);

refreshConnectorPlugins(null);

}

public void refreshConnectorPlugins(ActionEvent actionEvent) {
try {
connectorPluginListView.setItems(FXCollections.observableArrayList(kafkaesqueConnectClient.getInstalledConnectorPlugins()));
} catch (Exception e) {
ErrorAlert.show(e);
}
}

}

0 comments on commit e9b0474

Please sign in to comment.