Skip to content

Commit

Permalink
Add option to replicate messages from one cluster to another
Browse files Browse the repository at this point in the history
  • Loading branch information
patschuh committed Mar 18, 2019
1 parent fe40aee commit fd97d0b
Show file tree
Hide file tree
Showing 8 changed files with 428 additions and 11 deletions.
19 changes: 19 additions & 0 deletions src/main/java/at/esque/kafka/Controller.java
Expand Up @@ -548,6 +548,25 @@ public void schemaRegistryClick(ActionEvent event) {
}
}

@FXML
public void crossClusterClick(ActionEvent actionEvent) {
try {
FXMLLoader fxmlLoader = injector.getInstance(FXMLLoader.class);
fxmlLoader.setLocation(getClass().getResource("/fxml/crossClusterOperation.fxml"));
Parent root1 = fxmlLoader.load();
CrossClusterController controller = fxmlLoader.getController();
controller.setup();
Stage stage = new Stage();
stage.getIcons().add(new Image(getClass().getResourceAsStream("/icons/kafkaesque.png")));
stage.initModality(Modality.APPLICATION_MODAL);
stage.setTitle("Cross Cluster Operations");
stage.setScene(Main.createStyledScene(root1, 1000, 500));
stage.show();
} catch (Exception e) {
ErrorAlert.show(e);
}
}

private void getOldestMessages(TopicMessageTypeConfig topic, Map<String, String> consumerConfig) {
runInDaemonThread(() -> {
UUID consumerId = consumerHandler.registerConsumer(selectedCluster(), topic, consumerConfig);
Expand Down
158 changes: 158 additions & 0 deletions src/main/java/at/esque/kafka/CrossClusterController.java
@@ -0,0 +1,158 @@
package at.esque.kafka;

import at.esque.kafka.alerts.ErrorAlert;
import at.esque.kafka.cluster.ClusterConfig;
import at.esque.kafka.cluster.ClusterConfigs;
import at.esque.kafka.cluster.CrossClusterOperation;
import at.esque.kafka.cluster.KafkaesqueAdminClient;
import at.esque.kafka.cluster.TopicMessageTypeConfig;
import at.esque.kafka.handlers.ConfigHandler;
import at.esque.kafka.handlers.ConsumerHandler;
import at.esque.kafka.handlers.CrossClusterOperationHandler;
import at.esque.kafka.handlers.ProducerHandler;
import com.google.inject.Inject;
import javafx.application.Platform;
import javafx.collections.FXCollections;
import javafx.collections.ObservableList;
import javafx.event.ActionEvent;
import javafx.fxml.FXML;
import javafx.scene.control.ComboBox;
import javafx.scene.control.ListView;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.io.IOException;
import java.time.Duration;
import java.util.Optional;
import java.util.UUID;

public class CrossClusterController {
@FXML
private ListView<String> fromClusterTopicsList;
@FXML
private ListView<String> toClusterTopicsList;
@FXML
private ListView<CrossClusterOperation> runningOperationsList;
@FXML
private ComboBox<ClusterConfig> fromClusterComboBox;
@FXML
private ComboBox<ClusterConfig> toClusterComboBox;

private KafkaesqueAdminClient fromAdmin;
private KafkaesqueAdminClient toAdmin;

@Inject
private CrossClusterOperationHandler crossClusterOperationHandler;
@Inject
private ConfigHandler configHandler;
@Inject
private ProducerHandler producerHandler;
@Inject
private ConsumerHandler consumerHandler;

public void setup() {
ClusterConfigs clusterConfigs = configHandler.loadOrCreateConfigs();
fromClusterComboBox.setItems(clusterConfigs.getClusterConfigs());
toClusterComboBox.setItems(clusterConfigs.getClusterConfigs());

fromClusterComboBox.getSelectionModel().selectedItemProperty().addListener((observable, oldValue, newValue) -> {
setupClusterControls(newValue, fromAdmin, fromClusterTopicsList);
});

toClusterComboBox.getSelectionModel().selectedItemProperty().addListener((observable, oldValue, newValue) -> {
setupClusterControls(newValue, toAdmin, toClusterTopicsList);
});

refreshOperationList(null);
}

private void setupClusterControls(ClusterConfig clusterConfig, KafkaesqueAdminClient adminClient, ListView topicList) {
if (adminClient != null) {
adminClient.close();
}
adminClient = new KafkaesqueAdminClient(clusterConfig.getBootStrapServers());
KafkaesqueAdminClient finalAdminClient = adminClient;
runInDaemonThread(() -> {
ObservableList<String> topics = FXCollections.observableArrayList(finalAdminClient.getTopics());
Platform.runLater(() -> topicList.setItems(topics.sorted()));
});
}

private void startOperation(UUID operationId) {
runInDaemonThread(() -> {
CrossClusterOperation operation = crossClusterOperationHandler.getOperation(operationId);
UUID producerId;
UUID consumerId;
try {
producerId = producerHandler.registerProducer(operation.getToCluster());
consumerId = consumerHandler.registerConsumer(operation.getFromCluster(), operation.getFromTopic(), configHandler.readConsumerConfigs(operation.getToCluster().getIdentifier()));
consumerHandler.subscribe(consumerId, operation.getFromTopic().getName());
consumerHandler.seekToOffset(consumerId, -2);
} catch (IOException e) {
ErrorAlert.show(e);
return;
}
Optional<KafkaConsumer> consumer = consumerHandler.getConsumer(consumerId);
consumer.ifPresent(kafkaConsumer -> {
while (!operation.getStop().get()) {
ConsumerRecords consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(5));
Iterable<ConsumerRecord> records = consumerRecords.records(operation.getFromTopic().getName());
records.forEach(consumerRecord -> {
try {
if (operation.getFilterFunction().test(consumerRecord)) {
producerHandler.sendMessage(producerId, operation.getToTopic().getName(), -1, (String) consumerRecord.key(), (String) consumerRecord.value());
}
} catch (Exception e) {
ErrorAlert.show(e);
return;
}
});
}
});
consumerHandler.deregisterConsumer(consumerId);
producerHandler.deregisterProducer(producerId);
});
}


private void runInDaemonThread(Runnable runnable) {
Thread daemonThread = new Thread(runnable);
daemonThread.setDaemon(true);
daemonThread.start();
}

@FXML
public void startOperationClick(ActionEvent actionEvent) {
try {
ClusterConfig fromCluster = fromClusterComboBox.getSelectionModel().getSelectedItem();
ClusterConfig toCluster = toClusterComboBox.getSelectionModel().getSelectedItem();

String fromTopicName = fromClusterTopicsList.getSelectionModel().getSelectedItem();
String toTopicName = toClusterTopicsList.getSelectionModel().getSelectedItem();

TopicMessageTypeConfig fromTopic = configHandler.getConfigForTopic(fromCluster.getIdentifier(), fromTopicName);
TopicMessageTypeConfig toTopic = configHandler.getConfigForTopic(toCluster.getIdentifier(), toTopicName);

CrossClusterOperation crossClusterOperation = new CrossClusterOperation(fromCluster, toCluster, fromTopic, toTopic, consumerRecord -> true);
UUID operationId = crossClusterOperationHandler.registerOperation(crossClusterOperation);
startOperation(operationId);
} catch (Exception e) {
ErrorAlert.show(e);
return;
}
}

@FXML
public void refreshOperationList(ActionEvent actionEvent) {
runningOperationsList.setItems(crossClusterOperationHandler.getOperations());
}

@FXML
public void stopSelectedOperation(ActionEvent actionEvent) {
CrossClusterOperation selectedItem = runningOperationsList.getSelectionModel().getSelectedItem();
if (selectedItem != null && selectedItem.getOperationId() != null) {
crossClusterOperationHandler.markOperationForStop(selectedItem.getOperationId());
}
}
}
16 changes: 11 additions & 5 deletions src/main/java/at/esque/kafka/cluster/ClusterConfigs.java
Expand Up @@ -7,27 +7,33 @@
import java.beans.Transient;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class ClusterConfigs {
private ObservableList<ClusterConfig> clusterConfigs = FXCollections.observableArrayList();

@Transient
public ObservableList<ClusterConfig> getClusterConfigs() {
public ObservableList<ClusterConfig> getClusterConfigs() {
return clusterConfigs;
}

public void setClusterConfigs( ObservableList<ClusterConfig> clusterConfigs) {
public void setClusterConfigs(ObservableList<ClusterConfig> clusterConfigs) {
this.clusterConfigs = clusterConfigs;
}

public Map<String, ClusterConfig> asMap() {
return clusterConfigs.stream().collect(Collectors.toMap(ClusterConfig::getIdentifier, clusterConfig -> clusterConfig));
}

@JsonProperty("clusterConfigs")
public List<ClusterConfig> getClusterConfigAsList(){
public List<ClusterConfig> getClusterConfigAsList() {
return new ArrayList<>(clusterConfigs);
}

@JsonProperty("clusterConfigs")
public void setClusterConfigAsList(List<ClusterConfig> clusterConfigs){
this.clusterConfigs = FXCollections.observableArrayList(clusterConfigs);
public void setClusterConfigAsList(List<ClusterConfig> clusterConfigs) {
this.clusterConfigs = FXCollections.observableArrayList(clusterConfigs);
}

}
86 changes: 86 additions & 0 deletions src/main/java/at/esque/kafka/cluster/CrossClusterOperation.java
@@ -0,0 +1,86 @@
package at.esque.kafka.cluster;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;

public class CrossClusterOperation {
private ClusterConfig fromCluster;
private ClusterConfig toCluster;
private TopicMessageTypeConfig fromTopic;
private TopicMessageTypeConfig toTopic;
private Predicate<ConsumerRecord> filterFunction;
private AtomicBoolean stop = new AtomicBoolean(false);
private UUID operationId;

public CrossClusterOperation(ClusterConfig fromCluster, ClusterConfig toCluster, TopicMessageTypeConfig fromTopic, TopicMessageTypeConfig toTopic, Predicate<ConsumerRecord> filterFunction) {
this.fromCluster = fromCluster;
this.toCluster = toCluster;
this.fromTopic = fromTopic;
this.toTopic = toTopic;
this.filterFunction = filterFunction;
}

public ClusterConfig getFromCluster() {
return fromCluster;
}

public void setFromCluster(ClusterConfig fromCluster) {
this.fromCluster = fromCluster;
}

public ClusterConfig getToCluster() {
return toCluster;
}

public void setToCluster(ClusterConfig toCluster) {
this.toCluster = toCluster;
}

public TopicMessageTypeConfig getFromTopic() {
return fromTopic;
}

public void setFromTopic(TopicMessageTypeConfig fromTopic) {
this.fromTopic = fromTopic;
}

public TopicMessageTypeConfig getToTopic() {
return toTopic;
}

public void setToTopic(TopicMessageTypeConfig toTopic) {
this.toTopic = toTopic;
}

public Predicate<ConsumerRecord> getFilterFunction() {
return filterFunction;
}

public void setFilterFunction(Predicate<ConsumerRecord> filterFunction) {
this.filterFunction = filterFunction;
}

public UUID getOperationId() {
return operationId;
}

public void setOperationId(UUID operationId) {
this.operationId = operationId;
}

public AtomicBoolean getStop() {
return stop;
}

public void setStop(AtomicBoolean stop) {
this.stop = stop;
}

@Override
public String toString() {
return fromCluster.getIdentifier() + " / " + fromTopic.getName() + " --> " + toCluster.getIdentifier() + " / " + toTopic.getName();
}
}
24 changes: 22 additions & 2 deletions src/main/java/at/esque/kafka/cluster/KafkaesqueAdminClient.java
Expand Up @@ -2,11 +2,27 @@

import at.esque.kafka.alerts.ErrorAlert;
import at.esque.kafka.topics.DescribeTopicWrapper;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.DeleteTopicsResult;
import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.config.ConfigResource;

import java.util.*;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -74,4 +90,8 @@ public DescribeTopicWrapper describeTopic(String topic) {
}
return null;
}

public void close() {
adminClient.close();
}
}

0 comments on commit fd97d0b

Please sign in to comment.