Skip to content

Commit

Permalink
Rename StateMachineID to ServiceId
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Jul 3, 2017
1 parent f777ed2 commit 740be32
Show file tree
Hide file tree
Showing 12 changed files with 46 additions and 56 deletions.
Expand Up @@ -18,7 +18,6 @@
import io.atomix.protocols.raft.impl.DefaultRaftOperationExecutor;
import io.atomix.protocols.raft.session.RaftSession;
import io.atomix.protocols.raft.session.RaftSessions;
import io.atomix.protocols.raft.storage.snapshot.StateMachineId;
import io.atomix.time.LogicalClock;
import io.atomix.time.WallClock;
import io.atomix.utils.concurrent.Scheduler;
Expand Down Expand Up @@ -76,8 +75,8 @@ protected Scheduler getScheduler() {
*
* @return The unique state machine identifier.
*/
protected StateMachineId getStateMachineId() {
return context.stateMachineId();
protected ServiceId getStateMachineId() {
return context.serviceId();
}

/**
Expand Down
Expand Up @@ -17,7 +17,6 @@
package io.atomix.protocols.raft;

import io.atomix.protocols.raft.session.RaftSessions;
import io.atomix.protocols.raft.storage.snapshot.StateMachineId;
import io.atomix.time.LogicalClock;
import io.atomix.time.WallClock;

Expand All @@ -35,14 +34,7 @@ public interface ServiceContext {
*
* @return The unique state machine identifier.
*/
StateMachineId stateMachineId();

/**
* Returns the server name.
*
* @return The server name.
*/
String serverName();
ServiceId serviceId();

/**
* Returns the state machine name.
Expand Down
Expand Up @@ -13,23 +13,23 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.atomix.protocols.raft.storage.snapshot;
package io.atomix.protocols.raft;

import io.atomix.utils.AbstractIdentifier;

/**
* Snapshot identifier.
*/
public class StateMachineId extends AbstractIdentifier<Long> {
public class ServiceId extends AbstractIdentifier<Long> {

/**
* Creates a snapshot ID from the given number.
*
* @param id the number from which to create the identifier
* @return the snapshot identifier
*/
public static StateMachineId from(long id) {
return new StateMachineId(id);
public static ServiceId from(long id) {
return new ServiceId(id);
}

/**
Expand All @@ -38,11 +38,11 @@ public static StateMachineId from(long id) {
* @param id the string from which to create the identifier
* @return the snapshot identifier
*/
public static StateMachineId from(String id) {
public static ServiceId from(String id) {
return from(Long.parseLong(id));
}

public StateMachineId(Long value) {
public ServiceId(Long value) {
super(value);
}
}
Expand Up @@ -54,7 +54,7 @@ public class DefaultRaftOperationExecutor implements RaftOperationExecutor {

public DefaultRaftOperationExecutor(ServiceContext context) {
this.log = ContextualLoggerFactory.getLogger(getClass(), LoggerContext.builder(RaftService.class)
.addValue(context.stateMachineId())
.addValue(context.serviceId())
.add("type", context.serviceType())
.add("name", context.serviceName())
.build());
Expand Down
Expand Up @@ -25,6 +25,7 @@
import io.atomix.protocols.raft.RaftStateMachine;
import io.atomix.protocols.raft.ReadConsistency;
import io.atomix.protocols.raft.ServiceContext;
import io.atomix.protocols.raft.ServiceId;
import io.atomix.protocols.raft.ServiceType;
import io.atomix.protocols.raft.cluster.MemberId;
import io.atomix.protocols.raft.session.RaftSessionListener;
Expand All @@ -35,7 +36,6 @@
import io.atomix.protocols.raft.storage.snapshot.Snapshot;
import io.atomix.protocols.raft.storage.snapshot.SnapshotReader;
import io.atomix.protocols.raft.storage.snapshot.SnapshotWriter;
import io.atomix.protocols.raft.storage.snapshot.StateMachineId;
import io.atomix.time.LogicalClock;
import io.atomix.time.LogicalTimestamp;
import io.atomix.time.WallClock;
Expand All @@ -57,7 +57,7 @@ public class RaftServerServiceContext implements ServiceContext {
private static final long SNAPSHOT_INTERVAL_MILLIS = 1000 * 60 * 10;

private final Logger log;
private final StateMachineId stateMachineId;
private final ServiceId serviceId;
private final String serviceName;
private final ServiceType serviceType;
private final RaftStateMachine stateMachine;
Expand Down Expand Up @@ -85,15 +85,15 @@ public WallClockTimestamp getTime() {
};

RaftServerServiceContext(
StateMachineId stateMachineId,
ServiceId serviceId,
String serviceName,
ServiceType serviceType,
RaftStateMachine stateMachine,
RaftServerContext server,
RaftSessionManager sessionManager,
ThreadContext stateMachineExecutor,
ThreadContext snapshotExecutor) {
this.stateMachineId = checkNotNull(stateMachineId);
this.serviceId = checkNotNull(serviceId);
this.serviceName = checkNotNull(serviceName);
this.serviceType = checkNotNull(serviceType);
this.stateMachine = checkNotNull(stateMachine);
Expand All @@ -102,7 +102,7 @@ public WallClockTimestamp getTime() {
this.stateMachineExecutor = checkNotNull(stateMachineExecutor);
this.snapshotExecutor = checkNotNull(snapshotExecutor);
this.log = ContextualLoggerFactory.getLogger(getClass(), LoggerContext.builder(RaftService.class)
.addValue(stateMachineId)
.addValue(serviceId)
.add("type", serviceType)
.add("name", serviceName)
.build());
Expand All @@ -118,13 +118,8 @@ private void init() {
}

@Override
public StateMachineId stateMachineId() {
return stateMachineId;
}

@Override
public String serverName() {
return server.getName();
public ServiceId serviceId() {
return serviceId;
}

@Override
Expand Down Expand Up @@ -237,7 +232,7 @@ private synchronized void maybeTakeSnapshot(long index, long timestamp) {
if (pendingSnapshot == null && snapshotTime == 0 || System.currentTimeMillis() - snapshotTime > SNAPSHOT_INTERVAL_MILLIS) {
log.info("Taking snapshot {}", index);
pendingSnapshot = server.getSnapshotStore()
.newTemporarySnapshot(stateMachineId, index, WallClockTimestamp.from(timestamp));
.newTemporarySnapshot(serviceId, index, WallClockTimestamp.from(timestamp));
try (SnapshotWriter writer = pendingSnapshot.openWriter()) {
writer.writeInt(sessions.getSessions().size());
for (RaftSessionContext session : sessions.getSessions()) {
Expand Down Expand Up @@ -290,7 +285,7 @@ private synchronized void maybeCompleteSnapshot(long index) {
* Installs a snapshot if one exists.
*/
private void maybeInstallSnapshot(long index) {
Snapshot snapshot = server.getSnapshotStore().getSnapshotById(stateMachineId);
Snapshot snapshot = server.getSnapshotStore().getSnapshotById(serviceId);
if (snapshot != null && snapshot.index() > snapshotIndex && snapshot.index() <= index) {
log.info("Installing snapshot {}", snapshot.index());
try (SnapshotReader reader = snapshot.openReader()) {
Expand Down Expand Up @@ -646,7 +641,7 @@ public String toString() {
.add("server", server.getName())
.add("type", serviceType)
.add("name", serviceName)
.add("id", stateMachineId)
.add("id", serviceId)
.toString();
}
}
Expand Up @@ -36,7 +36,7 @@
import io.atomix.protocols.raft.storage.log.entry.QueryEntry;
import io.atomix.protocols.raft.storage.log.entry.RaftLogEntry;
import io.atomix.protocols.raft.storage.snapshot.Snapshot;
import io.atomix.protocols.raft.storage.snapshot.StateMachineId;
import io.atomix.protocols.raft.ServiceId;
import io.atomix.storage.journal.Indexed;
import io.atomix.utils.concurrent.ComposableFuture;
import io.atomix.utils.concurrent.Futures;
Expand Down Expand Up @@ -347,9 +347,9 @@ private CompletableFuture<Long> applyOpenSession(Indexed<OpenSessionEntry> entry
return Futures.exceptionalFuture(new RaftException.UnknownService("Unknown service type " + entry.entry().serviceType()));
}

StateMachineId stateMachineId = StateMachineId.from(entry.index());
ServiceId serviceId = ServiceId.from(entry.index());
stateMachineExecutor = new RaftServerServiceContext(
stateMachineId,
serviceId,
entry.entry().serviceName(),
ServiceType.from(entry.entry().serviceType()),
stateMachineSupplier.get(),
Expand Down Expand Up @@ -498,7 +498,7 @@ private void compactLog() {
// Iterate through state machines and compute the lowest stored snapshot for all state machines.
long snapshotIndex = state.getLogWriter().getLastIndex();
for (RaftServerServiceContext stateMachineExecutor : stateMachines.values()) {
Snapshot snapshot = state.getSnapshotStore().getSnapshotById(stateMachineExecutor.stateMachineId());
Snapshot snapshot = state.getSnapshotStore().getSnapshotById(stateMachineExecutor.serviceId());
if (snapshot == null) {
return;
} else {
Expand Down
Expand Up @@ -35,7 +35,7 @@
import io.atomix.protocols.raft.storage.log.entry.RaftLogEntry;
import io.atomix.protocols.raft.storage.snapshot.Snapshot;
import io.atomix.protocols.raft.storage.snapshot.SnapshotWriter;
import io.atomix.protocols.raft.storage.snapshot.StateMachineId;
import io.atomix.protocols.raft.ServiceId;
import io.atomix.storage.journal.Indexed;
import io.atomix.time.WallClockTimestamp;

Expand Down Expand Up @@ -406,7 +406,7 @@ public CompletableFuture<InstallResponse> onInstall(InstallRequest request) {
}

pendingSnapshot = context.getSnapshotStore().newSnapshot(
StateMachineId.from(request.snapshotId()),
ServiceId.from(request.snapshotId()),
request.snapshotIndex(),
WallClockTimestamp.from(request.snapshotTimestamp()));
nextSnapshotOffset = 0;
Expand Down
Expand Up @@ -15,6 +15,7 @@
*/
package io.atomix.protocols.raft.storage.snapshot;

import io.atomix.protocols.raft.ServiceId;
import io.atomix.storage.buffer.Buffer;
import io.atomix.storage.buffer.FileBuffer;
import io.atomix.time.WallClockTimestamp;
Expand All @@ -41,7 +42,7 @@ final class FileSnapshot extends Snapshot {
}

@Override
public StateMachineId snapshotId() {
public ServiceId snapshotId() {
return file.snapshotId();
}

Expand Down
Expand Up @@ -15,6 +15,7 @@
*/
package io.atomix.protocols.raft.storage.snapshot;

import io.atomix.protocols.raft.ServiceId;
import io.atomix.storage.StorageLevel;
import io.atomix.storage.buffer.HeapBuffer;
import io.atomix.time.WallClockTimestamp;
Expand All @@ -40,8 +41,8 @@ final class MemorySnapshot extends Snapshot {
}

@Override
public StateMachineId snapshotId() {
return StateMachineId.from(descriptor.snapshotId());
public ServiceId snapshotId() {
return ServiceId.from(descriptor.snapshotId());
}

@Override
Expand Down
Expand Up @@ -15,7 +15,7 @@
*/
package io.atomix.protocols.raft.storage.snapshot;

import io.atomix.serializer.Serializer;
import io.atomix.protocols.raft.ServiceId;
import io.atomix.time.WallClockTimestamp;

import static com.google.common.base.Preconditions.checkNotNull;
Expand Down Expand Up @@ -65,7 +65,7 @@ protected Snapshot(SnapshotStore store) {
*
* @return The snapshot identifier.
*/
public abstract StateMachineId snapshotId();
public abstract ServiceId snapshotId();

/**
* Returns the snapshot index.
Expand Down
Expand Up @@ -16,6 +16,7 @@
package io.atomix.protocols.raft.storage.snapshot;

import com.google.common.annotations.VisibleForTesting;
import io.atomix.protocols.raft.ServiceId;

import java.io.File;
import java.text.ParseException;
Expand Down Expand Up @@ -142,8 +143,8 @@ public File file() {
*
* @return The snapshot identifier.
*/
public StateMachineId snapshotId() {
return StateMachineId.from(parseId(file.getName()));
public ServiceId snapshotId() {
return ServiceId.from(parseId(file.getName()));
}

/**
Expand Down
Expand Up @@ -15,6 +15,7 @@
*/
package io.atomix.protocols.raft.storage.snapshot;

import io.atomix.protocols.raft.ServiceId;
import io.atomix.protocols.raft.storage.RaftStorage;
import io.atomix.storage.StorageLevel;
import io.atomix.storage.buffer.FileBuffer;
Expand Down Expand Up @@ -50,7 +51,7 @@
* Snapshot snapshot = snapshots.snapshot(1);
* }
* </pre>
* To create a new {@link Snapshot}, use the {@link #newSnapshot(StateMachineId, long, WallClockTimestamp)} method. Each snapshot must
* To create a new {@link Snapshot}, use the {@link #newSnapshot(ServiceId, long, WallClockTimestamp)} method. Each snapshot must
* be created with a unique {@code index} which represents the index of the server state machine at
* the point at which the snapshot was taken. Snapshot indices are used to sort snapshots loaded from
* disk and apply them at the correct point in the state machine.
Expand All @@ -72,7 +73,7 @@ public class SnapshotStore implements AutoCloseable {
private final Logger log = LoggerFactory.getLogger(getClass());
final RaftStorage storage;
private final Map<Long, Snapshot> indexSnapshots = new ConcurrentHashMap<>();
private final Map<StateMachineId, Snapshot> stateMachineSnapshots = new ConcurrentHashMap<>();
private final Map<ServiceId, Snapshot> stateMachineSnapshots = new ConcurrentHashMap<>();

public SnapshotStore(RaftStorage storage) {
this.storage = checkNotNull(storage, "storage cannot be null");
Expand Down Expand Up @@ -107,7 +108,7 @@ private void open() {
* @param id The state machine identifier for which to return the snapshot.
* @return The latest snapshot for the given state machine.
*/
public Snapshot getSnapshotById(StateMachineId id) {
public Snapshot getSnapshotById(ServiceId id) {
return stateMachineSnapshots.get(id);
}

Expand Down Expand Up @@ -162,14 +163,14 @@ private Collection<Snapshot> loadSnapshots() {
/**
* Creates a temporary in-memory snapshot.
*
* @param stateMachineId The snapshot identifier.
* @param serviceId The snapshot identifier.
* @param index The snapshot index.
* @param timestamp The snapshot timestamp.
* @return The snapshot.
*/
public Snapshot newTemporarySnapshot(StateMachineId stateMachineId, long index, WallClockTimestamp timestamp) {
public Snapshot newTemporarySnapshot(ServiceId serviceId, long index, WallClockTimestamp timestamp) {
SnapshotDescriptor descriptor = SnapshotDescriptor.newBuilder()
.withId(stateMachineId.id())
.withId(serviceId.id())
.withIndex(index)
.withTimestamp(timestamp.unixTimestamp())
.build();
Expand All @@ -179,14 +180,14 @@ public Snapshot newTemporarySnapshot(StateMachineId stateMachineId, long index,
/**
* Creates a new snapshot.
*
* @param stateMachineId The snapshot identifier.
* @param serviceId The snapshot identifier.
* @param index The snapshot index.
* @param timestamp The snapshot timestamp.
* @return The snapshot.
*/
public Snapshot newSnapshot(StateMachineId stateMachineId, long index, WallClockTimestamp timestamp) {
public Snapshot newSnapshot(ServiceId serviceId, long index, WallClockTimestamp timestamp) {
SnapshotDescriptor descriptor = SnapshotDescriptor.newBuilder()
.withId(stateMachineId.id())
.withId(serviceId.id())
.withIndex(index)
.withTimestamp(timestamp.unixTimestamp())
.build();
Expand Down

0 comments on commit 740be32

Please sign in to comment.