Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@
* Add the ability to disable bulk loading of SSTables (CASSANDRA-18781)
* Clean up obsolete functions and simplify cql_version handling in cqlsh (CASSANDRA-18787)
Merged from 5.0:
* Avoid fetching entire partitions on unresolved static rows in RFP when no static column predicates exist (CASSANDRA-20243)
* Avoid indexing empty values for non-literals and types that do not allow them (CASSANDRA-20313)
* Fix incorrect results of min / max in-built functions on clustering columns in descending order (CASSANDRA-20295)
* Avoid possible consistency violations for SAI intersection queries over repaired index matches and multiple non-indexed column matches (CASSANDRA-20189)
Expand Down
11 changes: 10 additions & 1 deletion src/java/org/apache/cassandra/db/filter/RowFilter.java
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ public RowFilter without(ColumnMetadata column, Operator op, ByteBuffer value)
return withNewExpressions(newExpressions);
}

public boolean hasNonKeyExpressions()
public boolean hasNonKeyExpression()
{
for (Expression e : expressions)
if (!e.column().isPrimaryKeyColumn())
Expand All @@ -402,6 +402,15 @@ public boolean hasNonKeyExpressions()
return false;
}

/**
 * Checks whether this filter restricts at least one static column.
 *
 * @return {@code true} as soon as an expression whose column reports {@code isStatic()} is found,
 *         {@code false} if none of the expressions targets a static column (including when there
 *         are no expressions at all)
 */
public boolean hasStaticExpression()
{
    for (Expression expression : expressions)
    {
        // A single static-column expression is enough; stop scanning at the first hit.
        if (expression.column().isStatic())
        {
            return true;
        }
    }

    return false;
}

public RowFilter withoutExpressions()
{
return withNewExpressions(Collections.emptyList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,11 @@ public KeyRangeIterator.Builder getIndexQueryResults(Collection<Expression> expr

private void maybeTriggerGuardrails(QueryViewBuilder.QueryView queryView)
{
int referencedIndexes = queryView.referencedIndexes.size();
int referencedIndexes = 0;

// We want to make sure that no individual column expression touches too many SSTable-attached indexes:
for (Pair<Expression, Collection<SSTableIndex>> expressionSSTables : queryView.view)
referencedIndexes = Math.max(referencedIndexes, expressionSSTables.right.size());
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This guardrail was designed to make sure that no individual column predicate involves index queries that hit too many SSTables, rather than to limit the total number of SSTables across all column predicates. The Harry tests can have 10+ column predicates, which makes it much more likely that we hit the guardrail and cause trouble for the in-JVM tests.


if (Guardrails.saiSSTableIndexesPerQuery.failsOn(referencedIndexes, null))
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ private PartitionIterator resolveWithReplicaFilteringProtection(E replicas, Repa
private UnaryOperator<PartitionIterator> preCountFilterForReplicaFilteringProtection()
{
// Key columns are immutable and should never need to participate in replica filtering
if (!command.rowFilter().hasNonKeyExpressions())
if (!command.rowFilter().hasNonKeyExpression())
return results -> results;

return results -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -427,12 +427,12 @@ private void addToFetch(Row row)
if (toFetch == null)
toFetch = BTreeSet.builder(command.metadata().comparator);

// Note that for static, we shouldn't add the clustering to the clustering set (the
// ClusteringIndexNamesFilter we'll build from this later does not expect it), but the fact
// we created a builder in the first place will act as a marker that the static row must be
// fetched, even if no other rows are added for this partition.
if (row.isStatic())
unresolvedStatic = true;
// If there is an expression on a static column, the static row must be marked unresolved and the
// partition fetched, as completing the static row could produce matches across the entire partition.
// The static row itself will still be retrieved and completed if there is any unresolved non-static
// row, however, ensuring the latest static values are returned from the query.
unresolvedStatic = command.rowFilter().hasStaticExpression();
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the crux of the patch. I might expand the comment to make it clear that we still retrieve the static row when any non-static row needs to be completed by RFP, and this is exactly what happens in the new tests in this patch.

else
toFetch.add(row.clustering());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.ConsistencyLevel;
import org.apache.cassandra.distributed.api.IInvokableInstance;
import org.apache.cassandra.distributed.api.SimpleQueryResult;
import org.apache.cassandra.exceptions.OverloadedException;
Expand Down Expand Up @@ -71,6 +72,24 @@ public static void teardown()
cluster.close();
}

/**
 * Filters on a clustering column only ({@code ck1 = 2}) while the two nodes hold divergent data:
 * node 1 has the matching row with older static values (timestamp 1), and node 2 has only a
 * non-matching row with newer static values (timestamp 2). The query at {@code ALL} must return
 * the single matching row from node 1 combined with the newer static values from node 2.
 */
@Test
public void testMissingStaticRowWithNonStaticExpression()
{
    String createTable = withKeyspace("CREATE TABLE %s.single_predicate (pk0 int, ck0 int, ck1 int, s0 int static, s1 int static, v0 int, PRIMARY KEY (pk0, ck0, ck1)) " +
                                      "WITH CLUSTERING ORDER BY (ck0 ASC, ck1 DESC) AND read_repair = 'NONE'");
    cluster.schemaChange(createTable);

    // Node 1 only: the row that matches ck1 = 2, written together with the older static values.
    String insertOnNode1 = withKeyspace("INSERT INTO %s.single_predicate (pk0, ck0, ck1, s0, s1, v0) " +
                                        "VALUES (0, 1, 2, 3, 4, 5) USING TIMESTAMP 1");
    cluster.get(1).executeInternal(insertOnNode1);

    // Node 2 only: a row that does not match ck1 = 2, written together with the newer static values.
    String updateOnNode2 = withKeyspace("UPDATE %s.single_predicate USING TIMESTAMP 2 SET s0 = 6, s1 = 7, v0 = 8 " +
                                        "WHERE pk0 = 0 AND ck0 = 9 AND ck1 = 10");
    cluster.get(2).executeInternal(updateOnNode2);

    // Node 2 will not produce a match for the static row. Replica filtering protection must not respond by
    // fetching the entire partition: the coordinator does not post-filter when the query has no regular
    // column predicates, so a whole-partition fetch could let non-matching rows slip into the result.
    String select = withKeyspace("SELECT pk0, ck0, ck1, s0, s1 FROM %s.single_predicate WHERE ck1 = 2 ALLOW FILTERING");
    assertRows(cluster.coordinator(1).execute(select, ConsistencyLevel.ALL),
               row(0, 1, 2, 6, 7));
}

@Test
public void testMissedUpdatesBelowCachingWarnThreshold()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,18 +206,8 @@ protected List<CreateIndexDDL.Indexer> supportedIndexers()
builder.value(pk, key.bufferAt(pks.indexOf(pk)));


Symbol symbol;
List<Symbol> searchableColumns = state.nonPartitionColumns;
if (state.hasMultiNodeAllowFilteringWithLocalWritesIssue())
{
if (state.nonPkIndexedColumns.isEmpty())
throw new AssertionError("Ignoring AF_MULTI_NODE_AND_NODE_LOCAL_WRITES is defined, but no non-partition columns are indexed");
symbol = rs.pick(state.nonPkIndexedColumns);
}
else
{
symbol = rs.pick(searchableColumns);
}
Symbol symbol = rs.pick(searchableColumns);

TreeMap<ByteBuffer, List<BytesPartitionState.PrimaryKey>> universe = state.model.index(ref, symbol);
// we need to index 'null' so LT works, but we can not directly query it... so filter out when selecting values
Expand Down Expand Up @@ -248,15 +238,7 @@ protected List<CreateIndexDDL.Indexer> supportedIndexers()

public Property.Command<State, Void, ?> nonPartitionQuery(RandomSource rs, State state)
{
Symbol symbol;
if (state.hasMultiNodeAllowFilteringWithLocalWritesIssue())
{
symbol = rs.pickUnorderedSet(state.indexes.keySet());
}
else
{
symbol = rs.pick(state.searchableColumns);
}
Symbol symbol = rs.pick(state.searchableColumns);
TreeMap<ByteBuffer, List<BytesPartitionState.PrimaryKey>> universe = state.model.index(symbol);
// we need to index 'null' so LT works, but we can not directly query it... so filter out when selecting values
NavigableSet<ByteBuffer> allowed = Sets.filter(universe.navigableKeySet(), b -> !ByteBufferUtil.EMPTY_BYTE_BUFFER.equals(b));
Expand Down Expand Up @@ -410,6 +392,7 @@ public class State extends CommonState
private final Gen<Mutation> mutationGen;
private final List<Symbol> nonPartitionColumns;
private final List<Symbol> searchableColumns;
private final List<Symbol> nonPkIndexedColumns;

public State(RandomSource rs, Cluster cluster)
{
Expand Down Expand Up @@ -457,6 +440,9 @@ public State(RandomSource rs, Cluster cluster)
.addAll(model.factory.staticColumns)
.addAll(model.factory.regularColumns)
.build();
nonPkIndexedColumns = nonPartitionColumns.stream()
.filter(indexes::containsKey)
.collect(Collectors.toList());

searchableColumns = metadata.partitionKeyColumns().size() > 1 ? model.factory.selectionOrder : nonPartitionColumns;
}
Expand Down Expand Up @@ -521,12 +507,7 @@ public boolean supportTokens()

public boolean allowNonPartitionQuery()
{
boolean result = !model.isEmpty() && !searchableColumns.isEmpty();
if (hasMultiNodeAllowFilteringWithLocalWritesIssue())
{
return hasNonPkIndexedColumns() && result;
}
return result;
return !model.isEmpty() && !searchableColumns.isEmpty();
}

public boolean allowNonPartitionMultiColumnQuery()
Expand All @@ -537,31 +518,19 @@ public boolean allowNonPartitionMultiColumnQuery()
private List<Symbol> multiColumnQueryColumns()
{
List<Symbol> allowedColumns = searchableColumns;
if (hasMultiNodeAllowFilteringWithLocalWritesIssue())
if (hasMultiNodeMultiColumnAllowFilteringWithLocalWritesIssue())
allowedColumns = nonPkIndexedColumns;
return allowedColumns;
}

private boolean hasMultiNodeAllowFilteringWithLocalWritesIssue()
private boolean hasMultiNodeMultiColumnAllowFilteringWithLocalWritesIssue()
{
return isMultiNode() && IGNORED_ISSUES.contains(KnownIssue.AF_MULTI_NODE_AND_NODE_LOCAL_WRITES);
return isMultiNode() && IGNORED_ISSUES.contains(KnownIssue.AF_MULTI_NODE_MULTI_COLUMN_AND_NODE_LOCAL_WRITES);
}

public List<Symbol> nonPkIndexedColumns;
public boolean allowPartitionQuery()
{
if (model.isEmpty() || nonPartitionColumns.isEmpty()) return false;
if (hasMultiNodeAllowFilteringWithLocalWritesIssue())
return hasNonPkIndexedColumns();
return true;
}

private boolean hasNonPkIndexedColumns()
{
nonPkIndexedColumns = nonPartitionColumns.stream()
.filter(indexes::containsKey)
.collect(Collectors.toList());
return !nonPkIndexedColumns.isEmpty();
return !(model.isEmpty() || nonPartitionColumns.isEmpty());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@

import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.ConsistencyLevel;
import org.apache.cassandra.distributed.shared.AssertUtils;
import org.apache.cassandra.distributed.test.TestBaseImpl;

import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
Expand All @@ -51,6 +50,26 @@ public static void setUpCluster() throws IOException
CLUSTER = init(Cluster.build(2).withConfig(config -> config.set("hinted_handoff_enabled", false).with(GOSSIP).with(NETWORK)).start());
}

@Test
public void testMissingStaticRowWithNonStaticExpression()
{
CLUSTER.schemaChange(withKeyspace("CREATE TABLE %s.single_predicate (pk0 int, ck0 int, ck1 int, s0 int static, s1 int static, v0 int, PRIMARY KEY (pk0, ck0, ck1)) " +
"WITH CLUSTERING ORDER BY (ck0 ASC, ck1 DESC) AND read_repair = 'NONE'"));
CLUSTER.schemaChange(withKeyspace("CREATE INDEX ON %s.single_predicate(ck1) USING 'sai'"));
SAIUtil.waitForIndexQueryable(CLUSTER, KEYSPACE);

CLUSTER.get(1).executeInternal(withKeyspace("INSERT INTO %s.single_predicate (pk0, ck0, ck1, s0, s1, v0) " +
"VALUES (0, 1, 2, 3, 4, 5) USING TIMESTAMP 1"));
CLUSTER.get(2).executeInternal(withKeyspace("UPDATE %s.single_predicate USING TIMESTAMP 2 SET s0 = 6, s1 = 7, v0 = 8 " +
"WHERE pk0 = 0 AND ck0 = 9 AND ck1 = 10"));

// Node 2 will not produce a match for the static row. Make sure that replica filtering protection does not
// fetch the entire partition, which could let non-matching rows slip through combined with the fact that we
// don't post-filter at the coordinator with no regular column predicates in the query.
String select = withKeyspace("SELECT pk0, ck0, ck1, s0, s1 FROM %s.single_predicate WHERE ck1 = 2 ALLOW FILTERING");
assertRows(CLUSTER.coordinator(1).execute(select, ConsistencyLevel.ALL), row(0, 1, 2, 6, 7));
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One thing to note here is that we still get the updated static column values: node 2 doesn't produce a match for ck1 = 2, which means we have an unresolved row, and any unresolved row implicitly reads the static row during its RFP completion. (See https://github.com/apache/cassandra/pull/3890/files#r1951922090)

}

@Test
public void shouldDegradeToUnionOnSingleStatic()
{
Expand Down Expand Up @@ -251,7 +270,7 @@ public void testTimestampCollision()

String select = withKeyspace("SELECT * FROM %s.timestamp_collision WHERE a = 2 AND b = 2");
Object[][] initialRows = CLUSTER.coordinator(1).execute(select, ConsistencyLevel.ALL);
assertRows(initialRows, AssertUtils.row(0, 2, 2));
assertRows(initialRows, row(0, 2, 2));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;

Expand Down Expand Up @@ -62,11 +63,11 @@ public abstract class SingleNodeSAITestBase extends TestBaseImpl
private static final int VALIDATION_SKIP = 739;
private static final int QUERIES_PER_VALIDATION = 8;

private static final int FLUSH_SKIP = 1499;
private static final int COMPACTION_SKIP = 1503;
private static final int DEFAULT_REPAIR_SKIP = 6101;
private static final int FLUSH_SKIP = 2217;
private static final int COMPACTION_SKIP = 4435;
private static final int DEFAULT_REPAIR_SKIP = 8869;
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These changes just revert the skips to values closer to the originals, from before I changed them in CASSANDRA-20189. We don't need to repair 5 times in 30,000 ops.


private static final int OPERATIONS_PER_RUN = DEFAULT_REPAIR_SKIP * 5;
private static final int OPERATIONS_PER_RUN = 30_000;

private static final int NUM_PARTITIONS = 64;
private static final int NUM_VISITED_PARTITIONS = 16;
Expand Down Expand Up @@ -162,27 +163,30 @@ private void saiTest(EntropySource rng, SchemaSpec schema, Supplier<Boolean> cre
Generator<Integer> globalPkGen = Generators.int32(0, Math.min(NUM_PARTITIONS, schema.valueGenerators.pkPopulation()));
Generator<Integer> ckGen = Generators.int32(0, schema.valueGenerators.ckPopulation());

CassandraRelevantProperties.SAI_INTERSECTION_CLAUSE_LIMIT.setInt(100);
beforeEach();
cluster.forEach(i -> i.nodetool("disableautocompaction"));

cluster.schemaChange(schema.compile());
cluster.schemaChange(schema.compile().replace(schema.keyspace + '.' + schema.table, schema.keyspace + ".debug_table"));

AtomicInteger indexCount = new AtomicInteger();

Streams.concat(schema.clusteringKeys.stream(), schema.regularColumns.stream(), schema.staticColumns.stream())
.forEach(column -> {
if (createIndex.get())
{
logger.info("Adding index to column {}...", column.name);
cluster.schemaChange(String.format("CREATE INDEX %s_sai_idx ON %s.%s (%s) USING 'sai' ",
column.name, schema.keyspace, schema.table, column.name));
column.name, schema.keyspace, schema.table, column.name));
indexCount.incrementAndGet();
}
else
{
logger.info("Leaving column {} unindexed...", column.name);
}
});

CassandraRelevantProperties.SAI_INTERSECTION_CLAUSE_LIMIT.setInt(indexCount.get());
waitForIndexesQueryable(schema);

HistoryBuilder history = new ReplayingHistoryBuilder(schema.valueGenerators,
Expand Down
6 changes: 3 additions & 3 deletions test/unit/org/apache/cassandra/cql3/KnownIssue.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,12 @@ public enum KnownIssue
"SAI converts ipv4 to ipv6 to simplify the index, this causes issues with range search as it starts to mix the values, which isn't always desirable or intuative"),
CUSTOM_INDEX_MAX_COLUMN_48("https://issues.apache.org/jira/browse/CASSANDRA-19897",
"Columns can be up to 50 chars, but CREATE CUSTOM INDEX only allows up to 48"),
AF_MULTI_NODE_AND_NODE_LOCAL_WRITES("https://issues.apache.org/jira/browse/CASSANDRA-20243",
"When writes are done at NODE_LOCAL and the select is ALL, AF should be able to return the correct data but it doesn't"),
SHORT_AND_VARINT_GET_INT_FUNCTIONS("https://issues.apache.org/jira/browse/CASSANDRA-19874",
"Function inference maybe unable to infer the correct function or chooses one for a smaller type"),
SAI_EMPTY_TYPE("ML: Meaningless emptiness and filtering",
"Some types allow empty bytes, but define them as meaningless. AF can be used to query them using <, <=, and =; but SAI can not")
"Some types allow empty bytes, but define them as meaningless. AF can be used to query them using <, <=, and =; but SAI can not"),
AF_MULTI_NODE_MULTI_COLUMN_AND_NODE_LOCAL_WRITES("https://issues.apache.org/jira/browse/CASSANDRA-19007",
"When doing multi node/multi column queries, AF can miss data when the nodes are not in-sync"),
;

KnownIssue(String url, String description)
Expand Down