diff --git a/.circleci/config.yml b/.circleci/config.yml index 12e5daede91b..e74bc82b0e90 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -18,13 +18,22 @@ version: 2 jobs: + start_j8_dtests: + docker: [ {image: "alpine:latest" } ] + steps: [ { run: "echo skipping" } ] + start_jvm_upgrade_dtest: + docker: [ {image: "alpine:latest" } ] + steps: [ { run: "echo skipping" } ] + start_j8_dtest_jars_build: + docker: [ {image: "alpine:latest" } ] + steps: [ { run: "echo skipping" } ] j8_jvm_upgrade_dtests: docker: - image: apache/cassandra-testing-ubuntu2004-java11-w-dependencies:20210304 - resource_class: medium + resource_class: xlarge working_directory: ~/ shell: /bin/bash -eo pipefail -l - parallelism: 1 + parallelism: 2 steps: - attach_workspace: at: /home/cassandra @@ -130,10 +139,10 @@ jobs: repeated_jvm_upgrade_dtest: docker: - image: apache/cassandra-testing-ubuntu2004-java11-w-dependencies:20210304 - resource_class: medium + resource_class: xlarge working_directory: ~/ shell: /bin/bash -eo pipefail -l - parallelism: 4 + parallelism: 20 steps: - attach_workspace: at: /home/cassandra @@ -305,10 +314,10 @@ jobs: j8_cqlsh-dtests-py2-with-vnodes: docker: - image: apache/cassandra-testing-ubuntu2004-java11-w-dependencies:20210304 - resource_class: medium + resource_class: xlarge working_directory: ~/ shell: /bin/bash -eo pipefail -l - parallelism: 4 + parallelism: 20 steps: - attach_workspace: at: /home/cassandra @@ -400,10 +409,10 @@ jobs: j11_unit_tests: docker: - image: apache/cassandra-testing-ubuntu2004-java11:20210304 - resource_class: medium + resource_class: xlarge working_directory: ~/ shell: /bin/bash -eo pipefail -l - parallelism: 4 + parallelism: 20 steps: - attach_workspace: at: /home/cassandra @@ -510,10 +519,10 @@ jobs: repeated_upgrade_dtest: docker: - image: apache/cassandra-testing-ubuntu2004-java11-w-dependencies:20210304 - resource_class: medium + resource_class: xlarge working_directory: ~/ shell: /bin/bash -eo pipefail -l - parallelism: 4 + parallelism: 20 steps: - attach_workspace: at: /home/cassandra @@ -632,10 +641,10 @@ jobs: j8_cqlsh-dtests-py38-no-vnodes: docker: - image: apache/cassandra-testing-ubuntu2004-java11-w-dependencies:20210304 - resource_class: medium + resource_class: xlarge working_directory: ~/ shell: /bin/bash -eo pipefail -l - parallelism: 4 + parallelism: 20 steps: - attach_workspace: at: /home/cassandra @@ -727,10 +736,10 @@ jobs: j11_cqlsh-dtests-py3-with-vnodes: docker: - image: apache/cassandra-testing-ubuntu2004-java11:20210304 - resource_class: medium + resource_class: xlarge working_directory: ~/ shell: /bin/bash -eo pipefail -l - parallelism: 4 + parallelism: 20 steps: - attach_workspace: at: /home/cassandra @@ -823,10 +832,10 @@ jobs: j11_cqlsh-dtests-py3-no-vnodes: docker: - image: apache/cassandra-testing-ubuntu2004-java11:20210304 - resource_class: medium + resource_class: xlarge working_directory: ~/ shell: /bin/bash -eo pipefail -l - parallelism: 4 + parallelism: 20 steps: - attach_workspace: at: /home/cassandra @@ -919,10 +928,10 @@ jobs: j11_cqlsh-dtests-py38-with-vnodes: docker: - image: apache/cassandra-testing-ubuntu2004-java11:20210304 - resource_class: medium + resource_class: xlarge working_directory: ~/ shell: /bin/bash -eo pipefail -l - parallelism: 4 + parallelism: 20 steps: - attach_workspace: at: /home/cassandra @@ -1015,10 +1024,10 @@ jobs: j8_cqlsh-dtests-py3-with-vnodes: docker: - image: apache/cassandra-testing-ubuntu2004-java11-w-dependencies:20210304 - resource_class: medium + resource_class: xlarge 
working_directory: ~/ shell: /bin/bash -eo pipefail -l - parallelism: 4 + parallelism: 20 steps: - attach_workspace: at: /home/cassandra @@ -1110,10 +1119,10 @@ jobs: j8_cqlsh-dtests-py2-no-vnodes: docker: - image: apache/cassandra-testing-ubuntu2004-java11-w-dependencies:20210304 - resource_class: medium + resource_class: xlarge working_directory: ~/ shell: /bin/bash -eo pipefail -l - parallelism: 4 + parallelism: 20 steps: - attach_workspace: at: /home/cassandra @@ -1205,10 +1214,10 @@ jobs: j11_repeated_dtest: docker: - image: apache/cassandra-testing-ubuntu2004-java11:20210304 - resource_class: medium + resource_class: xlarge working_directory: ~/ shell: /bin/bash -eo pipefail -l - parallelism: 4 + parallelism: 20 steps: - attach_workspace: at: /home/cassandra @@ -1350,10 +1359,10 @@ jobs: j8_repeated_dtest: docker: - image: apache/cassandra-testing-ubuntu2004-java11-w-dependencies:20210304 - resource_class: medium + resource_class: xlarge working_directory: ~/ shell: /bin/bash -eo pipefail -l - parallelism: 4 + parallelism: 20 steps: - attach_workspace: at: /home/cassandra @@ -1472,10 +1481,10 @@ jobs: j11_cqlsh-dtests-py2-with-vnodes: docker: - image: apache/cassandra-testing-ubuntu2004-java11:20210304 - resource_class: medium + resource_class: xlarge working_directory: ~/ shell: /bin/bash -eo pipefail -l - parallelism: 4 + parallelism: 20 steps: - attach_workspace: at: /home/cassandra @@ -1568,10 +1577,10 @@ jobs: j11_dtests-with-vnodes: docker: - image: apache/cassandra-testing-ubuntu2004-java11:20210304 - resource_class: medium + resource_class: xlarge working_directory: ~/ shell: /bin/bash -eo pipefail -l - parallelism: 4 + parallelism: 25 steps: - attach_workspace: at: /home/cassandra @@ -1667,10 +1676,10 @@ jobs: j8_dtests-no-vnodes: docker: - image: apache/cassandra-testing-ubuntu2004-java11-w-dependencies:20210304 - resource_class: medium + resource_class: xlarge working_directory: ~/ shell: /bin/bash -eo pipefail -l - parallelism: 4 + parallelism: 25 steps: - attach_workspace: at: /home/cassandra @@ -1743,10 +1752,10 @@ jobs: j8_upgradetests-no-vnodes: docker: - image: apache/cassandra-testing-ubuntu2004-java11-w-dependencies:20210304 - resource_class: medium + resource_class: xlarge working_directory: ~/ shell: /bin/bash -eo pipefail -l - parallelism: 4 + parallelism: 100 steps: - attach_workspace: at: /home/cassandra @@ -1819,7 +1828,7 @@ jobs: utests_stress: docker: - image: apache/cassandra-testing-ubuntu2004-java11-w-dependencies:20210304 - resource_class: medium + resource_class: xlarge working_directory: ~/ shell: /bin/bash -eo pipefail -l parallelism: 1 @@ -1882,10 +1891,10 @@ jobs: j8_unit_tests: docker: - image: apache/cassandra-testing-ubuntu2004-java11-w-dependencies:20210304 - resource_class: medium + resource_class: xlarge working_directory: ~/ shell: /bin/bash -eo pipefail -l - parallelism: 4 + parallelism: 35 steps: - attach_workspace: at: /home/cassandra @@ -1991,10 +2000,10 @@ jobs: j11_jvm_dtests: docker: - image: apache/cassandra-testing-ubuntu2004-java11:20210304 - resource_class: medium + resource_class: xlarge working_directory: ~/ shell: /bin/bash -eo pipefail -l - parallelism: 1 + parallelism: 10 steps: - attach_workspace: at: /home/cassandra @@ -2200,10 +2209,10 @@ jobs: j11_cqlsh-dtests-py2-no-vnodes: docker: - image: apache/cassandra-testing-ubuntu2004-java11:20210304 - resource_class: medium + resource_class: xlarge working_directory: ~/ shell: /bin/bash -eo pipefail -l - parallelism: 4 + parallelism: 20 steps: - attach_workspace: at: 
/home/cassandra @@ -2351,10 +2360,10 @@ jobs: j11_repeated_utest: docker: - image: apache/cassandra-testing-ubuntu2004-java11:20210304 - resource_class: medium + resource_class: xlarge working_directory: ~/ shell: /bin/bash -eo pipefail -l - parallelism: 4 + parallelism: 20 steps: - attach_workspace: at: /home/cassandra @@ -2527,10 +2536,10 @@ jobs: j8_dtests-with-vnodes: docker: - image: apache/cassandra-testing-ubuntu2004-java11-w-dependencies:20210304 - resource_class: medium + resource_class: xlarge working_directory: ~/ shell: /bin/bash -eo pipefail -l - parallelism: 4 + parallelism: 25 steps: - attach_workspace: at: /home/cassandra @@ -2603,10 +2612,10 @@ jobs: j11_cqlsh-dtests-py38-no-vnodes: docker: - image: apache/cassandra-testing-ubuntu2004-java11:20210304 - resource_class: medium + resource_class: xlarge working_directory: ~/ shell: /bin/bash -eo pipefail -l - parallelism: 4 + parallelism: 20 steps: - attach_workspace: at: /home/cassandra @@ -2699,10 +2708,10 @@ jobs: j8_jvm_dtests: docker: - image: apache/cassandra-testing-ubuntu2004-java11-w-dependencies:20210304 - resource_class: medium + resource_class: xlarge working_directory: ~/ shell: /bin/bash -eo pipefail -l - parallelism: 1 + parallelism: 14 steps: - attach_workspace: at: /home/cassandra @@ -2906,10 +2915,10 @@ jobs: j8_cqlsh-dtests-py3-no-vnodes: docker: - image: apache/cassandra-testing-ubuntu2004-java11-w-dependencies:20210304 - resource_class: medium + resource_class: xlarge working_directory: ~/ shell: /bin/bash -eo pipefail -l - parallelism: 4 + parallelism: 20 steps: - attach_workspace: at: /home/cassandra @@ -3001,10 +3010,10 @@ jobs: j8_cqlsh-dtests-py38-with-vnodes: docker: - image: apache/cassandra-testing-ubuntu2004-java11-w-dependencies:20210304 - resource_class: medium + resource_class: xlarge working_directory: ~/ shell: /bin/bash -eo pipefail -l - parallelism: 4 + parallelism: 20 steps: - attach_workspace: at: /home/cassandra @@ -3096,7 +3105,7 @@ jobs: utests_long: docker: - image: apache/cassandra-testing-ubuntu2004-java11-w-dependencies:20210304 - resource_class: medium + resource_class: xlarge working_directory: ~/ shell: /bin/bash -eo pipefail -l parallelism: 1 @@ -3159,10 +3168,10 @@ jobs: utests_system_keyspace_directory: docker: - image: apache/cassandra-testing-ubuntu2004-java11-w-dependencies:20210304 - resource_class: medium + resource_class: xlarge working_directory: ~/ shell: /bin/bash -eo pipefail -l - parallelism: 4 + parallelism: 20 steps: - attach_workspace: at: /home/cassandra @@ -3322,7 +3331,7 @@ jobs: utests_fqltool: docker: - image: apache/cassandra-testing-ubuntu2004-java11-w-dependencies:20210304 - resource_class: medium + resource_class: xlarge working_directory: ~/ shell: /bin/bash -eo pipefail -l parallelism: 1 @@ -3385,10 +3394,10 @@ jobs: j11_dtests-no-vnodes: docker: - image: apache/cassandra-testing-ubuntu2004-java11:20210304 - resource_class: medium + resource_class: xlarge working_directory: ~/ shell: /bin/bash -eo pipefail -l - parallelism: 4 + parallelism: 25 steps: - attach_workspace: at: /home/cassandra @@ -3484,10 +3493,10 @@ jobs: utests_compression: docker: - image: apache/cassandra-testing-ubuntu2004-java11-w-dependencies:20210304 - resource_class: medium + resource_class: xlarge working_directory: ~/ shell: /bin/bash -eo pipefail -l - parallelism: 4 + parallelism: 20 steps: - attach_workspace: at: /home/cassandra @@ -3593,10 +3602,10 @@ jobs: j8_repeated_utest: docker: - image: apache/cassandra-testing-ubuntu2004-java11-w-dependencies:20210304 - 
resource_class: medium + resource_class: xlarge working_directory: ~/ shell: /bin/bash -eo pipefail -l - parallelism: 4 + parallelism: 20 steps: - attach_workspace: at: /home/cassandra @@ -3910,20 +3919,17 @@ workflows: requires: - start_utests_system_keyspace_directory - j8_build - - start_j8_dtest_jars_build: - type: approval + - start_j8_dtest_jars_build - j8_dtest_jars_build: requires: - j8_build - start_j8_dtest_jars_build - - start_jvm_upgrade_dtest: - type: approval + - start_jvm_upgrade_dtest - j8_jvm_upgrade_dtests: requires: - start_jvm_upgrade_dtest - j8_dtest_jars_build - - start_j8_dtests: - type: approval + - start_j8_dtests: { requires: [ "j8_build" ]} - j8_dtests-with-vnodes: requires: - start_j8_dtests diff --git a/NEWS.txt b/NEWS.txt index 6b39b21b79b5..443a5c4c0c80 100644 --- a/NEWS.txt +++ b/NEWS.txt @@ -38,6 +38,13 @@ using the provided 'sstableupgrade' tool. New features ------------ + - Warn/abort thresholds added to read queries, notifying clients when a threshold triggers (by + emitting a client warning or aborting the query). The feature is disabled by default and is + scheduled to be enabled in 4.2; it is controlled by the client_track_warnings_enabled configuration, + and setting it to true enables the feature. Each check has its own warn/abort thresholds; currently + tombstones (tombstone_warn_threshold and tombstone_failure_threshold) and coordinator result set + materialized size (client_large_read_warn_threshold_kb and client_large_read_abort_threshold_kb) + are supported, and more checks will be added over time. Upgrading --------- diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml index bf8c358f5adc..ff073da4c4d2 100644 --- a/conf/cassandra.yaml +++ b/conf/cassandra.yaml @@ -1453,3 +1453,17 @@ enable_drop_compact_storage: false # subnets: # - 127.0.0.1 # - 127.0.0.0/31 + +# Enables tracking warnings/aborts across all replicas for reporting back to the client. +# Scheduled to be enabled in 4.2. +# See: CASSANDRA-16850 +# See: tombstone_warn_threshold, tombstone_failure_threshold, client_large_read_warn_threshold_kb, and client_large_read_abort_threshold_kb +#client_track_warnings_enabled: false + +# When client_track_warnings_enabled: true, this tracks the materialized size of a query on the +# coordinator. If client_large_read_warn_threshold_kb is greater than 0, this will emit a warning +# to clients with details on what query triggered this as well as the size of the result set; if +# client_large_read_abort_threshold_kb is greater than 0, this will abort the query after it +# has exceeded this threshold, returning a read error to the user. +#client_large_read_warn_threshold_kb: 0 +#client_large_read_abort_threshold_kb: 0
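The thresholds can also be flipped at runtime through the DatabaseDescriptor accessors this patch adds below, which is how a test would exercise the feature without editing cassandra.yaml. A minimal sketch, assuming DatabaseDescriptor has already been initialized (the daemonInitialization() call is the usual test idiom, not something this patch adds):

import org.apache.cassandra.config.DatabaseDescriptor;

public class TrackWarningsSetup
{
    public static void enableForTest()
    {
        DatabaseDescriptor.daemonInitialization(); // illustrative; tests normally do this once at startup
        DatabaseDescriptor.setClientTrackWarningsEnabled(true);
        DatabaseDescriptor.setClientLargeReadWarnThresholdKB(1024);  // warn once a result set passes 1 MiB
        DatabaseDescriptor.setClientLargeReadAbortThresholdKB(4096); // abort once it passes 4 MiB

        // the setters clamp negative values to 0, i.e. "disabled"
        DatabaseDescriptor.setClientLargeReadWarnThresholdKB(-1);
        assert DatabaseDescriptor.getClientLargeReadWarnThresholdKB() == 0;
    }
}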
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index bd2177d6d9e1..731b4662c530 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -349,11 +349,16 @@ public class Config public volatile int tombstone_warn_threshold = 1000; public volatile int tombstone_failure_threshold = 100000; + public volatile long client_large_read_warn_threshold_kb = 0; + public volatile long client_large_read_abort_threshold_kb = 0; + public final ReplicaFilteringProtectionOptions replica_filtering_protection = new ReplicaFilteringProtectionOptions(); public volatile Long index_summary_capacity_in_mb; public volatile int index_summary_resize_interval_in_minutes = 60; + public volatile boolean client_track_warnings_enabled = false; // should be set to true in 4.2 + public int gc_log_threshold_in_ms = 200; public int gc_warn_threshold_in_ms = 1000; diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 19e79d7a7a6f..7e804850f038 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -857,6 +857,9 @@ else if (config.concurrent_validations > config.concurrent_compactors && !allowU throw new ConfigurationException("To set concurrent_validations > concurrent_compactors, " + "set the system property cassandra.allow_unlimited_concurrent_validations=true"); } + + conf.client_large_read_warn_threshold_kb = Math.max(conf.client_large_read_warn_threshold_kb, 0); + conf.client_large_read_abort_threshold_kb = Math.max(conf.client_large_read_abort_threshold_kb, 0); } @VisibleForTesting @@ -3443,4 +3446,34 @@ public static SubnetGroups getInternodeErrorReportingExclusions() { return conf.internode_error_reporting_exclusions; } + + public static long getClientLargeReadWarnThresholdKB() + { + return conf.client_large_read_warn_threshold_kb; + } + + public static void setClientLargeReadWarnThresholdKB(long threshold) + { + conf.client_large_read_warn_threshold_kb = Math.max(threshold, 0); + } + + public static long getClientLargeReadAbortThresholdKB() + { + return conf.client_large_read_abort_threshold_kb; + } + + public static void setClientLargeReadAbortThresholdKB(long threshold) + { + conf.client_large_read_abort_threshold_kb = Math.max(threshold, 0); + } + + public static boolean getClientTrackWarningsEnabled() + { + return conf.client_track_warnings_enabled; + } + + public static void setClientTrackWarningsEnabled(boolean value) + { + conf.client_track_warnings_enabled = value; + } } diff --git a/src/java/org/apache/cassandra/cql3/QueryOptions.java b/src/java/org/apache/cassandra/cql3/QueryOptions.java index d3b1a03cca9e..e46c45897e5b 100644 --- a/src/java/org/apache/cassandra/cql3/QueryOptions.java +++ b/src/java/org/apache/cassandra/cql3/QueryOptions.java @@ -24,6 +24,7 @@ import io.netty.buffer.ByteBuf; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.schema.ColumnMetadata; import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.db.marshal.UTF8Type; @@ -216,11 +217,103 @@ public int getNowInSeconds(QueryState state) // Mainly for the sake of BatchQueryOptions abstract SpecificOptions getSpecificOptions(); + abstract TrackWarnings getTrackWarnings(); + + public boolean
isClientTrackWarningsEnabled() + { + return getTrackWarnings().isEnabled(); + } + + public long getClientLargeReadWarnThresholdKb() + { + return getTrackWarnings().getClientLargeReadWarnThresholdKb(); + } + + public long getClientLargeReadAbortThresholdKB() + { + return getTrackWarnings().getClientLargeReadAbortThresholdKB(); + } + public QueryOptions prepare(List<ColumnSpecification> specs) { return this; } + interface TrackWarnings + { + boolean isEnabled(); + + long getClientLargeReadWarnThresholdKb(); + + long getClientLargeReadAbortThresholdKB(); + + static TrackWarnings create() + { + // if daemon initialization hasn't happened yet (very common in tests) then ignore + if (!DatabaseDescriptor.isDaemonInitialized()) + return DisabledTrackWarnings.INSTANCE; + boolean enabled = DatabaseDescriptor.getClientTrackWarningsEnabled(); + if (!enabled) + return DisabledTrackWarnings.INSTANCE; + long clientLargeReadWarnThresholdKb = DatabaseDescriptor.getClientLargeReadWarnThresholdKB(); + long clientLargeReadAbortThresholdKB = DatabaseDescriptor.getClientLargeReadAbortThresholdKB(); + return new DefaultTrackWarnings(clientLargeReadWarnThresholdKb, clientLargeReadAbortThresholdKB); + } + } + + private enum DisabledTrackWarnings implements TrackWarnings + { + INSTANCE; + + @Override + public boolean isEnabled() + { + return false; + } + + @Override + public long getClientLargeReadWarnThresholdKb() + { + return 0; + } + + @Override + public long getClientLargeReadAbortThresholdKB() + { + return 0; + } + } + + private static class DefaultTrackWarnings implements TrackWarnings + { + private final long clientLargeReadWarnThresholdKb; + private final long clientLargeReadAbortThresholdKB; + + public DefaultTrackWarnings(long clientLargeReadWarnThresholdKb, long clientLargeReadAbortThresholdKB) + { + this.clientLargeReadWarnThresholdKb = clientLargeReadWarnThresholdKb; + this.clientLargeReadAbortThresholdKB = clientLargeReadAbortThresholdKB; + } + + @Override + public boolean isEnabled() + { + return true; + } + + @Override + public long getClientLargeReadWarnThresholdKb() + { + return clientLargeReadWarnThresholdKb; + } + + @Override + public long getClientLargeReadAbortThresholdKB() + { + return clientLargeReadAbortThresholdKB; + } + } + static class DefaultQueryOptions extends QueryOptions { private final ConsistencyLevel consistency; @@ -230,6 +323,7 @@ static class DefaultQueryOptions extends QueryOptions private final SpecificOptions options; private final transient ProtocolVersion protocolVersion; + private final transient TrackWarnings trackWarnings = TrackWarnings.create(); DefaultQueryOptions(ConsistencyLevel consistency, List<ByteBuffer> values, boolean skipMetadata, SpecificOptions options, ProtocolVersion protocolVersion) { @@ -264,6 +358,12 @@ SpecificOptions getSpecificOptions() { return options; } + + @Override + TrackWarnings getTrackWarnings() + { + return trackWarnings; + } } static class QueryOptionsWrapper extends QueryOptions @@ -300,6 +400,12 @@ SpecificOptions getSpecificOptions() return wrapped.getSpecificOptions(); } + @Override + TrackWarnings getTrackWarnings() + { + return wrapped.getTrackWarnings(); + } + + @Override + public QueryOptions prepare(List<ColumnSpecification> specs) {
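Note that TrackWarnings.create() runs once per DefaultQueryOptions instance, so a query sees a consistent snapshot of the three settings even if an operator flips them mid-flight, and the disabled case costs nothing because it reuses a singleton. A simplified, self-contained sketch of that snapshot pattern (the names here are illustrative, not the patch's API):

import java.util.function.BooleanSupplier;
import java.util.function.LongSupplier;

// Capture mutable global settings once, then serve immutable values for the
// lifetime of one query -- the same shape as DefaultTrackWarnings/DisabledTrackWarnings.
final class ThresholdSnapshot
{
    static final ThresholdSnapshot DISABLED = new ThresholdSnapshot(false, 0, 0);

    final boolean enabled;
    final long warnKB;
    final long abortKB;

    private ThresholdSnapshot(boolean enabled, long warnKB, long abortKB)
    {
        this.enabled = enabled;
        this.warnKB = warnKB;
        this.abortKB = abortKB;
    }

    static ThresholdSnapshot capture(BooleanSupplier enabled, LongSupplier warnKB, LongSupplier abortKB)
    {
        if (!enabled.getAsBoolean())
            return DISABLED; // mirrors DisabledTrackWarnings.INSTANCE
        return new ThresholdSnapshot(true, warnKB.getAsLong(), abortKB.getAsLong());
    }
}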
diff --git a/src/java/org/apache/cassandra/cql3/selection/ResultSetBuilder.java b/src/java/org/apache/cassandra/cql3/selection/ResultSetBuilder.java index 84e1e8458997..852872a77835 100644 --- a/src/java/org/apache/cassandra/cql3/selection/ResultSetBuilder.java +++ b/src/java/org/apache/cassandra/cql3/selection/ResultSetBuilder.java @@ -59,6 +59,9 @@ public final class ResultSetBuilder final long[] timestamps; final int[] ttls; + private long size = 0; + private boolean sizeWarningEmitted = false; + public ResultSetBuilder(ResultMetadata metadata, Selectors selectors) { this(metadata, selectors, null); @@ -79,6 +82,30 @@ public ResultSetBuilder(ResultMetadata metadata, Selectors selectors, GroupMaker Arrays.fill(ttls, -1); } + private void addSize(List<ByteBuffer> row) + { + for (int i = 0, isize = row.size(); i < isize; i++) + { + ByteBuffer value = row.get(i); + size += value != null ? value.remaining() : 0; + } + } + + public boolean shouldWarn(long thresholdKB) + { + if (thresholdKB > 0 && !sizeWarningEmitted && size > thresholdKB << 10) + { + sizeWarningEmitted = true; + return true; + } + return false; + } + + public boolean shouldReject(long thresholdKB) + { + return thresholdKB > 0 && size > thresholdKB << 10; + } + public void add(ByteBuffer v) { current.add(v); @@ -166,6 +193,8 @@ public ResultSet build() private List<ByteBuffer> getOutputRow() { - return selectors.getOutputRow(); + List<ByteBuffer> row = selectors.getOutputRow(); + addSize(row); + return row; } } \ No newline at end of file
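The accounting above works in bytes while the thresholds are configured in KB, hence the thresholdKB << 10 conversion (shift binds tighter than >, so size > thresholdKB << 10 compares against the byte count); the warn check also latches, so an oversized result set produces at most one warning rather than one per row. A self-contained illustration of that arithmetic (not the patch's class, just the same logic):

// Demonstrates the threshold arithmetic used by shouldWarn/shouldReject:
// thresholds are in KB, the accumulated size is in bytes, warnings latch.
public class SizeThresholdDemo
{
    private long size = 0;
    private boolean warned = false;

    void add(int rowBytes)
    {
        size += rowBytes;
    }

    boolean shouldWarn(long thresholdKB)
    {
        if (thresholdKB > 0 && !warned && size > thresholdKB << 10) // KB -> bytes
        {
            warned = true; // latch: warn at most once per result set
            return true;
        }
        return false;
    }

    public static void main(String[] args)
    {
        SizeThresholdDemo demo = new SizeThresholdDemo();
        demo.add(2048);
        System.out.println(demo.shouldWarn(1)); // true: 2048 bytes > 1 KB
        demo.add(2048);
        System.out.println(demo.shouldWarn(1)); // false: already warned
        System.out.println(demo.shouldWarn(0)); // false: 0 means disabled
    }
}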
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java index 774bd689799d..5c7ac2952f07 100644 --- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java @@ -22,6 +22,8 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,6 +31,7 @@ import org.apache.cassandra.audit.AuditLogContext; import org.apache.cassandra.audit.AuditLogEntryType; import org.apache.cassandra.auth.Permission; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.schema.ColumnMetadata; import org.apache.cassandra.schema.Schema; import org.apache.cassandra.schema.TableMetadata; @@ -61,12 +64,15 @@ import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.ClientWarn; import org.apache.cassandra.service.QueryState; +import org.apache.cassandra.service.StorageProxy; import org.apache.cassandra.service.pager.AggregationQueryPager; import org.apache.cassandra.service.pager.PagingState; import org.apache.cassandra.service.pager.QueryPager; import org.apache.cassandra.transport.ProtocolVersion; import org.apache.cassandra.transport.messages.ResultMessage; import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.FBUtilities; + import org.apache.commons.lang3.builder.ToStringBuilder; import org.apache.commons.lang3.builder.ToStringStyle; @@ -242,6 +248,9 @@ public ResultMessage.Rows execute(QueryState state, QueryOptions options, long q Selectors selectors = selection.newSelectors(options); ReadQuery query = getQuery(options, selectors.getColumnFilter(), nowInSec, userLimit, userPerPartitionLimit, pageSize); + if (options.isClientTrackWarningsEnabled()) + query.trackWarnings(); + if (aggregationSpec == null && (pageSize <= 0 || (query.limits().count() <= pageSize))) return execute(query, options, state, selectors, nowInSec, userLimit, queryStartNanoTime); @@ -783,6 +792,7 @@ private ResultSet process(PartitionIterator partitions, } ResultSet cqlRows = result.build(); + maybeWarn(result, options); orderResults(cqlRows); @@ -804,10 +814,49 @@ public static ByteBuffer[] getComponents(TableMetadata metadata, DecoratedKey dk } } + private void maybeWarn(ResultSetBuilder result, QueryOptions options) + { + if (!options.isClientTrackWarningsEnabled()) + return; + if (result.shouldWarn(options.getClientLargeReadWarnThresholdKb())) + { + String msg = String.format("Read on table %s has exceeded the size warning threshold of %,d kb", table, options.getClientLargeReadWarnThresholdKb()); + ClientWarn.instance.warn(msg + " with " + loggableTokens(options)); + logger.warn("{} with query {}", msg, asCQL(options)); + cfs().metric.clientReadSizeWarnings.mark(); + } + } + + private void maybeFail(ResultSetBuilder result, QueryOptions options) + { + if (!options.isClientTrackWarningsEnabled()) + return; + if (result.shouldReject(options.getClientLargeReadAbortThresholdKB())) + { + String msg = String.format("Read on table %s has exceeded the size failure threshold of %,d kb", table, options.getClientLargeReadAbortThresholdKB()); + String clientMsg = msg + " with " + loggableTokens(options); + ClientWarn.instance.warn(clientMsg); + logger.warn("{} with query {}", msg, asCQL(options)); + cfs().metric.clientReadSizeAborts.mark(); + // read errors require blockFor and received (they're in the protocol message), but those aren't known here; + // to work around this, treat the coordinator as the only response we care about and mark it failed + ReadSizeAbortException exception = new ReadSizeAbortException(clientMsg, options.getConsistency(), 0, 1, true, + ImmutableMap.of(FBUtilities.getBroadcastAddressAndPort(), RequestFailureReason.READ_TOO_LARGE)); + StorageProxy.recordReadRegularAbort(options.getConsistency(), exception); + throw exception; + } + } + + private ColumnFamilyStore cfs() + { + return Schema.instance.getColumnFamilyStoreInstance(table.id); + } + // Used by ModificationStatement for CAS operations void processPartition(RowIterator partition, QueryOptions options, ResultSetBuilder result, int nowInSec) throws InvalidRequestException { + maybeFail(result, options); ProtocolVersion protocolVersion = options.getProtocolVersion(); ByteBuffer[] keyComponents = getComponents(table, partition.partitionKey()); @@ -819,6 +868,7 @@ void processPartition(RowIterator partition, QueryOptions options, ResultSetBuil if (!staticRow.isEmpty() && restrictions.returnStaticContentOnPartitionWithNoRows()) { result.newRow(partition.partitionKey(), staticRow.clustering()); + maybeFail(result, options); for (ColumnMetadata def : selection.getColumns()) { switch (def.kind) @@ -841,6 +891,13 @@ void processPartition(RowIterator partition, QueryOptions options, ResultSetBuil { Row row = partition.next(); result.newRow( partition.partitionKey(), row.clustering()); + + // reads aren't failed as soon as the size exceeds the failure threshold; they're failed once the failure + // threshold has been exceeded and we start adding more data. We're slightly more permissive to avoid + // cases where a row can never be read. Since we only warn/fail after entire rows are read, this will + // still allow the entire dataset to be read with LIMIT 1 queries, even if every row is oversized + maybeFail(result, options); + // Respect selection order for (ColumnMetadata def : selection.getColumns()) { @@ -1358,4 +1415,131 @@ public String toString() { return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE); }
+ + private String loggableTokens(QueryOptions options) + { + if (restrictions.isKeyRange() || restrictions.usesSecondaryIndexing()) + { + AbstractBounds<PartitionPosition> bounds = restrictions.getPartitionKeyBounds(options); + return "token range: " + (bounds.inclusiveLeft() ? '[' : '(') + + bounds.left.getToken().toString() + ", " + + bounds.right.getToken().toString() + + (bounds.inclusiveRight() ? ']' : ')'); + } + else + { + Collection<ByteBuffer> keys = restrictions.getPartitionKeys(options); + if (keys.size() == 1) + { + return "token: " + table.partitioner.getToken(Iterables.getOnlyElement(keys)).toString(); + } + else + { + StringBuilder sb = new StringBuilder("tokens: ["); + boolean isFirst = true; + for (ByteBuffer key : keys) + { + if (!isFirst) sb.append(", "); + sb.append(table.partitioner.getToken(key).toString()); + isFirst = false; + } + return sb.append(']').toString(); + } + } + } + + private String asCQL(QueryOptions options) + { + ColumnFilter columnFilter = selection.newSelectors(options).getColumnFilter(); + StringBuilder sb = new StringBuilder(); + + sb.append("SELECT ").append(queriedColumns().toCQLString()); + sb.append(" FROM ").append(table.keyspace).append('.').append(table.name); + if (restrictions.isKeyRange() || restrictions.usesSecondaryIndexing()) + { + // partition range + ClusteringIndexFilter clusteringIndexFilter = makeClusteringIndexFilter(options, columnFilter); + if (clusteringIndexFilter == null) + return "EMPTY"; + + RowFilter rowFilter = getRowFilter(options); + + // The LIMIT provided by the user is the number of CQL rows they want returned. + // We want getRangeSlice to count the number of columns, not the number of keys. + AbstractBounds<PartitionPosition> keyBounds = restrictions.getPartitionKeyBounds(options); + if (keyBounds == null) + return "EMPTY"; + + DataRange dataRange = new DataRange(keyBounds, clusteringIndexFilter); + + if (!dataRange.isUnrestricted(table) || !rowFilter.isEmpty()) + { + sb.append(" WHERE "); + // We put the row filter first because the data range can end with "ORDER BY" + if (!rowFilter.isEmpty()) + { + sb.append(rowFilter); + if (!dataRange.isUnrestricted(table)) + sb.append(" AND "); + } + if (!dataRange.isUnrestricted(table)) + sb.append(dataRange.toCQLString(table, rowFilter)); + } + } + else + { + // single partition + Collection<ByteBuffer> keys = restrictions.getPartitionKeys(options); + if (keys.isEmpty()) + return "EMPTY"; + ClusteringIndexFilter filter = makeClusteringIndexFilter(options, columnFilter); + if (filter == null) + return "EMPTY"; + + sb.append(" WHERE "); + + + boolean compoundPk = table.partitionKeyColumns().size() > 1; + if (compoundPk) sb.append('('); + sb.append(ColumnMetadata.toCQLString(table.partitionKeyColumns())); + if (compoundPk) sb.append(')'); + if (keys.size() == 1) + { + sb.append(" = "); + if (compoundPk) sb.append('('); + DataRange.appendKeyString(sb, table.partitionKeyType, Iterables.getOnlyElement(keys)); + if (compoundPk) sb.append(')'); + } + else + { + sb.append(" IN ("); + boolean first = true; + for (ByteBuffer key : keys) + { + if (!first) + sb.append(", "); + + if (compoundPk) sb.append('('); + DataRange.appendKeyString(sb, table.partitionKeyType, key); + if (compoundPk) sb.append(')'); + first = false; + } + + sb.append(')'); + } + + RowFilter rowFilter = getRowFilter(options); + if (!rowFilter.isEmpty()) + sb.append(" AND ").append(rowFilter); + + String filterString = filter.toCQLString(table, rowFilter); + if (!filterString.isEmpty()) + sb.append(" AND ").append(filterString); + } + + DataLimits limits = getDataLimits(getLimit(options), getPerPartitionLimit(options), options.getPageSize()); + if (limits != DataLimits.NONE) + sb.append(' ').append(limits); + return sb.toString(); + } }
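When the abort threshold fires, the coordinator has no real blockFor/received pair to report (the oversize is detected locally, not via replica failures), so maybeFail reports the coordinator itself as the single failed replica: received=0, blockFor=1, reason READ_TOO_LARGE. The error text a client sees is assembled by the RequestFailureException changes further down; a small self-contained sketch of that format (String keys stand in for InetAddressAndPort, and the table/token values are made up):

import java.util.Collections;
import java.util.Map;
import java.util.stream.Collectors;

public class AbortMessageDemo
{
    // Same shape as RequestFailureException.buildErrorMessage(CharSequence, Map) in this patch.
    static String buildErrorMessage(CharSequence msg, Map<String, String> failures)
    {
        StringBuilder sb = new StringBuilder("Operation failed - ");
        sb.append(msg);
        if (failures != null && !failures.isEmpty())
            sb.append(": ").append(failures.entrySet().stream()
                                           .map(e -> String.format("%s from %s", e.getValue(), e.getKey()))
                                           .collect(Collectors.joining(", ")));
        return sb.toString();
    }

    public static void main(String[] args)
    {
        // Mirrors the message maybeFail builds before throwing ReadSizeAbortException.
        String clientMsg = "Read on table ks.tbl has exceeded the size failure threshold of 4,096 kb with token: 42";
        System.out.println(buildErrorMessage(clientMsg, Collections.singletonMap("127.0.0.1:7000", "READ_TOO_LARGE")));
    }
}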
diff --git a/src/java/org/apache/cassandra/db/DataRange.java b/src/java/org/apache/cassandra/db/DataRange.java index b3229125dc42..52162be72a3f 100644 --- a/src/java/org/apache/cassandra/db/DataRange.java +++ b/src/java/org/apache/cassandra/db/DataRange.java @@ -312,7 +312,7 @@ private static String getOperator(boolean isStart, boolean isInclusive) : (isInclusive ? "<=" : "<"); } - private static void appendKeyString(StringBuilder sb, AbstractType<?> type, ByteBuffer key) + public static void appendKeyString(StringBuilder sb, AbstractType<?> type, ByteBuffer key) { if (type instanceof CompositeType) { diff --git a/src/java/org/apache/cassandra/db/MessageParams.java b/src/java/org/apache/cassandra/db/MessageParams.java new file mode 100644 index 000000000000..137d3a6ec5c4 --- /dev/null +++ b/src/java/org/apache/cassandra/db/MessageParams.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db; + +import java.util.EnumMap; +import java.util.Map; + +import io.netty.util.concurrent.FastThreadLocal; +import org.apache.cassandra.net.Message; +import org.apache.cassandra.net.ParamType; + +public class MessageParams +{ + private static final FastThreadLocal<Map<ParamType, Object>> local = new FastThreadLocal<>(); + + private MessageParams() + { + } + + private static Map<ParamType, Object> get() + { + Map<ParamType, Object> instance = local.get(); + if (instance == null) + { + instance = new EnumMap<>(ParamType.class); + local.set(instance); + } + + return instance; + } + + public static void add(ParamType key, Object value) + { + get().put(key, value); + } + + public static <T> T get(ParamType key) + { + return (T) get().get(key); + } + + public static void remove(ParamType key) + { + get().remove(key); + } + + public static void reset() + { + get().clear(); + } + + public static <T> Message<T> addToMessage(Message<T> message) + { + return message.withParams(get()); + } +}
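MessageParams gives replica-side code a place to stash per-request values (such as tombstone counts) without threading them through every method signature: the verb handler resets it when a read arrives and copies whatever accumulated onto the reply. A simplified stand-alone version of the same pattern, using a plain ThreadLocal and an illustrative key enum in place of Netty's FastThreadLocal and ParamType:

import java.util.EnumMap;
import java.util.Map;

public final class RequestScopedParams
{
    public enum Key { TOMBSTONE_WARNING, TOMBSTONE_ABORT }

    // One EnumMap per thread; a request resets it on entry and harvests it
    // when building the response, exactly like MessageParams does.
    private static final ThreadLocal<Map<Key, Object>> LOCAL =
            ThreadLocal.withInitial(() -> new EnumMap<>(Key.class));

    private RequestScopedParams() {}

    public static void reset()
    {
        LOCAL.get().clear();
    }

    public static void add(Key key, Object value)
    {
        LOCAL.get().put(key, value);
    }

    @SuppressWarnings("unchecked")
    public static <T> T get(Key key)
    {
        return (T) LOCAL.get().get(key);
    }

    public static void main(String[] args)
    {
        reset();                          // start of a request
        add(Key.TOMBSTONE_WARNING, 1234); // set deep inside read execution
        Integer tombstones = get(Key.TOMBSTONE_WARNING);
        System.out.println(tombstones);   // 1234, visible anywhere on this thread
    }
}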
diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java index 45cd3084ca3b..917f0f034c3e 100644 --- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java +++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java @@ -55,17 +55,18 @@ public class PartitionRangeReadCommand extends ReadCommand implements PartitionR private final DataRange dataRange; private PartitionRangeReadCommand(boolean isDigest, - int digestVersion, - boolean acceptsTransient, - TableMetadata metadata, - int nowInSec, - ColumnFilter columnFilter, - RowFilter rowFilter, - DataLimits limits, - DataRange dataRange, - IndexMetadata index) + int digestVersion, + boolean acceptsTransient, + TableMetadata metadata, + int nowInSec, + ColumnFilter columnFilter, + RowFilter rowFilter, + DataLimits limits, + DataRange dataRange, + IndexMetadata index, + boolean trackWarnings) { - super(Kind.PARTITION_RANGE, isDigest, digestVersion, acceptsTransient, metadata, nowInSec, columnFilter, rowFilter, limits, index); + super(Kind.PARTITION_RANGE, isDigest, digestVersion, acceptsTransient, metadata, nowInSec, columnFilter, rowFilter, limits, index, trackWarnings); this.dataRange = dataRange; } @@ -85,7 +86,8 @@ public static PartitionRangeReadCommand create(TableMetadata metadata, rowFilter, limits, dataRange, - findIndex(metadata, rowFilter)); + findIndex(metadata, rowFilter), + false); } /** @@ -107,7 +109,8 @@ public static PartitionRangeReadCommand allDataRead(TableMetadata metadata, int RowFilter.NONE, DataLimits.NONE, DataRange.allData(metadata.partitioner), - null); + null, + false); } public DataRange dataRange() @@ -156,7 +159,8 @@ public PartitionRangeReadCommand forSubRange(AbstractBounds r rowFilter(), isRangeContinuation ? limits() : limits().withoutState(), dataRange().forSubRange(range), - indexMetadata()); + indexMetadata(), + isTrackingWarnings()); } public PartitionRangeReadCommand copy() @@ -170,7 +174,8 @@ public PartitionRangeReadCommand copy() rowFilter(), limits(), dataRange(), - indexMetadata()); + indexMetadata(), + isTrackingWarnings()); } @Override @@ -185,7 +190,8 @@ protected PartitionRangeReadCommand copyAsDigestQuery() rowFilter(), limits(), dataRange(), - indexMetadata()); + indexMetadata(), + isTrackingWarnings()); } @Override @@ -200,7 +206,8 @@ protected PartitionRangeReadCommand copyAsTransientQuery() rowFilter(), limits(), dataRange(), - indexMetadata()); + indexMetadata(), + isTrackingWarnings()); } @Override @@ -215,7 +222,8 @@ public PartitionRangeReadCommand withUpdatedLimit(DataLimits newLimits) rowFilter(), newLimits, dataRange(), - indexMetadata()); + indexMetadata(), + isTrackingWarnings()); } @Override @@ -230,7 +238,8 @@ public PartitionRangeReadCommand withUpdatedLimitsAndDataRange(DataLimits newLim rowFilter(), newLimits, newDataRange, - indexMetadata()); + indexMetadata(), + isTrackingWarnings()); } public long getTimeout(TimeUnit unit) @@ -363,6 +372,15 @@ protected void appendCQLWhereClause(StringBuilder sb) sb.append(" WHERE ").append(filterString); } + @Override + public String loggableTokens() + { + return "token range: " + (dataRange.keyRange.inclusiveLeft() ? '[' : '(') + + dataRange.keyRange.left.getToken().toString() + ", " + + dataRange.keyRange.right.getToken().toString() + + (dataRange.keyRange.inclusiveRight() ? ']' : ')'); + } + /** * Allow to post-process the result of the query after it has been reconciled on the coordinator * but before it is passed to the CQL layer to return the ResultSet.
@@ -431,7 +449,7 @@ public ReadCommand deserialize(DataInputPlus in, throws IOException { DataRange range = DataRange.serializer.deserialize(in, version, metadata); - return new PartitionRangeReadCommand(isDigest, digestVersion, acceptsTransient, metadata, nowInSec, columnFilter, rowFilter, limits, range, index); + return new PartitionRangeReadCommand(isDigest, digestVersion, acceptsTransient, metadata, nowInSec, columnFilter, rowFilter, limits, range, index, false); } } } diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java index 71bce0b7dd1f..42206e188b50 100644 --- a/src/java/org/apache/cassandra/db/ReadCommand.java +++ b/src/java/org/apache/cassandra/db/ReadCommand.java @@ -38,6 +38,7 @@ import org.apache.cassandra.cql3.ColumnIdentifier; import org.apache.cassandra.db.filter.*; import org.apache.cassandra.net.MessageFlag; +import org.apache.cassandra.net.ParamType; import org.apache.cassandra.net.Verb; import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.db.rows.*; @@ -99,6 +100,8 @@ public abstract class ReadCommand extends AbstractReadQuery int oldestUnrepairedTombstone = Integer.MAX_VALUE; + private boolean trackWarnings; + @Nullable private final IndexMetadata index; @@ -139,7 +142,8 @@ protected ReadCommand(Kind kind, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits, - IndexMetadata index) + IndexMetadata index, + boolean trackWarnings) { super(metadata, nowInSec, columnFilter, rowFilter, limits); if (acceptsTransient && isDigestQuery) @@ -150,6 +154,7 @@ protected ReadCommand(Kind kind, this.digestVersion = digestVersion; this.acceptsTransient = acceptsTransient; this.index = index; + this.trackWarnings = trackWarnings; } protected abstract void serializeSelection(DataOutputPlus out, int version) throws IOException; @@ -281,6 +286,17 @@ public boolean isRepairedDataDigestConclusive() return repairedDataInfo.isConclusive(); } + @Override + public void trackWarnings() + { + trackWarnings = true; + } + + public boolean isTrackingWarnings() + { + return trackWarnings; + } + /** * Index (metadata) chosen for this query. Can be null. * @@ -588,6 +604,11 @@ private void countTombstone(ClusteringPrefix clustering) String query = ReadCommand.this.toCQLString(); Tracing.trace("Scanned over {} tombstones for query {}; query aborted (see tombstone_failure_threshold)", failureThreshold, query); metric.tombstoneFailures.inc(); + if (trackWarnings) + { + MessageParams.remove(ParamType.TOMBSTONE_WARNING); + MessageParams.add(ParamType.TOMBSTONE_ABORT, tombstones); + } throw new TombstoneOverwhelmingException(tombstones, query, ReadCommand.this.metadata(), currentKey, clustering); } } @@ -606,7 +627,10 @@ public void onClose() String msg = String.format( "Read %d live rows and %d tombstone cells for query %1.512s; token %s (see tombstone_warn_threshold)", liveRows, tombstones, ReadCommand.this.toCQLString(), currentKey.getToken()); - ClientWarn.instance.warn(msg); + if (trackWarnings) + MessageParams.add(ParamType.TOMBSTONE_WARNING, tombstones); + else + ClientWarn.instance.warn(msg); if (tombstones < failureThreshold) { metric.tombstoneWarnings.inc(); @@ -686,9 +710,12 @@ protected UnfilteredPartitionIterator withStateTracking(UnfilteredPartitionItera */ public Message createMessage(boolean trackRepairedData) { - return trackRepairedData - ? 
Message.outWithFlags(verb(), this, MessageFlag.CALL_BACK_ON_FAILURE, MessageFlag.TRACK_REPAIRED_DATA) - : Message.outWithFlag (verb(), this, MessageFlag.CALL_BACK_ON_FAILURE); + Message msg = trackRepairedData + ? Message.outWithFlags(verb(), this, MessageFlag.CALL_BACK_ON_FAILURE, MessageFlag.TRACK_REPAIRED_DATA) + : Message.outWithFlag(verb(), this, MessageFlag.CALL_BACK_ON_FAILURE); + if (trackWarnings) + msg = msg.withFlag(MessageFlag.TRACK_WARNINGS); + return msg; } public abstract Verb verb(); @@ -746,6 +773,11 @@ public String toCQLString() return sb.toString(); } + /** + * Return the queried token(s) for logging + */ + public abstract String loggableTokens(); + // Monitorable interface public String name() { diff --git a/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java b/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java index 2c28ed9d4b8f..2260fdefe526 100644 --- a/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java +++ b/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java @@ -48,10 +48,14 @@ public void doVerb(Message message) ReadCommand command = message.payload; validateTransientStatus(message); + MessageParams.reset(); long timeout = message.expiresAtNanos() - message.createdAtNanos(); command.setMonitoringTime(message.createdAtNanos(), message.isCrossNode(), timeout, DatabaseDescriptor.getSlowQueryTimeout(NANOSECONDS)); + if (message.trackWarnings()) + command.trackWarnings(); + if (message.trackRepairedData()) command.trackRepairedStatus(); @@ -61,6 +65,21 @@ public void doVerb(Message message) { response = command.createResponse(iterator); } + catch (RejectException e) + { + if (!command.isTrackingWarnings()) + throw e; + + // make sure to log as the exception is swallowed + logger.error(e.getMessage()); + + response = command.createResponse(EmptyIterators.unfilteredPartition(command.metadata())); + Message reply = message.responseWith(response); + reply = MessageParams.addToMessage(reply); + + MessagingService.instance().send(reply, message.from()); + return; + } if (!command.complete()) { @@ -71,6 +90,7 @@ public void doVerb(Message message) Tracing.trace("Enqueuing response to {}", message.from()); Message reply = message.responseWith(response); + reply = MessageParams.addToMessage(reply); MessagingService.instance().send(reply, message.from()); } diff --git a/src/java/org/apache/cassandra/db/ReadQuery.java b/src/java/org/apache/cassandra/db/ReadQuery.java index bd20c26d9fd4..f9695d6f6182 100644 --- a/src/java/org/apache/cassandra/db/ReadQuery.java +++ b/src/java/org/apache/cassandra/db/ReadQuery.java @@ -254,4 +254,8 @@ public default boolean isEmpty() default void maybeValidateIndex() { } + + default void trackWarnings() + { + } } diff --git a/src/java/org/apache/cassandra/db/RejectException.java b/src/java/org/apache/cassandra/db/RejectException.java new file mode 100644 index 000000000000..a879b760ec44 --- /dev/null +++ b/src/java/org/apache/cassandra/db/RejectException.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db; + +/** + * Represents a request to reject the current operation + */ +public abstract class RejectException extends RuntimeException +{ + public RejectException(String message) + { + super(message); + } + + public RejectException(String message, Throwable cause) + { + super(message, cause); + } + + public RejectException(Throwable cause) + { + super(cause); + } + + public RejectException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) + { + super(message, cause, enableSuppression, writableStackTrace); + } +} diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java index 026a795a2413..b7fcf3167f1c 100644 --- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java +++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java @@ -71,9 +71,10 @@ protected SinglePartitionReadCommand(boolean isDigest, DataLimits limits, DecoratedKey partitionKey, ClusteringIndexFilter clusteringIndexFilter, - IndexMetadata index) + IndexMetadata index, + boolean trackWarnings) { - super(Kind.SINGLE_PARTITION, isDigest, digestVersion, acceptsTransient, metadata, nowInSec, columnFilter, rowFilter, limits, index); + super(Kind.SINGLE_PARTITION, isDigest, digestVersion, acceptsTransient, metadata, nowInSec, columnFilter, rowFilter, limits, index, trackWarnings); assert partitionKey.getPartitioner() == metadata.partitioner; this.partitionKey = partitionKey; this.clusteringIndexFilter = clusteringIndexFilter; @@ -112,7 +113,8 @@ public static SinglePartitionReadCommand create(TableMetadata metadata, limits, partitionKey, clusteringIndexFilter, - indexMetadata); + indexMetadata, + false); } /** @@ -288,7 +290,8 @@ public SinglePartitionReadCommand copy() limits(), partitionKey(), clusteringIndexFilter(), - indexMetadata()); + indexMetadata(), + isTrackingWarnings()); } @Override @@ -304,7 +307,8 @@ protected SinglePartitionReadCommand copyAsDigestQuery() limits(), partitionKey(), clusteringIndexFilter(), - indexMetadata()); + indexMetadata(), + isTrackingWarnings()); } @Override @@ -320,7 +324,8 @@ protected SinglePartitionReadCommand copyAsTransientQuery() limits(), partitionKey(), clusteringIndexFilter(), - indexMetadata()); + indexMetadata(), + isTrackingWarnings()); } @Override @@ -336,7 +341,8 @@ public SinglePartitionReadCommand withUpdatedLimit(DataLimits newLimits) newLimits, partitionKey(), clusteringIndexFilter(), - indexMetadata()); + indexMetadata(), + isTrackingWarnings()); } @Override @@ -371,13 +377,16 @@ public SinglePartitionReadCommand forPaging(Clustering lastReturned, DataLimi { // We shouldn't have set digest yet when reaching that point assert !isDigestQuery(); - return create(metadata(), - nowInSec(), - columnFilter(), - rowFilter(), - limits, - partitionKey(), - lastReturned == null ? 
clusteringIndexFilter() : clusteringIndexFilter.forPaging(metadata().comparator, lastReturned, false)); + SinglePartitionReadCommand cmd = create(metadata(), + nowInSec(), + columnFilter(), + rowFilter(), + limits, + partitionKey(), + lastReturned == null ? clusteringIndexFilter() : clusteringIndexFilter.forPaging(metadata().comparator, lastReturned, false)); + if (isTrackingWarnings()) + cmd.trackWarnings(); + return cmd; } public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState, long queryStartNanoTime) throws RequestExecutionException @@ -1068,6 +1077,12 @@ protected void appendCQLWhereClause(StringBuilder sb) } } + @Override + public String loggableTokens() + { + return "token=" + partitionKey.getToken().toString(); + } + protected void serializeSelection(DataOutputPlus out, int version) throws IOException { metadata().partitionKeyType.writeValue(partitionKey().getKey(), out); @@ -1151,7 +1166,7 @@ public ReadCommand deserialize(DataInputPlus in, { DecoratedKey key = metadata.partitioner.decorateKey(metadata.partitionKeyType.readBuffer(in, DatabaseDescriptor.getMaxValueSize())); ClusteringIndexFilter filter = ClusteringIndexFilter.serializer.deserialize(in, version, metadata); - return new SinglePartitionReadCommand(isDigest, digestVersion, acceptsTransient, metadata, nowInSec, columnFilter, rowFilter, limits, key, filter, index); + return new SinglePartitionReadCommand(isDigest, digestVersion, acceptsTransient, metadata, nowInSec, columnFilter, rowFilter, limits, key, filter, index, false); } } diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadQuery.java b/src/java/org/apache/cassandra/db/SinglePartitionReadQuery.java index 755d55227e62..5d344de58524 100644 --- a/src/java/org/apache/cassandra/db/SinglePartitionReadQuery.java +++ b/src/java/org/apache/cassandra/db/SinglePartitionReadQuery.java @@ -281,6 +281,12 @@ public ColumnFilter columnFilter() return queries.get(0).columnFilter(); } + @Override + public void trackWarnings() + { + queries.forEach(ReadQuery::trackWarnings); + } + @Override public String toString() { diff --git a/src/java/org/apache/cassandra/db/filter/TombstoneOverwhelmingException.java b/src/java/org/apache/cassandra/db/filter/TombstoneOverwhelmingException.java index 28d49ae370c5..efca3ac4db44 100644 --- a/src/java/org/apache/cassandra/db/filter/TombstoneOverwhelmingException.java +++ b/src/java/org/apache/cassandra/db/filter/TombstoneOverwhelmingException.java @@ -24,7 +24,7 @@ import org.apache.cassandra.db.*; import org.apache.cassandra.db.marshal.*; -public class TombstoneOverwhelmingException extends RuntimeException +public class TombstoneOverwhelmingException extends RejectException { public TombstoneOverwhelmingException(int numTombstones, String query, TableMetadata metadata, DecoratedKey lastPartitionKey, ClusteringPrefix lastClustering) { diff --git a/src/java/org/apache/cassandra/exceptions/ReadAbortException.java b/src/java/org/apache/cassandra/exceptions/ReadAbortException.java new file mode 100644 index 000000000000..007585895314 --- /dev/null +++ b/src/java/org/apache/cassandra/exceptions/ReadAbortException.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.exceptions; + +import java.util.Map; + +import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.locator.InetAddressAndPort; + +/** + * Special Read Failure which is caused by user query; implies a user request is not allowed and not that Cassandra had an issue. + */ +public abstract class ReadAbortException extends ReadFailureException +{ + protected ReadAbortException(String msg, ConsistencyLevel consistency, int received, int blockFor, boolean dataPresent, Map failureReasonByEndpoint) + { + super(msg, consistency, received, blockFor, dataPresent, failureReasonByEndpoint); + } +} diff --git a/src/java/org/apache/cassandra/exceptions/ReadFailureException.java b/src/java/org/apache/cassandra/exceptions/ReadFailureException.java index 744cad41484d..698c8ae9c483 100644 --- a/src/java/org/apache/cassandra/exceptions/ReadFailureException.java +++ b/src/java/org/apache/cassandra/exceptions/ReadFailureException.java @@ -33,4 +33,10 @@ public ReadFailureException(ConsistencyLevel consistency, int received, int bloc super(ExceptionCode.READ_FAILURE, consistency, received, blockFor, ImmutableMap.copyOf(failureReasonByEndpoint)); this.dataPresent = dataPresent; } + + protected ReadFailureException(String msg, ConsistencyLevel consistency, int received, int blockFor, boolean dataPresent, Map failureReasonByEndpoint) + { + super(ExceptionCode.READ_FAILURE, msg, consistency, received, blockFor, failureReasonByEndpoint); + this.dataPresent = dataPresent; + } } diff --git a/src/java/org/apache/cassandra/exceptions/ReadSizeAbortException.java b/src/java/org/apache/cassandra/exceptions/ReadSizeAbortException.java new file mode 100644 index 000000000000..ed8100859413 --- /dev/null +++ b/src/java/org/apache/cassandra/exceptions/ReadSizeAbortException.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.exceptions; + +import java.util.Map; + +import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.locator.InetAddressAndPort; + +public class ReadSizeAbortException extends ReadAbortException +{ + public ReadSizeAbortException(String msg, ConsistencyLevel consistency, int received, int blockFor, boolean dataPresent, Map<InetAddressAndPort, RequestFailureReason> failureReasonByEndpoint) + { + super(msg, consistency, received, blockFor, dataPresent, failureReasonByEndpoint); + } +} diff --git a/src/java/org/apache/cassandra/exceptions/RequestFailureException.java b/src/java/org/apache/cassandra/exceptions/RequestFailureException.java index 56cee1a4a8c9..c75ff9c19d24 100644 --- a/src/java/org/apache/cassandra/exceptions/RequestFailureException.java +++ b/src/java/org/apache/cassandra/exceptions/RequestFailureException.java @@ -32,7 +32,12 @@ public class RequestFailureException extends RequestExecutionException protected RequestFailureException(ExceptionCode code, ConsistencyLevel consistency, int received, int blockFor, Map<InetAddressAndPort, RequestFailureReason> failureReasonByEndpoint) { - super(code, buildErrorMessage(received, failureReasonByEndpoint)); + this(code, buildErrorMessage(received, failureReasonByEndpoint), consistency, received, blockFor, failureReasonByEndpoint); + } + + protected RequestFailureException(ExceptionCode code, String msg, ConsistencyLevel consistency, int received, int blockFor, Map<InetAddressAndPort, RequestFailureReason> failureReasonByEndpoint) + { + super(code, buildErrorMessage(msg, failureReasonByEndpoint)); this.consistency = consistency; this.received = received; this.blockFor = blockFor; @@ -41,10 +46,7 @@ protected RequestFailureException(ExceptionCode code, ConsistencyLevel consisten private static String buildErrorMessage(int received, Map<InetAddressAndPort, RequestFailureReason> failures) { - return String.format("Operation failed - received %d responses and %d failures: %s", - received, - failures.size(), - buildFailureString(failures)); + return String.format("received %d responses and %d failures", received, failures.size()); } private static String buildFailureString(Map<InetAddressAndPort, RequestFailureReason> failures) @@ -53,4 +55,13 @@ private static String buildFailureString(Map<InetAddressAndPort, RequestFailureReason> failures) { return failures.entrySet().stream() .map(e -> String.format("%s from %s", e.getValue(), e.getKey())) .collect(Collectors.joining(", ")); } + + private static String buildErrorMessage(CharSequence msg, Map<InetAddressAndPort, RequestFailureReason> failures) + { + StringBuilder sb = new StringBuilder("Operation failed - "); + sb.append(msg); + if (failures != null && !failures.isEmpty()) + sb.append(": ").append(buildFailureString(failures)); + return sb.toString(); + } } diff --git a/src/java/org/apache/cassandra/exceptions/RequestFailureReason.java b/src/java/org/apache/cassandra/exceptions/RequestFailureReason.java index 1cdbdb544d28..3f6c2d465352 100644 --- a/src/java/org/apache/cassandra/exceptions/RequestFailureReason.java +++ b/src/java/org/apache/cassandra/exceptions/RequestFailureReason.java @@ -35,8 +35,9 @@ public enum RequestFailureReason UNKNOWN (0), READ_TOO_MANY_TOMBSTONES (1), TIMEOUT (2), - INCOMPATIBLE_SCHEMA (3); - + INCOMPATIBLE_SCHEMA (3), + READ_TOO_LARGE (4); + public static final Serializer serializer = new Serializer(); public final int code; diff --git a/src/java/org/apache/cassandra/exceptions/TombstoneAbortException.java b/src/java/org/apache/cassandra/exceptions/TombstoneAbortException.java new file mode 100644 index 000000000000..e86e760ae650 --- /dev/null +++ b/src/java/org/apache/cassandra/exceptions/TombstoneAbortException.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.exceptions; + +import java.util.Map; + +import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.locator.InetAddressAndPort; + +import static org.apache.cassandra.service.reads.ReadCallback.tombstoneAbortMessage; + +public class TombstoneAbortException extends ReadAbortException +{ + public final int nodes; + public final int tombstones; + + public TombstoneAbortException(int nodes, int tombstones, String cql, boolean dataPresent, ConsistencyLevel consistency, int received, int blockFor, Map failureReasonByEndpoint) + { + super(tombstoneAbortMessage(nodes, tombstones, cql), consistency, received, blockFor, dataPresent, failureReasonByEndpoint); + this.nodes = nodes; + this.tombstones = tombstones; + } +} diff --git a/src/java/org/apache/cassandra/metrics/ClientRequestMetrics.java b/src/java/org/apache/cassandra/metrics/ClientRequestMetrics.java index e3a6970f7ee7..19bc6d6c1a47 100644 --- a/src/java/org/apache/cassandra/metrics/ClientRequestMetrics.java +++ b/src/java/org/apache/cassandra/metrics/ClientRequestMetrics.java @@ -22,6 +22,9 @@ import com.codahale.metrics.Meter; +import org.apache.cassandra.exceptions.ReadAbortException; +import org.apache.cassandra.exceptions.ReadSizeAbortException; +import org.apache.cassandra.exceptions.TombstoneAbortException; import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics; @@ -31,6 +34,9 @@ public class ClientRequestMetrics extends LatencyMetrics public final Meter timeouts; public final Meter unavailables; public final Meter failures; + public final Meter aborts; + public final Meter tombstoneAborts; + public final Meter readSizeAborts; public ClientRequestMetrics(String scope) { @@ -39,6 +45,24 @@ public ClientRequestMetrics(String scope) timeouts = Metrics.meter(factory.createMetricName("Timeouts")); unavailables = Metrics.meter(factory.createMetricName("Unavailables")); failures = Metrics.meter(factory.createMetricName("Failures")); + aborts = Metrics.meter(factory.createMetricName("Aborts")); + tombstoneAborts = Metrics.meter(factory.createMetricName("TombstoneAborts")); + readSizeAborts = Metrics.meter(factory.createMetricName("ReadSizeAborts")); + } + + public void markAbort(Throwable cause) + { + aborts.mark(); + if (!(cause instanceof ReadAbortException)) + return; + if (cause instanceof TombstoneAbortException) + { + tombstoneAborts.mark(); + } + else if (cause instanceof ReadSizeAbortException) + { + readSizeAborts.mark(); + } } public void release() @@ -47,5 +71,8 @@ public void release() Metrics.remove(factory.createMetricName("Timeouts")); Metrics.remove(factory.createMetricName("Unavailables")); Metrics.remove(factory.createMetricName("Failures")); + Metrics.remove(factory.createMetricName("Aborts")); + Metrics.remove(factory.createMetricName("TombstoneAborts")); + 
Metrics.remove(factory.createMetricName("ReadSizeAborts")); } } diff --git a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java index dadbe476087f..d0607af071af 100644 --- a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java +++ b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java @@ -153,6 +153,12 @@ public class KeyspaceMetrics public final Histogram repairedDataTrackingOverreadRows; public final Timer repairedDataTrackingOverreadTime; + public final Meter clientTombstoneWarnings; + public final Meter clientTombstoneAborts; + + public final Meter clientReadSizeWarnings; + public final Meter clientReadSizeAborts; + public final MetricNameFactory factory; private Keyspace keyspace; @@ -235,6 +241,12 @@ public KeyspaceMetrics(final Keyspace ks) repairedDataTrackingOverreadRows = createKeyspaceHistogram("RepairedDataTrackingOverreadRows", false); repairedDataTrackingOverreadTime = createKeyspaceTimer("RepairedDataTrackingOverreadTime"); + + clientTombstoneWarnings = createKeyspaceMeter("ClientTombstoneWarnings"); + clientTombstoneAborts = createKeyspaceMeter("ClientTombstoneAborts"); + + clientReadSizeWarnings = createKeyspaceMeter("ClientReadSizeWarnings"); + clientReadSizeAborts = createKeyspaceMeter("ClientReadSizeAborts"); } /** diff --git a/src/java/org/apache/cassandra/metrics/TableMetrics.java b/src/java/org/apache/cassandra/metrics/TableMetrics.java index 09f41a15ee0f..ced062225ba7 100644 --- a/src/java/org/apache/cassandra/metrics/TableMetrics.java +++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java @@ -259,6 +259,11 @@ public class TableMetrics /** When sampler activated, will track the slowest local reads **/ public final Sampler topLocalReadQueryTime; + public final TableMeter clientTombstoneWarnings; + public final TableMeter clientTombstoneAborts; + public final TableMeter clientReadSizeWarnings; + public final TableMeter clientReadSizeAborts; + private static Pair totalNonSystemTablesSize(Predicate predicate) { long total = 0; @@ -913,6 +918,12 @@ protected double getDenominator() } return cnt; }); + + clientTombstoneWarnings = createTableMeter("ClientTombstoneWarnings", cfs.keyspace.metric.clientTombstoneWarnings); + clientTombstoneAborts = createTableMeter("ClientTombstoneAborts", cfs.keyspace.metric.clientTombstoneAborts); + + clientReadSizeWarnings = createTableMeter("ClientReadSizeWarnings", cfs.keyspace.metric.clientReadSizeWarnings); + clientReadSizeAborts = createTableMeter("ClientReadSizeAborts", cfs.keyspace.metric.clientReadSizeAborts); } public void updateSSTableIterated(int count) diff --git a/src/java/org/apache/cassandra/net/Message.java b/src/java/org/apache/cassandra/net/Message.java index ca740120344c..19e223100327 100644 --- a/src/java/org/apache/cassandra/net/Message.java +++ b/src/java/org/apache/cassandra/net/Message.java @@ -19,6 +19,7 @@ import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Collections; import java.util.EnumMap; import java.util.Map; import java.util.UUID; @@ -136,6 +137,11 @@ boolean callBackOnFailure() return header.callBackOnFailure(); } + public boolean trackWarnings() + { + return header.trackWarnings(); + } + /** See CASSANDRA-14145 */ public boolean trackRepairedData() { @@ -267,6 +273,23 @@ public Message withForwardTo(ForwardingInfo peers) return new Message<>(header.withParam(ParamType.FORWARD_TO, peers), payload); } + public Message withFlag(MessageFlag flag) + { + return new Message<>(header.withFlag(flag), 
payload); + } + + public Message withParam(ParamType type, Object value) + { + return new Message<>(header.withParam(type, value), payload); + } + + public Message withParams(Map values) + { + if (values == null || values.isEmpty()) + return this; + return new Message<>(header.withParams(values), payload); + } + private static final EnumMap NO_PARAMS = new EnumMap<>(ParamType.class); private static Map buildParams(ParamType type, Object value) @@ -295,6 +318,16 @@ private static Map addParam(Map params, Pa return params; } + private static Map addParams(Map params, Map values) + { + if (values == null || values.isEmpty()) + return params; + + params = new EnumMap<>(params); + params.putAll(values); + return params; + } + /* * id generation */ @@ -383,6 +416,11 @@ Header withParam(ParamType type, Object value) return new Header(id, verb, from, createdAtNanos, expiresAtNanos, flags, addParam(params, type, value)); } + Header withParams(Map values) + { + return new Header(id, verb, from, createdAtNanos, expiresAtNanos, flags, addParams(params, values)); + } + boolean callBackOnFailure() { return MessageFlag.CALL_BACK_ON_FAILURE.isIn(flags); @@ -393,6 +431,11 @@ boolean trackRepairedData() return MessageFlag.TRACK_REPAIRED_DATA.isIn(flags); } + boolean trackWarnings() + { + return MessageFlag.TRACK_WARNINGS.isIn(flags); + } + @Nullable ForwardingInfo forwardTo() { @@ -416,6 +459,11 @@ public TraceType traceType() { return (TraceType) params.getOrDefault(ParamType.TRACE_TYPE, TraceType.QUERY); } + + public Map params() + { + return Collections.unmodifiableMap(params); + } } @SuppressWarnings("WeakerAccess") diff --git a/src/java/org/apache/cassandra/net/MessageFlag.java b/src/java/org/apache/cassandra/net/MessageFlag.java index c74784d4caba..441b06b6a3a0 100644 --- a/src/java/org/apache/cassandra/net/MessageFlag.java +++ b/src/java/org/apache/cassandra/net/MessageFlag.java @@ -27,7 +27,9 @@ public enum MessageFlag /** a failure response should be sent back in case of failure */ CALL_BACK_ON_FAILURE (0), /** track repaired data - see CASSANDRA-14145 */ - TRACK_REPAIRED_DATA (1); + TRACK_REPAIRED_DATA (1), + /** allow creating warnings or aborting queries based off query - see CASSANDRA-16850 */ + TRACK_WARNINGS(2); private final int id; diff --git a/src/java/org/apache/cassandra/net/ParamType.java b/src/java/org/apache/cassandra/net/ParamType.java index 5d4b982ce578..d8b8c0ed2c09 100644 --- a/src/java/org/apache/cassandra/net/ParamType.java +++ b/src/java/org/apache/cassandra/net/ParamType.java @@ -19,12 +19,12 @@ import java.util.HashMap; import java.util.Map; - import javax.annotation.Nullable; import org.apache.cassandra.exceptions.RequestFailureReason; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.tracing.Tracing; +import org.apache.cassandra.utils.Int32Serializer; import org.apache.cassandra.utils.UUIDSerializer; import static java.lang.Math.max; @@ -54,7 +54,10 @@ public enum ParamType TRACE_TYPE (6, "TraceType", Tracing.traceTypeSerializer), @Deprecated - TRACK_REPAIRED_DATA (7, "TrackRepaired", LegacyFlag.serializer); + TRACK_REPAIRED_DATA (7, "TrackRepaired", LegacyFlag.serializer), + + TOMBSTONE_ABORT(8, "TSA", Int32Serializer.serializer), + TOMBSTONE_WARNING(9, "TSW", Int32Serializer.serializer); final int id; @Deprecated final String legacyAlias; // pre-4.0 we used to serialize entire param name string diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 
2f6ad38b5fce..1387ce37e71a 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -53,11 +53,14 @@ import org.apache.cassandra.batchlog.BatchlogManager; import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.RejectException; import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.db.CounterMutation; import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.EmptyIterators; import org.apache.cassandra.db.IMutation; import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.MessageParams; import org.apache.cassandra.db.Mutation; import org.apache.cassandra.db.PartitionRangeReadCommand; import org.apache.cassandra.db.ReadCommand; @@ -75,6 +78,7 @@ import org.apache.cassandra.db.rows.RowIterator; import org.apache.cassandra.db.view.ViewUtils; import org.apache.cassandra.dht.Token; +import org.apache.cassandra.exceptions.ReadAbortException; import org.apache.cassandra.exceptions.CasWriteTimeoutException; import org.apache.cassandra.exceptions.CasWriteUnknownResultException; import org.apache.cassandra.exceptions.InvalidRequestException; @@ -357,6 +361,12 @@ public static RowIterator cas(String keyspaceName, writeMetricsMap.get(consistencyForPaxos).timeouts.mark(); throw e; } + catch (ReadAbortException e) + { + casWriteMetrics.markAbort(e); + writeMetricsMap.get(consistencyForPaxos).markAbort(e); + throw e; + } catch (WriteFailureException | ReadFailureException e) { casWriteMetrics.failures.mark(); @@ -1798,6 +1808,13 @@ private static PartitionIterator readWithPaxos(SinglePartitionReadCommand.Group readMetricsMap.get(consistencyLevel).timeouts.mark(); throw e; } + catch (ReadAbortException e) + { + readMetrics.markAbort(e); + casReadMetrics.markAbort(e); + readMetricsMap.get(consistencyLevel).markAbort(e); + throw e; + } catch (ReadFailureException e) { readMetrics.failures.mark(); @@ -1846,6 +1863,11 @@ private static PartitionIterator readRegular(SinglePartitionReadCommand.Group gr readMetricsMap.get(consistencyLevel).timeouts.mark(); throw e; } + catch (ReadAbortException e) + { + recordReadRegularAbort(consistencyLevel, e); + throw e; + } catch (ReadFailureException e) { readMetrics.failures.mark(); @@ -1863,6 +1885,12 @@ private static PartitionIterator readRegular(SinglePartitionReadCommand.Group gr } } + public static void recordReadRegularAbort(ConsistencyLevel consistencyLevel, Throwable cause) + { + readMetrics.markAbort(cause); + readMetricsMap.get(consistencyLevel).markAbort(cause); + } + public static PartitionIterator concatAndBlockOnRepair(List iterators, List> repairs) { PartitionIterator concatenated = PartitionIterators.concat(iterators); @@ -1980,6 +2008,9 @@ protected void runMayThrow() { try { + MessageParams.reset(); + + boolean readRejected = false; command.setMonitoringTime(approxCreationTimeNanos, false, verb.expiresAfterNanos(), DatabaseDescriptor.getSlowQueryTimeout(NANOSECONDS)); ReadResponse response; @@ -1988,6 +2019,13 @@ protected void runMayThrow() { response = command.createResponse(iterator); } + catch (RejectException e) + { + if (!command.isTrackingWarnings()) + throw e; + response = command.createResponse(EmptyIterators.unfilteredPartition(command.metadata())); + readRejected = true; + } if (command.complete()) { @@ -1999,7 +2037,8 @@ protected void runMayThrow() handler.onFailure(FBUtilities.getBroadcastAddressAndPort(), 
RequestFailureReason.UNKNOWN); } - MessagingService.instance().latencySubscribers.add(FBUtilities.getBroadcastAddressAndPort(), MonotonicClock.approxTime.now() - approxCreationTimeNanos, NANOSECONDS); + if (!readRejected) + MessagingService.instance().latencySubscribers.add(FBUtilities.getBroadcastAddressAndPort(), MonotonicClock.approxTime.now() - approxCreationTimeNanos, NANOSECONDS); } catch (Throwable t) { diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index c226d3723d2b..29b52b11e93e 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -6059,4 +6059,43 @@ public int getCompactionTombstoneWarningThreshold() public void addSnapshot(TableSnapshot snapshot) { snapshotManager.addSnapshot(snapshot); } + + @Override + public long getClientLargeReadWarnThresholdKB() + { + return DatabaseDescriptor.getClientLargeReadWarnThresholdKB(); + } + + @Override + public void setClientLargeReadWarnThresholdKB(long threshold) + { + DatabaseDescriptor.setClientLargeReadWarnThresholdKB(threshold); + logger.info("updated client_large_read_warn_threshold_kb to {}", threshold); + } + + @Override + public long getClientLargeReadAbortThresholdKB() + { + return DatabaseDescriptor.getClientLargeReadAbortThresholdKB(); + } + + @Override + public void setClientLargeReadAbortThresholdKB(long threshold) + { + DatabaseDescriptor.setClientLargeReadAbortThresholdKB(threshold); + logger.info("updated client_large_read_abort_threshold_kb to {}", threshold); + } + + @Override + public boolean getClientTrackWarningsEnabled() + { + return DatabaseDescriptor.getClientTrackWarningsEnabled(); + } + + @Override + public void setClientTrackWarningsEnabled(boolean value) + { + DatabaseDescriptor.setClientTrackWarningsEnabled(value); + logger.info("updated client_track_warnings_enabled to {}", value); + } } diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java index 3154c82ea9f5..4a27f8439e5d 100644 --- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java +++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java @@ -884,4 +884,11 @@ public void enableAuditLog(String loggerName, String includedKeyspaces, String e public void setCompactionTombstoneWarningThreshold(int count); public int getCompactionTombstoneWarningThreshold(); + + public long getClientLargeReadWarnThresholdKB(); + public void setClientLargeReadWarnThresholdKB(long threshold); + public long getClientLargeReadAbortThresholdKB(); + public void setClientLargeReadAbortThresholdKB(long threshold); + public boolean getClientTrackWarningsEnabled(); + public void setClientTrackWarningsEnabled(boolean value); } diff --git a/src/java/org/apache/cassandra/service/reads/ReadCallback.java b/src/java/org/apache/cassandra/service/reads/ReadCallback.java index 91d9370f8f06..67031478a586 100644 --- a/src/java/org/apache/cassandra/service/reads/ReadCallback.java +++ b/src/java/org/apache/cassandra/service/reads/ReadCallback.java @@ -20,9 +20,16 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; + +import com.google.common.annotations.VisibleForTesting; import 
org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.MessageParams;
+import org.apache.cassandra.exceptions.TombstoneAbortException;
 import org.apache.cassandra.locator.ReplicaPlan;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,9 +42,12 @@ import org.apache.cassandra.exceptions.RequestFailureReason;
 import org.apache.cassandra.locator.Endpoints;
 import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.ParamType;
 import org.apache.cassandra.net.RequestCallback;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.Verb;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.service.ClientWarn;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.concurrent.SimpleCondition;
@@ -46,6 +56,31 @@ public class ReadCallback<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E, P>> implements RequestCallback<ReadResponse>
 {
     protected static final Logger logger = LoggerFactory.getLogger( ReadCallback.class );
+    private class WarningCounter
+    {
+        // the number of nodes that warned, and the highest tombstone count reported by any node's warning
+        final AtomicInteger tombstoneWarnings = new AtomicInteger();
+        final AtomicInteger maxTombstoneWarningCount = new AtomicInteger();
+        // the number of nodes that rejected, and the highest tombstone count reported by any node's rejection. The max
+        // should match our configured limit, but it is included to aid in diagnosing misconfigurations
+        final AtomicInteger tombstoneAborts = new AtomicInteger();
+        final AtomicInteger maxTombstoneAbortsCount = new AtomicInteger();
+
+        // TODO: take message as arg and return boolean for 'had warning' etc
+        void addTombstoneWarning(InetAddressAndPort from, int tombstones)
+        {
+            if (!waitingFor(from)) return;
+            tombstoneWarnings.incrementAndGet();
+            maxTombstoneWarningCount.accumulateAndGet(tombstones, Math::max);
+        }
+
+        void addTombstoneAbort(InetAddressAndPort from, int tombstones)
+        {
+            if (!waitingFor(from)) return;
+            tombstoneAborts.incrementAndGet();
+            maxTombstoneAbortsCount.accumulateAndGet(tombstones, Math::max);
+        }
+    }

     public final ResponseResolver resolver;
     final SimpleCondition condition = new SimpleCondition();
@@ -59,6 +94,9 @@ public class ReadCallback<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<
         = AtomicIntegerFieldUpdater.newUpdater(ReadCallback.class, "failures");
     private volatile int failures = 0;
     private final Map<InetAddressAndPort, RequestFailureReason> failureReasonByEndpoint;
+    private volatile WarningCounter warningCounter;
+    private static final AtomicReferenceFieldUpdater<ReadCallback, ReadCallback.WarningCounter> warningsUpdater
+        = AtomicReferenceFieldUpdater.newUpdater(ReadCallback.class, ReadCallback.WarningCounter.class, "warningCounter");

     public ReadCallback(ResponseResolver resolver, ReadCommand command, ReplicaPlan.Shared replicaPlan, long queryStartNanoTime)
     {
@@ -93,6 +131,23 @@ public boolean await(long timePastStart, TimeUnit unit)
         }
     }

+    @VisibleForTesting
+    public static String tombstoneAbortMessage(int nodes, int tombstones, String cql)
+    {
+        return String.format("%s nodes scanned over %s tombstones and aborted the query %s (see tombstone_failure_threshold)", nodes, tombstones, cql);
+    }
+
+    @VisibleForTesting
+    public static String tombstoneWarnMessage(int nodes, int tombstones, String cql)
+    {
+        return String.format("%s nodes scanned up to %s tombstones and issued tombstone warnings for query %s (see tombstone_warn_threshold)", nodes, tombstones, cql);
+    }
+
+    private ColumnFamilyStore cfs()
+    {
+        return Schema.instance.getColumnFamilyStoreInstance(command.metadata().id);
+    }
+
     public void
awaitResults() throws ReadFailureException, ReadTimeoutException { boolean signaled = await(command.getTimeout(MILLISECONDS), TimeUnit.MILLISECONDS); @@ -105,6 +160,25 @@ public void awaitResults() throws ReadFailureException, ReadTimeoutException */ int received = resolver.responses.size(); boolean failed = failures > 0 && (blockFor > received || !resolver.isDataPresent()); + WarningCounter warnings = warningCounter; + if (warnings != null) + { + if (warnings.tombstoneAborts.get() > 0) + { + String msg = tombstoneAbortMessage(warnings.tombstoneAborts.get(), warnings.maxTombstoneAbortsCount.get(), command.toCQLString()); + ClientWarn.instance.warn(msg + " with " + command.loggableTokens()); + logger.warn(msg); + cfs().metric.clientTombstoneAborts.mark(); + } + + if (warnings.tombstoneWarnings.get() > 0) + { + String msg = tombstoneWarnMessage(warnings.tombstoneWarnings.get(), warnings.maxTombstoneWarningCount.get(), command.toCQLString()); + ClientWarn.instance.warn(msg + " with " + command.loggableTokens()); + logger.warn(msg); + cfs().metric.clientTombstoneWarnings.mark(); + } + } if (signaled && !failed) return; @@ -119,6 +193,10 @@ else if (logger.isDebugEnabled()) logger.debug("{}; received {} of {} responses{}", failed ? "Failed" : "Timed out", received, blockFor, gotData); } + if (warnings != null && warnings.tombstoneAborts.get() > 0) + throw new TombstoneAbortException(warnings.tombstoneAborts.get(), warnings.maxTombstoneAbortsCount.get(), command.toCQLString(), resolver.isDataPresent(), + replicaPlan.get().consistencyLevel(), received, blockFor, failureReasonByEndpoint); + // Same as for writes, see AbstractWriteResponseHandler throw failed ? new ReadFailureException(replicaPlan().consistencyLevel(), received, blockFor, resolver.isDataPresent(), failureReasonByEndpoint) @@ -134,6 +212,17 @@ public int blockFor() public void onResponse(Message message) { assertWaitingFor(message.from()); + Map params = message.header.params(); + if (params.containsKey(ParamType.TOMBSTONE_ABORT)) + { + getWarningCounter().addTombstoneAbort(message.from(), (Integer) params.get(ParamType.TOMBSTONE_ABORT)); + onFailure(message.from(), RequestFailureReason.READ_TOO_MANY_TOMBSTONES); + return; + } + else if (params.containsKey(ParamType.TOMBSTONE_WARNING)) + { + getWarningCounter().addTombstoneWarning(message.from(), (Integer) params.get(ParamType.TOMBSTONE_WARNING)); + } resolver.preprocess(message); /* @@ -146,10 +235,25 @@ public void onResponse(Message message) condition.signalAll(); } + private WarningCounter getWarningCounter() + { + WarningCounter current; + do { + + current = warningCounter; + if (current != null) + return current; + + current = new WarningCounter(); + } while (!warningsUpdater.compareAndSet(this, null, current)); + return current; + } + public void response(ReadResponse result) { Verb kind = command.isRangeRequest() ? 
Verb.RANGE_RSP : Verb.READ_RSP; Message message = Message.internalResponse(kind, result); + message = MessageParams.addToMessage(message); onResponse(message); } @@ -182,8 +286,12 @@ public boolean invokeOnFailure() */ private void assertWaitingFor(InetAddressAndPort from) { - assert !replicaPlan().consistencyLevel().isDatacenterLocal() - || DatabaseDescriptor.getLocalDataCenter().equals(DatabaseDescriptor.getEndpointSnitch().getDatacenter(from)) - : "Received read response from unexpected replica: " + from; + assert waitingFor(from): "Received read response from unexpected replica: " + from; + } + + private boolean waitingFor(InetAddressAndPort from) + { + return !replicaPlan().consistencyLevel().isDatacenterLocal() + || DatabaseDescriptor.getLocalDataCenter().equals(DatabaseDescriptor.getEndpointSnitch().getDatacenter(from)); } } diff --git a/src/java/org/apache/cassandra/service/reads/range/RangeCommandIterator.java b/src/java/org/apache/cassandra/service/reads/range/RangeCommandIterator.java index ae7ee60666db..d4dd3c82e598 100644 --- a/src/java/org/apache/cassandra/service/reads/range/RangeCommandIterator.java +++ b/src/java/org/apache/cassandra/service/reads/range/RangeCommandIterator.java @@ -34,6 +34,7 @@ import org.apache.cassandra.db.filter.DataLimits; import org.apache.cassandra.db.partitions.PartitionIterator; import org.apache.cassandra.db.rows.RowIterator; +import org.apache.cassandra.exceptions.ReadAbortException; import org.apache.cassandra.exceptions.ReadFailureException; import org.apache.cassandra.exceptions.ReadTimeoutException; import org.apache.cassandra.exceptions.UnavailableException; @@ -129,6 +130,11 @@ protected RowIterator computeNext() rangeMetrics.timeouts.mark(); throw e; } + catch (ReadAbortException e) + { + rangeMetrics.markAbort(e); + throw e; + } catch (ReadFailureException e) { rangeMetrics.failures.mark(); diff --git a/src/java/org/apache/cassandra/utils/Int32Serializer.java b/src/java/org/apache/cassandra/utils/Int32Serializer.java new file mode 100644 index 000000000000..731f5aa038a9 --- /dev/null +++ b/src/java/org/apache/cassandra/utils/Int32Serializer.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.utils; + +import java.io.IOException; + +import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; + +public class Int32Serializer implements IVersionedSerializer +{ + public static final Int32Serializer serializer = new Int32Serializer(); + + public void serialize(Integer t, DataOutputPlus out, int version) throws IOException + { + out.writeInt(t); + } + + public Integer deserialize(DataInputPlus in, int version) throws IOException + { + return in.readInt(); + } + + public long serializedSize(Integer t, int version) + { + return TypeSizes.sizeof(t.intValue()); + } +} diff --git a/test/conf/cassandra.yaml b/test/conf/cassandra.yaml index fcdef2df53f1..c8ac72b3614a 100644 --- a/test/conf/cassandra.yaml +++ b/test/conf/cassandra.yaml @@ -53,3 +53,6 @@ enable_materialized_views: true enable_drop_compact_storage: true file_cache_enabled: true auto_hints_cleanup_enabled: true +client_track_warnings_enabled: true +client_large_read_warn_threshold_kb: 1024 +client_large_read_abort_threshold_kb: 4096 diff --git a/test/distributed/org/apache/cassandra/distributed/test/ClientReadSizeWarningTest.java b/test/distributed/org/apache/cassandra/distributed/test/ClientReadSizeWarningTest.java new file mode 100644 index 000000000000..2d790f4703ad --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/ClientReadSizeWarningTest.java @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.distributed.test; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Random; +import java.util.function.Consumer; + +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import org.junit.*; + +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.SimpleStatement; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.cql3.QueryProcessor; +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.*; +import org.apache.cassandra.exceptions.ReadSizeAbortException; +import org.apache.cassandra.exceptions.RequestFailureReason; +import org.apache.cassandra.service.ClientWarn; +import org.apache.cassandra.service.QueryState; +import org.assertj.core.api.Assertions; +import org.assertj.core.api.Condition; + +/** + * ReadSize client warn/abort is coordinator only, so the fact ClientMetrics is coordinator only does not + * impact the user experience + */ +public class ClientReadSizeWarningTest extends TestBaseImpl +{ + private static final Random RANDOM = new Random(0); + private static ICluster CLUSTER; + private static com.datastax.driver.core.Cluster JAVA_DRIVER; + private static com.datastax.driver.core.Session JAVA_DRIVER_SESSION; + + @BeforeClass + public static void setupClass() throws IOException + { + Cluster.Builder builder = Cluster.build(3); + builder.withConfig(c -> c.with(Feature.NATIVE_PROTOCOL, Feature.GOSSIP)); + CLUSTER = builder.start(); + JAVA_DRIVER = JavaDriverUtils.create(CLUSTER); + JAVA_DRIVER_SESSION = JAVA_DRIVER.connect(); + + // setup threshold after init to avoid driver issues loading + // the test uses a rather small limit, which causes driver to fail while loading metadata + CLUSTER.stream().forEach(i -> i.runOnInstance(() -> { + DatabaseDescriptor.setClientLargeReadWarnThresholdKB(1); + DatabaseDescriptor.setClientLargeReadAbortThresholdKB(2); + })); + } + + @Before + public void setup() + { + CLUSTER.schemaChange("DROP KEYSPACE IF EXISTS " + KEYSPACE); + init(CLUSTER); + CLUSTER.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v blob, PRIMARY KEY (pk, ck))"); + } + + private static void enable(boolean value) + { + CLUSTER.stream().forEach(i -> i.runOnInstance(() -> DatabaseDescriptor.setClientTrackWarningsEnabled(value))); + } + + private static void assertPrefix(String expectedPrefix, String actual) + { + if (!actual.startsWith(expectedPrefix)) + throw new AssertionError(String.format("expected \"%s\" to begin with \"%s\"", actual, expectedPrefix)); + } + + private static ByteBuffer bytes(int size) + { + byte[] b = new byte[size]; + RANDOM.nextBytes(b); + return ByteBuffer.wrap(b); + } + + @Test + public void noWarningsSinglePartition() + { + noWarnings("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk=1"); + } + + @Test + public void noWarningsScan() + { + noWarnings("SELECT * FROM " + KEYSPACE + ".tbl"); + } + + public void noWarnings(String cql) + { + CLUSTER.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, ?)", ConsistencyLevel.ALL, bytes(128)); + CLUSTER.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 2, ?)", ConsistencyLevel.ALL, bytes(128)); + + Consumer> test = warnings -> + Assert.assertEquals(Collections.emptyList(), warnings); + + for (boolean 
b : Arrays.asList(true, false))
+        {
+            enable(b);
+            SimpleQueryResult result = CLUSTER.coordinator(1).executeWithResult(cql, ConsistencyLevel.ALL);
+            test.accept(result.warnings());
+            test.accept(driverQueryAll(cql).getExecutionInfo().getWarnings());
+            assertWarnAborts(0, 0, 0);
+        }
+    }
+
+    @Test
+    public void warnThresholdSinglePartition()
+    {
+        warnThreshold("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk=1");
+    }
+
+    @Test
+    public void warnThresholdScan()
+    {
+        warnThreshold("SELECT * FROM " + KEYSPACE + ".tbl");
+    }
+
+    public void warnThreshold(String cql)
+    {
+        CLUSTER.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, ?)", ConsistencyLevel.ALL, bytes(512));
+        CLUSTER.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 2, ?)", ConsistencyLevel.ALL, bytes(512));
+
+        Consumer<List<String>> testEnabled = warnings ->
+            assertPrefix("Read on table " + KEYSPACE + ".tbl has exceeded the size warning threshold", Iterables.getOnlyElement(warnings));
+
+        enable(true);
+        SimpleQueryResult result = CLUSTER.coordinator(1).executeWithResult(cql, ConsistencyLevel.ALL);
+        testEnabled.accept(result.warnings());
+        assertWarnAborts(1, 0, 0);
+        testEnabled.accept(driverQueryAll(cql).getExecutionInfo().getWarnings());
+        assertWarnAborts(2, 0, 0);
+
+        enable(false);
+        result = CLUSTER.coordinator(1).executeWithResult(cql, ConsistencyLevel.ALL);
+        Assertions.assertThat(result.warnings()).isEmpty();
+        Assertions.assertThat(driverQueryAll(cql).getExecutionInfo().getWarnings()).isEmpty();
+        assertWarnAborts(2, 0, 0);
+    }
+
+    @Test
+    public void failThresholdSinglePartition() throws UnknownHostException
+    {
+        failThreshold("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk=1");
+    }
+
+    @Test
+    public void failThresholdScan() throws UnknownHostException
+    {
+        failThreshold("SELECT * FROM " + KEYSPACE + ".tbl");
+    }
+
+    public void failThreshold(String cql) throws UnknownHostException
+    {
+        CLUSTER.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, ?)", ConsistencyLevel.ALL, bytes(512));
+        CLUSTER.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 2, ?)", ConsistencyLevel.ALL, bytes(512));
+        CLUSTER.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 3, ?)", ConsistencyLevel.ALL, bytes(512));
+        CLUSTER.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 4, ?)", ConsistencyLevel.ALL, bytes(512));
+        CLUSTER.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 5, ?)", ConsistencyLevel.ALL, bytes(512));
+
+        enable(true);
+        List<String> warnings = CLUSTER.get(1).callsOnInstance(() -> {
+            ClientWarn.instance.captureWarnings();
+            try
+            {
+                QueryProcessor.execute(cql, org.apache.cassandra.db.ConsistencyLevel.ALL, QueryState.forInternalCalls());
+                Assert.fail("Expected query failure");
+            }
+            catch (ReadSizeAbortException e)
+            {
+                // expected, client transport returns an error message and includes client warnings
+            }
+            return ClientWarn.instance.getWarnings();
+        }).call();
+        Assertions.assertThat(warnings).hasSize(1);
+        assertPrefix("Read on table " + KEYSPACE + ".tbl has exceeded the size failure threshold", warnings.get(0));
+        assertWarnAborts(0, 1, 1);
+
+        try
+        {
+            driverQueryAll(cql);
+            Assert.fail("Query should have thrown ReadFailureException");
+        }
+        catch (com.datastax.driver.core.exceptions.ReadFailureException e)
+        {
+            // without changing the client driver we can't produce a better message...
+            // the client does NOT include the message sent from the server in the exception, so the message doesn't work
+            // well in this case
+            Assertions.assertThat(e.getMessage()).endsWith("(1 responses were required but only 0 replica responded, 1 failed)");
+            ImmutableSet<InetAddress> expectedKeys = ImmutableSet.of(InetAddress.getByAddress(new byte[]{ 127, 0, 0, 1 }), InetAddress.getByAddress(new byte[]{ 127, 0, 0, 2 }), InetAddress.getByAddress(new byte[]{ 127, 0, 0, 3 }));
+            Assertions.assertThat(e.getFailuresMap())
+                      .hasSize(1)
+                      // the coordinator changes from run to run, so we can't assert the map directly as the key is dynamic;
+                      // assert the domain of keys and the single expected value instead
+                      .containsValue(RequestFailureReason.READ_TOO_LARGE.code)
+                      .hasKeySatisfying(new Condition<InetAddress>() {
+                          public boolean matches(InetAddress value)
+                          {
+                              return expectedKeys.contains(value);
+                          }
+                      });
+        }
+        assertWarnAborts(0, 2, 1);
+
+        // the query should no longer fail
+        enable(false);
+        SimpleQueryResult result = CLUSTER.coordinator(1).executeWithResult(cql, ConsistencyLevel.ALL);
+        Assertions.assertThat(result.warnings()).isEmpty();
+        Assertions.assertThat(driverQueryAll(cql).getExecutionInfo().getWarnings()).isEmpty();
+        assertWarnAborts(0, 2, 0);
+    }
+
+    private static long GLOBAL_READ_ABORTS = 0;
+    private static void assertWarnAborts(int warns, int aborts, int globalAborts)
+    {
+        Assertions.assertThat(totalWarnings()).as("warnings").isEqualTo(warns);
+        Assertions.assertThat(totalAborts()).as("aborts").isEqualTo(aborts);
+        long expectedGlobalAborts = GLOBAL_READ_ABORTS + globalAborts;
+        Assertions.assertThat(totalReadAborts()).as("global aborts").isEqualTo(expectedGlobalAborts);
+        GLOBAL_READ_ABORTS = expectedGlobalAborts;
+    }
+
+    private static long totalWarnings()
+    {
+        return CLUSTER.stream().mapToLong(i -> i.metrics().getCounter("org.apache.cassandra.metrics.keyspace.ClientReadSizeWarnings." + KEYSPACE)).sum();
+    }
+
+    private static long totalAborts()
+    {
+        return CLUSTER.stream().mapToLong(i -> i.metrics().getCounter("org.apache.cassandra.metrics.keyspace.ClientReadSizeAborts." + KEYSPACE)).sum();
+    }
+
+    private static long totalReadAborts()
+    {
+        return CLUSTER.stream().mapToLong(i -> i.metrics().getCounter("org.apache.cassandra.metrics.ClientRequest.Aborts.Read-ALL")).sum();
+    }
+
+    private static ResultSet driverQueryAll(String cql)
+    {
+        return JAVA_DRIVER_SESSION.execute(new SimpleStatement(cql).setConsistencyLevel(com.datastax.driver.core.ConsistencyLevel.ALL));
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/ClientTombstoneWarningTest.java b/test/distributed/org/apache/cassandra/distributed/test/ClientTombstoneWarningTest.java
new file mode 100644
index 000000000000..d529515a8f4a
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/ClientTombstoneWarningTest.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Consumer;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.SimpleStatement;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.ICluster;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
+import org.apache.cassandra.exceptions.ReadFailureException;
+import org.apache.cassandra.exceptions.RequestFailureReason;
+import org.apache.cassandra.exceptions.TombstoneAbortException;
+import org.apache.cassandra.service.ClientWarn;
+import org.apache.cassandra.service.QueryState;
+import org.assertj.core.api.Assertions;
+
+public class ClientTombstoneWarningTest extends TestBaseImpl
+{
+    private static final int TOMBSTONE_WARN = 50;
+    private static final int TOMBSTONE_FAIL = 100;
+    private static ICluster<IInvokableInstance> CLUSTER;
+    private static com.datastax.driver.core.Cluster JAVA_DRIVER;
+    private static com.datastax.driver.core.Session JAVA_DRIVER_SESSION;
+
+    @BeforeClass
+    public static void setupClass() throws IOException
+    {
+        Cluster.Builder builder = Cluster.build(3);
+        builder.withConfig(c -> c.set("tombstone_warn_threshold", TOMBSTONE_WARN)
+                                 .set("tombstone_failure_threshold", TOMBSTONE_FAIL)
+                                 .with(Feature.NATIVE_PROTOCOL, Feature.GOSSIP));
+        CLUSTER = builder.start();
+        JAVA_DRIVER = JavaDriverUtils.create(CLUSTER);
+        JAVA_DRIVER_SESSION = JAVA_DRIVER.connect();
+    }
+
+    @AfterClass
+    public static void teardown()
+    {
+        if (JAVA_DRIVER_SESSION != null)
+            JAVA_DRIVER_SESSION.close();
+        if (JAVA_DRIVER != null)
+            JAVA_DRIVER.close();
+    }
+
+    @Before
+    public void setup()
+    {
+        CLUSTER.schemaChange("DROP KEYSPACE IF EXISTS " + KEYSPACE);
+        init(CLUSTER);
+        CLUSTER.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+    }
+
+    private static void enable(boolean value)
+    {
+        CLUSTER.stream().forEach(i -> i.runOnInstance(() -> DatabaseDescriptor.setClientTrackWarningsEnabled(value)));
+    }
+
+    @Test
+    public void noWarningsSinglePartition()
+    {
+        noWarnings("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk=1");
+    }
+
+    @Test
+    public void noWarningsScan()
+    {
+        noWarnings("SELECT * FROM " + KEYSPACE + ".tbl");
+    }
+
+    public void noWarnings(String cql)
+    {
+        Consumer<List<String>> test = warnings ->
+            Assert.assertEquals(Collections.emptyList(), warnings);
+
+        for (int i=0; i> testEnabled = warnings ->
+            Assertions.assertThat(Iterables.getOnlyElement(warnings))
+                      .contains("nodes scanned up to " + (TOMBSTONE_WARN + 1) + " tombstones and issued tombstone warnings for query " + cql);
+
+        SimpleQueryResult result = CLUSTER.coordinator(1).executeWithResult(cql, ConsistencyLevel.ALL);
+        testEnabled.accept(result.warnings());
+        assertWarnAborts(1, 0, 0);
+        testEnabled.accept(driverQueryAll(cql).getExecutionInfo().getWarnings());
+        assertWarnAborts(2, 0, 0);
+
+        enable(false);
+        Consumer<List<String>> testDisabled = warnings -> {
+            // client warnings are currently coordinator only, so if present only 1 is expected
+            if (isScan)
+            {
+                // Scans perform multiple ReadCommands, which will not propagate the warnings to the top-level coordinator; so no warnings are expected
+                Assertions.assertThat(warnings).isEmpty();
+            }
+            else
+            {
+                Assertions.assertThat(Iterables.getOnlyElement(warnings))
+                          .startsWith("Read " + (TOMBSTONE_WARN + 1) + " live rows and " + (TOMBSTONE_WARN + 1) + " tombstone cells for query " + cql);
+            }
+        };
+        result = CLUSTER.coordinator(1).executeWithResult(cql, ConsistencyLevel.ALL);
+        testDisabled.accept(result.warnings());
+        assertWarnAborts(2, 0, 0);
+        testDisabled.accept(driverQueryAll(cql).getExecutionInfo().getWarnings());
+        assertWarnAborts(2, 0, 0);
+    }
+
+    @Test
+    public void failThresholdSinglePartition() throws UnknownHostException
+    {
+        failThreshold("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", false);
+    }
+
+    @Test
+    public void failThresholdScan() throws UnknownHostException
+    {
+        failThreshold("SELECT * FROM " + KEYSPACE + ".tbl", true);
+    }
+
+    private void failThreshold(String cql, boolean isScan) throws UnknownHostException
+    {
+        for (int i = 0; i < TOMBSTONE_FAIL + 1; i++)
+            CLUSTER.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, ?, null)", ConsistencyLevel.ALL, i);
+
+        enable(true);
+        List<String> warnings = CLUSTER.get(1).callsOnInstance(() -> {
+            ClientWarn.instance.captureWarnings();
+            try
+            {
+                QueryProcessor.execute(cql, org.apache.cassandra.db.ConsistencyLevel.ALL, QueryState.forInternalCalls());
+                Assert.fail("Expected query failure");
+            }
+            catch (TombstoneAbortException e)
+            {
+                Assert.assertTrue(e.nodes >= 1 && e.nodes <= 3);
+                Assert.assertEquals(TOMBSTONE_FAIL + 1, e.tombstones);
+                // expected, client transport returns an error message and includes client warnings
+            }
+            return ClientWarn.instance.getWarnings();
+        }).call();
+        Assertions.assertThat(Iterables.getOnlyElement(warnings))
+                  .contains("nodes scanned over " + (TOMBSTONE_FAIL + 1) + " tombstones and aborted the query " + cql);
+
+        assertWarnAborts(0, 1, 1);
+
+        try
+        {
+            driverQueryAll(cql);
+            Assert.fail("Query should have thrown ReadFailureException");
+        }
+        catch (com.datastax.driver.core.exceptions.ReadFailureException e)
+        {
+            // without changing the client driver we can't produce a better message...
+            // the client does NOT include the message sent from the server in the exception, so the message doesn't work
+            // well in this case
+            Assertions.assertThat(e.getMessage()).contains("(3 responses were required but only 0 replica responded"); // can't include ', 3 failed)' as sometimes it's 2
+            Assertions.assertThat(e.getFailuresMap())
+                      .isEqualTo(ImmutableMap.of(
+                          InetAddress.getByAddress(new byte[] {127, 0, 0, 1}), RequestFailureReason.READ_TOO_MANY_TOMBSTONES.code,
+                          InetAddress.getByAddress(new byte[] {127, 0, 0, 2}), RequestFailureReason.READ_TOO_MANY_TOMBSTONES.code,
+                          InetAddress.getByAddress(new byte[] {127, 0, 0, 3}), RequestFailureReason.READ_TOO_MANY_TOMBSTONES.code));
+        }
+
+        assertWarnAborts(0, 2, 1);
+
+        enable(false);
+        warnings = CLUSTER.get(1).callsOnInstance(() -> {
+            ClientWarn.instance.captureWarnings();
+            try
+            {
+                QueryProcessor.execute(cql, org.apache.cassandra.db.ConsistencyLevel.ALL, QueryState.forInternalCalls());
+                Assert.fail("Expected query failure");
+            }
+            catch (ReadFailureException e)
+            {
+                Assertions.assertThat(e).isNotInstanceOf(TombstoneAbortException.class);
+            }
+            return ClientWarn.instance.getWarnings();
+        }).call();
+        // client warnings are currently coordinator only, so if present only 1 is expected
+        if (isScan)
+        {
+            // Scans perform multiple ReadCommands, which will not propagate the warnings to the top-level coordinator; so no warnings are expected
+            Assertions.assertThat(warnings).isNull();
+        }
+        else
+        {
+            Assertions.assertThat(Iterables.getOnlyElement(warnings))
+                      .startsWith("Read " + TOMBSTONE_FAIL + " live rows and " + (TOMBSTONE_FAIL + 1) + " tombstone cells for query " + cql);
+        }
+
+        assertWarnAborts(0, 2, 0);
+
+        try
+        {
+            driverQueryAll(cql);
+            Assert.fail("Query should have thrown ReadFailureException");
+        }
+        catch (com.datastax.driver.core.exceptions.ReadFailureException e)
+        {
+            // not checking the message as different cases exist for the failure, so the fact that this failed is enough
+
+            Assertions.assertThat(e.getFailuresMap())
+                      .isNotEmpty();
+            Assertions.assertThat(e.getFailuresMap().values())
+                      .as("Non READ_TOO_MANY_TOMBSTONES exists")
+                      .allMatch(i -> i.equals(RequestFailureReason.READ_TOO_MANY_TOMBSTONES.code));
+        }
+
+        assertWarnAborts(0, 2, 0);
+    }
+
+    private static long GLOBAL_READ_ABORTS = 0;
+    private static void assertWarnAborts(int warns, int aborts, int globalAborts)
+    {
+        Assertions.assertThat(totalWarnings()).as("warnings").isEqualTo(warns);
+        Assertions.assertThat(totalAborts()).as("aborts").isEqualTo(aborts);
+        long expectedGlobalAborts = GLOBAL_READ_ABORTS + globalAborts;
+        Assertions.assertThat(totalReadAborts()).as("global aborts").isEqualTo(expectedGlobalAborts);
+        GLOBAL_READ_ABORTS = expectedGlobalAborts;
+    }
+
+    private static long totalWarnings()
+    {
+        return CLUSTER.stream().mapToLong(i -> i.metrics().getCounter("org.apache.cassandra.metrics.keyspace.ClientTombstoneWarnings." + KEYSPACE)).sum();
+    }
+
+    private static long totalAborts()
+    {
+        return CLUSTER.stream().mapToLong(i -> i.metrics().getCounter("org.apache.cassandra.metrics.keyspace.ClientTombstoneAborts."
+ KEYSPACE)).sum(); + } + + private static long totalReadAborts() + { + return CLUSTER.stream().mapToLong(i -> + i.metrics().getCounter("org.apache.cassandra.metrics.ClientRequest.Aborts.Read-ALL") + + i.metrics().getCounter("org.apache.cassandra.metrics.ClientRequest.Aborts.RangeSlice") + ).sum(); + } + + private static ResultSet driverQueryAll(String cql) + { + return JAVA_DRIVER_SESSION.execute(new SimpleStatement(cql).setConsistencyLevel(com.datastax.driver.core.ConsistencyLevel.ALL)); + } +} diff --git a/test/distributed/org/apache/cassandra/distributed/test/JavaDriverUtils.java b/test/distributed/org/apache/cassandra/distributed/test/JavaDriverUtils.java new file mode 100644 index 000000000000..bc39ba1633c8 --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/JavaDriverUtils.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.test; + +import org.apache.cassandra.distributed.api.Feature; +import org.apache.cassandra.distributed.api.ICluster; +import org.apache.cassandra.distributed.api.IInstance; + +public final class JavaDriverUtils +{ + private JavaDriverUtils() + { + } + + public static com.datastax.driver.core.Cluster create(ICluster dtest) + { + if (dtest.size() == 0) + throw new IllegalArgumentException("Attempted to open java driver for empty cluster"); + + // make sure the needed Features are added + dtest.stream().forEach(i -> { + if (!(i.config().has(Feature.NATIVE_PROTOCOL) && i.config().has(Feature.GOSSIP))) // gossip is needed as currently Host.getHostId is empty without it + throw new IllegalStateException("java driver requires Feature.NATIVE_PROTOCOL and Feature.GOSSIP; but one or more is missing"); + }); + + com.datastax.driver.core.Cluster.Builder builder = com.datastax.driver.core.Cluster.builder(); + + //TODO support port + //TODO support auth + dtest.stream().forEach(i -> builder.addContactPoint(i.broadcastAddress().getAddress().getHostAddress())); + + return builder.build(); + } +} diff --git a/test/unit/org/apache/cassandra/db/ReadCommandVerbHandlerTest.java b/test/unit/org/apache/cassandra/db/ReadCommandVerbHandlerTest.java index 868227390427..47b24174a085 100644 --- a/test/unit/org/apache/cassandra/db/ReadCommandVerbHandlerTest.java +++ b/test/unit/org/apache/cassandra/db/ReadCommandVerbHandlerTest.java @@ -166,7 +166,8 @@ private static SinglePartitionReadCommand command(TableMetadata metadata) DataLimits.NONE, KEY, new ClusteringIndexSliceFilter(Slices.ALL, false), - null); + null, + false); } private static DecoratedKey key(TableMetadata metadata, int key) diff --git a/test/unit/org/apache/cassandra/db/ReadResponseTest.java b/test/unit/org/apache/cassandra/db/ReadResponseTest.java index 6e1a804e5fb6..f625d41e7179 100644 
--- a/test/unit/org/apache/cassandra/db/ReadResponseTest.java +++ b/test/unit/org/apache/cassandra/db/ReadResponseTest.java @@ -236,7 +236,8 @@ private static class StubReadCommand extends SinglePartitionReadCommand DataLimits.NONE, metadata.partitioner.decorateKey(ByteBufferUtil.bytes(key)), null, - null); + null, + false); this.repairedDigest = repairedDigest; this.conclusive = conclusive; } diff --git a/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java b/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java index 6fc8fbfc0354..ac6820548ad9 100644 --- a/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java +++ b/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java @@ -246,7 +246,7 @@ public static class MockSinglePartitionReadCommand extends SinglePartitionReadCo MockSinglePartitionReadCommand(long timeout) { - super(false, 0, false, cfs.metadata(), 0, null, null, null, Util.dk("ry@n_luvs_teh_y@nk33z"), null, null); + super(false, 0, false, cfs.metadata(), 0, null, null, null, Util.dk("ry@n_luvs_teh_y@nk33z"), null, null, false); this.timeout = timeout; } diff --git a/test/unit/org/apache/cassandra/service/reads/repair/RepairedDataVerifierTest.java b/test/unit/org/apache/cassandra/service/reads/repair/RepairedDataVerifierTest.java index 169e09d67802..592bff8d3290 100644 --- a/test/unit/org/apache/cassandra/service/reads/repair/RepairedDataVerifierTest.java +++ b/test/unit/org/apache/cassandra/service/reads/repair/RepairedDataVerifierTest.java @@ -287,7 +287,8 @@ private static class StubReadCommand extends SinglePartitionReadCommand DataLimits.NONE, metadata.partitioner.decorateKey(ByteBufferUtil.bytes(key)), new ClusteringIndexSliceFilter(Slices.ALL, false), - null); + null, + false); } } }
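
The notes below add a handful of self-contained sketches of the mechanics this patch introduces. Every class, name, and value in them is an illustrative stand-in, not code from the patch itself.

First, the message composition in RequestFailureException: both constructors now funnel through the single buildErrorMessage(CharSequence, Map) overload, so a generic failure keeps its "received N responses and M failures" headline while the new abort exceptions substitute their own, with per-endpoint reasons appended in either case. A minimal model of that composition, with plain strings standing in for InetAddressAndPort and RequestFailureReason:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class FailureMessageExample
{
    // Mirrors the two-step composition in RequestFailureException.
    static String buildErrorMessage(CharSequence msg, Map<String, String> failures)
    {
        StringBuilder sb = new StringBuilder("Operation failed - ");
        sb.append(msg);
        if (failures != null && !failures.isEmpty())
            sb.append(": ").append(buildFailureString(failures));
        return sb.toString();
    }

    static String buildFailureString(Map<String, String> failures)
    {
        return failures.entrySet().stream()
                       .map(e -> String.format("%s from %s", e.getValue(), e.getKey()))
                       .collect(Collectors.joining(", "));
    }

    public static void main(String[] args)
    {
        Map<String, String> failures = new LinkedHashMap<>();
        failures.put("/127.0.0.2:7000", "READ_TOO_MANY_TOMBSTONES");

        // The generic path: the headline is built from counts, details appended.
        System.out.println(buildErrorMessage("received 1 responses and 1 failures", failures));
        // An abort path: the exception supplies its own headline.
        System.out.println(buildErrorMessage("3 nodes scanned over 101 tombstones and aborted the query", failures));
    }
}
```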
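ClientRequestMetrics.markAbort marks the umbrella Aborts meter unconditionally and only then refines by exception subtype, which keeps TombstoneAborts + ReadSizeAborts at or below Aborts by construction. A toy model of the dispatch, with plain counters in place of codahale Meters:

```java
public class AbortMetricsExample
{
    // Stand-ins for the exception hierarchy added by the patch.
    static class ReadAbort extends RuntimeException {}
    static class TombstoneAbort extends ReadAbort {}
    static class ReadSizeAbort extends ReadAbort {}

    long aborts, tombstoneAborts, readSizeAborts; // Meters in the real code

    void markAbort(Throwable cause)
    {
        aborts++;                           // the umbrella meter is always marked
        if (!(cause instanceof ReadAbort))
            return;                         // other causes only count toward the umbrella
        if (cause instanceof TombstoneAbort)
            tombstoneAborts++;
        else if (cause instanceof ReadSizeAbort)
            readSizeAborts++;
    }

    public static void main(String[] args)
    {
        AbortMetricsExample m = new AbortMetricsExample();
        m.markAbort(new TombstoneAbort());
        m.markAbort(new ReadSizeAbort());
        System.out.println(m.aborts + " " + m.tombstoneAborts + " " + m.readSizeAborts); // 2 1 1
    }
}
```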
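TRACK_WARNINGS is simply the next bit in the message header's flags int; MessageFlag.isIn is a mask test on that int. A sketch of the same bit-flag idiom (the addTo helper here is illustrative, not part of the patch):

```java
public class MessageFlagExample
{
    // Mirrors MessageFlag: each flag's id is its bit position in the header's flags int.
    enum Flag
    {
        CALL_BACK_ON_FAILURE(0),
        TRACK_REPAIRED_DATA(1),
        TRACK_WARNINGS(2);

        final int id;
        Flag(int id) { this.id = id; }

        int addTo(int flags)    { return flags | (1 << id); }
        boolean isIn(int flags) { return (flags & (1 << id)) != 0; }
    }

    public static void main(String[] args)
    {
        int flags = 0;
        flags = Flag.TRACK_WARNINGS.addTo(flags);
        System.out.println(Flag.TRACK_WARNINGS.isIn(flags));      // true
        System.out.println(Flag.TRACK_REPAIRED_DATA.isIn(flags)); // false
    }
}
```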
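Message.withParams/addParams never mutate the existing header map: they copy the EnumMap and merge, so a response message can pick up the TOMBSTONE_WARNING/TOMBSTONE_ABORT params collected in MessageParams without affecting anyone still holding the original header. The same copy-then-merge shape, assuming a simplified ParamType enum:

```java
import java.util.Collections;
import java.util.EnumMap;
import java.util.Map;

public class ParamMergeExample
{
    enum ParamType { TRACE_SESSION, TOMBSTONE_ABORT, TOMBSTONE_WARNING }

    // Mirrors Message.addParams: leave the input untouched, copy then merge.
    // (The class-taking EnumMap constructor is used so an empty input map is also safe.)
    static Map<ParamType, Object> addParams(Map<ParamType, Object> params, Map<ParamType, Object> values)
    {
        if (values == null || values.isEmpty())
            return params;

        Map<ParamType, Object> merged = new EnumMap<>(ParamType.class);
        merged.putAll(params);
        merged.putAll(values);
        return merged;
    }

    public static void main(String[] args)
    {
        Map<ParamType, Object> base = Collections.emptyMap();
        Map<ParamType, Object> warning = Collections.singletonMap(ParamType.TOMBSTONE_WARNING, 51);

        Map<ParamType, Object> merged = addParams(base, warning);
        System.out.println(merged);         // {TOMBSTONE_WARNING=51}
        System.out.println(base.isEmpty()); // true - the original map is untouched
    }
}
```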
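The new Int32Serializer that backs both params writes a fixed four-byte big-endian int regardless of messaging version, which is what lets nodes on different versions exchange the tombstone counts safely. A round-trip sketch using java.io streams as stand-ins for Cassandra's DataOutputPlus/DataInputPlus:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class Int32ParamRoundTrip
{
    // Mirrors Int32Serializer: a version-independent fixed-width int (4 bytes, big-endian).
    static byte[] serialize(int value) throws IOException
    {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream(4);
        try (DataOutputStream out = new DataOutputStream(bytes))
        {
            out.writeInt(value); // the same writeInt call the patch makes on DataOutputPlus
        }
        return bytes.toByteArray();
    }

    static int deserialize(byte[] bytes) throws IOException
    {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes)))
        {
            return in.readInt();
        }
    }

    public static void main(String[] args) throws IOException
    {
        byte[] wire = serialize(101);          // e.g. the tombstone count a replica reports
        System.out.println(wire.length);       // 4 - serializedSize mirrors TypeSizes.sizeof(int)
        System.out.println(deserialize(wire)); // 101
    }
}
```

Ignoring the version argument entirely is what makes the param safe to attach from any 4.x-era node: there is no per-version branching to get wrong.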
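ReadCallback.getWarningCounter lazily publishes a single WarningCounter per callback without locking: each racing responder allocates a candidate, the first successful compareAndSet through the AtomicReferenceFieldUpdater wins, and losers adopt the winner's instance. Per-node tombstone counts are then folded in with accumulateAndGet(..., Math::max). A self-contained sketch of that idiom with illustrative names:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

public class LazyCounterExample
{
    static class Counter
    {
        final AtomicInteger responses = new AtomicInteger();
        final AtomicInteger max = new AtomicInteger();

        void add(int observed)
        {
            responses.incrementAndGet();
            max.accumulateAndGet(observed, Math::max); // keep the largest value any node reported
        }
    }

    private volatile Counter counter;
    private static final AtomicReferenceFieldUpdater<LazyCounterExample, Counter> UPDATER =
        AtomicReferenceFieldUpdater.newUpdater(LazyCounterExample.class, Counter.class, "counter");

    // Same shape as ReadCallback.getWarningCounter(): allocate only on first use,
    // and let a losing CAS discard its candidate and return the winner's instance.
    Counter get()
    {
        Counter current;
        do
        {
            current = counter;
            if (current != null)
                return current;
            current = new Counter();
        }
        while (!UPDATER.compareAndSet(this, null, current));
        return current;
    }

    public static void main(String[] args)
    {
        LazyCounterExample cb = new LazyCounterExample();
        cb.get().add(51);
        cb.get().add(101);
        System.out.println(cb.get().max.get()); // 101
    }
}
```

The field updater keeps the common case (no warnings at all) allocation-free: callbacks that never see a warning param pay only for one volatile field, not an extra AtomicReference object each.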
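Finally, the new thresholds are runtime-tunable through the StorageServiceMBean getter/setter pairs added above, which surface over JMX as read/write attributes. A hedged sketch of flipping them from a plain JMX client, assuming the standard org.apache.cassandra.db:type=StorageService object name and the default JMX port 7199:

```java
import javax.management.Attribute;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class ThresholdJmxExample
{
    public static void main(String[] args) throws Exception
    {
        // Assumed endpoint: Cassandra's default JMX port on localhost; adjust for your deployment.
        JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://127.0.0.1:7199/jmxrmi");
        JMXConnector connector = JMXConnectorFactory.connect(url);
        try
        {
            MBeanServerConnection mbs = connector.getMBeanServerConnection();
            // StorageService's long-standing MBean name; the patch only adds attributes to it.
            ObjectName ss = new ObjectName("org.apache.cassandra.db:type=StorageService");

            // Getter/setter pairs on StorageServiceMBean appear as JMX attributes.
            System.out.println(mbs.getAttribute(ss, "ClientLargeReadWarnThresholdKB"));
            mbs.setAttribute(ss, new Attribute("ClientLargeReadWarnThresholdKB", 1024L));
            mbs.setAttribute(ss, new Attribute("ClientLargeReadAbortThresholdKB", 4096L));
            mbs.setAttribute(ss, new Attribute("ClientTrackWarningsEnabled", true));
        }
        finally
        {
            connector.close();
        }
    }
}
```

Each setter also logs the new value on the node (as shown in StorageService above), so an operator can confirm the change took effect from the system log.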