Skip to content

Commit

Permalink
Reviewers comments. Spotless checks. Minor cleanup
Browse files Browse the repository at this point in the history
Signed-off-by: Artem Ananev <artem.ananev@swirldslabs.com>
  • Loading branch information
artemananiev committed Mar 20, 2023
1 parent fbe94b5 commit 55d4d0c
Show file tree
Hide file tree
Showing 19 changed files with 83 additions and 97 deletions.
Expand Up @@ -608,9 +608,9 @@ public long getLastLeafPath() {
}

@Override
public long estimatedSize(final long dirtyInternals, final long dirtyLeaves, final long deletedLeaves) {
return dirtyInternals * (Long.BYTES + DigestType.SHA_384.digestLength()) +
dirtyLeaves * pathToHashKeyValue.getSerializer().getTypicalSerializedSize();
public long estimatedSize(final long dirtyInternals, final long dirtyLeaves) {
return dirtyInternals * (Long.BYTES + DigestType.SHA_384.digestLength())
+ dirtyLeaves * pathToHashKeyValue.getSerializer().getTypicalSerializedSize();
}

/**
Expand Down
Expand Up @@ -953,16 +953,19 @@ public void snapshot(final Path snapshotDirectory) throws IOException, IllegalSt
}

@Override
public long estimatedSize(final long dirtyInternals, final long dirtyLeaves, final long deletedLeaves) {
public long estimatedSize(final long dirtyInternals, final long dirtyLeaves) {
// Deleted leaves count is ignored, as deleted leaves aren't flushed to data source
final long estimatedSize =
dirtyInternals * (Long.BYTES + DigestType.SHA_384.digestLength()) + // path and hash
dirtyLeaves * (Long.BYTES + // path
DigestType.SHA_384.digestLength() + // hash
tableConfig.getKeySerializer().getTypicalSerializedSize() + // key
tableConfig.getValueSerializer().getTypicalSerializedSize()); // value
logger.debug(MERKLE_DB.getMarker(), "Estimated flush size {}", estimatedSize);
return estimatedSize;
final long estimatedInternalsSize = dirtyInternals
* (Long.BYTES // path
+ DigestType.SHA_384.digestLength()); // hash
final long estimatedLeavesSize = dirtyLeaves
* (Long.BYTES // path
+ DigestType.SHA_384.digestLength() // hash
+ tableConfig.getKeySerializer().getTypicalSerializedSize() // key
+ tableConfig.getValueSerializer().getTypicalSerializedSize()); // value
final long estimatedTotalSize = estimatedInternalsSize + estimatedLeavesSize;
logger.debug(MERKLE_DB.getMarker(), "Estimated flush size {}", estimatedTotalSize);
return estimatedTotalSize;
}

/** toString for debugging */
Expand Down
Expand Up @@ -206,8 +206,8 @@ public VirtualKeySet<K> buildKeySet() {
}

@Override
public long estimatedSize(final long dirtyInternals, final long dirtyLeaves, final long deletedLeaves) {
return delegate.estimatedSize(dirtyInternals, dirtyLeaves, deletedLeaves);
public long estimatedSize(final long dirtyInternals, final long dirtyLeaves) {
return delegate.estimatedSize(dirtyInternals, dirtyLeaves);
}
};
}
Expand Down
Expand Up @@ -28,7 +28,6 @@
import java.nio.file.Path;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public abstract class BreakableDataSource implements VirtualDataSource<TestKey, TestValue> {
Expand All @@ -50,8 +49,7 @@ public void saveRecords(
final Stream<VirtualLeafRecord<TestKey, TestValue>> leafRecordsToDelete)
throws IOException {

final List<VirtualLeafRecord<TestKey, TestValue>> leaves =
leafRecordsToAddOrUpdate.toList();
final List<VirtualLeafRecord<TestKey, TestValue>> leaves = leafRecordsToAddOrUpdate.toList();

if (builder.numTimesBroken < builder.numTimesToBreak) {
builder.numCalls += leaves.size();
Expand Down Expand Up @@ -111,7 +109,7 @@ public void registerMetrics(final Metrics metrics) {}
public abstract VirtualKeySet<TestKey> buildKeySet();

@Override
public long estimatedSize(final long dirtyInternals, final long dirtyLeaves, final long deletedLeaves) {
return delegate.estimatedSize(dirtyInternals, dirtyLeaves, deletedLeaves);
public long estimatedSize(final long dirtyInternals, final long dirtyLeaves) {
return delegate.estimatedSize(dirtyInternals, dirtyLeaves);
}
}
Expand Up @@ -17,8 +17,8 @@
package com.swirlds.platform;

import static com.swirlds.virtualmap.DefaultVirtualMapSettings.DEFAULT_COPY_FLUSH_THRESHOLD;
import static com.swirlds.virtualmap.DefaultVirtualMapSettings.DEFAULT_FAMILY_THROTTLE_THRESHOLD;
import static com.swirlds.virtualmap.DefaultVirtualMapSettings.DEFAULT_FLUSH_INTERVAL;
import static com.swirlds.virtualmap.DefaultVirtualMapSettings.DEFAULT_TOTAL_FLUSH_THRESHOLD;
import static com.swirlds.virtualmap.DefaultVirtualMapSettings.DEFAULT_FLUSH_THROTTLE_STEP_SIZE;
import static com.swirlds.virtualmap.DefaultVirtualMapSettings.DEFAULT_MAXIMUM_FLUSH_THROTTLE_PERIOD;
import static com.swirlds.virtualmap.DefaultVirtualMapSettings.DEFAULT_MAXIMUM_VIRTUAL_MAP_SIZE;
Expand Down Expand Up @@ -55,7 +55,7 @@ public class VirtualMapSettingsImpl extends SubSetting implements VirtualMapSett
public long virtualMapWarningInterval = DEFAULT_VIRTUAL_MAP_WARNING_INTERVAL;
public long copyFlushThreshold = DEFAULT_COPY_FLUSH_THRESHOLD;
public int flushInterval = DEFAULT_FLUSH_INTERVAL;
public long totalFlushThreshold = DEFAULT_TOTAL_FLUSH_THRESHOLD;
public long familyThrottleThreshold = DEFAULT_FAMILY_THROTTLE_THRESHOLD;
public int preferredFlushQueueSize = DEFAULT_PREFERRED_FLUSH_QUEUE_SIZE;
public Duration flushThrottleStepSize = DEFAULT_FLUSH_THROTTLE_STEP_SIZE;
public Duration maximumFlushThrottlePeriod = DEFAULT_MAXIMUM_FLUSH_THROTTLE_PERIOD;
Expand Down Expand Up @@ -201,12 +201,12 @@ public void setFlushInterval(final int flushInterval) {
* {@inheritDoc}
*/
@Override
public long getTotalFlushThreshold() {
return totalFlushThreshold;
public long getFamilyThrottleThreshold() {
return familyThrottleThreshold;
}

public void setTotalFlushThreshold(final long flushThreshold) {
this.totalFlushThreshold = flushThreshold;
public void setFamilyThrottleThreshold(final long flushThreshold) {
this.familyThrottleThreshold = flushThreshold;
}

/**
Expand Down
Expand Up @@ -360,8 +360,8 @@ public VirtualKeySet<TestKey> buildKeySet() {
}

@Override
public long estimatedSize(final long dirtyInternals, final long dirtyLeaves, final long deletedLeaves) {
return delegate.estimatedSize(dirtyInternals, dirtyLeaves, deletedLeaves);
public long estimatedSize(final long dirtyInternals, final long dirtyLeaves) {
return delegate.estimatedSize(dirtyInternals, dirtyLeaves);
}
}
}
Expand Up @@ -36,7 +36,7 @@ public final class DefaultVirtualMapSettings implements VirtualMapSettings {
public static final long DEFAULT_VIRTUAL_MAP_WARNING_INTERVAL = 100_000;
public static final int DEFAULT_FLUSH_INTERVAL = 20;
public static final long DEFAULT_COPY_FLUSH_THRESHOLD = 500_000_000L;
public static final long DEFAULT_TOTAL_FLUSH_THRESHOLD = 2_500_000_000L;
public static final long DEFAULT_FAMILY_THROTTLE_THRESHOLD = 2_500_000_000L;
public static final int DEFAULT_PREFERRED_FLUSH_QUEUE_SIZE = 2;
public static final Duration DEFAULT_FLUSH_THROTTLE_STEP_SIZE = Duration.ofMillis(200);
public static final Duration DEFAULT_MAXIMUM_FLUSH_THROTTLE_PERIOD = Duration.ofSeconds(5);
Expand Down Expand Up @@ -110,8 +110,8 @@ public int getFlushInterval() {
* {@inheritDoc}
*/
@Override
public long getTotalFlushThreshold() {
return DEFAULT_TOTAL_FLUSH_THRESHOLD;
public long getFamilyThrottleThreshold() {
return DEFAULT_FAMILY_THROTTLE_THRESHOLD;
}

/**
Expand Down
Expand Up @@ -16,8 +16,7 @@

package com.swirlds.virtualmap;

import com.swirlds.virtualmap.datasource.VirtualDataSource;

import com.swirlds.virtualmap.internal.merkle.VirtualRootNode;
import java.time.Duration;

/**
Expand Down Expand Up @@ -105,26 +104,34 @@ public interface VirtualMapSettings {
* The value must be positive and will typically be a fairly small number, such as 20. The first copy is not
* flushed, but every Nth copy thereafter is.
*
* This setting is ignored, if {@link #getTotalFlushThreshold()} is set to a positive value.
* This setting is ignored, if {@link #getFamilyThrottleThreshold()} is set to a positive value.
*
* @return The number of copies between flushes
*/
int getFlushInterval();

/**
* Get size threshold when virtual node cache needs to be flushed to the data source. This threshold is
* checked against estimated size provided by {@link VirtualDataSource#estimatedSize(long, long, long)}.
* Virtual root flush threshold, in bytes. When virtual root size exceeds the threshold, it is
* no longer merged with next copies, but marked to flush to disk, when it becomes the oldest
* accessible version.
*
* If the threshold is not set, set to 0, or set to a negative value, flushes are based on {@link
* #getFlushInterval()} setting.
* If a virtual root is explicitly requested to flush with {@link VirtualRootNode#enableFlush()},
* its size isn't checked against this threshold.
*
* @return
* Virtual node cache flush threshold
* @return Virtual root flush threshold
*/
long getTotalFlushThreshold();

long getCopyFlushThreshold();

/**
* Virtual root family throttle threshold, in bytes. When sum of estimated sizes of all
* virtual roots in a single family (virtual map) exceeds the threshold, virtual pipeline
* starts throttling creating new virtual root copies.
*
* @return
* Virtual root family throttle threshold
*/
long getFamilyThrottleThreshold();

/**
* The preferred maximum number of virtual maps waiting to be flushed. If more maps than this number are awaiting
* flushing then slow down fast copies of the virtual map so that flushing can catch up.
Expand Down
Expand Up @@ -61,9 +61,9 @@
* value must be positive and will typically be a fairly small number, such as 20. The first copy is not flushed,
* but every Nth copy thereafter is.
* @param copyFlushThreshold
* Copy flush threshold
* @param totalFlushThreshold
* Total flush threshold
* Virtual root copy flush threshold
* @param familyThrottleThreshold
* Virtual root family throttle threshold
* @param preferredFlushQueueSize
* The preferred maximum number of virtual maps waiting to be flushed. If more maps than this number are awaiting
* flushing then slow down fast copies of the virtual map so that flushing can catch up.
Expand All @@ -88,7 +88,7 @@ public record VirtualMapConfig(
long virtualMapWarningInterval,
@Min(1) @ConfigProperty(defaultValue = "20") int flushInterval,
@ConfigProperty(defaultValue = "500000000") long copyFlushThreshold,
@ConfigProperty(defaultValue = "2500000000") long totalFlushThreshold,
@ConfigProperty(defaultValue = "2500000000") long familyThrottleThreshold,
@ConfigProperty(defaultValue = "2") int preferredFlushQueueSize,
@ConfigProperty(defaultValue = "200ms") Duration flushThrottleStepSize,
@ConfigProperty(defaultValue = "5s") Duration maximumFlushThrottlePeriod) {
Expand Down
Expand Up @@ -225,18 +225,12 @@ default void stopBackgroundCompaction() {}
VirtualKeySet<K> buildKeySet();

/**
* Provides estimation how much space is needed to store the given number of internal / leaf nodes
* in the data source. This estimation is used to decided when to flush virtual node caches to
* data sources.
* Provides estimation how much space is needed to store the given number of internal / leaf nodes in the data
* source. This estimation is used to decide when to flush virtual node caches to data sources.
*
* @param dirtyInternals
* Number of dirty internal nodes in the node cache
* @param dirtyLeaves
* Number of dirty leaf nodes in the node cache
* @param deletedLeaves
* Number of deleted leaf nodes in the node cache
* @return
* Estimated space needed to store the given number of nodes in the data source, in bytes
* @param dirtyInternals Number of dirty internal nodes in the node cache
* @param dirtyLeaves Number of dirty leaf nodes in the node cache
* @return Estimated space needed to store the given number of nodes in the data source, in bytes
*/
long estimatedSize(long dirtyInternals, long dirtyLeaves, long deletedLeaves);
long estimatedSize(long dirtyInternals, long dirtyLeaves);
}
Expand Up @@ -374,9 +374,9 @@ public void postInit(final VirtualStateAccessor state) {
}

this.state = Objects.requireNonNull(state);
final long flushThreshold = settings.getTotalFlushThreshold();
final long flushThreshold = settings.getFamilyThrottleThreshold();
if (flushThreshold <= 0) {
// If flush threshold is not set, use flush interval
// If throttle threshold is not set, use flush interval
this.shouldBeFlushed.set(fastCopyVersion != 0 && fastCopyVersion % settings.getFlushInterval() == 0);
}
if (this.dataSourceBuilder != null && this.dataSource == null) {
Expand Down Expand Up @@ -950,9 +950,7 @@ public long estimatedSize() {
final long estimatedDirtyLeavesCount =
cache.estimatedDirtyLeavesCount(state.getFirstLeafPath(), state.getLastLeafPath());
final long estimatedInternalsCount = cache.estimatedInternalsCount(state.getFirstLeafPath());
final long estimatedFlushSize = dataSource.estimatedSize(
estimatedInternalsCount, estimatedDirtyLeavesCount, 0);
return estimatedFlushSize;
return dataSource.estimatedSize(estimatedInternalsCount, estimatedDirtyLeavesCount);
}

/*
Expand Down
Expand Up @@ -228,11 +228,11 @@ private void applyFlushBackpressure() {

/**
* Slow down the fast copy operation if total size of all (unreleased) virtual root copies
* in this pipeline exceeds {@link VirtualMapSettings#getTotalFlushThreshold()}.
* in this pipeline exceeds {@link VirtualMapSettings#getFamilyThrottleThreshold()}.
*/
private void applyTotalSizeBackpressure() {
final long totalSize = currentTotalSize();
final long sizeThreshold = VirtualMapSettingsFactory.get().getTotalFlushThreshold();
final long sizeThreshold = VirtualMapSettingsFactory.get().getFamilyThrottleThreshold();
final double ratio = (double) totalSize / sizeThreshold;
final int over100percentExcess = (int) ((ratio - 1.0) * 100);
if (over100percentExcess < 0) {
Expand Down Expand Up @@ -283,11 +283,6 @@ public void registerCopy(final VirtualRoot copy) {
unhashedCopies.add(copy);
}
mostRecentCopy.set(copy);
// synchronized (this) {
// if (alive) {
// scheduleWork();
// }
// }

applyFlushBackpressure();
applyTotalSizeBackpressure();
Expand Down Expand Up @@ -432,8 +427,8 @@ public boolean awaitTermination(final long timeout, final TimeUnit unit) throws
* Check if this copy should be flushed.
*/
private boolean shouldBeFlushed(final VirtualRoot copy) {
return (copy.requestedToFlush() || copySizeToFlush(copy)) &&
(copy.isDestroyed() || copy.isDetached());
return (copy.requestedToFlush() || copySizeToFlush(copy)) // copy should be flushed
&& (copy.isDestroyed() || copy.isDetached()); // destroyed or detached
}

/**
Expand Down Expand Up @@ -492,14 +487,10 @@ private boolean canBeMerged(final PipelineListNode<VirtualRoot> mergeCandidate)
final PipelineListNode<VirtualRoot> mergeTarget = mergeCandidate.getNext();

return !copy.requestedToFlush() // not explicitly requested to flush
&&
!copySizeToFlush(copy) // don't let merged copies grow too much
&&
(copy.isDestroyed() || copy.isDetached()) // copy must be destroyed or detached
&&
mergeTarget != null // target must exist
&&
mergeTarget.getValue().isImmutable(); // target must be immutable
&& !copySizeToFlush(copy) // don't let merged copies grow too much
&& (copy.isDestroyed() || copy.isDetached()) // copy must be destroyed or detached
&& mergeTarget != null // target must exist
&& mergeTarget.getValue().isImmutable(); // target must be immutable
}

/**
Expand Down Expand Up @@ -534,8 +525,7 @@ private void merge(final PipelineListNode<VirtualRoot> node) {
private void hashFlushMerge() {
PipelineListNode<VirtualRoot> next = copies.getFirst();
// Iterate from the oldest copy to the newest
while ((next != null)
&& !Thread.currentThread().isInterrupted()) {
while ((next != null) && !Thread.currentThread().isInterrupted()) {
final VirtualRoot copy = next.getValue();
// The newest copy. Nothing can be done to it
if (!copy.isImmutable()) {
Expand Down
Expand Up @@ -71,8 +71,8 @@ public int getFlushInterval() {
}

@Override
public long getTotalFlushThreshold() {
return original.getTotalFlushThreshold();
public long getFamilyThrottleThreshold() {
return original.getFamilyThrottleThreshold();
}

@Override
Expand Down
Expand Up @@ -372,7 +372,7 @@ private void deleteLeafRecords(final Stream<VirtualLeafRecord<K, V>> leafRecords
}

@Override
public long estimatedSize(final long dirtyInternals, final long dirtyLeaves, final long deletedLeaves) {
public long estimatedSize(final long dirtyInternals, final long dirtyLeaves) {
// It doesn't have to be very precise as this data source is used for testing purposed only
return dirtyInternals * (Long.BYTES + DigestType.SHA_384.digestLength()) + dirtyLeaves * 1024;
}
Expand Down
Expand Up @@ -1080,8 +1080,7 @@ void gettingAnInternalOnReleasedCacheThrows() {
cache0.seal();
cache0.release();

assertNull(cache0.lookupLeafByPath(ROOT_PATH, false),
"should not be able to look up value on destroyed cache");
assertNull(cache0.lookupLeafByPath(ROOT_PATH, false), "should not be able to look up value on destroyed cache");
}

@Test
Expand Down Expand Up @@ -1371,9 +1370,7 @@ void gettingALeafOnDestroyedCacheThrows() {
cache.seal();
cache.release();

assertNull(
cache.lookupLeafByKey(A_KEY, false),
"shouldn't be able to key on destroyed cache");
assertNull(cache.lookupLeafByKey(A_KEY, false), "shouldn't be able to key on destroyed cache");
}

@Test
Expand Down
Expand Up @@ -466,8 +466,8 @@ public VirtualKeySet<TestKey> buildKeySet() {
* {@inheritDoc}
*/
@Override
public long estimatedSize(final long dirtyInternals, final long dirtyLeaves, final long deletedLeaves) {
return delegate.estimatedSize(dirtyInternals, dirtyLeaves, deletedLeaves);
public long estimatedSize(final long dirtyInternals, final long dirtyLeaves) {
return delegate.estimatedSize(dirtyInternals, dirtyLeaves);
}
}

Expand Down

0 comments on commit 55d4d0c

Please sign in to comment.