Skip to content

Commit

Permalink
Add Timestamp Bound Guardrail (bound user supplied timestamps within …
Browse files Browse the repository at this point in the history
…a certain range)

Patch by Jordan West; Reviewed by Andrés de la Peña and Brandon Williams for CASSANDRA-18352
  • Loading branch information
jrwest committed May 17, 2023
1 parent 9ba27f3 commit 2ff1ad4
Show file tree
Hide file tree
Showing 16 changed files with 529 additions and 15 deletions.
1 change: 1 addition & 0 deletions CHANGES.txt
@@ -1,4 +1,5 @@
5.0
* Add guardrail to bound timestamps (CASSANDRA-18352)
* Add keyspace_name column to system_views.clients (CASSANDRA-18525)
* Moved system properties and envs to CassandraRelevantProperties and CassandraRelevantEnv respectively (CASSANDRA-17797)
* Add sstablepartitions offline tool to find large partitions in sstables (CASSANDRA-8720)
Expand Down
7 changes: 7 additions & 0 deletions conf/cassandra.yaml
Expand Up @@ -1767,6 +1767,13 @@ drop_compact_storage_enabled: false
# Guardrail to allow/disallow user-provided timestamps. Defaults to true.
# user_timestamps_enabled: true
#
# Guardrail to bound user-provided timestamps within a given range. Default is infinite (denoted by null).
# Accepted values are durations of the form 12h, 24h, etc.
# maximum_timestamp_warn_threshold:
# maximum_timestamp_fail_threshold:
# minimum_timestamp_warn_threshold:
# minimum_timestamp_fail_threshold:
#
# Guardrail to allow/disallow GROUP BY functionality.
# group_by_enabled: true
#
Expand Down
6 changes: 6 additions & 0 deletions src/java/org/apache/cassandra/config/Config.java
Expand Up @@ -904,6 +904,12 @@ public static void setClientMode(boolean clientMode)
public volatile DurationSpec.LongNanosecondsBound repair_state_expires = new DurationSpec.LongNanosecondsBound("3d");
public volatile int repair_state_size = 100_000;

/** The configuration of timestamp bounds */
public volatile DurationSpec.LongMicrosecondsBound maximum_timestamp_warn_threshold = null;
public volatile DurationSpec.LongMicrosecondsBound maximum_timestamp_fail_threshold = null;
public volatile DurationSpec.LongMicrosecondsBound minimum_timestamp_warn_threshold = null;
public volatile DurationSpec.LongMicrosecondsBound minimum_timestamp_fail_threshold = null;

/**
* The variants of paxos implementation and semantics supported by Cassandra.
*/
Expand Down
58 changes: 58 additions & 0 deletions src/java/org/apache/cassandra/config/DurationSpec.java
Expand Up @@ -272,6 +272,64 @@ public long toNanoseconds()
}
}

/**
* Represents a duration used for Cassandra configuration. The bound is [0, Long.MAX_VALUE) in microseconds.
* If the user sets a different unit - we still validate that converted to microseconds the quantity will not exceed
* that upper bound. (CASSANDRA-17571)
*/
public final static class LongMicrosecondsBound extends DurationSpec
{
/**
* Creates a {@code DurationSpec.LongMicrosecondsBound} of the specified amount.
* The bound is [0, Long.MAX_VALUE) in microseconds.
*
* @param value the duration
*/
public LongMicrosecondsBound(String value)
{
super(value, MICROSECONDS, Long.MAX_VALUE);
}

/**
* Creates a {@code DurationSpec.LongMicrosecondsBound} of the specified amount in the specified unit.
* The bound is [0, Long.MAX_VALUE) in milliseconds.
*
* @param quantity where quantity shouldn't be bigger than Long.MAX_VALUE - 1 in microseconds
* @param unit in which the provided quantity is
*/
public LongMicrosecondsBound(long quantity, TimeUnit unit)
{
super(quantity, unit, MICROSECONDS, Long.MAX_VALUE);
}

/**
* Creates a {@code DurationSpec.LongMicrosecondsBound} of the specified amount in microseconds.
* The bound is [0, Long.MAX_VALUE) in microseconds.
*
* @param microseconds where milliseconds shouldn't be bigger than Long.MAX_VALUE-1
*/
public LongMicrosecondsBound(long microseconds)
{
this(microseconds, MICROSECONDS);
}

/**
* @return this duration in number of milliseconds
*/
public long toMicroseconds()
{
return unit().toMicros(quantity());
}

/**
* @return this duration in number of seconds
*/
public long toSeconds()
{
return unit().toSeconds(quantity());
}
}

/**
* Represents a duration used for Cassandra configuration. The bound is [0, Long.MAX_VALUE) in milliseconds.
* If the user sets a different unit - we still validate that converted to milliseconds the quantity will not exceed
Expand Down
88 changes: 86 additions & 2 deletions src/java/org/apache/cassandra/config/GuardrailsOptions.java
Expand Up @@ -84,6 +84,8 @@ public GuardrailsOptions(Config config)
validateDataDiskUsageMaxDiskSize(config.data_disk_usage_max_disk_size);
validateMinRFThreshold(config.minimum_replication_factor_warn_threshold, config.minimum_replication_factor_fail_threshold);
validateMaxRFThreshold(config.maximum_replication_factor_warn_threshold, config.maximum_replication_factor_fail_threshold);
validateTimestampThreshold(config.maximum_timestamp_warn_threshold, config.maximum_timestamp_fail_threshold, "maximum_timestamp");
validateTimestampThreshold(config.minimum_timestamp_warn_threshold, config.minimum_timestamp_fail_threshold, "minimum_timestamp");
}

@Override
Expand Down Expand Up @@ -760,6 +762,64 @@ public void setZeroTTLOnTWCSEnabled(boolean value)
x -> config.zero_ttl_on_twcs_enabled = x);
}

@Override
public DurationSpec.LongMicrosecondsBound getMaximumTimestampWarnThreshold()
{
return config.maximum_timestamp_warn_threshold;
}

@Override
public DurationSpec.LongMicrosecondsBound getMaximumTimestampFailThreshold()
{
return config.maximum_timestamp_fail_threshold;
}

@Override
public void setMaximumTimestampThreshold(@Nullable DurationSpec.LongMicrosecondsBound warn,
@Nullable DurationSpec.LongMicrosecondsBound fail)
{
validateTimestampThreshold(warn, fail, "maximum_timestamp");

updatePropertyWithLogging("maximum_timestamp_warn_threshold",
warn,
() -> config.maximum_timestamp_warn_threshold,
x -> config.maximum_timestamp_warn_threshold = x);

updatePropertyWithLogging("maximum_timestamp_fail_threshold",
fail,
() -> config.maximum_timestamp_fail_threshold,
x -> config.maximum_timestamp_fail_threshold = x);
}

@Override
public DurationSpec.LongMicrosecondsBound getMinimumTimestampWarnThreshold()
{
return config.minimum_timestamp_warn_threshold;
}

@Override
public DurationSpec.LongMicrosecondsBound getMinimumTimestampFailThreshold()
{
return config.minimum_timestamp_fail_threshold;
}

@Override
public void setMinimumTimestampThreshold(@Nullable DurationSpec.LongMicrosecondsBound warn,
@Nullable DurationSpec.LongMicrosecondsBound fail)
{
validateTimestampThreshold(warn, fail, "minimum_timestamp");

updatePropertyWithLogging("minimum_timestamp_warn_threshold",
warn,
() -> config.minimum_timestamp_warn_threshold,
x -> config.minimum_timestamp_warn_threshold = x);

updatePropertyWithLogging("minimum_timestamp_fail_threshold",
fail,
() -> config.minimum_timestamp_fail_threshold,
x -> config.minimum_timestamp_fail_threshold = x);
}

private static <T> void updatePropertyWithLogging(String propertyName, T newValue, Supplier<T> getter, Consumer<T> setter)
{
T oldValue = getter.get();
Expand All @@ -771,6 +831,11 @@ private static <T> void updatePropertyWithLogging(String propertyName, T newValu
}

private static void validatePositiveNumeric(long value, long maxValue, String name)
{
validatePositiveNumeric(value, maxValue, name, false);
}

private static void validatePositiveNumeric(long value, long maxValue, String name, boolean allowZero)
{
if (value == -1)
return;
Expand All @@ -779,12 +844,12 @@ private static void validatePositiveNumeric(long value, long maxValue, String na
throw new IllegalArgumentException(format("Invalid value %d for %s: maximum allowed value is %d",
value, name, maxValue));

if (value == 0)
if (!allowZero && value == 0)
throw new IllegalArgumentException(format("Invalid value for %s: 0 is not allowed; " +
"if attempting to disable use -1", name));

// We allow -1 as a general "disabling" flag. But reject anything lower to avoid mistakes.
if (value <= 0)
if (value < 0)
throw new IllegalArgumentException(format("Invalid value %d for %s: negative values are not allowed, " +
"outside of -1 which disables the guardrail", value, name));
}
Expand All @@ -808,6 +873,13 @@ private static void validateMaxIntThreshold(int warn, int fail, String name)
validateWarnLowerThanFail(warn, fail, name);
}

private static void validateMaxLongThreshold(long warn, long fail, String name, boolean allowZero)
{
validatePositiveNumeric(warn, Long.MAX_VALUE, name + "_warn_threshold", allowZero);
validatePositiveNumeric(fail, Long.MAX_VALUE, name + "_fail_threshold", allowZero);
validateWarnLowerThanFail(warn, fail, name);
}

private static void validateMinIntThreshold(int warn, int fail, String name)
{
validatePositiveNumeric(warn, Integer.MAX_VALUE, name + "_warn_threshold");
Expand Down Expand Up @@ -835,6 +907,18 @@ private static void validateMaxRFThreshold(int warn, int fail)
fail, DatabaseDescriptor.getDefaultKeyspaceRF()));
}

public static void validateTimestampThreshold(DurationSpec.LongMicrosecondsBound warn,
DurationSpec.LongMicrosecondsBound fail,
String name)
{
// this function is used for both upper and lower thresholds because lower threshold is relative
// despite using MinThreshold we still want the warn threshold to be less than or equal to
// the fail threshold.
validateMaxLongThreshold(warn == null ? -1 : warn.toMicroseconds(),
fail == null ? -1 : fail.toMicroseconds(),
name, true);
}

private static void validateWarnLowerThanFail(long warn, long fail, String name)
{
if (warn == -1 || fail == -1)
Expand Down
Expand Up @@ -302,6 +302,16 @@ public void validateDiskUsage(QueryOptions options, ClientState state)
}
}

public void validateTimestamp(QueryState queryState, QueryOptions options)
{
if (!isTimestampSet())
return;

long ts = attrs.getTimestamp(options.getTimestamp(queryState), options);
Guardrails.maximumAllowableTimestamp.guard(ts, table(), false, queryState.getClientState());
Guardrails.minimumAllowableTimestamp.guard(ts, table(), false, queryState.getClientState());
}

public RegularAndStaticColumns updatedColumns()
{
return updatedColumns;
Expand Down Expand Up @@ -506,6 +516,7 @@ private ResultMessage executeWithoutCondition(QueryState queryState, QueryOption
cl.validateForWrite();

validateDiskUsage(options, queryState.getClientState());
validateTimestamp(queryState, options);

List<? extends IMutation> mutations =
getMutations(queryState.getClientState(),
Expand Down
80 changes: 80 additions & 0 deletions src/java/org/apache/cassandra/db/guardrails/Guardrails.java
Expand Up @@ -31,10 +31,12 @@
import org.apache.cassandra.config.CassandraRelevantProperties;
import org.apache.cassandra.config.DataStorageSpec;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.DurationSpec;
import org.apache.cassandra.config.GuardrailsOptions;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.disk.usage.DiskUsageBroadcaster;
import org.apache.cassandra.utils.MBeanWrapper;

Expand Down Expand Up @@ -421,6 +423,24 @@ public final class Guardrails implements GuardrailsMBean
format("The keyspace %s has a replication factor of %s, above the %s threshold of %s.",
what, value, isWarning ? "warning" : "failure", threshold));

public static final MaxThreshold maximumAllowableTimestamp =
new MaxThreshold("maximum_timestamp",
"Timestamps too far in the future can lead to data that can't be easily overwritten",
state -> maximumTimestampAsRelativeMicros(CONFIG_PROVIDER.getOrCreate(state).getMaximumTimestampWarnThreshold()),
state -> maximumTimestampAsRelativeMicros(CONFIG_PROVIDER.getOrCreate(state).getMaximumTimestampFailThreshold()),
(isWarning, what, value, threshold) ->
format("The modification to table %s has a timestamp %s after the maximum allowable %s threshold %s",
what, value, isWarning ? "warning" : "failure", threshold));

public static final MinThreshold minimumAllowableTimestamp =
new MinThreshold("minimum_timestamp",
"Timestamps too far in the past can cause writes can be unexpectedly lost",
state -> minimumTimestampAsRelativeMicros(CONFIG_PROVIDER.getOrCreate(state).getMinimumTimestampWarnThreshold()),
state -> minimumTimestampAsRelativeMicros(CONFIG_PROVIDER.getOrCreate(state).getMinimumTimestampFailThreshold()),
(isWarning, what, value, threshold) ->
format("The modification to table %s has a timestamp %s before the minimum allowable %s threshold %s",
what, value, isWarning ? "warning" : "failure", threshold));

private Guardrails()
{
MBeanWrapper.instance.registerMBean(this, MBEAN_NAME);
Expand Down Expand Up @@ -1052,6 +1072,42 @@ public void setZeroTTLOnTWCSWarned(boolean value)
DEFAULT_CONFIG.setZeroTTLOnTWCSWarned(value);
}

@Override
public String getMaximumTimestampWarnThreshold()
{
return durationToString(DEFAULT_CONFIG.getMaximumTimestampWarnThreshold());
}

@Override
public String getMaximumTimestampFailThreshold()
{
return durationToString(DEFAULT_CONFIG.getMaximumTimestampFailThreshold());
}

@Override
public void setMaximumTimestampThreshold(String warnSeconds, String failSeconds)
{
DEFAULT_CONFIG.setMaximumTimestampThreshold(durationFromString(warnSeconds), durationFromString(failSeconds));
}

@Override
public String getMinimumTimestampWarnThreshold()
{
return durationToString(DEFAULT_CONFIG.getMinimumTimestampWarnThreshold());
}

@Override
public String getMinimumTimestampFailThreshold()
{
return durationToString(DEFAULT_CONFIG.getMinimumTimestampFailThreshold());
}

@Override
public void setMinimumTimestampThreshold(String warnSeconds, String failSeconds)
{
DEFAULT_CONFIG.setMinimumTimestampThreshold(durationFromString(warnSeconds), durationFromString(failSeconds));
}

private static String toCSV(Set<String> values)
{
return values == null || values.isEmpty() ? "" : String.join(",", values);
Expand Down Expand Up @@ -1100,4 +1156,28 @@ private static DataStorageSpec.LongBytesBound sizeFromString(@Nullable String si
{
return StringUtils.isEmpty(size) ? null : new DataStorageSpec.LongBytesBound(size);
}

private static String durationToString(@Nullable DurationSpec duration)
{
return duration == null ? null : duration.toString();
}

private static DurationSpec.LongMicrosecondsBound durationFromString(@Nullable String duration)
{
return StringUtils.isEmpty(duration) ? null : new DurationSpec.LongMicrosecondsBound(duration);
}

private static long maximumTimestampAsRelativeMicros(@Nullable DurationSpec.LongMicrosecondsBound duration)
{
return duration == null
? Long.MAX_VALUE
: (ClientState.getLastTimestampMicros() + duration.toMicroseconds());
}

private static long minimumTimestampAsRelativeMicros(@Nullable DurationSpec.LongMicrosecondsBound duration)
{
return duration == null
? Long.MIN_VALUE
: (ClientState.getLastTimestampMicros() - duration.toMicroseconds());
}
}

0 comments on commit 2ff1ad4

Please sign in to comment.