From dc266d42f7acbb5654a89250bfe33e5c32a429da Mon Sep 17 00:00:00 2001
From: Simon Elliston Ball
Date: Mon, 2 May 2016 00:35:34 +0100
Subject: [PATCH 01/11] NIFI-1833 - Azure Storage processors
---
.../nifi-azure-bundle/nifi-azure-nar/pom.xml | 6 +
.../nifi-azure-processors/pom.xml | 47 ++++-
.../azure/AbstractAzureBlobProcessor.java | 39 ++++
.../azure/AbstractAzureProcessor.java | 85 ++++++++
.../nifi/processors/azure/AzureConstants.java | 38 ++++
.../azure/storage/FetchAzureBlobStorage.java | 114 +++++++++++
.../azure/storage/ListAzureBlobStorage.java | 193 ++++++++++++++++++
.../azure/storage/PutAzureBlobStorage.java | 116 +++++++++++
.../azure/storage/utils/BlobInfo.java | 188 +++++++++++++++++
.../org.apache.nifi.processor.Processor | 5 +-
.../azure/storage/AbstractAzureIT.java | 106 ++++++++++
.../storage/ITFetchAzureBlobStorage.java | 61 ++++++
.../azure/storage/ITListAzureBlobStorage.java | 75 +++++++
.../azure/storage/ITPutAzureStorageBlob.java | 51 +++++
14 files changed, 1114 insertions(+), 10 deletions(-)
create mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java
create mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureProcessor.java
create mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AzureConstants.java
create mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java
create mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java
create mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java
create mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobInfo.java
create mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureIT.java
create mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage.java
create mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureBlobStorage.java
create mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureStorageBlob.java
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml
index f823e6af944e..f75bb7f9b0a9 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml
@@ -35,6 +35,12 @@
nifi-azure-processors1.2.0-SNAPSHOT
+
+
+ org.apache.nifi
+ nifi-standard-services-api-nar
+ nar
+
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
index 9049b3f1065e..8330bcc7b320 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
@@ -1,14 +1,14 @@
+ license agreements. See the NOTICE file distributed with this work for additional
+ information regarding copyright ownership. The ASF licenses this file to
+ You under the Apache License, Version 2.0 (the "License"); you may not use
+ this file except in compliance with the License. You may obtain a copy of
+ the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+ by applicable law or agreed to in writing, software distributed under the
+ License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+ OF ANY KIND, either express or implied. See the License for the specific
+ language governing permissions and limitations under the License. -->
4.0.0
@@ -31,11 +31,35 @@ language governing permissions and limitations under the License. -->
org.apache.nifinifi-utils
+
+ org.apache.nifi
+ nifi-ssl-context-service-api
+ provided
+
+
+ org.apache.avro
+ avro
+ com.microsoft.azureazure-eventhubs0.9.0
+
+ com.fasterxml.jackson.core
+ jackson-core
+ 2.8.6
+
+
+
+ com.microsoft.azure
+ azure-storage
+ 5.0.0
+ org.apache.nifinifi-mock
@@ -57,5 +81,10 @@ language governing permissions and limitations under the License. -->
${powermock.version}test
+
+ org.apache.nifi
+ nifi-standard-processors
+ ${project.version}
+
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java
new file mode 100644
index 000000000000..82eae123a4d5
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+public abstract class AbstractAzureBlobProcessor extends AbstractAzureProcessor {
+
+ public static final PropertyDescriptor BLOB = new PropertyDescriptor.Builder().name("Blob").description("The filename of the blob").addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(true).required(true).defaultValue("${azure.blobname}").build();
+
+ public static final List properties = Collections
+ .unmodifiableList(Arrays.asList(AzureConstants.ACCOUNT_NAME, AzureConstants.ACCOUNT_KEY, AzureConstants.CONTAINER, BLOB));
+
+ @Override
+ protected List getSupportedPropertyDescriptors() {
+ return properties;
+ }
+
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureProcessor.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureProcessor.java
new file mode 100644
index 000000000000..5ab1f8bfbf57
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureProcessor.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure;
+
+import java.net.URISyntaxException;
+import java.security.InvalidKeyException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+
+import com.microsoft.azure.storage.CloudStorageAccount;
+
+public abstract class AbstractAzureProcessor extends AbstractProcessor {
+
+ public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All FlowFiles that are received are routed to success").build();
+ protected static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("Any failed fetches will be transferred to the failure relation.").build();
+ public static final Set RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));
+
+ protected CloudStorageAccount createStorageConnection(ProcessContext context) {
+ final String accountName = context.getProperty(AzureConstants.ACCOUNT_NAME).evaluateAttributeExpressions().getValue();
+ final String accountKey = context.getProperty(AzureConstants.ACCOUNT_KEY).evaluateAttributeExpressions().getValue();
+ return createStorageConnectionFromNameAndKey(accountName, accountKey);
+ }
+
+ protected CloudStorageAccount createStorageConnection(ProcessContext context, FlowFile flowFile) {
+ final String accountName = context.getProperty(AzureConstants.ACCOUNT_NAME).evaluateAttributeExpressions(flowFile).getValue();
+ final String accountKey = context.getProperty(AzureConstants.ACCOUNT_KEY).evaluateAttributeExpressions(flowFile).getValue();
+ return createStorageConnectionFromNameAndKey(accountName, accountKey);
+ }
+
+ private CloudStorageAccount createStorageConnectionFromNameAndKey(String accountName, String accountKey) {
+ final String storageConnectionString = String.format("DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s", accountName, accountKey);
+ try {
+ return createStorageAccountFromConnectionString(storageConnectionString);
+ } catch (InvalidKeyException | IllegalArgumentException | URISyntaxException e) {
+ throw new IllegalArgumentException(e);
+ }
+ }
+
+ /**
+ * Validates the connection string and returns the storage account. The connection string must be in the Azure connection string format.
+ *
+ * @param storageConnectionString
+ * Connection string for the storage service or the emulator
+ * @return The newly created CloudStorageAccount object
+ *
+ */
+ protected static CloudStorageAccount createStorageAccountFromConnectionString(String storageConnectionString) throws IllegalArgumentException, URISyntaxException, InvalidKeyException {
+ CloudStorageAccount storageAccount;
+ try {
+ storageAccount = CloudStorageAccount.parse(storageConnectionString);
+ } catch (IllegalArgumentException | URISyntaxException e) {
+ throw e;
+ } catch (InvalidKeyException e) {
+ throw e;
+ }
+ return storageAccount;
+ }
+
+ @Override
+ public Set getRelationships() {
+ return RELATIONSHIPS;
+ }
+
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AzureConstants.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AzureConstants.java
new file mode 100644
index 000000000000..eaa234caa2b5
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AzureConstants.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.util.StandardValidators;
+
+public final class AzureConstants {
+ public static final String BLOCK = "Block";
+ public static final String PAGE = "Page";
+
+ public static final PropertyDescriptor ACCOUNT_KEY = new PropertyDescriptor.Builder().name("Storage Account Key").description("The storage account key")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(true).required(true).sensitive(true).build();
+
+ public static final PropertyDescriptor ACCOUNT_NAME = new PropertyDescriptor.Builder().name("Storage Account Name").description("The storage account name")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(true).required(true).sensitive(true).build();
+
+ public static final PropertyDescriptor CONTAINER = new PropertyDescriptor.Builder().name("Container name").description("Name of the azure storage container")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(true).required(true).build();
+
+ private AzureConstants() {
+ // do not instantiate
+ }
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java
new file mode 100644
index 000000000000..2229cfd097c8
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor;
+import org.apache.nifi.processors.azure.AzureConstants;
+
+import com.microsoft.azure.storage.CloudStorageAccount;
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudBlob;
+import com.microsoft.azure.storage.blob.CloudBlobClient;
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+
+@Tags({ "azure", "microsoft", "cloud", "storage", "blob" })
+@CapabilityDescription("Retrieves contents of an Azure Storage Blob, writing the contents to the content of the FlowFile")
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@WritesAttributes({
+ @WritesAttribute(attribute = "azure.length", description = "The length of the blob fetched")
+})
+public class FetchAzureBlobStorage extends AbstractAzureBlobProcessor {
+ public static final List PROPERTIES = Collections
+ .unmodifiableList(Arrays.asList(AzureConstants.ACCOUNT_NAME, AzureConstants.ACCOUNT_KEY, AzureConstants.CONTAINER, BLOB));
+
+ @Override
+ protected List getSupportedPropertyDescriptors() {
+ return PROPERTIES;
+ }
+
+ @Override
+ public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+ FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
+ }
+
+ final long startNanos = System.nanoTime();
+
+ String containerName = context.getProperty(AzureConstants.CONTAINER).evaluateAttributeExpressions(flowFile).getValue();
+ String blobPath = context.getProperty(BLOB).evaluateAttributeExpressions(flowFile).getValue();
+
+ try {
+ CloudStorageAccount storageAccount = createStorageConnection(context, flowFile);
+ CloudBlobClient blobClient = storageAccount.createCloudBlobClient();
+ CloudBlobContainer container = blobClient.getContainerReference(containerName);
+
+ final Map attributes = new HashMap<>();
+ final CloudBlob blob = container.getBlockBlobReference(blobPath);
+
+ // TODO - we may be able do fancier things with ranges and
+ // distribution of download over threads, investigate
+ flowFile = session.write(flowFile, new OutputStreamCallback() {
+ @Override
+ public void process(OutputStream os) throws IOException {
+ try {
+ blob.download(os);
+ } catch (StorageException e) {
+ throw new IOException(e);
+ }
+ }
+ });
+
+ long length = blob.getProperties().getLength();
+ attributes.put("azure.length", String.valueOf(length));
+
+ if (!attributes.isEmpty()) {
+ flowFile = session.putAllAttributes(flowFile, attributes);
+ }
+
+ session.transfer(flowFile, REL_SUCCESS);
+ final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+ session.getProvenanceReporter().fetch(flowFile, blob.getSnapshotQualifiedUri().toString(), transferMillis);
+
+ } catch (IllegalArgumentException | URISyntaxException | StorageException e1) {
+ flowFile = session.penalize(flowFile);
+ session.transfer(flowFile, REL_FAILURE);
+ }
+ }
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java
new file mode 100644
index 000000000000..f4a793b83dfa
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.security.InvalidKeyException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.azure.AzureConstants;
+import org.apache.nifi.processors.azure.storage.utils.BlobInfo;
+import org.apache.nifi.processors.azure.storage.utils.BlobInfo.Builder;
+import org.apache.nifi.processors.standard.AbstractListProcessor;
+
+import com.microsoft.azure.storage.CloudStorageAccount;
+import com.microsoft.azure.storage.OperationContext;
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.StorageUri;
+import com.microsoft.azure.storage.blob.BlobListingDetails;
+import com.microsoft.azure.storage.blob.BlobProperties;
+import com.microsoft.azure.storage.blob.BlobRequestOptions;
+import com.microsoft.azure.storage.blob.CloudBlob;
+import com.microsoft.azure.storage.blob.CloudBlobClient;
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import com.microsoft.azure.storage.blob.CloudBlockBlob;
+import com.microsoft.azure.storage.blob.ListBlobItem;
+
+@TriggerSerially
+@Tags({ "azure", "microsoft", "cloud", "storage", "blob" })
+@SeeAlso({ FetchAzureBlobStorage.class })
+@CapabilityDescription("Lists blobs in an Azure Storage container. Listing details are attached to an empty FlowFile for use with FetchAzureBlobStorage")
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@WritesAttributes({ @WritesAttribute(attribute = "azure.container", description = "The name of the azure container"),
+ @WritesAttribute(attribute = "azure.blobname", description = "The name of the azure blob"), @WritesAttribute(attribute = "azure.primaryUri", description = "Primary location for blob content"),
+ @WritesAttribute(attribute = "azure.secondaryUri", description = "Secondary location for blob content"), @WritesAttribute(attribute = "azure.etag", description = "Etag for the Azure blob"),
+ @WritesAttribute(attribute = "azure.length", description = "Length of the blob"), @WritesAttribute(attribute = "azure.timestamp", description = "The timestamp in Azure for the blob"),
+ @WritesAttribute(attribute = "mime.type", description = "MimeType of the content"), @WritesAttribute(attribute = "lang", description = "Language code for the content"),
+ @WritesAttribute(attribute = "azure.blobtype", description = "This is the type of blob and can be either page or block type") })
+@Stateful(scopes = { Scope.LOCAL, Scope.CLUSTER }, description = "After performing a listing of blobs, the timestamp of the newest blob is stored. "
+ + "This allows the Processor to list only blobs that have been added or modified after " + "this date the next time that the Processor is run.")
+public class ListAzureBlobStorage extends AbstractListProcessor {
+
+ private static final PropertyDescriptor PREFIX = new PropertyDescriptor.Builder().name("Prefix").description("Search prefix for listing").addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(true).required(false).build();
+
+ public static final List PROPERTIES = Collections.unmodifiableList(Arrays.asList(AzureConstants.ACCOUNT_NAME, AzureConstants.ACCOUNT_KEY, AzureConstants.CONTAINER, PREFIX));
+
+ @Override
+ protected List getSupportedPropertyDescriptors() {
+ return PROPERTIES;
+ }
+
+ @Override
+ protected Map createAttributes(BlobInfo entity, ProcessContext context) {
+ final Map attributes = new HashMap<>();
+ attributes.put("azure.etag", entity.getEtag());
+ attributes.put("azure.primaryUri", entity.getPrimaryUri());
+ attributes.put("azure.secondaryUri", entity.getSecondaryUri());
+ attributes.put("azure.blobname", entity.getName());
+ attributes.put("azure.blobtype", entity.getBlobType());
+ attributes.put("azure.length", String.valueOf(entity.getLength()));
+ attributes.put("azure.timestamp", String.valueOf(entity.getTimestamp()));
+ attributes.put("mime.type", entity.getContentType());
+ attributes.put("lang", entity.getContentLanguage());
+
+ return attributes;
+ }
+
+ @Override
+ protected String getPath(final ProcessContext context) {
+ return context.getProperty(AzureConstants.CONTAINER).evaluateAttributeExpressions().getValue();
+ }
+
+ @Override
+ protected boolean isListingResetNecessary(final PropertyDescriptor property) {
+ // TODO - implement
+ return false;
+ }
+
+ @Override
+ protected Scope getStateScope(final ProcessContext context) {
+ return Scope.CLUSTER;
+ }
+
+ @Override
+ protected List performListing(final ProcessContext context, final Long minTimestamp) throws IOException {
+ String containerName = context.getProperty(AzureConstants.CONTAINER).evaluateAttributeExpressions().getValue();
+ String prefix = context.getProperty(PREFIX).evaluateAttributeExpressions().getValue();
+ if (prefix == null) {
+ prefix = "";
+ }
+ final List listing = new ArrayList<>();
+ try {
+ CloudStorageAccount storageAccount = createStorageConnection(context);
+ CloudBlobClient blobClient = storageAccount.createCloudBlobClient();
+ CloudBlobContainer container = blobClient.getContainerReference(containerName);
+
+ BlobRequestOptions blobRequestOptions = null;
+ OperationContext operationContext = null;
+
+ for (ListBlobItem blob : container.listBlobs(prefix, true, EnumSet.of(BlobListingDetails.METADATA), blobRequestOptions, operationContext)) {
+ if (blob instanceof CloudBlob) {
+ CloudBlob cloudBlob = (CloudBlob) blob;
+ BlobProperties properties = cloudBlob.getProperties();
+ StorageUri uri = cloudBlob.getSnapshotQualifiedStorageUri();
+
+ Builder builder = new BlobInfo.Builder().primaryUri(uri.getPrimaryUri().toString()).secondaryUri(uri.getSecondaryUri().toString()).contentType(properties.getContentType())
+ .contentLanguage(properties.getContentLanguage()).etag(properties.getEtag()).lastModifiedTime(properties.getLastModified().getTime()).length(properties.getLength());
+
+ if (blob instanceof CloudBlockBlob) {
+ builder.blobType(AzureConstants.BLOCK);
+ } else {
+ builder.blobType(AzureConstants.PAGE);
+ }
+ listing.add(builder.build());
+ }
+ }
+ } catch (IllegalArgumentException | URISyntaxException | StorageException e) {
+ throw (new IOException(e));
+ }
+ return listing;
+ }
+
+ protected static CloudStorageAccount createStorageConnection(ProcessContext context) {
+ final String accountName = context.getProperty(AzureConstants.ACCOUNT_NAME).evaluateAttributeExpressions().getValue();
+ final String accountKey = context.getProperty(AzureConstants.ACCOUNT_KEY).evaluateAttributeExpressions().getValue();
+ final String storageConnectionString = String.format("DefaultEndpointsProtocol=http;AccountName=%s;AccountKey=%s", accountName, accountKey);
+ try {
+ return createStorageAccountFromConnectionString(storageConnectionString);
+ } catch (InvalidKeyException | URISyntaxException e) {
+ throw new IllegalArgumentException(e);
+ }
+ }
+
+ /**
+ * Validates the connection string and returns the storage account. The connection string must be in the Azure connection string format.
+ *
+ * @param storageConnectionString
+ * Connection string for the storage service or the emulator
+ * @return The newly created CloudStorageAccount object
+ *
+ */
+ private static CloudStorageAccount createStorageAccountFromConnectionString(String storageConnectionString) throws IllegalArgumentException, URISyntaxException, InvalidKeyException {
+
+ CloudStorageAccount storageAccount;
+ try {
+ storageAccount = CloudStorageAccount.parse(storageConnectionString);
+ } catch (IllegalArgumentException | URISyntaxException e) {
+ System.out.println("\nConnection string specifies an invalid URI.");
+ System.out.println("Please confirm the connection string is in the Azure connection string format.");
+ throw e;
+ } catch (InvalidKeyException e) {
+ System.out.println("\nConnection string specifies an invalid key.");
+ System.out.println("Please confirm the AccountName and AccountKey in the connection string are valid.");
+ throw e;
+ }
+ return storageAccount;
+ }
+
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java
new file mode 100644
index 000000000000..1327a0b2b519
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URISyntaxException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor;
+import org.apache.nifi.processors.azure.AzureConstants;
+
+import com.microsoft.azure.storage.CloudStorageAccount;
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.BlobProperties;
+import com.microsoft.azure.storage.blob.CloudBlob;
+import com.microsoft.azure.storage.blob.CloudBlobClient;
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+
+@Tags({ "azure", "microsoft", "cloud", "storage", "blob" })
+@SeeAlso({ ListAzureBlobStorage.class, FetchAzureBlobStorage.class })
+@CapabilityDescription("Puts content into an Azure Storage Blob")
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@WritesAttributes({ @WritesAttribute(attribute = "azure.container", description = "The name of the azure container"),
+ @WritesAttribute(attribute = "azure.blobname", description = "The name of the azure blob"),
+ @WritesAttribute(attribute = "azure.primaryUri", description = "Primary location for blob content"),
+ @WritesAttribute(attribute = "azure.etag", description = "Etag for the Azure blob"),
+ @WritesAttribute(attribute = "azure.length", description = "Length of the blob"),
+ @WritesAttribute(attribute = "azure.timestamp", description = "The timestamp in Azure for the blob"),
+ @WritesAttribute(attribute = "azure.blobtype", description = "This is the type of blob and can be either page or block type") })
+public class PutAzureBlobStorage extends AbstractAzureBlobProcessor {
+
+ public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+ FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
+ }
+
+ final long startNanos = System.nanoTime();
+
+ String containerName = context.getProperty(AzureConstants.CONTAINER).evaluateAttributeExpressions(flowFile).getValue();
+
+ String blobPath = context.getProperty(BLOB).evaluateAttributeExpressions(flowFile).getValue();
+
+ try {
+ CloudStorageAccount storageAccount = createStorageConnection(context, flowFile);
+ CloudBlobClient blobClient = storageAccount.createCloudBlobClient();
+ CloudBlobContainer container = blobClient.getContainerReference(containerName);
+
+ CloudBlob blob = container.getBlockBlobReference(blobPath);
+
+ final Map attributes = new HashMap<>();
+ long length = flowFile.getSize();
+ session.read(flowFile, new InputStreamCallback() {
+ @Override
+ public void process(final InputStream rawIn) throws IOException {
+ final InputStream in = new BufferedInputStream(rawIn);
+ try {
+ blob.upload(in, length);
+ BlobProperties properties = blob.getProperties();
+ attributes.put("azure.container", containerName);
+ attributes.put("azure.primaryUri", blob.getSnapshotQualifiedUri().toString());
+ attributes.put("azure.etag", properties.getEtag());
+ attributes.put("azure.length", String.valueOf(length));
+ attributes.put("azure.timestamp", String.valueOf(properties.getLastModified()));
+ } catch (StorageException | URISyntaxException e) {
+ throw new IOException(e);
+ }
+ }
+ });
+
+ if (!attributes.isEmpty()) {
+ flowFile = session.putAllAttributes(flowFile, attributes);
+ }
+ session.transfer(flowFile, REL_SUCCESS);
+
+ final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+ session.getProvenanceReporter().send(flowFile, blob.getSnapshotQualifiedUri().toString(), transferMillis);
+
+ } catch (IllegalArgumentException | URISyntaxException | StorageException e) {
+ getLogger().error("Failed to put Azure blob {}", new Object[]{blobPath}, e);
+ flowFile = session.penalize(flowFile);
+ session.transfer(flowFile, REL_FAILURE);
+ }
+
+ }
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobInfo.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobInfo.java
new file mode 100644
index 000000000000..d429878b6df2
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobInfo.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.utils;
+
+import java.io.Serializable;
+
+import org.apache.nifi.processors.standard.util.ListableEntity;
+
+public class BlobInfo implements Comparable, Serializable, ListableEntity {
+ private static final long serialVersionUID = 1L;
+
+ private final String primaryUri;
+ private final String secondaryUri;
+ private final String contentType;
+ private final String contentLanguage;
+ private final String etag;
+ private final long lastModifiedTime;
+ private final long length;
+ private final String blobType;
+
+ public static long getSerialversionuid() {
+ return serialVersionUID;
+ }
+
+ public String getPrimaryUri() {
+ return primaryUri;
+ }
+
+ public String getSecondaryUri() {
+ return secondaryUri;
+ }
+
+ public String getContentType() {
+ return contentType;
+ }
+
+ public String getContentLanguage() {
+ return contentLanguage;
+ }
+
+ public String getEtag() {
+ return etag;
+ }
+
+ public long getLastModifiedTime() {
+ return lastModifiedTime;
+ }
+
+ public long getLength() {
+ return length;
+ }
+
+ public String getBlobType() {
+ return blobType;
+ }
+
+ public static final class Builder {
+ private String primaryUri;
+ private String secondaryUri;
+ private String contentType;
+ private String contentLanguage;
+ private String etag;
+ private long lastModifiedTime;
+ private long length;
+ private String blobType;
+
+ public Builder primaryUri(String primaryUri) {
+ this.primaryUri = primaryUri;
+ return this;
+ }
+
+ public Builder secondaryUri(String secondaryUri) {
+ this.secondaryUri = secondaryUri;
+ return this;
+ }
+
+ public Builder contentType(String contentType) {
+ this.contentType = contentType;
+ return this;
+ }
+
+ public Builder contentLanguage(String contentLanguage) {
+ this.contentLanguage = contentLanguage;
+ return this;
+ }
+
+ public Builder etag(String etag) {
+ this.etag = etag;
+ return this;
+ }
+
+ public Builder lastModifiedTime(long lastModifiedTime) {
+ this.lastModifiedTime = lastModifiedTime;
+ return this;
+ }
+
+ public Builder length(long length) {
+ this.length = length;
+ return this;
+ }
+
+ public Builder blobType(String blobType) {
+ this.blobType = blobType;
+ return this;
+ }
+
+ public BlobInfo build() {
+ return new BlobInfo(this);
+ }
+
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((etag == null) ? 0 : etag.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ BlobInfo other = (BlobInfo) obj;
+ if (etag == null) {
+ if (other.etag != null) {
+ return false;
+ }
+ } else if (!etag.equals(other.etag)) {
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public int compareTo(BlobInfo o) {
+ return etag.compareTo(o.etag);
+ }
+
+ protected BlobInfo(final Builder builder) {
+ this.primaryUri = builder.primaryUri;
+ this.secondaryUri = builder.secondaryUri;
+ this.contentType = builder.contentType;
+ this.contentLanguage = builder.contentLanguage;
+ this.etag = builder.etag;
+ this.lastModifiedTime = builder.lastModifiedTime;
+ this.length = builder.length;
+ this.blobType = builder.blobType;
+ }
+
+ @Override
+ public String getName() {
+ String primaryUri = getPrimaryUri();
+ return primaryUri.substring(primaryUri.lastIndexOf('/') + 1);
+ }
+
+ @Override
+ public String getIdentifier() {
+ return getPrimaryUri();
+ }
+
+ @Override
+ public long getTimestamp() {
+ return getLastModifiedTime();
+ }
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 178e52c98425..84b3300f4cef 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -13,4 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.processors.azure.eventhub.PutAzureEventHub
-org.apache.nifi.processors.azure.eventhub.GetAzureEventHub
\ No newline at end of file
+org.apache.nifi.processors.azure.eventhub.GetAzureEventHub
+org.apache.nifi.processors.azure.storage.FetchAzureBlobStorage
+org.apache.nifi.processors.azure.storage.ListAzureBlobStorage
+org.apache.nifi.processors.azure.storage.PutAzureBlobStorage
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureIT.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureIT.java
new file mode 100644
index 000000000000..34702eb96f90
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureIT.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage;
+
+import static org.junit.Assert.fail;
+
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.security.InvalidKeyException;
+import java.util.Properties;
+
+import org.apache.nifi.util.file.FileUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import com.microsoft.azure.storage.CloudStorageAccount;
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudBlob;
+import com.microsoft.azure.storage.blob.CloudBlobClient;
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import com.microsoft.azure.storage.blob.DeleteSnapshotsOption;
+import com.microsoft.azure.storage.blob.ListBlobItem;
+import com.microsoft.azure.storage.table.CloudTable;
+import com.microsoft.azure.storage.table.CloudTableClient;
+
+public abstract class AbstractAzureIT {
+ protected static final String CREDENTIALS_FILE = System.getProperty("user.home") + "/azure-credentials.PROPERTIES";
+ public static final String TEST_CONTAINER_NAME = "nifitest";
+
+ private static final Properties CONFIG;
+ protected static final String TEST_BLOB_NAME = "testing";
+ protected static final String TEST_TABLE_NAME = "testing";
+
+ static {
+ final FileInputStream fis;
+ CONFIG = new Properties();
+ try {
+ fis = new FileInputStream(CREDENTIALS_FILE);
+ try {
+ CONFIG.load(fis);
+ } catch (IOException e) {
+ fail("Could not open credentials file " + CREDENTIALS_FILE + ": " + e.getLocalizedMessage());
+ } finally {
+ FileUtils.closeQuietly(fis);
+ }
+ } catch (FileNotFoundException e) {
+ fail("Could not open credentials file " + CREDENTIALS_FILE + ": " + e.getLocalizedMessage());
+ }
+
+ }
+
+ @BeforeClass
+ public static void oneTimeSetup() throws StorageException, InvalidKeyException, URISyntaxException {
+ CloudBlobContainer container = getContainer();
+ container.createIfNotExists();
+ }
+
+ @AfterClass
+ public static void tearDown() throws InvalidKeyException, URISyntaxException, StorageException {
+ CloudBlobContainer container = getContainer();
+ for (ListBlobItem blob : container.listBlobs()) {
+ if (blob instanceof CloudBlob) {
+ ((CloudBlob) blob).delete(DeleteSnapshotsOption.INCLUDE_SNAPSHOTS, null, null, null);
+ }
+ }
+ }
+
+ public static String getAccountName() {
+ return CONFIG.getProperty("accountName");
+ }
+
+ public static String getAccountKey() {
+ return CONFIG.getProperty("accountKey");
+ }
+
+ protected static CloudBlobContainer getContainer() throws InvalidKeyException, URISyntaxException, StorageException {
+ String storageConnectionString = String.format("DefaultEndpointsProtocol=http;AccountName=%s;AccountKey=%s", getAccountName(), getAccountKey());
+ CloudStorageAccount storageAccount = CloudStorageAccount.parse(storageConnectionString);
+ CloudBlobClient blobClient = storageAccount.createCloudBlobClient();
+ return blobClient.getContainerReference(TEST_CONTAINER_NAME);
+ }
+
+ protected static CloudTable getTable() throws InvalidKeyException, URISyntaxException, StorageException {
+ String storageConnectionString = String.format("DefaultEndpointsProtocol=http;AccountName=%s;AccountKey=%s", getAccountName(), getAccountKey());
+ CloudStorageAccount storageAccount = CloudStorageAccount.parse(storageConnectionString);
+ CloudTableClient tableClient = storageAccount.createCloudTableClient();
+ return tableClient.getTableReference(TEST_TABLE_NAME);
+ }
+
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage.java
new file mode 100644
index 000000000000..1e8a8f702b87
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.security.InvalidKeyException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.processors.azure.AzureConstants;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Test;
+
+import com.microsoft.azure.storage.StorageException;
+
+public class ITFetchAzureBlobStorage extends AbstractAzureIT {
+
+ @Test
+ public void testFetchingBlob() throws InvalidKeyException, URISyntaxException, StorageException, IOException {
+ final TestRunner runner = TestRunners.newTestRunner(new FetchAzureBlobStorage());
+
+ runner.setValidateExpressionUsage(true);
+
+ runner.setProperty(AzureConstants.ACCOUNT_NAME, getAccountName());
+ runner.setProperty(AzureConstants.ACCOUNT_KEY, getAccountKey());
+ runner.setProperty(AzureConstants.CONTAINER, TEST_CONTAINER_NAME);
+ runner.setProperty(FetchAzureBlobStorage.BLOB, "${azure.blobname}");
+
+ final Map attributes = new HashMap<>();
+ attributes.put("azure.primaryUri", "http://" + getAccountName() + ".blob.core.windows.net/" + TEST_CONTAINER_NAME + "/" + TEST_BLOB_NAME);
+ attributes.put("azure.blobname", TEST_BLOB_NAME);
+ attributes.put("azure.blobtype", AzureConstants.BLOCK);
+ runner.enqueue(new byte[0], attributes);
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(FetchAzureBlobStorage.REL_SUCCESS, 1);
+ List flowFilesForRelationship = runner.getFlowFilesForRelationship(FetchAzureBlobStorage.REL_SUCCESS);
+ for (MockFlowFile flowFile : flowFilesForRelationship) {
+ flowFile.assertContentEquals("0123456789".getBytes());
+ flowFile.assertAttributeEquals("azure.length", "10");
+ }
+ }
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureBlobStorage.java
new file mode 100644
index 000000000000..277538cbcee2
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureBlobStorage.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URISyntaxException;
+import java.security.InvalidKeyException;
+
+import org.apache.nifi.processors.azure.AzureConstants;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudBlob;
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+
+public class ITListAzureBlobStorage extends AbstractAzureIT {
+
+ @BeforeClass
+ public static void setupSomeFiles() throws InvalidKeyException, URISyntaxException, StorageException, IOException {
+ CloudBlobContainer container = getContainer();
+ container.createIfNotExists();
+
+ CloudBlob blob = container.getBlockBlobReference(TEST_BLOB_NAME);
+ byte[] buf = "0123456789".getBytes();
+ InputStream in = new ByteArrayInputStream(buf);
+ blob.upload(in, 10);
+ }
+
+ @AfterClass
+ public static void tearDown() throws InvalidKeyException, URISyntaxException, StorageException {
+ CloudBlobContainer container = getContainer();
+ container.deleteIfExists();
+ }
+
+ @Test
+ public void testListsAzureBlobStorageContent() {
+ final TestRunner runner = TestRunners.newTestRunner(new ListAzureBlobStorage());
+
+ runner.setProperty(AzureConstants.ACCOUNT_NAME, getAccountName());
+ runner.setProperty(AzureConstants.ACCOUNT_KEY, getAccountKey());
+ runner.setProperty(AzureConstants.CONTAINER, TEST_CONTAINER_NAME);
+
+ // requires multiple runs to deal with List processor checking
+ runner.run(3);
+
+ runner.assertTransferCount(ListAzureBlobStorage.REL_SUCCESS, 1);
+ runner.assertAllFlowFilesTransferred(ListAzureBlobStorage.REL_SUCCESS, 1);
+
+ for (MockFlowFile entry : runner.getFlowFilesForRelationship(ListAzureBlobStorage.REL_SUCCESS)) {
+ entry.assertAttributeEquals("azure.length", "10");
+ entry.assertAttributeEquals("mime.type", "application/octet-stream");
+ }
+ }
+}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureStorageBlob.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureStorageBlob.java
new file mode 100644
index 000000000000..0308add63eae
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureStorageBlob.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.nifi.processors.azure.AzureConstants;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Test;
+
+public class ITPutAzureStorageBlob extends AbstractAzureIT {
+
+ @Test
+ public void testPuttingBlob() throws IOException {
+ final TestRunner runner = TestRunners.newTestRunner(new PutAzureBlobStorage());
+
+ runner.setValidateExpressionUsage(true);
+
+ runner.setProperty(AzureConstants.ACCOUNT_NAME, getAccountName());
+ runner.setProperty(AzureConstants.ACCOUNT_KEY, getAccountKey());
+ runner.setProperty(AzureConstants.CONTAINER, TEST_CONTAINER_NAME);
+ runner.setProperty(FetchAzureBlobStorage.BLOB, "testingUpload");
+
+ runner.enqueue("0123456789".getBytes());
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(PutAzureBlobStorage.REL_SUCCESS, 1);
+ List flowFilesForRelationship = runner.getFlowFilesForRelationship(PutAzureBlobStorage.REL_SUCCESS);
+ for (MockFlowFile flowFile : flowFilesForRelationship) {
+ flowFile.assertContentEquals("0123456789".getBytes());
+ flowFile.assertAttributeEquals("azure.length", "10");
+ }
+ }
+}
From 7d1818b9053d328daaea82e50e19a64f34be3758 Mon Sep 17 00:00:00 2001
From: Andrew Grande
Date: Tue, 4 Apr 2017 14:57:21 -0400
Subject: [PATCH 02/11] Addressed dependency issues from the review.
---
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml | 2 +-
.../nifi-azure-bundle/nifi-azure-processors/pom.xml | 1 +
2 files changed, 2 insertions(+), 1 deletion(-)
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml
index f75bb7f9b0a9..e6c3c9b0ec05 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml
@@ -38,7 +38,7 @@
org.apache.nifi
- nifi-standard-services-api-nar
+ nifi-standard-narnar
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
index 8330bcc7b320..9b4f28bfb39d 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
@@ -85,6 +85,7 @@
org.apache.nifinifi-standard-processors${project.version}
+ provided
From bc338ff8c796c178281b5215969f7cd0072ca7cc Mon Sep 17 00:00:00 2001
From: Andrew Grande
Date: Tue, 4 Apr 2017 15:39:17 -0400
Subject: [PATCH 03/11] Addressed a checkstyle issue.
---
.../nifi/processors/azure/AbstractAzureBlobProcessor.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java
index 82eae123a4d5..a7c9a103b594 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java
@@ -27,7 +27,7 @@ public abstract class AbstractAzureBlobProcessor extends AbstractAzureProcessor
public static final PropertyDescriptor BLOB = new PropertyDescriptor.Builder().name("Blob").description("The filename of the blob").addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true).required(true).defaultValue("${azure.blobname}").build();
-
+
public static final List properties = Collections
.unmodifiableList(Arrays.asList(AzureConstants.ACCOUNT_NAME, AzureConstants.ACCOUNT_KEY, AzureConstants.CONTAINER, BLOB));
From 5848267ef8c54a4e848f0e4d46dcc11a7178a168 Mon Sep 17 00:00:00 2001
From: Andrew Grande
Date: Tue, 4 Apr 2017 18:47:01 -0400
Subject: [PATCH 04/11] Review: reworded the descriptions.
---
.../apache/nifi/processors/azure/AbstractAzureProcessor.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureProcessor.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureProcessor.java
index 5ab1f8bfbf57..4509691ecc0a 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureProcessor.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureProcessor.java
@@ -32,8 +32,8 @@
public abstract class AbstractAzureProcessor extends AbstractProcessor {
- public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All FlowFiles that are received are routed to success").build();
- protected static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("Any failed fetches will be transferred to the failure relation.").build();
+ public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All successfully processed FlowFiles are routed to this relationship").build();
+ public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("Unsuccessful operations will be transferred to the failure relationship.").build();
public static final Set RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));
protected CloudStorageAccount createStorageConnection(ProcessContext context) {
From fe1e19cf30939419eb1b51828d205e95d166b1a2 Mon Sep 17 00:00:00 2001
From: Andrew Grande
Date: Tue, 4 Apr 2017 18:54:20 -0400
Subject: [PATCH 05/11] Review: implemented the reset condition logic.
---
.../nifi/processors/azure/storage/ListAzureBlobStorage.java | 6 ++++--
1 file changed, 4 insertions(+), 2 deletions(-)
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java
index f4a793b83dfa..0663c1b49b8b 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java
@@ -106,8 +106,10 @@ protected String getPath(final ProcessContext context) {
@Override
protected boolean isListingResetNecessary(final PropertyDescriptor property) {
- // TODO - implement
- return false;
+ // re-list if configuration changed, but not when security keys are rolled (not included in the condition)
+ return PREFIX.equals(property)
+ || AzureConstants.ACCOUNT_NAME.equals(property)
+ || AzureConstants.CONTAINER.equals(property);
}
@Override
From 50b9fa1c0db92d308a4f2073f95ae3adf3807836 Mon Sep 17 00:00:00 2001
From: Andrew Grande
Date: Tue, 4 Apr 2017 19:01:41 -0400
Subject: [PATCH 06/11] Review: dropped static qualifier from method
signatures, not required really
---
.../nifi/processors/azure/storage/ListAzureBlobStorage.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java
index 0663c1b49b8b..6011f1b742ca 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java
@@ -156,7 +156,7 @@ protected List performListing(final ProcessContext context, final Long
return listing;
}
- protected static CloudStorageAccount createStorageConnection(ProcessContext context) {
+ protected CloudStorageAccount createStorageConnection(ProcessContext context) {
final String accountName = context.getProperty(AzureConstants.ACCOUNT_NAME).evaluateAttributeExpressions().getValue();
final String accountKey = context.getProperty(AzureConstants.ACCOUNT_KEY).evaluateAttributeExpressions().getValue();
final String storageConnectionString = String.format("DefaultEndpointsProtocol=http;AccountName=%s;AccountKey=%s", accountName, accountKey);
@@ -175,7 +175,7 @@ protected static CloudStorageAccount createStorageConnection(ProcessContext cont
* @return The newly created CloudStorageAccount object
*
*/
- private static CloudStorageAccount createStorageAccountFromConnectionString(String storageConnectionString) throws IllegalArgumentException, URISyntaxException, InvalidKeyException {
+ private CloudStorageAccount createStorageAccountFromConnectionString(String storageConnectionString) throws IllegalArgumentException, URISyntaxException, InvalidKeyException {
CloudStorageAccount storageAccount;
try {
From d9ac033d2ed1f292d940b8af2f8722448653dadf Mon Sep 17 00:00:00 2001
From: Andrew Grande
Date: Tue, 4 Apr 2017 19:16:00 -0400
Subject: [PATCH 07/11] Review: removed sys.out, inlined a single method to get
access to the ProcessContext.getName()
---
.../azure/storage/ListAzureBlobStorage.java | 38 ++++++-------------
1 file changed, 12 insertions(+), 26 deletions(-)
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java
index 6011f1b742ca..2f2f53dbec42 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java
@@ -161,35 +161,21 @@ protected CloudStorageAccount createStorageConnection(ProcessContext context) {
final String accountKey = context.getProperty(AzureConstants.ACCOUNT_KEY).evaluateAttributeExpressions().getValue();
final String storageConnectionString = String.format("DefaultEndpointsProtocol=http;AccountName=%s;AccountKey=%s", accountName, accountKey);
try {
- return createStorageAccountFromConnectionString(storageConnectionString);
+
+ CloudStorageAccount storageAccount;
+ try {
+ storageAccount = CloudStorageAccount.parse(storageConnectionString);
+ } catch (IllegalArgumentException | URISyntaxException e) {
+ getLogger().error("Invalid connection string URI for '{}'", new Object[]{context.getName()}, e);
+ throw e;
+ } catch (InvalidKeyException e) {
+ getLogger().error("Invalid connection credentials for '{}'", new Object[]{context.getName()}, e);
+ throw e;
+ }
+ return storageAccount;
} catch (InvalidKeyException | URISyntaxException e) {
throw new IllegalArgumentException(e);
}
}
- /**
- * Validates the connection string and returns the storage account. The connection string must be in the Azure connection string format.
- *
- * @param storageConnectionString
- * Connection string for the storage service or the emulator
- * @return The newly created CloudStorageAccount object
- *
- */
- private CloudStorageAccount createStorageAccountFromConnectionString(String storageConnectionString) throws IllegalArgumentException, URISyntaxException, InvalidKeyException {
-
- CloudStorageAccount storageAccount;
- try {
- storageAccount = CloudStorageAccount.parse(storageConnectionString);
- } catch (IllegalArgumentException | URISyntaxException e) {
- System.out.println("\nConnection string specifies an invalid URI.");
- System.out.println("Please confirm the connection string is in the Azure connection string format.");
- throw e;
- } catch (InvalidKeyException e) {
- System.out.println("\nConnection string specifies an invalid key.");
- System.out.println("Please confirm the AccountName and AccountKey in the connection string are valid.");
- throw e;
- }
- return storageAccount;
- }
-
}
From 72f589dbaa353212843527c427e17e54d3194a01 Mon Sep 17 00:00:00 2001
From: Andrew Grande
Date: Tue, 4 Apr 2017 19:28:42 -0400
Subject: [PATCH 08/11] Switched to HTTPS as per MSFT recommendation. Some DRY.
Dropped cruft.
---
.../azure/AbstractAzureProcessor.java | 2 +-
.../nifi/processors/azure/AzureConstants.java | 3 ++
.../azure/storage/ListAzureBlobStorage.java | 2 +-
.../azure/storage/AbstractAzureIT.java | 35 +++++++------------
4 files changed, 18 insertions(+), 24 deletions(-)
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureProcessor.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureProcessor.java
index 4509691ecc0a..7f17a023539e 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureProcessor.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureProcessor.java
@@ -49,7 +49,7 @@ protected CloudStorageAccount createStorageConnection(ProcessContext context, Fl
}
private CloudStorageAccount createStorageConnectionFromNameAndKey(String accountName, String accountKey) {
- final String storageConnectionString = String.format("DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s", accountName, accountKey);
+ final String storageConnectionString = String.format(AzureConstants.FORMAT_DEFAULT_CONNECTION_STRING, accountName, accountKey);
try {
return createStorageAccountFromConnectionString(storageConnectionString);
} catch (InvalidKeyException | IllegalArgumentException | URISyntaxException e) {
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AzureConstants.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AzureConstants.java
index eaa234caa2b5..9a510301743e 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AzureConstants.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AzureConstants.java
@@ -32,6 +32,9 @@ public final class AzureConstants {
public static final PropertyDescriptor CONTAINER = new PropertyDescriptor.Builder().name("Container name").description("Name of the azure storage container")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(true).required(true).build();
+ // use HTTPS by default as per MSFT recommendation
+ public static final String FORMAT_DEFAULT_CONNECTION_STRING = "DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s";
+
private AzureConstants() {
// do not instantiate
}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java
index 2f2f53dbec42..b83e428817c7 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java
@@ -159,7 +159,7 @@ protected List performListing(final ProcessContext context, final Long
protected CloudStorageAccount createStorageConnection(ProcessContext context) {
final String accountName = context.getProperty(AzureConstants.ACCOUNT_NAME).evaluateAttributeExpressions().getValue();
final String accountKey = context.getProperty(AzureConstants.ACCOUNT_KEY).evaluateAttributeExpressions().getValue();
- final String storageConnectionString = String.format("DefaultEndpointsProtocol=http;AccountName=%s;AccountKey=%s", accountName, accountKey);
+ final String storageConnectionString = String.format(AzureConstants.FORMAT_DEFAULT_CONNECTION_STRING, accountName, accountKey);
try {
CloudStorageAccount storageAccount;
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureIT.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureIT.java
index 34702eb96f90..91a8c7382602 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureIT.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureIT.java
@@ -16,7 +16,17 @@
*/
package org.apache.nifi.processors.azure.storage;
-import static org.junit.Assert.fail;
+import com.microsoft.azure.storage.CloudStorageAccount;
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudBlob;
+import com.microsoft.azure.storage.blob.CloudBlobClient;
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import com.microsoft.azure.storage.blob.DeleteSnapshotsOption;
+import com.microsoft.azure.storage.blob.ListBlobItem;
+import org.apache.nifi.processors.azure.AzureConstants;
+import org.apache.nifi.util.file.FileUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
@@ -25,19 +35,7 @@
import java.security.InvalidKeyException;
import java.util.Properties;
-import org.apache.nifi.util.file.FileUtils;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-
-import com.microsoft.azure.storage.CloudStorageAccount;
-import com.microsoft.azure.storage.StorageException;
-import com.microsoft.azure.storage.blob.CloudBlob;
-import com.microsoft.azure.storage.blob.CloudBlobClient;
-import com.microsoft.azure.storage.blob.CloudBlobContainer;
-import com.microsoft.azure.storage.blob.DeleteSnapshotsOption;
-import com.microsoft.azure.storage.blob.ListBlobItem;
-import com.microsoft.azure.storage.table.CloudTable;
-import com.microsoft.azure.storage.table.CloudTableClient;
+import static org.junit.Assert.fail;
public abstract class AbstractAzureIT {
protected static final String CREDENTIALS_FILE = System.getProperty("user.home") + "/azure-credentials.PROPERTIES";
@@ -90,17 +88,10 @@ public static String getAccountKey() {
}
protected static CloudBlobContainer getContainer() throws InvalidKeyException, URISyntaxException, StorageException {
- String storageConnectionString = String.format("DefaultEndpointsProtocol=http;AccountName=%s;AccountKey=%s", getAccountName(), getAccountKey());
+ String storageConnectionString = String.format(AzureConstants.FORMAT_DEFAULT_CONNECTION_STRING, getAccountName(), getAccountKey());
CloudStorageAccount storageAccount = CloudStorageAccount.parse(storageConnectionString);
CloudBlobClient blobClient = storageAccount.createCloudBlobClient();
return blobClient.getContainerReference(TEST_CONTAINER_NAME);
}
- protected static CloudTable getTable() throws InvalidKeyException, URISyntaxException, StorageException {
- String storageConnectionString = String.format("DefaultEndpointsProtocol=http;AccountName=%s;AccountKey=%s", getAccountName(), getAccountKey());
- CloudStorageAccount storageAccount = CloudStorageAccount.parse(storageConnectionString);
- CloudTableClient tableClient = storageAccount.createCloudTableClient();
- return tableClient.getTableReference(TEST_TABLE_NAME);
- }
-
}
\ No newline at end of file
From 1ab7cba0a3515fe8b1e47785bef78f4015f40551 Mon Sep 17 00:00:00 2001
From: Andrew Grande
Date: Wed, 5 Apr 2017 13:03:34 -0400
Subject: [PATCH 09/11] Addressing review suggestions from 4/5
---
.../azure/AbstractAzureBlobProcessor.java | 4 +--
.../azure/AbstractAzureProcessor.java | 6 ++--
.../azure/storage/FetchAzureBlobStorage.java | 18 ++++------
.../azure/storage/ListAzureBlobStorage.java | 9 ++---
.../azure/storage/PutAzureBlobStorage.java | 33 ++++++++++---------
.../azure/storage/utils/BlobInfo.java | 2 +-
.../storage/ITFetchAzureBlobStorage.java | 3 +-
7 files changed, 33 insertions(+), 42 deletions(-)
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java
index a7c9a103b594..20267114681d 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java
@@ -28,12 +28,12 @@ public abstract class AbstractAzureBlobProcessor extends AbstractAzureProcessor
public static final PropertyDescriptor BLOB = new PropertyDescriptor.Builder().name("Blob").description("The filename of the blob").addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true).required(true).defaultValue("${azure.blobname}").build();
- public static final List properties = Collections
+ private static final List PROPERTIES = Collections
.unmodifiableList(Arrays.asList(AzureConstants.ACCOUNT_NAME, AzureConstants.ACCOUNT_KEY, AzureConstants.CONTAINER, BLOB));
@Override
protected List getSupportedPropertyDescriptors() {
- return properties;
+ return PROPERTIES;
}
}
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureProcessor.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureProcessor.java
index 7f17a023539e..c95ee99832af 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureProcessor.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureProcessor.java
@@ -65,13 +65,11 @@ private CloudStorageAccount createStorageConnectionFromNameAndKey(String account
* @return The newly created CloudStorageAccount object
*
*/
- protected static CloudStorageAccount createStorageAccountFromConnectionString(String storageConnectionString) throws IllegalArgumentException, URISyntaxException, InvalidKeyException {
+ private static CloudStorageAccount createStorageAccountFromConnectionString(String storageConnectionString) throws IllegalArgumentException, URISyntaxException, InvalidKeyException {
CloudStorageAccount storageAccount;
try {
storageAccount = CloudStorageAccount.parse(storageConnectionString);
- } catch (IllegalArgumentException | URISyntaxException e) {
- throw e;
- } catch (InvalidKeyException e) {
+ } catch (IllegalArgumentException | URISyntaxException | InvalidKeyException e) {
throw e;
}
return storageAccount;
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java
index 2229cfd097c8..58ab192815b0 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java
@@ -17,7 +17,6 @@
package org.apache.nifi.processors.azure.storage;
import java.io.IOException;
-import java.io.OutputStream;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.Collections;
@@ -37,7 +36,6 @@
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor;
import org.apache.nifi.processors.azure.AzureConstants;
@@ -54,7 +52,8 @@
@WritesAttribute(attribute = "azure.length", description = "The length of the blob fetched")
})
public class FetchAzureBlobStorage extends AbstractAzureBlobProcessor {
- public static final List PROPERTIES = Collections
+
+ private static final List PROPERTIES = Collections
.unmodifiableList(Arrays.asList(AzureConstants.ACCOUNT_NAME, AzureConstants.ACCOUNT_KEY, AzureConstants.CONTAINER, BLOB));
@Override
@@ -84,14 +83,11 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
// TODO - we may be able do fancier things with ranges and
// distribution of download over threads, investigate
- flowFile = session.write(flowFile, new OutputStreamCallback() {
- @Override
- public void process(OutputStream os) throws IOException {
- try {
- blob.download(os);
- } catch (StorageException e) {
- throw new IOException(e);
- }
+ flowFile = session.write(flowFile, os -> {
+ try {
+ blob.download(os);
+ } catch (StorageException e) {
+ throw new IOException(e);
}
});
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java
index b83e428817c7..251e163abd83 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java
@@ -46,12 +46,10 @@
import org.apache.nifi.processors.standard.AbstractListProcessor;
import com.microsoft.azure.storage.CloudStorageAccount;
-import com.microsoft.azure.storage.OperationContext;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.StorageUri;
import com.microsoft.azure.storage.blob.BlobListingDetails;
import com.microsoft.azure.storage.blob.BlobProperties;
-import com.microsoft.azure.storage.blob.BlobRequestOptions;
import com.microsoft.azure.storage.blob.CloudBlob;
import com.microsoft.azure.storage.blob.CloudBlobClient;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
@@ -76,7 +74,7 @@ public class ListAzureBlobStorage extends AbstractListProcessor {
private static final PropertyDescriptor PREFIX = new PropertyDescriptor.Builder().name("Prefix").description("Search prefix for listing").addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true).required(false).build();
- public static final List PROPERTIES = Collections.unmodifiableList(Arrays.asList(AzureConstants.ACCOUNT_NAME, AzureConstants.ACCOUNT_KEY, AzureConstants.CONTAINER, PREFIX));
+ private static final List PROPERTIES = Collections.unmodifiableList(Arrays.asList(AzureConstants.ACCOUNT_NAME, AzureConstants.ACCOUNT_KEY, AzureConstants.CONTAINER, PREFIX));
@Override
protected List getSupportedPropertyDescriptors() {
@@ -130,10 +128,7 @@ protected List performListing(final ProcessContext context, final Long
CloudBlobClient blobClient = storageAccount.createCloudBlobClient();
CloudBlobContainer container = blobClient.getContainerReference(containerName);
- BlobRequestOptions blobRequestOptions = null;
- OperationContext operationContext = null;
-
- for (ListBlobItem blob : container.listBlobs(prefix, true, EnumSet.of(BlobListingDetails.METADATA), blobRequestOptions, operationContext)) {
+ for (ListBlobItem blob : container.listBlobs(prefix, true, EnumSet.of(BlobListingDetails.METADATA), null, null)) {
if (blob instanceof CloudBlob) {
CloudBlob cloudBlob = (CloudBlob) blob;
BlobProperties properties = cloudBlob.getProperties();
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java
index 1327a0b2b519..a87efa77cf8f 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java
@@ -35,7 +35,6 @@
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor;
import org.apache.nifi.processors.azure.AzureConstants;
@@ -80,21 +79,23 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
final Map attributes = new HashMap<>();
long length = flowFile.getSize();
- session.read(flowFile, new InputStreamCallback() {
- @Override
- public void process(final InputStream rawIn) throws IOException {
- final InputStream in = new BufferedInputStream(rawIn);
- try {
- blob.upload(in, length);
- BlobProperties properties = blob.getProperties();
- attributes.put("azure.container", containerName);
- attributes.put("azure.primaryUri", blob.getSnapshotQualifiedUri().toString());
- attributes.put("azure.etag", properties.getEtag());
- attributes.put("azure.length", String.valueOf(length));
- attributes.put("azure.timestamp", String.valueOf(properties.getLastModified()));
- } catch (StorageException | URISyntaxException e) {
- throw new IOException(e);
- }
+ session.read(flowFile, rawIn -> {
+ InputStream in = rawIn;
+ if (!(in instanceof BufferedInputStream)) {
+ // do not double-wrap
+ in = new BufferedInputStream(rawIn);
+ }
+
+ try {
+ blob.upload(in, length);
+ BlobProperties properties = blob.getProperties();
+ attributes.put("azure.container", containerName);
+ attributes.put("azure.primaryUri", blob.getSnapshotQualifiedUri().toString());
+ attributes.put("azure.etag", properties.getEtag());
+ attributes.put("azure.length", String.valueOf(length));
+ attributes.put("azure.timestamp", String.valueOf(properties.getLastModified()));
+ } catch (StorageException | URISyntaxException e) {
+ throw new IOException(e);
}
});
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobInfo.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobInfo.java
index d429878b6df2..6907d945f3a1 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobInfo.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobInfo.java
@@ -159,7 +159,7 @@ public int compareTo(BlobInfo o) {
return etag.compareTo(o.etag);
}
- protected BlobInfo(final Builder builder) {
+ private BlobInfo(final Builder builder) {
this.primaryUri = builder.primaryUri;
this.secondaryUri = builder.secondaryUri;
this.contentType = builder.contentType;
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage.java
index 1e8a8f702b87..7dc8830ec957 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage.java
@@ -23,6 +23,7 @@
import java.util.List;
import java.util.Map;
+import org.apache.nifi.processors.azure.AbstractAzureProcessor;
import org.apache.nifi.processors.azure.AzureConstants;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
@@ -51,7 +52,7 @@ public void testFetchingBlob() throws InvalidKeyException, URISyntaxException, S
runner.enqueue(new byte[0], attributes);
runner.run();
- runner.assertAllFlowFilesTransferred(FetchAzureBlobStorage.REL_SUCCESS, 1);
+ runner.assertAllFlowFilesTransferred(AbstractAzureProcessor.REL_SUCCESS, 1);
List flowFilesForRelationship = runner.getFlowFilesForRelationship(FetchAzureBlobStorage.REL_SUCCESS);
for (MockFlowFile flowFile : flowFilesForRelationship) {
flowFile.assertContentEquals("0123456789".getBytes());
From 80b79e6b10579a06e67244b2f5425d5c01b4c53c Mon Sep 17 00:00:00 2001
From: Andrew Grande
Date: Mon, 10 Apr 2017 13:30:39 -0400
Subject: [PATCH 10/11] Review: documentation improvements
---
.../azure/storage/FetchAzureBlobStorage.java | 2 +
.../azure/storage/ListAzureBlobStorage.java | 16 +++++---
.../azure/storage/PutAzureBlobStorage.java | 7 ++--
.../additionalDetails.html | 39 +++++++++++++++++++
.../additionalDetails.html | 39 +++++++++++++++++++
.../additionalDetails.html | 39 +++++++++++++++++++
6 files changed, 132 insertions(+), 10 deletions(-)
create mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.FetchAzureBlobStorage/additionalDetails.html
create mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.ListAzureBlobStorage/additionalDetails.html
create mode 100644 nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.PutAzureBlobStorage/additionalDetails.html
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java
index 58ab192815b0..163a96272979 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java
@@ -30,6 +30,7 @@
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
@@ -47,6 +48,7 @@
@Tags({ "azure", "microsoft", "cloud", "storage", "blob" })
@CapabilityDescription("Retrieves contents of an Azure Storage Blob, writing the contents to the content of the FlowFile")
+@SeeAlso({ ListAzureBlobStorage.class, PutAzureBlobStorage.class })
@InputRequirement(Requirement.INPUT_REQUIRED)
@WritesAttributes({
@WritesAttribute(attribute = "azure.length", description = "The length of the blob fetched")
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java
index 251e163abd83..f8a6c4d8c86f 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java
@@ -58,14 +58,18 @@
@TriggerSerially
@Tags({ "azure", "microsoft", "cloud", "storage", "blob" })
-@SeeAlso({ FetchAzureBlobStorage.class })
+@SeeAlso({ FetchAzureBlobStorage.class, PutAzureBlobStorage.class })
@CapabilityDescription("Lists blobs in an Azure Storage container. Listing details are attached to an empty FlowFile for use with FetchAzureBlobStorage")
@InputRequirement(Requirement.INPUT_FORBIDDEN)
-@WritesAttributes({ @WritesAttribute(attribute = "azure.container", description = "The name of the azure container"),
- @WritesAttribute(attribute = "azure.blobname", description = "The name of the azure blob"), @WritesAttribute(attribute = "azure.primaryUri", description = "Primary location for blob content"),
- @WritesAttribute(attribute = "azure.secondaryUri", description = "Secondary location for blob content"), @WritesAttribute(attribute = "azure.etag", description = "Etag for the Azure blob"),
- @WritesAttribute(attribute = "azure.length", description = "Length of the blob"), @WritesAttribute(attribute = "azure.timestamp", description = "The timestamp in Azure for the blob"),
- @WritesAttribute(attribute = "mime.type", description = "MimeType of the content"), @WritesAttribute(attribute = "lang", description = "Language code for the content"),
+@WritesAttributes({ @WritesAttribute(attribute = "azure.container", description = "The name of the Azure container"),
+ @WritesAttribute(attribute = "azure.blobname", description = "The name of the Azure blob"),
+ @WritesAttribute(attribute = "azure.primaryUri", description = "Primary location for blob content"),
+ @WritesAttribute(attribute = "azure.secondaryUri", description = "Secondary location for blob content"),
+ @WritesAttribute(attribute = "azure.etag", description = "Etag for the Azure blob"),
+ @WritesAttribute(attribute = "azure.length", description = "Length of the blob"),
+ @WritesAttribute(attribute = "azure.timestamp", description = "The timestamp in Azure for the blob"),
+ @WritesAttribute(attribute = "mime.type", description = "MimeType of the content"),
+ @WritesAttribute(attribute = "lang", description = "Language code for the content"),
@WritesAttribute(attribute = "azure.blobtype", description = "This is the type of blob and can be either page or block type") })
@Stateful(scopes = { Scope.LOCAL, Scope.CLUSTER }, description = "After performing a listing of blobs, the timestamp of the newest blob is stored. "
+ "This allows the Processor to list only blobs that have been added or modified after " + "this date the next time that the Processor is run.")
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java
index a87efa77cf8f..e03bc258df1f 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java
@@ -49,13 +49,12 @@
@SeeAlso({ ListAzureBlobStorage.class, FetchAzureBlobStorage.class })
@CapabilityDescription("Puts content into an Azure Storage Blob")
@InputRequirement(Requirement.INPUT_REQUIRED)
-@WritesAttributes({ @WritesAttribute(attribute = "azure.container", description = "The name of the azure container"),
- @WritesAttribute(attribute = "azure.blobname", description = "The name of the azure blob"),
+@WritesAttributes({ @WritesAttribute(attribute = "azure.container", description = "The name of the Azure container"),
+ @WritesAttribute(attribute = "azure.blobname", description = "The name of the Azure blob"),
@WritesAttribute(attribute = "azure.primaryUri", description = "Primary location for blob content"),
@WritesAttribute(attribute = "azure.etag", description = "Etag for the Azure blob"),
@WritesAttribute(attribute = "azure.length", description = "Length of the blob"),
- @WritesAttribute(attribute = "azure.timestamp", description = "The timestamp in Azure for the blob"),
- @WritesAttribute(attribute = "azure.blobtype", description = "This is the type of blob and can be either page or block type") })
+ @WritesAttribute(attribute = "azure.timestamp", description = "The timestamp in Azure for the blob")})
public class PutAzureBlobStorage extends AbstractAzureBlobProcessor {
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.FetchAzureBlobStorage/additionalDetails.html b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.FetchAzureBlobStorage/additionalDetails.html
new file mode 100644
index 000000000000..14975aab76b2
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.FetchAzureBlobStorage/additionalDetails.html
@@ -0,0 +1,39 @@
+
+
+
+
+
+ FetchAzureBlobStorage Processor
+
+
+
+
+
+
Apache NiFi Azure Processors
+
+
Important Security Note
+
+ There are certain risks in allowing the account name and key to be stored as flowfile
+ attributes. While it does provide for a more flexible flow by allowing the account name and key
+ be fetched dynamically from the flow file attributes, care must be taken to restrict access to
+ the recorded event provenance data (e.g. by strictly controlling the provenance policy permission).
+ In addition, the provenance partition may be put on an encrypted disk volume for extra layer of security.
+
+ There are certain risks in allowing the account name and key to be stored as flowfile
+ attributes. While it does provide for a more flexible flow by allowing the account name and key
+ be fetched dynamically from the flow file attributes, care must be taken to restrict access to
+ the recorded event provenance data (e.g. by strictly controlling the provenance policy permission).
+ In addition, the provenance partition may be put on an encrypted disk volume for extra layer of security.
+
+ There are certain risks in allowing the account name and key to be stored as flowfile
+ attributes. While it does provide for a more flexible flow by allowing the account name and key
+ be fetched dynamically from the flow file attributes, care must be taken to restrict access to
+ the recorded event provenance data (e.g. by strictly controlling the provenance policy permission).
+ In addition, the provenance partition may be put on an encrypted disk volume for extra layer of security.
+
attributes. While it does provide for a more flexible flow by allowing the account name and key
be fetched dynamically from the flow file attributes, care must be taken to restrict access to
the recorded event provenance data (e.g. by strictly controlling the provenance policy permission).
- In addition, the provenance partition may be put on an encrypted disk volume for extra layer of security.
+ In addition, the provenance repositories may be put on encrypted disk partitions.
Return to a previous page
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.ListAzureBlobStorage/additionalDetails.html b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.ListAzureBlobStorage/additionalDetails.html
index 17f9534cfc40..76e8775ef88b 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.ListAzureBlobStorage/additionalDetails.html
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.ListAzureBlobStorage/additionalDetails.html
@@ -30,7 +30,7 @@
Important Security Note
attributes. While it does provide for a more flexible flow by allowing the account name and key
be fetched dynamically from the flow file attributes, care must be taken to restrict access to
the recorded event provenance data (e.g. by strictly controlling the provenance policy permission).
- In addition, the provenance partition may be put on an encrypted disk volume for extra layer of security.
+ In addition, the provenance repositories may be put on encrypted disk partitions.
Return to a previous page
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.PutAzureBlobStorage/additionalDetails.html b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.PutAzureBlobStorage/additionalDetails.html
index e540bd6f2e64..0a7ff3586fe3 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.PutAzureBlobStorage/additionalDetails.html
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.PutAzureBlobStorage/additionalDetails.html
@@ -30,7 +30,7 @@
Important Security Note
attributes. While it does provide for a more flexible flow by allowing the account name and key
be fetched dynamically from the flow file attributes, care must be taken to restrict access to
the recorded event provenance data (e.g. by strictly controlling the provenance policy permission).
- In addition, the provenance partition may be put on an encrypted disk volume for extra layer of security.
+ In addition, the provenance repositories may be put on encrypted disk partitions.