Commit 803a23b
Fix LeveledCompactionStrategy compacts last level throw an ArrayIndexOutOfBoundsException

patch by Alexey Zotov; reviewed by Marcus Eriksson, Yifan Cai for CASSANDRA-15669
azotcsit authored and yifan-c committed May 31, 2021
1 parent 85358be commit 803a23b
Showing 6 changed files with 82 additions and 30 deletions.
1 change: 1 addition & 0 deletions CHANGES.txt
@@ -1,4 +1,5 @@
 3.11.11
+ * Fix LeveledCompactionStrategy compacts last level throw an ArrayIndexOutOfBoundsException (CASSANDRA-15669)
 * Maps $CASSANDRA_LOG_DIR to cassandra.logdir java property when executing nodetool (CASSANDRA-16199)
 * Nodetool garbagecollect should retain SSTableLevel for LCS (CASSANDRA-16634)
 * Ignore stale acks received in the shadow round (CASSANDRA-16588)
3 changes: 2 additions & 1 deletion doc/source/operating/compaction.rst
@@ -335,7 +335,8 @@ cover the full range. We also can't compact all L0 sstables with all L1 sstables
 use too much memory.
 
 When deciding which level to compact LCS checks the higher levels first (with LCS, a "higher" level is one with a higher
-number, L0 being the lowest one) and if the level is behind a compaction will be started in that level.
+number: L0 is the lowest one, L8 is the highest one) and if the level is behind a compaction will be started
+in that level.
 
 Major compaction
 ~~~~~~~~~~~~~~~~
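For readers skimming the doc change: with this fix, LCS explicitly supports nine levels, L0 through L8. A minimal standalone sketch (not Cassandra code; it assumes the documented defaults of a 160 MB sstable size and a fanout of 10, and the target-size rule maxSSTableSize * fanout^level for levels above L0) of how fast the per-level caps grow:

public class LcsLevelTargets
{
    public static void main(String[] args)
    {
        long sstableSizeBytes = 160L * 1024 * 1024; // assumed default sstable_size_in_mb = 160
        int fanout = 10;                            // assumed default fanout_size = 10
        for (int level = 1; level <= 8; level++)    // L0 is special-cased and not sized this way
        {
            long targetBytes = (long) Math.pow(fanout, level) * sstableSizeBytes;
            System.out.printf("L%d target: %,d bytes%n", level, targetBytes);
        }
    }
}

At these defaults L8 already allows roughly 16 PB, which is why a ninth data level has never been needed in practice.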
6 changes: 2 additions & 4 deletions src/java/org/apache/cassandra/db/compaction/LeveledGenerations.java
@@ -51,10 +51,8 @@ class LeveledGenerations
 {
     private static final Logger logger = LoggerFactory.getLogger(LeveledGenerations.class);
     private final boolean strictLCSChecksTest = Boolean.getBoolean(Config.PROPERTY_PREFIX + "test.strict_lcs_checks");
-    // allocate enough generations for a PB of data, with a 1-MB sstable size. (Note that if maxSSTableSize is
-    // updated, we will still have sstables of the older, potentially smaller size. So don't make this
-    // dependent on maxSSTableSize.)
-    static final int MAX_LEVEL_COUNT = (int) Math.log10(1000 * 1000 * 1000);
+    // It includes L0, i.e. we support [L0 - L8] levels
+    static final int MAX_LEVEL_COUNT = 9;
 
     /**
      * This map is used to track the original NORMAL instances of sstables
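The removed formula and the new literal are equivalent: Math.log10 is specified to return exactly n when its argument is 10^n, so the old expression already evaluated to 9 before this change made it explicit. A quick check (a sketch, not part of the patch):

public class MaxLevelCountCheck
{
    public static void main(String[] args)
    {
        // Math.log10 returns exactly n for arguments of the form 10^n,
        // so the cast below loses nothing.
        int derived = (int) Math.log10(1000 * 1000 * 1000);
        System.out.println(derived); // 9, i.e. levels L0 through L8
    }
}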
13 changes: 12 additions & 1 deletion src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
@@ -239,11 +239,22 @@ public synchronized CompactionCandidate getCompactionCandidates()
             // we want to calculate score excluding compacting ones
             Set<SSTableReader> sstablesInLevel = Sets.newHashSet(sstables);
             Set<SSTableReader> remaining = Sets.difference(sstablesInLevel, cfs.getTracker().getCompacting());
-            double score = (double) SSTableReader.getTotalBytes(remaining) / (double)maxBytesForLevel(i, maxSSTableSizeInBytes);
+            long remainingBytesForLevel = SSTableReader.getTotalBytes(remaining);
+            long maxBytesForLevel = maxBytesForLevel(i, maxSSTableSizeInBytes);
+            double score = (double) remainingBytesForLevel / (double) maxBytesForLevel;
             logger.trace("Compaction score for level {} is {}", i, score);
 
             if (score > 1.001)
             {
+                // the highest level should not ever exceed its maximum size under normal circumstances,
+                // but if it happens we warn about it
+                if (i == generations.levelCount() - 1)
+                {
+                    logger.warn("L" + i + " (maximum supported level) has " + remainingBytesForLevel + " bytes while "
+                                + "its maximum size is supposed to be " + maxBytesForLevel + " bytes");
+                    continue;
+                }
+
                 // before proceeding with a higher level, let's see if L0 is far enough behind to warrant STCS
                 CompactionCandidate l0Compaction = getSTCSInL0CompactionCandidate();
                 if (l0Compaction != null)
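For context, this guard is the actual bug fix. Before it, an overfull top level got a score above 1.001 like any other level, was handed to candidate selection, and promoting its output to "level + 1" indexed one slot past the fixed set of generations, producing the ArrayIndexOutOfBoundsException in the commit title. A minimal sketch of the failure mode, using hypothetical names rather than Cassandra's internals:

import java.util.ArrayList;
import java.util.List;

public class TopLevelPromotionSketch
{
    static final int MAX_LEVEL_COUNT = 9; // L0..L8, as in LeveledGenerations

    public static void main(String[] args)
    {
        List<List<String>> levels = new ArrayList<>();
        for (int i = 0; i < MAX_LEVEL_COUNT; i++)
            levels.add(new ArrayList<>());

        int overfullLevel = MAX_LEVEL_COUNT - 1; // L8 exceeds its target size
        if (overfullLevel == levels.size() - 1)
        {
            // the patched behavior: warn and skip, there is nothing to promote into
            System.out.println("L" + overfullLevel + " is the top level; skipping");
        }
        else
        {
            // the pre-patch path for any other level; for L8 this index
            // would be 9 and throw IndexOutOfBoundsException
            levels.get(overfullLevel + 1).add("promoted sstable");
        }
    }
}

The patched branch takes the warn-and-skip path instead of the promoting one.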
58 changes: 35 additions & 23 deletions test/unit/org/apache/cassandra/MockSchema.java
@@ -39,6 +39,7 @@
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.io.sstable.metadata.MetadataType;
 import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
+import org.apache.cassandra.io.util.ChannelProxy;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.util.Memory;
 import org.apache.cassandra.io.util.FileHandle;
@@ -62,7 +63,7 @@ public class MockSchema
     public static final Keyspace ks = Keyspace.mockKS(KeyspaceMetadata.create("mockks", KeyspaceParams.simpleTransient(1)));
 
     public static final IndexSummary indexSummary;
-    private static final FileHandle RANDOM_ACCESS_READER_FACTORY = new FileHandle.Builder(temp("mocksegmentedfile").getAbsolutePath()).complete();
+    private static final File tempFile = temp("mocksegmentedfile");
 
     public static Memtable memtable(ColumnFamilyStore cfs)
     {
@@ -94,6 +95,11 @@ public static SSTableReader sstableWithLevel(int generation, long firstToken, lo
         return sstable(generation, 0, false, firstToken, lastToken, level, cfs);
     }
 
+    public static SSTableReader sstableWithLevel(int generation, int size, int level, ColumnFamilyStore cfs)
+    {
+        return sstable(generation, size, false, generation, generation, level, cfs);
+    }
+
     public static SSTableReader sstable(int generation, int size, boolean keepRef, long firstToken, long lastToken, ColumnFamilyStore cfs)
     {
         return sstable(generation, size, keepRef, firstToken, lastToken, 0, cfs);
@@ -117,34 +123,40 @@ public static SSTableReader sstable(int generation, int size, boolean keepRef, l
             {
             }
         }
-        if (size > 0)
-        {
-            try
-            {
-                File file = new File(descriptor.filenameFor(Component.DATA));
-                try (RandomAccessFile raf = new RandomAccessFile(file, "rw"))
-                {
-                    raf.setLength(size);
-                }
-            }
-            catch (IOException e)
-            {
-                throw new RuntimeException(e);
-            }
-        }
-        SerializationHeader header = SerializationHeader.make(cfs.metadata, Collections.emptyList());
-        StatsMetadata metadata = (StatsMetadata) new MetadataCollector(cfs.metadata.comparator)
-                                                 .sstableLevel(level)
-                                                 .finalizeMetadata(cfs.metadata.partitioner.getClass().getCanonicalName(), 0.01f, UNREPAIRED_SSTABLE, header)
-                                                 .get(MetadataType.STATS);
-        SSTableReader reader = SSTableReader.internalOpen(descriptor, components, cfs.metadata,
-                                                          RANDOM_ACCESS_READER_FACTORY.sharedCopy(), RANDOM_ACCESS_READER_FACTORY.sharedCopy(), indexSummary.sharedCopy(),
-                                                          new AlwaysPresentFilter(), 1L, metadata, SSTableReader.OpenReason.NORMAL, header);
-        reader.first = readerBounds(firstToken);
-        reader.last = readerBounds(lastToken);
-        if (!keepRef)
-            reader.selfRef().release();
-        return reader;
-
+        // .complete() with size to make sstable.onDiskLength work
+        try (FileHandle.Builder builder = new FileHandle.Builder(new ChannelProxy(tempFile)).bufferSize(size);
+             FileHandle fileHandle = builder.complete(size))
+        {
+            if (size > 0)
+            {
+                try
+                {
+                    File file = new File(descriptor.filenameFor(Component.DATA));
+                    try (RandomAccessFile raf = new RandomAccessFile(file, "rw"))
+                    {
+                        raf.setLength(size);
+                    }
+                }
+                catch (IOException e)
+                {
+                    throw new RuntimeException(e);
+                }
+            }
+            SerializationHeader header = SerializationHeader.make(cfs.metadata, Collections.emptyList());
+            StatsMetadata metadata = (StatsMetadata) new MetadataCollector(cfs.metadata.comparator)
+                                                     .sstableLevel(level)
+                                                     .finalizeMetadata(cfs.metadata.partitioner.getClass().getCanonicalName(), 0.01f, UNREPAIRED_SSTABLE, header)
+                                                     .get(MetadataType.STATS);
+            SSTableReader reader = SSTableReader.internalOpen(descriptor, components, cfs.metadata,
+                                                              fileHandle.sharedCopy(), fileHandle.sharedCopy(), indexSummary.sharedCopy(),
+                                                              new AlwaysPresentFilter(), 1L, metadata, SSTableReader.OpenReason.NORMAL, header);
+            reader.first = readerBounds(firstToken);
+            reader.last = readerBounds(lastToken);
+            if (!keepRef)
+                reader.selfRef().release();
+            return reader;
+        }
     }
 
     public static ColumnFamilyStore newCFS()
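A note on the MockSchema change above: the old shared RANDOM_ACCESS_READER_FACTORY was completed without a size, so every mocked sstable presumably reported a zero on-disk length and no level's score could ever cross 1.001. Per the in-diff comment, completing a per-call handle with the requested size is what makes sstable.onDiskLength work. A toy sketch of the idea (hypothetical types, not Cassandra's io.util classes):

class ToyFileHandle implements AutoCloseable
{
    private final long length;

    ToyFileHandle(long length) { this.length = length; }

    long dataLength() { return length; }

    @Override
    public void close() {}

    static class Builder
    {
        ToyFileHandle complete(long overrideLength) { return new ToyFileHandle(overrideLength); }
    }

    public static void main(String[] args)
    {
        // the handle's recorded length stands in for bytes on disk,
        // so compaction-score math sees a non-empty sstable
        try (ToyFileHandle handle = new Builder().complete(128L * 1024 * 1024))
        {
            System.out.println("mocked on-disk length: " + handle.dataLength());
        }
    }
}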
31 changes: 30 additions & 1 deletion test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
@@ -64,6 +64,7 @@
 import org.apache.cassandra.utils.FBUtilities;
 
 import static java.util.Collections.singleton;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -569,7 +570,7 @@ public void singleTokenSSTableTest()
     public void randomMultiLevelAddTest()
     {
         int iterations = 100;
-        int levelCount = 8;
+        int levelCount = 9;
 
         ColumnFamilyStore cfs = MockSchema.newCFS();
         LeveledManifest lm = new LeveledManifest(cfs, 10, 10, new SizeTieredCompactionStrategyOptions());
@@ -694,4 +695,32 @@ private static void assertLevelsEqual(Collection<SSTableReader> l1, Collection<S
         assertEquals(l1.size(), l2.size());
         assertEquals(new HashSet<>(l1), new HashSet<>(l2));
     }
+
+    @Test
+    public void testHighestLevelHasMoreDataThanSupported()
+    {
+        ColumnFamilyStore cfs = MockSchema.newCFS();
+        int fanoutSize = 2; // to generate fewer sstables
+        LeveledManifest lm = new LeveledManifest(cfs, 1, fanoutSize, new SizeTieredCompactionStrategyOptions());
+
+        // generate data for L7 to trigger compaction
+        int l7 = 7;
+        int maxBytesForL7 = (int) (Math.pow(fanoutSize, l7) * 1024 * 1024);
+        int sstablesSizeForL7 = (int) (maxBytesForL7 * 1.001) + 1;
+        List<SSTableReader> sstablesOnL7 = Collections.singletonList(MockSchema.sstableWithLevel(1, sstablesSizeForL7, l7, cfs));
+        lm.addSSTables(sstablesOnL7);
+
+        // generate data for L8 to trigger compaction
+        int l8 = 8;
+        int maxBytesForL8 = (int) (Math.pow(fanoutSize, l8) * 1024 * 1024);
+        int sstablesSizeForL8 = (int) (maxBytesForL8 * 1.001) + 1;
+        List<SSTableReader> sstablesOnL8 = Collections.singletonList(MockSchema.sstableWithLevel(2, sstablesSizeForL8, l8, cfs));
+        lm.addSSTables(sstablesOnL8);
+
+        // compaction for L8 sstables is not supposed to run because there is no upper level to promote sstables to;
+        // that's why we expect compaction candidates for L7 only
+        Collection<SSTableReader> compactionCandidates = lm.getCompactionCandidates().sstables;
+        assertThat(compactionCandidates).containsAll(sstablesOnL7);
+        assertThat(compactionCandidates).doesNotContainAnyElementsOf(sstablesOnL8);
+    }
 }
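The arithmetic in the new test is easy to re-derive: the manifest is built with a 1 MB sstable size and fanout 2, so L7's cap is 2^7 MB and L8's is 2^8 MB, and each mocked sstable is sized just over 0.1% above its level's cap so that its score strictly exceeds the 1.001 threshold. A standalone recomputation (a sketch, not part of the patch):

public class TestSizeArithmetic
{
    public static void main(String[] args)
    {
        int fanoutSize = 2;                 // as in the test
        int sstableSizeBytes = 1024 * 1024; // the 1 MB maxSSTableSize passed to the manifest
        for (int level : new int[] { 7, 8 })
        {
            int maxBytesForLevel = (int) (Math.pow(fanoutSize, level) * sstableSizeBytes);
            int sstableSize = (int) (maxBytesForLevel * 1.001) + 1;
            double score = (double) sstableSize / (double) maxBytesForLevel;
            System.out.printf("L%d: cap=%,d bytes, sstable=%,d bytes, score=%.6f%n",
                              level, maxBytesForLevel, sstableSize, score);
        }
    }
}

L8's score exceeds the threshold too; the point of the test is that the new guard warns and skips it, so only the L7 sstables come back as candidates.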
