Skip to content
Permalink
Browse files

Add test ::dumpHeapCheckpointFromEmbeddedJournal

  • Loading branch information...
ggezer committed May 14, 2019
1 parent edf5a78 commit c2ebec53307b9014ee9e0ba7b5aed560623ae59d
@@ -329,9 +329,8 @@ public UfsJournalDumper(String master, long start, long end, String outputDir,
public void dumpJournal() throws Throwable {
UfsJournal journal = new UfsJournalSystem(getJournalLocation(mInputDir), 0)
.createJournal(new NoopMaster(sMaster));
try (
PrintStream out =
new PrintStream(new BufferedOutputStream(new FileOutputStream(mJournalEntryFile)));
try (PrintStream out = new PrintStream(new BufferedOutputStream(new FileOutputStream(
mJournalEntryFile)));
JournalReader reader = new UfsJournalReader(journal, sStart, true)) {
boolean done = false;
while (!done && reader.getNextSequenceNumber() < sEnd) {
@@ -475,7 +474,7 @@ private void dumpEntry(JournalEntry je, PrintStream out) {

private void dumpCheckpointStream(CheckpointInputStream checkpointStream) {
try {
Path snapshotDir = Paths.get(mCheckpointsDir, "copycat-snapshot.log");
Path snapshotDir = Paths.get(mCheckpointsDir);
readCheckpoint(checkpointStream, snapshotDir.toAbsolutePath());
} catch (IOException exc) {
LOG.error("Failed while reading checkpoint stream.", exc);
@@ -27,12 +27,23 @@
import alluxio.grpc.SetAttributePOptions;
import alluxio.master.journal.JournalTool;
import alluxio.master.journal.JournalType;
import alluxio.master.journal.raft.RaftJournalSystem;
import alluxio.multi.process.MultiProcessCluster;
import alluxio.multi.process.PortCoordination;
import alluxio.testutils.BaseIntegrationTest;
import alluxio.testutils.IntegrationTestUtils;
import alluxio.util.CommonUtils;
import alluxio.util.io.PathUtils;

import io.atomix.catalyst.concurrent.SingleThreadContext;
import io.atomix.catalyst.serializer.Serializer;
import io.atomix.copycat.protocol.ClientRequestTypeResolver;
import io.atomix.copycat.protocol.ClientResponseTypeResolver;
import io.atomix.copycat.server.storage.Storage;
import io.atomix.copycat.server.storage.snapshot.Snapshot;
import io.atomix.copycat.server.storage.util.StorageSerialization;
import io.atomix.copycat.server.util.ServerSerialization;
import io.atomix.copycat.util.ProtocolSerialization;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Rule;
@@ -47,6 +58,8 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;

/**
* Tests for {@link JournalTool}.
@@ -151,7 +164,7 @@ public void dumpEmbeddedJournal() throws Throwable {
}

@Test
public void dumpHeapCheckpoint() throws Throwable {
public void dumpHeapCheckpointFromUfsJournal() throws Throwable {
initializeCluster(new HashMap<PropertyKey, String>() {
{
put(PropertyKey.MASTER_METASTORE, "HEAP");
@@ -170,7 +183,7 @@ public void dumpHeapCheckpoint() throws Throwable {
SetAttributePOptions.newBuilder()
.setCommonOptions(FileSystemMasterCommonPOptions.newBuilder().setTtl(100000).build())
.build());
checkpoint();
checkpointUfsJournal();
JournalTool.main(new String[]{"-outputDir", mDumpDir.getAbsolutePath()});
String checkpointDir = findCheckpointDir();

@@ -182,6 +195,41 @@ public void dumpHeapCheckpoint() throws Throwable {
}
}

@Test
public void dumpHeapCheckpointFromEmbeddedJournal() throws Throwable {
  // Run with the embedded (Raft) journal and heap metastore so the checkpoint
  // is produced by the copycat snapshot machinery.
  initializeCluster(new HashMap<PropertyKey, String>() {
    {
      put(PropertyKey.MASTER_JOURNAL_TYPE, JournalType.EMBEDDED.toString());
      put(PropertyKey.MASTER_METASTORE, "HEAP");
    }
  });

  // One file per inode-attribute category we expect to show up in the checkpoint.
  String[] testFiles = {"/pin", "/max_replication", "/async_persist", "/ttl"};
  for (String file : testFiles) {
    mFs.createFile(new AlluxioURI(file)).close();
  }
  mFs.setAttribute(new AlluxioURI("/pin"),
      SetAttributePOptions.newBuilder().setPinned(true).build());
  mFs.setAttribute(new AlluxioURI("/max_replication"),
      SetAttributePOptions.newBuilder().setReplicationMax(5).build());
  mFs.persist(new AlluxioURI("/async_persist"));
  mFs.setAttribute(new AlluxioURI("/ttl"),
      SetAttributePOptions.newBuilder()
          .setCommonOptions(FileSystemMasterCommonPOptions.newBuilder().setTtl(100000).build())
          .build());

  checkpointEmbeddedJournal();
  JournalTool.main(new String[] {"-outputDir", mDumpDir.getAbsolutePath()});

  // Embedded journal checkpoints are grouped by masters.
  String fsMasterCheckpointsDir =
      PathUtils.concatPath(mDumpDir.getAbsolutePath(), "checkpoints", "FILE_SYSTEM_MASTER");
  assertNonemptyFileExists(
      PathUtils.concatPath(fsMasterCheckpointsDir, "INODE_DIRECTORY_ID_GENERATOR"));
  String[] inodeTreeEntries = {"HEAP_INODE_STORE", "INODE_COUNTER", "PINNED_INODE_FILE_IDS",
      "REPLICATION_LIMITED_FILE_IDS", "TO_BE_PERSISTED_FILE_IDS"};
  for (String subPath : inodeTreeEntries) {
    assertNonemptyFileExists(PathUtils.concatPath(fsMasterCheckpointsDir, "INODE_TREE", subPath));
  }
}

@Test
public void dumpRocksCheckpoint() throws Throwable {
initializeCluster(new HashMap<PropertyKey, String>() {
@@ -190,21 +238,72 @@ public void dumpRocksCheckpoint() throws Throwable {
}
});

checkpoint();
checkpointUfsJournal();
JournalTool.main(new String[] {"-outputDir", mDumpDir.getAbsolutePath()});
String checkpointDir = findCheckpointDir();
assertNonemptyDirExists(
PathUtils.concatPath(checkpointDir, "INODE_TREE", "CACHING_INODE_STORE"));
}

private void checkpoint() throws Exception {
/**
 * Forces a UFS journal checkpoint by generating enough entries, then waits for it.
 */
private void checkpointUfsJournal() throws Exception {
  // Create enough files to push the journal past the checkpoint threshold.
  int numEntries = CHECKPOINT_SIZE * 2;
  for (int i = 0; i < numEntries; i++) {
    mFs.createFile(new AlluxioURI("/" + i)).close();
  }
  // Block until the file system master has written its checkpoint.
  IntegrationTestUtils.waitForUfsJournalCheckpoint(Constants.FILE_SYSTEM_MASTER_NAME);
}

/**
 * Forces an embedded-journal (copycat) checkpoint and waits until a follower
 * master has written a new snapshot.
 */
private void checkpointEmbeddedJournal() throws Throwable {
  int leaderIdx = mMultiProcessCluster.getPrimaryMasterIndex(GET_MASTER_INDEX_TIMEOUT_MS);
  // Assumes a 2-master cluster: the follower is whichever index the leader isn't.
  int followerIdx = 1 - leaderIdx;
  final String followerJournalFolder = mMultiProcessCluster.getJournalDir() + followerIdx;

  // Record the follower's snapshot index before generating load.
  final long snapshotIdxBefore = getCurrentCopyCatSnapshotIndex(followerJournalFolder);

  // Generate enough journal entries to trigger a snapshot.
  for (int i = 0; i < CHECKPOINT_SIZE * 2; i++) {
    mFs.createFile(new AlluxioURI("/" + i)).close();
  }

  // Poll until the follower's snapshot index moves.
  CommonUtils.waitFor("copycat checkpoint to be written", () -> {
    try {
      return getCurrentCopyCatSnapshotIndex(followerJournalFolder) != snapshotIdxBefore;
    } catch (Throwable err) {
      // The snapshot store may be mid-write; treat failures as "not yet" and keep polling.
      return false;
    }
  });
}

/**
 * Reads the index of the current copycat snapshot in the given journal folder.
 *
 * @param journalFolder path to a master's copycat journal directory
 * @return the current snapshot's index, or {@code -1} if no snapshot exists yet
 * @throws Throwable the underlying cause if reading the snapshot store fails
 */
private long getCurrentCopyCatSnapshotIndex(String journalFolder) throws Throwable {
  Serializer serializer = RaftJournalSystem.createSerializer();
  serializer.resolve(new ClientRequestTypeResolver());
  serializer.resolve(new ClientResponseTypeResolver());
  serializer.resolve(new ProtocolSerialization());
  serializer.resolve(new ServerSerialization());
  serializer.resolve(new StorageSerialization());

  SingleThreadContext context = new SingleThreadContext("readJournal", serializer);
  AtomicLong currentSnapshotIdx = new AtomicLong();
  try {
    // Storage must be accessed from a catalyst thread context.
    context.execute(() -> {
      Storage journalStorage = Storage.builder().withDirectory(journalFolder).build();
      Snapshot currentSnapshot = journalStorage.openSnapshotStore("copycat").currentSnapshot();
      currentSnapshotIdx.set((currentSnapshot != null) ? currentSnapshot.index() : -1);
    }).get();
  } catch (InterruptedException e) {
    // Preserve the interrupt status for callers further up the stack.
    Thread.currentThread().interrupt();
    throw e;
  } catch (ExecutionException e) {
    // Unwrap so callers see the real failure, not the executor wrapper.
    throw e.getCause();
  } finally {
    // Bug fix: the context owns a dedicated thread; without closing it,
    // every call (including each poll in checkpointEmbeddedJournal) leaks a thread.
    context.close();
  }
  return currentSnapshotIdx.get();
}

private String findCheckpointDir() throws IOException {
List<Path> checkpoint = Files.list(mDumpDir.toPath())
.filter(p -> p.toString().contains("checkpoints-")).collect(toList());

0 comments on commit c2ebec5

Please sign in to comment.
You can’t perform that action at this time.