Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 25 additions & 3 deletions src/java/org/apache/cassandra/db/ColumnFamilyStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -2850,7 +2850,7 @@ public void run()
logger.trace("truncate complete");
}

public <V> V runWithCompactionsDisabled(Callable<V> callable, boolean interruptValidation)
public <V> V runWithCompactionsDisabled(Callable<V> callable, List<OperationType> interruptibles)
{
// synchronize so that concurrent invocations don't re-enable compactions partway through unexpectedly,
// and so we only run one major compaction at a time
Expand All @@ -2863,8 +2863,9 @@ public <V> V runWithCompactionsDisabled(Callable<V> callable, boolean interruptV
cfs.getCompactionStrategy().pause();
try
{

// interrupt in-progress compactions
CompactionManager.instance.interruptCompactionForCFs(selfWithIndexes, interruptValidation);
CompactionManager.instance.interruptCompactionFor(metadataWithIndexes(), interruptibles);
CompactionManager.instance.waitForCessation(selfWithIndexes);

// doublecheck that we finished, instead of timing out
Expand Down Expand Up @@ -2896,6 +2897,15 @@ public <V> V runWithCompactionsDisabled(Callable<V> callable, boolean interruptV
}
}

public <V> V runWithCompactionsDisabled(Callable<V> callable, boolean interruptValidation)
{
ArrayList<OperationType> interruptible = new ArrayList<OperationType>();
if (interruptValidation)
interruptible.add(OperationType.VALIDATION);

return runWithCompactionsDisabled(callable, interruptible);
}

public LifecycleTransaction markAllCompacting(final OperationType operationType)
{
Callable<LifecycleTransaction> callable = new Callable<LifecycleTransaction>()
Expand All @@ -2912,7 +2922,7 @@ public LifecycleTransaction call() throws Exception
}
};

return runWithCompactionsDisabled(callable, false);
return runWithCompactionsDisabled(callable, operationType.getInterruptibles());
}


Expand Down Expand Up @@ -3047,6 +3057,18 @@ public Iterable<ColumnFamilyStore> concatWithIndexes()
return Iterables.concat(Collections.singleton(this), indexManager.getIndexesBackedByCfs());
}

/**
* @return List of CFMetadatas for this CFS and all its indexes
*/
public Iterable<CFMetaData> metadataWithIndexes()
{
ArrayList<CFMetaData> metadatas = new ArrayList<>();
metadatas.add(metadata);
for (ColumnFamilyStore cfs : indexManager.getIndexesBackedByCfs())
metadatas.add(cfs.metadata);
return metadatas;
}

public List<String> getBuiltIndexes()
{
return indexManager.getBuiltIndexes();
Expand Down
26 changes: 19 additions & 7 deletions src/java/org/apache/cassandra/db/compaction/CompactionManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -1722,25 +1722,37 @@ public void setMaximumValidatorThreads(int number)
* isCompacting if you want that behavior.
*
* @param columnFamilies The ColumnFamilies to try to stop compaction upon.
* @param interruptValidation true if validation operations for repair should also be interrupted
* @param interruptibles {@link org.apache.cassandra.db.compaction.OperationType}'s that should be interrupted
*
*/
public void interruptCompactionFor(Iterable<CFMetaData> columnFamilies, boolean interruptValidation)
public void interruptCompactionFor(Iterable<CFMetaData> columnFamilies, List<OperationType> interruptibles)
{
assert columnFamilies != null;

// interrupt in-progress compactions
for (Holder compactionHolder : CompactionMetrics.getCompactions())
{
CompactionInfo info = compactionHolder.getCompactionInfo();
if ((info.getTaskType() == OperationType.VALIDATION) && !interruptValidation)
continue;

if (Iterables.contains(columnFamilies, info.getCFMetaData()))
compactionHolder.stop(); // signal compaction to stop
if ((interruptibles.contains(info.getTaskType())))
{
if (Iterables.contains(columnFamilies, info.getCFMetaData()))
compactionHolder.stop(); // signal compaction to stop
}
}
}

/**
* See {@link org.apache.cassandra.db.compaction.CompactionManager#interruptCompactionFor(Iterable, List)}
* @param interruptValidation true if validation operations for repair should also be interrupted
*/
public void interruptCompactionFor(Iterable<CFMetaData> columnFamilies, boolean interruptValidation)
{
if (interruptValidation)
interruptCompactionFor(columnFamilies, new ArrayList<OperationType>().add(OperationType.VALIDATION));
else
interruptCompactionFor(columnFamilies, new ArrayList<OperationType>());
}

public void interruptCompactionForCFs(Iterable<ColumnFamilyStore> cfss, boolean interruptValidation)
{
List<CFMetaData> metadata = new ArrayList<>();
Expand Down
19 changes: 19 additions & 0 deletions src/java/org/apache/cassandra/db/compaction/OperationType.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@
*/
package org.apache.cassandra.db.compaction;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;

public enum OperationType
{
COMPACTION("Compaction"),
Expand All @@ -36,12 +40,27 @@ public enum OperationType
INDEX_SUMMARY("Index summary redistribution");

private final String type;
// Interruptible operations by other operations
private static final HashMap<OperationType, List<OperationType>> interruptibles = new HashMap<>();

static {
interruptibles.put(OperationType.UPGRADE_SSTABLES, Arrays.asList(UNKNOWN, KEY_CACHE_SAVE, ROW_CACHE_SAVE,
COUNTER_CACHE_SAVE, VERIFY, INDEX_SUMMARY));
}

OperationType(String type)
{
this.type = type;
}


/**
* @return Operations that can be interrupted by this.
*/
public List<OperationType> getInterruptibles(){
return interruptibles.get(this);
}

public String toString()
{
return type;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import java.util.concurrent.CancellationException;
import org.apache.cassandra.OrderedJUnit4ClassRunner;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
Expand All @@ -42,6 +44,10 @@
import org.apache.cassandra.io.sstable.format.SSTableWriter;
import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
import org.apache.cassandra.locator.SimpleStrategy;
import org.apache.cassandra.metrics.CompactionMetrics;
import org.apache.cassandra.repair.RepairJobDesc;
import org.apache.cassandra.repair.Validator;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
Expand Down Expand Up @@ -383,6 +389,80 @@ public void testUserDefinedCompaction() throws Exception
assertEquals( prevGeneration + 1, sstables.iterator().next().descriptor.generation);
}

@Test(expected = CompactionInterruptedException.class)
public void testInterruptCompactionFor() throws Throwable
{
Keyspace keyspace = Keyspace.open(KEYSPACE1);
final String cfname = CF_STANDARD2;
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfname);

// disable compaction while flushing
cfs.disableAutoCompaction();

populate(KEYSPACE1, CF_STANDARD2, 0, 9, 3); //ttl=3s
cfs.forceBlockingFlush();
populate(KEYSPACE1, CF_STANDARD2, 0, 9, 3); //ttl=3s
cfs.forceBlockingFlush();
populate(KEYSPACE1, CF_STANDARD2, 0, 9, 3); //ttl=3s
cfs.forceBlockingFlush();
populate(KEYSPACE1, CF_STANDARD2, 0, 9, 3); //ttl=3s
cfs.forceBlockingFlush();

assertEquals(4, cfs.getSSTables().size());

// wait enough to force single compaction
TimeUnit.SECONDS.sleep(5);

// kick off a validation we can interrupt
Range<Token> range = new Range<>(Util.token(""), Util.token(""));
int gcBefore = keyspace.getColumnFamilyStore(cfname).gcBefore(System.currentTimeMillis());
UUID parentRepSession = UUID.randomUUID();
ActiveRepairService.instance.registerParentRepairSession(parentRepSession, FBUtilities.getBroadcastAddress(), Arrays.asList(cfs), Arrays.asList(range), false, true);
RepairJobDesc desc = new RepairJobDesc(parentRepSession, UUID.randomUUID(), KEYSPACE1, cfname, range);
Validator validator = new Validator(desc, FBUtilities.getBroadcastAddress(), gcBefore);

List<OperationType> interrupt = new ArrayList<>();
interrupt.add(OperationType.VALIDATION);

// submit a normal compaction to ensure it doesn't get interrupted
cfs.enableAutoCompaction();
List<Future<?>> compactions = CompactionManager.instance.submitBackground(cfs);
assertTrue(compactions.size() == 1);
Future<?> compaction = compactions.get(0);
while (!compaction.isDone() && !(CompactionManager.instance.getActiveCompactions() > 0))
{
Thread.sleep(10);
}
CompactionManager.instance.interruptCompactionFor(new ArrayList<CFMetaData>(Collections.singletonList(cfs.metadata)), interrupt);

try
{
// should not throw exception as we didn't interrupt.
compaction.get();
}
catch (Exception ex)
{
fail("Compaction was interrupted when it should not have been");
}

Future<?> validation = CompactionManager.instance.submitValidation(cfs, validator);

while (!validation.isDone() && !(CompactionManager.instance.getActiveCompactions() > 0))
{
Thread.sleep(10);
}

CompactionManager.instance.interruptCompactionFor(new ArrayList<CFMetaData>(Collections.singletonList(cfs.metadata)), interrupt);

try
{
validation.get();
} catch (ExecutionException e)
{
throw (CompactionInterruptedException) e.getCause();
}
}

@Test
public void testRangeTombstones()
{
Expand Down