From 2b90ac1a1671b4071d9aa6f18e852021bc66702d Mon Sep 17 00:00:00 2001 From: David Capwell Date: Thu, 21 Apr 2022 14:37:59 -0700 Subject: [PATCH] nodetool compact should support using a key string to find the range to avoid operators having to manually do this patch by David Capwell; reviewed by Marcus Eriksson for CASSANDRA-17537 --- CHANGES.txt | 1 + .../cassandra/db/ColumnFamilyStore.java | 5 + .../db/compaction/CompactionController.java | 6 +- .../db/compaction/CompactionManager.java | 30 ++++- .../cassandra/dht/Murmur3Partitioner.java | 5 + .../io/sstable/format/SSTableReader.java | 7 ++ .../cassandra/service/StorageService.java | 49 +++++++- .../service/StorageServiceMBean.java | 7 ++ .../org/apache/cassandra/tools/NodeProbe.java | 5 + .../cassandra/tools/nodetool/Compact.java | 13 ++- .../apache/cassandra/tools/ToolRunner.java | 53 +++++++++ .../cassandra/tools/nodetool/CompactTest.java | 107 ++++++++++++++++++ 12 files changed, 274 insertions(+), 14 deletions(-) create mode 100644 test/unit/org/apache/cassandra/tools/nodetool/CompactTest.java diff --git a/CHANGES.txt b/CHANGES.txt index 9e9e1ee2f1b4..972f76044230 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.1 + * nodetool compact should support using a key string to find the range to avoid operators having to manually do this (CASSANDRA-17537) * Add guardrail for data disk usage (CASSANDRA-17150) * Tool to list data paths of existing tables (CASSANDRA-17568) * Migrate track_warnings to more standard naming conventions and use latest configuration types rather than long (CASSANDRA-17560) diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 35ca94214d2e..47dd66d7ae17 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -2365,6 +2365,11 @@ static Set> toTokenRanges(IPartitioner partitioner, String... 
strin return tokenRanges; } + public void forceCompactionForKey(DecoratedKey key) + { + CompactionManager.instance.forceCompactionForKey(this, key); + } + public static Iterable all() { List> stores = new ArrayList<>(Schema.instance.getKeyspaces().size()); diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionController.java b/src/java/org/apache/cassandra/db/compaction/CompactionController.java index e1b0f3258359..814292f207f0 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java @@ -34,7 +34,6 @@ import org.apache.cassandra.io.util.FileDataInput; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.schema.CompactionParams.TombstoneOption; -import org.apache.cassandra.utils.AlwaysPresentFilter; import org.apache.cassandra.utils.OverlapIterator; import org.apache.cassandra.utils.concurrent.Refs; @@ -255,10 +254,7 @@ public LongPredicate getPurgeEvaluator(DecoratedKey key) for (SSTableReader sstable: filteredSSTables) { - // if we don't have bloom filter(bf_fp_chance=1.0 or filter file is missing), - // we check index file instead. 
- if (sstable.getBloomFilter() instanceof AlwaysPresentFilter && sstable.getPosition(key, SSTableReader.Operator.EQ, false) != null - || sstable.getBloomFilter().isPresent(key)) + if (sstable.maybePresent(key)) { minTimestampSeen = Math.min(minTimestampSeen, sstable.getMinTimestamp()); hasTimestamp = true; diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index 47ed3d5e11ea..165e1e02f34f 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BooleanSupplier; import java.util.function.Predicate; +import java.util.function.Supplier; import java.util.stream.Collectors; import javax.management.openmbean.OpenDataException; import javax.management.openmbean.TabularData; @@ -928,10 +929,10 @@ protected void runMayThrow() return futures; } - public void forceCompactionForTokenRange(ColumnFamilyStore cfStore, Collection> ranges) + public void forceCompaction(ColumnFamilyStore cfStore, Supplier> sstablesFn, com.google.common.base.Predicate sstablesPredicate) { Callable taskCreator = () -> { - Collection sstables = sstablesInBounds(cfStore, ranges); + Collection sstables = sstablesFn.get(); if (sstables == null || sstables.isEmpty()) { logger.debug("No sstables found for the provided token range"); @@ -941,7 +942,7 @@ public void forceCompactionForTokenRange(ColumnFamilyStore cfStore, Collection new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(ranges), + sstablesPredicate, false, false, false)) @@ -963,6 +964,11 @@ protected void runMayThrow() } } + public void forceCompactionForTokenRange(ColumnFamilyStore cfStore, Collection> ranges) + { + forceCompaction(cfStore, () -> sstablesInBounds(cfStore, ranges), (sstable) -> new Bounds<>(sstable.first.getToken(), 
sstable.last.getToken()).intersects(ranges)); + } + private static Collection sstablesInBounds(ColumnFamilyStore cfs, Collection> tokenRangeCollection) { final Set sstables = new HashSet<>(); @@ -990,6 +996,24 @@ private static Collection sstablesInBounds(ColumnFamilyStore cfs, return sstables; } + public void forceCompactionForKey(ColumnFamilyStore cfStore, DecoratedKey key) + { + forceCompaction(cfStore, () -> sstablesWithKey(cfStore, key), sstable -> sstable.maybePresent(key)); + } + + private static Collection sstablesWithKey(ColumnFamilyStore cfs, DecoratedKey key) + { + final Set sstables = new HashSet<>(); + Iterable liveTables = cfs.getTracker().getView().liveSSTablesInBounds(key.getToken().minKeyBound(), + key.getToken().maxKeyBound()); + for (SSTableReader sstable : liveTables) + { + if (sstable.maybePresent(key)) + sstables.add(sstable); + } + return sstables.isEmpty() ? Collections.emptyList() : sstables; + } + public void forceUserDefinedCompaction(String dataFiles) { String[] filenames = dataFiles.split(","); diff --git a/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java b/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java index b0f8bf90e361..e2daac412cca 100644 --- a/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java +++ b/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java @@ -214,6 +214,11 @@ public LongToken decreaseSlightly() return new LongToken(token - 1); } + public static ByteBuffer keyForToken(long token) + { + return keyForToken(new LongToken(token)); + } + /** * Reverses murmur3 to find a possible 16 byte key that generates a given token */ diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java index 4da90fc95d59..e6214891f1c4 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java @@ -2003,6 +2003,13 @@ public void 
addTo(Ref.IdentityCollection identities) } + public boolean maybePresent(DecoratedKey key) + { + // if we don't have bloom filter(bf_fp_chance=1.0 or filter file is missing), + // we check index file instead. + return bf instanceof AlwaysPresentFilter && getPosition(key, Operator.EQ, false) != null || bf.isPresent(key); + } + /** * One instance per SSTableReader we create. * diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 6562635dcabd..c24096ad12c1 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -3914,6 +3914,7 @@ public void takeTableSnapshot(String keyspaceName, String tableName, String tag) takeMultipleTableSnapshot(tag, false, null, keyspaceName + "." + tableName); } + @Override public void forceKeyspaceCompactionForTokenRange(String keyspaceName, String startToken, String endToken, String... tableNames) throws IOException, ExecutionException, InterruptedException { Collection> tokenRanges = createRepairRangeFrom(startToken, endToken); @@ -3924,6 +3925,30 @@ public void forceKeyspaceCompactionForTokenRange(String keyspaceName, String sta } } + @Override + public void forceKeyspaceCompactionForPartitionKey(String keyspaceName, String partitionKey, String... 
tableNames) throws IOException, ExecutionException, InterruptedException + { + // validate that the key parses before attempting compaction + for (ColumnFamilyStore cfStore : getValidColumnFamilies(true, false, keyspaceName, tableNames)) + { + try + { + getKeyFromPartition(keyspaceName, cfStore.name, partitionKey); + } + catch (Exception e) + { + // JMX can not handle exceptions defined outside of java.* and javax.*, so safer to rewrite the exception + IllegalArgumentException exception = new IllegalArgumentException(String.format("Unable to parse partition key '%s' for table %s; %s", partitionKey, cfStore.metadata, e.getMessage())); + exception.setStackTrace(e.getStackTrace()); + throw exception; + } + } + for (ColumnFamilyStore cfStore : getValidColumnFamilies(true, false, keyspaceName, tableNames)) + { + cfStore.forceCompactionForKey(getKeyFromPartition(keyspaceName, cfStore.name, partitionKey)); + } + } + /** * Takes the snapshot for the given keyspaces. A snapshot name must be specified. 
* @@ -4572,6 +4597,22 @@ public List getNaturalEndpointsWithPort(String keyspaceName, ByteBuffer } public EndpointsForToken getNaturalReplicasForToken(String keyspaceName, String cf, String key) + { + return getNaturalReplicasForToken(keyspaceName, partitionKeyToBytes(keyspaceName, cf, key)); + } + + public EndpointsForToken getNaturalReplicasForToken(String keyspaceName, ByteBuffer key) + { + Token token = tokenMetadata.partitioner.getToken(key); + return Keyspace.open(keyspaceName).getReplicationStrategy().getNaturalReplicasForToken(token); + } + + public DecoratedKey getKeyFromPartition(String keyspaceName, String table, String partitionKey) + { + return tokenMetadata.partitioner.decorateKey(partitionKeyToBytes(keyspaceName, table, partitionKey)); + } + + private static ByteBuffer partitionKeyToBytes(String keyspaceName, String cf, String key) { KeyspaceMetadata ksMetaData = Schema.instance.getKeyspaceMetadata(keyspaceName); if (ksMetaData == null) @@ -4581,13 +4622,13 @@ public EndpointsForToken getNaturalReplicasForToken(String keyspaceName, String if (metadata == null) throw new IllegalArgumentException("Unknown table '" + cf + "' in keyspace '" + keyspaceName + "'"); - return getNaturalReplicasForToken(keyspaceName, metadata.partitionKeyType.fromString(key)); + return metadata.partitionKeyType.fromString(key); } - public EndpointsForToken getNaturalReplicasForToken(String keyspaceName, ByteBuffer key) + @Override + public String getToken(String keyspaceName, String table, String key) { - Token token = tokenMetadata.partitioner.getToken(key); - return Keyspace.open(keyspaceName).getReplicationStrategy().getNaturalReplicasForToken(token); + return tokenMetadata.partitioner.getToken(partitionKeyToBytes(keyspaceName, table, key)).toString(); } public void setLoggingLevel(String classQualifier, String rawLevel) throws Exception diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java 
b/src/java/org/apache/cassandra/service/StorageServiceMBean.java index a3267189e12e..802295924be3 100644 --- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java +++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java @@ -330,6 +330,11 @@ public interface StorageServiceMBean extends NotificationEmitter */ public void forceKeyspaceCompactionForTokenRange(String keyspaceName, String startToken, String endToken, String... tableNames) throws IOException, ExecutionException, InterruptedException; + /** + * Forces a major compaction of the SSTables that may contain the given partition key (string form) + */ + public void forceKeyspaceCompactionForPartitionKey(String keyspaceName, String partitionKey, String... tableNames) throws IOException, ExecutionException, InterruptedException; + /** * Trigger a cleanup of keys on a single keyspace */ @@ -984,4 +989,6 @@ public void enableAuditLog(String loggerName, String includedKeyspaces, String e public void clearPaxosRepairs(); public void setSkipPaxosRepairCompatibilityCheck(boolean v); public boolean getSkipPaxosRepairCompatibilityCheck(); + + String getToken(String keyspaceName, String table, String partitionKey); } diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java index f66566df48a7..c9c81a874af0 100644 --- a/src/java/org/apache/cassandra/tools/NodeProbe.java +++ b/src/java/org/apache/cassandra/tools/NodeProbe.java @@ -441,6 +441,11 @@ public void forceKeyspaceCompactionForTokenRange(String keyspaceName, final Stri ssProxy.forceKeyspaceCompactionForTokenRange(keyspaceName, startToken, endToken, tableNames); } + public void forceKeyspaceCompactionForPartitionKey(String keyspaceName, String partitionKey, String... tableNames) throws InterruptedException, ExecutionException, IOException + { + ssProxy.forceKeyspaceCompactionForPartitionKey(keyspaceName, partitionKey, tableNames); + } + public void forceKeyspaceFlush(String keyspaceName, String... 
tableNames) throws IOException, ExecutionException, InterruptedException { ssProxy.forceKeyspaceFlush(keyspaceName, tableNames); diff --git a/src/java/org/apache/cassandra/tools/nodetool/Compact.java b/src/java/org/apache/cassandra/tools/nodetool/Compact.java index ca560cd3bba1..23a8fb9d8929 100644 --- a/src/java/org/apache/cassandra/tools/nodetool/Compact.java +++ b/src/java/org/apache/cassandra/tools/nodetool/Compact.java @@ -47,11 +47,16 @@ public class Compact extends NodeToolCmd @Option(title = "end_token", name = {"-et", "--end-token"}, description = "Use -et to specify a token at which compaction range ends") private String endToken = EMPTY; + @Option(title = "partition_key", name = {"--partition"}, description = "String representation of the partition key") + private String partitionKey = EMPTY; + @Override public void execute(NodeProbe probe) { - final boolean tokenProvided = !(startToken.isEmpty() && endToken.isEmpty()); + final boolean startEndTokenProvided = !(startToken.isEmpty() && endToken.isEmpty()); + final boolean partitionKeyProvided = !partitionKey.isEmpty(); + final boolean tokenProvided = startEndTokenProvided || partitionKeyProvided; if (splitOutput && (userDefined || tokenProvided)) { throw new RuntimeException("Invalid option combination: Can not use split-output here"); @@ -80,10 +85,14 @@ public void execute(NodeProbe probe) { try { - if (tokenProvided) + if (startEndTokenProvided) { probe.forceKeyspaceCompactionForTokenRange(keyspace, startToken, endToken, tableNames); } + else if (partitionKeyProvided) + { + probe.forceKeyspaceCompactionForPartitionKey(keyspace, partitionKey, tableNames); + } else { probe.forceKeyspaceCompaction(splitOutput, keyspace, tableNames); diff --git a/test/unit/org/apache/cassandra/tools/ToolRunner.java b/test/unit/org/apache/cassandra/tools/ToolRunner.java index f650f349ef92..6f5516099de9 100644 --- a/test/unit/org/apache/cassandra/tools/ToolRunner.java +++ 
b/test/unit/org/apache/cassandra/tools/ToolRunner.java @@ -34,6 +34,7 @@ import java.util.Objects; import java.util.function.Supplier; import java.util.stream.Collectors; +import java.util.stream.Stream; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; @@ -45,8 +46,11 @@ import org.apache.cassandra.distributed.api.IInstance; import org.apache.cassandra.distributed.api.NodeToolResult; import org.apache.cassandra.utils.Pair; +import org.assertj.core.api.Assertions; import org.assertj.core.util.Lists; +import org.assertj.core.util.Strings; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -455,6 +459,55 @@ public void assertOnCleanExit() assertOnExitCode(); assertCleanStdErr(); } + + public AssertHelp asserts() + { + return new AssertHelp(); + } + + public final class AssertHelp + { + public AssertHelp success() + { + if (exitCode != 0) + fail("was not successful"); + return this; + } + + public AssertHelp failure() + { + if (exitCode == 0) + fail("was successful, but a failure was expected"); + return this; + } + + public AssertHelp errorContains(String messages) + { + return errorContainsAny(messages); + } + + public AssertHelp errorContainsAny(String... 
messages) + { + assertThat(messages).hasSizeGreaterThan(0); + assertThat(stderr).isNotNull(); + if (!Stream.of(messages).anyMatch(stderr::contains)) + fail("stderr does not contain " + Arrays.toString(messages)); + return this; + } + + private void fail(String msg) + { + StringBuilder sb = new StringBuilder(); + sb.append("nodetool command ").append(String.join(" ", allArgs)).append(": ").append(msg).append('\n'); + if (stdout != null) + sb.append("stdout:\n").append(stdout).append('\n'); + if (stderr != null) + sb.append("stderr:\n").append(stderr).append('\n'); + if (e != null) + sb.append("Exception:\n").append(Throwables.getStackTraceAsString(e)).append('\n'); + throw new AssertionError(sb.toString()); + } + } } public interface ObservableTool extends AutoCloseable diff --git a/test/unit/org/apache/cassandra/tools/nodetool/CompactTest.java b/test/unit/org/apache/cassandra/tools/nodetool/CompactTest.java new file mode 100644 index 000000000000..f8fe6078d418 --- /dev/null +++ b/test/unit/org/apache/cassandra/tools/nodetool/CompactTest.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.tools.nodetool; + +import java.util.Arrays; + +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.cql3.CQLTester; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.tools.ToolRunner; +import org.assertj.core.api.Assertions; + +import static org.apache.cassandra.tools.ToolRunner.invokeNodetool; + +public class CompactTest extends CQLTester +{ + @BeforeClass + public static void setup() throws Throwable + { + startJMXServer(); + } + + @Test + public void keyPresent() throws Throwable + { + long token = 42; + long key = Murmur3Partitioner.LongToken.keyForToken(token).getLong(); + createTable("CREATE TABLE %s (id bigint, value text, PRIMARY KEY ((id)))"); + ColumnFamilyStore cfs = Keyspace.open(keyspace()).getColumnFamilyStore(currentTable()); + cfs.disableAutoCompaction(); + // write SSTables for the specific key + for (int i = 0; i < 10; i++) + { + execute("INSERT INTO %s (id, value) VALUES (?, ?)", key, "This is just some text... part " + i); + flush(keyspace()); + } + Assertions.assertThat(cfs.getTracker().getView().liveSSTables()).hasSize(10); + invokeNodetool("compact", "--partition", Long.toString(key), keyspace(), currentTable()).assertOnCleanExit(); + + // only 1 SSTable should exist + Assertions.assertThat(cfs.getTracker().getView().liveSSTables()).hasSize(1); + } + + @Test + public void keyNotPresent() throws Throwable + { + long token = 42; + long key = Murmur3Partitioner.LongToken.keyForToken(token).getLong(); + createTable("CREATE TABLE %s (id bigint, value text, PRIMARY KEY ((id)))"); + ColumnFamilyStore cfs = Keyspace.open(keyspace()).getColumnFamilyStore(currentTable()); + cfs.disableAutoCompaction(); + // write SSTables for the specific key + for (int i = 0; i < 10; i++) + { + execute("INSERT INTO %s (id, value) VALUES (?, ?)", key, "This is just some text... 
part " + i); + flush(keyspace()); + } + Assertions.assertThat(cfs.getTracker().getView().liveSSTables()).hasSize(10); + + for (long keyNotFound : Arrays.asList(key - 1, key + 1)) + { + invokeNodetool("compact", "--partition", Long.toString(keyNotFound), keyspace(), currentTable()).assertOnCleanExit(); + + // the key is in no SSTable, so nothing is compacted and all 10 SSTables remain + Assertions.assertThat(cfs.getTracker().getView().liveSSTables()).hasSize(10); + } + } + + @Test + public void tableNotFound() + { + invokeNodetool("compact", "--partition", Long.toString(42), keyspace(), "doesnotexist") + .asserts() + .failure() + .errorContains(String.format("java.lang.IllegalArgumentException: Unknown keyspace/cf pair (%s.doesnotexist)", keyspace())); + } + + @Test + public void keyWrongType() + { + createTable("CREATE TABLE %s (id bigint, value text, PRIMARY KEY ((id)))"); + + invokeNodetool("compact", "--partition", "this_will_not_work", keyspace(), currentTable()) + .asserts() + .failure() + .errorContains(String.format("Unable to parse partition key 'this_will_not_work' for table %s.%s; Unable to make long from 'this_will_not_work'", keyspace(), currentTable())); + } +}