Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NIFI-6550: Create controller service for Azure Storage Credentials #3742

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 6 additions & 0 deletions nifi-assembly/pom.xml
Expand Up @@ -451,6 +451,12 @@ language governing permissions and limitations under the License. -->
<version>1.10.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-azure-services-api-nar</artifactId>
<version>1.10.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-scripting-nar</artifactId>
Expand Down
29 changes: 29 additions & 0 deletions nifi-mock/src/main/java/org/apache/nifi/util/NoOpProcessor.java
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.util;

import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;

public class NoOpProcessor extends AbstractProcessor {

@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
}
}
2 changes: 1 addition & 1 deletion nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml
Expand Up @@ -38,7 +38,7 @@

<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-services-api-nar</artifactId>
<artifactId>nifi-azure-services-api-nar</artifactId>
<version>1.10.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
Expand Down
18 changes: 11 additions & 7 deletions nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
Expand Up @@ -44,6 +44,11 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-proxy-configuration-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-azure-services-api</artifactId>
<version>1.10.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs</artifactId>
Expand All @@ -57,13 +62,12 @@
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-storage</artifactId>
<version>5.2.0</version>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- overriding jackson-core in azure-storage -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
Expand Down
Expand Up @@ -56,9 +56,10 @@ public abstract class AbstractAzureBlobProcessor extends AbstractProcessor {
private static final List<PropertyDescriptor> PROPERTIES = Collections
.unmodifiableList(Arrays.asList(
AzureStorageUtils.CONTAINER,
AzureStorageUtils.PROP_SAS_TOKEN,
AzureStorageUtils.STORAGE_CREDENTIALS_SERVICE,
AzureStorageUtils.ACCOUNT_NAME,
AzureStorageUtils.ACCOUNT_KEY,
AzureStorageUtils.PROP_SAS_TOKEN,
BLOB,
AzureStorageUtils.PROXY_CONFIGURATION_SERVICE));

Expand Down
Expand Up @@ -48,7 +48,6 @@
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
Expand Down Expand Up @@ -91,6 +90,8 @@
})
public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor {

private static final String FORMAT_STORAGE_CONNECTION_STRING = "DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s";

static final PropertyDescriptor NAMESPACE = new PropertyDescriptor.Builder()
.name("event-hub-namespace")
.displayName("Event Hub Namespace")
Expand Down Expand Up @@ -626,7 +627,7 @@ private void registerEventProcessor(final ProcessContext context) throws Excepti
.evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
options.setReceiveTimeOut(Duration.ofMillis(receiveTimeoutMillis));

final String storageConnectionString = String.format(AzureStorageUtils.FORMAT_BLOB_CONNECTION_STRING, storageAccountName, storageAccountKey);
final String storageConnectionString = String.format(FORMAT_STORAGE_CONNECTION_STRING, storageAccountName, storageAccountKey);

final ConnectionStringBuilder eventHubConnectionString = new ConnectionStringBuilder(namespaceName, eventHubName, sasName, sasKey);

Expand Down
Expand Up @@ -96,9 +96,10 @@ public class ListAzureBlobStorage extends AbstractListProcessor<BlobInfo> {
private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
LISTING_STRATEGY,
AzureStorageUtils.CONTAINER,
AzureStorageUtils.PROP_SAS_TOKEN,
AzureStorageUtils.STORAGE_CREDENTIALS_SERVICE,
AzureStorageUtils.ACCOUNT_NAME,
AzureStorageUtils.ACCOUNT_KEY,
AzureStorageUtils.PROP_SAS_TOKEN,
PROP_PREFIX,
AzureStorageUtils.PROXY_CONFIGURATION_SERVICE,
ListedEntityTracker.TRACKING_STATE_CACHE,
Expand All @@ -113,6 +114,7 @@ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {

@Override
protected void customValidate(ValidationContext validationContext, Collection<ValidationResult> results) {
results.addAll(AzureStorageUtils.validateCredentialProperties(validationContext));
AzureStorageUtils.validateProxySpec(validationContext, results);
}

Expand Down
@@ -1,110 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.storage.queue;

import com.microsoft.azure.storage.CloudStorageAccount;
import com.microsoft.azure.storage.StorageCredentials;
import com.microsoft.azure.storage.StorageCredentialsSharedAccessSignature;
import com.microsoft.azure.storage.queue.CloudQueueClient;

import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;

import java.net.URI;
import java.net.URISyntaxException;
import java.security.InvalidKeyException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

public abstract class AbstractAzureQueueStorage extends AbstractProcessor {

public static final PropertyDescriptor QUEUE = new PropertyDescriptor.Builder()
.name("storage-queue-name")
.displayName("Queue Name")
.description("Name of the Azure Storage Queue")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();

public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("All successfully processed FlowFiles are routed to this relationship")
.build();

public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("Unsuccessful operations will be transferred to the failure relationship.")
.build();

private static final String FORMAT_QUEUE_CONNECTION_STRING = "DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s";
private static final String FORMAT_QUEUE_BASE_URI = "https://%s.queue.core.windows.net";

private static final Set<Relationship> relationships = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));

@Override
public Set<Relationship> getRelationships() {
return relationships;
}

protected final CloudQueueClient createCloudQueueClient(final ProcessContext context, final FlowFile flowFile) {
final String storageAccountName;
final String storageAccountKey;
final String sasToken;
final String connectionString;

if (flowFile == null) {
storageAccountName = context.getProperty(AzureStorageUtils.ACCOUNT_NAME).evaluateAttributeExpressions().getValue();
storageAccountKey = context.getProperty(AzureStorageUtils.ACCOUNT_KEY).evaluateAttributeExpressions().getValue();
sasToken = context.getProperty(AzureStorageUtils.PROP_SAS_TOKEN).evaluateAttributeExpressions().getValue();
} else {
storageAccountName = context.getProperty(AzureStorageUtils.ACCOUNT_NAME).evaluateAttributeExpressions(flowFile).getValue();
storageAccountKey = context.getProperty(AzureStorageUtils.ACCOUNT_KEY).evaluateAttributeExpressions(flowFile).getValue();
sasToken = context.getProperty(AzureStorageUtils.PROP_SAS_TOKEN).evaluateAttributeExpressions(flowFile).getValue();
}

CloudQueueClient cloudQueueClient;
try {
if (StringUtils.isNoneBlank(sasToken)) {
connectionString = String.format(FORMAT_QUEUE_BASE_URI, storageAccountName);
StorageCredentials storageCredentials = new StorageCredentialsSharedAccessSignature(sasToken);
cloudQueueClient = new CloudQueueClient(new URI(connectionString), storageCredentials);
} else {
connectionString = String.format(FORMAT_QUEUE_CONNECTION_STRING, storageAccountName, storageAccountKey);
CloudStorageAccount storageAccount = CloudStorageAccount.parse(connectionString);
cloudQueueClient = storageAccount.createCloudQueueClient();
}
} catch (IllegalArgumentException | URISyntaxException e) {
getLogger().error("Invalid connection string URI for '{}'", new Object[]{context.getName()}, e);
throw new IllegalArgumentException(e);
} catch (InvalidKeyException e) {
getLogger().error("Invalid connection credentials for '{}'", new Object[]{context.getName()}, e);
throw new IllegalArgumentException(e);
}
return cloudQueueClient;
}

}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.azure.storage.queue;

import com.microsoft.azure.storage.CloudStorageAccount;
import com.microsoft.azure.storage.queue.CloudQueueClient;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import org.apache.nifi.services.azure.storage.AzureStorageCredentialsDetails;

import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

public abstract class AbstractAzureQueueStorage extends AbstractProcessor {

public static final PropertyDescriptor QUEUE = new PropertyDescriptor.Builder()
.name("storage-queue-name")
.displayName("Queue Name")
.description("Name of the Azure Storage Queue")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();

public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("All successfully processed FlowFiles are routed to this relationship")
.build();

public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("Unsuccessful operations will be transferred to the failure relationship.")
.build();

private static final Set<Relationship> relationships = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));

@Override
public Set<Relationship> getRelationships() {
return relationships;
}

protected final CloudQueueClient createCloudQueueClient(final ProcessContext context, final FlowFile flowFile) throws URISyntaxException {
final AzureStorageCredentialsDetails storageCredentialsDetails = AzureStorageUtils.getStorageCredentialsDetails(context, flowFile);
final CloudStorageAccount cloudStorageAccount = new CloudStorageAccount(storageCredentialsDetails.getStorageCredentials(), true, null, storageCredentialsDetails.getStorageAccountName());
final CloudQueueClient cloudQueueClient = cloudStorageAccount.createCloudQueueClient();

return cloudQueueClient;
}

@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
return AzureStorageUtils.validateCredentialProperties(validationContext);
}
}
Expand Up @@ -94,8 +94,8 @@ public class GetAzureQueueStorage extends AbstractAzureQueueStorage {
.build();

private static final List<PropertyDescriptor> properties = Collections.unmodifiableList(Arrays.asList(
AzureStorageUtils.ACCOUNT_NAME, AzureStorageUtils.ACCOUNT_KEY, AzureStorageUtils.PROP_SAS_TOKEN, QUEUE, AUTO_DELETE,
BATCH_SIZE, VISIBILITY_TIMEOUT, AzureStorageUtils.PROXY_CONFIGURATION_SERVICE));
AzureStorageUtils.STORAGE_CREDENTIALS_SERVICE, AzureStorageUtils.ACCOUNT_NAME, AzureStorageUtils.ACCOUNT_KEY, AzureStorageUtils.PROP_SAS_TOKEN,
QUEUE, AUTO_DELETE, BATCH_SIZE, VISIBILITY_TIMEOUT, AzureStorageUtils.PROXY_CONFIGURATION_SERVICE));

@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
Expand Down