diff --git a/CHANGES.txt b/CHANGES.txt index 2f442e1639a2..c19f86787228 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 6.0-alpha2 + * Differentiate between legitimate cases where the first entry is the same as the last entry and empty bounds in SSTableCursorWriter#addIndexBlock() (CASSANDRA-21255) * Synchronously publish changes to local gossip state following metadata updates (CASSANDRA-21239) * Change default for cassandra.set_sep_thread_name to false to reduce CPU usage (CASSANDRA-21089) * Avoid permission checks for masked columns when the table doesn't have any (CASSANDRA-21299) diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableCursorWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableCursorWriter.java index 63f0e034059f..102e109ab3ae 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableCursorWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableCursorWriter.java @@ -99,6 +99,7 @@ public class SSTableCursorWriter implements AutoCloseable private final DataOutputBuffer rowIndexEntries = new DataOutputBuffer(); private final IntArrayList rowIndexEntriesOffsets = new IntArrayList(); private final ClusteringDescriptor rowIndexEntryLastClustering; + private boolean hasDistinctLastClustering = false; private int indexBlockStartOffset; private int rowIndexEntryOffset; private final int indexBlockThreshold; @@ -161,6 +162,7 @@ public int writePartitionStart(byte[] partitionKey, int partitionKeyLength, Dele rowIndexEntriesOffsets.clear(); rowIndexEntryOffset = 0; openMarker.resetLive(); + hasDistinctLastClustering = false; partitionStart = dataWriter.position(); writePartitionHeader(partitionKey, partitionKeyLength, partitionDeletionTime); @@ -554,21 +556,23 @@ public void writeRangeTombstone(UnfilteredDescriptor rangeTombstone, boolean upd updateMetadataAndIndexBlock(rangeTombstone, unfilteredStartPosition, unfilteredEndPosition, updateClusteringMetadata); } - private void updateMetadataAndIndexBlock( - 
UnfilteredDescriptor unfilteredDescriptor, - long unfilteredStartPosition, - long unfilteredEndPosition, - boolean updateClusteringMetadata) throws IOException + private void updateMetadataAndIndexBlock(UnfilteredDescriptor unfilteredDescriptor, + long unfilteredStartPosition, + long unfilteredEndPosition, + boolean updateClusteringMetadata) throws IOException { if (updateClusteringMetadata) updateClusteringMetadata(unfilteredDescriptor); // write the first clustering into rowIndexEntries buffer (we will need it unless we never write the first entry) - if (unfilteredStartPosition == indexBlockStartOffset || (rowIndexEntryOffset == rowIndexEntries.position())) { + if (currentOffsetInPartition(unfilteredStartPosition) == indexBlockStartOffset || (rowIndexEntryOffset == rowIndexEntries.position())) + { writeClusteringToRowIndexEntries(unfilteredDescriptor); } else { rowIndexEntryLastClustering.copy(unfilteredDescriptor); + hasDistinctLastClustering = true; } + /** {@link BigFormatPartitionWriter#addUnfiltered(Unfiltered)} */ // if we hit the index block size that we have to index after, go ahead and index it. 
long indexBlockSize = currentOffsetInPartition(unfilteredEndPosition) - indexBlockStartOffset; @@ -597,7 +601,8 @@ private void addIndexBlock(long endOfRowPosition, long indexBlockSize) throws IO rowIndexEntriesOffsets.addInt(rowIndexEntryOffset); // first clustering is already in, write last entry - if (rowIndexEntryLastClustering.clusteringLength() == 0) { + if (!hasDistinctLastClustering) + { // first entry is the last entry, copy it byte[] entriesData = rowIndexEntries.getData(); long endOfFirstEntry = rowIndexEntries.position(); @@ -608,6 +613,8 @@ private void addIndexBlock(long endOfRowPosition, long indexBlockSize) throws IO writeClusteringToRowIndexEntries(rowIndexEntryLastClustering); rowIndexEntryLastClustering.resetClustering(); } + hasDistinctLastClustering = false; + rowIndexEntries.writeUnsignedVInt((long)indexBlockStartOffset); rowIndexEntries.writeVInt(indexBlockSize - IndexInfo.Serializer.WIDTH_BASE); diff --git a/test/unit/org/apache/cassandra/cql3/RangeTombstoneBoundaryTest.java b/test/unit/org/apache/cassandra/cql3/RangeTombstoneBoundaryTest.java new file mode 100644 index 000000000000..cc2f1837fa30 --- /dev/null +++ b/test/unit/org/apache/cassandra/cql3/RangeTombstoneBoundaryTest.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cql3;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+
+import static org.junit.Assert.assertEquals;
+
+@RunWith(Parameterized.class)
+public class RangeTombstoneBoundaryTest extends CQLTester
+{
+    @BeforeClass
+    public static void setup()
+    {
+        DatabaseDescriptor.setColumnIndexSizeInKiB(1);
+    }
+
+    @Parameterized.Parameter
+    public boolean useCursorCompaction;
+
+    @Parameterized.Parameters(name="cursorCompaction={0}")
+    public static Collection parameterizations()
+    {
+        return Arrays.asList(new Object[][] { { false }, { true }});
+    }
+
+    @Test
+    public void testOpenRangeTombstoneInLastBlock()
+    {
+        DatabaseDescriptor.setCursorCompactionEnabled(useCursorCompaction);
+
+        createTable("CREATE TABLE %s (pk text, ck text, v1 text, PRIMARY KEY (pk, ck))" +
+                    " WITH compression = {'enabled': 'false'} AND compaction = {'class': 'LeveledCompactionStrategy', 'enabled': false}");
+
+        // 30 rows with ~100-byte values → ~3 index blocks at 1 KiB each
+        String pad = "x".repeat(80);
+        for (int i = 0; i < 30; i++)
+            execute("INSERT INTO %s (pk, ck, v1) VALUES (?, ?, ?) USING TIMESTAMP 1", "p", String.format("r%03d", i), pad + i);
+
+        getCurrentColumnFamilyStore().forceBlockingFlush(ColumnFamilyStore.FlushReason.UNIT_TESTS);
+
+        // Open-ended delete: removes rows >= 27 (last block)
+        execute("DELETE FROM %s USING TIMESTAMP 2 WHERE pk = ? AND ck >= ?", "p", "r027");
+
+        getCurrentColumnFamilyStore().forceBlockingFlush(ColumnFamilyStore.FlushReason.UNIT_TESTS);
+        getCurrentColumnFamilyStore().forceMajorCompaction();
+
+        UntypedResultSet range = execute("SELECT * FROM %s WHERE pk = ? AND ck >= ?", "p", "r023");
+        assertEquals("range select [r023, r029] should return 4 rows (23, 24, 25, 26)", 4, range.size());
+
+        range = execute("SELECT * FROM %s WHERE pk = ? AND ck >= ?", "p", "r027");
+        assertEquals(0, range.size());
+    }
+
+    @Test
+    public void testOpenRangeTombstoneInMiddleBlock()
+    {
+        DatabaseDescriptor.setCursorCompactionEnabled(useCursorCompaction);
+
+        createTable("CREATE TABLE %s (pk text, ck text, v1 text, PRIMARY KEY (pk, ck))" +
+                    " WITH compression = {'enabled': 'false'} AND compaction = {'class': 'LeveledCompactionStrategy', 'enabled': false}");
+
+        // 30 rows with ~100-byte values → ~3 index blocks at 1 KiB each
+        String pad = "x".repeat(80);
+        for (int i = 0; i < 30; i++)
+            execute("INSERT INTO %s (pk, ck, v1) VALUES (?, ?, ?) USING TIMESTAMP 1", "p", String.format("r%03d", i), pad + i);
+
+        getCurrentColumnFamilyStore().forceBlockingFlush(ColumnFamilyStore.FlushReason.UNIT_TESTS);
+
+        // Open-ended delete: removes rows >= 15 (middle block)
+        execute("DELETE FROM %s USING TIMESTAMP 2 WHERE pk = ? AND ck >= ?", "p", "r015");
+
+        getCurrentColumnFamilyStore().forceBlockingFlush(ColumnFamilyStore.FlushReason.UNIT_TESTS);
+        getCurrentColumnFamilyStore().forceMajorCompaction();
+
+        UntypedResultSet range = execute("SELECT * FROM %s WHERE pk = ? AND ck >= ?", "p", "r012");
+        assertEquals(3, range.size());
+
+        range = execute("SELECT * FROM %s WHERE pk = ? AND ck >= ?", "p", "r023");
+        assertEquals(0, range.size());
+    }
+
+    @Test
+    public void testMidBlockRangeTombstoneInLastBlock()
+    {
+        DatabaseDescriptor.setCursorCompactionEnabled(useCursorCompaction);
+
+        createTable("CREATE TABLE %s (pk text, ck text, v1 text, PRIMARY KEY (pk, ck))" +
+                    " WITH compression = {'enabled': 'false'} AND compaction = {'class': 'LeveledCompactionStrategy', 'enabled': false}");
+
+        // 30 rows with ~100-byte values → ~3 index blocks at 1 KiB each
+        String pad = "x".repeat(80);
+        for (int i = 0; i < 30; i++)
+            execute("INSERT INTO %s (pk, ck, v1) VALUES (?, ?, ?) USING TIMESTAMP 1", "p", String.format("r%03d", i), pad + i);
+
+        getCurrentColumnFamilyStore().forceBlockingFlush(ColumnFamilyStore.FlushReason.UNIT_TESTS);
+
+        // Mid-block delete: removes rows 24-25 (last block)
+        execute("DELETE FROM %s USING TIMESTAMP 2 WHERE pk = ? AND ck >= ? AND ck <= ?", "p", "r024", "r025");
+
+        getCurrentColumnFamilyStore().forceBlockingFlush(ColumnFamilyStore.FlushReason.UNIT_TESTS);
+        getCurrentColumnFamilyStore().forceMajorCompaction();
+
+        UntypedResultSet range = execute("SELECT * FROM %s WHERE pk = ? AND ck >= ?", "p", "r023");
+        assertEquals("range select [r023, r029] should return 5 rows (23, 26, 27, 28, 29)", 5, range.size());
+    }
+}