Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -708,11 +708,29 @@ public Map<String, Object> getComponentConfiguration() {
configuration.put(configKeyPrefix + "topics", getTopicsString());

configuration.put(configKeyPrefix + "groupid", kafkaSpoutConfig.getConsumerGroupId());
configuration.put(configKeyPrefix + "bootstrap.servers", kafkaSpoutConfig.getKafkaProps().get("bootstrap.servers"));
configuration.put(configKeyPrefix + "security.protocol", kafkaSpoutConfig.getKafkaProps().get("security.protocol"));
for (Entry<String, Object> conf: kafkaSpoutConfig.getKafkaProps().entrySet()) {
if (conf.getValue() != null && isPrimitiveOrWrapper(conf.getValue().getClass())) {
configuration.put(configKeyPrefix + conf.getKey(), conf.getValue());
} else {
LOG.debug("Dropping Kafka prop '{}' from component configuration", conf.getKey());
}
}
return configuration;
}

/**
 * Checks whether a Kafka property value of the given class is safe to expose
 * in the component configuration map.
 *
 * @param type runtime class of the property value, may be null
 * @return true when the class is a primitive type or one of the types
 *         accepted by {@link #isWrapper(Class)}, false otherwise (including
 *         for a null argument)
 */
private boolean isPrimitiveOrWrapper(Class<?> type) {
    return type != null && (type.isPrimitive() || isWrapper(type));
}

/**
 * Tells whether the given class is one of the primitive wrapper classes or
 * String. String is deliberately treated as a "wrapper" here so that plain
 * text Kafka properties survive the component-configuration filtering.
 *
 * @param type the class to test; a null argument yields false
 * @return true for Boolean, Byte, Character, Short, Integer, Long, Float,
 *         Double and String, false for anything else
 */
private boolean isWrapper(Class<?> type) {
    return type == Boolean.class
            || type == Byte.class
            || type == Character.class
            || type == Short.class
            || type == Integer.class
            || type == Long.class
            || type == Float.class
            || type == Double.class
            || type == String.class;
}

/**
 * Gets the names of the topics this spout subscribes to, as reported by the
 * configured subscription's getTopicsString(). Presumably a comma separated
 * list — matches how it is consumed by the offset lag tooling, but confirm
 * against the Subscription implementation.
 */
private String getTopicsString() {
return kafkaSpoutConfig.getSubscription().getTopicsString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Utils;
import org.json.simple.JSONValue;

import java.util.ArrayList;
Expand Down Expand Up @@ -71,6 +72,8 @@ public class KafkaOffsetLagUtil {
private static final String OPTION_ZK_BROKERS_ROOT_LONG = "zk-brokers-root-node";
private static final String OPTION_SECURITY_PROTOCOL_SHORT = "s";
private static final String OPTION_SECURITY_PROTOCOL_LONG = "security-protocol";
private static final String OPTION_CONSUMER_CONFIG_SHORT = "c";
private static final String OPTION_CONSUMER_CONFIG_LONG = "consumer-config";

public static void main (String args[]) {
try {
Expand Down Expand Up @@ -136,10 +139,10 @@ public static void main (String args[]) {
" is not specified");
}
NewKafkaSpoutOffsetQuery newKafkaSpoutOffsetQuery = new NewKafkaSpoutOffsetQuery(commandLine.getOptionValue(OPTION_TOPIC_LONG),
commandLine.getOptionValue(OPTION_BOOTSTRAP_BROKERS_LONG), commandLine.getOptionValue(OPTION_GROUP_ID_LONG), securityProtocol);
commandLine.getOptionValue(OPTION_BOOTSTRAP_BROKERS_LONG), commandLine.getOptionValue(OPTION_GROUP_ID_LONG),
securityProtocol, commandLine.getOptionValue(OPTION_CONSUMER_CONFIG_LONG));
results = getOffsetLags(newKafkaSpoutOffsetQuery);
}

Map<String, Map<Integer, KafkaPartitionOffsetLag>> keyedResult = keyByTopicAndPartition(results);
System.out.print(JSONValue.toJSONString(keyedResult));
} catch (Exception ex) {
Expand Down Expand Up @@ -195,6 +198,8 @@ private static Options buildOptions () {
options.addOption(OPTION_ZK_BROKERS_ROOT_SHORT, OPTION_ZK_BROKERS_ROOT_LONG, true, "Zk node prefix where kafka stores broker information e.g. " +
"/brokers (applicable only for old kafka spout) ");
options.addOption(OPTION_SECURITY_PROTOCOL_SHORT, OPTION_SECURITY_PROTOCOL_LONG, true, "Security protocol to connect to kafka");
options.addOption(OPTION_CONSUMER_CONFIG_SHORT, OPTION_CONSUMER_CONFIG_LONG, true, "Properties file with additional " +
"Kafka consumer properties");
return options;
}

Expand All @@ -203,7 +208,7 @@ private static Options buildOptions () {
* @param newKafkaSpoutOffsetQuery represents the information needed to query kafka for log head and spout offsets
* @return log head offset, spout offset and lag for each partition
*/
public static List<KafkaOffsetLagResult> getOffsetLags (NewKafkaSpoutOffsetQuery newKafkaSpoutOffsetQuery) {
public static List<KafkaOffsetLagResult> getOffsetLags(NewKafkaSpoutOffsetQuery newKafkaSpoutOffsetQuery) throws Exception {
KafkaConsumer<String, String> consumer = null;
List<KafkaOffsetLagResult> result = new ArrayList<>();
try {
Expand All @@ -217,6 +222,10 @@ public static List<KafkaOffsetLagResult> getOffsetLags (NewKafkaSpoutOffsetQuery
if (newKafkaSpoutOffsetQuery.getSecurityProtocol() != null) {
props.put("security.protocol", newKafkaSpoutOffsetQuery.getSecurityProtocol());
}
// Read property file for extra consumer properties
if (newKafkaSpoutOffsetQuery.getConsumerPropertiesFileName() != null) {
props.putAll(Utils.loadProps(newKafkaSpoutOffsetQuery.getConsumerPropertiesFileName()));
}
List<TopicPartition> topicPartitionList = new ArrayList<>();
consumer = new KafkaConsumer<>(props);
for (String topic: newKafkaSpoutOffsetQuery.getTopics().split(",")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,15 @@ public class NewKafkaSpoutOffsetQuery {
private final String consumerGroupId; // consumer group id for which the offset needs to be calculated
private final String bootStrapBrokers; // bootstrap brokers
private final String securityProtocol; // security protocol to connect to kafka
private final String consumerPropertiesFileName; // properties file containing additional kafka consumer configs

/**
 * Creates a query descriptor used to compute offset lag for the new Kafka
 * spout.
 *
 * @param topics comma separated list of topic names to query
 * @param bootstrapBrokers bootstrap brokers to connect to
 * @param consumerGroupId consumer group id for which the offset lag is
 *        calculated
 * @param securityProtocol security protocol to connect to kafka, may be null
 * @param consumerPropertiesFileName properties file containing additional
 *        kafka consumer configs, may be null
 */
public NewKafkaSpoutOffsetQuery(String topics, String bootstrapBrokers, String consumerGroupId, String securityProtocol,
        String consumerPropertiesFileName) {
    this.topics = topics;
    this.bootStrapBrokers = bootstrapBrokers;
    this.consumerGroupId = consumerGroupId;
    this.securityProtocol = securityProtocol;
    this.consumerPropertiesFileName = consumerPropertiesFileName;
}

/**
 * Backward-compatible constructor for callers that predate the extra
 * consumer-properties file option; equivalent to passing null for the file.
 */
public NewKafkaSpoutOffsetQuery(String topics, String bootstrapBrokers, String consumerGroupId, String securityProtocol) {
    this(topics, bootstrapBrokers, consumerGroupId, securityProtocol, null);
}

public String getTopics() {
Expand All @@ -49,6 +52,10 @@ public String getSecurityProtocol() {
return this.securityProtocol;
}

/**
 * @return path of the properties file holding additional kafka consumer
 *         configs, may be null when none was supplied
 */
public String getConsumerPropertiesFileName() {
    return consumerPropertiesFileName;
}

@Override
public String toString() {
return "NewKafkaSpoutOffsetQuery{" +
Expand Down
60 changes: 52 additions & 8 deletions storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,18 @@
package org.apache.storm.utils;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import org.apache.storm.Config;
import java.util.Properties;
import java.util.Set;

import org.apache.storm.generated.ComponentCommon;
import org.apache.storm.generated.SpoutSpec;
import org.apache.storm.generated.StormTopology;
Expand All @@ -44,6 +50,9 @@ public class TopologySpoutLag {
private static final String BOOTSTRAP_CONFIG = CONFIG_KEY_PREFIX + "bootstrap.servers";
private static final String LEADERS_CONFIG = CONFIG_KEY_PREFIX + "leaders";
private static final String ZKROOT_CONFIG = CONFIG_KEY_PREFIX + "zkRoot";
private static final String SECURITY_PROTOCOL_CONFIG = CONFIG_KEY_PREFIX + "security.protocol";
private static final Set<String> ALL_CONFIGS = new HashSet<>(Arrays.asList(TOPICS_CONFIG, GROUPID_CONFIG,
BOOTSTRAP_CONFIG, SECURITY_PROTOCOL_CONFIG));
private final static Logger logger = LoggerFactory.getLogger(TopologySpoutLag.class);

public static Map<String, Map<String, Object>> lag(StormTopology stormTopology, Map topologyConf) {
Expand Down Expand Up @@ -71,7 +80,7 @@ private static List<String> getCommandLineOptionsForNewKafkaSpout (Map<String, O
commands.add((String) jsonConf.get(GROUPID_CONFIG));
commands.add("-b");
commands.add((String) jsonConf.get(BOOTSTRAP_CONFIG));
String securityProtocol = (String) jsonConf.get(CONFIG_KEY_PREFIX + "security.protocol");
String securityProtocol = (String) jsonConf.get(SECURITY_PROTOCOL_CONFIG);
if (securityProtocol != null && !securityProtocol.isEmpty()) {
commands.add("-s");
commands.add(securityProtocol);
Expand Down Expand Up @@ -115,6 +124,30 @@ private static List<String> getCommandLineOptionsForOldKafkaSpout (Map<String, O
return commands;
}

/**
 * Collects every "config.*" entry of the spout's component configuration
 * that is not one of the well-known options (topics, groupid, bootstrap
 * servers, security protocol) and writes them — prefix stripped — to a
 * temporary properties file suitable for the offset lag tool's -c option.
 *
 * @param jsonConf component configuration of the kafka spout
 * @return the temporary properties file, or null when there are no extra
 *         properties or the file could not be written
 */
private static File createExtraPropertiesFile(Map<String, Object> jsonConf) {
    Map<String, String> extraProperties = new HashMap<>();
    for (Map.Entry<String, Object> conf : jsonConf.entrySet()) {
        String key = conf.getKey();
        Object value = conf.getValue();
        // Skip null values: Properties cannot store them and toString() would NPE.
        if (value == null || !key.startsWith(CONFIG_KEY_PREFIX) || ALL_CONFIGS.contains(key)) {
            continue;
        }
        extraProperties.put(key.substring(CONFIG_KEY_PREFIX.length()), value.toString());
    }
    if (extraProperties.isEmpty()) {
        return null;
    }
    File file = null;
    try {
        file = File.createTempFile("kafka-consumer-extra", "props");
        // Best-effort cleanup in case the caller's explicit delete() is never reached.
        file.deleteOnExit();
        Properties properties = new Properties();
        properties.putAll(extraProperties);
        try (FileOutputStream fos = new FileOutputStream(file)) {
            properties.store(fos, "Kafka consumer extra properties");
        }
    } catch (IOException ex) {
        // Was silently ignored; log it so missing consumer props are diagnosable.
        logger.warn("Failed to write extra kafka consumer properties file", ex);
    }
    return file;
}

private static void addLagResultForKafkaSpout(Map<String, Map<String, Object>> finalResult, String spoutId, SpoutSpec spoutSpec,
Map topologyConf) throws IOException {
ComponentCommon componentCommon = spoutSpec.get_common();
Expand Down Expand Up @@ -159,18 +192,29 @@ private static Map<String, Object> getLagResultForKafka (String spoutId, SpoutSp
}
commands.addAll(old ? getCommandLineOptionsForOldKafkaSpout(jsonMap, topologyConf) : getCommandLineOptionsForNewKafkaSpout(jsonMap));

File extraPropertiesFile = createExtraPropertiesFile(jsonMap);
if (extraPropertiesFile != null) {
commands.add("-c");
commands.add(extraPropertiesFile.getAbsolutePath());
}
logger.debug("Command to run: {}", commands);

// if commands contains one or more null value, spout is compiled with lower version of storm-kafka / storm-kafka-client
if (!commands.contains(null)) {
String resultFromMonitor = ShellUtils.execCommand(commands.toArray(new String[0]));

try {
result = (Map<String, Object>) JSONValue.parseWithException(resultFromMonitor);
} catch (ParseException e) {
logger.debug("JSON parsing failed, assuming message as error message: {}", resultFromMonitor);
// json parsing fail -> error received
errorMsg = resultFromMonitor;
String resultFromMonitor = ShellUtils.execCommand(commands.toArray(new String[0]));

try {
result = (Map<String, Object>) JSONValue.parseWithException(resultFromMonitor);
} catch (ParseException e) {
logger.debug("JSON parsing failed, assuming message as error message: {}", resultFromMonitor);
// json parsing fail -> error received
errorMsg = resultFromMonitor;
}
} finally {
if (extraPropertiesFile != null) {
extraPropertiesFile.delete();
}
}
}
}
Expand Down