Skip to content

Commit

Permalink
Add RaftEntry to wrap Raft-specific type/term entry metadata.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Feb 18, 2015
1 parent 6880233 commit 6fd7943
Show file tree
Hide file tree
Showing 7 changed files with 205 additions and 53 deletions.
11 changes: 6 additions & 5 deletions raft/src/main/java/net/kuujo/copycat/raft/ActiveState.java
Expand Up @@ -15,6 +15,7 @@
*/
package net.kuujo.copycat.raft;

import net.kuujo.copycat.raft.log.RaftEntry;
import net.kuujo.copycat.raft.protocol.*;

import java.io.IOException;
Expand Down Expand Up @@ -226,21 +227,21 @@ private AppendResponse doAppendEntries(AppendRequest request) {
}

// Iterate through request entries and append them to the log.
for (ByteBuffer entry : request.entries()) {
for (RaftEntry entry : request.entries()) {
index++;
// Replicated snapshot entries are *always* immediately logged and applied to the status machine
// since snapshots are only taken of committed status machine status. This will cause all previous
// entries to be removed from the log.
if (context.log().containsIndex(index)) {
// Compare the term of the received entry with the matching entry in the log.
ByteBuffer match = context.log().getEntry(index);
if (entry.getLong() != match.getLong()) {
RaftEntry match = new RaftEntry(context.log().getEntry(index));
if (entry.term() != match.term()) {
// We found an invalid entry in the log. Remove the invalid entry and append the new entry.
// If appending to the log fails, apply commits and reply false to the append request.
LOGGER.warn("{} - Appended entry term does not match local log, removing incorrect entries", context.getLocalMember().id());
try {
context.log().removeAfter(index - 1);
context.log().appendEntry(entry);
context.log().appendEntry(entry.buffer());
} catch (IOException e) {
doApplyCommits(request.commitIndex());
return AppendResponse.builder()
Expand All @@ -255,7 +256,7 @@ private AppendResponse doAppendEntries(AppendRequest request) {
} else {
// If appending to the log fails, apply commits and reply false to the append request.
try {
context.log().appendEntry(entry);
context.log().appendEntry(entry.buffer());
} catch (IOException e) {
doApplyCommits(request.commitIndex());
return AppendResponse.builder()
Expand Down
25 changes: 13 additions & 12 deletions raft/src/main/java/net/kuujo/copycat/raft/LeaderState.java
Expand Up @@ -15,6 +15,7 @@
package net.kuujo.copycat.raft;

import net.kuujo.copycat.CopycatException;
import net.kuujo.copycat.raft.log.RaftEntry;
import net.kuujo.copycat.raft.protocol.*;

import java.io.IOException;
Expand Down Expand Up @@ -606,7 +607,7 @@ private void commitEntries() {
* Remote replica.
*/
private class Replica {
private final List<ByteBuffer> EMPTY_LIST = new ArrayList<>(0);
private final List<RaftEntry> EMPTY_LIST = new ArrayList<>(0);
private final int id;
private final RaftMember member;
private Long nextIndex;
Expand Down Expand Up @@ -647,17 +648,17 @@ private Long getPrevIndex() {
/**
* Gets the previous entry.
*/
private ByteBuffer getPrevEntry(Long prevIndex) {
private RaftEntry getPrevEntry(Long prevIndex) {
if (prevIndex != null && context.log().containsIndex(prevIndex)) {
return context.log().getEntry(prevIndex);
return new RaftEntry(context.log().getEntry(prevIndex));
}
return null;
}

/**
* Gets a list of entries to send.
*/
private List<ByteBuffer> getEntries(Long prevIndex) {
private List<RaftEntry> getEntries(Long prevIndex) {
long index;
if (context.log().isEmpty()) {
return EMPTY_LIST;
Expand All @@ -667,11 +668,11 @@ private List<ByteBuffer> getEntries(Long prevIndex) {
index = context.log().firstIndex();
}

List<ByteBuffer> entries = new ArrayList<>(1024);
List<RaftEntry> entries = new ArrayList<>(1024);
int size = 0;
while (size < MAX_BATCH_SIZE && index <= context.log().lastIndex()) {
ByteBuffer entry = context.log().getEntry(index);
size += entry.limit();
RaftEntry entry = new RaftEntry(context.log().getEntry(index));
size += entry.size();
entries.add(entry);
index++;
}
Expand All @@ -683,7 +684,7 @@ private List<ByteBuffer> getEntries(Long prevIndex) {
*/
private void emptyCommit() {
Long prevIndex = getPrevIndex();
ByteBuffer prevEntry = getPrevEntry(prevIndex);
RaftEntry prevEntry = getPrevEntry(prevIndex);
commit(prevIndex, prevEntry, EMPTY_LIST);
}

Expand All @@ -692,21 +693,21 @@ private void emptyCommit() {
*/
private void entriesCommit() {
Long prevIndex = getPrevIndex();
ByteBuffer prevEntry = getPrevEntry(prevIndex);
List<ByteBuffer> entries = getEntries(prevIndex);
RaftEntry prevEntry = getPrevEntry(prevIndex);
List<RaftEntry> entries = getEntries(prevIndex);
commit(prevIndex, prevEntry, entries);
}

/**
* Sends a commit message.
*/
private void commit(Long prevIndex, ByteBuffer prevEntry, List<ByteBuffer> entries) {
private void commit(Long prevIndex, RaftEntry prevEntry, List<RaftEntry> entries) {
AppendRequest request = AppendRequest.builder()
.withId(member.id())
.withTerm(context.getTerm())
.withLeader(context.getLocalMember().id())
.withLogIndex(prevIndex)
.withLogTerm(prevEntry != null ? prevEntry.getLong() : null)
.withLogTerm(prevEntry != null ? prevEntry.term() : null)
.withEntries(entries)
.withFirstIndex(prevIndex == null || context.log().firstIndex() == prevIndex + 1)
.withCommitIndex(context.getCommitIndex())
Expand Down
18 changes: 7 additions & 11 deletions raft/src/main/java/net/kuujo/copycat/raft/PassiveState.java
Expand Up @@ -15,10 +15,10 @@
*/
package net.kuujo.copycat.raft;

import net.kuujo.copycat.raft.log.RaftEntry;
import net.kuujo.copycat.raft.protocol.*;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledFuture;
Expand Down Expand Up @@ -104,15 +104,15 @@ private CompletableFuture<Void> recursiveSync(RaftMember member) {
*/
private void recursiveSync(RaftMember member, boolean requireEntries, CompletableFuture<Void> future) {
// Get a list of entries up to 1MB in size.
List<ByteBuffer> entries = new ArrayList<>(1024);
List<RaftEntry> entries = new ArrayList<>(1024);
Long firstIndex = null;
if (!context.log().isEmpty() && context.getCommitIndex() != null) {
firstIndex = Math.max(member.index() != null ? member.index() + 1 : context.log().firstIndex(), context.log().lastIndex());
long index = firstIndex;
int size = 0;
while (size < MAX_BATCH_SIZE && index <= context.getCommitIndex()) {
ByteBuffer entry = context.log().getEntry(index);
size += entry.limit();
RaftEntry entry = new RaftEntry(context.log().getEntry(index));
size += entry.size();
entries.add(entry);
index++;
}
Expand Down Expand Up @@ -201,17 +201,13 @@ public CompletableFuture<SyncResponse> sync(SyncRequest request) {
for (int i = 0; i < request.entries().size(); i++) {
long index = request.logIndex() != null ? request.logIndex() + i + 1 : i + 1;
if (!context.log().containsIndex(index)) {
ByteBuffer entry = request.entries().get(i);
RaftEntry entry = request.entries().get(i);
try {
context.log().appendEntry(entry);
context.log().appendEntry(entry.buffer());
context.setCommitIndex(index);

// Extract a view of the entry after the entry term.
long term = entry.getLong();
ByteBuffer userEntry = entry.slice();

try {
context.commitHandler().commit(term, index, userEntry);
context.commitHandler().commit(entry.term(), index, entry.entry());
} catch (Exception e) {
}

Expand Down
153 changes: 153 additions & 0 deletions raft/src/main/java/net/kuujo/copycat/raft/log/RaftEntry.java
@@ -0,0 +1,153 @@
/*
* Copyright 2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.kuujo.copycat.raft.log;

import net.kuujo.copycat.util.internal.Assert;

import java.nio.ByteBuffer;

/**
* Raft log entry.
*
* @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/
public class RaftEntry {

/**
* Raft entry type.
*/
public static enum Type {
CONFIGURATION((byte) 0),
COMMAND((byte) 1),
SNAPSHOT((byte) 2);

private final byte id;

private Type(byte id) {
this.id = id;
}

/**
* Returns the entry type ID.
*
* @return The entry type ID.
*/
public byte id() {
return id;
}

/**
* Looks up the type for the given type ID.
*
* @param type The type ID for the type to look up.
* @return The related type.
*/
public static Type lookup(byte type) {
switch (type) {
case 0:
return CONFIGURATION;
case 1:
return COMMAND;
default:
throw new IllegalStateException();
}
}
}

private ByteBuffer buffer;

public RaftEntry() {
}

public RaftEntry(ByteBuffer buffer) {
this.buffer = Assert.notNull(buffer, "buffer");
}

public RaftEntry(Type type, long term, ByteBuffer entry) {
buffer = ByteBuffer.allocate(9 + entry.remaining());
buffer.put(type.id);
buffer.putLong(term);
buffer.put(entry);
buffer.flip();
}

/**
* Returns the entry type.
*
* @return The entry type.
*/
public Type type() {
buffer.rewind();
return Type.lookup(buffer.get());
}

/**
* Returns the entry term.
*
* @return The entry term.
*/
public long term() {
buffer.position(1);
return buffer.getLong();
}

/**
* Returns the entry buffer.
*
* @return The entry buffer.
*/
public ByteBuffer entry() {
buffer.position(9);
return buffer.slice();
}

/**
* Returns the complete entry buffer size.
*
* @return The complete entry buffer size.
*/
public int size() {
return buffer.limit();
}

/**
* Returns the entry buffer.
*
* @return The entry buffer.
*/
public ByteBuffer buffer() {
buffer.rewind();
return buffer;
}

@Override
public boolean equals(Object object) {
return object instanceof RaftEntry && ((RaftEntry) object).buffer.equals(buffer);
}

@Override
public int hashCode() {
int hashCode = 17;
hashCode = 37 * hashCode + buffer.hashCode();
return hashCode;
}

@Override
public String toString() {
return String.format("RaftEntry[type=%s, term=%d, entry=%s]", type(), term(), entry());
}

}
Expand Up @@ -14,9 +14,9 @@
*/
package net.kuujo.copycat.raft.protocol;

import net.kuujo.copycat.raft.log.RaftEntry;
import net.kuujo.copycat.util.internal.Assert;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
Expand Down Expand Up @@ -51,7 +51,7 @@ public static Builder builder(AppendRequest request) {
private String leader;
private Long logIndex;
private Long logTerm;
private List<ByteBuffer> entries;
private List<RaftEntry> entries;
private boolean firstIndex;
private Long commitIndex;

Expand Down Expand Up @@ -96,7 +96,7 @@ public Long logTerm() {
*
* @return A list of log entries.
*/
public List<ByteBuffer> entries() {
public List<RaftEntry> entries() {
return entries;
}

Expand Down Expand Up @@ -206,7 +206,7 @@ public Builder withLogTerm(Long term) {
* @param entries The request entries.
* @return The append request builder.
*/
public Builder withEntries(ByteBuffer... entries) {
public Builder withEntries(RaftEntry... entries) {
return withEntries(Arrays.asList(entries));
}

Expand All @@ -216,7 +216,7 @@ public Builder withEntries(ByteBuffer... entries) {
* @param entries The request entries.
* @return The append request builder.
*/
public Builder withEntries(List<ByteBuffer> entries) {
public Builder withEntries(List<RaftEntry> entries) {
request.entries = Assert.notNull(entries, "entries");
return this;
}
Expand Down

0 comments on commit 6fd7943

Please sign in to comment.