Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@

import com.azure.core.credential.AccessToken;
import com.azure.core.credential.TokenCredential;
import com.azure.identity.ClientSecretCredential;
import com.azure.identity.ClientSecretCredentialBuilder;
import com.azure.identity.ManagedIdentityCredential;
import com.azure.identity.ManagedIdentityCredentialBuilder;
import com.azure.storage.common.StorageSharedKeyCredential;
Expand Down Expand Up @@ -53,12 +55,12 @@
public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProcessor {

public static final PropertyDescriptor ADLS_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder()
.name("adls-credentials-service")
.displayName("ADLS Credentials")
.description("Controller Service used to obtain Azure Credentials.")
.identifiesControllerService(ADLSCredentialsService.class)
.required(true)
.build();
.name("adls-credentials-service")
.displayName("ADLS Credentials")
.description("Controller Service used to obtain Azure Credentials.")
.identifiesControllerService(ADLSCredentialsService.class)
.required(true)
.build();

public static final PropertyDescriptor FILESYSTEM = new PropertyDescriptor.Builder()
.name("filesystem-name").displayName("Filesystem Name")
Expand Down Expand Up @@ -120,17 +122,21 @@ public static DataLakeServiceClient getStorageClient(PropertyContext context, Fl

final ADLSCredentialsService credentialsService = context.getProperty(ADLS_CREDENTIALS_SERVICE).asControllerService(ADLSCredentialsService.class);

ADLSCredentialsDetails credentialsDetails = credentialsService.getCredentialsDetails(attributes);
final ADLSCredentialsDetails credentialsDetails = credentialsService.getCredentialsDetails(attributes);

final String accountName = credentialsDetails.getAccountName();
final String accountKey = credentialsDetails.getAccountKey();
final String sasToken = credentialsDetails.getSasToken();
final AccessToken accessToken = credentialsDetails.getAccessToken();
final String endpointSuffix = credentialsDetails.getEndpointSuffix();
final boolean useManagedIdentity = credentialsDetails.getUseManagedIdentity();
final String servicePrincipalTenantId = credentialsDetails.getServicePrincipalTenantId();
final String servicePrincipalClientId = credentialsDetails.getServicePrincipalClientId();
final String servicePrincipalClientSecret = credentialsDetails.getServicePrincipalClientSecret();

final String endpoint = String.format("https://%s.%s", accountName,endpointSuffix);
DataLakeServiceClient storageClient;

final DataLakeServiceClient storageClient;
if (StringUtils.isNotBlank(accountKey)) {
final StorageSharedKeyCredential credential = new StorageSharedKeyCredential(accountName,
accountKey);
Expand All @@ -140,17 +146,28 @@ public static DataLakeServiceClient getStorageClient(PropertyContext context, Fl
storageClient = new DataLakeServiceClientBuilder().endpoint(endpoint).sasToken(sasToken)
.buildClient();
} else if (accessToken != null) {
TokenCredential credential = tokenRequestContext -> Mono.just(accessToken);
final TokenCredential credential = tokenRequestContext -> Mono.just(accessToken);

storageClient = new DataLakeServiceClientBuilder().endpoint(endpoint).credential(credential)
.buildClient();
} else if(useManagedIdentity){
final ManagedIdentityCredential misCrendential = new ManagedIdentityCredentialBuilder()
.build();
storageClient = new DataLakeServiceClientBuilder()
.endpoint(endpoint)
.credential(misCrendential)
.buildClient();
.buildClient();
} else if (useManagedIdentity) {
final ManagedIdentityCredential misCredential = new ManagedIdentityCredentialBuilder()
.build();
storageClient = new DataLakeServiceClientBuilder()
.endpoint(endpoint)
.credential(misCredential)
.buildClient();
} else if (StringUtils.isNoneBlank(servicePrincipalTenantId, servicePrincipalClientId, servicePrincipalClientSecret)) {
final ClientSecretCredential credential = new ClientSecretCredentialBuilder()
.tenantId(servicePrincipalTenantId)
.clientId(servicePrincipalClientId)
.clientSecret(servicePrincipalClientSecret)
.build();

storageClient = new DataLakeServiceClientBuilder()
.endpoint(endpoint)
.credential(credential)
.buildClient();
} else {
throw new IllegalArgumentException("No valid credentials were provided");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;

Expand All @@ -35,7 +36,6 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;
import java.util.function.BiConsumer;
import java.util.function.Function;

Expand All @@ -49,37 +49,81 @@
public class ADLSCredentialsControllerService extends AbstractControllerService implements ADLSCredentialsService {

public static final PropertyDescriptor ACCOUNT_NAME = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(AzureStorageUtils.ACCOUNT_NAME)
.description(AzureStorageUtils.ACCOUNT_NAME_BASE_DESCRIPTION)
.required(true)
.build();
.fromPropertyDescriptor(AzureStorageUtils.ACCOUNT_NAME)
.description(AzureStorageUtils.ACCOUNT_NAME_BASE_DESCRIPTION)
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.build();

public static final PropertyDescriptor ENDPOINT_SUFFIX = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(AzureStorageUtils.ENDPOINT_SUFFIX)
.displayName("Endpoint Suffix")
.description(
"Storage accounts in public Azure always use a common FQDN suffix. " +
"Override this endpoint suffix with a different suffix in certain circumstances (like Azure Stack or non-public Azure regions).")
.required(true)
.defaultValue("dfs.core.windows.net")
.build();
.fromPropertyDescriptor(AzureStorageUtils.ENDPOINT_SUFFIX)
.displayName("Endpoint Suffix")
.description("Storage accounts in public Azure always use a common FQDN suffix. " +
"Override this endpoint suffix with a different suffix in certain circumstances (like Azure Stack or non-public Azure regions).")
.required(true)
.defaultValue("dfs.core.windows.net")
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();

public static final PropertyDescriptor ACCOUNT_KEY = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(AzureStorageUtils.ACCOUNT_KEY)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.build();

public static final PropertyDescriptor SAS_TOKEN = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(AzureStorageUtils.PROP_SAS_TOKEN)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.build();

public static final PropertyDescriptor USE_MANAGED_IDENTITY = new PropertyDescriptor.Builder()
.name("storage-use-managed-identity")
.displayName("Use Azure Managed Identity")
.description("Choose whether or not to use the managed identity of Azure VM/VMSS ")
.required(false)
.defaultValue("false")
.allowableValues("true", "false")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.build();
.name("storage-use-managed-identity")
.displayName("Use Azure Managed Identity")
.description("Choose whether or not to use the managed identity of Azure VM/VMSS ")
.required(false)
.defaultValue("false")
.allowableValues("true", "false")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.build();

public static final PropertyDescriptor SERVICE_PRINCIPAL_TENANT_ID = new PropertyDescriptor.Builder()
.name("service-principal-tenant-id")
.displayName("Service Principal Tenant ID")
.description("Tenant ID of the Azure Active Directory hosting the Service Principal. The property is required when Service Principal authentication is used.")
.sensitive(true)
.required(false)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.build();

public static final PropertyDescriptor SERVICE_PRINCIPAL_CLIENT_ID = new PropertyDescriptor.Builder()
.name("service-principal-client-id")
.displayName("Service Principal Client ID")
.description("Client ID (or Application ID) of the Client/Application having the Service Principal. The property is required when Service Principal authentication is used.")
.sensitive(true)
.required(false)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.build();

public static final PropertyDescriptor SERVICE_PRINCIPAL_CLIENT_SECRET = new PropertyDescriptor.Builder()
.name("service-principal-client-secret")
.displayName("Service Principal Client Secret")
.description("Password of the Client/Application. The property is required when Service Principal authentication is used.")
.sensitive(true)
.required(false)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.build();

private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
ACCOUNT_NAME,
ENDPOINT_SUFFIX,
AzureStorageUtils.ACCOUNT_KEY,
AzureStorageUtils.PROP_SAS_TOKEN,
USE_MANAGED_IDENTITY
ACCOUNT_NAME,
ENDPOINT_SUFFIX,
ACCOUNT_KEY,
SAS_TOKEN,
USE_MANAGED_IDENTITY,
SERVICE_PRINCIPAL_TENANT_ID,
SERVICE_PRINCIPAL_CLIENT_ID,
SERVICE_PRINCIPAL_CLIENT_SECRET
));

private ConfigurationContext context;
Expand All @@ -93,20 +137,41 @@ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
final List<ValidationResult> results = new ArrayList<>();

boolean accountKeySet = StringUtils.isNotBlank(validationContext.getProperty(AzureStorageUtils.ACCOUNT_KEY).getValue());
boolean sasTokenSet = StringUtils.isNotBlank(validationContext.getProperty(AzureStorageUtils.PROP_SAS_TOKEN).getValue());
boolean accountKeySet = StringUtils.isNotBlank(validationContext.getProperty(ACCOUNT_KEY).getValue());
boolean sasTokenSet = StringUtils.isNotBlank(validationContext.getProperty(SAS_TOKEN).getValue());
boolean useManagedIdentitySet = validationContext.getProperty(USE_MANAGED_IDENTITY).asBoolean();

if (!onlyOneSet(accountKeySet, sasTokenSet, useManagedIdentitySet)) {
StringJoiner options = new StringJoiner(", ")
.add(AzureStorageUtils.ACCOUNT_KEY.getDisplayName())
.add(AzureStorageUtils.PROP_SAS_TOKEN.getDisplayName())
.add(USE_MANAGED_IDENTITY.getDisplayName());
boolean servicePrincipalTenantIdSet = StringUtils.isNotBlank(validationContext.getProperty(SERVICE_PRINCIPAL_TENANT_ID).getValue());
boolean servicePrincipalClientIdSet = StringUtils.isNotBlank(validationContext.getProperty(SERVICE_PRINCIPAL_CLIENT_ID).getValue());
boolean servicePrincipalClientSecretSet = StringUtils.isNotBlank(validationContext.getProperty(SERVICE_PRINCIPAL_CLIENT_SECRET).getValue());

boolean servicePrincipalSet = servicePrincipalTenantIdSet || servicePrincipalClientIdSet || servicePrincipalClientSecretSet;

if (!onlyOneSet(accountKeySet, sasTokenSet, useManagedIdentitySet, servicePrincipalSet)) {
results.add(new ValidationResult.Builder().subject(this.getClass().getSimpleName())
.valid(false)
.explanation("one and only one of [" + options + "] should be set")
.explanation("one and only one authentication method of [Account Key, SAS Token, Managed Identity, Service Principal] should be used")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make sense to use displayName() here?

Suggested change
.explanation("one and only one authentication method of [Account Key, SAS Token, Managed Identity, Service Principal] should be used")
.explanation(String.format("one and only one authentication method of [%s, %s, %s, Service Principal] should be used",
ACCOUNT_KEY.displayName(), SAS_TOKEN.displayName(), USE_MANAGED_IDENTITY.displayName()))

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Account Key and SAS Token would be fine but I would simply use Managed Identity (without the "Use" prefix from the displayname).
I think it is something we should rather fix when the new Authentication Type property is added (which will have the same AllowableValue-s that would be needed here too).

.build());
} else if (servicePrincipalSet) {
String template = "'%s' must be set when Service Principal authentication is being configured";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
String template = "'%s' must be set when Service Principal authentication is being configured";
final String template = "'%s' must be set when Service Principal authentication is being configured";

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just for this, I would not add a new commit. Will fix it in the follow-up jira (where customValidate() will be modified).

if (!servicePrincipalTenantIdSet) {
results.add(new ValidationResult.Builder().subject(this.getClass().getSimpleName())
.valid(false)
.explanation(String.format(template, SERVICE_PRINCIPAL_TENANT_ID.getDisplayName()))
.build());
}
if (!servicePrincipalClientIdSet) {
results.add(new ValidationResult.Builder().subject(this.getClass().getSimpleName())
.valid(false)
.explanation(String.format(template, SERVICE_PRINCIPAL_CLIENT_ID.getDisplayName()))
.build());
}
if (!servicePrincipalClientSecretSet) {
results.add(new ValidationResult.Builder().subject(this.getClass().getSimpleName())
.valid(false)
.explanation(String.format(template, SERVICE_PRINCIPAL_CLIENT_SECRET.getDisplayName()))
.build());
}
}

return results;
Expand All @@ -129,23 +194,29 @@ public void onEnabled(ConfigurationContext context) {
public ADLSCredentialsDetails getCredentialsDetails(Map<String, String> attributes) {
ADLSCredentialsDetails.Builder credentialsBuilder = ADLSCredentialsDetails.Builder.newBuilder();

setValue(credentialsBuilder, ACCOUNT_NAME, PropertyValue::getValue, ADLSCredentialsDetails.Builder::setAccountName);
setValue(credentialsBuilder, AzureStorageUtils.ACCOUNT_KEY, PropertyValue::getValue, ADLSCredentialsDetails.Builder::setAccountKey);
setValue(credentialsBuilder, AzureStorageUtils.PROP_SAS_TOKEN, PropertyValue::getValue, ADLSCredentialsDetails.Builder::setSasToken);
setValue(credentialsBuilder, ENDPOINT_SUFFIX, PropertyValue::getValue, ADLSCredentialsDetails.Builder::setEndpointSuffix);
setValue(credentialsBuilder, USE_MANAGED_IDENTITY, PropertyValue::asBoolean, ADLSCredentialsDetails.Builder::setUseManagedIdentity);
setValue(credentialsBuilder, ACCOUNT_NAME, PropertyValue::getValue, ADLSCredentialsDetails.Builder::setAccountName, attributes);
setValue(credentialsBuilder, ACCOUNT_KEY, PropertyValue::getValue, ADLSCredentialsDetails.Builder::setAccountKey, attributes);
setValue(credentialsBuilder, SAS_TOKEN, PropertyValue::getValue, ADLSCredentialsDetails.Builder::setSasToken, attributes);
setValue(credentialsBuilder, ENDPOINT_SUFFIX, PropertyValue::getValue, ADLSCredentialsDetails.Builder::setEndpointSuffix, attributes);
setValue(credentialsBuilder, USE_MANAGED_IDENTITY, PropertyValue::asBoolean, ADLSCredentialsDetails.Builder::setUseManagedIdentity, attributes);
setValue(credentialsBuilder, SERVICE_PRINCIPAL_TENANT_ID, PropertyValue::getValue, ADLSCredentialsDetails.Builder::setServicePrincipalTenantId, attributes);
setValue(credentialsBuilder, SERVICE_PRINCIPAL_CLIENT_ID, PropertyValue::getValue, ADLSCredentialsDetails.Builder::setServicePrincipalClientId, attributes);
setValue(credentialsBuilder, SERVICE_PRINCIPAL_CLIENT_SECRET, PropertyValue::getValue, ADLSCredentialsDetails.Builder::setServicePrincipalClientSecret, attributes);

return credentialsBuilder.build();
}

private <T> void setValue(
ADLSCredentialsDetails.Builder credentialsBuilder,
PropertyDescriptor propertyDescriptor, Function<PropertyValue, T> getPropertyValue,
BiConsumer<ADLSCredentialsDetails.Builder, T> setBuilderValue
ADLSCredentialsDetails.Builder credentialsBuilder,
PropertyDescriptor propertyDescriptor, Function<PropertyValue, T> getPropertyValue,
BiConsumer<ADLSCredentialsDetails.Builder, T> setBuilderValue, Map<String, String> attributes
) {
PropertyValue property = context.getProperty(propertyDescriptor);

if (property.isSet()) {
if (propertyDescriptor.isExpressionLanguageSupported()) {
property = property.evaluateAttributeExpressions(attributes);
}
T value = getPropertyValue.apply(property);
setBuilderValue.accept(credentialsBuilder, value);
}
Expand Down
Loading