-
Notifications
You must be signed in to change notification settings - Fork 242
/
AbstractAgentClusterLink.java
152 lines (132 loc) · 6.38 KB
/
AbstractAgentClusterLink.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
/*
* Hibernate Search, full-text search for your domain model
*
* License: GNU Lesser General Public License (LGPL), version 2.1 or later
* See the lgpl.txt file in the root directory or <http://www.gnu.org/licenses/lgpl-2.1.html>.
*/
package org.hibernate.search.mapper.orm.coordination.outboxpolling.event.impl;
import java.lang.invoke.MethodHandles;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.hibernate.search.engine.reporting.FailureHandler;
import org.hibernate.search.mapper.orm.coordination.outboxpolling.cluster.impl.Agent;
import org.hibernate.search.mapper.orm.coordination.outboxpolling.cluster.impl.AgentPersister;
import org.hibernate.search.mapper.orm.coordination.outboxpolling.cluster.impl.AgentReference;
import org.hibernate.search.mapper.orm.coordination.outboxpolling.logging.impl.Log;
import org.hibernate.search.util.common.logging.impl.LoggerFactory;
/**
 * Base class for the link between an agent and the cluster of agents registered in the database.
 * <p>
 * Each call to {@link #pulse(AgentClusterLinkContext)}:
 * <ol>
 * <li>registers this agent if it is not registered yet,</li>
 * <li>removes other agents whose expiration is in the past (and, if there were any, asks to pulse again later),</li>
 * <li>otherwise lets the subclass decide what to do through {@link #doPulse(List, Agent)},
 * and applies that decision in a new transaction, after extending this agent's expiration.</li>
 * </ol>
 *
 * @param <R> The type of instructions returned by {@link #pulse(AgentClusterLinkContext)}.
 */
abstract class AbstractAgentClusterLink<R> {
	private static final Log log = LoggerFactory.make( Log.class, MethodHandles.lookup() );

	// Not used in this class; exposed for subclasses.
	protected final FailureHandler failureHandler;
	// Source of "now" for every pulse; injected so that time can be controlled.
	protected final Clock clock;
	// Minimal delay before the next pulse, used e.g. after removing timed-out agents.
	protected final Duration pollingInterval;
	// Larger delay before the next pulse (see instructCommitAndRetryPulseAfterDelay).
	protected final Duration pulseInterval;
	// Added to the current instant to compute this agent's expiration, on registration and on each write.
	protected final Duration pulseExpiration;

	private final AgentPersister agentPersister;

	/**
	 * @param agentPersister Used to find, create, update and remove the agent representing self.
	 * @param failureHandler Exposed to subclasses.
	 * @param clock The clock used to determine the current instant on each pulse.
	 * @param pollingInterval The minimal delay before the next pulse when waiting for external changes.
	 * @param pulseInterval The larger delay before the next pulse, e.g. when suspended.
	 * @param pulseExpiration The duration after which this agent is considered expired if it does not pulse again.
	 */
	// Protected rather than public: the class is abstract, so only subclasses can invoke this constructor.
	protected AbstractAgentClusterLink(AgentPersister agentPersister,
			FailureHandler failureHandler, Clock clock,
			Duration pollingInterval, Duration pulseInterval, Duration pulseExpiration) {
		this.agentPersister = agentPersister;
		this.failureHandler = failureHandler;
		this.clock = clock;
		this.pollingInterval = pollingInterval;
		this.pulseInterval = pulseInterval;
		this.pulseExpiration = pulseExpiration;
	}

	/**
	 * Performs one pulse of this agent.
	 * <p>
	 * Reads happen in the current transaction; any write decided by {@link #doPulse(List, Agent)}
	 * happens after {@link AgentClusterLinkContext#commitAndBeginNewTransaction()}.
	 *
	 * @param context The context giving access to the agent repository and to transaction control.
	 * @return If timed-out agents were removed, the result of
	 * {@link #instructCommitAndRetryPulseAfterDelay(Instant, Duration)} with {@link #pollingInterval};
	 * otherwise, the instructions returned by the {@link WriteAction} built by {@link #doPulse(List, Agent)}.
	 */
	public final R pulse(AgentClusterLinkContext context) {
		Agent self = ensureRegistered( context );

		// In order to avoid transaction deadlocks with some RDBMS (yes, I mean SQL Server),
		// we make sure that reads listing other agents always happen in a different transaction
		// than writes.
		List<Agent> allAgentsInIdOrder = context.agentRepository().findAllOrderById();

		Instant now = clock.instant();
		log.tracef( "Agent '%s': starting pulse at %s with self = %s, all agents = %s",
				selfReference(), now, self, allAgentsInIdOrder );

		// In order to avoid transaction deadlocks with some RDBMS (and this time I mean Oracle),
		// we make sure that if we need to delete expired agents,
		// we do it without updating the agent representing self in the same transaction
		// (creation is fine).
		// I'm not entirely sure, but I think it goes like this:
		// if one (already created) agent starts, updates itself,
		// then before its transaction is finished another agent does the same,
		// then the first agent tries to delete the second agent because it expired,
		// then the second agent tries to delete the first agent because it expired,
		// all in the same transaction, well... here you have a deadlock.
		// A separate, effectively-final copy of "now" is needed because "now" is reassigned below
		// and lambdas can only capture effectively-final locals.
		Instant expirationLimit = now;
		List<Agent> timedOutAgents = allAgentsInIdOrder.stream()
				.filter( Predicate.isEqual( self ).negate() ) // Ignore self: if expired, we'll correct that.
				.filter( a -> a.getExpiration().isBefore( expirationLimit ) )
				.collect( Collectors.toList() );
		if ( !timedOutAgents.isEmpty() ) {
			log.removingTimedOutAgents( selfReference(), timedOutAgents );
			context.agentRepository().delete( timedOutAgents );
			log.infof( "Agent '%s': reassessing the new situation in the next pulse", selfReference() );
			return instructCommitAndRetryPulseAfterDelay( now, pollingInterval );
		}

		// Determine what needs to be done
		WriteAction<R> pulseResult = doPulse( allAgentsInIdOrder, self );

		// Write actions are always executed in a new transaction,
		// so that the risk of deadlocks (see above) is minimal,
		// and transactions are shorter (for lower transaction contention,
		// important on CockroachDB in particular:
		// https://www.cockroachlabs.com/docs/v22.1/transactions#transaction-contention ).
		context.commitAndBeginNewTransaction();

		now = clock.instant();
		// Reload self in the new transaction before modifying it.
		self = findSelfExpectRegistered( context );
		// Delay expiration with each write
		self.setExpiration( now.plus( pulseExpiration ) );
		R instructions = pulseResult.applyAndReturnInstructions( now, self, agentPersister );
		log.tracef( "Agent '%s': ending pulse at %s with self = %s",
				selfReference(), now, self );
		return instructions;
	}

	/**
	 * Returns the agent representing self, registering it first if it cannot be found.
	 * <p>
	 * When registration is needed, it is committed immediately and self is then reloaded
	 * in the new transaction.
	 */
	private Agent ensureRegistered(AgentClusterLinkContext context) {
		Agent self = agentPersister.findSelf( context.agentRepository() );
		if ( self == null ) {
			Instant now = clock.instant();
			agentPersister.createSelf( context.agentRepository(), now.plus( pulseExpiration ) );
			// Make sure the transaction *only* registers the agent,
			// so that the risk of deadlocks (see the comments in pulse()) is minimal
			// and other agents are made aware of this agent as soon as possible.
			// This avoids unnecessary rebalancing when multiple nodes start in quick succession.
			context.commitAndBeginNewTransaction();
			self = findSelfExpectRegistered( context );
		}
		return self;
	}

	/**
	 * Returns the agent representing self, which is expected to have been registered already.
	 * <p>
	 * If it cannot be found, throws the exception built by {@code log.agentRegistrationIneffective}.
	 */
	private Agent findSelfExpectRegistered(AgentClusterLinkContext context) {
		Agent self = agentPersister.findSelf( context.agentRepository() );
		if ( self == null ) {
			throw log.agentRegistrationIneffective( selfReference() );
		}
		return self;
	}

	/**
	 * Decides what this agent should do, given the current state of the cluster.
	 * <p>
	 * Called by {@link #pulse(AgentClusterLinkContext)} only when no other agent has timed out.
	 * The returned action is applied in a new transaction, after self has been reloaded
	 * and its expiration extended.
	 *
	 * @param allAgentsInIdOrder All registered agents, including self, ordered by id.
	 * @param self The agent representing self, as loaded in the current (read) transaction.
	 * @return The write action to apply.
	 */
	protected abstract WriteAction<R> doPulse(List<Agent> allAgentsInIdOrder, Agent self);

	/**
	 * Instructs the processor to commit the transaction, wait for the given delay, then pulse again.
	 * <p>
	 * Use with:
	 * <ul>
	 * <li>pollingInterval to apply a minimal delay before the next pulse, to avoid hitting the database continuously.
	 * Useful when waiting for external changes.</li>
	 * <li>pulseInterval to apply a large delay before the next pulse.
	 * Useful when suspended and waiting for a reason to resume.</li>
	 * </ul>
	 *
	 * @param now The current instant, as seen by the caller.
	 * @param delay The delay to wait before the next pulse.
	 * @return The corresponding instructions.
	 */
	protected abstract R instructCommitAndRetryPulseAfterDelay(Instant now, Duration delay);

	/**
	 * Removes this agent from the cluster, delegating to {@link AgentPersister#leaveCluster}.
	 *
	 * @param context The context giving access to the agent repository.
	 */
	public final void leaveCluster(AgentClusterLinkContext context) {
		agentPersister.leaveCluster( context.agentRepository() );
	}

	/**
	 * @return The reference of the agent representing self, as provided by the agent persister.
	 */
	protected AgentReference selfReference() {
		return agentPersister.selfReference();
	}

	// Exposes the agent persister; as the name suggests, intended for tests only.
	final AgentPersister getAgentPersisterForTests() {
		return agentPersister;
	}

	/**
	 * A write decided by {@link #doPulse(List, Agent)}, applied by {@link #pulse(AgentClusterLinkContext)}
	 * in a new transaction.
	 *
	 * @param <R> The type of instructions returned once the write is applied.
	 */
	protected interface WriteAction<R> {
		/**
		 * @param now The instant obtained from the clock right after the new transaction began.
		 * @param self The agent representing self, reloaded in the new transaction,
		 * with its expiration already set to {@code now + pulseExpiration}.
		 * @param agentPersister The agent persister of the enclosing link.
		 * @return The instructions to return from the pulse.
		 */
		R applyAndReturnInstructions(Instant now, Agent self, AgentPersister agentPersister);
	}
}