Skip to content

Commit

Permalink
STORM-3123: Address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
arunmahadevan committed Nov 15, 2018
1 parent fa0b862 commit d8d8837
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -695,6 +695,8 @@ public Map<String, Object> getComponentConfiguration() {
for (Entry<String, Object> conf: kafkaSpoutConfig.getKafkaProps().entrySet()) {
if (conf.getValue() != null && isPrimitiveOrWrapper(conf.getValue().getClass())) {
configuration.put(configKeyPrefix + conf.getKey(), conf.getValue());
} else {
LOG.debug("Dropping Kafka prop '{}' from component configuration", conf.getKey());
}
}
return configuration;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,8 @@ public static List<KafkaOffsetLagResult> getOffsetLags(NewKafkaSpoutOffsetQuery
props.put("security.protocol", newKafkaSpoutOffsetQuery.getSecurityProtocol());
}
// Read property file for extra consumer properties
if (newKafkaSpoutOffsetQuery.getConsumerConfig() != null) {
props.putAll(Utils.loadProps(newKafkaSpoutOffsetQuery.getConsumerConfig()));
if (newKafkaSpoutOffsetQuery.getConsumerPropertiesFileName() != null) {
props.putAll(Utils.loadProps(newKafkaSpoutOffsetQuery.getConsumerPropertiesFileName()));
}
List<TopicPartition> topicPartitionList = new ArrayList<>();
consumer = new KafkaConsumer<>(props);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,15 @@ public class NewKafkaSpoutOffsetQuery {
private final String consumerGroupId; // consumer group id for which the offset needs to be calculated
private final String bootStrapBrokers; // bootstrap brokers
private final String securityProtocol; // security protocol to connect to kafka
private final String consumerConfig; // security configuration file to connect to secure kafka
private final String consumerPropertiesFileName; // properties file containing additional kafka consumer configs

public NewKafkaSpoutOffsetQuery(String topics, String bootstrapBrokers, String consumerGroupId, String securityProtocol,
String consumerConfig) {
String consumerPropertiesFileName) {
this.topics = topics;
this.bootStrapBrokers = bootstrapBrokers;
this.consumerGroupId = consumerGroupId;
this.securityProtocol = securityProtocol;
this.consumerConfig = consumerConfig;
this.consumerPropertiesFileName = consumerPropertiesFileName;
}

public String getTopics() {
Expand All @@ -54,8 +54,8 @@ public String getSecurityProtocol() {
return this.securityProtocol;
}

public String getConsumerConfig() {
return this.consumerConfig;
public String getConsumerPropertiesFileName() {
return this.consumerPropertiesFileName;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ private static List<String> getCommandLineOptionsForNewKafkaSpout(Map<String, Ob
return commands;
}

private static File getExtraPropertiesFile(Map<String, Object> jsonConf) {
private static File createExtraPropertiesFile(Map<String, Object> jsonConf) {
File file = null;
Map<String, String> extraProperties = new HashMap<>();
for (Map.Entry<String, Object> conf: jsonConf.entrySet()) {
Expand Down Expand Up @@ -149,7 +149,7 @@ private static Map<String, Object> getLagResultForKafka(String spoutId, SpoutSpe
}
commands.addAll(getCommandLineOptionsForNewKafkaSpout(jsonMap));

File extraPropertiesFile = getExtraPropertiesFile(jsonMap);
File extraPropertiesFile = createExtraPropertiesFile(jsonMap);
if (extraPropertiesFile != null) {
commands.add("-c");
commands.add(extraPropertiesFile.getAbsolutePath());
Expand Down

0 comments on commit d8d8837

Please sign in to comment.