5319: Optimize virtual node cache flush strategy #5568

Merged 14 commits on May 3, 2023
Changes from all commits
@@ -55,6 +55,11 @@ public int getSerializedSize() {
return VARIABLE_DATA_SIZE;
}

@Override
public int getTypicalSerializedSize() {
return 1024; // guesstimation
}

@Override
public int serialize(final SmartContractByteCodeMapValue data, final ByteBuffer buffer) throws IOException {
final int size = data.getSize();
@@ -31,6 +31,8 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.openjdk.jmh.annotations.Benchmark;
@@ -201,8 +203,17 @@ public void transferPrefetch() throws Exception {
final long[] map = new long[verify ? maxKey : 0];
VirtualMap<BenchmarkKey, BenchmarkValue> virtualMap = createMap(map);

final ExecutorService prefetchPool =
Executors.newCachedThreadPool(new ThreadConfiguration(getStaticThreadManager())
// Use a custom queue and executor for warmups. It may happen that some warmup jobs
// aren't complete by the end of the round, so they will start piling up. To fix it,
// clear the queue at the end of each round
final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
final ExecutorService prefetchPool = new ThreadPoolExecutor(
numThreads,
numThreads,
1,
TimeUnit.SECONDS,
queue,
new ThreadConfiguration(getStaticThreadManager())
.setComponent("benchmark")
.setThreadName("prefetch")
.setExceptionHandler((t, ex) -> logger.error("Uncaught exception during prefetching", ex))
Expand All @@ -219,12 +230,14 @@ public void transferPrefetch() throws Exception {

// Warm keys in parallel asynchronously
final VirtualMap<BenchmarkKey, BenchmarkValue> currentMap = virtualMap;
for (int thread = 0; thread < numThreads; ++thread) {
final int idx = thread;
for (int j = 0; j < keys.length; j += KEYS_PER_RECORD) {
final int key = j;
prefetchPool.execute(() -> {
for (int j = idx * KEYS_PER_RECORD; j < keys.length; j += numThreads * KEYS_PER_RECORD) {
currentMap.warm(new BenchmarkKey(keys[j]));
currentMap.warm(new BenchmarkKey(keys[j + 1]));
try {
currentMap.warm(new BenchmarkKey(keys[key]));
currentMap.warm(new BenchmarkKey(keys[key + 1]));
} catch (final Exception e) {
logger.error("Warmup exception", e);
}
});
}
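
As an aside, here is a self-contained sketch of the executor pattern adopted above: holding a reference to the LinkedBlockingQueue makes leftover warmup tasks drainable between rounds. The pool size and the task body are illustrative, not taken from the benchmark.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public final class DrainableWarmupPool {
    public static void main(final String[] args) throws InterruptedException {
        final int numThreads = 4; // illustrative pool size
        // Hold the queue explicitly so pending tasks can be cleared later
        final LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        final ExecutorService pool =
                new ThreadPoolExecutor(numThreads, numThreads, 1, TimeUnit.SECONDS, queue);
        // Submit more work than a single round is guaranteed to finish
        for (int i = 0; i < 1_000; i++) {
            pool.execute(() -> { /* cache-warming work would go here */ });
        }
        // End of round: drop tasks that never started, so they don't
        // pile up and distort the next round
        queue.clear();
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
    }
}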
@@ -267,6 +280,8 @@ public void transferPrefetch() throws Exception {
}
}

queue.clear();

virtualMap = copyMap(virtualMap);

// Report TPS
@@ -22,6 +22,7 @@
import com.swirlds.common.io.streams.SerializableDataOutputStream;
import com.swirlds.common.threading.framework.config.ThreadConfiguration;
import com.swirlds.virtualmap.VirtualMap;
import com.swirlds.virtualmap.internal.merkle.VirtualRootNode;
import com.swirlds.virtualmap.internal.pipeline.VirtualRoot;
import java.io.IOException;
import java.nio.file.Files;
@@ -124,7 +125,7 @@ protected VirtualMap<BenchmarkKey, BenchmarkValue> createMap(final long[] map) {
return virtualMap;
}

private void enableSnapshots() {
protected void enableSnapshots() {
snapshotTime.set(System.currentTimeMillis() + SNAPSHOT_DELAY);
doSnapshots = true;
}
@@ -171,20 +172,16 @@ protected VirtualMap<BenchmarkKey, BenchmarkValue> flushMap(
final VirtualMap<BenchmarkKey, BenchmarkValue> virtualMap) {
final long start = System.currentTimeMillis();
VirtualMap<BenchmarkKey, BenchmarkValue> curMap = virtualMap;
for (; ; ) {
final VirtualMap<BenchmarkKey, BenchmarkValue> oldCopy = curMap;
curMap = curMap.copy();
oldCopy.release();
final VirtualRoot root = oldCopy.getRight();
if (root.shouldBeFlushed()) {
try {
root.waitUntilFlushed();
} catch (InterruptedException ex) {
logger.warn("Interrupted", ex);
Thread.currentThread().interrupt();
}
break;
}
final VirtualMap<BenchmarkKey, BenchmarkValue> oldCopy = curMap;
curMap = curMap.copy();
oldCopy.release();
final VirtualRootNode<BenchmarkKey, BenchmarkValue> root = oldCopy.getRight();
root.enableFlush();
try {
root.waitUntilFlushed();
} catch (InterruptedException ex) {
logger.warn("Interrupted", ex);
Thread.currentThread().interrupt();
}
logger.info("Flushed map in {} ms", System.currentTimeMillis() - start);

@@ -23,6 +23,7 @@
import static com.swirlds.logging.LogMarker.EXCEPTION;
import static com.swirlds.logging.LogMarker.JASPER_DB;

import com.swirlds.common.crypto.DigestType;
import com.swirlds.common.crypto.Hash;
import com.swirlds.common.metrics.FunctionGauge;
import com.swirlds.common.metrics.Metrics;
@@ -606,6 +607,12 @@ public long getLastLeafPath() {
return this.validLeafPathRange.getMaxValidKey();
}

@Override
public long estimatedSize(final long dirtyInternals, final long dirtyLeaves) {
return dirtyInternals * (Long.BYTES + DigestType.SHA_384.digestLength())
+ dirtyLeaves * pathToHashKeyValue.getSerializer().getTypicalSerializedSize();
}

/**
* Save a batch of data to data store.
* <p>
@@ -224,6 +224,10 @@ protected DataFileCollection(
}
}

DataItemSerializer<D> getDataItemSerializer() {
return dataItemSerializer;
}

/**
* Get the valid range of keys for data items currently stored by this data file collection. Any data items with
* keys below this can be deleted during a merge.
@@ -133,6 +133,10 @@ public MemoryIndexDiskKeyValueStore(
storeDir, storeName, legacyStoreName, dataItemSerializer, combinedLoadedDataCallback);
}

public DataItemSerializer<D> getSerializer() {
return fileCollection.getDataItemSerializer();
}

/**
* Merge all files that match the given filter
*
@@ -26,6 +26,7 @@
import static com.swirlds.merkledb.MerkleDb.MERKLEDB_COMPONENT;
import static org.apache.commons.lang3.builder.ToStringStyle.SHORT_PREFIX_STYLE;

import com.swirlds.common.crypto.DigestType;
import com.swirlds.common.crypto.Hash;
import com.swirlds.common.metrics.FunctionGauge;
import com.swirlds.common.metrics.Metrics;
@@ -950,6 +951,21 @@ public void snapshot(final Path snapshotDirectory) throws IOException, IllegalSt
}
}

@Override
public long estimatedSize(final long dirtyInternals, final long dirtyLeaves) {
// Deleted leaves count is ignored, as deleted leaves aren't flushed to data source
final long estimatedInternalsSize = dirtyInternals
* (Long.BYTES // path
+ DigestType.SHA_384.digestLength()); // hash
final long estimatedLeavesSize = dirtyLeaves
* (Long.BYTES // path
+ DigestType.SHA_384.digestLength() // hash
+ tableConfig.getKeySerializer().getTypicalSerializedSize() // key
+ tableConfig.getValueSerializer().getTypicalSerializedSize()); // value
final long estimatedTotalSize = estimatedInternalsSize + estimatedLeavesSize;
return estimatedTotalSize;
}
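
As a quick sanity check of this estimate, here is a worked example under stated assumptions: SHA-384 digests are 48 bytes, and the typical serialized key and value sizes (normally supplied by the table's serializers) are taken as 16 and 128 bytes for illustration.

// Worked example of estimatedSize() with assumed inputs
final long digestLength = 48; // DigestType.SHA_384.digestLength()
final long typicalKeySize = 16; // illustrative
final long typicalValueSize = 128; // illustrative
final long dirtyInternals = 1_000_000;
final long dirtyLeaves = 500_000;
final long internalsSize = dirtyInternals * (Long.BYTES + digestLength); // 56,000,000 bytes
final long leavesSize = dirtyLeaves
        * (Long.BYTES + digestLength + typicalKeySize + typicalValueSize); // 100,000,000 bytes
final long estimatedTotal = internalsSize + leavesSize; // ~156 MB for this copy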

/** toString for debugging */
@Override
public String toString() {
@@ -1140,7 +1156,7 @@ private void shutdownThreadsAndWait(final ExecutorService... executors) throws I
} catch (final InterruptedException e) {
logger.warn(EXCEPTION.getMarker(), "[{}] Interrupted while waiting on executors to shutdown", tableName, e);
Thread.currentThread().interrupt();
throw new IOException("Interrupted while waiting for merge to finish.", e);
throw new IOException("Interrupted while waiting for shutdown to finish.", e);
}
}

@@ -1342,18 +1358,18 @@ boolean doMerge() {
/* Filter nothing during a full merge */
filesToMergeFilter = dataFileReaders -> dataFileReaders;
isLargeMerge = true;
logger.info(MERKLE_DB.getMarker(), "[{}] Starting Large Merge", tableName);
logger.debug(MERKLE_DB.getMarker(), "[{}] Starting Large Merge", tableName);
} else if (isTimeForMediumMerge(now)) {
lastMediumMerge = now;
filesToMergeFilter = DataFileCommon.newestFilesSmallerThan(
settings.getMediumMergeCutoffMb(), settings.getMaxNumberOfFilesInMerge());
isMediumMerge = true;
logger.info(MERKLE_DB.getMarker(), "[{}] Starting Medium Merge", tableName);
logger.debug(MERKLE_DB.getMarker(), "[{}] Starting Medium Merge", tableName);
} else {
filesToMergeFilter = DataFileCommon.newestFilesSmallerThan(
settings.getSmallMergeCutoffMb(), settings.getMaxNumberOfFilesInMerge());
isSmallMerge = true;
logger.info(MERKLE_DB.getMarker(), "[{}] Starting Small Merge", tableName);
logger.debug(MERKLE_DB.getMarker(), "[{}] Starting Small Merge", tableName);
}

// we need to merge disk files for internal hashes if they exist and pathToHashKeyValue
@@ -1437,7 +1453,7 @@ boolean doMerge() {
updateFileStats();
// update off-heap usage statistic
updateOffHeapStats();
logger.info(MERKLE_DB.getMarker(), "[{}] Finished Small Merge", tableName);
logger.debug(MERKLE_DB.getMarker(), "[{}] Finished Merge", tableName);
return true;
} catch (final InterruptedException | ClosedByInterruptException e) {
logger.info(MERKLE_DB.getMarker(), "Interrupted while merging, this is allowed.");
@@ -202,7 +202,7 @@ public HalfDiskHashMap(
} else {
logger.error(
EXCEPTION.getMarker(),
"Loading existing set of data files but now metadata file was found in [{}]",
"Loading existing set of data files but no metadata file was found in [{}]",
storeDir.toAbsolutePath());
throw new IOException("Can not load an existing HalfDiskHashMap from ["
+ storeDir.toAbsolutePath()
@@ -43,6 +43,18 @@ default boolean isVariableSize() {
*/
int getSerializedSize();

/**
* For variable-sized data items, get the typical number of bytes an item takes when serialized.
*
* @return For fixed-size items, the same as getSerializedSize(); otherwise an estimated typical size
*/
default int getTypicalSerializedSize() {
if (isVariableSize()) {
throw new IllegalStateException("Variable sized implementations have to override this method");
}
return getSerializedSize();
}
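
Reduced to a self-contained sketch, the default-method pattern above works like this. The SizedSerializer interface and its implementation are illustrative stand-ins for DataItemSerializer and a variable-size serializer, not code from this PR; the 1024-byte figure mirrors the bytecode serializer earlier in this diff.

interface SizedSerializer {
    int VARIABLE_DATA_SIZE = -1; // illustrative marker for variable-size items
    boolean isVariableSize();
    int getSerializedSize();
    // Fixed-size serializers inherit this default; variable-size ones must override it
    default int getTypicalSerializedSize() {
        if (isVariableSize()) {
            throw new IllegalStateException("Variable sized implementations have to override this method");
        }
        return getSerializedSize();
    }
}

final class ContractBytecodeSizer implements SizedSerializer {
    @Override
    public boolean isVariableSize() {
        return true;
    }

    @Override
    public int getSerializedSize() {
        return VARIABLE_DATA_SIZE; // size is not fixed
    }

    @Override
    public int getTypicalSerializedSize() {
        return 1024; // estimated typical size, as in the bytecode serializer above
    }
}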

/**
* Serialize a data item including header to the byte buffer returning the size of the data
* written. Serialization format must be identical to {@link #deserialize(ByteBuffer, long)}.
@@ -54,19 +54,6 @@ default KeyIndexType getIndexType() {
return getSerializedSize() == Long.BYTES ? KeyIndexType.SEQUENTIAL_INCREMENTING_LONGS : KeyIndexType.GENERIC;
}

/**
* For variable sized keys get the typical number of bytes a key takes when serialized.
*
* @return Either for fixed size same as getSerializedSize() or an estimated typical size for
* keys
*/
default int getTypicalSerializedSize() {
if (isVariableSize()) {
throw new IllegalStateException("Variable sized implementations have to override this method");
}
return getSerializedSize();
}

/**
* Deserialize key size from the given byte buffer
*
@@ -29,11 +29,14 @@
import com.swirlds.virtualmap.datasource.VirtualInternalRecord;
import com.swirlds.virtualmap.datasource.VirtualKeySet;
import com.swirlds.virtualmap.datasource.VirtualLeafRecord;
import com.swirlds.virtualmap.internal.pipeline.VirtualRoot;
import com.swirlds.virtualmap.internal.merkle.VirtualRootNode;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
import org.junit.jupiter.api.Assertions;
@@ -63,6 +66,7 @@ public static void setup() throws IOException {
@Tags({@Tag(TestTypeTags.HAMMER)})
public void closeFlushTest() throws Exception {
final int count = 100000;
final ExecutorService exec = Executors.newSingleThreadExecutor();
final AtomicReference<Exception> exception = new AtomicReference<>();
for (int j = 0; j < 100; j++) {
final Path storeDir = tmpFileDir.resolve("closeFlushTest-" + j);
@@ -79,35 +83,33 @@ public void closeFlushTest() throws Exception {
map.put(key, value);
}
VirtualMap<VirtualLongKey, ExampleByteArrayVirtualValue> copy;
VirtualRoot root;
final CountDownLatch shutdownLatch = new CountDownLatch(1);
while (true) {
for (int i = 0; i < 100; i++) {
copy = map.copy();
root = map.getRight();
if (root.shouldBeFlushed()) {
final VirtualMap<VirtualLongKey, ExampleByteArrayVirtualValue> finalCopy = copy;
new Thread(() -> {
try {
Thread.sleep(new Random().nextInt(500));
finalCopy.release();
} catch (final Exception z) {
throw new RuntimeException(z);
} finally {
shutdownLatch.countDown();
}
})
.start();
break;
}
map.release();
map = copy;
}
map.release();
copy = map.copy();
final VirtualRootNode<VirtualLongKey, ExampleByteArrayVirtualValue> root = map.getRight();
root.enableFlush();
final VirtualMap<VirtualLongKey, ExampleByteArrayVirtualValue> lastMap = map;
final Future<?> job = exec.submit(() -> {
try {
Thread.sleep(new Random().nextInt(500));
lastMap.release();
} catch (final Exception z) {
throw new RuntimeException(z);
} finally {
shutdownLatch.countDown();
}
});
copy.release();
shutdownLatch.await();
if (exception.get() != null) {
exception.get().printStackTrace();
break;
}
job.get();
}
Assertions.assertNull(exception.get(), "No exceptions expected, but caught " + exception.get());
}
@@ -204,6 +206,11 @@ public void registerMetrics(final Metrics metrics) {
public VirtualKeySet<K> buildKeySet() {
return delegate.buildKeySet();
}

@Override
public long estimatedSize(final long dirtyInternals, final long dirtyLeaves) {
return delegate.estimatedSize(dirtyInternals, dirtyLeaves);
}
};
}
}
@@ -279,14 +279,6 @@ public boolean equals(ByteBuffer buffer, int dataVersion, ExampleLongKeyVariable
return value == keyToCompare.getKeyAsLong();
}

/** {@inheritDoc} */
@Override
public void serialize(final SerializableDataOutputStream out) throws IOException {}

/** {@inheritDoc} */
@Override
public void deserialize(final SerializableDataInputStream in, final int version) throws IOException {}

/** {@inheritDoc} */
@Override
public int hashCode() {