-
Notifications
You must be signed in to change notification settings - Fork 2.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
NIFI-3404 Add lookup processor for enrichments/joins to reference data #1830
Conversation
79f1752
to
66d0731
Compare
Reviewing... |
@ijokarumawak FYI, I think we need to make sure we see what happens to NIFI-3946/#1833 before we merge this. By all means start having a look though. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you @jfrazee for your contribution, these enhancements will be really helpful!
Although this PR is waiting for others to be finalized, I went ahead, and took a look through changes. Please check comments.
@@ -41,7 +48,9 @@ protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String | |||
.name(propertyDescriptorName) | |||
.required(false) | |||
.dynamic(true) | |||
.addValidator(Validator.VALID) | |||
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true)) | |||
.addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can remove ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR. The dynamic properties of this processor are not representing FlowFile attribute names. It is a little bit misleading.
.addValidator(Validator.VALID) | ||
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true)) | ||
.addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR) | ||
.expressionLanguageSupported(true) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This change enabled EL support, however, cacheConfiguredValues method is not updated to evaluate EL.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're right, I was trying to merge changes in from a VolatileLookupService I wrote elsewhere and didn't get that updated. The use for this is a little marginal so I'll remove it and we can see if the need comes back.
* @throws LookupFailureException if the backing service is unavailable or | ||
* the table cannot be loaded | ||
*/ | ||
default Map<String, T> asMap() throws LookupFailureException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Returning entire lookup table as a Map would cause issue with a large number of entries. I think this should be avoided.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think/know there's a use case for this but I get the worry, so I'll drop it.
if (dynamicProperties.isEmpty()) { | ||
try { | ||
// If there aren't any dynamic properties, load the entire lookup table | ||
final Map<String, String> lookupTable = lookupService.asMap(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As I commented at the interface, loading entire lookup table and setting those into FlowFile attributes can be problematic if the number of entries increases. I'd prefer making at least one dynamicProperty required.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will remove.
|
||
@Tags({"lookup", "cache", "enrich", "join", "jdbc", "database", "key", "value"}) | ||
@CapabilityDescription("A reloadable properties file-based lookup service") | ||
public class DatabaseLookupService extends AbstractControllerService implements StringLookupService { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
While this lookup single column service is powerful, I think it can be more powerful if we also provide lookup-record implementation like IPLookupService which lookups Record in the future.
@OnEnabled | ||
public void onEnabled(final ConfigurationContext context) throws InitializationException { | ||
final DBCPService databaseService = context.getProperty(CONNECTION_POOL).asControllerService(DBCPService.class); | ||
final DataSource dataSource = databaseService.getDataSource(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As discussed at #1450, I think we'd better to avoid exposing DataSource from DBCPService. Wrapping Connection to be a DataSource can be done here without requiring DBCPService change. Thought?
335a137
to
f9fb298
Compare
.required(true) | ||
.build(); | ||
|
||
public static final Relationship REL_SUCCESS = new Relationship.Builder() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be "matched" instead of "success"? Seems like it better goes with the "unmatched" relationship in terms of behavior.
@Tags({"lookup", "cache", "enrich", "join", "mutable", "attributes", "Attribute Expression Language"}) | ||
@CapabilityDescription("Lookup attributes from a lookup service") | ||
@DynamicProperty(name = "The name of the attribute to add to the FlowFile", | ||
value = "The name of the key or property to retrieve from the lookup service", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The value of the dynamic property is the value to match as the key against the lookup service right? I wonder if saying "name of the key" is confusing vs "value for the key". Nothing required here, just thinking aloud :)
@SideEffectFree | ||
@SupportsBatching | ||
@InputRequirement(Requirement.INPUT_REQUIRED) | ||
@Tags({"lookup", "cache", "enrich", "join", "mutable", "attributes", "Attribute Expression Language"}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What does "mutable" refer to here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Holdover from the interfaces this was originally written against. There was a LookupService and a MutableLookupService. Will remove.
d345267
to
56f1e58
Compare
@ijokarumawak The changes from NIFI-3339/#1450 have been removed from this PR along with the DatabaseLookupService. Will re-submit that stuff in another PR so we can move ahead with this and work out the details separately. |
@jfrazee I understand, thank you! I am reviewing the updated commit now. |
import org.apache.nifi.util.file.monitor.SynchronousFileWatcher; | ||
|
||
@Tags({"lookup", "cache", "enrich", "join", "csv", "reloadable", "key", "value"}) | ||
@CapabilityDescription("A reloadable properties file-based lookup service") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"CSV file-based" instead of "properties".
I can fix this when I merge, unless I find other issues.
@@ -47,6 +47,7 @@ protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String | |||
.required(false) | |||
.dynamic(true) | |||
.addValidator(Validator.VALID) | |||
.expressionLanguageSupported(true) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cacheConfiguredValues
is still not evaluating expression. I can add EL evaluation when I merge.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I meant to remove the expressionLanguageSupported(true). I don't think I had an especially good reason to have added the EL support to the dynamic property for the CS in the first place.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think supporting EL is useful, because user can refer variable registries or do some text processing with EL.
@jfrazee @mattyb149 I reviewed the updated commit and found few points to improve. I was trying to comment all of those, but since it's easier to explain with code instead of comment, I created another PR based on this one. @jfrazee Would you take a look on #1856 to see if those changes are reasonable? Please find the commit comments for what I've changed. If my additional commit looks ok, then I'd give +1 to this PR. |
Superseded by #1856 |
Note: This is overlaid on NIFI-3339/#1450.