NIFI-9009: Created VerifiableProcessor, VerifiableControllerService, … #5288
markap14 wants to merge 3 commits into apache:main
gresockj left a comment
Thanks for this contribution, @markap14, it seems very useful! I'm especially excited about exposing the explicitly-used flow file attributes on processors.
I have a few minor comments and questions, and some suggestions on the AbstractS3Processor verification approach. I was also wondering whether you'd consider including some CLI commands in this PR, though since it's already fairly large, it also seems reasonable to defer that to a later PR.
I'll be taking this for a spin to verify the runtime behavior shortly.
    public interface VerifiableControllerService {

        /**
         * Verifies that the configuration defined by the given ProcessContext is valid.
Looks like a copy/paste error: the Javadoc refers to ProcessContext, but this is the controller service interface.
         * Verifies that the configuration defined by the given ProcessContext is valid.
         * @param context the ProcessContext that contains the necessary configuration
         * @param verificationLogger a logger that can be used during verification. While the typical logger can be used, doing so may result
         * in producing bulletins, which can be confusing.

        @Override
        public Set<String> getExplicitlyReferencedAttributes() {
            final Set<String> variables = new HashSet<>();
Minor nit: shall we name this variable attributes to match the method name?
        // Attempt to perform a listing of objects in the S3 bucket
        try {
            final ObjectListing listing = client.listObjects(bucketName);
Good thought, but I don't think we can have this check be listObjects(bucketName) in case there are a lot of objects, and because not all S3 processors should require the s3:ListBucket permission.
In fact, I propose you add an abstract method like ConfigVerificationResult verifyAccess(AmazonS3Client client) to specifically check the permission required by that processor. That way, ListS3 can check listObjects() just to verify that the operation can be performed.
Also, I'd recommend using listObjects(bucketName, "prefixthatdoesntexist") or some variant, so as not to actually list the entire bucket, since this will still check if the configured account has access to that operation. I don't think the bucket count is necessary for verification.
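The suggestion above can be sketched as a small template-method arrangement. This is only an illustration under stated assumptions: `S3Access`, `VerificationResult`, `AbstractS3Sketch`, and `ListS3Sketch` are hypothetical stand-ins for the AWS SDK client, NiFi's ConfigVerificationResult, and the real processor classes, and the step name and prefix are made up for the example.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the S3 client; the real processors use the AWS SDK client.
interface S3Access {
    // Lists keys under a prefix; expected to throw if the operation is not permitted.
    List<String> listKeys(String bucket, String prefix);
}

// Simplified stand-in for NiFi's ConfigVerificationResult (not the real class).
final class VerificationResult {
    enum Outcome { SUCCESSFUL, FAILED }

    final Outcome outcome;
    final String stepName;
    final String explanation;

    VerificationResult(final Outcome outcome, final String stepName, final String explanation) {
        this.outcome = outcome;
        this.stepName = stepName;
        this.explanation = explanation;
    }
}

abstract class AbstractS3Sketch {
    // Each subclass checks only the permission that it actually needs.
    protected abstract VerificationResult verifyAccess(S3Access client, String bucket);

    final List<VerificationResult> verify(final S3Access client, final String bucket) {
        final List<VerificationResult> results = new ArrayList<>();
        results.add(verifyAccess(client, bucket));
        return results;
    }
}

final class ListS3Sketch extends AbstractS3Sketch {
    static final String STEP = "List Objects";                          // assumed step name
    static final String UNLIKELY_PREFIX = "prefix-that-does-not-exist/"; // assumed prefix

    @Override
    protected VerificationResult verifyAccess(final S3Access client, final String bucket) {
        try {
            // Exercises the list permission without enumerating the whole bucket.
            client.listKeys(bucket, UNLIKELY_PREFIX);
            return new VerificationResult(VerificationResult.Outcome.SUCCESSFUL, STEP,
                "Listing objects in bucket " + bucket + " is permitted");
        } catch (final RuntimeException e) {
            return new VerificationResult(VerificationResult.Outcome.FAILED, STEP,
                "Could not list objects in bucket " + bucket + ": " + e.getMessage());
        }
    }
}
```

A processor that only needs write access would override `verifyAccess` with a check of its own, so it would not need the s3:ListBucket permission to pass verification.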
There was a problem hiding this comment.
Yeah, that's a good point about permissions. I think what makes the most sense here is probably to move this from AbstractS3Processor to ListS3. I do believe it makes sense to perform the actual listing and determine how many objects are in the bucket, though. There are a couple of reasons for this. Firstly, if it's misconfigured, you could end up attempting to get the listing for something like an empty string. If you try that, there is no error: it returns successfully, and there will be 0 objects listed. So the fact that the listing came back with 0 objects helps to make it obvious that something is wrong. Also, if you perform a listing and expect 3 things in the bucket but get thousands (or vice versa), that can help to alert you that you may be configured for the wrong bucket. I can definitely see a situation where a user is expecting to list a bucket with a few elements and enters the wrong bucket name because they have many buckets, and then they end up with a huge listing when they run the processor. I think this will help there.
For the scope of this PR, I think what makes most sense is to just move this into ListS3. We can then iterate once these changes are merged and improve each of the processors. For this PR, I just wanted to pick a couple of components to use as a proof of concept, basically.
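To illustrate the argument for reporting the count, here is a minimal sketch of an explanation string that carries the number of listed objects. The class name, method name, and message wording are assumptions made for the example, not the text used in the PR.

```java
import java.util.List;

final class ListingCountSketch {
    // Builds a verification explanation that includes the object count, so that an
    // unexpected 0 (or an unexpectedly large number) is visible to the user.
    static String explain(final String bucket, final List<String> keys) {
        return "Listed bucket '" + bucket + "' and found " + keys.size() + " object(s)";
    }
}
```

With a misconfigured (empty) bucket name and an empty listing, the message would read "Listed bucket '' and found 0 object(s)", which hints at the problem even though no error was raised.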
    results.add(new ConfigVerificationResult.Builder()
        .outcome(Outcome.FAILED)
        .explanation("Processor is invalid: " + result.toString())
        .verificationStepName("Perform Validation")
What do you think about having a constant for this?
    results.add(new ConfigVerificationResult.Builder()
        .outcome(Outcome.FAILED)
        .explanation("Reporting Task is invalid: " + result.toString())
        .verificationStepName("Perform Validation")

    final Map<PropertyDescriptor, PropertyConfiguration> descriptorToConfigMap = new LinkedHashMap<>();
    for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
        final PropertyDescriptor descriptor = entry.getKey();
        final String rawValue = entry.getValue();
        final String propertyValue = rawValue == null ? descriptor.getDefaultValue() : rawValue;

        final PropertyConfiguration propertyConfiguration = new PropertyConfiguration(propertyValue, null, Collections.emptyList());
        descriptorToConfigMap.put(descriptor, propertyConfiguration);
    }

    final ValidationContext validationContext = getValidationContextFactory().newValidationContext(descriptorToConfigMap, context.getAnnotationData(),
        getProcessGroupIdentifier(), getIdentifier(), null, false);

    final ValidationState validationState = performValidation(validationContext);
    final ValidationStatus validationStatus = validationState.getStatus();

    if (validationStatus == ValidationStatus.INVALID) {
        for (final ValidationResult result : validationState.getValidationErrors()) {
            if (result.isValid()) {
                continue;
            }

            results.add(new ConfigVerificationResult.Builder()
                .outcome(Outcome.FAILED)
                .explanation("Reporting Task is invalid: " + result.toString())
                .verificationStepName("Perform Validation")
                .build());
        }

        if (results.isEmpty()) {
            results.add(new ConfigVerificationResult.Builder()
                .outcome(Outcome.FAILED)
                .explanation("Reporting Task is invalid but provided no Validation Results to indicate why")
                .verificationStepName("Perform Validation")
                .build());
        }

        logger.debug("{} is not valid with the given configuration. Will not attempt to perform any additional verification of configuration. Validation took {}. Reason not valid: {}",
Could we extract a method that does this initial validation check on the context properties? Since both ProcessContext and ConfigurationContext can return Map<PropertyDescriptor, String>, and this initial validation is needed in AbstractReportingTaskNode, StandardControllerServiceNode, and StandardProcessorNode, it seems like it would allow some pretty good reuse.
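One possible shape for such an extracted helper is sketched below. It is a simplified illustration, not the refactoring that was eventually committed: `InitialValidationSketch`, `Result`, `effectiveValues`, and `verifyValid` are hypothetical names, property descriptors are reduced to plain strings, and validation errors are passed in as messages. It also uses a single constant for the step name, in line with the earlier review comment.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class InitialValidationSketch {
    // One shared constant instead of repeating the literal in every node class.
    static final String STEP_PERFORM_VALIDATION = "Perform Validation";

    // Simplified stand-in for ConfigVerificationResult (not the real NiFi class).
    static final class Result {
        final boolean failed;
        final String stepName;
        final String explanation;

        Result(final boolean failed, final String stepName, final String explanation) {
            this.failed = failed;
            this.stepName = stepName;
            this.explanation = explanation;
        }
    }

    // Falls back to the default value when a property has no configured value.
    static Map<String, String> effectiveValues(final Map<String, String> configured,
                                               final Map<String, String> defaults) {
        final Map<String, String> effective = new LinkedHashMap<>();
        for (final Map.Entry<String, String> entry : configured.entrySet()) {
            final String raw = entry.getValue();
            effective.put(entry.getKey(), raw == null ? defaults.get(entry.getKey()) : raw);
        }
        return effective;
    }

    // componentType is e.g. "Processor", "Reporting Task", or "Controller Service".
    static List<Result> verifyValid(final String componentType, final boolean valid,
                                    final List<String> validationErrors) {
        final List<Result> results = new ArrayList<>();
        if (valid) {
            results.add(new Result(false, STEP_PERFORM_VALIDATION, componentType + " is valid"));
            return results;
        }
        for (final String error : validationErrors) {
            results.add(new Result(true, STEP_PERFORM_VALIDATION, componentType + " is invalid: " + error));
        }
        if (results.isEmpty()) {
            results.add(new Result(true, STEP_PERFORM_VALIDATION,
                componentType + " is invalid but provided no Validation Results to indicate why"));
        }
        return results;
    }
}
```

Each of the three node classes could then call the same helper with its own component type label, rather than carrying its own copy of the loop.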
    this.component = component;
    this.serviceLookup = serviceLookup;
    this.schedulingPeriod = schedulingPeriod;
    this.variableRegistry = variableRegistry;
    this.annotationData = annotationDataOverride;

    if (schedulingPeriod == null) {
        schedulingNanos = null;
    } else {
        if (FormatUtils.TIME_DURATION_PATTERN.matcher(schedulingPeriod).matches()) {
            schedulingNanos = FormatUtils.getTimeDuration(schedulingPeriod, TimeUnit.NANOSECONDS);
        } else {
            schedulingNanos = null;
        }
    }
It seems like we could reuse some of this by calling the original constructor from here.
There was a problem hiding this comment.
Yeah I think this can be cleaned up a bit. Will take a look.
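The kind of cleanup being discussed can be sketched with constructor chaining, so the scheduling-period parsing lives in one place. Everything here is an assumption for illustration: `SchedulingContextSketch` is a hypothetical class, and `parseNanos` is a deliberately simplified parser (it accepts only values such as "5 sec" or "250 ms") standing in for NiFi's FormatUtils. In this sketch the shorter constructor delegates to the fuller one; the real classes might chain in the other direction.

```java
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class SchedulingContextSketch {
    // Simplified duration pattern: up to nine digits followed by "ms" or "sec".
    private static final Pattern DURATION = Pattern.compile("(\\d{1,9})\\s*(ms|sec)");

    private final String schedulingPeriod;
    private final Long schedulingNanos;
    private final String annotationData;

    // Original-style constructor: no annotation data override.
    SchedulingContextSketch(final String schedulingPeriod) {
        this(schedulingPeriod, null);
    }

    // Constructor with an override; the shared initialization happens only here.
    SchedulingContextSketch(final String schedulingPeriod, final String annotationDataOverride) {
        this.schedulingPeriod = schedulingPeriod;
        this.annotationData = annotationDataOverride;
        this.schedulingNanos = parseNanos(schedulingPeriod);
    }

    // Returns null when the period is missing or does not match the pattern.
    static Long parseNanos(final String period) {
        if (period == null) {
            return null;
        }
        final Matcher matcher = DURATION.matcher(period.trim());
        if (!matcher.matches()) {
            return null;
        }
        final long value = Long.parseLong(matcher.group(1));
        final TimeUnit unit = "ms".equals(matcher.group(2)) ? TimeUnit.MILLISECONDS : TimeUnit.SECONDS;
        return unit.toNanos(value);
    }

    String getSchedulingPeriod() { return schedulingPeriod; }
    Long getSchedulingNanos() { return schedulingNanos; }
    String getAnnotationData() { return annotationData; }
}
```

Because both constructors end up in the same initialization path, a change to the parsing rules only has to be made once.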
    this.procNode = processorNode;
    this.controllerServiceProvider = controllerServiceProvider;
    this.propertyEncryptor = propertyEncryptor;
    this.stateManager = stateManager;
    this.taskTermination = taskTermination;
    this.nodeTypeProvider = nodeTypeProvider;
Can this constructor be refactored to call the existing constructor? I notice preparedQueries would be different, but perhaps we could create a method that initializes the preparedQueries, given a Map<PropertyDescriptor, String>?
There was a problem hiding this comment.
Yeah I think this can be cleaned up a bit. Will take a look.
gresockj left a comment
I just tested out scenarios for the following, and everything worked as expected:
- JMSConnectionFactoryProvider: generated a FAILURE by verifying while the ActiveMQ server was down. Generated a SUCCESS once the server was up.
- SiteToSiteStatusReportingTask: generated a FAILURE by verifying when the input port was stopped. Generated a SKIPPED by verifying when the input port exerted backpressure. Generated a SUCCESS when the input port was started and available.
- PublishKafka_2_6: generated a FAILURE when the Kafka server was down, and when it was up but the topic had not been created. Generated a SUCCESS once the topic was created.
The REST API was in line with what I expect from NiFi, and was fairly easy to use.
I started to look at this, and wanted to see it in use. I installed NiFi and created a ListS3 processor. (I believe this processor is one of the ones that will use the "verify" behavior.) I do not see any verification button or menu option. The Feature Proposal mentioned in NIFI-9009 indicates there would be a button to initiate the verification. Can you clarify the expected behavior and/or options available?
@markobean, this is just the back end implementation -- you need to use the REST API to exercise the verification.
@gresockj I pushed a new commit that addresses the comments above. I also did some additional testing and found an issue related to properties that use
…VerifiableReportingTask components; implemented backend work to call the methods. Added REST APIs and created/updated data models for component configuration verification
…implify some components; updated S3 processors such that only ListS3 supports VerifiableProcessor, since the code was really intended for ListS3
…fies classpath, that needs to be taken into account when performing verification
Nice work, @markap14, I'm going to merge this in!
…VerifiableReportingTask components; implemented backend work to call the methods. Added REST APIs and created/updated data models for component configuration verification Signed-off-by: Joe Gresock <jgresock@gmail.com> This closes apache#5288