diff --git a/components/camel-jta/pom.xml b/components/camel-jta/pom.xml index 9bdb5f9c52fae..9fed72e22c554 100644 --- a/components/camel-jta/pom.xml +++ b/components/camel-jta/pom.xml @@ -53,5 +53,68 @@ jakarta.transaction-api ${jakarta-transaction-api-version} + + + org.apache.camel + camel-core + test + + + org.apache.camel + camel-sql + test + + + org.apache.camel + camel-test-infra-postgres + ${project.version} + test + + + org.postgresql + postgresql + ${pgjdbc-driver-version} + test + + + + + org.jboss.narayana.jta + narayana-jta-jakarta + ${narayana-version} + test + + + org.jboss + jboss-transaction-spi-jakarta + ${jboss-transaction-spi-version} + test + + + org.jboss.logging + jboss-logging + ${jboss-logging-version} + test + + + + + io.agroal + agroal-pool + ${agroal-version} + test + + + io.agroal + agroal-narayana + ${agroal-version} + test + + + + org.junit.jupiter + junit-jupiter + test + diff --git a/components/camel-jta/src/main/java/org/apache/camel/jta/TransactionErrorHandler.java b/components/camel-jta/src/main/java/org/apache/camel/jta/TransactionErrorHandler.java index 2958309726fac..bdee9c0d97daf 100644 --- a/components/camel-jta/src/main/java/org/apache/camel/jta/TransactionErrorHandler.java +++ b/components/camel-jta/src/main/java/org/apache/camel/jta/TransactionErrorHandler.java @@ -18,7 +18,9 @@ import java.util.ArrayList; import java.util.List; +import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import jakarta.transaction.TransactionRolledbackException; @@ -58,6 +60,7 @@ public class TransactionErrorHandler extends ErrorHandlerSupport private JtaTransactionPolicy transactionPolicy; private final String transactionKey; private final LoggingLevel rollbackLoggingLevel; + private final Set inflightTransactedExchanges = ConcurrentHashMap.newKeySet(); /** * Creates the transaction error handler. @@ -133,6 +136,7 @@ protected void processInTransaction(final Exchange exchange) { try { // mark the beginning of this transaction boundary exchange.getUnitOfWork().beginTransactedBy(transactionKey); + inflightTransactedExchanges.add(exchange); // do in transaction logTransactionBegin(redelivered, ids); doInTransactionTemplate(exchange); @@ -145,6 +149,7 @@ protected void processInTransaction(final Exchange exchange) { exchange.setException(e); logTransactionRollback(redelivered, ids, e, false); } finally { + inflightTransactedExchanges.remove(exchange); // mark the end of this transaction boundary exchange.getUnitOfWork().endTransactedBy(transactionKey); } @@ -196,6 +201,13 @@ public void run() throws Throwable { // and now let process the exchange by the error handler processByErrorHandler(exchange); + // if forced shutdown is in progress, mark the exchange for rollback + if (preparingShutdown) { + LOG.debug("Forced shutdown in progress, marking exchange for rollback: {}", + exchange.getExchangeId()); + exchange.setRollbackOnly(true); + } + // after handling and still an exception or marked as rollback // only then rollback if (exchange.getException() != null || exchange.isRollbackOnly()) { @@ -346,5 +358,14 @@ public void prepareShutdown(boolean suspendOnly, boolean forced) { // prepare for shutdown, eg do not allow redelivery if configured LOG.trace("Prepare shutdown on error handler {}", this); preparingShutdown = true; + if (forced) { + // mark all in-flight transacted exchanges for rollback so the transaction + // is rolled back before the connection pool is destroyed during shutdown + for (Exchange exchange : inflightTransactedExchanges) { + LOG.debug("Marking in-flight transacted exchange for rollback due to forced shutdown: {}", + exchange.getExchangeId()); + exchange.setRollbackOnly(true); + } + } } } diff --git a/components/camel-jta/src/test/java/org/apache/camel/jta/TransactionErrorHandlerGracePeriodShutdownIT.java b/components/camel-jta/src/test/java/org/apache/camel/jta/TransactionErrorHandlerGracePeriodShutdownIT.java new file mode 100644 index 0000000000000..64ad1bc97e70a --- /dev/null +++ b/components/camel-jta/src/test/java/org/apache/camel/jta/TransactionErrorHandlerGracePeriodShutdownIT.java @@ -0,0 +1,262 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.jta; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import jakarta.transaction.Status; +import jakarta.transaction.TransactionManager; + +import io.agroal.api.AgroalDataSource; +import io.agroal.api.configuration.supplier.AgroalDataSourceConfigurationSupplier; +import io.agroal.api.security.NamePrincipal; +import io.agroal.api.security.SimplePassword; +import io.agroal.narayana.NarayanaTransactionIntegration; +import org.apache.camel.CamelContext; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.sql.SqlComponent; +import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.test.infra.postgres.services.PostgresLocalContainerService; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.postgresql.ds.PGSimpleDataSource; +import org.postgresql.xa.PGXADataSource; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Integration test that mimics a production scenario where a transacted route performs a SQL operation followed by a + * long-running step (e.g., a stored procedure call or a delay that exceeds the shutdown grace period). + * + *

+ * The key behavior being tested: when the shutdown grace period expires and forced shutdown is triggered, the + * {@link TransactionErrorHandler} marks the in-flight exchange as {@code rollbackOnly}. After the delay/procedure + * completes, the exchange finishes processing without an exception, but the {@code rollbackOnly} flag causes the + * transaction to be rolled back instead of committed. Without the fix, the INSERT would be committed. + */ +public class TransactionErrorHandlerGracePeriodShutdownIT { + + @RegisterExtension + static PostgresLocalContainerService postgres = new PostgresLocalContainerService(); + + private CamelContext camelContext; + private AgroalDataSource agroalDataSource; + private PGSimpleDataSource plainDataSource; + private TransactionManager tm; + + private final CountDownLatch firstInsertDone = new CountDownLatch(1); + private final CountDownLatch delayStarted = new CountDownLatch(1); + private final CountDownLatch releaseLatch = new CountDownLatch(1); + + @BeforeEach + public void setUp() throws Exception { + // Narayana transaction manager and synchronization registry + tm = com.arjuna.ats.jta.TransactionManager.transactionManager(); + jakarta.transaction.TransactionSynchronizationRegistry tsr + = new com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionSynchronizationRegistryImple(); + + String jdbcUrl = postgres.jdbcUrl(); + + // Agroal DataSource with Narayana integration — same stack as Quarkus. + // Agroal automatically enlists connections in the active JTA transaction + // via NarayanaTransactionIntegration, no custom wrapper needed. + agroalDataSource = AgroalDataSource.from( + new AgroalDataSourceConfigurationSupplier() + .connectionPoolConfiguration(pool -> pool + .maxSize(5) + .transactionIntegration(new NarayanaTransactionIntegration(tm, tsr)) + .connectionFactoryConfiguration(cf -> cf + .connectionProviderClass(PGXADataSource.class) + .jdbcUrl(jdbcUrl) + .principal(new NamePrincipal(postgres.userName())) + .credential(new SimplePassword(postgres.password()))))); + + // Plain DataSource for assertions (outside of JTA) + plainDataSource = new PGSimpleDataSource(); + plainDataSource.setServerNames(new String[] { postgres.host() }); + plainDataSource.setPortNumbers(new int[] { postgres.port() }); + plainDataSource.setDatabaseName(postgres.database()); + plainDataSource.setUser(postgres.userName()); + plainDataSource.setPassword(postgres.password()); + + // Create test table + try (Connection conn = plainDataSource.getConnection(); + Statement stmt = conn.createStatement()) { + stmt.execute("DROP TABLE IF EXISTS orders"); + stmt.execute("CREATE TABLE orders (id SERIAL PRIMARY KEY, item VARCHAR(255))"); + } + + // JtaTransactionPolicy backed by Narayana (same pattern as Quarkus RequiredJtaTransactionPolicy) + JtaTransactionPolicy requiredPolicy = new JtaTransactionPolicy() { + @Override + public void run(Runnable runnable) throws Throwable { + boolean isNew = tm.getStatus() == Status.STATUS_NO_TRANSACTION; + if (isNew) { + tm.begin(); + } + try { + runnable.run(); + } catch (Throwable e) { + if (isNew) { + tm.rollback(); + } else { + tm.setRollbackOnly(); + } + throw e; + } + if (isNew) { + tm.commit(); + } + } + }; + + camelContext = new DefaultCamelContext(); + + // Short shutdown timeout: simulates the production grace period expiring + camelContext.getShutdownStrategy().setTimeout(2); + camelContext.getShutdownStrategy().setTimeUnit(TimeUnit.SECONDS); + + // Register Agroal DataSource for camel-sql + SqlComponent sqlComponent = new SqlComponent(); + sqlComponent.setDataSource(agroalDataSource); + camelContext.addComponent("sql", sqlComponent); + + camelContext.getRegistry().bind("PROPAGATION_REQUIRED", requiredPolicy); + + // Route: SQL INSERT then a long delay. + // No steps after the delay — the exchange completes normally (no exception). + // The ONLY thing that should cause rollback is the rollbackOnly flag + // set by TransactionErrorHandler.prepareShutdown(forced=true). + camelContext.addRoutes(new RouteBuilder() { + @Override + public void configure() { + from("direct:start").routeId("transactedRoute") + .transacted() + // SQL insert — completes successfully and is enlisted in the JTA transaction + .to("sql:INSERT INTO orders(item) VALUES ('first')") + .process(exchange -> firstInsertDone.countDown()) + // Long delay (simulates stored procedure or long-running operation + // that exceeds the shutdown grace period) + .process(exchange -> { + delayStarted.countDown(); + releaseLatch.await(30, TimeUnit.SECONDS); + }); + } + }); + + camelContext.start(); + } + + @AfterEach + public void tearDown() throws Exception { + releaseLatch.countDown(); + if (camelContext != null && camelContext.isStarted()) { + camelContext.stop(); + } + if (agroalDataSource != null) { + agroalDataSource.close(); + } + } + + /** + * Simulates the production scenario: + *

    + *
  1. Exchange enters transacted route
  2. + *
  3. SQL INSERT ('first') completes against PostgreSQL (enlisted in JTA tx)
  4. + *
  5. Long delay begins (simulating stored procedure exceeding grace period)
  6. + *
  7. CamelContext.stop() -> DefaultShutdownStrategy waits 2s -> forced shutdown
  8. + *
  9. TransactionErrorHandler.prepareShutdown(forced=true) marks exchange rollbackOnly
  10. + *
  11. context.stop() returns (timeout expired, in-flight exchange still running)
  12. + *
  13. Delay released, exchange completes normally (no exception)
  14. + *
  15. TransactionErrorHandler checks preparingShutdown -> sets rollbackOnly -> throws
  16. + *
  17. JtaTransactionPolicy.run() catches -> Narayana rolls back the JTA transaction
  18. + *
  19. Assert: the INSERT is NOT in the database (rolled back)
  20. + *
+ * + *

+ * Without the fix, step 8 does not happen — the exchange commits normally because there is no exception and + * rollbackOnly is never set. The INSERT persists in the database. + */ + @Test + public void testForcedShutdownRollsBackDatabaseTransaction() throws Exception { + assertEquals(0, countOrders(), "Table should be empty initially"); + + ExecutorService executor = Executors.newSingleThreadExecutor(); + try { + // Send message in a background thread (the route blocks during the delay) + executor.submit(() -> { + try { + camelContext.createProducerTemplate().sendBody("direct:start", "trigger"); + } catch (Exception e) { + // expected — rollback triggers TransactionRolledbackException + } + return null; + }); + + assertTrue(firstInsertDone.await(10, TimeUnit.SECONDS), + "SQL insert should have completed"); + + assertTrue(delayStarted.await(10, TimeUnit.SECONDS), + "Long delay should have started"); + + // Stop the CamelContext. The DefaultShutdownStrategy will: + // 1. Stop the direct consumer (no new messages) + // 2. Wait up to 2s for in-flight exchanges + // 3. Exchange is stuck in the delay -> timeout expires + // 4. Forced shutdown -> prepareShutdown(false, true) on TransactionErrorHandler + // 5. context.stop() returns (in-flight exchange still blocked) + camelContext.stop(); + + // Now release the delay. The exchange wakes up and finishes processing. + // The route has no more steps, so processByErrorHandler() returns normally. + // Back in doInTransactionTemplate(), the preparingShutdown check (the fix) + // sets rollbackOnly, causing the transaction to be rolled back. + releaseLatch.countDown(); + + executor.shutdown(); + assertTrue(executor.awaitTermination(10, TimeUnit.SECONDS), + "Exchange processing should have completed"); + + int finalCount = countOrders(); + assertEquals(0, finalCount, + "No rows should be in the database — the JTA transaction should have been rolled back. " + + "Found " + finalCount + " rows instead."); + } finally { + releaseLatch.countDown(); + executor.shutdownNow(); + } + } + + private int countOrders() throws SQLException { + try (Connection conn = plainDataSource.getConnection(); + Statement stmt = conn.createStatement(); + ResultSet rs = stmt.executeQuery("SELECT COUNT(*) FROM orders")) { + rs.next(); + return rs.getInt(1); + } + } +} diff --git a/components/camel-jta/src/test/java/org/apache/camel/jta/TransactionErrorHandlerShutdownTest.java b/components/camel-jta/src/test/java/org/apache/camel/jta/TransactionErrorHandlerShutdownTest.java new file mode 100644 index 0000000000000..d6b436080788e --- /dev/null +++ b/components/camel-jta/src/test/java/org/apache/camel/jta/TransactionErrorHandlerShutdownTest.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.jta; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.camel.CamelContext; +import org.apache.camel.Exchange; +import org.apache.camel.LoggingLevel; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.impl.engine.DefaultUnitOfWork; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Test that verifies in-flight transacted exchanges are rolled back when a forced shutdown occurs on the JTA + * TransactionErrorHandler. + */ +public class TransactionErrorHandlerShutdownTest { + + private CamelContext camelContext; + private final CountDownLatch processingStarted = new CountDownLatch(1); + private final CountDownLatch releaseLatch = new CountDownLatch(1); + private final AtomicBoolean rollbackTriggered = new AtomicBoolean(false); + private TransactionErrorHandler transactionErrorHandler; + + @BeforeEach + public void setUp() throws Exception { + camelContext = new DefaultCamelContext(); + + // Create a test JtaTransactionPolicy that tracks whether rollback was triggered + JtaTransactionPolicy testPolicy = new JtaTransactionPolicy() { + @Override + public void run(Runnable runnable) throws Throwable { + try { + runnable.run(); + } catch (Throwable t) { + rollbackTriggered.set(true); + throw t; + } + } + }; + + // Create the TransactionErrorHandler with a processor that blocks + Processor blockingProcessor = new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + processingStarted.countDown(); + releaseLatch.await(30, TimeUnit.SECONDS); + } + }; + + transactionErrorHandler = new TransactionErrorHandler( + camelContext, blockingProcessor, testPolicy, LoggingLevel.WARN); + + camelContext.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:test").routeId("testRoute") + .process(blockingProcessor); + } + }); + + camelContext.start(); + transactionErrorHandler.start(); + } + + @AfterEach + public void tearDown() throws Exception { + releaseLatch.countDown(); // ensure we don't hang + if (transactionErrorHandler != null) { + transactionErrorHandler.stop(); + } + if (camelContext != null) { + camelContext.stop(); + } + } + + @Test + public void testForcedShutdownMarksExchangeForRollback() throws Exception { + AtomicReference exchangeRef = new AtomicReference<>(); + + ExecutorService executor = Executors.newSingleThreadExecutor(); + try { + // process an exchange through the TransactionErrorHandler in a separate thread + executor.submit(() -> { + Exchange exchange = camelContext.getEndpoint("direct:test").createExchange(); + exchange.getIn().setBody("test"); + // set up UnitOfWork so transacted tracking works + DefaultUnitOfWork uow = new DefaultUnitOfWork(exchange); + exchange.getExchangeExtension().setUnitOfWork(uow); + exchangeRef.set(exchange); + try { + transactionErrorHandler.process(exchange); + } catch (Exception e) { + // expected - rollback may throw + } + return null; + }); + + // wait for the exchange to enter the blocking processor + assertTrue(processingStarted.await(10, TimeUnit.SECONDS), "Exchange should have started processing"); + + // simulate forced shutdown + transactionErrorHandler.prepareShutdown(false, true); + + // release the blocking processor + releaseLatch.countDown(); + + // wait for processing to complete + executor.shutdown(); + assertTrue(executor.awaitTermination(10, TimeUnit.SECONDS), "Processing should have completed"); + + // verify the transaction was rolled back + // before the fix: rollbackTriggered = false (exchange completes normally, transaction commits) + // after the fix: rollbackTriggered = true (exchange marked rollbackOnly, transaction rolls back) + assertTrue(rollbackTriggered.get(), + "Transaction should have been rolled back due to forced shutdown"); + } finally { + releaseLatch.countDown(); + executor.shutdown(); + } + } +} diff --git a/components/camel-jta/src/test/resources/log4j2.properties b/components/camel-jta/src/test/resources/log4j2.properties new file mode 100644 index 0000000000000..d9ae9de26bd89 --- /dev/null +++ b/components/camel-jta/src/test/resources/log4j2.properties @@ -0,0 +1,28 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## --------------------------------------------------------------------------- + +appender.file.type = File +appender.file.name = file +appender.file.fileName = target/camel-jta-test.log +appender.file.layout.type = PatternLayout +appender.file.layout.pattern = %d [%-15.15t] %-5p %-30.30c{1} - %m%n +appender.out.type = Console +appender.out.name = out +appender.out.layout.type = PatternLayout +appender.out.layout.pattern = [%30.30t] %-30.30c{1} %-5p %m%n +rootLogger.level = INFO +rootLogger.appenderRef.file.ref = file diff --git a/components/camel-spring-parent/camel-spring-xml/src/test/java/org/apache/camel/spring/interceptor/TransactionalClientDataSourceForcedShutdownTest.java b/components/camel-spring-parent/camel-spring-xml/src/test/java/org/apache/camel/spring/interceptor/TransactionalClientDataSourceForcedShutdownTest.java new file mode 100644 index 0000000000000..cb4bfff2aa835 --- /dev/null +++ b/components/camel-spring-parent/camel-spring-xml/src/test/java/org/apache/camel/spring/interceptor/TransactionalClientDataSourceForcedShutdownTest.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.spring.interceptor; + +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.Route; +import org.apache.camel.Service; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.spring.SpringRouteBuilder; +import org.apache.camel.spring.spi.SpringTransactionPolicy; +import org.apache.camel.spring.spi.TransactionErrorHandler; +import org.apache.camel.support.service.ServiceHelper; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Test that verifies in-flight transacted exchanges are rolled back when a forced shutdown occurs. + */ +public class TransactionalClientDataSourceForcedShutdownTest extends TransactionClientDataSourceSupport { + + private final CountDownLatch firstInsertDone = new CountDownLatch(1); + private final CountDownLatch releaseLatch = new CountDownLatch(1); + + @Test + public void testForcedShutdownRollsBackInFlightTransaction() throws Exception { + // verify initial state: 1 book from init.sql + int initialCount = jdbc.queryForObject("select count(*) from books", Integer.class); + assertEquals(1, initialCount, "Initial number of books"); + + // send message asynchronously since the route will block + ExecutorService executor = Executors.newSingleThreadExecutor(); + try { + executor.submit(() -> { + template.sendBody("direct:forceShutdown", "Hello World"); + return null; + }); + + // wait for the first insert to complete + firstInsertDone.await(10, TimeUnit.SECONDS); + + // find the TransactionErrorHandler in the route's services and call prepareShutdown + Route route = context.getRoute("forceShutdownRoute"); + TransactionErrorHandler teh = findTransactionErrorHandler(route); + if (teh != null) { + teh.prepareShutdown(false, true); + } + + // release the blocking processor so the exchange can complete + releaseLatch.countDown(); + + // wait for the exchange to finish processing + executor.shutdown(); + assertTrue(executor.awaitTermination(10, TimeUnit.SECONDS), + "Exchange processing should have completed"); + + // verify that the transaction was rolled back + // before the fix: count = 3 (original + 2 inserts committed) + // after the fix: count = 1 (both inserts rolled back) + int count = jdbc.queryForObject("select count(*) from books", Integer.class); + assertEquals(1, count, "Number of books after forced shutdown - transaction should have been rolled back"); + } finally { + releaseLatch.countDown(); // ensure we don't hang if test fails + executor.shutdownNow(); + } + } + + private TransactionErrorHandler findTransactionErrorHandler(Route route) { + Processor processor = route.getProcessor(); + if (processor instanceof Service service) { + Set children = ServiceHelper.getChildServices(service, true); + for (Service child : children) { + if (child instanceof TransactionErrorHandler teh) { + return teh; + } + } + } + return null; + } + + @Override + @SuppressWarnings("deprecation") + protected RouteBuilder createRouteBuilder() throws Exception { + return new SpringRouteBuilder() { + public void configure() throws Exception { + SpringTransactionPolicy required = lookup("PROPAGATION_REQUIRED", SpringTransactionPolicy.class); + errorHandler(transactionErrorHandler(required)); + + from("direct:forceShutdown").routeId("forceShutdownRoute") + .policy(required) + .setBody(constant("Tiger in Action")).bean("bookService") + .process(new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + // signal that the first insert is done + firstInsertDone.countDown(); + // block until released + releaseLatch.await(30, TimeUnit.SECONDS); + } + }) + .setBody(constant("Elephant in Action")).bean("bookService"); + } + }; + } +} diff --git a/components/camel-spring-parent/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java b/components/camel-spring-parent/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java index f31374ddd8bb6..8afd21871dbec 100644 --- a/components/camel-spring-parent/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java +++ b/components/camel-spring-parent/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandler.java @@ -16,6 +16,8 @@ */ package org.apache.camel.spring.spi; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledExecutorService; import org.apache.camel.AsyncCallback; @@ -49,6 +51,7 @@ public class TransactionErrorHandler extends RedeliveryErrorHandler { private final TransactionTemplate transactionTemplate; private final String transactionKey; private final LoggingLevel rollbackLoggingLevel; + private final Set inflightTransactedExchanges = ConcurrentHashMap.newKeySet(); /** * Creates the transaction error handler. @@ -149,6 +152,7 @@ protected void processInTransaction(final Exchange exchange) { if (exchange.getUnitOfWork() != null) { exchange.getUnitOfWork().beginTransactedBy(transactionKey); } + inflightTransactedExchanges.add(exchange); // do in transaction logTransactionBegin(redelivered, ids); @@ -162,6 +166,7 @@ protected void processInTransaction(final Exchange exchange) { exchange.setException(e); logTransactionRollback(redelivered, ids, e, false); } finally { + inflightTransactedExchanges.remove(exchange); // mark the end of this transaction boundary if (exchange.getUnitOfWork() != null) { exchange.getUnitOfWork().endTransactedBy(transactionKey); @@ -206,6 +211,13 @@ protected void doInTransactionWithoutResult(TransactionStatus status) { // and now let process the exchange by the error handler processByErrorHandler(exchange); + // if forced shutdown is in progress, mark the exchange for rollback + if (preparingShutdown) { + LOG.debug("Forced shutdown in progress, marking exchange for rollback: {}", + exchange.getExchangeId()); + exchange.setRollbackOnly(true); + } + // after handling and still an exception or marked as rollback only then rollback if (exchange.getException() != null || exchange.isRollbackOnly() || exchange.isRollbackOnlyLast()) { @@ -326,6 +338,20 @@ private void logTransactionRollback(String redelivered, String ids, Throwable e, } } + @Override + public void prepareShutdown(boolean suspendOnly, boolean forced) { + super.prepareShutdown(suspendOnly, forced); + if (forced) { + // mark all in-flight transacted exchanges for rollback so the transaction + // is rolled back before the connection pool is destroyed during shutdown + for (Exchange exchange : inflightTransactedExchanges) { + LOG.debug("Marking in-flight transacted exchange for rollback due to forced shutdown: {}", + exchange.getExchangeId()); + exchange.setRollbackOnly(true); + } + } + } + private static String propagationBehaviorToString(int propagationBehavior) { String rc; switch (propagationBehavior) { diff --git a/parent/pom.xml b/parent/pom.xml index 891f9e99f8af3..bdd7ea95819e2 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -61,6 +61,7 @@ 5.19.2 6.2.1 2.44.0 + 3.0 0.3.0 3.4.1 2.0.5