NIFI-5059 Updated MongoDBLookupService to be able to detect record sc… #2619
@mattyb149 Had to change the schema handling in MongoDBLookupService. Can you take a look?
@mattyb149 Any chance you can take a look?
@pvillard31 @mattyb149 I updated this to have a clean separation between the controller and lookup service code and subclassed the lookup service from SchemaRegistryService. Can one of you do a review sometime soon?
for (Map.Entry<String, Object> entry : result.entrySet()) {

    RecordField field;
    if (entry.getValue() instanceof Integer) {
At some point once this and the ES Lookup Service are merged, I'll refactor this into a helper method.
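A helper of the kind mentioned here might look roughly like the sketch below. It is a minimal, self-contained illustration and not the code from this PR: FieldType is a hypothetical stand-in for NiFi's record field types, and inferFieldTypes is an invented name.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class SchemaInferenceSketch {

    // Hypothetical stand-in for NiFi's record field types; not the real API.
    public enum FieldType { INT, LONG, DOUBLE, BOOLEAN, RECORD, STRING }

    // Maps each key of a result document to a field type based on the runtime type of its value.
    public static Map<String, FieldType> inferFieldTypes(Map<String, Object> result) {
        Map<String, FieldType> fields = new LinkedHashMap<>();
        for (Map.Entry<String, Object> entry : result.entrySet()) {
            Object value = entry.getValue();
            FieldType type;
            if (value instanceof Integer) {
                type = FieldType.INT;
            } else if (value instanceof Long) {
                type = FieldType.LONG;
            } else if (value instanceof Float || value instanceof Double) {
                type = FieldType.DOUBLE;
            } else if (value instanceof Boolean) {
                type = FieldType.BOOLEAN;
            } else if (value instanceof Map) {
                type = FieldType.RECORD;
            } else {
                // Fallback for everything else, including null values.
                type = FieldType.STRING;
            }
            fields.put(entry.getKey(), type);
        }
        return fields;
    }

    public static void main(String[] args) {
        Map<String, Object> doc = new LinkedHashMap<>();
        doc.put("name", "nifi");
        doc.put("count", 3);
        doc.put("ratio", 0.5);
        System.out.println(inferFieldTypes(doc));
    }
}
```

Keeping the type mapping in one static method would let both the Mongo and ES lookup services share it, which is the refactoring the comment has in mind.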
@mattyb149 can you review this?
Reviewing...
@@ -52,68 +54,125 @@
"The query is limited to the first result (findOne in the Mongo documentation). If no \"Lookup Value Field\" is specified " +
"then the entire MongoDB result document minus the _id field will be returned as a record."
)
public class MongoDBLookupService extends MongoDBControllerService implements LookupService<Object> {
public class MongoDBLookupService extends SchemaRegistryService implements LookupService<Object> {
public static final PropertyDescriptor CONTROLLER_SERVICE = new PropertyDescriptor.Builder()
AFAICT this property is never added to the list of supported property descriptors, so I couldn't set it on the UI which causes an NPE when lookup() is called. Seems odd that for a required property that is not supported, setting it (in tests) would not complain. I haven't run the integration tests yet, just put the NARs into a live NiFi to try it out.
I added it to the property list.
.displayName("Client Service")
.description("A MongoDB controller service to use with this lookup service.")
.required(true)
.identifiesControllerService(MongoDBControllerService.class)
I believe this is supposed to be an interface, not the impl class (see my other comment below), so I think you want MongoDBClientService here.
this.lookupValueField = context.getProperty(LOOKUP_VALUE_FIELD).getValue();
super.onEnabled(context);
this.controllerService = context.getProperty(CONTROLLER_SERVICE).asControllerService(MongoDBControllerService.class);
I get a runtime error here because it expects an interface, not an impl class. I had to change this to MongoDBClientService to get past the runtime error.
Fixed.
Looks like this one is still there; I'm still getting the runtime errors.
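One plausible explanation for this runtime error, assuming the framework hands out controller services as interface-based proxies rather than raw instances, can be reproduced with plain JDK dynamic proxies. The sketch below is illustrative only: ClientService and ClientServiceImpl are hypothetical stand-ins for MongoDBClientService and MongoDBControllerService, and nothing here is NiFi code.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

public class ProxyCastSketch {

    // Hypothetical API interface, standing in for something like MongoDBClientService.
    public interface ClientService {
        String ping();
    }

    // Hypothetical implementation, standing in for something like MongoDBControllerService.
    public static class ClientServiceImpl implements ClientService {
        @Override
        public String ping() {
            return "pong";
        }
    }

    // Wraps an implementation in a JDK dynamic proxy that only exposes the interface.
    public static ClientService wrap(ClientService target) {
        InvocationHandler handler = (proxy, method, methodArgs) -> method.invoke(target, methodArgs);
        return (ClientService) Proxy.newProxyInstance(
                ClientService.class.getClassLoader(),
                new Class<?>[] { ClientService.class },
                handler);
    }

    // Returns true only when the object can be cast to the implementation class.
    public static boolean castableToImpl(Object service) {
        try {
            ClientServiceImpl impl = (ClientServiceImpl) service;
            return impl != null;
        } catch (ClassCastException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        ClientService proxied = wrap(new ClientServiceImpl());
        System.out.println(proxied.ping());          // prints "pong"
        System.out.println(castableToImpl(proxied)); // prints "false"
    }
}
```

Calls through the interface work, while a cast to the implementation class fails, which is why both the property descriptor and the asControllerService call would need to reference the interface type.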
Accidentally rebased it a while ago, so I had to force push. Sorry about that.
@mattyb149 once this and the ES one are merged, it would probably be a good time to discuss extracting the schema builder code into a utility class.
Agreed, I'll try to get this one in today then take a look at the ES one.
@mattyb149 updated, but looks like Travis is busted at the moment (saying it can't find our repo)
@mattyb149 can we close this one out? It's a good starting point for this cleanup task.
I may have shot myself in the foot here by asking that this extend SchemaRegistryService, as that requires you supply some way to get to the schema. In this current form, how would I get to the code path where the Mongo document's schema is gleaned vs being provided from somewhere else?
final RecordSchema schema = new SimpleRecordSchema(fields);
return Optional.ofNullable(new MapRecord(schema, result));
RecordSchema toUse = schema != null ? schema : convertSchema(result);
@mattyb149 I think the answer to your last question is here. If you specify schema.name in the coordinates, it'll get that from loadSchema. If not, it calls convertSchema. The rest of the lookup strategies don't make much sense in this case, so I can back out the change to extend SchemaRegistryService if that makes sense.
One thing you could do is to override getSupportedPropertyDescriptors() and add your own property for Schema Access Strategy that only has the relevant ones, including your own strategy of "Infer Schema From Document" or something.
That's probably the right way to do it because we should have it blow up if it can't get the schema on the first pass instead of silently falling back onto the inference option. Once I get that worked out, I'll copy-paste it over to the ES one as well.
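The difference between an explicit strategy and a silent fallback can be sketched as follows. This is a simplified illustration under stated assumptions: the Strategy enum, resolveSchema, and the string results are all hypothetical and only model the control flow, not NiFi's schema classes.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class SchemaStrategySketch {

    // Hypothetical strategies; the names are illustrative, not NiFi's allowable values.
    public enum Strategy { SCHEMA_NAME, INFER_FROM_DOCUMENT }

    // Resolves a schema description for the configured strategy and never falls back silently.
    public static String resolveSchema(Strategy strategy,
                                       Map<String, String> coordinates,
                                       Map<String, Object> document) {
        switch (strategy) {
            case SCHEMA_NAME:
                String name = coordinates.get("schema.name");
                if (name == null) {
                    // Fail loudly instead of quietly switching to inference.
                    throw new IllegalStateException("Strategy SCHEMA_NAME requires a schema.name coordinate");
                }
                return "registry:" + name;
            case INFER_FROM_DOCUMENT:
                return "inferred:" + document.keySet();
            default:
                throw new IllegalArgumentException("Unknown strategy: " + strategy);
        }
    }

    public static void main(String[] args) {
        Map<String, String> coordinates = new LinkedHashMap<>();
        coordinates.put("schema.name", "users");
        Map<String, Object> document = new LinkedHashMap<>();
        document.put("name", "nifi");
        System.out.println(resolveSchema(Strategy.SCHEMA_NAME, coordinates, document));
        System.out.println(resolveSchema(Strategy.INFER_FROM_DOCUMENT, coordinates, document));
    }
}
```

With the strategy chosen up front, a missing schema.name surfaces as an error on the first lookup rather than producing an inferred schema the user never asked for.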
@mattyb149 @ijokarumawak @bbende I built on the schema registry service to add a new option for NoSQL stores like Mongo, ES, Solr, etc. to just throw JSON in
@@ -176,6 +176,8 @@ public static SchemaAccessStrategy getSchemaAccessStrategy(final String allowabl
return new HortonworksAttributeSchemaReferenceStrategy(schemaRegistry);
} else if (allowableValue.equalsIgnoreCase(CONFLUENT_ENCODED_SCHEMA.getValue())) {
return new ConfluentSchemaRegistryStrategy(schemaRegistry);
} else if (allowableValue.equalsIgnoreCase(INFER_SCHEMA.getValue())) {
Since this inference only works when the content is JSON, I think this option should only be available when using a JSON related record reader, and not available in the default case.
This would be similar to how the AvroReader makes available the option for "Embedded Avro Schema" - https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java#L63
Ok. I'll work on that.
import java.io.IOException;
import java.util.Map;

public interface JsonSchemaAccessStrategy extends SchemaAccessStrategy {
Can this be done without introducing a new method to the interface?
The original interface has:
getSchema(Map<String, String> variables, InputStream contentStream, RecordSchema readSchema)
Since we know the content has to be json in this case, can't we read contentStream into the Map<String,Object> in the implementation of the access strategy, rather than requiring callers to do that first?
The client APIs for the third party systems usually return a Map, not a String that we can just pass on. I didn't want to serialize the client's output and then deserialize it later.
Ok but I'm confused because I'm not seeing an actual call that uses the new method...
The MongoLookupService does this:
private RecordSchema loadSchema(Map<String, Object> coordinates, Document doc) {
+ Map<String, String> variables = coordinates.entrySet().stream()
+ .collect(Collectors.toMap(
+ e -> e.getKey(),
+ e -> e.getValue().toString()
+ ));
+ ObjectMapper mapper = new ObjectMapper();
+ try {
+ byte[] bytes = mapper.writeValueAsBytes(doc);
+ return getSchema(variables, new ByteArrayInputStream(bytes), null);
+ } catch (Exception ex) {
+ return null;
+ }
+ }
So since we are reserializing the Doc here and putting the coordinates as variables, I'm not seeing where we call the new method, but I may be missing it.
You're not missing anything...
import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_TEXT_PROPERTY;
import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_VERSION;

public class JsonInferenceSchemaRegistryService extends SchemaRegistryService {
I'm not totally sure about this, but I think if we take the approach mentioned in my other comments, we probably wouldn't need this class since the JSON readers would handle the logic for when schemaAccess is set to "JSON Inference", similar to how AvroReader handles when embedded schema is selected - https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java#L78
Yeah, if this gets expanded into the readers I could definitely see that being the case. For now, this is limited to being used by LookupServices that need schema access + JSON help like this one, the ES one and possibly later something like a RethinkDBLookupService.
I haven't gone too deep looking at this, but if the goal is to have a re-usable way to infer a schema from JSON across various NoSQL components, have we considered just putting some utility code in a JAR somewhere under nifi-nar-bundles/nifi-extension-utils rather than trying to hook into the SchemaAccessStrategy/SchemaRegistryService? I'm just on the fence about whether the schema access stuff makes sense here since that was designed for the readers/writers, and this is really coming from a different angle of already having some Map object in memory.
That was the original approach. I'm now leaning toward going back to that because it's feeling like "less is more" here.
@mattyb149 @ijokarumawak do either of you have time to get this reviewed before 1.7.0 release vote starts?
…hemas or take one provided by the user.
@mattyb149 can we close the loop on this?
@zenfenan can you review? I think we're almost at close out point.
@MikeThomsen I'm actually traveling with limited access to email and internet. I'll try to take a look as soon as I can, if someone doesn't get to it first.
Reviewing...
@@ -50,7 +50,7 @@
"The content of the FlowFile contains a reference to a schema in the Schema Registry service. The reference is encoded as a single "
+ "'Magic Byte' followed by 4 bytes representing the identifier of the schema, as outlined at http://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html. "
+ "This is based on version 3.2.x of the Confluent Schema Registry.");

public static final AllowableValue INFER_SCHEMA = new AllowableValue("infer", "Infer from JSON");
Shouldn't this be "Infer from Result" or something? It could be used by other processors to infer the schema from whatever object is returned.
Probably. I'll go ahead and make that change.
}

@Override
protected SchemaAccessStrategy getSchemaAccessStrategy(final String strategy, final SchemaRegistry schemaRegistry, final ConfigurationContext context) {
Since this impl is specifically for JSON inference, perhaps it should override getDefaultSchemaAccessStrategy() to return the Infer one?
Done. I think that probably is the right thing to do here.
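The shape of that change, a subclass that adds an inference option and makes it the default, can be sketched with plain Java stand-ins. BaseSchemaService and InferenceSchemaService are hypothetical classes that only mimic the override pattern; the strategy values other than "infer" are invented for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class DefaultStrategySketch {

    // Hypothetical base service offering common strategies; a stand-in for SchemaRegistryService.
    public static class BaseSchemaService {
        public List<String> getStrategies() {
            List<String> strategies = new ArrayList<>();
            strategies.add("schema-name");
            strategies.add("schema-text-property");
            return strategies;
        }

        public String getDefaultStrategy() {
            return "schema-name";
        }
    }

    // Hypothetical inference-oriented service: adds "infer" and selects it by default.
    public static class InferenceSchemaService extends BaseSchemaService {
        @Override
        public List<String> getStrategies() {
            List<String> strategies = super.getStrategies();
            strategies.add("infer");
            return strategies;
        }

        @Override
        public String getDefaultStrategy() {
            return "infer";
        }
    }

    public static void main(String[] args) {
        BaseSchemaService service = new InferenceSchemaService();
        System.out.println(service.getStrategies());
        System.out.println(service.getDefaultStrategy());
    }
}
```

Because the extra option lives only in the subclass, services that cannot infer a schema never expose it.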
import java.util.List;
import java.util.Set;

public class TestSchemaRegistry extends AbstractControllerService implements SchemaRegistry {
Should this be called StubSchemaRegistry or MockSchemaRegistry? With Test at the front, I imagine it gets picked up by JUnit (although there are no @Test methods, but still).
Yeah. Changed it to StubSchemaRegistry.
@mattyb149 made the changes you requested.
+1 LGTM, one of the unit tests in Travis is failing but it's not the fault of this code. I ran the unit tests and some tests on a live NiFi instance with "Infer" and "Schema Text" strategies, all looked well. Thanks for the addition! Merging to master.
Thank you for submitting a contribution to Apache NiFi.
In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:
For all changes:
Is there a JIRA ticket associated with this PR? Is it referenced
in the commit message?
Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
Has your PR been rebased against the latest commit within the target branch (typically master)?
Is your initial contribution a single, squashed commit?
For code changes:
For documentation related changes:
Note:
Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.