HSEARCH-4647 Split outbox polling transactions into smaller chunks
... to reduce transaction contention.
yrodiere committed Aug 1, 2022
1 parent 527e76e commit 30c7787
Showing 5 changed files with 99 additions and 61 deletions.
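
The gist of the change: instead of performing reads and writes in one long transaction, a pulse now gathers its reads first, decides on a deferred write action, then commits and applies that write in a fresh, short transaction. The sketch below illustrates the shape of that pattern only; it uses simplified, hypothetical types (TxContext, AgentRow, decide(...)) rather than the actual Hibernate Search API shown in the diff.

import java.time.Instant;
import java.util.List;

// Minimal sketch of the read-then-write split, under simplified assumptions.
abstract class PulseSketch<R> {

    // Hypothetical stand-in for AgentClusterLinkContext, with explicit transaction boundaries.
    interface TxContext {
        List<AgentRow> findAllAgents();        // read-only query
        AgentRow findSelf();                   // read-only query
        void commitAndBeginNewTransaction();   // ends the read-only transaction
    }

    // Hypothetical stand-in for the persisted agent row.
    static final class AgentRow {
        Instant expiration;
    }

    // The deferred write, applied only once a new transaction has been opened.
    interface WriteAction<R> {
        R applyAndReturnInstructions(Instant now, AgentRow self);
    }

    final R pulse(TxContext context) {
        // Transaction 1: reads only; listing all agents takes no write locks.
        List<AgentRow> allAgents = context.findAllAgents();
        WriteAction<R> action = decide(allAgents, context.findSelf());

        // Transaction 2: a short, write-only transaction.
        // Keeping it separate and small reduces contention and the risk of deadlocks.
        context.commitAndBeginNewTransaction();
        AgentRow self = context.findSelf();
        return action.applyAndReturnInstructions(Instant.now(), self);
    }

    // Subclasses return a lambda describing the write instead of performing it inline.
    protected abstract WriteAction<R> decide(List<AgentRow> allAgents, AgentRow self);
}
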
@@ -11,13 +11,13 @@
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import org.hibernate.search.engine.reporting.FailureHandler;
import org.hibernate.search.mapper.orm.coordination.outboxpolling.cluster.impl.Agent;
import org.hibernate.search.mapper.orm.coordination.outboxpolling.cluster.impl.AgentPersister;
import org.hibernate.search.mapper.orm.coordination.outboxpolling.cluster.impl.AgentReference;
import org.hibernate.search.mapper.orm.coordination.outboxpolling.cluster.impl.AgentRepository;
import org.hibernate.search.mapper.orm.coordination.outboxpolling.logging.impl.Log;
import org.hibernate.search.util.common.logging.impl.LoggerFactory;

@@ -29,9 +29,8 @@ abstract class AbstractAgentClusterLink<R> {
protected final Duration pollingInterval;
protected final Duration pulseInterval;
protected final Duration pulseExpiration;
private final AgentPersister agentPersister;

// Accessible for test purposes
final AgentPersister agentPersister;

public AbstractAgentClusterLink(AgentPersister agentPersister,
FailureHandler failureHandler, Clock clock,
@@ -48,11 +47,8 @@ public final R pulse(AgentClusterLinkContext context) {
Agent self = ensureRegistered( context );

// In order to avoid transaction deadlocks with some RDBMS (yes, I mean SQL Server),
// we make sure that *all* reads (listing all agents) happen before the write (updating self).
// I'm not entirely sure, but I think it goes like this:
// if we read, then write, then read again, then the second read can trigger a deadlock,
// with each transaction holding a write lock on some rows
// while waiting for a read lock on the whole table.
// we make sure that reads listing other agents always happen in a different transaction
// than writes.
List<Agent> allAgentsInIdOrder = context.agentRepository().findAllOrderById();

Instant now = clock.instant();
@@ -69,44 +65,61 @@ public final R pulse(AgentClusterLinkContext context) {
// then the first agent tries to delete the second agent because it expired,
// then the second agent tries to delete the first agent because it expired,
// all in the same transaction, well... here you have a deadlock.
Instant expirationLimit = now;
List<Agent> timedOutAgents = allAgentsInIdOrder.stream()
.filter( a -> !a.equals( self ) && a.getExpiration().isBefore( now ) )
.filter( Predicate.isEqual( self ).negate() ) // Ignore self: if expired, we'll correct that.
.filter( a -> a.getExpiration().isBefore( expirationLimit ) )
.collect( Collectors.toList() );
if ( !timedOutAgents.isEmpty() ) {
log.removingTimedOutAgents( selfReference(), timedOutAgents );
context.agentRepository().delete( timedOutAgents );
log.infof( "Agent '%s': reassessing the new situation in the next pulse",
selfReference(), now
);
log.infof( "Agent '%s': reassessing the new situation in the next pulse", selfReference() );
return instructCommitAndRetryPulseAfterDelay( now, pollingInterval );
}

// Delay expiration in each pulse
self.setExpiration( newExpiration );

return doPulse( context.agentRepository(), now, allAgentsInIdOrder, self );
// Determine what needs to be done
WriteAction<R> pulseResult = doPulse( allAgentsInIdOrder, self );

// Write actions are always executed in a new transaction,
// so that the risk of deadlocks (see above) is minimal,
// and transactions are shorter (for lower transaction contention,
// important on CockroachDB in particular:
// https://www.cockroachlabs.com/docs/v22.1/transactions#transaction-contention ).
context.commitAndBeginNewTransaction();
now = clock.instant();
self = findSelfExpectRegistered( context );
// Delay expiration with each write
self.setExpiration( now.plus( pulseExpiration ) );
R instructions = pulseResult.applyAndReturnInstructions( now, self, agentPersister );
log.tracef( "Agent '%s': ending pulse at %s with self = %s",
selfReference(), now, self );
return instructions;
}

private Agent ensureRegistered(AgentClusterLinkContext context) {
Instant now = clock.instant();
Agent self = agentPersister.findSelf( context.agentRepository() );
if ( self == null ) {
Instant now = clock.instant();
agentPersister.createSelf( context.agentRepository(), now.plus( pulseExpiration ) );
// Make sure the transaction *only* registers the agent,
// so that the risk of deadlocks (see below) is minimal
// and other agents are made aware of this agent as soon as possible.
// This avoids unnecessary rebalancing when multiple nodes start in quick succession.
context.commitAndBeginNewTransaction();
self = agentPersister.findSelf( context.agentRepository() );
if ( self == null ) {
throw log.agentRegistrationIneffective( selfReference() );
}
self = findSelfExpectRegistered( context );
}
return self;
}

private Agent findSelfExpectRegistered(AgentClusterLinkContext context) {
Agent self = agentPersister.findSelf( context.agentRepository() );
if ( self == null ) {
throw log.agentRegistrationIneffective( selfReference() );
}
return self;
}

protected abstract R doPulse(AgentRepository agentRepository, Instant now,
List<Agent> allAgentsInIdOrder, Agent self);
protected abstract WriteAction<R> doPulse(List<Agent> allAgentsInIdOrder, Agent self);

/**
* Instructs the processor to commit the transaction, wait for the given delay, then pulse again.
@@ -129,4 +142,11 @@ protected AgentReference selfReference() {
return agentPersister.selfReference();
}

final AgentPersister getAgentPersisterForTests() {
return agentPersister;
}

protected interface WriteAction<R> {
R applyAndReturnInstructions(Instant now, Agent self, AgentPersister agentPersister);
}
}
@@ -18,7 +18,6 @@
import org.hibernate.search.engine.reporting.FailureHandler;
import org.hibernate.search.mapper.orm.coordination.outboxpolling.cluster.impl.Agent;
import org.hibernate.search.mapper.orm.coordination.outboxpolling.cluster.impl.AgentPersister;
import org.hibernate.search.mapper.orm.coordination.outboxpolling.cluster.impl.AgentRepository;
import org.hibernate.search.mapper.orm.coordination.outboxpolling.cluster.impl.AgentType;
import org.hibernate.search.mapper.orm.coordination.outboxpolling.cluster.impl.ClusterDescriptor;
import org.hibernate.search.mapper.orm.coordination.outboxpolling.cluster.impl.AgentState;
@@ -68,15 +67,16 @@ public OutboxPollingEventProcessorClusterLink(String agentName,
}

@Override
protected OutboxPollingEventProcessingInstructions doPulse(AgentRepository agentRepository, Instant now,
List<Agent> allAgentsInIdOrder, Agent self) {
protected WriteAction<OutboxPollingEventProcessingInstructions> doPulse(List<Agent> allAgentsInIdOrder, Agent currentSelf) {
for ( Agent agent : allAgentsInIdOrder ) {
if ( AgentType.MASS_INDEXING.equals( agent.getType() ) ) {
log.logf( self.getState() != AgentState.SUSPENDED ? Logger.Level.INFO : Logger.Level.TRACE,
log.logf( currentSelf.getState() != AgentState.SUSPENDED ? Logger.Level.INFO : Logger.Level.TRACE,
"Agent '%s': another agent '%s' is currently mass indexing",
selfReference(), now, agent );
agentPersister.setSuspended( self );
return instructCommitAndRetryPulseAfterDelay( now, pulseInterval );
selfReference(), agent );
return (now, self, agentPersister) -> {
agentPersister.setSuspended( self );
return instructCommitAndRetryPulseAfterDelay( now, pulseInterval );
};
}
}

@@ -93,49 +93,59 @@ protected OutboxPollingEventProcessingInstructions doPulse(AgentRepository agent
allAgentsInIdOrder, e ) );
contextBuilder.failingOperation( log.outboxEventProcessorPulse( selfReference() ) );
failureHandler.handle( contextBuilder.build() );
agentPersister.setSuspended( self );
return instructCommitAndRetryPulseAfterDelay( now, pulseInterval );
return (now, self, agentPersister) -> {
agentPersister.setSuspended( self );
return instructCommitAndRetryPulseAfterDelay( now, pulseInterval );
};
}

Optional<ShardAssignmentDescriptor> shardAssignmentOptional =
ShardAssignmentDescriptor.fromClusterMemberList( clusterTarget.descriptor.memberIdsInShardOrder, selfReference().id );
if ( !shardAssignmentOptional.isPresent() ) {
log.logf( self.getState() != AgentState.SUSPENDED ? Logger.Level.INFO : Logger.Level.TRACE,
log.logf( currentSelf.getState() != AgentState.SUSPENDED ? Logger.Level.INFO : Logger.Level.TRACE,
"Agent '%s': this agent is superfluous and will not perform event processing,"
+ " because other agents are enough to handle all the shards."
+ " Target cluster: %s.",
selfReference(), clusterTarget.descriptor );
agentPersister.setSuspended( self );
return instructCommitAndRetryPulseAfterDelay( now, pulseInterval );
return (now, self, agentPersister) -> {
agentPersister.setSuspended( self );
return instructCommitAndRetryPulseAfterDelay( now, pulseInterval );
};
}

ShardAssignmentDescriptor targetShardAssignment = shardAssignmentOptional.get();

if ( clusterTarget.descriptor.memberIdsInShardOrder.contains( null ) ) {
log.logf( self.getState() != AgentState.SUSPENDED ? Logger.Level.INFO : Logger.Level.TRACE,
log.logf( currentSelf.getState() != AgentState.SUSPENDED ? Logger.Level.INFO : Logger.Level.TRACE,
"Agent '%s': some cluster members are missing; this agent will wait until they are present."
+ " Target cluster: %s.",
selfReference(), clusterTarget.descriptor );
agentPersister.setSuspended( self );
return instructCommitAndRetryPulseAfterDelay( now, pollingInterval );
return (now, self, agentPersister) -> {
agentPersister.setSuspended( self );
return instructCommitAndRetryPulseAfterDelay( now, pollingInterval );
};
}

ShardAssignmentDescriptor persistedShardAssignment = self.getShardAssignment();
ShardAssignmentDescriptor persistedShardAssignment = currentSelf.getShardAssignment();

if ( !targetShardAssignment.equals( persistedShardAssignment ) ) {
log.infof( "Agent '%s': the persisted shard assignment (%s) does not match the target."
+ " Target assignment: %s."
+ " Cluster: %s.",
selfReference(), persistedShardAssignment, targetShardAssignment,
clusterTarget.descriptor );
agentPersister.setWaiting( self, clusterTarget.descriptor, targetShardAssignment );
return instructCommitAndRetryPulseAfterDelay( now, pollingInterval );
return (now, self, agentPersister) -> {
agentPersister.setWaiting( self, clusterTarget.descriptor, targetShardAssignment );
return instructCommitAndRetryPulseAfterDelay( now, pollingInterval );
};
}

// Check whether excluded (superfluous) agents complied with the cluster target and suspended themselves.
if ( !excludedAgentsAreOutOfCluster( clusterTarget.excluded ) ) {
agentPersister.setWaiting( self, clusterTarget.descriptor, targetShardAssignment );
return instructCommitAndRetryPulseAfterDelay( now, pollingInterval );
return (now, self, agentPersister) -> {
agentPersister.setWaiting( self, clusterTarget.descriptor, targetShardAssignment );
return instructCommitAndRetryPulseAfterDelay( now, pollingInterval );
};
}

// Check whether cluster members complied with the cluster target.
@@ -149,8 +159,10 @@ protected OutboxPollingEventProcessingInstructions doPulse(AgentRepository agent
// we make sure that on the second transaction,
// one of those agents would see the other and take it into account when rebalancing.
if ( !clusterMembersAreInCluster( clusterTarget.membersInShardOrder, clusterTarget.descriptor ) ) {
agentPersister.setWaiting( self, clusterTarget.descriptor, targetShardAssignment );
return instructCommitAndRetryPulseAfterDelay( now, pollingInterval );
return (now, self, agentPersister) -> {
agentPersister.setWaiting( self, clusterTarget.descriptor, targetShardAssignment );
return instructCommitAndRetryPulseAfterDelay( now, pollingInterval );
};
}

// If all the conditions above are satisfied, then we can start processing.
@@ -165,8 +177,10 @@ protected OutboxPollingEventProcessingInstructions doPulse(AgentRepository agent
log.infof( "Agent '%s': assigning to %s", selfReference(), targetShardAssignment );
this.lastShardAssignment = ShardAssignment.of( targetShardAssignment, finderProvider );
}
agentPersister.setRunning( self, clusterTarget.descriptor );
return instructProceedWithEventProcessing( now );
return (now, self, agentPersister) -> {
agentPersister.setRunning( self, clusterTarget.descriptor );
return instructProceedWithEventProcessing( now );
};
}

private boolean excludedAgentsAreOutOfCluster(List<Agent> excludedAgents) {
@@ -16,7 +16,6 @@
import org.hibernate.search.engine.reporting.FailureHandler;
import org.hibernate.search.mapper.orm.coordination.outboxpolling.cluster.impl.Agent;
import org.hibernate.search.mapper.orm.coordination.outboxpolling.cluster.impl.AgentPersister;
import org.hibernate.search.mapper.orm.coordination.outboxpolling.cluster.impl.AgentRepository;
import org.hibernate.search.mapper.orm.coordination.outboxpolling.cluster.impl.AgentState;
import org.hibernate.search.mapper.orm.coordination.outboxpolling.cluster.impl.AgentType;
import org.hibernate.search.mapper.orm.coordination.outboxpolling.cluster.impl.ClusterDescriptor;
@@ -43,16 +42,17 @@ public OutboxPollingMassIndexerAgentClusterLink(String agentName,
}

@Override
protected OutboxPollingMassIndexingInstructions doPulse(AgentRepository agentRepository, Instant now,
List<Agent> allAgentsInIdOrder, Agent self) {
protected WriteAction<OutboxPollingMassIndexingInstructions> doPulse(List<Agent> allAgentsInIdOrder, Agent currentSelf) {
List<Agent> eventProcessors = allAgentsInIdOrder.stream()
.filter( a -> AgentType.EVENT_PROCESSING.contains( a.getType() ) )
.collect( Collectors.toList() );

// Check whether event processors acknowledged our existence by suspending themselves.
if ( !eventProcessorsAreSuspended( eventProcessors ) ) {
agentPersister.setWaiting( self, SINGLE_NODE_CLUSTER_DESCRIPTOR, SINGLE_NODE_SHARD_ASSIGNMENT );
return instructCommitAndRetryPulseAfterDelay( now, pollingInterval );
return (now, self, agentPersister) -> {
agentPersister.setWaiting( self, SINGLE_NODE_CLUSTER_DESCRIPTOR, SINGLE_NODE_SHARD_ASSIGNMENT );
return instructCommitAndRetryPulseAfterDelay( now, pollingInterval );
};
}

// Ensure that we won't just spawn the agent directly in the RUNNING state,
@@ -64,14 +64,18 @@ protected OutboxPollingMassIndexingInstructions doPulse(AgentRepository agentRep
// By requiring at least two transactions to switch from "just spawned" to RUNNING,
// we make sure that on the second transaction,
// one of those agents would see the other and take it into account.
if ( AgentState.SUSPENDED.equals( self.getState() ) ) {
agentPersister.setWaiting( self, SINGLE_NODE_CLUSTER_DESCRIPTOR, SINGLE_NODE_SHARD_ASSIGNMENT );
return instructCommitAndRetryPulseAfterDelay( now, pollingInterval );
if ( AgentState.SUSPENDED.equals( currentSelf.getState() ) ) {
return (now, self, agentPersister) -> {
agentPersister.setWaiting( self, SINGLE_NODE_CLUSTER_DESCRIPTOR, SINGLE_NODE_SHARD_ASSIGNMENT );
return instructCommitAndRetryPulseAfterDelay( now, pollingInterval );
};
}

// If all the conditions above are satisfied, then we can start mass indexing.
agentPersister.setRunning( self, SINGLE_NODE_CLUSTER_DESCRIPTOR );
return instructProceedWithMassIndexing( now );
return (now, self, agentPersister) -> {
agentPersister.setRunning( self, SINGLE_NODE_CLUSTER_DESCRIPTOR );
return instructProceedWithMassIndexing( now );
};
}

private boolean eventProcessorsAreSuspended(List<Agent> eventProcessors) {
@@ -98,13 +98,13 @@ public void verifyNoMoreInvocationsOnAllMocks() {
}

protected void defineSelfNotCreatedYet(OutboxPollingEventProcessorClusterLink link) {
link.agentPersister.setSelfReferenceForTests( null );
link.getAgentPersisterForTests().setSelfReferenceForTests( null );
repositoryMockHelper.defineSelfCreatedByPulse( SELF_ID );
}

protected void defineSelfCreatedAndStillPresent(OutboxPollingEventProcessorClusterLink link,
AgentState state, ShardAssignmentDescriptor shardAssignment) {
link.agentPersister.setSelfReferenceForTests( SELF_REF );
link.getAgentPersisterForTests().setSelfReferenceForTests( SELF_REF );
AgentType type;
if ( link.shardAssignmentIsStatic ) {
type = AgentType.EVENT_PROCESSING_STATIC_SHARDING;
@@ -86,13 +86,13 @@ public void verifyNoMoreInvocationsOnAllMocks() {
}

protected void defineSelfNotCreatedYet(OutboxPollingMassIndexerAgentClusterLink link) {
link.agentPersister.setSelfReferenceForTests( null );
link.getAgentPersisterForTests().setSelfReferenceForTests( null );
repositoryMockHelper.defineSelfCreatedByPulse( SELF_ID );
}

protected void defineSelfCreatedAndStillPresent(OutboxPollingMassIndexerAgentClusterLink link,
AgentState state) {
link.agentPersister.setSelfReferenceForTests( SELF_REF );
link.getAgentPersisterForTests().setSelfReferenceForTests( SELF_REF );
Agent self = new Agent( AgentType.MASS_INDEXING, SELF_REF.name, NOW, state, null );
self.setId( SELF_ID );
repositoryMockHelper.defineSelfPreExisting( self );
