Skip to content

Commit

Permalink
HSEARCH-4283 Allow passing function to trx helper
Browse files Browse the repository at this point in the history
  • Loading branch information
fax4ever committed Jan 4, 2022
1 parent cac828b commit 32744de
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 27 deletions.
Expand Up @@ -16,8 +16,10 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

import org.hibernate.engine.spi.SessionImplementor;
import org.hibernate.engine.spi.SharedSessionContractImplementor;
import org.hibernate.search.engine.backend.orchestration.spi.SingletonTask;
import org.hibernate.search.engine.cfg.ConfigurationPropertySource;
import org.hibernate.search.engine.cfg.spi.ConfigurationProperty;
Expand Down Expand Up @@ -288,7 +290,9 @@ public CompletableFuture<?> work() {
// For more information, see
// org.hibernate.search.mapper.orm.coordination.outboxpolling.impl.OutboxEventLoader.tryLoadLocking
while ( eventUpdater.thereAreStillEventsToProcess() ) {
transactionHelper.inTransaction( session, transactionTimeout, s -> eventUpdater.process() );
transactionHelper.inTransaction( session, transactionTimeout,
(Consumer<SharedSessionContractImplementor>) s -> eventUpdater.process()
);
}

return CompletableFuture.completedFuture( null );
Expand Down
Expand Up @@ -10,8 +10,6 @@

import java.lang.invoke.MethodHandles;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import org.hibernate.Session;
import org.hibernate.engine.spi.SessionFactoryImplementor;
Expand Down Expand Up @@ -50,92 +48,80 @@ public OutboxPollingSearchMappingImpl(CoordinationStrategyStartContext context,
public long countAbortedEvents() {
checkNoTenant();

AtomicLong result = new AtomicLong();
try ( Session session = sessionFactory.openSession() ) {
transactionHelper.inTransaction( (SharedSessionContractImplementor) session, null, s -> {
return transactionHelper.inTransaction( (SharedSessionContractImplementor) session, null, s -> {
Query<Long> query = session.createQuery( COUNT_EVENTS_WITH_STATUS, Long.class );
query.setParameter( "status", OutboxEvent.Status.ABORTED );
result.set( query.getSingleResult() );
return query.getSingleResult();
} );
}
return result.get();
}

@Override
public long countAbortedEvents(String tenantId) {
checkTenant( tenantId );

AtomicLong result = new AtomicLong();
try ( Session session = sessionFactory.withOptions().tenantIdentifier( tenantId ).openSession() ) {
transactionHelper.inTransaction( (SharedSessionContractImplementor) session, null, s -> {
return transactionHelper.inTransaction( (SharedSessionContractImplementor) session, null, s -> {
Query<Long> query = session.createQuery( COUNT_EVENTS_WITH_STATUS, Long.class );
query.setParameter( "status", OutboxEvent.Status.ABORTED );
result.set( query.getSingleResult() );
return query.getSingleResult();
} );
}
return result.get();
}

@Override
public int reprocessAbortedEvents() {
checkNoTenant();

AtomicInteger result = new AtomicInteger();
try ( Session session = sessionFactory.openSession() ) {
transactionHelper.inTransaction( (SharedSessionContractImplementor) session, null, s -> {
return transactionHelper.inTransaction( (SharedSessionContractImplementor) session, null, s -> {
Query<?> query = session.createQuery( UPDATE_EVENTS_WITH_STATUS );
query.setParameter( "status", OutboxEvent.Status.ABORTED );
query.setParameter( "newStatus", OutboxEvent.Status.PENDING );
result.set( query.executeUpdate() );
return query.executeUpdate();
} );
}
return result.get();
}

@Override
public int reprocessAbortedEvents(String tenantId) {
checkTenant( tenantId );

AtomicInteger result = new AtomicInteger();
try ( Session session = sessionFactory.withOptions().tenantIdentifier( tenantId ).openSession() ) {
transactionHelper.inTransaction( (SharedSessionContractImplementor) session, null, s -> {
return transactionHelper.inTransaction( (SharedSessionContractImplementor) session, null, s -> {
Query<?> query = session.createQuery( UPDATE_EVENTS_WITH_STATUS );
query.setParameter( "status", OutboxEvent.Status.ABORTED );
query.setParameter( "newStatus", OutboxEvent.Status.PENDING );
result.set( query.executeUpdate() );
return query.executeUpdate();
} );
}
return result.get();
}

@Override
public int clearAllAbortedEvents() {
checkNoTenant();

AtomicInteger result = new AtomicInteger();
try ( Session session = sessionFactory.openSession() ) {
transactionHelper.inTransaction( (SharedSessionContractImplementor) session, null, s -> {
return transactionHelper.inTransaction( (SharedSessionContractImplementor) session, null, s -> {
Query<?> query = session.createQuery( DELETE_EVENTS_WITH_STATUS );
query.setParameter( "status", OutboxEvent.Status.ABORTED );
result.set( query.executeUpdate() );
return query.executeUpdate();
} );
}
return result.get();
}

@Override
public int clearAllAbortedEvents(String tenantId) {
checkTenant( tenantId );

AtomicInteger result = new AtomicInteger();
try ( Session session = sessionFactory.withOptions().tenantIdentifier( tenantId ).openSession() ) {
transactionHelper.inTransaction( (SharedSessionContractImplementor) session, null, s -> {
return transactionHelper.inTransaction( (SharedSessionContractImplementor) session, null, s -> {
Query<?> query = session.createQuery( DELETE_EVENTS_WITH_STATUS );
query.setParameter( "status", OutboxEvent.Status.ABORTED );
result.set( query.executeUpdate() );
return query.executeUpdate();
} );
}
return result.get();
}

private void checkNoTenant() {
Expand Down
Expand Up @@ -8,6 +8,7 @@

import java.lang.invoke.MethodHandles;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.transaction.HeuristicMixedException;
import javax.transaction.HeuristicRollbackException;
import javax.transaction.NotSupportedException;
Expand Down Expand Up @@ -59,6 +60,22 @@ public void inTransaction(SharedSessionContractImplementor session, Integer tran
commit( session );
}

public <T> T inTransaction(SharedSessionContractImplementor session, Integer transactionTimeout,
Function<SharedSessionContractImplementor, T> function) {
begin( session, transactionTimeout );

T result;
try {
result = function.apply( session );
}
catch (Exception e) {
rollbackSafely( session, e );
throw e;
}
commit( session );
return result;
}

public void begin(SharedSessionContractImplementor session, Integer transactionTimeout) {
try {
if ( useJta ) {
Expand Down

0 comments on commit 32744de

Please sign in to comment.