Skip to content

Commit

Permalink
Refactor the CompactionController into an abstract version to allow f…
Browse files Browse the repository at this point in the history
…or custom implementations not tied to SSTableReader or reference counting
  • Loading branch information
jberragan committed Oct 1, 2019
1 parent d60e798 commit 5b96b33
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 37 deletions.
59 changes: 59 additions & 0 deletions src/java/org/apache/cassandra/db/AbstractCompactionController.java
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.db;

import java.util.function.LongPredicate;

import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.schema.CompactionParams;

public abstract class AbstractCompactionController implements AutoCloseable
{

public final ColumnFamilyStore cfs;
public final int gcBefore;
public final CompactionParams.TombstoneOption tombstoneOption;

public AbstractCompactionController(final ColumnFamilyStore cfs, final int gcBefore, CompactionParams.TombstoneOption tombstoneOption)
{
assert cfs != null;
this.cfs = cfs;
this.gcBefore = gcBefore;
this.tombstoneOption = tombstoneOption;
}

public abstract boolean compactingRepaired();

public String getKeyspace()
{
return cfs.keyspace.getName();
}

public String getColumnFamily()
{
return cfs.name;
}

public Iterable<UnfilteredRowIterator> shadowSources(DecoratedKey key, boolean tombstoneOnly)
{
return null;
}

public abstract LongPredicate getPurgeEvaluator(DecoratedKey key);
}
Expand Up @@ -19,26 +19,21 @@

import java.util.*;
import java.util.function.LongPredicate;
import java.util.function.Predicate;

import org.apache.cassandra.config.Config;
import org.apache.cassandra.db.Memtable;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;

import com.google.common.base.Predicates;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.RateLimiter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.config.Config;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.partitions.Partition;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.FileDataInput;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.schema.CompactionParams.TombstoneOption;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.db.*;
import org.apache.cassandra.utils.AlwaysPresentFilter;
import org.apache.cassandra.utils.OverlapIterator;
import org.apache.cassandra.utils.concurrent.Refs;
Expand All @@ -48,13 +43,12 @@
/**
* Manage compaction options.
*/
public class CompactionController implements AutoCloseable
public class CompactionController extends AbstractCompactionController
{
private static final Logger logger = LoggerFactory.getLogger(CompactionController.class);
private static final String NEVER_PURGE_TOMBSTONES_PROPERTY = Config.PROPERTY_PREFIX + "never_purge_tombstones";
static final boolean NEVER_PURGE_TOMBSTONES = Boolean.getBoolean(NEVER_PURGE_TOMBSTONES_PROPERTY);

public final ColumnFamilyStore cfs;
private final boolean compactingRepaired;
// note that overlapIterator and overlappingSSTables will be null if NEVER_PURGE_TOMBSTONES is set - this is a
// good thing so that noone starts using them and thinks that if overlappingSSTables is empty, there
Expand All @@ -64,11 +58,8 @@ public class CompactionController implements AutoCloseable
private final Iterable<SSTableReader> compacting;
private final RateLimiter limiter;
private final long minTimestamp;
final TombstoneOption tombstoneOption;
final Map<SSTableReader, FileDataInput> openDataFiles = new HashMap<>();

public final int gcBefore;

protected CompactionController(ColumnFamilyStore cfs, int maxValue)
{
this(cfs, null, maxValue);
Expand All @@ -82,13 +73,10 @@ public CompactionController(ColumnFamilyStore cfs, Set<SSTableReader> compacting

public CompactionController(ColumnFamilyStore cfs, Set<SSTableReader> compacting, int gcBefore, RateLimiter limiter, TombstoneOption tombstoneOption)
{
assert cfs != null;
this.cfs = cfs;
this.gcBefore = gcBefore;
super(cfs, gcBefore, tombstoneOption);
this.compacting = compacting;
this.limiter = limiter;
compactingRepaired = compacting != null && compacting.stream().allMatch(SSTableReader::isRepaired);
this.tombstoneOption = tombstoneOption;
this.minTimestamp = compacting != null && !compacting.isEmpty() // check needed for test
? compacting.stream().mapToLong(SSTableReader::getMinTimestamp).min().getAsLong()
: 0;
Expand Down Expand Up @@ -128,7 +116,7 @@ public void maybeRefreshOverlaps()
}
}

private void refreshOverlaps()
protected void refreshOverlaps()
{
if (NEVER_PURGE_TOMBSTONES || cfs.getNeverPurgeTombstones())
return;
Expand Down Expand Up @@ -246,23 +234,14 @@ public static Set<SSTableReader> getFullyExpiredSSTables(ColumnFamilyStore cfSto
return getFullyExpiredSSTables(cfStore, compacting, overlapping, gcBefore, false);
}

public String getKeyspace()
{
return cfs.keyspace.getName();
}

public String getColumnFamily()
{
return cfs.name;
}

/**
* @param key
* @return a predicate for whether tombstones marked for deletion at the given time for the given partition are
* purgeable; we calculate this by checking whether the deletion time is less than the min timestamp of all SSTables
* containing his partition and not participating in the compaction. This means there isn't any data in those
* sstables that might still need to be suppressed by a tombstone at this timestamp.
*/
@Override
public LongPredicate getPurgeEvaluator(DecoratedKey key)
{
if (NEVER_PURGE_TOMBSTONES || !compactingRepaired() || cfs.getNeverPurgeTombstones())
Expand Down
Expand Up @@ -57,7 +57,7 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
private static final long UNFILTERED_TO_UPDATE_PROGRESS = 100;

private final OperationType type;
private final CompactionController controller;
private final AbstractCompactionController controller;
private final List<ISSTableScanner> scanners;
private final ImmutableSet<SSTableReader> sstables;
private final int nowInSec;
Expand All @@ -77,13 +77,13 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
private final UnfilteredPartitionIterator compacted;
private final ActiveCompactionsTracker activeCompactions;

public CompactionIterator(OperationType type, List<ISSTableScanner> scanners, CompactionController controller, int nowInSec, UUID compactionId)
public CompactionIterator(OperationType type, List<ISSTableScanner> scanners, AbstractCompactionController controller, int nowInSec, UUID compactionId)
{
this(type, scanners, controller, nowInSec, compactionId, ActiveCompactionsTracker.NOOP);
}

@SuppressWarnings("resource") // We make sure to close mergedIterator in close() and CompactionIterator is itself an AutoCloseable
public CompactionIterator(OperationType type, List<ISSTableScanner> scanners, CompactionController controller, int nowInSec, UUID compactionId, ActiveCompactionsTracker activeCompactions)
public CompactionIterator(OperationType type, List<ISSTableScanner> scanners, AbstractCompactionController controller, int nowInSec, UUID compactionId, ActiveCompactionsTracker activeCompactions)
{
this.controller = controller;
this.type = type;
Expand Down Expand Up @@ -259,14 +259,14 @@ public String toString()

private class Purger extends PurgeFunction
{
private final CompactionController controller;
private final AbstractCompactionController controller;

private DecoratedKey currentKey;
private LongPredicate purgeEvaluator;

private long compactedUnfiltered;

private Purger(CompactionController controller, int nowInSec)
private Purger(AbstractCompactionController controller, int nowInSec)
{
super(nowInSec, controller.gcBefore, controller.compactingRepaired() ? Integer.MAX_VALUE : Integer.MIN_VALUE,
controller.cfs.getCompactionStrategyManager().onlyPurgeRepairedTombstones(),
Expand Down Expand Up @@ -510,10 +510,10 @@ private DeletionTime updateOpenDeletionTime(DeletionTime openDeletionTime, Unfil
*/
private static class GarbageSkipper extends Transformation<UnfilteredRowIterator>
{
final CompactionController controller;
final AbstractCompactionController controller;
final boolean cellLevelGC;

private GarbageSkipper(CompactionController controller)
private GarbageSkipper(AbstractCompactionController controller)
{
this.controller = controller;
cellLevelGC = controller.tombstoneOption == TombstoneOption.CELL;
Expand Down

0 comments on commit 5b96b33

Please sign in to comment.