Skip to content

Commit

Permalink
Refactor log implementations to support rolling over to arbitrary ind…
Browse files Browse the repository at this point in the history
…exes for snapshot replication.
  • Loading branch information
kuujo committed Jan 22, 2015
1 parent 8bcd211 commit 5126b0a
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 64 deletions.
79 changes: 65 additions & 14 deletions core/src/main/java/net/kuujo/copycat/log/AbstractLogManager.java
Expand Up @@ -16,6 +16,7 @@
package net.kuujo.copycat.log;

import net.kuujo.copycat.internal.util.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
Expand All @@ -31,7 +32,7 @@
* @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/
public abstract class AbstractLogManager extends AbstractLoggable implements LogManager {
private final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(getClass());
private final Logger LOGGER = LoggerFactory.getLogger(getClass());
private Log config;
protected final TreeMap<Long, LogSegment> segments = new TreeMap<>();
protected LogSegment currentSegment;
Expand Down Expand Up @@ -124,15 +125,43 @@ public LogSegment lastSegment() {
@Override
public synchronized void open() throws IOException {
assertIsNotOpen();

// Load existing log segments from disk.
for (LogSegment segment : loadSegments()) {
segment.open();
segments.put(segment.firstIndex(), segment);
}

// If a segment doesn't already exist, create an initial segment starting at index 1.
if (!segments.isEmpty()) {
currentSegment = segments.lastEntry().getValue();
} else {
createInitialSegment();
}

clean();
}

/**
* Cleans the log at startup.
*
* In the event that a failure occurred during compaction, it's possible that the log could contain a significant
* gap in indexes between segments. When the log is opened, check segments to ensure that first and last indexes of
* each segment agree with other segments in the log. If not, remove all segments that appear prior to any index gap.
*/
private void clean() throws IOException {
Long lastIndex = null;
Long compactIndex = null;
for (LogSegment segment : segments.values()) {
if (lastIndex == null || segment.firstIndex() > lastIndex + 1) {
compactIndex = segment.firstIndex();
}
lastIndex = segment.lastIndex();
}

if (compactIndex != null) {
compact(compactIndex);
}
}

@Override
Expand All @@ -148,11 +177,13 @@ public long size() {

@Override
public long entryCount() {
assertIsOpen();
return segments.values().stream().mapToLong(LogSegment::entryCount).sum();
}

@Override
public boolean isEmpty() {
assertIsOpen();
LogSegment firstSegment = firstSegment();
return firstSegment == null || firstSegment.size() == 0;
}
Expand Down Expand Up @@ -180,6 +211,7 @@ public Long lastIndex() {

@Override
public boolean containsIndex(long index) {
assertIsOpen();
Long firstIndex = firstIndex();
Long lastIndex = lastIndex();
return firstIndex != null && lastIndex != null && firstIndex <= index && index <= lastIndex;
Expand Down Expand Up @@ -228,26 +260,45 @@ public void removeAfter(long index) {
}

@Override
public LogSegment rollOver() throws IOException {
// If the current segment is empty then don't roll over to a new segment, just keep the existing segment.
Long lastIndex = currentSegment.lastIndex();
if (lastIndex == null)
return currentSegment;
public void rollOver(long index) throws IOException {
// If the current segment is empty then just remove it.
if (currentSegment.isEmpty()) {
segments.remove(currentSegment.firstIndex());
currentSegment.close();
currentSegment.delete();
currentSegment = null;
} else {
currentSegment.flush();
}

// Flush the segment and create a new segment.
currentSegment.flush();
long nextIndex = lastIndex + 1;
currentSegment = createSegment(++nextSegmentId, nextIndex);
LOGGER.debug("Rolling over to new segment at new index {}", nextIndex);
currentSegment = createSegment(++nextSegmentId, index);
LOGGER.debug("Rolling over to new segment at new index {}", index);

// Open the new segment.
currentSegment.open();

segments.put(nextIndex, currentSegment);
segments.put(index, currentSegment);

// Reset the segment flush time and check whether old segments need to be deleted.
lastFlush = System.currentTimeMillis();
return currentSegment;
}

@Override
public void compact(long index) throws IOException {
// Iterate through all segments in the log. If a segment's first index matches the given index or its last index
// is less than the given index then remove/close/delete the segment.
for (Iterator<Map.Entry<Long, LogSegment>> iterator = segments.entrySet().iterator(); iterator.hasNext();) {
Map.Entry<Long, LogSegment> entry = iterator.next();
LogSegment segment = entry.getValue();
if (index == segment.firstIndex() || (segment.lastIndex() != null && index > segment.lastIndex())) {
iterator.remove();
try {
segment.close();
segment.delete();
} catch (IOException e) {
}
}
}
}

@Override
Expand Down Expand Up @@ -314,7 +365,7 @@ private void checkRollOver() throws IOException {
&& System.currentTimeMillis() > currentSegment.timestamp() + config.getSegmentInterval();

if (segmentSizeExceeded || segmentExpired) {
rollOver();
rollOver(lastIndex + 1);
}
}

Expand Down
24 changes: 0 additions & 24 deletions core/src/main/java/net/kuujo/copycat/log/FileLogManager.java
Expand Up @@ -18,8 +18,6 @@
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -48,28 +46,6 @@ protected Collection<LogSegment> loadSegments() {
try {
long id = Long.valueOf(file.getName().substring(file.getName().lastIndexOf('-')), file.getName().lastIndexOf('.')).longValue();
if (!segments.containsKey(id)) {
// First, look for an existing history file for the log. If history files exist for this segment then that
// indicates that a failure occurred during log compaction. Recover the previous log.
File historyLogFile = new File(base.getParent(), String.format("%s-%d.log.history", base.getName(), id));
File historyIndexFile = new File(base.getParent(), String.format("%s-%d.index.history", base.getName(), id));
File historyMetadataFile = new File(base.getParent(), String.format("%s-%d.metadata.history", base.getName(), id));
if (historyLogFile.exists() && historyIndexFile.exists() && historyMetadataFile.exists()) {
// Restore the log by moving historical files back to permanent log files.
File logFile = new File(base.getParent(), String.format("%s-%d.log", base.getName(), id));
File indexFile = new File(base.getParent(), String.format("%s-%d.index", base.getName(), id));
File metadataFile = new File(base.getParent(), String.format("%s-%d.metadata", base.getName(), id));

// Copy the files instead of moving them in case another failure occurs.
Files.copy(historyLogFile.toPath(), logFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
Files.copy(historyIndexFile.toPath(), indexFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
Files.copy(historyMetadataFile.toPath(), metadataFile.toPath(), StandardCopyOption.REPLACE_EXISTING);

// Once the history has been restored, delete historical files.
historyLogFile.delete();
historyIndexFile.delete();
historyMetadataFile.delete();
}

// Open the metadata file, determine the segment's first index, and create a log segment.
try (RandomAccessFile metaFile = new RandomAccessFile(file, "r")) {
long firstIndex = metaFile.readLong();
Expand Down
12 changes: 10 additions & 2 deletions core/src/main/java/net/kuujo/copycat/log/LogManager.java
Expand Up @@ -64,8 +64,16 @@ public interface LogManager extends Loggable {
/**
* Forces the log to roll over to a new segment.
*
* @return The new log segment.
* @throws IOException If the log failed to create a new segment.
*/
LogSegment rollOver() throws IOException;
void rollOver(long index) throws IOException;

/**
* Compacts the log up to the given index.
*
* @param index The index to which to compact the log. This must be the first index of a segment in the log.
* @throws IOException If the log failed to delete a segment.
*/
void compact(long index) throws IOException;

}
Expand Up @@ -18,11 +18,11 @@
import net.kuujo.copycat.CopycatException;
import net.kuujo.copycat.ResourceContext;
import net.kuujo.copycat.internal.AbstractResource;
import net.kuujo.copycat.state.StateLog;
import net.kuujo.copycat.state.StateLogConfig;
import net.kuujo.copycat.internal.util.Assert;
import net.kuujo.copycat.internal.util.concurrent.Futures;
import net.kuujo.copycat.protocol.Consistency;
import net.kuujo.copycat.state.StateLog;
import net.kuujo.copycat.state.StateLogConfig;

import java.io.IOException;
import java.nio.ByteBuffer;
Expand Down
Expand Up @@ -34,7 +34,6 @@
public class SnapshottableLogManager implements LogManager {
private final LogManager logManager;
private final LogManager snapshotManager;
private Long snapshotIndex;

public SnapshottableLogManager(LogManager logManager, LogManager snapshotManager) {
this.logManager = logManager;
Expand Down Expand Up @@ -71,24 +70,15 @@ public LogSegment lastSegment() {
return logManager.lastSegment();
}

@Override
public LogSegment rollOver() throws IOException {
return logManager.rollOver();
}

@Override
public void open() throws IOException {
snapshotManager.open();
logManager.open();
if (!snapshotManager.isEmpty()) {
ByteBuffer snapshot = snapshotManager.getEntry(snapshotManager.lastIndex());
snapshotIndex = snapshot.getLong();
}
}

@Override
public boolean isEmpty() {
return snapshotIndex == null && logManager.isEmpty();
return snapshotManager.isEmpty() && logManager.isEmpty();
}

@Override
Expand Down Expand Up @@ -145,9 +135,8 @@ public long appendSnapshot(long index, ByteBuffer snapshot) throws IOException {
// When appending a snapshot, force the snapshot log manager to roll over to a new segment, append the snapshot
// to the log, and then compact the log once the snapshot has been appended.
ByteBuffer entry = ByteBuffer.allocate(8 + snapshot.limit());
entry.putLong(index);
entry.put(snapshot);
snapshotManager.rollOver();
snapshotManager.rollOver(index);
snapshotManager.appendEntry(entry);
compact(snapshotManager);
compact(logManager);
Expand Down Expand Up @@ -189,21 +178,16 @@ public Long lastIndex() {
@Override
public boolean containsIndex(long index) {
Assert.state(isOpen(), "Log is not open");
if (logManager.containsIndex(index)) {
return true;
} else if (snapshotManager.isEmpty()) {
return false;
}
return snapshotIndex != null && snapshotIndex == index;
return logManager.containsIndex(index) || snapshotManager.containsIndex(index);
}

@Override
public ByteBuffer getEntry(long index) {
Assert.state(isOpen(), "Log is not open");
if (logManager.containsIndex(index)) {
return logManager.getEntry(index);
} else if (snapshotIndex != null && snapshotIndex == index) {
return snapshotManager.getEntry(snapshotIndex);
} else if (snapshotManager.containsIndex(index)) {
return snapshotManager.getEntry(index);
}
throw new IndexOutOfBoundsException("No entry at index " + index);
}
Expand All @@ -215,6 +199,17 @@ public void removeAfter(long index) {
logManager.removeAfter(index);
}

@Override
public void rollOver(long index) throws IOException {
logManager.rollOver(index);
}

@Override
public void compact(long index) throws IOException {
logManager.compact(index);
snapshotManager.compact(index);
}

@Override
public void flush() {
logManager.flush();
Expand All @@ -224,7 +219,6 @@ public void flush() {
public void close() throws IOException {
logManager.close();
snapshotManager.close();
snapshotIndex = null;
}

@Override
Expand Down

0 comments on commit 5126b0a

Please sign in to comment.