Skip to content
This repository has been archived by the owner on Dec 19, 2017. It is now read-only.

Commit

Permalink
Merge pull request #61 from atomix/last-commit-non-null
Browse files Browse the repository at this point in the history
Ensure one committed entry is always accessible in the log
  • Loading branch information
kuujo committed Nov 21, 2015
2 parents 66b264e + acb1f9b commit 95e305c
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1006,16 +1006,7 @@ private void commitEntries() {
// important to ensure that tombstones are applied to their state machines.
// If the members list is empty, use the local server's last log index as the global index.
long globalMatchIndex = context.getCluster().getMembers().stream().mapToLong(MemberState::getMatchIndex).min().orElse(context.getLog().lastIndex());

// The globalIndex should always be one less than the highest index that is replicated to all members.
// This is to ensure prevEntry checks can always be performed for AppendRequests.
// Consider the following scenario:
// - Entry at index i is a NoOpEntry and it is fully replicated to all members. (NoOpEntries can be garbage collected
// as soon as they are replicated to all cluster members.)
// - A leadership change occurs and the new leader sends an AppendRequest with logIndex == i.
// Any follower should be able to validate that they have this entry in their log with a term that matches
// what the new leader is advertising.
context.setGlobalIndex(Math.max(globalMatchIndex - 1, 0));
context.setGlobalIndex(globalMatchIndex);

// Sort the list of replicas, order by the last index that was replicated to the replica. This will allow
// us to determine the median index for all known replicated entries across all cluster members.
Expand Down
23 changes: 20 additions & 3 deletions server/src/main/java/io/atomix/copycat/server/storage/Log.java
Original file line number Diff line number Diff line change
Expand Up @@ -326,11 +326,28 @@ public <T extends Entry> T get(long index) {
if (entry != null) {
// If the entry has not been cleaned by the state machine, return it. Note that the call to isClean()
// on the segment will be done in O(1) time since the search was already done in the get() call.
// We also return entries where the index is greater than the server's globalIndex (represented by
// the compactor's majorIndex) in order to ensure commands which trigger events are replicated.
if (!segment.isClean(index) || index > compactor.majorIndex()) {
if (!segment.isClean(index)) {
return entry;
}
// If the entry index is equal to or greater than the commitIndex, return it. This ensures that the last committed entry
// is always exposed so that consistency checks can be done.
else if (index >= segments.commitIndex()) {
return entry;
}
// For tombstone entries, if the entry index is greater than the majorIndex, return the entry. This ensures that entries
// are replicated until persisted on all available servers.
else if (entry.isTombstone()) {
if (index > compactor.majorIndex()) {
return entry;
}
}
// If the entry is not a tombstone, return the entry if the index is greater than minorIndex. If the entry is less than minorIndex,
// that indicates that it has been committed and events have been received for the entry, so we can safely stop replicating it.
else {
if (index > compactor.minorIndex()) {
return entry;
}
}
}
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

/**
Expand Down Expand Up @@ -50,19 +51,19 @@ public final class MajorCompactionManager implements CompactionManager {

@Override
public List<CompactionTask> buildTasks(Storage storage, SegmentManager segments) {
List<List<Segment>> groups = getCleanableGroups(storage, segments);
List<List<Segment>> groups = getCompactableGroups(storage, segments);
return !groups.isEmpty() ? Collections.singletonList(new MajorCompactionTask(segments, groups, compactor.majorIndex())) : Collections.emptyList();
}

/**
* Returns a list of segments lists to clean, where segments are grouped according to how they will be merged during
* cleaning.
* Returns a list of segments lists to compact, where segments are grouped according to how they will be merged during
* compaction.
*/
public List<List<Segment>> getCleanableGroups(Storage storage, SegmentManager manager) {
public List<List<Segment>> getCompactableGroups(Storage storage, SegmentManager manager) {
List<List<Segment>> clean = new ArrayList<>();
List<Segment> segments = null;
Segment previousSegment = null;
for (Segment segment : getCleanableSegments(manager)) {
for (Segment segment : getCompactableSegments(manager)) {
// If this is the first segment in a segments list, add the segment.
if (segments == null) {
segments = new ArrayList<>();
Expand Down Expand Up @@ -96,20 +97,28 @@ else if (segments.stream().mapToLong(Segment::size).sum() + segment.size() < sto
}

/**
* Returns a list of cleanable log segments.
* Returns a list of compactable log segments.
*
* @param manager The segment manager.
* @return A list of cleanable log segments.
* @return A list of compactable log segments.
*/
private List<Segment> getCleanableSegments(SegmentManager manager) {
private List<Segment> getCompactableSegments(SegmentManager manager) {
List<Segment> segments = new ArrayList<>(manager.segments().size());
Segment lastSegment = manager.lastSegment();
for (Segment segment : manager.segments()) {
if ((segment.isFull() || segment.isCompacted()) && segment.lastIndex() < compactor.minorIndex() && lastSegment.firstIndex() <= compactor.minorIndex() && !lastSegment.isEmpty()) {
Iterator<Segment> iterator = manager.segments().iterator();
Segment segment = iterator.next();
while (iterator.hasNext()) {
Segment nextSegment = iterator.next();

// Segments that have already been compacted are eligible for compaction. For uncompacted segments, the segment must be full, consist
// of entries less than the minorIndex, and a later segment with at least one committed entry must exist in the log. This ensures that
// a non-empty entry always remains at the end of the log.
if (segment.isCompacted() || (segment.isFull() && segment.lastIndex() < compactor.minorIndex() && nextSegment.firstIndex() <= manager.commitIndex() && !nextSegment.isEmpty())) {
segments.add(segment);
} else {
break;
}

segment = nextSegment;
}
return segments;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.atomix.copycat.server.storage.entry.Entry;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/**
Expand Down Expand Up @@ -65,7 +66,7 @@ public final class MinorCompactionManager implements CompactionManager {
@Override
public List<CompactionTask> buildTasks(Storage storage, SegmentManager segments) {
List<CompactionTask> tasks = new ArrayList<>(segments.segments().size());
for (Segment segment : getCleanableSegments(storage, segments)) {
for (Segment segment : getCompactableSegments(storage, segments)) {
tasks.add(new MinorCompactionTask(segments, segment));
}
return tasks;
Expand All @@ -76,11 +77,17 @@ public List<CompactionTask> buildTasks(Storage storage, SegmentManager segments)
*
* @return A list of compactable segments.
*/
private Iterable<Segment> getCleanableSegments(Storage storage, SegmentManager manager) {
List<Segment> segments = new ArrayList<>();
for (Segment segment : manager.segments()) {
// Only allow compaction of segments that are full.
if (segment.isCompacted() || (segment.isFull() && segment.lastIndex() < compactor.minorIndex() && manager.currentSegment().firstIndex() <= manager.commitIndex() && !manager.currentSegment().isEmpty())) {
private Iterable<Segment> getCompactableSegments(Storage storage, SegmentManager manager) {
List<Segment> segments = new ArrayList<>(manager.segments().size());
Iterator<Segment> iterator = manager.segments().iterator();
Segment segment = iterator.next();
while (iterator.hasNext()) {
Segment nextSegment = iterator.next();

// Segments that have already been compacted are eligible for compaction. For uncompacted segments, the segment must be full, consist
// of entries less than the minorIndex, and a later segment with at least one committed entry must exist in the log. This ensures that
// a non-empty entry always remains at the end of the log.
if (segment.isCompacted() || (segment.isFull() && segment.lastIndex() < compactor.minorIndex() && nextSegment.firstIndex() <= manager.commitIndex() && !nextSegment.isEmpty())) {
// Calculate the percentage of entries that have been marked for cleaning in the segment.
double cleanPercentage = segment.cleanCount() / (double) segment.count();

Expand All @@ -90,6 +97,8 @@ private Iterable<Segment> getCleanableSegments(Storage storage, SegmentManager m
segments.add(segment);
}
}

segment = nextSegment;
}
return segments;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ void setLog() throws Exception {
@AfterMethod
protected void deleteLog() {
try {
log.close();
if (log.isOpen())
log.close();
} catch (Exception ignore) {
} finally {
assertFalse(log.isOpen());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,11 @@ public void testCleanGet() {
for (int i = entriesPerSegment; i <= entriesPerSegment * 2 + 1; i++) {
assertFalse(log.segments.segment(i).isClean(i));
log.clean(i);
assertTrue(log.segments.segment(i).isClean(i));
assertNotNull(log.get(i));
}
log.compactor().majorIndex(entriesPerSegment * 2);
for (int i = entriesPerSegment; i <= entriesPerSegment * 2; i++) {
log.commit(entriesPerSegment * 2).compactor().minorIndex(entriesPerSegment * 2);
for (int i = entriesPerSegment; i < entriesPerSegment * 2; i++) {
assertNull(log.get(i));
}
}
Expand All @@ -131,10 +132,11 @@ public void testCleanGetTombstones() {
for (int i = entriesPerSegment; i <= entriesPerSegment * 2 + 1; i++) {
assertFalse(log.segments.segment(i).isClean(i));
log.clean(i);
assertTrue(log.segments.segment(i).isClean(i));
assertNotNull(log.get(i));
}
log.compactor().majorIndex(entriesPerSegment * 2);
for (int i = entriesPerSegment; i <= entriesPerSegment * 2; i++) {
log.commit(entriesPerSegment * 2).compactor().majorIndex(entriesPerSegment * 2);
for (int i = entriesPerSegment; i < entriesPerSegment * 2; i++) {
assertNull(log.get(i));
}
}
Expand Down

0 comments on commit 95e305c

Please sign in to comment.