Skip to content

Comments

NIFI-5248 Added new Elasticsearch record processor#2861

Closed
MikeThomsen wants to merge 9 commits intoapache:masterfrom
MikeThomsen:NIFI-5248
Closed

NIFI-5248 Added new Elasticsearch record processor#2861
MikeThomsen wants to merge 9 commits intoapache:masterfrom
MikeThomsen:NIFI-5248

Conversation

@MikeThomsen
Copy link
Contributor

Thank you for submitting a contribution to Apache NiFi.

In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:

For all changes:

  • Is there a JIRA ticket associated with this PR? Is it referenced
    in the commit message?

  • Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.

  • Has your PR been rebased against the latest commit within the target branch (typically master)?

  • Is your initial contribution a single, squashed commit?

For code changes:

  • Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder?
  • Have you written or updated unit tests to verify your changes?
  • If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under ASF 2.0?
  • If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly?
  • If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly?
  • If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties?

For documentation related changes:

  • Have you ensured that format looks appropriate for the output in which it is rendered?

Note:

Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.

@MikeThomsen MikeThomsen force-pushed the NIFI-5248 branch 2 times, most recently from bb8e474 to 692ba19 Compare August 31, 2018 13:49
@MikeThomsen
Copy link
Contributor Author

@mattyb149 Made the requested changes.

@mattyb149
Copy link
Contributor

Mind rebasing this one? I think you were waiting on the ES LookupService PR anyway, and now it's in :)

@MikeThomsen
Copy link
Contributor Author

One of the lookup integration tests is failing. I need to figure out why.

@MikeThomsen
Copy link
Contributor Author

@mattyb149 should be good to go now. Found the bug and pushed a fix that worked locally.

@MikeThomsen
Copy link
Contributor Author

@mattyb149 anything?

.required(true)
.build();

static final PropertyDescriptor OPERATION_RECORD_PATH = new PropertyDescriptor.Builder()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is specified, does the field remain in the document? What if the user didn't want to have that field included in the document, but needed it in order to specify the operation as a field?

What if the index operation were in an attribute versus a record field? Like if you partitioned records based on whether they were upsert vs delete operations (for example). Perhaps consider something like UpdateRecord's "Replacement Value Strategy" property, which allows you to choose whether the Operation field would be evaluated as a Record Path or a Literal Value. Both can support EL, but the latter would allow the setting of a single operation for all the records in the flow file.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The purpose of this capability is to be able to specify bulk operations. In ElasticSearch, you can specify the whole range of CRUD operations in a single bulk operation. That's what this is going for. So you can say "add this, this and this, delete this, upsert that and delete this thing too" in one push.

It has the potential to be quite useful to teams doing a lot of work with ElasticSearch because they will not have to segment changes into different flows, but can create a single flow that is able to take actions to be performed and bulk deliver them to ES through one pipeline.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, early morning and coffee didn't kick in.

Yeah, those values should be removed and they are now. I moved to just purge nulls and empty strings because they're semantically meaningless AFAICT in Lucene and ES since they are truthy-false whether they don't exist, are null or empty. So all gone now if you specify the record path.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This still feels a little awkward to me for the case when you want a single operation. I like the idea of being able to specify a RecordPath for the case when you are doing a "bulk" operation but performing multiple types of operations inside it, but if I have a bunch of records I just want to update, I need to add a synthetic field to each with a value of update. If that field gets indexed/updated in Elasticsearch, then it's unnecessary for the single-operation case. If instead it gets removed before index/update, then if it wasn't synthetic (such as in a Change Data Capture case where the operation is an existing valid field that needs to be in the doc) then we are removing user data.

Not sure the best way to tackle, but it seems like something you'd want to just pick from, like "use this RecordPath and keep the field in the doc" or "use this value after evaluating EL". If the incoming data is JSON this might be a moot point since one of the other processors can probably handle the single-operation case, but if it can't or the data isn't JSON, you'd need an UpdateRecord first, to convert the data and/or add a synthetic operation field. What do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll have to think about it. Might need to reevaluate a few things.

@MikeThomsen
Copy link
Contributor Author

@mattyb149 review changes are in.

@MikeThomsen
Copy link
Contributor Author

@mattyb149 I'll try to get some time set aside to get my head back into this commit. Thanks for picking it back up on review.

@MikeThomsen
Copy link
Contributor Author

New_Elastic_processor_tests.xml.txt

This is a simple test for them. I tested it against Elastic 6.6 using Docker

@AaronLeon
Copy link
Contributor

AaronLeon commented Feb 28, 2019

Regarding the REST client, I firmly believe the service should use the low-level REST client.

The major difference being that the high-level client is tied to major versions of Elasticsearch (see below). This means the implementation in this PR, which uses high-level client 5.6.x, is tied to Elasticsearch 5.X. In order to support ES 6.X and the newly released ES 7.X, we would need to create a new set of NARs, similar to how the transport client processors were organized (e.g. PutElasticsearch, PutElasticsearch5, etc.)

Using the low-level client requires some extra work as it does not help you build requests or unmarshall responses, but it is designed to have "compatibility with any Elasticsearch version". The good thing about the current code is that you have already created classes that encode requests and responses, e.g. IndexOperationRequest, IndexOperationResponse, which will make the change much easier.

Alternatively, these could be the first step to using the REST client, and then the PutElasticsearchHTTP processor can eventually be phased out to use the low-level client, providing a more flexible processor.

https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-high-compatibility.html
https://www.elastic.co/blog/state-of-the-official-elasticsearch-java-clients

@MikeThomsen
Copy link
Contributor Author

MikeThomsen commented Feb 28, 2019

@AaronLeon I'm perfectly fine with using the low level client, with the caveat that I have not seen any information on how to detect the version of the cluster. That has validation implications because types are required in 5 and 6, but somewhere between 7 and 8 they won't be. In fact, they won't even exist by 8.

@AaronLeon
Copy link
Contributor

@MikeThomsen Good point about types; even a low-level client wouldn't truly eliminate the need for version-aware components as the user would still need to configure the "type" according to whether they are using ES 7.X+. But still there would be the benefit of not needing to create an Elasticsearch rest-api NAR per major version.

Although the code here was tested against ES 6.6, AFAIK the 5.6.x high-level client is not guaranteed to be compatible with 6.x+ clusters, so the implementation in this PR may or may not work with ES 6.x, 7.x or eventually 8.x. As long as this risk is understood, everything else checks out and I think is a good step to supporting ES long-term.

@MikeThomsen
Copy link
Contributor Author

We don't need to create multiple NARs right now because we can just throw a Maven profile in there that switches to the 6.X client.

@mattyb149 @JPercivall thoughts on this?

@JPercivall
Copy link
Contributor

Not 100% sure I understand the choices. Could you restate the options?

Ultimately my preference would be for the version of ES able to communicate with not being based on how the instance was built and instead just be able to choose the version by a processor property passed to designate the ES version but, that may be a misunderstanding of the options.

@MikeThomsen
Copy link
Contributor Author

Now that we have Elastic 7.0.0 to contend with, I think the issue has been forced on using the low-level client at least as much as possible.

@MikeThomsen
Copy link
Contributor Author

@AaronLeon They did introduce some breaking changes to their REST API in 7.X. Specifically in the update API. 7.X:

POST test/_update/1
{
    "doc" : {
        "name" : "new_name"
    },
    "doc_as_upsert" : true
}

6.7 and earlier:

POST update_test/default/3/_update
{
  "doc": {
    "name": "new_name"
  },
  "doc_as_upsert": true
}

Copy link
Contributor

@AaronLeon AaronLeon left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some small housekeeping details, but the new changes look good to me. Will be giving it a shot in one of our test environments.

@MikeThomsen
Copy link
Contributor Author

@mattyb149 @zenfenan @JPercivall could we restart the review and close this out?

}

@Test
void testErrorRelationship() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be nice to test the non-happy path here, where the error record writer has either a bad and/or different schema (rather than using the same schema which is technically like inheriting it). Like what if it had an extra non-nullable field, does everything still work as expected (rollback vs. failure, e.g.)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When is that likely to happen?

@MikeThomsen
Copy link
Contributor Author

@mattyb149 I think I get can get these done today. Broadly, my purpose in creating these was to give us a clean path forward to start walking lock step with Elastic on the evolution of their products by doing things like using their APIs, following the newer practice of using controller services to configure connections, etc. One key difference between the two record processors is that this one enables the user to combine together operations against an index beyond "index" type commands. You can, for instance, multiplex index, update and delete against the same index or really as many indices as you want with this because it really is just exposing the bulk api through our record api.

In the long run, I'd like to see the "REST API" bundle become the consolidated path forward with the others deprecated and supported only with emergency patches. As far as I know, they have none of the Elastic features we need like automatic master detection (both on start and change).

@MikeThomsen
Copy link
Contributor Author

MikeThomsen commented Jul 6, 2019

@mattyb149 gave it some thought, and I agree that schema inference makes the Json processor OBE at this point. I'll rip it out tonight so we can narrow this down.

@MikeThomsen
Copy link
Contributor Author

@mattyb149 I refactored the heck out of this one because I decided that what we really need is just a smooth put record processor for indexing documents using the new Elastic official APIs. So, I basically ripped out all of the operation stuff and have made it behave mostly like the existing PutElasticsearchHttpRecord with the caveat that it also supports batch operations.

Not quite ready for giving it another look, but wanted to ping you FYSA.

@MikeThomsen
Copy link
Contributor Author

Also, once this is done, I'll probably pivot to refactoring where needed to add a bulk delete by id processor and whatever is needed for an update processor. I have an update by query processor locally that I've been hacking on a bit, but that's not ready for inclusion yet.

MikeThomsen and others added 7 commits November 11, 2019 18:01
NIFI-5248 Fixed a few stray 1.7.0 references.
NIFI-5248 Removed build helper plugin.
NIFI-5248 Made changes requested in a review.
NIFI-5248 Updated dummy service.
NIFI-5248 Made a few changes from a code review.
NIFI-5248 Added logic for removing nulls so record paths can be removed when no longer needed.
NIFI-5248 Switched from variable registry to flowfile level EL.
NIFI-5248 Added JsonPath code to remove index, id and type path statements.
NIFI-5248 Updated validation.
NIFI-5248 Set the field to null instead of empty string when nulling records.
NIFI-5248 Fixed TestElasticSearchClientService.
NIFI-5248 Removed high level client and switched over to low level client for everything.
NIFI-5248 Added profiles for ES 6 and ES 7 integration testing.
NIFI-5248 Updated integration tests to support 5 and 6.
NIFI-5248 Fixed some style check breaks.
NIFI-5248 Added create operation type.
NIFI-5248 Updated documentation.
NIFI-5248 Added error handling to PutElasticsearchRecord.
NIFI-5248 Added error logging to PutElasticsearchJson.
NIFI-5248 Added split failed records option to PutElasticsearchJson.
NIFI-5248 Added documentation for PutElasticsearchRecord.
NIFI-5248 Updated import to not use * import.
NIFI-5248 Removed processor that is no longer relevant due to schema inference.
NIFI-5248 Renamed ElasticSearch instances to Elasticsearch where we can within API guidelines.
@MikeThomsen
Copy link
Contributor Author

@mattyb149 changes are all in now and Travis is passing.

@MikeThomsen
Copy link
Contributor Author

@mattyb149 I think we're good to go now.

@mattyb149 mattyb149 changed the title NIFI-5248 Added new Elasticsearch json and record processors. NIFI-5248 Added new Elasticsearch record processor Nov 14, 2019
@mattyb149
Copy link
Contributor

+1 LGTM, checked documentation and tested various settings in the PutDatabaseRecord processor. Thanks for the new feature! Merging to master

@asfgit asfgit closed this in 4c79ff0 Nov 19, 2019
patricker pushed a commit to patricker/nifi that referenced this pull request Jan 22, 2020
NIFI-5248 Fixed a few stray 1.7.0 references.
NIFI-5248 Removed build helper plugin.
NIFI-5248 Made changes requested in a review.
NIFI-5248 Updated dummy service.
NIFI-5248 Made a few changes from a code review.
NIFI-5248 Added logic for removing nulls so record paths can be removed when no longer needed.
NIFI-5248 Switched from variable registry to flowfile level EL.
NIFI-5248 Added JsonPath code to remove index, id and type path statements.
NIFI-5248 Updated validation.
NIFI-5248 Set the field to null instead of empty string when nulling records.
NIFI-5248 Fixed TestElasticSearchClientService.
NIFI-5248 Removed high level client and switched over to low level client for everything.
NIFI-5248 Added profiles for ES 6 and ES 7 integration testing.
NIFI-5248 Updated integration tests to support 5 and 6.
NIFI-5248 Fixed some style check breaks.
NIFI-5248 Added create operation type.
NIFI-5248 Updated documentation.
NIFI-5248 Added error handling to PutElasticsearchRecord.
NIFI-5248 Added error logging to PutElasticsearchJson.
NIFI-5248 Added split failed records option to PutElasticsearchJson.
NIFI-5248 Added documentation for PutElasticsearchRecord.
NIFI-5248 Updated import to not use * import.
NIFI-5248 Removed processor that is no longer relevant due to schema inference.
NIFI-5248 Renamed ElasticSearch instances to Elasticsearch where we can within API guidelines.

NIFI-5248 Added groovy-json test dependency.

NIFI-5248 Updated PutElasticsearchRecord to only do index operations.

NIFI-5248 Added batch size property and refactored the way relationships and properties are added.

NIFI-5248 Added batch processing support.

NIFI-5248 Updated error handling.

NIFI-5248 Updated to 1.11.0-SNAPSHOT.

NIFI-5248 Made changes requested in a code review.

NIFI-5248 Made a few more changes from a code review.

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes apache#2861
natural pushed a commit to natural/nifi that referenced this pull request Feb 1, 2020
NIFI-5248 Fixed a few stray 1.7.0 references.
NIFI-5248 Removed build helper plugin.
NIFI-5248 Made changes requested in a review.
NIFI-5248 Updated dummy service.
NIFI-5248 Made a few changes from a code review.
NIFI-5248 Added logic for removing nulls so record paths can be removed when no longer needed.
NIFI-5248 Switched from variable registry to flowfile level EL.
NIFI-5248 Added JsonPath code to remove index, id and type path statements.
NIFI-5248 Updated validation.
NIFI-5248 Set the field to null instead of empty string when nulling records.
NIFI-5248 Fixed TestElasticSearchClientService.
NIFI-5248 Removed high level client and switched over to low level client for everything.
NIFI-5248 Added profiles for ES 6 and ES 7 integration testing.
NIFI-5248 Updated integration tests to support 5 and 6.
NIFI-5248 Fixed some style check breaks.
NIFI-5248 Added create operation type.
NIFI-5248 Updated documentation.
NIFI-5248 Added error handling to PutElasticsearchRecord.
NIFI-5248 Added error logging to PutElasticsearchJson.
NIFI-5248 Added split failed records option to PutElasticsearchJson.
NIFI-5248 Added documentation for PutElasticsearchRecord.
NIFI-5248 Updated import to not use * import.
NIFI-5248 Removed processor that is no longer relevant due to schema inference.
NIFI-5248 Renamed ElasticSearch instances to Elasticsearch where we can within API guidelines.

NIFI-5248 Added groovy-json test dependency.

NIFI-5248 Updated PutElasticsearchRecord to only do index operations.

NIFI-5248 Added batch size property and refactored the way relationships and properties are added.

NIFI-5248 Added batch processing support.

NIFI-5248 Updated error handling.

NIFI-5248 Updated to 1.11.0-SNAPSHOT.

NIFI-5248 Made changes requested in a code review.

NIFI-5248 Made a few more changes from a code review.

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes apache#2861
@MikeThomsen MikeThomsen deleted the NIFI-5248 branch August 14, 2024 21:13
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants