Add cardinality estimation for predicting disk consumption of active log entries for efficient log compaction.
kuujo committed May 1, 2015
1 parent 4799c5f commit f8b9882
Showing 10 changed files with 289 additions and 31 deletions.
9 changes: 9 additions & 0 deletions raft/pom.xml
@@ -26,6 +26,10 @@
   <artifactId>copycat-raft</artifactId>
   <name>Copycat Raft</name>
 
+  <properties>
+    <stream.version>2.8.0</stream.version>
+  </properties>
+
   <dependencies>
     <dependency>
       <groupId>net.kuujo.copycat</groupId>
@@ -38,6 +42,11 @@
       <version>${project.version}</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>com.clearspring.analytics</groupId>
+      <artifactId>stream</artifactId>
+      <version>${stream.version}</version>
+    </dependency>
   </dependencies>
 
   <build>
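For context on the new dependency: stream is AddThis's stream-lib, which provides the HyperLogLog++ estimator that the KeySet class below wraps. Here is a minimal sketch (illustration only, not part of this commit; the class name is invented) of the estimator's behavior. The (10, 14) arguments mirror the ones KeySet passes, and precision p=10 puts the standard HyperLogLog relative error near 1.04/sqrt(2^10), roughly 3%.

import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;

// Illustration only: estimate unique keys in bounded memory, then serialize.
public class HllSketch {
  public static void main(String[] args) throws Exception {
    HyperLogLogPlus estimator = new HyperLogLogPlus(10, 14); // precision 10, sparse precision 14
    for (int i = 0; i < 100000; i++) {
      estimator.offer("key-" + (i % 50000)); // 50k unique keys, each offered twice
    }
    System.out.println(estimator.cardinality()); // ~50000, within a few percent
    byte[] bytes = estimator.getBytes();         // compact serialized form
    HyperLogLogPlus restored = HyperLogLogPlus.Builder.build(bytes);
    System.out.println(restored.cardinality());  // same estimate after the round trip
  }
}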
@@ -434,16 +434,19 @@ public void recycle(long index) {
   }
 
   /**
-   * Compacts the log.
-   * <p>
-   * This method will force compaction to take place <em>in the current thread</em>. Therefore, it may block the calling
-   * thread for some period of time while compaction takes place.
-   *
+   * Compacts the log in a background thread.
    */
  public void compact() {
    segments.compact();
  }
 
+  /**
+   * Compacts the log in the current thread.
+   */
+  void compactNow() {
+    segments.compactNow();
+  }
+
   @Override
   public void flush() {
     segments.currentSegment().flush();
105 changes: 105 additions & 0 deletions raft/src/main/java/net/kuujo/copycat/protocol/raft/storage/KeySet.java
@@ -0,0 +1,105 @@
/*
* Copyright 2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.kuujo.copycat.protocol.raft.storage;

import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
import com.clearspring.analytics.stream.cardinality.ICardinality;
import net.kuujo.copycat.io.Buffer;
import net.kuujo.copycat.io.util.HashFunction;
import net.kuujo.copycat.io.util.Murmur3HashFunction;

import java.io.IOException;

/**
* Segment key set.
* <p>
 * The key set tracks the total number of unique keys within a given segment using a memory-efficient estimator algorithm.
 * Specifically, this class uses AddThis's HyperLogLog++ implementation.
*
* @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/
public class KeySet {
private final HashFunction hash = new Murmur3HashFunction();
private final ICardinality cardinality;

KeySet() {
cardinality = new HyperLogLogPlus(10, 14);
}

public KeySet(byte[] bytes) {
try {
cardinality = HyperLogLogPlus.Builder.build(bytes);
} catch (IOException e) {
throw new StorageException("failed to deserialize cardinality estimator");
}
}

private KeySet(ICardinality cardinality) {
this.cardinality = cardinality;
}

/**
* Returns the cardinality of the set.
*
* @return The cardinality of the set.
*/
public long cardinality() {
return cardinality.cardinality();
}

/**
* Adds a key to the set.
*
* @param key The key to add.
*/
public void add(Buffer key) {
cardinality.offerHashed(hash.hash64(key));
}

/**
* Merges two key sets together.
*
* @param keySet The key set to merge in to this key set.
* @return The merged key set.
*/
public KeySet merge(KeySet keySet) {
try {
return new KeySet(cardinality.merge(keySet.cardinality));
} catch (CardinalityMergeException e) {
throw new StorageException("failed to merge cardinality estimators", e);
}
}

/**
* Serializes the key set.
*
* @return The serialized key set.
*/
public byte[] getBytes() {
try {
return cardinality.getBytes();
} catch (IOException e) {
throw new StorageException("failed to serialize cardinality estimator", e);
}
}

@Override
public String toString() {
return String.format("KeySet[cardinality=%d]", cardinality.cardinality());
}

}
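A brief usage sketch of the API above (hypothetical, not part of this commit; the class name is invented). It lives in the same package because the no-argument constructor is package-private. In real use keys are added from entry Buffers, which this sketch elides:

package net.kuujo.copycat.protocol.raft.storage;

// Hypothetical sketch of the KeySet surface that compaction relies on.
public class KeySetExample {
  public static void main(String[] args) {
    KeySet a = new KeySet(); // one estimator per open segment
    KeySet b = new KeySet();
    // ... a.add(key) / b.add(key) as entries are committed ...
    KeySet union = a.merge(b);        // estimated unique keys across both segments
    byte[] bytes = union.getBytes();  // what a locked segment persists
    KeySet restored = new KeySet(bytes);
    assert restored.cardinality() == union.cardinality(); // round trip preserves the estimate
  }
}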
@@ -60,6 +60,7 @@ public static Segment open(Buffer buffer, SegmentDescriptor descriptor, OffsetIn
   private final Buffer writeBuffer;
   private final Buffer readBuffer;
   private final OffsetIndex offsetIndex;
+  private KeySet keys;
   private final RaftEntryPool entryPool = new CommittingRaftEntryPool();
   private long commitIndex = 0;
   private long recycleIndex = 0;
@@ -83,7 +84,15 @@ public static Segment open(Buffer buffer, SegmentDescriptor descriptor, OffsetIn
     this.offsetIndex = offsetIndex;
     if (offsetIndex.size() > 0) {
       firstLastOffset = offsetIndex.lastOffset();
-      writeBuffer.position(offsetIndex.position(firstLastOffset));
+      writeBuffer.position(offsetIndex.position(firstLastOffset) + offsetIndex.length(firstLastOffset));
+    }
+
+    // If the descriptor is locked then that indicates that this segment is immutable. Set the commit index to the last
+    // index in the segment.
+    if (descriptor.locked()) {
+      commitIndex = lastIndex();
+    } else {
+      keys = new KeySet();
     }
   }
 
@@ -96,6 +105,27 @@ public SegmentDescriptor descriptor() {
     return descriptor;
   }
 
+  /**
+   * Returns the set of keys in the segment.
+   *
+   * @return The segment key set.
+   */
+  public KeySet keys() {
+    if (keys != null) {
+      return keys;
+    } else if (!descriptor.locked()) {
+      throw new IllegalStateException("unlocked segment contains no key set");
+    }
+
+    int lastOffset = offsetIndex.lastOffset();
+    long lastPosition = offsetIndex.position(lastOffset);
+    int lastLength = offsetIndex.length(lastOffset);
+    long keysPosition = lastPosition + lastLength;
+    byte[] bytes = new byte[readBuffer.readInt(keysPosition)];
+    readBuffer.read(keysPosition + 4, bytes, 0, bytes.length);
+    return new KeySet(bytes);
+  }
+
   /**
    * Returns a boolean value indicating whether the segment is open.
    *
@@ -454,11 +484,31 @@ public Segment truncate(long index) {
   public Segment commit(long index) {
     checkRange(index);
 
-    if (index > commitIndex) {
+    if (!descriptor.locked() && index > commitIndex) {
       writeBuffer.flush();
       offsetIndex.flush();
-      commitIndex = index;
+
+      for (long i = Math.max(commitIndex + 1, descriptor.index()); i <= index; i++) {
+        int offset = offset(i);
+        long position = offsetIndex.position(offset);
+        if (position != -1) {
+          int keySize = readBuffer.readShort(keyLengthPosition(position));
+          try (Buffer key = readBuffer.slice(keyPosition(position), keySize)) {
+            keys.add(key);
+          }
+        }
+        commitIndex = index;
+      }
 
       if (offset(commitIndex) == descriptor.range() - 1) {
+        try {
+          byte[] bytes = keys.getBytes();
+          writeBuffer.writeInt(bytes.length).write(bytes);
+        } finally {
+          writeBuffer.flush();
+          keys = null;
+        }
+
         descriptor.update(System.currentTimeMillis());
         descriptor.lock();
       }
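Read together with keys() above, this change implies a simple on-disk layout for a locked segment (inferred from this diff; the layout is not documented elsewhere in the commit):

  [ entries ... last entry ][ int32: key set length ][ key set bytes: serialized HyperLogLog++ state ]

commit() appends the length-prefixed blob as soon as the last offset in the segment's range is committed, then drops the in-memory estimator (keys = null); keys() later locates the blob at position(lastOffset) + length(lastOffset), and its + 4 skips the 32-bit length prefix written here.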
@@ -227,7 +227,7 @@ public Segment createSegment(long segmentId, long segmentIndex, long segmentVers
       int indexBytes = OffsetIndex.bytes(descriptor.entries());
       OffsetIndex index = new OffsetIndex(((FileBuffer) buffer.skip(SegmentDescriptor.BYTES)).map(indexBytes), descriptor.entries());
       Segment segment = Segment.open(buffer.position(SegmentDescriptor.BYTES + indexBytes).slice(), descriptor, index);
-      LOGGER.debug("Created persistent segment: {} ({})", descriptor.id(), file.getName());
+      LOGGER.debug("Created segment: {} ({})", descriptor.id(), file.getName());
       return segment;
     }
   }
@@ -415,6 +415,13 @@ public void compact() {
     compactor.execute();
   }
 
+  /**
+   * Compacts the segments in the foreground thread.
+   */
+  void compactNow() {
+    compactor.run();
+  }
+
   @Override
   public void close() {
     segments.values().forEach(s -> {
Expand Up @@ -20,6 +20,7 @@
import net.kuujo.copycat.protocol.raft.storage.RaftEntry; import net.kuujo.copycat.protocol.raft.storage.RaftEntry;
import net.kuujo.copycat.protocol.raft.storage.Segment; import net.kuujo.copycat.protocol.raft.storage.Segment;
import net.kuujo.copycat.protocol.raft.storage.SegmentManager; import net.kuujo.copycat.protocol.raft.storage.SegmentManager;
import net.kuujo.copycat.protocol.raft.storage.StorageConfig;
import org.slf4j.Logger; import org.slf4j.Logger;


import java.util.ArrayList; import java.util.ArrayList;
@@ -33,6 +34,7 @@
  * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
  */
 public abstract class AbstractCompactionStrategy implements CompactionStrategy {
+  protected StorageConfig config;
 
   /**
    * Returns the compaction strategy logger.
@@ -54,6 +56,8 @@ private List<Segment> sortSegments(List<Segment> segments) {
 
   @Override
   public void compact(SegmentManager manager) {
+    config = manager.config();
+
     // Select a list of segments to compact.
     List<List<Segment>> allSegments = selectSegments(manager.segments().stream().filter(Segment::isLocked).collect(Collectors.toList()));
 
@@ -82,8 +86,9 @@ private void compactSegments(List<Segment> segments, SegmentManager manager) {
     for (Segment segment : segments) {
 
       // Because segments are not thread safe, we need to create a temporary segment with a new file descriptor in
-      // order to read entries from the segment. There's no risk of a race condition here since we're only compacting
-      // segments in which all entries have been committed.
+      // order to read entries from the segment. There's no risk of a race condition here as long as the compaction
+      // process remains single-threaded since we're only compacting immutable segments in which all entries have been
+      // committed.
       Segment temp = manager.loadSegment(segment.descriptor().id(), segment.descriptor().version());
 
       KeyTable keyTable = new KeyTable(temp.descriptor().entries());
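The new config field makes storage settings available to subclasses before selectSegments() runs. A hypothetical minimal subclass (invented for illustration; only the logger() and selectSegments() hooks are grounded in this diff) would look like:

package net.kuujo.copycat.protocol.raft.storage.compact;

import net.kuujo.copycat.protocol.raft.storage.Segment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.List;

// Hypothetical strategy: compact all locked segments as a single group.
// Subclasses only decide which segments to group; the base class rewrites them.
public class CompactEverythingStrategy extends AbstractCompactionStrategy {
  private static final Logger LOGGER = LoggerFactory.getLogger(CompactEverythingStrategy.class);

  @Override
  protected Logger logger() {
    return LOGGER;
  }

  @Override
  protected List<List<Segment>> selectSegments(List<Segment> segments) {
    // config is already populated here, e.g. config.getEntriesPerSegment().
    return Collections.singletonList(segments);
  }
}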
72 changes: 72 additions & 0 deletions raft/src/main/java/net/kuujo/copycat/protocol/raft/storage/compact/KeyCountingCompactionStrategy.java
@@ -0,0 +1,72 @@
/*
* Copyright 2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.kuujo.copycat.protocol.raft.storage.compact;

import net.kuujo.copycat.protocol.raft.storage.KeySet;
import net.kuujo.copycat.protocol.raft.storage.Segment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

/**
 * Cardinality-based compaction strategy.
*
* @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/
public class KeyCountingCompactionStrategy extends LeveledCompactionStrategy {
private static final Logger LOGGER = LoggerFactory.getLogger(KeyCountingCompactionStrategy.class);

@Override
protected Logger logger() {
return LOGGER;
}

@Override
protected List<List<Segment>> selectSegments(List<Segment> segments) {
return super.selectSegments(segments).stream()
.map(l -> calculateCompactSegments(l, config.getEntriesPerSegment()))
.collect(Collectors.toList());
}

/**
* Calculates the set of segments to compact for the given level segments.
*/
private List<Segment> calculateCompactSegments(List<Segment> segments, int entriesPerSegment) {
// Determine the set of segments that can be compacted from the level by estimating the cardinality of the oldest
// set of segments up to the maximum number of entries allowed in a segment.
List<Segment> compactSegments = new ArrayList<>();
KeySet keys = null;
for (Segment segment : segments) {
if (keys == null) {
keys = segment.keys();
compactSegments.add(segment);
} else {
keys = keys.merge(segment.keys());

if (keys.cardinality() > entriesPerSegment) {
return compactSegments;
} else {
compactSegments.add(segment);
}
}
}
return compactSegments;
}

}
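To make the selection loop concrete, here is a self-contained sketch (class name and numbers invented) that mimics calculateCompactSegments() with raw HyperLogLog++ estimators standing in for segments. Overlapping key ranges are what make the merged estimate grow sublinearly:

import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;

import java.util.ArrayList;
import java.util.List;

// Standalone sketch: raw HyperLogLog++ estimators stand in for segments.
public class SelectionSketch {
  public static void main(String[] args) throws Exception {
    int entriesPerSegment = 1024;

    // Three fake "segments" with overlapping key ranges, 700 unique keys each.
    List<HyperLogLogPlus> segments = new ArrayList<>();
    for (int s = 0; s < 3; s++) {
      HyperLogLogPlus hll = new HyperLogLogPlus(10, 14);
      for (int i = s * 300; i < s * 300 + 700; i++) {
        hll.offer("key-" + i);
      }
      segments.add(hll);
    }

    // Oldest-first: fold the next segment into a running union until the
    // estimated unique-key count no longer fits in a single segment.
    HyperLogLogPlus union = null;
    int selected = 0;
    for (HyperLogLogPlus segment : segments) {
      HyperLogLogPlus candidate = union == null ? segment : (HyperLogLogPlus) union.merge(segment);
      if (union != null && candidate.cardinality() > entriesPerSegment) {
        break; // merged keys would overflow one segment; stop selecting
      }
      union = candidate;
      selected++;
    }
    System.out.println("selected=" + selected + ", estimate=" + union.cardinality());
  }
}

With these numbers, the first two segments union to roughly 1,000 unique keys and are both selected; folding in the third pushes the estimate to roughly 1,300, past the 1,024-entry limit, so selection stops, mirroring the early return in calculateCompactSegments().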
@@ -19,7 +19,7 @@
 import net.kuujo.copycat.io.Bytes;
 import net.kuujo.copycat.io.NativeBytes;
 import net.kuujo.copycat.io.util.HashFunction;
-import net.kuujo.copycat.io.util.HashFunctions;
+import net.kuujo.copycat.io.util.Murmur3HashFunction;
 
 /**
  * Log key lookup table.
@@ -45,7 +45,7 @@ public class KeyTable implements AutoCloseable {
   private int size;
 
   public KeyTable(int entries) {
-    this(entries, HashFunctions.CITYHASH);
+    this(entries, new Murmur3HashFunction());
   }
 
   public KeyTable(int entries, HashFunction hashFunction) {
