Skip to content

Commit

Permalink
[#1687] Fix race condition in ReactiveUpdateCoordinator
Browse files Browse the repository at this point in the history
Hibernate ORM can use the same update coordinator among
multiple update operations.

In Hibernate Reactive, the reactive update coordinator
has a state that cannot be shared. So, we decided
to create a new scoped coordinator for each update
operation.

Right now, we want to merge a fix for the issue that
doesn't require us to copy or change code from
Hibernate ORM.

The plan is to find a more efficient solution as
a separate issue. And, possible add some
infrastructure to make it easier to track these
type of issues.
  • Loading branch information
DavideD authored and Sanne committed Jun 30, 2023
1 parent b411fef commit f8407d6
Show file tree
Hide file tree
Showing 9 changed files with 111 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import org.hibernate.reactive.persister.entity.mutation.ReactiveInsertCoordinator;
import org.hibernate.reactive.persister.entity.mutation.ReactiveUpdateCoordinator;
import org.hibernate.reactive.persister.entity.mutation.ReactiveUpdateCoordinatorNoOp;
import org.hibernate.reactive.persister.entity.mutation.ReactiveUpdateCoordinatorStandard;

public final class ReactiveCoordinatorFactory {

Expand All @@ -32,7 +31,7 @@ public static ReactiveUpdateCoordinator buildUpdateCoordinator(
for ( int i = 0; i < attributeMappings.size(); i++ ) {
AttributeMapping attributeMapping = attributeMappings.get( i );
if ( attributeMapping instanceof SingularAttributeMapping ) {
return new ReactiveUpdateCoordinatorStandard( entityPersister, factory );
return new ReactiveUpdateCoordinatorStandardScopeFactory( entityPersister, factory );
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,9 @@ public CompletionStage<Void> updateReactive(
Object rowId,
SharedSessionContractImplementor session) {
return ( (ReactiveUpdateCoordinator) getUpdateCoordinator() )
// This is different from Hibernate ORM because our reactive update coordinator cannot be share among
// multiple update operations
.makeScopedCoordinator()
.coordinateReactiveUpdate( object, id, rowId, values, oldVersion, oldValues, dirtyAttributeIndexes, hasDirtyCollection, session );
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,9 @@ public CompletionStage<Void> updateReactive(
final Object rowId,
final SharedSessionContractImplementor session) throws HibernateException {
return ( (ReactiveUpdateCoordinator) getUpdateCoordinator() )
// This is different from Hibernate ORM because our reactive update coordinator cannot be share among
// multiple update operations
.makeScopedCoordinator()
.coordinateReactiveUpdate( object, id, rowId, values, oldVersion, oldValues, dirtyAttributeIndexes, hasDirtyCollection, session );
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,9 @@ public CompletionStage<Void> updateReactive(
Object rowId,
SharedSessionContractImplementor session) {
return ( (ReactiveUpdateCoordinator) getUpdateCoordinator() )
// This is different from Hibernate ORM because our reactive update coordinator cannot be share among
// multiple update operations
.makeScopedCoordinator()
.coordinateReactiveUpdate( object, id, rowId, values, oldVersion, oldValues, dirtyAttributeIndexes, hasDirtyCollection, session );
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/* Hibernate, Relational Persistence for Idiomatic Java
*
* SPDX-License-Identifier: Apache-2.0
* Copyright: Red Hat Inc. and Hibernate Authors
*/
package org.hibernate.reactive.persister.entity.impl;

import org.hibernate.engine.spi.SessionFactoryImplementor;
import org.hibernate.persister.entity.AbstractEntityPersister;
import org.hibernate.persister.entity.mutation.UpdateCoordinatorStandard;
import org.hibernate.reactive.persister.entity.mutation.ReactiveScopedUpdateCoordinator;
import org.hibernate.reactive.persister.entity.mutation.ReactiveUpdateCoordinator;
import org.hibernate.reactive.persister.entity.mutation.ReactiveUpdateCoordinatorStandard;

public class ReactiveUpdateCoordinatorStandardScopeFactory extends UpdateCoordinatorStandard implements ReactiveUpdateCoordinator {

private final AbstractEntityPersister entityPersister;
private final SessionFactoryImplementor factory;

public ReactiveUpdateCoordinatorStandardScopeFactory(
AbstractEntityPersister entityPersister,
SessionFactoryImplementor factory) {
super( entityPersister, factory );
this.entityPersister = entityPersister;
this.factory = factory;
}

@Override
public ReactiveScopedUpdateCoordinator makeScopedCoordinator() {
return new ReactiveUpdateCoordinatorStandard( entityPersister, factory );
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/* Hibernate, Relational Persistence for Idiomatic Java
*
* SPDX-License-Identifier: Apache-2.0
* Copyright: Red Hat Inc. and Hibernate Authors
*/
package org.hibernate.reactive.persister.entity.mutation;

import java.util.concurrent.CompletionStage;

import org.hibernate.engine.spi.SharedSessionContractImplementor;

/**
* Scoped to a single operation, so that we can keep
* instance scoped state.
*
* @see org.hibernate.persister.entity.mutation.UpdateCoordinator
* @see ReactiveUpdateCoordinator
*/
public interface ReactiveScopedUpdateCoordinator {

CompletionStage<Void> coordinateReactiveUpdate(
Object entity,
Object id,
Object rowId,
Object[] values,
Object oldVersion,
Object[] incomingOldValues,
int[] dirtyAttributeIndexes,
boolean hasDirtyCollection,
SharedSessionContractImplementor session);

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,14 @@
*/
package org.hibernate.reactive.persister.entity.mutation;

import java.util.concurrent.CompletionStage;

import org.hibernate.engine.spi.SharedSessionContractImplementor;
import org.hibernate.persister.entity.mutation.UpdateCoordinator;

/**
* A reactive {@link UpdateCoordinator} that allows the creation of a {@link ReactiveScopedUpdateCoordinator} scoped
* to a single update operation.
*/
public interface ReactiveUpdateCoordinator extends UpdateCoordinator {

CompletionStage<Void> coordinateReactiveUpdate(
Object entity,
Object id,
Object rowId,
Object[] values,
Object oldVersion,
Object[] incomingOldValues,
int[] dirtyAttributeIndexes,
boolean hasDirtyCollection,
SharedSessionContractImplementor session);
ReactiveScopedUpdateCoordinator makeScopedCoordinator();

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

import static org.hibernate.reactive.util.impl.CompletionStages.voidFuture;

public class ReactiveUpdateCoordinatorNoOp extends UpdateCoordinatorNoOp implements ReactiveUpdateCoordinator {
public class ReactiveUpdateCoordinatorNoOp extends UpdateCoordinatorNoOp implements ReactiveScopedUpdateCoordinator, ReactiveUpdateCoordinator {

public ReactiveUpdateCoordinatorNoOp(AbstractEntityPersister entityPersister) {
super( entityPersister );
Expand Down Expand Up @@ -45,4 +45,11 @@ public CompletionStage<Void> coordinateReactiveUpdate(
SharedSessionContractImplementor session) {
return voidFuture();
}

@Override
public ReactiveScopedUpdateCoordinator makeScopedCoordinator() {
//This particular implementation is stateless, so we can return ourselves w/o needing to create a scope.
return this;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import java.util.concurrent.CompletionStage;

import java.util.concurrent.atomic.AtomicInteger;

import org.hibernate.engine.jdbc.mutation.ParameterUsage;
import org.hibernate.engine.jdbc.mutation.spi.MutationExecutorService;
import org.hibernate.engine.spi.EntityEntry;
Expand All @@ -23,7 +24,6 @@
import org.hibernate.persister.entity.mutation.EntityTableMapping;
import org.hibernate.persister.entity.mutation.UpdateCoordinatorStandard;
import org.hibernate.reactive.engine.jdbc.env.internal.ReactiveMutationExecutor;
import org.hibernate.reactive.util.impl.CompletionStages;
import org.hibernate.sql.model.MutationOperationGroup;
import org.hibernate.tuple.entity.EntityMetamodel;

Expand All @@ -35,24 +35,31 @@
import static org.hibernate.reactive.util.impl.CompletionStages.completedFuture;
import static org.hibernate.reactive.util.impl.CompletionStages.voidFuture;

public class ReactiveUpdateCoordinatorStandard extends UpdateCoordinatorStandard implements ReactiveUpdateCoordinator {

private CompletionStage<Void> stage;
/**
* Reactive version of {@link UpdateCoordinatorStandard}, but it cannot be share between multiple update operations.
*/
public class ReactiveUpdateCoordinatorStandard extends UpdateCoordinatorStandard implements ReactiveScopedUpdateCoordinator {

private CompletableFuture<Void> updateResultStage;

public ReactiveUpdateCoordinatorStandard(AbstractEntityPersister entityPersister, SessionFactoryImplementor factory) {
super( entityPersister, factory );
}

private void complete(Object o, Throwable throwable) {
// Utility method to use method reference
private void complete(final Object o, final Throwable throwable) {
if ( throwable != null ) {
stage.toCompletableFuture().completeExceptionally( throwable );
fail( throwable );
}
else {
stage.toCompletableFuture().complete( null );
updateResultStage.complete( null );
}
}

private void fail(Throwable throwable) {
updateResultStage.completeExceptionally( throwable );
}

@Override
public CompletionStage<Void> coordinateReactiveUpdate(
Object entity,
Expand Down Expand Up @@ -85,7 +92,8 @@ public CompletionStage<Void> coordinateReactiveUpdate(
// Ensure that an immutable or non-modifiable entity is not being updated unless it is
// in the process of being deleted.
if ( entry == null && !entityPersister().isMutable() ) {
return CompletionStages.failedFuture(new IllegalStateException( "Updating immutable entity that is not in session yet" ));
fail( new IllegalStateException( "Updating immutable entity that is not in session yet" ) );
return updateResultStage;
}

CompletionStage<Void> s = voidFuture();
Expand Down Expand Up @@ -144,8 +152,10 @@ && entityPersister().hasLazyDirtyFields( dirtyAttributeIndexes ) ) {
forceDynamicUpdate
);

// stage gets updated by doDynamicUpdate and doStaticUpdate which get called by performUpdate
return stage != null ? stage : voidFuture();
// doDynamicUpdate, doVersionUpdate, or doStaticUpdate will initialize the stage,
// if an update is necessary.
// Otherwise, updateResultStage could be null.
return updateResultStage != null ? updateResultStage : voidFuture();
});
}

Expand Down Expand Up @@ -202,7 +212,7 @@ protected void doVersionUpdate(
Object oldVersion,
SharedSessionContractImplementor session) {
assert getVersionUpdateGroup() != null;
this.stage = new CompletableFuture<>();
this.updateResultStage = new CompletableFuture<>();

final EntityTableMapping mutatingTableDetails = (EntityTableMapping) getVersionUpdateGroup()
.getSingleOperation().getTableDetails();
Expand Down Expand Up @@ -274,7 +284,7 @@ protected void doDynamicUpdate(
UpdateCoordinatorStandard.InclusionChecker dirtinessChecker,
UpdateCoordinatorStandard.UpdateValuesAnalysisImpl valuesAnalysis,
SharedSessionContractImplementor session) {
this.stage = new CompletableFuture<>();
this.updateResultStage = new CompletableFuture<>();
// Create the JDBC operation descriptors
final MutationOperationGroup dynamicUpdateGroup = generateDynamicUpdateGroup(
id,
Expand Down Expand Up @@ -342,7 +352,7 @@ protected void doStaticUpdate(
Object[] oldValues,
UpdateValuesAnalysisImpl valuesAnalysis,
SharedSessionContractImplementor session) {
this.stage = new CompletableFuture<>();
this.updateResultStage = new CompletableFuture<>();
final MutationOperationGroup staticUpdateGroup = getStaticUpdateGroup();
final ReactiveMutationExecutor mutationExecutor = mutationExecutor( session, staticUpdateGroup );

Expand Down

0 comments on commit f8407d6

Please sign in to comment.