diff --git a/CHANGES.txt b/CHANGES.txt index 097950afb27a..6db0e3b084a7 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0.10 + * Partial compaction can resurrect deleted data (CASSANDRA-18507) * Allow internal address to change with reconnecting snitches (CASSANDRA-16718) * Fix quoting in toCqlString methods of UDTs and aggregates (CASSANDRA-17918) * NPE when deserializing malformed collections from client (CASSANDRA-18505) diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionController.java b/src/java/org/apache/cassandra/db/compaction/CompactionController.java index bb2094f931b3..cee2b58f75a3 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java @@ -116,7 +116,7 @@ public void maybeRefreshOverlaps() } } - private void refreshOverlaps() + void refreshOverlaps() { if (NEVER_PURGE_TOMBSTONES || cfs.getNeverPurgeTombstones()) return; diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java index 3b0e1729d449..90abac3fb272 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java @@ -83,13 +83,14 @@ public boolean reduceScopeForLimitedSpace(Set nonExpiredSSTables, if (partialCompactionsAcceptable() && transaction.originals().size() > 1) { // Try again w/o the largest one. - logger.warn("insufficient space to compact all requested files. {}MB required, {} for compaction {}", + SSTableReader removedSSTable = cfs.getMaxSizeFile(nonExpiredSSTables); + logger.warn("insufficient space to compact all requested files. {}MB required, {} for compaction {} - removing largest SSTable: {}", (float) expectedSize / 1024 / 1024, StringUtils.join(transaction.originals(), ", "), - transaction.opId()); + transaction.opId(), + removedSSTable); // Note that we have removed files that are still marked as compacting. // This suboptimal but ok since the caller will unmark all the sstables at the end. - SSTableReader removedSSTable = cfs.getMaxSizeFile(nonExpiredSSTables); transaction.cancel(removedSSTable); return true; } @@ -123,7 +124,12 @@ protected void runMayThrow() throws Exception final Set fullyExpiredSSTables = controller.getFullyExpiredSSTables(); // select SSTables to compact based on available disk space. - buildCompactionCandidatesForAvailableDiskSpace(fullyExpiredSSTables); + if (!buildCompactionCandidatesForAvailableDiskSpace(fullyExpiredSSTables)) + { + // The set of sstables has changed (one or more were excluded due to limited available disk space). + // We need to recompute the overlaps between sstables. + controller.refreshOverlaps(); + } // sanity check: all sstables must belong to the same cfs assert !Iterables.any(transaction.originals(), new Predicate() @@ -345,13 +351,15 @@ public static boolean getIsTransient(Set sstables) * Checks if we have enough disk space to execute the compaction. Drops the largest sstable out of the Task until * there's enough space (in theory) to handle the compaction. Does not take into account space that will be taken by * other compactions. + * + * @return true if there is enough disk space to execute the complete compaction, false if some sstables are excluded. */ - protected void buildCompactionCandidatesForAvailableDiskSpace(final Set fullyExpiredSSTables) + protected boolean buildCompactionCandidatesForAvailableDiskSpace(final Set fullyExpiredSSTables) { if(!cfs.isCompactionDiskSpaceCheckEnabled() && compactionType == OperationType.COMPACTION) { - logger.info("Compaction space check is disabled"); - return; // try to compact all SSTables + logger.info("Compaction space check is disabled - trying to compact all sstables"); + return true; } final Set nonExpiredSSTables = Sets.difference(transaction.originals(), fullyExpiredSSTables); @@ -395,8 +403,9 @@ protected void buildCompactionCandidatesForAvailableDiskSpace(final Set expected = Sets.newHashSetWithExpectedSize(990); + for (int i = 0; i < 1000; i++) + { + cluster.coordinator(1).execute(withKeyspace("insert into %s.tbl (id) values (?)"), ConsistencyLevel.ONE, i); + if (i >= 10) + expected.add(i); + } + cluster.get(1).flush(KEYSPACE); + for (int i = 0; i < 10; i++) + { + cluster.coordinator(1).execute(withKeyspace("delete from %s.tbl where id = ?"), ConsistencyLevel.ONE, i); + cluster.get(1).flush(KEYSPACE); + } + assertEquals(expected, Arrays.stream(cluster.coordinator(1).execute(withKeyspace("select * from %s.tbl"), ConsistencyLevel.ONE)) + .map(x -> x[0]) + .collect(Collectors.toSet())); + + Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS); // make sure tombstones are gc:able + + cluster.get(1).runOnInstance(() -> { + BB.enabled.set(true); + ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl"); + cfs.forceMajorCompaction(); + assertEquals("We should have 2 sstables (not 1) after major compaction since we reduced the scope of the compaction", + 2, Iterables.size(cfs.getSSTables(SSTableSet.CANONICAL))); + }); + assertEquals(expected, Arrays.stream(cluster.coordinator(1).execute(withKeyspace("select * from %s.tbl"), ConsistencyLevel.ONE)) + .map(x -> x[0]) + .collect(Collectors.toSet())); + } + } + + public static class BB + { + static AtomicBoolean enabled = new AtomicBoolean(); + public static void install(ClassLoader cl, Integer i) + { + new ByteBuddy().rebase(Directories.class) + .method(named("hasAvailableDiskSpace").and(takesArguments(2))) + .intercept(MethodDelegation.to(BB.class)) + .make() + .load(cl, ClassLoadingStrategy.Default.INJECTION); + } + + public static boolean hasAvailableDiskSpace(long ignore1, long ignore2) + { + if (enabled.get()) + { + enabled.set(false); + return false; + } + return true; + } + } +} diff --git a/test/unit/org/apache/cassandra/db/compaction/PartialCompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/PartialCompactionsTest.java new file mode 100644 index 000000000000..9cb3872f8068 --- /dev/null +++ b/test/unit/org/apache/cassandra/db/compaction/PartialCompactionsTest.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.compaction; + +import java.util.Iterator; + +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.Util; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Directories; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.RowUpdateBuilder; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.schema.TableMetadataRef; +import org.apache.cassandra.utils.CloseableIterator; +import org.apache.cassandra.utils.FBUtilities; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.junit.Assert.assertEquals; + +public class PartialCompactionsTest extends SchemaLoader +{ + static final String KEYSPACE = PartialCompactionsTest.class.getSimpleName(); + static final String TABLE = "testtable"; + + @BeforeClass + public static void initSchema() + { + CompactionManager.instance.disableAutoCompaction(); + + SchemaLoader.createKeyspace(KEYSPACE, + KeyspaceParams.simple(1), + SchemaLoader.standardCFMD(KEYSPACE, TABLE)); + + LimitableDataDirectory.applyTo(KEYSPACE, TABLE); + } + + @Before + public void prepareCFS() + { + LimitableDataDirectory.setAvailableSpace(cfStore(), null); + } + + @After + public void truncateCF() + { + cfStore().truncateBlocking(); + LifecycleTransaction.waitForDeletions(); + } + + private static ColumnFamilyStore cfStore() + { + return Keyspace.open(KEYSPACE).getColumnFamilyStore(TABLE); + } + + @Test + public void shouldNotResurrectDataFromSSTableExcludedDueToInsufficientSpace() + { + // given + ColumnFamilyStore cfs = cfStore(); + int few = 10, many = 10 * few; + + // a large sstable as the oldest + createDataSSTable(cfs, 0, many); + // more inserts (to have more than one sstable to compact) + createDataSSTable(cfs, many, many + few); + // delete data that's in both of the prior sstables + createTombstonesSSTable(cfs, many - few / 2, many + few / 2); + + // emulate there not being enough space to compact all sstables + LimitableDataDirectory.setAvailableSpace(cfs, enoughSpaceForAllButTheLargestSSTable(cfs)); + + // when - run a compaction where all tombstones have timed out + FBUtilities.waitOnFutures(CompactionManager.instance.submitMaximal(cfs, Integer.MAX_VALUE, false)); + + // then - the tombstones should not be removed + assertEquals("live sstables after compaction", 2, cfs.getLiveSSTables().size()); + assertEquals("remaining live rows after compaction", many, liveRows(cfs)); + } + + private static long enoughSpaceForAllButTheLargestSSTable(ColumnFamilyStore cfs) + { + long totalSize = 1, maxSize = 0; + for (SSTableReader ssTable : cfs.getLiveSSTables()) + { + long size = ssTable.onDiskLength(); + if (size > maxSize) maxSize = size; + totalSize += size; + } + return totalSize - maxSize; + } + + private static int liveRows(ColumnFamilyStore cfs) + { + return Util.getAll(Util.cmd(cfs, "key1").build()).stream() + .map(partition -> count(partition.rowIterator())) + .reduce(Integer::sum) + .orElse(0); + } + + private static int count(Iterator iter) + { + try (CloseableIterator unused = iter instanceof CloseableIterator ? (CloseableIterator) iter : null) + { + int count = 0; + for (; iter.hasNext(); iter.next()) + { + count++; + } + return count; + } + } + + private static void createDataSSTable(ColumnFamilyStore cfs, int firstKey, int endKey) + { + for (int i = firstKey; i < endKey; i++) + { + new RowUpdateBuilder(cfs.metadata(), 0, "key1") + .clustering(String.valueOf(i)) + .add("val", String.valueOf(i)) + .build() + .applyUnsafe(); + } + cfs.forceBlockingFlush(); + } + + private static void createTombstonesSSTable(ColumnFamilyStore cfs, int firstKey, int endKey) + { + for (int i = firstKey; i < endKey; i++) + { + RowUpdateBuilder.deleteRow(cfs.metadata(), 1, "key1", String.valueOf(i)).applyUnsafe(); + } + cfs.forceBlockingFlush(); + } + + private static class LimitableDataDirectory extends Directories.DataDirectory + { + private Long availableSpace; + + LimitableDataDirectory(Directories.DataDirectory dataDirectory) + { + super(dataDirectory.location); + } + + @Override + public long getAvailableSpace() + { + if (availableSpace != null) + return availableSpace; + return super.getAvailableSpace(); + } + + public static void setAvailableSpace(ColumnFamilyStore cfs, Long availableSpace) + { + for (Directories.DataDirectory location : cfs.getDirectories().getWriteableLocations()) + { + assertThat("ColumnFamilyStore set up with ability to emulate limited disk space", + location, instanceOf(LimitableDataDirectory.class)); + ((LimitableDataDirectory) location).availableSpace = availableSpace; + } + } + + public static void applyTo(String ks, String cf) + { + Keyspace keyspace = Keyspace.open(ks); + ColumnFamilyStore store = keyspace.getColumnFamilyStore(cf); + TableMetadataRef metadata = store.metadata; + keyspace.dropCf(metadata.id); + ColumnFamilyStore cfs = ColumnFamilyStore.createColumnFamilyStore(keyspace, cf, metadata, wrapDirectoriesOf(store), false, false, true); + keyspace.initCfCustom(cfs); + } + + private static Directories wrapDirectoriesOf(ColumnFamilyStore cfs) + { + Directories.DataDirectory[] original = cfs.getDirectories().getWriteableLocations(); + Directories.DataDirectory[] wrapped = new Directories.DataDirectory[original.length]; + for (int i = 0; i < wrapped.length; i++) + { + wrapped[i] = new LimitableDataDirectory(original[i]); + } + return new Directories(cfs.metadata(), wrapped); + } + } +}