From 28e1ea6c9ab8da4b2977a17df0423b3329945640 Mon Sep 17 00:00:00 2001
From: William Song <48054931+SzyWilliam@users.noreply.github.com>
Date: Sat, 12 Nov 2022 13:47:26 +0800
Subject: [PATCH] RATIS-1734. Separate state machine snapshot install path
(#780)
---
.../statemachine/StateMachineStorage.java | 15 +-
.../apache/ratis/server/impl/ServerState.java | 3 +-
.../leader/InstallSnapshotRequests.java | 25 +-
.../ratis/server/storage/FileChunkReader.java | 10 +-
.../ratis/server/storage/SnapshotManager.java | 47 ++-
.../server/storage/StorageImplUtils.java | 7 +-
.../ratis/InstallSnapshotFromLeaderTests.java | 353 ++++++++++++------
7 files changed, 317 insertions(+), 143 deletions(-)
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachineStorage.java b/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachineStorage.java
index 86ffdf6db5..f5853e1fbc 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachineStorage.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachineStorage.java
@@ -17,10 +17,11 @@
*/
package org.apache.ratis.statemachine;
-import java.io.IOException;
-
import org.apache.ratis.server.storage.RaftStorage;
+import java.io.File;
+import java.io.IOException;
+
public interface StateMachineStorage {
void init(RaftStorage raftStorage) throws IOException;
@@ -38,4 +39,14 @@ public interface StateMachineStorage {
void format() throws IOException;
void cleanupOldSnapshots(SnapshotRetentionPolicy snapshotRetentionPolicy) throws IOException;
+
+ /** @return the state machine directory. */
+ default File getSnapshotDir() {
+ return null;
+ }
+
+ /** @return the temporary directory. */
+ default File getTmpDir() {
+ return null;
+ }
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index d90cddf017..d2d5623c8a 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -116,7 +116,8 @@ class ServerState {
this.raftStorage = MemoizedCheckedSupplier.valueOf(
() -> StorageImplUtils.initRaftStorage(storageDirName, option, prop));
- this.snapshotManager = StorageImplUtils.newSnapshotManager(id);
+ this.snapshotManager = StorageImplUtils.newSnapshotManager(id, () -> getStorage().getStorageDir(),
+ stateMachine.getStateMachineStorage());
// On start the leader is null, start the clock now
this.leaderId = null;
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/InstallSnapshotRequests.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/InstallSnapshotRequests.java
index 38d1f9a2aa..f52253b247 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/leader/InstallSnapshotRequests.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/InstallSnapshotRequests.java
@@ -25,16 +25,21 @@
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.storage.FileChunkReader;
import org.apache.ratis.server.storage.FileInfo;
+import org.apache.ratis.server.storage.RaftStorageDirectory;
import org.apache.ratis.statemachine.SnapshotInfo;
import org.apache.ratis.util.JavaUtils;
+import java.io.File;
import java.io.IOException;
+import java.nio.file.Path;
import java.util.Iterator;
import java.util.NoSuchElementException;
+import java.util.Optional;
+import java.util.function.Function;
/**
* An {@link Iterable} of {@link InstallSnapshotRequestProto} for sending a snapshot.
- *
+ *
* The snapshot is sent by one or more requests, where
* a snapshot has one or more files, and
* a file is sent by one or more chunks.
@@ -43,6 +48,7 @@
class InstallSnapshotRequests implements Iterable {
private final RaftServer.Division server;
private final RaftPeerId followerId;
+ private final Function getRelativePath;
/** The snapshot to be sent. */
private final SnapshotInfo snapshot;
@@ -71,6 +77,21 @@ class InstallSnapshotRequests implements Iterable {
this.snapshotChunkMaxSize = snapshotChunkMaxSize;
this.totalSize = snapshot.getFiles().stream().mapToLong(FileInfo::getFileSize).reduce(Long::sum).orElseThrow(
() -> new IllegalStateException("Failed to compute total size for snapshot " + snapshot));
+
+ final File snapshotDir = server.getStateMachine().getStateMachineStorage().getSnapshotDir();
+ final Function relativize;
+ if (snapshotDir != null) {
+ final Path dir = snapshotDir.toPath();
+ // add STATE_MACHINE_DIR_NAME for compatibility.
+ relativize = p -> new File(RaftStorageDirectory.STATE_MACHINE_DIR_NAME, dir.relativize(p).toString()).toPath();
+ } else {
+ final Path dir = server.getRaftStorage().getStorageDir().getRoot().toPath();
+ relativize = dir::relativize;
+ }
+ this.getRelativePath = info -> Optional.of(info.getPath())
+ .filter(Path::isAbsolute)
+ .map(relativize)
+ .orElseGet(info::getPath);
}
@Override
@@ -97,7 +118,7 @@ private InstallSnapshotRequestProto nextInstallSnapshotRequestProto() {
final FileInfo info = snapshot.getFiles().get(fileIndex);
try {
if (current == null) {
- current = new FileChunkReader(info, server.getRaftStorage().getStorageDir());
+ current = new FileChunkReader(info, getRelativePath.apply(info));
}
final FileChunkProto chunk = current.readFileChunk(snapshotChunkMaxSize);
if (chunk.getDone()) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/FileChunkReader.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/FileChunkReader.java
index 15ed981572..a5ee662580 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/FileChunkReader.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/FileChunkReader.java
@@ -30,7 +30,6 @@
import java.nio.file.Path;
import java.security.DigestInputStream;
import java.security.MessageDigest;
-import java.util.Optional;
/** Read {@link FileChunkProto}s from a file. */
public class FileChunkReader implements Closeable {
@@ -47,15 +46,12 @@ public class FileChunkReader implements Closeable {
* Construct a reader from a file specified by the given {@link FileInfo}.
*
* @param info the information of the file.
- * @param directory the directory where the file is stored.
+ * @param relativePath the relative path of the file.
* @throws IOException if it failed to open the file.
*/
- public FileChunkReader(FileInfo info, RaftStorageDirectory directory) throws IOException {
+ public FileChunkReader(FileInfo info, Path relativePath) throws IOException {
this.info = info;
- this.relativePath = Optional.of(info.getPath())
- .filter(Path::isAbsolute)
- .map(p -> directory.getRoot().toPath().relativize(p))
- .orElse(info.getPath());
+ this.relativePath = relativePath;
final File f = info.getPath().toFile();
if (info.getFileDigest() == null) {
digester = MD5Hash.getDigester();
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java
index 17294b572f..dbde869409 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java
@@ -17,32 +17,36 @@
*/
package org.apache.ratis.server.storage;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.nio.channels.FileChannel;
-import java.nio.file.Path;
-import java.security.DigestOutputStream;
-import java.security.MessageDigest;
-import java.util.function.Supplier;
-
import org.apache.ratis.io.CorruptedFileException;
import org.apache.ratis.io.MD5Hash;
-import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.proto.RaftProtos.FileChunkProto;
import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
+import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.util.ServerStringUtils;
import org.apache.ratis.statemachine.SnapshotInfo;
import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.statemachine.StateMachineStorage;
import org.apache.ratis.util.FileUtils;
import org.apache.ratis.util.IOUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.MD5FileUtil;
+import org.apache.ratis.util.MemoizedSupplier;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.nio.file.Path;
+import java.security.DigestOutputStream;
+import java.security.MessageDigest;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
/**
* Manage snapshots of a raft peer.
* TODO: snapshot should be treated as compaction log thus can be merged into
@@ -55,10 +59,22 @@ public class SnapshotManager {
private static final String TMP = ".tmp";
private final RaftPeerId selfId;
+
+ private final Supplier snapshotDir;
+ private final Supplier tmp;
+ private final Function getRelativePath;
private final Supplier digester = JavaUtils.memoize(MD5Hash::getDigester);
- SnapshotManager(RaftPeerId selfId) {
+ SnapshotManager(RaftPeerId selfId, Supplier dir, StateMachineStorage smStorage) {
this.selfId = selfId;
+ this.snapshotDir = MemoizedSupplier.valueOf(
+ () -> Optional.ofNullable(smStorage.getSnapshotDir()).orElseGet(() -> dir.get().getStateMachineDir()));
+ this.tmp = MemoizedSupplier.valueOf(
+ () -> Optional.ofNullable(smStorage.getTmpDir()).orElseGet(() -> dir.get().getTmpDir()));
+
+ final Supplier smDir = MemoizedSupplier.valueOf(() -> dir.get().getStateMachineDir().toPath());
+ this.getRelativePath = c -> smDir.get().relativize(
+ new File(dir.get().getRoot(), c.getFilename()).toPath()).toString();
}
public void installSnapshot(InstallSnapshotRequestProto request, StateMachine stateMachine, RaftStorageDirectory dir)
@@ -67,7 +83,7 @@ public void installSnapshot(InstallSnapshotRequestProto request, StateMachine st
final long lastIncludedIndex = snapshotChunkRequest.getTermIndex().getIndex();
// create a unique temporary directory
- final File tmpDir = new File(dir.getTmpDir(), "snapshot-" + snapshotChunkRequest.getRequestId());
+ final File tmpDir = new File(tmp.get(), "snapshot-" + snapshotChunkRequest.getRequestId());
FileUtils.createDirectories(tmpDir);
tmpDir.deleteOnExit();
@@ -77,7 +93,6 @@ public void installSnapshot(InstallSnapshotRequestProto request, StateMachine st
// TODO: Make sure that subsequent requests for the same installSnapshot are coming in order,
// and are not lost when whole request cycle is done. Check requestId and requestIndex here
- final Path stateMachineDir = dir.getStateMachineDir().toPath();
for (FileChunkProto chunk : snapshotChunkRequest.getFileChunksList()) {
SnapshotInfo pi = stateMachine.getLatestSnapshot();
if (pi != null && pi.getTermIndex().getIndex() >= lastIncludedIndex) {
@@ -86,9 +101,7 @@ public void installSnapshot(InstallSnapshotRequestProto request, StateMachine st
+ " with endIndex >= lastIncludedIndex " + lastIncludedIndex);
}
- String fileName = chunk.getFilename(); // this is relative to the root dir
- final Path relative = stateMachineDir.relativize(new File(dir.getRoot(), fileName).toPath());
- final File tmpSnapshotFile = new File(tmpDir, relative.toString());
+ final File tmpSnapshotFile = new File(tmpDir, getRelativePath.apply(chunk));
FileUtils.createDirectories(tmpSnapshotFile);
FileOutputStream out = null;
@@ -147,7 +160,7 @@ public void installSnapshot(InstallSnapshotRequestProto request, StateMachine st
}
if (snapshotChunkRequest.getDone()) {
- rename(tmpDir, dir.getStateMachineDir());
+ rename(tmpDir, snapshotDir.get());
}
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/StorageImplUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/StorageImplUtils.java
index 865e2b2b16..bf10141e32 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/StorageImplUtils.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/StorageImplUtils.java
@@ -22,6 +22,7 @@
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.RaftServerConfigKeys.Log;
import org.apache.ratis.server.storage.RaftStorage.StartupOption;
+import org.apache.ratis.statemachine.StateMachineStorage;
import org.apache.ratis.util.SizeInBytes;
import java.io.File;
@@ -31,6 +32,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import static org.apache.ratis.server.RaftServer.Division.LOG;
@@ -42,8 +44,9 @@ private StorageImplUtils() {
//Never constructed
}
- public static SnapshotManager newSnapshotManager(RaftPeerId id) {
- return new SnapshotManager(id);
+ public static SnapshotManager newSnapshotManager(RaftPeerId id,
+ Supplier dir, StateMachineStorage smStorage) {
+ return new SnapshotManager(id, dir, smStorage);
}
/** Create a {@link RaftStorageImpl}. */
diff --git a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotFromLeaderTests.java b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotFromLeaderTests.java
index 1cc0d3001e..bece4cb169 100644
--- a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotFromLeaderTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotFromLeaderTests.java
@@ -30,10 +30,11 @@
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.server.storage.FileInfo;
import org.apache.ratis.server.storage.RaftStorage;
-import org.apache.ratis.statemachine.impl.SimpleStateMachine4Testing;
import org.apache.ratis.statemachine.SnapshotInfo;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.statemachine.impl.FileListSnapshotInfo;
+import org.apache.ratis.statemachine.impl.SimpleStateMachine4Testing;
+import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
import org.apache.ratis.util.FileUtils;
import org.apache.ratis.util.LifeCycle;
import org.junit.Assert;
@@ -43,78 +44,92 @@
import java.io.File;
import java.io.IOException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
+import java.util.Objects;
+import java.util.UUID;
public abstract class InstallSnapshotFromLeaderTests
- extends BaseTest
- implements MiniRaftCluster.Factory.Get {
- static final Logger LOG = LoggerFactory.getLogger(InstallSnapshotFromLeaderTests.class);
- {
- final RaftProperties prop = getProperties();
- prop.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
- StateMachineWithMultiNestedSnapshotFile.class, StateMachine.class);
- RaftServerConfigKeys.Log.setPurgeGap(prop, PURGE_GAP);
- RaftServerConfigKeys.Snapshot.setAutoTriggerThreshold(
- prop, SNAPSHOT_TRIGGER_THRESHOLD);
- RaftServerConfigKeys.Snapshot.setAutoTriggerEnabled(prop, true);
- }
+ extends BaseTest
+ implements MiniRaftCluster.Factory.Get {
+ static final Logger LOG = LoggerFactory.getLogger(InstallSnapshotFromLeaderTests.class);
- private static final int SNAPSHOT_TRIGGER_THRESHOLD = 64;
- private static final int PURGE_GAP = 8;
+ {
+ final RaftProperties prop = getProperties();
+ RaftServerConfigKeys.Log.setPurgeGap(prop, PURGE_GAP);
+ RaftServerConfigKeys.Snapshot.setAutoTriggerThreshold(
+ prop, SNAPSHOT_TRIGGER_THRESHOLD);
+ RaftServerConfigKeys.Snapshot.setAutoTriggerEnabled(prop, true);
+ }
- @Test
- public void testMultiFileInstallSnapshot() throws Exception {
- runWithNewCluster(1, this::testMultiFileInstallSnapshot);
- }
+ private static final int SNAPSHOT_TRIGGER_THRESHOLD = 64;
+ private static final int PURGE_GAP = 8;
+
+ @Test
+ public void testMultiFileInstallSnapshot() throws Exception {
+ getProperties().setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
+ StateMachineWithMultiNestedSnapshotFile.class, StateMachine.class);
+ runWithNewCluster(1, this::testMultiFileInstallSnapshot);
+ }
+
+ @Test
+ public void testSeparateSnapshotInstallPath() throws Exception {
+ getProperties().setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
+ StateMachineWithSeparatedSnapshotPath.class, StateMachine.class);
+ runWithNewCluster(1, this::testMultiFileInstallSnapshot);
+ }
- private void testMultiFileInstallSnapshot(CLUSTER cluster) throws Exception {
- try {
- int i = 0;
- RaftTestUtil.waitForLeader(cluster);
- final RaftPeerId leaderId = cluster.getLeader().getId();
-
- try (final RaftClient client = cluster.createClient(leaderId)) {
- for (; i < SNAPSHOT_TRIGGER_THRESHOLD * 2 - 1; i++) {
- RaftClientReply
- reply = client.io().send(new RaftTestUtil.SimpleMessage("m" + i));
- Assert.assertTrue(reply.isSuccess());
- }
-
- client.getSnapshotManagementApi(leaderId).create(3000);
- }
-
- final SnapshotInfo snapshot = cluster.getLeader().getStateMachine().getLatestSnapshot();
- Assert.assertEquals(3, snapshot.getFiles().size());
-
- // add two more peers
- final MiniRaftCluster.PeerChanges change = cluster.addNewPeers(2, true,
- true);
- // trigger setConfiguration
- cluster.setConfiguration(change.allPeersInNewConf);
-
- RaftServerTestUtil
- .waitAndCheckNewConf(cluster, change.allPeersInNewConf, 0, null);
-
- // Check the installed snapshot file number on each Follower matches with the
- // leader snapshot.
- for (RaftServer.Division follower : cluster.getFollowers()) {
- Assert.assertEquals(follower.getStateMachine().getLatestSnapshot().getFiles().size(), 3);
- }
- } finally {
- cluster.shutdown();
+ private void testMultiFileInstallSnapshot(CLUSTER cluster) throws Exception {
+ try {
+ int i = 0;
+ RaftTestUtil.waitForLeader(cluster);
+ final RaftPeerId leaderId = cluster.getLeader().getId();
+
+ try (final RaftClient client = cluster.createClient(leaderId)) {
+ for (; i < SNAPSHOT_TRIGGER_THRESHOLD * 2 - 1; i++) {
+ RaftClientReply
+ reply = client.io().send(new RaftTestUtil.SimpleMessage("m" + i));
+ Assert.assertTrue(reply.isSuccess());
}
+
+ client.getSnapshotManagementApi(leaderId).create(3000);
+ }
+
+ final SnapshotInfo snapshot = cluster.getLeader().getStateMachine().getLatestSnapshot();
+ Assert.assertEquals(3, snapshot.getFiles().size());
+
+ // add two more peers
+ final MiniRaftCluster.PeerChanges change = cluster.addNewPeers(2, true,
+ true);
+ // trigger setConfiguration
+ cluster.setConfiguration(change.allPeersInNewConf);
+
+ RaftServerTestUtil
+ .waitAndCheckNewConf(cluster, change.allPeersInNewConf, 0, null);
+
+ // Check the installed snapshot file number on each Follower matches with the
+ // leader snapshot.
+ for (RaftServer.Division follower : cluster.getFollowers()) {
+ Assert.assertEquals(3, follower.getStateMachine().getLatestSnapshot().getFiles().size());
+ }
+ } finally {
+ cluster.shutdown();
}
+ }
- private static class StateMachineWithMultiNestedSnapshotFile extends SimpleStateMachine4Testing {
+ private static class StateMachineWithMultiNestedSnapshotFile extends SimpleStateMachine4Testing {
- File snapshotRoot;
- File file1;
- File file2;
+ File snapshotRoot;
+ File file1;
+ File file2;
- @Override
- public synchronized void initialize(RaftServer server, RaftGroupId groupId, RaftStorage raftStorage) throws IOException {
- super.initialize(server, groupId, raftStorage);
+ @Override
+ public synchronized void initialize(RaftServer server, RaftGroupId groupId, RaftStorage raftStorage) throws IOException {
+ super.initialize(server, groupId, raftStorage);
// contains two snapshot files
// sm/snapshot/1.bin
@@ -124,61 +139,175 @@ public synchronized void initialize(RaftServer server, RaftGroupId groupId, Raft
file2 = new File(new File(snapshotRoot, "sub"), "2.bin");
}
- @Override
- public synchronized void pause() {
- if (getLifeCycle().getCurrentState() == LifeCycle.State.RUNNING) {
- getLifeCycle().transition(LifeCycle.State.PAUSING);
- getLifeCycle().transition(LifeCycle.State.PAUSED);
- }
+ @Override
+ public synchronized void pause() {
+ if (getLifeCycle().getCurrentState() == LifeCycle.State.RUNNING) {
+ getLifeCycle().transition(LifeCycle.State.PAUSING);
+ getLifeCycle().transition(LifeCycle.State.PAUSED);
+ }
+ }
+
+ @Override
+ public long takeSnapshot() {
+ final TermIndex termIndex = getLastAppliedTermIndex();
+ if (termIndex.getTerm() <= 0 || termIndex.getIndex() <= 0) {
+ return RaftLog.INVALID_LOG_INDEX;
+ }
+
+ final long endIndex = termIndex.getIndex();
+ try {
+ if (!snapshotRoot.exists()) {
+ FileUtils.createDirectories(snapshotRoot);
+ FileUtils.createDirectories(file1.getParentFile());
+ FileUtils.createDirectories(file2.getParentFile());
+ FileUtils.createNewFile(file1.toPath());
+ FileUtils.createNewFile(file2.toPath());
}
- @Override
- public long takeSnapshot() {
- final TermIndex termIndex = getLastAppliedTermIndex();
- if (termIndex.getTerm() <= 0 || termIndex.getIndex() <= 0) {
- return RaftLog.INVALID_LOG_INDEX;
- }
-
- final long endIndex = termIndex.getIndex();
- try {
- if (!snapshotRoot.exists()) {
- FileUtils.createDirectories(snapshotRoot);
- FileUtils.createDirectories(file1.getParentFile());
- FileUtils.createDirectories(file2.getParentFile());
- FileUtils.createNewFile(file1.toPath());
- FileUtils.createNewFile(file2.toPath());
- }
-
- } catch (IOException ioException) {
- return RaftLog.INVALID_LOG_INDEX;
- }
-
- Assert.assertTrue(file1.exists());
- Assert.assertTrue(file2.exists());
- return super.takeSnapshot();
+ } catch (IOException ioException) {
+ return RaftLog.INVALID_LOG_INDEX;
+ }
+
+ Assert.assertTrue(file1.exists());
+ Assert.assertTrue(file2.exists());
+ return super.takeSnapshot();
+ }
+
+ @Override
+ public SnapshotInfo getLatestSnapshot() {
+ if (!snapshotRoot.exists() || !file1.exists() || !file2.exists()) {
+ return null;
+ }
+ List files = new ArrayList<>();
+ files.add(new FileInfo(
+ file1.toPath(),
+ null));
+ files.add(new FileInfo(
+ file2.toPath(),
+ null));
+ Assert.assertEquals(2, files.size());
+
+ SnapshotInfo info = super.getLatestSnapshot();
+ if (info == null) {
+ return null;
+ }
+ files.add(info.getFiles().get(0));
+ return new FileListSnapshotInfo(files,
+ info.getTerm(), info.getIndex());
+ }
+ }
+
+
+ private static class StateMachineWithSeparatedSnapshotPath extends SimpleStateMachine4Testing {
+ private File root;
+ private File snapshotDir;
+ private File tmpDir;
+
+ @Override
+ public synchronized void initialize(RaftServer server, RaftGroupId groupId, RaftStorage raftStorage) throws IOException {
+ super.initialize(server, groupId, raftStorage);
+ this.root = new File("/tmp/ratis-tests/statemachine/" + getId().toString());
+ this.snapshotDir = new File(root, "snapshot");
+ this.tmpDir = new File(root, "tmp");
+ FileUtils.deleteFully(root);
+ Assert.assertTrue(this.snapshotDir.mkdirs());
+ Assert.assertTrue(this.tmpDir.mkdirs());
+ this.root.deleteOnExit();
+ }
+
+ @Override
+ public synchronized void pause() {
+ if (getLifeCycle().getCurrentState() == LifeCycle.State.RUNNING) {
+ getLifeCycle().transition(LifeCycle.State.PAUSING);
+ getLifeCycle().transition(LifeCycle.State.PAUSED);
+ }
+ }
+
+ @Override
+ public long takeSnapshot() {
+ final TermIndex lastApplied = getLastAppliedTermIndex();
+ final File snapshotTmpDir = new File(tmpDir, UUID.randomUUID().toString());
+ final File snapshotRealDir = new File(snapshotDir, String.format("%d_%d", lastApplied.getTerm(), lastApplied.getIndex()));
+
+ try {
+ FileUtils.deleteFully(snapshotRealDir);
+ FileUtils.deleteFully(snapshotTmpDir);
+ Assert.assertTrue(snapshotTmpDir.mkdirs());
+ final File snapshotFile1 = new File(snapshotTmpDir, "deer");
+ final File snapshotFile2 = new File(snapshotTmpDir, "loves");
+ final File snapshotFile3 = new File(snapshotTmpDir, "vegetable");
+ Assert.assertTrue(snapshotFile1.createNewFile());
+ Assert.assertTrue(snapshotFile2.createNewFile());
+ Assert.assertTrue(snapshotFile3.createNewFile());
+ FileUtils.move(snapshotTmpDir, snapshotRealDir);
+ } catch (IOException ioe) {
+ LOG.error("create snapshot data file failed", ioe);
+ return RaftLog.INVALID_LOG_INDEX;
+ }
+
+ return lastApplied.getIndex();
+ }
+
+ @Override
+ public SnapshotInfo getLatestSnapshot() {
+ Path[] sortedSnapshots = getSortedSnapshotDirPaths();
+ if (sortedSnapshots == null || sortedSnapshots.length == 0) {
+ return null;
+ }
+
+ File latest = sortedSnapshots[sortedSnapshots.length - 1].toFile();
+ TermIndex snapshotLastIncluded = TermIndex.valueOf
+ (Long.parseLong(latest.getName().split("_")[0]), Long.parseLong(latest.getName().split("_")[1]));
+
+ List fileInfos = new ArrayList<>();
+ for (File f : Objects.requireNonNull(latest.listFiles())) {
+ if (!f.getName().endsWith(".md5")) {
+ fileInfos.add(new FileInfo(f.toPath(), null));
}
+ }
- @Override
- public SnapshotInfo getLatestSnapshot() {
- if (!snapshotRoot.exists() || !file1.exists() || !file2.exists()) {
- return null;
- }
- List files = new ArrayList<>();
- files.add(new FileInfo(
- file1.toPath(),
- null));
- files.add(new FileInfo(
- file2.toPath(),
- null));
- Assert.assertEquals(files.size(), 2);
-
- SnapshotInfo info = super.getLatestSnapshot();
- if (info == null) {
- return null;
- }
- files.add(info.getFiles().get(0));
- return new FileListSnapshotInfo(files,
- info.getTerm(), info.getIndex());
+ return new FileListSnapshotInfo(fileInfos, snapshotLastIncluded.getTerm(), snapshotLastIncluded.getIndex());
+ }
+
+ private Path[] getSortedSnapshotDirPaths() {
+ ArrayList snapshotPaths = new ArrayList<>();
+ try (DirectoryStream stream = Files.newDirectoryStream(snapshotDir.toPath())) {
+ for (Path path : stream) {
+ if (path.toFile().isDirectory()) {
+ snapshotPaths.add(path);
+ }
}
+ } catch (IOException exception) {
+ LOG.warn("cannot construct snapshot directory stream ", exception);
+ return null;
+ }
+
+ Path[] pathArray = snapshotPaths.toArray(new Path[0]);
+ Arrays.sort(
+ pathArray,
+ (o1, o2) -> {
+ String index1 = o1.toFile().getName().split("_")[1];
+ String index2 = o2.toFile().getName().split("_")[1];
+ return Long.compare(Long.parseLong(index1), Long.parseLong(index2));
+ });
+ return pathArray;
+ }
+
+ @Override
+ public SimpleStateMachineStorage getStateMachineStorage() {
+ return new SeparateSnapshotStateMachineStorage();
+ }
+
+ private class SeparateSnapshotStateMachineStorage extends SimpleStateMachineStorage {
+ @Override
+ public File getSnapshotDir() {
+ return snapshotDir;
+ }
+
+ @Override
+ public File getTmpDir() {
+ return tmpDir;
+ }
}
+ }
}