Skip to content
This repository has been archived by the owner on Dec 19, 2017. It is now read-only.

Commit

Permalink
Support optional snapshots for state machines.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Nov 30, 2015
1 parent 7f8760d commit f6dfb2d
Show file tree
Hide file tree
Showing 20 changed files with 820 additions and 152 deletions.
12 changes: 6 additions & 6 deletions server/src/main/java/io/atomix/copycat/server/CopycatServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -535,9 +535,9 @@ public CompletableFuture<Void> delete() {
* Raft server builder.
*/
public static class Builder extends io.atomix.catalyst.util.Builder<CopycatServer> {
private static final Duration DEFAULT_RAFT_ELECTION_TIMEOUT = Duration.ofMillis(1000);
private static final Duration DEFAULT_RAFT_HEARTBEAT_INTERVAL = Duration.ofMillis(150);
private static final Duration DEFAULT_RAFT_SESSION_TIMEOUT = Duration.ofMillis(5000);
private static final Duration DEFAULT_ELECTION_TIMEOUT = Duration.ofMillis(1000);
private static final Duration DEFAULT_HEARTBEAT_INTERVAL = Duration.ofMillis(150);
private static final Duration DEFAULT_SESSION_TIMEOUT = Duration.ofMillis(5000);

private Transport clientTransport;
private Transport serverTransport;
Expand All @@ -547,9 +547,9 @@ public static class Builder extends io.atomix.catalyst.util.Builder<CopycatServe
private Address clientAddress;
private Address serverAddress;
private Set<Address> cluster;
private Duration electionTimeout = DEFAULT_RAFT_ELECTION_TIMEOUT;
private Duration heartbeatInterval = DEFAULT_RAFT_HEARTBEAT_INTERVAL;
private Duration sessionTimeout = DEFAULT_RAFT_SESSION_TIMEOUT;
private Duration electionTimeout = DEFAULT_ELECTION_TIMEOUT;
private Duration heartbeatInterval = DEFAULT_HEARTBEAT_INTERVAL;
private Duration sessionTimeout = DEFAULT_SESSION_TIMEOUT;

private Builder(Address clientAddress, Address serverAddress, Collection<Address> cluster) {
this.clientAddress = Assert.notNull(clientAddress, "clientAddress");
Expand Down
55 changes: 55 additions & 0 deletions server/src/main/java/io/atomix/copycat/server/SessionAware.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright 2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License
*/
package io.atomix.copycat.server;

import io.atomix.copycat.client.session.Session;

/**
* Session aware state machine interface.
*
* @author <a href="http://github.com/kuujo>Jordan Halterman</a>
*/
public interface SessionAware {

/**
* Called when a session is registered.
*
* @param session The session that was registered.
*/
void register(Session session);

/**
* Called when a session is unregistered by the client.
*
* @param session The session that was unregistered.
*/
void unregister(Session session);

/**
* Called when a session is expired by the system.
*
* @param session The session that was expired.
*/
void expire(Session session);

/**
* Called when a session was closed.
*
* @param session The session that was closed.
*/
void close(Session session);

}
42 changes: 42 additions & 0 deletions server/src/main/java/io/atomix/copycat/server/SnapshotAware.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright 2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License
*/
package io.atomix.copycat.server;

import io.atomix.copycat.server.storage.SnapshotReader;
import io.atomix.copycat.server.storage.SnapshotWriter;

/**
* Snapshottable state machine interface.
*
* @author <a href="http://github.com/kuujo>Jordan Halterman</a>
*/
public interface SnapshotAware {

/**
* Takes a snapshot of the state machine state.
*
* @param writer The snapshot writer.
*/
void snapshot(SnapshotWriter writer);

/**
* Installs a snapshot of the state machine state.
*
* @param reader The snapshot reader.
*/
void install(SnapshotReader reader);

}
27 changes: 0 additions & 27 deletions server/src/main/java/io/atomix/copycat/server/StateMachine.java
Original file line number Diff line number Diff line change
Expand Up @@ -225,33 +225,6 @@ protected Clock clock() {
return executor.context().clock();
}

/**
* Called when a new session is registered.
*
* @param session The session that was registered.
*/
public void register(Session session) {

}

/**
* Called when a session is expired.
*
* @param session The expired session.
*/
public void expire(Session session) {

}

/**
* Called when a session is closed.
*
* @param session The session that was closed.
*/
public void close(Session session) {

}

/**
* Closes the state machine.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package io.atomix.copycat.server.request;

import io.atomix.catalyst.buffer.Buffer;
import io.atomix.catalyst.buffer.BufferInput;
import io.atomix.catalyst.buffer.BufferOutput;
import io.atomix.catalyst.serializer.SerializeWith;
Expand Down Expand Up @@ -57,6 +58,8 @@ public static Builder builder(ConfigureRequest request) {
private int leader;
protected long version;
protected Collection<Member> members;
protected long snapshot;
protected Buffer data;

/**
* Returns the requesting node's current term.
Expand Down Expand Up @@ -94,23 +97,44 @@ public Collection<Member> members() {
return members;
}

/**
* Returns the snapshot index.
*
* @return The snapshot index.
*/
public long snapshot() {
return snapshot;
}

/**
* Returns the snapshot data.
*
* @return The snapshot data.
*/
public Buffer data() {
return data;
}

@Override
public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
buffer.writeLong(term).writeInt(leader).writeLong(version);
buffer.writeLong(term).writeInt(leader).writeLong(version).writeLong(snapshot);
serializer.writeObject(members, buffer);
serializer.writeObject(data, buffer);
}

@Override
public void readObject(BufferInput<?> buffer, Serializer serializer) {
term = buffer.readLong();
leader = buffer.readInt();
version = buffer.readLong();
snapshot = buffer.readLong();
members = serializer.readObject(buffer);
data = serializer.readObject(buffer);
}

@Override
public int hashCode() {
return Objects.hash(getClass(), term, leader, version, members);
return Objects.hash(getClass(), term, leader, version, snapshot, members);
}

@Override
Expand All @@ -120,14 +144,15 @@ public boolean equals(Object object) {
return request.term == term
&& request.leader == leader
&& request.version == version
&& request.snapshot == snapshot
&& request.members.equals(members);
}
return false;
}

@Override
public String toString() {
return String.format("%s[term=%d, leader=%d, version=%d, members=%s]", getClass().getSimpleName(), term, leader, version, members);
return String.format("%s[term=%d, leader=%d, version=%d, snapshot=%d, members=%s]", getClass().getSimpleName(), term, leader, version, snapshot, members);
}

/**
Expand Down Expand Up @@ -173,6 +198,29 @@ public Builder withVersion(long version) {
return this;
}

/**
* Sets the request snapshot version.
*
* @param snapshot The snapshot version.
* @return The request builder.
*/
public Builder withSnapshot(long snapshot) {
request.snapshot = Assert.argNot(snapshot, snapshot < 0, "snapshot must be positive");
return this;
}

/**
* Sets the request snapshot data.
*
* @param data The request snapshot data.
* @return The request builder.
* @throws NullPointerException if {@code member} is null
*/
public Builder withData(Buffer data) {
request.data = Assert.notNull(data, "data");
return this;
}

/**
* Sets the request members.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@
package io.atomix.copycat.server.state;

import io.atomix.catalyst.transport.Connection;
import io.atomix.catalyst.util.Assert;
import io.atomix.copycat.server.request.AppendRequest;
import io.atomix.copycat.server.request.ConfigureRequest;
import io.atomix.copycat.server.response.AppendResponse;
import io.atomix.copycat.server.response.ConfigureResponse;
import io.atomix.copycat.server.storage.MetaStore;
import io.atomix.copycat.server.storage.entry.Entry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -127,12 +129,28 @@ else if (!appending.contains(member)) {
*/
protected ConfigureRequest buildConfigureRequest(MemberState member) {
Member leader = context.getLeader();
return ConfigureRequest.builder()
.withTerm(context.getTerm())
.withLeader(leader != null ? leader.id() : 0)
.withVersion(context.getCluster().getVersion())
.withMembers(context.getCluster().getMembers())
.build();

// If the member's match index is less than the last stored snapshot index,
// send the snapshot to the member with the initial configuration.
if (member.getMatchIndex() < context.getStateMachine().getSnapshotIndex()) {
MetaStore.Snapshot snapshot = context.getMetaStore().loadSnapshot();
Assert.stateNot(snapshot == null, "inconsistent snapshot state");
return ConfigureRequest.builder()
.withTerm(context.getTerm())
.withLeader(leader != null ? leader.id() : 0)
.withVersion(context.getCluster().getVersion())
.withSnapshot(snapshot.version())
.withData(snapshot.data())
.withMembers(context.getCluster().getMembers())
.build();
} else {
return ConfigureRequest.builder()
.withTerm(context.getTerm())
.withLeader(leader != null ? leader.id() : 0)
.withVersion(context.getCluster().getVersion())
.withMembers(context.getCluster().getMembers())
.build();
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ private CompletableFuture<Void> commitInitialEntries() {
private void applyEntries(long index) {
if (!context.getLog().isEmpty()) {
int count = 0;
for (long lastApplied = Math.max(context.getLastApplied(), context.getLog().firstIndex()); lastApplied <= index; lastApplied++) {
for (long lastApplied = Math.max(context.getStateMachine().getLastApplied(), context.getLog().firstIndex()); lastApplied <= index; lastApplied++) {
Entry entry = context.getLog().get(lastApplied);
if (entry != null) {
context.getStateMachine().apply(entry).whenComplete((result, error) -> {
Expand All @@ -142,7 +142,7 @@ private void applyEntries(long index) {
}

LOGGER.debug("{} - Applied {} entries to log", context.getCluster().getMember().serverAddress(), count);
context.getLog().compactor().minorIndex(context.getLastCompleted());
context.getLog().compactor().minorIndex(context.getStateMachine().getLastCompleted());
}
}

Expand All @@ -164,7 +164,7 @@ private void appendMembers() {
context.checkThread();
if (isOpen()) {
appender.appendEntries().whenComplete((result, error) -> {
context.getLog().compactor().minorIndex(context.getLastCompleted());
context.getLog().compactor().minorIndex(context.getStateMachine().getLastCompleted());
});
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.atomix.copycat.server.CopycatServer;
import io.atomix.copycat.server.request.*;
import io.atomix.copycat.server.response.*;
import io.atomix.copycat.server.storage.MetaStore;
import io.atomix.copycat.server.storage.entry.ConfigurationEntry;
import io.atomix.copycat.server.storage.entry.ConnectEntry;
import io.atomix.copycat.server.storage.entry.Entry;
Expand Down Expand Up @@ -178,7 +179,7 @@ protected AppendResponse doAppendEntries(AppendRequest request) {
// If we've made it this far, apply commits and send a successful response.
long commitIndex = request.commitIndex();
context.getThreadContext().execute(() -> applyCommits(commitIndex)).thenRun(() -> {
context.getLog().compactor().minorIndex(context.getLastCompleted());
context.getLog().compactor().minorIndex(context.getStateMachine().getLastCompleted());
});

return AppendResponse.builder()
Expand All @@ -198,7 +199,7 @@ protected CompletableFuture<Void> applyCommits(long commitIndex) {

// The entries to be applied to the state machine are the difference between min(lastIndex, commitIndex) and lastApplied.
long lastIndex = context.getLog().lastIndex();
long lastApplied = context.getLastApplied();
long lastApplied = context.getStateMachine().getLastApplied();

long effectiveIndex = Math.min(lastIndex, context.getCommitIndex());

Expand Down Expand Up @@ -414,6 +415,12 @@ protected CompletableFuture<ConfigureResponse> configure(ConfigureRequest reques
// Configure the cluster membership.
context.getCluster().configure(request.version(), request.members());

// Install the snapshot if necessary.
if (request.snapshot() > context.getStateMachine().getSnapshotIndex()) {
context.getMetaStore().storeSnapshot(new MetaStore.Snapshot(request.version(), request.data()));
context.getStateMachine().setSnapshotIndex(request.snapshot());
}

// If the local member type changed, transition the state as appropriate.
// ACTIVE servers are initialized to the FOLLOWER state but may transition to CANDIDATE or LEADER.
// PASSIVE servers are transitioned to the PASSIVE state.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
*
* @author <a href="http://github.com/kuujo>Jordan Halterman</a>
*/
public class ServerClock extends Clock {
final class ServerClock extends Clock {
private final ZoneId zoneId = ZoneId.of("UTC");
private Instant instant;

Expand Down
Loading

0 comments on commit f6dfb2d

Please sign in to comment.