Skip to content

Commit

Permalink
Merge branch 'cassandra-5.0' into trunk
Browse files Browse the repository at this point in the history
* cassandra-5.0:
  Optionally fail writes when SAI refuses to index a term value exceeding a configured maximum size
  • Loading branch information
maedhroz committed Apr 17, 2024
2 parents f345370 + 9bfaee9 commit c33c8eb
Show file tree
Hide file tree
Showing 38 changed files with 1,270 additions and 380 deletions.
1 change: 1 addition & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
* Add the ability to disable bulk loading of SSTables (CASSANDRA-18781)
* Clean up obsolete functions and simplify cql_version handling in cqlsh (CASSANDRA-18787)
Merged from 5.0:
* Optionally fail writes when SAI refuses to index a term value exceeding configured term max size (CASSANDRA-19493)
* Vector search can restrict on clustering keys when filtering isn't required (CASSANDRA-19544)
* Fix FBUtilities' parsing of gcp cos_containerd kernel versions (CASSANDRA-18594)
* Clean up KeyRangeIterator classes (CASSANDRA-19428)
Expand Down
1 change: 1 addition & 0 deletions NEWS.txt
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ New features
- Vector dimensions
- Whether it is possible to execute secondary index queries without restricting on partition key
- Warning and failure thresholds for maximum referenced SAI indexes on a replica when executing a SELECT query
- Warning and failure thresholds for the size of terms written to an SAI index
- It is possible to list ephemeral snapshots by nodetool listsnapshots command when flag "-e" is specified.
- Added a new flag to `nodetool profileload` and JMX endpoint to set up recurring profile load generation on specified
intervals (see CASSANDRA-17821)
Expand Down
12 changes: 12 additions & 0 deletions conf/cassandra.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2171,6 +2171,18 @@ drop_compact_storage_enabled: false
# before emitting a failure (defaults to -1 to disable)
#sai_sstable_indexes_per_query_fail_threshold: -1

# Guardrail specifying warn/fail thresholds for the size of string terms written to an SAI index
# sai_string_term_size_warn_threshold: 1KiB
# sai_string_term_size_fail_threshold: 8KiB

# Guardrail specifying warn/fail thresholds for the size of frozen terms written to an SAI index
# sai_frozen_term_size_warn_threshold: 1KiB
# sai_frozen_term_size_fail_threshold: 8KiB

# Guardrail specifying warn/fail thresholds for the size of vector terms written to an SAI index
# sai_vector_term_size_warn_threshold: 16KiB
# sai_vector_term_size_fail_threshold: 32KiB

# The default secondary index implementation when CREATE INDEX does not specify one via USING.
# ex. "legacy_local_table" - (default) legacy secondary index, implemented as a hidden table
# ex. "sai" - "storage-attached" index, implemented via optimized SSTable/Memtable-attached indexes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -425,9 +425,6 @@ public enum CassandraRelevantProperties
SAI_INTERSECTION_CLAUSE_LIMIT("cassandra.sai.intersection_clause_limit", "2"),
/** Latest version to be used for SAI index writing */
SAI_LATEST_VERSION("cassandra.sai.latest_version", "aa"),
SAI_MAX_FROZEN_TERM_SIZE("cassandra.sai.max_frozen_term_size", "5KiB"),
SAI_MAX_STRING_TERM_SIZE("cassandra.sai.max_string_term_size", "1KiB"),
SAI_MAX_VECTOR_TERM_SIZE("cassandra.sai.max_vector_term_size", "32KiB"),

/** Minimum number of reachable leaves for a given node to be eligible for an auxiliary posting list */
SAI_MINIMUM_POSTINGS_LEAVES("cassandra.sai.minimum_postings_leaves", "64"),
Expand Down
11 changes: 9 additions & 2 deletions src/java/org/apache/cassandra/config/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -910,11 +910,18 @@ public static void setClientMode(boolean clientMode)
public volatile boolean zero_ttl_on_twcs_warned = true;
public volatile boolean zero_ttl_on_twcs_enabled = true;
public volatile boolean non_partition_restricted_index_query_enabled = true;
public volatile int sai_sstable_indexes_per_query_warn_threshold = 32;
public volatile int sai_sstable_indexes_per_query_fail_threshold = -1;
public volatile boolean intersect_filtering_query_warned = true;
public volatile boolean intersect_filtering_query_enabled = true;

public volatile int sai_sstable_indexes_per_query_warn_threshold = 32;
public volatile int sai_sstable_indexes_per_query_fail_threshold = -1;
public volatile DataStorageSpec.LongBytesBound sai_string_term_size_warn_threshold = new DataStorageSpec.LongBytesBound("1KiB");
public volatile DataStorageSpec.LongBytesBound sai_string_term_size_fail_threshold = new DataStorageSpec.LongBytesBound("8KiB");
public volatile DataStorageSpec.LongBytesBound sai_frozen_term_size_warn_threshold = new DataStorageSpec.LongBytesBound("1KiB");
public volatile DataStorageSpec.LongBytesBound sai_frozen_term_size_fail_threshold = new DataStorageSpec.LongBytesBound("8KiB");
public volatile DataStorageSpec.LongBytesBound sai_vector_term_size_warn_threshold = new DataStorageSpec.LongBytesBound("16KiB");
public volatile DataStorageSpec.LongBytesBound sai_vector_term_size_fail_threshold = new DataStorageSpec.LongBytesBound("32KiB");

public volatile DurationSpec.LongNanosecondsBound streaming_state_expires = new DurationSpec.LongNanosecondsBound("3d");
public volatile DataStorageSpec.LongBytesBound streaming_state_size = new DataStorageSpec.LongBytesBound("40MiB");

Expand Down
84 changes: 84 additions & 0 deletions src/java/org/apache/cassandra/config/GuardrailsOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -974,6 +974,90 @@ public void setSaiSSTableIndexesPerQueryThreshold(int warn, int fail)
x -> config.sai_sstable_indexes_per_query_fail_threshold = x);
}

/**
 * @return the current warn threshold for the size of string terms written to an SAI index,
 *         or {@code null} if no warn threshold is set
 */
@Override
@Nullable
public DataStorageSpec.LongBytesBound getSaiStringTermSizeWarnThreshold()
{
return config.sai_string_term_size_warn_threshold;
}

/**
 * @return the current fail threshold for the size of string terms written to an SAI index,
 *         or {@code null} if no fail threshold is set
 */
@Override
@Nullable
public DataStorageSpec.LongBytesBound getSaiStringTermSizeFailThreshold()
{
return config.sai_string_term_size_fail_threshold;
}

/**
 * Updates the warn/fail thresholds for the size of string terms written to an SAI index.
 * The pair is validated together before either property is changed, and each property
 * update is logged.
 *
 * @param warn new warn threshold; may be {@code null}
 * @param fail new fail threshold; may be {@code null}
 */
@Override
public void setSaiStringTermSizeThreshold(@Nullable DataStorageSpec.LongBytesBound warn, @Nullable DataStorageSpec.LongBytesBound fail)
{
// validate the warn/fail pair as a unit before mutating either config field
validateSizeThreshold(warn, fail, false, "sai_string_term_size");
updatePropertyWithLogging("sai_string_term_size_warn_threshold",
warn,
() -> config.sai_string_term_size_warn_threshold,
x -> config.sai_string_term_size_warn_threshold = x);
updatePropertyWithLogging("sai_string_term_size_fail_threshold",
fail,
() -> config.sai_string_term_size_fail_threshold,
x -> config.sai_string_term_size_fail_threshold = x);
}

/**
 * @return the current warn threshold for the size of frozen terms written to an SAI index,
 *         or {@code null} if no warn threshold is set
 */
@Override
@Nullable
public DataStorageSpec.LongBytesBound getSaiFrozenTermSizeWarnThreshold()
{
return config.sai_frozen_term_size_warn_threshold;
}

/**
 * @return the current fail threshold for the size of frozen terms written to an SAI index,
 *         or {@code null} if no fail threshold is set
 */
@Override
@Nullable
public DataStorageSpec.LongBytesBound getSaiFrozenTermSizeFailThreshold()
{
return config.sai_frozen_term_size_fail_threshold;
}

/**
 * Updates the warn/fail thresholds for the size of frozen terms written to an SAI index.
 * The pair is validated together before either property is changed, and each property
 * update is logged.
 *
 * @param warn new warn threshold; may be {@code null}
 * @param fail new fail threshold; may be {@code null}
 */
@Override
public void setSaiFrozenTermSizeThreshold(@Nullable DataStorageSpec.LongBytesBound warn, @Nullable DataStorageSpec.LongBytesBound fail)
{
// validate the warn/fail pair as a unit before mutating either config field
validateSizeThreshold(warn, fail, false, "sai_frozen_term_size");
updatePropertyWithLogging("sai_frozen_term_size_warn_threshold",
warn,
() -> config.sai_frozen_term_size_warn_threshold,
x -> config.sai_frozen_term_size_warn_threshold = x);
updatePropertyWithLogging("sai_frozen_term_size_fail_threshold",
fail,
() -> config.sai_frozen_term_size_fail_threshold,
x -> config.sai_frozen_term_size_fail_threshold = x);
}

/**
 * @return the current warn threshold for the size of vector terms written to an SAI index,
 *         or {@code null} if no warn threshold is set
 */
@Override
@Nullable
public DataStorageSpec.LongBytesBound getSaiVectorTermSizeWarnThreshold()
{
return config.sai_vector_term_size_warn_threshold;
}

/**
 * @return the current fail threshold for the size of vector terms written to an SAI index,
 *         or {@code null} if no fail threshold is set
 */
@Override
@Nullable
public DataStorageSpec.LongBytesBound getSaiVectorTermSizeFailThreshold()
{
return config.sai_vector_term_size_fail_threshold;
}

/**
 * Updates the warn/fail thresholds for the size of vector terms written to an SAI index.
 * The pair is validated together before either property is changed, and each property
 * update is logged.
 *
 * @param warn new warn threshold; may be {@code null}
 * @param fail new fail threshold; may be {@code null}
 */
@Override
public void setSaiVectorTermSizeThreshold(@Nullable DataStorageSpec.LongBytesBound warn, @Nullable DataStorageSpec.LongBytesBound fail)
{
// validate the warn/fail pair as a unit before mutating either config field
validateSizeThreshold(warn, fail, false, "sai_vector_term_size");
updatePropertyWithLogging("sai_vector_term_size_warn_threshold",
warn,
() -> config.sai_vector_term_size_warn_threshold,
x -> config.sai_vector_term_size_warn_threshold = x);
updatePropertyWithLogging("sai_vector_term_size_fail_threshold",
fail,
() -> config.sai_vector_term_size_fail_threshold,
x -> config.sai_vector_term_size_fail_threshold = x);
}

@Override
public boolean getNonPartitionRestrictedQueryEnabled()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ public List<? extends IMutation> getMutations(ClientState state,
ClientWarn.instance.warn(MessageFormatter.arrayFormat(LOGGED_BATCH_LOW_GCGS_WARNING, new Object[] { suffix, tablesWithZeroGcGs })
.getMessage());
}
return collector.toMutations();
return collector.toMutations(state);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,28 @@
package org.apache.cassandra.cql3.statements;

import java.nio.ByteBuffer;
import java.util.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.google.common.collect.HashMultiset;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;

import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.CounterMutation;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.IMutation;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.RegularAndStaticColumns;
import org.apache.cassandra.db.commitlog.CommitLogSegment;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.virtual.VirtualMutation;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.service.ClientState;

import static org.apache.cassandra.utils.MonotonicClock.Global.approxTime;

Expand Down Expand Up @@ -122,17 +131,21 @@ private IMutationBuilder makeMutationBuilder(TableMetadata metadata, DecoratedKe

/**
* Returns a collection containing all the mutations.
*
* @param state state related to the client connection
*
* @return a collection containing all the mutations.
*/
public List<IMutation> toMutations()
@Override
public List<IMutation> toMutations(ClientState state)
{
List<IMutation> ms = new ArrayList<>();
for (Map<ByteBuffer, IMutationBuilder> ksMap : mutationBuilders.values())
{
for (IMutationBuilder builder : ksMap.values())
{
IMutation mutation = builder.build();
mutation.validateIndexedColumns();
mutation.validateIndexedColumns(state);
mutation.validateSize(MessagingService.current_version, CommitLogSegment.ENTRY_OVERHEAD_SIZE);
ms.add(mutation);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ public PartitionUpdate makeUpdates(FilteredPartition current, ClientState client
upd.applyUpdates(current, updateBuilder, clientState);

PartitionUpdate partitionUpdate = updateBuilder.build();
IndexRegistry.obtain(metadata).validate(partitionUpdate);
IndexRegistry.obtain(metadata).validate(partitionUpdate, clientState);

return partitionUpdate;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -747,7 +747,7 @@ private List<? extends IMutation> getMutations(ClientState state,
HashMultiset<ByteBuffer> perPartitionKeyCounts = HashMultiset.create(keys);
SingleTableUpdatesCollector collector = new SingleTableUpdatesCollector(metadata, updatedColumns, perPartitionKeyCounts);
addUpdates(collector, keys, state, options, local, timestamp, nowInSeconds, queryStartNanoTime);
return collector.toMutations();
return collector.toMutations(state);
}

final void addUpdates(UpdatesCollector collector,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.cassandra.db.virtual.VirtualMutation;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.ClientState;

/**
* Utility class to collect updates.
Expand Down Expand Up @@ -92,7 +93,8 @@ public PartitionUpdate.Builder getPartitionUpdateBuilder(TableMetadata metadata,
* Returns a collection containing all the mutations.
* @return a collection containing all the mutations.
*/
public List<IMutation> toMutations()
@Override
public List<IMutation> toMutations(ClientState state)
{
List<IMutation> ms = new ArrayList<>(puBuilders.size());
for (PartitionUpdate.Builder builder : puBuilders.values())
Expand All @@ -106,7 +108,7 @@ else if (metadata.isCounter())
else
mutation = new Mutation(builder.build());

mutation.validateIndexedColumns();
mutation.validateIndexedColumns(state);
mutation.validateSize(MessagingService.current_version, CommitLogSegment.ENTRY_OVERHEAD_SIZE);
ms.add(mutation);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@
import org.apache.cassandra.db.IMutation;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.ClientState;

/**
 * Collects the {@link PartitionUpdate}s produced while processing a modification statement
 * and exposes them as a list of mutations once processing is complete.
 */
public interface UpdatesCollector
{
/** Returns a builder for the partition update targeting the given table, partition key and consistency level. */
PartitionUpdate.Builder getPartitionUpdateBuilder(TableMetadata metadata, DecoratedKey dk, ConsistencyLevel consistency);
/**
 * Returns all collected mutations.
 *
 * @param state state related to the client connection; passed so implementations can
 *              perform client-aware validation (e.g. guardrails) before returning
 */
List<IMutation> toMutations(ClientState state);
}
27 changes: 14 additions & 13 deletions src/java/org/apache/cassandra/db/IMutation.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,24 +24,25 @@
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.service.ClientState;

public interface IMutation
{
public long MAX_MUTATION_SIZE = DatabaseDescriptor.getMaxMutationSize();
long MAX_MUTATION_SIZE = DatabaseDescriptor.getMaxMutationSize();

public void apply();
public String getKeyspaceName();
public Collection<TableId> getTableIds();
public DecoratedKey key();
public long getTimeout(TimeUnit unit);
public String toString(boolean shallow);
public Collection<PartitionUpdate> getPartitionUpdates();
public Supplier<Mutation> hintOnFailure();
void apply();
String getKeyspaceName();
Collection<TableId> getTableIds();
DecoratedKey key();
long getTimeout(TimeUnit unit);
String toString(boolean shallow);
Collection<PartitionUpdate> getPartitionUpdates();
Supplier<Mutation> hintOnFailure();

public default void validateIndexedColumns()
default void validateIndexedColumns(ClientState state)
{
for (PartitionUpdate pu : getPartitionUpdates())
pu.validateIndexedColumns();
pu.validateIndexedColumns(state);
}

/**
Expand All @@ -52,14 +53,14 @@ public default void validateIndexedColumns()
* @param overhead overhead to add to the mutation size being validated. Pass zero if not required, but never a negative value.
* @throws MutationExceededMaxSizeException if {@link DatabaseDescriptor#getMaxMutationSize()} is exceeded
*/
public void validateSize(int version, int overhead);
void validateSize(int version, int overhead);

/**
* Computes the total data size of the specified mutations.
* @param mutations the mutations
* @return the total data size of the specified mutations
*/
public static long dataSize(Collection<? extends IMutation> mutations)
static long dataSize(Collection<? extends IMutation> mutations)
{
long size = 0;
for (IMutation mutation : mutations)
Expand Down

0 comments on commit c33c8eb

Please sign in to comment.