9483 Added metrics for total space consumption by compaction level (#10086)

Signed-off-by: Ivan Malygin <ivan@swirldslabs.com>
imalygin committed Nov 28, 2023
1 parent d25c2a6 commit a2e0e80
Showing 16 changed files with 201 additions and 78 deletions.
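For orientation before the diffs: a minimal sketch of the reporting pattern this commit wires up. The names below and the BiConsumer callback type are illustrative assumptions, not the committed API; what the diff itself shows is that each store's DataFileCompactor now receives an extra reporting callback (statisticsUpdater::set…FileSizeByLevelMb) and that MerkleDbStatistics keeps one DoubleAccumulator per compaction level.

```java
import java.util.List;
import java.util.function.BiConsumer;

// Hypothetical sketch of the per-level file-size reporting flow; simplified, not the committed code.
class FileSizeByLevelReportingSketch {

    /** Stand-in for a data file: its size in bytes and the compaction level that produced it. */
    record DataFile(long sizeBytes, int compactionLevel) {}

    /**
     * After a compaction pass, sum file sizes per level and push each total (in Mb) to the
     * statistics callback, mirroring calls like setHashesStoreFileSizeByLevelMb(level, value).
     */
    static void reportFileSizeByLevel(
            final List<DataFile> files, final int maxCompactionLevel, final BiConsumer<Integer, Double> reportMb) {
        final double[] totalBytesByLevel = new double[maxCompactionLevel + 1];
        for (final DataFile file : files) {
            totalBytesByLevel[file.compactionLevel()] += file.sizeBytes();
        }
        for (int level = 0; level <= maxCompactionLevel; level++) {
            reportMb.accept(level, totalBytesByLevel[level] / (1024.0 * 1024.0));
        }
    }
}
```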
@@ -61,7 +61,7 @@ BenchmarkRecord read(long dataLocation) throws IOException {
return readDataItem(dataLocation);
}
};
final var compactor = new DataFileCompactor(storeName, store, index, null, null, null);
final var compactor = new DataFileCompactor(storeName, store, index, null, null, null, null);
System.out.println();

// Write files
@@ -52,7 +52,7 @@ public void merge() throws Exception {
final var store =
new HalfDiskHashMap<>(maxKey, new BenchmarkKeySerializer(), getTestDir(), storeName, null, false);
final var dataFileCompactor = new DataFileCompactor(
storeName, store.getFileCollection(), store.getBucketIndexToBucketLocation(), null, null, null);
storeName, store.getFileCollection(), store.getBucketIndexToBucketLocation(), null, null, null, null);
System.out.println();

// Write files
@@ -53,8 +53,8 @@ public void merge() throws Exception {
new BenchmarkRecordSerializer(),
(key, dataLocation, dataValue) -> {},
keyToDiskLocationIndex);
final DataFileCompactor compactor =
new DataFileCompactor(storeName, store.getFileCollection(), keyToDiskLocationIndex, null, null, null);
final DataFileCompactor compactor = new DataFileCompactor(
storeName, store.getFileCollection(), keyToDiskLocationIndex, null, null, null, null);
System.out.println();

// Write files
@@ -314,6 +314,7 @@ public MerkleDbDataSource(
pathToDiskLocationInternalNodes,
statisticsUpdater::setHashesStoreCompactionTimeMs,
statisticsUpdater::setHashesStoreCompactionSavedSpaceMb,
statisticsUpdater::setHashesStoreFileSizeByLevelMb,
updateTotalStatsFunction);
} else {
hashStoreDisk = null;
@@ -359,6 +360,7 @@ public MerkleDbDataSource(
objectKeyToPath.getBucketIndexToBucketLocation(),
statisticsUpdater::setLeafKeysStoreCompactionTimeMs,
statisticsUpdater::setLeafKeysStoreCompactionSavedSpaceMb,
statisticsUpdater::setLeafKeysStoreFileSizeByLevelMb,
updateTotalStatsFunction);
objectKeyToPath.printStats();
// we do not need callback as HalfDiskHashMap loads its own data from disk
@@ -380,6 +382,7 @@ public MerkleDbDataSource(
pathToDiskLocationLeafNodes,
statisticsUpdater::setLeavesStoreCompactionTimeMs,
statisticsUpdater::setLeavesStoreCompactionSavedSpaceMb,
statisticsUpdater::setLeavesStoreFileSizeByLevelMb,
updateTotalStatsFunction);

// Leaf records cache
@@ -94,14 +94,22 @@ public class MerkleDbStatistics {
private final List<LongAccumulator> hashesStoreCompactionTimeMsList;
/** Hashes store compactions - saved space in Mb */
private final List<DoubleAccumulator> hashesStoreCompactionSavedSpaceMbList;
/** Leaves store small compactions - time in ms */
/** Hashes store - cumulative file size by compaction level in Mb */
private final List<DoubleAccumulator> hashesStoreFileSizeByLevelMbList;
/** Leaves store compactions - time in ms */
private final List<LongAccumulator> leavesStoreCompactionTimeMsList;
/** Leaves store small compactions - saved space in Mb */
/** Leaves store compactions - saved space in Mb */
private final List<DoubleAccumulator> leavesStoreCompactionSavedSpaceMbList;
/** Leaf keys store small compactions - time in ms */

/** Leaves store - cumulative file size by compaction level in Mb */
private final List<DoubleAccumulator> leavesStoreFileSizeByLevelMbList;
/** Leaf keys store compactions - time in ms */
private final List<LongAccumulator> leafKeysStoreCompactionTimeMsList;
/** Leaf keys store small compactions - saved space in Mb */
/** Leaf keys store compactions - saved space in Mb */
private final List<DoubleAccumulator> leafKeysStoreCompactionSavedSpaceMbList;

/** Leaf keys store - cumulative file size by compaction level in Mb */
private final List<DoubleAccumulator> leafKeysStoreFileSizeByLevelMbList;
/** Off-heap usage in MB of hashes store index */
private IntegerGauge offHeapHashesIndexMb;
/** Off-heap usage in MB of leaves store index */
@@ -125,10 +133,13 @@ public MerkleDbStatistics(final String label) {
this.label = CommonUtils.throwArgNull(label, "label");
hashesStoreCompactionTimeMsList = new ArrayList<>();
hashesStoreCompactionSavedSpaceMbList = new ArrayList<>();
hashesStoreFileSizeByLevelMbList = new ArrayList<>();
leavesStoreCompactionTimeMsList = new ArrayList<>();
leavesStoreCompactionSavedSpaceMbList = new ArrayList<>();
leavesStoreFileSizeByLevelMbList = new ArrayList<>();
leafKeysStoreCompactionTimeMsList = new ArrayList<>();
leafKeysStoreCompactionSavedSpaceMbList = new ArrayList<>();
leafKeysStoreFileSizeByLevelMbList = new ArrayList<>();
}

private static IntegerGauge buildIntegerGauge(final Metrics metrics, final String name, final String description) {
@@ -229,8 +240,7 @@ public void registerMetrics(final Metrics metrics) {

// Compaction

for (int i = 0; i < config.maxCompactionLevel(); i++) {
int level = i + 1;
for (int level = 0; level <= config.maxCompactionLevel(); level++) {
// Hashes store
hashesStoreCompactionTimeMsList.add(buildLongAccumulator(
metrics,
@@ -240,6 +250,10 @@ public void registerMetrics(final Metrics metrics) {
metrics,
DS_PREFIX + COMPACTIONS_PREFIX + LEVEL_PREFIX + level + "_hashesSavedSpaceMb_" + label,
"Saved space during compactions of level %s, hashes store, %s, Mb".formatted(level, label)));
hashesStoreFileSizeByLevelMbList.add(buildDoubleAccumulator(
metrics,
DS_PREFIX + FILES_PREFIX + LEVEL_PREFIX + level + "_hashesFileSizeByLevelMb_" + label,
"Total space taken by files of level %s, hashes store, %s, Mb".formatted(level, label)));

// Leaves store
leavesStoreCompactionTimeMsList.add(buildLongAccumulator(
@@ -250,6 +264,10 @@ public void registerMetrics(final Metrics metrics) {
metrics,
DS_PREFIX + COMPACTIONS_PREFIX + LEVEL_PREFIX + level + "_leavesSavedSpaceMb_" + label,
"Saved space during compactions of level %s, leaves store, %s, Mb".formatted(level, label)));
leavesStoreFileSizeByLevelMbList.add(buildDoubleAccumulator(
metrics,
DS_PREFIX + FILES_PREFIX + LEVEL_PREFIX + level + "_leavesFileSizeByLevelMb_" + label,
"Total space taken by files of level %s, leaves store, %s, Mb".formatted(level, label)));

// Leaf keys store
leafKeysStoreCompactionTimeMsList.add(buildLongAccumulator(
@@ -260,6 +278,10 @@ public void registerMetrics(final Metrics metrics) {
metrics,
DS_PREFIX + COMPACTIONS_PREFIX + LEVEL_PREFIX + level + "_leafKeysSavedSpaceMb_" + label,
"Saved space during compactions of level %s, leaf keys store, %s, Mb".formatted(level, label)));
leafKeysStoreFileSizeByLevelMbList.add(buildDoubleAccumulator(
metrics,
DS_PREFIX + FILES_PREFIX + LEVEL_PREFIX + level + "_leafKeysFileSizeByLevelMb_" + label,
"Total space taken by files of level %s, leaf keys store, %s, Mb".formatted(level, label)));
}

// Off-heap usage
@@ -443,12 +465,12 @@ public void setFlushLeafKeysStoreFileSizeMb(final double value) {
* @param value the value to set
*/
public void setHashesStoreCompactionTimeMs(final Integer compactionLevel, final long value) {
assert compactionLevel >= 1 && compactionLevel <= config.maxCompactionLevel();
assert compactionLevel >= 0 && compactionLevel <= config.maxCompactionLevel();
if (hashesStoreCompactionTimeMsList.isEmpty()) {
// if the method is called before the metrics are registered, there is nothing to do
return;
}
hashesStoreCompactionTimeMsList.get(compactionLevel - 1).update(value);
hashesStoreCompactionTimeMsList.get(compactionLevel).update(value);
}

/**
@@ -458,12 +480,27 @@ public void setHashesStoreCompactionTimeMs(final Integer compactionLevel, final long value) {
* @param value the value to set
*/
public void setHashesStoreCompactionSavedSpaceMb(final int compactionLevel, final double value) {
assert compactionLevel >= 1 && compactionLevel <= config.maxCompactionLevel();
assert compactionLevel >= 0 && compactionLevel <= config.maxCompactionLevel();
if (hashesStoreCompactionSavedSpaceMbList.isEmpty()) {
// if the method is called before the metrics are registered, there is nothing to do
return;
}
hashesStoreCompactionSavedSpaceMbList.get(compactionLevel - 1).update(value);
hashesStoreCompactionSavedSpaceMbList.get(compactionLevel).update(value);
}

/**
* Set the current value for the accumulator corresponding to provided compaction level from
* {@link #hashesStoreFileSizeByLevelMbList}
*
* @param value the value to set
*/
public void setHashesStoreFileSizeByLevelMb(final int compactionLevel, final double value) {
assert compactionLevel >= 0 && compactionLevel <= config.maxCompactionLevel();
if (hashesStoreFileSizeByLevelMbList.isEmpty()) {
// if the method is called before the metrics are registered, there is nothing to do
return;
}
hashesStoreFileSizeByLevelMbList.get(compactionLevel).update(value);
}

/**
@@ -473,12 +510,12 @@ public void setHashesStoreCompactionSavedSpaceMb(final int compactionLevel, final double value) {
* @param value the value to set
*/
public void setLeavesStoreCompactionTimeMs(final int compactionLevel, final long value) {
assert compactionLevel >= 1 && compactionLevel <= config.maxCompactionLevel();
assert compactionLevel >= 0 && compactionLevel <= config.maxCompactionLevel();
if (leavesStoreCompactionTimeMsList.isEmpty()) {
// if the method is called before the metrics are registered, there is nothing to do
return;
}
leavesStoreCompactionTimeMsList.get(compactionLevel - 1).update(value);
leavesStoreCompactionTimeMsList.get(compactionLevel).update(value);
}

/**
@@ -487,12 +524,26 @@ public void setLeavesStoreCompactionTimeMs(final int compactionLevel, final long value) {
* @param value the value to set
*/
public void setLeavesStoreCompactionSavedSpaceMb(final int compactionLevel, final double value) {
assert compactionLevel >= 1 && compactionLevel <= config.maxCompactionLevel();
assert compactionLevel >= 0 && compactionLevel <= config.maxCompactionLevel();
if (leavesStoreCompactionSavedSpaceMbList.isEmpty()) {
// if the method is called before the metrics are registered, there is nothing to do
return;
}
leavesStoreCompactionSavedSpaceMbList.get(compactionLevel - 1).update(value);
leavesStoreCompactionSavedSpaceMbList.get(compactionLevel).update(value);
}

/**
* Set the current value for the accumulator corresponding to provided compaction level from
* {@link #leavesStoreFileSizeByLevelMbList}
* @param value the value to set
*/
public void setLeavesStoreFileSizeByLevelMb(final int compactionLevel, final double value) {
assert compactionLevel >= 0 && compactionLevel <= config.maxCompactionLevel();
if (leavesStoreFileSizeByLevelMbList.isEmpty()) {
// if the method is called before the metrics are registered, there is nothing to do
return;
}
leavesStoreFileSizeByLevelMbList.get(compactionLevel).update(value);
}

/**
@@ -502,12 +553,12 @@ public void setLeavesStoreCompactionSavedSpaceMb(final int compactionLevel, final double value) {
* @param value the value to set
*/
public void setLeafKeysStoreCompactionTimeMs(final int compactionLevel, final long value) {
assert compactionLevel >= 1 && compactionLevel <= config.maxCompactionLevel();
assert compactionLevel >= 0 && compactionLevel <= config.maxCompactionLevel();
if (leafKeysStoreCompactionTimeMsList.isEmpty()) {
// if the method is called before the metrics are registered, there is nothing to do
return;
}
leafKeysStoreCompactionTimeMsList.get(compactionLevel - 1).update(value);
leafKeysStoreCompactionTimeMsList.get(compactionLevel).update(value);
}

/**
@@ -517,12 +568,27 @@ public void setLeafKeysStoreCompactionTimeMs(final int compactionLevel, final long value) {
* @param value the value to set
*/
public void setLeafKeysStoreCompactionSavedSpaceMb(final int compactionLevel, final double value) {
assert compactionLevel >= 1 && compactionLevel <= config.maxCompactionLevel();
assert compactionLevel >= 0 && compactionLevel <= config.maxCompactionLevel();
if (leafKeysStoreCompactionSavedSpaceMbList.isEmpty()) {
// if the method is called before the metrics are registered, there is nothing to do
return;
}
leafKeysStoreCompactionSavedSpaceMbList.get(compactionLevel - 1).update(value);
leafKeysStoreCompactionSavedSpaceMbList.get(compactionLevel).update(value);
}

/**
* Set the current value for the accumulator corresponding to provided compaction level from
* {@link #leafKeysStoreFileSizeByLevelMbList}
*
* @param value the value to set
*/
public void setLeafKeysStoreFileSizeByLevelMb(final int compactionLevel, final double value) {
assert compactionLevel >= 0 && compactionLevel <= config.maxCompactionLevel();
if (leafKeysStoreFileSizeByLevelMbList.isEmpty()) {
// if the method is called before the metrics are registered, there is nothing to do
return;
}
leafKeysStoreFileSizeByLevelMbList.get(compactionLevel).update(value);
}

/**
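Worth noting from the hunks above: compaction levels are now indexed from 0 through config.maxCompactionLevel() inclusive, and every per-level list is addressed directly by level (previously levels started at 1 and the index was level - 1). A hedged sketch of that guard-and-index pattern, with java.util.concurrent.atomic.DoubleAdder standing in for the platform's DoubleAccumulator metric:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.DoubleAdder;

// Simplified stand-in for the per-level accumulator pattern in MerkleDbStatistics;
// DoubleAdder replaces the platform's DoubleAccumulator metric type for illustration only.
class PerLevelStatSketch {

    private final int maxCompactionLevel;
    private final List<DoubleAdder> fileSizeByLevelMb = new ArrayList<>();

    PerLevelStatSketch(final int maxCompactionLevel) {
        this.maxCompactionLevel = maxCompactionLevel;
    }

    /** Analogous to registerMetrics(): one accumulator per level, 0..maxCompactionLevel inclusive. */
    void register() {
        for (int level = 0; level <= maxCompactionLevel; level++) {
            fileSizeByLevelMb.add(new DoubleAdder());
        }
    }

    /** Analogous to setXStoreFileSizeByLevelMb(): a no-op before registration, then indexed by level. */
    void setFileSizeByLevelMb(final int compactionLevel, final double value) {
        assert compactionLevel >= 0 && compactionLevel <= maxCompactionLevel;
        if (fileSizeByLevelMb.isEmpty()) {
            return; // metrics not registered yet, nothing to update
        }
        fileSizeByLevelMb.get(compactionLevel).add(value);
    }
}
```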
@@ -221,6 +221,10 @@ void setLeafKeysStoreCompactionSavedSpaceMb(Integer compactionLevel, Double savedSpace) {
statistics.setLeafKeysStoreCompactionSavedSpaceMb(compactionLevel, savedSpace);
}

void setLeafKeysStoreFileSizeByLevelMb(Integer compactionLevel, Double savedSpace) {
statistics.setLeafKeysStoreFileSizeByLevelMb(compactionLevel, savedSpace);
}

void setHashesStoreCompactionTimeMs(Integer compactionLevel, Long time) {
statistics.setHashesStoreCompactionTimeMs(compactionLevel, time);
}
@@ -229,11 +233,19 @@ void setHashesStoreCompactionSavedSpaceMb(Integer compactionLevel, Double savedSpace) {
statistics.setHashesStoreCompactionSavedSpaceMb(compactionLevel, savedSpace);
}

void setHashesStoreFileSizeByLevelMb(Integer compactionLevel, Double savedSpace) {
statistics.setHashesStoreFileSizeByLevelMb(compactionLevel, savedSpace);
}

void setLeavesStoreCompactionTimeMs(Integer compactionType, Long time) {
statistics.setLeavesStoreCompactionTimeMs(compactionType, time);
}

void setLeavesStoreCompactionSavedSpaceMb(Integer compactionType, Double savedSpace) {
statistics.setLeavesStoreCompactionSavedSpaceMb(compactionType, savedSpace);
}

void setLeavesStoreFileSizeByLevelMb(Integer compactionType, Double savedSpace) {
statistics.setLeavesStoreFileSizeByLevelMb(compactionType, savedSpace);
}
}
@@ -16,7 +16,6 @@

package com.swirlds.merkledb.files;

import static com.swirlds.common.units.UnitConstants.MEBIBYTES_TO_BYTES;
import static com.swirlds.logging.legacy.LogMarker.EXCEPTION;
import static com.swirlds.logging.legacy.LogMarker.MERKLE_DB;
import static com.swirlds.merkledb.KeyRange.INVALID_KEY_RANGE;
@@ -291,32 +290,18 @@ public int getNumOfFiles() {

/**
* Get a list of all files in this collection that have been fully finished writing, are read
* only, ready to be compacted, and don't exceed the specified size in MB.
*
* @param maxSizeMb all files returned are smaller than this number of MB
* only and ready to be compacted.
*/
public List<DataFileReader<D>> getAllCompletedFiles(final int maxSizeMb) {
public List<DataFileReader<D>> getAllCompletedFiles() {
final ImmutableIndexedObjectList<DataFileReader<D>> activeIndexedFiles = dataFiles.get();
if (activeIndexedFiles == null) {
return Collections.emptyList();
}
Stream<DataFileReader<D>> filesStream = activeIndexedFiles.stream();
filesStream = filesStream.filter(DataFileReader::isFileCompleted);
if (maxSizeMb != Integer.MAX_VALUE) {
final long maxSizeBytes = maxSizeMb * (long) MEBIBYTES_TO_BYTES;
filesStream = filesStream.filter(file -> file.getSize() < maxSizeBytes);
}
return filesStream.toList();
}

/**
* Get a list of all files in this collection that have been fully finished writing, are read
* only and ready to be compacted.
*/
public List<DataFileReader<D>> getAllCompletedFiles() {
return getAllCompletedFiles(Integer.MAX_VALUE);
}

/**
* Get statistics for sizes of all files
*
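With the maxSizeMb filter removed from getAllCompletedFiles(), a caller that still needs the old size cut-off can apply it to the returned readers itself. A hedged sketch under assumed names (the DataFileCollection class name and its type parameter are inferred from context; only getAllCompletedFiles() and DataFileReader#getSize() are visible in this diff):

```java
import static com.swirlds.common.units.UnitConstants.MEBIBYTES_TO_BYTES;

import com.swirlds.merkledb.files.DataFileCollection;
import com.swirlds.merkledb.files.DataFileReader;
import java.util.List;

// Hedged sketch: re-create the removed "files smaller than maxSizeMb" behavior on the caller's
// side, using only methods shown in this diff (getAllCompletedFiles, getSize).
final class CompletedFileFilterSketch {

    static <D> List<DataFileReader<D>> completedFilesSmallerThan(
            final DataFileCollection<D> fileCollection, final int maxSizeMb) {
        final long maxSizeBytes = maxSizeMb * (long) MEBIBYTES_TO_BYTES;
        return fileCollection.getAllCompletedFiles().stream()
                .filter(reader -> reader.getSize() < maxSizeBytes)
                .toList();
    }
}
```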
@@ -258,12 +258,12 @@ public static long getSizeOfFilesByPath(final Iterable<Path> filePaths) throws IOException {
/**
* Get total size of a collection of files.
*
* @param filePaths collection of paths to files
* @return total number of bytes take for all the files in filePaths
* @param fileReaders collection of data file readers
* @return total number of bytes taken by all the files in fileReaders
*/
public static long getSizeOfFiles(final Iterable<? extends DataFileReader<?>> filePaths) {
public static long getSizeOfFiles(final Iterable<? extends DataFileReader<?>> fileReaders) {
long totalSize = 0;
for (final DataFileReader<?> dataFileReader : filePaths) {
for (final DataFileReader<?> dataFileReader : fileReaders) {
totalSize += dataFileReader.getSize();
}
return totalSize;