Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RATIS-1734. Separate state machine snapshot install path #780

Merged
merged 2 commits into from
Nov 12, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@
*/
package org.apache.ratis.statemachine;

import java.io.IOException;

import org.apache.ratis.server.storage.RaftStorage;

import java.io.File;
import java.io.IOException;

public interface StateMachineStorage {

void init(RaftStorage raftStorage) throws IOException;
Expand All @@ -38,4 +39,14 @@ public interface StateMachineStorage {
void format() throws IOException;

void cleanupOldSnapshots(SnapshotRetentionPolicy snapshotRetentionPolicy) throws IOException;

/** @return the state machine directory. */
default File getSnapshotDir() {
return null;
}

/** @return the temporary directory. */
default File getTmpDir() {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,14 @@ class ServerState {
this.raftStorage = MemoizedCheckedSupplier.valueOf(
() -> StorageImplUtils.initRaftStorage(storageDirName, option, prop));

this.snapshotManager = StorageImplUtils.newSnapshotManager(id);
this.snapshotManager = StorageImplUtils.newSnapshotManager(id, () -> {
try {
return raftStorage.get().getStorageDir();
} catch (IOException e) {
throw new IllegalStateException("RaftStorageDirectory is not properly initialized " + storageDirName, e);
}
},
SzyWilliam marked this conversation as resolved.
Show resolved Hide resolved
stateMachine.getStateMachineStorage());

// On start the leader is null, start the clock now
this.leaderId = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,21 @@
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.storage.FileChunkReader;
import org.apache.ratis.server.storage.FileInfo;
import org.apache.ratis.server.storage.RaftStorageDirectory;
import org.apache.ratis.statemachine.SnapshotInfo;
import org.apache.ratis.util.JavaUtils;

import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.function.Function;

/**
* An {@link Iterable} of {@link InstallSnapshotRequestProto} for sending a snapshot.
*
* <p>
* The snapshot is sent by one or more requests, where
* a snapshot has one or more files, and
* a file is sent by one or more chunks.
Expand All @@ -43,6 +48,7 @@
class InstallSnapshotRequests implements Iterable<InstallSnapshotRequestProto> {
private final RaftServer.Division server;
private final RaftPeerId followerId;
private final Function<FileInfo, Path> getRelativePath;

/** The snapshot to be sent. */
private final SnapshotInfo snapshot;
Expand Down Expand Up @@ -71,6 +77,21 @@ class InstallSnapshotRequests implements Iterable<InstallSnapshotRequestProto> {
this.snapshotChunkMaxSize = snapshotChunkMaxSize;
this.totalSize = snapshot.getFiles().stream().mapToLong(FileInfo::getFileSize).reduce(Long::sum).orElseThrow(
() -> new IllegalStateException("Failed to compute total size for snapshot " + snapshot));

final File snapshotDir = server.getStateMachine().getStateMachineStorage().getSnapshotDir();
final Function<Path, Path> relativize;
if (snapshotDir != null) {
final Path dir = snapshotDir.toPath();
// add STATE_MACHINE_DIR_NAME for compatibility.
relativize = p -> new File(RaftStorageDirectory.STATE_MACHINE_DIR_NAME, dir.relativize(p).toString()).toPath();
} else {
final Path dir = server.getRaftStorage().getStorageDir().getRoot().toPath();
relativize = dir::relativize;
}
this.getRelativePath = info -> Optional.of(info.getPath())
.filter(Path::isAbsolute)
.map(relativize)
.orElseGet(info::getPath);
}

@Override
Expand All @@ -97,7 +118,7 @@ private InstallSnapshotRequestProto nextInstallSnapshotRequestProto() {
final FileInfo info = snapshot.getFiles().get(fileIndex);
try {
if (current == null) {
current = new FileChunkReader(info, server.getRaftStorage().getStorageDir());
current = new FileChunkReader(info, getRelativePath.apply(info));
}
final FileChunkProto chunk = current.readFileChunk(snapshotChunkMaxSize);
if (chunk.getDone()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import java.nio.file.Path;
import java.security.DigestInputStream;
import java.security.MessageDigest;
import java.util.Optional;

/** Read {@link FileChunkProto}s from a file. */
public class FileChunkReader implements Closeable {
Expand All @@ -47,15 +46,12 @@ public class FileChunkReader implements Closeable {
* Construct a reader from a file specified by the given {@link FileInfo}.
*
* @param info the information of the file.
* @param directory the directory where the file is stored.
* @param relativePath the relative path of the file.
* @throws IOException if it failed to open the file.
*/
public FileChunkReader(FileInfo info, RaftStorageDirectory directory) throws IOException {
public FileChunkReader(FileInfo info, Path relativePath) throws IOException {
this.info = info;
this.relativePath = Optional.of(info.getPath())
.filter(Path::isAbsolute)
.map(p -> directory.getRoot().toPath().relativize(p))
.orElse(info.getPath());
this.relativePath = relativePath;
final File f = info.getPath().toFile();
if (info.getFileDigest() == null) {
digester = MD5Hash.getDigester();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,32 +17,35 @@
*/
package org.apache.ratis.server.storage;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.security.DigestOutputStream;
import java.security.MessageDigest;
import java.util.function.Supplier;

import org.apache.ratis.io.CorruptedFileException;
import org.apache.ratis.io.MD5Hash;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.proto.RaftProtos.FileChunkProto;
import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.util.ServerStringUtils;
import org.apache.ratis.statemachine.SnapshotInfo;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.statemachine.StateMachineStorage;
import org.apache.ratis.util.FileUtils;
import org.apache.ratis.util.IOUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.MD5FileUtil;
import org.apache.ratis.util.MemoizedSupplier;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.security.DigestOutputStream;
import java.security.MessageDigest;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Supplier;

/**
* Manage snapshots of a raft peer.
* TODO: snapshot should be treated as compaction log thus can be merged into
Expand All @@ -55,10 +58,22 @@ public class SnapshotManager {
private static final String TMP = ".tmp";

private final RaftPeerId selfId;

private final Supplier<File> snapshotDir;
private final Supplier<File> tmp;
private final Function<FileChunkProto, String> getRelativePath;
private final Supplier<MessageDigest> digester = JavaUtils.memoize(MD5Hash::getDigester);

SnapshotManager(RaftPeerId selfId) {
SnapshotManager(RaftPeerId selfId, Supplier<RaftStorageDirectory> dir, StateMachineStorage smStorage) {
this.selfId = selfId;
this.snapshotDir = MemoizedSupplier.valueOf(
() -> Optional.ofNullable(smStorage.getSnapshotDir()).orElseGet(() -> dir.get().getStateMachineDir()));
this.tmp = MemoizedSupplier.valueOf(
() -> Optional.ofNullable(smStorage.getTmpDir()).orElseGet(() -> dir.get().getTmpDir()));

this.getRelativePath =
c -> dir.get().getStateMachineDir().toPath()
.relativize(new File(dir.get().getRoot(), c.getFilename()).toPath()).toString();
SzyWilliam marked this conversation as resolved.
Show resolved Hide resolved
}

public void installSnapshot(InstallSnapshotRequestProto request, StateMachine stateMachine, RaftStorageDirectory dir)
Expand All @@ -67,7 +82,7 @@ public void installSnapshot(InstallSnapshotRequestProto request, StateMachine st
final long lastIncludedIndex = snapshotChunkRequest.getTermIndex().getIndex();

// create a unique temporary directory
final File tmpDir = new File(dir.getTmpDir(), "snapshot-" + snapshotChunkRequest.getRequestId());
final File tmpDir = new File(tmp.get(), "snapshot-" + snapshotChunkRequest.getRequestId());
FileUtils.createDirectories(tmpDir);
tmpDir.deleteOnExit();

Expand All @@ -77,7 +92,6 @@ public void installSnapshot(InstallSnapshotRequestProto request, StateMachine st
// TODO: Make sure that subsequent requests for the same installSnapshot are coming in order,
// and are not lost when whole request cycle is done. Check requestId and requestIndex here

final Path stateMachineDir = dir.getStateMachineDir().toPath();
for (FileChunkProto chunk : snapshotChunkRequest.getFileChunksList()) {
SnapshotInfo pi = stateMachine.getLatestSnapshot();
if (pi != null && pi.getTermIndex().getIndex() >= lastIncludedIndex) {
Expand All @@ -86,9 +100,7 @@ public void installSnapshot(InstallSnapshotRequestProto request, StateMachine st
+ " with endIndex >= lastIncludedIndex " + lastIncludedIndex);
}

String fileName = chunk.getFilename(); // this is relative to the root dir
final Path relative = stateMachineDir.relativize(new File(dir.getRoot(), fileName).toPath());
final File tmpSnapshotFile = new File(tmpDir, relative.toString());
final File tmpSnapshotFile = new File(tmpDir, getRelativePath.apply(chunk));
FileUtils.createDirectories(tmpSnapshotFile);

FileOutputStream out = null;
Expand Down Expand Up @@ -147,7 +159,7 @@ public void installSnapshot(InstallSnapshotRequestProto request, StateMachine st
}

if (snapshotChunkRequest.getDone()) {
rename(tmpDir, dir.getStateMachineDir());
rename(tmpDir, snapshotDir.get());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.RaftServerConfigKeys.Log;
import org.apache.ratis.server.storage.RaftStorage.StartupOption;
import org.apache.ratis.statemachine.StateMachineStorage;
import org.apache.ratis.util.SizeInBytes;

import java.io.File;
Expand All @@ -31,6 +32,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static org.apache.ratis.server.RaftServer.Division.LOG;
Expand All @@ -42,8 +44,9 @@ private StorageImplUtils() {
//Never constructed
}

public static SnapshotManager newSnapshotManager(RaftPeerId id) {
return new SnapshotManager(id);
public static SnapshotManager newSnapshotManager(RaftPeerId id,
Supplier<RaftStorageDirectory> dir, StateMachineStorage smStorage) {
return new SnapshotManager(id, dir, smStorage);
}

/** Create a {@link RaftStorageImpl}. */
Expand Down
Loading