Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NIFI-6926: Fixed memory leak in NiFiAtlasHook #3915

Closed
wants to merge 4 commits into from

Conversation

turcsanyip
Copy link
Contributor

@turcsanyip turcsanyip commented Dec 4, 2019

Thank you for submitting a contribution to Apache NiFi.

Please provide a short description of the PR here:

Description of PR

Enables X functionality; fixes bug NIFI-YYYY.

In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:

For all changes:

  • Is there a JIRA ticket associated with this PR? Is it referenced
    in the commit message?

  • Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.

  • Has your PR been rebased against the latest commit within the target branch (typically master)?

  • Is your initial contribution a single, squashed commit? Additional commits in response to PR reviewer feedback should be made on this branch and pushed to allow change tracking. Do not squash or use --force when pushing to allow for clean monitoring of changes.

For code changes:

  • Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder?
  • Have you written or updated unit tests to verify your changes?
  • Have you verified that the full build is successful on both JDK 8 and JDK 11?
  • If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under ASF 2.0?
  • If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly?
  • If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly?
  • If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties?

For documentation related changes:

  • Have you ensured that format looks appropriate for the output in which it is rendered?

Note:

Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.

Comment on lines 60 to 61
notificationSender.send(messages, this::notifyEntities);
messages.clear();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The fact that the send method accepts a function makes me think that the send happens asynchronously. Are you sure that send is a blocking method? If not, clearing the messages will cause a ConcurrentModificationException

Copy link
Contributor Author

@turcsanyip turcsanyip Dec 4, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, I've checked it now: the items from list are streamed / collected into other data structures at the beginning of send() and the list itself never used later, so it shouldn't be a problem.
Though, just to be on the safe side, I'll change it to create a new list after every commit.

@belugabehr
Copy link
Contributor

Hello @turcsanyip

I hope you will consider another solution:

https://github.com/belugabehr/nifi/blob/6c224dbf08731f6d90366911f3e0aaf24c5010bd/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/hook/NiFiAtlasHook.java

I have implemented this change with a BlockingQueue because I believe adding to the collection and submitting it for processing can occur on different threads... it makes sense that it would: thread from repository is filling it and every N minutes, a new thread sends out the messages. Also allows one to re-use the queue (don't have to replace it every iteration).

@turcsanyip
Copy link
Contributor Author

@belugabehr Thanks for your suggestion and in a concurrent environment it would be perfect.
In our case both addMessage() and commitMessages() are called from the same place and on the same thread: ReportLineageToAtlas.consumeNiFiProvenanceEvents().
Although we could enhance it to support concurrent access as well, but NiFiAtlasHook explicitly declares itself as not being thread safe in its javadoc. That's why I would not add concurrency handling code to it and mix thread safe / non thread safe behaviours.

@belugabehr
Copy link
Contributor

@turcsanyip Thank you for the review! I actually liked your first solution of using clear() better because I think using immutable values are easier to work with when debugging. Sorry to nit, but could you please consider reverting back to using clear()? Perhaps you can make a copy of the list and pass the copy into the method before calling clear(), but if you're confident that the send method is synchronous, then you don't even need to make a copy of the list.

@belugabehr
Copy link
Contributor

@turcsanyip Also, does the list of messages need to be flushed in the close() method?

@turcsanyip
Copy link
Contributor Author

@belugabehr Passing a copy and clear the list sounds reasonable, pls. check my last commit.

Regarding flushing (calling commit) in close():
The reporting task stores the id of the last committed message in its persistent state and it happens after the commit returned.

if (!filteredEvents.isEmpty()) {
// Executes callback.
consumer.accept(componentMapHolder, filteredEvents);
}
firstEventId = updateLastEventId(rawEvents, stateManager);

So even if close() gets called with uncommitted messages in the list, the next execution of the reporting task will continue where it left off and reloads the uncommitted messages from the provenance repository and process them again. The at-least-once delivery is guaranteed.
If I understand correctly, this was your concern.

@belugabehr
Copy link
Contributor

belugabehr commented Dec 6, 2019

@turcsanyip Yes. You understand my concern. Thanks for the collaboration and running down all of my concerns. Looks good to me.

@asfgit asfgit closed this in ba6d050 Dec 9, 2019
@mcgilman
Copy link
Contributor

mcgilman commented Dec 9, 2019

Thanks @turcsanyip for the PR! Thanks @belugabehr for the review! This has been merged to master.

patricker pushed a commit to patricker/nifi that referenced this pull request Jan 22, 2020
NIFI-6926: Use new instance of list instead of clearing it
NIFI-6926: Logging the number of messages to be sent to Atlas.
NIFI-6926: Pass a copy of the messages list to send() and clear the original list.

This closes apache#3915
natural pushed a commit to natural/nifi that referenced this pull request Feb 1, 2020
NIFI-6926: Use new instance of list instead of clearing it
NIFI-6926: Logging the number of messages to be sent to Atlas.
NIFI-6926: Pass a copy of the messages list to send() and clear the original list.

This closes apache#3915
ekovacs pushed a commit to ekovacs/nifi-apache that referenced this pull request Feb 24, 2020
NIFI-6926: Use new instance of list instead of clearing it
NIFI-6926: Logging the number of messages to be sent to Atlas.
NIFI-6926: Pass a copy of the messages list to send() and clear the original list.

This closes apache#3915
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
3 participants