Skip to content

Commit

Permalink
Add persistence modes for set values.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Jun 2, 2015
1 parent 8e6fc17 commit 4f22c60
Showing 1 changed file with 117 additions and 21 deletions.
138 changes: 117 additions & 21 deletions collections/src/main/java/net/kuujo/copycat/collections/AsyncSet.java
Expand Up @@ -16,15 +16,19 @@
package net.kuujo.copycat.collections;

import net.kuujo.copycat.AbstractResource;
import net.kuujo.copycat.Mode;
import net.kuujo.copycat.Stateful;
import net.kuujo.copycat.cluster.Session;
import net.kuujo.copycat.io.Buffer;
import net.kuujo.copycat.io.serializer.Serializer;
import net.kuujo.copycat.io.serializer.Writable;
import net.kuujo.copycat.raft.*;
import net.kuujo.copycat.raft.log.Compaction;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

Expand All @@ -49,7 +53,7 @@ public AsyncSet(Protocol protocol) {
*/
public CompletableFuture<Boolean> add(T value) {
return submit(Add.builder()
.withValue(value)
.withValue(value.hashCode())
.build());
}

Expand All @@ -63,7 +67,7 @@ public CompletableFuture<Boolean> add(T value) {
@SuppressWarnings("unchecked")
public CompletableFuture<Boolean> add(T value, long ttl) {
return submit(Add.builder()
.withValue(value)
.withValue(value.hashCode())
.withTtl(ttl)
.build());
}
Expand All @@ -79,11 +83,46 @@ public CompletableFuture<Boolean> add(T value, long ttl) {
@SuppressWarnings("unchecked")
public CompletableFuture<Boolean> add(T value, long ttl, TimeUnit unit) {
return submit(Add.builder()
.withValue(value)
.withValue(value.hashCode())
.withTtl(ttl, unit)
.build());
}

/**
* Adds a value to the set with a TTL.
*
* @param value The value to add.
* @param ttl The time to live in milliseconds.
* @param mode The persistence mode.
* @return A completable future to be completed with the result once complete.
*/
@SuppressWarnings("unchecked")
public CompletableFuture<Boolean> add(T value, long ttl, Mode mode) {
return submit(Add.builder()
.withValue(value.hashCode())
.withTtl(ttl)
.withMode(mode)
.build());
}

/**
* Adds a value to the set with a TTL.
*
* @param value The value to add.
* @param ttl The time to live.
* @param unit The time to live unit.
* @param mode The persistence mode.
* @return A completable future to be completed with the result once complete.
*/
@SuppressWarnings("unchecked")
public CompletableFuture<Boolean> add(T value, long ttl, TimeUnit unit, Mode mode) {
return submit(Add.builder()
.withValue(value.hashCode())
.withTtl(ttl, unit)
.withMode(mode)
.build());
}

/**
* Removes a value from the set.
*
Expand All @@ -92,7 +131,7 @@ public CompletableFuture<Boolean> add(T value, long ttl, TimeUnit unit) {
*/
public CompletableFuture<Boolean> remove(T value) {
return submit(Remove.builder()
.withValue(value)
.withValue(value.hashCode())
.build());
}

Expand All @@ -104,7 +143,7 @@ public CompletableFuture<Boolean> remove(T value) {
*/
public CompletableFuture<Boolean> contains(Object value) {
return submit(Contains.builder()
.withValue(value)
.withValue(value.hashCode())
.build());
}

Expand Down Expand Up @@ -169,12 +208,12 @@ protected Builder(U query) {
* Abstract value command.
*/
public static abstract class ValueCommand<V> extends SetCommand<V> {
protected Object value;
protected int value;

/**
* Returns the value.
*/
public Object value() {
public int value() {
return value;
}

Expand Down Expand Up @@ -203,7 +242,7 @@ protected Builder(U command) {
* @return The command builder.
*/
@SuppressWarnings("unchecked")
public T withValue(Object value) {
public T withValue(int value) {
command.value = value;
return (T) this;
}
Expand All @@ -214,12 +253,12 @@ public T withValue(Object value) {
* Abstract value query.
*/
public static abstract class ValueQuery<V> extends SetQuery<V> {
protected Object value;
protected int value;

/**
* Returns the value.
*/
public Object value() {
public int value() {
return value;
}

Expand Down Expand Up @@ -248,7 +287,7 @@ protected Builder(U query) {
* @return The query builder.
*/
@SuppressWarnings("unchecked")
public T withValue(Object value) {
public T withValue(int value) {
query.value = value;
return (T) this;
}
Expand Down Expand Up @@ -287,6 +326,7 @@ public Builder() {
*/
public static abstract class TtlCommand<V> extends ValueCommand<V> {
protected long ttl;
protected Mode mode = Mode.PERSISTENT;

/**
* Returns the time to live in milliseconds.
Expand All @@ -297,15 +337,25 @@ public long ttl() {
return ttl;
}

/**
* Returns the persistence mode.
*
* @return The persistence mode.
*/
public Mode mode() {
return mode;
}

@Override
public void writeObject(Buffer buffer, Serializer serializer) {
super.writeObject(buffer, serializer);
buffer.writeLong(ttl);
buffer.writeByte(mode.ordinal()).writeLong(ttl);
}

@Override
public void readObject(Buffer buffer, Serializer serializer) {
super.readObject(buffer, serializer);
mode = Mode.values()[buffer.readByte()];
ttl = buffer.readLong();
}

Expand Down Expand Up @@ -339,6 +389,17 @@ public Builder withTtl(long ttl, TimeUnit unit) {
command.ttl = unit.toMillis(ttl);
return this;
}

/**
* Sets the persistence mode.
*
* @param mode The persistence mode.
* @return The command builder.
*/
public Builder withMode(Mode mode) {
command.mode = mode;
return this;
}
}
}

Expand Down Expand Up @@ -497,6 +558,7 @@ public Builder() {
*/
public static class StateMachine extends net.kuujo.copycat.raft.StateMachine {
private final Map<Integer, Commit<? extends TtlCommand>> map = new HashMap<>();
private final Set<Long> sessions = new HashSet<>();
private long time;

/**
Expand All @@ -506,13 +568,47 @@ private void updateTime(Commit<?> commit) {
time = Math.max(time, commit.timestamp());
}

@Override
public void register(Session session) {
sessions.add(session.id());
}

@Override
public void expire(Session session) {
sessions.remove(session.id());
}

@Override
public void close(Session session) {
sessions.remove(session.id());
}

/**
* Returns a boolean value indicating whether the given commit is active.
*/
private boolean isActive(Commit<? extends TtlCommand> commit) {
if (commit == null) {
return false;
} else if (commit.operation().mode() == Mode.EPHEMERAL && !sessions.contains(commit.session().id())) {
return false;
} else if (commit.operation().ttl() != 0 && commit.operation().ttl() < time - commit.timestamp()) {
return false;
}
return true;
}

/**
* Handles a contains commit.
*/
@Apply(Contains.class)
protected boolean containsKey(Commit<Contains> commit) {
protected boolean contains(Commit<Contains> commit) {
updateTime(commit);
return map.containsKey(commit.operation().value().hashCode());
Commit<? extends TtlCommand> command = map.get(commit.operation().value());
if (!isActive(command)) {
map.remove(commit.operation().value());
return false;
}
return true;
}

/**
Expand All @@ -521,9 +617,9 @@ protected boolean containsKey(Commit<Contains> commit) {
@Apply(Add.class)
protected boolean put(Commit<Add> commit) {
updateTime(commit);
int hash = commit.operation().value().hashCode();
if (!map.containsKey(hash)) {
map.put(hash, commit);
Commit<? extends TtlCommand> command = map.get(commit.operation().value());
if (!isActive(command)) {
map.put(commit.operation().value(), commit);
return true;
}
return false;
Expand All @@ -534,8 +630,8 @@ protected boolean put(Commit<Add> commit) {
*/
@Filter({Add.class})
protected boolean filterPut(Commit<Add> commit) {
Commit<? extends TtlCommand> command = map.get(commit.operation().value().hashCode());
return command != null && command.index() == commit.index() && (command.operation().ttl() == 0 || command.operation().ttl() > time - command.timestamp());
Commit<? extends TtlCommand> command = map.get(commit.operation().value());
return command != null && command.index() == commit.index() && isActive(command);
}

/**
Expand All @@ -544,8 +640,8 @@ protected boolean filterPut(Commit<Add> commit) {
@Apply(Remove.class)
protected boolean remove(Commit<Remove> commit) {
updateTime(commit);
Commit<? extends TtlCommand> command = map.remove(commit.operation().value.hashCode());
return command != null && (command.operation().ttl() == 0 || command.operation().ttl() > time - command.timestamp());
Commit<? extends TtlCommand> command = map.remove(commit.operation().value());
return isActive(command);
}

/**
Expand Down

0 comments on commit 4f22c60

Please sign in to comment.