diff --git a/CHANGES.txt b/CHANGES.txt index 74055ab158d7..3fabee8b0f79 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.2 + * Add nodetool forcecompact to remove tombstoned or ttl'd data ignoring GC grace for given table and partition keys (CASSANDRA-17711) * Offer IF (NOT) EXISTS in cqlsh completion for CREATE TYPE, DROP TYPE, CREATE ROLE and DROP ROLE (CASSANDRA-16640) * Nodetool bootstrap resume will now return an error if the operation fails (CASSANDRA-16491) * Disable resumable bootstrap by default (CASSANDRA-17679) diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index ebe4aeba8a5f..9fc0f775e1a6 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -38,6 +38,7 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; @@ -320,6 +321,9 @@ public enum FlushReason private volatile boolean compactionSpaceCheck = true; + // Tombtone partitions that ignore the gc_grace_seconds during compaction + private final Set partitionKeySetIgnoreGcGrace = ConcurrentHashMap.newKeySet(); + @VisibleForTesting final DiskBoundaryManager diskBoundaryManager = new DiskBoundaryManager(); private volatile ShardBoundaries cachedShardBoundaries = null; @@ -2416,6 +2420,31 @@ public void forceCompactionForKey(DecoratedKey key) CompactionManager.instance.forceCompactionForKey(this, key); } + public void forceCompactionKeysIgnoringGcGrace(String... partitionKeysIgnoreGcGrace) + { + List decoratedKeys = new ArrayList<>(); + try + { + partitionKeySetIgnoreGcGrace.clear(); + + for (String key : partitionKeysIgnoreGcGrace) { + DecoratedKey dk = decorateKey(metadata().partitionKeyType.fromString(key)); + partitionKeySetIgnoreGcGrace.add(dk); + decoratedKeys.add(dk); + } + + CompactionManager.instance.forceCompactionForKeys(this, decoratedKeys); + } finally + { + partitionKeySetIgnoreGcGrace.clear(); + } + } + + public boolean shouldIgnoreGcGraceForKey(DecoratedKey dk) + { + return partitionKeySetIgnoreGcGrace.contains(dk); + } + public static Iterable all() { List> stores = new ArrayList<>(Schema.instance.getKeyspaces().size()); diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java index 2f79f92b84e8..bceb2b6a97bf 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java @@ -349,6 +349,18 @@ protected void updateProgress() updateBytesRead(); } + /* + * Called at the beginning of each new partition + * Return true if the current partitionKey ignores the gc_grace_seconds during compaction. + * Note that this method should be called after the onNewPartition because it depends on the currentKey + * which is set in the onNewPartition + */ + @Override + protected boolean shouldIgnoreGcGrace() + { + return controller.cfs.shouldIgnoreGcGraceForKey(currentKey); + } + /* * Evaluates whether a tombstone with the given deletion timestamp can be purged. This is the minimum * timestamp for any sstable containing `currentKey` outside of the set of sstables involved in this compaction. diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index dc22b6712ab0..2e1d94e80723 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -1047,6 +1047,23 @@ public void forceCompactionForKey(ColumnFamilyStore cfStore, DecoratedKey key) forceCompaction(cfStore, () -> sstablesWithKey(cfStore, key), sstable -> sstable.maybePresent(key)); } + public void forceCompactionForKeys(ColumnFamilyStore cfStore, Collection keys) + { + com.google.common.base.Predicate predicate = sstable -> { + for (DecoratedKey key : keys) + { + if(sstable.maybePresent(key)) + { + return true; + } + } + + return false; + }; + + forceCompaction(cfStore, () -> sstablesWithKeys(cfStore, keys), predicate); + } + private static Collection sstablesWithKey(ColumnFamilyStore cfs, DecoratedKey key) { final Set sstables = new HashSet<>(); @@ -1060,6 +1077,18 @@ private static Collection sstablesWithKey(ColumnFamilyStore cfs, return sstables.isEmpty() ? Collections.emptyList() : sstables; } + private static Collection sstablesWithKeys(ColumnFamilyStore cfs, Collection decoratedKeys) + { + final Set sstables = new HashSet<>(); + + for (DecoratedKey decoratedKey : decoratedKeys) + { + sstables.addAll(sstablesWithKey(cfs, decoratedKey)); + } + + return sstables; + } + public void forceUserDefinedCompaction(String dataFiles) { String[] filenames = dataFiles.split(","); diff --git a/src/java/org/apache/cassandra/db/partitions/PurgeFunction.java b/src/java/org/apache/cassandra/db/partitions/PurgeFunction.java index 09f3ae3bbf18..5d97fd36b1ee 100644 --- a/src/java/org/apache/cassandra/db/partitions/PurgeFunction.java +++ b/src/java/org/apache/cassandra/db/partitions/PurgeFunction.java @@ -31,13 +31,15 @@ public abstract class PurgeFunction extends Transformation !(onlyPurgeRepairedTombstones && localDeletionTime >= oldestUnrepairedTombstone) - && localDeletionTime < gcBefore + && (localDeletionTime < gcBefore || ignoreGcGraceSeconds) && getPurgeEvaluator().test(timestamp); this.enforceStrictLiveness = enforceStrictLiveness; } @@ -59,6 +61,13 @@ protected void updateProgress() { } + // Called at the beginning of each new partition + // Return true if the current partitionKey ignores the gc_grace_seconds during compaction. + protected boolean shouldIgnoreGcGrace() + { + return false; + } + protected void setReverseOrder(boolean isReverseOrder) { this.isReverseOrder = isReverseOrder; @@ -70,6 +79,8 @@ protected UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition { onNewPartition(partition.partitionKey()); + ignoreGcGraceSeconds = shouldIgnoreGcGrace(); + setReverseOrder(partition.isReverseOrder()); UnfilteredRowIterator purged = Transformation.apply(partition, this); if (purged.isEmpty()) diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index d6162331045e..da130c60a08b 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -4072,6 +4072,24 @@ public void forceKeyspaceCompactionForPartitionKey(String keyspaceName, String p } } + /*** + * Forces compaction for a list of partition keys in a table + * The method will ignore the gc_grace_seconds for the partitionKeysIgnoreGcGrace during the comapction, + * in order to purge the tombstones and free up space quicker. + * @param keyspaceName keyspace name + * @param tableName table name + * @param partitionKeysIgnoreGcGrace partition keys ignoring the gc_grace_seconds + * @throws IOException on any I/O operation error + * @throws ExecutionException when attempting to retrieve the result of a task that aborted by throwing an exception + * @throws InterruptedException when a thread is waiting, sleeping, or otherwise occupied, and the thread is interrupted, either before or during the activity + */ + public void forceCompactionKeysIgnoringGcGrace(String keyspaceName, + String tableName, String... partitionKeysIgnoreGcGrace) throws IOException, ExecutionException, InterruptedException + { + ColumnFamilyStore cfStore = getValidKeyspace(keyspaceName).getColumnFamilyStore(tableName); + cfStore.forceCompactionKeysIgnoringGcGrace(partitionKeysIgnoreGcGrace); + } + /** * Takes the snapshot for the given keyspaces. A snapshot name must be specified. * diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java index 101c9a3ebf7c..c92ea72bd6f7 100644 --- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java +++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java @@ -364,6 +364,13 @@ public interface StorageServiceMBean extends NotificationEmitter */ public void forceKeyspaceCompactionForPartitionKey(String keyspaceName, String partitionKey, String... tableNames) throws IOException, ExecutionException, InterruptedException; + /** + * Forces compaction for a list of partition keys on a table. + * The method will ignore the gc_grace_seconds for the partitionKeysIgnoreGcGrace during the comapction, + * in order to purge the tombstones and free up space quicker. + */ + public void forceCompactionKeysIgnoringGcGrace(String keyspaceName, String tableName, String... partitionKeysIgnoreGcGrace) throws IOException, ExecutionException, InterruptedException; + /** * Trigger a cleanup of keys on a single keyspace */ diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java index 7abede2ca8b0..e296d39723e3 100644 --- a/src/java/org/apache/cassandra/tools/NodeProbe.java +++ b/src/java/org/apache/cassandra/tools/NodeProbe.java @@ -460,6 +460,11 @@ public void forceKeyspaceCompactionForPartitionKey(String keyspaceName, String p ssProxy.forceKeyspaceCompactionForPartitionKey(keyspaceName, partitionKey, tableNames); } + public void forceCompactionKeysIgnoringGcGrace(String keyspaceName, String tableName, String... partitionKeysIgnoreGcGrace) throws IOException, ExecutionException, InterruptedException + { + ssProxy.forceCompactionKeysIgnoringGcGrace(keyspaceName, tableName, partitionKeysIgnoreGcGrace); + } + public void forceKeyspaceFlush(String keyspaceName, String... tableNames) throws IOException, ExecutionException, InterruptedException { ssProxy.forceKeyspaceFlush(keyspaceName, tableNames); diff --git a/src/java/org/apache/cassandra/tools/NodeTool.java b/src/java/org/apache/cassandra/tools/NodeTool.java index 8d87c88906b1..5dca8eda7350 100644 --- a/src/java/org/apache/cassandra/tools/NodeTool.java +++ b/src/java/org/apache/cassandra/tools/NodeTool.java @@ -225,7 +225,8 @@ public int execute(String... args) UpgradeSSTable.class, Verify.class, Version.class, - ViewBuildStatus.class + ViewBuildStatus.class, + ForceCompact.class ); Cli.CliBuilder builder = Cli.builder("nodetool"); @@ -484,6 +485,11 @@ protected String[] parseOptionalTables(List cmdArgs) { return cmdArgs.size() <= 1 ? EMPTY_STRING_ARRAY : toArray(cmdArgs.subList(1, cmdArgs.size()), String.class); } + + protected String[] parsePartitionKeys(List cmdArgs) + { + return cmdArgs.size() <= 2 ? EMPTY_STRING_ARRAY : toArray(cmdArgs.subList(2, cmdArgs.size()), String.class); + } } public static SortedMap getOwnershipByDcWithPort(NodeProbe probe, boolean resolveIp, diff --git a/src/java/org/apache/cassandra/tools/nodetool/ForceCompact.java b/src/java/org/apache/cassandra/tools/nodetool/ForceCompact.java new file mode 100644 index 000000000000..99265e7bf0fa --- /dev/null +++ b/src/java/org/apache/cassandra/tools/nodetool/ForceCompact.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.tools.nodetool; + +import io.airlift.airline.Arguments; +import io.airlift.airline.Command; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.cassandra.tools.NodeProbe; +import org.apache.cassandra.tools.NodeTool.NodeToolCmd; + +import static com.google.common.base.Preconditions.checkArgument; + +@Command(name = "forcecompact", description = "Force a (major) compaction on a table") +public class ForceCompact extends NodeToolCmd +{ + @Arguments(usage = "[ ]", description = "The keyspace, table, and a list of partition keys ignoring the gc_grace_seconds") + private List args = new ArrayList<>(); + + @Override + public void execute(NodeProbe probe) + { + // Check if the input has valid size + checkArgument(args.size() >= 3, "forcecompact requires keyspace, table and keys args"); + + // We rely on lower-level APIs to check and throw exceptions if the input keyspace or table name are invalid + String keyspaceName = args.get(0); + String tableName = args.get(1); + String[] partitionKeysIgnoreGcGrace = parsePartitionKeys(args); + + try + { + probe.forceCompactionKeysIgnoringGcGrace(keyspaceName, tableName, partitionKeysIgnoreGcGrace); + } + catch (Exception e) + { + throw new RuntimeException("Error occurred during compaction keys", e); + } + } +} \ No newline at end of file diff --git a/test/unit/org/apache/cassandra/tools/nodetool/ForceCompactionTest.java b/test/unit/org/apache/cassandra/tools/nodetool/ForceCompactionTest.java new file mode 100644 index 000000000000..04d369ec6dfd --- /dev/null +++ b/test/unit/org/apache/cassandra/tools/nodetool/ForceCompactionTest.java @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.tools.nodetool; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Random; + +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.*; + +import org.apache.cassandra.cql3.CQLTester; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.rows.Cell; +import org.apache.cassandra.db.rows.Row; +import org.apache.cassandra.db.rows.Unfiltered; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.io.sstable.ISSTableScanner; +import org.apache.cassandra.io.sstable.format.SSTableReader; + +public class ForceCompactionTest extends CQLTester +{ + private final static int NUM_PARTITIONS = 10; + private final static int NUM_ROWS = 100; + + @Before + public void setup() throws Throwable + { + createTable("CREATE TABLE %s (key text, c1 text, c2 text, c3 text, PRIMARY KEY (key, c1))"); + + for (int partitionCount = 0; partitionCount < NUM_PARTITIONS; partitionCount++) + { + for (int rowCount = 0; rowCount < NUM_ROWS; rowCount++) + { + execute("INSERT INTO %s (key, c1, c2, c3) VALUES (?, ?, ?, ?)", + "k" + partitionCount, "c1_" + rowCount, "c2_" + rowCount, "c3_" + rowCount); + } + } + + // Disable auto compaction + // NOTE: We can only disable the auto compaction once the table is created because the setting is on + // the table level. And we don't need to re-enable it back because the table will be dropped after the test. + disableCompaction(); + } + + @Test + public void forceCompactPartitionTombstoneTest() throws Throwable + { + String keyToPurge = "k0"; + + testHelper("DELETE FROM %s WHERE key = ?", keyToPurge); + } + + @Test + public void forceCompactMultiplePartitionsTombstoneTest() throws Throwable + { + List keysToPurge = new ArrayList<>(); + Random rand = new Random(); + + int numPartitionsToPurge = 1 + rand.nextInt(NUM_PARTITIONS); + for (int count = 0; count < numPartitionsToPurge; count++) + { + String key = "k" + rand.nextInt(NUM_PARTITIONS); + + execute("DELETE FROM %s WHERE key = ?", key); + keysToPurge.add(key); + } + + flush(); + + String[] keys = new String[keysToPurge.size()]; + keys = keysToPurge.toArray(keys); + forceCompact(keys); + + verifyNotContainsTombstones(); + } + + @Test + public void forceCompactRowTombstoneTest() throws Throwable + { + String keyToPurge = "k0"; + + testHelper("DELETE FROM %s WHERE key = ? AND c1 = 'c1_0'", keyToPurge); + } + + @Test + public void forceCompactMultipleRowsTombstoneTest() throws Throwable + { + List keysToPurge = new ArrayList<>(); + + Random randPartition = new Random(); + Random randRow = new Random(); + + for (int count = 0; count < 10; count++) + { + String partitionKey = "k" + randPartition.nextInt(NUM_PARTITIONS); + String clusteringKey = "c1_" + randRow.nextInt(NUM_ROWS); + + execute("DELETE FROM %s WHERE key = ? AND c1 = ?", partitionKey, clusteringKey); + keysToPurge.add(partitionKey); + } + + flush(); + + String[] keys = new String[keysToPurge.size()]; + keys = keysToPurge.toArray(keys); + forceCompact(keys); + + verifyNotContainsTombstones(); + } + + @Test + public void forceCompactCellTombstoneTest() throws Throwable + { + String keyToPurge = "k0"; + testHelper("DELETE c2 FROM %s WHERE key = ? AND c1 = 'c1_0'", keyToPurge); + } + + @Test + public void forceCompactMultipleCellsTombstoneTest() throws Throwable + { + List keysToPurge = new ArrayList<>(); + + Random randPartition = new Random(); + Random randRow = new Random(); + + for (int count = 0; count < 10; count++) + { + String partitionKey = "k" + randPartition.nextInt(NUM_PARTITIONS); + String clusteringKey = "c1_" + randRow.nextInt(NUM_ROWS); + + execute("DELETE c2, c3 FROM %s WHERE key = ? AND c1 = ?", partitionKey, clusteringKey); + keysToPurge.add(partitionKey); + } + + flush(); + + String[] keys = new String[keysToPurge.size()]; + keys = keysToPurge.toArray(keys); + forceCompact(keys); + + verifyNotContainsTombstones(); + } + + @Test + public void forceCompactUpdateCellTombstoneTest() throws Throwable + { + String keyToPurge = "k0"; + testHelper("UPDATE %s SET c2 = null WHERE key = ? AND c1 = 'c1_0'", keyToPurge); + } + + @Test + public void forceCompactTTLExpiryTest() throws Throwable + { + int ttlSec = 2; + String keyToPurge = "k0"; + + execute("UPDATE %s USING TTL ? SET c2 = 'bbb' WHERE key = ? AND c1 = 'c1_0'", ttlSec, keyToPurge); + + flush(); + + // Wait until the TTL has been expired + // NOTE: we double the wait time of the ttl to be on the safer side and avoid the flakiness of the test + Thread.sleep(ttlSec * 1000 * 2); + + String[] keysToPurge = new String[]{keyToPurge}; + forceCompact(keysToPurge); + + verifyNotContainsTombstones(); + } + + @Test + public void forceCompactCompositePartitionKeysTest() throws Throwable + { + createTable("CREATE TABLE %s (key1 text, key2 text, c1 text, c2 text, PRIMARY KEY ((key1, key2), c1))"); + + for (int partitionCount = 0; partitionCount < NUM_PARTITIONS; partitionCount++) + { + for (int rowCount = 0; rowCount < NUM_ROWS; rowCount++) + { + execute("INSERT INTO %s (key1, key2, c1, c2) VALUES (?, ?, ?, ?)", + "k1_" + partitionCount, "k2_" + partitionCount, "c1_" + rowCount, "c2_" + rowCount); + } + } + + // Disable auto compaction + // NOTE: We can only disable the auto compaction once the table is created because the setting is on + // the table level. And we don't need to re-enable it back because the table will be dropped after the test. + disableCompaction(); + + String keyToPurge = "k1_0:k2_0"; + + execute("DELETE FROM %s WHERE key1 = 'k1_0' and key2 = 'k2_0'"); + + flush(); + + String[] keysToPurge = new String[]{keyToPurge}; + forceCompact(keysToPurge); + + verifyNotContainsTombstones(); + } + + private void testHelper(String cqlStatement, String keyToPurge) throws Throwable + { + execute(cqlStatement, keyToPurge); + + flush(); + + String[] keysToPurge = new String[]{keyToPurge}; + forceCompact(keysToPurge); + + verifyNotContainsTombstones(); + } + + private void forceCompact(String[] partitionKeysIgnoreGcGrace) + { + ColumnFamilyStore cfs = getCurrentColumnFamilyStore(); + if (cfs != null) + { + cfs.forceCompactionKeysIgnoringGcGrace(partitionKeysIgnoreGcGrace); + } + } + + private void verifyNotContainsTombstones() + { + // Get sstables + ColumnFamilyStore cfs = Keyspace.open(keyspace()).getColumnFamilyStore(currentTable()); + Collection sstables = cfs.getLiveSSTables(); + + // always run a major compaction before calling this + assertTrue(sstables.size() == 1); + + SSTableReader sstable = sstables.iterator().next(); + int actualPurgedTombstoneCount = 0; + try (ISSTableScanner scanner = sstable.getScanner()) + { + while (scanner.hasNext()) + { + try (UnfilteredRowIterator iter = scanner.next()) + { + // Partition should be all alive + assertTrue(iter.partitionLevelDeletion().isLive()); + + while (iter.hasNext()) + { + Unfiltered atom = iter.next(); + if (atom.isRow()) + { + Row r = (Row)atom; + + // Row should be all alive + assertTrue(r.deletion().isLive()); + + // Cell should be alive as well + for (Cell c : r.cells()) + { + assertFalse(c.isTombstone()); + } + } + } + } + } + } + } +} \ No newline at end of file