Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/java/org/apache/cassandra/cache/AutoSavingCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,7 @@ else if (cacheType == CacheService.CacheType.COUNTER_CACHE)
type,
0,
keysEstimate,
keysEstimate,
Unit.KEYS,
nextTimeUUID(),
getCacheDataPath(CURRENT_VERSION).toPath().toString());
Expand All @@ -341,7 +342,8 @@ public CompactionInfo getCompactionInfo()
{
// keyset can change in size, thus total can too
// TODO need to check for this one... was: info.forProgress(keysWritten, Math.max(keysWritten, keys.size()));
return info.forProgress(keysWritten, Math.max(keysWritten, keysEstimate));
long totalKeys = Math.max(keysWritten, keysEstimate);
return info.forProgress(keysWritten, totalKeys, totalKeys);
}

public void saveCache()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public void finishCompaction(CompactionInfo.Holder ci)
/**
* Get the estimated number of bytes remaining to write per sstable directory
*/
public Map<File, Long> estimatedRemainingWriteBytes()
public Map<File, Long> estimatedRemainingWriteToDiskBytes()
{
synchronized (compactions)
{
Expand All @@ -66,7 +66,7 @@ public Map<File, Long> estimatedRemainingWriteBytes()
List<File> directories = compactionInfo.getTargetDirectories();
if (directories == null || directories.isEmpty())
continue;
long remainingWriteBytesPerDataDir = compactionInfo.estimatedRemainingWriteBytes() / directories.size();
long remainingWriteBytesPerDataDir = compactionInfo.estimatedRemainingWriteToDiskBytes() / directories.size();
for (File directory : directories)
writeBytesPerSSTableDir.merge(directory, remainingWriteBytesPerDataDir, Long::sum);
}
Expand Down
34 changes: 20 additions & 14 deletions src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,55 +52,57 @@ public final class CompactionInfo
private final OperationType tasktype;
private final long completed;
private final long total;
private final long totalCompressed;
private final Unit unit;
private final TimeUUID compactionId;
private final ImmutableSet<SSTableReader> sstables;
private final String targetDirectory;

public CompactionInfo(TableMetadata metadata, OperationType tasktype, long completed, long total, Unit unit, TimeUUID compactionId, Collection<? extends SSTableReader> sstables, String targetDirectory)
public CompactionInfo(TableMetadata metadata, OperationType tasktype, long completed, long total, long totalCompressed, Unit unit, TimeUUID compactionId, Collection<? extends SSTableReader> sstables, String targetDirectory)
{
this.tasktype = tasktype;
this.completed = completed;
this.total = total;
this.totalCompressed = totalCompressed;
this.metadata = metadata;
this.unit = unit;
this.compactionId = compactionId;
this.sstables = ImmutableSet.copyOf(sstables);
this.targetDirectory = targetDirectory;
}

public CompactionInfo(TableMetadata metadata, OperationType tasktype, long completed, long total, TimeUUID compactionId, Collection<SSTableReader> sstables, String targetDirectory)
public CompactionInfo(TableMetadata metadata, OperationType tasktype, long completed, long total, long totalCompressed, TimeUUID compactionId, Collection<SSTableReader> sstables, String targetDirectory)
{
this(metadata, tasktype, completed, total, Unit.BYTES, compactionId, sstables, targetDirectory);
this(metadata, tasktype, completed, total, totalCompressed, Unit.BYTES, compactionId, sstables, targetDirectory);
}

public CompactionInfo(TableMetadata metadata, OperationType tasktype, long completed, long total, TimeUUID compactionId, Collection<? extends SSTableReader> sstables)
public CompactionInfo(TableMetadata metadata, OperationType tasktype, long completed, long total, long totalCompressed, TimeUUID compactionId, Collection<? extends SSTableReader> sstables)
{
this(metadata, tasktype, completed, total, Unit.BYTES, compactionId, sstables, null);
this(metadata, tasktype, completed, total, totalCompressed, Unit.BYTES, compactionId, sstables, null);
}

/**
* Special compaction info where we always need to cancel the compaction - for example ViewBuilderTask where we don't know
* the sstables at construction
*/
public static CompactionInfo withoutSSTables(TableMetadata metadata, OperationType tasktype, long completed, long total, Unit unit, TimeUUID compactionId)
public static CompactionInfo withoutSSTables(TableMetadata metadata, OperationType tasktype, long completed, long total, long totalCompressed, Unit unit, TimeUUID compactionId)
{
return withoutSSTables(metadata, tasktype, completed, total, unit, compactionId, null);
return withoutSSTables(metadata, tasktype, completed, total, totalCompressed, unit, compactionId, null);
}

/**
* Special compaction info where we always need to cancel the compaction - for example AutoSavingCache where we don't know
* the sstables at construction
*/
public static CompactionInfo withoutSSTables(TableMetadata metadata, OperationType tasktype, long completed, long total, Unit unit, TimeUUID compactionId, String targetDirectory)
public static CompactionInfo withoutSSTables(TableMetadata metadata, OperationType tasktype, long completed, long total, long totalCompressed, Unit unit, TimeUUID compactionId, String targetDirectory)
{
return new CompactionInfo(metadata, tasktype, completed, total, unit, compactionId, ImmutableSet.of(), targetDirectory);
return new CompactionInfo(metadata, tasktype, completed, total, totalCompressed, unit, compactionId, ImmutableSet.of(), targetDirectory);
}

/** @return A copy of this CompactionInfo with updated progress. */
public CompactionInfo forProgress(long complete, long total)
public CompactionInfo forProgress(long complete, long total, long totalCompressed)
{
return new CompactionInfo(metadata, tasktype, complete, total, unit, compactionId, sstables, targetDirectory);
return new CompactionInfo(metadata, tasktype, complete, total, totalCompressed, unit, compactionId, sstables, targetDirectory);
}

public Optional<String> getKeyspace()
Expand Down Expand Up @@ -183,12 +185,16 @@ public String targetDirectory()
/**
* Note that this estimate is based on the amount of data we have left to read - it assumes input
* size == output size for a compaction, which is not really true, but should most often provide a worst case
* remaining write size.
* remaining write size. We also scale by the effective compression ratio since total/completed are for the uncompressed size.
*/
public long estimatedRemainingWriteBytes()
public long estimatedRemainingWriteToDiskBytes()
{
if (unit == Unit.BYTES && tasktype.writesData)
return getTotal() - getCompleted();
{
final long total = getTotal();
double compressionRatio = total == 0 ? 1 : ((double) totalCompressed / (double)total);
return (long)(compressionRatio * (total - getCompleted()));
}
return 0;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
private final long nowInSec;
private final TimeUUID compactionId;
private final long totalBytes;
private final long totalCompressedBytes;
private long bytesRead;
private long totalSourceCQLRows;

Expand Down Expand Up @@ -135,9 +136,14 @@ public CompactionIterator(OperationType type,
this.bytesRead = 0;

long bytes = 0;
long compressedBytes = 0;
for (ISSTableScanner scanner : scanners)
{
bytes += scanner.getLengthInBytes();
compressedBytes += scanner.getCompressedLengthInBytes();
}
this.totalBytes = bytes;
this.totalCompressedBytes = compressedBytes;
this.mergeCounters = new long[scanners.size()];
// note that we leak `this` from the constructor when calling beginCompaction below, this means we have to get the sstables before
// calling that to avoid a NPE.
Expand Down Expand Up @@ -170,6 +176,7 @@ public CompactionInfo getCompactionInfo()
type,
bytesRead,
totalBytes,
totalCompressedBytes,
compactionId,
sstables,
targetDirectory);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ protected boolean buildCompactionCandidatesForAvailableDiskSpace(final Set<SSTab
for (File directory : newCompactionDatadirs)
expectedNewWriteSize.put(directory, writeSizePerOutputDatadir);

Map<File, Long> expectedWriteSize = CompactionManager.instance.active.estimatedRemainingWriteBytes();
Map<File, Long> expectedWriteSize = CompactionManager.instance.active.estimatedRemainingWriteToDiskBytes();

// todo: abort streams if they block compactions
if (cfs.getDirectories().hasDiskSpaceForCompactionsAndStreams(expectedNewWriteSize, expectedWriteSize))
Expand Down
4 changes: 2 additions & 2 deletions src/java/org/apache/cassandra/db/view/ViewBuilderTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -204,13 +204,13 @@ public CompactionInfo getCompactionInfo()
if (range.left.getPartitioner().splitter().isPresent())
{
long progress = prevToken == null ? 0 : Math.round(prevToken.getPartitioner().splitter().get().positionInRange(prevToken, range) * 1000);
return CompactionInfo.withoutSSTables(baseCfs.metadata(), OperationType.VIEW_BUILD, progress, 1000, Unit.RANGES, compactionId);
return CompactionInfo.withoutSSTables(baseCfs.metadata(), OperationType.VIEW_BUILD, progress, 1000, 1000, Unit.RANGES, compactionId);
}

// When there is no splitter, estimate based on number of total keys but
// take the max with keysBuilt + 1 to avoid having more completed than total
long keysTotal = Math.max(keysBuilt + 1, baseCfs.estimatedKeysForRange(range));
return CompactionInfo.withoutSSTables(baseCfs.metadata(), OperationType.VIEW_BUILD, keysBuilt, keysTotal, Unit.KEYS, compactionId);
return CompactionInfo.withoutSSTables(baseCfs.metadata(), OperationType.VIEW_BUILD, keysBuilt, keysTotal, keysTotal, Unit.KEYS, compactionId);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,10 @@ public CompactionInfo getCompactionInfo()
OperationType.INDEX_BUILD,
iter.getBytesRead(),
iter.getTotalBytes(),
iter.getTotalBytes(),
compactionId,
sstables);
sstables
);
}

public void build()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ public CompactionInfo getCompactionInfo()
OperationType.INDEX_BUILD,
bytesProcessed,
totalSizeInBytes,
totalSizeInBytes,
compactionId,
sstables.keySet());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ public CompactionInfo getCompactionInfo()
OperationType.INDEX_BUILD,
bytesProcessed,
totalBytesToProcess,
totalBytesToProcess,
compactionId,
sstables.keySet(),
targetDirectory);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,7 @@ public CompactionInfo getCompactionInfo()
OperationType.SCRUB,
dataFile.getFilePointer(),
dataFile.length(),
sstable.onDiskLength(),
scrubCompactionId,
ImmutableSet.of(sstable),
File.getPath(sstable.getFilename()).getParent().toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,7 @@ public CompactionInfo getCompactionInfo()
OperationType.VERIFY,
dataFile.getFilePointer(),
dataFile.length(),
sstable.onDiskLength(),
verificationCompactionId,
ImmutableSet.of(sstable));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ static <T extends SSTableReader & IndexSummarySupport<T>> Pair<List<T>, List<Res

public CompactionInfo getCompactionInfo()
{
return CompactionInfo.withoutSSTables(null, OperationType.INDEX_SUMMARY, (memoryPoolBytes - remainingSpace), memoryPoolBytes, Unit.BYTES, compactionId);
return CompactionInfo.withoutSSTables(null, OperationType.INDEX_SUMMARY, (memoryPoolBytes - remainingSpace), memoryPoolBytes, memoryPoolBytes, Unit.BYTES, compactionId);
}

public boolean isGlobal()
Expand Down
2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/streaming/StreamSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -924,7 +924,7 @@ static boolean checkDiskSpace(Map<TableId, Long> perTableIdIncomingBytes,
for (FileStore fs : allWriteableFileStores)
newStreamBytesToWritePerFileStore.merge(fs, totalBytesInPerFileStore, Long::sum);
}
Map<FileStore, Long> totalCompactionWriteRemaining = Directories.perFileStore(CompactionManager.instance.active.estimatedRemainingWriteBytes(),
Map<FileStore, Long> totalCompactionWriteRemaining = Directories.perFileStore(CompactionManager.instance.active.estimatedRemainingWriteToDiskBytes(),
fileStoreMapper);
long totalStreamRemaining = StreamManager.instance.getTotalRemainingOngoingBytes();
long totalBytesStreamRemainingPerFileStore = totalStreamRemaining / Math.max(1, allFileStores.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public static class BB
public static void install(ClassLoader cl, Integer node)
{
new ByteBuddy().rebase(ActiveCompactions.class)
.method(named("estimatedRemainingWriteBytes"))
.method(named("estimatedRemainingWriteToDiskBytes"))
.intercept(MethodDelegation.to(BB.class))
.make()
.load(cl, ClassLoadingStrategy.Default.INJECTION);
Expand All @@ -125,7 +125,7 @@ public static void install(ClassLoader cl, Integer node)
.load(cl, ClassLoadingStrategy.Default.INJECTION);
}

public static Map<File, Long> estimatedRemainingWriteBytes()
public static Map<File, Long> estimatedRemainingWriteToDiskBytes()
{
if (sstableDir != null)
return ImmutableMap.of(sstableDir, estimatedRemaining.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public void test2iCompaction() throws IOException
// emulate ongoing index compaction:
CompactionInfo.Holder h = new MockHolder(i.getIndexCfs().metadata(), idxSSTables);
CompactionManager.instance.active.beginCompaction(h);
CompactionManager.instance.active.estimatedRemainingWriteBytes();
CompactionManager.instance.active.estimatedRemainingWriteToDiskBytes();
CompactionManager.instance.active.finishCompaction(h);
});
}
Expand All @@ -79,7 +79,7 @@ public MockHolder(TableMetadata metadata, Set<SSTableReader> sstables)
@Override
public CompactionInfo getCompactionInfo()
{
return new CompactionInfo(metadata, OperationType.COMPACTION, 0, 1000, nextTimeUUID(), sstables);
return new CompactionInfo(metadata, OperationType.COMPACTION, 0, 1000, 300, nextTimeUUID(), sstables);
}

@Override
Expand All @@ -88,4 +88,4 @@ public boolean isGlobal()
return false;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public void testAbortStreamsWhenOngoingCompactionsLeaveInsufficientSpace() throw
.withConfig(config -> config.set("hinted_handoff_enabled", false)
.with(GOSSIP)
.with(NETWORK))
.withInstanceInitializer((cl, id) -> BB.doInstall(cl, id, ActiveCompactions.class, "estimatedRemainingWriteBytes"))
.withInstanceInitializer((cl, id) -> BB.doInstall(cl, id, ActiveCompactions.class, "estimatedRemainingWriteToDiskBytes"))
.start()))
{
cluster.schemaChange("create table " + KEYSPACE + ".tbl (id int primary key, t int) with compaction={'class': 'SizeTieredCompactionStrategy'}");
Expand Down Expand Up @@ -138,7 +138,7 @@ public static long getTotalRemainingOngoingBytes()
return ongoing.get();
}

public static Map<File, Long> estimatedRemainingWriteBytes()
public static Map<File, Long> estimatedRemainingWriteToDiskBytes()
{
Map<File, Long> ret = new HashMap<>();
if (datadir != null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public void testCompactionInfoToStringContainsTaskId()
{
ColumnFamilyStore cfs = MockSchema.newCFS();
TimeUUID expectedTaskId = nextTimeUUID();
CompactionInfo compactionInfo = new CompactionInfo(cfs.metadata(), OperationType.COMPACTION, 0, 1000, expectedTaskId, new ArrayList<>());
CompactionInfo compactionInfo = new CompactionInfo(cfs.metadata(), OperationType.COMPACTION, 0, 1000, 1000, expectedTaskId, new ArrayList<>());
Assertions.assertThat(compactionInfo.toString())
.contains(expectedTaskId.toString());
}
Expand All @@ -50,7 +50,7 @@ public void testCompactionInfoToStringFormat()
UUID tableId = UUID.randomUUID();
TimeUUID taskId = nextTimeUUID();
ColumnFamilyStore cfs = MockSchema.newCFS(builder -> builder.id(TableId.fromUUID(tableId)));
CompactionInfo compactionInfo = new CompactionInfo(cfs.metadata(), OperationType.COMPACTION, 0, 1000, taskId, new ArrayList<>());
CompactionInfo compactionInfo = new CompactionInfo(cfs.metadata(), OperationType.COMPACTION, 0, 1000, 300, taskId, new ArrayList<>());
Assertions.assertThat(compactionInfo.toString())
.isEqualTo("Compaction(%s, 0 / 1000 bytes)@%s(mockks, mockcf1)", taskId, tableId);
}
Expand Down
Loading