Permalink
Browse files

Add an official way to disable autocompaction.

patch by marcuse, reviewed by pcmanus for CASSANDRA-5074
  • Loading branch information...
1 parent efbdee2 commit ebefb77c6e8a5046a8c1b1bb0edd258aaf0ad8b7 Marcus Eriksson committed Apr 9, 2013
View
@@ -30,6 +30,7 @@
(CASSANDRA-4937)
* Improve repair by deciding on a gcBefore before sending
out TreeRequests (CASSANDRA-4932)
+ * Add an official way to disable compactions (CASSANDRA-5074)
1.2.4
* Ensure that PerRowSecondaryIndex updates see the most recent values
View
@@ -30,6 +30,9 @@ Operations
----------
- Major compactions, cleanup, scrub, and upgradesstables will interrupt
any in-progress compactions (but not repair validations) when invoked.
+ - Disabling autocompactions by setting min/max compaction threshold to 0
+ has been deprecated, instead, use the nodetool commands 'disableautocompaction'
+ and 'enableautocompaction' or set the compaction strategy option enabled = false
1.2.4
@@ -1311,9 +1311,15 @@ private CfDef updateCfDefAttributes(Tree statement, CfDef cfDefToUpdate)
cfDef.setDefault_validation_class(CliUtils.unescapeSQLString(mValue));
break;
case MIN_COMPACTION_THRESHOLD:
- cfDef.setMin_compaction_threshold(Integer.parseInt(mValue));
+ int threshold = Integer.parseInt(mValue);
+ if (threshold <= 0)
+ throw new RuntimeException("Disabling compaction by setting min/max compaction thresholds to 0 has been deprecated, set compaction_strategy_options={'enabled':false} instead");
+ cfDef.setMin_compaction_threshold(threshold);
break;
case MAX_COMPACTION_THRESHOLD:
+ threshold = Integer.parseInt(mValue);
+ if (threshold <= 0)
+ throw new RuntimeException("Disabling compaction by setting min/max compaction thresholds to 0 has been deprecated, set compaction_strategy_options={'enabled':false} instead");
cfDef.setMax_compaction_threshold(Integer.parseInt(mValue));
break;
case REPLICATE_ON_WRITE:
@@ -168,7 +168,7 @@
+ "mutation blob,"
+ "PRIMARY KEY (target_id, hint_id, message_version)"
+ ") WITH COMPACT STORAGE "
- + "AND COMPACTION={'class' : 'SizeTieredCompactionStrategy', 'min_threshold' : 0, 'max_threshold' : 0} "
+ + "AND COMPACTION={'class' : 'SizeTieredCompactionStrategy', 'enabled' : false} "
+ "AND COMMENT='hints awaiting delivery'"
+ "AND gc_grace_seconds=0");
@@ -1335,7 +1335,11 @@ private static void validateAlias(ColumnDefinition alias, String msg) throws Con
private void validateCompactionThresholds() throws ConfigurationException
{
if (maxCompactionThreshold == 0)
+ {
+ logger.warn("Disabling compaction by setting max or min compaction has been deprecated, " +
+ "set the compaction strategy option 'enabled' to 'false' instead");
return;
+ }
if (minCompactionThreshold <= 1)
throw new ConfigurationException(String.format("Min compaction threshold cannot be less than 2 (got %d).", minCompactionThreshold));
@@ -174,8 +174,12 @@ public static void applyPropertiesToCFMetadata(CFMetaData cfm, CFPropDefs cfProp
cfm.dcLocalReadRepairChance(cfProps.getPropertyDouble(CFPropDefs.KW_DCLOCALREADREPAIRCHANCE, cfm.getDcLocalReadRepair()));
cfm.gcGraceSeconds(cfProps.getPropertyInt(CFPropDefs.KW_GCGRACESECONDS, cfm.getGcGraceSeconds()));
cfm.replicateOnWrite(cfProps.getPropertyBoolean(CFPropDefs.KW_REPLICATEONWRITE, cfm.getReplicateOnWrite()));
- cfm.minCompactionThreshold(cfProps.getPropertyInt(CFPropDefs.KW_MINCOMPACTIONTHRESHOLD, cfm.getMinCompactionThreshold()));
- cfm.maxCompactionThreshold(cfProps.getPropertyInt(CFPropDefs.KW_MAXCOMPACTIONTHRESHOLD, cfm.getMaxCompactionThreshold()));
+ int minCompactionThreshold = cfProps.getPropertyInt(CFPropDefs.KW_MINCOMPACTIONTHRESHOLD, cfm.getMinCompactionThreshold());
+ int maxCompactionThreshold = cfProps.getPropertyInt(CFPropDefs.KW_MAXCOMPACTIONTHRESHOLD, cfm.getMaxCompactionThreshold());
+ if (minCompactionThreshold <= 0 || maxCompactionThreshold <= 0)
+ throw new ConfigurationException("Disabling compaction by setting compaction thresholds to 0 has been deprecated, set the compaction option 'enabled' to false instead.");
+ cfm.minCompactionThreshold(minCompactionThreshold);
+ cfm.maxCompactionThreshold(maxCompactionThreshold);
cfm.caching(CFMetaData.Caching.fromString(cfProps.getPropertyString(CFPropDefs.KW_CACHING, cfm.getCaching().toString())));
cfm.defaultTimeToLive(cfProps.getPropertyInt(CFPropDefs.KW_DEFAULT_TIME_TO_LIVE, cfm.getDefaultTimeToLive()));
cfm.speculativeRetry(CFMetaData.SpeculativeRetry.fromString(cfProps.getPropertyString(CFPropDefs.KW_SPECULATIVE_RETRY, cfm.getSpeculativeRetry().toString())));
@@ -161,7 +161,7 @@ public String getName()
* @return a CFMetaData instance corresponding to the values parsed from this statement
* @throws InvalidRequestException on failure to validate parsed parameters
*/
- public CFMetaData getCFMetaData(String keyspace, List<ByteBuffer> variables) throws InvalidRequestException
+ public CFMetaData getCFMetaData(String keyspace, List<ByteBuffer> variables) throws InvalidRequestException, ConfigurationException
{
validate(variables);
@@ -178,15 +178,19 @@ public CFMetaData getCFMetaData(String keyspace, List<ByteBuffer> variables) thr
if (CFMetaData.DEFAULT_COMPRESSOR != null && cfProps.compressionParameters.isEmpty())
cfProps.compressionParameters.put(CompressionParameters.SSTABLE_COMPRESSION, CFMetaData.DEFAULT_COMPRESSOR);
+ int maxCompactionThreshold = getPropertyInt(CFPropDefs.KW_MAXCOMPACTIONTHRESHOLD, CFMetaData.DEFAULT_MAX_COMPACTION_THRESHOLD);
+ int minCompactionThreshold = getPropertyInt(CFPropDefs.KW_MINCOMPACTIONTHRESHOLD, CFMetaData.DEFAULT_MIN_COMPACTION_THRESHOLD);
+ if (minCompactionThreshold <= 0 || maxCompactionThreshold <= 0)
+ throw new ConfigurationException("Disabling compaction by setting compaction thresholds to 0 has been deprecated, set the compaction option 'enabled' to false instead.");
newCFMD.comment(cfProps.getProperty(CFPropDefs.KW_COMMENT))
.readRepairChance(getPropertyDouble(CFPropDefs.KW_READREPAIRCHANCE, CFMetaData.DEFAULT_READ_REPAIR_CHANCE))
.dcLocalReadRepairChance(getPropertyDouble(CFPropDefs.KW_DCLOCALREADREPAIRCHANCE, CFMetaData.DEFAULT_DCLOCAL_READ_REPAIR_CHANCE))
.replicateOnWrite(getPropertyBoolean(CFPropDefs.KW_REPLICATEONWRITE, CFMetaData.DEFAULT_REPLICATE_ON_WRITE))
.gcGraceSeconds(getPropertyInt(CFPropDefs.KW_GCGRACESECONDS, CFMetaData.DEFAULT_GC_GRACE_SECONDS))
.defaultValidator(cfProps.getValidator())
- .minCompactionThreshold(getPropertyInt(CFPropDefs.KW_MINCOMPACTIONTHRESHOLD, CFMetaData.DEFAULT_MIN_COMPACTION_THRESHOLD))
- .maxCompactionThreshold(getPropertyInt(CFPropDefs.KW_MAXCOMPACTIONTHRESHOLD, CFMetaData.DEFAULT_MAX_COMPACTION_THRESHOLD))
+ .minCompactionThreshold(minCompactionThreshold)
+ .maxCompactionThreshold(maxCompactionThreshold)
.columnMetadata(getColumns(comparator))
.keyValidator(TypeParser.parse(CFPropDefs.comparators.get(getKeyType())))
.compactionStrategyClass(cfProps.compactionStrategyClass)
@@ -139,8 +139,12 @@ public void applyToCFMetadata(CFMetaData cfm) throws ConfigurationException, Syn
cfm.dcLocalReadRepairChance(getDouble(KW_DCLOCALREADREPAIRCHANCE, cfm.getDcLocalReadRepair()));
cfm.gcGraceSeconds(getInt(KW_GCGRACESECONDS, cfm.getGcGraceSeconds()));
cfm.replicateOnWrite(getBoolean(KW_REPLICATEONWRITE, cfm.getReplicateOnWrite()));
- cfm.minCompactionThreshold(toInt(KW_MINCOMPACTIONTHRESHOLD, getCompactionOptions().get(KW_MINCOMPACTIONTHRESHOLD), cfm.getMinCompactionThreshold()));
- cfm.maxCompactionThreshold(toInt(KW_MAXCOMPACTIONTHRESHOLD, getCompactionOptions().get(KW_MAXCOMPACTIONTHRESHOLD), cfm.getMaxCompactionThreshold()));
+ int minCompactionThreshold = toInt(KW_MINCOMPACTIONTHRESHOLD, getCompactionOptions().get(KW_MINCOMPACTIONTHRESHOLD), cfm.getMinCompactionThreshold());
+ int maxCompactionThreshold = toInt(KW_MAXCOMPACTIONTHRESHOLD, getCompactionOptions().get(KW_MAXCOMPACTIONTHRESHOLD), cfm.getMaxCompactionThreshold());
+ if (minCompactionThreshold <= 0 || maxCompactionThreshold <= 0)
+ throw new ConfigurationException("Disabling compaction by setting compaction thresholds to 0 has been deprecated, set the compaction option 'enabled' to false instead.");
+ cfm.minCompactionThreshold(minCompactionThreshold);
+ cfm.maxCompactionThreshold(maxCompactionThreshold);
cfm.caching(CFMetaData.Caching.fromString(getString(KW_CACHING, cfm.getCaching().toString())));
cfm.defaultTimeToLive(getInt(KW_DEFAULT_TIME_TO_LIVE, cfm.getDefaultTimeToLive()));
cfm.speculativeRetry(CFMetaData.SpeculativeRetry.fromString(getString(KW_SPECULATIVE_RETRY, cfm.getSpeculativeRetry().toString())));
@@ -182,7 +182,7 @@ public DecoratedKey getKey()
// "hoist up" the requested data into a more recent sstable
if (sstablesIterated > cfs.getMinimumCompactionThreshold()
- && !cfs.isCompactionDisabled()
+ && !cfs.isAutoCompactionDisabled()
&& cfs.getCompactionStrategy() instanceof SizeTieredCompactionStrategy)
{
Tracing.trace("Defragmenting requested data");
@@ -28,6 +28,7 @@
import java.util.regex.Pattern;
import javax.management.*;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.ImmutableSet;
@@ -268,6 +269,12 @@ private ColumnFamilyStore(Table table,
// compaction strategy should be created after the CFS has been prepared
this.compactionStrategy = metadata.createCompactionStrategyInstance(this);
+ if (maxCompactionThreshold.value() <= 0 || minCompactionThreshold.value() <=0)
+ {
+ logger.warn("Disabling compaction strategy by setting compaction thresholds to 0 is deprecated, set the compaction option 'enabled' to 'false' instead.");
+ this.compactionStrategy.disable();
+ }
+
// create the private ColumnFamilyStores for the secondary column indexes
for (ColumnDefinition info : metadata.allColumns())
{
@@ -1951,14 +1958,30 @@ public void disableAutoCompaction()
{
// we don't use CompactionStrategy.pause since we don't want users flipping that on and off
// during runWithCompactionsDisabled
- minCompactionThreshold.set(0);
- maxCompactionThreshold.set(0);
+ this.compactionStrategy.disable();
}
public void enableAutoCompaction()
{
- minCompactionThreshold.reset();
- maxCompactionThreshold.reset();
+ enableAutoCompaction(false);
+ }
+
+ /**
+ * used for tests - to be able to check things after a minor compaction
+ * @param waitForFutures if we should block until autocompaction is done
+ */
+ @VisibleForTesting
+ public void enableAutoCompaction(boolean waitForFutures)
+ {
+ this.compactionStrategy.enable();
+ List<Future<?>> futures = CompactionManager.instance.submitBackground(this);
+ if (waitForFutures)
+ FBUtilities.waitOnFutures(futures);
+ }
+
+ public boolean isAutoCompactionDisabled()
+ {
+ return !this.compactionStrategy.isEnabled();
}
/*
@@ -2012,14 +2035,13 @@ public void setMaximumCompactionThreshold(int maxCompactionThreshold)
private void validateCompactionThresholds(int minThreshold, int maxThreshold)
{
- if (minThreshold > maxThreshold && maxThreshold != 0)
+ if (minThreshold > maxThreshold)
throw new RuntimeException(String.format("The min_compaction_threshold cannot be larger than the max_compaction_threshold. " +
"Min is '%d', Max is '%d'.", minThreshold, maxThreshold));
- }
- public boolean isCompactionDisabled()
- {
- return getMinimumCompactionThreshold() <= 0 || getMaximumCompactionThreshold() <= 0;
+ if (maxThreshold == 0 || minThreshold == 0)
+ throw new RuntimeException("Disabling compaction by setting min_compaction_threshold or max_compaction_threshold to 0 " +
+ "is deprecated, set the compaction strategy option 'enabled' to 'false' instead or use the nodetool command 'disableautocompaction'.");
}
// End JMX get/set.
@@ -281,10 +281,7 @@
*/
public void setCrcCheckChance(double crcCheckChance);
- /**
- * Disable automatic compaction.
- */
- public void disableAutoCompaction();
+ public boolean isAutoCompactionDisabled();
public long estimateKeys();
@@ -48,6 +48,7 @@
protected static final long DEFAULT_TOMBSTONE_COMPACTION_INTERVAL = 86400;
protected static final String TOMBSTONE_THRESHOLD_OPTION = "tombstone_threshold";
protected static final String TOMBSTONE_COMPACTION_INTERVAL_OPTION = "tombstone_compaction_interval";
+ protected static final String COMPACTION_ENABLED = "enabled";
public final Map<String, String> options;
@@ -67,6 +68,8 @@
*/
protected boolean isActive = true;
+ protected volatile boolean enabled = true;
+
protected AbstractCompactionStrategy(ColumnFamilyStore cfs, Map<String, String> options)
{
assert cfs != null;
@@ -82,6 +85,13 @@ protected AbstractCompactionStrategy(ColumnFamilyStore cfs, Map<String, String>
tombstoneThreshold = optionValue == null ? DEFAULT_TOMBSTONE_THRESHOLD : Float.parseFloat(optionValue);
optionValue = options.get(TOMBSTONE_COMPACTION_INTERVAL_OPTION);
tombstoneCompactionInterval = optionValue == null ? DEFAULT_TOMBSTONE_COMPACTION_INTERVAL : Long.parseLong(optionValue);
+ optionValue = options.get(COMPACTION_ENABLED);
+
+ if (optionValue != null)
+ {
+ if (optionValue.equalsIgnoreCase("false"))
+ this.enabled = false;
+ }
}
catch (ConfigurationException e)
{
@@ -157,6 +167,21 @@ public void shutdown()
*/
public abstract long getMaxSSTableSize();
+ public boolean isEnabled()
+ {
+ return this.enabled && this.isActive;
+ }
+
+ public void enable()
+ {
+ this.enabled = true;
+ }
+
+ public void disable()
+ {
+ this.enabled = false;
+ }
+
/**
* Filters SSTables that are to be blacklisted from the given collection
*
@@ -282,9 +307,18 @@ else if (CompactionController.getFullyExpiredSSTables(cfs, Collections.singleton
}
}
+ String compactionEnabled = options.get(COMPACTION_ENABLED);
+ if (compactionEnabled != null)
+ {
+ if (!compactionEnabled.equalsIgnoreCase("true") && !compactionEnabled.equalsIgnoreCase("false"))
+ {
+ throw new ConfigurationException(String.format("enabled should either be 'true' or 'false', not %s", compactionEnabled));
+ }
+ }
Map<String, String> uncheckedOptions = new HashMap<String, String>(options);
uncheckedOptions.remove(TOMBSTONE_THRESHOLD_OPTION);
uncheckedOptions.remove(TOMBSTONE_COMPACTION_INTERVAL_OPTION);
+ uncheckedOptions.remove(COMPACTION_ENABLED);
return uncheckedOptions;
}
}
@@ -106,6 +106,12 @@ protected Boolean initialValue()
*/
public List<Future<?>> submitBackground(final ColumnFamilyStore cfs)
{
+ if (cfs.isAutoCompactionDisabled())
+ {
+ logger.debug("Autocompaction is disabled");
+ return Collections.emptyList();
+ }
+
int count = compactingCF.count(cfs);
if (count > 0 && executor.getActiveCount() >= executor.getMaximumPoolSize())
{
@@ -89,7 +89,7 @@ public int getLevelSize(int i)
*/
public synchronized AbstractCompactionTask getNextBackgroundTask(int gcBefore)
{
- if (!isActive || cfs.isCompactionDisabled())
+ if (!isEnabled())
return null;
return getMaximalTask(gcBefore);
@@ -56,19 +56,16 @@ public SizeTieredCompactionStrategy(ColumnFamilyStore cfs, Map<String, String> o
bucketLow = optionValue == null ? DEFAULT_BUCKET_LOW : Double.parseDouble(optionValue);
optionValue = options.get(BUCKET_HIGH_KEY);
bucketHigh = optionValue == null ? DEFAULT_BUCKET_HIGH : Double.parseDouble(optionValue);
- cfs.setCompactionThresholds(cfs.metadata.getMinCompactionThreshold(), cfs.metadata.getMaxCompactionThreshold());
}
private List<SSTableReader> getNextBackgroundSSTables(final int gcBefore)
{
+ if (!isEnabled())
+ return Collections.emptyList();
+
// make local copies so they can't be changed out from under us mid-method
int minThreshold = cfs.getMinimumCompactionThreshold();
int maxThreshold = cfs.getMaximumCompactionThreshold();
- if (minThreshold == 0 || maxThreshold == 0)
- {
- logger.debug("Compaction is currently disabled.");
- return Collections.emptyList();
- }
Set<SSTableReader> candidates = cfs.getUncompactingSSTables();
List<List<SSTableReader>> buckets = getBuckets(createSSTableAndLengthPairs(filterSuspectSSTables(candidates)), bucketHigh, bucketLow, minSSTableSize);
@@ -135,7 +132,7 @@ private long avgSize(List<SSTableReader> sstables)
public synchronized AbstractCompactionTask getNextBackgroundTask(int gcBefore)
{
- if (!isActive)
+ if (!isEnabled())
return null;
while (true)
@@ -3842,4 +3842,22 @@ public void disableScheduledRangeXfers()
{
rangeXferExecutor.tearDown();
}
+
+ @Override
+ public void disableAutoCompaction(String ks, String... columnFamilies) throws IOException
+ {
+ for (ColumnFamilyStore cfs : getValidColumnFamilies(true, true, ks, columnFamilies))
+ {
+ cfs.disableAutoCompaction();
+ }
+ }
+
+ @Override
+ public void enableAutoCompaction(String ks, String... columnFamilies) throws IOException
+ {
+ for (ColumnFamilyStore cfs : getValidColumnFamilies(true, true, ks, columnFamilies))
+ {
+ cfs.enableAutoCompaction();
+ }
+ }
}
@@ -470,4 +470,8 @@
public void enableScheduledRangeXfers();
/** Disable processing of queued range transfers. */
public void disableScheduledRangeXfers();
+
+ void disableAutoCompaction(String ks, String ... columnFamilies) throws IOException;
+ void enableAutoCompaction(String ks, String ... columnFamilies) throws IOException;
+
}
Oops, something went wrong.

0 comments on commit ebefb77

Please sign in to comment.