Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

5319: Optimize virtual node cache flush strategy #5568

Merged
merged 14 commits on May 3, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -55,6 +55,11 @@ public int getSerializedSize() {
return VARIABLE_DATA_SIZE;
}

/**
 * {@inheritDoc}
 *
 * <p>Smart contract bytecode is variable-sized, so this returns a rough
 * typical size used only for flush-size estimation.
 */
@Override
public int getTypicalSerializedSize() {
    // Rough estimate of a typical serialized bytecode blob, in bytes — a guesstimate.
    final int typicalByteCodeSize = 1024;
    return typicalByteCodeSize;
}

@Override
public int serialize(final SmartContractByteCodeMapValue data, final ByteBuffer buffer) throws IOException {
final int size = data.getSize();
Expand Down
Expand Up @@ -31,6 +31,8 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.openjdk.jmh.annotations.Benchmark;
Expand Down Expand Up @@ -201,8 +203,12 @@ public void transferPrefetch() throws Exception {
final long[] map = new long[verify ? maxKey : 0];
VirtualMap<BenchmarkKey, BenchmarkValue> virtualMap = createMap(map);

final ExecutorService prefetchPool =
Executors.newCachedThreadPool(new ThreadConfiguration(getStaticThreadManager())
// Use a custom queue and executor for warmups. It may happen that some warmup jobs
// aren't complete by the end of the round, so they will start piling up. To fix it,
// clear the queue in the end of each round
final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
jsync-swirlds marked this conversation as resolved.
Show resolved Hide resolved
final ExecutorService prefetchPool = new ThreadPoolExecutor(numThreads, numThreads, 1, TimeUnit.SECONDS, queue,
new ThreadConfiguration(getStaticThreadManager())
.setComponent("benchmark")
.setThreadName("prefetch")
.setExceptionHandler((t, ex) -> logger.error("Uncaught exception during prefetching", ex))
Expand All @@ -212,19 +218,25 @@ public void transferPrefetch() throws Exception {

initializeFixedAccounts(virtualMap);

final List<VirtualMap<BenchmarkKey, BenchmarkValue>> toRelease = new ArrayList<>();

long prevTime = System.currentTimeMillis();
for (int i = 1; i <= numFiles; ++i) {
toRelease.add(virtualMap);

// Generate a new set of unique random keys
final Integer[] keys = generateKeySet();

// Warm keys in parallel asynchronously
final VirtualMap<BenchmarkKey, BenchmarkValue> currentMap = virtualMap;
for (int thread = 0; thread < numThreads; ++thread) {
final int idx = thread;
for (int j = 0; j < keys.length; j += KEYS_PER_RECORD) {
artemananiev marked this conversation as resolved.
Show resolved Hide resolved
final int key = j;
prefetchPool.execute(() -> {
for (int j = idx * KEYS_PER_RECORD; j < keys.length; j += numThreads * KEYS_PER_RECORD) {
currentMap.warm(new BenchmarkKey(keys[j]));
currentMap.warm(new BenchmarkKey(keys[j + 1]));
try {
currentMap.warm(new BenchmarkKey(keys[key]));
currentMap.warm(new BenchmarkKey(keys[key + 1]));
} catch (final Exception e) {
logger.error("Warmup exception", e);
artemananiev marked this conversation as resolved.
Show resolved Hide resolved
}
});
}
Expand Down Expand Up @@ -267,14 +279,45 @@ public void transferPrefetch() throws Exception {
}
}

virtualMap = copyMap(virtualMap);
queue.clear();

// Add some chaos. Don't release the map on copy
virtualMap = copyMap(virtualMap, false);
// Either release a random previous copy, or two copies, or none
int randomInt = Utils.randomInt(100);
final VirtualMap<BenchmarkKey, BenchmarkValue> mapToRelease1;
final VirtualMap<BenchmarkKey, BenchmarkValue> mapToRelease2;
if (randomInt < 60) {
final int indexToRelease = Utils.randomInt(toRelease.size());
mapToRelease1 = toRelease.remove(indexToRelease);
mapToRelease2 = null;
} else if (randomInt < 80) {
final int indexToRelease = Utils.randomInt(toRelease.size());
mapToRelease1 = toRelease.remove(indexToRelease);
mapToRelease2 = (toRelease.size() > 0) ? toRelease.remove(0) : null;
} else {
mapToRelease1 = null;
mapToRelease2 = null;
}
if (mapToRelease1 != null) {
mapToRelease1.release();
}
if (mapToRelease2 != null) {
mapToRelease2.release();
}

// Report TPS
final long curTime = System.currentTimeMillis();
updateTPS(i, curTime - prevTime);
prevTime = curTime;
}

logger.debug("Maps left to release: " + toRelease.size());
// Now release the remaining copies
for (VirtualMap<BenchmarkKey, BenchmarkValue> m : toRelease) {
m.release();
}

// Ensure the map is done with hashing/merging/flushing
final VirtualMap<BenchmarkKey, BenchmarkValue> finalMap = flushMap(virtualMap);

Expand Down
Expand Up @@ -22,6 +22,7 @@
import com.swirlds.common.io.streams.SerializableDataOutputStream;
import com.swirlds.common.threading.framework.config.ThreadConfiguration;
import com.swirlds.virtualmap.VirtualMap;
import com.swirlds.virtualmap.internal.merkle.VirtualRootNode;
import com.swirlds.virtualmap.internal.pipeline.VirtualRoot;
import java.io.IOException;
import java.nio.file.Files;
Expand Down Expand Up @@ -122,13 +123,18 @@ protected VirtualMap<BenchmarkKey, BenchmarkValue> createMap(final long[] map) {
}
}

private void enableSnapshots() {
protected void enableSnapshots() {
snapshotTime.set(System.currentTimeMillis() + SNAPSHOT_DELAY);
doSnapshots = true;
}

protected VirtualMap<BenchmarkKey, BenchmarkValue> copyMap(
final VirtualMap<BenchmarkKey, BenchmarkValue> virtualMap) {
return copyMap(virtualMap, true);
}

protected VirtualMap<BenchmarkKey, BenchmarkValue> copyMap(
final VirtualMap<BenchmarkKey, BenchmarkValue> virtualMap, final boolean release) {
final VirtualRoot root = virtualMap.getRight();
final VirtualMap<BenchmarkKey, BenchmarkValue> newCopy = virtualMap.copy();
hasher.execute(root::getHash);
Expand All @@ -146,7 +152,9 @@ protected VirtualMap<BenchmarkKey, BenchmarkValue> copyMap(
new SerializableDataOutputStream(Files.newOutputStream(savedDir.resolve(SERDE)))) {
virtualMap.serialize(out, savedDir);
}
virtualMap.release();
if (release) {
virtualMap.release();
}
Utils.deleteRecursively(savedDir);

snapshotTime.set(System.currentTimeMillis() + SNAPSHOT_DELAY);
Expand All @@ -156,7 +164,9 @@ protected VirtualMap<BenchmarkKey, BenchmarkValue> copyMap(
})
.start();
} else {
virtualMap.release();
if (release) {
virtualMap.release();
}
}

return newCopy;
Expand All @@ -169,20 +179,16 @@ protected VirtualMap<BenchmarkKey, BenchmarkValue> flushMap(
final VirtualMap<BenchmarkKey, BenchmarkValue> virtualMap) {
final long start = System.currentTimeMillis();
VirtualMap<BenchmarkKey, BenchmarkValue> curMap = virtualMap;
for (; ; ) {
final VirtualMap<BenchmarkKey, BenchmarkValue> oldCopy = curMap;
curMap = curMap.copy();
oldCopy.release();
final VirtualRoot root = oldCopy.getRight();
if (root.shouldBeFlushed()) {
try {
root.waitUntilFlushed();
} catch (InterruptedException ex) {
logger.warn("Interrupted", ex);
Thread.currentThread().interrupt();
}
break;
}
final VirtualMap<BenchmarkKey, BenchmarkValue> oldCopy = curMap;
curMap = curMap.copy();
oldCopy.release();
final VirtualRootNode<BenchmarkKey, BenchmarkValue> root = oldCopy.getRight();
root.enableFlush();
try {
root.waitUntilFlushed();
} catch (InterruptedException ex) {
logger.warn("Interrupted", ex);
Thread.currentThread().interrupt();
}
logger.info("Flushed map in {} ms", System.currentTimeMillis() - start);

Expand Down
Expand Up @@ -23,6 +23,7 @@
import static com.swirlds.logging.LogMarker.EXCEPTION;
import static com.swirlds.logging.LogMarker.JASPER_DB;

import com.swirlds.common.crypto.DigestType;
import com.swirlds.common.crypto.Hash;
import com.swirlds.common.metrics.FunctionGauge;
import com.swirlds.common.metrics.Metrics;
Expand Down Expand Up @@ -606,6 +607,12 @@ public long getLastLeafPath() {
return this.validLeafPathRange.getMaxValidKey();
}

/**
 * {@inheritDoc}
 *
 * <p>Estimates the number of bytes a flush will write: each dirty internal
 * node contributes a path (a {@code long}) plus a SHA-384 hash, and each
 * dirty leaf contributes the typical serialized size reported by the
 * path-to-hash/key/value store's serializer. Deleted leaves are not counted.
 */
@Override
public long estimatedSize(final long dirtyInternals, final long dirtyLeaves, final long deletedLeaves) {
    final long bytesPerInternal = Long.BYTES + DigestType.SHA_384.digestLength();
    final long bytesPerLeaf = pathToHashKeyValue.getSerializer().getTypicalSerializedSize();
    return dirtyInternals * bytesPerInternal + dirtyLeaves * bytesPerLeaf;
}

/**
* Save a batch of data to data store.
* <p>
Expand Down
Expand Up @@ -224,6 +224,10 @@ protected DataFileCollection(
}
}

/**
 * Returns the serializer this file collection uses for its data items.
 * Package-private so wrapping stores (e.g. {@code MemoryIndexDiskKeyValueStore})
 * can query serialized-size information without exposing it publicly.
 *
 * @return the data item serializer for this collection
 */
DataItemSerializer<D> getDataItemSerializer() {
    return dataItemSerializer;
}

/**
* Get the valid range of keys for data items currently stored by this data file collection. Any data items with
* keys below this can be deleted during a merge.
Expand Down
Expand Up @@ -133,6 +133,10 @@ public MemoryIndexDiskKeyValueStore(
storeDir, storeName, legacyStoreName, dataItemSerializer, combinedLoadedDataCallback);
}

/**
 * Returns the data item serializer used by the underlying file collection.
 * Exposed so callers can query typical serialized sizes (e.g. for flush-size
 * estimation) without reaching into the file collection directly.
 *
 * @return the serializer of the backing {@code DataFileCollection}
 */
public DataItemSerializer<D> getSerializer() {
    return fileCollection.getDataItemSerializer();
}

/**
* Merge all files that match the given filter
*
Expand Down
Expand Up @@ -24,6 +24,7 @@
import static com.swirlds.logging.LogMarker.MERKLE_DB;
import static com.swirlds.merkledb.KeyRange.INVALID_KEY_RANGE;

import com.swirlds.common.crypto.DigestType;
import com.swirlds.common.crypto.Hash;
import com.swirlds.common.metrics.FunctionGauge;
import com.swirlds.common.metrics.Metrics;
Expand Down Expand Up @@ -951,6 +952,19 @@ public void snapshot(final Path snapshotDirectory) throws IOException, IllegalSt
}
}

/**
 * {@inheritDoc}
 *
 * <p>Estimates the flush size in bytes. Each dirty internal node is stored
 * as a path ({@code long}) plus a SHA-384 hash; each dirty leaf as a path,
 * a hash, and the typical serialized key and value sizes from the table
 * configuration. Deleted leaves are ignored, as they aren't flushed to the
 * data source.
 */
@Override
public long estimatedSize(final long dirtyInternals, final long dirtyLeaves, final long deletedLeaves) {
    final long hashBytes = DigestType.SHA_384.digestLength();
    final long bytesPerInternal = Long.BYTES + hashBytes; // path + hash
    final long bytesPerLeaf = Long.BYTES // path
            + hashBytes // hash
            + tableConfig.getKeySerializer().getTypicalSerializedSize() // key
            + tableConfig.getValueSerializer().getTypicalSerializedSize(); // value
    final long estimatedSize = dirtyInternals * bytesPerInternal + dirtyLeaves * bytesPerLeaf;
    logger.debug(MERKLE_DB.getMarker(), "Estimated flush size {}", estimatedSize);
    return estimatedSize;
}

/** toString for debugging */
@Override
public String toString() {
Expand Down Expand Up @@ -1154,7 +1168,7 @@ private void shutdownThreadsAndWait(final ExecutorService... executors) throws I
} catch (final InterruptedException e) {
logger.warn(EXCEPTION.getMarker(), "[{}] Interrupted while waiting on executors to shutdown", tableName, e);
Thread.currentThread().interrupt();
throw new IOException("Interrupted while waiting for merge to finish.", e);
throw new IOException("Interrupted while waiting for shutdown to finish.", e);
}
}

Expand Down
Expand Up @@ -186,7 +186,7 @@ public HalfDiskHashMap(
} else {
logger.error(
EXCEPTION.getMarker(),
"Loading existing set of data files but now metadata file was found in [{}]",
"Loading existing set of data files but no metadata file was found in [{}]",
storeDir.toAbsolutePath());
throw new IOException("Can not load an existing HalfDiskHashMap from ["
+ storeDir.toAbsolutePath()
Expand Down
Expand Up @@ -43,6 +43,18 @@ default boolean isVariableSize() {
*/
int getSerializedSize();

/**
 * For variable sized data items get the typical number of bytes an item takes when serialized.
 *
 * @return Either for fixed size same as getSerializedSize() or an estimated typical size
 * @throws IllegalStateException if the data item is variable sized and the
 *         implementation has not overridden this method
 */
default int getTypicalSerializedSize() {
    if (!isVariableSize()) {
        // Fixed-size items: the typical size is exactly the fixed serialized size.
        return getSerializedSize();
    }
    throw new IllegalStateException("Variable sized implementations have to override this method");
}

/**
* Serialize a data item including header to the byte buffer returning the size of the data
* written. Serialization format must be identical to {@link #deserialize(ByteBuffer, long)}.
Expand Down
Expand Up @@ -54,19 +54,6 @@ default KeyIndexType getIndexType() {
return getSerializedSize() == Long.BYTES ? KeyIndexType.SEQUENTIAL_INCREMENTING_LONGS : KeyIndexType.GENERIC;
}

/**
* For variable sized keys get the typical number of bytes a key takes when serialized.
*
* @return Either for fixed size same as getSerializedSize() or an estimated typical size for
* keys
*/
default int getTypicalSerializedSize() {
if (isVariableSize()) {
throw new IllegalStateException("Variable sized implementations have to override this method");
}
return getSerializedSize();
}

/**
* Deserialize key size from the given byte buffer
*
Expand Down
Expand Up @@ -84,7 +84,7 @@ public void closeFlushTest() throws Exception {
while (true) {
copy = map.copy();
root = map.getRight();
if (root.shouldBeFlushed()) {
if (root.requestedToFlush()) {
final VirtualMap<VirtualLongKey, ExampleByteArrayVirtualValue> finalCopy = copy;
new Thread(() -> {
try {
Expand Down Expand Up @@ -204,6 +204,11 @@ public void registerMetrics(final Metrics metrics) {
public VirtualKeySet<K> buildKeySet() {
return delegate.buildKeySet();
}

/**
 * {@inheritDoc}
 *
 * <p>Pure pass-through: forwards the estimate to the wrapped data source.
 */
@Override
public long estimatedSize(final long dirtyInternals, final long dirtyLeaves, final long deletedLeaves) {
    return delegate.estimatedSize(dirtyInternals, dirtyLeaves, deletedLeaves);
}
};
}
}
Expand Down
Expand Up @@ -279,14 +279,6 @@ public boolean equals(ByteBuffer buffer, int dataVersion, ExampleLongKeyVariable
return value == keyToCompare.getKeyAsLong();
}

/** {@inheritDoc} */
@Override
public void serialize(final SerializableDataOutputStream out) throws IOException {}

/** {@inheritDoc} */
@Override
public void deserialize(final SerializableDataInputStream in, final int version) throws IOException {}

/** {@inheritDoc} */
@Override
public int hashCode() {
Expand Down
Expand Up @@ -280,7 +280,7 @@ public ExampleLongLongKeyVariableSize deserialize(final ByteBuffer buffer, final

/**
* Serialize a data item including header to the byte buffer returning the size of the data
* written
* written.
*
* @param data The data item to serialize
* @param buffer Byte buffer to write to
Expand All @@ -292,14 +292,6 @@ public int serialize(final ExampleLongLongKeyVariableSize data, final ByteBuffer
return 2 + computeNonZeroBytes(data.value1) + computeNonZeroBytes(data.value2);
}

/** {@inheritDoc} */
@Override
public void serialize(final SerializableDataOutputStream out) throws IOException {}

/** {@inheritDoc} */
@Override
public void deserialize(final SerializableDataInputStream in, final int version) throws IOException {}

/**
* Compare keyToCompare's data to that contained in the given ByteBuffer. The data in the
* buffer is assumed to be starting at the current buffer position and in the format written
Expand Down
Expand Up @@ -297,7 +297,7 @@ void serializeUnflushedData(final int count) throws IOException, InterruptedExce

@ParameterizedTest
@ValueSource(ints = {0, 1, 2, 3, 4, 5, 10, 100, 1000, 1023, 1024, 1025})
@DisplayName("Serialize Unflushed Data")
@DisplayName("Serialize Only Flushed Data")
void serializeOnlyFlushedData(final int count) throws InterruptedException, IOException {
final long seed = new Random().nextLong();
System.out.println("seed = " + seed);
Expand Down