Skip to content

Commit

Permalink
Move log compaction responsibilities into log.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Jun 13, 2015
1 parent 2391c05 commit d25f007
Show file tree
Hide file tree
Showing 9 changed files with 186 additions and 32 deletions.
8 changes: 6 additions & 2 deletions raft/src/main/java/net/kuujo/copycat/raft/StateMachine.java
Expand Up @@ -101,7 +101,11 @@ private void declareFilters(Method method) {
private Method findFilter(Class<? extends Command> type, Compaction.Type compaction) {
Map<Class<? extends Command>, Method> filters = this.filters.get(compaction);
if (filters == null) {
return allFilters.get(compaction);
Method method = allFilters.get(compaction);
if (method == null) {
throw new IllegalArgumentException("unknown command type: " + type);
}
return method;
}

Method method = filters.computeIfAbsent(type, t -> {
Expand Down Expand Up @@ -174,7 +178,7 @@ public void register(Session session) {
public boolean filter(Commit<? extends Command> commit, Compaction compaction) {
LOGGER.debug("filter {}", commit);
try {
return (boolean) findFilter(commit.type(), compaction.type()).invoke(this, commit);
return (boolean) findFilter(commit.type(), compaction.type()).invoke(this, commit, compaction);
} catch (IllegalAccessException | InvocationTargetException e) {
throw new ApplicationException("failed to filter command", e);
}
Expand Down
77 changes: 60 additions & 17 deletions raft/src/main/java/net/kuujo/copycat/raft/log/Compactor.java
Expand Up @@ -31,44 +31,87 @@
*/
public class Compactor implements AutoCloseable {
private static final Logger LOGGER = LoggerFactory.getLogger(Compactor.class);
private static final long DEFAULT_COMPACTION_INTERVAL = TimeUnit.HOURS.toMillis(1);
private static final long COMPACT_INTERVAL = TimeUnit.MINUTES.toMillis(1);
private static final long DEFAULT_MINOR_COMPACTION_INTERVAL = TimeUnit.MINUTES.toMillis(1);
private static final long DEFAULT_MAJOR_COMPACTION_INTERVAL = TimeUnit.HOURS.toMillis(1);

private final Log log;
private final EntryFilter filter;
private final ExecutionContext context;
private long compactionInterval = DEFAULT_COMPACTION_INTERVAL;
private EntryFilter filter;
private ExecutionContext context;
private long minorCompactionInterval = DEFAULT_MINOR_COMPACTION_INTERVAL;
private long majorCompactionInterval = DEFAULT_MAJOR_COMPACTION_INTERVAL;
private long commit;
private long compact;
private Compaction compaction;
private long lastCompaction;
private CompletableFuture<Void> compactFuture;
private ScheduledFuture<?> scheduledFuture;

public Compactor(Log log, EntryFilter filter, ExecutionContext context) {
public Compactor(Log log) {
this.log = log;
}

/**
* Sets the compactor entry filter.
*
* @param filter The compactor entry filter.
* @return The log compactor.
*/
public Compactor filter(EntryFilter filter) {
if (filter == null)
filter = (entry, compaction) -> CompletableFuture.completedFuture(true);
this.filter = filter;
this.context = context;
return this;
}

/**
* Sets the interval at which major compaction is run.
*
* @param compactionInterval The interval at which major compaction is run.
*/
public void setMinorCompactionInterval(long compactionInterval) {
if (compactionInterval < 1)
throw new IllegalArgumentException("compaction interval must be positive");
this.minorCompactionInterval = compactionInterval;
}

/**
* Returns the compaction interval.
*
* @return The interval at which major compaction is run.
*/
public long getMinorCompactionInterval() {
return minorCompactionInterval;
}

/**
* Sets the interval at which major compaction is run.
*
* @param compactionInterval The interval at which major compaction is run.
* @return The log compactor.
*/
public Compactor withMinorCompactionInterval(long compactionInterval) {
setMinorCompactionInterval(compactionInterval);
return this;
}

/**
* Sets the interval at which major compaction is run.
*
* @param compactionInterval The interval at which major compaction is run.
*/
public void setCompactionInterval(long compactionInterval) {
public void setMajorCompactionInterval(long compactionInterval) {
if (compactionInterval < 1)
throw new IllegalArgumentException("compaction interval must be positive");
this.compactionInterval = compactionInterval;
this.majorCompactionInterval = compactionInterval;
}

/**
* Returns the compaction interval.
*
* @return The interval at which major compaction is run.
*/
public long getCompactionInterval() {
return compactionInterval;
public long getMajorCompactionInterval() {
return majorCompactionInterval;
}

/**
Expand All @@ -77,16 +120,16 @@ public long getCompactionInterval() {
* @param compactionInterval The interval at which major compaction is run.
* @return The log compactor.
*/
public Compactor withCompactionInterval(long compactionInterval) {
setCompactionInterval(compactionInterval);
public Compactor withMajorCompactionInterval(long compactionInterval) {
setMajorCompactionInterval(compactionInterval);
return this;
}

/**
* Opens the log compactor.
*/
public void open() {
scheduledFuture = context.scheduleAtFixedRate(this::compact, COMPACT_INTERVAL, COMPACT_INTERVAL, TimeUnit.MILLISECONDS);
public void open(ExecutionContext context) {
scheduledFuture = context.scheduleAtFixedRate(() -> compact(context), minorCompactionInterval, minorCompactionInterval, TimeUnit.MILLISECONDS);
}

/**
Expand All @@ -110,14 +153,14 @@ public void setCompactIndex(long index) {
/**
* Compacts the log.
*/
synchronized CompletableFuture<Void> compact() {
synchronized CompletableFuture<Void> compact(ExecutionContext context) {
if (compactFuture != null) {
return compactFuture;
}

compactFuture = CompletableFuture.supplyAsync(() -> {
if (compaction == null) {
if (System.currentTimeMillis() - lastCompaction > compactionInterval) {
if (System.currentTimeMillis() - lastCompaction > majorCompactionInterval) {
compaction = new MajorCompaction(compact, filter, context);
lastCompaction = System.currentTimeMillis();
} else {
Expand Down
69 changes: 68 additions & 1 deletion raft/src/main/java/net/kuujo/copycat/raft/log/Log.java
Expand Up @@ -20,6 +20,7 @@
import net.kuujo.copycat.util.ExecutionContext;

import java.io.File;
import java.util.concurrent.TimeUnit;

/**
* Raft log.
Expand All @@ -38,6 +39,7 @@ public static Builder builder() {
}

protected final SegmentManager segments;
protected Compactor compactor;
protected final TypedEntryPool entryPool = new TypedEntryPool();
private boolean open;

Expand All @@ -52,9 +54,19 @@ protected Log(SegmentManager segments) {
*/
public void open(ExecutionContext context) {
segments.open(context);
compactor.open(context);
open = true;
}

/**
* Returns the log compactor.
*
* @return The log compactor.
*/
public Compactor compactor() {
return compactor;
}

/**
* Returns the log segment manager.
*
Expand Down Expand Up @@ -311,6 +323,7 @@ public void flush() {
@Override
public void close() {
segments.close();
compactor.close();
open = false;
}

Expand Down Expand Up @@ -419,13 +432,67 @@ public Builder withMaxEntriesPerSegment(int maxEntriesPerSegment) {
return this;
}

/**
* Sets the minor compaction interval.
*
* @param compactionInterval The minor compaction interval in milliseconds.
* @return The log builder.
* @throws java.lang.IllegalArgumentException If the compaction interval is not positive
*/
public Builder withMinorCompactionInterval(long compactionInterval) {
config.setMinorCompactionInterval(compactionInterval);
return this;
}

/**
* Sets the minor compaction interval.
*
* @param compactionInterval The minor compaction interval.
* @param unit The interval time unit.
* @return The log builder.
* @throws java.lang.IllegalArgumentException If the compaction interval is not positive
*/
public Builder withMinorCompactionInterval(long compactionInterval, TimeUnit unit) {
config.setMinorCompactionInterval(compactionInterval, unit);
return this;
}

/**
* Sets the major compaction interval.
*
* @param compactionInterval The major compaction interval in milliseconds.
* @return The log builder.
* @throws java.lang.IllegalArgumentException If the compaction interval is not positive
*/
public Builder withMajorCompactionInterval(long compactionInterval) {
config.setMajorCompactionInterval(compactionInterval);
return this;
}

/**
* Sets the major compaction interval.
*
* @param compactionInterval The major compaction interval.
* @param unit The interval time unit.
* @return The log builder.
* @throws java.lang.IllegalArgumentException If the compaction interval is not positive
*/
public Builder withMajorCompactionInterval(long compactionInterval, TimeUnit unit) {
config.setMajorCompactionInterval(compactionInterval, unit);
return this;
}

/**
* Builds the log.
*
* @return A new buffered log.
*/
public Log build() {
return new Log(new SegmentManager(config));
Log log = new Log(new SegmentManager(config));
log.compactor = new Compactor(log)
.withMinorCompactionInterval(config.getMinorCompactionInterval())
.withMajorCompactionInterval(config.getMajorCompactionInterval());
return log;
}
}

Expand Down
33 changes: 33 additions & 0 deletions raft/src/main/java/net/kuujo/copycat/raft/log/LogConfig.java
Expand Up @@ -16,6 +16,7 @@
package net.kuujo.copycat.raft.log;

import java.io.File;
import java.util.concurrent.TimeUnit;

/**
* Copycat storage configuration.
Expand All @@ -30,12 +31,16 @@ public class LogConfig {
private static final int DEFAULT_MAX_ENTRY_SIZE = 1024 * 8;
private static final int DEFAULT_MAX_SEGMENT_SIZE = 1024 * 1024 * 32;
private static final int DEFAULT_MAX_ENTRIES_PER_SEGMENT = (int) (Math.pow(2, 31) - 1) / 8 - 16;
private static final long DEFAULT_MINOR_COMPACTION_INTERVAL = TimeUnit.MINUTES.toMillis(1);
private static final long DEFAULT_MAJOR_COMPACTION_INTERVAL = TimeUnit.HOURS.toMillis(1);

private File directory = new File(DEFAULT_DIRECTORY);
private StorageLevel level = StorageLevel.DISK;
private int maxEntrySize = DEFAULT_MAX_ENTRY_SIZE;
private int maxSegmentSize = DEFAULT_MAX_SEGMENT_SIZE;
private int maxEntriesPerSegment = DEFAULT_MAX_ENTRIES_PER_SEGMENT;
private long minorCompactionInterval = DEFAULT_MINOR_COMPACTION_INTERVAL;
private long majorCompactionInterval = DEFAULT_MAJOR_COMPACTION_INTERVAL;

public LogConfig() {
}
Expand Down Expand Up @@ -269,4 +274,32 @@ public LogConfig withMaxEntriesPerSegment(int maxEntriesPerSegment) {
return this;
}

public void setMinorCompactionInterval(long compactionInterval) {
if (compactionInterval <= 0)
throw new IllegalArgumentException("compaction interval must be positive");
this.minorCompactionInterval = compactionInterval;
}

public void setMinorCompactionInterval(long compactionInterval, TimeUnit unit) {
setMinorCompactionInterval(unit.toMillis(compactionInterval));
}

public long getMinorCompactionInterval() {
return minorCompactionInterval;
}

public void setMajorCompactionInterval(long compactionInterval) {
if (compactionInterval <= 0)
throw new IllegalArgumentException("compaction interval must be positive");
this.majorCompactionInterval = compactionInterval;
}

public void setMajorCompactionInterval(long compactionInterval, TimeUnit unit) {
setMajorCompactionInterval(unit.toMillis(compactionInterval));
}

public long getMajorCompactionInterval() {
return majorCompactionInterval;
}

}
Expand Up @@ -66,7 +66,7 @@ CompletableFuture<Void> run(SegmentManager segments) {
private List<Segment> getActiveSegments(SegmentManager manager) {
List<Segment> segments = new ArrayList<>();
for (Segment segment : manager.segments()) {
if (!segment.isEmpty() && segment.lastIndex() <= index()) {
if (segment.isFull() && segment.lastIndex() <= index()) {
segments.add(segment);
}
}
Expand Down
Expand Up @@ -86,7 +86,7 @@ private SortedMap<Long, List<Segment>> createLevels(SegmentManager segments) {
// the nature of this compaction strategy, segments of the same level should always be next to one another.
TreeMap<Long, List<Segment>> levels = new TreeMap<>();
for (Segment segment : segments.segments()) {
if (segment.lastIndex() <= index()) {
if (segment.isFull() && segment.lastIndex() <= index()) {
List<Segment> level = levels.get(segment.descriptor().version());
if (level == null) {
level = new ArrayList<>();
Expand Down
2 changes: 1 addition & 1 deletion raft/src/main/java/net/kuujo/copycat/raft/log/Segment.java
Expand Up @@ -103,7 +103,7 @@ public boolean isEmpty() {
* @return Indicates whether the segment is full.
*/
public boolean isFull() {
return size() >= descriptor.maxSegmentSize() || offsetIndex.size() >= descriptor.maxEntries() || length() == Integer.MAX_VALUE;
return size() >= descriptor.maxSegmentSize() || offsetIndex.lastOffset() >= descriptor.maxEntries() - 1|| length() == Integer.MAX_VALUE;
}

/**
Expand Down
13 changes: 12 additions & 1 deletion raft/src/main/java/net/kuujo/copycat/raft/state/RaftState.java
Expand Up @@ -24,6 +24,8 @@
import net.kuujo.copycat.raft.log.entry.*;
import net.kuujo.copycat.util.ExecutionContext;
import net.kuujo.copycat.util.concurrent.Futures;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;
import java.util.concurrent.CompletableFuture;
Expand All @@ -34,6 +36,7 @@
* @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/
class RaftState {
private static final Logger LOGGER = LoggerFactory.getLogger(RaftState.class);
private final StateMachine stateMachine;
private final ManagedCluster cluster;
private final ClusterState members;
Expand Down Expand Up @@ -161,8 +164,16 @@ public CompletableFuture<Boolean> filter(CommandEntry entry, Compaction compacti
session = new RaftSession(entry.getSession(), entry.getTimestamp());
session.expire();
}

Commit<? extends Command> commit = new Commit<>(entry.getIndex(), session, entry.getTimestamp(), entry.getCommand());
return CompletableFuture.supplyAsync(() -> stateMachine.filter(commit, compaction), context);
return CompletableFuture.supplyAsync(() -> {
try {
return stateMachine.filter(commit, compaction);
} catch (Exception e) {
LOGGER.warn("Failed to filter command {} at index {}: {}", entry.getCommand(), entry.getIndex(), e);
return true;
}
}, context);
}

/**
Expand Down

0 comments on commit d25f007

Please sign in to comment.