From 4b3f07fc74089151efeff7a8fdfa9c414a1f0d6a Mon Sep 17 00:00:00 2001 From: David Capwell Date: Fri, 20 Aug 2021 12:07:44 -0700 Subject: [PATCH] allow blocking IPs from updating metrics about traffic patch by David Capwell; reviewed by Benjamin Lerer, Caleb Rackliffe, Jon Meredith for CASSANDRA-16859 --- .build/build-rat.xml | 1 + CHANGES.txt | 1 + build.xml | 2 + conf/cassandra.yaml | 8 + .../org/apache/cassandra/config/Config.java | 3 + .../cassandra/config/DatabaseDescriptor.java | 10 ++ .../apache/cassandra/config/SubnetGroups.java | 139 +++++++++++++++++ .../net/InboundConnectionInitiator.java | 26 +++- .../cassandra/transport/Dispatcher.java | 3 +- .../transport/ExceptionHandlers.java | 26 +++- .../cassandra/transport/PreV5Handlers.java | 14 +- .../cassandra/distributed/impl/Instance.java | 2 + .../distributed/impl/InstanceMetrics.java | 26 +++- .../test/FailingResponseDoesNotLogTest.java | 139 +++++++++++++++++ .../test/InternodeErrorExclusionTest.java | 92 ++++++++++++ ...rseClientMessageFromBlockedSubnetTest.java | 142 ++++++++++++++++++ .../test/UnableToParseClientMessageTest.java | 3 +- ...red_client_error_reporting_exclusions.yaml | 6 + .../config/DatabaseDescriptorRefTest.java | 16 +- .../config/YamlConfigurationLoaderTest.java | 15 ++ 20 files changed, 655 insertions(+), 19 deletions(-) create mode 100644 src/java/org/apache/cassandra/config/SubnetGroups.java create mode 100644 test/distributed/org/apache/cassandra/distributed/test/FailingResponseDoesNotLogTest.java create mode 100644 test/distributed/org/apache/cassandra/distributed/test/InternodeErrorExclusionTest.java create mode 100644 test/distributed/org/apache/cassandra/distributed/test/UnableToParseClientMessageFromBlockedSubnetTest.java create mode 100644 test/resources/data/config/YamlConfigurationLoaderTest/shared_client_error_reporting_exclusions.yaml diff --git a/.build/build-rat.xml b/.build/build-rat.xml index ce4e6f4afbce..767ae0641abb 100644 --- a/.build/build-rat.xml +++ b/.build/build-rat.xml @@ -63,6 +63,7 @@ + diff --git a/CHANGES.txt b/CHANGES.txt index 275ebc79657b..e683c95fd847 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.1 + * allow blocking IPs from updating metrics about traffic (CASSANDRA-16859) * Request-Based Native Transport Rate-Limiting (CASSANDRA-16663) * Implement nodetool getauditlog command (CASSANDRA-16725) * Clean up repair code (CASSANDRA-13720) diff --git a/build.xml b/build.xml index 99fc92c59d07..a7ab23b5591a 100644 --- a/build.xml +++ b/build.xml @@ -655,6 +655,7 @@ + @@ -826,6 +827,7 @@ + diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml index 907c54e0805b..bf8c358f5adc 100644 --- a/conf/cassandra.yaml +++ b/conf/cassandra.yaml @@ -1445,3 +1445,11 @@ enable_transient_replication: false # Enables the used of 'ALTER ... DROP COMPACT STORAGE' statements on this node. # 'ALTER ... DROP COMPACT STORAGE' is considered experimental and is not recommended for production use. enable_drop_compact_storage: false + +# When the client triggers a protocol exception or unknown issue (Cassandra bug) we increment +# a client metric showing this; this logic will exclude specific subnets from updating these +# metrics +#client_error_reporting_exclusions: +# subnets: +# - 127.0.0.1 +# - 127.0.0.0/31 diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index f7edda0852ea..bd2177d6d9e1 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -570,6 +570,9 @@ public static void setClientMode(boolean clientMode) public volatile int consecutive_message_errors_threshold = 1; + public volatile SubnetGroups client_error_reporting_exclusions = new SubnetGroups(); + public volatile SubnetGroups internode_error_reporting_exclusions = new SubnetGroups(); + public static Supplier getOverrideLoadConfig() { return overrideLoadConfig; diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 722e63d4c639..19e79d7a7a6f 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -3433,4 +3433,14 @@ public static void setConsecutiveMessageErrorsThreshold(int value) { conf.consecutive_message_errors_threshold = value; } + + public static SubnetGroups getClientErrorReportingExclusions() + { + return conf.client_error_reporting_exclusions; + } + + public static SubnetGroups getInternodeErrorReportingExclusions() + { + return conf.internode_error_reporting_exclusions; + } } diff --git a/src/java/org/apache/cassandra/config/SubnetGroups.java b/src/java/org/apache/cassandra/config/SubnetGroups.java new file mode 100644 index 000000000000..4f460fab3855 --- /dev/null +++ b/src/java/org/apache/cassandra/config/SubnetGroups.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.config; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableSet; + +import inet.ipaddr.IPAddressNetwork; +import inet.ipaddr.IPAddressString; + +/** + * When a group of subnets are needed, this class can be used to represent the group as if it was a single subnet. + * + * This class supports IPV4 and IPV6 subnets + */ +public class SubnetGroups +{ + public Set subnets = Collections.emptySet(); + + public SubnetGroups() + { + } + + /** Used by SnakeYaml */ + @SuppressWarnings("unused") + public SubnetGroups(List values) + { + this.subnets = ImmutableSet.copyOf(values.stream().map(Group::new).collect(Collectors.toSet())); + } + + public boolean contains(SocketAddress address) + { + Preconditions.checkNotNull(address); + Preconditions.checkArgument(address instanceof InetSocketAddress, "Unsupported socket address type: " + address.getClass()); + return contains(((InetSocketAddress) address).getAddress()); + } + + public boolean contains(InetAddress address) + { + for (Group group : subnets) + { + if (group.contains(address)) + { + return true; + } + } + return false; + } + + public boolean isEmpty() + { + return subnets.isEmpty(); + } + + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + SubnetGroups that = (SubnetGroups) o; + return subnets.equals(that.subnets); + } + + @Override + public int hashCode() + { + return Objects.hash(subnets); + } + + @Override + public String toString() + { + return "SubnetGroups{" + + "subnets=" + subnets + + '}'; + } + + private static class Group + { + private static final IPAddressNetwork.IPAddressGenerator IP_ADDRESS_GENERATOR = new IPAddressNetwork.IPAddressGenerator(); + + private final IPAddressString subnet; + + Group(String range) + { + subnet = new IPAddressString(range); + } + + boolean contains(InetAddress address) + { + return subnet.contains(IP_ADDRESS_GENERATOR.from(address).toAddressString()); + } + + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Group group = (Group) o; + return subnet.equals(group.subnet); + } + + @Override + public int hashCode() + { + return Objects.hash(subnet); + } + + @Override + public String toString() + { + return subnet.toString(); + } + } +} diff --git a/src/java/org/apache/cassandra/net/InboundConnectionInitiator.java b/src/java/org/apache/cassandra/net/InboundConnectionInitiator.java index 4c31adf2a586..d74f802a5516 100644 --- a/src/java/org/apache/cassandra/net/InboundConnectionInitiator.java +++ b/src/java/org/apache/cassandra/net/InboundConnectionInitiator.java @@ -34,6 +34,7 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; @@ -44,6 +45,7 @@ import io.netty.handler.logging.LoggingHandler; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslHandler; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.EncryptionOptions; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.locator.InetAddressAndPort; @@ -67,6 +69,8 @@ public class InboundConnectionInitiator private static class Initializer extends ChannelInitializer { + private static final String PIPELINE_INTERNODE_ERROR_EXCLUSIONS = "Internode Error Exclusions"; + private final InboundConnectionSettings settings; private final ChannelGroup channelGroup; private final Consumer pipelineInjector; @@ -82,6 +86,9 @@ private static class Initializer extends ChannelInitializer @Override public void initChannel(SocketChannel channel) throws Exception { + // if any of the handlers added fail they will send the error to the "head", so this needs to be first + channel.pipeline().addFirst(PIPELINE_INTERNODE_ERROR_EXCLUSIONS, new InternodeErrorExclusionsHandler()); + channelGroup.add(channel); channel.config().setOption(ChannelOption.ALLOCATOR, GlobalBufferPoolAllocator.instance); @@ -99,14 +106,14 @@ public void initChannel(SocketChannel channel) throws Exception { case UNENCRYPTED: // Handler checks for SSL connection attempts and cleanly rejects them if encryption is disabled - pipeline.addFirst("rejectssl", new RejectSslHandler()); + pipeline.addAfter(PIPELINE_INTERNODE_ERROR_EXCLUSIONS, "rejectssl", new RejectSslHandler()); break; case OPTIONAL: - pipeline.addFirst("ssl", new OptionalSslHandler(settings.encryption)); + pipeline.addAfter(PIPELINE_INTERNODE_ERROR_EXCLUSIONS, "ssl", new OptionalSslHandler(settings.encryption)); break; case ENCRYPTED: SslHandler sslHandler = getSslHandler("creating", channel, settings.encryption); - pipeline.addFirst("ssl", sslHandler); + pipeline.addAfter(PIPELINE_INTERNODE_ERROR_EXCLUSIONS, "ssl", sslHandler); break; } @@ -114,7 +121,20 @@ public void initChannel(SocketChannel channel) throws Exception pipeline.addLast("logger", new LoggingHandler(LogLevel.INFO)); channel.pipeline().addLast("handshake", new Handler(settings)); + } + } + private static class InternodeErrorExclusionsHandler extends ChannelInboundHandlerAdapter + { + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception + { + if (DatabaseDescriptor.getInternodeErrorReportingExclusions().contains(ctx.channel().remoteAddress())) + { + logger.debug("Excluding internode exception for {}; address contained in internode_error_reporting_exclusions", ctx.channel().remoteAddress(), cause); + return; + } + super.exceptionCaught(ctx, cause); } } diff --git a/src/java/org/apache/cassandra/transport/Dispatcher.java b/src/java/org/apache/cassandra/transport/Dispatcher.java index 9a9cdce04d93..9b0a8f298d7b 100644 --- a/src/java/org/apache/cassandra/transport/Dispatcher.java +++ b/src/java/org/apache/cassandra/transport/Dispatcher.java @@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit; import java.util.function.Consumer; +import com.google.common.base.Predicate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -135,7 +136,7 @@ void processRequest(Channel channel, Message.Request request, FlushItemConverter catch (Throwable t) { JVMStabilityInspector.inspectThrowable(t); - ExceptionHandlers.UnexpectedChannelExceptionHandler handler = new ExceptionHandlers.UnexpectedChannelExceptionHandler(channel, true); + Predicate handler = ExceptionHandlers.getUnexpectedExceptionHandler(channel, true); ErrorMessage error = ErrorMessage.fromException(t, handler); error.setStreamId(request.getStreamId()); toFlush = forFlusher.toFlushItem(channel, request, error); diff --git a/src/java/org/apache/cassandra/transport/ExceptionHandlers.java b/src/java/org/apache/cassandra/transport/ExceptionHandlers.java index 63951e9f7ba7..377b640209e7 100644 --- a/src/java/org/apache/cassandra/transport/ExceptionHandlers.java +++ b/src/java/org/apache/cassandra/transport/ExceptionHandlers.java @@ -19,10 +19,12 @@ package org.apache.cassandra.transport; import java.io.IOException; +import java.net.SocketAddress; import java.util.Set; import java.util.concurrent.TimeUnit; import com.google.common.base.Predicate; +import com.google.common.base.Predicates; import com.google.common.collect.ImmutableSet; import org.slf4j.Logger; @@ -33,6 +35,7 @@ import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelPromise; import org.apache.cassandra.exceptions.OverloadedException; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.metrics.ClientMetrics; import org.apache.cassandra.net.FrameEncoder; import org.apache.cassandra.transport.messages.ErrorMessage; @@ -67,7 +70,7 @@ public void exceptionCaught(final ChannelHandlerContext ctx, Throwable cause) // Provide error message to client in case channel is still open if (ctx.channel().isOpen()) { - UnexpectedChannelExceptionHandler handler = new UnexpectedChannelExceptionHandler(ctx.channel(), false); + Predicate handler = getUnexpectedExceptionHandler(ctx.channel(), false); ErrorMessage errorMessage = ErrorMessage.fromException(cause, handler); Envelope response = errorMessage.encode(version); FrameEncoder.Payload payload = allocator.allocate(true, CQLMessageHandler.envelopeSize(response.header)); @@ -89,6 +92,14 @@ public void exceptionCaught(final ChannelHandlerContext ctx, Throwable cause) } } + if (DatabaseDescriptor.getClientErrorReportingExclusions().contains(ctx.channel().remoteAddress())) + { + // Sometimes it is desirable to ignore exceptions from specific IPs; such as when security scans are + // running. To avoid polluting logs and metrics, metrics are not updated when the IP is in the exclude + // list. + logger.debug("Excluding client exception for {}; address contained in client_error_reporting_exclusions", ctx.channel().remoteAddress(), cause); + return; + } logClientNetworkingExceptions(cause); } @@ -124,6 +135,19 @@ else if (Throwables.anyCauseMatches(cause, t -> t instanceof OverloadedException } } + static Predicate getUnexpectedExceptionHandler(Channel channel, boolean alwaysLogAtError) + { + SocketAddress address = channel.remoteAddress(); + if (DatabaseDescriptor.getClientErrorReportingExclusions().contains(address)) + { + return cause -> { + logger.debug("Excluding client exception for {}; address contained in client_error_reporting_exclusions", address, cause); + return true; + }; + } + return new UnexpectedChannelExceptionHandler(channel, alwaysLogAtError); + } + /** * Include the channel info in the logged information for unexpected errors, and (if {@link #alwaysLogAtError} is * false then choose the log level based on the type of exception (some are clearly client issues and shouldn't be diff --git a/src/java/org/apache/cassandra/transport/PreV5Handlers.java b/src/java/org/apache/cassandra/transport/PreV5Handlers.java index f45028deddbd..ea850cc99281 100644 --- a/src/java/org/apache/cassandra/transport/PreV5Handlers.java +++ b/src/java/org/apache/cassandra/transport/PreV5Handlers.java @@ -20,6 +20,8 @@ import java.util.List; +import com.google.common.base.Predicate; + import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.transport.ClientResourceLimits.Overload; import org.slf4j.Logger; @@ -296,6 +298,8 @@ public void encode(ChannelHandlerContext ctx, Message source, List resul @ChannelHandler.Sharable public static final class ExceptionHandler extends ChannelInboundHandlerAdapter { + private static final Logger logger = LoggerFactory.getLogger(ExceptionHandler.class); + public static final ExceptionHandler instance = new ExceptionHandler(); private ExceptionHandler(){} @@ -305,7 +309,7 @@ public void exceptionCaught(final ChannelHandlerContext ctx, Throwable cause) // Provide error message to client in case channel is still open if (ctx.channel().isOpen()) { - ExceptionHandlers.UnexpectedChannelExceptionHandler handler = new ExceptionHandlers.UnexpectedChannelExceptionHandler(ctx.channel(), false); + Predicate handler = ExceptionHandlers.getUnexpectedExceptionHandler(ctx.channel(), false); ErrorMessage errorMessage = ErrorMessage.fromException(cause, handler); ChannelFuture future = ctx.writeAndFlush(errorMessage.encode(getConnectionVersion(ctx))); // On protocol exception, close the channel as soon as the message have been sent. @@ -315,6 +319,14 @@ public void exceptionCaught(final ChannelHandlerContext ctx, Throwable cause) future.addListener((ChannelFutureListener) f -> ctx.close()); } + if (DatabaseDescriptor.getClientErrorReportingExclusions().contains(ctx.channel().remoteAddress())) + { + // Sometimes it is desirable to ignore exceptions from specific IPs; such as when security scans are + // running. To avoid polluting logs and metrics, metrics are not updated when the IP is in the exclude + // list. + logger.debug("Excluding client exception for {}; address contained in client_error_reporting_exclusions", ctx.channel().remoteAddress(), cause); + return; + } ExceptionHandlers.logClientNetworkingExceptions(cause); JVMStabilityInspector.inspectThrowable(cause); } diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java index a58f2db0f4bf..fc02e70aff19 100644 --- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java @@ -189,6 +189,8 @@ public LogAction logs() // when creating a cluster globally in a test class we get the logs without the suite, try finding those logs: if (!f.exists()) f = new File(String.format("build/test/logs/%s/%s/%s/system.log", tag, clusterId, instanceId)); + if (!f.exists()) + throw new AssertionError("Unable to locate system.log under " + new File("build/test/logs").getAbsolutePath() + "; make sure ICluster.setup() is called or extend TestBaseImpl and do not define a static beforeClass function with @BeforeClass"); return new FileLogAction(f); } diff --git a/test/distributed/org/apache/cassandra/distributed/impl/InstanceMetrics.java b/test/distributed/org/apache/cassandra/distributed/impl/InstanceMetrics.java index 939691de7d64..733a671cdd25 100644 --- a/test/distributed/org/apache/cassandra/distributed/impl/InstanceMetrics.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/InstanceMetrics.java @@ -25,9 +25,11 @@ import java.util.function.Predicate; import com.codahale.metrics.Counter; +import com.codahale.metrics.Counting; import com.codahale.metrics.Gauge; import com.codahale.metrics.Histogram; import com.codahale.metrics.Meter; +import com.codahale.metrics.Metric; import com.codahale.metrics.Snapshot; import com.codahale.metrics.Timer; import org.apache.cassandra.distributed.shared.Metrics; @@ -45,33 +47,43 @@ class InstanceMetrics implements Metrics this.metricsRegistry = metricsRegistry; } + @Override public List getNames() { return new ArrayList<>(metricsRegistry.getNames()); } + @Override public long getCounter(String name) { - return metricsRegistry.getCounters().get(name).getCount(); + Metric metric = metricsRegistry.getMetrics().get(name); + if (metric instanceof Counting) + return ((Counting) metric).getCount(); + // If the metric is not found or does not expose a getCount method + return 0; } + @Override public Map getCounters(Predicate filter) { Map values = new HashMap<>(); - for (Map.Entry e : metricsRegistry.getCounters().entrySet()) + for (Map.Entry e : metricsRegistry.getMetrics().entrySet()) { - if (filter.test(e.getKey())) - values.put(e.getKey(), e.getValue().getCount()); + Metric metric = e.getValue(); + if (metric instanceof Counting && filter.test(e.getKey())) + values.put(e.getKey(), ((Counting) metric).getCount()); } return values; } + @Override public double getHistogram(String name, MetricValue value) { Histogram histogram = metricsRegistry.getHistograms().get(name); return getValue(histogram, value); } + @Override public Map getHistograms(Predicate filter, MetricValue value) { Map values = new HashMap<>(); @@ -83,11 +95,13 @@ public Map getHistograms(Predicate filter, MetricValue v return values; } + @Override public Object getGauge(String name) { return metricsRegistry.getGauges().get(name).getValue(); } + @Override public Map getGauges(Predicate filter) { Map values = new HashMap<>(); @@ -99,11 +113,13 @@ public Map getGauges(Predicate filter) return values; } + @Override public double getMeter(String name, Rate value) { return getRate(metricsRegistry.getMeters().get(name), value); } + @Override public Map getMeters(Predicate filter, Rate rate) { Map values = new HashMap<>(); @@ -115,11 +131,13 @@ public Map getMeters(Predicate filter, Rate rate) return values; } + @Override public double getTimer(String name, MetricValue value) { return getValue(metricsRegistry.getTimers().get(name).getSnapshot(), value); } + @Override public Map getTimers(Predicate filter, MetricValue value) { Map values = new HashMap<>(); diff --git a/test/distributed/org/apache/cassandra/distributed/test/FailingResponseDoesNotLogTest.java b/test/distributed/org/apache/cassandra/distributed/test/FailingResponseDoesNotLogTest.java new file mode 100644 index 000000000000..1071938a5ea6 --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/FailingResponseDoesNotLogTest.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.test; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import com.google.common.collect.ImmutableMap; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.cql3.BatchQueryOptions; +import org.apache.cassandra.cql3.CQLStatement; +import org.apache.cassandra.cql3.QueryHandler; +import org.apache.cassandra.cql3.QueryOptions; +import org.apache.cassandra.cql3.statements.BatchStatement; +import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.Feature; +import org.apache.cassandra.distributed.api.LogAction; +import org.apache.cassandra.distributed.api.LogResult; +import org.apache.cassandra.exceptions.RequestExecutionException; +import org.apache.cassandra.exceptions.RequestValidationException; +import org.apache.cassandra.service.ClientState; +import org.apache.cassandra.service.QueryState; +import org.apache.cassandra.transport.SimpleClient; +import org.apache.cassandra.transport.messages.ResultMessage; +import org.apache.cassandra.utils.MD5Digest; +import org.assertj.core.api.Assertions; + +/** + * This class is rather impelemntation specific. It is possible that changes made will cause this tests to fail, + * so updating to the latest logic is fine. + * + * This class makes sure we do not do logging/update metrics for client from a specific set of ip domains, so as long + * as we still do not log/update metrics, then the test is doing the right thing. + */ +public class FailingResponseDoesNotLogTest extends TestBaseImpl +{ + @BeforeClass + public static void beforeClassTopLevel() // need to make sure not to conflict with TestBaseImpl.beforeClass + { + + DatabaseDescriptor.clientInitialization(); + } + + @Test + public void dispatcherErrorDoesNotLock() throws IOException + { + System.setProperty("cassandra.custom_query_handler_class", AlwaysRejectErrorQueryHandler.class.getName()); + try (Cluster cluster = Cluster.build(1) + .withConfig(c -> c.with(Feature.NATIVE_PROTOCOL, Feature.GOSSIP) + .set("client_error_reporting_exclusions", ImmutableMap.of("subnets", Collections.singletonList("127.0.0.1"))) + ) + .start()) + { + try (SimpleClient client = SimpleClient.builder("127.0.0.1", 9042).build().connect(false)) + { + client.execute("SELECT * FROM system.peers", ConsistencyLevel.ONE); + Assert.fail("Query should have failed"); + } + catch (Exception e) + { + // ignore; expected + } + + // logs happen before client response; so grep is enough + LogAction logs = cluster.get(1).logs(); + LogResult> matches = logs.grep("address contained in client_error_reporting_exclusions"); + Assertions.assertThat(matches.getResult()).hasSize(1); + matches = logs.grep("Unexpected exception during request"); + Assertions.assertThat(matches.getResult()).isEmpty(); + } + finally + { + System.clearProperty("cassandra.custom_query_handler_class"); + } + } + + public static class AlwaysRejectErrorQueryHandler implements QueryHandler + { + @Override + public CQLStatement parse(String queryString, QueryState queryState, QueryOptions options) + { + throw new AssertionError("reject"); + } + + @Override + public ResultMessage process(CQLStatement statement, QueryState state, QueryOptions options, Map customPayload, long queryStartNanoTime) throws RequestExecutionException, RequestValidationException + { + throw new AssertionError("reject"); + } + + @Override + public ResultMessage.Prepared prepare(String query, ClientState clientState, Map customPayload) throws RequestValidationException + { + throw new AssertionError("reject"); + } + + @Override + public Prepared getPrepared(MD5Digest id) + { + throw new AssertionError("reject"); + } + + @Override + public ResultMessage processPrepared(CQLStatement statement, QueryState state, QueryOptions options, Map customPayload, long queryStartNanoTime) throws RequestExecutionException, RequestValidationException + { + throw new AssertionError("reject"); + } + + @Override + public ResultMessage processBatch(BatchStatement statement, QueryState state, BatchQueryOptions options, Map customPayload, long queryStartNanoTime) throws RequestExecutionException, RequestValidationException + { + throw new AssertionError("reject"); + } + } +} diff --git a/test/distributed/org/apache/cassandra/distributed/test/InternodeErrorExclusionTest.java b/test/distributed/org/apache/cassandra/distributed/test/InternodeErrorExclusionTest.java new file mode 100644 index 000000000000..e91e167503f9 --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/InternodeErrorExclusionTest.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.test; + +import java.io.IOException; +import java.net.InetAddress; +import java.util.Arrays; +import java.util.concurrent.TimeoutException; + +import com.google.common.collect.ImmutableMap; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.auth.IInternodeAuthenticator; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.Feature; +import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.net.InboundConnectionInitiator; +import org.apache.cassandra.transport.SimpleClient; + +import static org.assertj.core.api.Assertions.assertThat; + +public class InternodeErrorExclusionTest extends TestBaseImpl +{ + @BeforeClass + public static void beforeClass2() + { + DatabaseDescriptor.clientInitialization(); + } + + @Test + public void ignoreAuthErrors() throws IOException, TimeoutException + { + try (Cluster cluster = Cluster.build(1) + .withConfig(c -> c + .with(Feature.NETWORK) + .set("internode_authenticator", AlwaysFailingIInternodeAuthenticator.class.getName()) + .set("internode_error_reporting_exclusions", ImmutableMap.of("subnets", Arrays.asList("127.0.0.1")))) + .start()) + { + try (SimpleClient client = SimpleClient.builder("127.0.0.1", 7012).build()) + { + client.connect(true); + Assert.fail("Connection should fail"); + } + catch (Exception e) + { + // expected + } + assertThat(cluster.get(1).logs().watchFor("address contained in internode_error_reporting_exclusions").getResult()).hasSize(1); + } + } + + public static class AlwaysFailingIInternodeAuthenticator implements IInternodeAuthenticator + { + @Override + public boolean authenticate(InetAddress remoteAddress, int remotePort) + { + String klass = InboundConnectionInitiator.class.getName(); + for (StackTraceElement e : Thread.currentThread().getStackTrace()) + { + if (e.getClassName().startsWith(klass)) + return false; + } + return true; + } + + @Override + public void validateConfiguration() throws ConfigurationException + { + + } + } +} diff --git a/test/distributed/org/apache/cassandra/distributed/test/UnableToParseClientMessageFromBlockedSubnetTest.java b/test/distributed/org/apache/cassandra/distributed/test/UnableToParseClientMessageFromBlockedSubnetTest.java new file mode 100644 index 000000000000..2647ba7a279d --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/UnableToParseClientMessageFromBlockedSubnetTest.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.test; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeoutException; + +import com.google.common.collect.ImmutableMap; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.Feature; +import org.apache.cassandra.distributed.api.IInvokableInstance; +import org.apache.cassandra.distributed.api.LogAction; +import org.apache.cassandra.distributed.api.LogResult; +import org.apache.cassandra.transport.Message; +import org.apache.cassandra.transport.ProtocolVersion; +import org.apache.cassandra.transport.SimpleClient; +import org.apache.cassandra.transport.messages.ErrorMessage; +import org.assertj.core.api.Assertions; + +@RunWith(Parameterized.class) +public class UnableToParseClientMessageFromBlockedSubnetTest extends TestBaseImpl +{ + private static Cluster CLUSTER; + private static List CLUSTER_EXCLUDED_SUBNETS; + + @Parameterized.Parameter(0) + public List excludeSubnets; + @Parameterized.Parameter(1) + public ProtocolVersion version; + + @Parameterized.Parameters(name = "domains={0},version={1}") + public static Iterable params() + { + List tests = new ArrayList<>(); + for (List domains : Arrays.asList(Collections.singletonList("127.0.0.1"), Collections.singletonList("127.0.0.0/31"))) + { + for (ProtocolVersion version : ProtocolVersion.SUPPORTED) + { + tests.add(new Object[] { domains, version }); + } + } + return tests; + } + + @BeforeClass + public static void setup() + { + DatabaseDescriptor.daemonInitialization(); + } + + @AfterClass + public static void cleanup() + { + if (CLUSTER != null) + CLUSTER.close(); + } + + @Test + public void badMessageCausesProtocolExceptionFromExcludeList() throws IOException, TimeoutException + { + Cluster cluster = getCluster(); + // write gibberish to the native protocol + IInvokableInstance node = cluster.get(1); + // make sure everything is fine at the start + Assertions.assertThat(node.metrics().getCounter("org.apache.cassandra.metrics.Client.ProtocolException")).isEqualTo(0); + Assertions.assertThat(node.metrics().getCounter("org.apache.cassandra.metrics.Client.UnknownException")).isEqualTo(0); + + LogAction logs = node.logs(); + long mark = logs.mark(); + try (SimpleClient client = SimpleClient.builder("127.0.0.1", 9042).protocolVersion(version).useBeta().build()) + { + client.connect(false, true); + + // this should return a failed response + // disable waiting on procol errors as that logic was reverted until we can figure out its 100% safe + // right now ProtocolException is thrown for fatal and non-fatal issues, so closing the channel + // on non-fatal issues could cause other issues for the cluster + byte expectedVersion = (byte) (80 + version.asInt()); + Message.Response response = client.execute(new UnableToParseClientMessageTest.CustomHeaderMessage(new byte[]{ expectedVersion, 1, 2, 3, 4, 5, 6, 7, 8, 9 }), false); + Assertions.assertThat(response).isInstanceOf(ErrorMessage.class); + + logs.watchFor(mark, "address contained in client_error_reporting_exclusions"); + Assertions.assertThat(node.metrics().getCounter("org.apache.cassandra.metrics.Client.ProtocolException")).isEqualTo(0); + Assertions.assertThat(node.metrics().getCounter("org.apache.cassandra.metrics.Client.UnknownException")).isEqualTo(0); + + Assertions.assertThat(logs.grep(mark, "Excluding client exception fo").getResult()).hasSize(1); + Assertions.assertThat(logs.grep(mark, "Unexpected exception during request").getResult()).isEmpty(); + } + } + + private Cluster getCluster() + { + if (CLUSTER == null || CLUSTER_EXCLUDED_SUBNETS != excludeSubnets) + { + if (CLUSTER != null) + { + CLUSTER.close(); + CLUSTER = null; + } + try + { + CLUSTER = init(Cluster.build(1) + .withConfig(c -> c.with(Feature.values()).set("client_error_reporting_exclusions", ImmutableMap.of("subnets", excludeSubnets))) + .start()); + CLUSTER_EXCLUDED_SUBNETS = excludeSubnets; + } + catch (IOException e) + { + throw new UncheckedIOException(e); + } + } + return CLUSTER; + } +} diff --git a/test/distributed/org/apache/cassandra/distributed/test/UnableToParseClientMessageTest.java b/test/distributed/org/apache/cassandra/distributed/test/UnableToParseClientMessageTest.java index d394b5918b9c..5c976b97c6ea 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/UnableToParseClientMessageTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/UnableToParseClientMessageTest.java @@ -22,7 +22,6 @@ import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Objects; -import java.util.function.Consumer; import java.util.function.Predicate; import org.junit.AfterClass; @@ -169,7 +168,7 @@ private static long getProtocolExceptionCount(IInvokableInstance node) .getCount()); } - private static class CustomHeaderMessage extends OptionsMessage + public static class CustomHeaderMessage extends OptionsMessage { private final ByteBuf headerEncoded; diff --git a/test/resources/data/config/YamlConfigurationLoaderTest/shared_client_error_reporting_exclusions.yaml b/test/resources/data/config/YamlConfigurationLoaderTest/shared_client_error_reporting_exclusions.yaml new file mode 100644 index 000000000000..7fcb7200ea7f --- /dev/null +++ b/test/resources/data/config/YamlConfigurationLoaderTest/shared_client_error_reporting_exclusions.yaml @@ -0,0 +1,6 @@ +# This is used to test to validate that YAML Anchors ARE supported; this is useful for cases where client/internode errors are coming from the same sources (such as security scans) +client_error_reporting_exclusions: &share + subnets: + - 127.0.0.1 + - 127.0.0.0/31 +internode_error_reporting_exclusions: *share \ No newline at end of file diff --git a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java index 36bea9905d6b..f87b046fcd7a 100644 --- a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java +++ b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java @@ -36,6 +36,7 @@ import java.util.Map; import java.util.Set; +import com.google.common.base.Throwables; import org.junit.Test; import org.apache.cassandra.utils.Pair; @@ -94,6 +95,7 @@ public class DatabaseDescriptorRefTest "org.apache.cassandra.config.YamlConfigurationLoader$PropertiesChecker$1", "org.apache.cassandra.config.YamlConfigurationLoader$CustomConstructor", "org.apache.cassandra.config.TransparentDataEncryptionOptions", + "org.apache.cassandra.config.SubnetGroups", "org.apache.cassandra.db.ConsistencyLevel", "org.apache.cassandra.db.commitlog.CommitLogSegmentManagerFactory", "org.apache.cassandra.db.commitlog.DefaultCommitLogSegmentMgrFactory", @@ -295,15 +297,15 @@ private void checkViolations(PrintStream err, List> viol { if (!violations.isEmpty()) { + StringBuilder sb = new StringBuilder(); for (Pair violation : new ArrayList<>(violations)) - { - err.println(); - err.println(); - err.println("VIOLATION: " + violation.left); - violation.right.printStackTrace(err); - } + sb.append("\n\n") + .append("VIOLATION: ").append(violation.left).append("\n") + .append(Throwables.getStackTraceAsString(violation.right)); + String msg = sb.toString(); + err.println(msg); - fail(); + fail(msg); } } } diff --git a/test/unit/org/apache/cassandra/config/YamlConfigurationLoaderTest.java b/test/unit/org/apache/cassandra/config/YamlConfigurationLoaderTest.java index 2aff83f5b920..b533a8fb965d 100644 --- a/test/unit/org/apache/cassandra/config/YamlConfigurationLoaderTest.java +++ b/test/unit/org/apache/cassandra/config/YamlConfigurationLoaderTest.java @@ -18,12 +18,17 @@ package org.apache.cassandra.config; +import java.net.URL; +import java.util.Arrays; import java.util.Collections; import java.util.Map; import com.google.common.collect.ImmutableMap; import org.junit.Test; +import org.assertj.core.api.Assertions; + +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertEquals; @@ -50,4 +55,14 @@ public void fromMapTest() assertEquals(false, config.client_encryption_options.optional); // Check a nested object assertEquals(true, config.client_encryption_options.enabled); // Check a nested object } + + @Test + public void sharedErrorReportingExclusions() + { + URL url = YamlConfigurationLoaderTest.class.getClassLoader().getResource("data/config/YamlConfigurationLoaderTest/shared_client_error_reporting_exclusions.yaml"); + Config config = new YamlConfigurationLoader().loadConfig(url); + SubnetGroups expected = new SubnetGroups(Arrays.asList("127.0.0.1", "127.0.0.0/31")); + assertThat(config.client_error_reporting_exclusions).isEqualTo(expected); + assertThat(config.internode_error_reporting_exclusions).isEqualTo(expected); + } }