Skip to content

Commit

Permalink
JAVA-2165: Abstract node connection information
Browse files Browse the repository at this point in the history
Motivation:

Until now, the only information used to connect to a node was its socket
address (host+port). Internally, the driver also used this address as a
unique node identifier (it was the key in the Metadata.nodes map).
However, there are more complex deployment scenarios where the address
might not be sufficient: for example, if nodes are accessed through a
proxy that uses SNI routing, the connection information becomes
host+port+sni_server_name, and the host+port part is no longer unique.
We need to support those alternative connection methods, and use a
different unique node identifier.

Modifications:

- Introduce EndPoint as an abstract wrapper around connection
  information.  EndPoint.resolve() returns the socket address that will
  be used to open connections, but an endpoint may also contain
  additional pieces of information that will be used in other places
  (for example a custom SslHandlerFactory).
- Add Node.getEndPoint(), and modify DefaultTopologyMonitor to fill it.
  Allow contact points to be specified as EndPoints (preserving legacy
  methods).
- Replace InetSocketAddress by EndPoint everywhere that it was used to
  identify a node.
- Index Metadata.nodes by the true unique node identifier: host_id from
  system tables. This has two consequences:
  - we can't put contact points in Metadata.nodes directly, because we
    don't know their host_id yet. Store them separately
    (MetadataManager.contactPoints) and transfer them during the first
    node list refresh.
  - when we receive a status event, we can't use a simple map lookup
    anymore, so do a linear traversal to find the node with a matching
    broadcast_rpc_address (side effect: we don't need to translate it).

Result:

We no longer depend on nodes having a unique socket address.
We can plug in custom EndPoint implementations.
  • Loading branch information
olim7t committed Mar 9, 2019
1 parent 6194b1d commit 5b2db90
Show file tree
Hide file tree
Showing 94 changed files with 1,632 additions and 1,056 deletions.
1 change: 1 addition & 0 deletions changelog/README.md
Expand Up @@ -4,6 +4,7 @@

### 4.0.0 (in progress)

- [improvement] JAVA-2165: Abstract node connection information
- [improvement] JAVA-2090: Add support for additional_write_policy and read_repair table options
- [improvement] JAVA-2164: Rename statement builder methods to setXxx
- [bug] JAVA-2178: QueryBuilder: Alias after function column is not included in a query
Expand Down
99 changes: 99 additions & 0 deletions core/revapi.json
Expand Up @@ -135,6 +135,105 @@
"old": "method SelfT com.datastax.oss.driver.api.core.cql.StatementBuilder<SelfT extends com.datastax.oss.driver.api.core.cql.StatementBuilder<SelfT extends com.datastax.oss.driver.api.core.cql.StatementBuilder<SelfT, StatementT extends com.datastax.oss.driver.api.core.cql.Statement<StatementT>>, StatementT>, StatementT extends com.datastax.oss.driver.api.core.cql.Statement<StatementT extends com.datastax.oss.driver.api.core.cql.Statement<StatementT>>>::withTracing()",
"oldArchive": "com.datastax.oss:java-driver-core:jar:4.0.0-rc1",
"justification": "JAVA-2164: Rename statement builder methods to setXxx"
},
{
"code": "java.method.parameterTypeChanged",
"old": "parameter void com.datastax.oss.driver.api.core.UnsupportedProtocolVersionException::<init>(===java.net.SocketAddress===, java.lang.String, java.util.List<com.datastax.oss.driver.api.core.ProtocolVersion>)",
"new": "parameter void com.datastax.oss.driver.api.core.UnsupportedProtocolVersionException::<init>(===com.datastax.oss.driver.api.core.metadata.EndPoint===, java.lang.String, java.util.List<com.datastax.oss.driver.api.core.ProtocolVersion>)",
"oldArchive": "com.datastax.oss:java-driver-core:jar:4.0.0-rc1",
"justification": "JAVA-2165: Abstract node connection information"
},
{
"code": "java.method.parameterTypeChanged",
"old": "parameter com.datastax.oss.driver.api.core.UnsupportedProtocolVersionException com.datastax.oss.driver.api.core.UnsupportedProtocolVersionException::forNegotiation(===java.net.SocketAddress===, java.util.List<com.datastax.oss.driver.api.core.ProtocolVersion>)",
"new": "parameter com.datastax.oss.driver.api.core.UnsupportedProtocolVersionException com.datastax.oss.driver.api.core.UnsupportedProtocolVersionException::forNegotiation(===com.datastax.oss.driver.api.core.metadata.EndPoint===, java.util.List<com.datastax.oss.driver.api.core.ProtocolVersion>)",
"oldArchive": "com.datastax.oss:java-driver-core:jar:4.0.0-rc1",
"justification": "JAVA-2165: Abstract node connection information"
},
{
"code": "java.method.parameterTypeChanged",
"old": "parameter com.datastax.oss.driver.api.core.UnsupportedProtocolVersionException com.datastax.oss.driver.api.core.UnsupportedProtocolVersionException::forSingleAttempt(===java.net.SocketAddress===, com.datastax.oss.driver.api.core.ProtocolVersion)",
"new": "parameter com.datastax.oss.driver.api.core.UnsupportedProtocolVersionException com.datastax.oss.driver.api.core.UnsupportedProtocolVersionException::forSingleAttempt(===com.datastax.oss.driver.api.core.metadata.EndPoint===, com.datastax.oss.driver.api.core.ProtocolVersion)",
"oldArchive": "com.datastax.oss:java-driver-core:jar:4.0.0-rc1",
"justification": "JAVA-2165: Abstract node connection information"
},
{
"code": "java.method.removed",
"old": "method java.net.SocketAddress com.datastax.oss.driver.api.core.UnsupportedProtocolVersionException::getAddress()",
"oldArchive": "com.datastax.oss:java-driver-core:jar:4.0.0-rc1",
"justification": "JAVA-2165: Abstract node connection information"
},
{
"code": "java.method.parameterTypeChanged",
"old": "parameter com.datastax.oss.driver.api.core.auth.Authenticator com.datastax.oss.driver.api.core.auth.AuthProvider::newAuthenticator(===java.net.SocketAddress===, java.lang.String) throws com.datastax.oss.driver.api.core.auth.AuthenticationException",
"new": "parameter com.datastax.oss.driver.api.core.auth.Authenticator com.datastax.oss.driver.api.core.auth.AuthProvider::newAuthenticator(===com.datastax.oss.driver.api.core.metadata.EndPoint===, java.lang.String) throws com.datastax.oss.driver.api.core.auth.AuthenticationException",
"oldArchive": "com.datastax.oss:java-driver-core:jar:4.0.0-rc1",
"justification": "JAVA-2165: Abstract node connection information"
},
{
"code": "java.method.parameterTypeChanged",
"old": "parameter void com.datastax.oss.driver.api.core.auth.AuthProvider::onMissingChallenge(===java.net.SocketAddress===) throws com.datastax.oss.driver.api.core.auth.AuthenticationException",
"new": "parameter void com.datastax.oss.driver.api.core.auth.AuthProvider::onMissingChallenge(===com.datastax.oss.driver.api.core.metadata.EndPoint===) throws com.datastax.oss.driver.api.core.auth.AuthenticationException",
"oldArchive": "com.datastax.oss:java-driver-core:jar:4.0.0-rc1",
"justification": "JAVA-2165: Abstract node connection information"
},
{
"code": "java.method.parameterTypeChanged",
"old": "parameter void com.datastax.oss.driver.api.core.auth.AuthenticationException::<init>(===java.net.SocketAddress===, java.lang.String)",
"new": "parameter void com.datastax.oss.driver.api.core.auth.AuthenticationException::<init>(===com.datastax.oss.driver.api.core.metadata.EndPoint===, java.lang.String)",
"oldArchive": "com.datastax.oss:java-driver-core:jar:4.0.0-rc1",
"justification": "JAVA-2165: Abstract node connection information"
},
{
"code": "java.method.parameterTypeChanged",
"old": "parameter void com.datastax.oss.driver.api.core.auth.AuthenticationException::<init>(===java.net.SocketAddress===, java.lang.String, java.lang.Throwable)",
"new": "parameter void com.datastax.oss.driver.api.core.auth.AuthenticationException::<init>(===com.datastax.oss.driver.api.core.metadata.EndPoint===, java.lang.String, java.lang.Throwable)",
"oldArchive": "com.datastax.oss:java-driver-core:jar:4.0.0-rc1",
"justification": "JAVA-2165: Abstract node connection information"
},
{
"code": "java.method.removed",
"old": "method java.net.SocketAddress com.datastax.oss.driver.api.core.auth.AuthenticationException::getAddress()",
"oldArchive": "com.datastax.oss:java-driver-core:jar:4.0.0-rc1",
"justification": "JAVA-2165: Abstract node connection information"
},
{
"code": "java.method.numberOfParametersChanged",
"old": "method void com.datastax.oss.driver.api.core.loadbalancing.LoadBalancingPolicy::init(java.util.Map<java.net.InetSocketAddress, com.datastax.oss.driver.api.core.metadata.Node>, com.datastax.oss.driver.api.core.loadbalancing.LoadBalancingPolicy.DistanceReporter, java.util.Set<java.net.InetSocketAddress>)",
"new": "method void com.datastax.oss.driver.api.core.loadbalancing.LoadBalancingPolicy::init(java.util.Map<java.util.UUID, com.datastax.oss.driver.api.core.metadata.Node>, com.datastax.oss.driver.api.core.loadbalancing.LoadBalancingPolicy.DistanceReporter)",
"oldArchive": "com.datastax.oss:java-driver-core:jar:4.0.0-rc1",
"justification": "JAVA-2165: Abstract node connection information"
},
{
"code": "java.method.returnTypeTypeParametersChanged",
"old": "method java.util.Map<java.net.InetSocketAddress, com.datastax.oss.driver.api.core.metadata.Node> com.datastax.oss.driver.api.core.metadata.Metadata::getNodes()",
"new": "method java.util.Map<java.util.UUID, com.datastax.oss.driver.api.core.metadata.Node> com.datastax.oss.driver.api.core.metadata.Metadata::getNodes()",
"oldArchive": "com.datastax.oss:java-driver-core:jar:4.0.0-rc1",
"justification": "JAVA-2165: Abstract node connection information"
},
{
"code": "java.method.addedToInterface",
"new": "method java.util.Optional<java.net.InetSocketAddress> com.datastax.oss.driver.api.core.metadata.Node::getBroadcastRpcAddress()",
"justification": "JAVA-2165: Abstract node connection information"
},
{
"code": "java.method.removed",
"old": "method java.net.InetSocketAddress com.datastax.oss.driver.api.core.metadata.Node::getConnectAddress()",
"oldArchive": "com.datastax.oss:java-driver-core:jar:4.0.0-rc1",
"justification": "JAVA-2165: Abstract node connection information"
},
{
"code": "java.method.addedToInterface",
"new": "method com.datastax.oss.driver.api.core.metadata.EndPoint com.datastax.oss.driver.api.core.metadata.Node::getEndPoint()",
"package": "com.datastax.oss.driver.api.core.metadata",
"justification": "JAVA-2165: Abstract node connection information"
},
{
"code": "java.method.parameterTypeChanged",
"old": "parameter javax.net.ssl.SSLEngine com.datastax.oss.driver.api.core.ssl.SslEngineFactory::newSslEngine(===java.net.SocketAddress===)",
"new": "parameter javax.net.ssl.SSLEngine com.datastax.oss.driver.api.core.ssl.SslEngineFactory::newSslEngine(===com.datastax.oss.driver.api.core.metadata.EndPoint===)",
"oldArchive": "com.datastax.oss:java-driver-core:jar:4.0.0-rc1",
"justification": "JAVA-2165: Abstract node connection information"
}
]
}
Expand Down
Expand Up @@ -16,10 +16,10 @@
package com.datastax.oss.driver.api.core;

import com.datastax.oss.driver.api.core.cql.ExecutionInfo;
import com.datastax.oss.driver.api.core.metadata.EndPoint;
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableList;
import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;
import java.net.SocketAddress;
import java.util.Collections;
import java.util.List;

Expand All @@ -34,51 +34,51 @@
public class UnsupportedProtocolVersionException extends DriverException {
private static final long serialVersionUID = 0;

private final SocketAddress address;
private final EndPoint endPoint;
private final List<ProtocolVersion> attemptedVersions;

@NonNull
public static UnsupportedProtocolVersionException forSingleAttempt(
@NonNull SocketAddress address, @NonNull ProtocolVersion attemptedVersion) {
@NonNull EndPoint endPoint, @NonNull ProtocolVersion attemptedVersion) {
String message =
String.format("[%s] Host does not support protocol version %s", address, attemptedVersion);
String.format("[%s] Host does not support protocol version %s", endPoint, attemptedVersion);
return new UnsupportedProtocolVersionException(
address, message, Collections.singletonList(attemptedVersion), null);
endPoint, message, Collections.singletonList(attemptedVersion), null);
}

@NonNull
public static UnsupportedProtocolVersionException forNegotiation(
@NonNull SocketAddress address, @NonNull List<ProtocolVersion> attemptedVersions) {
@NonNull EndPoint endPoint, @NonNull List<ProtocolVersion> attemptedVersions) {
String message =
String.format(
"[%s] Protocol negotiation failed: could not find a common version (attempted: %s). "
+ "Note that the driver does not support Cassandra 2.0 or lower.",
address, attemptedVersions);
endPoint, attemptedVersions);
return new UnsupportedProtocolVersionException(
address, message, ImmutableList.copyOf(attemptedVersions), null);
endPoint, message, ImmutableList.copyOf(attemptedVersions), null);
}

public UnsupportedProtocolVersionException(
@Nullable SocketAddress address, // technically nullable, but should never be in real life
@Nullable EndPoint endPoint, // technically nullable, but should never be in real life
@NonNull String message,
@NonNull List<ProtocolVersion> attemptedVersions) {
this(address, message, attemptedVersions, null);
this(endPoint, message, attemptedVersions, null);
}

private UnsupportedProtocolVersionException(
SocketAddress address,
EndPoint endPoint,
String message,
List<ProtocolVersion> attemptedVersions,
ExecutionInfo executionInfo) {
super(message, executionInfo, null, true);
this.address = address;
this.endPoint = endPoint;
this.attemptedVersions = attemptedVersions;
}

/** The address of the node that threw the error. */
@Nullable
public SocketAddress getAddress() {
return address;
public EndPoint getEndPoint() {
return endPoint;
}

/** The versions that were attempted. */
Expand All @@ -91,6 +91,6 @@ public List<ProtocolVersion> getAttemptedVersions() {
@Override
public DriverException copy() {
return new UnsupportedProtocolVersionException(
address, getMessage(), attemptedVersions, getExecutionInfo());
endPoint, getMessage(), attemptedVersions, getExecutionInfo());
}
}
Expand Up @@ -16,9 +16,9 @@
package com.datastax.oss.driver.api.core.auth;

import com.datastax.oss.driver.api.core.connection.ReconnectionPolicy;
import com.datastax.oss.driver.api.core.metadata.EndPoint;
import com.datastax.oss.driver.internal.core.auth.PlainTextAuthProvider;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.net.SocketAddress;

/**
* Provides {@link Authenticator} instances to use when connecting to Cassandra nodes.
Expand All @@ -31,12 +31,12 @@ public interface AuthProvider extends AutoCloseable {
/**
* The authenticator to use when connecting to {@code host}.
*
* @param host the Cassandra host to connect to.
* @param endPoint the Cassandra host to connect to.
* @param serverAuthenticator the configured authenticator on the host.
* @return the authentication implementation to use.
*/
@NonNull
Authenticator newAuthenticator(@NonNull SocketAddress host, @NonNull String serverAuthenticator)
Authenticator newAuthenticator(@NonNull EndPoint endPoint, @NonNull String serverAuthenticator)
throws AuthenticationException;

/**
Expand All @@ -55,5 +55,5 @@ Authenticator newAuthenticator(@NonNull SocketAddress host, @NonNull String serv
* will be retried according to the {@link ReconnectionPolicy}).
* </ul>
*/
void onMissingChallenge(@NonNull SocketAddress host) throws AuthenticationException;
void onMissingChallenge(@NonNull EndPoint endPoint) throws AuthenticationException;
}
Expand Up @@ -16,9 +16,9 @@
package com.datastax.oss.driver.api.core.auth;

import com.datastax.oss.driver.api.core.AllNodesFailedException;
import com.datastax.oss.driver.api.core.metadata.EndPoint;
import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;
import java.net.SocketAddress;

/**
* Indicates an error during the authentication phase while connecting to a node.
Expand All @@ -30,21 +30,21 @@
public class AuthenticationException extends RuntimeException {
private static final long serialVersionUID = 0;

private final SocketAddress address;
private final EndPoint endPoint;

public AuthenticationException(@NonNull SocketAddress address, @NonNull String message) {
this(address, message, null);
public AuthenticationException(@NonNull EndPoint endPoint, @NonNull String message) {
this(endPoint, message, null);
}

public AuthenticationException(
@NonNull SocketAddress address, @NonNull String message, @Nullable Throwable cause) {
super(String.format("Authentication error on host %s: %s", address, message), cause);
this.address = address;
@NonNull EndPoint endPoint, @NonNull String message, @Nullable Throwable cause) {
super(String.format("Authentication error on node %s: %s", endPoint, message), cause);
this.endPoint = endPoint;
}

/** The address of the node that encountered the error. */
@NonNull
public SocketAddress getAddress() {
return address;
public EndPoint getEndPoint() {
return endPoint;
}
}
Expand Up @@ -19,14 +19,11 @@
import com.datastax.oss.driver.api.core.metadata.NodeState;
import com.datastax.oss.driver.api.core.session.Request;
import com.datastax.oss.driver.api.core.session.Session;
import com.datastax.oss.driver.api.core.session.SessionBuilder;
import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.UUID;

/** Decides which Cassandra nodes to contact for each query. */
public interface LoadBalancingPolicy extends AutoCloseable {
Expand All @@ -42,17 +39,8 @@ public interface LoadBalancingPolicy extends AutoCloseable {
* NodeState#UNKNOWN}. Node states may be updated concurrently while this method executes, but
* if so you will receive a notification.
* @param distanceReporter an object that will be used by the policy to signal distance changes.
* @param contactPoints the set of contact points that the driver was initialized with (see {@link
* SessionBuilder#addContactPoints(Collection)}). This is provided for reference, in case the
* policy needs to handle those nodes in a particular way. Each address in this set should
* normally have a corresponding entry in {@code nodes}, except for contact points that were
* down or invalid. If no contact points were provided, the driver defaults to 127.0.0.1:9042,
* but the set will be empty.
*/
void init(
@NonNull Map<InetSocketAddress, Node> nodes,
@NonNull DistanceReporter distanceReporter,
@NonNull Set<InetSocketAddress> contactPoints);
void init(@NonNull Map<UUID, Node> nodes, @NonNull DistanceReporter distanceReporter);

/**
* Returns the coordinators to use for a new query.
Expand Down
@@ -0,0 +1,47 @@
/*
* Copyright DataStax, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datastax.oss.driver.api.core.metadata;

import java.net.InetSocketAddress;
import java.net.SocketAddress;

/**
* Encapsulates the information needed to open connections to a node.
*
* <p>By default, the driver assumes plain TCP connections, and this is just a wrapper around an
* {@link InetSocketAddress}. However, more complex deployment scenarios might use a custom
* implementation that contains additional information; for example, if the nodes are accessed
* through a proxy with SNI routing, an SNI server name is needed in addition to the proxy address.
*/
public interface EndPoint {

/**
* Resolves this instance to a socket address.
*
* <p>This will be called each time the driver opens a new connection to the node. The returned
* address cannot be null.
*/
SocketAddress resolve();

/**
* Returns an alternate string representation for use in node-level metric names.
*
* <p>Because metrics names are path-like, dot-separated strings, raw IP addresses don't make very
* good identifiers. So this method will typically replace the dots by another character, for
* example {@code 127_0_0_1_9042}.
*/
String asMetricPrefix();
}

0 comments on commit 5b2db90

Please sign in to comment.