-
Notifications
You must be signed in to change notification settings - Fork 554
/
FileBasedSnapshotStore.java
566 lines (499 loc) · 21.1 KB
/
FileBasedSnapshotStore.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.snapshots.impl;
import io.camunda.zeebe.snapshots.ConstructableSnapshotStore;
import io.camunda.zeebe.snapshots.PersistableSnapshot;
import io.camunda.zeebe.snapshots.PersistedSnapshot;
import io.camunda.zeebe.snapshots.PersistedSnapshotListener;
import io.camunda.zeebe.snapshots.ReceivableSnapshotStore;
import io.camunda.zeebe.snapshots.SnapshotId;
import io.camunda.zeebe.snapshots.TransientSnapshot;
import io.camunda.zeebe.util.FileUtil;
import io.camunda.zeebe.util.sched.Actor;
import io.camunda.zeebe.util.sched.future.ActorFuture;
import io.camunda.zeebe.util.sched.future.CompletableActorFuture;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.AtomicMoveNotSupportedException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * File-system backed snapshot store for a single partition.
 *
 * <p>Committed snapshots live in {@code snapshotsDirectory}, one directory per snapshot named after
 * its snapshot id, each accompanied by a sibling {@code <snapshot-id>.checksum} file. The checksum
 * file is written only after the snapshot directory has been moved into place, so its presence
 * marks a complete snapshot (see {@link #newSnapshot}). Snapshots that are still being taken
 * (transient) or received from another node are staged in {@code pendingDirectory} until they are
 * committed.
 *
 * <p>Most operations are scheduled on this actor. The latest committed snapshot is additionally
 * cached in an {@link AtomicReference} so that it can be read without going through the actor.
 */
public final class FileBasedSnapshotStore extends Actor
    implements ConstructableSnapshotStore, ReceivableSnapshotStore {
  // pending directory name for received snapshots: the first placeholder is the snapshot id, the
  // second the received snapshot count (which keeps concurrent receptions apart)
  private static final String RECEIVING_DIR_FORMAT = "%s-%d";
  private static final Logger LOGGER = LoggerFactory.getLogger(FileBasedSnapshotStore.class);
  // suffix of the marker file written next to each committed snapshot directory
  private static final String CHECKSUM_SUFFIX = ".checksum";

  // the root directory where all committed snapshots are stored
  private final Path snapshotsDirectory;
  // the root directory where pending (transient or received) snapshots are staged
  private final Path pendingDirectory;
  // keeps track of all snapshot modification listeners
  private final Set<PersistedSnapshotListener> listeners;
  private final SnapshotMetrics snapshotMetrics;
  // AtomicReference so that getting the latest snapshot doesn't have to go through the actor
  private final AtomicReference<FileBasedSnapshot> currentPersistedSnapshotRef =
      new AtomicReference<>();
  // used to write concurrently received snapshots into distinct pending directories
  private final AtomicLong receivingSnapshotStartCount;
  // plain HashSet: additions go through actor.submit; NOTE(review): removePendingSnapshot callers
  // are presumably on the actor as well — confirm before touching this set from other threads
  private final Set<PersistableSnapshot> pendingSnapshots = new HashSet<>();
  private final String actorName;
  private final int partitionId;

  /**
   * Creates a store for the given partition. No file system access happens here; snapshots are
   * loaded when the actor starts.
   *
   * @param nodeId id of the local node, used to build the actor name
   * @param partitionId id of the partition whose snapshots are managed
   * @param snapshotMetrics metrics sink for snapshot counts and sizes
   * @param snapshotsDirectory directory holding committed snapshots and their checksum files
   * @param pendingDirectory directory in which pending snapshots are staged
   */
  public FileBasedSnapshotStore(
      final int nodeId,
      final int partitionId,
      final SnapshotMetrics snapshotMetrics,
      final Path snapshotsDirectory,
      final Path pendingDirectory) {
    this.snapshotsDirectory = snapshotsDirectory;
    this.pendingDirectory = pendingDirectory;
    this.snapshotMetrics = snapshotMetrics;
    receivingSnapshotStartCount = new AtomicLong();
    listeners = new CopyOnWriteArraySet<>();
    actorName = buildActorName(nodeId, "SnapshotStore", partitionId);
    this.partitionId = partitionId;
  }

  /** Adds the partition id to the actor's context properties. */
  @Override
  protected Map<String, String> createContext() {
    final var context = super.createContext();
    context.put(ACTOR_PROP_PARTITION_ID, Integer.toString(partitionId));
    return context;
  }

  @Override
  public String getName() {
    return actorName;
  }

  /**
   * Loads the latest valid snapshot from disk, which also deletes older ones. Then deletes every
   * leftover pending snapshot directory, since pending snapshots cannot be resumed after a restart.
   */
  @Override
  protected void onActorStarting() {
    currentPersistedSnapshotRef.set(loadLatestSnapshot(snapshotsDirectory));
    purgePendingSnapshotsDirectory();
  }

  @Override
  protected void onActorClosing() {
    listeners.clear();
  }

  /**
   * Scans {@code snapshotDirectory} for valid snapshots, returns the newest one and deletes every
   * other valid snapshot found.
   *
   * <p>Checksum files are skipped during the scan. Entries that are not valid snapshots (see {@link
   * #collectSnapshot}) are neither returned nor deleted here.
   *
   * @param snapshotDirectory the directory containing committed snapshots
   * @return the newest valid snapshot, or {@code null} if there is none
   * @throws UncheckedIOException if the directory cannot be listed
   */
  private FileBasedSnapshot loadLatestSnapshot(final Path snapshotDirectory) {
    FileBasedSnapshot latestPersistedSnapshot = null;
    final List<FileBasedSnapshot> snapshots = new ArrayList<>();
    try (final var stream =
        Files.newDirectoryStream(
            snapshotDirectory, p -> !p.getFileName().toString().endsWith(CHECKSUM_SUFFIX))) {
      for (final var path : stream) {
        final var snapshot = collectSnapshot(path);
        if (snapshot != null) {
          snapshots.add(snapshot);
          // ">=" means that on equal metadata the entry listed last wins
          if ((latestPersistedSnapshot == null)
              || snapshot.getMetadata().compareTo(latestPersistedSnapshot.getMetadata()) >= 0) {
            latestPersistedSnapshot = snapshot;
          }
        }
      }

      // Delete older snapshots
      if (latestPersistedSnapshot != null) {
        snapshots.remove(latestPersistedSnapshot);
        if (!snapshots.isEmpty()) {
          LOGGER.debug("Purging snapshots older than {}", latestPersistedSnapshot);
          snapshots.forEach(
              oldSnapshot -> {
                LOGGER.debug("Deleting snapshot {}", oldSnapshot);
                oldSnapshot.delete();
              });
        }
      }
    } catch (final IOException e) {
      throw new UncheckedIOException(e);
    }

    return latestPersistedSnapshot;
  }

  /**
   * Tries to load the snapshot stored at {@code path}.
   *
   * <p>Returns {@code null} when:
   *
   * <ul>
   *   <li>the path name cannot be parsed as snapshot metadata;
   *   <li>there is no checksum file for it. This is treated as a partially written snapshot, and
   *       its directory is deleted on a best-effort basis;
   *   <li>the checksum on disk does not match the recomputed one (the directory is kept);
   *   <li>reading or computing the checksum fails.
   * </ul>
   *
   * @param path a direct child of the snapshots directory
   * @return the loaded snapshot, or {@code null} if it is not a valid snapshot
   */
  // TODO(npepinpe): using Either here would improve readability and observability, as validation
  // can have better error messages, and the return type better expresses what we attempt to do,
  // i.e. either it failed (with an error) or it succeeded
  private FileBasedSnapshot collectSnapshot(final Path path) throws IOException {
    final var optionalMeta = FileBasedSnapshotMetadata.ofPath(path);
    if (optionalMeta.isEmpty()) {
      return null;
    }

    final var metadata = optionalMeta.get();
    final var checksumPath = buildSnapshotsChecksumPath(metadata);

    if (!Files.exists(checksumPath)) {
      // checksum was not completely/successfully written, we can safely delete it and proceed
      LOGGER.debug(
          "Snapshot {} does not have a checksum file, which most likely indicates a partial write"
              + " (e.g. crash during move), and will be deleted",
          path);
      try {
        FileUtil.deleteFolder(path);
      } catch (final Exception e) {
        // it's fine to ignore failures to delete here, as it would constitute mostly noise
        LOGGER.debug("Failed to delete partial snapshot {}", path, e);
      }

      return null;
    }

    try {
      final var expectedChecksum = SnapshotChecksum.read(checksumPath);
      final var actualChecksum = SnapshotChecksum.calculate(path);
      if (expectedChecksum != actualChecksum) {
        LOGGER.warn(
            "Expected snapshot {} to have checksum {}, but the actual checksum is {}; the snapshot is most likely corrupted. The startup will fail if there is no other valid snapshot and the log has been compacted.",
            path,
            expectedChecksum,
            actualChecksum);
        return null;
      }

      return new FileBasedSnapshot(path, checksumPath, actualChecksum, metadata);
    } catch (final Exception e) {
      LOGGER.warn("Could not load snapshot in {}", path, e);
      return null;
    }
  }

  /**
   * Deletes every directory in the pending directory. Failures are logged but never propagated.
   */
  private void purgePendingSnapshotsDirectory() {
    try (final var files = Files.list(pendingDirectory)) {
      files.filter(Files::isDirectory).forEach(this::purgePendingSnapshot);
    } catch (final IOException e) {
      LOGGER.error(
          "Failed to purge pending snapshots, which may result in unnecessary disk usage and should be monitored",
          e);
    }
  }

  /**
   * Returns whether the latest committed snapshot's directory name equals {@code id}. Only the
   * latest snapshot is checked.
   */
  @Override
  public boolean hasSnapshotId(final String id) {
    return getLatestSnapshot()
        .map(snapshot -> snapshot.getPath().getFileName().toString().equals(id))
        .orElse(false);
  }

  @Override
  public Optional<PersistedSnapshot> getLatestSnapshot() {
    return Optional.ofNullable(currentPersistedSnapshotRef.get());
  }

  /**
   * Aborts every pending snapshot known to this store.
   *
   * @return a future completed once all aborts have completed, or completed exceptionally with the
   *     error reported for them
   */
  @Override
  public ActorFuture<Void> purgePendingSnapshots() {
    final CompletableActorFuture<Void> abortFuture = new CompletableActorFuture<>();
    actor.run(
        () -> {
          // Abort a copy of the set: aborting is how a snapshot leaves pendingSnapshots (via
          // removePendingSnapshot), and a HashSet must not be modified while it is iterated.
          final List<PersistableSnapshot> snapshotsToAbort = new ArrayList<>(pendingSnapshots);
          final var abortedAll =
              snapshotsToAbort.stream()
                  .map(PersistableSnapshot::abort)
                  .collect(Collectors.toList());
          actor.runOnCompletion(
              abortedAll,
              error -> {
                if (error == null) {
                  abortFuture.complete(null);
                } else {
                  abortFuture.completeExceptionally(error);
                }
              });
        });
    return abortFuture;
  }

  @Override
  public ActorFuture<Boolean> addSnapshotListener(final PersistedSnapshotListener listener) {
    return actor.call(() -> listeners.add(listener));
  }

  @Override
  public ActorFuture<Boolean> removeSnapshotListener(final PersistedSnapshotListener listener) {
    return actor.call(() -> listeners.remove(listener));
  }

  /** Returns the index of the latest committed snapshot, or {@code 0} if there is none. */
  @Override
  public long getCurrentSnapshotIndex() {
    return getLatestSnapshot().map(PersistedSnapshot::getIndex).orElse(0L);
  }

  /**
   * Forgets the latest snapshot, then deletes the snapshots directory and the pending directory.
   * An {@link IOException} while deleting either one fails the returned future with an {@link
   * UncheckedIOException}. If the first deletion fails, the pending directory is not deleted.
   */
  @Override
  public ActorFuture<Void> delete() {
    return actor.call(
        () -> {
          currentPersistedSnapshotRef.set(null);

          try {
            LOGGER.debug("DELETE FOLDER {}", snapshotsDirectory);
            FileUtil.deleteFolder(snapshotsDirectory);
          } catch (final IOException e) {
            throw new UncheckedIOException(e);
          }

          try {
            LOGGER.debug("DELETE FOLDER {}", pendingDirectory);
            FileUtil.deleteFolder(pendingDirectory);
          } catch (final IOException e) {
            throw new UncheckedIOException(e);
          }
        });
  }

  @Override
  public Path getPath() {
    return snapshotsDirectory;
  }

  /**
   * Copies the files of {@code snapshot} into {@code targetDirectory} on the actor.
   *
   * @return a future that completes once the copy has finished. It fails with a {@link
   *     FileNotFoundException} cause if the snapshot directory no longer exists, or with the
   *     underlying error if the copy fails.
   */
  @Override
  public ActorFuture<Void> copySnapshot(
      final PersistedSnapshot snapshot, final Path targetDirectory) {
    final CompletableActorFuture<Void> result = new CompletableActorFuture<>();
    actor.run(
        () -> {
          if (!Files.exists(snapshot.getPath())) {
            result.completeExceptionally(
                String.format(
                    "Expected to copy snapshot %s to directory %s, but snapshot directory %s does not exists. Snapshot may have been deleted.",
                    snapshot.getId(), targetDirectory, snapshot.getPath()),
                new FileNotFoundException());
          } else {
            try {
              FileUtil.copySnapshot(snapshot.getPath(), targetDirectory);
              result.complete(null);
            } catch (final Exception e) {
              result.completeExceptionally(
                  String.format(
                      "Failed to copy snapshot %s to directory %s.",
                      snapshot.getId(), targetDirectory),
                  e);
            }
          }
        });
    return result;
  }

  /**
   * Creates a pending snapshot into which a snapshot received from another node can be written.
   *
   * <p>Each call gets its own pending directory, {@code <snapshotId>-<counter>}, so concurrent
   * receptions of the same snapshot do not interfere with each other.
   *
   * @param snapshotId id of the received snapshot, as produced by the snapshot metadata
   * @return the new pending snapshot, registered with this store
   * @throws IllegalArgumentException if {@code snapshotId} cannot be parsed
   */
  @Override
  public FileBasedReceivedSnapshot newReceivedSnapshot(final String snapshotId) {
    final var optMetadata = FileBasedSnapshotMetadata.ofFileName(snapshotId);
    final var metadata =
        optMetadata.orElseThrow(
            () ->
                new IllegalArgumentException(
                    "Expected snapshot id in a format like 'index-term-processedPosition-exportedPosition', got '"
                        + snapshotId
                        + "'."));

    // to make the pending dir unique
    final var nextStartCount = receivingSnapshotStartCount.incrementAndGet();
    final var pendingDirectoryName =
        String.format(RECEIVING_DIR_FORMAT, metadata.getSnapshotIdAsString(), nextStartCount);
    final var pendingSnapshotDir = pendingDirectory.resolve(pendingDirectoryName);
    final var newPendingSnapshot =
        new FileBasedReceivedSnapshot(metadata, pendingSnapshotDir, this, actor);
    addPendingSnapshot(newPendingSnapshot);
    return newPendingSnapshot;
  }

  /**
   * Creates a pending snapshot that the local node can write into.
   *
   * @return the new pending snapshot, or empty if the latest committed snapshot has metadata equal
   *     to the requested one
   */
  // TODO(npepinpe): using Either here would improve readability and observability, as validation
  // can have better error messages, and the return type better expresses what we attempt to do,
  // i.e. either it failed (with an error) or it succeeded
  @Override
  public Optional<TransientSnapshot> newTransientSnapshot(
      final long index,
      final long term,
      final long processedPosition,
      final long exportedPosition) {

    final var newSnapshotId =
        new FileBasedSnapshotMetadata(index, term, processedPosition, exportedPosition);
    final FileBasedSnapshot currentSnapshot = currentPersistedSnapshotRef.get();
    if (currentSnapshot != null && currentSnapshot.getMetadata().compareTo(newSnapshotId) == 0) {
      LOGGER.debug(
          "Previous snapshot was taken for the same processed position {} and exported position {}, will not take snapshot.",
          processedPosition,
          exportedPosition);
      return Optional.empty();
    }
    final var directory = buildPendingSnapshotDirectory(newSnapshotId);

    final var newPendingSnapshot =
        new FileBasedTransientSnapshot(newSnapshotId, directory, this, actor);
    addPendingSnapshot(newPendingSnapshot);
    return Optional.of(newPendingSnapshot);
  }

  /** Registers a pending snapshot; the registration itself runs on the actor. */
  private void addPendingSnapshot(final PersistableSnapshot pendingSnapshot) {
    actor.submit(() -> pendingSnapshots.add(pendingSnapshot));
  }

  /** Unregisters a pending snapshot, e.g. once it has been committed or aborted. */
  void removePendingSnapshot(final PersistableSnapshot pendingSnapshot) {
    pendingSnapshots.remove(pendingSnapshot);
  }

  /**
   * Records per-file size, total size and file count of the regular files directly inside the
   * snapshot directory. I/O failures are only logged.
   */
  private void observeSnapshotSize(final FileBasedSnapshot persistedSnapshot) {
    try (final var contents = Files.newDirectoryStream(persistedSnapshot.getPath())) {
      var totalSize = 0L;
      var totalCount = 0L;
      for (final var path : contents) {
        if (Files.isRegularFile(path)) {
          final var size = Files.size(path);
          snapshotMetrics.observeSnapshotFileSize(size);
          totalSize += size;
          totalCount++;
        }
      }

      snapshotMetrics.observeSnapshotSize(totalSize);
      snapshotMetrics.observeSnapshotChunkCount(totalCount);
    } catch (final IOException e) {
      LOGGER.warn("Failed to observe size for snapshot {}", persistedSnapshot, e);
    }
  }

  /**
   * Aborts registered pending snapshots older than {@code cutoffSnapshotId}. It then also deletes
   * any pending directory older than it, which catches leftovers of previously failed aborts.
   */
  private void purgePendingSnapshots(final SnapshotId cutoffSnapshotId) {
    LOGGER.trace(
        "Search for orphaned snapshots below oldest valid snapshot with index {} in {}",
        cutoffSnapshotId.getSnapshotIdAsString(),
        pendingDirectory);

    // Collect first, abort afterwards: aborting is how a snapshot leaves pendingSnapshots, and a
    // HashSet must not be modified while it is being streamed over.
    final List<PersistableSnapshot> outdatedSnapshots =
        pendingSnapshots.stream()
            .filter(pendingSnapshot -> pendingSnapshot.snapshotId().compareTo(cutoffSnapshotId) < 0)
            .collect(Collectors.toList());
    outdatedSnapshots.forEach(PersistableSnapshot::abort);

    // If there are orphaned directories if a previous abort failed, delete them explicitly
    try (final var pendingSnapshotsDirectories = Files.newDirectoryStream(pendingDirectory)) {
      for (final var pendingSnapshot : pendingSnapshotsDirectories) {
        purgePendingSnapshot(cutoffSnapshotId, pendingSnapshot);
      }
    } catch (final IOException e) {
      LOGGER.warn(
          "Failed to delete orphaned snapshots, could not list pending directory {}",
          pendingDirectory,
          e);
    }
  }

  /**
   * Deletes {@code pendingSnapshot} if its name parses as snapshot metadata older than {@code
   * cutoffSnapshotId}. Other entries are left alone; deletion failures are only logged.
   */
  private void purgePendingSnapshot(
      final SnapshotId cutoffSnapshotId, final Path pendingSnapshot) {
    final var optionalMetadata = FileBasedSnapshotMetadata.ofPath(pendingSnapshot);
    if (optionalMetadata.isPresent() && optionalMetadata.get().compareTo(cutoffSnapshotId) < 0) {
      try {
        FileUtil.deleteFolder(pendingSnapshot);
        LOGGER.debug("Deleted orphaned snapshot {}", pendingSnapshot);
      } catch (final IOException e) {
        LOGGER.warn(
            "Failed to delete orphaned snapshot {}, risk using unnecessary disk space",
            pendingSnapshot,
            e);
      }
    }
  }

  /**
   * Returns whether {@code persistedSnapshot} exists and is at least as new as {@code metadata}.
   * It takes the snapshot as a parameter so that callers decide on the same value they later use.
   */
  private static boolean isCurrentSnapshotNewer(
      final FileBasedSnapshot persistedSnapshot, final FileBasedSnapshotMetadata metadata) {
    return persistedSnapshot != null && persistedSnapshot.getMetadata().compareTo(metadata) >= 0;
  }

  /**
   * Commits a pending snapshot and makes it the latest snapshot.
   *
   * <p>The pending {@code directory} is moved into the snapshots directory. Its checksum is then
   * verified at the destination, and only afterwards is the checksum marker file written. Once
   * committed, the previous latest snapshot and all pending snapshots older than the new one are
   * deleted, and the listeners are notified.
   *
   * <p>If the current latest snapshot is at least as new as {@code metadata}, nothing is moved.
   * Pending snapshots older than {@code metadata} are purged and the current snapshot is returned.
   *
   * @param metadata metadata (id) of the snapshot to commit
   * @param directory pending directory holding the snapshot files
   * @param expectedChecksum checksum the files must have once moved
   * @return the newly committed snapshot, or the current one if the new snapshot is not newer
   * @throws InvalidSnapshotChecksum if the moved files do not match {@code expectedChecksum}; the
   *     moved directory is deleted first
   * @throws UncheckedIOException if moving the files or computing/persisting the checksum fails
   * @throws ConcurrentModificationException if the latest snapshot changed during the commit
   */
  // TODO(npepinpe): using Either here would allow easy rollback regardless of when or where an
  // exception is thrown, without having to catch and rollback for every possible case
  FileBasedSnapshot newSnapshot(
      final FileBasedSnapshotMetadata metadata, final Path directory, final long expectedChecksum) {
    // Read the reference exactly once: the staleness check, the log message, the return value and
    // the compare-and-set below must all refer to the same snapshot.
    final var currentPersistedSnapshot = currentPersistedSnapshotRef.get();

    if (isCurrentSnapshotNewer(currentPersistedSnapshot, metadata)) {
      LOGGER.debug(
          "Snapshot is older than the latest snapshot {}. Snapshot {} won't be committed.",
          currentPersistedSnapshot.getMetadata(),
          metadata);
      purgePendingSnapshots(metadata);
      return currentPersistedSnapshot;
    }

    // it's important to persist the checksum file only after the move is finished, since we use it
    // as a marker file to guarantee the move was complete and not partial
    final var destination = buildSnapshotDirectory(metadata);
    moveToSnapshotDirectory(directory, destination);

    final var checksumPath = buildSnapshotsChecksumPath(metadata);
    final long actualChecksum;
    try {
      // computing the checksum on the final destination also lets us detect any failures during the
      // copy/move that could occur
      actualChecksum = SnapshotChecksum.calculate(destination);
      if (actualChecksum != expectedChecksum) {
        rollbackPartialSnapshot(destination);
        throw new InvalidSnapshotChecksum(directory, expectedChecksum, actualChecksum);
      }

      SnapshotChecksum.persist(checksumPath, actualChecksum);
    } catch (final IOException e) {
      rollbackPartialSnapshot(destination);
      throw new UncheckedIOException(e);
    }

    final var newPersistedSnapshot =
        new FileBasedSnapshot(destination, checksumPath, actualChecksum, metadata);
    final var failed =
        !currentPersistedSnapshotRef.compareAndSet(currentPersistedSnapshot, newPersistedSnapshot);
    if (failed) {
      // The snapshot has already been moved, and its checksum file written, when we get here.
      // NOTE(review): nothing in this method removes them; loadLatestSnapshot only deletes
      // non-latest snapshots on the next start — confirm this cleanup is sufficient.
      final var errorMessage =
          "Expected that last snapshot is '%s', which should be replace with '%s', but last snapshot was '%s'.";
      throw new ConcurrentModificationException(
          String.format(
              errorMessage,
              currentPersistedSnapshot,
              newPersistedSnapshot.getMetadata(),
              currentPersistedSnapshotRef.get()));
    }

    LOGGER.info("Committed new snapshot {}", newPersistedSnapshot.getId());

    snapshotMetrics.incrementSnapshotCount();
    observeSnapshotSize(newPersistedSnapshot);

    LOGGER.trace(
        "Purging snapshots older than {}",
        newPersistedSnapshot.getMetadata().getSnapshotIdAsString());
    if (currentPersistedSnapshot != null) {
      LOGGER.debug(
          "Deleting previous snapshot {}",
          currentPersistedSnapshot.getMetadata().getSnapshotIdAsString());
      currentPersistedSnapshot.delete();
    }
    purgePendingSnapshots(newPersistedSnapshot.getMetadata());

    listeners.forEach(listener -> listener.onNewSnapshot(newPersistedSnapshot));

    return newPersistedSnapshot;
  }

  /**
   * Durably moves {@code directory} to {@code destination}, atomically when supported.
   *
   * <p>An already existing destination is logged and tolerated; the caller's checksum verification
   * then decides whether it is usable. Any other I/O failure deletes the (partial) destination and
   * is rethrown as {@link UncheckedIOException}.
   */
  private void moveToSnapshotDirectory(final Path directory, final Path destination) {
    try {
      tryAtomicDirectoryMove(directory, destination);
    } catch (final FileAlreadyExistsException e) {
      LOGGER.debug(
          "Expected to move snapshot from {} to {}, but it already exists",
          directory,
          destination,
          e);
    } catch (final IOException e) {
      rollbackPartialSnapshot(destination);
      throw new UncheckedIOException(e);
    }
  }

  /**
   * Best-effort deletion of a partially committed snapshot directory. Failures are only logged.
   * Without a checksum file, such a directory is ignored (and deleted) when snapshots are loaded
   * on start.
   */
  private void rollbackPartialSnapshot(final Path destination) {
    try {
      FileUtil.deleteFolderIfExists(destination);
    } catch (final IOException ioException) {
      LOGGER.debug(
          "Pending snapshot {} could not be deleted on rollback, but will be safely ignored as a "
              + "partial snapshot",
          destination,
          ioException);
    }
  }

  /** Deletes a single pending snapshot directory; failures are only logged. */
  private void purgePendingSnapshot(final Path pendingSnapshot) {
    try {
      FileUtil.deleteFolder(pendingSnapshot);
      LOGGER.debug("Deleted not completed (orphaned) snapshot {}", pendingSnapshot);
    } catch (final IOException e) {
      LOGGER.warn("Failed to delete not completed (orphaned) snapshot {}", pendingSnapshot, e);
    }
  }

  /** Durably moves a directory, falling back to a non-atomic move if atomic moves are unsupported. */
  private void tryAtomicDirectoryMove(final Path directory, final Path destination)
      throws IOException {
    try {
      FileUtil.moveDurably(directory, destination, StandardCopyOption.ATOMIC_MOVE);
    } catch (final AtomicMoveNotSupportedException e) {
      LOGGER.warn("Atomic move not supported. Moving the snapshot files non-atomically", e);
      FileUtil.moveDurably(directory, destination);
    }
  }

  /** Pending directory of a locally taken (transient) snapshot: {@code <pending>/<snapshot-id>}. */
  private Path buildPendingSnapshotDirectory(final SnapshotId id) {
    return pendingDirectory.resolve(id.getSnapshotIdAsString());
  }

  /** Directory of a committed snapshot: {@code <snapshots>/<snapshot-id>}. */
  private Path buildSnapshotDirectory(final FileBasedSnapshotMetadata metadata) {
    return snapshotsDirectory.resolve(metadata.getSnapshotIdAsString());
  }

  /** Checksum marker file of a committed snapshot: {@code <snapshots>/<snapshot-id>.checksum}. */
  private Path buildSnapshotsChecksumPath(final FileBasedSnapshotMetadata metadata) {
    return snapshotsDirectory.resolve(metadata.getSnapshotIdAsString() + CHECKSUM_SUFFIX);
  }

  SnapshotMetrics getSnapshotMetrics() {
    return snapshotMetrics;
  }
}