Skip to content

Commit

Permalink
JAVA-2280: Ignore peer rows with missing host id or RPC address
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexandre Dutra authored and olim7t committed Jun 14, 2019
1 parent 8add38b commit d40901c
Show file tree
Hide file tree
Showing 3 changed files with 201 additions and 67 deletions.
1 change: 1 addition & 0 deletions changelog/README.md
Expand Up @@ -4,6 +4,7 @@

### 4.1.0 (in progress)

- [bug] JAVA-2280: Ignore peer rows with missing host id or RPC address
- [bug] JAVA-2264: Adjust HashedWheelTimer tick duration from 1 to 100 ms
- [bug] JAVA-2260: Handle empty collections in PreparedStatement.bind(...)
- [improvement] JAVA-2278: Pass the request's log prefix to RequestTracker
Expand Down
Expand Up @@ -42,6 +42,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -105,6 +106,7 @@ public CompletionStage<Optional<NodeInfo>> refreshNode(Node node) {
}
LOG.debug("[{}] Refreshing info for {}", logPrefix, node);
DriverChannel channel = controlConnection.channel();
EndPoint localEndPoint = channel.getEndPoint();
if (node.getEndPoint().equals(channel.getEndPoint())) {
// refreshNode is called for nodes that just came up. If the control node just came up, it
// means the control connection just reconnected, which means we did a full node refresh. So
Expand Down Expand Up @@ -132,10 +134,10 @@ public CompletionStage<Optional<NodeInfo>> refreshNode(Node node) {
"SELECT * FROM " + retrievePeerTableName() + " WHERE peer = :address",
ImmutableMap.of("address", node.getBroadcastAddress().get().getAddress()));
}
return query.thenApply(this::firstRowAsNodeInfo);
return query.thenApply(result -> firstPeerRowAsNodeInfo(result, localEndPoint));
} else {
return query(channel, "SELECT * FROM " + retrievePeerTableName())
.thenApply(result -> this.findInPeers(result, node.getHostId()));
.thenApply(result -> findInPeers(result, node.getHostId(), localEndPoint));
}
}

Expand All @@ -146,8 +148,9 @@ public CompletionStage<Optional<NodeInfo>> getNewNodeInfo(InetSocketAddress broa
}
LOG.debug("[{}] Fetching info for new node {}", logPrefix, broadcastRpcAddress);
DriverChannel channel = controlConnection.channel();
EndPoint localEndPoint = channel.getEndPoint();
return query(channel, "SELECT * FROM " + retrievePeerTableName())
.thenApply(result -> this.findInPeers(result, broadcastRpcAddress));
.thenApply(result -> findInPeers(result, broadcastRpcAddress, localEndPoint));
}

@Override
Expand All @@ -157,11 +160,7 @@ public CompletionStage<Iterable<NodeInfo>> refreshNodeList() {
}
LOG.debug("[{}] Refreshing node list", logPrefix);
DriverChannel channel = controlConnection.channel();

// This cast always succeeds in production. The only way it could fail is in a test that uses a
// local channel, and we don't have such tests at the moment.
InetSocketAddress controlBroadcastRpcAddress =
(InetSocketAddress) channel.getEndPoint().resolve();
EndPoint localEndPoint = channel.getEndPoint();

savePort(channel);

Expand Down Expand Up @@ -197,14 +196,16 @@ public CompletionStage<Iterable<NodeInfo>> refreshNodeList() {
peersQuery,
(controlNodeResult, peersResult) -> {
List<NodeInfo> nodeInfos = new ArrayList<>();
// Don't rely on system.local.rpc_address for the control row, because it mistakenly
// reports the normal RPC address instead of the broadcast one (CASSANDRA-11181). We
// already know the address since we've just used it to query.
nodeInfos.add(
nodeInfoBuilder(controlNodeResult.iterator().next(), controlBroadcastRpcAddress)
.build());
for (AdminRow row : peersResult) {
nodeInfos.add(asNodeInfo(row));
AdminRow localRow = controlNodeResult.iterator().next();
InetSocketAddress localBroadcastRpcAddress = getBroadcastRpcAddress(localRow);
nodeInfos.add(nodeInfoBuilder(localRow, localBroadcastRpcAddress, localEndPoint).build());
for (AdminRow peerRow : peersResult) {
if (isPeerValid(peerRow)) {
InetSocketAddress peerBroadcastRpcAddress = getBroadcastRpcAddress(peerRow);
NodeInfo nodeInfo =
nodeInfoBuilder(peerRow, peerBroadcastRpcAddress, localEndPoint).build();
nodeInfos.add(nodeInfo);
}
}
return nodeInfos;
});
Expand Down Expand Up @@ -257,36 +258,55 @@ private String retrievePeerTableName() {
return "system.peers";
}

private NodeInfo asNodeInfo(AdminRow row) {
return nodeInfoBuilder(row, getBroadcastRpcAddress(row)).build();
}

private Optional<NodeInfo> firstRowAsNodeInfo(AdminResult result) {
private Optional<NodeInfo> firstPeerRowAsNodeInfo(AdminResult result, EndPoint localEndPoint) {
Iterator<AdminRow> iterator = result.iterator();
if (iterator.hasNext()) {
return Optional.of(asNodeInfo(iterator.next()));
} else {
return Optional.empty();
AdminRow row = iterator.next();
if (isPeerValid(row)) {
InetSocketAddress peerBroadcastRpcAddress = getBroadcastRpcAddress(row);
return Optional.of(nodeInfoBuilder(row, peerBroadcastRpcAddress, localEndPoint).build());
}
}
return Optional.empty();
}

/**
* Creates a {@link DefaultNodeInfo.Builder} instance from the given row.
*
* @param broadcastRpcAddress this is a parameter only because we already have it when we come
* from {@link #findInPeers(AdminResult, InetSocketAddress)}. Callers that don't already have
* it can use {@link #getBroadcastRpcAddress}.
* from {@link #findInPeers(AdminResult, InetSocketAddress, EndPoint)}. Callers that don't
* already have it can use {@link #getBroadcastRpcAddress}. For the control host, this can be
* null; if this node is a peer however, this cannot be null, since we use that address to
* create the node's endpoint. Callers can use {@link #isPeerValid(AdminRow)} to check that
* before calling this method.
* @param localEndPoint the control node endpoint that was used to query the node's system tables.
* This is a parameter because it would be racy to call {@code
* controlConnection.channel().getEndPoint()} from within this method, as the control
* connection may have changed its channel since. So this parameter must be provided by the
* caller.
*/
@NonNull
protected DefaultNodeInfo.Builder nodeInfoBuilder(
AdminRow row, InetSocketAddress broadcastRpcAddress) {

// Deployments that use a custom EndPoint implementation will need their own TopologyMonitor.
// One simple approach is to extend this class and override this method.
EndPoint endPoint =
new DefaultEndPoint(context.getAddressTranslator().translate(broadcastRpcAddress));

DefaultNodeInfo.Builder builder =
DefaultNodeInfo.builder()
.withEndPoint(endPoint)
.withBroadcastRpcAddress(broadcastRpcAddress);
@NonNull AdminRow row,
@Nullable InetSocketAddress broadcastRpcAddress,
@NonNull EndPoint localEndPoint) {

boolean peer = row.contains("peer");

EndPoint endPoint;
if (peer) {
// If this node is a peer, its broadcast RPC address must be present.
Objects.requireNonNull(
broadcastRpcAddress, "broadcastRpcAddress cannot be null for a peer row");
// Deployments that use a custom EndPoint implementation will need their own TopologyMonitor.
// One simple approach is to extend this class and override this method.
endPoint = new DefaultEndPoint(context.getAddressTranslator().translate(broadcastRpcAddress));
} else {
// Don't rely on system.local.rpc_address for the control node, because it mistakenly
// reports the normal RPC address instead of the broadcast one (CASSANDRA-11181). We
// already know the endpoint anyway since we've just used it to query.
endPoint = localEndPoint;
}

// in system.local
InetAddress broadcastInetAddress = row.getInetAddress("broadcast_address");
Expand Down Expand Up @@ -324,37 +344,47 @@ protected DefaultNodeInfo.Builder nodeInfoBuilder(
listenAddress = new InetSocketAddress(listenInetAddress, listenPort);
}

builder.withBroadcastAddress(broadcastAddress);
builder.withListenAddress(listenAddress);
builder.withDatacenter(row.getString("data_center"));
builder.withRack(row.getString("rack"));
builder.withCassandraVersion(row.getString("release_version"));
builder.withTokens(row.getSetOfString("tokens"));
builder.withPartitioner(row.getString("partitioner"));
builder.withHostId(row.getUuid("host_id"));
builder.withSchemaVersion(row.getUuid("schema_version"));
return builder;
return DefaultNodeInfo.builder()
.withEndPoint(endPoint)
.withBroadcastRpcAddress(broadcastRpcAddress)
.withBroadcastAddress(broadcastAddress)
.withListenAddress(listenAddress)
.withDatacenter(row.getString("data_center"))
.withRack(row.getString("rack"))
.withCassandraVersion(row.getString("release_version"))
.withTokens(row.getSetOfString("tokens"))
.withPartitioner(row.getString("partitioner"))
.withHostId(Objects.requireNonNull(row.getUuid("host_id")))
.withSchemaVersion(row.getUuid("schema_version"));
}

// Called when a new node is being added; the peers table is keyed by broadcast_address,
// but the received event only contains broadcast_rpc_address, so
// we have to traverse the whole table and check the rows one by one.
private Optional<NodeInfo> findInPeers(
AdminResult result, InetSocketAddress broadcastRpcAddressToFind) {
// The peers table is keyed by broadcast_address, but we only have the broadcast_rpc_address, so
// we have to traverse the whole table and check the rows one by one.
AdminResult result, InetSocketAddress broadcastRpcAddressToFind, EndPoint localEndPoint) {
for (AdminRow row : result) {
InetSocketAddress broadcastRpcAddress = getBroadcastRpcAddress(row);
if (broadcastRpcAddress != null && broadcastRpcAddress.equals(broadcastRpcAddressToFind)) {
return Optional.of(nodeInfoBuilder(row, broadcastRpcAddress).build());
if (broadcastRpcAddress != null
&& broadcastRpcAddress.equals(broadcastRpcAddressToFind)
&& isPeerValid(row)) {
return Optional.of(nodeInfoBuilder(row, broadcastRpcAddress, localEndPoint).build());
}
}
LOG.debug("[{}] Could not find any peer row matching {}", logPrefix, broadcastRpcAddressToFind);
return Optional.empty();
}

private Optional<NodeInfo> findInPeers(AdminResult result, UUID hostIdToFind) {
// Called when refreshing an existing node, and we don't know its broadcast address; in this
// case we attempt a search by host id and have to traverse the whole table and check the rows one
// by one.
private Optional<NodeInfo> findInPeers(
AdminResult result, UUID hostIdToFind, EndPoint localEndPoint) {
for (AdminRow row : result) {
UUID hostId = row.getUuid("host_id");
if (hostId != null && hostId.equals(hostIdToFind)) {
return Optional.of(nodeInfoBuilder(row, getBroadcastRpcAddress(row)).build());
if (hostId != null && hostId.equals(hostIdToFind) && isPeerValid(row)) {
InetSocketAddress broadcastRpcAddress = getBroadcastRpcAddress(row);
return Optional.of(nodeInfoBuilder(row, broadcastRpcAddress, localEndPoint).build());
}
}
LOG.debug("[{}] Could not find any peer row matching {}", logPrefix, hostIdToFind);
Expand Down Expand Up @@ -406,4 +436,29 @@ protected InetSocketAddress getBroadcastRpcAddress(@NonNull AdminRow row) {
}
return new InetSocketAddress(broadcastRpcInetAddress, broadcastRpcPort);
}

/**
* Returns {@code true} if the given peer row is valid, and {@code false} otherwise.
*
* <p>This method must at least ensure that the row contains enough information to extract the
* node's broadcast RPC address and host ID; otherwise the driver may not work properly.
*/
protected boolean isPeerValid(AdminRow peerRow) {
boolean hasPeersRpcAddress = peerRow.getInetAddress("rpc_address") != null;
boolean hasPeersV2RpcAddress =
peerRow.getInetAddress("native_address") != null
&& peerRow.getInteger("native_port") != null;
boolean hasRpcAddress = hasPeersV2RpcAddress || hasPeersRpcAddress;
boolean hasHostId = peerRow.getUuid("host_id") != null;
boolean valid = hasRpcAddress && hasHostId;
if (!valid) {
LOG.warn(
"[{}] Found invalid row in {} for peer: {}. "
+ "This is likely a gossip or snitch issue, this node will be ignored.",
logPrefix,
retrievePeerTableName(),
peerRow.getInetAddress("peer"));
}
return valid;
}
}

0 comments on commit d40901c

Please sign in to comment.