Skip to content

Commit

Permalink
Outbound TCP connections should consult internode authenticator.
Browse files Browse the repository at this point in the history
Patch by Ariel Weisberg; Reviewed by Marcus Eriksson for CASSANDRA-13324
  • Loading branch information
aweisberg committed Mar 24, 2017
1 parent 60e2e98 commit 732d1af
Show file tree
Hide file tree
Showing 10 changed files with 218 additions and 29 deletions.
1 change: 1 addition & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
4.0
* Outbound TCP connections ignore internode authenticator (CASSANDRA-13324)
* Upgrade junit from 4.6 to 4.12 (CASSANDRA-13360)
* Cleanup ParentRepairSession after repairs (CASSANDRA-13359)
* Incremental repair not streaming correct sstables (CASSANDRA-13328)
Expand Down
10 changes: 3 additions & 7 deletions src/java/org/apache/cassandra/auth/AuthConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.utils.FBUtilities;
import org.hsqldb.Database;

/**
* Only purpose is to Initialize authentication/authorization via {@link #applyAuth()}.
Expand Down Expand Up @@ -94,20 +95,15 @@ public static void applyAuth()

// authenticator

IInternodeAuthenticator internodeAuthenticator;
if (conf.internode_authenticator != null)
internodeAuthenticator = FBUtilities.construct(conf.internode_authenticator, "internode_authenticator");
else
internodeAuthenticator = new AllowAllInternodeAuthenticator();

DatabaseDescriptor.setInternodeAuthenticator(internodeAuthenticator);
DatabaseDescriptor.setInternodeAuthenticator(FBUtilities.construct(conf.internode_authenticator, "internode_authenticator"));

// Validate at last to have authenticator, authorizer, role-manager and internode-auth setup
// in case these rely on each other.

authenticator.validateConfiguration();
authorizer.validateConfiguration();
roleManager.validateConfiguration();
internodeAuthenticator.validateConfiguration();
DatabaseDescriptor.getInternodeAuthenticator().validateConfiguration();
}
}
5 changes: 4 additions & 1 deletion src/java/org/apache/cassandra/config/DatabaseDescriptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,15 @@
import java.util.*;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.auth.AllowAllInternodeAuthenticator;
import org.apache.cassandra.auth.AuthConfig;
import org.apache.cassandra.auth.IAuthenticator;
import org.apache.cassandra.auth.IAuthorizer;
Expand Down Expand Up @@ -79,7 +81,7 @@ public class DatabaseDescriptor
private static InetAddress rpcAddress;
private static InetAddress broadcastRpcAddress;
private static SeedProvider seedProvider;
private static IInternodeAuthenticator internodeAuthenticator;
private static IInternodeAuthenticator internodeAuthenticator = new AllowAllInternodeAuthenticator();

/* Hashing strategy Random or OPHF */
private static IPartitioner partitioner;
Expand Down Expand Up @@ -1538,6 +1540,7 @@ public static IInternodeAuthenticator getInternodeAuthenticator()

public static void setInternodeAuthenticator(IInternodeAuthenticator internodeAuthenticator)
{
Preconditions.checkNotNull(internodeAuthenticator);
DatabaseDescriptor.internodeAuthenticator = internodeAuthenticator;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,12 @@
import java.net.InetAddress;
import java.net.UnknownHostException;

import com.google.common.annotations.VisibleForTesting;

import org.apache.cassandra.gms.*;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.OutboundTcpConnectionPool;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -49,20 +53,29 @@ private void reconnect(InetAddress publicAddress, VersionedValue localAddressVal
{
try
{
reconnect(publicAddress, InetAddress.getByName(localAddressValue.value));
reconnect(publicAddress, InetAddress.getByName(localAddressValue.value), snitch, localDc);
}
catch (UnknownHostException e)
{
logger.error("Error in getting the IP address resolved: ", e);
}
}

private void reconnect(InetAddress publicAddress, InetAddress localAddress)
@VisibleForTesting
static void reconnect(InetAddress publicAddress, InetAddress localAddress, IEndpointSnitch snitch, String localDc)
{
OutboundTcpConnectionPool cp = MessagingService.instance().getConnectionPool(publicAddress);
//InternodeAuthenticator said don't connect
if (cp == null)
{
logger.debug("InternodeAuthenticator said don't reconnect to {} on {}", publicAddress, localAddress);
return;
}

if (snitch.getDatacenter(publicAddress).equals(localDc)
&& !MessagingService.instance().getConnectionPool(publicAddress).endPoint().equals(localAddress))
&& !cp.endPoint().equals(localAddress))
{
MessagingService.instance().getConnectionPool(publicAddress).reset(localAddress);
cp.reset(localAddress);
logger.debug("Initiated reconnect to an Internal IP {} for the {}", localAddress, publicAddress);
}
}
Expand Down
44 changes: 38 additions & 6 deletions src/java/org/apache/cassandra/net/MessagingService.java
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,8 @@ public long serializedSize(Object o, int version)
/* Lookup table for registering message handlers based on the verb. */
private final Map<Verb, IVerbHandler> verbHandlers;

private final ConcurrentMap<InetAddress, OutboundTcpConnectionPool> connectionManagers = new NonBlockingHashMap<>();
@VisibleForTesting
final ConcurrentMap<InetAddress, OutboundTcpConnectionPool> connectionManagers = new NonBlockingHashMap<>();

private static final Logger logger = LoggerFactory.getLogger(MessagingService.class);
private static final int LOG_DROPPED_INTERVAL_IN_MS = 5000;
Expand Down Expand Up @@ -531,6 +532,10 @@ public Object apply(Pair<Integer, ExpiringMap.CacheableObject<CallbackInfo>> pai
maybeAddLatency(expiredCallbackInfo.callback, expiredCallbackInfo.target, pair.right.timeout);

ConnectionMetrics.totalTimeouts.mark();
OutboundTcpConnectionPool cp = getConnectionPool(expiredCallbackInfo.target);
if (cp != null)
cp.incrementTimeout();

getConnectionPool(expiredCallbackInfo.target).incrementTimeout();

if (expiredCallbackInfo.callback.supportsBackPressure())
Expand Down Expand Up @@ -670,8 +675,16 @@ public void addLatency(InetAddress address, long latency)
*/
public void convict(InetAddress ep)
{
logger.trace("Resetting pool for {}", ep);
getConnectionPool(ep).reset();
OutboundTcpConnectionPool cp = getConnectionPool(ep);
if (cp != null)
{
logger.trace("Resetting pool for {}", ep);
getConnectionPool(ep).reset();
}
else
{
logger.debug("Not resetting pool for {} because internode authenticator said not to connect", ep);
}
}

public void listen()
Expand Down Expand Up @@ -795,11 +808,22 @@ public void destroyConnectionPool(InetAddress to)
connectionManagers.remove(to);
}

/**
* Get a connection pool to the specified endpoint. Constructs one if none exists.
*
* Can return null if the InternodeAuthenticator fails to authenticate the node.
* @param to
* @return The connection pool or null if internode authenticator says not to
*/
public OutboundTcpConnectionPool getConnectionPool(InetAddress to)
{
OutboundTcpConnectionPool cp = connectionManagers.get(to);
if (cp == null)
{
//Don't attempt to connect to nodes that won't (or shouldn't) authenticate anyways
if (!DatabaseDescriptor.getInternodeAuthenticator().authenticate(to, OutboundTcpConnectionPool.portFor(to)))
return null;

cp = new OutboundTcpConnectionPool(to, backPressure.newState(to));
OutboundTcpConnectionPool existingPool = connectionManagers.putIfAbsent(to, cp);
if (existingPool != null)
Expand All @@ -811,10 +835,17 @@ public OutboundTcpConnectionPool getConnectionPool(InetAddress to)
return cp;
}


/**
* Get a connection for a message to a specific endpoint. Constructs one if none exists.
*
* Can return null if the InternodeAuthenticator fails to authenticate the node.
* @param to
* @return The connection or null if internode authenticator says not to
*/
public OutboundTcpConnection getConnection(InetAddress to, MessageOut msg)
{
return getConnectionPool(to).getConnection(msg);
OutboundTcpConnectionPool cp = getConnectionPool(to);
return cp == null ? null : cp.getConnection(msg);
}

/**
Expand Down Expand Up @@ -968,7 +999,8 @@ public void sendOneWay(MessageOut message, int id, InetAddress to)
OutboundTcpConnection connection = getConnection(to, message);

// write it
connection.enqueue(message, id);
if (connection != null)
connection.enqueue(message, id);
}

public <T> AsyncOneResponse<T> sendRR(MessageOut message, InetAddress to)
Expand Down
33 changes: 24 additions & 9 deletions src/java/org/apache/cassandra/net/OutboundTcpConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,12 @@ else if (socket != null || connect())
break inner;
}
}
catch (InternodeAuthFailed e)
{
logger.warn("Internode auth failed connecting to " + poolReference.endPoint());
//Remove the connection pool and other thread so messages aren't queued
MessagingService.instance().destroyConnectionPool(poolReference.endPoint());
}
catch (Exception e)
{
JVMStabilityInspector.inspectThrowable(e);
Expand Down Expand Up @@ -394,20 +400,27 @@ private void disconnect()
}

@SuppressWarnings("resource")
private boolean connect()
private boolean connect() throws InternodeAuthFailed
{
logger.debug("Attempting to connect to {}", poolReference.endPoint());
InetAddress endpoint = poolReference.endPoint();
if (!DatabaseDescriptor.getInternodeAuthenticator().authenticate(endpoint, poolReference.portFor(endpoint)))
{
throw new InternodeAuthFailed();
}

logger.debug("Attempting to connect to {}", endpoint);


long start = System.nanoTime();
long timeout = TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getRpcTimeout());
while (System.nanoTime() - start < timeout)
{
targetVersion = MessagingService.instance().getVersion(poolReference.endPoint());
targetVersion = MessagingService.instance().getVersion(endpoint);
try
{
socket = poolReference.newSocket();
socket.setKeepAlive(true);
if (isLocalDC(poolReference.endPoint()))
if (isLocalDC(endpoint))
{
socket.setTcpNoDelay(INTRADC_TCP_NODELAY);
}
Expand Down Expand Up @@ -446,15 +459,15 @@ private boolean connect()
}
else
{
MessagingService.instance().setVersion(poolReference.endPoint(), maxTargetVersion);
MessagingService.instance().setVersion(endpoint, maxTargetVersion);
}

if (targetVersion > maxTargetVersion)
{
logger.trace("Target max version is {}; will reconnect with that version", maxTargetVersion);
try
{
if (DatabaseDescriptor.getSeeds().contains(poolReference.endPoint()))
if (DatabaseDescriptor.getSeeds().contains(endpoint))
logger.warn("Seed gossip version is {}; will not connect with that version", maxTargetVersion);
}
catch (Throwable e)
Expand Down Expand Up @@ -484,7 +497,7 @@ private boolean connect()
if (shouldCompressConnection())
{
out.flush();
logger.trace("Upgrading OutputStream to {} to be compressed", poolReference.endPoint());
logger.trace("Upgrading OutputStream to {} to be compressed", endpoint);

// TODO: custom LZ4 OS that supports BB write methods
LZ4Compressor compressor = LZ4Factory.fastestInstance().fastCompressor();
Expand All @@ -495,7 +508,7 @@ private boolean connect()
checksum,
true)); // no async flushing
}
logger.debug("Done connecting to {}", poolReference.endPoint());
logger.debug("Done connecting to {}", endpoint);
return true;
}
catch (SSLHandshakeException e)
Expand All @@ -508,7 +521,7 @@ private boolean connect()
catch (IOException e)
{
socket = null;
logger.debug("Unable to connect to {}", poolReference.endPoint(), e);
logger.debug("Unable to connect to {}", endpoint, e);
Uninterruptibles.sleepUninterruptibly(OPEN_RETRY_DELAY, TimeUnit.MILLISECONDS);
}
}
Expand Down Expand Up @@ -613,4 +626,6 @@ boolean shouldRetry()
return false;
}
}

private static class InternodeAuthFailed extends Exception {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,11 @@ public static Socket newSocket(InetAddress endpoint) throws IOException
}
}

public static int portFor(InetAddress endpoint)
{
return isEncryptedChannel(endpoint) ? DatabaseDescriptor.getSSLStoragePort() : DatabaseDescriptor.getStoragePort();
}

public InetAddress endPoint()
{
if (id.equals(FBUtilities.getBroadcastAddress()))
Expand Down Expand Up @@ -218,7 +223,7 @@ public void close()
smallMessages.closeSocket(true);
if (gossipMessages != null)
gossipMessages.closeSocket(true);

metrics.release();
if (metrics != null)
metrics.release();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
public class DatabaseDescriptorRefTest
{
static final String[] validClasses = {
"org.apache.cassandra.auth.AllowAllInternodeAuthenticator",
"org.apache.cassandra.auth.IInternodeAuthenticator",
"org.apache.cassandra.auth.IAuthenticator",
"org.apache.cassandra.auth.IAuthorizer",
Expand Down

0 comments on commit 732d1af

Please sign in to comment.