Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CASSANDRA-19493 Optionally fail writes when SAI refuses to index a term value exceeding a configured maximum size #3256

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
* Add the ability to disable bulk loading of SSTables (CASSANDRA-18781)
* Clean up obsolete functions and simplify cql_version handling in cqlsh (CASSANDRA-18787)
Merged from 5.0:
* Optionally fail writes when SAI refuses to index a term value exceeding configured term max size (CASSANDRA-19493)
* Vector search can restrict on clustering keys when filtering isn't required (CASSANDRA-19544)
* Fix FBUtilities' parsing of gcp cos_containerd kernel versions (CASSANDRA-18594)
* Clean up KeyRangeIterator classes (CASSANDRA-19428)
Expand Down
12 changes: 12 additions & 0 deletions conf/cassandra.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2171,6 +2171,18 @@ drop_compact_storage_enabled: false
# before emitting a failure (defaults to -1 to disable)
#sai_sstable_indexes_per_query_fail_threshold: -1

# Guardrail specifying warn/fail thresholds for the size of string terms written to an SAI index
# sai_string_term_size_warn_threshold: 1KiB
# sai_string_term_size_fail_threshold: 8KiB

# Guardrail specifying warn/fail thresholds for the size of frozen terms written to an SAI index
# sai_frozen_term_size_warn_threshold: 1KiB
# sai_frozen_term_size_fail_threshold: 8KiB

# Guardrail specifying warn/fail thresholds for the size of vector terms written to an SAI index
# sai_vector_term_size_warn_threshold: 16KiB
# sai_vector_term_size_fail_threshold: 32KiB

# The default secondary index implementation when CREATE INDEX does not specify one via USING.
# ex. "legacy_local_table" - (default) legacy secondary index, implemented as a hidden table
# ex. "sai" - "storage-attached" index, implemented via optimized SSTable/Memtable-attached indexes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -425,9 +425,6 @@ public enum CassandraRelevantProperties
SAI_INTERSECTION_CLAUSE_LIMIT("cassandra.sai.intersection_clause_limit", "2"),
/** Latest version to be used for SAI index writing */
SAI_LATEST_VERSION("cassandra.sai.latest_version", "aa"),
SAI_MAX_FROZEN_TERM_SIZE("cassandra.sai.max_frozen_term_size", "5KiB"),
SAI_MAX_STRING_TERM_SIZE("cassandra.sai.max_string_term_size", "1KiB"),
SAI_MAX_VECTOR_TERM_SIZE("cassandra.sai.max_vector_term_size", "32KiB"),

/** Minimum number of reachable leaves for a given node to be eligible for an auxiliary posting list */
SAI_MINIMUM_POSTINGS_LEAVES("cassandra.sai.minimum_postings_leaves", "64"),
Expand Down
11 changes: 9 additions & 2 deletions src/java/org/apache/cassandra/config/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -910,11 +910,18 @@ public static void setClientMode(boolean clientMode)
public volatile boolean zero_ttl_on_twcs_warned = true;
public volatile boolean zero_ttl_on_twcs_enabled = true;
public volatile boolean non_partition_restricted_index_query_enabled = true;
public volatile int sai_sstable_indexes_per_query_warn_threshold = 32;
public volatile int sai_sstable_indexes_per_query_fail_threshold = -1;
public volatile boolean intersect_filtering_query_warned = true;
public volatile boolean intersect_filtering_query_enabled = true;

public volatile int sai_sstable_indexes_per_query_warn_threshold = 32;
public volatile int sai_sstable_indexes_per_query_fail_threshold = -1;
public volatile DataStorageSpec.LongBytesBound sai_string_term_size_warn_threshold = new DataStorageSpec.LongBytesBound("1KiB");
public volatile DataStorageSpec.LongBytesBound sai_string_term_size_fail_threshold = new DataStorageSpec.LongBytesBound("8KiB");
public volatile DataStorageSpec.LongBytesBound sai_frozen_term_size_warn_threshold = new DataStorageSpec.LongBytesBound("1KiB");
public volatile DataStorageSpec.LongBytesBound sai_frozen_term_size_fail_threshold = new DataStorageSpec.LongBytesBound("8KiB");
public volatile DataStorageSpec.LongBytesBound sai_vector_term_size_warn_threshold = new DataStorageSpec.LongBytesBound("16KiB");
public volatile DataStorageSpec.LongBytesBound sai_vector_term_size_fail_threshold = new DataStorageSpec.LongBytesBound("32KiB");

public volatile DurationSpec.LongNanosecondsBound streaming_state_expires = new DurationSpec.LongNanosecondsBound("3d");
public volatile DataStorageSpec.LongBytesBound streaming_state_size = new DataStorageSpec.LongBytesBound("40MiB");

Expand Down
84 changes: 84 additions & 0 deletions src/java/org/apache/cassandra/config/GuardrailsOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -974,6 +974,90 @@ public void setSaiSSTableIndexesPerQueryThreshold(int warn, int fail)
x -> config.sai_sstable_indexes_per_query_fail_threshold = x);
}

/**
 * Reads the current warn threshold for the size of string terms written to an SAI index.
 *
 * @return the value of {@code config.sai_string_term_size_warn_threshold}; may be {@code null}
 */
@Override
@Nullable
public DataStorageSpec.LongBytesBound getSaiStringTermSizeWarnThreshold()
{
    final DataStorageSpec.LongBytesBound warnThreshold = config.sai_string_term_size_warn_threshold;
    return warnThreshold;
}

/**
 * Reads the current fail threshold for the size of string terms written to an SAI index.
 *
 * @return the value of {@code config.sai_string_term_size_fail_threshold}; may be {@code null}
 */
@Override
@Nullable
public DataStorageSpec.LongBytesBound getSaiStringTermSizeFailThreshold()
{
    final DataStorageSpec.LongBytesBound failThreshold = config.sai_string_term_size_fail_threshold;
    return failThreshold;
}

/**
 * Replaces the warn and fail thresholds for the size of string terms written to an SAI index.
 * <p>
 * The pair is first passed to {@code validateSizeThreshold}; only afterwards is each value written
 * to the configuration through {@code updatePropertyWithLogging}, warn threshold first.
 *
 * @param warn the new warn threshold, may be {@code null}
 * @param fail the new fail threshold, may be {@code null}
 */
@Override
public void setSaiStringTermSizeThreshold(@Nullable DataStorageSpec.LongBytesBound warn, @Nullable DataStorageSpec.LongBytesBound fail)
{
    // Validation happens before either field is touched.
    validateSizeThreshold(warn, fail, false, "sai_string_term_size");

    updatePropertyWithLogging("sai_string_term_size_warn_threshold",
                              warn,
                              () -> config.sai_string_term_size_warn_threshold,
                              updatedWarn -> config.sai_string_term_size_warn_threshold = updatedWarn);

    updatePropertyWithLogging("sai_string_term_size_fail_threshold",
                              fail,
                              () -> config.sai_string_term_size_fail_threshold,
                              updatedFail -> config.sai_string_term_size_fail_threshold = updatedFail);
}

/**
 * Reads the current warn threshold for the size of frozen terms written to an SAI index.
 *
 * @return the value of {@code config.sai_frozen_term_size_warn_threshold}; may be {@code null}
 */
@Override
@Nullable
public DataStorageSpec.LongBytesBound getSaiFrozenTermSizeWarnThreshold()
{
    final DataStorageSpec.LongBytesBound warnThreshold = config.sai_frozen_term_size_warn_threshold;
    return warnThreshold;
}

/**
 * Reads the current fail threshold for the size of frozen terms written to an SAI index.
 *
 * @return the value of {@code config.sai_frozen_term_size_fail_threshold}; may be {@code null}
 */
@Override
@Nullable
public DataStorageSpec.LongBytesBound getSaiFrozenTermSizeFailThreshold()
{
    final DataStorageSpec.LongBytesBound failThreshold = config.sai_frozen_term_size_fail_threshold;
    return failThreshold;
}

/**
 * Replaces the warn and fail thresholds for the size of frozen terms written to an SAI index.
 * <p>
 * The pair is first passed to {@code validateSizeThreshold}; only afterwards is each value written
 * to the configuration through {@code updatePropertyWithLogging}, warn threshold first.
 *
 * @param warn the new warn threshold, may be {@code null}
 * @param fail the new fail threshold, may be {@code null}
 */
@Override
public void setSaiFrozenTermSizeThreshold(@Nullable DataStorageSpec.LongBytesBound warn, @Nullable DataStorageSpec.LongBytesBound fail)
{
    // Validation happens before either field is touched.
    validateSizeThreshold(warn, fail, false, "sai_frozen_term_size");

    updatePropertyWithLogging("sai_frozen_term_size_warn_threshold",
                              warn,
                              () -> config.sai_frozen_term_size_warn_threshold,
                              updatedWarn -> config.sai_frozen_term_size_warn_threshold = updatedWarn);

    updatePropertyWithLogging("sai_frozen_term_size_fail_threshold",
                              fail,
                              () -> config.sai_frozen_term_size_fail_threshold,
                              updatedFail -> config.sai_frozen_term_size_fail_threshold = updatedFail);
}

/**
 * Reads the current warn threshold for the size of vector terms written to an SAI index.
 *
 * @return the value of {@code config.sai_vector_term_size_warn_threshold}; may be {@code null}
 */
@Override
@Nullable
public DataStorageSpec.LongBytesBound getSaiVectorTermSizeWarnThreshold()
{
    final DataStorageSpec.LongBytesBound warnThreshold = config.sai_vector_term_size_warn_threshold;
    return warnThreshold;
}

/**
 * Reads the current fail threshold for the size of vector terms written to an SAI index.
 *
 * @return the value of {@code config.sai_vector_term_size_fail_threshold}; may be {@code null}
 */
@Override
@Nullable
public DataStorageSpec.LongBytesBound getSaiVectorTermSizeFailThreshold()
{
    final DataStorageSpec.LongBytesBound failThreshold = config.sai_vector_term_size_fail_threshold;
    return failThreshold;
}

/**
 * Replaces the warn and fail thresholds for the size of vector terms written to an SAI index.
 * <p>
 * The pair is first passed to {@code validateSizeThreshold}; only afterwards is each value written
 * to the configuration through {@code updatePropertyWithLogging}, warn threshold first.
 *
 * @param warn the new warn threshold, may be {@code null}
 * @param fail the new fail threshold, may be {@code null}
 */
@Override
public void setSaiVectorTermSizeThreshold(@Nullable DataStorageSpec.LongBytesBound warn, @Nullable DataStorageSpec.LongBytesBound fail)
{
    // Validation happens before either field is touched.
    validateSizeThreshold(warn, fail, false, "sai_vector_term_size");

    updatePropertyWithLogging("sai_vector_term_size_warn_threshold",
                              warn,
                              () -> config.sai_vector_term_size_warn_threshold,
                              updatedWarn -> config.sai_vector_term_size_warn_threshold = updatedWarn);

    updatePropertyWithLogging("sai_vector_term_size_fail_threshold",
                              fail,
                              () -> config.sai_vector_term_size_fail_threshold,
                              updatedFail -> config.sai_vector_term_size_fail_threshold = updatedFail);
}

@Override
public boolean getNonPartitionRestrictedQueryEnabled()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ public List<? extends IMutation> getMutations(ClientState state,
ClientWarn.instance.warn(MessageFormatter.arrayFormat(LOGGED_BATCH_LOW_GCGS_WARNING, new Object[] { suffix, tablesWithZeroGcGs })
.getMessage());
}
return collector.toMutations();
return collector.toMutations(state);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,28 @@
package org.apache.cassandra.cql3.statements;

import java.nio.ByteBuffer;
import java.util.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.google.common.collect.HashMultiset;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;

import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.CounterMutation;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.IMutation;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.RegularAndStaticColumns;
import org.apache.cassandra.db.commitlog.CommitLogSegment;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.virtual.VirtualMutation;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.service.ClientState;

import static org.apache.cassandra.utils.MonotonicClock.Global.approxTime;

Expand Down Expand Up @@ -122,17 +131,21 @@ private IMutationBuilder makeMutationBuilder(TableMetadata metadata, DecoratedKe

/**
* Returns a collection containing all the mutations.
*
* @param state state related to the client connection
*
* @return a collection containing all the mutations.
*/
public List<IMutation> toMutations()
@Override
public List<IMutation> toMutations(ClientState state)
{
List<IMutation> ms = new ArrayList<>();
for (Map<ByteBuffer, IMutationBuilder> ksMap : mutationBuilders.values())
{
for (IMutationBuilder builder : ksMap.values())
{
IMutation mutation = builder.build();
mutation.validateIndexedColumns();
mutation.validateIndexedColumns(state);
mutation.validateSize(MessagingService.current_version, CommitLogSegment.ENTRY_OVERHEAD_SIZE);
ms.add(mutation);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ public PartitionUpdate makeUpdates(FilteredPartition current, ClientState client
upd.applyUpdates(current, updateBuilder, clientState);

PartitionUpdate partitionUpdate = updateBuilder.build();
IndexRegistry.obtain(metadata).validate(partitionUpdate);
IndexRegistry.obtain(metadata).validate(partitionUpdate, clientState);

return partitionUpdate;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -747,7 +747,7 @@ private List<? extends IMutation> getMutations(ClientState state,
HashMultiset<ByteBuffer> perPartitionKeyCounts = HashMultiset.create(keys);
SingleTableUpdatesCollector collector = new SingleTableUpdatesCollector(metadata, updatedColumns, perPartitionKeyCounts);
addUpdates(collector, keys, state, options, local, timestamp, nowInSeconds, queryStartNanoTime);
return collector.toMutations();
return collector.toMutations(state);
}

final void addUpdates(UpdatesCollector collector,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.cassandra.db.virtual.VirtualMutation;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.ClientState;

/**
* Utility class to collect updates.
Expand Down Expand Up @@ -92,7 +93,7 @@ public PartitionUpdate.Builder getPartitionUpdateBuilder(TableMetadata metadata,
* Returns a collection containing all the mutations.
* @return a collection containing all the mutations.
*/
public List<IMutation> toMutations()
public List<IMutation> toMutations(ClientState state)
{
List<IMutation> ms = new ArrayList<>(puBuilders.size());
for (PartitionUpdate.Builder builder : puBuilders.values())
Expand All @@ -106,7 +107,7 @@ else if (metadata.isCounter())
else
mutation = new Mutation(builder.build());

mutation.validateIndexedColumns();
mutation.validateIndexedColumns(state);
mutation.validateSize(MessagingService.current_version, CommitLogSegment.ENTRY_OVERHEAD_SIZE);
ms.add(mutation);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@
import org.apache.cassandra.db.IMutation;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.ClientState;

public interface UpdatesCollector
{
PartitionUpdate.Builder getPartitionUpdateBuilder(TableMetadata metadata, DecoratedKey dk, ConsistencyLevel consistency);
List<IMutation> toMutations();
List<IMutation> toMutations(ClientState state);
}
27 changes: 14 additions & 13 deletions src/java/org/apache/cassandra/db/IMutation.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,24 +24,25 @@
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.service.ClientState;

public interface IMutation
{
public long MAX_MUTATION_SIZE = DatabaseDescriptor.getMaxMutationSize();
long MAX_MUTATION_SIZE = DatabaseDescriptor.getMaxMutationSize();

public void apply();
public String getKeyspaceName();
public Collection<TableId> getTableIds();
public DecoratedKey key();
public long getTimeout(TimeUnit unit);
public String toString(boolean shallow);
public Collection<PartitionUpdate> getPartitionUpdates();
public Supplier<Mutation> hintOnFailure();
void apply();
String getKeyspaceName();
Collection<TableId> getTableIds();
DecoratedKey key();
long getTimeout(TimeUnit unit);
String toString(boolean shallow);
Collection<PartitionUpdate> getPartitionUpdates();
Supplier<Mutation> hintOnFailure();

public default void validateIndexedColumns()
default void validateIndexedColumns(ClientState state)
{
for (PartitionUpdate pu : getPartitionUpdates())
pu.validateIndexedColumns();
pu.validateIndexedColumns(state);
}

/**
Expand All @@ -52,14 +53,14 @@ public default void validateIndexedColumns()
* @param overhead overhead to add to the mutation size being validated. Pass zero if not required, but never a negative value.
* @throws MutationExceededMaxSizeException if {@link DatabaseDescriptor#getMaxMutationSize()} is exceeded
*/
public void validateSize(int version, int overhead);
void validateSize(int version, int overhead);

/**
* Computes the total data size of the specified mutations.
* @param mutations the mutations
* @return the total data size of the specified mutations
*/
public static long dataSize(Collection<? extends IMutation> mutations)
static long dataSize(Collection<? extends IMutation> mutations)
{
long size = 0;
for (IMutation mutation : mutations)
Expand Down