NIFI-3449: Adding GCP Framework and GCS Processors #1482
Conversation
Reviewing... It will take a bit to get a full review, but I have a couple of quick comments:
Force-pushed 4fe4c88 to 7b10f95.
Hi there! I've added a mock Google credentials JSON file to test resources, and configured Surefire to set the environment variable appropriately when running those tests. It should resolve the issues you were seeing (and reminded me to always run tests in a vanilla environment!). I've also updated the NAR dependency in the nifi-assembly pom. It's my first time contributing to NiFi, so I was a bit confused as to where to include those dependencies. Let me know if you have further questions or feedback!
Hi, I like this implementation better than what I came up with and I'd like to collaborate with you on future processors. I have already put together code for Google PubSub and BigQuery but I will need to redo them a bit to take advantage of the auth controller. I have a minor comment on your pull:
Actually, now that I think about it, if I have to write data to different project ids that require different credentials, would I need to instantiate multiple credential controllers?
@gene-telligent, Thanks for addressing those build issues, it's much improved. I'll move on to more exciting testing.
…d abstract processor definitions. In addition, creating a series of GCS-themed processors and their corresponding tests.
Force-pushed 7b10f95 to 40658f7.
@fmaritato Hey Frank! First of all, thank you so much for getting the ball rolling on the GCP processors -- I didn't mean for this to seem like a snub; I just had some spare time and pretty much did a 1:1 copy of the AWS processors (or at least their design principles). The biggest thing was adding those credential controllers, since it seems like that's the most appropriate pattern for dealing with credentials. I've made the changes to allow for Project ID being dynamic. And unfortunately yes, for the time being, if you're using separate credentials for different GCP processors, you'll need separate credential services. That being said, if you're primarily using Application Default credentials, you can apply those to multiple processors without a problem.
 * @param context the process context provided on scheduling the processor.
 */
@OnScheduled
public void onScheduled(ProcessContext context) {
It isn't clear to me just yet, but as a thing to consider: this 'cloudService' instance being set means the cloud service object must be thread-safe should these processors run more than one thread. And if this service is like a connection, it may be better to lazily init during onTrigger calls so that it can be recovered if the connection drops. Open connections can be killed during onUnscheduled.
What I'm suggesting here might not matter if this object isn't what it seems from just scanning through the code. I checked the Google API for 'getService()' to see if I could better understand what it was, and it wasn't revealing.
In any event, this is a really substantive PR with a lot of well-thought-out areas and attention to details like LICENSE/NOTICE, so big thanks to you.
Thankfully, according to googleapis/google-cloud-java#1238, all service objects are thread-safe.
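The lazy-init / recover-on-drop pattern the reviewer describes can be sketched in plain Java. Everything below is a hypothetical stand-in, not the google-cloud API or PR code: `Connection` mimics a connection-like service object, `acquire()` plays the role of an onTrigger-time getter, and `shutdown()` plays the role of an onUnscheduled hook.

```java
// Sketch of lazy init with recovery, using a hypothetical Connection
// stand-in (not the real google-cloud service object, which is
// thread-safe per the linked issue).
public class LazyConnectionHolder {

    // Hypothetical connection type: created lazily, may drop at any time.
    public static class Connection {
        private volatile boolean open = true;
        public boolean isOpen() { return open; }
        public void close() { open = false; }
    }

    private Connection connection;

    // Called from onTrigger: returns a live connection, recreating it
    // if the previous one has dropped.
    public synchronized Connection acquire() {
        if (connection == null || !connection.isOpen()) {
            connection = new Connection();
        }
        return connection;
    }

    // Called from onUnscheduled: kill the open connection, if any.
    public synchronized void shutdown() {
        if (connection != null) {
            connection.close();
            connection = null;
        }
    }
}
```

Because `acquire()` checks liveness on every call, a dropped connection is replaced transparently on the next trigger instead of leaving the processor wedged.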
It looks like the unit tests are failing with the addition of expression language to properties in AbstractGCPProcessor. I thought it was just me, but TravisCI seems to have the same issue:
And many similar tests. I know commenting out AbstractGCPProcessor lines 45 and 55 makes the tests pass again. I think the problem is we need to call
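For context, the usual NiFi fix here is to evaluate expression language before reading a property value (e.g. `context.getProperty(...).evaluateAttributeExpressions(flowFile).getValue()`). The snippet below is a minimal, self-contained stand-in for that evaluation step -- the `evaluate` helper and its regex are illustrative only, not NiFi's actual expression language engine:

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustrative stand-in for NiFi expression language evaluation.
// In real processor code the equivalent is
//   context.getProperty(PROJECT_ID).evaluateAttributeExpressions(flowFile).getValue()
public class ElDemo {
    private static final Pattern EL = Pattern.compile("\\$\\{([^}]+)}");

    // Substitute ${name} references using the flowfile's attribute map;
    // unresolved references become empty strings.
    public static String evaluate(String raw, Map<String, String> attributes) {
        Matcher m = EL.matcher(raw);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            m.appendReplacement(out,
                Matcher.quoteReplacement(attributes.getOrDefault(m.group(1), "")));
        }
        m.appendTail(out);
        return out.toString();
    }
}
```

The point of the failing tests: a property that supports expression language returns the raw `${...}` text (or null) unless it is explicitly evaluated first.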
@gene-telligent Code quality looks great, unit test coverage is solid, and I haven't run into any significant functional issues except the unit test vs. EL thing above. I have a laundry list of minor issues and questions here, and I'm still testing away.
If you make additional code changes, please go ahead and add commits to the PR. I prefer to track the changes, and it won't be a problem to squash and rebase them later.
import java.util.Map;

/**
 * Supports Google Cloud Application Default Credentials.
This comment looks like a copy/paste error.
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.processors.gcp.credentials.factory.CredentialPropertyDescriptors;
import org.apache.nifi.stream.io.BufferedInputStream;
Did you mean to use this deprecated org.apache.nifi.stream.io.BufferedInputStream? It looks like something your IDE might have "helpfully" chosen instead of java.io.BufferedInputStream.
final ReadChannel reader = storage.reader(blobId, blobSourceOptions.toArray(new Storage.BlobSourceOption[blobSourceOptions.size()]));
//TODO: Implement state checkpoints / ability to restore for long blob reads |
I don't recommend the TODO comments. They won't mean the same thing to other devs that they mean to you, and might confuse people about what's working or not. They do exist in the NiFi codebase, however, so I don't think there's a hard rule against it. Also, you could create a JIRA for the large blob feature.
attributes.put(UPDATE_TIME_ATTR, String.valueOf(blob.getUpdateTime()));
}
//TODO: Have some sensible way of including user defined metadata attached to the Blob |
Same thing about TODO comments
/**
 * List objects in a google cloud storage bucket by object name pattern.
 */
@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
Did you mean this to be INPUT_ALLOWED? It doesn't typically fit with List* processors, and doesn't look like you use input in onTrigger.
 * Partial implementation of CredentialsStrategy to provide support for credential strategies specified by
 * a single boolean property.
 */
public abstract class AbstractBooleanCredentialsStrategy extends AbstractCredentialsStrategy {
I think we need an override for canCreatePrimaryCredential() that parses the boolean property value. As-is, I believe it fails in the following scenario:
- Create a GCPCredentialsControllerService.
- Edit properties to set "Use Application Default Credentials" to true. Click Apply.
- Edit properties to set "Use Application Default Credentials" to false, and "Use Compute Engine Credentials" to true. Click Apply.
The controller service is now in an invalid state because "Use Application Default Credentials" thinks it's in a viable state with a non-null property value, without parsing the property to understand "false" means no.
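The bug boils down to checking "is the property set" versus "does the property parse to true". A minimal stand-alone sketch (the method names echo the PR's CredentialsStrategy, but the signatures here are simplified and hypothetical):

```java
// Simplified stand-in for the reviewer's point: a boolean credential
// strategy must parse the property value, not just check that it is set.
// Method names are illustrative, not the actual NiFi API.
public class BooleanStrategyDemo {

    // Buggy check: any non-null value, including "false", counts as enabled.
    public static boolean canCreateIfSet(String propertyValue) {
        return propertyValue != null;
    }

    // Fixed check: the value must be set AND parse to true.
    public static boolean canCreateIfTrue(String propertyValue) {
        return propertyValue != null && Boolean.parseBoolean(propertyValue);
    }
}
```

With the fixed check, a property flipped back to "false" no longer leaves the strategy claiming it can create a credential.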
try {
//TODO: Take advantage of RestorableState to checkpoint large blob uploads |
Same thing about TODO comments
        attributes.put(UPDATE_TIME_ATTR, String.valueOf(blob.getUpdateTime()));
    }
} catch (StorageException e) {
    getLogger().info("Failure completing upload flowfile={} bucket={} key={} reason={}",
Should this be error(...) instead of info(...)? I see the exception gets logged as an error below on 518, but not with the same level of helpful detail.
public static final String CONTENT_LANGUAGE_ATTR = "gcs.content.language";
public static final String CONTENT_LANGUAGE_DESC = "The content language of the object.";

public static final String CONTENT_TYPE_ATTR = "gcs.content.type";
Did you consider using mime.type here? It is a mostly-standardized NiFi attribute for content types, widely used by other processors. See CoreAttributes.
…gy from a copy/paste error
…ical use case would be too difficult for the time being
…' being evaluated as an acceptable flag for boolean credentials
…rty map but having values of 'null'
…istGCSBucket property is a bit different -- it can't accept expression language and has no default value
Hi @jvwing: I've gone through and made the changes that you've suggested. Thank you so much for the feedback! For the biggest issue -- using expression language for Project ID as per the request of @fmaritato -- I couldn't figure out an easy way of incorporating that functionality. The general pattern of NiFi processors seems to be "create a connector object when scheduled", and in the case of the GCP Java libraries, the connector client requires a Project ID to be set in its configuration. Changing it to be dynamic on a per-flowfile basis would require rebuilding the client configuration and instantiating it for each flowfile, which seems like a pretty big red flag. If there are any suggestions there, I'd like to hear them. Everything else should be resolved (hopefully), so if you have further suggestions or comments they would be welcome.
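One possible middle ground, if per-flowfile project IDs were ever needed: cache one client per distinct evaluated project ID rather than rebuilding the client for every flowfile. This is only a design sketch -- `Client` is a hypothetical stand-in for the google-cloud service object, not actual PR code:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Design sketch: amortize client construction by caching one client
// per distinct project ID. Client is a stand-in for the google-cloud
// service object, which is bound to a project ID at build time.
public class ProjectClientCache {

    public static class Client {
        public final String projectId;
        Client(String projectId) { this.projectId = projectId; }
    }

    private final ConcurrentMap<String, Client> cache = new ConcurrentHashMap<>();

    // Build at most one client per distinct project ID; later flowfiles
    // targeting the same project reuse the existing client.
    public Client clientFor(String projectId) {
        return cache.computeIfAbsent(projectId, Client::new);
    }
}
```

This avoids the per-flowfile rebuild cost while still allowing a dynamic Project ID, at the price of holding one client per project seen (a real implementation would likely want eviction and onUnscheduled cleanup).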
@gene-telligent, everything looks good to me, I merged to master. Thanks again for putting together such a great PR.
* Credentials service with tests
* Abstract processor definitions
* GCS-themed processors and their corresponding tests

Signed-off-by: James Wing <jvwing@gmail.com>

This closes apache#1482.
Thank you for submitting a contribution to Apache NiFi.
In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:
For all changes:
- Is there a JIRA ticket associated with this PR? Is it referenced in the commit message?
- Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
- Has your PR been rebased against the latest commit within the target branch (typically master)?
- Is your initial contribution a single, squashed commit?
For code changes:
For documentation related changes:
Note:
Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.