Skip to content

Commit

Permalink
Add metadata refreshes for single nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
olim7t committed Apr 19, 2017
1 parent c4b16b2 commit f1f37c9
Show file tree
Hide file tree
Showing 14 changed files with 453 additions and 65 deletions.
Expand Up @@ -58,7 +58,11 @@ public static AdminRequestHandler query(
new Query(
query,
buildQueryOptions(pageSize, serialize(parameters, channel.protocolVersion()), null));
return new AdminRequestHandler(channel, message, timeout);
String debugString = "query '" + query + "'";
if (!parameters.isEmpty()) {
debugString += " with parameters " + parameters;
}
return new AdminRequestHandler(channel, message, timeout, debugString);
}

public static AdminRequestHandler query(
Expand All @@ -67,32 +71,29 @@ public static AdminRequestHandler query(
}

public static AdminRequestHandler prepare(DriverChannel channel, String query, Duration timeout) {
return new AdminRequestHandler(channel, new Prepare(query), timeout);
String debugString = "prepare '" + query + "'";
return new AdminRequestHandler(channel, new Prepare(query), timeout, debugString);
}

private final DriverChannel channel;
private final Message message;
private final Duration timeout;
private final String debugString;
private final CompletableFuture<AdminResult> result = new CompletableFuture<>();

// This is only ever accessed on the channel's event loop, so it doesn't need to be volatile
private ScheduledFuture<?> timeoutFuture;

private AdminRequestHandler(DriverChannel channel, Message message, Duration timeout) {
private AdminRequestHandler(
DriverChannel channel, Message message, Duration timeout, String debugString) {
this.channel = channel;
this.message = message;
this.timeout = timeout;
this.debugString = debugString;
}

public CompletionStage<AdminResult> start() {
if (LOG.isDebugEnabled()) {
if (message instanceof Query) {
Query query = (Query) this.message;
LOG.debug("Executing query '{}' with values {}", query.query, query.options.namedValues);
} else if (message instanceof Prepare) {
LOG.debug("Preparing {}", ((Prepare) message).cqlQuery);
}
}
LOG.debug("Executing {}", this);
channel.write(message, false, Frame.NO_PAYLOAD, this).addListener(this::onWriteComplete);
return result;
}
Expand Down Expand Up @@ -146,7 +147,8 @@ private AdminRequestHandler copy(ByteBuffer pagingState) {
QueryOptions currentOptions = current.options;
QueryOptions newOptions =
buildQueryOptions(currentOptions.pageSize, currentOptions.namedValues, pagingState);
return new AdminRequestHandler(channel, new Query(current.query, newOptions), timeout);
return new AdminRequestHandler(
channel, new Query(current.query, newOptions), timeout, debugString);
}

private static QueryOptions buildQueryOptions(
Expand Down Expand Up @@ -185,4 +187,9 @@ private static ByteBuffer serialize(Object parameter, ProtocolVersion protocolVe
"Unsupported variable type for admin query: " + parameter.getClass());
}
}

@Override
public String toString() {
return debugString;
}
}
@@ -0,0 +1,46 @@
/*
* Copyright (C) 2017-2017 DataStax Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datastax.oss.driver.internal.core.metadata;

import com.datastax.oss.driver.api.core.metadata.Node;
import com.google.common.collect.ImmutableMap;
import java.net.InetSocketAddress;
import java.util.Map;

public class AddNodeRefresh extends NodesRefresh {
private final TopologyMonitor.NodeInfo newNodeInfo;

public AddNodeRefresh(DefaultMetadata oldMetadata, TopologyMonitor.NodeInfo newNodeInfo) {
super(oldMetadata);
this.newNodeInfo = newNodeInfo;
}

@Override
protected Map<InetSocketAddress, Node> computeNewNodes() {
Map<InetSocketAddress, Node> oldNodes = oldMetadata.getNodes();
if (oldNodes.containsKey(newNodeInfo.getConnectAddress())) {
return oldNodes;
} else {
DefaultNode newNode = new DefaultNode(newNodeInfo.getConnectAddress());
copyInfos(newNodeInfo, newNode);
events.add(NodeStateEvent.added(newNode));
return ImmutableMap.<InetSocketAddress, Node>builder()
.putAll(oldNodes)
.put(newNode.getConnectAddress(), newNode)
.build();
}
}
}
Expand Up @@ -57,21 +57,4 @@ public DefaultMetadata addNode(Node toAdd) {
return new DefaultMetadata(newNodes);
}
}

public DefaultMetadata removeNode(InetSocketAddress toRemove) {
Map<InetSocketAddress, Node> newNodes;
if (!nodes.containsKey(toRemove)) {
return this;
} else {
ImmutableMap.Builder<InetSocketAddress, Node> builder = ImmutableMap.builder();
for (Map.Entry<InetSocketAddress, Node> entry : nodes.entrySet()) {
if (!entry.getKey().equals(toRemove)) {
builder.put(entry.getKey(), entry.getValue());
}
}
newNodes = builder.build();
// TODO recompute token map
return new DefaultMetadata(newNodes);
}
}
}
Expand Up @@ -18,22 +18,30 @@
import com.datastax.oss.driver.api.core.addresstranslation.AddressTranslator;
import com.datastax.oss.driver.api.core.config.CoreDriverOption;
import com.datastax.oss.driver.api.core.config.DriverConfigProfile;
import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.internal.core.adminrequest.AdminRequestHandler;
import com.datastax.oss.driver.internal.core.adminrequest.AdminResult;
import com.datastax.oss.driver.internal.core.channel.DriverChannel;
import com.datastax.oss.driver.internal.core.context.InternalDriverContext;
import com.datastax.oss.driver.internal.core.control.ControlConnection;
import com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures;
import com.google.common.collect.ImmutableMap;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** The default topology monitor, based on the control connection. */
/**
* The default topology monitor, based on {@link ControlConnection}.
*
* <p>Note that event processing is implemented directly in the control connection, not here.
*/
public class DefaultTopologyMonitor implements TopologyMonitor {
private static final Logger LOG = LoggerFactory.getLogger(DefaultTopologyMonitor.class);

Expand All @@ -59,8 +67,40 @@ public CompletionStage<Void> init() {
}

@Override
public CompletionStage<NodeInfo> refreshNode(InetSocketAddress address) {
return CompletableFutures.failedFuture(new UnsupportedOperationException("TODO"));
public CompletionStage<Optional<NodeInfo>> refreshNode(Node node) {
LOG.debug("Refreshing info for {}", node);
DriverChannel channel = controlConnection.channel();
if (node.getConnectAddress().equals(channel.address())) {
// refreshNode is called for nodes that just came up. If the control node just came up, it
// means the control connection just reconnected, which means we did a full node refresh. So
// we don't need to process this call.
LOG.debug("Ignoring refresh of control node");
return CompletableFuture.completedFuture(Optional.empty());
} else if (node.getBroadcastAddress().isPresent()) {
return AdminRequestHandler.query(
channel,
"SELECT * FROM system.peers WHERE peer = :address",
ImmutableMap.of("address", node.getBroadcastAddress().get()),
timeout,
INFINITE_PAGE_SIZE)
.start()
.thenApply(this::buildNodeInfoFromFirstRow);
} else {
return AdminRequestHandler.query(
channel, "SELECT * FROM system.peers", timeout, INFINITE_PAGE_SIZE)
.start()
.thenApply(result -> this.findInPeers(result, node.getConnectAddress()));
}
}

@Override
public CompletionStage<Optional<NodeInfo>> getNewNodeInfo(InetSocketAddress connectAddress) {
LOG.debug("Fetching info for new node {}", connectAddress);
DriverChannel channel = controlConnection.channel();
return AdminRequestHandler.query(
channel, "SELECT * FROM system.peers", timeout, INFINITE_PAGE_SIZE)
.start()
.thenApply(result -> this.findInPeers(result, connectAddress));
}

@Override
Expand Down Expand Up @@ -114,6 +154,31 @@ private NodeInfo buildNodeInfo(AdminResult.Row row) {
return builder.build();
}

private Optional<NodeInfo> buildNodeInfoFromFirstRow(AdminResult result) {
Iterator<AdminResult.Row> iterator = result.iterator();
if (iterator.hasNext()) {
return Optional.of(buildNodeInfo(iterator.next()));
} else {
return Optional.empty();
}
}

private Optional<NodeInfo> findInPeers(AdminResult result, InetSocketAddress connectAddress) {
// The peers table is keyed by broadcast_address, but we only have the translated
// broadcast_rpc_address, so we have to traverse the whole table and check the rows one by one.
for (AdminResult.Row row : result) {
InetAddress broadcastRpcAddress = row.getInet("rpc_address");
if (broadcastRpcAddress != null
&& addressTranslator
.translate(new InetSocketAddress(broadcastRpcAddress, port))
.equals(connectAddress)) {
return Optional.of(buildNodeInfo(row));
}
}
LOG.debug("Could not find any peer row matching {}", connectAddress);
return Optional.empty();
}

// Current versions of Cassandra (3.11 at the time of writing), require the same port for all
// nodes. As a consequence, the port is not stored in system tables.
// We save it the first time we get a control connection channel.
Expand Down
Expand Up @@ -26,7 +26,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class FullNodeListRefresh extends NodeListRefresh {
class FullNodeListRefresh extends NodesRefresh {

private static final Logger LOG = LoggerFactory.getLogger(FullNodeListRefresh.class);
private final Iterable<TopologyMonitor.NodeInfo> nodeInfos;
Expand Down
Expand Up @@ -16,11 +16,14 @@
package com.datastax.oss.driver.internal.core.metadata;

import com.datastax.oss.driver.api.core.metadata.Metadata;
import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.internal.core.context.InternalDriverContext;
import com.datastax.oss.driver.internal.core.metadata.TopologyMonitor.NodeInfo;
import com.datastax.oss.driver.internal.core.util.concurrent.RunOrSchedule;
import io.netty.util.concurrent.EventExecutor;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
Expand Down Expand Up @@ -66,22 +69,43 @@ public CompletionStage<Void> refreshNodes() {
.thenApplyAsync(singleThreaded::refreshNodes, adminExecutor);
}

public void addNode(InetSocketAddress address) {
context
public CompletionStage<Void> refreshNode(Node node) {
return context
.topologyMonitor()
.refreshNode(address)
.thenApplyAsync(singleThreaded::addNode, adminExecutor)
.exceptionally(
e -> {
LOG.debug(
"Error adding node "
+ address
+ ", this will be retried on the next full refresh",
e);
.refreshNode(node)
// The callback only updates volatile fields so no need to schedule it on adminExecutor
.thenApply(
maybeInfo -> {
if (maybeInfo.isPresent()) {
NodesRefresh.copyInfos(maybeInfo.get(), (DefaultNode) node);
} else {
LOG.debug(
"Topology monitor did not return any info for the refresh of {}, skipping",
node);
}
return null;
});
}

public void addNode(InetSocketAddress address) {
context
.topologyMonitor()
.getNewNodeInfo(address)
.whenCompleteAsync(
(info, error) -> {
if (error != null) {
LOG.debug(
"Error refreshing node info for "
+ address
+ ", this will be retried on the next full refresh",
error);
} else {
singleThreaded.addNode(address, info);
}
},
adminExecutor);
}

public void removeNode(InetSocketAddress address) {
RunOrSchedule.on(adminExecutor, () -> singleThreaded.removeNode(address));
}
Expand All @@ -106,19 +130,37 @@ private void initNodes(
initNodesFuture.complete(null);
}

private Void refreshNodes(Iterable<TopologyMonitor.NodeInfo> nodeInfos) {
private Void refreshNodes(Iterable<NodeInfo> nodeInfos) {
return refresh(new FullNodeListRefresh(metadata, nodeInfos));
}

private Void addNode(TopologyMonitor.NodeInfo nodeInfo) {
//TODO
return null;
private void addNode(InetSocketAddress address, Optional<NodeInfo> maybeInfo) {
try {
if (maybeInfo.isPresent()) {
NodeInfo info = maybeInfo.get();
if (!address.equals(info.getConnectAddress())) {
// This would be a bug in the TopologyMonitor, protect against it
LOG.warn(
"Received a request to add a node for {}, "
+ "but the provided info uses the address {}, ignoring it",
address,
info.getBroadcastAddress());
} else {
refresh(new AddNodeRefresh(metadata, info));
}
} else {
LOG.debug(
"Ignoring node addition for {} because the "
+ "topology monitor didn't return any information",
address);
}
} catch (Throwable t) {
LOG.warn("Unexpected exception while handling added node", t);
}
}

private void removeNode(InetSocketAddress address) {
LOG.debug("Removing node {}", address);
metadata = metadata.removeNode(address);
// TODO recompute token map
refresh(new RemoveNodeRefresh(metadata, address));
}

private Void refresh(MetadataRefresh refresh) {
Expand Down

0 comments on commit f1f37c9

Please sign in to comment.