
Emit failures array #2425

Closed
wants to merge 2 commits into from

Conversation

martin-mucha

2 patches:
1st is a proposal for a small refactoring of ValidateRecord, which should significantly simplify its inner structure.
2nd is the introduction of a new property; if set, an array of validation errors will be stored in a FlowFile attribute named according to the new property's value.
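For illustration only (the class and method names below are hypothetical, not the PR's actual code): serializing a list of validation failure messages into a JSON array string that could be stored as a FlowFile attribute value might look like this, using only the JDK rather than the Jackson ObjectMapper that the patch itself uses:

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch: turn validation failure messages into a JSON array
// string suitable for a FlowFile attribute value. Only quotes and
// backslashes are escaped here; a real implementation would use a proper
// JSON library such as Jackson.
public class FailureArraySketch {
    static String toJsonArray(List<String> failures) {
        return failures.stream()
                .map(f -> "\"" + f.replace("\\", "\\\\").replace("\"", "\\\"") + "\"")
                .collect(Collectors.joining(",", "[", "]"));
    }

    public static void main(String[] args) {
        List<String> failures = List.of("missing field \"id\"", "bad type for /age");
        System.out.println(toJsonArray(failures));
    }
}
```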

All potential future expansion of this class is made pointlessly harder by its questionable structure, so I decided to provide a refactoring of it.

Signed-off-by: Martin Mucha <alfonz19@gmail.com>
Added a new property. If set, an array of failures will be stored in an attribute named according to this property's value.

Signed-off-by: Martin Mucha <alfonz19@gmail.com>
Contributor

@markap14 left a comment

Hey @martin-mucha thanks for the contribution! I do think it's a good idea to include the information about why records were routed to 'invalid' as an attribute. Doing that means that we can also remove it from the Provenance Event details, as well, since the Provenance Event will contain all of the attributes also.

That said, I think we need to use a different approach to handling this. This PR does some major refactoring and buffers the records into the Java heap, which can be a show stopper for large FlowFiles. We need to ensure that we always handle this data in a streaming fashion, only holding 1 record (or a couple of records) in memory at a time.

FlowFile validFlowFile = null;
FlowFile invalidFlowFile = null;

final List<Record> validRecords = new LinkedList<>();
Contributor

We need to be sure that we are not storing collections of records in heap but rather that we are writing them out in a streaming fashion. One of the goals of the record API is to allow arbitrarily large FlowFiles that are made up of small records. So if we have a 1 GB CSV file, for instance, this would result in OutOfMemoryErrors very quickly.
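As a plain-Java sketch of the streaming pattern being asked for (no NiFi classes; all names here are illustrative): each record is written to the output as soon as it is read, so only one record is held in memory at a time, instead of accumulating every record in a List first:

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.StringWriter;

// Illustrative only: the streaming pattern, with Strings standing in for
// Records and a StringWriter standing in for the FlowFile's OutputStream.
public class StreamingSketch {
    public static String processStreaming(Iterable<String> records) throws IOException {
        StringWriter out = new StringWriter();
        try (BufferedWriter writer = new BufferedWriter(out)) {
            for (String record : records) { // only one record in memory at a time
                writer.write(record);
                writer.newLine();
            }
        } // try-with-resources flushes and closes the writer
        return out.toString();
    }

    public static void main(String[] args) throws IOException {
        System.out.print(processStreaming(java.util.List.of("rec1", "rec2")));
    }
}
```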

Author

Understood, but one question. I did all this refactoring to get rid of the 'surprising' complexity of the code. Now, if I call "writer.write(record);", won't the given record be held in heap until completeFlowFile is called? Where is the FlowFile stored until it is 'completed'? If it's held outside of the heap, then all this refactoring is invalid, indeed. If it's also in heap ...

Member

@ijokarumawak Feb 14, 2018

Hi @martin-mucha
Let me try to answer your question. @markap14 will correct me if I'm wrong :)

The ValidateRecord.completeFlowFile method calls writer.finishRecordSet(), which lets the writer write the ending mark of the record set, as some record formats require this, e.g. JSON } or XML </root> would be easy to imagine. The actual bytes for the record contents have already been written in advance.

I'd recommend reading NiFi in Depth, Content Repository, on how NiFi reads/writes FlowFile content in a streaming manner without loading the whole content on heap.

If you're interested in reading code, StandardProcessSession.write might be a good starting point for seeing how a FlowFile and its OutputStream are created.

And the OutputStream is passed to RecordSetWriter implementations. For example, when a processor writes a record, it is sent to a method of the configured RecordSetWriter, such as WriteCSVResult.writeRecord.

These RecordSetWriter implementations do not hold contents on heap; they write records in a streaming manner.

If we create a List and hold Record instances, then we keep content on heap as Record instances, which can lead to an OOM.

Hope this helps!
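A toy sketch of the pattern described above (this is not NiFi's actual RecordSetWriter API; the class and the use of a JSON array are illustrative): writeRecord streams each record straight to the underlying Writer, and finishRecordSet only appends the record set's closing mark, so no record is retained on the heap:

```java
import java.io.IOException;
import java.io.Writer;

// Toy illustration of a streaming record-set writer for a JSON array.
// Bytes for each record leave the heap as soon as writeRecord is called;
// finishRecordSet merely writes the ending mark of the record set.
public class ToyJsonRecordSetWriter {
    private final Writer out;
    private boolean first = true;

    public ToyJsonRecordSetWriter(Writer out) throws IOException {
        this.out = out;
        out.write("["); // opening mark of the record set
    }

    public void writeRecord(String recordJson) throws IOException {
        if (!first) {
            out.write(",");
        }
        out.write(recordJson); // streamed immediately, not buffered
        first = false;
    }

    public void finishRecordSet() throws IOException {
        out.write("]"); // the only thing left to do at completion time
    }
}
```

Usage: construct the writer over an output stream's Writer, call writeRecord once per record as records are read, then call finishRecordSet at the end, mirroring what completeFlowFile does.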

private static final ObjectMapper objectMapper = new ObjectMapper();


private static Validator createAttributeNameValidator() {
Contributor

I don't believe this validator is needed. Attribute names can be anything except for null and empty string. There does exist a validator for this already: StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR.
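As a minimal sketch of what such a check amounts to (hypothetical names; this is not NiFi's Validator interface or the actual StandardValidators implementation): an attribute name is acceptable unless it is null or empty:

```java
// Hypothetical stand-in for the rule enforced by
// StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR:
// attribute names can be anything except null or the empty string.
public class AttributeNameCheck {
    static boolean isValidAttributeName(String name) {
        return name != null && !name.isEmpty();
    }
}
```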


static final PropertyDescriptor ATTRIBUTE_NAME_TO_STORE_FAILURE_DESCRIPTION = new PropertyDescriptor.Builder()
.name("emit-failure-description-property")
.displayName("Variable Describing Parse Failure")
Contributor

We should use the word 'attribute' here, rather than 'variable', as these have different meanings in the context of NiFi.

@github-actions

We're marking this PR as stale due to lack of updates in the past few months. If after another couple of weeks the stale label has not been removed, this PR will be closed. This stale marker and eventual auto close do not indicate a judgement of the PR, just a lack of reviewer bandwidth, and they help us keep the PR queue more manageable. If you would like this PR re-opened you can do so and a committer can remove the stale tag. Or you can open a new PR. Try to help review other PRs to increase PR review bandwidth, which in turn helps yours.

@github-actions github-actions bot added the Stale label May 12, 2021
@github-actions github-actions bot closed this May 27, 2021

4 participants