diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/session/impl/ReactiveSessionFactoryImpl.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/session/impl/ReactiveSessionFactoryImpl.java index f8029e297..19d34d410 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/session/impl/ReactiveSessionFactoryImpl.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/session/impl/ReactiveSessionFactoryImpl.java @@ -13,8 +13,10 @@ import org.hibernate.reactive.boot.spi.ReactiveMetadataImplementor; import org.hibernate.reactive.mutiny.Mutiny; import org.hibernate.reactive.mutiny.impl.MutinySessionFactoryImpl; +import org.hibernate.reactive.sql.exec.internal.ReactiveJdbcSelectWithActions; import org.hibernate.reactive.stage.Stage; import org.hibernate.reactive.stage.impl.StageSessionFactoryImpl; +import org.hibernate.sql.exec.spi.JdbcSelectWithActionsBuilder; /** * A Hibernate {@link org.hibernate.SessionFactory} that can be @@ -42,4 +44,9 @@ public T unwrap(Class type) { } return super.unwrap( type ); } + + public JdbcSelectWithActionsBuilder getJdbcSelectWithActionsBuilder(){ + return new ReactiveJdbcSelectWithActions.Builder(); + } + } diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/internal/ReactiveJdbcSelectWithActions.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/internal/ReactiveJdbcSelectWithActions.java new file mode 100644 index 000000000..ac069198d --- /dev/null +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/internal/ReactiveJdbcSelectWithActions.java @@ -0,0 +1,295 @@ +/* Hibernate, Relational Persistence for Idiomatic Java + * + * SPDX-License-Identifier: Apache-2.0 + * Copyright: Red Hat Inc. and Hibernate Authors + */ +package org.hibernate.reactive.sql.exec.internal; + +import org.hibernate.LockOptions; +import org.hibernate.Locking; +import org.hibernate.dialect.lock.spi.LockTimeoutType; +import org.hibernate.dialect.lock.spi.LockingSupport; +import org.hibernate.internal.util.collections.CollectionHelper; +import org.hibernate.reactive.pool.ReactiveConnection; +import org.hibernate.reactive.sql.exec.internal.lock.ReactiveCollectionLockingAction; +import org.hibernate.reactive.sql.exec.internal.lock.ReactiveConnectionLockTimeoutStrategyBuilder; +import org.hibernate.reactive.sql.exec.internal.lock.ReactiveFollowOnLockingAction; +import org.hibernate.reactive.sql.exec.internal.lock.ReactiveLockTimeoutHandler; +import org.hibernate.reactive.sql.exec.spi.ReactiveJdbcSelect; +import org.hibernate.reactive.sql.exec.spi.ReactivePostAction; +import org.hibernate.reactive.sql.exec.spi.ReactivePreAction; +import org.hibernate.sql.ast.spi.LockingClauseStrategy; +import org.hibernate.sql.ast.tree.select.QuerySpec; +import org.hibernate.sql.exec.internal.JdbcOperationQuerySelect; +import org.hibernate.sql.exec.internal.JdbcSelectWithActions; +import org.hibernate.sql.exec.spi.ExecutionContext; +import org.hibernate.sql.exec.spi.JdbcSelect; +import org.hibernate.sql.exec.spi.JdbcSelectWithActionsBuilder; +import org.hibernate.sql.exec.spi.LoadedValuesCollector; +import org.hibernate.sql.exec.spi.PostAction; +import org.hibernate.sql.exec.spi.PreAction; +import org.hibernate.sql.exec.spi.SecondaryAction; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletionStage; + +import static org.hibernate.reactive.util.impl.CompletionStages.loop; +import static org.hibernate.reactive.util.impl.CompletionStages.nullFuture; + +/** + * Reactive version of {@link JdbcSelectWithActions} + */ +public class ReactiveJdbcSelectWithActions extends JdbcSelectWithActions implements ReactiveJdbcSelect { + + public ReactiveJdbcSelectWithActions( + JdbcOperationQuerySelect primaryOperation, + LoadedValuesCollector loadedValuesCollector, + PreAction[] preActions, + PostAction[] postActions) { + super( primaryOperation, loadedValuesCollector, preActions, postActions ); + } + + public ReactiveJdbcSelectWithActions( + JdbcOperationQuerySelect primaryAction, + LoadedValuesCollector loadedValuesCollector) { + super( primaryAction, loadedValuesCollector ); + } + + @Override + public CompletionStage reactivePerformPreActions( + ReactiveConnection connection, + ExecutionContext executionContext) { + if ( preActions == null ) { + return nullFuture(); + } + + return loop( preActions, preAction -> + ( (ReactivePreAction) preAction ).reactivePerformPreAction( connection, executionContext ) + ); + } + + @Override + public CompletionStage reactivePerformPostActions( + boolean succeeded, + ReactiveConnection connection, + ExecutionContext executionContext) { + if ( postActions != null ) { + return loop( + postActions, postAction -> { + if ( succeeded || postAction.shouldRunAfterFail() ) { + return ( (ReactivePostAction) postAction ).reactivePerformReactivePostAction( + connection, + executionContext + ); + } + return nullFuture(); + } + ).thenAccept( unused -> { + if ( loadedValuesCollector != null ) { + loadedValuesCollector.clear(); + } + } ); + } + else { + if ( loadedValuesCollector != null ) { + loadedValuesCollector.clear(); + } + return nullFuture(); + } + } + + public static class Builder implements JdbcSelectWithActionsBuilder { + private JdbcOperationQuerySelect primaryAction; + private LoadedValuesCollector loadedValuesCollector; + protected List preActions; + protected List postActions; + protected LockTimeoutType lockTimeoutType; + protected LockingSupport lockingSupport; + protected LockOptions lockOptions; + protected QuerySpec lockingTarget; + protected LockingClauseStrategy lockingClauseStrategy; + boolean isFollonOnLockStrategy; + + @Override + public Builder setPrimaryAction(JdbcSelect primaryAction) { + assert primaryAction instanceof JdbcOperationQuerySelect; + this.primaryAction = (JdbcOperationQuerySelect) primaryAction; + return this; + } + + @SuppressWarnings("UnusedReturnValue") + @Override + public Builder setLoadedValuesCollector(LoadedValuesCollector loadedValuesCollector) { + this.loadedValuesCollector = loadedValuesCollector; + return this; + } + + @Override + public Builder setLockTimeoutType(LockTimeoutType lockTimeoutType) { + this.lockTimeoutType = lockTimeoutType; + return this; + } + + @Override + public Builder setLockingSupport(LockingSupport lockingSupport) { + this.lockingSupport = lockingSupport; + return this; + } + + @Override + public Builder setLockOptions(LockOptions lockOptions) { + this.lockOptions = lockOptions; + return this; + } + + @Override + public Builder setLockingTarget(QuerySpec lockingTarget) { + this.lockingTarget = lockingTarget; + return this; + } + + @Override + public Builder setLockingClauseStrategy(LockingClauseStrategy lockingClauseStrategy) { + this.lockingClauseStrategy = lockingClauseStrategy; + return this; + } + + @Override + public Builder setIsFollowOnLockStrategy(boolean isFollonOnLockStrategy) { + this.isFollonOnLockStrategy = isFollonOnLockStrategy; + return this; + } + + @Override + public JdbcSelect build() { + if ( lockTimeoutType == LockTimeoutType.CONNECTION ) { + addSecondaryActionPair( + new ReactiveLockTimeoutHandler( + lockOptions.getTimeout(), + ReactiveConnectionLockTimeoutStrategyBuilder.build( lockingSupport.getConnectionLockTimeoutStrategy() ) + ) + ); + } + if ( isFollonOnLockStrategy ) { + ReactiveFollowOnLockingAction.apply( lockOptions, lockingTarget, lockingClauseStrategy, this ); + } + else if ( lockOptions.getScope() == Locking.Scope.INCLUDE_COLLECTIONS ) { + ReactiveCollectionLockingAction.apply( lockOptions, lockingTarget, this ); + } + if ( preActions == null && postActions == null ) { + assert loadedValuesCollector == null; + return primaryAction; + } + final PreAction[] preActions = toPreActionArray( this.preActions ); + final PostAction[] postActions = toPostActionArray( this.postActions ); + return new ReactiveJdbcSelectWithActions( primaryAction, loadedValuesCollector, preActions, postActions ); + } + + /** + * Appends the {@code actions} to the growing list of pre-actions, + * executed (in order) after all currently registered actions. + * + * @return {@code this}, for method chaining. + */ + @Override + public Builder appendPreAction(PreAction... actions) { + if ( preActions == null ) { + preActions = new ArrayList<>(); + } + Collections.addAll( preActions, actions ); + return this; + } + + /** + * Prepends the {@code actions} to the growing list of pre-actions + * + * @return {@code this}, for method chaining. + */ + @Override + public Builder prependPreAction(PreAction... actions) { + if ( preActions == null ) { + preActions = new ArrayList<>(); + } + // todo (DatabaseOperation) : should we invert the order of the incoming actions? + Collections.addAll( preActions, actions ); + return this; + } + + /** + * Appends the {@code actions} to the growing list of post-actions + * + * @return {@code this}, for method chaining. + */ + @Override + public Builder appendPostAction(PostAction... actions) { + if ( postActions == null ) { + postActions = new ArrayList<>(); + } + Collections.addAll( postActions, actions ); + return this; + } + + /** + * Prepends the {@code actions} to the growing list of post-actions + * + * @return {@code this}, for method chaining. + */ + @Override + public Builder prependPostAction(PostAction... actions) { + if ( postActions == null ) { + postActions = new ArrayList<>(); + } + // todo (DatabaseOperation) : should we invert the order of the incoming actions? + Collections.addAll( postActions, actions ); + return this; + } + + /** + * Adds a secondary action pair. + * Assumes the {@code action} implements both {@linkplain PreAction} and {@linkplain PostAction}. + * + * @return {@code this}, for method chaining. + * + * @apiNote Prefer {@linkplain #addSecondaryActionPair(PreAction, PostAction)} to avoid + * the casts needed here. + * @see #prependPreAction + * @see #appendPostAction + */ + @Override + public Builder addSecondaryActionPair(SecondaryAction action) { + return addSecondaryActionPair( (PreAction) action, (PostAction) action ); + } + + /** + * Adds a PreAction/PostAction pair. + * + * @return {@code this}, for method chaining. + * + * @see #prependPreAction + * @see #appendPostAction + */ + @Override + public Builder addSecondaryActionPair(PreAction preAction, PostAction postAction) { + prependPreAction( preAction ); + appendPostAction( postAction ); + return this; + } + + static PreAction[] toPreActionArray(List actions) { + if ( CollectionHelper.isEmpty( actions ) ) { + return null; + } + return actions.toArray( new PreAction[0] ); + } + + static PostAction[] toPostActionArray(List actions) { + if ( CollectionHelper.isEmpty( actions ) ) { + return null; + } + return actions.toArray( new PostAction[0] ); + } + + } +} diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/internal/StandardReactiveSelectExecutor.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/internal/StandardReactiveSelectExecutor.java index edadd83e9..338dd21df 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/internal/StandardReactiveSelectExecutor.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/internal/StandardReactiveSelectExecutor.java @@ -22,6 +22,9 @@ import org.hibernate.query.TupleTransformer; import org.hibernate.reactive.engine.impl.ReactivePersistenceContextAdapter; import org.hibernate.query.spi.QueryOptions; +import org.hibernate.reactive.pool.ReactiveConnection; +import org.hibernate.reactive.session.ReactiveConnectionSupplier; +import org.hibernate.reactive.sql.exec.spi.ReactiveJdbcSelect; import org.hibernate.reactive.sql.exec.spi.ReactiveRowProcessingState; import org.hibernate.reactive.sql.exec.spi.ReactiveSelectExecutor; import org.hibernate.reactive.sql.exec.spi.ReactiveValuesResultSet; @@ -220,10 +223,15 @@ public boolean shouldReturnProxies() { }; final JdbcValuesSourceProcessingStateStandardImpl valuesProcessingState = - new JdbcValuesSourceProcessingStateStandardImpl( executionContext, processingOptions ); + new JdbcValuesSourceProcessingStateStandardImpl( + jdbcSelect.getLoadedValuesCollector(), + processingOptions, + executionContext + ); + final SharedSessionContractImplementor session = executionContext.getSession(); final ReactiveRowReader rowReader = ReactiveResultsHelper.createRowReader( - executionContext.getSession().getSessionFactory(), + session.getSessionFactory(), rowTransformer, domainResultType, jdbcValues @@ -237,21 +245,34 @@ public boolean shouldReturnProxies() { ); rowReader.startLoading( rowProcessingState ); - - return resultsConsumer - .consume( - jdbcValues, - executionContext.getSession(), - processingOptions, - valuesProcessingState, - rowProcessingState, - rowReader - ) - .thenApply( result -> { - statistics.end( jdbcSelect, result ); - return result; - } ); - } ); + ReactiveConnection reactiveConnection = ( (ReactiveConnectionSupplier) session ).getReactiveConnection(); + if ( jdbcSelect instanceof ReactiveJdbcSelect reactiveJdbcSelect ) { + return reactiveJdbcSelect.reactivePerformPreActions( reactiveConnection, executionContext ) + .thenCompose( unused -> resultsConsumer + .consume( + jdbcValues, + session, + processingOptions, + valuesProcessingState, + rowProcessingState, + rowReader + ) ) + .thenCompose( result -> reactiveJdbcSelect + .reactivePerformPostActions( true, reactiveConnection, executionContext ) + .thenApply( v -> { + statistics.end( jdbcSelect, result ); + return result; + } ) + ); + } + else { + return resultsConsumer.consume( jdbcValues, session, processingOptions, valuesProcessingState, rowProcessingState, rowReader ) + .thenApply( result -> { + statistics.end( jdbcSelect, result ); + return result; + } ); + } + }); } private static RowTransformer rowTransformer( diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/internal/lock/LockHelper.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/internal/lock/LockHelper.java new file mode 100644 index 000000000..a4db6a44c --- /dev/null +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/internal/lock/LockHelper.java @@ -0,0 +1,80 @@ +/* Hibernate, Relational Persistence for Idiomatic Java + * + * SPDX-License-Identifier: Apache-2.0 + * Copyright: Red Hat Inc. and Hibernate Authors + */ +package org.hibernate.reactive.sql.exec.internal.lock; + +import org.hibernate.HibernateException; +import org.hibernate.reactive.pool.ReactiveConnection; + +import jakarta.persistence.Timeout; +import java.util.concurrent.CompletionStage; +import java.util.function.Function; + +/** + * Reactive version of {@link org.hibernate.dialect.lock.internal.Helper} + */ +public class LockHelper { + + public static CompletionStage getLockTimeout( + String sql, + TimeoutExtractor extractor, + ReactiveConnection connection) { + return connection.select( sql ) + .thenCompose( resultset -> { + if ( !resultset.hasNext() ) { + throw new HibernateException( + "Unable to query JDBC Connection for current lock-timeout setting (no result)" ); + + } + return extractor.extractFrom( resultset ); + } ); + + } + + /** + * Set the {@linkplain ReactiveConnection}-level lock-timeout using the given {@code sql} command. + */ + public static CompletionStage setLockTimeout( + String sql, + ReactiveConnection connection) { + return connection.execute( sql ); + } + + /** + * Set the {@linkplain ReactiveConnection}-level lock-timeout using + * the given {@code sqlFormat} (with a single format placeholder + * for the {@code milliseconds} value). + * + * @see #setLockTimeout(String, ReactiveConnection) + */ + public static CompletionStage setLockTimeout( + Integer milliseconds, + String sqlFormat, + ReactiveConnection connection) { + final String sql = String.format( sqlFormat, milliseconds ); + return setLockTimeout( sql, connection ); + } + + /** + * Set the {@linkplain ReactiveConnection}-level lock-timeout. The passed + * {@code valueStrategy} is used to interpret the {@code timeout} + * which is then used with {@code sqlFormat} to execute the command. + * + * @see #setLockTimeout(Integer, String, ReactiveConnection) + */ + public static CompletionStage setLockTimeout( + Timeout timeout, + Function valueStrategy, + String sqlFormat, + ReactiveConnection connection) { + final int milliseconds = valueStrategy.apply( timeout ); + return setLockTimeout( milliseconds, sqlFormat, connection ); + } + + @FunctionalInterface + public interface TimeoutExtractor { + CompletionStage extractFrom(ReactiveConnection.Result resultSet); + } +} diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/internal/lock/ReactiveCockroachConnectionLockTimeoutStrategyImpl.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/internal/lock/ReactiveCockroachConnectionLockTimeoutStrategyImpl.java new file mode 100644 index 000000000..36a537afb --- /dev/null +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/internal/lock/ReactiveCockroachConnectionLockTimeoutStrategyImpl.java @@ -0,0 +1,71 @@ +/* Hibernate, Relational Persistence for Idiomatic Java + * + * SPDX-License-Identifier: Apache-2.0 + * Copyright: Red Hat Inc. and Hibernate Authors + */ +package org.hibernate.reactive.sql.exec.internal.lock; + +import org.hibernate.HibernateException; +import org.hibernate.dialect.lock.spi.ConnectionLockTimeoutStrategy; +import org.hibernate.engine.spi.SessionFactoryImplementor; +import org.hibernate.reactive.pool.ReactiveConnection; + +import jakarta.persistence.Timeout; +import java.util.concurrent.CompletionStage; + +import static org.hibernate.Timeouts.NO_WAIT_MILLI; +import static org.hibernate.Timeouts.SKIP_LOCKED_MILLI; +import static org.hibernate.Timeouts.WAIT_FOREVER; +import static org.hibernate.reactive.util.impl.CompletionStages.completedFuture; + +public class ReactiveCockroachConnectionLockTimeoutStrategyImpl + implements ReactiveConnectionLockTimeoutStrategy { + + public static final ReactiveCockroachConnectionLockTimeoutStrategyImpl INSTANCE = new ReactiveCockroachConnectionLockTimeoutStrategyImpl(); + + @Override + public Level getSupportedLevel() { + return ConnectionLockTimeoutStrategy.Level.SUPPORTED; + } + + @Override + public CompletionStage getReactiveLockTimeout( + ReactiveConnection connection, + SessionFactoryImplementor factory) { + return LockHelper.getLockTimeout( + "show lock_timeout", + (resultSet) -> { + // see https://dev.mysql.com/doc/refman/8.4/en/innodb-parameters.html#sysvar_innodb_lock_wait_timeout + final int millis = (int) resultSet.next()[0]; + return switch ( millis ) { + case 0 -> completedFuture( WAIT_FOREVER ); + default -> completedFuture( Timeout.milliseconds( millis ) ); + }; + }, + connection + ); + } + + public CompletionStage setReactiveLockTimeout( + Timeout timeout, + ReactiveConnection connection, + SessionFactoryImplementor factory) { + return LockHelper.setLockTimeout( + timeout, + (t) -> { + // see https://dev.mysql.com/doc/refman/8.4/en/innodb-parameters.html#sysvar_innodb_lock_wait_timeout + final int milliseconds = timeout.milliseconds(); + if ( milliseconds == SKIP_LOCKED_MILLI ) { + throw new HibernateException( "Connection lock-timeout does not accept skip-locked" ); + } + if ( milliseconds == NO_WAIT_MILLI ) { + throw new HibernateException( "Connection lock-timeout does not accept no-wait" ); + } + return milliseconds; + }, + "set lock_timeout = %s", + connection + ); + } + +} diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/internal/lock/ReactiveCollectionLockingAction.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/internal/lock/ReactiveCollectionLockingAction.java new file mode 100644 index 000000000..a16255a7e --- /dev/null +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/internal/lock/ReactiveCollectionLockingAction.java @@ -0,0 +1,154 @@ +/* Hibernate, Relational Persistence for Idiomatic Java + * + * SPDX-License-Identifier: Apache-2.0 + * Copyright: Red Hat Inc. and Hibernate Authors + */ +package org.hibernate.reactive.sql.exec.internal.lock; + +import org.hibernate.LockMode; +import org.hibernate.LockOptions; +import org.hibernate.Locking; +import org.hibernate.engine.spi.EffectiveEntityGraph; +import org.hibernate.engine.spi.EntityKey; +import org.hibernate.engine.spi.SharedSessionContractImplementor; +import org.hibernate.graph.GraphSemantic; +import org.hibernate.metamodel.mapping.EntityMappingType; +import org.hibernate.metamodel.mapping.PluralAttributeMapping; +import org.hibernate.query.sqm.mutation.internal.SqmMutationStrategyHelper; +import org.hibernate.reactive.pool.ReactiveConnection; +import org.hibernate.reactive.sql.exec.spi.ReactivePostAction; +import org.hibernate.reactive.util.impl.CompletionStages; +import org.hibernate.sql.ast.tree.select.QuerySpec; +import org.hibernate.sql.exec.internal.lock.CollectionLockingAction; +import org.hibernate.sql.exec.internal.lock.EntityDetails; +import org.hibernate.sql.exec.internal.lock.LockingHelper; +import org.hibernate.sql.exec.spi.ExecutionContext; +import org.hibernate.sql.exec.spi.JdbcSelectWithActionsBuilder; + +import jakarta.persistence.Timeout; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletionStage; +import java.util.function.Supplier; + +import static org.hibernate.reactive.util.impl.CompletionStages.voidFuture; +import static org.hibernate.sql.exec.SqlExecLogger.SQL_EXEC_LOGGER; + +/** + * Reactive version of {@link CollectionLockingAction} + */ +public class ReactiveCollectionLockingAction extends CollectionLockingAction implements ReactivePostAction { + + protected ReactiveCollectionLockingAction( + LoadedValuesCollectorImpl loadedValuesCollector, + LockMode lockMode, + Timeout lockTimeout) { + super( loadedValuesCollector, lockMode, lockTimeout ); + } + + + public static void apply( + LockOptions lockOptions, + QuerySpec lockingTarget, + JdbcSelectWithActionsBuilder jdbcSelectBuilder) { + assert lockOptions.getScope() == Locking.Scope.INCLUDE_COLLECTIONS; + + final var loadedValuesCollector = resolveLoadedValuesCollector( lockingTarget.getFromClause() ); + + // NOTE: we need to set this separately so that it can get incorporated into + // the JdbcValuesSourceProcessingState for proper callbacks + jdbcSelectBuilder.setLoadedValuesCollector( loadedValuesCollector ); + + // additionally, add a post-action which uses the collected values. + jdbcSelectBuilder.appendPostAction( new ReactiveCollectionLockingAction( + loadedValuesCollector, + lockOptions.getLockMode(), + lockOptions.getTimeout() + ) ); + } + + @Override + public CompletionStage reactivePerformReactivePostAction( + ReactiveConnection jdbcConnection, + ExecutionContext executionContext) { + LockingHelper.logLoadedValues( loadedValuesCollector ); + + final var session = executionContext.getSession(); + + // NOTE: we deal with effective graphs here to make sure embedded associations are treated as lazy + final var effectiveEntityGraph = session.getLoadQueryInfluencers().getEffectiveEntityGraph(); + final var initialGraph = effectiveEntityGraph.getGraph(); + final var initialSemantic = effectiveEntityGraph.getSemantic(); + + // collect registrations by entity type + final var entitySegments = segmentLoadedValues( loadedValuesCollector ); + + try { + // for each entity-type, prepare a locking select statement per table. + // this is based on the attributes for "state array" ordering purposes - + // we match each attribute to the table it is mapped to and add it to + // the select-list for that table-segment. + CompletionStage loop = voidFuture(); + + for ( Map.Entry> entry : entitySegments.entrySet() ) { + loop = loop.thenCompose( v -> execute( executionContext, entry, session, effectiveEntityGraph ) ); + } + return loop; + } + finally { + // reset the effective graph to whatever it was when we started + effectiveEntityGraph.clear(); + session.getLoadQueryInfluencers().applyEntityGraph( initialGraph, initialSemantic ); + } + } + + private CompletionStage execute( + ExecutionContext executionContext, + Map.Entry> entry, + SharedSessionContractImplementor session, + EffectiveEntityGraph effectiveEntityGraph) { + EntityMappingType entityMappingType = entry.getKey(); + List entityKeys = entry.getValue(); + if ( SQL_EXEC_LOGGER.isDebugEnabled() ) { + SQL_EXEC_LOGGER.startingIncludeCollectionsLockingProcess( entityMappingType.getEntityName() ); + } + + // apply an empty "fetch graph" to make sure any embedded associations reachable from + // any of the DomainResults we will create are treated as lazy + final var graph = entityMappingType.createRootGraph( session ); + effectiveEntityGraph.clear(); + effectiveEntityGraph.applyGraph( graph, GraphSemantic.FETCH ); + + // create a cross-reference of information related to an entity based on its identifier. + // we use this as the collection owners whose collections need to be locked + final var entityDetailsMap = LockingHelper.resolveEntityKeys( entityKeys, executionContext ); + + final List>> suppliers = new ArrayList<>(); + SqmMutationStrategyHelper.visitCollectionTables( + entityMappingType, (attribute) -> { + // we may need to lock the "collection table". + // the conditions are a bit unclear as to directionality, etc., so for now lock each. + suppliers.add( () -> reactiveLockCollectionTable( + attribute, + lockMode, + lockTimeout, + entityDetailsMap, + executionContext + ) ); + } + ); + return CompletionStages.loop( suppliers, Supplier::get ); + } + + public static CompletionStage reactiveLockCollectionTable( + PluralAttributeMapping attributeMapping, + LockMode lockMode, + Timeout lockTimeout, + Map ownerDetailsMap, + ExecutionContext executionContext) { + /// execute query + return voidFuture(); + } + +} diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/internal/lock/ReactiveConnectionLockTimeoutStrategy.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/internal/lock/ReactiveConnectionLockTimeoutStrategy.java new file mode 100644 index 000000000..8e8194a14 --- /dev/null +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/internal/lock/ReactiveConnectionLockTimeoutStrategy.java @@ -0,0 +1,45 @@ +/* Hibernate, Relational Persistence for Idiomatic Java + * + * SPDX-License-Identifier: Apache-2.0 + * Copyright: Red Hat Inc. and Hibernate Authors + */ +package org.hibernate.reactive.sql.exec.internal.lock; + +import org.hibernate.dialect.lock.spi.ConnectionLockTimeoutStrategy; +import org.hibernate.engine.spi.SessionFactoryImplementor; +import org.hibernate.reactive.logging.impl.Log; +import org.hibernate.reactive.logging.impl.LoggerFactory; +import org.hibernate.reactive.pool.ReactiveConnection; + +import jakarta.persistence.Timeout; +import java.lang.invoke.MethodHandles; +import java.sql.Connection; +import java.util.concurrent.CompletionStage; + +/** + * Reactive version of {@link ConnectionLockTimeoutStrategy} + */ +public interface ReactiveConnectionLockTimeoutStrategy extends ConnectionLockTimeoutStrategy { + Log LOG = LoggerFactory.make( Log.class, MethodHandles.lookup() ); + + @Override + default Timeout getLockTimeout(Connection connection, SessionFactoryImplementor factory) { + throw LOG.nonReactiveMethodCall( "getReactiveLockTimeout()" ); + } + + @Override + default void setLockTimeout(Timeout timeout, Connection connection, SessionFactoryImplementor factory) { + throw LOG.nonReactiveMethodCall( "setReactiveLockTimeout()" ); + } + + default CompletionStage getReactiveLockTimeout(ReactiveConnection connection, SessionFactoryImplementor factory){ + throw new UnsupportedOperationException( "Lock timeout on the connection is not supported" ); + } + + default CompletionStage setReactiveLockTimeout( + Timeout timeout, + ReactiveConnection connection, + SessionFactoryImplementor factory){ + throw new UnsupportedOperationException( "Lock timeout on the connection is not supported" ); + } +} diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/internal/lock/ReactiveConnectionLockTimeoutStrategyBuilder.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/internal/lock/ReactiveConnectionLockTimeoutStrategyBuilder.java new file mode 100644 index 000000000..7619047e8 --- /dev/null +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/internal/lock/ReactiveConnectionLockTimeoutStrategyBuilder.java @@ -0,0 +1,36 @@ +/* Hibernate, Relational Persistence for Idiomatic Java + * + * SPDX-License-Identifier: Apache-2.0 + * Copyright: Red Hat Inc. and Hibernate Authors + */ +package org.hibernate.reactive.sql.exec.internal.lock; + +import org.hibernate.dialect.lock.internal.CockroachLockingSupport; +import org.hibernate.dialect.lock.internal.MySQLLockingSupport; +import org.hibernate.dialect.lock.internal.PostgreSQLLockingSupport; +import org.hibernate.dialect.lock.internal.TransactSQLLockingSupport; +import org.hibernate.dialect.lock.spi.ConnectionLockTimeoutStrategy; + +/** + * Builder to create {@link ReactiveConnectionLockTimeoutStrategy} equivalents of {@link ConnectionLockTimeoutStrategy} + */ +public class ReactiveConnectionLockTimeoutStrategyBuilder { + public static ReactiveConnectionLockTimeoutStrategy build(ConnectionLockTimeoutStrategy strategy) { + if ( strategy instanceof MySQLLockingSupport.ConnectionLockTimeoutStrategyImpl ) { + return ReactiveMySQLConnectionLockTimeoutStrategyImpl.INSTANCE; + } + else if ( strategy instanceof CockroachLockingSupport ) { + return ReactiveCockroachConnectionLockTimeoutStrategyImpl.INSTANCE; + } + else if ( strategy instanceof TransactSQLLockingSupport.SQLServerImpl ) { + return ReactiveSQLServerConnectionLockTimeoutStrategyImpl.INSTANCE; + } + else if ( strategy instanceof PostgreSQLLockingSupport ) { + return ReactivePostgreSQLConnectionLockTimeoutStrategyImpl.INSTANCE; + } + else { + throw new IllegalArgumentException( "Unsupported ConnectionLockTimeoutStrategy: " + strategy.getClass() + .getName() ); + } + } +} diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/internal/lock/ReactiveFollowOnLockingAction.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/internal/lock/ReactiveFollowOnLockingAction.java new file mode 100644 index 000000000..654c50924 --- /dev/null +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/internal/lock/ReactiveFollowOnLockingAction.java @@ -0,0 +1,139 @@ +/* Hibernate, Relational Persistence for Idiomatic Java + * + * SPDX-License-Identifier: Apache-2.0 + * Copyright: Red Hat Inc. and Hibernate Authors + */ +package org.hibernate.reactive.sql.exec.internal.lock; + +import org.hibernate.LockMode; +import org.hibernate.LockOptions; +import org.hibernate.Locking; +import org.hibernate.engine.spi.CollectionKey; +import org.hibernate.engine.spi.EntityKey; +import org.hibernate.engine.spi.SharedSessionContractImplementor; +import org.hibernate.metamodel.mapping.EntityMappingType; +import org.hibernate.metamodel.mapping.PluralAttributeMapping; +import org.hibernate.metamodel.mapping.TableDetails; +import org.hibernate.reactive.pool.ReactiveConnection; +import org.hibernate.reactive.session.impl.ReactiveSessionImpl; +import org.hibernate.reactive.sql.exec.spi.ReactivePostAction; +import org.hibernate.sql.ast.spi.LockingClauseStrategy; +import org.hibernate.sql.ast.tree.select.QuerySpec; +import org.hibernate.sql.exec.internal.lock.FollowOnLockingAction; +import org.hibernate.sql.exec.internal.lock.LockingHelper; +import org.hibernate.sql.exec.internal.lock.TableLock; +import org.hibernate.sql.exec.spi.ExecutionContext; +import org.hibernate.sql.exec.spi.JdbcSelectWithActionsBuilder; + +import jakarta.persistence.Timeout; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletionStage; + +import static java.util.Collections.emptyMap; +import static org.hibernate.reactive.util.impl.CompletionStages.loop; + +/** + * Reactive version of {@link FollowOnLockingAction} + */ +public class ReactiveFollowOnLockingAction extends FollowOnLockingAction implements ReactivePostAction { + protected ReactiveFollowOnLockingAction( + LoadedValuesCollectorImpl loadedValuesCollector, + LockMode lockMode, + Timeout lockTimeout, + Locking.Scope lockScope) { + super( loadedValuesCollector, lockMode, lockTimeout, lockScope ); + } + + public static void apply( + LockOptions lockOptions, + QuerySpec lockingTarget, + LockingClauseStrategy lockingClauseStrategy, + JdbcSelectWithActionsBuilder jdbcSelectBuilder) { + final var fromClause = lockingTarget.getFromClause(); + final var loadedValuesCollector = resolveLoadedValuesCollector( fromClause, lockingClauseStrategy ); + + // NOTE: we need to set this separately so that it can get incorporated into + // the JdbcValuesSourceProcessingState for proper callbacks + jdbcSelectBuilder.setLoadedValuesCollector( loadedValuesCollector ); + + // additionally, add a post-action which uses the collected values. + jdbcSelectBuilder.appendPostAction( new ReactiveFollowOnLockingAction( + loadedValuesCollector, + lockOptions.getLockMode(), + lockOptions.getTimeout(), + lockOptions.getScope() + ) ); + } + + + @Override + public CompletionStage reactivePerformReactivePostAction( + ReactiveConnection jdbcConnection, + ExecutionContext executionContext) { + LockingHelper.logLoadedValues( loadedValuesCollector ); + + final var session = executionContext.getSession(); + + // NOTE: we deal with effective graphs here to make sure embedded associations are treated as lazy + final var effectiveEntityGraph = session.getLoadQueryInfluencers().getEffectiveEntityGraph(); + final var initialGraph = effectiveEntityGraph.getGraph(); + final var initialSemantic = effectiveEntityGraph.getSemantic(); + + // collect registrations by entity type + final var entitySegments = segmentLoadedValues(); + final Map>> collectionSegments = + lockScope == Locking.Scope.INCLUDE_FETCHES + ? segmentLoadedCollections() + : emptyMap(); + + // for each entity-type, prepare a locking select statement per table. + // this is based on the attributes for "state array" ordering purposes - + // we match each attribute to the table it is mapped to and add it to + // the select-list for that table-segment. + + return loop( + entitySegments.keySet().iterator(), (entityMappingType, index) -> { + List entityKeys = entitySegments.get( entityMappingType ); + final var tableLocks = prepareTableLocks( entityMappingType, entityKeys, session ); + + // create a cross-reference of information related to an entity based on its identifier, + // we'll use this later when we adjust the state array and inject state into the entity instance. + final var entityDetailsMap = LockingHelper.resolveEntityKeys( entityKeys, executionContext ); + + // at this point, we have all the individual locking selects ready to go - execute them + final var lockingOptions = buildLockingOptions( + tableLocks, + entityDetailsMap, + entityMappingType, + effectiveEntityGraph, + entityKeys, + collectionSegments, + session, + executionContext + ); + return loop( tableLocks.values().iterator(), (tableLock, i) -> + ( (ReactiveTableLock) tableLock ).reactivePerformActions( + entityDetailsMap, + lockingOptions, + (ReactiveSessionImpl) session + ) + ); + + } + ).whenComplete( (unused, throwable) -> { + effectiveEntityGraph.clear(); + session.getLoadQueryInfluencers().applyEntityGraph( initialGraph, initialSemantic ); + } ); + + } + + @Override + protected TableLock createTableLock( + TableDetails tableDetails, + EntityMappingType entityMappingType, + List entityKeys, + SharedSessionContractImplementor session) { + return new ReactiveTableLock( tableDetails, entityMappingType, entityKeys, session ); + } +} diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/internal/lock/ReactiveLockTimeoutHandler.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/internal/lock/ReactiveLockTimeoutHandler.java new file mode 100644 index 000000000..7780166bd --- /dev/null +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/internal/lock/ReactiveLockTimeoutHandler.java @@ -0,0 +1,66 @@ +/* Hibernate, Relational Persistence for Idiomatic Java + * + * SPDX-License-Identifier: Apache-2.0 + * Copyright: Red Hat Inc. and Hibernate Authors + */ +package org.hibernate.reactive.sql.exec.internal.lock; + +import org.hibernate.reactive.pool.ReactiveConnection; +import org.hibernate.reactive.sql.exec.spi.ReactivePostAction; +import org.hibernate.reactive.sql.exec.spi.ReactivePreAction; +import org.hibernate.sql.exec.spi.ExecutionContext; + +import jakarta.persistence.Timeout; +import java.util.concurrent.CompletionStage; + +/** + * Reactive version of {@link org.hibernate.sql.exec.internal.LockTimeoutHandler} + */ +public class ReactiveLockTimeoutHandler implements ReactivePreAction, ReactivePostAction { + private final ReactiveConnectionLockTimeoutStrategy lockTimeoutStrategy; + private final Timeout timeout; + + private Timeout baseline; + private boolean setTimeout; + + public ReactiveLockTimeoutHandler(Timeout timeout, ReactiveConnectionLockTimeoutStrategy lockTimeoutStrategy) { + this.timeout = timeout; + this.lockTimeoutStrategy = lockTimeoutStrategy; + } + + + @Override + public CompletionStage reactivePerformPreAction( + ReactiveConnection connection, + ExecutionContext executionContext) { + final var factory = executionContext.getSession().getFactory(); + + // first, get the baseline (for post-action) + return lockTimeoutStrategy.getReactiveLockTimeout( connection, factory ) + .thenCompose( baseline -> { + this.baseline = baseline; + // now set the timeout + return lockTimeoutStrategy.setReactiveLockTimeout( timeout, connection, factory ); + } ) + .thenAccept( unused -> setTimeout = true ); + + } + + @Override + public CompletionStage reactivePerformReactivePostAction( + ReactiveConnection connection, + ExecutionContext executionContext) { + final var factory = executionContext.getSession().getFactory(); + + // reset the timeout + return lockTimeoutStrategy.setReactiveLockTimeout(baseline, connection,factory ); + } + + + + @Override + public boolean shouldRunAfterFail() { + // if we set the timeout in the pre-action, we should always reset it in post-action + return setTimeout; + } +} diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/internal/lock/ReactiveMySQLConnectionLockTimeoutStrategyImpl.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/internal/lock/ReactiveMySQLConnectionLockTimeoutStrategyImpl.java new file mode 100644 index 000000000..4444233f7 --- /dev/null +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/internal/lock/ReactiveMySQLConnectionLockTimeoutStrategyImpl.java @@ -0,0 +1,71 @@ +/* Hibernate, Relational Persistence for Idiomatic Java + * + * SPDX-License-Identifier: Apache-2.0 + * Copyright: Red Hat Inc. and Hibernate Authors + */ +package org.hibernate.reactive.sql.exec.internal.lock; + +import org.hibernate.HibernateException; +import org.hibernate.Timeouts; +import org.hibernate.dialect.lock.spi.ConnectionLockTimeoutStrategy; +import org.hibernate.engine.spi.SessionFactoryImplementor; +import org.hibernate.reactive.pool.ReactiveConnection; + +import jakarta.persistence.Timeout; +import java.util.concurrent.CompletionStage; + +import static org.hibernate.Timeouts.SKIP_LOCKED_MILLI; +import static org.hibernate.Timeouts.WAIT_FOREVER_MILLI; +import static org.hibernate.reactive.util.impl.CompletionStages.completedFuture; + +/** + * Reactive version of {@link org.hibernate.dialect.lock.internal.MySQLLockingSupport.ConnectionLockTimeoutStrategyImpl} + */ +public class ReactiveMySQLConnectionLockTimeoutStrategyImpl implements ReactiveConnectionLockTimeoutStrategy { + + public static final ReactiveMySQLConnectionLockTimeoutStrategyImpl INSTANCE = new ReactiveMySQLConnectionLockTimeoutStrategyImpl(); + + @Override + public ConnectionLockTimeoutStrategy.Level getSupportedLevel() { + return ConnectionLockTimeoutStrategy.Level.EXTENDED; + } + + @Override + public CompletionStage getReactiveLockTimeout( ReactiveConnection connection, SessionFactoryImplementor factory) { + return LockHelper.getLockTimeout( + "SELECT @@SESSION.innodb_lock_wait_timeout", + (resultSet) -> { + // see https://dev.mysql.com/doc/refman/8.4/en/innodb-parameters.html#sysvar_innodb_lock_wait_timeout + final int millis = (int) resultSet.next()[0]; + return switch ( millis ) { + case 0 -> completedFuture( Timeouts.NO_WAIT ); + case 100000000 -> completedFuture( Timeouts.WAIT_FOREVER ); + default -> completedFuture( Timeout.milliseconds( millis ) ); + }; + }, + connection + ); + } + + public CompletionStage setReactiveLockTimeout( + Timeout timeout, + ReactiveConnection connection, + SessionFactoryImplementor factory) { + return LockHelper.setLockTimeout( + timeout, + (t) -> { + // see https://dev.mysql.com/doc/refman/8.4/en/innodb-parameters.html#sysvar_innodb_lock_wait_timeout + final int milliseconds = timeout.milliseconds(); + if ( milliseconds == SKIP_LOCKED_MILLI ) { + throw new HibernateException( "Connection lock-timeout does not accept skip-locked" ); + } + if ( milliseconds == WAIT_FOREVER_MILLI ) { + return 100000000; + } + return milliseconds; + }, + "SET @@SESSION.innodb_lock_wait_timeout = %s", + connection + ); + } +} diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/internal/lock/ReactivePostgreSQLConnectionLockTimeoutStrategyImpl.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/internal/lock/ReactivePostgreSQLConnectionLockTimeoutStrategyImpl.java new file mode 100644 index 000000000..99bc84d34 --- /dev/null +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/internal/lock/ReactivePostgreSQLConnectionLockTimeoutStrategyImpl.java @@ -0,0 +1,78 @@ +/* Hibernate, Relational Persistence for Idiomatic Java + * + * SPDX-License-Identifier: Apache-2.0 + * Copyright: Red Hat Inc. and Hibernate Authors + */ +package org.hibernate.reactive.sql.exec.internal.lock; + +import org.hibernate.HibernateException; +import org.hibernate.Timeouts; +import org.hibernate.dialect.lock.spi.ConnectionLockTimeoutStrategy; +import org.hibernate.engine.spi.SessionFactoryImplementor; +import org.hibernate.reactive.pool.ReactiveConnection; + +import jakarta.persistence.Timeout; +import java.util.concurrent.CompletionStage; + +import static org.hibernate.Timeouts.NO_WAIT_MILLI; +import static org.hibernate.Timeouts.SKIP_LOCKED_MILLI; +import static org.hibernate.Timeouts.WAIT_FOREVER_MILLI; +import static org.hibernate.reactive.util.impl.CompletionStages.completedFuture; + +/** + * Reactive version of {@link org.hibernate.dialect.lock.internal.PostgreSQLLockingSupport} + */ +public class ReactivePostgreSQLConnectionLockTimeoutStrategyImpl implements ReactiveConnectionLockTimeoutStrategy { + + public static final ReactivePostgreSQLConnectionLockTimeoutStrategyImpl INSTANCE = new ReactivePostgreSQLConnectionLockTimeoutStrategyImpl(); + + @Override + public Level getSupportedLevel() { + return ConnectionLockTimeoutStrategy.Level.SUPPORTED; + } + + @Override + public CompletionStage getReactiveLockTimeout( + ReactiveConnection connection, + SessionFactoryImplementor factory) { + return LockHelper.getLockTimeout( + "select current_setting('lock_timeout', true)", + (resultSet) -> { + // see https://dev.mysql.com/doc/refman/8.4/en/innodb-parameters.html#sysvar_innodb_lock_wait_timeout + final String value = (String) resultSet.next()[0]; + if ( "0".equals( value ) ) { + return completedFuture( Timeouts.WAIT_FOREVER ); + } + assert value.endsWith( "s" ); + final int secondsValue = Integer.parseInt( value.substring( 0, value.length() - 1 ) ); + return completedFuture( Timeout.seconds( secondsValue ) ); + }, + connection + ); + } + + public CompletionStage setReactiveLockTimeout( + Timeout timeout, + ReactiveConnection connection, + SessionFactoryImplementor factory) { + return LockHelper.setLockTimeout( + timeout, + (t) -> { + // see https://dev.mysql.com/doc/refman/8.4/en/innodb-parameters.html#sysvar_innodb_lock_wait_timeout + final int milliseconds = timeout.milliseconds(); + if ( milliseconds == SKIP_LOCKED_MILLI ) { + throw new HibernateException( "Connection lock-timeout does not accept skip-locked" ); + } + if ( milliseconds == NO_WAIT_MILLI ) { + throw new HibernateException( "Connection lock-timeout does not accept no-wait" ); + } + return milliseconds == WAIT_FOREVER_MILLI + ? 0 + : milliseconds; + }, + "set lock_timeout = %s", + connection + ); + } + +} diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/internal/lock/ReactiveSQLServerConnectionLockTimeoutStrategyImpl.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/internal/lock/ReactiveSQLServerConnectionLockTimeoutStrategyImpl.java new file mode 100644 index 000000000..4d8c9a156 --- /dev/null +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/internal/lock/ReactiveSQLServerConnectionLockTimeoutStrategyImpl.java @@ -0,0 +1,71 @@ +/* Hibernate, Relational Persistence for Idiomatic Java + * + * SPDX-License-Identifier: Apache-2.0 + * Copyright: Red Hat Inc. and Hibernate Authors + */ +package org.hibernate.reactive.sql.exec.internal.lock; + +import org.hibernate.HibernateException; +import org.hibernate.dialect.lock.spi.ConnectionLockTimeoutStrategy; +import org.hibernate.engine.spi.SessionFactoryImplementor; +import org.hibernate.reactive.pool.ReactiveConnection; + +import jakarta.persistence.Timeout; +import java.util.concurrent.CompletionStage; + +import static org.hibernate.Timeouts.NO_WAIT; +import static org.hibernate.Timeouts.SKIP_LOCKED_MILLI; +import static org.hibernate.Timeouts.WAIT_FOREVER; +import static org.hibernate.reactive.util.impl.CompletionStages.completedFuture; + +/** + * Reactive version of {@link org.hibernate.dialect.lock.internal.TransactSQLLockingSupport.SQLServerImpl} + */ +public class ReactiveSQLServerConnectionLockTimeoutStrategyImpl implements ReactiveConnectionLockTimeoutStrategy { + + public static final ReactiveSQLServerConnectionLockTimeoutStrategyImpl INSTANCE = new ReactiveSQLServerConnectionLockTimeoutStrategyImpl(); + + @Override + public Level getSupportedLevel() { + return ConnectionLockTimeoutStrategy.Level.EXTENDED; + } + + @Override + public CompletionStage getReactiveLockTimeout( + ReactiveConnection connection, + SessionFactoryImplementor factory) { + return LockHelper.getLockTimeout( + "select @@lock_timeout", + (resultSet) -> { + // see https://dev.mysql.com/doc/refman/8.4/en/innodb-parameters.html#sysvar_innodb_lock_wait_timeout + final int millis = (int) resultSet.next()[0]; + return switch ( millis ) { + case -1 -> completedFuture( WAIT_FOREVER ); + case 0 -> completedFuture( NO_WAIT ); + default -> completedFuture( Timeout.milliseconds( millis ) ); + }; + }, + connection + ); + } + + public CompletionStage setReactiveLockTimeout( + Timeout timeout, + ReactiveConnection connection, + SessionFactoryImplementor factory) { + return LockHelper.setLockTimeout( + timeout, + (t) -> { + // see https://dev.mysql.com/doc/refman/8.4/en/innodb-parameters.html#sysvar_innodb_lock_wait_timeout + final int milliseconds = timeout.milliseconds(); + if ( milliseconds == SKIP_LOCKED_MILLI ) { + throw new HibernateException( "Connection lock-timeout does not accept skip-locked" ); + } + return milliseconds; + }, + "set lock_timeout %s", + connection + ); + } + +} diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/internal/lock/ReactiveTableLock.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/internal/lock/ReactiveTableLock.java new file mode 100644 index 000000000..8ef968c75 --- /dev/null +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/internal/lock/ReactiveTableLock.java @@ -0,0 +1,170 @@ +/* Hibernate, Relational Persistence for Idiomatic Java + * + * SPDX-License-Identifier: Apache-2.0 + * Copyright: Red Hat Inc. and Hibernate Authors + */ +package org.hibernate.reactive.sql.exec.internal.lock; + +import org.hibernate.AssertionFailure; +import org.hibernate.engine.spi.EntityKey; +import org.hibernate.engine.spi.SharedSessionContractImplementor; +import org.hibernate.metamodel.mapping.AttributeMapping; +import org.hibernate.metamodel.mapping.EntityMappingType; +import org.hibernate.metamodel.mapping.ForeignKeyDescriptor; +import org.hibernate.metamodel.mapping.TableDetails; +import org.hibernate.metamodel.mapping.internal.ToOneAttributeMapping; +import org.hibernate.query.spi.QueryOptions; +import org.hibernate.reactive.logging.impl.Log; +import org.hibernate.reactive.logging.impl.LoggerFactory; +import org.hibernate.reactive.session.impl.ReactiveSessionImpl; +import org.hibernate.reactive.sql.exec.internal.StandardReactiveSelectExecutor; +import org.hibernate.reactive.sql.results.spi.ReactiveListResultsConsumer; +import org.hibernate.sql.ast.tree.select.SelectStatement; +import org.hibernate.sql.exec.internal.BaseExecutionContext; +import org.hibernate.sql.exec.internal.lock.EntityDetails; +import org.hibernate.sql.exec.internal.lock.TableLock; +import org.hibernate.sql.results.graph.DomainResult; + +import java.lang.invoke.MethodHandles; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletionStage; + +import static org.hibernate.Hibernate.isEmpty; +import static org.hibernate.reactive.util.impl.CompletionStages.nullFuture; + +/** + * Reactive version of {@link TableLock} + */ +public class ReactiveTableLock extends TableLock { + Log LOG = LoggerFactory.make( Log.class, MethodHandles.lookup() ); + + public ReactiveTableLock( + TableDetails tableDetails, + EntityMappingType entityMappingType, + List entityKeys, + SharedSessionContractImplementor session) { + super( tableDetails, entityMappingType, entityKeys, session ); + } + + @Override + public void applyAttribute(int index, AttributeMapping attributeMapping) { + final var attributePath = rootPath.append( attributeMapping.getPartName() ); + final DomainResult domainResult; + final ResultHandler resultHandler; + if ( attributeMapping instanceof ToOneAttributeMapping toOne ) { + domainResult = + toOne.getForeignKeyDescriptor().getKeyPart() + .createDomainResult( + attributePath, + logicalTableGroup, + ForeignKeyDescriptor.PART_NAME, + creationStates + ); + resultHandler = new ReactiveToOneResultHandler( index, toOne ); + } + else { + domainResult = + attributeMapping.createDomainResult( + attributePath, + logicalTableGroup, + null, + creationStates + ); + resultHandler = new NonToOneResultHandler( index ); + } + domainResults.add( domainResult ); + resultHandlers.add( resultHandler ); + } + + @Override + public void performActions( + Map entityDetailsMap, + QueryOptions lockingQueryOptions, + SharedSessionContractImplementor session) { + throw LOG.nonReactiveMethodCall( "reactivePerformActions()" ); + } + + public CompletionStage reactivePerformActions( + Map entityDetailsMap, + QueryOptions lockingQueryOptions, + ReactiveSessionImpl session) { + final var sessionFactory = session.getSessionFactory(); + final var jdbcServices = sessionFactory.getJdbcServices(); + final var selectStatement = new SelectStatement( querySpec, domainResults ); + + return StandardReactiveSelectExecutor.INSTANCE + .list( + jdbcServices.getDialect().getSqlAstTranslatorFactory() + .buildSelectTranslator( sessionFactory, selectStatement ) + .translate( jdbcParameterBindings, lockingQueryOptions ), + jdbcParameterBindings, + // IMPORTANT: we need a "clean" ExecutionContext to not further apply locking + new BaseExecutionContext( session ), + row -> row, + Object[].class, + ReactiveListResultsConsumer.UniqueSemantic.ALLOW + ).thenApply( results -> { + if ( isEmpty( results ) ) { + throw new AssertionFailure( "Expecting results" ); + } + + results.forEach( (row) -> { + final Object id = row[0]; + final var entityDetails = entityDetailsMap.get( id ); + for ( int i = 0; i < resultHandlers.size(); i++ ) { + // offset 1 because of the id at position 0 + resultHandlers.get( i ).applyResult( row[i + 1], entityDetails, session ); + } + } ); + return null; + } ); + } + + protected static class ReactiveToOneResultHandler extends ToOneResultHandler { + Log LOG = LoggerFactory.make( Log.class, MethodHandles.lookup() ); + + public ReactiveToOneResultHandler(Integer statePosition, ToOneAttributeMapping toOne) { + super( statePosition, toOne ); + } + + public void applyResult( + Object stateValue, + EntityDetails entityDetails, + SharedSessionContractImplementor session) { + throw LOG.nonReactiveMethodCall( "reactiveApplyResult()" ); + } + + public CompletionStage reactiveApplyResult( + Object stateValue, + EntityDetails entityDetails, + ReactiveSessionImpl session) { + final Object reference; + if ( stateValue == null ) { + if ( !toOne.isNullable() ) { + throw new IllegalStateException( "Retrieved key was null, but to-one is not nullable : " + toOne.getNavigableRole() + .getFullPath() ); + } + else { + reference = null; + } + applyLoadedState( entityDetails, statePosition, reference ); + applyModelState( entityDetails, statePosition, reference ); + return nullFuture(); + } + else { + return session.reactiveInternalLoad( + toOne.getAssociatedEntityMappingType().getEntityName(), + stateValue, + false, + toOne.isNullable() + ).thenApply( ref -> { + applyLoadedState( entityDetails, statePosition, ref ); + applyModelState( entityDetails, statePosition, ref ); + return null; + } ); + } + + } + } +} diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/spi/ReactiveJdbcSelect.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/spi/ReactiveJdbcSelect.java new file mode 100644 index 000000000..876296690 --- /dev/null +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/spi/ReactiveJdbcSelect.java @@ -0,0 +1,50 @@ +/* Hibernate, Relational Persistence for Idiomatic Java + * + * SPDX-License-Identifier: Apache-2.0 + * Copyright: Red Hat Inc. and Hibernate Authors + */ +package org.hibernate.reactive.sql.exec.spi; + +import org.hibernate.reactive.logging.impl.Log; +import org.hibernate.reactive.logging.impl.LoggerFactory; +import org.hibernate.reactive.pool.ReactiveConnection; +import org.hibernate.sql.exec.spi.ExecutionContext; +import org.hibernate.sql.exec.spi.JdbcSelect; +import org.hibernate.sql.exec.spi.StatementAccess; + +import java.lang.invoke.MethodHandles; +import java.sql.Connection; +import java.util.concurrent.CompletionStage; + +/** + * Reactive version of {@link JdbcSelect} + */ +public interface ReactiveJdbcSelect extends JdbcSelect { + Log LOG = LoggerFactory.make( Log.class, MethodHandles.lookup() ); + + @Override + default void performPreActions( + StatementAccess jdbcStatementAccess, + Connection jdbcConnection, + ExecutionContext executionContext) { + throw LOG.nonReactiveMethodCall( "reactivePerformPreActions()" ); + + } + + @Override + default void performPostAction( + boolean succeeded, + StatementAccess jdbcStatementAccess, + Connection jdbcConnection, + ExecutionContext executionContext) { + throw LOG.nonReactiveMethodCall( "reactivePerformPostActions()" ); + } + + + CompletionStage reactivePerformPreActions(ReactiveConnection connection, ExecutionContext executionContext); + + CompletionStage reactivePerformPostActions( + boolean succeeded, + ReactiveConnection connection, + ExecutionContext executionContext); +} diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/spi/ReactivePostAction.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/spi/ReactivePostAction.java new file mode 100644 index 000000000..7a519902a --- /dev/null +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/spi/ReactivePostAction.java @@ -0,0 +1,36 @@ +/* Hibernate, Relational Persistence for Idiomatic Java + * + * SPDX-License-Identifier: Apache-2.0 + * Copyright: Red Hat Inc. and Hibernate Authors + */ +package org.hibernate.reactive.sql.exec.spi; + +import org.hibernate.reactive.logging.impl.Log; +import org.hibernate.reactive.logging.impl.LoggerFactory; +import org.hibernate.reactive.pool.ReactiveConnection; +import org.hibernate.sql.exec.spi.ExecutionContext; +import org.hibernate.sql.exec.spi.PostAction; +import org.hibernate.sql.exec.spi.StatementAccess; + +import java.lang.invoke.MethodHandles; +import java.sql.Connection; +import java.util.concurrent.CompletionStage; + +/** + * Reactive version of {@link PostAction} + */ +public interface ReactivePostAction extends PostAction { + Log LOG = LoggerFactory.make( Log.class, MethodHandles.lookup() ); + + @Override + default void performPostAction( + StatementAccess jdbcStatementAccess, + Connection jdbcConnection, + ExecutionContext executionContext) { + throw LOG.nonReactiveMethodCall( "reactivePerformPostAction()" ); + } + + CompletionStage reactivePerformReactivePostAction( + ReactiveConnection jdbcConnection, + ExecutionContext executionContext); +} diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/spi/ReactivePreAction.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/spi/ReactivePreAction.java new file mode 100644 index 000000000..3aba18ca4 --- /dev/null +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/sql/exec/spi/ReactivePreAction.java @@ -0,0 +1,34 @@ +/* Hibernate, Relational Persistence for Idiomatic Java + * + * SPDX-License-Identifier: Apache-2.0 + * Copyright: Red Hat Inc. and Hibernate Authors + */ +package org.hibernate.reactive.sql.exec.spi; + +import org.hibernate.reactive.logging.impl.Log; +import org.hibernate.reactive.logging.impl.LoggerFactory; +import org.hibernate.reactive.pool.ReactiveConnection; +import org.hibernate.sql.exec.spi.ExecutionContext; +import org.hibernate.sql.exec.spi.PreAction; +import org.hibernate.sql.exec.spi.StatementAccess; + +import java.lang.invoke.MethodHandles; +import java.sql.Connection; +import java.util.concurrent.CompletionStage; + +/** + * Reactive version of {@link PreAction} + */ +public interface ReactivePreAction extends PreAction { + Log LOG = LoggerFactory.make( Log.class, MethodHandles.lookup() ); + + @Override + default void performPreAction( + StatementAccess jdbcStatementAccess, + Connection jdbcConnection, + ExecutionContext executionContext) { + throw LOG.nonReactiveMethodCall( "reactivePerformPreAction()" ); + } + + CompletionStage reactivePerformPreAction(ReactiveConnection connection, ExecutionContext executionContext); +} diff --git a/hibernate-reactive-core/src/test/java/org/hibernate/reactive/FindByIdWithLockTest.java b/hibernate-reactive-core/src/test/java/org/hibernate/reactive/FindByIdWithLockTest.java index 9eb8bb8f8..85a39cecf 100644 --- a/hibernate-reactive-core/src/test/java/org/hibernate/reactive/FindByIdWithLockTest.java +++ b/hibernate-reactive-core/src/test/java/org/hibernate/reactive/FindByIdWithLockTest.java @@ -16,7 +16,6 @@ import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import io.vertx.junit5.Timeout; @@ -84,7 +83,6 @@ context, getMutinySessionFactory() ); } - @Disabled @Test public void testFindUpgradeNoWait(VertxTestContext context) { Child child = new Child( CHILD_ID, "And" ); @@ -167,7 +165,7 @@ public static class Child { public String name; - @ManyToOne + @ManyToOne(fetch = FetchType.LAZY) public Parent parent; public Child() {