Skip to content

Commit

Permalink
Test Failure: org.apache.cassandra.distributed.test.UpgradeSSTablesTe…
Browse files Browse the repository at this point in the history
…st.truncateWhileUpgrading

patch by Berenguer Blasi; reviewed by Brandon Williams for CASSANDRA-19398
  • Loading branch information
bereng committed Mar 6, 2024
1 parent 194a41b commit ebe07db
Showing 1 changed file with 50 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,11 @@

public class UpgradeSSTablesTest extends TestBaseImpl
{

@Test
public void upgradeSSTablesInterruptsOngoingCompaction() throws Throwable
{
try (ICluster<IInvokableInstance> cluster = init(builder().withNodes(1).start()))
try (ICluster<IInvokableInstance> cluster = init(builder().withNodes(1).withInstanceInitializer(CompactionLatchByteman::install).start()))
{
cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v text, PRIMARY KEY (pk, ck));");
cluster.get(1).acceptsOnInstance((String ks) -> {
Expand All @@ -79,10 +80,15 @@ public void upgradeSSTablesInterruptsOngoingCompaction() throws Throwable

LogAction logAction = cluster.get(1).logs();
logAction.mark();

Future<?> future = cluster.get(1).asyncAcceptsOnInstance((String ks) -> {
ColumnFamilyStore cfs = Keyspace.open(ks).getColumnFamilyStore("tbl");
CompactionManager.instance.submitMaximal(cfs, FBUtilities.nowInSeconds(), false, OperationType.COMPACTION);
}).apply(KEYSPACE);

Assert.assertTrue(cluster.get(1).callOnInstance(() -> CompactionLatchByteman.starting.awaitUninterruptibly(1, TimeUnit.MINUTES)));
cluster.get(1).runOnInstance(() -> {
CompactionLatchByteman.start.decrement();});
Assert.assertEquals(0, cluster.get(1).nodetool("upgradesstables", "-a", KEYSPACE, "tbl"));
future.get();
Assert.assertFalse(logAction.grep("Compaction interrupted").getResult().isEmpty());
Expand Down Expand Up @@ -136,7 +142,7 @@ public void compactionDoesNotCancelUpgradeSSTables() throws Throwable
@Test
public void cleanupDoesNotInterruptUpgradeSSTables() throws Throwable
{
try (ICluster<IInvokableInstance> cluster = init(builder().withNodes(1).withInstanceInitializer(BB::install).start()))
try (ICluster<IInvokableInstance> cluster = init(builder().withNodes(1).withInstanceInitializer(UpgradeSStablesLatchByteman::install).start()))
{
cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v text, PRIMARY KEY (pk, ck));");

Expand All @@ -160,20 +166,20 @@ public void cleanupDoesNotInterruptUpgradeSSTables() throws Throwable
LogAction logAction = cluster.get(1).logs();
logAction.mark();

// Start upgradingsstables - use BB to pause once inside ActiveCompactions.beginCompaction
// Start upgradingsstables - use UpgradeSStablesLatchByteman to pause once inside ActiveCompactions.beginCompaction
Thread upgradeThread = new Thread(() -> {
cluster.get(1).nodetool("upgradesstables", "-a", KEYSPACE, "tbl");
});
upgradeThread.start();
Assert.assertTrue(cluster.get(1).callOnInstance(() -> BB.starting.awaitUninterruptibly(1, TimeUnit.MINUTES)));
Assert.assertTrue(cluster.get(1).callOnInstance(() -> UpgradeSStablesLatchByteman.starting.awaitUninterruptibly(1, TimeUnit.MINUTES)));

// Start a scrub and make sure that it fails, log check later to make sure it was
// because it cannot cancel the active upgrade sstables
Assert.assertNotEquals(0, cluster.get(1).nodetool("scrub", KEYSPACE, "tbl"));

// Now resume the upgrade sstables so test can shut down
cluster.get(1).runOnInstance(() -> {
BB.start.decrement();
UpgradeSStablesLatchByteman.start.decrement();
});
upgradeThread.join();

Expand All @@ -186,7 +192,7 @@ public void cleanupDoesNotInterruptUpgradeSSTables() throws Throwable
@Test
public void truncateWhileUpgrading() throws Throwable
{
try (ICluster<IInvokableInstance> cluster = init(builder().withNodes(1).start()))
try (ICluster<IInvokableInstance> cluster = init(builder().withNodes(1).withInstanceInitializer(UpgradeSStablesLatchByteman::install).start()))
{
cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk int, ck int, v text, PRIMARY KEY (pk, ck)) "));
cluster.get(1).acceptsOnInstance((String ks) -> {
Expand Down Expand Up @@ -215,6 +221,8 @@ public void truncateWhileUpgrading() throws Throwable
cluster.get(1).nodetool("upgradesstables", "-a", KEYSPACE, "tbl");
});

Assert.assertTrue(cluster.get(1).callOnInstance(() -> UpgradeSStablesLatchByteman.starting.awaitUninterruptibly(1, TimeUnit.MINUTES)));
cluster.get(1).runOnInstance(() -> {UpgradeSStablesLatchByteman.start.decrement();});
cluster.schemaChange(withKeyspace("TRUNCATE %s.tbl"));
upgrade.get();
Assert.assertFalse(logAction.grep("Compaction interrupted").getResult().isEmpty());
Expand Down Expand Up @@ -303,7 +311,7 @@ public void rewriteSSTablesTest() throws Throwable
}
}

public static class BB
public static class UpgradeSStablesLatchByteman
{
// Will be initialized in the context of the instance class loader
static CountDownLatch starting = newCountDownLatch(1);
Expand All @@ -313,7 +321,7 @@ public static void install(ClassLoader classLoader, Integer num)
{
new ByteBuddy().rebase(ActiveCompactions.class)
.method(named("beginCompaction"))
.intercept(MethodDelegation.to(BB.class))
.intercept(MethodDelegation.to(UpgradeSStablesLatchByteman.class))
.make()
.load(classLoader, ClassLoadingStrategy.Default.INJECTION);
}
Expand All @@ -336,4 +344,38 @@ public static void beginCompaction(CompactionInfo.Holder ci, @SuperCall Callable
}
}
}

public static class CompactionLatchByteman
{
// Will be initialized in the context of the instance class loader
static CountDownLatch starting = newCountDownLatch(1);
static CountDownLatch start = newCountDownLatch(1);

public static void install(ClassLoader classLoader, Integer num)
{
new ByteBuddy().rebase(ActiveCompactions.class)
.method(named("beginCompaction"))
.intercept(MethodDelegation.to(CompactionLatchByteman.class))
.make()
.load(classLoader, ClassLoadingStrategy.Default.INJECTION);
}

@SuppressWarnings("unused")
public static void beginCompaction(CompactionInfo.Holder ci, @SuperCall Callable<Void> zuperCall)
{
try
{
zuperCall.call();
if (ci.getCompactionInfo().getTaskType() == OperationType.COMPACTION)
{
starting.decrement();
Assert.assertTrue(start.awaitUninterruptibly(1, TimeUnit.MINUTES));
}
}
catch (Exception e)
{
throw new RuntimeException(e);
}
}
}
}

0 comments on commit ebe07db

Please sign in to comment.