Leveled compaction performs size-tiered compactions in L0

patch by tjake and jbellis for CASSANDRA-5371
commit 75d5639014a4680f2a734b4ec9b452ed0cfff058 (parent 52443c8)
@jbellis authored
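In outline, the patch makes LeveledManifest fall back to a size-tiered pass over L0 whenever L0 accumulates more sstables than leveling can absorb. A condensed sketch of that decision, assembled from the hunks below (MAX_COMPACTING_L0 is the backlog threshold; its value is not shown in this diff):

    // Sketch: when L0 is backed up, size-tier it to bound read amplification;
    // otherwise fall through to the normal leveled candidate selection.
    if (generations[0].size() > MAX_COMPACTING_L0)
    {
        Iterable<SSTableReader> candidates = cfs.getDataTracker().getUncompactingSSTables(generations[0]);
        List<List<SSTableReader>> buckets = SizeTieredCompactionStrategy.getBuckets(
            SizeTieredCompactionStrategy.createSSTableAndLengthPairs(
                AbstractCompactionStrategy.filterSuspectSSTables(candidates)),
            SizeTieredCompactionStrategy.DEFAULT_BUCKET_HIGH,
            SizeTieredCompactionStrategy.DEFAULT_BUCKET_LOW,
            SizeTieredCompactionStrategy.DEFAULT_MIN_SSTABLE_SIZE);
        List<SSTableReader> mostInteresting = SizeTieredCompactionStrategy.mostInterestingBucket(buckets, 4, 32);
        if (!mostInteresting.isEmpty())
            return Pair.create(mostInteresting, 0); // compact back into L0
    }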
1  CHANGES.txt
@@ -1,4 +1,5 @@
2.0
+ * Leveled compaction performs size-tiered compactions in L0 (CASSANDRA-5371)
* Add yaml network topology snitch for mixed ec2/other envs (CASSANDRA-5339)
* Log when a node is down longer than the hint window (CASSANDRA-4554)
* Optimize tombstone creation for ExpiringColumns (CASSANDRA-4917)
21 src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -2079,27 +2079,6 @@ public ByteBuffer maybeIntern(ByteBuffer name)
return intern(name);
}
- public SSTableWriter createCompactionWriter(OperationType operationType, long estimatedRows, File location, Collection<SSTableReader> sstables)
- {
- ReplayPosition rp = ReplayPosition.getReplayPosition(sstables);
- SSTableMetadata.Collector sstableMetadataCollector = SSTableMetadata.createCollector().replayPosition(rp);
- sstableMetadataCollector.sstableLevel(compactionStrategy.getNextLevel(sstables, operationType));
-
- // Get the max timestamp of the precompacted sstables
- // and add the generations of live ancestors
- for (SSTableReader sstable : sstables)
- {
- sstableMetadataCollector.addAncestor(sstable.descriptor.generation);
- for (Integer i : sstable.getAncestors())
- {
- if (new File(sstable.descriptor.withGeneration(i).filenameFor(Component.DATA)).exists())
- sstableMetadataCollector.addAncestor(i);
- }
- }
-
- return new SSTableWriter(getTempSSTablePath(location), estimatedRows, metadata, partitioner, sstableMetadataCollector);
- }
-
public Iterable<ColumnFamilyStore> concatWithIndexes()
{
return Iterables.concat(indexManager.getIndexesBackedByCfs(), Collections.singleton(this));
13 src/java/org/apache/cassandra/db/DataTracker.java
@@ -22,6 +22,7 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;
+import com.google.common.base.Predicate;
import com.google.common.collect.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -73,6 +74,18 @@ public Memtable getMemtable()
return view.get().nonCompactingSStables();
}
+ public Iterable<SSTableReader> getUncompactingSSTables(Iterable<SSTableReader> candidates)
+ {
+ final View v = view.get();
+ return Iterables.filter(candidates, new Predicate<SSTableReader>()
+ {
+ public boolean apply(SSTableReader sstable)
+ {
+ return !v.compacting.contains(sstable);
+ }
+ });
+ }
+
public View getView()
{
return view.get();
23 src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -19,6 +19,8 @@
import java.util.*;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -161,17 +163,15 @@ public void shutdown()
* @param originalCandidates The collection to check for blacklisted SSTables
* @return list of the SSTables with blacklisted ones filtered out
*/
- public static List<SSTableReader> filterSuspectSSTables(Collection<SSTableReader> originalCandidates)
+ public static Iterable<SSTableReader> filterSuspectSSTables(Iterable<SSTableReader> originalCandidates)
{
- List<SSTableReader> filteredCandidates = new ArrayList<SSTableReader>();
-
- for (SSTableReader candidate : originalCandidates)
+ return Iterables.filter(originalCandidates, new Predicate<SSTableReader>()
{
- if (!candidate.isMarkedSuspect())
- filteredCandidates.add(candidate);
- }
-
- return filteredCandidates;
+ public boolean apply(SSTableReader sstable)
+ {
+ return !sstable.isMarkedSuspect();
+ }
+ });
}
/**
@@ -287,9 +287,4 @@ else if (CompactionController.getFullyExpiredSSTables(cfs, Collections.singleton
uncheckedOptions.remove(TOMBSTONE_COMPACTION_INTERVAL_OPTION);
return uncheckedOptions;
}
-
- public int getNextLevel(Collection<SSTableReader> sstables, OperationType operationType)
- {
- return 0;
- }
}
26 src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -26,12 +26,7 @@
import javax.management.ObjectName;
import com.google.common.base.Throwables;
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.ConcurrentHashMultiset;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Multiset;
-import com.google.common.primitives.Longs;
+import com.google.common.collect.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,7 +47,6 @@
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.metrics.CompactionMetrics;
import org.apache.cassandra.service.ActiveRepairService;
-import org.apache.cassandra.service.CacheService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.CloseableIterator;
import org.apache.cassandra.utils.CounterId;
@@ -246,13 +240,7 @@ public void perform(ColumnFamilyStore store, Collection<SSTableReader> sstables)
// Sort the column families in order of SSTable size, so cleanup of smaller CFs
// can free up space for larger ones
List<SSTableReader> sortedSSTables = new ArrayList<SSTableReader>(sstables);
- Collections.sort(sortedSSTables, new Comparator<SSTableReader>()
- {
- public int compare(SSTableReader o1, SSTableReader o2)
- {
- return Longs.compare(o1.onDiskLength(), o2.onDiskLength());
- }
- });
+ Collections.sort(sortedSSTables, new SSTableReader.SizeComparator());
doCleanupCompaction(store, sortedSSTables, renewer);
}
@@ -495,7 +483,7 @@ private void doCleanupCompaction(ColumnFamilyStore cfs, Collection<SSTableReader
AbstractCompactedRow compactedRow = controller.getCompactedRow(row);
if (compactedRow.isEmpty())
continue;
- writer = maybeCreateWriter(cfs, OperationType.CLEANUP, compactionFileLocation, expectedBloomFilterSize, writer, Collections.singletonList(sstable));
+ writer = maybeCreateWriter(cfs, OperationType.CLEANUP, compactionFileLocation, expectedBloomFilterSize, writer, sstable);
writer.append(compactedRow);
totalkeysWritten++;
}
@@ -581,12 +569,16 @@ public static SSTableWriter maybeCreateWriter(ColumnFamilyStore cfs,
File compactionFileLocation,
int expectedBloomFilterSize,
SSTableWriter writer,
- Collection<SSTableReader> sstables)
+ SSTableReader sstable)
{
if (writer == null)
{
FileUtils.createDirectory(compactionFileLocation);
- writer = cfs.createCompactionWriter(compactionType, expectedBloomFilterSize, compactionFileLocation, sstables);
+ writer = new SSTableWriter(cfs.getTempSSTablePath(compactionFileLocation),
+ expectedBloomFilterSize,
+ cfs.metadata,
+ cfs.partitioner,
+ SSTableMetadata.createCollector(Collections.singleton(sstable), sstable.getSSTableLevel()));
}
return writer;
}
18 src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -148,7 +148,7 @@ protected void runWith(File sstableDirectory) throws Exception
return;
}
- SSTableWriter writer = cfs.createCompactionWriter(compactionType, keysPerSSTable, sstableDirectory, toCompact);
+ SSTableWriter writer = createCompactionWriter(sstableDirectory, keysPerSSTable);
writers.add(writer);
while (iter.hasNext())
{
@@ -186,7 +186,7 @@ protected void runWith(File sstableDirectory) throws Exception
{
// tmp = false because later we want to query it with descriptor from SSTableReader
cachedKeyMap.put(writer.descriptor.asTemporary(false), cachedKeys);
- writer = cfs.createCompactionWriter(compactionType, keysPerSSTable, sstableDirectory, toCompact);
+ writer = createCompactionWriter(sstableDirectory, keysPerSSTable);
writers.add(writer);
cachedKeys = new HashMap<DecoratedKey, RowIndexEntry>();
}
@@ -277,6 +277,20 @@ protected void runWith(File sstableDirectory) throws Exception
}
}
+ private SSTableWriter createCompactionWriter(File sstableDirectory, long keysPerSSTable)
+ {
+ return new SSTableWriter(cfs.getTempSSTablePath(sstableDirectory),
+ keysPerSSTable,
+ cfs.metadata,
+ cfs.partitioner,
+ SSTableMetadata.createCollector(toCompact, getLevel()));
+ }
+
+ protected int getLevel()
+ {
+ return 0;
+ }
+
protected boolean partialCompactionsAcceptable()
{
return !isUserDefined;
23 src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
@@ -38,6 +38,7 @@
import org.apache.cassandra.notifications.INotificationConsumer;
import org.apache.cassandra.notifications.SSTableAddedNotification;
import org.apache.cassandra.notifications.SSTableListChangedNotification;
+import org.apache.cassandra.utils.Pair;
public class LeveledCompactionStrategy extends AbstractCompactionStrategy implements INotificationConsumer
{
@@ -98,9 +99,11 @@ public AbstractCompactionTask getMaximalTask(int gcBefore)
{
while (true)
{
- Collection<SSTableReader> sstables = manifest.getCompactionCandidates();
- OperationType op = OperationType.COMPACTION;
- if (sstables.isEmpty())
+ Pair<? extends Collection<SSTableReader>, Integer> pair = manifest.getCompactionCandidates();
+ Collection<SSTableReader> sstables;
+ OperationType op;
+ int level;
+ if (pair == null)
{
// if there is no sstable to compact in the standard way, try compacting based on droppable tombstone ratio
SSTableReader sstable = findDroppableSSTable(gcBefore);
@@ -111,11 +114,18 @@ public AbstractCompactionTask getMaximalTask(int gcBefore)
}
sstables = Collections.singleton(sstable);
op = OperationType.TOMBSTONE_COMPACTION;
+ level = sstable.getSSTableLevel();
+ }
+ else
+ {
+ op = OperationType.COMPACTION;
+ sstables = pair.left;
+ level = pair.right;
}
if (cfs.getDataTracker().markCompacting(sstables))
{
- LeveledCompactionTask newTask = new LeveledCompactionTask(cfs, sstables, gcBefore, maxSSTableSizeInMB);
+ LeveledCompactionTask newTask = new LeveledCompactionTask(cfs, sstables, level, gcBefore, maxSSTableSizeInMB);
newTask.setCompactionType(op);
return newTask;
}
@@ -339,9 +349,4 @@ else if (!compacting.contains(sstable) && !sstable.isMarkedSuspect() && worthDro
return uncheckedOptions;
}
-
- public int getNextLevel(Collection<SSTableReader> sstables, OperationType operationType)
- {
- return manifest.getNextLevel(sstables, operationType);
- }
}
9 src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
@@ -25,11 +25,13 @@
public class LeveledCompactionTask extends CompactionTask
{
+ private final int level;
private final int sstableSizeInMB;
- public LeveledCompactionTask(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, final int gcBefore, int sstableSizeInMB)
+ public LeveledCompactionTask(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, int level, final int gcBefore, int sstableSizeInMB)
{
super(cfs, sstables, gcBefore);
+ this.level = level;
this.sstableSizeInMB = sstableSizeInMB;
}
@@ -44,4 +46,9 @@ protected boolean partialCompactionsAcceptable()
{
return false;
}
+
+ protected int getLevel()
+ {
+ return level;
+ }
}
64 src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
@@ -38,6 +38,7 @@
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.sstable.*;
import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.utils.Pair;
public class LeveledManifest
{
@@ -219,10 +220,10 @@ long maxBytesForLevel(int level)
}
/**
- * @return highest-priority sstables to compact
- * If no compactions are necessary, will return an empty list. Never returns null.
+ * @return highest-priority sstables to compact, and the level to compact them to
+ * If no compactions are necessary, will return null
*/
- public synchronized Collection<SSTableReader> getCompactionCandidates()
+ public synchronized Pair<? extends Collection<SSTableReader>, Integer> getCompactionCandidates()
{
// LevelDB gives each level a score of how much data it contains vs its ideal amount, and
// compacts the level with the highest score. But this falls apart spectacularly once you
@@ -242,10 +243,16 @@ long maxBytesForLevel(int level)
// LevelDB's way around this is to simply block writes if L0 compaction falls behind.
// We don't have that luxury.
//
- // So instead, we force compacting higher levels first. This may not minimize the number
- // of reads done as quickly in the short term, but it minimizes the i/o needed to compact
- // optimally, which gives us a long term win.
- for (int i = generations.length - 1; i >= 0; i--)
+ // So instead, we
+ // 1) force compacting higher levels first, which minimizes the i/o needed to compact
+ // optimally, which gives us a long term win, and
+ // 2) if L0 falls behind, we will size-tiered compact it to reduce read overhead until
+ // we can catch up on the higher levels.
+ //
+ // This isn't a magic wand -- if you are consistently writing too fast for LCS to keep
+ // up, you're still screwed. But if instead you have intermittent bursts of activity,
+ // it can help a lot.
+ for (int i = generations.length - 1; i > 0; i--)
{
List<SSTableReader> sstables = generations[i];
if (sstables.isEmpty())
@@ -256,19 +263,38 @@ long maxBytesForLevel(int level)
double score = (double)SSTableReader.getTotalBytes(remaining) / (double)maxBytesForLevel(i);
logger.debug("Compaction score for level {} is {}", i, score);
- // L0 gets a special case that if we don't have anything more important to do,
- // we'll go ahead and compact if we have more than one sstable
- if (score > 1.001 || (i == 0 && sstables.size() > 1))
+ if (score > 1.001)
{
+ // before proceeding with a higher level, let's see if L0 is far enough behind to warrant STCS
+ if (generations[0].size() > MAX_COMPACTING_L0)
+ {
+ Iterable<SSTableReader> candidates = cfs.getDataTracker().getUncompactingSSTables(generations[0]);
+ List<Pair<SSTableReader,Long>> pairs = SizeTieredCompactionStrategy.createSSTableAndLengthPairs(AbstractCompactionStrategy.filterSuspectSSTables(candidates));
+ List<List<SSTableReader>> buckets = SizeTieredCompactionStrategy.getBuckets(pairs,
+ SizeTieredCompactionStrategy.DEFAULT_BUCKET_HIGH,
+ SizeTieredCompactionStrategy.DEFAULT_BUCKET_LOW,
+ SizeTieredCompactionStrategy.DEFAULT_MIN_SSTABLE_SIZE);
+ List<SSTableReader> mostInteresting = SizeTieredCompactionStrategy.mostInterestingBucket(buckets, 4, 32);
+ if (!mostInteresting.isEmpty())
+ return Pair.create(mostInteresting, 0);
+ }
+
+ // L0 is fine, proceed with this level
Collection<SSTableReader> candidates = getCandidatesFor(i);
if (logger.isDebugEnabled())
logger.debug("Compaction candidates for L{} are {}", i, toString(candidates));
if (!candidates.isEmpty())
- return candidates;
+ return Pair.create(candidates, getNextLevel(candidates));
}
}
- return Collections.emptyList();
+ // Higher levels are happy, time for a standard, non-STCS L0 compaction
+ if (generations[0].isEmpty())
+ return null;
+ Collection<SSTableReader> candidates = getCandidatesFor(0);
+ if (candidates.isEmpty())
+ return null;
+ return Pair.create(candidates, getNextLevel(candidates));
}
public synchronized int getLevelSize(int i)
@@ -295,7 +321,7 @@ private void logDistribution()
if (!generations[i].isEmpty())
{
logger.debug("L{} contains {} SSTables ({} bytes) in {}",
- new Object[] {i, generations[i].size(), SSTableReader.getTotalBytes(generations[i]), this});
+ i, generations[i].size(), SSTableReader.getTotalBytes(generations[i]), this);
}
}
}
@@ -511,11 +537,11 @@ public synchronized int getEstimatedTasks()
}
logger.debug("Estimating {} compactions to do for {}.{}",
- new Object[] {Arrays.toString(estimated), cfs.table.getName(), cfs.name });
+ Arrays.toString(estimated), cfs.table.getName(), cfs.name);
return Ints.checkedCast(tasks);
}
- public int getNextLevel(Collection<SSTableReader> sstables, OperationType operationType)
+ public int getNextLevel(Collection<SSTableReader> sstables)
{
int maximumLevel = Integer.MIN_VALUE;
int minimumLevel = Integer.MAX_VALUE;
@@ -524,14 +550,6 @@ public int getNextLevel(Collection<SSTableReader> sstables, OperationType operat
maximumLevel = Math.max(sstable.getSSTableLevel(), maximumLevel);
minimumLevel = Math.min(sstable.getSSTableLevel(), minimumLevel);
}
- switch(operationType)
- {
- case SCRUB:
- case TOMBSTONE_COMPACTION:
- case CLEANUP:
- case UPGRADE_SSTABLES:
- return minimumLevel;
- }
int newLevel;
if (minimumLevel == 0 && minimumLevel == maximumLevel && SSTable.getTotalBytes(sstables) < maxSSTableSizeInBytes)
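For reference, the scoring that drives the loop above: each level's score is its total bytes divided by that level's target size, and levels above L0 are scanned top-down, compacting the first whose score exceeds 1.001. A self-contained sketch of the arithmetic, assuming the conventional 10x-per-level sizing (maxBytesForLevel itself is not shown in these hunks):

    // Toy score calculation: with 5 MB sstables, L1 targets ~50 MB and L2 ~500 MB.
    long maxSSTableSizeInBytes = 5L * 1024 * 1024;
    long l1Bytes = 60L * 1024 * 1024, l2Bytes = 450L * 1024 * 1024;
    double l1Score = l1Bytes / (Math.pow(10, 1) * maxSSTableSizeInBytes); // 1.2
    double l2Score = l2Bytes / (Math.pow(10, 2) * maxSSTableSizeInBytes); // 0.9
    // L1 (1.2 > 1.001) is compacted before L2 (0.9), even though L2 holds more data.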
4 src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -111,7 +111,7 @@ public void scrub() throws IOException
}
// TODO errors when creating the writer may leave empty temp files.
- writer = CompactionManager.maybeCreateWriter(cfs, OperationType.SCRUB, destination, expectedBloomFilterSize, null, Collections.singletonList(sstable));
+ writer = CompactionManager.maybeCreateWriter(cfs, OperationType.SCRUB, destination, expectedBloomFilterSize, null, sstable);
AbstractCompactedRow prevRow = null;
@@ -268,7 +268,7 @@ public void scrub() throws IOException
if (!outOfOrderRows.isEmpty())
{
- SSTableWriter inOrderWriter = CompactionManager.maybeCreateWriter(cfs, OperationType.SCRUB, destination, expectedBloomFilterSize, null, Collections.singletonList(sstable));
+ SSTableWriter inOrderWriter = CompactionManager.maybeCreateWriter(cfs, OperationType.SCRUB, destination, expectedBloomFilterSize, null, sstable);
for (AbstractCompactedRow row : outOfOrderRows)
inOrderWriter.append(row);
newInOrderSstable = inOrderWriter.closeAndOpenReader(sstable.maxDataAge);
48 src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
@@ -20,6 +20,7 @@
import java.util.*;
import java.util.Map.Entry;
+import com.google.common.collect.Iterables;
import com.google.common.primitives.Longs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -70,10 +71,30 @@ public SizeTieredCompactionStrategy(ColumnFamilyStore cfs, Map<String, String> o
}
Set<SSTableReader> candidates = cfs.getUncompactingSSTables();
- List<List<SSTableReader>> buckets = getBuckets(createSSTableAndLengthPairs(filterSuspectSSTables(candidates)));
+ List<List<SSTableReader>> buckets = getBuckets(createSSTableAndLengthPairs(filterSuspectSSTables(candidates)), bucketHigh, bucketLow, minSSTableSize);
logger.debug("Compaction buckets are {}", buckets);
updateEstimatedCompactionsByTasks(buckets);
+ List<SSTableReader> mostInteresting = mostInterestingBucket(buckets, minThreshold, maxThreshold);
+ if (!mostInteresting.isEmpty())
+ return mostInteresting;
+ // if there is no sstable to compact in the standard way, try compacting a single sstable whose droppable tombstone
+ // ratio is greater than the threshold.
+ List<SSTableReader> sstablesWithTombstones = new ArrayList<SSTableReader>();
+ for (SSTableReader sstable : candidates)
+ {
+ if (worthDroppingTombstones(sstable, gcBefore))
+ sstablesWithTombstones.add(sstable);
+ }
+ if (sstablesWithTombstones.isEmpty())
+ return Collections.emptyList();
+
+ Collections.sort(sstablesWithTombstones, new SSTableReader.SizeComparator());
+ return Collections.singletonList(sstablesWithTombstones.get(0));
+ }
+
+ public static List<SSTableReader> mostInterestingBucket(List<List<SSTableReader>> buckets, int minThreshold, int maxThreshold)
+ {
// skip buckets containing less than minThreshold sstables, and limit other buckets to maxThreshold entries
List<List<SSTableReader>> prunedBuckets = new ArrayList<List<SSTableReader>>();
for (List<SSTableReader> bucket : buckets)
@@ -91,23 +112,8 @@ public int compare(SSTableReader o1, SSTableReader o2)
List<SSTableReader> prunedBucket = bucket.subList(0, Math.min(bucket.size(), maxThreshold));
prunedBuckets.add(prunedBucket);
}
-
- // if there is no sstable to compact in the standard way, try compacting a single sstable whose droppable tombstone
- // ratio is greater than the threshold.
if (prunedBuckets.isEmpty())
- {
- for (List<SSTableReader> bucket : buckets)
- {
- for (SSTableReader table : bucket)
- {
- if (worthDroppingTombstones(table, gcBefore))
- prunedBuckets.add(Collections.singletonList(table));
- }
- }
-
- if (prunedBuckets.isEmpty())
- return Collections.emptyList();
- }
+ return Collections.emptyList();
// prefer compacting buckets with smallest average size; that will yield the fastest improvement for read performance
return Collections.min(prunedBuckets, new Comparator<List<SSTableReader>>()
@@ -171,10 +177,10 @@ public int getEstimatedRemainingTasks()
return estimatedRemainingTasks;
}
- private static List<Pair<SSTableReader, Long>> createSSTableAndLengthPairs(Collection<SSTableReader> collection)
+ public static List<Pair<SSTableReader, Long>> createSSTableAndLengthPairs(Iterable<SSTableReader> sstables)
{
- List<Pair<SSTableReader, Long>> tableLengthPairs = new ArrayList<Pair<SSTableReader, Long>>(collection.size());
- for(SSTableReader table: collection)
+ List<Pair<SSTableReader, Long>> tableLengthPairs = new ArrayList<Pair<SSTableReader, Long>>(Iterables.size(sstables));
+ for(SSTableReader table: sstables)
tableLengthPairs.add(Pair.create(table, table.onDiskLength()));
return tableLengthPairs;
}
@@ -182,7 +188,7 @@ public int getEstimatedRemainingTasks()
/*
* Group files of similar size into buckets.
*/
- <T> List<List<T>> getBuckets(Collection<Pair<T, Long>> files)
+ public static <T> List<List<T>> getBuckets(Collection<Pair<T, Long>> files, double bucketHigh, double bucketLow, long minSSTableSize)
{
// Sort the list in order to get deterministic results during the grouping below
List<Pair<T, Long>> sortedFiles = new ArrayList<Pair<T, Long>>(files);
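Making createSSTableAndLengthPairs, getBuckets, and mostInterestingBucket public statics is what lets LeveledManifest borrow STCS's grouping without instantiating a strategy. A usage sketch against the new signatures (1.5, 0.5, and 50 MB mirror the STCS defaults; 4 and 32 are the usual min/max compaction thresholds):

    // Group sstables of similar on-disk size, then pick the bucket with the
    // smallest average size: the fastest read-performance win per byte compacted.
    List<Pair<SSTableReader, Long>> pairs =
        SizeTieredCompactionStrategy.createSSTableAndLengthPairs(sstables);
    List<List<SSTableReader>> buckets =
        SizeTieredCompactionStrategy.getBuckets(pairs, 1.5, 0.5, 50L * 1024 * 1024);
    List<SSTableReader> bucket = SizeTieredCompactionStrategy.mostInterestingBucket(buckets, 4, 32);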
21 src/java/org/apache/cassandra/io/sstable/SSTableMetadata.java
@@ -112,6 +112,27 @@ public static Collector createCollector()
return new Collector();
}
+ public static Collector createCollector(Collection<SSTableReader> sstables, int level)
+ {
+ Collector collector = new Collector();
+
+ collector.replayPosition(ReplayPosition.getReplayPosition(sstables));
+ collector.sstableLevel(level);
+ // Get the max timestamp of the precompacted sstables
+ // and add the generations of live ancestors
+ for (SSTableReader sstable : sstables)
+ {
+ collector.addAncestor(sstable.descriptor.generation);
+ for (Integer i : sstable.getAncestors())
+ {
+ if (new File(sstable.descriptor.withGeneration(i).filenameFor(Component.DATA)).exists())
+ collector.addAncestor(i);
+ }
+ }
+
+ return collector;
+ }
+
/**
* Used when updating sstablemetadata files with an sstable level
* @param metadata
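The new collector factory absorbs the bookkeeping that ColumnFamilyStore.createCompactionWriter used to do inline: replay position, target level, and the generations of live ancestors. Call sites now construct writers directly, as in this sketch mirroring CompactionTask above:

    // Writer whose metadata records where its inputs came from and which level it lands in.
    SSTableWriter writer = new SSTableWriter(cfs.getTempSSTablePath(directory),
                                             estimatedRows,
                                             cfs.metadata,
                                             cfs.partitioner,
                                             SSTableMetadata.createCollector(toCompact, level));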
28 src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -20,10 +20,13 @@
import java.io.*;
import java.nio.ByteBuffer;
import java.util.*;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import com.google.common.primitives.Longs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,17 +37,16 @@
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.DataTracker;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.RowIndexEntry;
+import org.apache.cassandra.db.RowPosition;
import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
import org.apache.cassandra.db.commitlog.ReplayPosition;
import org.apache.cassandra.db.compaction.ICompactionScanner;
-import org.apache.cassandra.db.index.SecondaryIndex;
import org.apache.cassandra.db.filter.QueryFilter;
-import org.apache.cassandra.dht.AbstractBounds;
-import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.dht.LocalPartitioner;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.db.index.SecondaryIndex;
+import org.apache.cassandra.dht.*;
import org.apache.cassandra.io.compress.CompressedRandomAccessReader;
import org.apache.cassandra.io.compress.CompressionMetadata;
import org.apache.cassandra.io.util.*;
@@ -1328,4 +1330,12 @@ public void close() throws IOException { }
public void remove() { }
}
+
+ public static class SizeComparator implements Comparator<SSTableReader>
+ {
+ public int compare(SSTableReader o1, SSTableReader o2)
+ {
+ return Longs.compare(o1.onDiskLength(), o2.onDiskLength());
+ }
+ }
}
23 test/unit/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyTest.java
@@ -17,17 +17,16 @@
*/
package org.apache.cassandra.db.compaction;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.List;
import org.junit.Test;
-import static org.junit.Assert.*;
-import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.Table;
import org.apache.cassandra.utils.Pair;
-public class SizeTieredCompactionStrategyTest extends SchemaLoader
+import static org.junit.Assert.assertEquals;
+
+public class SizeTieredCompactionStrategyTest
{
@Test
public void testGetBuckets()
@@ -40,11 +39,7 @@ public void testGetBuckets()
pairs.add(pair);
}
- ColumnFamilyStore cfs = Table.open("Keyspace1").getColumnFamilyStore("Standard1");
- Map<String, String> opts = new HashMap<String, String>();
- opts.put(SizeTieredCompactionStrategy.MIN_SSTABLE_SIZE_KEY, "2");
- SizeTieredCompactionStrategy strategy = new SizeTieredCompactionStrategy(cfs, opts);
- List<List<String>> buckets = strategy.getBuckets(pairs);
+ List<List<String>> buckets = SizeTieredCompactionStrategy.getBuckets(pairs, 1.5, 0.5, 2);
assertEquals(3, buckets.size());
for (List<String> bucket : buckets)
@@ -64,7 +59,7 @@ public void testGetBuckets()
pairs.add(pair);
}
- buckets = strategy.getBuckets(pairs);
+ buckets = SizeTieredCompactionStrategy.getBuckets(pairs, 1.5, 0.5, 2);
assertEquals(2, buckets.size());
for (List<String> bucket : buckets)
@@ -85,9 +80,7 @@ public void testGetBuckets()
pairs.add(pair);
}
- opts.put(SizeTieredCompactionStrategy.MIN_SSTABLE_SIZE_KEY, "10");
- strategy = new SizeTieredCompactionStrategy(cfs, opts);
- buckets = strategy.getBuckets(pairs); // notice the min is 10
+ buckets = SizeTieredCompactionStrategy.getBuckets(pairs, 1.5, 0.5, 10);
assertEquals(1, buckets.size());
}
}