
NIFI-8605 Adding a new property to enable/disable auto committing #5554

Closed
wants to merge 6 commits

Conversation

VibAruna (Contributor)

Description of PR

This fixes a bug where NiFi consumes a large amount of heap space when querying large data sets (millions or billions of records) through the PostgreSQL JDBC driver.

According to the PostgreSQL JDBC driver documentation (https://jdbc.postgresql.org/documentation/head/query.html), the driver collects all the results for a query at once by default. This behavior leads the ExecuteSQL and ExecuteSQLRecord processors to use a large amount of heap. Caching a small number of records (defined by the fetch size) on the client side of the connection and retrieving the next block of data when it is exhausted is possible only if auto-commit is set to false.


This PR adds a new property to the two processors mentioned above to enable or disable auto-commit on the connection.
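The driver-level mechanics behind this can be sketched in plain JDBC. This is a minimal sketch, not the PR's actual code: the class and method names are hypothetical, and the configuration calls are exercised against recording stand-ins so the example runs without a real database.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class StreamingFetchSketch {

    // PostgreSQL only streams results (cursor-based fetching) when autocommit
    // is off and a fetch size is set; otherwise the whole result set is
    // buffered in client memory.
    static void configureForStreaming(Connection con, Statement st, int fetchSize) throws Exception {
        con.setAutoCommit(false);   // required: the cursor lives inside a transaction
        st.setFetchSize(fetchSize); // rows cached client-side per round trip
    }

    // Recording stand-ins so the sketch runs without a database.
    static final List<String> calls = new ArrayList<>();

    static <T> T recording(Class<T> type) {
        InvocationHandler h = (proxy, method, args) -> {
            calls.add(method.getName() + (args == null ? "" : Arrays.toString(args)));
            Class<?> r = method.getReturnType();
            if (r == boolean.class) return false;
            if (r == int.class) return 0;
            return null;
        };
        return type.cast(Proxy.newProxyInstance(type.getClassLoader(), new Class<?>[]{type}, h));
    }

    public static void main(String[] args) throws Exception {
        configureForStreaming(recording(Connection.class), recording(Statement.class), 1000);
        System.out.println(calls); // [setAutoCommit[false], setFetchSize[1000]]
    }
}
```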

For all changes:

  • Is there a JIRA ticket associated with this PR? Is it referenced
    in the commit message?

  • Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.

  • Has your PR been rebased against the latest commit within the target branch (typically main)?

  • Is your initial contribution a single, squashed commit? Additional commits in response to PR reviewer feedback should be made on this branch and pushed to allow change tracking. Do not squash or use --force when pushing to allow for clean monitoring of changes.

For code changes:

  • Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder?
  • Have you written or updated unit tests to verify your changes?
  • Have you verified that the full build is successful on JDK 8?
  • Have you verified that the full build is successful on JDK 11?
  • If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under ASF 2.0?
  • If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly?
  • If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly?
  • If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties?

For documentation related changes:

  • Have you ensured that format looks appropriate for the output in which it is rendered?

Note:

Please ensure that once the PR is submitted, you check GitHub Actions CI for build issues and submit an update to your PR as soon as possible.

@mattyb149 (Contributor) left a comment

I'd like to see unit test(s) around this capability; you should be able to verify that the correct methods were called using Mockito.spy() with the Derby connection. At the very least they would verify that illegal actions don't happen, such as calling commit() or rollback() on an autocommit connection (see here and here).

One very important thing to note is that spy() is much slower than using the real object; in my IDE the current unit tests took three times as long to run. I recommend creating another DBCPService stub that uses spy(), and having only the new tests use that one.
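The kind of check the reviewer describes can be sketched even without Mockito, using a JDK dynamic proxy as a stand-in spy. The class and method names here are hypothetical; the proxy mimics drivers (such as PostgreSQL) that throw when commit() or rollback() is called on an autocommit connection.

```java
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.sql.SQLException;

public class AutoCommitGuardSketch {

    // Stand-in for a driver that rejects commit()/rollback() in autocommit mode.
    static Connection strictConnection() {
        final boolean[] autoCommit = {true};
        return (Connection) Proxy.newProxyInstance(
                Connection.class.getClassLoader(), new Class<?>[]{Connection.class},
                (proxy, method, args) -> {
                    switch (method.getName()) {
                        case "setAutoCommit": autoCommit[0] = (Boolean) args[0]; return null;
                        case "getAutoCommit": return autoCommit[0];
                        case "commit":
                        case "rollback":
                            if (autoCommit[0]) {
                                throw new SQLException(method.getName() + "() not allowed in autocommit mode");
                            }
                            return null;
                        default:
                            Class<?> r = method.getReturnType();
                            return r == boolean.class ? false : (r == int.class ? 0 : null);
                    }
                });
    }

    public static boolean illegalCommitDetected = false;

    public static void main(String[] args) throws Exception {
        Connection con = strictConnection();
        try {
            con.commit(); // illegal: autocommit is still on
        } catch (SQLException expected) {
            illegalCommitDetected = true;
        }
        con.setAutoCommit(false);
        con.commit(); // legal now
        System.out.println("illegalCommitDetected=" + illegalCommitDetected);
    }
}
```

A Mockito-based test would replace the hand-rolled proxy with spy() on a real Derby connection, as suggested above.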

// Not all drivers support this, just log the error (at debug level) and move on
logger.debug("Cannot set fetch size to {} due to {}", new Object[]{fetchSize, se.getLocalizedMessage()}, se);
try (final Connection con = dbcpService.getConnection(fileToProcess == null ? Collections.emptyMap() : fileToProcess.getAttributes())) {
con.setAutoCommit(context.getProperty(AUTO_COMMIT).asBoolean());
Contributor:

If autocommit is set to false, then commit() must be called explicitly on the connection and I don't see that here. However some drivers throw an exception if commit() is called on an autocommit connection, so I recommend here that we save off the current value of autocommit, then call setAutoCommit(), then at the end of processing check autocommit and call commit() if it is false. Then we can restore the original value of autocommit as good housekeeping.

Also for the PostgreSQL case, what is expected of the code if any error occurs during processing? Usually for non-autocommit connections we'd call rollback() rather than commit(). Perhaps we do something similar to the above and check for autocommit in a catch() clause for the try that creates the connection, and if false call rollback().
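The save/set/commit-or-rollback/restore pattern described above could look roughly like this. It is a sketch with hypothetical helper names, exercised against a recording stand-in rather than a real driver, and is not the code that was actually merged.

```java
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

public class CommitHousekeepingSketch {

    interface SqlWork { void run() throws SQLException; }

    // Save the current autocommit value, apply the requested one, commit on
    // success / roll back on failure when autocommit is off, then restore.
    static void runWithAutoCommit(Connection con, boolean autoCommit, SqlWork work) throws SQLException {
        final boolean original = con.getAutoCommit();
        con.setAutoCommit(autoCommit);
        try {
            work.run();
            if (!con.getAutoCommit()) con.commit();
        } catch (SQLException e) {
            if (!con.getAutoCommit()) con.rollback();
            throw e;
        } finally {
            con.setAutoCommit(original); // good housekeeping
        }
    }

    // Recording stand-in so the sketch runs without a database.
    static final List<String> calls = new ArrayList<>();
    static final boolean[] state = {true};

    static Connection fakeConnection() {
        return (Connection) Proxy.newProxyInstance(
                Connection.class.getClassLoader(), new Class<?>[]{Connection.class},
                (proxy, method, args) -> {
                    calls.add(method.getName());
                    if (method.getName().equals("setAutoCommit")) { state[0] = (Boolean) args[0]; return null; }
                    if (method.getName().equals("getAutoCommit")) return state[0];
                    Class<?> r = method.getReturnType();
                    return r == boolean.class ? false : (r == int.class ? 0 : null);
                });
    }

    public static void main(String[] args) throws Exception {
        runWithAutoCommit(fakeConnection(), false, () -> { /* pretend to stream a large SELECT */ });
        System.out.println(calls); // commit() is called, then the original autocommit value is restored
    }
}
```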

@VibAruna (Contributor, author), Dec 20, 2021:

Hi @mattyb149,

The change I have made affects only the ExecuteSQLRecord and ExecuteSQL processors, which are used to execute SELECT statements. This new feature to enable or disable auto-commit is needed to keep these processors from consuming large amounts of heap space when they query large data sets (some other drivers may exhibit the same behavior). Since these processors do not write any data, I believe we don't need to call commit() or rollback().

Contributor:

Perhaps we don't need to call those, but we shouldn't need to set autocommit to do a SELECT anyway, PostgreSQL does that for its own transactional system. Having said that, I don't like the idea of leaving the transaction "open" at the end of processing. Does calling commit() and/or rollback() cause an issue with PostgreSQL and/or Derby (the latter for unit testing)? I would hope not, and if it isn't an issue I'd like to see the call to commit() or rollback(), just for consistency. I'll test with MySQL and Oracle to make sure there's nothing weird there.

@VibAruna (Contributor, author):

Hi @mattyb149,

I have updated the pull request to call commit() and added unit tests. Could you please review?

public static final PropertyDescriptor AUTO_COMMIT = new PropertyDescriptor.Builder()
.name("auto commit")
.displayName("Set Auto Commit")
.description("Enables or disables the auto commit functionality of the DB connection. For some JDBC drivers, it is required to disable the auto committing "
Contributor:

I'd like to see more documentation around this property to inform the user of the behavior they can expect. For your PostgreSQL example, you could use the doc you already have here; more generally, you could describe it using the information here: https://docs.oracle.com/javase/tutorial/jdbc/basics/transactions.html.

@VibAruna (Contributor, author):

Updated the description. Please review.

@@ -169,6 +169,16 @@
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();

public static final PropertyDescriptor AUTO_COMMIT = new PropertyDescriptor.Builder()
.name("auto commit")
Contributor:

For consistency can we name this esql-auto-commit?

@VibAruna (Contributor, author):

Updated the name. Please review.

@mattyb149 (Contributor):

+1 LGTM, tested with PostgreSQL, MySQL, and Oracle, verified expected behavior. Thanks for the improvement! Merging to main

@mattyb149 mattyb149 closed this in 0ac8f1b Dec 30, 2021
asfgit pushed a commit that referenced this pull request Jan 12, 2022
…rocessors to enable/disable auto committing

change the default value of auto commit function to true
Changed the auto commit property name and add more details in the description
If the auto commit is set to false, commit() is called for consistency
adds unit tests
Fix the check style issue of having more than 200 characters in single line

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #5554
krisztina-zsihovszki pushed a commit to krisztina-zsihovszki/nifi that referenced this pull request Jun 28, 2022
@abdelrahim-ahmad commented Apr 13, 2023:

Hi All,
I would appreciate your kind help if anyone knows how to enable autocommit in the PutDatabaseRecord processor in NiFi. I have tried many ways, but it is still not working. I know that PutSQL would work, but I cannot use it, as it is not optimized for inserting small files into an Iceberg table in Trino.

PutDatabaseRecord error:

    Caused by: io.trino.spi.TrinoException: Catalog only supports writes using autocommit: iceberg

Thanks and best regards
Abdelrahim Ahmad
