Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NIFI-11111 add option to output Elasticsearch error responses as FlowFile to PutElasticsearchJson and PutElasticsearchRecord #6903

Closed
wants to merge 4 commits into from

Conversation

ChrisSamo632
Copy link
Contributor

@ChrisSamo632 ChrisSamo632 commented Jan 29, 2023

Summary

NIFI-11111 add option to output Elasticsearch error responses as FlowFile to PutElasticsearchJson and PutElasticsearchRecord

Tracking

Please complete the following tracking steps prior to pull request creation.

Issue Tracking

Pull Request Tracking

  • Pull Request title starts with Apache NiFi Jira issue number, such as NIFI-00000
  • Pull Request commit message starts with Apache NiFi Jira issue number, as such NIFI-00000

Pull Request Formatting

  • Pull Request based on current revision of the main branch
  • Pull Request refers to a feature branch with one commit containing changes

Verification

Please indicate the verification steps performed prior to pull request creation.

Build

  • Build completed using mvn clean install -P contrib-check
    • JDK 8
    • JDK 11
    • JDK 17

Licensing

  • [ ] New dependencies are compatible with the Apache License 2.0 according to the License Policy
  • [ ] New dependencies are documented in applicable LICENSE and NOTICE files

Documentation

  • [ ] Documentation formatting appears as expected in rendered files

@ChrisSamo632 ChrisSamo632 force-pushed the NIFI-11111 branch 2 times, most recently from 653d4b3 to 7976f05 Compare February 5, 2023 17:02
@ChrisSamo632 ChrisSamo632 force-pushed the NIFI-11111 branch 2 times, most recently from 862428e to cc9622f Compare February 10, 2023 21:13
try (final OutputStream errorsOutputStream = session.write(errorResponsesFF)) {
errorMapper.writeValue(errorsOutputStream, errorResponses);

errorResponsesFF = session.putAttribute(errorResponsesFF, "elasticsearch.put.error.count", String.valueOf(errorResponses.size()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have you tested this live? I've had problems with this in the past where the ProcessSession didn't want to update the attributes while the OutputStream was open.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seemed to work OK when I tried it, but should be able to move this outside of the try-with-resources if you've seen it cause problems in the past

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll test it out locally to verify. I think I had that problem on 1.13, so it may be fixed by now.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be fair, looking at the code it would make more sense for the transfer and attributes calls to be outside of the try-with-resources, so I've made the change and will push it once all the tests have passed locally 👍

@ChrisSamo632
Copy link
Contributor Author

@mattyb149 I believe I addressed your comments

@MikeThomsen do the updates help with clarity around the transfer of FlowFiles/assignment of attributes?

@mattyb149
Copy link
Contributor

Sorry I lost track of this, @MikeThomsen I'm good if you are

@davis-anthony
Copy link

I've been struggling with the various PutElasticsearch processors and their different handling of responses/errors for a while now, and nothing really does what I would expect it to do.

The only one that kind of works is PutElasticsearchHttp because at least that one places the "reason" attribute on the flowfile, but it doesn't handle all types of errors and it makes a bunch of assumptions as to what is retryable.

What I really need is for the original documents that fail on a bulk index/create to be routed to failure or error and have the error message and status code added as flowfile attributes. This would enable custom routing logic based on status code and/or error message. I can't tell if that's what this PR is attempting to do. Is that what this PR does or should I post a new feature request for this?

If it's just sending the error response documents as separate flowfiles I can see how some developers might find that useful, but it doesn't really help with rerouting the original document and providing error information.

@ChrisSamo632
Copy link
Contributor Author

@davis-anthony this PR if/when Approved (@MikeThomsen / @mattyb149 ) was not going to introduce such behaviour, but it does make sense for PutElasticsearchJson so I've added a new elasticsearch.bulk.error attribute for files being sent to the errors relationship - this will contain the _bulk API's response for the document if Elasticsearch has marked it as errored (and if it's not_found if you set the Treat "Not Found" as Success to false)

It's not as simple for PutElasticsearchRecord because the errors output FlowFile may contain multiple records (each being a document sent to Elasticsearch). So we'd either be serialising all errors into a single attribute that could be huge (and probably break attribute value limits) or adding an attribute for every single record, which would cause memory issues in NiFi. An alternative would be to produce a single errors FlowFile for every errored Record from the input FlowFile, which again would cause performance problems in NiFi if you're trying to process large amounts of Records (which is the big benefit of Record-based processors, e.g. millions of records within a single file).

A flow I've used before for handling things like errors from a record processor such as PutElasticsearchRecord is to send the errors to PutDistributedMapCache keyed on the document _id (which is in the error response from Elasticsearch) and then using the FetchDistributedMapCache or LookupAttribute to enrich each of the records in the PutElasticsearchRecord output in the cases where there's an _id match - it's a bit fidly and could require splitting FlowFiles by Record (which again brings us back to the performance hit mentioned above).

Notes:

  • PutElasticsearchHttp (and PutElasticsearchHttpRecord) are deprecated is recent 1.x versions of NiFi and will be removed in NiFi 2.x
  • the elasticsearch.put.error attribute for both PutElasticsearchJson and PutElasticsearchRecord are used for general Elasticsearch connection error reporting, e.g. if the Elasticsearch instance/cluster cis not found or authentication/authorisation fails, etc., and FlowFile processing, e.g. if the content of the FlowFile sent to PutElastichsearchJson can't be parsed as a JSON object

@ChrisSamo632 ChrisSamo632 force-pushed the NIFI-11111 branch 2 times, most recently from ac9454c to 8aafcfb Compare April 19, 2023 17:36
…_bulk response for error documents in PutElasticsearchJson
@davis-anthony
Copy link

@ChrisSamo632 Thanks for the explanation and for helping me understand the issues with memory. Yes that makes a lot of sense and after reading the code I see that the processor makes every effort to just pass the documents to the various relationships. So with that in mind I propose that we make these processors behave like the RouteOnAttribute processor where the developer can define additional relationships based on expected error types. So that the processor can simply compare the error type for each item in the collection and route to the various relationships.

For example, I might define the following:

conflict -> version_conflict_engine_exception
schema -> mapper_parsing_exception
blocked -> cluster_block_exception

Then the following relationships could be routed in the flow: success, error, retry, failure, conflict, schema, blocked.

The result would be that errors that match the 3 types are routed to 3 custom routes. The remaining errors and other error routes would operate as they do now.

The only flaw I can think of here is that there is no definitive list of error types according to the following post, and so it takes quite a bit of sleuthing to find the possible errors and they are likely subject to change since there is no list.
https://discuss.elastic.co/t/bulk-api-item-error-types/242639

@ChrisSamo632
Copy link
Contributor Author

@davis-anthony that would definitely warrant a new Jira ticket to make such a change I think.

Some sort of "half-way house" approach may be more appropriate, e.g. like PartitionRecord where errors Records are grouped together into a FlowFile coming from PutElasticsearchRecord based upon the type of error indicated by the Elasticsearch response, with the type added to the FlowFile as the elasticsearch.bulk.error attribute, something like that might be helpful and hopefully avoid splitting FlowFiles up too much. This would also be an optional thing to enable in the processor.

Feel free to raise a ticket for further discussion.

@ChrisSamo632
Copy link
Contributor Author

ChrisSamo632 commented Apr 24, 2023

NIFI-11480 for the above ☝️ discussion, FYI @davis-anthony

static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("All flowfiles that succeed in being transferred into Elasticsearch go here. " +
"Documents received by the Elasticsearch _bulk API may still result in errors on the Elasticsearch side. " +
"The Elasticsearch response will need to be examined to determine whether any Document(s)/Record(s) resulted in errors.")
.build();

static final Relationship REL_ERROR_RESPONSES = new Relationship.Builder()
.name("error_responses")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be autoterminated because it changes the behavior.

@asfgit asfgit closed this in 05418d9 Apr 25, 2023
asfgit pushed a commit that referenced this pull request Apr 25, 2023
…File to PutElasticsearchJson and PutElasticsearchRecord

NIFI-11111 clarify error_responses relationships in PutElasticsearchJson/Record processors
NIFI-11111 Refactor exception handling for error response flowfile transfer
NIFI-11111 Add elasticsearch.bulk.error attributes containing the Elasticsearch _bulk response for error documents in PutElasticsearchJson

This closes #6903

Signed-off-by: Mike Thomsen <mthomsen@apache.org>
@ChrisSamo632 ChrisSamo632 deleted the NIFI-11111 branch May 2, 2023 16:01
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants