Skip to content

Commit

Permalink
Segment Space-Efficient Index (#3533)
Browse files Browse the repository at this point in the history
This patch replaces the Segment's ConcurrentHashMap index to
a more space-efficient index. Memory overhead is reduced from
943120 to 80856 bytes, ~11x.
  • Loading branch information
Maithem committed Mar 21, 2023
1 parent 7f11d1c commit 25be252
Show file tree
Hide file tree
Showing 6 changed files with 356 additions and 20 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package org.corfudb.infrastructure.log;

import com.google.common.base.Preconditions;

import java.util.Iterator;
import java.util.concurrent.atomic.AtomicLongArray;

/**
* A BoundedMap provides an array backed map that can map long -> long where the indices are in the range of
* (Long.MIN_VALUE, Long.MAX_VALUE - Integer.MAX_VALUE)
*/
public class BoundedMap {

private final long offset;
private final AtomicLongArray array;
public static final long NOT_SET = Long.MIN_VALUE;

public BoundedMap(long offset, int size) {
Preconditions.checkArgument(offset >= 0);
Preconditions.checkArgument(size > 0);
Preconditions.checkArgument(Math.addExact(offset, size) < Integer.MAX_VALUE);
this.offset = offset;
this.array = new AtomicLongArray(size);
for (int idx = 0; idx < array.length(); idx++) {
this.array.set(idx, NOT_SET);
}
}

private void checkRange(long num) {
if (num < offset || num >= offset + array.length()) {
throw new IllegalArgumentException(num + " not in [" + offset + ", " + (offset + array.length()) + ")");
}
}

private void checkValue(long value) {
Preconditions.checkArgument(NOT_SET != value, "invalid %s value", value);
}

private int mapIdx(long idx) {
return Math.toIntExact(idx - offset);
}

public boolean set(long idx, long value) {
checkRange(idx);
checkValue(value);
return array.compareAndSet(mapIdx(idx), NOT_SET, value);
}

public long get(long idx) {
checkRange(idx);
return array.get(mapIdx(idx));
}

public boolean contains(long idx) {
checkRange(idx);
return array.get(mapIdx(idx)) != NOT_SET;
}

public int capacity() {
return array.length();
}

/**
* Iterators are only used for initialization and therefore are not optimized. For instance, the running time
* for iterating is not a function of the number of elements set in the index, but rather the size of the index.
*/
public Iterator<Long> iterator() {
return new IndexIterator(array);
}

public Iterable<Long> iterable() {
final Iterator<Long> iter = iterator();
return () -> iter;
}

private final class IndexIterator implements Iterator<Long> {

private final AtomicLongArray array;
private int idx = 0;
private IndexIterator(AtomicLongArray array) {
this.array = array;
findNextIdx();
}

private void findNextIdx() {
while (idx < array.length() && array.get(idx) == NOT_SET) {
idx++;
}
}

@Override
public boolean hasNext() {
return idx < array.length();
}

@Override
public Long next() {
long ret = array.get(idx);
Preconditions.checkState(ret != NOT_SET);
long retVal = idx + offset;
idx++;
findNextIdx();
return retVal;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import static org.corfudb.infrastructure.log.SegmentUtils.getByteBuffer;
import static org.corfudb.infrastructure.log.SegmentUtils.getLogData;
Expand Down Expand Up @@ -61,6 +60,10 @@ public class Segment {

public static final int VERSION = 2;

public static final int MAX_WRITE_SIZE = 0xfffffff;

public static final long MAX_SEGMENT_SIZE = 0x0000000fffffffffL;

final long id;

@NonNull
Expand All @@ -74,17 +77,18 @@ public class Segment {

private boolean isDirty;

private final Map<Long, AddressMetaData> knownAddresses = new ConcurrentHashMap<>();
private final Index index;

private int refCount = 0;

private final ResourceQuota logSize;

public Segment(long segmentId, Path segmentsDir, ResourceQuota logSize) {
public Segment(long segmentId, int segmentSize, Path segmentsDir, ResourceQuota logSize) {
this.id = segmentId;
this.segmentFilePath = segmentsDir + File.separator + segmentId + ".log";
this.isDirty = false;
this.logSize = logSize;
this.index = new Index(segmentId * segmentSize, segmentSize);
// Open and load a segment file, or create one if it doesn't exist.
// Once the segment address space is loaded, it should be ready to accept writes.
try {
Expand Down Expand Up @@ -146,18 +150,19 @@ private void loadSegmentIndex() throws IOException {
return;
}

knownAddresses.put(entry.getGlobalAddress(),
new AddressMetaData(metadata.getLength(),
channelOffset + METADATA_SIZE));
checkSegmentAndBufferSize(channelOffset + METADATA_SIZE, metadata.getLength());
index.put(entry.getGlobalAddress(),
channelOffset + METADATA_SIZE,
metadata.getLength());
}
}

/**
* Get a set of all the written addresses in this segment
* @return A set of sequence numbers
*/
public Set<Long> getAddresses() {
return knownAddresses.keySet();
public Iterable<Long> getAddresses() {
return index.map.iterable();
}

/**
Expand All @@ -179,7 +184,7 @@ public void flush() throws IOException {
* Check if the segment contains a particular address
*/
public boolean contains(long address) {
return knownAddresses.containsKey(address);
return index.contains(address);
}

/**
Expand All @@ -189,14 +194,14 @@ public boolean contains(long address) {
* @throws IOException
*/
public LogData read(long address) throws IOException {
AddressMetaData addressMetaData = knownAddresses.get(address);
if (addressMetaData == null) {
long value = index.getPacked(address);

if (value == BoundedMap.NOT_SET) {
return null;
}

int length = addressMetaData.length;
long offset = addressMetaData.offset;

int length = index.unpackLength(value);
long offset = index.unpackOffset(value);
try {
ByteBuffer entryBuf = ByteBuffer.allocate(length);
readChannel.read(entryBuf, offset);
Expand All @@ -211,6 +216,19 @@ public LogData read(long address) throws IOException {
}
}

/**
* Verify that the file backing this segment is less than or equal to MAX_SEGMENT_SIZE.
* These checks are required prevent corrupting the Index.
* @param offset segment file offset after writing the buffer
* @param buffSize size of the buffer to be written in bytes
*/
private void checkSegmentAndBufferSize(long offset, int buffSize) {
Preconditions.checkArgument(offset > 0);
Preconditions.checkArgument(buffSize > 0);
Preconditions.checkArgument(offset <= MAX_SEGMENT_SIZE);
Preconditions.checkArgument(buffSize <= MAX_WRITE_SIZE);
}

/**
* Write log data to this segment for a following sequence
* @param address the sequence to bind the log data to
Expand All @@ -224,13 +242,13 @@ public long write(long address, LogData logdata) throws IOException {
LogFormat.Metadata metadata = getMetadata(logEntry);

ByteBuffer buffer = getByteBuffer(metadata, logEntry);
long size = buffer.remaining();
int size = buffer.remaining();
long channelOffset;

channelOffset = writeChannel.position() + METADATA_SIZE;
checkSegmentAndBufferSize(channelOffset, size);
writeBuffer(buffer);
AddressMetaData addressMetaData = new AddressMetaData(metadata.getLength(), channelOffset);
knownAddresses.put(address, addressMetaData);
index.put(address, channelOffset, metadata.getLength());
return size;
}

Expand Down Expand Up @@ -268,12 +286,15 @@ public long write(List<LogData> entries) throws IOException {
LogFormat.Metadata metadata = metadataList.get(ind);
recordsMap.put(entries.get(ind).getGlobalAddress(),
new AddressMetaData(metadata.getLength(), channelOffset));
checkSegmentAndBufferSize(channelOffset, metadata.getLength());
}

allRecordsBuf.flip();
writeBuffer(allRecordsBuf);

knownAddresses.putAll(recordsMap);
for (Map.Entry<Long, AddressMetaData> entry : recordsMap.entrySet()) {
index.put(entry.getKey(), entry.getValue().offset, entry.getValue().length);
}

return size;
}
Expand Down Expand Up @@ -529,6 +550,55 @@ public void close() {
}
}

/**
* This index maps a sequence number to a file offset + payload length.
*
* The file offset and payload length are packed in a long (treated as unsigned) as follows:
*
* |--------64-bits--------|
* |fileOffset|payloadSize|
*
*/
@VisibleForTesting
static final class Index {
static final int highBitsNum = Long.bitCount(MAX_SEGMENT_SIZE);
static final int lowBitsNum = Integer.bitCount(MAX_WRITE_SIZE);
private final BoundedMap map;

Index(long offset, int size) {
this.map = new BoundedMap(offset, size);
Preconditions.checkArgument((highBitsNum + lowBitsNum) >> 3 == Long.BYTES);
}

long pack(long high, int low) {
return (((long) high) << lowBitsNum) | (low & MAX_WRITE_SIZE);
}

int unpackLength(long num) {
return (int) (num & MAX_WRITE_SIZE);
}

long unpackOffset(long num) {
return num >>> lowBitsNum;
}

void put(long sequenceNum, long fileOffset, int length) {
Preconditions.checkArgument(fileOffset > 0 && fileOffset <= MAX_SEGMENT_SIZE,
"invalid offset %s", fileOffset);
Preconditions.checkArgument(length > 0 && length <= MAX_WRITE_SIZE,
"invalid length %s", length);
Preconditions.checkState(map.set(sequenceNum, pack(fileOffset, length)));
}

boolean contains(long sequenceNum) {
return map.contains(sequenceNum);
}

long getPacked(long sequenceNum) {
return map.get(sequenceNum);
}
}

@VisibleForTesting
FileChannel getWriteChannel() {
return writeChannel;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ private void trimPrefix() {
Segment getSegmentHandleForAddress(long address) {
long segmentId = address / RECORDS_PER_LOG_FILE;
Segment handle = openSegments.computeIfAbsent(segmentId,
a -> new Segment(a, logDir, logSizeQuota));
a -> new Segment(a, RECORDS_PER_LOG_FILE, logDir, logSizeQuota));
handle.retain();
return handle;
}
Expand Down

0 comments on commit 25be252

Please sign in to comment.