Skip to content

Commit

Permalink
RATIS-1734. Separate state machine snapshot install path (#780)
Browse files Browse the repository at this point in the history
  • Loading branch information
SzyWilliam committed Nov 12, 2022
1 parent 5912ce5 commit 28e1ea6
Show file tree
Hide file tree
Showing 7 changed files with 317 additions and 143 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@
*/
package org.apache.ratis.statemachine;

import java.io.IOException;

import org.apache.ratis.server.storage.RaftStorage;

import java.io.File;
import java.io.IOException;

public interface StateMachineStorage {

void init(RaftStorage raftStorage) throws IOException;
Expand All @@ -38,4 +39,14 @@ public interface StateMachineStorage {
void format() throws IOException;

void cleanupOldSnapshots(SnapshotRetentionPolicy snapshotRetentionPolicy) throws IOException;

/** @return the state machine directory. */
default File getSnapshotDir() {
return null;
}

/** @return the temporary directory. */
default File getTmpDir() {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,8 @@ class ServerState {
this.raftStorage = MemoizedCheckedSupplier.valueOf(
() -> StorageImplUtils.initRaftStorage(storageDirName, option, prop));

this.snapshotManager = StorageImplUtils.newSnapshotManager(id);
this.snapshotManager = StorageImplUtils.newSnapshotManager(id, () -> getStorage().getStorageDir(),
stateMachine.getStateMachineStorage());

// On start the leader is null, start the clock now
this.leaderId = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,21 @@
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.storage.FileChunkReader;
import org.apache.ratis.server.storage.FileInfo;
import org.apache.ratis.server.storage.RaftStorageDirectory;
import org.apache.ratis.statemachine.SnapshotInfo;
import org.apache.ratis.util.JavaUtils;

import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.function.Function;

/**
* An {@link Iterable} of {@link InstallSnapshotRequestProto} for sending a snapshot.
*
* <p>
* The snapshot is sent by one or more requests, where
* a snapshot has one or more files, and
* a file is sent by one or more chunks.
Expand All @@ -43,6 +48,7 @@
class InstallSnapshotRequests implements Iterable<InstallSnapshotRequestProto> {
private final RaftServer.Division server;
private final RaftPeerId followerId;
private final Function<FileInfo, Path> getRelativePath;

/** The snapshot to be sent. */
private final SnapshotInfo snapshot;
Expand Down Expand Up @@ -71,6 +77,21 @@ class InstallSnapshotRequests implements Iterable<InstallSnapshotRequestProto> {
this.snapshotChunkMaxSize = snapshotChunkMaxSize;
this.totalSize = snapshot.getFiles().stream().mapToLong(FileInfo::getFileSize).reduce(Long::sum).orElseThrow(
() -> new IllegalStateException("Failed to compute total size for snapshot " + snapshot));

final File snapshotDir = server.getStateMachine().getStateMachineStorage().getSnapshotDir();
final Function<Path, Path> relativize;
if (snapshotDir != null) {
final Path dir = snapshotDir.toPath();
// add STATE_MACHINE_DIR_NAME for compatibility.
relativize = p -> new File(RaftStorageDirectory.STATE_MACHINE_DIR_NAME, dir.relativize(p).toString()).toPath();
} else {
final Path dir = server.getRaftStorage().getStorageDir().getRoot().toPath();
relativize = dir::relativize;
}
this.getRelativePath = info -> Optional.of(info.getPath())
.filter(Path::isAbsolute)
.map(relativize)
.orElseGet(info::getPath);
}

@Override
Expand All @@ -97,7 +118,7 @@ private InstallSnapshotRequestProto nextInstallSnapshotRequestProto() {
final FileInfo info = snapshot.getFiles().get(fileIndex);
try {
if (current == null) {
current = new FileChunkReader(info, server.getRaftStorage().getStorageDir());
current = new FileChunkReader(info, getRelativePath.apply(info));
}
final FileChunkProto chunk = current.readFileChunk(snapshotChunkMaxSize);
if (chunk.getDone()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import java.nio.file.Path;
import java.security.DigestInputStream;
import java.security.MessageDigest;
import java.util.Optional;

/** Read {@link FileChunkProto}s from a file. */
public class FileChunkReader implements Closeable {
Expand All @@ -47,15 +46,12 @@ public class FileChunkReader implements Closeable {
* Construct a reader from a file specified by the given {@link FileInfo}.
*
* @param info the information of the file.
* @param directory the directory where the file is stored.
* @param relativePath the relative path of the file.
* @throws IOException if it failed to open the file.
*/
public FileChunkReader(FileInfo info, RaftStorageDirectory directory) throws IOException {
public FileChunkReader(FileInfo info, Path relativePath) throws IOException {
this.info = info;
this.relativePath = Optional.of(info.getPath())
.filter(Path::isAbsolute)
.map(p -> directory.getRoot().toPath().relativize(p))
.orElse(info.getPath());
this.relativePath = relativePath;
final File f = info.getPath().toFile();
if (info.getFileDigest() == null) {
digester = MD5Hash.getDigester();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,32 +17,36 @@
*/
package org.apache.ratis.server.storage;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.security.DigestOutputStream;
import java.security.MessageDigest;
import java.util.function.Supplier;

import org.apache.ratis.io.CorruptedFileException;
import org.apache.ratis.io.MD5Hash;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.proto.RaftProtos.FileChunkProto;
import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.util.ServerStringUtils;
import org.apache.ratis.statemachine.SnapshotInfo;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.statemachine.StateMachineStorage;
import org.apache.ratis.util.FileUtils;
import org.apache.ratis.util.IOUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.MD5FileUtil;
import org.apache.ratis.util.MemoizedSupplier;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.security.DigestOutputStream;
import java.security.MessageDigest;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Supplier;

/**
* Manage snapshots of a raft peer.
* TODO: snapshot should be treated as compaction log thus can be merged into
Expand All @@ -55,10 +59,22 @@ public class SnapshotManager {
private static final String TMP = ".tmp";

private final RaftPeerId selfId;

private final Supplier<File> snapshotDir;
private final Supplier<File> tmp;
private final Function<FileChunkProto, String> getRelativePath;
private final Supplier<MessageDigest> digester = JavaUtils.memoize(MD5Hash::getDigester);

SnapshotManager(RaftPeerId selfId) {
SnapshotManager(RaftPeerId selfId, Supplier<RaftStorageDirectory> dir, StateMachineStorage smStorage) {
this.selfId = selfId;
this.snapshotDir = MemoizedSupplier.valueOf(
() -> Optional.ofNullable(smStorage.getSnapshotDir()).orElseGet(() -> dir.get().getStateMachineDir()));
this.tmp = MemoizedSupplier.valueOf(
() -> Optional.ofNullable(smStorage.getTmpDir()).orElseGet(() -> dir.get().getTmpDir()));

final Supplier<Path> smDir = MemoizedSupplier.valueOf(() -> dir.get().getStateMachineDir().toPath());
this.getRelativePath = c -> smDir.get().relativize(
new File(dir.get().getRoot(), c.getFilename()).toPath()).toString();
}

public void installSnapshot(InstallSnapshotRequestProto request, StateMachine stateMachine, RaftStorageDirectory dir)
Expand All @@ -67,7 +83,7 @@ public void installSnapshot(InstallSnapshotRequestProto request, StateMachine st
final long lastIncludedIndex = snapshotChunkRequest.getTermIndex().getIndex();

// create a unique temporary directory
final File tmpDir = new File(dir.getTmpDir(), "snapshot-" + snapshotChunkRequest.getRequestId());
final File tmpDir = new File(tmp.get(), "snapshot-" + snapshotChunkRequest.getRequestId());
FileUtils.createDirectories(tmpDir);
tmpDir.deleteOnExit();

Expand All @@ -77,7 +93,6 @@ public void installSnapshot(InstallSnapshotRequestProto request, StateMachine st
// TODO: Make sure that subsequent requests for the same installSnapshot are coming in order,
// and are not lost when whole request cycle is done. Check requestId and requestIndex here

final Path stateMachineDir = dir.getStateMachineDir().toPath();
for (FileChunkProto chunk : snapshotChunkRequest.getFileChunksList()) {
SnapshotInfo pi = stateMachine.getLatestSnapshot();
if (pi != null && pi.getTermIndex().getIndex() >= lastIncludedIndex) {
Expand All @@ -86,9 +101,7 @@ public void installSnapshot(InstallSnapshotRequestProto request, StateMachine st
+ " with endIndex >= lastIncludedIndex " + lastIncludedIndex);
}

String fileName = chunk.getFilename(); // this is relative to the root dir
final Path relative = stateMachineDir.relativize(new File(dir.getRoot(), fileName).toPath());
final File tmpSnapshotFile = new File(tmpDir, relative.toString());
final File tmpSnapshotFile = new File(tmpDir, getRelativePath.apply(chunk));
FileUtils.createDirectories(tmpSnapshotFile);

FileOutputStream out = null;
Expand Down Expand Up @@ -147,7 +160,7 @@ public void installSnapshot(InstallSnapshotRequestProto request, StateMachine st
}

if (snapshotChunkRequest.getDone()) {
rename(tmpDir, dir.getStateMachineDir());
rename(tmpDir, snapshotDir.get());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.RaftServerConfigKeys.Log;
import org.apache.ratis.server.storage.RaftStorage.StartupOption;
import org.apache.ratis.statemachine.StateMachineStorage;
import org.apache.ratis.util.SizeInBytes;

import java.io.File;
Expand All @@ -31,6 +32,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static org.apache.ratis.server.RaftServer.Division.LOG;
Expand All @@ -42,8 +44,9 @@ private StorageImplUtils() {
//Never constructed
}

public static SnapshotManager newSnapshotManager(RaftPeerId id) {
return new SnapshotManager(id);
public static SnapshotManager newSnapshotManager(RaftPeerId id,
Supplier<RaftStorageDirectory> dir, StateMachineStorage smStorage) {
return new SnapshotManager(id, dir, smStorage);
}

/** Create a {@link RaftStorageImpl}. */
Expand Down
Loading

0 comments on commit 28e1ea6

Please sign in to comment.