NIFI-10991 Add AWS MSK IAM support to Kafka processors #6846

Closed
nandorsoma wants to merge 5 commits into apache:main from nandorsoma:NIFI-10991

Conversation

@nandorsoma
Contributor

Summary

NIFI-10991

This PR adds AWS IAM authentication and authorization support for Amazon MSK clusters that have AWS IAM enabled as an authentication mechanism. To include the required module, enable the include-kafka-aws profile at build time. A previous PR (#5291) tried to tackle this problem. In this PR, though, we decided not to include awsRoleArn, awsRoleSessionName, and awsStsRegion as properties because they add complexity with minimal value. When NiFi runs on EC2 and no profile is specified, the processor uses the role assigned to the EC2 instance. When NiFi runs locally, we can define a profile name that references a profile in the ~/.aws/credentials file (the location depends on the OS). In this file, you can specify the above properties along with the access key ID and secret.
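
For reference, a minimal sketch of the Kafka client settings that the aws-msk-iam-auth library documents for IAM authentication, with and without a named profile. This is an illustration only: the class name MskIamClientConfig, the broker address, and the profile name are hypothetical, and the way the NiFi processors assemble these settings internally is not shown in this conversation.

```java
import java.util.Properties;

final class MskIamClientConfig {

    /**
     * Builds Kafka client properties for the AWS_MSK_IAM SASL mechanism.
     * A null or empty profile name leaves credential resolution to the default
     * AWS provider chain (for example, the role attached to an EC2 instance).
     */
    static Properties build(String bootstrapServers, String awsProfileName) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("security.protocol", "SASL_SSL");
        props.setProperty("sasl.mechanism", "AWS_MSK_IAM");
        props.setProperty("sasl.client.callback.handler.class",
                "software.amazon.msk.auth.iam.IAMClientCallbackHandler");

        // JAAS entry shape: <login module class> <control flag> [option="value" ...];
        String jaas = "software.amazon.msk.auth.iam.IAMLoginModule required";
        if (awsProfileName != null && !awsProfileName.isEmpty()) {
            jaas += " awsProfileName=\"" + awsProfileName + "\"";
        }
        props.setProperty("sasl.jaas.config", jaas + ";");
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical broker address and profile name, for illustration only.
        Properties withProfile = build("broker.example.com:9098", "nifi-dev");
        System.out.println(withProfile.getProperty("sasl.jaas.config"));
    }
}
```

Passing null as the profile name corresponds to the EC2 case described above, where no profile is specified.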

Tracking

Please complete the following tracking steps prior to pull request creation.

Issue Tracking

Pull Request Tracking

  • Pull Request title starts with Apache NiFi Jira issue number, such as NIFI-00000
  • Pull Request commit message starts with Apache NiFi Jira issue number, such as NIFI-00000

Pull Request Formatting

  • Pull Request based on current revision of the main branch
  • Pull Request refers to a feature branch with one commit containing changes

Verification

Please indicate the verification steps performed prior to pull request creation.

Build

  • Build completed using mvn clean install -P contrib-check
    • JDK 8
    • JDK 11
    • JDK 17

Licensing

  • New dependencies are compatible with the Apache License 2.0 according to the License Policy
  • New dependencies are documented in applicable LICENSE and NOTICE files

Documentation

  • Documentation formatting appears as expected in rendered files

@exceptionfactory exceptionfactory self-requested a review January 13, 2023 17:11
Contributor

@exceptionfactory exceptionfactory left a comment

Thanks for implementing this feature @nandorsoma, the implementation and integration approach with the shared Kafka library looks good.

I agree with @turcsanyip that the AWS Debug Credentials property seems unnecessary and should be removed. Other than that, I noted several minor naming and wording recommendations.

@nandorsoma
Contributor Author

Thank you for the review @turcsanyip and @exceptionfactory! I've changed the code based on your advice.

Contributor

@exceptionfactory exceptionfactory left a comment

Thanks for making the adjustments @nandorsoma, the current version looks good from a code perspective.

Do you have any additional feedback @turcsanyip?

@turcsanyip
Contributor

@nandorsoma There seems to be a regression in the Kerberos login.
I get the error below when trying to connect to a Kerberized Kafka cluster using SASL_SSL + GSSAPI. I built the processors without the new include-kafka-aws profile. The same flow works with the version on the current main branch.

2023-01-18 14:28:17,992 WARN [Timer-Driven Process Thread-9] o.a.n.controller.tasks.ConnectableTask Processing halted: uncaught exception in Component [PublishKafka_2_6[id=c49ff5ca-0185-1000-d5c3-1f042d05afeb]]
org.apache.kafka.common.KafkaException: Failed to construct kafka producer
	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:441)
	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:273)
	at org.apache.nifi.processors.kafka.pubsub.PublisherPool.createLease(PublisherPool.java:88)
	at org.apache.nifi.processors.kafka.pubsub.PublisherPool.obtainPublisher(PublisherPool.java:78)
	at org.apache.nifi.processors.kafka.pubsub.PublishKafka_2_6.onTrigger(PublishKafka_2_6.java:444)
	at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
	at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1356)
	at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:246)
	at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:102)
	at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: Value not specified for key 'required' in JAAS config
	at org.apache.kafka.common.security.JaasConfig.parseAppConfigurationEntry(JaasConfig.java:116)
	at org.apache.kafka.common.security.JaasConfig.<init>(JaasConfig.java:63)
	at org.apache.kafka.common.security.JaasContext.load(JaasContext.java:90)
	at org.apache.kafka.common.security.JaasContext.loadClientContext(JaasContext.java:84)
	at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:134)
	at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:73)
	at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:105)
	at org.apache.kafka.clients.producer.KafkaProducer.newSender(KafkaProducer.java:449)
	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:430)
	... 16 common frames omitted
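
The IllegalArgumentException above is raised by Kafka's JaasConfig parser, which expects each sasl.jaas.config entry in the form `<login module class> <control flag> <option>=<value> ...;`. The exact cause of the malformed value is not shown in this conversation. As a rough sketch of the expected shape only, the hypothetical helper below (JaasEntrySketch is not part of NiFi or Kafka) formats an entry and refuses to emit one without a login module class. The keytab path and principal are placeholders.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class JaasEntrySketch {

    /** Formats one JAAS entry: module class, control flag, quoted options, trailing semicolon. */
    static String format(String loginModule, String controlFlag, Map<String, String> options) {
        if (loginModule == null || loginModule.trim().isEmpty()) {
            // Without a module class, the entry would start with the control flag
            // or an option, and the parser could not interpret it as intended.
            throw new IllegalArgumentException("Login module class is required");
        }
        StringBuilder entry = new StringBuilder(loginModule.trim()).append(' ').append(controlFlag);
        for (Map.Entry<String, String> option : options.entrySet()) {
            entry.append(' ').append(option.getKey())
                 .append("=\"").append(option.getValue()).append('"');
        }
        return entry.append(';').toString();
    }

    public static void main(String[] args) {
        // Placeholder keytab path and principal, for illustration only.
        Map<String, String> options = new LinkedHashMap<>();
        options.put("useKeyTab", "true");
        options.put("keyTab", "/path/to/nifi.keytab");
        options.put("principal", "nifi@EXAMPLE.COM");
        System.out.println(format("com.sun.security.auth.module.Krb5LoginModule", "required", options));
    }
}
```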

@nandorsoma
Contributor Author

Thanks for noticing it @turcsanyip! I've changed the code and tested it with the three different Kerberos configuration methods.

Contributor

@turcsanyip turcsanyip left a comment

Thanks for the kerberos fix @nandorsoma!

I tested the kerberos cases too, and also SCRAM and the new AWS IAM. All work properly.

While I was configuring the different authentication mechanisms, I found it quite confusing that the Kerberos properties are always shown on the processor, even when they are not relevant, and that it is not clear which properties belong to the given authentication type and should be filled in (for PLAINTEXT or SCRAM, and also the new AWS IAM). Not sure why dependsOn() has not been applied here so far. I would suggest implementing it either in this PR or in a follow-up one.
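
To illustrate the idea behind dependsOn(), here is a small self-contained model of properties that are only shown when another property has a given value. It is a sketch under assumptions: the Property class below is a hypothetical stand-in, not NiFi's PropertyDescriptor, and the property names and groupings are illustrative rather than taken from the Kafka processors.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class DependentPropertySketch {

    /** Hypothetical stand-in for a property descriptor with an optional dependency. */
    static final class Property {
        final String name;
        final String dependsOnProperty; // null means the property is always shown
        final Set<String> dependsOnValues;

        Property(String name, String dependsOnProperty, String... dependsOnValues) {
            this.name = name;
            this.dependsOnProperty = dependsOnProperty;
            this.dependsOnValues = new HashSet<>(Arrays.asList(dependsOnValues));
        }

        /** Shown when there is no dependency or the controlling property has a matching value. */
        boolean isVisible(Map<String, String> config) {
            return dependsOnProperty == null
                    || dependsOnValues.contains(config.get(dependsOnProperty));
        }
    }

    // Illustrative property set; names and groupings are assumptions.
    static final List<Property> PROPERTIES = Arrays.asList(
            new Property("SASL Mechanism", null),
            new Property("Kerberos Service Name", "SASL Mechanism", "GSSAPI"),
            new Property("Username", "SASL Mechanism", "PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512"),
            new Property("AWS Profile Name", "SASL Mechanism", "AWS_MSK_IAM"));

    /** Returns the names of the properties that apply to the given configuration. */
    static List<String> visibleNames(Map<String, String> config) {
        List<String> names = new ArrayList<>();
        for (Property property : PROPERTIES) {
            if (property.isVisible(config)) {
                names.add(property.name);
            }
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(visibleNames(Collections.singletonMap("SASL Mechanism", "AWS_MSK_IAM")));
    }
}
```

With a model like this, selecting AWS_MSK_IAM hides the Kerberos and username entries instead of leaving every property on screen.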

Comment on lines +84 to +87
(ASLv2) aws-msk-iam-auth
The following NOTICE information applies:
aws-msk-iam-auth
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
Contributor

aws-msk-iam-auth pulls in a couple of other dependencies transitively (e.g. further aws libraries, netty).
The NOTICE file needs to be updated with those as well.

Contributor Author

Thanks for noticing it! Will update soon.

}
}

private void validateAWSIAMMechanism(final ValidationContext validationContext, final Collection<ValidationResult> results) {
Contributor

Minor:

Suggested change
- private void validateAWSIAMMechanism(final ValidationContext validationContext, final Collection<ValidationResult> results) {
+ private void validateAwsMskIamMechanism(final ValidationContext validationContext, final Collection<ValidationResult> results) {

Comment on lines 239 to 241
final String saslMechanism = validationContext.getProperty(SASL_MECHANISM).getValue();

if (SaslMechanism.AWS_MSK_IAM.getValue().equals(saslMechanism) && !StandardKafkaPropertyProvider.isAwsMskIamCallbackHandlerFound()) {
Contributor

Minor: SaslMechanism.getSaslMechanism() could be used instead of the String comparison.

Suggested change
- final String saslMechanism = validationContext.getProperty(SASL_MECHANISM).getValue();
- if (SaslMechanism.AWS_MSK_IAM.getValue().equals(saslMechanism) && !StandardKafkaPropertyProvider.isAwsMskIamCallbackHandlerFound()) {
+ final SaslMechanism saslMechanism = SaslMechanism.getSaslMechanism(validationContext.getProperty(SASL_MECHANISM).getValue());
+ if (SaslMechanism.AWS_MSK_IAM == saslMechanism && !StandardKafkaPropertyProvider.isAwsMskIamCallbackHandlerFound()) {
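
For context, a minimal sketch of the pattern this suggestion relies on: resolve the configured string to an enum constant once, then compare constants with ==. The enum below is a hypothetical stand-in named SaslMechanismSketch. NiFi's actual SaslMechanism class, its constants, and how getSaslMechanism() treats unknown values are not shown in this conversation, so the behavior here is an assumption.

```java
enum SaslMechanismSketch {
    GSSAPI("GSSAPI"),
    PLAIN("PLAIN"),
    SCRAM_SHA_256("SCRAM-SHA-256"),
    SCRAM_SHA_512("SCRAM-SHA-512"),
    AWS_MSK_IAM("AWS_MSK_IAM");

    private final String value;

    SaslMechanismSketch(String value) {
        this.value = value;
    }

    String getValue() {
        return value;
    }

    /** Resolves a configured string to its constant; rejecting unknown values is an assumption. */
    static SaslMechanismSketch fromValue(String value) {
        for (SaslMechanismSketch mechanism : values()) {
            if (mechanism.value.equals(value)) {
                return mechanism;
            }
        }
        throw new IllegalArgumentException("Unknown SASL mechanism: " + value);
    }

    public static void main(String[] args) {
        SaslMechanismSketch configured = fromValue("AWS_MSK_IAM");
        // Identity comparison is safe for enum constants and avoids repeated string checks.
        System.out.println(configured == AWS_MSK_IAM);
    }
}
```

Resolving the value once also gives the compiler a typed value to work with, for example in a switch over the mechanism.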

@exceptionfactory
Contributor

I found it quite confusing that the Kerberos properties are always shown on the processor, even when they are not relevant, and that it is not clear which properties belong to the given authentication type and should be filled in (for PLAINTEXT or SCRAM, and also the new AWS IAM). Not sure why dependsOn() has not been applied here so far. I would suggest implementing it either in this PR or in a follow-up one.

@turcsanyip Due to changes over time, there are multiple ways to configure Kerberos credentials, so that is part of the reason that some Kerberos properties are always shown. It might be possible to improve this in some scenarios, and that could be a useful improvement in a separate effort.

@nandorsoma
Contributor Author

Thank you @turcsanyip for your review! I've made the changes you asked for, except the dependsOn part. I'll do that in a separate PR as @exceptionfactory suggested.

Contributor

@turcsanyip turcsanyip left a comment

Thanks @nandorsoma and @exceptionfactory!
Let's investigate the improvements around the Kerberos properties in a separate Jira. Actually, the issue is two-fold:

  • hide all Kerberos properties when Kerberos is not relevant (and the SSL service similarly): I believe it can be solved easily with dependsOn() and would be a useful improvement in this processor
  • better handling of the different Kerberos configuration methods maintained for historical reasons (the same issue affects other processors / services too, like SSL and Proxy config): the solution is not trivial here but worth investigating

+1 LGTM
Merging to main.

@asfgit asfgit closed this in eb5d172 Jan 23, 2023