Skip to content

Commit

Permalink
Refactor storage module to support per-entry persistence levels.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Aug 17, 2015
1 parent 2681c7d commit 338b2ba
Show file tree
Hide file tree
Showing 32 changed files with 374 additions and 247 deletions.
Expand Up @@ -15,7 +15,7 @@
*/
package net.kuujo.copycat.atomic;

import net.kuujo.copycat.PersistenceLevel;
import net.kuujo.copycat.PersistenceMode;
import net.kuujo.copycat.Resource;
import net.kuujo.copycat.atomic.state.ReferenceCommands;
import net.kuujo.copycat.atomic.state.ReferenceState;
Expand Down Expand Up @@ -141,7 +141,7 @@ public CompletableFuture<Void> set(T value, Duration ttl) {
* @param persistence The write persistence.
* @return A completable future to be completed once the value has been set.
*/
public CompletableFuture<Void> set(T value, PersistenceLevel persistence) {
public CompletableFuture<Void> set(T value, PersistenceMode persistence) {
return submit(ReferenceCommands.Set.builder()
.withValue(value)
.withPersistence(persistence)
Expand All @@ -156,7 +156,7 @@ public CompletableFuture<Void> set(T value, PersistenceLevel persistence) {
* @param persistence The write persistence.
* @return A completable future to be completed once the value has been set.
*/
public CompletableFuture<Void> set(T value, Duration ttl, PersistenceLevel persistence) {
public CompletableFuture<Void> set(T value, Duration ttl, PersistenceMode persistence) {
return submit(ReferenceCommands.Set.builder()
.withValue(value)
.withTtl(ttl.toMillis())
Expand Down Expand Up @@ -197,7 +197,7 @@ public CompletableFuture<T> getAndSet(T value, Duration ttl) {
* @param persistence The write persistence.
* @return A completable future to be completed with the previous value.
*/
public CompletableFuture<T> getAndSet(T value, PersistenceLevel persistence) {
public CompletableFuture<T> getAndSet(T value, PersistenceMode persistence) {
return submit(ReferenceCommands.GetAndSet.<T>builder()
.withValue(value)
.withPersistence(persistence)
Expand All @@ -212,7 +212,7 @@ public CompletableFuture<T> getAndSet(T value, PersistenceLevel persistence) {
* @param persistence The write persistence.
* @return A completable future to be completed with the previous value.
*/
public CompletableFuture<T> getAndSet(T value, Duration ttl, PersistenceLevel persistence) {
public CompletableFuture<T> getAndSet(T value, Duration ttl, PersistenceMode persistence) {
return submit(ReferenceCommands.GetAndSet.<T>builder()
.withValue(value)
.withTtl(ttl.toMillis())
Expand Down Expand Up @@ -258,7 +258,7 @@ public CompletableFuture<Boolean> compareAndSet(T expect, T update, Duration ttl
* @param persistence The write persistence.
* @return A completable future to be completed with a boolean value indicating whether the value was updated.
*/
public CompletableFuture<Boolean> compareAndSet(T expect, T update, PersistenceLevel persistence) {
public CompletableFuture<Boolean> compareAndSet(T expect, T update, PersistenceMode persistence) {
return submit(ReferenceCommands.CompareAndSet.builder()
.withExpect(expect)
.withUpdate(update)
Expand All @@ -275,7 +275,7 @@ public CompletableFuture<Boolean> compareAndSet(T expect, T update, PersistenceL
* @param persistence The write persistence.
* @return A completable future to be completed with a boolean value indicating whether the value was updated.
*/
public CompletableFuture<Boolean> compareAndSet(T expect, T update, Duration ttl, PersistenceLevel persistence) {
public CompletableFuture<Boolean> compareAndSet(T expect, T update, Duration ttl, PersistenceMode persistence) {
return submit(ReferenceCommands.CompareAndSet.builder()
.withExpect(expect)
.withUpdate(update)
Expand Down
Expand Up @@ -15,7 +15,7 @@
*/
package net.kuujo.copycat.atomic.state;

import net.kuujo.copycat.PersistenceLevel;
import net.kuujo.copycat.PersistenceMode;
import net.kuujo.copycat.io.BufferInput;
import net.kuujo.copycat.io.BufferOutput;
import net.kuujo.copycat.io.serializer.CopycatSerializable;
Expand All @@ -41,15 +41,15 @@ private ReferenceCommands() {
* Abstract reference command.
*/
public static abstract class ReferenceCommand<V> implements Command<V>, CopycatSerializable {
protected PersistenceLevel mode = PersistenceLevel.PERSISTENT;
protected PersistenceMode mode = PersistenceMode.PERSISTENT;
protected long ttl;

/**
* Returns the persistence mode.
*
* @return The persistence mode.
*/
public PersistenceLevel mode() {
public PersistenceMode mode() {
return mode;
}

Expand All @@ -70,7 +70,7 @@ public void writeObject(BufferOutput buffer, Serializer serializer) {

@Override
public void readObject(BufferInput buffer, Serializer serializer) {
mode = PersistenceLevel.values()[buffer.readByte()];
mode = PersistenceMode.values()[buffer.readByte()];
ttl = buffer.readLong();
}

Expand All @@ -89,7 +89,7 @@ protected Builder(BuilderPool<T, U> pool) {
* @return The command builder.
*/
@SuppressWarnings("unchecked")
public T withPersistence(PersistenceLevel mode) {
public T withPersistence(PersistenceMode mode) {
if (mode == null)
throw new NullPointerException("mode cannot be null");
command.mode = mode;
Expand Down
Expand Up @@ -15,7 +15,7 @@
*/
package net.kuujo.copycat.atomic.state;

import net.kuujo.copycat.PersistenceLevel;
import net.kuujo.copycat.PersistenceMode;
import net.kuujo.copycat.raft.session.Session;
import net.kuujo.copycat.raft.Commit;
import net.kuujo.copycat.raft.StateMachine;
Expand Down Expand Up @@ -79,7 +79,7 @@ public void close(Session session) {
private boolean isActive(Commit<? extends ReferenceCommands.ReferenceCommand> commit, Instant time) {
if (commit == null) {
return false;
} else if (commit.operation().mode() == PersistenceLevel.EPHEMERAL && !sessions.contains(commit.session().id())) {
} else if (commit.operation().mode() == PersistenceMode.EPHEMERAL && !sessions.contains(commit.session().id())) {
return false;
} else if (commit.operation().ttl() != 0 && commit.operation().ttl() < time.toEpochMilli() - commit.time().toEpochMilli()) {
return false;
Expand Down
Expand Up @@ -20,7 +20,6 @@
import net.kuujo.copycat.CopycatServer;
import net.kuujo.copycat.Node;
import net.kuujo.copycat.io.storage.Storage;
import net.kuujo.copycat.io.storage.StorageLevel;
import net.kuujo.copycat.io.transport.LocalServerRegistry;
import net.kuujo.copycat.io.transport.LocalTransport;
import net.kuujo.copycat.raft.Member;
Expand Down Expand Up @@ -125,7 +124,7 @@ public void testMembershipChange() throws Throwable {
.withRegistry(registry)
.build())
.withStorage(Storage.builder()
.withStorageLevel(StorageLevel.MEMORY)
.withDirectory("test-logs")
.build())
.build();
Copycat copycat2 = CopycatServer.builder()
Expand All @@ -135,7 +134,7 @@ public void testMembershipChange() throws Throwable {
.withRegistry(registry)
.build())
.withStorage(Storage.builder()
.withStorageLevel(StorageLevel.MEMORY)
.withDirectory("test-logs")
.build())
.build();
Copycat copycat3 = CopycatServer.builder()
Expand All @@ -145,7 +144,7 @@ public void testMembershipChange() throws Throwable {
.withRegistry(registry)
.build())
.withStorage(Storage.builder()
.withStorageLevel(StorageLevel.MEMORY)
.withDirectory("test-logs")
.build())
.build();

Expand Down Expand Up @@ -185,7 +184,7 @@ public void testMembershipChange() throws Throwable {
.withRegistry(registry)
.build())
.withStorage(Storage.builder()
.withStorageLevel(StorageLevel.MEMORY)
.withDirectory("test-logs")
.build())
.build();

Expand Down Expand Up @@ -268,7 +267,7 @@ private List<Copycat> createCopycats(int nodes) throws Throwable {
.withRegistry(registry)
.build())
.withStorage(Storage.builder()
.withStorageLevel(StorageLevel.MEMORY)
.withDirectory("test-logs")
.build())
.build();

Expand Down
Expand Up @@ -15,7 +15,7 @@
*/
package net.kuujo.copycat.collections;

import net.kuujo.copycat.PersistenceLevel;
import net.kuujo.copycat.PersistenceMode;
import net.kuujo.copycat.Resource;
import net.kuujo.copycat.collections.state.MapCommands;
import net.kuujo.copycat.collections.state.MapState;
Expand Down Expand Up @@ -158,7 +158,7 @@ public CompletableFuture<V> put(K key, V value) {
* @return A completable future to be completed with the result once complete.
*/
@SuppressWarnings("unchecked")
public CompletableFuture<V> put(K key, V value, PersistenceLevel persistence) {
public CompletableFuture<V> put(K key, V value, PersistenceMode persistence) {
return submit(MapCommands.Put.builder()
.withKey(key)
.withValue(value)
Expand Down Expand Up @@ -195,7 +195,7 @@ public CompletableFuture<V> put(K key, V value, Duration ttl) {
* @return A completable future to be completed with the result once complete.
*/
@SuppressWarnings("unchecked")
public CompletableFuture<V> put(K key, V value, Duration ttl, PersistenceLevel persistence) {
public CompletableFuture<V> put(K key, V value, Duration ttl, PersistenceMode persistence) {
return submit(MapCommands.Put.builder()
.withKey(key)
.withValue(value)
Expand Down Expand Up @@ -297,7 +297,7 @@ public CompletableFuture<V> putIfAbsent(K key, V value, Duration ttl) {
* @return A completable future to be completed with the result once complete.
*/
@SuppressWarnings("unchecked")
public CompletableFuture<V> putIfAbsent(K key, V value, Duration ttl, PersistenceLevel persistence) {
public CompletableFuture<V> putIfAbsent(K key, V value, Duration ttl, PersistenceMode persistence) {
return submit(MapCommands.PutIfAbsent.builder()
.withKey(key)
.withValue(value)
Expand Down
Expand Up @@ -15,7 +15,7 @@
*/
package net.kuujo.copycat.collections;

import net.kuujo.copycat.PersistenceLevel;
import net.kuujo.copycat.PersistenceMode;
import net.kuujo.copycat.Resource;
import net.kuujo.copycat.collections.state.SetCommands;
import net.kuujo.copycat.collections.state.SetState;
Expand Down Expand Up @@ -58,7 +58,7 @@ public CompletableFuture<Boolean> add(T value) {
* @return A completable future to be completed with the result once complete.
*/
@SuppressWarnings("unchecked")
public CompletableFuture<Boolean> add(T value, PersistenceLevel persistence) {
public CompletableFuture<Boolean> add(T value, PersistenceMode persistence) {
return submit(SetCommands.Add.builder()
.withValue(value.hashCode())
.withPersistence(persistence)
Expand Down Expand Up @@ -89,7 +89,7 @@ public CompletableFuture<Boolean> add(T value, Duration ttl) {
* @return A completable future to be completed with the result once complete.
*/
@SuppressWarnings("unchecked")
public CompletableFuture<Boolean> add(T value, Duration ttl, PersistenceLevel persistence) {
public CompletableFuture<Boolean> add(T value, Duration ttl, PersistenceMode persistence) {
return submit(SetCommands.Add.builder()
.withValue(value.hashCode())
.withTtl(ttl.toMillis())
Expand Down
Expand Up @@ -15,7 +15,7 @@
*/
package net.kuujo.copycat.collections.state;

import net.kuujo.copycat.PersistenceLevel;
import net.kuujo.copycat.PersistenceMode;
import net.kuujo.copycat.io.BufferInput;
import net.kuujo.copycat.io.BufferOutput;
import net.kuujo.copycat.io.serializer.CopycatSerializable;
Expand Down Expand Up @@ -268,15 +268,15 @@ public T withValue(Object value) {
* TTL command.
*/
public static abstract class TtlCommand<V> extends KeyValueCommand<V> {
protected PersistenceLevel mode = PersistenceLevel.PERSISTENT;
protected PersistenceMode mode = PersistenceMode.PERSISTENT;
protected long ttl;

/**
* Returns the persistence mode.
*
* @return The persistence mode.
*/
public PersistenceLevel mode() {
public PersistenceMode mode() {
return mode;
}

Expand All @@ -298,7 +298,7 @@ public void writeObject(BufferOutput buffer, Serializer serializer) {
@Override
public void readObject(BufferInput buffer, Serializer serializer) {
super.readObject(buffer, serializer);
mode = PersistenceLevel.values()[buffer.readByte()];
mode = PersistenceMode.values()[buffer.readByte()];
ttl = buffer.readLong();
}

Expand All @@ -316,7 +316,7 @@ protected Builder(BuilderPool<T, U> pool) {
* @param mode The persistence mode.
* @return The command builder.
*/
public Builder withPersistence(PersistenceLevel mode) {
public Builder withPersistence(PersistenceMode mode) {
command.mode = mode;
return this;
}
Expand Down
Expand Up @@ -15,7 +15,7 @@
*/
package net.kuujo.copycat.collections.state;

import net.kuujo.copycat.PersistenceLevel;
import net.kuujo.copycat.PersistenceMode;
import net.kuujo.copycat.raft.session.Session;
import net.kuujo.copycat.raft.Commit;
import net.kuujo.copycat.raft.StateMachine;
Expand Down Expand Up @@ -70,7 +70,7 @@ public void close(Session session) {
private boolean isActive(Commit<? extends MapCommands.TtlCommand> commit, Instant instant) {
if (commit == null) {
return false;
} else if (commit.operation().mode() == PersistenceLevel.EPHEMERAL && !sessions.contains(commit.session().id())) {
} else if (commit.operation().mode() == PersistenceMode.EPHEMERAL && !sessions.contains(commit.session().id())) {
return false;
} else if (commit.operation().ttl() != 0 && commit.operation().ttl() < instant.toEpochMilli() - commit.time().toEpochMilli()) {
return false;
Expand Down
Expand Up @@ -222,7 +222,7 @@ protected Contains create() {
*/
public static abstract class TtlCommand<V> extends ValueCommand<V> {
protected long ttl;
protected PersistenceLevel mode = PersistenceLevel.PERSISTENT;
protected PersistenceMode mode = PersistenceMode.PERSISTENT;

/**
* Returns the time to live in milliseconds.
Expand All @@ -238,7 +238,7 @@ public long ttl() {
*
* @return The persistence mode.
*/
public PersistenceLevel mode() {
public PersistenceMode mode() {
return mode;
}

Expand All @@ -251,7 +251,7 @@ public void writeObject(BufferOutput buffer, Serializer serializer) {
@Override
public void readObject(BufferInput buffer, Serializer serializer) {
super.readObject(buffer, serializer);
mode = PersistenceLevel.values()[buffer.readByte()];
mode = PersistenceMode.values()[buffer.readByte()];
ttl = buffer.readLong();
}

Expand Down Expand Up @@ -292,7 +292,7 @@ public Builder withTtl(long ttl, TimeUnit unit) {
* @param mode The persistence mode.
* @return The command builder.
*/
public Builder withPersistence(PersistenceLevel mode) {
public Builder withPersistence(PersistenceMode mode) {
command.mode = mode;
return this;
}
Expand Down
Expand Up @@ -15,7 +15,7 @@
*/
package net.kuujo.copycat.collections.state;

import net.kuujo.copycat.PersistenceLevel;
import net.kuujo.copycat.PersistenceMode;
import net.kuujo.copycat.raft.session.Session;
import net.kuujo.copycat.raft.Commit;
import net.kuujo.copycat.raft.StateMachine;
Expand Down Expand Up @@ -67,7 +67,7 @@ public void close(Session session) {
private boolean isActive(Commit<? extends SetCommands.TtlCommand> commit, Instant instant) {
if (commit == null) {
return false;
} else if (commit.operation().mode() == PersistenceLevel.EPHEMERAL && !sessions.contains(commit.session().id())) {
} else if (commit.operation().mode() == PersistenceMode.EPHEMERAL && !sessions.contains(commit.session().id())) {
return false;
} else if (commit.operation().ttl() != 0 && commit.operation().ttl() < instant.toEpochMilli() - commit.time().toEpochMilli()) {
return false;
Expand Down
Expand Up @@ -20,7 +20,6 @@
import net.kuujo.copycat.CopycatServer;
import net.kuujo.copycat.Node;
import net.kuujo.copycat.io.storage.Storage;
import net.kuujo.copycat.io.storage.StorageLevel;
import net.kuujo.copycat.io.transport.LocalServerRegistry;
import net.kuujo.copycat.io.transport.LocalTransport;
import net.kuujo.copycat.raft.Member;
Expand Down Expand Up @@ -192,7 +191,7 @@ private List<Copycat> createCopycats(int nodes) throws Throwable {
.withRegistry(registry)
.build())
.withStorage(Storage.builder()
.withStorageLevel(StorageLevel.MEMORY)
.withDirectory("test-logs")
.build())
.build();

Expand Down
Expand Up @@ -20,7 +20,6 @@
import net.kuujo.copycat.CopycatServer;
import net.kuujo.copycat.Node;
import net.kuujo.copycat.io.storage.Storage;
import net.kuujo.copycat.io.storage.StorageLevel;
import net.kuujo.copycat.io.transport.LocalServerRegistry;
import net.kuujo.copycat.io.transport.LocalTransport;
import net.kuujo.copycat.raft.Member;
Expand Down Expand Up @@ -96,7 +95,7 @@ private List<Copycat> createCopycats(int nodes) throws Throwable {
.withRegistry(registry)
.build())
.withStorage(Storage.builder()
.withStorageLevel(StorageLevel.MEMORY)
.withDirectory("test-logs")
.build())
.build();

Expand Down

0 comments on commit 338b2ba

Please sign in to comment.