From 5126b0ab28aee7ae66369675efffcc5da0202b58 Mon Sep 17 00:00:00 2001 From: Jordan Halterman Date: Wed, 21 Jan 2015 18:19:41 -0800 Subject: [PATCH] Refactor log implementations to support rolling over to arbitrary indexes for snapshot replication. --- .../kuujo/copycat/log/AbstractLogManager.java | 79 +++++++++++++++---- .../net/kuujo/copycat/log/FileLogManager.java | 24 ------ .../net/kuujo/copycat/log/LogManager.java | 12 ++- .../state/internal/DefaultStateLog.java | 4 +- .../internal/SnapshottableLogManager.java | 38 ++++----- 5 files changed, 93 insertions(+), 64 deletions(-) diff --git a/core/src/main/java/net/kuujo/copycat/log/AbstractLogManager.java b/core/src/main/java/net/kuujo/copycat/log/AbstractLogManager.java index 57c679cbca..f83c1ca528 100644 --- a/core/src/main/java/net/kuujo/copycat/log/AbstractLogManager.java +++ b/core/src/main/java/net/kuujo/copycat/log/AbstractLogManager.java @@ -16,6 +16,7 @@ package net.kuujo.copycat.log; import net.kuujo.copycat.internal.util.Assert; +import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; @@ -31,7 +32,7 @@ * @author Jordan Halterman */ public abstract class AbstractLogManager extends AbstractLoggable implements LogManager { - private final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(getClass()); + private final Logger LOGGER = LoggerFactory.getLogger(getClass()); private Log config; protected final TreeMap segments = new TreeMap<>(); protected LogSegment currentSegment; @@ -124,15 +125,43 @@ public LogSegment lastSegment() { @Override public synchronized void open() throws IOException { assertIsNotOpen(); + + // Load existing log segments from disk. for (LogSegment segment : loadSegments()) { segment.open(); segments.put(segment.firstIndex(), segment); } + + // If a segment doesn't already exist, create an initial segment starting at index 1. if (!segments.isEmpty()) { currentSegment = segments.lastEntry().getValue(); } else { createInitialSegment(); } + + clean(); + } + + /** + * Cleans the log at startup. + * + * In the event that a failure occurred during compaction, it's possible that the log could contain a significant + * gap in indexes between segments. When the log is opened, check segments to ensure that first and last indexes of + * each segment agree with other segments in the log. If not, remove all segments that appear prior to any index gap. + */ + private void clean() throws IOException { + Long lastIndex = null; + Long compactIndex = null; + for (LogSegment segment : segments.values()) { + if (lastIndex == null || segment.firstIndex() > lastIndex + 1) { + compactIndex = segment.firstIndex(); + } + lastIndex = segment.lastIndex(); + } + + if (compactIndex != null) { + compact(compactIndex); + } } @Override @@ -148,11 +177,13 @@ public long size() { @Override public long entryCount() { + assertIsOpen(); return segments.values().stream().mapToLong(LogSegment::entryCount).sum(); } @Override public boolean isEmpty() { + assertIsOpen(); LogSegment firstSegment = firstSegment(); return firstSegment == null || firstSegment.size() == 0; } @@ -180,6 +211,7 @@ public Long lastIndex() { @Override public boolean containsIndex(long index) { + assertIsOpen(); Long firstIndex = firstIndex(); Long lastIndex = lastIndex(); return firstIndex != null && lastIndex != null && firstIndex <= index && index <= lastIndex; @@ -228,26 +260,45 @@ public void removeAfter(long index) { } @Override - public LogSegment rollOver() throws IOException { - // If the current segment is empty then don't roll over to a new segment, just keep the existing segment. - Long lastIndex = currentSegment.lastIndex(); - if (lastIndex == null) - return currentSegment; + public void rollOver(long index) throws IOException { + // If the current segment is empty then just remove it. + if (currentSegment.isEmpty()) { + segments.remove(currentSegment.firstIndex()); + currentSegment.close(); + currentSegment.delete(); + currentSegment = null; + } else { + currentSegment.flush(); + } - // Flush the segment and create a new segment. - currentSegment.flush(); - long nextIndex = lastIndex + 1; - currentSegment = createSegment(++nextSegmentId, nextIndex); - LOGGER.debug("Rolling over to new segment at new index {}", nextIndex); + currentSegment = createSegment(++nextSegmentId, index); + LOGGER.debug("Rolling over to new segment at new index {}", index); // Open the new segment. currentSegment.open(); - segments.put(nextIndex, currentSegment); + segments.put(index, currentSegment); // Reset the segment flush time and check whether old segments need to be deleted. lastFlush = System.currentTimeMillis(); - return currentSegment; + } + + @Override + public void compact(long index) throws IOException { + // Iterate through all segments in the log. If a segment's first index matches the given index or its last index + // is less than the given index then remove/close/delete the segment. + for (Iterator> iterator = segments.entrySet().iterator(); iterator.hasNext();) { + Map.Entry entry = iterator.next(); + LogSegment segment = entry.getValue(); + if (index == segment.firstIndex() || (segment.lastIndex() != null && index > segment.lastIndex())) { + iterator.remove(); + try { + segment.close(); + segment.delete(); + } catch (IOException e) { + } + } + } } @Override @@ -314,7 +365,7 @@ private void checkRollOver() throws IOException { && System.currentTimeMillis() > currentSegment.timestamp() + config.getSegmentInterval(); if (segmentSizeExceeded || segmentExpired) { - rollOver(); + rollOver(lastIndex + 1); } } diff --git a/core/src/main/java/net/kuujo/copycat/log/FileLogManager.java b/core/src/main/java/net/kuujo/copycat/log/FileLogManager.java index 6c9c5219db..db3fd7ceb8 100644 --- a/core/src/main/java/net/kuujo/copycat/log/FileLogManager.java +++ b/core/src/main/java/net/kuujo/copycat/log/FileLogManager.java @@ -18,8 +18,6 @@ import java.io.File; import java.io.IOException; import java.io.RandomAccessFile; -import java.nio.file.Files; -import java.nio.file.StandardCopyOption; import java.util.Collection; import java.util.HashMap; import java.util.Map; @@ -48,28 +46,6 @@ protected Collection loadSegments() { try { long id = Long.valueOf(file.getName().substring(file.getName().lastIndexOf('-')), file.getName().lastIndexOf('.')).longValue(); if (!segments.containsKey(id)) { - // First, look for an existing history file for the log. If history files exist for this segment then that - // indicates that a failure occurred during log compaction. Recover the previous log. - File historyLogFile = new File(base.getParent(), String.format("%s-%d.log.history", base.getName(), id)); - File historyIndexFile = new File(base.getParent(), String.format("%s-%d.index.history", base.getName(), id)); - File historyMetadataFile = new File(base.getParent(), String.format("%s-%d.metadata.history", base.getName(), id)); - if (historyLogFile.exists() && historyIndexFile.exists() && historyMetadataFile.exists()) { - // Restore the log by moving historical files back to permanent log files. - File logFile = new File(base.getParent(), String.format("%s-%d.log", base.getName(), id)); - File indexFile = new File(base.getParent(), String.format("%s-%d.index", base.getName(), id)); - File metadataFile = new File(base.getParent(), String.format("%s-%d.metadata", base.getName(), id)); - - // Copy the files instead of moving them in case another failure occurs. - Files.copy(historyLogFile.toPath(), logFile.toPath(), StandardCopyOption.REPLACE_EXISTING); - Files.copy(historyIndexFile.toPath(), indexFile.toPath(), StandardCopyOption.REPLACE_EXISTING); - Files.copy(historyMetadataFile.toPath(), metadataFile.toPath(), StandardCopyOption.REPLACE_EXISTING); - - // Once the history has been restored, delete historical files. - historyLogFile.delete(); - historyIndexFile.delete(); - historyMetadataFile.delete(); - } - // Open the metadata file, determine the segment's first index, and create a log segment. try (RandomAccessFile metaFile = new RandomAccessFile(file, "r")) { long firstIndex = metaFile.readLong(); diff --git a/core/src/main/java/net/kuujo/copycat/log/LogManager.java b/core/src/main/java/net/kuujo/copycat/log/LogManager.java index ea859ee631..4c34060038 100644 --- a/core/src/main/java/net/kuujo/copycat/log/LogManager.java +++ b/core/src/main/java/net/kuujo/copycat/log/LogManager.java @@ -64,8 +64,16 @@ public interface LogManager extends Loggable { /** * Forces the log to roll over to a new segment. * - * @return The new log segment. + * @throws IOException If the log failed to create a new segment. */ - LogSegment rollOver() throws IOException; + void rollOver(long index) throws IOException; + + /** + * Compacts the log up to the given index. + * + * @param index The index to which to compact the log. This must be the first index of a segment in the log. + * @throws IOException If the log failed to delete a segment. + */ + void compact(long index) throws IOException; } diff --git a/state-log/src/main/java/net/kuujo/copycat/state/internal/DefaultStateLog.java b/state-log/src/main/java/net/kuujo/copycat/state/internal/DefaultStateLog.java index 6a6731eaed..3c55b8a5b1 100644 --- a/state-log/src/main/java/net/kuujo/copycat/state/internal/DefaultStateLog.java +++ b/state-log/src/main/java/net/kuujo/copycat/state/internal/DefaultStateLog.java @@ -18,11 +18,11 @@ import net.kuujo.copycat.CopycatException; import net.kuujo.copycat.ResourceContext; import net.kuujo.copycat.internal.AbstractResource; -import net.kuujo.copycat.state.StateLog; -import net.kuujo.copycat.state.StateLogConfig; import net.kuujo.copycat.internal.util.Assert; import net.kuujo.copycat.internal.util.concurrent.Futures; import net.kuujo.copycat.protocol.Consistency; +import net.kuujo.copycat.state.StateLog; +import net.kuujo.copycat.state.StateLogConfig; import java.io.IOException; import java.nio.ByteBuffer; diff --git a/state-log/src/main/java/net/kuujo/copycat/state/internal/SnapshottableLogManager.java b/state-log/src/main/java/net/kuujo/copycat/state/internal/SnapshottableLogManager.java index 7fa2407ec5..f907369570 100644 --- a/state-log/src/main/java/net/kuujo/copycat/state/internal/SnapshottableLogManager.java +++ b/state-log/src/main/java/net/kuujo/copycat/state/internal/SnapshottableLogManager.java @@ -34,7 +34,6 @@ public class SnapshottableLogManager implements LogManager { private final LogManager logManager; private final LogManager snapshotManager; - private Long snapshotIndex; public SnapshottableLogManager(LogManager logManager, LogManager snapshotManager) { this.logManager = logManager; @@ -71,24 +70,15 @@ public LogSegment lastSegment() { return logManager.lastSegment(); } - @Override - public LogSegment rollOver() throws IOException { - return logManager.rollOver(); - } - @Override public void open() throws IOException { snapshotManager.open(); logManager.open(); - if (!snapshotManager.isEmpty()) { - ByteBuffer snapshot = snapshotManager.getEntry(snapshotManager.lastIndex()); - snapshotIndex = snapshot.getLong(); - } } @Override public boolean isEmpty() { - return snapshotIndex == null && logManager.isEmpty(); + return snapshotManager.isEmpty() && logManager.isEmpty(); } @Override @@ -145,9 +135,8 @@ public long appendSnapshot(long index, ByteBuffer snapshot) throws IOException { // When appending a snapshot, force the snapshot log manager to roll over to a new segment, append the snapshot // to the log, and then compact the log once the snapshot has been appended. ByteBuffer entry = ByteBuffer.allocate(8 + snapshot.limit()); - entry.putLong(index); entry.put(snapshot); - snapshotManager.rollOver(); + snapshotManager.rollOver(index); snapshotManager.appendEntry(entry); compact(snapshotManager); compact(logManager); @@ -189,12 +178,7 @@ public Long lastIndex() { @Override public boolean containsIndex(long index) { Assert.state(isOpen(), "Log is not open"); - if (logManager.containsIndex(index)) { - return true; - } else if (snapshotManager.isEmpty()) { - return false; - } - return snapshotIndex != null && snapshotIndex == index; + return logManager.containsIndex(index) || snapshotManager.containsIndex(index); } @Override @@ -202,8 +186,8 @@ public ByteBuffer getEntry(long index) { Assert.state(isOpen(), "Log is not open"); if (logManager.containsIndex(index)) { return logManager.getEntry(index); - } else if (snapshotIndex != null && snapshotIndex == index) { - return snapshotManager.getEntry(snapshotIndex); + } else if (snapshotManager.containsIndex(index)) { + return snapshotManager.getEntry(index); } throw new IndexOutOfBoundsException("No entry at index " + index); } @@ -215,6 +199,17 @@ public void removeAfter(long index) { logManager.removeAfter(index); } + @Override + public void rollOver(long index) throws IOException { + logManager.rollOver(index); + } + + @Override + public void compact(long index) throws IOException { + logManager.compact(index); + snapshotManager.compact(index); + } + @Override public void flush() { logManager.flush(); @@ -224,7 +219,6 @@ public void flush() { public void close() throws IOException { logManager.close(); snapshotManager.close(); - snapshotIndex = null; } @Override