
Commit

[ESQUE-12, ESQUE-21] Added configuration options for producers and consumers; added configurations for topics; used topic configuration to enable use of Avro
patschuh committed Feb 25, 2019
1 parent 0068cec commit deb586f
Showing 7 changed files with 446 additions and 122 deletions.
5 changes: 5 additions & 0 deletions pom.xml
@@ -59,6 +59,11 @@
             <artifactId>kafka-schema-registry</artifactId>
             <version>5.0.0</version>
         </dependency>
+        <dependency>
+            <groupId>io.confluent</groupId>
+            <artifactId>kafka-avro-serializer</artifactId>
+            <version>5.0.0</version>
+        </dependency>
         <dependency>
             <groupId>com.google.inject</groupId>
             <artifactId>guice</artifactId>
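Note that io.confluent artifacts such as kafka-avro-serializer are distributed via Confluent's Maven repository rather than Maven Central. The POM very likely already declares it (it already depends on kafka-schema-registry above), but for reference a repository entry would look along these lines (a sketch, not part of this diff):

<repositories>
    <repository>
        <id>confluent</id>
        <url>https://packages.confluent.io/maven/</url>
    </repository>
</repositories>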
150 changes: 150 additions & 0 deletions src/main/java/at/esque/kafka/ConfigHandler.java
@@ -0,0 +1,150 @@
package at.esque.kafka;

import at.esque.kafka.alerts.ErrorAlert;
import at.esque.kafka.cluster.ClusterConfigs;
import at.esque.kafka.cluster.TopicMessageTypeConfig;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLMapper;
import com.google.inject.Singleton;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.stream.Collectors;

@Singleton
public class ConfigHandler {
    private static final String CONFIG_DIRECTORY = System.getProperty("user.home") + "/.kafkaesque/%s";

    private static final Logger LOGGER = LoggerFactory.getLogger(ConfigHandler.class);

    private ObjectMapper objectMapper = new ObjectMapper();
    private YAMLMapper yamlMapper = new YAMLMapper();

    private File clusterConfig;

    private ClusterConfigs clusterConfigs;

    private Map<String, Map<String, TopicMessageTypeConfig>> cachedConfigs = new ConcurrentHashMap<>();

    public ConfigHandler() {
    }

    public TopicMessageTypeConfig getConfigForTopic(String clusterIdentification, String topic) {
        Map<String, TopicMessageTypeConfig> configs = getTopicConfigForClusterIdentifier(clusterIdentification);
        TopicMessageTypeConfig topicMessageTypeConfig = configs.get(topic);
        if (topicMessageTypeConfig == null) {
            topicMessageTypeConfig = new TopicMessageTypeConfig(topic);
            configs.put(topic, topicMessageTypeConfig);
        }
        return topicMessageTypeConfig;
    }

    public Map<String, TopicMessageTypeConfig> getTopicConfigForClusterIdentifier(String clusterIdentification) {
        Map<String, TopicMessageTypeConfig> config = cachedConfigs.get(clusterIdentification);
        if (config != null) {
            return config;
        }
        File configFile = new File(String.format(CONFIG_DIRECTORY, clusterIdentification + "/topics.yaml"));
        if (!configFile.exists()) {
            List<TopicMessageTypeConfig> topicConfigs = new ArrayList<>();
            configFile.getParentFile().mkdirs();
            try {
                yamlMapper.writeValue(configFile, topicConfigs);
            } catch (IOException e) {
                ErrorAlert.show(e);
            }
        }
        try {
            List<TopicMessageTypeConfig> topicConfigList = yamlMapper.readValue(configFile, new TypeReference<List<TopicMessageTypeConfig>>() {
            });
            Map<String, TopicMessageTypeConfig> configMap = topicConfigList.stream()
                    .collect(Collectors.toMap(TopicMessageTypeConfig::getName, Function.identity()));
            cachedConfigs.put(clusterIdentification, configMap);
            return configMap;
        } catch (IOException e) {
            ErrorAlert.show(e);
        }
        cachedConfigs.put(clusterIdentification, new HashMap<>());
        return cachedConfigs.get(clusterIdentification);
    }

    public Map<String, String> readConsumerConfigs(String clusterIdentification) throws IOException {
        return readConfigsMap(clusterIdentification, "consumer");
    }

    public Map<String, String> readProducerConfigs(String clusterIdentification) throws IOException {
        return readConfigsMap(clusterIdentification, "producer");
    }

    public Map<String, String> readConfigsMap(String clusterIdentification, String fileNameWithoutExtension) throws IOException {
        File configFile = new File(String.format(CONFIG_DIRECTORY, clusterIdentification + "/" + fileNameWithoutExtension + ".yaml"));
        configFile.getParentFile().mkdirs();

        if (configFile.exists()) {
            return yamlMapper.readValue(configFile, new TypeReference<Map<String, String>>() {
            });
        }
        return new HashMap<>();
    }

    public void writeConfigsMap(String clusterIdentification, String fileNameWithoutExtension, Map<String, String> configMap) throws IOException {
        File configFile = new File(String.format(CONFIG_DIRECTORY, clusterIdentification + "/" + fileNameWithoutExtension + ".yaml"));
        configFile.getParentFile().mkdirs();

        yamlMapper.writeValue(configFile, configMap);
    }


    public ClusterConfigs loadOrCreateConfigs() {
        clusterConfig = new File(String.format(CONFIG_DIRECTORY, "clusters.json"));
        if (clusterConfigs != null) {
            return clusterConfigs;
        }
        if (clusterConfig.exists()) {
            try {
                clusterConfigs = objectMapper.readValue(clusterConfig, ClusterConfigs.class);
                return clusterConfigs;
            } catch (IOException e) {
                ErrorAlert.show(e);
            }
        } else {
            // Assign the field rather than a shadowing local, so a later saveConfigs() persists this instance
            clusterConfigs = new ClusterConfigs();
            try {
                clusterConfig.getParentFile().mkdirs();
                objectMapper.writeValue(clusterConfig, clusterConfigs);
                return clusterConfigs;
            } catch (IOException e) {
                ErrorAlert.show(e);
            }
        }
        return new ClusterConfigs();
    }

    public void saveConfigs() {
        try {
            objectMapper.writeValue(clusterConfig, clusterConfigs);
        } catch (IOException e) {
            ErrorAlert.show(e);
        }
    }

    public void saveTopicMessageTypeConfigs(String clusterIdentification) {
        File configFile = new File(String.format(CONFIG_DIRECTORY, clusterIdentification + "/topics.yaml"));
        try {
            yamlMapper.writeValue(configFile, cachedConfigs.get(clusterIdentification).values());
        } catch (IOException e) {
            ErrorAlert.show(e);
        }
    }


}
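For orientation, here is a hypothetical ~/.kafkaesque/<clusterIdentification>/topics.yaml in the shape the YAMLMapper above writes. This is a sketch: the field names assume TopicMessageTypeConfig exposes name, keyType and valueType, matching the getters used here and in ConsumerHandler below, and the diff only confirms the AVRO constant, so STRING is an assumed enum value:

- name: "orders"
  keyType: "STRING"
  valueType: "AVRO"

The consumer.yaml and producer.yaml files read by readConfigsMap are flat String-to-String maps of standard Kafka client properties, for example:

max.poll.records: "500"
fetch.max.wait.ms: "200"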
37 changes: 21 additions & 16 deletions src/main/java/at/esque/kafka/ConsumerHandler.java
@@ -2,6 +2,8 @@

 import at.esque.kafka.alerts.ErrorAlert;
 import at.esque.kafka.cluster.ClusterConfig;
+import at.esque.kafka.cluster.TopicMessageTypeConfig;
+import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -11,47 +13,50 @@

 import org.slf4j.LoggerFactory;

 import java.time.Duration;
-import java.util.*;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;

 @Singleton
 public class ConsumerHandler {
     private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerHandler.class);

-    private ConsumerHandler instance;
-    private Map<UUID, KafkaConsumer<String, String>> registeredConsumers = new ConcurrentHashMap<>();
+    @Inject
+    private ConfigHandler configHandler;

-    public ConsumerHandler() {
-    }
+    private Map<UUID, KafkaConsumer> registeredConsumers = new ConcurrentHashMap<>();

-    public ConsumerHandler getInstance() {
-        if (instance == null) {
-            return instance = new ConsumerHandler();
-        }
-        return instance;
+    public ConsumerHandler() {
     }

-    public Optional<KafkaConsumer<String, String>> getConsumer(UUID consumerId) {
+    public Optional<KafkaConsumer> getConsumer(UUID consumerId) {
         return Optional.ofNullable(registeredConsumers.get(consumerId));
     }

-    public Map<UUID, KafkaConsumer<String, String>> getRegisteredConsumers() {
+    public Map<UUID, KafkaConsumer> getRegisteredConsumers() {
         return registeredConsumers;
     }

-    public void setRegisteredConsumers(Map<UUID, KafkaConsumer<String, String>> registeredConsumers) {
+    public void setRegisteredConsumers(Map<UUID, KafkaConsumer> registeredConsumers) {
         this.registeredConsumers = registeredConsumers;
     }

-    public UUID registerConsumer(ClusterConfig config) {
+    public UUID registerConsumer(ClusterConfig config, TopicMessageTypeConfig topicMessageTypeConfig, Map<String, String> consumerConfigs) {
         Properties consumerProps = new Properties();
         consumerProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootStrapServers());
         UUID consumerId = UUID.randomUUID();
         consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kafkaesque-" + consumerId);
         consumerProps.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
-        consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
-        consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+        consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, topicMessageTypeConfig.getKeyType().equals(MessageType.AVRO) ? "io.confluent.kafka.serializers.KafkaAvroDeserializer" : "org.apache.kafka.common.serialization.StringDeserializer");
+        consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, topicMessageTypeConfig.getValueType().equals(MessageType.AVRO) ? "io.confluent.kafka.serializers.KafkaAvroDeserializer" : "org.apache.kafka.common.serialization.StringDeserializer");
+        consumerProps.setProperty("schema.registry.url", config.getSchemaRegistry());
+
+        consumerProps.putAll(consumerConfigs);

         LOGGER.info("Creating new Consumer with properties: [{}]", consumerProps);
         registeredConsumers.put(consumerId, new KafkaConsumer<>(consumerProps));
         return consumerId;
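Taken together, a caller (e.g. the Controller) now resolves the topic's message types and the per-cluster consumer overrides before registering a consumer. A minimal sketch, assuming Guice-injected ConfigHandler and ConsumerHandler fields and a hypothetical identifier accessor on ClusterConfig:

// Hypothetical wiring, not part of this commit: picks String or Avro
// deserializers per topic and applies per-cluster consumer overrides.
private UUID subscribe(ClusterConfig clusterConfig, String topic) throws IOException {
    String clusterId = clusterConfig.getIdentifier(); // assumed accessor name
    TopicMessageTypeConfig topicConfig = configHandler.getConfigForTopic(clusterId, topic);
    Map<String, String> consumerOverrides = configHandler.readConsumerConfigs(clusterId);
    return consumerHandler.registerConsumer(clusterConfig, topicConfig, consumerOverrides);
}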
