Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
3af2a20
CASSANDRA-16850 - Add client warnings and abort to tombstone and coor…
dcapwell Aug 12, 2021
c2bff7c
higher
dcapwell Aug 12, 2021
7c44570
moved read warning/abort thresholds to be a instance field to avoid c…
dcapwell Aug 12, 2021
f55396d
added flag to disable tracking warnings
dcapwell Aug 13, 2021
bbc1b23
tests track global abort metric as well
dcapwell Aug 13, 2021
c86c8c6
moved track warnings to message flags
dcapwell Aug 13, 2021
ef75194
feedback to rename block to abort
dcapwell Aug 13, 2021
7b95926
fixed issue where disabled tracking caused tombstones to be handled a…
dcapwell Aug 16, 2021
bb9ca95
approval
dcapwell Aug 16, 2021
253ddf8
remove DatabaseDescriptor.getClientTrackWarningsEnabled in local exec…
dcapwell Aug 16, 2021
f1c1697
removed extra cql query in the warning as the message includes it
dcapwell Aug 16, 2021
4745ea0
fixed compile aftre rebase
dcapwell Aug 23, 2021
eeff7fc
reverted trackWarnings() relying on DatabaseDescriptor.getClientTrack…
dcapwell Aug 23, 2021
23c12bd
abort now allows to track tombstone/readsize aborts as well
dcapwell Aug 23, 2021
f964cc9
refactor to only check if client warnings are enabled or not once
dcapwell Aug 24, 2021
d108232
reuse trackWarnings as thats defined on init
dcapwell Aug 24, 2021
e750ec2
moved track warnings variables into QueryOptions as that is not cached
dcapwell Aug 24, 2021
d4ba247
limit visiblity
dcapwell Aug 24, 2021
732799f
ignore tracking if daemon not init
dcapwell Aug 24, 2021
80f42cc
removed comment which is out of date
dcapwell Aug 24, 2021
6f860fa
feedback
dcapwell Aug 25, 2021
32da745
docs
dcapwell Aug 26, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 75 additions & 69 deletions .circleci/config.yml

Large diffs are not rendered by default.

7 changes: 7 additions & 0 deletions NEWS.txt
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@ using the provided 'sstableupgrade' tool.

New features
------------
- Warn/abort thresholds added to read queries notifying clients when these thresholds trigger (by
emitting a client warning or aborting the query). This feature is disabled by default, scheduled
to be enabled in 4.2; it is controlled with the configuration client_track_warnings_enabled,
setting to true will enable this feature. Each check has its own warn/abort thresholds, currently
tombstones (tombstone_warn_threshold, and tombstone_failure_threshold) and coordinator result set
materialized size (client_large_read_warn_threshold_kb, and client_large_read_abort_threshold_kb)
are supported; more checks will be added over time.

Upgrading
---------
Expand Down
14 changes: 14 additions & 0 deletions conf/cassandra.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1453,3 +1453,17 @@ enable_drop_compact_storage: false
# subnets:
# - 127.0.0.1
# - 127.0.0.0/31

# Enables tracking warnings/aborts across all replicas for reporting back to client.
# Scheduled to enable in 4.2
# See: CASSANDRA-16850
# See: tombstone_warn_threshold, tombstone_failure_threshold, client_large_read_warn_threshold_kb, and client_large_read_abort_threshold_kb
#client_track_warnings_enabled: false

# When client_track_warnings_enabled: true, this tracks the materialized size of a query on the
# coordinator. If client_large_read_warn_threshold_kb is greater than 0, this will emit a warning
# to clients with details on what query triggered this as well as the size of the result set; if
# client_large_read_abort_threshold_kb is greater than 0, this will abort the query after it
# has exceeded this threshold, returning a read error to the user.
#client_large_read_warn_threshold_kb: 0
#client_large_read_abort_threshold_kb: 0
5 changes: 5 additions & 0 deletions src/java/org/apache/cassandra/config/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -349,11 +349,16 @@ public class Config
public volatile int tombstone_warn_threshold = 1000;
public volatile int tombstone_failure_threshold = 100000;

public volatile long client_large_read_warn_threshold_kb = 0;
public volatile long client_large_read_abort_threshold_kb = 0;

public final ReplicaFilteringProtectionOptions replica_filtering_protection = new ReplicaFilteringProtectionOptions();

public volatile Long index_summary_capacity_in_mb;
public volatile int index_summary_resize_interval_in_minutes = 60;

public volatile boolean client_track_warnings_enabled = false; // should set to true in 4.2

public int gc_log_threshold_in_ms = 200;
public int gc_warn_threshold_in_ms = 1000;

Expand Down
33 changes: 33 additions & 0 deletions src/java/org/apache/cassandra/config/DatabaseDescriptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -857,6 +857,9 @@ else if (config.concurrent_validations > config.concurrent_compactors && !allowU
throw new ConfigurationException("To set concurrent_validations > concurrent_compactors, " +
"set the system property cassandra.allow_unlimited_concurrent_validations=true");
}

conf.client_large_read_warn_threshold_kb = Math.max(conf.client_large_read_warn_threshold_kb, 0);
conf.client_large_read_abort_threshold_kb = Math.max(conf.client_large_read_abort_threshold_kb, 0);
}

@VisibleForTesting
Expand Down Expand Up @@ -3443,4 +3446,34 @@ public static SubnetGroups getInternodeErrorReportingExclusions()
{
return conf.internode_error_reporting_exclusions;
}

public static long getClientLargeReadWarnThresholdKB()
{
return conf.client_large_read_warn_threshold_kb;
}

public static void setClientLargeReadWarnThresholdKB(long threshold)
{
conf.client_large_read_warn_threshold_kb = Math.max(threshold, 0);
}

public static long getClientLargeReadAbortThresholdKB()
{
return conf.client_large_read_abort_threshold_kb;
}

public static void setClientLargeReadAbortThresholdKB(long threshold)
{
conf.client_large_read_abort_threshold_kb = Math.max(threshold, 0);
}

public static boolean getClientTrackWarningsEnabled()
{
return conf.client_track_warnings_enabled;
}

public static void setClientTrackWarningsEnabled(boolean value)
{
conf.client_track_warnings_enabled = value;
}
}
106 changes: 106 additions & 0 deletions src/java/org/apache/cassandra/cql3/QueryOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import io.netty.buffer.ByteBuf;

import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.marshal.UTF8Type;
Expand Down Expand Up @@ -216,11 +217,103 @@ public int getNowInSeconds(QueryState state)
// Mainly for the sake of BatchQueryOptions
abstract SpecificOptions getSpecificOptions();

abstract TrackWarnings getTrackWarnings();

public boolean isClientTrackWarningsEnabled()
{
return getTrackWarnings().isEnabled();
}

public long getClientLargeReadWarnThresholdKb()
{
return getTrackWarnings().getClientLargeReadWarnThresholdKb();
}

public long getClientLargeReadAbortThresholdKB()
{
return getTrackWarnings().getClientLargeReadAbortThresholdKB();
}

public QueryOptions prepare(List<ColumnSpecification> specs)
{
return this;
}

interface TrackWarnings
{
boolean isEnabled();

long getClientLargeReadWarnThresholdKb();

long getClientLargeReadAbortThresholdKB();

static TrackWarnings create()
{
// if daemon initialization hasn't happened yet (very common in tests) then ignore
if (!DatabaseDescriptor.isDaemonInitialized())
return DisabledTrackWarnings.INSTANCE;
boolean enabled = DatabaseDescriptor.getClientTrackWarningsEnabled();
if (!enabled)
return DisabledTrackWarnings.INSTANCE;
long clientLargeReadWarnThresholdKb = DatabaseDescriptor.getClientLargeReadWarnThresholdKB();
long clientLargeReadAbortThresholdKB = DatabaseDescriptor.getClientLargeReadAbortThresholdKB();
return new DefaultTrackWarnings(clientLargeReadWarnThresholdKb, clientLargeReadAbortThresholdKB);
}
}

private enum DisabledTrackWarnings implements TrackWarnings
{
INSTANCE;

@Override
public boolean isEnabled()
{
return false;
}

@Override
public long getClientLargeReadWarnThresholdKb()
{
return 0;
}

@Override
public long getClientLargeReadAbortThresholdKB()
{
return 0;
}
}

private static class DefaultTrackWarnings implements TrackWarnings
{
private final long clientLargeReadWarnThresholdKb;
private final long clientLargeReadAbortThresholdKB;

public DefaultTrackWarnings(long clientLargeReadWarnThresholdKb, long clientLargeReadAbortThresholdKB)
{
this.clientLargeReadWarnThresholdKb = clientLargeReadWarnThresholdKb;
this.clientLargeReadAbortThresholdKB = clientLargeReadAbortThresholdKB;
}

@Override
public boolean isEnabled()
{
return true;
}

@Override
public long getClientLargeReadWarnThresholdKb()
{
return clientLargeReadWarnThresholdKb;
}

@Override
public long getClientLargeReadAbortThresholdKB()
{
return clientLargeReadAbortThresholdKB;
}
}

static class DefaultQueryOptions extends QueryOptions
{
private final ConsistencyLevel consistency;
Expand All @@ -230,6 +323,7 @@ static class DefaultQueryOptions extends QueryOptions
private final SpecificOptions options;

private final transient ProtocolVersion protocolVersion;
private final transient TrackWarnings trackWarnings = TrackWarnings.create();

DefaultQueryOptions(ConsistencyLevel consistency, List<ByteBuffer> values, boolean skipMetadata, SpecificOptions options, ProtocolVersion protocolVersion)
{
Expand Down Expand Up @@ -264,6 +358,12 @@ SpecificOptions getSpecificOptions()
{
return options;
}

@Override
TrackWarnings getTrackWarnings()
{
return trackWarnings;
}
}

static class QueryOptionsWrapper extends QueryOptions
Expand Down Expand Up @@ -300,6 +400,12 @@ SpecificOptions getSpecificOptions()
return wrapped.getSpecificOptions();
}

@Override
TrackWarnings getTrackWarnings()
{
return wrapped.getTrackWarnings();
}

@Override
public QueryOptions prepare(List<ColumnSpecification> specs)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ public final class ResultSetBuilder
final long[] timestamps;
final int[] ttls;

private long size = 0;
private boolean sizeWarningEmitted = false;

public ResultSetBuilder(ResultMetadata metadata, Selectors selectors)
{
this(metadata, selectors, null);
Expand All @@ -79,6 +82,30 @@ public ResultSetBuilder(ResultMetadata metadata, Selectors selectors, GroupMaker
Arrays.fill(ttls, -1);
}

private void addSize(List<ByteBuffer> row)
{
for (int i=0, isize=row.size(); i<isize; i++)
{
ByteBuffer value = row.get(i);
size += value != null ? value.remaining() : 0;
}
}

public boolean shouldWarn(long thresholdKB)
{
if (thresholdKB > 0 && !sizeWarningEmitted && size > thresholdKB << 10)
{
sizeWarningEmitted = true;
return true;
}
return false;
}

public boolean shouldReject(long thresholdKB)
{
return thresholdKB > 0 && size > thresholdKB << 10;
}

public void add(ByteBuffer v)
{
current.add(v);
Expand Down Expand Up @@ -166,6 +193,8 @@ public ResultSet build()

private List<ByteBuffer> getOutputRow()
{
return selectors.getOutputRow();
List<ByteBuffer> row = selectors.getOutputRow();
addSize(row);
return row;
}
}
Loading