Skip to content

Commit

Permalink
CASSANDRA-17711: Add nodetool forcecompact
Browse files Browse the repository at this point in the history
  • Loading branch information
chengw-netflix authored and jrwest committed Nov 7, 2022
1 parent 037e709 commit 873e024
Show file tree
Hide file tree
Showing 11 changed files with 463 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGES.txt
@@ -1,4 +1,5 @@
4.2
* Add nodetool forcecompact to remove tombstoned or ttl'd data ignoring GC grace for given table and partition keys (CASSANDRA-17711)
* Offer IF (NOT) EXISTS in cqlsh completion for CREATE TYPE, DROP TYPE, CREATE ROLE and DROP ROLE (CASSANDRA-16640)
* Nodetool bootstrap resume will now return an error if the operation fails (CASSANDRA-16491)
* Disable resumable bootstrap by default (CASSANDRA-17679)
Expand Down
29 changes: 29 additions & 0 deletions src/java/org/apache/cassandra/db/ColumnFamilyStore.java
Expand Up @@ -38,6 +38,7 @@
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -320,6 +321,9 @@ public enum FlushReason

private volatile boolean compactionSpaceCheck = true;

// Tombtone partitions that ignore the gc_grace_seconds during compaction
private final Set<DecoratedKey> partitionKeySetIgnoreGcGrace = ConcurrentHashMap.newKeySet();

@VisibleForTesting
final DiskBoundaryManager diskBoundaryManager = new DiskBoundaryManager();
private volatile ShardBoundaries cachedShardBoundaries = null;
Expand Down Expand Up @@ -2416,6 +2420,31 @@ public void forceCompactionForKey(DecoratedKey key)
CompactionManager.instance.forceCompactionForKey(this, key);
}

public void forceCompactionKeysIgnoringGcGrace(String... partitionKeysIgnoreGcGrace)
{
List<DecoratedKey> decoratedKeys = new ArrayList<>();
try
{
partitionKeySetIgnoreGcGrace.clear();

for (String key : partitionKeysIgnoreGcGrace) {
DecoratedKey dk = decorateKey(metadata().partitionKeyType.fromString(key));
partitionKeySetIgnoreGcGrace.add(dk);
decoratedKeys.add(dk);
}

CompactionManager.instance.forceCompactionForKeys(this, decoratedKeys);
} finally
{
partitionKeySetIgnoreGcGrace.clear();
}
}

public boolean shouldIgnoreGcGraceForKey(DecoratedKey dk)
{
return partitionKeySetIgnoreGcGrace.contains(dk);
}

public static Iterable<ColumnFamilyStore> all()
{
List<Iterable<ColumnFamilyStore>> stores = new ArrayList<>(Schema.instance.getKeyspaces().size());
Expand Down
Expand Up @@ -349,6 +349,18 @@ protected void updateProgress()
updateBytesRead();
}

/*
* Called at the beginning of each new partition
* Return true if the current partitionKey ignores the gc_grace_seconds during compaction.
* Note that this method should be called after the onNewPartition because it depends on the currentKey
* which is set in the onNewPartition
*/
@Override
protected boolean shouldIgnoreGcGrace()
{
return controller.cfs.shouldIgnoreGcGraceForKey(currentKey);
}

/*
* Evaluates whether a tombstone with the given deletion timestamp can be purged. This is the minimum
* timestamp for any sstable containing `currentKey` outside of the set of sstables involved in this compaction.
Expand Down
29 changes: 29 additions & 0 deletions src/java/org/apache/cassandra/db/compaction/CompactionManager.java
Expand Up @@ -1047,6 +1047,23 @@ public void forceCompactionForKey(ColumnFamilyStore cfStore, DecoratedKey key)
forceCompaction(cfStore, () -> sstablesWithKey(cfStore, key), sstable -> sstable.maybePresent(key));
}

public void forceCompactionForKeys(ColumnFamilyStore cfStore, Collection<DecoratedKey> keys)
{
com.google.common.base.Predicate<SSTableReader> predicate = sstable -> {
for (DecoratedKey key : keys)
{
if(sstable.maybePresent(key))
{
return true;
}
}

return false;
};

forceCompaction(cfStore, () -> sstablesWithKeys(cfStore, keys), predicate);
}

private static Collection<SSTableReader> sstablesWithKey(ColumnFamilyStore cfs, DecoratedKey key)
{
final Set<SSTableReader> sstables = new HashSet<>();
Expand All @@ -1060,6 +1077,18 @@ private static Collection<SSTableReader> sstablesWithKey(ColumnFamilyStore cfs,
return sstables.isEmpty() ? Collections.emptyList() : sstables;
}

private static Collection<SSTableReader> sstablesWithKeys(ColumnFamilyStore cfs, Collection<DecoratedKey> decoratedKeys)
{
final Set<SSTableReader> sstables = new HashSet<>();

for (DecoratedKey decoratedKey : decoratedKeys)
{
sstables.addAll(sstablesWithKey(cfs, decoratedKey));
}

return sstables;
}

public void forceUserDefinedCompaction(String dataFiles)
{
String[] filenames = dataFiles.split(",");
Expand Down
13 changes: 12 additions & 1 deletion src/java/org/apache/cassandra/db/partitions/PurgeFunction.java
Expand Up @@ -31,13 +31,15 @@ public abstract class PurgeFunction extends Transformation<UnfilteredRowIterator
private final boolean enforceStrictLiveness;
private boolean isReverseOrder;

private boolean ignoreGcGraceSeconds;

public PurgeFunction(int nowInSec, int gcBefore, int oldestUnrepairedTombstone, boolean onlyPurgeRepairedTombstones,
boolean enforceStrictLiveness)
{
this.nowInSec = nowInSec;
this.purger = (timestamp, localDeletionTime) ->
!(onlyPurgeRepairedTombstones && localDeletionTime >= oldestUnrepairedTombstone)
&& localDeletionTime < gcBefore
&& (localDeletionTime < gcBefore || ignoreGcGraceSeconds)
&& getPurgeEvaluator().test(timestamp);
this.enforceStrictLiveness = enforceStrictLiveness;
}
Expand All @@ -59,6 +61,13 @@ protected void updateProgress()
{
}

// Called at the beginning of each new partition
// Return true if the current partitionKey ignores the gc_grace_seconds during compaction.
protected boolean shouldIgnoreGcGrace()
{
return false;
}

protected void setReverseOrder(boolean isReverseOrder)
{
this.isReverseOrder = isReverseOrder;
Expand All @@ -70,6 +79,8 @@ protected UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition
{
onNewPartition(partition.partitionKey());

ignoreGcGraceSeconds = shouldIgnoreGcGrace();

setReverseOrder(partition.isReverseOrder());
UnfilteredRowIterator purged = Transformation.apply(partition, this);
if (purged.isEmpty())
Expand Down
18 changes: 18 additions & 0 deletions src/java/org/apache/cassandra/service/StorageService.java
Expand Up @@ -4072,6 +4072,24 @@ public void forceKeyspaceCompactionForPartitionKey(String keyspaceName, String p
}
}

/***
* Forces compaction for a list of partition keys in a table
* The method will ignore the gc_grace_seconds for the partitionKeysIgnoreGcGrace during the comapction,
* in order to purge the tombstones and free up space quicker.
* @param keyspaceName keyspace name
* @param tableName table name
* @param partitionKeysIgnoreGcGrace partition keys ignoring the gc_grace_seconds
* @throws IOException on any I/O operation error
* @throws ExecutionException when attempting to retrieve the result of a task that aborted by throwing an exception
* @throws InterruptedException when a thread is waiting, sleeping, or otherwise occupied, and the thread is interrupted, either before or during the activity
*/
public void forceCompactionKeysIgnoringGcGrace(String keyspaceName,
String tableName, String... partitionKeysIgnoreGcGrace) throws IOException, ExecutionException, InterruptedException
{
ColumnFamilyStore cfStore = getValidKeyspace(keyspaceName).getColumnFamilyStore(tableName);
cfStore.forceCompactionKeysIgnoringGcGrace(partitionKeysIgnoreGcGrace);
}

/**
* Takes the snapshot for the given keyspaces. A snapshot name must be specified.
*
Expand Down
Expand Up @@ -364,6 +364,13 @@ public interface StorageServiceMBean extends NotificationEmitter
*/
public void forceKeyspaceCompactionForPartitionKey(String keyspaceName, String partitionKey, String... tableNames) throws IOException, ExecutionException, InterruptedException;

/**
* Forces compaction for a list of partition keys on a table.
* The method will ignore the gc_grace_seconds for the partitionKeysIgnoreGcGrace during the comapction,
* in order to purge the tombstones and free up space quicker.
*/
public void forceCompactionKeysIgnoringGcGrace(String keyspaceName, String tableName, String... partitionKeysIgnoreGcGrace) throws IOException, ExecutionException, InterruptedException;

/**
* Trigger a cleanup of keys on a single keyspace
*/
Expand Down
5 changes: 5 additions & 0 deletions src/java/org/apache/cassandra/tools/NodeProbe.java
Expand Up @@ -460,6 +460,11 @@ public void forceKeyspaceCompactionForPartitionKey(String keyspaceName, String p
ssProxy.forceKeyspaceCompactionForPartitionKey(keyspaceName, partitionKey, tableNames);
}

public void forceCompactionKeysIgnoringGcGrace(String keyspaceName, String tableName, String... partitionKeysIgnoreGcGrace) throws IOException, ExecutionException, InterruptedException
{
ssProxy.forceCompactionKeysIgnoringGcGrace(keyspaceName, tableName, partitionKeysIgnoreGcGrace);
}

public void forceKeyspaceFlush(String keyspaceName, String... tableNames) throws IOException, ExecutionException, InterruptedException
{
ssProxy.forceKeyspaceFlush(keyspaceName, tableNames);
Expand Down
8 changes: 7 additions & 1 deletion src/java/org/apache/cassandra/tools/NodeTool.java
Expand Up @@ -225,7 +225,8 @@ public int execute(String... args)
UpgradeSSTable.class,
Verify.class,
Version.class,
ViewBuildStatus.class
ViewBuildStatus.class,
ForceCompact.class
);

Cli.CliBuilder<NodeToolCmdRunnable> builder = Cli.builder("nodetool");
Expand Down Expand Up @@ -484,6 +485,11 @@ protected String[] parseOptionalTables(List<String> cmdArgs)
{
return cmdArgs.size() <= 1 ? EMPTY_STRING_ARRAY : toArray(cmdArgs.subList(1, cmdArgs.size()), String.class);
}

protected String[] parsePartitionKeys(List<String> cmdArgs)
{
return cmdArgs.size() <= 2 ? EMPTY_STRING_ARRAY : toArray(cmdArgs.subList(2, cmdArgs.size()), String.class);
}
}

public static SortedMap<String, SetHostStatWithPort> getOwnershipByDcWithPort(NodeProbe probe, boolean resolveIp,
Expand Down
58 changes: 58 additions & 0 deletions src/java/org/apache/cassandra/tools/nodetool/ForceCompact.java
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.tools.nodetool;

import io.airlift.airline.Arguments;
import io.airlift.airline.Command;

import java.util.ArrayList;
import java.util.List;

import org.apache.cassandra.tools.NodeProbe;
import org.apache.cassandra.tools.NodeTool.NodeToolCmd;

import static com.google.common.base.Preconditions.checkArgument;

@Command(name = "forcecompact", description = "Force a (major) compaction on a table")
public class ForceCompact extends NodeToolCmd
{
@Arguments(usage = "[<keyspace> <table> <keys>]", description = "The keyspace, table, and a list of partition keys ignoring the gc_grace_seconds")
private List<String> args = new ArrayList<>();

@Override
public void execute(NodeProbe probe)
{
// Check if the input has valid size
checkArgument(args.size() >= 3, "forcecompact requires keyspace, table and keys args");

// We rely on lower-level APIs to check and throw exceptions if the input keyspace or table name are invalid
String keyspaceName = args.get(0);
String tableName = args.get(1);
String[] partitionKeysIgnoreGcGrace = parsePartitionKeys(args);

try
{
probe.forceCompactionKeysIgnoringGcGrace(keyspaceName, tableName, partitionKeysIgnoreGcGrace);
}
catch (Exception e)
{
throw new RuntimeException("Error occurred during compaction keys", e);
}
}
}

0 comments on commit 873e024

Please sign in to comment.