diff --git a/CHANGES.txt b/CHANGES.txt index 2d9e2059e192..74755be6e795 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.11.17 + * Fix delayed SSTable release with unsafe_aggressive_sstable_expiration (CASSANDRA-18756) * Revert CASSANDRA-18543 (CASSANDRA-18854) * Fix NPE when using udfContext in UDF after a restart of a node (CASSANDRA-18739) Merged from 3.0: diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionController.java b/src/java/org/apache/cassandra/db/compaction/CompactionController.java index 19318ff1a901..06272a10754c 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java @@ -81,6 +81,8 @@ public CompactionController(ColumnFamilyStore cfs, Set compacting public CompactionController(ColumnFamilyStore cfs, Set compacting, int gcBefore, RateLimiter limiter, TombstoneOption tombstoneOption) { + //When making changes to the method, be aware that some of the state of the controller may still be uninitialized + //(e.g. TWCS sets up the value of ignoreOverlaps() after this completes) assert cfs != null; this.cfs = cfs; this.gcBefore = gcBefore; @@ -105,12 +107,6 @@ public void maybeRefreshOverlaps() return; } - if (ignoreOverlaps()) - { - logger.debug("not refreshing overlaps - running with ignoreOverlaps activated"); - return; - } - for (SSTableReader reader : overlappingSSTables) { if (reader.isMarkedCompacted()) @@ -129,7 +125,7 @@ private void refreshOverlaps() if (this.overlappingSSTables != null) close(); - if (compacting == null || ignoreOverlaps()) + if (compacting == null) overlappingSSTables = Refs.tryRef(Collections.emptyList()); else overlappingSSTables = cfs.getAndReferenceOverlappingLiveSSTables(compacting); @@ -358,6 +354,8 @@ private UnfilteredRowIterator getShadowIterator(SSTableReader reader, DecoratedK * This strategy can retain for a long time a lot of sstables on disk (see CASSANDRA-13418) so this option * control whether or not this check should be ignored. * + * Do NOT call this method in the CompactionController constructor + * * @return false by default */ protected boolean ignoreOverlaps() diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java index 052206e685fb..aa95ba56fb18 100644 --- a/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java @@ -19,12 +19,19 @@ package org.apache.cassandra.db.compaction; import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.function.Predicate; +import com.google.common.collect.Iterables; import com.google.common.collect.Sets; +import com.google.common.util.concurrent.Uninterruptibles; import org.junit.BeforeClass; import org.junit.Test; +import org.junit.runner.RunWith; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.Util; @@ -41,17 +48,27 @@ import org.apache.cassandra.schema.KeyspaceParams; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; +import org.jboss.byteman.contrib.bmunit.BMRule; +import org.jboss.byteman.contrib.bmunit.BMRules; +import org.jboss.byteman.contrib.bmunit.BMUnitRunner; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertNotNull; +@RunWith(BMUnitRunner.class) public class CompactionControllerTest extends SchemaLoader { private static final String KEYSPACE = "CompactionControllerTest"; private static final String CF1 = "Standard1"; private static final String CF2 = "Standard2"; + private static final int TTL_SECONDS = 10; + private static CountDownLatch compaction2FinishLatch = new CountDownLatch(1); + private static CountDownLatch createCompactionControllerLatch = new CountDownLatch(1); + private static CountDownLatch compaction1RefreshLatch = new CountDownLatch(1); + private static CountDownLatch refreshCheckLatch = new CountDownLatch(1); + private static int overlapRefreshCounter = 0; @BeforeClass public static void defineSchema() throws ConfigurationException @@ -184,6 +201,124 @@ public void testGetFullyExpiredSSTables() assertEquals(1, expired.size()); } + @Test + @BMRules(rules = { + @BMRule(name = "Pause compaction", + targetClass = "CompactionTask", + targetMethod = "runMayThrow", + targetLocation = "INVOKE getCompactionAwareWriter", + condition = "Thread.currentThread().getName().equals(\"compaction1\")", + action = "org.apache.cassandra.db.compaction.CompactionControllerTest.createCompactionControllerLatch.countDown();" + + "com.google.common.util.concurrent.Uninterruptibles.awaitUninterruptibly" + + "(org.apache.cassandra.db.compaction.CompactionControllerTest.compaction2FinishLatch);"), + @BMRule(name = "Check overlaps", + targetClass = "CompactionTask", + targetMethod = "runMayThrow", + targetLocation = "INVOKE finish", + condition = "Thread.currentThread().getName().equals(\"compaction1\")", + action = "org.apache.cassandra.db.compaction.CompactionControllerTest.compaction1RefreshLatch.countDown();" + + "com.google.common.util.concurrent.Uninterruptibles.awaitUninterruptibly" + + "(org.apache.cassandra.db.compaction.CompactionControllerTest.refreshCheckLatch);"), + @BMRule(name = "Increment overlap refresh counter", + targetClass = "ColumnFamilyStore", + targetMethod = "getAndReferenceOverlappingLiveSSTables", + condition = "Thread.currentThread().getName().equals(\"compaction1\")", + action = "org.apache.cassandra.db.compaction.CompactionControllerTest.incrementOverlapRefreshCounter();") + }) + public void testIgnoreOverlaps() throws Exception + { + testOverlapIterator(true); + overlapRefreshCounter = 0; + compaction2FinishLatch = new CountDownLatch(1); + createCompactionControllerLatch = new CountDownLatch(1); + compaction1RefreshLatch = new CountDownLatch(1); + refreshCheckLatch = new CountDownLatch(1); + testOverlapIterator(false); + } + + public void testOverlapIterator(boolean ignoreOverlaps) throws Exception + { + + Keyspace keyspace = Keyspace.open(KEYSPACE); + ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF1); + cfs.truncateBlocking(); + cfs.disableAutoCompaction(); + + //create 2 overlapping sstables + DecoratedKey key = Util.dk("k1"); + long timestamp1 = FBUtilities.timestampMicros(); + long timestamp2 = timestamp1 - 5; + applyMutation(cfs.metadata, key, timestamp1); + cfs.forceBlockingFlush(); + assertEquals(cfs.getLiveSSTables().size(), 1); + Set sstables = cfs.getLiveSSTables(); + + applyMutation(cfs.metadata, key, timestamp2); + cfs.forceBlockingFlush(); + assertEquals(cfs.getLiveSSTables().size(), 2); + String sstable2 = cfs.getLiveSSTables().iterator().next().getFilename(); + + System.setProperty(TimeWindowCompactionStrategyOptions.UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION_PROPERTY, "true"); + Map options = new HashMap<>(); + options.put(TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_SIZE_KEY, "30"); + options.put(TimeWindowCompactionStrategyOptions.COMPACTION_WINDOW_UNIT_KEY, "SECONDS"); + options.put(TimeWindowCompactionStrategyOptions.TIMESTAMP_RESOLUTION_KEY, "MILLISECONDS"); + options.put(TimeWindowCompactionStrategyOptions.EXPIRED_SSTABLE_CHECK_FREQUENCY_SECONDS_KEY, "0"); + options.put(TimeWindowCompactionStrategyOptions.UNSAFE_AGGRESSIVE_SSTABLE_EXPIRATION_KEY, Boolean.toString(ignoreOverlaps)); + TimeWindowCompactionStrategy twcs = new TimeWindowCompactionStrategy(cfs, options); + for (SSTableReader sstable : cfs.getLiveSSTables()) + twcs.addSSTable(sstable); + + twcs.startup(); + + CompactionTask task = (CompactionTask)twcs.getUserDefinedTask(sstables, 0); + + assertNotNull(task); + assertEquals(1, Iterables.size(task.transaction.originals())); + + //start a compaction for the first sstable (compaction1) + //the overlap iterator should contain sstable2 + //this compaction will be paused by the BMRule + Thread t = new Thread(() -> { + task.execute(null); + }); + + //start a compaction for the second sstable (compaction2) + //the overlap iterator should contain sstable1 + //this compaction should complete as normal + Thread t2 = new Thread(() -> { + Uninterruptibles.awaitUninterruptibly(createCompactionControllerLatch); + assertEquals(1, overlapRefreshCounter); + CompactionManager.instance.forceUserDefinedCompaction(sstable2); + + //after compaction2 is finished, wait 1 minute and then resume compaction1 (this gives enough time for the overlapIterator to be refreshed) + //after resuming, the overlap iterator for compaction1 should be updated to include the new sstable created by compaction2, + //and it should not contain sstable2 + try + { + TimeUnit.MINUTES.sleep(1); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + compaction2FinishLatch.countDown(); + }); + + t.setName("compaction1"); + t.start(); + t2.start(); + + compaction1RefreshLatch.await(); + //at this point, the overlap iterator for compaction1 should be refreshed + + //verify that the overlap iterator for compaction1 is refreshed twice, (once during the constructor, and again after compaction2 finishes) + assertEquals(2, overlapRefreshCounter); + + refreshCheckLatch.countDown(); + t.join(); + } + private void applyMutation(CFMetaData cfm, DecoratedKey key, long timestamp) { ByteBuffer val = ByteBufferUtil.bytes(1L); @@ -206,4 +341,9 @@ private void assertPurgeBoundary(Predicate evaluator, long boundary) assertFalse(evaluator.test(boundary)); assertTrue(evaluator.test(boundary - 1)); } + + public static void incrementOverlapRefreshCounter() + { + overlapRefreshCounter++; + } }