Skip to content

Commit

Permalink
allow blocking IPs from updating metrics about traffic
Browse files Browse the repository at this point in the history
patch by David Capwell; reviewed by Benjamin Lerer, Caleb Rackliffe, Jon Meredith for CASSANDRA-16859
  • Loading branch information
dcapwell committed Aug 21, 2021
1 parent d220d24 commit 4b3f07f
Show file tree
Hide file tree
Showing 20 changed files with 655 additions and 19 deletions.
1 change: 1 addition & 0 deletions .build/build-rat.xml
Expand Up @@ -63,6 +63,7 @@
<exclude name="**/test/data/jmxdump/cassandra-3.0-jmx.yaml"/>
<exclude name="**/test/data/jmxdump/cassandra-3.11-jmx.yaml"/>
<exclude name="**/test/data/jmxdump/cassandra-4.0-jmx.yaml"/>
<exclude name="**/test/resources/data/config/YamlConfigurationLoaderTest/shared_client_error_reporting_exclusions.yaml"/>
<exclude name="**/tools/cqlstress-counter-example.yaml"/>
<exclude name="**/tools/cqlstress-example.yaml"/>
<exclude name="**/tools/cqlstress-insanity-example.yaml"/>
Expand Down
1 change: 1 addition & 0 deletions CHANGES.txt
@@ -1,4 +1,5 @@
4.1
* allow blocking IPs from updating metrics about traffic (CASSANDRA-16859)
* Request-Based Native Transport Rate-Limiting (CASSANDRA-16663)
* Implement nodetool getauditlog command (CASSANDRA-16725)
* Clean up repair code (CASSANDRA-13720)
Expand Down
2 changes: 2 additions & 0 deletions build.xml
Expand Up @@ -655,6 +655,7 @@
<exclusion groupId="org.hamcrest" artifactId="hamcrest"/>
</dependency>
<dependency groupId="org.hamcrest" artifactId="hamcrest" version="2.2" scope="test"/>
<dependency groupId="com.github.seancfoley" artifactId="ipaddress" version="5.3.3" />
</dependencyManagement>
<developer id="adelapena" name="Andres de la Peña"/>
<developer id="alakshman" name="Avinash Lakshman"/>
Expand Down Expand Up @@ -826,6 +827,7 @@
<dependency groupId="org.jboss.byteman" artifactId="byteman"/>
<dependency groupId="org.jboss.byteman" artifactId="byteman-submit"/>
<dependency groupId="org.jboss.byteman" artifactId="byteman-bmunit"/>
<dependency groupId="com.github.seancfoley" artifactId="ipaddress" />
</artifact:pom>
</target>

Expand Down
8 changes: 8 additions & 0 deletions conf/cassandra.yaml
Expand Up @@ -1445,3 +1445,11 @@ enable_transient_replication: false
# Enables the used of 'ALTER ... DROP COMPACT STORAGE' statements on this node.
# 'ALTER ... DROP COMPACT STORAGE' is considered experimental and is not recommended for production use.
enable_drop_compact_storage: false

# When the client triggers a protocol exception or unknown issue (Cassandra bug) we increment
# a client metric showing this; this logic will exclude specific subnets from updating these
# metrics
#client_error_reporting_exclusions:
# subnets:
# - 127.0.0.1
# - 127.0.0.0/31
3 changes: 3 additions & 0 deletions src/java/org/apache/cassandra/config/Config.java
Expand Up @@ -570,6 +570,9 @@ public static void setClientMode(boolean clientMode)

public volatile int consecutive_message_errors_threshold = 1;

public volatile SubnetGroups client_error_reporting_exclusions = new SubnetGroups();
public volatile SubnetGroups internode_error_reporting_exclusions = new SubnetGroups();

public static Supplier<Config> getOverrideLoadConfig()
{
return overrideLoadConfig;
Expand Down
10 changes: 10 additions & 0 deletions src/java/org/apache/cassandra/config/DatabaseDescriptor.java
Expand Up @@ -3433,4 +3433,14 @@ public static void setConsecutiveMessageErrorsThreshold(int value)
{
conf.consecutive_message_errors_threshold = value;
}

public static SubnetGroups getClientErrorReportingExclusions()
{
return conf.client_error_reporting_exclusions;
}

public static SubnetGroups getInternodeErrorReportingExclusions()
{
return conf.internode_error_reporting_exclusions;
}
}
139 changes: 139 additions & 0 deletions src/java/org/apache/cassandra/config/SubnetGroups.java
@@ -0,0 +1,139 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.config;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;

import inet.ipaddr.IPAddressNetwork;
import inet.ipaddr.IPAddressString;

/**
* When a group of subnets are needed, this class can be used to represent the group as if it was a single subnet.
*
* This class supports IPV4 and IPV6 subnets
*/
public class SubnetGroups
{
public Set<Group> subnets = Collections.emptySet();

public SubnetGroups()
{
}

/** Used by SnakeYaml */
@SuppressWarnings("unused")
public SubnetGroups(List<String> values)
{
this.subnets = ImmutableSet.copyOf(values.stream().map(Group::new).collect(Collectors.toSet()));
}

public boolean contains(SocketAddress address)
{
Preconditions.checkNotNull(address);
Preconditions.checkArgument(address instanceof InetSocketAddress, "Unsupported socket address type: " + address.getClass());
return contains(((InetSocketAddress) address).getAddress());
}

public boolean contains(InetAddress address)
{
for (Group group : subnets)
{
if (group.contains(address))
{
return true;
}
}
return false;
}

public boolean isEmpty()
{
return subnets.isEmpty();
}

@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
SubnetGroups that = (SubnetGroups) o;
return subnets.equals(that.subnets);
}

@Override
public int hashCode()
{
return Objects.hash(subnets);
}

@Override
public String toString()
{
return "SubnetGroups{" +
"subnets=" + subnets +
'}';
}

private static class Group
{
private static final IPAddressNetwork.IPAddressGenerator IP_ADDRESS_GENERATOR = new IPAddressNetwork.IPAddressGenerator();

private final IPAddressString subnet;

Group(String range)
{
subnet = new IPAddressString(range);
}

boolean contains(InetAddress address)
{
return subnet.contains(IP_ADDRESS_GENERATOR.from(address).toAddressString());
}

@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Group group = (Group) o;
return subnet.equals(group.subnet);
}

@Override
public int hashCode()
{
return Objects.hash(subnet);
}

@Override
public String toString()
{
return subnet.toString();
}
}
}
26 changes: 23 additions & 3 deletions src/java/org/apache/cassandra/net/InboundConnectionInitiator.java
Expand Up @@ -34,6 +34,7 @@
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
Expand All @@ -44,6 +45,7 @@
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.EncryptionOptions;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.locator.InetAddressAndPort;
Expand All @@ -67,6 +69,8 @@ public class InboundConnectionInitiator

private static class Initializer extends ChannelInitializer<SocketChannel>
{
private static final String PIPELINE_INTERNODE_ERROR_EXCLUSIONS = "Internode Error Exclusions";

private final InboundConnectionSettings settings;
private final ChannelGroup channelGroup;
private final Consumer<ChannelPipeline> pipelineInjector;
Expand All @@ -82,6 +86,9 @@ private static class Initializer extends ChannelInitializer<SocketChannel>
@Override
public void initChannel(SocketChannel channel) throws Exception
{
// if any of the handlers added fail they will send the error to the "head", so this needs to be first
channel.pipeline().addFirst(PIPELINE_INTERNODE_ERROR_EXCLUSIONS, new InternodeErrorExclusionsHandler());

channelGroup.add(channel);

channel.config().setOption(ChannelOption.ALLOCATOR, GlobalBufferPoolAllocator.instance);
Expand All @@ -99,22 +106,35 @@ public void initChannel(SocketChannel channel) throws Exception
{
case UNENCRYPTED:
// Handler checks for SSL connection attempts and cleanly rejects them if encryption is disabled
pipeline.addFirst("rejectssl", new RejectSslHandler());
pipeline.addAfter(PIPELINE_INTERNODE_ERROR_EXCLUSIONS, "rejectssl", new RejectSslHandler());
break;
case OPTIONAL:
pipeline.addFirst("ssl", new OptionalSslHandler(settings.encryption));
pipeline.addAfter(PIPELINE_INTERNODE_ERROR_EXCLUSIONS, "ssl", new OptionalSslHandler(settings.encryption));
break;
case ENCRYPTED:
SslHandler sslHandler = getSslHandler("creating", channel, settings.encryption);
pipeline.addFirst("ssl", sslHandler);
pipeline.addAfter(PIPELINE_INTERNODE_ERROR_EXCLUSIONS, "ssl", sslHandler);
break;
}

if (WIRETRACE)
pipeline.addLast("logger", new LoggingHandler(LogLevel.INFO));

channel.pipeline().addLast("handshake", new Handler(settings));
}
}

private static class InternodeErrorExclusionsHandler extends ChannelInboundHandlerAdapter
{
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
{
if (DatabaseDescriptor.getInternodeErrorReportingExclusions().contains(ctx.channel().remoteAddress()))
{
logger.debug("Excluding internode exception for {}; address contained in internode_error_reporting_exclusions", ctx.channel().remoteAddress(), cause);
return;
}
super.exceptionCaught(ctx, cause);
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/java/org/apache/cassandra/transport/Dispatcher.java
Expand Up @@ -23,6 +23,7 @@
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import com.google.common.base.Predicate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -135,7 +136,7 @@ void processRequest(Channel channel, Message.Request request, FlushItemConverter
catch (Throwable t)
{
JVMStabilityInspector.inspectThrowable(t);
ExceptionHandlers.UnexpectedChannelExceptionHandler handler = new ExceptionHandlers.UnexpectedChannelExceptionHandler(channel, true);
Predicate<Throwable> handler = ExceptionHandlers.getUnexpectedExceptionHandler(channel, true);
ErrorMessage error = ErrorMessage.fromException(t, handler);
error.setStreamId(request.getStreamId());
toFlush = forFlusher.toFlushItem(channel, request, error);
Expand Down
26 changes: 25 additions & 1 deletion src/java/org/apache/cassandra/transport/ExceptionHandlers.java
Expand Up @@ -19,10 +19,12 @@
package org.apache.cassandra.transport;

import java.io.IOException;
import java.net.SocketAddress;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.collect.ImmutableSet;

import org.slf4j.Logger;
Expand All @@ -33,6 +35,7 @@
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import org.apache.cassandra.exceptions.OverloadedException;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.metrics.ClientMetrics;
import org.apache.cassandra.net.FrameEncoder;
import org.apache.cassandra.transport.messages.ErrorMessage;
Expand Down Expand Up @@ -67,7 +70,7 @@ public void exceptionCaught(final ChannelHandlerContext ctx, Throwable cause)
// Provide error message to client in case channel is still open
if (ctx.channel().isOpen())
{
UnexpectedChannelExceptionHandler handler = new UnexpectedChannelExceptionHandler(ctx.channel(), false);
Predicate<Throwable> handler = getUnexpectedExceptionHandler(ctx.channel(), false);
ErrorMessage errorMessage = ErrorMessage.fromException(cause, handler);
Envelope response = errorMessage.encode(version);
FrameEncoder.Payload payload = allocator.allocate(true, CQLMessageHandler.envelopeSize(response.header));
Expand All @@ -89,6 +92,14 @@ public void exceptionCaught(final ChannelHandlerContext ctx, Throwable cause)
}
}

if (DatabaseDescriptor.getClientErrorReportingExclusions().contains(ctx.channel().remoteAddress()))
{
// Sometimes it is desirable to ignore exceptions from specific IPs; such as when security scans are
// running. To avoid polluting logs and metrics, metrics are not updated when the IP is in the exclude
// list.
logger.debug("Excluding client exception for {}; address contained in client_error_reporting_exclusions", ctx.channel().remoteAddress(), cause);
return;
}
logClientNetworkingExceptions(cause);
}

Expand Down Expand Up @@ -124,6 +135,19 @@ else if (Throwables.anyCauseMatches(cause, t -> t instanceof OverloadedException
}
}

static Predicate<Throwable> getUnexpectedExceptionHandler(Channel channel, boolean alwaysLogAtError)
{
SocketAddress address = channel.remoteAddress();
if (DatabaseDescriptor.getClientErrorReportingExclusions().contains(address))
{
return cause -> {
logger.debug("Excluding client exception for {}; address contained in client_error_reporting_exclusions", address, cause);
return true;
};
}
return new UnexpectedChannelExceptionHandler(channel, alwaysLogAtError);
}

/**
* Include the channel info in the logged information for unexpected errors, and (if {@link #alwaysLogAtError} is
* false then choose the log level based on the type of exception (some are clearly client issues and shouldn't be
Expand Down

0 comments on commit 4b3f07f

Please sign in to comment.