From 18c54b2a8236469bb1b9acd5aaa228b04a315904 Mon Sep 17 00:00:00 2001 From: Davide D'Alto Date: Tue, 18 Nov 2025 09:06:53 +0100 Subject: [PATCH 1/4] [#2768] Add exception for the execution on the wrong context For now we use it only when the connection is used --- .../reactive/common/InternalStateAssertions.java | 11 +++++++++++ .../java/org/hibernate/reactive/logging/impl/Log.java | 9 ++++++--- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/common/InternalStateAssertions.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/common/InternalStateAssertions.java index d356334c1..a24c3b8fd 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/common/InternalStateAssertions.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/common/InternalStateAssertions.java @@ -7,11 +7,13 @@ import java.lang.invoke.MethodHandles; import java.util.Locale; +import java.util.Objects; import org.hibernate.reactive.logging.impl.Log; import org.hibernate.reactive.logging.impl.LoggerFactory; import io.vertx.core.Context; +import io.vertx.core.internal.ContextInternal; /** * Commonly used assertions to verify that the operations @@ -52,4 +54,13 @@ public static void assertCurrentThreadMatches(Thread expectedThread) { } } + public static void assertCurrentContextMatches(Object object, ContextInternal expectedContext) { + if ( ENFORCE ) { + final ContextInternal currentContext = ContextInternal.current(); + Objects.requireNonNull( currentContext, "Current context cannot be null" ); + if ( !currentContext.equals( expectedContext ) ) { + throw LOG.unexpectedContextDetected( object, expectedContext, currentContext ); + } + } + } } diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/logging/impl/Log.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/logging/impl/Log.java index 32d0b41c4..60d25d802 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/logging/impl/Log.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/logging/impl/Log.java @@ -6,12 +6,9 @@ package org.hibernate.reactive.logging.impl; - import java.sql.SQLException; import java.sql.SQLWarning; -import jakarta.persistence.PersistenceException; - import org.hibernate.HibernateException; import org.hibernate.JDBCException; import org.hibernate.LazyInitializationException; @@ -26,6 +23,9 @@ import org.jboss.logging.annotations.Message; import org.jboss.logging.annotations.MessageLogger; +import io.vertx.core.internal.ContextInternal; +import jakarta.persistence.PersistenceException; + import static org.jboss.logging.Logger.Level.DEBUG; import static org.jboss.logging.Logger.Level.ERROR; import static org.jboss.logging.Logger.Level.INFO; @@ -277,6 +277,9 @@ public interface Log extends BasicLogger { @Message(id = 87, value = "Retrieved key was null, but to-one is not nullable : %s") IllegalStateException notNullableToOneAssociationMissingKey(String toOneNavigablePath); + @Message(id = 88, value = "Expected to use the object %1$s on context %2$s but was %3$s") + HibernateException unexpectedContextDetected(Object obj, ContextInternal expectedContext, ContextInternal currentContext); + // Same method that exists in CoreMessageLogger @LogMessage(level = WARN) @Message(id = 104, value = "firstResult/maxResults specified with collection fetch; applying in memory!" ) From 6b8a2f19d0704fe41643b9538e0efcffe1c1b0bd Mon Sep 17 00:00:00 2001 From: Davide D'Alto Date: Tue, 18 Nov 2025 09:09:30 +0100 Subject: [PATCH 2/4] [#2768] Avoid context switching on id generation This commit: * Stop connection leaks in BlockingIdentifierGenerator * Assert that the connection is used in the expected context --- .../id/impl/BlockingIdentifierGenerator.java | 68 ++++++----- .../hibernate/reactive/logging/impl/Log.java | 3 + .../pool/impl/SqlClientConnection.java | 15 ++- .../reactive/pool/impl/SqlClientPool.java | 112 +++++++++++++----- 4 files changed, 135 insertions(+), 63 deletions(-) diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/id/impl/BlockingIdentifierGenerator.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/id/impl/BlockingIdentifierGenerator.java index 329792f28..f680638f8 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/id/impl/BlockingIdentifierGenerator.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/id/impl/BlockingIdentifierGenerator.java @@ -14,11 +14,13 @@ import io.vertx.core.Context; import io.vertx.core.Vertx; +import io.vertx.core.internal.ContextInternal; import io.vertx.core.internal.pool.CombinerExecutor; import io.vertx.core.internal.pool.Executor; import io.vertx.core.internal.pool.Task; import static org.hibernate.reactive.util.impl.CompletionStages.completedFuture; +import static org.hibernate.reactive.util.impl.CompletionStages.supplyStage; /** * A {@link ReactiveIdentifierGenerator} which uses the database to allocate @@ -93,39 +95,34 @@ public CompletionStage generate(ReactiveConnectionSupplier connectionSuppl } final CompletableFuture resultForThisEventLoop = new CompletableFuture<>(); - final CompletableFuture result = new CompletableFuture<>(); - final Context context = Vertx.currentContext(); - executor.submit( new GenerateIdAction( connectionSupplier, result ) ); - result.whenComplete( (id, t) -> { - final Context newContext = Vertx.currentContext(); - //Need to be careful in resuming processing on the same context as the original - //request, potentially having to switch back if we're no longer executing on the same: - if ( newContext != context ) { - if ( t != null ) { - context.runOnContext( v -> resultForThisEventLoop.completeExceptionally( t ) ); + // We use supplyStage so that, no matter if there's an exception, we always return something that will complete + return supplyStage( () -> { + final CompletableFuture result = new CompletableFuture<>(); + final Context context = Vertx.currentContext(); + executor.submit( new GenerateIdAction( connectionSupplier, result ) ); + result.whenComplete( (id, t) -> { + final Context newContext = Vertx.currentContext(); + //Need to be careful in resuming processing on the same context as the original + //request, potentially having to switch back if we're no longer executing on the same: + if ( newContext != context ) { + context.runOnContext( v -> complete( resultForThisEventLoop, id, t ) ); } else { - context.runOnContext( v -> resultForThisEventLoop.complete( id ) ); + complete( resultForThisEventLoop, id, t ); } - } - else { - if ( t != null ) { - resultForThisEventLoop.completeExceptionally( t ); - } - else { - resultForThisEventLoop.complete( id ); - } - } + } ); + return resultForThisEventLoop; } ); - return resultForThisEventLoop; } private final class GenerateIdAction implements Executor.Action { private final ReactiveConnectionSupplier connectionSupplier; private final CompletableFuture result; + private final ContextInternal creationContext; public GenerateIdAction(ReactiveConnectionSupplier connectionSupplier, CompletableFuture result) { + this.creationContext = ContextInternal.current(); this.connectionSupplier = Objects.requireNonNull( connectionSupplier ); this.result = Objects.requireNonNull( result ); } @@ -137,9 +134,16 @@ public Task execute(GeneratorState state) { // We don't need to update or initialize the hi // value in the table, so just increment the lo // value and return the next id in the block - completedFuture( local ).whenComplete( this::acceptAsReturnValue ); + result.complete( local ); } else { + creationContext.runOnContext( this::generateNewHiValue ); + } + return null; + } + + private void generateNewHiValue(Void v) { + try { nextHiValue( connectionSupplier ) .whenComplete( (newlyGeneratedHi, throwable) -> { if ( throwable != null ) { @@ -155,17 +159,19 @@ public Task execute(GeneratorState state) { } } ); } - return null; - } - - private void acceptAsReturnValue(final Long aLong, final Throwable throwable) { - if ( throwable != null ) { - result.completeExceptionally( throwable ); - } - else { - result.complete( aLong ); + catch ( Throwable e ) { + // nextHivalue() could throw an exception before returning a completion stage + result.completeExceptionally( e ); } } } + private static void complete(CompletableFuture future, final T result, final Throwable throwable) { + if ( throwable != null ) { + future.completeExceptionally( throwable ); + } + else { + future.complete( result ); + } + } } diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/logging/impl/Log.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/logging/impl/Log.java index 60d25d802..631c8e7ed 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/logging/impl/Log.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/logging/impl/Log.java @@ -280,6 +280,9 @@ public interface Log extends BasicLogger { @Message(id = 88, value = "Expected to use the object %1$s on context %2$s but was %3$s") HibernateException unexpectedContextDetected(Object obj, ContextInternal expectedContext, ContextInternal currentContext); + @Message(id = 89, value = "Connection is closed") + IllegalStateException connectionIsClosed(); + // Same method that exists in CoreMessageLogger @LogMessage(level = WARN) @Message(id = 104, value = "firstResult/maxResults specified with collection fetch; applying in memory!" ) diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/pool/impl/SqlClientConnection.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/pool/impl/SqlClientConnection.java index 88b536223..c2467da64 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/pool/impl/SqlClientConnection.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/pool/impl/SqlClientConnection.java @@ -18,12 +18,14 @@ import org.hibernate.engine.jdbc.spi.SqlStatementLogger; import org.hibernate.reactive.adaptor.impl.JdbcNull; import org.hibernate.reactive.adaptor.impl.ResultSetAdaptor; +import org.hibernate.reactive.common.InternalStateAssertions; import org.hibernate.reactive.logging.impl.Log; import org.hibernate.reactive.logging.impl.LoggerFactory; import org.hibernate.reactive.pool.BatchingConnection; import org.hibernate.reactive.pool.ReactiveConnection; import org.hibernate.reactive.util.impl.CompletionStages; +import io.vertx.core.internal.ContextInternal; import io.vertx.core.json.JsonArray; import io.vertx.core.json.JsonObject; import io.vertx.sqlclient.DatabaseException; @@ -59,14 +61,22 @@ public class SqlClientConnection implements ReactiveConnection { private final Pool pool; private final SqlConnection connection; + // The context associated to the connection. We expect the connection to be executed in this context. + private final ContextInternal connectionContext; private Transaction transaction; - SqlClientConnection(SqlConnection connection, Pool pool, SqlStatementLogger sqlStatementLogger, SqlExceptionHelper sqlExceptionHelper) { + SqlClientConnection( + SqlConnection connection, + Pool pool, + SqlStatementLogger sqlStatementLogger, + SqlExceptionHelper sqlExceptionHelper, + ContextInternal connectionContext) { + this.connectionContext = connectionContext; this.pool = pool; this.sqlStatementLogger = sqlStatementLogger; this.connection = connection; this.sqlExceptionHelper = sqlExceptionHelper; - LOG.tracef( "Connection created: %s", connection ); + LOG.tracef( "Connection created for %1$s associated to context %2$s: ", connection, connectionContext ); } @Override @@ -338,6 +348,7 @@ public CompletionStage> preparedQueryOutsideTransaction(String sql) } private void feedback(String sql) { + InternalStateAssertions.assertCurrentContextMatches( this, connectionContext ); Objects.requireNonNull( sql, "SQL query cannot be null" ); // DDL already gets formatted by the client, so don't reformat it FormatStyle formatStyle = sqlStatementLogger.isFormat() && !sql.contains( System.lineSeparator() ) diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/pool/impl/SqlClientPool.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/pool/impl/SqlClientPool.java index bfbf8bc76..c3766de6e 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/pool/impl/SqlClientPool.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/pool/impl/SqlClientPool.java @@ -5,6 +5,8 @@ */ package org.hibernate.reactive.pool.impl; +import java.lang.invoke.MethodHandles; +import java.lang.invoke.VarHandle; import java.sql.ResultSet; import java.sql.SQLException; import java.util.List; @@ -18,10 +20,13 @@ import org.hibernate.engine.jdbc.spi.SqlExceptionHelper; import org.hibernate.engine.jdbc.spi.SqlStatementLogger; import org.hibernate.reactive.adaptor.impl.ResultSetAdaptor; +import org.hibernate.reactive.logging.impl.Log; +import org.hibernate.reactive.logging.impl.LoggerFactory; import org.hibernate.reactive.pool.ReactiveConnection; import org.hibernate.reactive.pool.ReactiveConnectionPool; import io.vertx.core.Future; +import io.vertx.core.internal.ContextInternal; import io.vertx.sqlclient.DatabaseException; import io.vertx.sqlclient.Pool; import io.vertx.sqlclient.Row; @@ -30,7 +35,8 @@ import io.vertx.sqlclient.Tuple; import io.vertx.sqlclient.spi.DatabaseMetadata; -import static org.hibernate.reactive.util.impl.CompletionStages.completedFuture; +import static java.lang.invoke.MethodHandles.lookup; +import static org.hibernate.reactive.util.impl.CompletionStages.failedFuture; import static org.hibernate.reactive.util.impl.CompletionStages.rethrow; import static org.hibernate.reactive.util.impl.CompletionStages.voidFuture; @@ -123,12 +129,16 @@ public CompletionStage getConnection(String tenantId, SqlExc } private CompletionStage getConnectionFromPool(Pool pool) { - return completionStage( pool.getConnection().map( this::newConnection ), ReactiveConnection::close ); + return completeFuture( + pool.getConnection().map( this::newConnection ), + ReactiveConnection::close + ); } private CompletionStage getConnectionFromPool(Pool pool, SqlExceptionHelper sqlExceptionHelper) { - return completionStage( - pool.getConnection().map( sqlConnection -> newConnection( sqlConnection, sqlExceptionHelper ) ), + return completeFuture( + pool.getConnection() + .map( sqlConnection -> newConnection( sqlConnection, sqlExceptionHelper ) ), ReactiveConnection::close ); } @@ -189,8 +199,8 @@ private void feedback(String sql) { /** * @param onCancellation invoke when converted {@link java.util.concurrent.CompletionStage} cancellation. */ - private CompletionStage completionStage(Future future, Consumer onCancellation) { - CompletableFuture completableFuture = new CompletableFuture<>(); + private CompletionStage completeFuture(Future future, Consumer onCancellation) { + final CompletableFuture completableFuture = new CompletableFuture<>(); future.onComplete( ar -> { if ( ar.succeeded() ) { if ( completableFuture.isCancelled() ) { @@ -210,13 +220,35 @@ private SqlClientConnection newConnection(SqlConnection connection) { } private SqlClientConnection newConnection(SqlConnection connection, SqlExceptionHelper sqlExceptionHelper) { - return new SqlClientConnection( connection, getPool(), getSqlStatementLogger(), sqlExceptionHelper ); + return new SqlClientConnection( + connection, + getPool(), + getSqlStatementLogger(), + sqlExceptionHelper, + ContextInternal.current() + ); } private static class ProxyConnection implements ReactiveConnection { + + private static final Log LOG = LoggerFactory.make( Log.class, lookup() ); + + private static final VarHandle OPENED_HANDLE; + + static { + try { + MethodHandles.Lookup lookup = lookup(); + OPENED_HANDLE = lookup.findVarHandle( ProxyConnection.class, "opened", boolean.class ); + } + catch (ReflectiveOperationException e) { + throw new ExceptionInInitializerError( e ); + } + } + private final Supplier> connectionSupplier; - private Integer batchSize; - private ReactiveConnection connection; + private final CompletableFuture connectionFuture = new CompletableFuture<>(); + private volatile boolean opened = false; + private volatile boolean closed = false; public ProxyConnection(Supplier> connectionSupplier) { this.connectionSupplier = connectionSupplier; @@ -225,29 +257,41 @@ public ProxyConnection(Supplier> connectionS /** * @return the existing {@link ReactiveConnection}, or open a new one */ - CompletionStage connection() { - if ( connection == null ) { - return connectionSupplier.get() - .thenApply( conn -> { - if ( batchSize != null ) { - conn.withBatchSize( batchSize ); - } - connection = conn; - return connection; - } ); + private CompletionStage connection() { + if ( closed ) { + return failedFuture( LOG.connectionIsClosed() ); + } + if ( opened ) { + return connectionFuture; } - return completedFuture( connection ); + if ( OPENED_HANDLE.compareAndSet( this, false, true ) ) { + connectionSupplier.get().whenComplete( (connection, throwable) -> { + if ( throwable != null ) { + connectionFuture.completeExceptionally( throwable ); + } + else { + connectionFuture.complete( connection ); + } + } ); + } + return connectionFuture; } @Override public boolean isTransactionInProgress() { - return connection != null && connection.isTransactionInProgress(); + ReactiveConnection reactiveConnection = connectionFuture.getNow( null ); + return reactiveConnection != null && reactiveConnection.isTransactionInProgress(); } @Override public DatabaseMetadata getDatabaseMetadata() { - Objects.requireNonNull( connection, "Database metadata not available until the connection is opened" ); - return connection.getDatabaseMetadata(); + if ( closed ) { + throw LOG.connectionIsClosed(); + } + + return Objects + .requireNonNull( connectionFuture.getNow( null ), "Database metadata not available until a connection has been created" ) + .getDatabaseMetadata(); } @Override @@ -355,15 +399,22 @@ public CompletionStage rollbackTransaction() { return connection().thenCompose( ReactiveConnection::rollbackTransaction ); } - @Override public ReactiveConnection withBatchSize(int batchSize) { - if ( connection == null ) { - this.batchSize = batchSize; + if ( closed ) { + throw LOG.connectionIsClosed(); + } + + if ( connectionFuture.isDone() ) { + // connection exists, we can let callers use the delegate and forget about the proxy. + return connectionFuture.getNow( null ).withBatchSize( batchSize ); } else { - connection = connection.withBatchSize( batchSize ); + return new ProxyConnection( () -> opened + // Connection has been requested but not created yet + ? connectionFuture.thenApply( c -> c.withBatchSize( batchSize ) ) + // Connection has not been requested + : connectionSupplier.get().thenApply( c -> c.withBatchSize( batchSize ) ) ); } - return this; } @Override @@ -373,8 +424,9 @@ public CompletionStage executeBatch() { @Override public CompletionStage close() { - return connection != null - ? connection.close().thenAccept( v -> connection = null ) + closed = true; + return opened + ? connectionFuture.thenCompose( ReactiveConnection::close ) : voidFuture(); } } From dd2457ec71681c9e6c34453614c76cc4b8950bd6 Mon Sep 17 00:00:00 2001 From: Davide D'Alto Date: Tue, 18 Nov 2025 09:09:04 +0100 Subject: [PATCH 3/4] [#2768] Update existing test Now we test with and without transactions. And the test will throw an exception if context and connections aren't handled correctly. --- ...readedInsertionWithLazyConnectionTest.java | 52 +++++++++++++++---- 1 file changed, 41 insertions(+), 11 deletions(-) diff --git a/hibernate-reactive-core/src/test/java/org/hibernate/reactive/MultithreadedInsertionWithLazyConnectionTest.java b/hibernate-reactive-core/src/test/java/org/hibernate/reactive/MultithreadedInsertionWithLazyConnectionTest.java index 2948cce27..37c4fcb31 100644 --- a/hibernate-reactive-core/src/test/java/org/hibernate/reactive/MultithreadedInsertionWithLazyConnectionTest.java +++ b/hibernate-reactive-core/src/test/java/org/hibernate/reactive/MultithreadedInsertionWithLazyConnectionTest.java @@ -8,18 +8,20 @@ import java.util.concurrent.CompletionStage; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.function.BiFunction; import org.hibernate.SessionFactory; import org.hibernate.boot.registry.StandardServiceRegistry; import org.hibernate.boot.registry.StandardServiceRegistryBuilder; import org.hibernate.cfg.Configuration; +import org.hibernate.reactive.annotations.DisabledFor; import org.hibernate.reactive.provider.ReactiveServiceRegistryBuilder; import org.hibernate.reactive.stage.Stage; import org.hibernate.reactive.util.impl.CompletionStages; import org.hibernate.reactive.vertx.VertxInstance; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; import org.junit.jupiter.api.extension.ExtendWith; @@ -41,6 +43,7 @@ import static org.assertj.core.api.Assertions.fail; import static org.hibernate.cfg.AvailableSettings.SHOW_SQL; import static org.hibernate.reactive.BaseReactiveTest.setDefaultProperties; +import static org.hibernate.reactive.containers.DatabaseConfiguration.DBType.DB2; import static org.hibernate.reactive.provider.Settings.POOL_CONNECT_TIMEOUT; import static org.hibernate.reactive.util.impl.CompletionStages.failedFuture; import static org.hibernate.reactive.util.impl.CompletionStages.loop; @@ -101,8 +104,8 @@ public class MultithreadedInsertionWithLazyConnectionTest { private static Vertx vertx; private static SessionFactory sessionFactory; - @BeforeAll - public static void setupSessionFactory() { + @BeforeEach + public void setupSessionFactory() { vertx = Vertx.vertx( getVertxOptions() ); Configuration configuration = new Configuration(); setDefaultProperties( configuration ); @@ -130,8 +133,8 @@ private static VertxOptions getVertxOptions() { return vertxOptions; } - @AfterAll - public static void closeSessionFactory() { + @AfterEach + public void closeSessionFactory() { stageSessionFactory.close(); } @@ -140,8 +143,33 @@ public void testIdentityGenerator(VertxTestContext context) { final DeploymentOptions deploymentOptions = new DeploymentOptions(); deploymentOptions.setInstances( N_THREADS ); + // We are not using transactions on purpose here, because this approach will cause a context switch + // and an assertion error if things aren't handled correctly. + // See Hibernate Reactive issue #2768: https://github.com/hibernate/hibernate-reactive/issues/2768 vertx - .deployVerticle( InsertEntitiesVerticle::new, deploymentOptions ) + .deployVerticle( () -> new InsertEntitiesVerticle( (s, entity) -> s + .persist( entity ) + .thenCompose( v -> s.flush() ) + .thenAccept( v -> s.clear() ) ), deploymentOptions + ) + .onSuccess( res -> { + endLatch.waitForEveryone(); + context.completeNow(); + } ) + .onFailure( context::failNow ) + .eventually( () -> vertx.close() ); + } + + @Test + @DisabledFor(value = DB2, reason = "Exception: IllegalStateException: Needed to have 6 in buffer but only had 0") + public void testIdentityGeneratorWithTransaction(VertxTestContext context) { + final DeploymentOptions deploymentOptions = new DeploymentOptions(); + deploymentOptions.setInstances( N_THREADS ); + vertx + .deployVerticle( + () -> new InsertEntitiesVerticle( (s, entity) -> s + .withTransaction( t -> s.persist( entity ) ) ), deploymentOptions + ) .onSuccess( res -> { endLatch.waitForEveryone(); context.completeNow(); @@ -152,9 +180,12 @@ public void testIdentityGenerator(VertxTestContext context) { private static class InsertEntitiesVerticle extends AbstractVerticle { + final BiFunction> insertFun; + int sequentialOperation = 0; - public InsertEntitiesVerticle() { + public InsertEntitiesVerticle(BiFunction> insertFun) { + this.insertFun = insertFun; } @Override @@ -196,9 +227,8 @@ private CompletionStage storeEntity(Stage.Session s) { final int localVerticleOperationSequence = sequentialOperation++; final EntityWithGeneratedId entity = new EntityWithGeneratedId(); entity.name = beforeOperationThread + "__" + localVerticleOperationSequence; - - return s - .withTransaction( t -> s.persist( entity ) ) + return insertFun + .apply( s, entity ) .thenCompose( v -> beforeOperationThread != Thread.currentThread() ? failedFuture( new IllegalStateException( "Detected an unexpected switch of carrier threads!" ) ) : voidFuture() ); From 01ebf2cd6ee8265e83d15631bebf5e8530c1d1e1 Mon Sep 17 00:00:00 2001 From: Davide D'Alto Date: Wed, 26 Nov 2025 15:52:00 +0100 Subject: [PATCH 4/4] [#2768] Test batching with ProxyConnection --- .../reactive/BatchingConnectionTest.java | 52 +++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/hibernate-reactive-core/src/test/java/org/hibernate/reactive/BatchingConnectionTest.java b/hibernate-reactive-core/src/test/java/org/hibernate/reactive/BatchingConnectionTest.java index 869d2ab9b..419832dd0 100644 --- a/hibernate-reactive-core/src/test/java/org/hibernate/reactive/BatchingConnectionTest.java +++ b/hibernate-reactive-core/src/test/java/org/hibernate/reactive/BatchingConnectionTest.java @@ -8,11 +8,13 @@ import org.hibernate.boot.registry.StandardServiceRegistryBuilder; import org.hibernate.cfg.AvailableSettings; import org.hibernate.cfg.Configuration; +import org.hibernate.reactive.mutiny.Mutiny; import org.hibernate.reactive.mutiny.impl.MutinySessionImpl; import org.hibernate.reactive.mutiny.impl.MutinyStatelessSessionImpl; import org.hibernate.reactive.pool.BatchingConnection; import org.hibernate.reactive.pool.ReactiveConnection; import org.hibernate.reactive.pool.impl.SqlClientConnection; +import org.hibernate.reactive.stage.Stage; import org.hibernate.reactive.stage.impl.StageSessionImpl; import org.hibernate.reactive.stage.impl.StageStatelessSessionImpl; import org.hibernate.reactive.testing.SqlStatementTracker; @@ -85,6 +87,56 @@ protected void assertConnectionIsLazy(ReactiveConnection connection, boolean sta .isEqualTo( org.hibernate.reactive.pool.impl.SqlClientPool.class.getName() + "$ProxyConnection" ); } + @Test + public void testBatchingWithPersistAllAndProxyConnection(VertxTestContext context) { + Stage.Session session = getSessionFactory().createSession(); + test( context, session + .persist( + new GuineaPig( 11, "One" ), + new GuineaPig( 22, "Two" ), + new GuineaPig( 33, "Three" ) + ) + // Auto-flush + .thenCompose( v -> session + .createSelectionQuery( "select name from GuineaPig", String.class ) + .getResultList() + .thenAccept( names -> { + assertThat( names ).containsExactlyInAnyOrder( "One", "Two", "Three" ); + assertThat( sqlTracker.getLoggedQueries() ).hasSize( 1 ); + // Parameters are different for different dbs, so we cannot do an exact match + assertThat( sqlTracker.getLoggedQueries().get( 0 ) ) + .startsWith( "insert into pig (name,version,id) values " ); + sqlTracker.clear(); + } ) + ) + ); + } + + @Test + public void testBatchingWithPersistAllAndProxyConnectionAndMutiny(VertxTestContext context) { + Mutiny.Session session = getMutinySessionFactory().createSession(); + test( context, session + .persistAll( + new GuineaPig( 11, "One" ), + new GuineaPig( 22, "Two" ), + new GuineaPig( 33, "Three" ) + ) + // Auto-flush + .chain( v -> session + .createSelectionQuery( "select name from GuineaPig", String.class ) + .getResultList() + .invoke( names -> { + assertThat( names ).containsExactlyInAnyOrder( "One", "Two", "Three" ); + assertThat( sqlTracker.getLoggedQueries() ).hasSize( 1 ); + // Parameters are different for different dbs, so we cannot do an exact match + assertThat( sqlTracker.getLoggedQueries().get( 0 ) ) + .startsWith( "insert into pig (name,version,id) values " ); + sqlTracker.clear(); + } ) + ) + ); + } + @Test public void testBatchingWithPersistAll(VertxTestContext context) { test( context, openSession().thenCompose( s -> s