NIFI-4516 FetchSolr Processor #2517
Conversation
@ijokarumawak
You should consider making the output format configurable. The Solr projects I've worked on in the past have used JSON instead of XML.
Haven't had a chance to try running it, but looks very promising.
@@ -66,6 +67,15 @@
public static final AllowableValue SOLR_TYPE_STANDARD = new AllowableValue(
        "Standard", "Standard", "A stand-alone Solr instance.");

public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor
I think this will work for now, but it would be great to have the Solr CRUD functions moved over to a controller service.
This is a really nice idea. However, I would prefer to do this in a separate ticket.
Ok. I might be able to help with some of that.
public static final String EXCEPTION = "fetchsolr.exeption";
public static final String EXCEPTION_MESSAGE = "fetchsolr.exeption.message";

public static final PropertyDescriptor SOLR_QUERY_STRING = new PropertyDescriptor
Are you going to support parameters like fq? If you intend to put them here, you might want to consider breaking it out into some additional fields for the sake of readability and ease of use.
For parameters such as fq, a single designated property will not be sufficient, as it is possible (and sometimes necessary) to define multiple filter queries. Furthermore, depending on the query parser, there are various parameters that can be used for queries. I considered making the most common query parameters configurable via designated properties and providing the option of additional parameters via dynamic properties. However, I personally expect this single field to be more straightforward. I expect the majority of users to test queries in Solr directly and to paste the query string afterwards into the query property. Which option do you prefer?
I think that's fine. I did something similar in a PR I have open for ElasticSearch + Kibana. If that's how you see most users doing it, seems fine.
I personally prefer "Solr Query String" to mean a Lucene-style query like "(foo AND bar)" and then have properties for the common parameters like start/end row, fq, etc., and then use dynamic props for others.
However, if we want to stick with the current approach, I think the description of this property should mention that the value is a URL-style query string and not just a Lucene query. You may also want to implement a custom validator that ensures this style of query string can be parsed (when EL is not used).
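A validator along those lines could lean on the JDK alone to check that the property value parses as URL-style `key=value` pairs. This is only a sketch of the idea, assuming the property holds a string such as `q=*:*&fq=field:value`; the class and method names are hypothetical and not part of the PR:

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class QueryStringParser {

    // Parses a URL-style query string (e.g. "q=*:*&fq=a:b&fq=c:d") into a
    // multimap, preserving repeated parameters such as fq.
    public static Map<String, List<String>> parseQueryString(final String queryString)
            throws UnsupportedEncodingException {
        final Map<String, List<String>> params = new LinkedHashMap<>();
        for (final String pair : queryString.split("&")) {
            final int idx = pair.indexOf('=');
            if (idx <= 0) {
                // a validator would return an invalid ValidationResult here instead
                throw new IllegalArgumentException("Not a key=value pair: " + pair);
            }
            final String key = URLDecoder.decode(pair.substring(0, idx), "UTF-8");
            final String value = URLDecoder.decode(pair.substring(idx + 1), "UTF-8");
            params.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        }
        return params;
    }

    public static void main(String[] args) throws Exception {
        final Map<String, List<String>> params =
                parseQueryString("q=*:*&fq=field1:value1&fq=field2:value2");
        System.out.println(params.get("fq").size()); // both filter queries survive: prints 2
    }
}
```

The multimap shape matters because, as discussed above, parameters like fq may legitimately appear more than once.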
FlowFile flowFileOriginal = session.get();

if (flowFileOriginal == null) {
    if (context.hasNonLoopConnection())
Needs to have curly brackets around it.
final RecordSchema schema = writerFactory.getSchema(flowFileOriginal.getAttributes(), null);
final RecordSet recordSet = SolrUtils.solrDocumentsToRecordSet(response.getResults(), schema);
final StringBuffer mimeType = new StringBuffer();
flowFileResponse = session.write(flowFileResponse, new OutputStreamCallback() {
Compressing this down to a lambda would save a few lines. Not necessary, but your IDE should be able to do it automatically.
I will change this and add the curly brackets :)
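For illustration, the anonymous-class-to-lambda change being suggested looks like this. The `OutputStreamCallback` below is a stand-in mirroring the single-method shape of NiFi's interface, so the snippet runs without NiFi on the classpath; the surrounding class and `write` helper are hypothetical:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

public class LambdaCallbackDemo {

    // Stand-in for NiFi's OutputStreamCallback: one abstract method
    // makes it a functional interface, so lambdas can implement it.
    @FunctionalInterface
    interface OutputStreamCallback {
        void process(OutputStream out) throws IOException;
    }

    static void write(final OutputStreamCallback callback, final OutputStream out) throws IOException {
        callback.process(out);
    }

    public static void main(String[] args) throws IOException {
        final ByteArrayOutputStream target = new ByteArrayOutputStream();

        // Anonymous class, as in the PR:
        write(new OutputStreamCallback() {
            @Override
            public void process(final OutputStream out) throws IOException {
                out.write("records".getBytes(StandardCharsets.UTF_8));
            }
        }, target);

        // Equivalent lambda, which the IDE can produce automatically:
        write(out -> out.write("records".getBytes(StandardCharsets.UTF_8)), target);

        System.out.println(target.toString("UTF-8")); // prints "recordsrecords"
    }
}
```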
logger.error("Failed to execute query {} due to {}. FlowFile will be routed to relationship failure", new Object[]{solrQuery.toString(), e}, e);
}

if (!flowFileOriginal.isPenalized())
Needs curly brackets.
return Collections.unmodifiableSet(searchComponentsTemp);
}

private static void addStatsFromSolrResponseToJsonWriter(final QueryResponse response, final JsonWriter writer) throws IOException {
It might be simpler to tie this to adding wt=json and just pull out the entire stats/facets branch and put that into the flowfile.
See this for an example if you haven't done it before.
The parameter wt only works for communication with Solr via HTTP. This processor uses SolrJ (which is highly recommended for SolrCloud mode). SolrJ is only capable of handling binary responses and XML. However, the processor makes use of the record functions, so the user can easily define a custom format (JSON, CSV, ...).
Ok. I could have sworn that I used wt=json that way, but it's been a while.
In the StackOverflow issue you mentioned above, this is also discussed.
For more detailed information, see here: https://lucene.apache.org/solr/guide/6_6/using-solrj.html
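For reference, the SolrJ path being discussed looks roughly like the sketch below. The response always arrives as SolrJ objects (binary or XML on the wire), and any output format is then produced by the configured record writer. The URL and collection name are placeholders, and the class is illustrative rather than the PR's code:

```java
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.SolrDocumentList;

public class SolrJQuerySketch {
    public static void main(String[] args) throws Exception {
        try (SolrClient client =
                 new HttpSolrClient.Builder("http://localhost:8983/solr/myCollection").build()) {
            // wt is irrelevant here: SolrJ negotiates its own response format
            SolrQuery query = new SolrQuery("*:*");
            query.setRows(10);
            QueryResponse response = client.query(query);
            SolrDocumentList documents = response.getResults();
            // documents would then be handed to the configured RecordSetWriter,
            // which serializes them as JSON, CSV, Avro, ... as the user chooses
            System.out.println(documents.getNumFound());
        }
    }
}
```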
runner.setProperty(SolrUtils.SOLR_TYPE, SolrUtils.SOLR_TYPE_CLOUD.getValue());
runner.setProperty(SolrUtils.SOLR_LOCATION, "http://localhost:8443/solr");
runner.setProperty(SolrUtils.COLLECTION, "testCollection");
runner.setProperty(FetchSolr.SOLR_QUERY_STRING, "q=*:*" +
Again, might want to think about multiple query-related fields so that this can be spread out and tested piece by piece by the user.
runner.setNonLoopConnection(false);
runner.run();

runner.assertTransferCount(FetchSolr.FAILURE, 1);
There's an assertAllTransferred that can be used here to simplify things.
In this case, I wanted to validate that the flowfile is actually sent to FAILURE.
runner.assertQueueEmpty();
runner.assertTransferCount(FetchSolr.RESULTS, 1);
runner.assertTransferCount(FetchSolr.ORIGINAL, 1);
for (MockFlowFile flowFile : runner.getFlowFilesForRelationship(FetchSolr.RESULTS))
Curly brackets.
while (reader.hasNext()) {
    reader.beginObject();
    while (reader.hasNext()) {
        if (reader.nextName().equals("integer_single"))
Curly brackets.
Tested the processor in a local build where it worked as expected.
Refactored the treatment of flowfiles routed to relationship ORIGINAL. More attributes are added to flowfiles for better descriptions of requests / responses. Additionally, I adjusted some tests.
@MikeThomsen Any news?
Sorry, haven't had time. There's a merge conflict now. Can you fix that?
Oh, sorry. Done.
@MikeThomsen any news?
Sorry, other stuff got in the way. I'll try to find some time soon.
@MikeThomsen ok, sorry, I am too impatient :D
Believe me, I understand. I just checked out your branch and ran into this when doing a full build with
BTW, that's the reason why your Travis CI build failed last time. Keep an eye on that because it'll slow down your code reviews.
I have Solr Cloud in a docker session on the usual ports, so if you haven't done them yet some integration tests (ex:
@MikeThomsen I will keep an eye on the CI tests in the future, thanks for the advice. Actually, I did not take them into account as they frequently appear to fail for no reason...
You would create a JUnit test called
Hi @MikeThomsen I added an IT and added a version to the dependency.
</dependency>
<dependency>
    <groupId>org.apache.solr</groupId>
    <artifactId>solr-core</artifactId>
Haven't made it through the rest of the PR yet, but what is the reason for making solr-core a non-test dependency? It would be better if we could only depend on Solr client.
If we do need to depend on solr-core we need to double check what additional dependencies this brings into the Solr NAR and possibly update the LICENSE/NOTICE of the NAR and the overall assembly. I would imagine solr-core has a lot more dependencies than solr-client.
@WritesAttribute(attribute = "fetchsolr.exeption.class", description = "The Java exception class raised when the processor fails"),
@WritesAttribute(attribute = "fetchsolr.exeption.message", description = "The Java exception message raised when the processor fails")
})
public class FetchSolr extends SolrProcessor {
Was thinking about this and wondering if this processor should actually be called QuerySolr?
I feel "fetch" is typically used in NiFi when a specific object is being retrieved by id, like FetchFile takes a specific file path and retrieves that file. Here we aren't fetching a specific Solr document, we are performing a general query.
responseAttributes.put(ATTRIBUTE_QUERY_TIME, String.valueOf(response.getQTime()));
flowFileResponse = session.putAllAttributes(flowFileResponse, responseAttributes);

if (response.getResults().size() > 0) {
What is the expected behavior when there are more results than were asked for in the initial search?
Say there are 1k total results, but the query asked for rows 0 to 10.
Are we going to page through the results and send out multiple flow files?
Or do we just not handle this case, leaving it up to the user to make the rows large enough (which also means they could blow up memory if they ask for too many results in a single query)?
@JohannesDaniel Build failed because of missing imports:
I can get back into testing this once you update.
@MikeThomsen Yeah, this time I looked at it, but haven't had time to fix it. I think I have to merge master into the branch due to updates. The local build worked. But thanks for the ping :)
@JohannesDaniel just do a rebase against master. You shouldn't use
@MikeThomsen Rebased everything; the build for the Solr processors works without any problems. However, when I try to build the whole application, I receive the following error:
Do this:
I just built master and it didn't have that problem. Try a full rebuild with
If you are building and skipping the tests, use -DskipTests=true. You should also do at least one mvn install run so that your local repo has everything.
@JohannesDaniel go ahead and check in the change even if it's broken. I'll help take a look.
@MikeThomsen Omg, now there are commits from others in the branch?? Maybe I should simply close this PR and put the code into a new branch.
@JohannesDaniel see my earlier comment about rebasing. You cannot just do a merge. Give that a shot. No need to close out the PR over some minor learning pains with Git. Post a message when you've force pushed and I'll take a look.
@MikeThomsen or is it a normal thing that the commits of others are shown here after the rebase?
@JohannesDaniel nope. If you did a rebase, those new commits would be behind your commits, not merged ahead of them. What a rebase does is set aside your commits, bring the base pointer for the branch up to the current pointer of the source branch (master in this case), and replay your commits on top of that.
@JohannesDaniel also, you don't need to wait for a new build to push the changes. Overwriting everything will schedule the running job to be killed and a new one started.
Force-pushed f25cb01 to b82707a.
@MikeThomsen done :)
Ok. I'll take a look and let you know sometime in a little while. Got some other things on my plate at the moment.
Builds just fine for me locally. If you're still having build problems, try deleting everything under org/apache/nifi from .m2/repository in your home folder.
@MikeThomsen Thank you for your help with Git!!
Are there best practices for handling paging? I think the ES processors handle this differently. Does it matter?
@ottobackwards Deep paging should be done with scrolling using cursor marks (as is done in GetSolr). Simple paging can be done by successively increasing the offset (start parameter). However, using cursor marks requires sorting by id. More information can be found here: https://lucene.apache.org/solr/guide/6_6/pagination-of-results.html
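The cursor-mark loop from the linked guide looks like this in SolrJ. This is a sketch, not the PR's code: it assumes an already configured SolrClient and a collection whose uniqueKey field is named `id`:

```java
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrQuery.SortClause;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.params.CursorMarkParams;

public class CursorPagingSketch {

    // Pages through all results of a query using cursor marks, as described in
    // the Solr pagination guide. Cursors require a sort on the uniqueKey field.
    static void fetchAll(final SolrClient client, final String queryString) throws Exception {
        final SolrQuery query = new SolrQuery(queryString);
        query.setRows(100);
        query.setSort(SortClause.asc("id")); // "id" assumed to be the uniqueKey

        String cursorMark = CursorMarkParams.CURSOR_MARK_START; // "*"
        boolean done = false;
        while (!done) {
            query.set(CursorMarkParams.CURSOR_MARK_PARAM, cursorMark);
            final QueryResponse response = client.query(query);
            // process response.getResults() here (e.g. emit one flowfile per page)
            final String nextCursorMark = response.getNextCursorMark();
            if (cursorMark.equals(nextCursorMark)) {
                done = true; // cursor did not advance: all results consumed
            }
            cursorMark = nextCursorMark;
        }
    }
}
```

Unlike start/rows paging, this keeps each request cheap for Solr no matter how deep the iteration goes.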
My comment is more about the way you work with the ES vs. the Solr processors, and whether there is or should be an expectation of some commonality between them, like how this relates to QueryElasticSearchHttp etc. I'm sorry, I don't mean to start a tangent on your PR. Please excuse me.
Solr and ES are increasingly used for different use cases these days. Solr tends to be more commonly used for pure search, whereas ES is frequently used for data analysis because it has really powerful and fast aggregation queries that Solr lacks.
Ok. Verified with a test run that it appears to run fine against a vanilla Solr installation. Docker:
I think we're pretty close to being ready to merge.
<p>
This definition will be appended to the Solr URL as follows:
fq=field1:value1&fq=field2:value2&fq=field3:value3
</p>
This needs detailed documentation for how to add the facet and stats support. I had to dig through the code to find that out. I would recommend taking the Docker solr demo that I referenced in the comments and using that as the basis so that new users can have something pretty painless as a reference point for starting out.
if (getEntireResults) {
    final Integer totalDocumentsReturned = solrQuery.getStart() + solrQuery.getRows();
    if (totalDocumentsReturned < totalNumberOfResults) {
        solrQuery.setStart(totalDocumentsReturned);
I think there should be a sane default limit to how far this goes since it's not using a cursor to get there. As-is, I think it would enable a user to do something like page out tens of thousands of results which AFAIK without a cursor could be pretty bad for Solr's performance.
@bbende @ijokarumawak thoughts on that?
@MikeThomsen I could add a property limiting the total amount of results the processor requests. This property could have a default of, let's say, 10000. If it is set to 0, there is no limit. The property's description could include a warning about Solr performance issues in the case of deep paging.
I think we should set an upper limit of absolutely no more than 10000 because this doesn't use the Solr-approved method of deep pagination. In fact I would say that 1,000 to 5,000 should be where the range is with explicit documentation that this is not the processor you're looking for if your goal is to hoover up the collection and process it :)
FYI, ES has the same issue and the solution in place now is a separate "scroll processor" which would be the equivalent of a "SolrCursorQuery" processor. In general, the use cases here are radically different so for the sake of our sanity and code quality, don't worry about people who want to process everything for the moment.
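The effect of the proposed cap can be reasoned about with plain arithmetic: the processor advances `start` by `rows` until either the result set or the configured limit is exhausted. A self-contained sketch of that logic (the class and method names are hypothetical, not the PR's code):

```java
public class PagingLimitDemo {

    // Counts the page requests the processor would issue when paging through
    // totalResults with the given page size, never going past maxResults.
    // A maxResults of 0 means "no limit", as proposed in the discussion.
    static int countPages(final int totalResults, final int rows, final int maxResults) {
        final int cap = (maxResults == 0) ? totalResults : Math.min(totalResults, maxResults);
        int pages = 0;
        for (int start = 0; start < cap; start += rows) {
            pages++;
        }
        return pages;
    }

    public static void main(String[] args) {
        // 1000 matching documents, 10 per page, capped at 100 documents:
        System.out.println(countPages(1000, 10, 100)); // prints 10
        // uncapped paging would issue 100 requests against Solr instead:
        System.out.println(countPages(1000, 10, 0));   // prints 100
    }
}
```

This makes the trade-off concrete: without a cap, a broad query quietly turns into very many start/rows requests, which is exactly the deep-pagination pattern the cursor-mark API exists to avoid.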
final Integer totalDocumentsReturned = solrQuery.getStart() + solrQuery.getRows();
if (totalDocumentsReturned < totalNumberOfResults) {
    solrQuery.setStart(totalDocumentsReturned);
    session.transfer(flowFileResponse, RESULTS);
Needs a call to the provenance manager to track the provenance. I think the RECEIVE event is the right one here.
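For reference, a RECEIVE event is recorded through the session's provenance reporter. A one-line sketch of what the requested change might look like; the `transitUrl` variable is hypothetical and would hold the Solr request URL:

```java
// record where the data came from before transferring the flowfile
session.getProvenanceReporter().receive(flowFileResponse, transitUrl);
session.transfer(flowFileResponse, RESULTS);
```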
}

if (!flowFileResponse.isPenalized()) {
    session.transfer(flowFileResponse, RESULTS);
Needs provenance tracking.
}
});
flowFileFacets = session.putAttribute(flowFileFacets, CoreAttributes.MIME_TYPE.key(), MIME_TYPE_JSON);
session.transfer(flowFileFacets, FACETS);
Needs provenance tracking.
}
});
flowFileStats = session.putAttribute(flowFileStats, CoreAttributes.MIME_TYPE.key(), MIME_TYPE_JSON);
session.transfer(flowFileStats, STATS);
Needs provenance tracking.
@JohannesDaniel I didn't see a test that tests deep pagination. If you don't have one, please add one. You can just throw garbage data in there like random strings. Also, you should add at least one test that proves all of the declared attributes are written when they're expected to be available on output flowfiles.
@JohannesDaniel If there are no results from a page request, does that mean there is no output? I ask because I have a PR for that use case with QueryElasticsearchHttp.
@MikeThomsen I'm still impeded, because my Maven does not download the NIFI-SNAPSHOTS. It downloads all dependencies except the SNAPSHOTS. When I follow the links (e.g. https://repository.apache.org/snapshots/org/apache/nifi/nifi-processor-utils/1.7.0-SNAPSHOT/nifi-processor-utils-1.7.0-SNAPSHOT.pom) I get a 404. Shouldn't I be able to see any dependencies at https://repository.apache.org/content/groups/snapshots/org/apache/nifi/?
If you are building NiFi and need to get things working after merging in the new version poms, you can do a mvn clean install -DskipTests, and that should populate your local repo with the 1.7.0-SNAPSHOT versions, shouldn't it?
@ottobackwards When I try this I always run into the same problem: [ERROR] Failed to execute goal on project nifi-livy-processors: Could not resolve dependencies for project org.apache.nifi:nifi-livy-processors:jar:1.7.0-SNAPSHOT: Could not find artifact org.apache.nifi:nifi-standard-processors:jar:tests:1.7.0-SNAPSHOT in apache.snapshots (https://repository.apache.org/snapshots) -> [Help 1] I already deleted everything in .m2/repository/org/apache/nifi
How are you building? As in, what command line?
There's a clause at the very bottom that will ensure they get sent. See:
@ottobackwards you made my day!
@MikeThomsen There are two tests that test Solr result paging:
What do you mean exactly? A test that retrieves more results from Solr? What exactly should be the purpose of this test?
@JohannesDaniel I missed those tests for some reason. Basically I was just looking for something that goes into deep pagination behavior. Disregard for now. Thanks.
Have you had a chance to add the missing provenance tracking?
Hi @MikeThomsen, I will upload the changes this evening.
(meaning in 5-6 hours)
@JohannesDaniel Sounds good. Make sure it includes the requested documentation as well. That's going to be a big deal in making this easy for new users to use. Thanks.
+1 LGTM. It looks like everything I requested in the last round is there. Integration tests still pass against a live SolrCloud instance.
Thank you for submitting a contribution to Apache NiFi.
In order to streamline the review of the contribution we ask you to ensure the following steps have been taken:
For all changes:
Is there a JIRA ticket associated with this PR? Is it referenced
in the commit message?
Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
Has your PR been rebased against the latest commit within the target branch (typically master)?
Is your initial contribution a single, squashed commit?
For code changes:
For documentation related changes:
Note:
Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.