Skip to content

Conversation

@turing85
Copy link
Contributor

@turing85 turing85 commented Feb 1, 2023

Rewrote JdbcOrphanLockAwareIdempotentRepository::insert to not rely on an exception. The exception caused problems with postgresql databases if autosave=always was not set.

- the initial exception will mark the transaction as dirty, blocking subsequent changes
- not all databases support rollback of DDL (e.g MySQL, older Oracle DBs)
@github-actions
Copy link
Contributor

github-actions bot commented Feb 1, 2023

🌟 Thank you for your contribution to the Apache Camel project! 🌟

⚠️ Please note that the changes on this PR may be tested automatically.

If necessary Apache Camel Committers may access logs and test results in the job summaries!

@github-actions
Copy link
Contributor

github-actions bot commented Feb 1, 2023

Components tested:

Total Tested Failed ❌ Passed ✅
1 1 0 1

@zhfeng
Copy link
Contributor

zhfeng commented Feb 2, 2023

@turing85 I can't understand

the initial exception will mark the transaction as dirty, blocking subsequent changes

What is the stack trace for the next creation sql?

@turing85
Copy link
Contributor Author

turing85 commented Feb 2, 2023

@zhfeng

2023-02-02 17:04:09,349 INFO  [org.apa.cam.qua.cor.CamelBootstrapRecorder] (Quarkus Main Thread) Bootstrap runtime: org.apache.camel.quarkus.main.CamelMainRuntime
2023-02-02 17:04:09,359 INFO  [org.apa.cam.mai.MainSupport] (Quarkus Main Thread) Apache Camel (Main) 3.20.1 is starting
2023-02-02 17:04:09,603 INFO  [org.apa.cam.imp.eng.AbstractCamelContext] (Quarkus Main Thread) Apache Camel 3.20.1 (camel-1) is starting
2023-02-02 17:04:09,820 ERROR [org.apa.cam.pro.ide.jdb.JdbcMessageIdRepository] (Quarkus Main Thread) Can't create table for JdbcMessageIdRepository with query 'CREATE TABLE CAMEL_MESSAGEPROCESSED (processorName VARCHAR(255), messageId VARCHAR(100), createdAt TIMESTAMP, PRIMARY KEY (processorName, messageId))' because of: StatementCallback; uncategorized SQLException for SQL [CREATE TABLE CAMEL_MESSAGEPROCESSED (processorName VARCHAR(255), messageId VARCHAR(100), createdAt TIMESTAMP, PRIMARY KEY (processorName, messageId))]; SQL state [25P02]; error code [0]; ERROR: current transaction is aborted, commands ignored until end of transaction block; nested exception is org.postgresql.util.PSQLException: ERROR: current transaction is aborted, commands ignored until end of transaction block. This may be a permissions problem. Please create this table and try again.
2023-02-02 17:04:09,821 ERROR [org.apa.cam.imp.eng.AbstractCamelContext] (Quarkus Main Thread) Error starting CamelContext (camel-1) due to exception thrown: Failed to start route file-mover because of null: org.apache.camel.FailedToStartRouteException: Failed to start route file-mover because of null
	at org.apache.camel.impl.engine.RouteService.warmUp(RouteService.java:123)
	at org.apache.camel.impl.engine.InternalRouteStartupManager.doWarmUpRoutes(InternalRouteStartupManager.java:306)
	at org.apache.camel.impl.engine.InternalRouteStartupManager.safelyStartRouteServices(InternalRouteStartupManager.java:189)
	at org.apache.camel.impl.engine.InternalRouteStartupManager.doStartOrResumeRoutes(InternalRouteStartupManager.java:147)
	at org.apache.camel.impl.engine.AbstractCamelContext.doStartCamel(AbstractCamelContext.java:3425)
	at org.apache.camel.impl.engine.AbstractCamelContext.doStartContext(AbstractCamelContext.java:3094)
	at org.apache.camel.impl.engine.AbstractCamelContext.doStart(AbstractCamelContext.java:3049)
	at org.apache.camel.support.service.BaseService.start(BaseService.java:119)
	at org.apache.camel.impl.engine.AbstractCamelContext.start(AbstractCamelContext.java:2698)
	at org.apache.camel.impl.DefaultCamelContext.start(DefaultCamelContext.java:262)
	at org.apache.camel.quarkus.main.CamelMain.doStart(CamelMain.java:94)
	at org.apache.camel.support.service.BaseService.start(BaseService.java:119)
	at org.apache.camel.quarkus.main.CamelMain.startEngine(CamelMain.java:140)
	at org.apache.camel.quarkus.main.CamelMainRuntime.start(CamelMainRuntime.java:49)
	at org.apache.camel.quarkus.core.CamelBootstrapRecorder.start(CamelBootstrapRecorder.java:45)
	at io.quarkus.deployment.steps.CamelBootstrapProcessor$boot173480958.deploy_0(Unknown Source)
	at io.quarkus.deployment.steps.CamelBootstrapProcessor$boot173480958.deploy(Unknown Source)
	at io.quarkus.runner.ApplicationImpl.doStart(Unknown Source)
	at io.quarkus.runtime.Application.start(Application.java:101)
	at io.quarkus.runtime.ApplicationLifecycleManager.run(ApplicationLifecycleManager.java:108)
	at io.quarkus.runtime.Quarkus.run(Quarkus.java:71)
	at io.quarkus.runtime.Quarkus.run(Quarkus.java:44)
	at io.quarkus.runtime.Quarkus.run(Quarkus.java:124)
	at io.quarkus.runner.GeneratedMain.main(Unknown Source)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at io.quarkus.runner.bootstrap.StartupActionImpl$1.run(StartupActionImpl.java:104)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.camel.RuntimeCamelException: org.springframework.jdbc.UncategorizedSQLException: StatementCallback; uncategorized SQLException for SQL [CREATE TABLE CAMEL_MESSAGEPROCESSED (processorName VARCHAR(255), messageId VARCHAR(100), createdAt TIMESTAMP, PRIMARY KEY (processorName, messageId))]; SQL state [25P02]; error code [0]; ERROR: current transaction is aborted, commands ignored until end of transaction block; nested exception is org.postgresql.util.PSQLException: ERROR: current transaction is aborted, commands ignored until end of transaction block
	at org.apache.camel.RuntimeCamelException.wrapRuntimeCamelException(RuntimeCamelException.java:51)
	at org.apache.camel.support.ChildServiceSupport.start(ChildServiceSupport.java:67)
	at org.apache.camel.support.service.ServiceHelper.startService(ServiceHelper.java:113)
	at org.apache.camel.support.service.ServiceHelper.startService(ServiceHelper.java:130)
	at org.apache.camel.impl.engine.DefaultChannel.doStart(DefaultChannel.java:126)
	at org.apache.camel.support.service.BaseService.start(BaseService.java:119)
	at org.apache.camel.support.service.ServiceHelper.startService(ServiceHelper.java:113)
	at org.apache.camel.support.service.ServiceHelper.startService(ServiceHelper.java:116)
	at org.apache.camel.support.service.ServiceHelper.startService(ServiceHelper.java:130)
	at org.apache.camel.processor.Pipeline.doStart(Pipeline.java:207)
	at org.apache.camel.support.service.BaseService.start(BaseService.java:119)
	at org.apache.camel.support.service.ServiceHelper.startService(ServiceHelper.java:113)
	at org.apache.camel.support.processor.DelegateAsyncProcessor.doStart(DelegateAsyncProcessor.java:89)
	at org.apache.camel.support.service.BaseService.start(BaseService.java:119)
	at org.apache.camel.support.service.ServiceHelper.startService(ServiceHelper.java:113)
	at org.apache.camel.impl.engine.RouteService.startChildServices(RouteService.java:396)
	at org.apache.camel.impl.engine.RouteService.doWarmUp(RouteService.java:193)
	at org.apache.camel.impl.engine.RouteService.warmUp(RouteService.java:121)
	... 29 more
Caused by: org.springframework.jdbc.UncategorizedSQLException: StatementCallback; uncategorized SQLException for SQL [CREATE TABLE CAMEL_MESSAGEPROCESSED (processorName VARCHAR(255), messageId VARCHAR(100), createdAt TIMESTAMP, PRIMARY KEY (processorName, messageId))]; SQL state [25P02]; error code [0]; ERROR: current transaction is aborted, commands ignored until end of transaction block; nested exception is org.postgresql.util.PSQLException: ERROR: current transaction is aborted, commands ignored until end of transaction block
	at org.springframework.jdbc.core.JdbcTemplate.translateException(JdbcTemplate.java:1542)
	at org.springframework.jdbc.core.JdbcTemplate.execute(JdbcTemplate.java:393)
	at org.springframework.jdbc.core.JdbcTemplate.execute(JdbcTemplate.java:431)
	at org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository$1.doInTransaction(JdbcMessageIdRepository.java:92)
	at org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository$1.doInTransaction(JdbcMessageIdRepository.java:81)
	at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140)
	at org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository.doStart(JdbcMessageIdRepository.java:81)
	at org.apache.camel.support.service.BaseService.start(BaseService.java:119)
	at org.apache.camel.support.service.ServiceHelper.startService(ServiceHelper.java:113)
	at org.apache.camel.support.service.ServiceHelper.startService(ServiceHelper.java:130)
	at org.apache.camel.processor.idempotent.IdempotentConsumer.doStart(IdempotentConsumer.java:220)
	at org.apache.camel.support.service.BaseService.start(BaseService.java:119)
	at org.apache.camel.support.service.ServiceHelper.startService(ServiceHelper.java:113)
	at org.apache.camel.support.service.ServiceHelper.startService(ServiceHelper.java:130)
	at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler.doStart(RedeliveryErrorHandler.java:1670)
	at org.apache.camel.support.ChildServiceSupport.start(ChildServiceSupport.java:60)
	... 45 more
Caused by: org.postgresql.util.PSQLException: ERROR: current transaction is aborted, commands ignored until end of transaction block
	at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2676)
	at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2366)
	at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:356)
	at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:496)
	at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:413)
	at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:333)
	at org.postgresql.jdbc.PgStatement.executeCachedSql(PgStatement.java:319)
	at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:295)
	at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:290)
	at io.agroal.pool.wrapper.StatementWrapper.execute(StatementWrapper.java:235)
	at org.springframework.jdbc.core.JdbcTemplate$1ExecuteStatementCallback.doInStatement(JdbcTemplate.java:422)
	at org.springframework.jdbc.core.JdbcTemplate.execute(JdbcTemplate.java:381)
	... 59 more
Caused by: org.postgresql.util.PSQLException: ERROR: relation "camel_messageprocessed" does not exist
  Position: 15
	at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2676)
	at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2366)
	at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:356)
	at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:496)
	at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:413)
	at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:333)
	at org.postgresql.jdbc.PgStatement.executeCachedSql(PgStatement.java:319)
	at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:295)
	at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:290)
	at io.agroal.pool.wrapper.StatementWrapper.execute(StatementWrapper.java:235)
	at org.springframework.jdbc.core.JdbcTemplate$1ExecuteStatementCallback.doInStatement(JdbcTemplate.java:422)
	at org.springframework.jdbc.core.JdbcTemplate.execute(JdbcTemplate.java:381)
	at org.springframework.jdbc.core.JdbcTemplate.execute(JdbcTemplate.java:431)
	at org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository$1.doInTransaction(JdbcMessageIdRepository.java:86)
	... 57 more

Reproducer:

@turing85
Copy link
Contributor Author

turing85 commented Feb 2, 2023

An alternative solution would be to:

  • keep the transaction,
  • create a savepoint before the getTableExistsString()-query is executed,
  • restore the savepoint at the beginning of the catch-block, and
  • release the savepoint in a finally-block

This would look something like this (gist.github.com).

@zhfeng
Copy link
Contributor

zhfeng commented Feb 3, 2023

Thanks @turing85 for sharing and I will check it.

@turing85
Copy link
Contributor Author

turing85 commented Feb 5, 2023

@zhfeng this got a lot more complicated. As soon as we set the route .transacted(), we get exceptions stating that:

...
Caused by: java.sql.SQLException: Attempting to commit while taking part in a transaction
...

This is due to the fact that we try to commit a local transaction (through transactionManager.execute(...)) while we are in a global transaction. If we remove the transactionManager.execute(...) but keep the body, we get the original exception (because we are in a global transaction). We'd need to somehow decouple the exeuctions in the idempotency repository from the global transaction.

I haven't checked yet if this also affects other components, but I suspect it does. This seems to be a general issue.

@zhfeng
Copy link
Contributor

zhfeng commented Feb 6, 2023

Hi @turing85

I can re-producer the issue with postgresql on my local machine. But I also tried with h2 and mysql database, and both of them work. Can you verify it?

It only needs to change quarkus-jdbc-postgresql to quarkus-jdbc-h2(mysql) and related db_kind in application.yml and only remain %dev% configuration.

mvn quarkus:dev should start a database devservice.

I think this might be a limitation on postgresql and we need more investigation before removing this transaction block.

@zhfeng
Copy link
Contributor

zhfeng commented Feb 6, 2023

Just get something from https://stackoverflow.com/questions/10399727/psqlexception-current-transaction-is-aborted-commands-ignored-until-end-of-tra

It confirms that this is an expected behavior on postgresql and also mentions autosave=always option.

@zhfeng
Copy link
Contributor

zhfeng commented Feb 6, 2023

OK, I tried the following configurations and it can work with postgresql.

"%dev":
  quarkus:
    datasource:
      db-kind: postgresql
      devservices:
        properties:
          autosave: always
    flyway:
      migrate-at-start: false

@zhfeng
Copy link
Contributor

zhfeng commented Feb 6, 2023

@turing85 I also investigate transacted() issue and it seems that you need to set TransactionTemplate when creating JdbcMessageIdRepository

  1. Add came-quarkus-jta in the pom.xml which can leverage the quarkus-narayana-jta to provide the JTA transaction support.
  2. Make some changes in Route.java to add TransactionTemplate
 public Route(@SuppressWarnings("CdiInjectionPointsInspection") DataSource dataSource,
              @SuppressWarnings("CdiInjectionPointsInspection") TransactionManager transactionManager) {
    TransactionTemplate template = new TransactionTemplate(new JtaTransactionManager(transactionManager));
    idempotentRepository =
        constructRepository(dataSource, template);
  }

  private static JdbcMessageIdRepository constructRepository(DataSource dataSource, TransactionTemplate template) {
    final JdbcMessageIdRepository repository =
        new JdbcMessageIdRepository(dataSource, template, ROUTE_ID);
    repository.setCreateTableIfNotExists(true);
    return repository;
  }

I test it locally and it should work with transacted.

@zhfeng
Copy link
Contributor

zhfeng commented Feb 6, 2023

I would say this is a great example for using JdbcMessageIdRepository and transaction. Can you get it in https://github.com/apache/camel-quarkus-examples/ if possible?

Thanks a lot!

@turing85
Copy link
Contributor Author

turing85 commented Feb 6, 2023

Hi @turing85

I can re-producer the issue with postgresql on my local machine. But I also tried with h2 and mysql database, and both of them work. Can you verify it?

It only needs to change quarkus-jdbc-postgresql to quarkus-jdbc-h2(mysql) and related db_kind in application.yml and only remain %dev% configuration.

mvn quarkus:dev should start a database devservice.

I think this might be a limitation on postgresql and we need more investigation before removing this transaction block.

@zhfeng I can confirm that the issue does not occur with h2.

@zhfeng
Copy link
Contributor

zhfeng commented Feb 6, 2023

@turing85 is it OK to close this PR and maybe we can add a NOTE with postgresql database in camel-sql and make some improvement on doc?

@turing85
Copy link
Contributor Author

turing85 commented Feb 6, 2023

@zhfeng I think it is not only postgres; db2 might also be affected.
What exactly does autosave: always do? I am a little bit concerned that the server has to be reconfigured for this to work. Wouldn't the proper way be to rewrite the repositories to not use queries that could throw a SQLException?

@zhfeng
Copy link
Contributor

zhfeng commented Feb 6, 2023

Please check https://jdbc.postgresql.org/documentation/use/ for autosave=always.

Specifies what the driver should do if a query fails. 
In autosave=always mode, JDBC driver sets a savepoint before each query, and rolls back to that savepoint in case of failure. 
In autosave=never mode (default), no savepoint dance is made ever. 
In autosave=conservative mode, savepoint is set for each query, however the rollback is done only for rare cases like ‘cached statement cannot change return type’ or ‘statement XXX is not valid’ so JDBC driver rolls back and retries

@zhfeng
Copy link
Contributor

zhfeng commented Feb 6, 2023

Wouldn't the proper way be to rewrite the repositories to not use queries that could throw a SQLException?

It could be good but I wonder if tableExistsSql is db vendor specific?

@turing85
Copy link
Contributor Author

turing85 commented Feb 6, 2023

Wouldn't the proper way be to rewrite the repositories to not use queries that could throw a SQLException?

It could be good but I wonder if tableExistsSql is db vendor specific?

This query is vendor-specific. But this part does not seem to be coupled to the global transaction manager, hence we might be in the clear here. The critical part is the insert(...) that might produce a SqlException if the entry already exists.

@zhfeng
Copy link
Contributor

zhfeng commented Feb 6, 2023

If I uderstand correctly, you were wanting to seperate the transaction in insert(...) method from the global transaction?

@turing85
Copy link
Contributor Author

turing85 commented Feb 6, 2023

Either this or rewrite insert(...) so it does not use queries that could throw (i.e. check whether an entry is present; if so UPDATE ...; if not INSERT ....). Right now, I tend to prefer the rewrite of insert(...) since this would be db-vendor-agnostic.

@turing85 turing85 changed the title Remove Transaction Rewrite JdbcOrphanLockAwareIdempotentRepository::insert to not rely on exception Feb 8, 2023
@turing85
Copy link
Contributor Author

turing85 commented Feb 8, 2023

@zhfeng I revised tie PR:

  • I reverted the changes to JdbcMessageIdRepository since they still caused issues on postgres; I think this is unavoidable without setting autosave=always, either on database- or on datasource-level.
  • I rewrote the JdbcOrphanLockAwareIdempotentRepository::insert to not rely on an exception. This allows using the repository, even on postgres, without setting autosave=always at the cost of an additional query.
  • I renamed the PR accordingly

With those changes and your comment about the TransactionTemplate, everything seems to work as expected.

@github-actions
Copy link
Contributor

github-actions bot commented Feb 9, 2023

Components tested:

Total Tested Failed ❌ Passed ✅
1 1 1 0

@zhfeng
Copy link
Contributor

zhfeng commented Feb 9, 2023

@turing85 Well done! - I will have a look today.

@github-actions
Copy link
Contributor

github-actions bot commented Feb 9, 2023

Components tested:

Total Tested Failed ❌ Passed ✅
1 1 0 1

@davsclaus
Copy link
Contributor

Can we also have the catch duplicate exception still just in case it still happens in some of the other databases ?

@zhfeng
Copy link
Contributor

zhfeng commented Feb 9, 2023

@davsclaus

I rewrote the JdbcOrphanLockAwareIdempotentRepository::insert to not rely on an exception.

I belive that there will be no DuplicationException in insert and just check if there is an exist lock in table at first.

@davsclaus
Copy link
Contributor

@davsclaus

I rewrote the JdbcOrphanLockAwareIdempotentRepository::insert to not rely on an exception.

I belive that there will be no DuplicationException in insert and just check if there is an exist lock in table at first.

Ok perfect

@zhfeng
Copy link
Contributor

zhfeng commented Feb 9, 2023

@turing85 One more question:

If I understand correctly, this insertSqlString is vendor-specific, for Mysql there is insert-on-duplicate statment. I just wonder if this is much better for inserting?

@turing85
Copy link
Contributor Author

@turing85 One more question:

If I understand correctly, this insertSqlString is vendor-specific, for Mysql there is insert-on-duplicate statment. I just wonder if this is much better for inserting?

@zhfeng Hmm... we could use SQL:2003's MERGE (which is effectively an upsert, have to play around with this a little bit to figure out how to use it with literals). But this does not seem widely adopted, e.g. MySQL does not support SQL:2003. And the general question is if we want to bind this implementation to such a "young" standard. This would eliminate the possibility to use it with older database systems/versions.

@turing85
Copy link
Contributor Author

turing85 commented Feb 10, 2023

@zhfeng An upsert based on MERGE would look something like this (in psql, haven't checked whether this works in other databases):

MERGE INTO CAMEL_MESSAGEPROCESSED AS saved
USING (
  VALUES (?, ?, CURRENT_TIMESTAMP)
) AS param (
  messageId,
  createdat
)
ON (
  saved.processorname = param.processorname AND
  saved.messageId = param.messageId)
WHEN NOT MATCHED THEN
  INSERT (processorname, messageId, createdat)
  VALUES (param.processorname, param.messageId, param.createdat)
WHEN MATCHED THEN
  UPDATE SET createdat = param.createdat, done = false;

I am not sure if this is a good user experience; those queries are complex...

@davsclaus
Copy link
Contributor

I think this PR is good as-is. And agree its better to support existing databases as-is.
And for users that want to try other queries can specify custom SQL to use, or extend this and implement their own version.

We had SQL for many many decades and UPSERT is not common practice and every database is a bit special.

@turing85
Copy link
Contributor Author

@davsclaus should I then squash and push again, or will you guys squash on merge?

@davsclaus
Copy link
Contributor

github has a squash and merge button that we use

@zhfeng zhfeng merged commit 6d56647 into apache:main Feb 10, 2023
@zhfeng
Copy link
Contributor

zhfeng commented Feb 10, 2023

Thanks @turing85 for your contributions!

@turing85
Copy link
Contributor Author

@zhfeng you're welcome 🙂 I'll add the example when I have some spare time.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants