Refactor journal descriptors to include version information.
kuujo committed Jun 23, 2017
1 parent ea64f8d commit 298663c
Showing 3 changed files with 84 additions and 130 deletions.
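For orientation, the refactored descriptor moves a 32-bit format version to the front of the fixed 64-byte segment header and drops the old locked flag. Below is a minimal sketch of reading that layout with plain java.nio, using the field offsets from the constants in the descriptor diff that follows (illustration only, not Atomix's Buffer API):

import java.nio.ByteBuffer;

// Reads the refactored 64-byte segment descriptor header: the version now leads the header.
final class DescriptorLayoutSketch {
  static void read(ByteBuffer header) {
    int version = header.getInt(0);                                   // 32-bit descriptor version, offset 0
    long id = header.getLong(4);                                      // 64-bit segment id, offset 4
    long index = header.getLong(12);                                  // 64-bit first index, offset 12
    long maxSegmentSize = Integer.toUnsignedLong(header.getInt(20));  // 32-bit unsigned max size, offset 20
    int maxEntries = header.getInt(24);                               // 32-bit max entries, offset 24
    long updated = header.getLong(28);                                // 64-bit update timestamp, offset 28
    // Bytes 36..63 of the 64-byte header are not assigned in this layout (assumption based on BYTES = 64).
    System.out.printf("version=%d id=%d index=%d size=%d entries=%d updated=%d%n",
        version, id, index, maxSegmentSize, maxEntries, updated);
  }
}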
6 changes: 6 additions & 0 deletions storage/journal/pom.xml
@@ -52,6 +52,12 @@
<artifactId>atomix-utils</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.atomix</groupId>
<artifactId>atomix-slf4j</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
@@ -56,23 +56,24 @@
public final class JournalSegmentDescriptor implements AutoCloseable {
public static final int BYTES = 64;

// Current segment version.
private static final int VERSION = 1;

// The lengths of each field in the header.
private static final int ID_LENGTH = Bytes.LONG; // 64-bit signed integer
private static final int VERSION_LENGTH = Bytes.LONG; // 64-bit signed integer
private static final int INDEX_LENGTH = Bytes.LONG; // 64-bit signed integer
private static final int MAX_SIZE_LENGTH = Bytes.INTEGER; // 32-bit unsigned integer
private static final int VERSION_LENGTH = Bytes.INTEGER; // 32-bit signed integer
private static final int ID_LENGTH = Bytes.LONG; // 64-bit signed integer
private static final int INDEX_LENGTH = Bytes.LONG; // 64-bit signed integer
private static final int MAX_SIZE_LENGTH = Bytes.INTEGER; // 32-bit unsigned integer
private static final int MAX_ENTRIES_LENGTH = Bytes.INTEGER; // 32-bit signed integer
private static final int UPDATED_LENGTH = Bytes.LONG; // 64-bit signed integer
private static final int LOCKED_LENGTH = Bytes.BOOLEAN; // 8-bit boolean
private static final int UPDATED_LENGTH = Bytes.LONG; // 64-bit signed integer

// The positions of each field in the header.
private static final long ID_POSITION = 0; // 0
private static final long VERSION_POSITION = ID_POSITION + ID_LENGTH; // 8
private static final long INDEX_POSITION = VERSION_POSITION + VERSION_LENGTH; // 16
private static final long MAX_SIZE_POSITION = INDEX_POSITION + INDEX_LENGTH; // 24
private static final long MAX_ENTRIES_POSITION = MAX_SIZE_POSITION + MAX_SIZE_LENGTH; // 28
private static final long UPDATED_POSITION = MAX_ENTRIES_POSITION + MAX_ENTRIES_LENGTH; // 32
private static final long LOCKED_POSITION = UPDATED_POSITION + UPDATED_LENGTH; // 40
private static final long VERSION_POSITION = 0; // 0
private static final long ID_POSITION = VERSION_POSITION + VERSION_LENGTH; // 4
private static final long INDEX_POSITION = ID_POSITION + ID_LENGTH; // 12
private static final long MAX_SIZE_POSITION = INDEX_POSITION + INDEX_LENGTH; // 20
private static final long MAX_ENTRIES_POSITION = MAX_SIZE_POSITION + MAX_SIZE_LENGTH; // 24
private static final long UPDATED_POSITION = MAX_ENTRIES_POSITION + MAX_ENTRIES_LENGTH; // 28

/**
* Returns a descriptor builder.
@@ -97,8 +98,8 @@ public static Builder newBuilder(Buffer buffer) {
}

Buffer buffer;
private final int version;
private final long id;
private final long version;
private final long index;
private final long maxSegmentSize;
private final int maxEntries;
@@ -110,8 +111,8 @@ public static Builder newBuilder(Buffer buffer) {
*/
public JournalSegmentDescriptor(Buffer buffer) {
this.buffer = checkNotNull(buffer, "buffer cannot be null");
this.version = buffer.readInt();
this.id = buffer.readLong();
this.version = buffer.readLong();
this.index = buffer.readLong();
this.maxSegmentSize = buffer.readUnsignedInt();
this.maxEntries = buffer.readInt();
@@ -121,27 +122,26 @@ public JournalSegmentDescriptor(Buffer buffer) {
}

/**
* Returns the segment identifier.
* Returns the segment version.
* <p>
* The segment ID is a monotonically increasing number within each log. Segments with in-sequence identifiers should
* contain in-sequence indexes.
* Versions are monotonically increasing starting at {@code 1}.
*
* @return The segment identifier.
* @return The segment version.
*/
public long id() {
return id;
public int version() {
return version;
}

/**
* Returns the segment version.
* Returns the segment identifier.
* <p>
* Versions are monotonically increasing starting at {@code 1}. Versions will only be incremented whenever the segment
* is rewritten to another memory/disk space, e.g. after log compaction.
* The segment ID is a monotonically increasing number within each log. Segments with in-sequence identifiers should
* contain in-sequence indexes.
*
* @return The segment version.
* @return The segment identifier.
*/
public long version() {
return version;
public long id() {
return id;
}

/**
@@ -197,26 +197,6 @@ public void update(long timestamp) {
}
}

/**
* Returns whether the segment has been locked by commitment.
* <p>
* Segments will be locked once all entries have been committed to the segment. The lock state of each segment is used
* to determine log compaction and recovery behavior.
*
* @return Indicates whether the segment has been locked.
*/
public boolean locked() {
return locked;
}

/**
* Locks the segment.
*/
public void lock() {
buffer.writeBoolean(LOCKED_POSITION, true).flush();
locked = true;
}

/**
* Copies the segment to a new buffer.
*/
@@ -253,11 +233,10 @@ public void delete() {
@Override
public String toString() {
return toStringHelper(this)
.add("id", id)
.add("version", version)
.add("id", id)
.add("index", index)
.add("updated", updated)
.add("locked", locked)
.toString();
}

@@ -268,7 +247,8 @@ public static class Builder {
private final Buffer buffer;

private Builder(Buffer buffer) {
this.buffer = checkNotNull(buffer, "buffer cannot be null");
this.buffer = checkNotNull(buffer, "buffer cannot be null")
.writeInt(VERSION_POSITION, VERSION);
}

/**
@@ -278,18 +258,7 @@ private Builder(Buffer buffer) {
* @return The segment descriptor builder.
*/
public Builder withId(long id) {
buffer.writeLong(0, id);
return this;
}

/**
* Sets the segment version.
*
* @param version The segment version.
* @return The segment descriptor builder.
*/
public Builder withVersion(long version) {
buffer.writeLong(8, version);
buffer.writeLong(ID_POSITION, id);
return this;
}

@@ -300,7 +269,7 @@ public Builder withVersion(long version) {
* @return The segment descriptor builder.
*/
public Builder withIndex(long index) {
buffer.writeLong(16, index);
buffer.writeLong(INDEX_POSITION, index);
return this;
}

@@ -311,7 +280,7 @@ public Builder withIndex(long index) {
* @return The segment descriptor builder.
*/
public Builder withMaxSegmentSize(long maxSegmentSize) {
buffer.writeUnsignedInt(24, maxSegmentSize);
buffer.writeUnsignedInt(MAX_SIZE_POSITION, maxSegmentSize);
return this;
}

@@ -322,7 +291,7 @@ public Builder withMaxSegmentSize(long maxSegmentSize) {
* @return The segment descriptor builder.
*/
public Builder withMaxEntries(int maxEntries) {
buffer.writeInt(28, maxEntries);
buffer.writeInt(MAX_ENTRIES_POSITION, maxEntries);
return this;
}

@@ -332,9 +301,7 @@ public Builder withMaxEntries(int maxEntries) {
* @return The built segment descriptor.
*/
public JournalSegmentDescriptor build() {
return new JournalSegmentDescriptor(buffer.writeLong(32, 0).rewind());
return new JournalSegmentDescriptor(buffer.rewind());
}

}

}
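A hedged usage sketch of the refactored Builder: callers no longer supply a version, since the builder now writes the current VERSION into the header itself (the maxSegmentSize and maxEntries values below are made-up placeholders):

JournalSegmentDescriptor descriptor = JournalSegmentDescriptor.newBuilder()
    .withId(1)
    .withIndex(1)
    .withMaxSegmentSize(1024 * 1024)  // placeholder: 1 MiB segment size
    .withMaxEntries(1024)             // placeholder: maximum entries per segment
    .build();

// The version is part of the descriptor format rather than caller-supplied.
assert descriptor.version() == 1;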
@@ -198,17 +198,13 @@ private void open() {
} else {
JournalSegmentDescriptor descriptor = JournalSegmentDescriptor.newBuilder()
.withId(1)
.withVersion(1)
.withIndex(1)
.withMaxSegmentSize(maxSegmentSize)
.withMaxEntries(maxEntriesPerSegment)
.build();

descriptor.lock();

currentSegment = createSegment(descriptor);
currentSegment.descriptor().update(System.currentTimeMillis());
currentSegment.descriptor().lock();

segments.put(1L, currentSegment);
}
@@ -233,12 +229,10 @@ private synchronized void resetCurrentSegment() {
} else {
JournalSegmentDescriptor descriptor = JournalSegmentDescriptor.newBuilder()
.withId(1)
.withVersion(1)
.withIndex(1)
.withMaxSegmentSize(maxSegmentSize)
.withMaxEntries(maxEntriesPerSegment)
.build();
descriptor.lock();

currentSegment = createSegment(descriptor);

@@ -290,12 +284,10 @@ synchronized JournalSegment<E> getNextSegment() {
JournalSegment lastSegment = getLastSegment();
JournalSegmentDescriptor descriptor = JournalSegmentDescriptor.newBuilder()
.withId(lastSegment != null ? lastSegment.descriptor().id() + 1 : 1)
.withVersion(1)
.withIndex(currentSegment.lastIndex() + 1)
.withMaxSegmentSize(maxSegmentSize)
.withMaxEntries(maxEntriesPerSegment)
.build();
descriptor.lock();

currentSegment = createSegment(descriptor);

@@ -482,68 +474,57 @@ protected Collection<JournalSegment> loadSegments() {
JournalSegmentFile segmentFile = new JournalSegmentFile(file);
JournalSegmentDescriptor descriptor = new JournalSegmentDescriptor(FileBuffer.allocate(file, JournalSegmentDescriptor.BYTES));

// Valid segments will have been locked. Segments resulting from failures during log cleaning will be
// unlocked and should ultimately be deleted from disk.
if (descriptor.locked()) {

// Load the segment.
JournalSegment segment = loadSegment(descriptor.id(), descriptor.version());

// If a segment with an equal or lower index has already been loaded, ensure this segment is not superseded
// by the earlier segment. This can occur due to segments being combined during log compaction.
Map.Entry<Long, JournalSegment> previousEntry = segments.floorEntry(segment.index());
if (previousEntry != null) {

// If an existing segment has a lower index than this segment's first index, check whether this
// segment's first index is contained in that existing segment. If it is, determine which segment
// should take precedence based on segment versions.
JournalSegment previousSegment = previousEntry.getValue();

// If the two segments start at the same index, the segment with the higher version number is used.
if (previousSegment.index() == segment.index()) {
if (segment.descriptor().version() > previousSegment.descriptor().version()) {
LOGGER.debug("Replaced segment {} with newer version: {} ({})", previousSegment.descriptor().id(), segment.descriptor().version(), segmentFile.file().getName());
segments.remove(previousEntry.getKey());
previousSegment.close();
previousSegment.delete();
} else {
segment.close();
segment.delete();
continue;
}
}
// If the existing segment's entries overlap with the loaded segment's entries, the existing segment always
// supersedes the loaded segment. Log compaction processes ensure this is always the case.
else if (previousSegment.index() + previousSegment.length() > segment.index()) {
// Load the segment.
JournalSegment segment = loadSegment(descriptor.id(), descriptor.version());

// If a segment with an equal or lower index has already been loaded, ensure this segment is not superseded
// by the earlier segment. This can occur due to segments being combined during log compaction.
Map.Entry<Long, JournalSegment> previousEntry = segments.floorEntry(segment.index());
if (previousEntry != null) {

// If an existing segment has a lower index than this segment's first index, check whether this
// segment's first index is contained in that existing segment. If it is, determine which segment
// should take precedence based on segment versions.
JournalSegment previousSegment = previousEntry.getValue();

// If the two segments start at the same index, the segment with the higher version number is used.
if (previousSegment.index() == segment.index()) {
if (segment.descriptor().version() > previousSegment.descriptor().version()) {
LOGGER.debug("Replaced segment {} with newer version: {} ({})", previousSegment.descriptor().id(), segment.descriptor().version(), segmentFile.file().getName());
segments.remove(previousEntry.getKey());
previousSegment.close();
previousSegment.delete();
} else {
segment.close();
segment.delete();
continue;
}
}

// Add the segment to the segments list.
LOGGER.debug("Found segment: {} ({})", segment.descriptor().id(), segmentFile.file().getName());
segments.put(segment.index(), segment);

// Ensure any segments later in the log with which this segment overlaps are removed.
Map.Entry<Long, JournalSegment> nextEntry = segments.higherEntry(segment.index());
while (nextEntry != null) {
if (nextEntry.getValue().index() < segment.index() + segment.length()) {
segments.remove(nextEntry.getKey());
nextEntry = segments.higherEntry(segment.index());
} else {
break;
}
// If the existing segment's entries overlap with the loaded segment's entries, the existing segment always
// supersedes the loaded segment. Log compaction processes ensure this is always the case.
else if (previousSegment.index() + previousSegment.length() > segment.index()) {
segment.close();
segment.delete();
continue;
}

descriptor.close();
}
// If the segment descriptor wasn't locked, close and delete the descriptor.
else {
LOGGER.debug("Deleting unlocked segment: {}-{} ({})", descriptor.id(), descriptor.version(), segmentFile.file().getName());
descriptor.close();
descriptor.delete();

// Add the segment to the segments list.
LOGGER.debug("Found segment: {} ({})", segment.descriptor().id(), segmentFile.file().getName());
segments.put(segment.index(), segment);

// Ensure any segments later in the log with which this segment overlaps are removed.
Map.Entry<Long, JournalSegment> nextEntry = segments.higherEntry(segment.index());
while (nextEntry != null) {
if (nextEntry.getValue().index() < segment.index() + segment.length()) {
segments.remove(nextEntry.getKey());
nextEntry = segments.higherEntry(segment.index());
} else {
break;
}
}

descriptor.close();
}
}
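The precedence checks in loadSegments() above boil down to a small decision. Here is a standalone restatement for readability (method and parameter names are mine, not Atomix's); the length is an entry count, matching the overlap check above:

// Decides what to do when a newly loaded segment collides with a segment already in the map.
static String resolveCollision(long existingIndex, long existingLength, int existingVersion,
                               long loadedIndex, int loadedVersion) {
  if (existingIndex == loadedIndex) {
    // Same first index: the higher descriptor version wins, e.g. a rewrite produced by compaction.
    return loadedVersion > existingVersion ? "replace existing segment" : "delete loaded segment";
  }
  if (existingIndex + existingLength > loadedIndex) {
    // The loaded segment starts inside the existing segment's range: the existing segment wins.
    return "delete loaded segment";
  }
  return "keep both segments";
}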

