Skip to content

Commit

Permalink
Clean up log cleaner implementation.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Aug 4, 2015
1 parent 430eba2 commit 89cd8fb
Show file tree
Hide file tree
Showing 8 changed files with 90 additions and 74 deletions.
Expand Up @@ -86,7 +86,12 @@ CompletableFuture<Void> clean(List<Segment> segments) {

cleanFuture = new CompletableFuture<>();
context.execute(() -> {
cleanSegments(segments);
try {
cleanSegments(segments);
cleanFuture.complete(null);
} catch (Exception e) {
cleanFuture.completeExceptionally(e);
}
});
return cleanFuture.whenComplete((result, error) -> cleanFuture = null);
}
Expand Down
6 changes: 4 additions & 2 deletions storage/src/main/java/net/kuujo/copycat/io/storage/Log.java
Expand Up @@ -226,7 +226,7 @@ public <T extends Entry> T getEntry(long index) {
if (segment == null)
throw new IndexOutOfBoundsException("invalid index: " + index);
T entry = segment.getEntry(index);
return !entry.isTombstone() ? entry : null;
return entry != null && !entry.isTombstone() ? entry : null;
}

/**
Expand All @@ -240,7 +240,9 @@ public <T extends Entry> T getEntry(long index) {
* @throws IllegalStateException If the log is not open.
*/
public boolean containsIndex(long index) {
return !isEmpty() && firstIndex() <= index && index <= lastIndex();
long firstIndex = firstIndex();
long lastIndex = lastIndex();
return !isEmpty() && firstIndex <= index && index <= lastIndex;
}

/**
Expand Down
34 changes: 22 additions & 12 deletions storage/src/main/java/net/kuujo/copycat/io/storage/LogCleaner.java
Expand Up @@ -89,18 +89,23 @@ public CompletableFuture<Void> clean() {
private void cleanSegments(Context context) {
AtomicInteger counter = new AtomicInteger();
List<List<Segment>> cleanSegments = getCleanSegments();
for (List<Segment> segments : cleanSegments) {
EntryCleaner cleaner = new EntryCleaner(manager, new ThreadPoolContext(executor, manager.serializer()));
executor.execute(() -> {
cleaner.clean(segments);
if (counter.incrementAndGet() == cleanSegments.size()) {
if (context != null) {
context.execute(() -> cleanFuture.complete(null));
} else {
cleanFuture.complete(null);
}
}
});
if (!cleanSegments.isEmpty()) {
for (List<Segment> segments : cleanSegments) {
EntryCleaner cleaner = new EntryCleaner(manager, new ThreadPoolContext(executor, manager.serializer()));
executor.execute(() -> {
cleaner.clean(segments).whenComplete((result, error) -> {
if (counter.incrementAndGet() == cleanSegments.size()) {
if (context != null) {
context.execute(() -> cleanFuture.complete(null));
} else {
cleanFuture.complete(null);
}
}
});
});
}
} else {
cleanFuture.complete(null);
}
}

Expand Down Expand Up @@ -138,6 +143,11 @@ else if (segments.stream().mapToLong(Segment::count).sum() + segment.count() < s
}
previousSegment = segment;
}

// Ensure all cleanable segments have been added to the clean segments list.
if (segments != null) {
clean.add(segments);
}
return clean;
}

Expand Down
35 changes: 20 additions & 15 deletions storage/src/main/java/net/kuujo/copycat/io/storage/OffsetIndex.java
Expand Up @@ -67,8 +67,8 @@ public static long size(int maxEntries) {
private final Buffer buffer;
private final BitArray bits;
private final BitArray deletes;
private int offset;
private int size;
private int firstOffset = -1;
private int lastOffset = -1;
private int currentOffset = -1;
private long currentPosition = -1;
Expand Down Expand Up @@ -97,10 +97,6 @@ private void init() {

int offset = buffer.readInt();
while (offset != END) {
if (firstOffset == -1) {
firstOffset = offset;
}

lastOffset = offset;
bits.set(offset % bits.size());
size++;
Expand All @@ -113,17 +109,21 @@ private void init() {
}

/**
* Returns the first offset in the index.
* Returns the index offset.
*
* @return The index offset.
*/
public int firstOffset() {
return firstOffset;
public int offset() {
return offset;
}

/**
* Resets the first offset in the index.
* Resets the index offset.
*
* @param offset The index offset.
*/
public void resetOffset(int firstOffset) {
this.firstOffset = firstOffset;
public void resetOffset(int offset) {
this.offset = offset;
}

/**
Expand Down Expand Up @@ -162,10 +162,6 @@ public void index(int offset, long position, int length) {

bits.set(offset % bits.size());

if (firstOffset == -1) {
firstOffset = offset;
}

size++;
lastOffset = offset;

Expand All @@ -174,6 +170,15 @@ public void index(int offset, long position, int length) {
}
}

/**
* Returns a boolean value indicating whether the index is empty.
*
* @return Indicates whether the index is empty.
*/
public boolean isEmpty() {
return size == 0;
}

/**
* Returns the number of entries active in the index.
*
Expand Down
19 changes: 14 additions & 5 deletions storage/src/main/java/net/kuujo/copycat/io/storage/Segment.java
Expand Up @@ -92,7 +92,7 @@ public boolean isOpen() {
* @return Indicates whether the segment is empty.
*/
public boolean isEmpty() {
return offsetIndex.size() == 0;
return offsetIndex.size() > 0 ? offsetIndex.lastOffset() - offsetIndex.offset() + 1 + skip == 0 : skip == 0;
}

/**
Expand All @@ -101,7 +101,7 @@ public boolean isEmpty() {
* @return Indicates whether the segment is full.
*/
public boolean isFull() {
return size() >= descriptor.maxSegmentSize() || offsetIndex.lastOffset() >= descriptor.maxEntries() - 1|| length() == Integer.MAX_VALUE;
return size() >= descriptor.maxSegmentSize() || offsetIndex.lastOffset() >= descriptor.maxEntries() - 1 || offsetIndex.lastOffset() + skip + 1 == Integer.MAX_VALUE;
}

/**
Expand All @@ -119,7 +119,7 @@ public long size() {
* @return The current range of the segment.
*/
public int length() {
return offsetIndex.lastOffset() + skip + 1;
return !isEmpty() ? offsetIndex.lastOffset() - offsetIndex.offset() + 1 + skip : 0;
}

/**
Expand All @@ -131,6 +131,15 @@ public int count() {
return offsetIndex.lastOffset() + 1 - offsetIndex.deletes();
}

/**
* Returns the index of the segment.
*
* @return The index of the segment.
*/
long index() {
return descriptor.index() + offsetIndex.offset();
}

/**
* Returns the index of the first entry in the segment.
*
Expand All @@ -139,7 +148,7 @@ public int count() {
public long firstIndex() {
if (!isOpen())
throw new IllegalStateException("segment not open");
return !isEmpty() ? descriptor.index() + offsetIndex.firstOffset() : 0;
return !isEmpty() ? descriptor.index() + offsetIndex.offset() : 0;
}

/**
Expand All @@ -150,7 +159,7 @@ public long firstIndex() {
public long lastIndex() {
if (!isOpen())
throw new IllegalStateException("segment not open");
return !isEmpty() ? offsetIndex.lastOffset() + descriptor.index() + skip : 0;
return !isEmpty() ? offsetIndex.lastOffset() + descriptor.index() + skip : descriptor.index() - 1;
}

/**
Expand Down
Expand Up @@ -212,10 +212,10 @@ public Segment segment(long index) {
* @param segment The segment to insert.
*/
public synchronized void insertSegment(Segment segment) {
Segment oldSegment = segments.put(segment.firstIndex(), segment);
Segment oldSegment = segments.put(segment.index(), segment);
if (oldSegment == null)
throw new IllegalStateException("unknown segment at index: " + segment.firstIndex());
segments.put(oldSegment.firstIndex(), oldSegment);
throw new IllegalStateException("unknown segment at index: " + segment.index());
segments.put(oldSegment.index(), oldSegment);
}

/**
Expand All @@ -224,7 +224,7 @@ public synchronized void insertSegment(Segment segment) {
* @param segment The segment to remove.
*/
public synchronized void removeSegment(Segment segment) {
segments.remove(segment.firstIndex());
segments.remove(segment.index());
resetCurrentSegment();
}

Expand All @@ -236,7 +236,8 @@ public synchronized void removeSegment(Segment segment) {
*/
synchronized void moveSegment(long index, Segment segment) {
segments.remove(index);
segments.put(segment.firstIndex(), segment);
if (!segment.isEmpty())
segments.put(segment.index(), segment);
}

/**
Expand Down
Expand Up @@ -18,58 +18,50 @@
import net.jodah.concurrentunit.ConcurrentTestCase;
import net.kuujo.copycat.io.serializer.Serializer;
import net.kuujo.copycat.io.serializer.ServiceLoaderResolver;
import net.kuujo.copycat.util.concurrent.Context;
import net.kuujo.copycat.util.concurrent.SingleThreadContext;
import org.testng.annotations.Test;

import java.util.ArrayList;
import static org.testng.Assert.*;

/**
* Minor compaction test.
*
* @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/
@Test
public class EntryCleanerTest extends ConcurrentTestCase {
public class LogCleanerTest extends ConcurrentTestCase {

/**
* Tests compacting the log.
*/
public void testCompact() throws Throwable {
Log log = Log.builder()
.withStorageLevel(StorageLevel.MEMORY)
.withMaxEntriesPerSegment(128)
.withMaxEntriesPerSegment(10)
.withSerializer(new Serializer(new ServiceLoaderResolver()))
.build();

Context context = new SingleThreadContext("test", new Serializer(new ServiceLoaderResolver()));

log.open();

writeEntries(log, 550);

final long index;
try (TestEntry entry = log.createEntry(TestEntry.class)) {
entry.setTerm(1);
entry.setRemove(true);
index = log.appendEntry(entry);
}

writeEntries(log, 550);
writeEntries(log, 30);

threadAssertEquals(log.length(), 1101L);
assertEquals(log.length(), 30L);

EntryCleaner cleaner = new EntryCleaner(log.segments(), context);
for (long index = 21; index < 28; index++) {
log.cleanEntry(index);
}

expectResume();
cleaner.clean(new ArrayList<>(log.segments().segments())).thenRun(this::resume);
log.cleaner().clean().thenRun(this::resume);
await();

threadAssertEquals(log.length(), 1101L);
threadAssertTrue(log.containsIndex(index));
threadAssertFalse(log.containsEntry(index));
assertEquals(log.length(), 30L);

try (TestEntry entry = log.getEntry(index)) {
threadAssertNull(entry);
for (long index = 21; index < 28; index++) {
assertTrue(log.containsIndex(index));
assertFalse(log.containsEntry(index));
try (TestEntry entry = log.getEntry(index)) {
assertNull(entry);
}
}
}

Expand Down
10 changes: 1 addition & 9 deletions storage/src/test/java/net/kuujo/copycat/io/storage/LogTest.java
Expand Up @@ -17,8 +17,6 @@

import net.kuujo.copycat.io.serializer.Serializer;
import net.kuujo.copycat.io.serializer.ServiceLoaderResolver;
import net.kuujo.copycat.util.concurrent.Context;
import net.kuujo.copycat.util.concurrent.SingleThreadContext;
import org.testng.Assert;
import org.testng.annotations.Test;

Expand Down Expand Up @@ -215,13 +213,6 @@ public void testSkipOnRollOver() {
}
}

/**
* Creates a test execution context.
*/
private Context createContext() {
return new SingleThreadContext("test", new Serializer(new ServiceLoaderResolver()));
}

/**
* Creates a new in-memory log.
*/
Expand All @@ -231,6 +222,7 @@ private Log createLog() {
.withMaxEntrySize(1024)
.withMaxSegmentSize(1024 * 1024)
.withMaxEntriesPerSegment(1024)
.withSerializer(new Serializer(new ServiceLoaderResolver()))
.build();
log.open();
Assert.assertTrue(log.isOpen());
Expand Down

0 comments on commit 89cd8fb

Please sign in to comment.