diff --git a/mapper/orm-coordination-outbox-polling/src/main/java/org/hibernate/search/mapper/orm/coordination/outboxpolling/event/impl/AbstractAgentClusterLink.java b/mapper/orm-coordination-outbox-polling/src/main/java/org/hibernate/search/mapper/orm/coordination/outboxpolling/event/impl/AbstractAgentClusterLink.java index 7a1020786ad..1a46354c1ad 100644 --- a/mapper/orm-coordination-outbox-polling/src/main/java/org/hibernate/search/mapper/orm/coordination/outboxpolling/event/impl/AbstractAgentClusterLink.java +++ b/mapper/orm-coordination-outbox-polling/src/main/java/org/hibernate/search/mapper/orm/coordination/outboxpolling/event/impl/AbstractAgentClusterLink.java @@ -11,13 +11,13 @@ import java.time.Duration; import java.time.Instant; import java.util.List; +import java.util.function.Predicate; import java.util.stream.Collectors; import org.hibernate.search.engine.reporting.FailureHandler; import org.hibernate.search.mapper.orm.coordination.outboxpolling.cluster.impl.Agent; import org.hibernate.search.mapper.orm.coordination.outboxpolling.cluster.impl.AgentPersister; import org.hibernate.search.mapper.orm.coordination.outboxpolling.cluster.impl.AgentReference; -import org.hibernate.search.mapper.orm.coordination.outboxpolling.cluster.impl.AgentRepository; import org.hibernate.search.mapper.orm.coordination.outboxpolling.logging.impl.Log; import org.hibernate.search.util.common.logging.impl.LoggerFactory; @@ -29,9 +29,8 @@ abstract class AbstractAgentClusterLink { protected final Duration pollingInterval; protected final Duration pulseInterval; protected final Duration pulseExpiration; + private final AgentPersister agentPersister; - // Accessible for test purposes - final AgentPersister agentPersister; public AbstractAgentClusterLink(AgentPersister agentPersister, FailureHandler failureHandler, Clock clock, @@ -48,11 +47,8 @@ public final R pulse(AgentClusterLinkContext context) { Agent self = ensureRegistered( context ); // In order to avoid transaction deadlocks with some RDBMS (yes, I mean SQL Server), - // we make sure that *all* reads (listing all agents) happen before the write (updating self). - // I'm not entirely sure, but I think it goes like this: - // if we read, then write, then read again, then the second read can trigger a deadlock, - // with each transaction holding a write lock on some rows - // while waiting for a read lock on the whole table. + // we make sure that reads listing other agents always happen in a different transaction + // than writes. List allAgentsInIdOrder = context.agentRepository().findAllOrderById(); Instant now = clock.instant(); @@ -69,44 +65,61 @@ public final R pulse(AgentClusterLinkContext context) { // then the first agent tries to delete the second agent because it expired, // then the second agent tries to delete the first agent because it expired, // all in the same transaction, well... here you have a deadlock. + Instant expirationLimit = now; List timedOutAgents = allAgentsInIdOrder.stream() - .filter( a -> !a.equals( self ) && a.getExpiration().isBefore( now ) ) + .filter( Predicate.isEqual( self ).negate() ) // Ignore self: if expired, we'll correct that. + .filter( a -> a.getExpiration().isBefore( expirationLimit ) ) .collect( Collectors.toList() ); if ( !timedOutAgents.isEmpty() ) { log.removingTimedOutAgents( selfReference(), timedOutAgents ); context.agentRepository().delete( timedOutAgents ); - log.infof( "Agent '%s': reassessing the new situation in the next pulse", - selfReference(), now - ); + log.infof( "Agent '%s': reassessing the new situation in the next pulse", selfReference() ); return instructCommitAndRetryPulseAfterDelay( now, pollingInterval ); } - // Delay expiration in each pulse - self.setExpiration( newExpiration ); - - return doPulse( context.agentRepository(), now, allAgentsInIdOrder, self ); + // Determine what needs to be done + WriteAction pulseResult = doPulse( allAgentsInIdOrder, self ); + + // Write actions are always executed in a new transaction, + // so that the risk of deadlocks (see above) is minimal, + // and transactions are shorter (for lower transaction contention, + // important on CockroachDB in particular: + // https://www.cockroachlabs.com/docs/v22.1/transactions#transaction-contention ). + context.commitAndBeginNewTransaction(); + now = clock.instant(); + self = findSelfExpectRegistered( context ); + // Delay expiration with each write + self.setExpiration( now.plus( pulseExpiration ) ); + R instructions = pulseResult.applyAndReturnInstructions( now, self, agentPersister ); + log.tracef( "Agent '%s': ending pulse at %s with self = %s", + selfReference(), now, self ); + return instructions; } private Agent ensureRegistered(AgentClusterLinkContext context) { - Instant now = clock.instant(); Agent self = agentPersister.findSelf( context.agentRepository() ); if ( self == null ) { + Instant now = clock.instant(); agentPersister.createSelf( context.agentRepository(), now.plus( pulseExpiration ) ); // Make sure the transaction *only* registers the agent, // so that the risk of deadlocks (see below) is minimal // and other agents are made aware of this agent as soon as possible. // This avoids unnecessary rebalancing when multiple nodes start in quick succession. context.commitAndBeginNewTransaction(); - self = agentPersister.findSelf( context.agentRepository() ); - if ( self == null ) { - throw log.agentRegistrationIneffective( selfReference() ); - } + self = findSelfExpectRegistered( context ); + } + return self; + } + + private Agent findSelfExpectRegistered(AgentClusterLinkContext context) { + Agent self = agentPersister.findSelf( context.agentRepository() ); + if ( self == null ) { + throw log.agentRegistrationIneffective( selfReference() ); } return self; } - protected abstract R doPulse(AgentRepository agentRepository, Instant now, - List allAgentsInIdOrder, Agent self); + protected abstract WriteAction doPulse(List allAgentsInIdOrder, Agent self); /** * Instructs the processor to commit the transaction, wait for the given delay, then pulse again. @@ -129,4 +142,11 @@ protected AgentReference selfReference() { return agentPersister.selfReference(); } + final AgentPersister getAgentPersisterForTests() { + return agentPersister; + } + + protected interface WriteAction { + R applyAndReturnInstructions(Instant now, Agent self, AgentPersister agentPersister); + } } diff --git a/mapper/orm-coordination-outbox-polling/src/main/java/org/hibernate/search/mapper/orm/coordination/outboxpolling/event/impl/OutboxPollingEventProcessorClusterLink.java b/mapper/orm-coordination-outbox-polling/src/main/java/org/hibernate/search/mapper/orm/coordination/outboxpolling/event/impl/OutboxPollingEventProcessorClusterLink.java index 9dd2b441b8a..dea426e2060 100644 --- a/mapper/orm-coordination-outbox-polling/src/main/java/org/hibernate/search/mapper/orm/coordination/outboxpolling/event/impl/OutboxPollingEventProcessorClusterLink.java +++ b/mapper/orm-coordination-outbox-polling/src/main/java/org/hibernate/search/mapper/orm/coordination/outboxpolling/event/impl/OutboxPollingEventProcessorClusterLink.java @@ -18,7 +18,6 @@ import org.hibernate.search.engine.reporting.FailureHandler; import org.hibernate.search.mapper.orm.coordination.outboxpolling.cluster.impl.Agent; import org.hibernate.search.mapper.orm.coordination.outboxpolling.cluster.impl.AgentPersister; -import org.hibernate.search.mapper.orm.coordination.outboxpolling.cluster.impl.AgentRepository; import org.hibernate.search.mapper.orm.coordination.outboxpolling.cluster.impl.AgentType; import org.hibernate.search.mapper.orm.coordination.outboxpolling.cluster.impl.ClusterDescriptor; import org.hibernate.search.mapper.orm.coordination.outboxpolling.cluster.impl.AgentState; @@ -68,15 +67,16 @@ public OutboxPollingEventProcessorClusterLink(String agentName, } @Override - protected OutboxPollingEventProcessingInstructions doPulse(AgentRepository agentRepository, Instant now, - List allAgentsInIdOrder, Agent self) { + protected WriteAction doPulse(List allAgentsInIdOrder, Agent currentSelf) { for ( Agent agent : allAgentsInIdOrder ) { if ( AgentType.MASS_INDEXING.equals( agent.getType() ) ) { - log.logf( self.getState() != AgentState.SUSPENDED ? Logger.Level.INFO : Logger.Level.TRACE, + log.logf( currentSelf.getState() != AgentState.SUSPENDED ? Logger.Level.INFO : Logger.Level.TRACE, "Agent '%s': another agent '%s' is currently mass indexing", - selfReference(), now, agent ); - agentPersister.setSuspended( self ); - return instructCommitAndRetryPulseAfterDelay( now, pulseInterval ); + selfReference(), agent ); + return (now, self, agentPersister) -> { + agentPersister.setSuspended( self ); + return instructCommitAndRetryPulseAfterDelay( now, pulseInterval ); + }; } } @@ -93,34 +93,40 @@ protected OutboxPollingEventProcessingInstructions doPulse(AgentRepository agent allAgentsInIdOrder, e ) ); contextBuilder.failingOperation( log.outboxEventProcessorPulse( selfReference() ) ); failureHandler.handle( contextBuilder.build() ); - agentPersister.setSuspended( self ); - return instructCommitAndRetryPulseAfterDelay( now, pulseInterval ); + return (now, self, agentPersister) -> { + agentPersister.setSuspended( self ); + return instructCommitAndRetryPulseAfterDelay( now, pulseInterval ); + }; } Optional shardAssignmentOptional = ShardAssignmentDescriptor.fromClusterMemberList( clusterTarget.descriptor.memberIdsInShardOrder, selfReference().id ); if ( !shardAssignmentOptional.isPresent() ) { - log.logf( self.getState() != AgentState.SUSPENDED ? Logger.Level.INFO : Logger.Level.TRACE, + log.logf( currentSelf.getState() != AgentState.SUSPENDED ? Logger.Level.INFO : Logger.Level.TRACE, "Agent '%s': this agent is superfluous and will not perform event processing," + " because other agents are enough to handle all the shards." + " Target cluster: %s.", selfReference(), clusterTarget.descriptor ); - agentPersister.setSuspended( self ); - return instructCommitAndRetryPulseAfterDelay( now, pulseInterval ); + return (now, self, agentPersister) -> { + agentPersister.setSuspended( self ); + return instructCommitAndRetryPulseAfterDelay( now, pulseInterval ); + }; } ShardAssignmentDescriptor targetShardAssignment = shardAssignmentOptional.get(); if ( clusterTarget.descriptor.memberIdsInShardOrder.contains( null ) ) { - log.logf( self.getState() != AgentState.SUSPENDED ? Logger.Level.INFO : Logger.Level.TRACE, + log.logf( currentSelf.getState() != AgentState.SUSPENDED ? Logger.Level.INFO : Logger.Level.TRACE, "Agent '%s': some cluster members are missing; this agent will wait until they are present." + " Target cluster: %s.", selfReference(), clusterTarget.descriptor ); - agentPersister.setSuspended( self ); - return instructCommitAndRetryPulseAfterDelay( now, pollingInterval ); + return (now, self, agentPersister) -> { + agentPersister.setSuspended( self ); + return instructCommitAndRetryPulseAfterDelay( now, pollingInterval ); + }; } - ShardAssignmentDescriptor persistedShardAssignment = self.getShardAssignment(); + ShardAssignmentDescriptor persistedShardAssignment = currentSelf.getShardAssignment(); if ( !targetShardAssignment.equals( persistedShardAssignment ) ) { log.infof( "Agent '%s': the persisted shard assignment (%s) does not match the target." @@ -128,14 +134,18 @@ protected OutboxPollingEventProcessingInstructions doPulse(AgentRepository agent + " Cluster: %s.", selfReference(), persistedShardAssignment, targetShardAssignment, clusterTarget.descriptor ); - agentPersister.setWaiting( self, clusterTarget.descriptor, targetShardAssignment ); - return instructCommitAndRetryPulseAfterDelay( now, pollingInterval ); + return (now, self, agentPersister) -> { + agentPersister.setWaiting( self, clusterTarget.descriptor, targetShardAssignment ); + return instructCommitAndRetryPulseAfterDelay( now, pollingInterval ); + }; } // Check whether excluded (superfluous) agents complied with the cluster target and suspended themselves. if ( !excludedAgentsAreOutOfCluster( clusterTarget.excluded ) ) { - agentPersister.setWaiting( self, clusterTarget.descriptor, targetShardAssignment ); - return instructCommitAndRetryPulseAfterDelay( now, pollingInterval ); + return (now, self, agentPersister) -> { + agentPersister.setWaiting( self, clusterTarget.descriptor, targetShardAssignment ); + return instructCommitAndRetryPulseAfterDelay( now, pollingInterval ); + }; } // Check whether cluster members complied with the cluster target. @@ -149,8 +159,10 @@ protected OutboxPollingEventProcessingInstructions doPulse(AgentRepository agent // we make sure that on the second transaction, // one of those agents would see the other and take it into account when rebalancing. if ( !clusterMembersAreInCluster( clusterTarget.membersInShardOrder, clusterTarget.descriptor ) ) { - agentPersister.setWaiting( self, clusterTarget.descriptor, targetShardAssignment ); - return instructCommitAndRetryPulseAfterDelay( now, pollingInterval ); + return (now, self, agentPersister) -> { + agentPersister.setWaiting( self, clusterTarget.descriptor, targetShardAssignment ); + return instructCommitAndRetryPulseAfterDelay( now, pollingInterval ); + }; } // If all the conditions above are satisfied, then we can start processing. @@ -165,8 +177,10 @@ protected OutboxPollingEventProcessingInstructions doPulse(AgentRepository agent log.infof( "Agent '%s': assigning to %s", selfReference(), targetShardAssignment ); this.lastShardAssignment = ShardAssignment.of( targetShardAssignment, finderProvider ); } - agentPersister.setRunning( self, clusterTarget.descriptor ); - return instructProceedWithEventProcessing( now ); + return (now, self, agentPersister) -> { + agentPersister.setRunning( self, clusterTarget.descriptor ); + return instructProceedWithEventProcessing( now ); + }; } private boolean excludedAgentsAreOutOfCluster(List excludedAgents) { diff --git a/mapper/orm-coordination-outbox-polling/src/main/java/org/hibernate/search/mapper/orm/coordination/outboxpolling/event/impl/OutboxPollingMassIndexerAgentClusterLink.java b/mapper/orm-coordination-outbox-polling/src/main/java/org/hibernate/search/mapper/orm/coordination/outboxpolling/event/impl/OutboxPollingMassIndexerAgentClusterLink.java index c4b3422e6b4..b034ab83352 100644 --- a/mapper/orm-coordination-outbox-polling/src/main/java/org/hibernate/search/mapper/orm/coordination/outboxpolling/event/impl/OutboxPollingMassIndexerAgentClusterLink.java +++ b/mapper/orm-coordination-outbox-polling/src/main/java/org/hibernate/search/mapper/orm/coordination/outboxpolling/event/impl/OutboxPollingMassIndexerAgentClusterLink.java @@ -16,7 +16,6 @@ import org.hibernate.search.engine.reporting.FailureHandler; import org.hibernate.search.mapper.orm.coordination.outboxpolling.cluster.impl.Agent; import org.hibernate.search.mapper.orm.coordination.outboxpolling.cluster.impl.AgentPersister; -import org.hibernate.search.mapper.orm.coordination.outboxpolling.cluster.impl.AgentRepository; import org.hibernate.search.mapper.orm.coordination.outboxpolling.cluster.impl.AgentState; import org.hibernate.search.mapper.orm.coordination.outboxpolling.cluster.impl.AgentType; import org.hibernate.search.mapper.orm.coordination.outboxpolling.cluster.impl.ClusterDescriptor; @@ -43,16 +42,17 @@ public OutboxPollingMassIndexerAgentClusterLink(String agentName, } @Override - protected OutboxPollingMassIndexingInstructions doPulse(AgentRepository agentRepository, Instant now, - List allAgentsInIdOrder, Agent self) { + protected WriteAction doPulse(List allAgentsInIdOrder, Agent currentSelf) { List eventProcessors = allAgentsInIdOrder.stream() .filter( a -> AgentType.EVENT_PROCESSING.contains( a.getType() ) ) .collect( Collectors.toList() ); // Check whether event processors acknowledged our existence by suspending themselves. if ( !eventProcessorsAreSuspended( eventProcessors ) ) { - agentPersister.setWaiting( self, SINGLE_NODE_CLUSTER_DESCRIPTOR, SINGLE_NODE_SHARD_ASSIGNMENT ); - return instructCommitAndRetryPulseAfterDelay( now, pollingInterval ); + return (now, self, agentPersister) -> { + agentPersister.setWaiting( self, SINGLE_NODE_CLUSTER_DESCRIPTOR, SINGLE_NODE_SHARD_ASSIGNMENT ); + return instructCommitAndRetryPulseAfterDelay( now, pollingInterval ); + }; } // Ensure that we won't just spawn the agent directly in the RUNNING state, @@ -64,14 +64,18 @@ protected OutboxPollingMassIndexingInstructions doPulse(AgentRepository agentRep // By requiring at least two transactions to switch from "just spawned" to RUNNING, // we make sure that on the second transaction, // one of those agents would see the other and take it into account. - if ( AgentState.SUSPENDED.equals( self.getState() ) ) { - agentPersister.setWaiting( self, SINGLE_NODE_CLUSTER_DESCRIPTOR, SINGLE_NODE_SHARD_ASSIGNMENT ); - return instructCommitAndRetryPulseAfterDelay( now, pollingInterval ); + if ( AgentState.SUSPENDED.equals( currentSelf.getState() ) ) { + return (now, self, agentPersister) -> { + agentPersister.setWaiting( self, SINGLE_NODE_CLUSTER_DESCRIPTOR, SINGLE_NODE_SHARD_ASSIGNMENT ); + return instructCommitAndRetryPulseAfterDelay( now, pollingInterval ); + }; } // If all the conditions above are satisfied, then we can start mass indexing. - agentPersister.setRunning( self, SINGLE_NODE_CLUSTER_DESCRIPTOR ); - return instructProceedWithMassIndexing( now ); + return (now, self, agentPersister) -> { + agentPersister.setRunning( self, SINGLE_NODE_CLUSTER_DESCRIPTOR ); + return instructProceedWithMassIndexing( now ); + }; } private boolean eventProcessorsAreSuspended(List eventProcessors) { diff --git a/mapper/orm-coordination-outbox-polling/src/test/java/org/hibernate/search/mapper/orm/coordination/outboxpolling/event/impl/AbstractEventProcessorClusterLinkTest.java b/mapper/orm-coordination-outbox-polling/src/test/java/org/hibernate/search/mapper/orm/coordination/outboxpolling/event/impl/AbstractEventProcessorClusterLinkTest.java index 01059fb2176..b70f19c8c0b 100644 --- a/mapper/orm-coordination-outbox-polling/src/test/java/org/hibernate/search/mapper/orm/coordination/outboxpolling/event/impl/AbstractEventProcessorClusterLinkTest.java +++ b/mapper/orm-coordination-outbox-polling/src/test/java/org/hibernate/search/mapper/orm/coordination/outboxpolling/event/impl/AbstractEventProcessorClusterLinkTest.java @@ -98,13 +98,13 @@ public void verifyNoMoreInvocationsOnAllMocks() { } protected void defineSelfNotCreatedYet(OutboxPollingEventProcessorClusterLink link) { - link.agentPersister.setSelfReferenceForTests( null ); + link.getAgentPersisterForTests().setSelfReferenceForTests( null ); repositoryMockHelper.defineSelfCreatedByPulse( SELF_ID ); } protected void defineSelfCreatedAndStillPresent(OutboxPollingEventProcessorClusterLink link, AgentState state, ShardAssignmentDescriptor shardAssignment) { - link.agentPersister.setSelfReferenceForTests( SELF_REF ); + link.getAgentPersisterForTests().setSelfReferenceForTests( SELF_REF ); AgentType type; if ( link.shardAssignmentIsStatic ) { type = AgentType.EVENT_PROCESSING_STATIC_SHARDING; diff --git a/mapper/orm-coordination-outbox-polling/src/test/java/org/hibernate/search/mapper/orm/coordination/outboxpolling/event/impl/AbstractMassIndexerAgentClusterLinkTest.java b/mapper/orm-coordination-outbox-polling/src/test/java/org/hibernate/search/mapper/orm/coordination/outboxpolling/event/impl/AbstractMassIndexerAgentClusterLinkTest.java index 164f689cb5f..4b23e6b5d0b 100644 --- a/mapper/orm-coordination-outbox-polling/src/test/java/org/hibernate/search/mapper/orm/coordination/outboxpolling/event/impl/AbstractMassIndexerAgentClusterLinkTest.java +++ b/mapper/orm-coordination-outbox-polling/src/test/java/org/hibernate/search/mapper/orm/coordination/outboxpolling/event/impl/AbstractMassIndexerAgentClusterLinkTest.java @@ -86,13 +86,13 @@ public void verifyNoMoreInvocationsOnAllMocks() { } protected void defineSelfNotCreatedYet(OutboxPollingMassIndexerAgentClusterLink link) { - link.agentPersister.setSelfReferenceForTests( null ); + link.getAgentPersisterForTests().setSelfReferenceForTests( null ); repositoryMockHelper.defineSelfCreatedByPulse( SELF_ID ); } protected void defineSelfCreatedAndStillPresent(OutboxPollingMassIndexerAgentClusterLink link, AgentState state) { - link.agentPersister.setSelfReferenceForTests( SELF_REF ); + link.getAgentPersisterForTests().setSelfReferenceForTests( SELF_REF ); Agent self = new Agent( AgentType.MASS_INDEXING, SELF_REF.name, NOW, state, null ); self.setId( SELF_ID ); repositoryMockHelper.defineSelfPreExisting( self );