Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
6.0-alpha2
* Differentiate between legitimate cases where the first entry is the same as the last entry and empty bounds in SSTableCursorWriter#addIndexBlock() (CASSANDRA-21255)
* Synchronously publish changes to local gossip state following metadata updates (CASSANDRA-21239)
* Change default for cassandra.set_sep_thread_name to false to reduce CPU usage (CASSANDRA-21089)
* Avoid permission checks for masked columns when the table doesn't have any (CASSANDRA-21299)
Expand Down
21 changes: 14 additions & 7 deletions src/java/org/apache/cassandra/io/sstable/SSTableCursorWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ public class SSTableCursorWriter implements AutoCloseable
private final DataOutputBuffer rowIndexEntries = new DataOutputBuffer();
private final IntArrayList rowIndexEntriesOffsets = new IntArrayList();
private final ClusteringDescriptor rowIndexEntryLastClustering;
private boolean hasDistinctLastClustering = false;
private int indexBlockStartOffset;
private int rowIndexEntryOffset;
private final int indexBlockThreshold;
Expand Down Expand Up @@ -161,6 +162,7 @@ public int writePartitionStart(byte[] partitionKey, int partitionKeyLength, Dele
rowIndexEntriesOffsets.clear();
rowIndexEntryOffset = 0;
openMarker.resetLive();
hasDistinctLastClustering = false;

partitionStart = dataWriter.position();
writePartitionHeader(partitionKey, partitionKeyLength, partitionDeletionTime);
Expand Down Expand Up @@ -554,21 +556,23 @@ public void writeRangeTombstone(UnfilteredDescriptor rangeTombstone, boolean upd
updateMetadataAndIndexBlock(rangeTombstone, unfilteredStartPosition, unfilteredEndPosition, updateClusteringMetadata);
}

private void updateMetadataAndIndexBlock(
UnfilteredDescriptor unfilteredDescriptor,
long unfilteredStartPosition,
long unfilteredEndPosition,
boolean updateClusteringMetadata) throws IOException
private void updateMetadataAndIndexBlock(UnfilteredDescriptor unfilteredDescriptor,
long unfilteredStartPosition,
long unfilteredEndPosition,
boolean updateClusteringMetadata) throws IOException
{
if (updateClusteringMetadata) updateClusteringMetadata(unfilteredDescriptor);
// write the first clustering into rowIndexEntries buffer (we will need it unless we never write the first entry)
if (unfilteredStartPosition == indexBlockStartOffset || (rowIndexEntryOffset == rowIndexEntries.position())) {
if (currentOffsetInPartition(unfilteredStartPosition) == indexBlockStartOffset || (rowIndexEntryOffset == rowIndexEntries.position()))
{
writeClusteringToRowIndexEntries(unfilteredDescriptor);
}
else
{
rowIndexEntryLastClustering.copy(unfilteredDescriptor);
hasDistinctLastClustering = true;
}

/** {@link BigFormatPartitionWriter#addUnfiltered(Unfiltered)} */
// if we hit the index block size that we have to index after, go ahead and index it.
long indexBlockSize = currentOffsetInPartition(unfilteredEndPosition) - indexBlockStartOffset;
Expand Down Expand Up @@ -597,7 +601,8 @@ private void addIndexBlock(long endOfRowPosition, long indexBlockSize) throws IO
rowIndexEntriesOffsets.addInt(rowIndexEntryOffset);

// first clustering is already in, write last entry
if (rowIndexEntryLastClustering.clusteringLength() == 0) {
if (!hasDistinctLastClustering)
{
// first entry is the last entry, copy it
byte[] entriesData = rowIndexEntries.getData();
long endOfFirstEntry = rowIndexEntries.position();
Expand All @@ -608,6 +613,8 @@ private void addIndexBlock(long endOfRowPosition, long indexBlockSize) throws IO
writeClusteringToRowIndexEntries(rowIndexEntryLastClustering);
rowIndexEntryLastClustering.resetClustering();
}
hasDistinctLastClustering = false;

rowIndexEntries.writeUnsignedVInt((long)indexBlockStartOffset);
rowIndexEntries.writeVInt(indexBlockSize - IndexInfo.Serializer.WIDTH_BASE);

Expand Down
132 changes: 132 additions & 0 deletions test/unit/org/apache/cassandra/cql3/RangeTombstoneBoundaryTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.cql3;

import java.util.Arrays;
import java.util.Collection;

import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;

import static org.junit.Assert.assertEquals;

/**
 * Regression tests for CASSANDRA-21255: index-block boundary handling in
 * {@code SSTableCursorWriter#addIndexBlock()} when a range tombstone's bound is the
 * only (or last) entry in an index block, exercising both the legacy and the
 * cursor-based compaction paths.
 *
 * <p>Each test builds a single partition large enough to span several 1 KiB index
 * blocks, applies a range delete at a chosen block position, compacts, and verifies
 * that slice reads across the tombstone boundary return the expected rows.
 */
@RunWith(Parameterized.class)
public class RangeTombstoneBoundaryTest extends CQLTester
{
    /** Rows inserted per partition; with ~100-byte values this spans roughly 3 one-KiB index blocks. */
    private static final int ROW_COUNT = 30;

    @BeforeClass
    public static void setup()
    {
        // Shrink the column index size so a single partition produces multiple index blocks.
        DatabaseDescriptor.setColumnIndexSizeInKiB(1);
    }

    /** Whether the cursor-based compaction path (the code under test) is enabled. */
    @Parameterized.Parameter
    public boolean useCursorCompaction;

    @Parameterized.Parameters(name="cursorCompaction={0}")
    public static Collection<Object[]> parameterizations()
    {
        return Arrays.asList(new Object[][] { { false }, { true }});
    }

    /**
     * Enables/disables cursor compaction per the current parameterization, creates the
     * test table, inserts {@value #ROW_COUNT} padded rows (clusterings r000..r029) into
     * partition "p" and flushes them to a single SSTable.
     */
    private void prepareTable()
    {
        DatabaseDescriptor.setCursorCompactionEnabled(useCursorCompaction);

        createTable("CREATE TABLE %s (pk text, ck text, v1 text, PRIMARY KEY (pk, ck))" +
                    " WITH compression = {'enabled': 'false'} AND compaction = {'class': 'LeveledCompactionStrategy', 'enabled': false}");

        // 30 rows with ~100-byte values → ~3 index blocks at 1 KiB each
        String pad = "x".repeat(80);
        for (int i = 0; i < ROW_COUNT; i++)
            execute("INSERT INTO %s (pk, ck, v1) VALUES (?, ?, ?) USING TIMESTAMP 1", "p", String.format("r%03d", i), pad + i);

        getCurrentColumnFamilyStore().forceBlockingFlush(ColumnFamilyStore.FlushReason.UNIT_TESTS);
    }

    /** Flushes the memtable holding the tombstone and major-compacts it with the data SSTable. */
    private void flushAndCompact()
    {
        getCurrentColumnFamilyStore().forceBlockingFlush(ColumnFamilyStore.FlushReason.UNIT_TESTS);
        getCurrentColumnFamilyStore().forceMajorCompaction();
    }

    /** An open-ended range delete whose start bound falls in the partition's last index block. */
    @Test
    public void testOpenRangeTombstoneInLastBlock()
    {
        prepareTable();

        // Open-ended delete: removes rows >= 27 (last block)
        execute("DELETE FROM %s USING TIMESTAMP 2 WHERE pk = ? AND ck >= ?", "p", "r027");

        flushAndCompact();

        UntypedResultSet range = execute("SELECT * FROM %s WHERE pk = ? AND ck >= ?", "p", "r023");
        assertEquals("range select [r023, r029] should return 4 rows (23, 24, 25, 26)", 4, range.size());

        range = execute("SELECT * FROM %s WHERE pk = ? AND ck >= ?", "p", "r027");
        assertEquals(0, range.size());
    }

    /** An open-ended range delete whose start bound falls in a middle index block. */
    @Test
    public void testOpenRangeTombstoneInMiddleBlock()
    {
        prepareTable();

        // Open-ended delete: removes rows >= 15 (middle block)
        execute("DELETE FROM %s USING TIMESTAMP 2 WHERE pk = ? AND ck >= ?", "p", "r015");

        flushAndCompact();

        // Only r012, r013 and r014 survive past this slice's start.
        UntypedResultSet range = execute("SELECT * FROM %s WHERE pk = ? AND ck >= ?", "p", "r012");
        assertEquals(3, range.size());

        range = execute("SELECT * FROM %s WHERE pk = ? AND ck >= ?", "p", "r023");
        assertEquals(0, range.size());
    }

    /** A bounded range delete entirely contained within the last index block. */
    @Test
    public void testMidBlockRangeTombstoneInLastBlock()
    {
        prepareTable();

        // Mid-block delete: removes rows 24-25 (last block)
        execute("DELETE FROM %s USING TIMESTAMP 2 WHERE pk = ? AND ck >= ? AND ck <= ?", "p", "r024", "r025");

        flushAndCompact();

        UntypedResultSet range = execute("SELECT * FROM %s WHERE pk = ? AND ck >= ?", "p", "r023");
        assertEquals("range select [r023, r029] should return 5 rows (23, 26, 27, 28, 29)", 5, range.size());
    }
}