-
Notifications
You must be signed in to change notification settings - Fork 555
/
ControllableRaftContexts.java
347 lines (301 loc) · 12.6 KB
/
ControllableRaftContexts.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
/*
* Copyright © 2020 camunda services GmbH (info@camunda.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.atomix.raft;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import io.atomix.cluster.ClusterMembershipService;
import io.atomix.cluster.MemberId;
import io.atomix.raft.impl.RaftContext;
import io.atomix.raft.partition.RaftElectionConfig;
import io.atomix.raft.partition.RaftPartitionConfig;
import io.atomix.raft.protocol.ControllableRaftServerProtocol;
import io.atomix.raft.roles.LeaderRole;
import io.atomix.raft.snapshot.TestSnapshotStore;
import io.atomix.raft.storage.RaftStorage;
import io.atomix.raft.storage.log.RaftLogReader;
import io.atomix.raft.zeebe.NoopEntryValidator;
import io.atomix.raft.zeebe.ZeebeLogAppender.AppendListener;
import io.camunda.zeebe.util.collection.Tuple;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Queue;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.jmock.lib.concurrent.DeterministicScheduler;
import org.slf4j.LoggerFactory;
/**
 * Uses a DeterministicScheduler and a controllable messaging layer to get a deterministic
 * execution of raft threads. Note: there is currently still some non-determinism hidden inside
 * raft, so the resulting execution is not fully deterministic.
 */
public final class ControllableRaftContexts {

  // Passed to every ControllableRaftServerProtocol created by this class; presumably each protocol
  // registers itself here on construction (nothing in this class puts into it) — TODO confirm.
  private final Map<MemberId, ControllableRaftServerProtocol> serverProtocols = new HashMap<>();
  // Per-member incoming message queues, shared with every ControllableRaftServerProtocol created
  // here. The raw CompletableFuture presumably mirrors the protocol constructor's signature.
  private final Map<MemberId, Queue<Tuple<Runnable, CompletableFuture>>> messageQueue =
      new HashMap<>();
  private final Map<MemberId, DeterministicSingleThreadContext> deterministicExecutors =
      new HashMap<>();
  private Path directory;
  private final int nodeCount;
  private final Map<MemberId, RaftContext> raftServers = new HashMap<>();
  private Duration electionTimeout;
  private Duration heartbeatTimeout;
  private int nextEntry = 0;
  // Used only for verification. Map[term -> leader]
  private final Map<Long, MemberId> leadersAtTerms = new HashMap<>();

  /**
   * Creates a harness for {@code nodeCount} raft members. No member is created until {@link
   * #setup(Path, Random)} is called.
   *
   * @param nodeCount number of raft members to create; members get the ids "0" .. "nodeCount-1"
   */
  public ControllableRaftContexts(final int nodeCount) {
    this.nodeCount = nodeCount;
  }

  /** Returns the live (mutable) map from member id to raft context managed by this harness. */
  public Map<MemberId, RaftContext> getRaftServers() {
    return raftServers;
  }

  /** Returns the raft context of the member with the given numeric id, or null if unknown. */
  public RaftContext getRaftContext(final int memberId) {
    return raftServers.get(toMemberId(memberId));
  }

  /** Returns the raft context of the given member, or null if unknown. */
  public RaftContext getRaftContext(final MemberId memberId) {
    return raftServers.get(memberId);
  }

  /**
   * Creates all raft members under {@code directory}, bootstraps the cluster with member 0
   * triggered to become the initial leader, and records the election/heartbeat timeouts used by
   * the tick helpers.
   *
   * @param directory root directory; each member stores its data in a sub-directory named after
   *     its id
   * @param random source of randomness handed to every raft context
   * @throws Exception if bootstrapping the cluster fails or does not complete in time
   */
  public void setup(final Path directory, final Random random) throws Exception {
    this.directory = directory;
    if (nodeCount > 0) {
      createRaftContexts(nodeCount, random);
    }
    joinRaftServers();
    electionTimeout = getRaftContext(0).getElectionTimeout();
    heartbeatTimeout = getRaftContext(0).getHeartbeatInterval();
    // expecting 0 to be the leader
    tickHeartbeatTimeout(0);
  }

  /** Closes every raft context and every executor, then clears all tracked state. */
  public void shutdown() {
    raftServers.forEach((m, c) -> c.close());
    raftServers.clear();
    serverProtocols.clear();
    deterministicExecutors.forEach((m, e) -> e.close());
    deterministicExecutors.clear();
    messageQueue.clear();
    leadersAtTerms.clear();
    directory = null;
  }

  /**
   * Misspelled alias of {@link #shutdown()}, kept so that existing callers keep compiling.
   *
   * @throws IOException never thrown; declared only for backward compatibility
   */
  public void shudown() throws IOException {
    shutdown();
  }

  /**
   * Bootstraps every member with the sorted list of all member ids, ticks member 0 past its
   * election timeout so it starts an election, then keeps delivering messages and running tasks
   * until every bootstrap future has completed.
   */
  private void joinRaftServers() throws InterruptedException, ExecutionException, TimeoutException {
    final Set<CompletableFuture<Void>> futures = new HashSet<>();
    final var servers = getRaftServers();
    final var serverIds = new ArrayList<>(servers.keySet());
    // Named differently from the electionTimeout field, which is only assigned after joining.
    final long electionTimeoutMillis = getRaftContext(0).getElectionTimeout().toMillis();
    Collections.sort(serverIds);
    servers.forEach(
        (memberId, raftContext) -> futures.add(raftContext.getCluster().bootstrap(serverIds)));
    runUntilDone(0);
    // trigger election on 0 so that 0 is initially the leader
    getDeterministicScheduler(0).tick(2 * electionTimeoutMillis, TimeUnit.MILLISECONDS);
    final var joinFuture = CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new));
    // Should trigger executions and message delivery
    while (!joinFuture.isDone()) {
      processAllMessage();
      runUntilDone();
    }
    joinFuture.get(1, TimeUnit.SECONDS);
  }

  /** Creates and registers members "0" .. "nodeCount-1". */
  private void createRaftContexts(final int nodeCount, final Random random) {
    for (int i = 0; i < nodeCount; i++) {
      final var memberId = toMemberId(i);
      raftServers.put(memberId, createRaftContext(memberId, random));
    }
  }

  /**
   * Creates (but does not register) a raft context for {@code memberId}. It is wired to a
   * controllable protocol, a deterministic thread context and a file-backed storage under this
   * harness' directory.
   *
   * @param memberId id of the member to create
   * @param random source of randomness handed to the raft context
   * @return the new raft context, configured with a {@link NoopEntryValidator}
   */
  public RaftContext createRaftContext(final MemberId memberId, final Random random) {
    final var raft =
        new RaftContext(
            memberId.id() + "-partition-1",
            1,
            memberId,
            mock(ClusterMembershipService.class),
            new ControllableRaftServerProtocol(memberId, serverProtocols, messageQueue),
            createStorage(memberId),
            getRaftThreadContextFactory(memberId),
            () -> random,
            RaftElectionConfig.ofDefaultElection(),
            new RaftPartitionConfig());
    raft.setEntryValidator(new NoopEntryValidator());
    return raft;
  }

  // Ignores the factory arguments and hands out one deterministic context per member, creating it
  // on first use and reusing it afterwards.
  private RaftThreadContextFactory getRaftThreadContextFactory(final MemberId memberId) {
    return (factory, uncaughtExceptionHandler) ->
        deterministicExecutors.computeIfAbsent(
            memberId,
            m ->
                (DeterministicSingleThreadContext)
                    DeterministicSingleThreadContext.createContext());
  }

  private RaftStorage createStorage(final MemberId memberId) {
    return createStorage(memberId, Function.identity());
  }

  // Builds the member's storage from small test defaults, optionally adjusted by configurator.
  private RaftStorage createStorage(
      final MemberId memberId,
      final Function<RaftStorage.Builder, RaftStorage.Builder> configurator) {
    final var memberDirectory = getMemberDirectory(directory, memberId.toString());
    final RaftStorage.Builder defaults =
        RaftStorage.builder()
            .withDirectory(memberDirectory)
            .withMaxSegmentSize(1024 * 10)
            .withFreeDiskSpace(100)
            .withSnapshotStore(new TestSnapshotStore(new AtomicReference<>()));
    return configurator.apply(defaults).build();
  }

  private File getMemberDirectory(final Path directory, final String memberName) {
    return new File(directory.toFile(), memberName);
  }

  /** Returns the controllable protocol of the given member, or null if unknown. */
  public ControllableRaftServerProtocol getServerProtocol(final MemberId memberId) {
    return serverProtocols.get(memberId);
  }

  /** Returns the controllable protocol of the member with the given numeric id. */
  public ControllableRaftServerProtocol getServerProtocol(final int memberId) {
    return getServerProtocol(toMemberId(memberId));
  }

  /** Returns the deterministic scheduler that drives the given member's raft thread. */
  public DeterministicScheduler getDeterministicScheduler(final MemberId memberId) {
    return deterministicExecutors.get(memberId).getDeterministicScheduler();
  }

  /** Returns the deterministic scheduler of the member with the given numeric id. */
  public DeterministicScheduler getDeterministicScheduler(final int memberId) {
    return getDeterministicScheduler(toMemberId(memberId));
  }

  // ------ Methods to control the execution of raft threads --------

  /** Runs every member's scheduler until it has no more pending tasks (members run in turn). */
  public void runUntilDone() {
    raftServers.keySet().forEach(memberId -> runUntilDone(memberId));
  }

  /**
   * Delivers all queued incoming messages to the member, then runs its scheduler until idle.
   * Unlike {@link #runUntilDone(MemberId)}, this overload delivers messages first.
   */
  public void runUntilDone(final int memberId) {
    getServerProtocol(memberId).receiveAll();
    getDeterministicScheduler(memberId).runUntilIdle();
  }

  /** Runs the member's scheduler until idle, without delivering queued incoming messages. */
  public void runUntilDone(final MemberId memberId) {
    getDeterministicScheduler(memberId).runUntilIdle();
  }

  /** Runs exactly one pending task on the member's scheduler, if there is any. */
  public void runNextTask(final MemberId memberId) {
    final var scheduler = getDeterministicScheduler(memberId);
    if (!scheduler.isIdle()) {
      scheduler.runNextPendingCommand();
    }
  }

  /** Submits all messages from every member's incoming queue to that member's scheduler. */
  public void processAllMessage() {
    raftServers.keySet().forEach(memberId -> processAllMessage(memberId));
  }

  /** Submits all messages from the member's incoming queue to its scheduler. */
  public void processAllMessage(final MemberId memberId) {
    getServerProtocol(memberId).receiveAll();
  }

  /** Submits the next message from the member's incoming queue to its scheduler. */
  public void processNextMessage(final MemberId memberId) {
    getServerProtocol(memberId).receiveNextMessage();
  }

  /** Advances the member's scheduler clock by the election timeout captured during setup. */
  public void tickElectionTimeout(final int memberId) {
    tickElectionTimeout(toMemberId(memberId));
  }

  /** Advances the member's scheduler clock by the election timeout captured during setup. */
  public void tickElectionTimeout(final MemberId memberId) {
    tick(memberId, electionTimeout);
  }

  /** Advances the member's scheduler clock by the heartbeat interval captured during setup. */
  public void tickHeartbeatTimeout(final int memberId) {
    tickHeartbeatTimeout(toMemberId(memberId));
  }

  /** Advances the member's scheduler clock by the heartbeat interval captured during setup. */
  public void tickHeartbeatTimeout(final MemberId memberId) {
    tick(memberId, heartbeatTimeout);
  }

  /** Advances the member's scheduler clock by {@code time}, at millisecond granularity. */
  public void tick(final MemberId memberId, final Duration time) {
    getDeterministicScheduler(memberId).tick(time.toMillis(), TimeUnit.MILLISECONDS);
  }

  // Appends one entry on memberId if it currently has the leader role; otherwise does nothing.
  // The entry payload is a 4-byte int taken from an increasing counter.
  private void clientAppend(final MemberId memberId) {
    final var role = getRaftContext(memberId).getRaftRole();
    if (!(role instanceof LeaderRole)) {
      return;
    }
    LoggerFactory.getLogger("TEST").info("Appending on leader {}", memberId.id());
    final ByteBuffer data = ByteBuffer.allocate(Integer.BYTES).putInt(0, nextEntry++);
    final LeaderRole leaderRole = (LeaderRole) role;
    leaderRole.appendEntry(0, 1, data, mock(AppendListener.class));
  }

  /**
   * Appends one entry on the leader recorded for the highest term seen so far by {@link
   * #assertOnlyOneLeader()}. Does nothing if no leader has been recorded yet, or if that member no
   * longer has the leader role.
   */
  public void clientAppendOnLeader() {
    leadersAtTerms.keySet().stream()
        .max(Long::compareTo)
        .map(leadersAtTerms::get)
        .ifPresent(this::clientAppend);
  }

  // ----------------------- Verifications -----------------------------

  /**
   * Verifies that, index by index, all nodes that have a committed entry at that position hold
   * the same entry. Also verifies that the number of committed positions read equals the highest
   * commit index across all nodes. The readers are always closed, even when an assertion fails.
   */
  public void assertAllLogsEqual() {
    final var readers =
        raftServers.values().stream()
            .collect(Collectors.toMap(Function.identity(), s -> s.getLog().openCommittedReader()));
    try {
      long index = 0;
      while (true) {
        // Read the next committed entry from every node that still has one.
        final var entries =
            readers.entrySet().stream()
                .filter(e -> e.getValue().hasNext())
                .collect(Collectors.toMap(e -> e.getKey().getName(), e -> e.getValue().next()));
        if (entries.isEmpty()) {
          break;
        }
        assertThat(entries.values().stream().distinct().count())
            .withFailMessage(
                "Expected to find the same entry at a committed index on all nodes, but found %s",
                entries)
            .isEqualTo(1);
        index++;
      }
      final var highestCommitIndex =
          raftServers.values().stream()
              .map(RaftContext::getCommitIndex)
              .max(Long::compareTo)
              .orElseThrow();
      assertThat(index)
          .withFailMessage(
              "Expected to read %s committed entries (the highest commit index among all nodes),"
                  + " but read %s",
              highestCommitIndex, index)
          .isEqualTo(highestCommitIndex);
    } finally {
      readers.values().forEach(RaftLogReader::close);
    }
  }

  /**
   * Records the current leader of every node per term, and fails if two different leaders were
   * ever observed for the same term.
   */
  public void assertOnlyOneLeader() {
    raftServers.values().forEach(this::updateAndVerifyLeaderTerm);
  }

  // Records s's leader for s's current term, or asserts it matches the leader already recorded
  // for that term. Nodes that currently know no leader are ignored.
  private void updateAndVerifyLeaderTerm(final RaftContext s) {
    final long term = s.getTerm();
    final var currentLeader = s.getLeader();
    if (currentLeader == null) {
      return;
    }
    final var leader = currentLeader.memberId();
    // Values are never null, so a non-null result means a leader was already known for this term.
    final var knownLeader = leadersAtTerms.putIfAbsent(term, leader);
    if (knownLeader != null) {
      assertThat(knownLeader)
          .withFailMessage("Found two leaders %s %s at term %s", knownLeader, leader, term)
          .isEqualTo(leader);
    }
  }

  // Member ids in this harness are the decimal string of the member's index.
  private static MemberId toMemberId(final int memberId) {
    return MemberId.from(String.valueOf(memberId));
  }
}