11 changes: 9 additions & 2 deletions src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -1042,8 +1042,15 @@ private boolean canRemoveRow(Row row, Columns requestedColumns, long sstableTimestamp)
         // We can remove a row if it has data that is more recent than the next sstable to consider for the data that the query
         // cares about. And the data we care about is 1) the row timestamp (since every query cares if the row exists or not)
         // and 2) the requested columns.
-        if (row.primaryKeyLivenessInfo().isEmpty() || row.primaryKeyLivenessInfo().timestamp() <= sstableTimestamp)
-            return false;
+        // Note that COMPACT STORAGE tables will never have primary key liveness information, and if it is missing, we
+        // proceed to evaluate cell-level timestamps.
+        if (metadata().isCQLTable())
+        {
+            if (row.primaryKeyLivenessInfo().isEmpty() || row.primaryKeyLivenessInfo().timestamp() <= sstableTimestamp)
+            {
+                return false;
+            }
+        }
 
         for (ColumnDefinition column : requestedColumns)
         {
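The comment block in this hunk carries the core reasoning of the fix: a read may stop consulting older sstables once everything the query cares about, the row's existence timestamp and the requested columns, is provably newer than anything those sstables can hold, and for COMPACT STORAGE tables the existence check must be skipped because such rows never carry primary key liveness info. Below is a minimal, self-contained sketch of that decision; it is an illustration only, not Cassandra's API, and SketchRow, canSkipOlderSSTable, and the plain column-to-timestamp map are invented stand-ins:

import java.util.Map;
import java.util.Set;

public class TimestampSkipSketch
{
    // Hypothetical stand-in for a merged row: an optional primary key liveness
    // timestamp (always absent for COMPACT STORAGE tables) plus one timestamp
    // per requested column cell.
    record SketchRow(Long pkLivenessTimestamp, Map<String, Long> cellTimestamps) {}

    // Returns true when every piece of data the query cares about is strictly
    // newer than the newest data in the next sstable, so that sstable (and
    // anything older) can be skipped.
    static boolean canSkipOlderSSTable(SketchRow row, Set<String> requestedColumns,
                                       boolean isCQLTable, long sstableMaxTimestamp)
    {
        // Only demand a fresh row timestamp where one can exist. Before the fix
        // this check ran unconditionally, so COMPACT STORAGE rows, which never
        // carry primary key liveness info, could never short-circuit the read.
        if (isCQLTable)
        {
            if (row.pkLivenessTimestamp() == null || row.pkLivenessTimestamp() <= sstableMaxTimestamp)
                return false;
        }

        // Every requested column must have a cell newer than the sstable's data.
        for (String column : requestedColumns)
        {
            Long cellTimestamp = row.cellTimestamps().get(column);
            if (cellTimestamp == null || cellTimestamp <= sstableMaxTimestamp)
                return false;
        }
        return true;
    }

    public static void main(String[] args)
    {
        // A COMPACT STORAGE row (no pk liveness) whose only requested cell is
        // newer than the older sstable's max timestamp: with the fix it skips
        // the older sstable; under the unconditional check it never could.
        SketchRow row = new SketchRow(null, Map.of("v", 2_000_001L));
        System.out.println(canSkipOlderSSTable(row, Set.of("v"), false, 1_000_002L)); // true
        System.out.println(canSkipOlderSSTable(row, Set.of("v"), true, 1_000_002L));  // false (pre-fix behaviour)
    }
}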
test/distributed/org/apache/cassandra/distributed/upgrade/CompactStorage2to3UpgradeTest.java
@@ -19,17 +19,25 @@
 package org.apache.cassandra.distributed.upgrade;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 
 import org.junit.Test;
 
 import org.apache.cassandra.distributed.UpgradeableCluster;
 import org.apache.cassandra.distributed.api.ConsistencyLevel;
 import org.apache.cassandra.distributed.api.ICoordinator;
 import org.apache.cassandra.distributed.api.IMessageFilters;
+import org.apache.cassandra.distributed.api.IUpgradeableInstance;
 import org.apache.cassandra.distributed.api.NodeToolResult;
+import org.apache.cassandra.distributed.impl.TracingUtil;
 import org.apache.cassandra.distributed.shared.Versions;
-import static org.apache.cassandra.distributed.shared.AssertUtils.*;
+import org.apache.cassandra.utils.UUIDGen;
+
+import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
+import static org.apache.cassandra.distributed.shared.AssertUtils.row;
+import static org.junit.Assert.assertEquals;
 
 public class CompactStorage2to3UpgradeTest extends UpgradeTestBase
 {
@@ -314,6 +322,59 @@ public void testDropCompactWithClusteringAndValueColumnWithDeletesAndWrites() throws Throwable
         }).run();
     }
 
+    @Test
+    public void testSSTableTimestampSkipping() throws Throwable
+    {
+        new TestCase()
+        .nodes(1)
+        .upgrade(Versions.Major.v22, Versions.Major.v30)
+        .setup(cluster -> {
+            cluster.schemaChange("CREATE KEYSPACE ks WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};");
+            // If COMPACT STORAGE is removed, both pre- and post-upgrade reads will only hit one SSTable.
+            cluster.schemaChange("CREATE TABLE ks.tbl (pk int, ck int, v text, PRIMARY KEY (pk, ck)) WITH COMPACT STORAGE");
+
+            cluster.get(1).executeInternal("INSERT INTO ks.tbl (pk, ck, v) VALUES (1, 1, '1') USING TIMESTAMP 1000000");
+            cluster.get(1).executeInternal("INSERT INTO ks.tbl (pk, ck, v) VALUES (1, 50, '2') USING TIMESTAMP 1000001");
+            cluster.get(1).executeInternal("INSERT INTO ks.tbl (pk, ck, v) VALUES (1, 100, '3') USING TIMESTAMP 1000002");
+            cluster.get(1).flush("ks");
+
+            cluster.get(1).executeInternal("INSERT INTO ks.tbl (pk, ck, v) VALUES (1, 2, '4') USING TIMESTAMP 2000000");
+            cluster.get(1).executeInternal("INSERT INTO ks.tbl (pk, ck, v) VALUES (1, 51, '5') USING TIMESTAMP 2000001");
+            cluster.get(1).executeInternal("INSERT INTO ks.tbl (pk, ck, v) VALUES (1, 101, '6') USING TIMESTAMP 2000002");
+            cluster.get(1).flush("ks");
+
+            Object[][] expected = { row(1, 51, "5") };
+            assertSSTablesRead(cluster, "SELECT * FROM ks.tbl WHERE pk = 1 AND ck = 51", expected, 1L);
+        })
+        .runAfterNodeUpgrade(((cluster, node) -> {
+            IUpgradeableInstance instance = cluster.get(1);
+            instance.nodetool("upgradesstables", "ks", "tbl");
+
+            Object[][] expected = { row(1, 51, "5") };
+            assertSSTablesRead(cluster, "SELECT * FROM ks.tbl WHERE pk = 1 AND ck = 51", expected, 1L);
+        })).run();
+    }
+
+    private void assertSSTablesRead(UpgradeableCluster cluster, String query, Object[][] expected, long ssTablesRead) throws Exception
+    {
+        String originalTraceTimeout = TracingUtil.setWaitForTracingEventTimeoutSecs("1");
+
+        try
+        {
+            UUID sessionId = UUIDGen.getTimeUUID();
+            Object[][] rows = cluster.coordinator(1)
+                                     .asyncExecuteWithTracing(sessionId, query, ConsistencyLevel.ONE).get();
+            assertRows(rows, expected);
+
+            List<TracingUtil.TraceEntry> traces = TracingUtil.getTrace(cluster, sessionId, ConsistencyLevel.ONE);
+            long sstablesRead = traces.stream().filter(traceEntry -> traceEntry.activity.contains("Merging data from sstable")).count();
+            assertEquals(ssTablesRead, sstablesRead);
+        }
+        finally
+        {
+            TracingUtil.setWaitForTracingEventTimeoutSecs(originalTraceTimeout);
+        }
+    }
 
     private void runQueries(ICoordinator coordinator, ResultsRecorder helper, String[] queries)
     {
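The assertSSTablesRead helper above verifies the optimization indirectly: it runs the query with tracing enabled and counts the trace events whose activity mentions "Merging data from sstable", on the assumption that the read path emits one such event per sstable it merges. A minimal standalone sketch of just that counting step; TraceCountSketch and the sample activity strings are invented for illustration, and only the "Merging data from sstable" marker is taken from the helper itself:

import java.util.List;

public class TraceCountSketch
{
    // Count how many sstables a traced read touched, given the activity
    // strings recorded in system_traces.events for the tracing session.
    static long countSSTablesRead(List<String> activities)
    {
        return activities.stream()
                         .filter(activity -> activity.contains("Merging data from sstable"))
                         .count();
    }

    public static void main(String[] args)
    {
        // Invented sample trace: one sstable merged, so the count is 1.
        List<String> activities = List.of(
            "Executing single-partition query on tbl",
            "Merging data from sstable 2",
            "Merging memtable contents");
        System.out.println(countSSTablesRead(activities)); // 1
    }
}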