diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/pom.xml b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/pom.xml index b340d3bf8c2d..0a349e85a5c4 100644 --- a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/pom.xml +++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/pom.xml @@ -39,6 +39,11 @@ nifi-utils 1.20.0-SNAPSHOT + + org.apache.nifi + nifi-oauth2-provider-api + 1.20.0-SNAPSHOT + javax.mail mail diff --git a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/AbstractEmailProcessor.java b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/AbstractEmailProcessor.java index 19c64971fe63..a8b2cf0dbcaa 100644 --- a/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/AbstractEmailProcessor.java +++ b/nifi-nar-bundles/nifi-email-bundle/nifi-email-processors/src/main/java/org/apache/nifi/processors/email/AbstractEmailProcessor.java @@ -16,10 +16,13 @@ */ package org.apache.nifi.processors.email; +import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.oauth2.OAuth2AccessTokenProvider; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; @@ -43,6 +46,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map.Entry; +import java.util.Optional; import java.util.Properties; import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; @@ -56,7 +60,17 @@ * @param the type of {@link AbstractMailReceiver}. */ abstract class AbstractEmailProcessor extends AbstractProcessor { - + public static final AllowableValue PASSWORD_BASED_AUTHORIZATION_MODE = new AllowableValue( + "password-based-authorization-mode", + "Use Password", + "Use password" + ); + + public static final AllowableValue OAUTH_AUTHORIZATION_MODE = new AllowableValue( + "oauth-based-authorization-mode", + "Use OAuth2", + "Use OAuth2 to acquire access token" + ); public static final PropertyDescriptor HOST = new PropertyDescriptor.Builder() .name("host") .displayName("Host Name") @@ -73,6 +87,22 @@ abstract class AbstractEmailProcessor extends Ab .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .addValidator(StandardValidators.PORT_VALIDATOR) .build(); + public static final PropertyDescriptor AUTHORIZATION_MODE = new PropertyDescriptor.Builder() + .name("authorization-mode") + .displayName("Authorization Mode") + .description("How to authorize sending email on the user's behalf.") + .required(true) + .allowableValues(PASSWORD_BASED_AUTHORIZATION_MODE, OAUTH_AUTHORIZATION_MODE) + .defaultValue(PASSWORD_BASED_AUTHORIZATION_MODE.getValue()) + .build(); + public static final PropertyDescriptor OAUTH2_ACCESS_TOKEN_PROVIDER = new PropertyDescriptor.Builder() + .name("oauth2-access-token-provider") + .displayName("OAuth2 Access Token Provider") + .description("OAuth2 service that can provide access tokens.") + .identifiesControllerService(OAuth2AccessTokenProvider.class) + .dependsOn(AUTHORIZATION_MODE, OAUTH_AUTHORIZATION_MODE) + .required(true) + .build(); public static final PropertyDescriptor USER = new PropertyDescriptor.Builder() .name("user") .displayName("User Name") @@ -85,6 +115,7 @@ abstract class AbstractEmailProcessor extends Ab .name("password") .displayName("Password") .description("Password used for authentication and authorization with Email server.") + .dependsOn(AUTHORIZATION_MODE, PASSWORD_BASED_AUTHORIZATION_MODE) .required(true) .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) @@ -143,6 +174,8 @@ abstract class AbstractEmailProcessor extends Ab static { SHARED_DESCRIPTORS.add(HOST); SHARED_DESCRIPTORS.add(PORT); + SHARED_DESCRIPTORS.add(AUTHORIZATION_MODE); + SHARED_DESCRIPTORS.add(OAUTH2_ACCESS_TOKEN_PROVIDER); SHARED_DESCRIPTORS.add(USER); SHARED_DESCRIPTORS.add(PASSWORD); SHARED_DESCRIPTORS.add(FOLDER); @@ -165,6 +198,21 @@ abstract class AbstractEmailProcessor extends Ab private volatile boolean shouldSetDeleteFlag; + protected volatile Optional oauth2AccessTokenProviderOptional; + + @OnScheduled + public void onScheduled(final ProcessContext context) { + if (context.getProperty(OAUTH2_ACCESS_TOKEN_PROVIDER).isSet()) { + OAuth2AccessTokenProvider oauth2AccessTokenProvider = context.getProperty(OAUTH2_ACCESS_TOKEN_PROVIDER).asControllerService(OAuth2AccessTokenProvider.class); + + oauth2AccessTokenProvider.getAccessDetails(); + + oauth2AccessTokenProviderOptional = Optional.of(oauth2AccessTokenProvider); + } else { + oauth2AccessTokenProviderOptional = Optional.empty(); + } + } + @OnStopped public void stop(ProcessContext processContext) { this.flushRemainingMessages(processContext); @@ -229,7 +277,13 @@ String buildUrl(ProcessContext processContext) { String host = processContext.getProperty(HOST).evaluateAttributeExpressions().getValue(); String port = processContext.getProperty(PORT).evaluateAttributeExpressions().getValue(); String user = processContext.getProperty(USER).evaluateAttributeExpressions().getValue(); - String password = processContext.getProperty(PASSWORD).evaluateAttributeExpressions().getValue(); + + String password = oauth2AccessTokenProviderOptional.map(oauth2AccessTokenProvider -> { + String accessToken = oauth2AccessTokenProvider.getAccessDetails().getAccessToken(); + + return accessToken; + }).orElse(processContext.getProperty(PASSWORD).evaluateAttributeExpressions().getValue()); + String folder = processContext.getProperty(FOLDER).evaluateAttributeExpressions().getValue(); StringBuilder urlBuilder = new StringBuilder(); @@ -304,9 +358,14 @@ private Properties buildJavaMailProperties(ProcessContext context) { propertyDescriptorEntry.getValue()); } } - String propertyName = this.getProtocol(context).equals("pop3") ? "mail.pop3.timeout" : "mail.imap.timeout"; + String protocol = this.getProtocol(context); + + String propertyName = protocol.equals("pop3") ? "mail.pop3.timeout" : "mail.imap.timeout"; final String timeoutInMillis = String.valueOf(context.getProperty(CONNECTION_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS)); javaMailProperties.setProperty(propertyName, timeoutInMillis); + + oauth2AccessTokenProviderOptional.ifPresent(oauth2AccessTokenProvider -> javaMailProperties.put("mail." + protocol + ".auth.mechanisms", "XOAUTH2")); + return javaMailProperties; }