NIFI-1833 - Azure Storage processors #1636
Conversation
Initial PR submission, will inquire about checkstyle rules

Force-pushed f87e0a8 to 41c1247

Reviewing...
public static final PropertyDescriptor BLOB_TYPE = new PropertyDescriptor.Builder()
        .name("Blob type")
        .description("Blobs can be block type or page type")
        .expressionLanguageSupported(true)

Adding expression language support prevents the UI from creating the dropdown box with "Block" and "Page" values. If expression language is used for the value of the property in the config of the processor, validation fails. Is it necessary to support EL? If so, perhaps remove the allowable values and use StandardValidators.NON_EMPTY_EL_VALIDATOR; otherwise, EL should be disabled.
 * Get a reference to a blob based on the type.
 *
 */
protected CloudBlob getBlob(CloudBlobContainer container, String blobType, String blobPath) throws URISyntaxException, StorageException {
Should blobType be checked more explicitly here? If blobType doesn't match the case of "Block", won't this end up returning a page blob? If an invalid blobType is passed in, is it appropriate that a page blob is returned, or should a ProcessorException be thrown?
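A minimal, self-contained sketch of the explicit check the reviewer is asking for, using plain strings and stdlib exceptions as stand-ins for the actual CloudBlob types (class and method names here are illustrative, not the NiFi code):

```java
// Hypothetical sketch: resolve the blob type explicitly instead of
// falling through to a page blob for any value that is not "Block".
class BlobTypeCheck {
    static String resolveBlobType(String blobType) {
        switch (blobType) {
            case "Block":
                return "block blob"; // stand-in for container.getBlockBlobReference(...)
            case "Page":
                return "page blob"; // stand-in for container.getPageBlobReference(...)
            default:
                // fail loudly on an unexpected value rather than silently
                // returning a page blob
                throw new IllegalArgumentException("Unsupported blob type: " + blobType);
        }
    }
}
```

In the actual processor, the `default` branch would throw a ProcessException so the framework routes the FlowFile to failure.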
Reviewing after squash...
<dependency>
    <groupId>org.apache.nifi</groupId>
    <artifactId>nifi-standard-services-api-nar</artifactId>

Since nifi-azure-processors depends on nifi-standard-processors, this dependency should be changed to nifi-standard-nar, which itself depends on nifi-standard-services-api-nar. This lets nifi-azure-nar's classloader see the classes (in nifi-standard-processors) that nifi-azure-processors depends on, which in turn allows nifi-azure-processors to declare nifi-standard-processors as a provided dependency, fixing the standard processors being loaded twice.
<dependency>
    <groupId>org.apache.nifi</groupId>
    <artifactId>nifi-standard-nar</artifactId>
    <type>nar</type>
</dependency>
Done, verified changes with a live Azure account.
</dependency>
<dependency>
    <groupId>org.apache.nifi</groupId>
    <artifactId>nifi-standard-processors</artifactId>

Please specify this dependency as provided, so that the processors in nifi-standard-processors aren't found in nifi-azure-nar as well as nifi-standard-nar.
<dependency>
    <groupId>org.apache.nifi</groupId>
    <artifactId>nifi-standard-processors</artifactId>
    <version>${project.version}</version>
    <scope>provided</scope>
</dependency>
Done, verified changes with a live Azure account.
public abstract class AbstractAzureProcessor extends AbstractProcessor {

public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All FlowFiles that are received are routed to success").build();
protected static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("Any failed fetches will be transferred to the failure relation.").build();

This relationship is used in PutAzureBlobStorage, but the description of "fetches" doesn't match the function of that processor.
@Override
protected boolean isListingResetNecessary(final PropertyDescriptor property) {
    // TODO - implement

Is it intended that the listing reset is never necessary?
Implemented the method.
@Override
protected Scope getStateScope(final ProcessContext context) {
    return Scope.CLUSTER;

Should this not support local state storage?
IMO, local storage doesn't make sense. With local scope, one would have to schedule the listing on the primary node only and would effectively lose the state after a failover event. A list operation generally favors a single node feeding the cluster, to avoid duplicates. BTW, cluster scope works fine in a single-node deployment.
OK. I can see how you'd want to force cluster state, since the files are being pulled from a remote location. Makes it easier for the user to not have to worry about the possible confusion of local vs cluster state and primary-only scheduling.
    return listing;
}

protected static CloudStorageAccount createStorageConnection(ProcessContext context) {

This method can be private.
Inlined the method, dropped 'static'
try {
    storageAccount = CloudStorageAccount.parse(storageConnectionString);
} catch (IllegalArgumentException | URISyntaxException e) {
    System.out.println("\nConnection string specifies an invalid URI.");

Should these messages be shown as bulletins to the user, or at least logged?
Reworked to log properly, removed sys.out.
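A generic, stdlib-only sketch of the fix discussed here. In a NiFi processor the call would be `getLogger().error(...)`; this sketch uses `java.util.logging` as a stand-in, and `URI` as a stand-in for `CloudStorageAccount.parse(...)`, to show the pattern of attaching the exception to the log record rather than printing to stdout:

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.util.logging.Level;
import java.util.logging.Logger;

// Illustrative names only; this is not the actual NiFi code.
class ConnectionStringParser {
    private static final Logger LOGGER = Logger.getLogger(ConnectionStringParser.class.getName());

    static boolean parse(String connectionString) {
        try {
            new URI(connectionString); // stand-in for CloudStorageAccount.parse(...)
            return true;
        } catch (URISyntaxException e) {
            // Instead of System.out.println, log with the exception attached
            // so the stack trace and context are preserved.
            LOGGER.log(Level.SEVERE, "Connection string specifies an invalid URI", e);
            return false;
        }
    }
}
```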
public static final PropertyDescriptor BLOB = new PropertyDescriptor.Builder().name("Blob").description("The filename of the blob").addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
        .expressionLanguageSupported(true).required(true).defaultValue("${azure.blobname}").build();

public static final List<PropertyDescriptor> properties = Collections

This should probably be set to private, and by convention for static vars, should be uppercased.
public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All successfully processed FlowFiles are routed to this relationship").build();
public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("Unsuccessful operations will be transferred to the failure relationship.").build();
public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));

This should be changed to private.
Neither private, protected, nor package-default will work; they cause compilation errors. It needs to be public.
I do not get a compilation issue with the below statement:
private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));

 * @return The newly created CloudStorageAccount object
 *
 */
protected static CloudStorageAccount createStorageAccountFromConnectionString(String storageConnectionString) throws IllegalArgumentException, URISyntaxException, InvalidKeyException {

This method can be private.
    storageAccount = CloudStorageAccount.parse(storageConnectionString);
} catch (IllegalArgumentException | URISyntaxException e) {
    throw e;
} catch (InvalidKeyException e) {

This could be added to the above catch, since it's doing the same thing, just throwing the exception.
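A self-contained sketch of the suggested cleanup, using `URI` as a stand-in for `CloudStorageAccount.parse(...)` (names are illustrative): since every branch just rethrows, all three exception types can share one multi-catch clause.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.security.InvalidKeyException;

class AccountParser {
    static void parseAccount(String connectionString)
            throws URISyntaxException, InvalidKeyException {
        try {
            if (connectionString.isEmpty()) {
                throw new InvalidKeyException("empty connection string");
            }
            new URI(connectionString); // stand-in for CloudStorageAccount.parse(...)
        } catch (IllegalArgumentException | URISyntaxException | InvalidKeyException e) {
            // one multi-catch clause replaces two catch blocks that both rethrow
            throw e;
        }
    }
}
```

(If no handling is done at all, the try/catch could also be dropped entirely and the exceptions left to propagate.)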
@WritesAttribute(attribute = "azure.length", description = "The length of the blob fetched")
})
public class FetchAzureBlobStorage extends AbstractAzureBlobProcessor {
public static final List<PropertyDescriptor> PROPERTIES = Collections

This can be changed to private.
@Override
protected Scope getStateScope(final ProcessContext context) {
    return Scope.CLUSTER;

OK. I can see how you'd want to force cluster state, since the files are being pulled from a remote location. Makes it easier for the user to not have to worry about the possible confusion of local vs cluster state and primary-only scheduling.
BlobRequestOptions blobRequestOptions = null;
OperationContext operationContext = null;

for (ListBlobItem blob : container.listBlobs(prefix, true, EnumSet.of(BlobListingDetails.METADATA), blobRequestOptions, operationContext)) {

Since blobRequestOptions and operationContext are always null here, might want to remove their declarations and pass nulls to the listBlobs method?
    return listing;
}

protected CloudStorageAccount createStorageConnection(ProcessContext context) {

This could be private... I notice that there are similar methods defined in AbstractAzureProcessor. The connection methods could be decoupled into an interface with default methods for reuse within AbstractAzureProcessor and ListAzureBlobStorage?
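A minimal sketch of the interface-with-default-methods idea the reviewer raises, with illustrative names (this is not the actual NiFi code): the shared connection-building logic lives once in a default method, and each processor supplies only what varies.

```java
// Hypothetical names; sketch of sharing logic via an interface default method.
interface AzureConnectionSupport {
    // each implementing processor supplies its own connection string
    String getConnectionString();

    // shared logic, written once, reused by every implementer
    default String describeConnection() {
        return "azure-connection[" + getConnectionString() + "]";
    }
}

class DemoListProcessor implements AzureConnectionSupport {
    @Override
    public String getConnectionString() {
        return "AccountName=demo";
    }
}
```

Both AbstractAzureProcessor and ListAzureBlobStorage could then implement the interface instead of duplicating the method.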
final Map<String, String> attributes = new HashMap<>();
long length = flowFile.getSize();
session.read(flowFile, new InputStreamCallback() {

Can this be replaced with a lambda as well?
session.read(flowFile, rawIn -> {
    final InputStream in = new BufferedInputStream(rawIn);
    try {
        blob.upload(in, length);
        BlobProperties properties = blob.getProperties();
        attributes.put("azure.container", containerName);
        attributes.put("azure.primaryUri", blob.getSnapshotQualifiedUri().toString());
        attributes.put("azure.etag", properties.getEtag());
        attributes.put("azure.length", String.valueOf(length));
        attributes.put("azure.timestamp", String.valueOf(properties.getLastModified()));
    } catch (StorageException | URISyntaxException e) {
        throw new IOException(e);
    }
});
Done. Additionally, checking for the input stream type to avoid double-wrapping into a buffered stream.
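The stream-type check mentioned here can be sketched as a small stdlib-only helper (the helper name is illustrative): only wrap the raw stream when it is not already buffered, so a `BufferedInputStream` is never wrapped in a second one.

```java
import java.io.BufferedInputStream;
import java.io.InputStream;

class StreamWrap {
    static InputStream buffered(InputStream rawIn) {
        // avoid double-wrapping: reuse the stream if it is already buffered
        return (rawIn instanceof BufferedInputStream) ? rawIn : new BufferedInputStream(rawIn);
    }
}
```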
    return etag.compareTo(o.etag);
}

protected BlobInfo(final Builder builder) {

With the builder being available for constructing this class, typically this constructor should be private.
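A self-contained sketch of the convention the reviewer describes (fields and names are illustrative, not the actual BlobInfo): when a nested Builder is the intended way to construct the class, the constructor is made private so the Builder is the only entry point.

```java
class BlobInfoSketch {
    private final String etag;
    private final long length;

    // private: only the nested Builder can invoke this constructor
    private BlobInfoSketch(Builder builder) {
        this.etag = builder.etag;
        this.length = builder.length;
    }

    String getEtag() { return etag; }
    long getLength() { return length; }

    static final class Builder {
        private String etag;
        private long length;

        Builder etag(String etag) { this.etag = etag; return this; }
        Builder length(long length) { this.length = length; return this; }
        BlobInfoSketch build() { return new BlobInfoSketch(this); }
    }
}
```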
jtstorck left a comment

Thanks for making these changes!
@aperepel Can you add usage documentation about the risks of allowing the account name and key to be stored as flowfile attributes? Mention that sensitive attributes are not supported, and that anyone with provenance access for the processors will be able to see the attribute values. Also, the flowfile repository, where the attributes are stored, does not support encryption internally, so unless the repository is written to a TDE partition, the attribute values could possibly be accessed there as well.
@aperepel Integration tests are written for the processors, but aren't included in the build. There should probably be some tests that do run with the build, using the TestRunner. The CloudStorageAccount would have to be mocked, which at a minimum you could do by making an extension of the class under test in which the createStorageAccount method used by the test returns a mock.
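The mocking approach suggested above can be sketched generically, without the real NiFi or Azure APIs (all class and method names here are illustrative stand-ins): the class under test exposes a protected factory method, and the test extends the class to return a stub instead of opening a live connection.

```java
// Illustrative stand-ins, not the actual NiFi processor classes.
class StorageProcessorSketch {
    // protected factory method: the test seam
    protected String createStorageAccount() {
        return "real-account"; // would normally hit a live Azure endpoint
    }

    String describeAccount() {
        return "account=" + createStorageAccount();
    }
}

class TestableStorageProcessor extends StorageProcessorSketch {
    @Override
    protected String createStorageAccount() {
        return "mock-account"; // stubbed: no network access needed in tests
    }
}
```

In a real unit test, the TestRunner would be given the TestableStorageProcessor subclass so onTrigger exercises the processor logic against the stubbed account.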
import com.microsoft.azure.storage.blob.CloudBlobContainer;

@Tags({ "azure", "microsoft", "cloud", "storage", "blob" })
@CapabilityDescription("Retrieves contents of an Azure Storage Blob, writing the contents to the content of the FlowFile")

Should this be changed to say "Retrieves contents of a blob from an Azure Storage Container..."?
@TriggerSerially
@Tags({ "azure", "microsoft", "cloud", "storage", "blob" })
@SeeAlso({ FetchAzureBlobStorage.class, PutAzureBlobStorage.class })
@CapabilityDescription("Lists blobs in an Azure Storage container. Listing details are attached to an empty FlowFile for use with FetchAzureBlobStorage")

Container should be capitalized here, flowfile lowercased?
@Tags({ "azure", "microsoft", "cloud", "storage", "blob" })
@SeeAlso({ ListAzureBlobStorage.class, FetchAzureBlobStorage.class })
@CapabilityDescription("Puts content into an Azure Storage Blob")

Should this be changed to say "Azure Storage Container"?
<h2>Apache NiFi Azure Processors</h2>

<h3>Important Security Note</h3>

Since these are important security notices, should they be presented in the main documentation of the processor, rather than additional details?
Addressed dependency issues from the review.
Addressed a checkstyle issue.
Review: reworded the descriptions.
Review: implemented the reset condition logic.
Review: dropped static qualifier from method signatures, not required really.
Review: removed sys.out, inlined a single method to get access to the ProcessContext.getName().
Switched to HTTPS as per MSFT recommendation.
Some DRY. Dropped cruft.
Addressing review suggestions from 4/5.
Review: documentation improvements.
Review: documentation improvements.

This closes apache#1636.

Signed-off-by: Bryan Rosander <brosander@apache.org>
Thank you for submitting a contribution to Apache NiFi. In order to streamline the review of the contribution, we ask you to ensure the following steps have been taken:

For all changes:
- Is there a JIRA ticket associated with this PR? Is it referenced in the commit message?
- Does your PR title start with NIFI-XXXX, where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
- Has your PR been rebased against the latest commit within the target branch (typically master)?
- Is your initial contribution a single, squashed commit?

Note:
Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.