Skip to content

Commit

Permalink
Add snapshot store tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Jul 3, 2017
1 parent 8323b4d commit 37f961a
Show file tree
Hide file tree
Showing 11 changed files with 295 additions and 15 deletions.
Expand Up @@ -475,7 +475,7 @@ protected InstallRequest buildInstallRequest(RaftMemberContext member) {
request = InstallRequest.newBuilder() request = InstallRequest.newBuilder()
.withTerm(server.getTerm()) .withTerm(server.getTerm())
.withLeader(leader != null ? leader.memberId() : null) .withLeader(leader != null ? leader.memberId() : null)
.withId(snapshot.snapshotId().id()) .withId(snapshot.serviceId().id())
.withIndex(snapshot.index()) .withIndex(snapshot.index())
.withOffset(member.getNextSnapshotOffset()) .withOffset(member.getNextSnapshotOffset())
.withData(data) .withData(data)
Expand Down
Expand Up @@ -42,7 +42,7 @@ final class FileSnapshot extends Snapshot {
} }


@Override @Override
public ServiceId snapshotId() { public ServiceId serviceId() {
return file.snapshotId(); return file.snapshotId();
} }


Expand Down
Expand Up @@ -41,7 +41,7 @@ final class MemorySnapshot extends Snapshot {
} }


@Override @Override
public ServiceId snapshotId() { public ServiceId serviceId() {
return ServiceId.from(descriptor.snapshotId()); return ServiceId.from(descriptor.snapshotId());
} }


Expand Down Expand Up @@ -75,10 +75,10 @@ public synchronized SnapshotReader openReader() {
@Override @Override
public Snapshot persist() { public Snapshot persist() {
if (store.storage.storageLevel() != StorageLevel.MEMORY) { if (store.storage.storageLevel() != StorageLevel.MEMORY) {
try (Snapshot newSnapshot = store.newSnapshot(snapshotId(), index(), timestamp())) { try (Snapshot newSnapshot = store.newSnapshot(serviceId(), index(), timestamp())) {
try (SnapshotWriter newSnapshotWriter = newSnapshot.openWriter()) { try (SnapshotWriter newSnapshotWriter = newSnapshot.openWriter()) {
buffer.flip(); buffer.flip().skip(SnapshotDescriptor.BYTES);
newSnapshotWriter.write(buffer.array(), 0, buffer.remaining()); newSnapshotWriter.write(buffer.array(), buffer.position(), buffer.remaining());
} }
return newSnapshot; return newSnapshot;
} }
Expand All @@ -93,7 +93,7 @@ public boolean isPersisted() {


@Override @Override
public Snapshot complete() { public Snapshot complete() {
buffer.flip(); buffer.flip().skip(SnapshotDescriptor.BYTES).mark();
descriptor.lock(); descriptor.lock();
return super.complete(); return super.complete();
} }
Expand Down
Expand Up @@ -65,7 +65,7 @@ protected Snapshot(SnapshotStore store) {
* *
* @return The snapshot identifier. * @return The snapshot identifier.
*/ */
public abstract ServiceId snapshotId(); public abstract ServiceId serviceId();


/** /**
* Returns the snapshot index. * Returns the snapshot index.
Expand Down
Expand Up @@ -30,7 +30,7 @@
*/ */
public final class SnapshotFile { public final class SnapshotFile {
@VisibleForTesting @VisibleForTesting
static final SimpleDateFormat TIMESTAMP_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss"); static final SimpleDateFormat TIMESTAMP_FORMAT = new SimpleDateFormat("yyyyMMddHHmmssSSS");
private static final char PART_SEPARATOR = '-'; private static final char PART_SEPARATOR = '-';
private static final char EXTENSION_SEPARATOR = '.'; private static final char EXTENSION_SEPARATOR = '.';
private static final String EXTENSION = "snapshot"; private static final String EXTENSION = "snapshot";
Expand Down
Expand Up @@ -85,9 +85,9 @@ public SnapshotStore(RaftStorage storage) {
*/ */
private void open() { private void open() {
for (Snapshot snapshot : loadSnapshots()) { for (Snapshot snapshot : loadSnapshots()) {
Snapshot existingSnapshot = stateMachineSnapshots.get(snapshot.snapshotId()); Snapshot existingSnapshot = stateMachineSnapshots.get(snapshot.serviceId());
if (existingSnapshot == null || existingSnapshot.index() < snapshot.index()) { if (existingSnapshot == null || existingSnapshot.index() < snapshot.index()) {
stateMachineSnapshots.put(snapshot.snapshotId(), snapshot); stateMachineSnapshots.put(snapshot.serviceId(), snapshot);


// If a newer snapshot was found, delete the old snapshot if necessary. // If a newer snapshot was found, delete the old snapshot if necessary.
if (existingSnapshot != null && !storage.isRetainStaleSnapshots()) { if (existingSnapshot != null && !storage.isRetainStaleSnapshots()) {
Expand Down Expand Up @@ -232,9 +232,9 @@ protected synchronized void completeSnapshot(Snapshot snapshot) {
checkNotNull(snapshot, "snapshot cannot be null"); checkNotNull(snapshot, "snapshot cannot be null");


// Only store the snapshot if no existing snapshot exists. // Only store the snapshot if no existing snapshot exists.
Snapshot existingSnapshot = stateMachineSnapshots.get(snapshot.snapshotId()); Snapshot existingSnapshot = stateMachineSnapshots.get(snapshot.serviceId());
if (existingSnapshot == null || existingSnapshot.index() <= snapshot.index()) { if (existingSnapshot == null || existingSnapshot.index() <= snapshot.index()) {
stateMachineSnapshots.put(snapshot.snapshotId(), snapshot); stateMachineSnapshots.put(snapshot.serviceId(), snapshot);
indexSnapshots.put(snapshot.index(), snapshot); indexSnapshots.put(snapshot.index(), snapshot);


// Delete the old snapshot if necessary. // Delete the old snapshot if necessary.
Expand Down
Expand Up @@ -72,6 +72,8 @@
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.stream.Collectors; import java.util.stream.Collectors;


import static org.testng.Assert.assertEquals;

/** /**
* Raft test. * Raft test.
*/ */
Expand Down Expand Up @@ -1355,7 +1357,7 @@ public void snapshot(SnapshotWriter writer) {


@Override @Override
public void install(SnapshotReader reader) { public void install(SnapshotReader reader) {
assert reader.readLong() == 10; assertEquals(reader.readLong(), 10);
} }


protected long write(RaftCommit<Void> commit) { protected long write(RaftCommit<Void> commit) {
Expand Down
@@ -0,0 +1,86 @@
/*
* Copyright 2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License
*/
package io.atomix.protocols.raft.storage.snapshot;

import io.atomix.protocols.raft.service.ServiceId;
import io.atomix.time.WallClockTimestamp;
import org.testng.annotations.Test;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;

/**
* Snapshot store test.
*
* @author <a href="http://github.com/kuujo>Jordan Halterman</a>
*/
@Test
public abstract class AbstractSnapshotStoreTest {

/**
* Returns a new snapshot store.
*/
protected abstract SnapshotStore createSnapshotStore();

/**
* Tests writing a snapshot.
*/
public void testWriteSnapshotChunks() {
SnapshotStore store = createSnapshotStore();
WallClockTimestamp timestamp = new WallClockTimestamp();
Snapshot snapshot = store.newSnapshot(ServiceId.from(1), 2, timestamp);
assertEquals(snapshot.serviceId(), ServiceId.from(1));
assertEquals(snapshot.index(), 2);
assertEquals(snapshot.timestamp(), timestamp);

assertNull(store.getSnapshotById(ServiceId.from(1)));
assertNull(store.getSnapshotByIndex(2));

try (SnapshotWriter writer = snapshot.openWriter()) {
writer.writeLong(10);
}

assertNull(store.getSnapshotById(ServiceId.from(1)));
assertNull(store.getSnapshotByIndex(2));

try (SnapshotWriter writer = snapshot.openWriter()) {
writer.writeLong(11);
}

assertNull(store.getSnapshotById(ServiceId.from(1)));
assertNull(store.getSnapshotByIndex(2));

try (SnapshotWriter writer = snapshot.openWriter()) {
writer.writeLong(12);
}

assertNull(store.getSnapshotById(ServiceId.from(1)));
assertNull(store.getSnapshotByIndex(2));
snapshot.complete();

assertEquals(store.getSnapshotById(ServiceId.from(1)).serviceId(), ServiceId.from(1));
assertEquals(store.getSnapshotById(ServiceId.from(1)).index(), 2);
assertEquals(store.getSnapshotByIndex(2).serviceId(), ServiceId.from(1));
assertEquals(store.getSnapshotByIndex(2).index(), 2);

try (SnapshotReader reader = store.getSnapshotById(ServiceId.from(1)).openReader()) {
assertEquals(reader.readLong(), 10);
assertEquals(reader.readLong(), 11);
assertEquals(reader.readLong(), 12);
}
}

}
@@ -0,0 +1,151 @@
/*
* Copyright 2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License
*/
package io.atomix.protocols.raft.storage.snapshot;

import io.atomix.protocols.raft.service.ServiceId;
import io.atomix.protocols.raft.storage.RaftStorage;
import io.atomix.storage.StorageLevel;
import io.atomix.time.WallClockTimestamp;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.io.File;
import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.UUID;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;

/**
* File snapshot store test.
*
* @author <a href="http://github.com/kuujo>Jordan Halterman</a>
*/
@Test
public class FileSnapshotStoreTest extends AbstractSnapshotStoreTest {
private String testId;

/**
* Returns a new snapshot store.
*/
protected SnapshotStore createSnapshotStore() {
RaftStorage storage = RaftStorage.newBuilder()
.withPrefix("test")
.withDirectory(new File(String.format("target/test-logs/%s", testId)))
.withStorageLevel(StorageLevel.DISK)
.build();
return new SnapshotStore(storage);
}

/**
* Tests storing and loading snapshots.
*/
public void testStoreLoadSnapshot() {
SnapshotStore store = createSnapshotStore();

Snapshot snapshot = store.newSnapshot(ServiceId.from(1), 2, new WallClockTimestamp());
try (SnapshotWriter writer = snapshot.openWriter()) {
writer.writeLong(10);
}
snapshot.complete();
assertNotNull(store.getSnapshotById(ServiceId.from(1)));
assertNotNull(store.getSnapshotByIndex(2));
store.close();

store = createSnapshotStore();
assertNotNull(store.getSnapshotById(ServiceId.from(1)));
assertNotNull(store.getSnapshotByIndex(2));
assertEquals(store.getSnapshotById(ServiceId.from(1)).serviceId(), ServiceId.from(1));
assertEquals(store.getSnapshotById(ServiceId.from(1)).index(), 2);
assertEquals(store.getSnapshotByIndex(2).serviceId(), ServiceId.from(1));
assertEquals(store.getSnapshotByIndex(2).index(), 2);

try (SnapshotReader reader = snapshot.openReader()) {
assertEquals(reader.readLong(), 10);
}
}

/**
* Tests persisting and loading snapshots.
*/
public void testPersistLoadSnapshot() {
SnapshotStore store = createSnapshotStore();

Snapshot snapshot = store.newTemporarySnapshot(ServiceId.from(1), 2, new WallClockTimestamp());
try (SnapshotWriter writer = snapshot.openWriter()) {
writer.writeLong(10);
}

snapshot = snapshot.persist();

assertNull(store.getSnapshotById(ServiceId.from(1)));
assertNull(store.getSnapshotByIndex(2));

snapshot.complete();
assertNotNull(store.getSnapshotById(ServiceId.from(1)));
assertNotNull(store.getSnapshotByIndex(2));

try (SnapshotReader reader = snapshot.openReader()) {
assertEquals(reader.readLong(), 10);
}

store.close();

store = createSnapshotStore();
assertNotNull(store.getSnapshotById(ServiceId.from(1)));
assertNotNull(store.getSnapshotByIndex(2));
assertEquals(store.getSnapshotById(ServiceId.from(1)).serviceId(), ServiceId.from(1));
assertEquals(store.getSnapshotById(ServiceId.from(1)).index(), 2);
assertEquals(store.getSnapshotByIndex(2).serviceId(), ServiceId.from(1));
assertEquals(store.getSnapshotByIndex(2).index(), 2);

snapshot = store.getSnapshotById(ServiceId.from(1));
try (SnapshotReader reader = snapshot.openReader()) {
assertEquals(reader.readLong(), 10);
}
}

@BeforeMethod
@AfterMethod
protected void cleanupStorage() throws IOException {
Path directory = Paths.get("target/test-logs/");
if (Files.exists(directory)) {
Files.walkFileTree(directory, new SimpleFileVisitor<Path>() {
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
Files.delete(file);
return FileVisitResult.CONTINUE;
}

@Override
public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
Files.delete(dir);
return FileVisitResult.CONTINUE;
}
});
}
testId = UUID.randomUUID().toString();
}

}
@@ -0,0 +1,41 @@
/*
* Copyright 2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License
*/
package io.atomix.protocols.raft.storage.snapshot;

import io.atomix.protocols.raft.storage.RaftStorage;
import io.atomix.storage.StorageLevel;
import org.testng.annotations.Test;

/**
* Memory snapshot store test.
*
* @author <a href="http://github.com/kuujo>Jordan Halterman</a>
*/
@Test
public class MemorySnapshotStoreTest extends AbstractSnapshotStoreTest {

/**
* Returns a new snapshot store.
*/
protected SnapshotStore createSnapshotStore() {
RaftStorage storage = RaftStorage.newBuilder()
.withPrefix("test")
.withStorageLevel(StorageLevel.MEMORY)
.build();
return new SnapshotStore(storage);
}

}
Expand Up @@ -72,7 +72,7 @@ public void testParseSnapshotTimestamp() throws Exception {
} }


public void testTimestampDecoder() throws Exception { public void testTimestampDecoder() throws Exception {
String timestampString = "20170624151018"; String timestampString = "20170624151018000";
SnapshotFile.TIMESTAMP_FORMAT.parse(timestampString); SnapshotFile.TIMESTAMP_FORMAT.parse(timestampString);
} }


Expand Down

0 comments on commit 37f961a

Please sign in to comment.