From 0fe615f8114b6487487ccf53c27104408bcd7479 Mon Sep 17 00:00:00 2001 From: Matt Byrd Date: Thu, 10 Aug 2023 16:25:52 +0100 Subject: [PATCH 1/3] rdar://113409699 (4.0.3: use compression ratio to scale the amount of remaining disk usage) (#3563) Cherry-picked from ee200a8a10aead3d5cbb7058d85237a4e6878b19 use compression ratio to scale the amount of remaining disk usage (#3525) use compression ratio to scale the amount of remaining disk usage expected from completing a compaction rdar://112550104 (CompactionInfo::estimatedRemainingWriteBytes is uncompressed, causing compaction to be rejected due to size when it has space to complete) * fix SSTableTasksTableTest and CompactionStatsTest Co-authored-by: Jon Meredith --- .../cassandra/cache/AutoSavingCache.java | 1 + .../db/compaction/ActiveCompactions.java | 4 +- .../db/compaction/CompactionInfo.java | 32 +++++++----- .../db/compaction/CompactionIterator.java | 8 ++- .../db/compaction/CompactionTask.java | 2 +- .../cassandra/db/view/ViewBuilderTask.java | 4 +- .../internal/CollatedViewIndexBuilder.java | 4 +- .../sai/StorageAttachedIndexBuilder.java | 1 + .../index/sasi/SASIIndexBuilder.java | 1 + .../sstable/format/SortedTableScrubber.java | 1 + .../sstable/format/SortedTableVerifier.java | 1 + .../IndexSummaryRedistribution.java | 2 +- .../cassandra/streaming/StreamSession.java | 2 +- .../test/CompactionDiskSpaceTest.java | 4 +- .../test/SecondaryIndexCompactionTest.java | 4 +- .../test/StreamsDiskSpaceTest.java | 4 +- .../db/compaction/CompactionInfoTest.java | 4 +- .../db/compaction/CompactionsCQLTest.java | 50 ++++++++++++++++--- .../db/repair/PendingAntiCompactionTest.java | 6 +-- .../db/virtual/SSTableTasksTableTest.java | 3 +- .../indexsummary/IndexSummaryManagerTest.java | 2 +- .../tools/nodetool/CompactionStatsTest.java | 16 +++--- 22 files changed, 109 insertions(+), 47 deletions(-) diff --git a/src/java/org/apache/cassandra/cache/AutoSavingCache.java 
b/src/java/org/apache/cassandra/cache/AutoSavingCache.java index 3826eb412e02..ecd0a8d01a50 100644 --- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java +++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java @@ -327,6 +327,7 @@ else if (cacheType == CacheService.CacheType.COUNTER_CACHE) type, 0, keysEstimate, + keysEstimate, Unit.KEYS, nextTimeUUID(), getCacheDataPath(CURRENT_VERSION).toPath().toString()); diff --git a/src/java/org/apache/cassandra/db/compaction/ActiveCompactions.java b/src/java/org/apache/cassandra/db/compaction/ActiveCompactions.java index 4e238ad95d46..dba1959285f3 100644 --- a/src/java/org/apache/cassandra/db/compaction/ActiveCompactions.java +++ b/src/java/org/apache/cassandra/db/compaction/ActiveCompactions.java @@ -55,7 +55,7 @@ public void finishCompaction(CompactionInfo.Holder ci) /** * Get the estimated number of bytes remaining to write per sstable directory */ - public Map estimatedRemainingWriteBytes() + public Map estimatedRemainingWriteToDiskBytes() { synchronized (compactions) { @@ -66,7 +66,7 @@ public Map estimatedRemainingWriteBytes() List directories = compactionInfo.getTargetDirectories(); if (directories == null || directories.isEmpty()) continue; - long remainingWriteBytesPerDataDir = compactionInfo.estimatedRemainingWriteBytes() / directories.size(); + long remainingWriteBytesPerDataDir = compactionInfo.estimatedRemainingWriteToDiskBytes() / directories.size(); for (File directory : directories) writeBytesPerSSTableDir.merge(directory, remainingWriteBytesPerDataDir, Long::sum); } diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java b/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java index 0bfc925a7d0d..e87c4e0e109b 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java @@ -52,16 +52,18 @@ public final class CompactionInfo private final OperationType tasktype; private final long 
completed; private final long total; + private final long totalCompressed; private final Unit unit; private final TimeUUID compactionId; private final ImmutableSet sstables; private final String targetDirectory; - public CompactionInfo(TableMetadata metadata, OperationType tasktype, long completed, long total, Unit unit, TimeUUID compactionId, Collection sstables, String targetDirectory) + public CompactionInfo(TableMetadata metadata, OperationType tasktype, long completed, long total, long totalCompressed, Unit unit, TimeUUID compactionId, Collection sstables, String targetDirectory) { this.tasktype = tasktype; this.completed = completed; this.total = total; + this.totalCompressed = totalCompressed; this.metadata = metadata; this.unit = unit; this.compactionId = compactionId; @@ -69,38 +71,38 @@ public CompactionInfo(TableMetadata metadata, OperationType tasktype, long compl this.targetDirectory = targetDirectory; } - public CompactionInfo(TableMetadata metadata, OperationType tasktype, long completed, long total, TimeUUID compactionId, Collection sstables, String targetDirectory) + public CompactionInfo(TableMetadata metadata, OperationType tasktype, long completed, long total, long totalCompressed, TimeUUID compactionId, Collection sstables, String targetDirectory) { - this(metadata, tasktype, completed, total, Unit.BYTES, compactionId, sstables, targetDirectory); + this(metadata, tasktype, completed, total, totalCompressed, Unit.BYTES, compactionId, sstables, targetDirectory); } - public CompactionInfo(TableMetadata metadata, OperationType tasktype, long completed, long total, TimeUUID compactionId, Collection sstables) + public CompactionInfo(TableMetadata metadata, OperationType tasktype, long completed, long total, long totalCompressed, TimeUUID compactionId, Collection sstables) { - this(metadata, tasktype, completed, total, Unit.BYTES, compactionId, sstables, null); + this(metadata, tasktype, completed, total, totalCompressed, Unit.BYTES, compactionId, 
sstables, null); } /** * Special compaction info where we always need to cancel the compaction - for example ViewBuilderTask where we don't know * the sstables at construction */ - public static CompactionInfo withoutSSTables(TableMetadata metadata, OperationType tasktype, long completed, long total, Unit unit, TimeUUID compactionId) + public static CompactionInfo withoutSSTables(TableMetadata metadata, OperationType tasktype, long completed, long total, long totalCompressed, Unit unit, TimeUUID compactionId) { - return withoutSSTables(metadata, tasktype, completed, total, unit, compactionId, null); + return withoutSSTables(metadata, tasktype, completed, total, totalCompressed, unit, compactionId, null); } /** * Special compaction info where we always need to cancel the compaction - for example AutoSavingCache where we don't know * the sstables at construction */ - public static CompactionInfo withoutSSTables(TableMetadata metadata, OperationType tasktype, long completed, long total, Unit unit, TimeUUID compactionId, String targetDirectory) + public static CompactionInfo withoutSSTables(TableMetadata metadata, OperationType tasktype, long completed, long total, long totalCompressed, Unit unit, TimeUUID compactionId, String targetDirectory) { - return new CompactionInfo(metadata, tasktype, completed, total, unit, compactionId, ImmutableSet.of(), targetDirectory); + return new CompactionInfo(metadata, tasktype, completed, total, totalCompressed, unit, compactionId, ImmutableSet.of(), targetDirectory); } /** @return A copy of this CompactionInfo with updated progress. 
*/ public CompactionInfo forProgress(long complete, long total) { - return new CompactionInfo(metadata, tasktype, complete, total, unit, compactionId, sstables, targetDirectory); + return new CompactionInfo(metadata, tasktype, complete, total, totalCompressed, unit, compactionId, sstables, targetDirectory); } public Optional getKeyspace() @@ -183,12 +185,16 @@ public String targetDirectory() /** * Note that this estimate is based on the amount of data we have left to read - it assumes input * size == output size for a compaction, which is not really true, but should most often provide a worst case - * remaining write size. + * remaining write size. We also scale by the effective compression ratio since total/completed are for the uncompressed size. */ - public long estimatedRemainingWriteBytes() + public long estimatedRemainingWriteToDiskBytes() { if (unit == Unit.BYTES && tasktype.writesData) - return getTotal() - getCompleted(); + { + final long total = getTotal(); + double compressionRatio = total == 0 ? 
1 : ((double) totalCompressed / (double)total); + return (long)(compressionRatio * (total - getCompleted())); + } return 0; } diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java index 00e3dee5af2a..95e4955c3954 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java @@ -98,6 +98,7 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte private final long nowInSec; private final TimeUUID compactionId; private final long totalBytes; + private final long totalCompressedBytes; private long bytesRead; private long totalSourceCQLRows; @@ -135,9 +136,13 @@ public CompactionIterator(OperationType type, this.bytesRead = 0; long bytes = 0; - for (ISSTableScanner scanner : scanners) + long compressedBytes = 0; + for (ISSTableScanner scanner : scanners) { bytes += scanner.getLengthInBytes(); + compressedBytes += scanner.getCompressedLengthInBytes(); + } this.totalBytes = bytes; + this.totalCompressedBytes = compressedBytes; this.mergeCounters = new long[scanners.size()]; // note that we leak `this` from the constructor when calling beginCompaction below, this means we have to get the sstables before // calling that to avoid a NPE. 
@@ -170,6 +175,7 @@ public CompactionInfo getCompactionInfo() type, bytesRead, totalBytes, + totalCompressedBytes, compactionId, sstables, targetDirectory); diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java index 9566ef843047..f57e9342d44f 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java @@ -406,7 +406,7 @@ protected boolean buildCompactionCandidatesForAvailableDiskSpace(final Set expectedWriteSize = CompactionManager.instance.active.estimatedRemainingWriteBytes(); + Map expectedWriteSize = CompactionManager.instance.active.estimatedRemainingWriteToDiskBytes(); // todo: abort streams if they block compactions if (cfs.getDirectories().hasDiskSpaceForCompactionsAndStreams(expectedNewWriteSize, expectedWriteSize)) diff --git a/src/java/org/apache/cassandra/db/view/ViewBuilderTask.java b/src/java/org/apache/cassandra/db/view/ViewBuilderTask.java index 9a72c1e270a2..8443413726b3 100644 --- a/src/java/org/apache/cassandra/db/view/ViewBuilderTask.java +++ b/src/java/org/apache/cassandra/db/view/ViewBuilderTask.java @@ -204,13 +204,13 @@ public CompactionInfo getCompactionInfo() if (range.left.getPartitioner().splitter().isPresent()) { long progress = prevToken == null ? 
0 : Math.round(prevToken.getPartitioner().splitter().get().positionInRange(prevToken, range) * 1000); - return CompactionInfo.withoutSSTables(baseCfs.metadata(), OperationType.VIEW_BUILD, progress, 1000, Unit.RANGES, compactionId); + return CompactionInfo.withoutSSTables(baseCfs.metadata(), OperationType.VIEW_BUILD, progress, 1000, 1000, Unit.RANGES, compactionId); } // When there is no splitter, estimate based on number of total keys but // take the max with keysBuilt + 1 to avoid having more completed than total long keysTotal = Math.max(keysBuilt + 1, baseCfs.estimatedKeysForRange(range)); - return CompactionInfo.withoutSSTables(baseCfs.metadata(), OperationType.VIEW_BUILD, keysBuilt, keysTotal, Unit.KEYS, compactionId); + return CompactionInfo.withoutSSTables(baseCfs.metadata(), OperationType.VIEW_BUILD, keysBuilt, keysTotal, keysTotal, Unit.KEYS, compactionId); } @Override diff --git a/src/java/org/apache/cassandra/index/internal/CollatedViewIndexBuilder.java b/src/java/org/apache/cassandra/index/internal/CollatedViewIndexBuilder.java index 07bdc420ca07..92c48cc7ca40 100644 --- a/src/java/org/apache/cassandra/index/internal/CollatedViewIndexBuilder.java +++ b/src/java/org/apache/cassandra/index/internal/CollatedViewIndexBuilder.java @@ -61,8 +61,10 @@ public CompactionInfo getCompactionInfo() OperationType.INDEX_BUILD, iter.getBytesRead(), iter.getTotalBytes(), + iter.getTotalBytes(), compactionId, - sstables); + sstables + ); } public void build() diff --git a/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexBuilder.java b/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexBuilder.java index 55f6381859cd..fe856059f81a 100644 --- a/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexBuilder.java +++ b/src/java/org/apache/cassandra/index/sai/StorageAttachedIndexBuilder.java @@ -249,6 +249,7 @@ public CompactionInfo getCompactionInfo() OperationType.INDEX_BUILD, bytesProcessed, totalSizeInBytes, + totalSizeInBytes, compactionId, 
sstables.keySet()); } diff --git a/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java b/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java index 555bce1b9add..4e71538e41f7 100644 --- a/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java +++ b/src/java/org/apache/cassandra/index/sasi/SASIIndexBuilder.java @@ -134,6 +134,7 @@ public CompactionInfo getCompactionInfo() OperationType.INDEX_BUILD, bytesProcessed, totalBytesToProcess, + totalBytesToProcess, compactionId, sstables.keySet(), targetDirectory); diff --git a/src/java/org/apache/cassandra/io/sstable/format/SortedTableScrubber.java b/src/java/org/apache/cassandra/io/sstable/format/SortedTableScrubber.java index e8fbea22d279..a83527509fcc 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/SortedTableScrubber.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SortedTableScrubber.java @@ -382,6 +382,7 @@ public CompactionInfo getCompactionInfo() OperationType.SCRUB, dataFile.getFilePointer(), dataFile.length(), + sstable.onDiskLength(), scrubCompactionId, ImmutableSet.of(sstable), File.getPath(sstable.getFilename()).getParent().toString()); diff --git a/src/java/org/apache/cassandra/io/sstable/format/SortedTableVerifier.java b/src/java/org/apache/cassandra/io/sstable/format/SortedTableVerifier.java index f68e1c968455..a45dc585a905 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/SortedTableVerifier.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SortedTableVerifier.java @@ -493,6 +493,7 @@ public CompactionInfo getCompactionInfo() OperationType.VERIFY, dataFile.getFilePointer(), dataFile.length(), + sstable.onDiskLength(), verificationCompactionId, ImmutableSet.of(sstable)); } diff --git a/src/java/org/apache/cassandra/io/sstable/indexsummary/IndexSummaryRedistribution.java b/src/java/org/apache/cassandra/io/sstable/indexsummary/IndexSummaryRedistribution.java index d27969719ab0..5e2d0584e71c 100644 --- 
a/src/java/org/apache/cassandra/io/sstable/indexsummary/IndexSummaryRedistribution.java +++ b/src/java/org/apache/cassandra/io/sstable/indexsummary/IndexSummaryRedistribution.java @@ -357,7 +357,7 @@ static > Pair, List perTableIdIncomingBytes, for (FileStore fs : allWriteableFileStores) newStreamBytesToWritePerFileStore.merge(fs, totalBytesInPerFileStore, Long::sum); } - Map totalCompactionWriteRemaining = Directories.perFileStore(CompactionManager.instance.active.estimatedRemainingWriteBytes(), + Map totalCompactionWriteRemaining = Directories.perFileStore(CompactionManager.instance.active.estimatedRemainingWriteToDiskBytes(), fileStoreMapper); long totalStreamRemaining = StreamManager.instance.getTotalRemainingOngoingBytes(); long totalBytesStreamRemainingPerFileStore = totalStreamRemaining / Math.max(1, allFileStores.size()); diff --git a/test/distributed/org/apache/cassandra/distributed/test/CompactionDiskSpaceTest.java b/test/distributed/org/apache/cassandra/distributed/test/CompactionDiskSpaceTest.java index 099f87dd40bd..89e43ecb264f 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/CompactionDiskSpaceTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/CompactionDiskSpaceTest.java @@ -113,7 +113,7 @@ public static class BB public static void install(ClassLoader cl, Integer node) { new ByteBuddy().rebase(ActiveCompactions.class) - .method(named("estimatedRemainingWriteBytes")) + .method(named("estimatedRemainingWriteToDiskBytes")) .intercept(MethodDelegation.to(BB.class)) .make() .load(cl, ClassLoadingStrategy.Default.INJECTION); @@ -125,7 +125,7 @@ public static void install(ClassLoader cl, Integer node) .load(cl, ClassLoadingStrategy.Default.INJECTION); } - public static Map estimatedRemainingWriteBytes() + public static Map estimatedRemainingWriteToDiskBytes() { if (sstableDir != null) return ImmutableMap.of(sstableDir, estimatedRemaining.get()); diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/SecondaryIndexCompactionTest.java b/test/distributed/org/apache/cassandra/distributed/test/SecondaryIndexCompactionTest.java index 9d168145c55b..eea28b01b1d1 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/SecondaryIndexCompactionTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/SecondaryIndexCompactionTest.java @@ -60,7 +60,7 @@ public void test2iCompaction() throws IOException // emulate ongoing index compaction: CompactionInfo.Holder h = new MockHolder(i.getIndexCfs().metadata(), idxSSTables); CompactionManager.instance.active.beginCompaction(h); - CompactionManager.instance.active.estimatedRemainingWriteBytes(); + CompactionManager.instance.active.estimatedRemainingWriteToDiskBytes(); CompactionManager.instance.active.finishCompaction(h); }); } @@ -79,7 +79,7 @@ public MockHolder(TableMetadata metadata, Set sstables) @Override public CompactionInfo getCompactionInfo() { - return new CompactionInfo(metadata, OperationType.COMPACTION, 0, 1000, nextTimeUUID(), sstables); + return new CompactionInfo(metadata, OperationType.COMPACTION, 0, 1000, 0, nextTimeUUID(), sstables); } @Override diff --git a/test/distributed/org/apache/cassandra/distributed/test/StreamsDiskSpaceTest.java b/test/distributed/org/apache/cassandra/distributed/test/StreamsDiskSpaceTest.java index 5d72660fe188..0e84ca612c20 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/StreamsDiskSpaceTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/StreamsDiskSpaceTest.java @@ -72,7 +72,7 @@ public void testAbortStreamsWhenOngoingCompactionsLeaveInsufficientSpace() throw .withConfig(config -> config.set("hinted_handoff_enabled", false) .with(GOSSIP) .with(NETWORK)) - .withInstanceInitializer((cl, id) -> BB.doInstall(cl, id, ActiveCompactions.class, "estimatedRemainingWriteBytes")) + .withInstanceInitializer((cl, id) -> BB.doInstall(cl, id, ActiveCompactions.class, 
"estimatedRemainingWriteToDiskBytes")) .start())) { cluster.schemaChange("create table " + KEYSPACE + ".tbl (id int primary key, t int) with compaction={'class': 'SizeTieredCompactionStrategy'}"); @@ -138,7 +138,7 @@ public static long getTotalRemainingOngoingBytes() return ongoing.get(); } - public static Map estimatedRemainingWriteBytes() + public static Map estimatedRemainingWriteToDiskBytes() { Map ret = new HashMap<>(); if (datadir != null) diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionInfoTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionInfoTest.java index 753a18505dec..d995941ef5a6 100644 --- a/test/unit/org/apache/cassandra/db/compaction/CompactionInfoTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionInfoTest.java @@ -39,7 +39,7 @@ public void testCompactionInfoToStringContainsTaskId() { ColumnFamilyStore cfs = MockSchema.newCFS(); TimeUUID expectedTaskId = nextTimeUUID(); - CompactionInfo compactionInfo = new CompactionInfo(cfs.metadata(), OperationType.COMPACTION, 0, 1000, expectedTaskId, new ArrayList<>()); + CompactionInfo compactionInfo = new CompactionInfo(cfs.metadata(), OperationType.COMPACTION, 0, 1000, 1000, expectedTaskId, new ArrayList<>()); Assertions.assertThat(compactionInfo.toString()) .contains(expectedTaskId.toString()); } @@ -50,7 +50,7 @@ public void testCompactionInfoToStringFormat() UUID tableId = UUID.randomUUID(); TimeUUID taskId = nextTimeUUID(); ColumnFamilyStore cfs = MockSchema.newCFS(builder -> builder.id(TableId.fromUUID(tableId))); - CompactionInfo compactionInfo = new CompactionInfo(cfs.metadata(), OperationType.COMPACTION, 0, 1000, taskId, new ArrayList<>()); + CompactionInfo compactionInfo = new CompactionInfo(cfs.metadata(), OperationType.COMPACTION, 0, 1000, 300, taskId, new ArrayList<>()); Assertions.assertThat(compactionInfo.toString()) .isEqualTo("Compaction(%s, 0 / 1000 bytes)@%s(mockks, mockcf1)", taskId, tableId); } diff --git 
a/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java index 364ab56792e0..9ae5a32d9f55 100644 --- a/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java @@ -26,6 +26,7 @@ import java.util.Map; import java.util.Random; import java.util.Set; +import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; @@ -55,6 +56,7 @@ import org.apache.cassandra.io.sstable.CorruptSSTableException; import org.apache.cassandra.io.sstable.ISSTableScanner; import org.apache.cassandra.io.sstable.LegacySSTableTest; +import org.apache.cassandra.io.sstable.format.ForwardingSSTableReader; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.sstable.format.SSTableWriter; import org.apache.cassandra.io.util.File; @@ -83,7 +85,7 @@ public class CompactionsCQLTest extends CQLTester public void before() throws IOException { strategy = DatabaseDescriptor.getCorruptedTombstoneStrategy(); - + CommitLog.instance.resetUnsafe(true); } @@ -872,7 +874,9 @@ public void testNoDiskspace() throws Throwable execute("insert into %s (id, i) values (?,?)", i, i); getCurrentColumnFamilyStore().forceBlockingFlush(ColumnFamilyStore.FlushReason.UNIT_TESTS); } - CompactionInfo.Holder holder = holder(OperationType.COMPACTION); + // When we have an existing compaction with sstables of total size more than double the available space, + // we should not be able to then run a major compaction + CompactionInfo.Holder holder = holder(OperationType.COMPACTION, 2); CompactionManager.instance.active.beginCompaction(holder); try { @@ -888,7 +892,7 @@ public void testNoDiskspace() throws Throwable CompactionManager.instance.active.finishCompaction(holder); } // don't block compactions if there is a huge validation - holder = 
holder(OperationType.VALIDATION, 2); CompactionManager.instance.active.beginCompaction(holder); try { @@ -898,9 +902,36 @@ public void testNoDiskspace() throws Throwable { CompactionManager.instance.active.finishCompaction(holder); } + + // Should be able to run when the sstables in question are 90% of the total available space + holder = holder(OperationType.COMPACTION, 0.9); + CompactionManager.instance.active.beginCompaction(holder); + try + { + getCurrentColumnFamilyStore().forceMajorCompaction(); + } + finally + { + CompactionManager.instance.active.finishCompaction(holder); + } + } + + private static final class OnDiskLengthOverrideSSTableReader extends ForwardingSSTableReader + { + private final long onDiskLengthOverride; + public OnDiskLengthOverrideSSTableReader(SSTableReader delegate, long onDiskLengthOverride) { + super(delegate); + this.onDiskLengthOverride = onDiskLengthOverride; + } + + @Override + public long onDiskLength() + { + return onDiskLengthOverride; + } } - private CompactionInfo.Holder holder(OperationType opType) + private CompactionInfo.Holder holder(OperationType opType, double availableSpaceMultiplier) { CompactionInfo.Holder holder = new CompactionInfo.Holder() { @@ -910,12 +941,19 @@ public CompactionInfo getCompactionInfo() for (File f : getCurrentColumnFamilyStore().getDirectories().getCFDirectories()) availableSpace += PathUtils.tryGetSpace(f.toPath(), FileStore::getUsableSpace); + Set liveSSTables = getCurrentColumnFamilyStore().getLiveSSTables(); + long totalDiskUsage = (long)(availableSpace * availableSpaceMultiplier); + long sstableSize = totalDiskUsage / liveSSTables.size(); + final Set overridenSStables = liveSSTables.stream().map(s -> new OnDiskLengthOverrideSSTableReader(s, sstableSize)).collect(Collectors.toSet()); + // Arbitrary compression ratio of 3.4 + long totalUncompressedSize = (long) ((double) totalDiskUsage * 3.4); return new CompactionInfo(getCurrentColumnFamilyStore().metadata(), opType, +0, - +availableSpace * 
2, + totalUncompressedSize, + totalDiskUsage, nextTimeUUID(), - getCurrentColumnFamilyStore().getLiveSSTables()); + overridenSStables); } public boolean isGlobal() diff --git a/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java b/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java index 4960bfd18888..b837b0ea2aea 100644 --- a/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java +++ b/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java @@ -609,7 +609,7 @@ private void tryPredicate(ColumnFamilyStore cfs, List compacting, { public CompactionInfo getCompactionInfo() { - return new CompactionInfo(cfs.metadata(), OperationType.ANTICOMPACTION, 0, 1000, nextTimeUUID(), compacting); + return new CompactionInfo(cfs.metadata(), OperationType.ANTICOMPACTION, 0, 1000, 1000, nextTimeUUID(), compacting); } public boolean isGlobal() @@ -650,7 +650,7 @@ public void testRetries() throws InterruptedException, ExecutionException { public CompactionInfo getCompactionInfo() { - return new CompactionInfo(cfs.metadata(), OperationType.ANTICOMPACTION, 0, 0, nextTimeUUID(), cfs.getLiveSSTables()); + return new CompactionInfo(cfs.metadata(), OperationType.ANTICOMPACTION, 0, 0, 0, nextTimeUUID(), cfs.getLiveSSTables()); } public boolean isGlobal() @@ -703,7 +703,7 @@ public void testRetriesTimeout() throws InterruptedException, ExecutionException { public CompactionInfo getCompactionInfo() { - return new CompactionInfo(cfs.metadata(), OperationType.ANTICOMPACTION, 0, 0, nextTimeUUID(), cfs.getLiveSSTables()); + return new CompactionInfo(cfs.metadata(), OperationType.ANTICOMPACTION, 0, 0, 0, nextTimeUUID(), cfs.getLiveSSTables()); } public boolean isGlobal() diff --git a/test/unit/org/apache/cassandra/db/virtual/SSTableTasksTableTest.java b/test/unit/org/apache/cassandra/db/virtual/SSTableTasksTableTest.java index 6e8a13612b35..ec9bf0bb870d 100644 --- 
a/test/unit/org/apache/cassandra/db/virtual/SSTableTasksTableTest.java +++ b/test/unit/org/apache/cassandra/db/virtual/SSTableTasksTableTest.java @@ -69,6 +69,7 @@ public void testSelectAll() throws Throwable long bytesCompacted = 123; long bytesTotal = 123456; + long totalCompressedBytes = 112233; TimeUUID compactionId = nextTimeUUID(); List sstables = IntStream.range(0, 10) .mapToObj(i -> MockSchema.sstable(i, i * 10L, i * 10L + 9, cfs)) @@ -80,7 +81,7 @@ public void testSelectAll() throws Throwable { public CompactionInfo getCompactionInfo() { - return new CompactionInfo(cfs.metadata(), OperationType.COMPACTION, bytesCompacted, bytesTotal, compactionId, sstables, directory); + return new CompactionInfo(cfs.metadata(), OperationType.COMPACTION, bytesCompacted, bytesTotal, totalCompressedBytes, compactionId, sstables, directory); } public boolean isGlobal() diff --git a/test/unit/org/apache/cassandra/io/sstable/indexsummary/IndexSummaryManagerTest.java b/test/unit/org/apache/cassandra/io/sstable/indexsummary/IndexSummaryManagerTest.java index 40d04a48401b..aa3eb75811d4 100644 --- a/test/unit/org/apache/cassandra/io/sstable/indexsummary/IndexSummaryManagerTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/indexsummary/IndexSummaryManagerTest.java @@ -654,7 +654,7 @@ public void testCancelIndexHelper(Consumer cancelFunction) th { public CompactionInfo getCompactionInfo() { - return new CompactionInfo(cfs.metadata(), OperationType.UNKNOWN, 0, 0, nextTimeUUID(), compacting); + return new CompactionInfo(cfs.metadata(), OperationType.UNKNOWN, 0, 0, 0, nextTimeUUID(), compacting); } public boolean isGlobal() diff --git a/test/unit/org/apache/cassandra/tools/nodetool/CompactionStatsTest.java b/test/unit/org/apache/cassandra/tools/nodetool/CompactionStatsTest.java index 8758c3f8fd86..b3d14a3444c3 100644 --- a/test/unit/org/apache/cassandra/tools/nodetool/CompactionStatsTest.java +++ b/test/unit/org/apache/cassandra/tools/nodetool/CompactionStatsTest.java @@ -104,6 
+104,7 @@ public void testCompactionStats() long bytesCompacted = 123; long bytesTotal = 123456; + long totalCompressedBytes = 112233; TimeUUID compactionId = nextTimeUUID(); List sstables = IntStream.range(0, 10) .mapToObj(i -> MockSchema.sstable(i, i * 10L, i * 10L + 9, cfs)) @@ -112,7 +113,7 @@ public void testCompactionStats() { public CompactionInfo getCompactionInfo() { - return new CompactionInfo(cfs.metadata(), OperationType.COMPACTION, bytesCompacted, bytesTotal, compactionId, sstables); + return new CompactionInfo(cfs.metadata(), OperationType.COMPACTION, bytesCompacted, bytesTotal, totalCompressedBytes, compactionId, sstables); } public boolean isGlobal() @@ -153,6 +154,7 @@ public void testCompactionStatsVtable() long bytesCompacted = 123; long bytesTotal = 123456; + long totalCompressedBytes = 112233; TimeUUID compactionId = nextTimeUUID(); List sstables = IntStream.range(0, 10) .mapToObj(i -> MockSchema.sstable(i, i * 10L, i * 10L + 9, cfs)) @@ -162,7 +164,7 @@ public void testCompactionStatsVtable() { public CompactionInfo getCompactionInfo() { - return new CompactionInfo(cfs.metadata(), OperationType.COMPACTION, bytesCompacted, bytesTotal, compactionId, sstables, targetDirectory); + return new CompactionInfo(cfs.metadata(), OperationType.COMPACTION, bytesCompacted, bytesTotal, totalCompressedBytes, compactionId, sstables, targetDirectory); } public boolean isGlobal() @@ -175,7 +177,7 @@ public boolean isGlobal() { public CompactionInfo getCompactionInfo() { - return new CompactionInfo(cfs.metadata(), OperationType.CLEANUP, bytesCompacted, bytesTotal, compactionId, sstables); + return new CompactionInfo(cfs.metadata(), OperationType.CLEANUP, bytesCompacted, bytesTotal, totalCompressedBytes, compactionId, sstables); } public boolean isGlobal() @@ -212,6 +214,7 @@ public void testCompactionStatsHumanReadable() long bytesCompacted = 123; long bytesTotal = 123456; + long totalCompressedBytes = 112233; TimeUUID compactionId = nextTimeUUID(); List sstables 
= IntStream.range(0, 10) .mapToObj(i -> MockSchema.sstable(i, i * 10L, i * 10L + 9, cfs)) @@ -220,7 +223,7 @@ public void testCompactionStatsHumanReadable() { public CompactionInfo getCompactionInfo() { - return new CompactionInfo(cfs.metadata(), OperationType.COMPACTION, bytesCompacted, bytesTotal, compactionId, sstables); + return new CompactionInfo(cfs.metadata(), OperationType.COMPACTION, bytesCompacted, bytesTotal, totalCompressedBytes, compactionId, sstables); } public boolean isGlobal() @@ -249,6 +252,7 @@ public void testCompactionStatsVtableHumanReadable() long bytesCompacted = 123; long bytesTotal = 123456; + long totalCompressedBytes = 112233; TimeUUID compactionId = nextTimeUUID(); List sstables = IntStream.range(0, 10) .mapToObj(i -> MockSchema.sstable(i, i * 10L, i * 10L + 9, cfs)) @@ -258,7 +262,7 @@ public void testCompactionStatsVtableHumanReadable() { public CompactionInfo getCompactionInfo() { - return new CompactionInfo(cfs.metadata(), OperationType.COMPACTION, bytesCompacted, bytesTotal, compactionId, sstables, targetDirectory); + return new CompactionInfo(cfs.metadata(), OperationType.COMPACTION, bytesCompacted, bytesTotal, totalCompressedBytes, compactionId, sstables, targetDirectory); } public boolean isGlobal() @@ -271,7 +275,7 @@ public boolean isGlobal() { public CompactionInfo getCompactionInfo() { - return new CompactionInfo(cfs.metadata(), OperationType.CLEANUP, bytesCompacted, bytesTotal, compactionId, sstables); + return new CompactionInfo(cfs.metadata(), OperationType.CLEANUP, bytesCompacted, bytesTotal, totalCompressedBytes, compactionId, sstables); } public boolean isGlobal() From cad1b5f4106880f90efa13e99165b6e447e94fef Mon Sep 17 00:00:00 2001 From: Matt Byrd Date: Mon, 13 Apr 2026 19:36:02 +0100 Subject: [PATCH 2/3] address review comments, set compressed size to be non zero, vaguely realistic, remove seemingly superfluous testing machinery OnDiskLengthOverrideSSTableReader --- .../test/SecondaryIndexCompactionTest.java | 4 
++-- .../db/compaction/CompactionsCQLTest.java | 21 +------------------ 2 files changed, 3 insertions(+), 22 deletions(-) diff --git a/test/distributed/org/apache/cassandra/distributed/test/SecondaryIndexCompactionTest.java b/test/distributed/org/apache/cassandra/distributed/test/SecondaryIndexCompactionTest.java index eea28b01b1d1..35d47bc4bd72 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/SecondaryIndexCompactionTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/SecondaryIndexCompactionTest.java @@ -79,7 +79,7 @@ public MockHolder(TableMetadata metadata, Set sstables) @Override public CompactionInfo getCompactionInfo() { - return new CompactionInfo(metadata, OperationType.COMPACTION, 0, 1000, 0, nextTimeUUID(), sstables); + return new CompactionInfo(metadata, OperationType.COMPACTION, 0, 1000, 300, nextTimeUUID(), sstables); } @Override @@ -88,4 +88,4 @@ public boolean isGlobal() return false; } } -} \ No newline at end of file +} diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java index 9ae5a32d9f55..76e519b1d19e 100644 --- a/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java @@ -26,7 +26,6 @@ import java.util.Map; import java.util.Random; import java.util.Set; -import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; @@ -56,7 +55,6 @@ import org.apache.cassandra.io.sstable.CorruptSSTableException; import org.apache.cassandra.io.sstable.ISSTableScanner; import org.apache.cassandra.io.sstable.LegacySSTableTest; -import org.apache.cassandra.io.sstable.format.ForwardingSSTableReader; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.sstable.format.SSTableWriter; import org.apache.cassandra.io.util.File; @@ -916,21 +914,6 @@ public void testNoDiskspace() throws 
Throwable } } - private static final class OnDiskLengthOverrideSSTableReader extends ForwardingSSTableReader - { - private final long onDiskLengthOverride; - public OnDiskLengthOverrideSSTableReader(SSTableReader delegate, long onDiskLengthOverride) { - super(delegate); - this.onDiskLengthOverride = onDiskLengthOverride; - } - - @Override - public long onDiskLength() - { - return onDiskLengthOverride; - } - } - private CompactionInfo.Holder holder(OperationType opType, double availableSpaceMultiplier) { CompactionInfo.Holder holder = new CompactionInfo.Holder() @@ -943,8 +926,6 @@ public CompactionInfo getCompactionInfo() Set liveSSTables = getCurrentColumnFamilyStore().getLiveSSTables(); long totalDiskUsage = (long)(availableSpace * availableSpaceMultiplier); - long sstableSize = totalDiskUsage / liveSSTables.size(); - final Set overridenSStables = liveSSTables.stream().map(s -> new OnDiskLengthOverrideSSTableReader(s, sstableSize)).collect(Collectors.toSet()); // Arbitrary compression ratio of 3.4 long totalUncompressedSize = (long) ((double) totalDiskUsage * 3.4); return new CompactionInfo(getCurrentColumnFamilyStore().metadata(), @@ -953,7 +934,7 @@ public CompactionInfo getCompactionInfo() totalUncompressedSize, totalDiskUsage, nextTimeUUID(), - overridenSStables); + liveSSTables); } public boolean isGlobal() From c3709034eed73eaa66b8fcb085f6404e5f1b12e8 Mon Sep 17 00:00:00 2001 From: Matt Byrd Date: Tue, 14 Apr 2026 17:59:16 +0100 Subject: [PATCH 3/3] fix brackets, make totalCompressed explicit parameter --- src/java/org/apache/cassandra/cache/AutoSavingCache.java | 3 ++- .../org/apache/cassandra/db/compaction/CompactionInfo.java | 2 +- .../org/apache/cassandra/db/compaction/CompactionIterator.java | 3 ++- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/src/java/org/apache/cassandra/cache/AutoSavingCache.java b/src/java/org/apache/cassandra/cache/AutoSavingCache.java index ecd0a8d01a50..f1df894d4543 100644 --- 
a/src/java/org/apache/cassandra/cache/AutoSavingCache.java +++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java @@ -342,7 +342,8 @@ public CompactionInfo getCompactionInfo() { // keyset can change in size, thus total can too // TODO need to check for this one... was: info.forProgress(keysWritten, Math.max(keysWritten, keys.size())); - return info.forProgress(keysWritten, Math.max(keysWritten, keysEstimate)); + long totalKeys = Math.max(keysWritten, keysEstimate); + return info.forProgress(keysWritten, totalKeys, totalKeys); } public void saveCache() diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java b/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java index e87c4e0e109b..e00b062a035c 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java @@ -100,7 +100,7 @@ public static CompactionInfo withoutSSTables(TableMetadata metadata, OperationTy } /** @return A copy of this CompactionInfo with updated progress. */ - public CompactionInfo forProgress(long complete, long total) + public CompactionInfo forProgress(long complete, long total, long totalCompressed) { return new CompactionInfo(metadata, tasktype, complete, total, totalCompressed, unit, compactionId, sstables, targetDirectory); } diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java index 95e4955c3954..e550977d304f 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java @@ -137,7 +137,8 @@ public CompactionIterator(OperationType type, long bytes = 0; long compressedBytes = 0; - for (ISSTableScanner scanner : scanners) { + for (ISSTableScanner scanner : scanners) + { bytes += scanner.getLengthInBytes(); compressedBytes += scanner.getCompressedLengthInBytes(); }