Skip to content

STORM-3123 - add support for Kafka security config in storm-kafka-monitor#2906

Merged
asfgit merged 3 commits intoapache:masterfrom
arunmahadevan:STORM-3123
Nov 15, 2018
Merged

STORM-3123 - add support for Kafka security config in storm-kafka-monitor#2906
asfgit merged 3 commits intoapache:masterfrom
arunmahadevan:STORM-3123

Conversation

@arunmahadevan
Copy link
Copy Markdown
Contributor

No description provided.

@arunmahadevan
Copy link
Copy Markdown
Contributor Author

@VipinRathor
I pulled in the relevant changes from #2760 and created this so that we can take it forward. I need to do some tests and will update once done.

cc @priyank5485 @HeartSaVioR

@arunmahadevan arunmahadevan force-pushed the STORM-3123 branch 3 times, most recently from f9d7f7b to 9f59628 Compare November 13, 2018 02:11
@arunmahadevan
Copy link
Copy Markdown
Contributor Author

Have tested the changes with Kafka broker running in 2-way SSL and unsecure modes and able to see the lags. The change on the kafka spout side is to return the KafkaConfig properties in getComponentConfiguration so that the lag util can use it.

Copy link
Copy Markdown
Contributor

@HeartSaVioR HeartSaVioR left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @VipinRathor and @arunmahadevan for working on this. Only some nits.

configuration.put(configKeyPrefix + "bootstrap.servers", kafkaSpoutConfig.getKafkaProps().get("bootstrap.servers"));
configuration.put(configKeyPrefix + "security.protocol", kafkaSpoutConfig.getKafkaProps().get("security.protocol"));
for (Entry<String, Object> conf: kafkaSpoutConfig.getKafkaProps().entrySet()) {
if (conf.getValue() != null && isPrimitiveOrWrapper(conf.getValue().getClass())) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Might be better to leave a log message for dropped configuration keys from here. Maybe DEBUG is fine since I guess they're only used for storm-kafka-monitor.

private final String consumerGroupId; // consumer group id for which the offset needs to be calculated
private final String bootStrapBrokers; // bootstrap brokers
private final String securityProtocol; // security protocol to connect to kafka
private final String consumerConfig; // security configuration file to connect to secure kafka
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: IMHO we may be able to have better name (like securityConfFilePath?) to represent what comment says. Other parameters look like self-described but I couldn't imagine new parameter points to the file by its name.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it can be any properties. renamed to consumerPropertiesFileName.

return commands;
}

private static File getExtraPropertiesFile(Map<String, Object> jsonConf) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: maybe using create or build or so instead of get would be clear to represent that new (temporary) file is generated.

@arunmahadevan
Copy link
Copy Markdown
Contributor Author

@HeartSaVioR thanks for reviewing. Addressed comments.

Copy link
Copy Markdown
Contributor

@HeartSaVioR HeartSaVioR left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants