Skip to content

Commit

Permalink
Fix delayed SSTable release with unsafe_aggressive_sstable_expiration
Browse files Browse the repository at this point in the history
patch by Ethan Brown; reviewed by Branimir Lambov and Mick Semb Wever for CASSANDRA-18756
  • Loading branch information
ethan-brown2022 authored and blambov committed Sep 21, 2023
1 parent edf22ed commit 87c2af8
Show file tree
Hide file tree
Showing 3 changed files with 146 additions and 7 deletions.
1 change: 1 addition & 0 deletions CHANGES.txt
@@ -1,4 +1,5 @@
3.11.17
* Fix delayed SSTable release with unsafe_aggressive_sstable_expiration (CASSANDRA-18756)
* Revert CASSANDRA-18543 (CASSANDRA-18854)
* Fix NPE when using udfContext in UDF after a restart of a node (CASSANDRA-18739)
Merged from 3.0:
Expand Down
Expand Up @@ -81,6 +81,8 @@ public CompactionController(ColumnFamilyStore cfs, Set<SSTableReader> compacting

public CompactionController(ColumnFamilyStore cfs, Set<SSTableReader> compacting, int gcBefore, RateLimiter limiter, TombstoneOption tombstoneOption)
{
//When making changes to the method, be aware that some of the state of the controller may still be uninitialized
//(e.g. TWCS sets up the value of ignoreOverlaps() after this completes)
assert cfs != null;
this.cfs = cfs;
this.gcBefore = gcBefore;
Expand All @@ -105,12 +107,6 @@ public void maybeRefreshOverlaps()
return;
}

if (ignoreOverlaps())
{
logger.debug("not refreshing overlaps - running with ignoreOverlaps activated");
return;
}

for (SSTableReader reader : overlappingSSTables)
{
if (reader.isMarkedCompacted())
Expand All @@ -129,7 +125,7 @@ private void refreshOverlaps()
if (this.overlappingSSTables != null)
close();

if (compacting == null || ignoreOverlaps())
if (compacting == null)
overlappingSSTables = Refs.tryRef(Collections.<SSTableReader>emptyList());
else
overlappingSSTables = cfs.getAndReferenceOverlappingLiveSSTables(compacting);
Expand Down Expand Up @@ -358,6 +354,8 @@ private UnfilteredRowIterator getShadowIterator(SSTableReader reader, DecoratedK
* This strategy can retain for a long time a lot of sstables on disk (see CASSANDRA-13418) so this option
* control whether or not this check should be ignored.
*
* Do NOT call this method in the CompactionController constructor
*
* @return false by default
*/
protected boolean ignoreOverlaps()
Expand Down
Expand Up @@ -19,12 +19,19 @@
package org.apache.cassandra.db.compaction;

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Uninterruptibles;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;

import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
Expand All @@ -41,17 +48,27 @@
import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.jboss.byteman.contrib.bmunit.BMRule;
import org.jboss.byteman.contrib.bmunit.BMRules;
import org.jboss.byteman.contrib.bmunit.BMUnitRunner;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertNotNull;

@RunWith(BMUnitRunner.class)
public class CompactionControllerTest extends SchemaLoader
{
private static final String KEYSPACE = "CompactionControllerTest";
private static final String CF1 = "Standard1";
private static final String CF2 = "Standard2";
private static final int TTL_SECONDS = 10;
private static CountDownLatch compaction2FinishLatch = new CountDownLatch(1);
private static CountDownLatch createCompactionControllerLatch = new CountDownLatch(1);
private static CountDownLatch compaction1RefreshLatch = new CountDownLatch(1);
private static CountDownLatch refreshCheckLatch = new CountDownLatch(1);
private static int overlapRefreshCounter = 0;

@BeforeClass
public static void defineSchema() throws ConfigurationException
Expand Down Expand Up @@ -184,6 +201,124 @@ public void testGetFullyExpiredSSTables()
assertEquals(1, expired.size());
}

@Test
@BMRules(rules = {
@BMRule(name = "Pause compaction",
targetClass = "CompactionTask",
targetMethod = "runMayThrow",
targetLocation = "INVOKE getCompactionAwareWriter",
condition = "Thread.currentThread().getName().equals(\"compaction1\")",
action = "org.apache.cassandra.db.compaction.CompactionControllerTest.createCompactionControllerLatch.countDown();" +
"com.google.common.util.concurrent.Uninterruptibles.awaitUninterruptibly" +
"(org.apache.cassandra.db.compaction.CompactionControllerTest.compaction2FinishLatch);"),
@BMRule(name = "Check overlaps",
targetClass = "CompactionTask",
targetMethod = "runMayThrow",
targetLocation = "INVOKE finish",
condition = "Thread.currentThread().getName().equals(\"compaction1\")",
action = "org.apache.cassandra.db.compaction.CompactionControllerTest.compaction1RefreshLatch.countDown();" +
"com.google.common.util.concurrent.Uninterruptibles.awaitUninterruptibly" +
"(org.apache.cassandra.db.compaction.CompactionControllerTest.refreshCheckLatch);"),
@BMRule(name = "Increment overlap refresh counter",
targetClass = "ColumnFamilyStore",
targetMethod = "getAndReferenceOverlappingLiveSSTables",
condition = "Thread.currentThread().getName().equals(\"compaction1\")",
action = "org.apache.cassandra.db.compaction.CompactionControllerTest.incrementOverlapRefreshCounter();")
})
public void testIgnoreOverlaps() throws Exception
{
testOverlapIterator(true);
overlapRefreshCounter = 0;
compaction2FinishLatch = new CountDownLatch(1);
createCompactionControllerLatch = new CountDownLatch(1);
compaction1RefreshLatch = new CountDownLatch(1);
refreshCheckLatch = new CountDownLatch(1);
testOverlapIterator(false);
}

public void testOverlapIterator(boolean ignoreOverlaps) throws Exception
{

Keyspace keyspace = Keyspace.open(KEYSPACE);
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF1);
cfs.truncateBlocking();
cfs.disableAutoCompaction();

//create 2 overlapping sstables
DecoratedKey key = Util.dk("k1");
long timestamp1 = FBUtilities.timestampMicros();
long timestamp2 = timestamp1 - 5;
applyMutation(cfs.metadata, key, timestamp1);
cfs.forceBlockingFlush();
assertEquals(cfs.getLiveSSTables().size(), 1);
Set<SSTableReader> sstables = cfs.getLiveSSTables();

applyMutation(cfs.metadata, key, timestamp2);
cfs.forceBlockingFlush();
assertEquals(cfs.getLiveSSTables().size(), 2);
String sstable2 = cfs.getLiveSSTables().iterator().next().getFilename();

System.setProperty(TimeWindowCompactionStrategyOptions.UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION_PROPERTY, "true");
Map<String, String> options = new HashMap<>();
options.put(TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_SIZE_KEY, "30");
options.put(TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_UNIT_KEY, "SECONDS");
options.put(TimeWindowCompactionStrategyOptions.TIMESTAMP_RESOLUTION_KEY, "MILLISECONDS");
options.put(TimeWindowCompactionStrategyOptions.EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_KEY, "0");
options.put(TimeWindowCompactionStrategyOptions.UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION_KEY, Boolean.toString(ignoreOverlaps));
TimeWindowCompactionStrategy twcs = new TimeWindowCompactionStrategy(cfs, options);
for (SSTableReader sstable : cfs.getLiveSSTables())
twcs.addSSTable(sstable);

twcs.startup();

CompactionTask task = (CompactionTask)twcs.getUserDefinedTask(sstables, 0);

assertNotNull(task);
assertEquals(1, Iterables.size(task.transaction.originals()));

//start a compaction for the first sstable (compaction1)
//the overlap iterator should contain sstable2
//this compaction will be paused by the BMRule
Thread t = new Thread(() -> {
task.execute(null);
});

//start a compaction for the second sstable (compaction2)
//the overlap iterator should contain sstable1
//this compaction should complete as normal
Thread t2 = new Thread(() -> {
Uninterruptibles.awaitUninterruptibly(createCompactionControllerLatch);
assertEquals(1, overlapRefreshCounter);
CompactionManager.instance.forceUserDefinedCompaction(sstable2);

//after compaction2 is finished, wait 1 minute and then resume compaction1 (this gives enough time for the overlapIterator to be refreshed)
//after resuming, the overlap iterator for compaction1 should be updated to include the new sstable created by compaction2,
//and it should not contain sstable2
try
{
TimeUnit.MINUTES.sleep(1);
}
catch (InterruptedException e)
{
throw new RuntimeException(e);
}
compaction2FinishLatch.countDown();
});

t.setName("compaction1");
t.start();
t2.start();

compaction1RefreshLatch.await();
//at this point, the overlap iterator for compaction1 should be refreshed

//verify that the overlap iterator for compaction1 is refreshed twice, (once during the constructor, and again after compaction2 finishes)
assertEquals(2, overlapRefreshCounter);

refreshCheckLatch.countDown();
t.join();
}

private void applyMutation(CFMetaData cfm, DecoratedKey key, long timestamp)
{
ByteBuffer val = ByteBufferUtil.bytes(1L);
Expand All @@ -206,4 +341,9 @@ private void assertPurgeBoundary(Predicate<Long> evaluator, long boundary)
assertFalse(evaluator.test(boundary));
assertTrue(evaluator.test(boundary - 1));
}

public static void incrementOverlapRefreshCounter()
{
overlapRefreshCounter++;
}
}

0 comments on commit 87c2af8

Please sign in to comment.