Skip to content

Commit

Permalink
Refactor protocol API to allow fine grained per-request control over …
Browse files Browse the repository at this point in the history
…persistence and consistency levels.
  • Loading branch information
kuujo committed Apr 26, 2015
1 parent 2acf306 commit 2f86bf8
Show file tree
Hide file tree
Showing 45 changed files with 1,002 additions and 1,326 deletions.
13 changes: 9 additions & 4 deletions core/src/main/java/net/kuujo/copycat/protocol/Consistency.java
Expand Up @@ -25,16 +25,21 @@ public enum Consistency {
/**
* Weak consistency level.
*/
WEAK,
EVENTUAL,

/**
* Default consistency level.
* Lease consistency level.
*/
DEFAULT,
LEASE,

/**
* Strong consistency level.
*/
STRONG
STRICT,

/**
* Default consistency level.
*/
DEFAULT

}
50 changes: 50 additions & 0 deletions core/src/main/java/net/kuujo/copycat/protocol/Persistence.java
@@ -0,0 +1,50 @@
/*
* Copyright 2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.kuujo.copycat.protocol;

/**
* Protocol command persistence.
*
* @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/
public enum Persistence {

/**
* Indicates a command that will not be persisted.
*/
NONE,

/**
* Indicates a command that will persist until a restart.
*/
EPHEMERAL,

/**
* Indicates a command that will persist until it has been received by all nodes.
*/
DURABLE,

/**
* Indicates a command that will persist for eternity.
*/
PERSISTENT,

/**
* Indicates a command that will be persisted according to the default persistence level.
*/
DEFAULT

}
105 changes: 45 additions & 60 deletions core/src/main/java/net/kuujo/copycat/protocol/Protocol.java
Expand Up @@ -95,105 +95,90 @@ public Protocol removeListener(EventListener<? extends Event> listener) {
}

/**
* Submits a read to the protocol.
* Submits a keyless command to the protocol.
*
* @param entry The read entry.
* @return A completable future to be completed with the read result.
* @param entry The command entry.
* @return A completable future to be completed with the command result.
*/
public CompletableFuture<Buffer> read(Buffer entry) {
return read(null, entry, Consistency.DEFAULT);
public CompletableFuture<Buffer> submit(Buffer entry) {
return submit(null, entry, Persistence.DEFAULT, Consistency.DEFAULT);
}

/**
* Submits a read to the protocol.
* Submits a keyless command to the protocol with the default consistency level.
*
* @param key The read key.
* @param entry The read entry.
* @return A completable future to be completed with the read result.
* @param entry The command entry.
* @param persistence The command persistence level.
* @return A completable future to be completed with the command result.
*/
public CompletableFuture<Buffer> read(Buffer key, Buffer entry) {
return read(key, entry, Consistency.DEFAULT);
public CompletableFuture<Buffer> submit(Buffer entry, Persistence persistence) {
return submit(null, entry, persistence, Consistency.DEFAULT);
}

/**
* Submits a read to the protocol.
* Submits a keyless command to the protocol with the default persistence level.
*
* @param key The read key.
* @param entry The read entry.
* @param consistency The read consistency.
* @return A completable future to be completed with the read result.
* @param entry The command entry.
* @param consistency The command consistency requirement.
* @return A completable future to be completed with the command result.
*/
public abstract CompletableFuture<Buffer> read(Buffer key, Buffer entry, Consistency consistency);

/**
* Submits a write to the protocol.
*
* @param entry The write entry.
* @return A completable future to be completed with the write result.
*/
public CompletableFuture<Buffer> write(Buffer entry) {
return write(null, entry, Consistency.DEFAULT);
public CompletableFuture<Buffer> submit(Buffer entry, Consistency consistency) {
return submit(null, entry, Persistence.DEFAULT, consistency);
}

/**
* Submits a write to the protocol.
* Submits a command to the protocol with the default persistence and consistency levels.
*
* @param key The write key.
* @param entry The write entry.
* @return A completable future to be completed with the write result.
* @param key The command key.
* @param entry The command entry.
* @return A completable future to be completed with the command result.
*/
public CompletableFuture<Buffer> write(Buffer key, Buffer entry) {
return write(key, entry, Consistency.DEFAULT);
public CompletableFuture<Buffer> submit(Buffer key, Buffer entry) {
return submit(key, entry, Persistence.DEFAULT, Consistency.DEFAULT);
}

/**
* Submits a write to the protocol.
*
* @param key The write key.
* @param entry The write entry.
* @param consistency The write consistency.
* @return A completable future to be completed with the write result.
*/
public abstract CompletableFuture<Buffer> write(Buffer key, Buffer entry, Consistency consistency);

/**
* Submits a delete to the protocol.
* Submits a command to the protocol with the default consistency level.
*
* @param entry The delete entry.
* @return A completable future to be completed with the delete result.
* @param key The command key.
* @param entry The command entry.
* @param persistence The command persistence level.
* @return A completable future to be completed with the command result.
*/
public CompletableFuture<Buffer> delete(Buffer entry) {
return delete(null, entry, Consistency.DEFAULT);
public CompletableFuture<Buffer> submit(Buffer key, Buffer entry, Persistence persistence) {
return submit(key, entry, persistence, Consistency.DEFAULT);
}

/**
* Submits a delete to the protocol.
* Submits a command to the protocol with the default persistence level.
*
* @param key The delete key.
* @param entry The delete entry.
* @return A completable future to be completed with the delete result.
* @param key The command key.
* @param entry The command entry.
* @param consistency The command consistency requirement.
* @return A completable future to be completed with the command result.
*/
public CompletableFuture<Buffer> delete(Buffer key, Buffer entry) {
return delete(key, entry, Consistency.DEFAULT);
public CompletableFuture<Buffer> submit(Buffer key, Buffer entry, Consistency consistency) {
return submit(key, entry, Persistence.DEFAULT, consistency);
}

/**
* Submits a delete to the protocol.
* Submits a command to the protocol.
*
* @param key The delete key.
* @param entry The delete entry.
* @param consistency The delete consistency.
* @return A completable future to be completed with the delete result.
* @param key The command key.
* @param entry The command entry.
* @param persistence The command persistence level.
* @param consistency The command consistency requirement.
* @return A completable future to be completed with the command result.
*/
public abstract CompletableFuture<Buffer> delete(Buffer key, Buffer entry, Consistency consistency);
public abstract CompletableFuture<Buffer> submit(Buffer key, Buffer entry, Persistence persistence, Consistency consistency);

/**
* Registers a protocol commit handler.
*
* @param handler The protocol commit handler.
* @return The protocol.
*/
public abstract Protocol commit(CommitHandler handler);
public abstract Protocol commitHandler(CommitHandler handler);

/**
* Protocol builder.
Expand Down
Expand Up @@ -56,7 +56,7 @@ public Cluster cluster() {
@Override
@SuppressWarnings("unchecked")
public CompletableFuture<U> open() {
protocol.commit(this::commit);
protocol.commitHandler(this::commit);
return super.open().thenCompose(v -> protocol.open()).thenApply(v -> (U) this);
}

Expand Down
Expand Up @@ -211,6 +211,8 @@ private void doApplyCommits(long commitIndex) {
}
}
}
} else {
context.setCommitIndex(commitIndex);
}
}

Expand All @@ -221,8 +223,7 @@ protected void applyEntry(long index) {
if ((context.getLastApplied() == 0 && index == context.log().firstIndex()) || (context.getLastApplied() != 0 && context.getLastApplied() == index - 1)) {
RaftEntry entry = context.log().getEntry(index);
if (entry != null) {
RaftEntry.Type type = entry.readType();
if (type == RaftEntry.Type.COMMAND || type == RaftEntry.Type.TOMBSTONE) {
if (entry.readType() == RaftEntry.Type.COMMAND) {
entry.readKey(KEY.clear());
entry.readEntry(ENTRY.clear());
try {
Expand Down

0 comments on commit 2f86bf8

Please sign in to comment.