Skip to content

Commit

Permalink
MB-40596: support alternate addresses
Browse files Browse the repository at this point in the history
Change-Id: Ic58c0c1f7c7cdbf15f776db1754f2ee245213eb0
Reviewed-on: http://review.couchbase.org/c/analytics-dcp-client/+/133109
Well-Formed: Build Bot <build@couchbase.com>
Reviewed-by: Till Westmann <till@couchbase.com>
Tested-by: Michael Blow <michael.blow@couchbase.com>
  • Loading branch information
mblow committed Jul 24, 2020
1 parent 7d1733d commit a12549a
Show file tree
Hide file tree
Showing 7 changed files with 131 additions and 29 deletions.
13 changes: 12 additions & 1 deletion src/main/java/com/couchbase/client/dcp/Client.java
Expand Up @@ -12,6 +12,7 @@
import java.util.stream.Collectors;

import com.couchbase.client.core.config.CouchbaseBucketConfig;
import com.couchbase.client.core.env.NetworkResolution;
import com.couchbase.client.core.logging.CouchbaseLogger;
import com.couchbase.client.core.logging.CouchbaseLoggerFactory;
import com.couchbase.client.core.time.Delay;
Expand Down Expand Up @@ -88,7 +89,8 @@ public Client(Builder builder) {
.setBootstrapHttpDirectPort(builder.configPort()).setBootstrapHttpSslPort(builder.sslConfigPort())
.setVbuckets(builder.vbuckets()).setClusterAt(builder.clusterAt())
.setFlowControlCallback(builder.flowControlCallback()).setUuid(builder.uuid())
.setDynamicConfigurationNodes(builder.dynamicConfigurationNodes()).build();
.setDynamicConfigurationNodes(builder.dynamicConfigurationNodes())
.setNetworkResolution(builder.networkResolution()).build();

ackEnabled = env.dcpControl().ackEnabled();
if (ackEnabled && env.ackWaterMark() == 0) {
Expand Down Expand Up @@ -456,6 +458,7 @@ public static class Builder {
private int sslConfigPort = ClientEnvironment.BOOTSTRAP_HTTP_SSL_PORT;
private short[] vbuckets;
private FlowControlCallback flowControlCallback = FlowControlCallback.NOOP;
private NetworkResolution networkResolution = NetworkResolution.DEFAULT;
// Total timeouts, attempt timeouts, and delays
private long configProviderAttemptTimeout = ClientEnvironment.DEFAULT_CONFIG_PROVIDER_ATTEMPT_TIMEOUT;
private long configProviderTotalTimeout = ClientEnvironment.DEFAULT_CONFIG_PROVIDER_TOTAL_TIMEOUT;
Expand Down Expand Up @@ -792,6 +795,10 @@ public Builder connectionString(String connectionString) {
return this;
}

public void networkResolution(NetworkResolution external) {
this.networkResolution = external;
}

public ConnectionNameGenerator connectionNameGenerator() {
return connectionNameGenerator;
}
Expand Down Expand Up @@ -884,6 +891,10 @@ public short[] vbuckets() {
public String connectionString() {
return connectionString;
}

public NetworkResolution networkResolution() {
return networkResolution;
}
}

public long[] getStreamedSequenceNumbers() {
Expand Down
84 changes: 62 additions & 22 deletions src/main/java/com/couchbase/client/dcp/conductor/Conductor.java
Expand Up @@ -3,14 +3,17 @@
*/
package com.couchbase.client.dcp.conductor;

import static com.couchbase.client.core.env.NetworkResolution.EXTERNAL;

import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.hyracks.api.util.InvokeUtil;
import org.apache.hyracks.util.NetworkUtil;

import com.couchbase.client.core.CouchbaseException;
import com.couchbase.client.core.config.AlternateAddress;
import com.couchbase.client.core.config.CouchbaseBucketConfig;
import com.couchbase.client.core.config.NodeInfo;
import com.couchbase.client.core.logging.CouchbaseLogLevel;
Expand Down Expand Up @@ -193,20 +196,44 @@ private DcpChannel masterChannelByPartition(short partition) {
int index = config.nodeIndexForMaster(partition, false);
if (index < 0) {
throw new CouchbaseException(
"partition " + partition + " does not have a master node. Configuration: " + config);
"partition " + partition + " does not have a master node; config: " + config);
}
NodeInfo node = config.nodeAtIndex(index);
InetSocketAddress address = new InetSocketAddress(node.hostname(),
(env.sslEnabled() ? node.sslServices() : node.services()).get(ServiceType.BINARY));
DcpChannel theChannel = channels.get(address);
DcpChannel theChannel = dcpChannelForNode(partition, config.nodeAtIndex(index));
if (theChannel == null) {
throw new IllegalStateException("No DcpChannel found for partition " + partition + ". env vbuckets = "
+ Arrays.toString(env.vbuckets()));
throw new MasterDcpChannelNotFoundException(
"master DcpChannel not found for partition " + partition + "; config: " + config);
}
return theChannel;
}
}

private DcpChannel dcpChannelForNode(short partition, NodeInfo node) {
if (env.networkResolution().equals(EXTERNAL)) {
AlternateAddress aa = node.alternateAddresses().get(EXTERNAL.name());
if (aa == null) {
LOGGER.debug("partition {} master node {} does not provide an external alternate address", partition,
NetworkUtil.toHostPort(node.hostname(), node.services().get(ServiceType.CONFIG)));
return null;
}
Map<ServiceType, Integer> services = env.sslEnabled() ? aa.sslServices() : aa.services();
if (services.containsKey(ServiceType.BINARY)) {
int altPort = services.get(ServiceType.BINARY);
InetSocketAddress altAddress = new InetSocketAddress(aa.hostname(), altPort);
return channels.get(altAddress);
} else {
LOGGER.debug(
"partition {} master node {} does not provide the KV service on its external alternate address {}",
partition, NetworkUtil.toHostPort(node.hostname(), node.services().get(ServiceType.CONFIG)),
aa.hostname());
return null;
}
} else {
InetSocketAddress address = new InetSocketAddress(node.hostname(),
(env.sslEnabled() ? node.sslServices() : node.services()).get(ServiceType.BINARY));
return channels.get(address);
}
}

private synchronized void createSession(CouchbaseBucketConfig config) {
if (sessionState == null) {
sessionState = new SessionState(config.numberOfPartitions(), getUuid(config.uri()));
Expand All @@ -218,28 +245,41 @@ private synchronized void createSession(CouchbaseBucketConfig config) {
public void add(NodeInfo node, CouchbaseBucketConfig config, long attemptTimeout, long totalTimeout, Delay delay)
throws Throwable {
synchronized (channels) {
if (!(node.services().containsKey(ServiceType.BINARY)
|| node.sslServices().containsKey(ServiceType.BINARY))) {
return;
}
if (!config.hasPrimaryPartitionsOnNode(node.hostname())) {
return;
}
InetSocketAddress address = new InetSocketAddress(node.hostname(),
(env.sslEnabled() ? node.sslServices() : node.services()).get(ServiceType.BINARY));
InetSocketAddress address;
if (env.networkResolution().equals(EXTERNAL)) {
AlternateAddress aa = node.alternateAddresses().get(EXTERNAL.name());
if (aa == null) {
LOGGER.warn("node {} does not provide an external alternate address",
NetworkUtil.toHostPort(node.hostname(), node.services().get(ServiceType.CONFIG)));
return;
}
Map<ServiceType, Integer> services = env.sslEnabled() ? aa.sslServices() : aa.services();
if (!services.containsKey(ServiceType.BINARY)) {
LOGGER.warn("node {} does not provide the KV service on its external alternate address {}",
NetworkUtil.toHostPort(node.hostname(), node.services().get(ServiceType.CONFIG)),
aa.hostname());
return;
}
int altPort = services.get(ServiceType.BINARY);
address = new InetSocketAddress(aa.hostname(), altPort);
} else {
final Map<ServiceType, Integer> services = env.sslEnabled() ? node.sslServices() : node.services();
if (!services.containsKey(ServiceType.BINARY)) {
return;
}
address = new InetSocketAddress(node.hostname(), services.get(ServiceType.BINARY));
}
if (channels.containsKey(address)) {
return;
}
LOGGER.debug("Adding DCP Channel against {}", node);
final DcpChannel channel = new DcpChannel(address, node.hostname(), env, sessionState,
DcpChannel channel = new DcpChannel(address, node.hostname(), env, sessionState,
configProvider.config().numberOfPartitions());
LOGGER.debug("Adding DCP Channel against {}", node);
channel.connect(attemptTimeout, totalTimeout, delay);
channels.put(address, channel);
try {
channel.connect(attemptTimeout, totalTimeout, delay);
} catch (Throwable th) {
channels.remove(address);
throw th;
}
}
}

Expand Down
@@ -0,0 +1,14 @@
/*
* Copyright 2020 Couchbase, Inc.
*/
package com.couchbase.client.dcp.conductor;

import com.couchbase.client.core.CouchbaseException;

public class MasterDcpChannelNotFoundException extends CouchbaseException {
private static final long serialVersionUID = 1L;

public MasterDcpChannelNotFoundException(String message) {
super(message);
}
}
Expand Up @@ -3,11 +3,14 @@
*/
package com.couchbase.client.dcp.conductor;

import static com.couchbase.client.core.env.NetworkResolution.EXTERNAL;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand All @@ -18,6 +21,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import com.couchbase.client.core.config.AlternateAddress;
import com.couchbase.client.core.config.CouchbaseBucketConfig;
import com.couchbase.client.core.config.NodeInfo;
import com.couchbase.client.core.service.ServiceType;
Expand All @@ -36,7 +40,7 @@ public class NonStreamingConfigProvider implements ConfigProvider, IConfigurable

private static final Logger LOGGER = LogManager.getLogger();
private static final long MIN_MILLIS_PER_REFRESH = 1000;
private final Set<InetSocketAddress> sockets = new HashSet<>();
private final Set<InetSocketAddress> sockets = new LinkedHashSet<>();

private final ClientEnvironment env;
private volatile CouchbaseBucketConfig config;
Expand Down Expand Up @@ -164,8 +168,27 @@ public synchronized void configure(CouchbaseBucketConfig config) throws Exceptio
for (NodeInfo node : config.nodes()) {
int port = (env.sslEnabled() ? node.sslServices() : node.services()).get(ServiceType.CONFIG);
InetSocketAddress address = new InetSocketAddress(node.hostname(), port);
LOGGER.info("Adding a config node {}", address);
configNodes.add(address);
if (env.networkResolution().equals(EXTERNAL)) {
AlternateAddress aa = node.alternateAddresses().get(EXTERNAL.name());
if (aa == null) {
LOGGER.info("omitting node {} which does not provide an external alternate address", address);
continue;
}
Map<ServiceType, Integer> services = env.sslEnabled() ? aa.sslServices() : aa.services();
if (services.containsKey(ServiceType.CONFIG)) {
int altPort = services.get(ServiceType.CONFIG);
InetSocketAddress altAddress = new InetSocketAddress(aa.hostname(), altPort);
LOGGER.info("Adding a config node {} at alternate address {}", address, altAddress);
configNodes.add(altAddress);
} else {
LOGGER.info(
"omitting node {} which does not provide the config service on its external alternate address",
address);
}
} else {
LOGGER.info("Adding a config node {}", address);
configNodes.add(address);
}
}
if (!configNodes.isEmpty()) {
sockets.clear();
Expand Down
Expand Up @@ -78,7 +78,7 @@ public void channelActive(final ChannelHandlerContext ctx) throws Exception {
@Override
protected void channelRead0(final ChannelHandlerContext ctx, final HttpResponse msg) throws Exception {
int statusCode = msg.getStatus().code();
LOGGER.log(Level.INFO, "Status code " + statusCode);
LOGGER.debug("Status code " + statusCode);
if (statusCode == 200) {
ctx.pipeline().remove(this);
originalPromise().setSuccess();
Expand Down
Expand Up @@ -9,6 +9,7 @@
import java.util.concurrent.TimeUnit;

import com.couchbase.client.core.env.ConfigParserEnvironment;
import com.couchbase.client.core.env.NetworkResolution;
import com.couchbase.client.core.node.DefaultMemcachedHashingStrategy;
import com.couchbase.client.core.node.MemcachedHashingStrategy;
import com.couchbase.client.core.time.Delay;
Expand Down Expand Up @@ -156,6 +157,7 @@ public class ClientEnvironment implements SecureEnvironment, ConfigParserEnviron
private short[] vbuckets;
private final String uuid;
private final boolean dynamicConfigurationNodes;
private final NetworkResolution networkResolution;

/**
* Creates a new environment based on the builder.
Expand Down Expand Up @@ -197,6 +199,7 @@ private ClientEnvironment(final Builder builder) {
partitionRequestsTimeout = builder.partitionRequestsTimeout;
uuid = builder.uuid;
dynamicConfigurationNodes = builder.dynamicConfigurationNodes;
networkResolution = builder.networkResolution;
}

/**
Expand Down Expand Up @@ -390,6 +393,10 @@ public MemcachedHashingStrategy memcachedHashingStrategy() {
return DefaultMemcachedHashingStrategy.INSTANCE;
}

public NetworkResolution networkResolution() {
return networkResolution;
}

public static class Builder {
private List<InetSocketAddress> clusterAt;
private ConnectionNameGenerator connectionNameGenerator = DefaultConnectionNameGenerator.INSTANCE;
Expand Down Expand Up @@ -424,6 +431,7 @@ public static class Builder {
private long dcpChannelTotalTimeout = DEFAULT_DCP_CHANNEL_TOTAL_TIMEOUT;
private Delay dcpChannelsReconnectDelay = DEFAULT_DCP_CHANNELS_RECONNECT_DELAY;
private long partitionRequestsTimeout = DEFAULT_PARTITION_REQUESTS_TIMEOUT;
private NetworkResolution networkResolution;

public Builder setClusterAt(List<InetSocketAddress> clusterAt) {
this.clusterAt = clusterAt;
Expand Down Expand Up @@ -600,6 +608,11 @@ public ClientEnvironment build() {
}
return new ClientEnvironment(this);
}

public Builder setNetworkResolution(NetworkResolution networkResolution) {
this.networkResolution = networkResolution;
return this;
}
}

/**
Expand Down
Expand Up @@ -6,6 +6,7 @@
import org.apache.hyracks.api.util.ExceptionUtils;

import com.couchbase.client.core.CouchbaseException;
import com.couchbase.client.dcp.conductor.MasterDcpChannelNotFoundException;
import com.couchbase.client.dcp.error.BucketNotFoundException;

public class RetryUtil {
Expand All @@ -15,7 +16,7 @@ private RetryUtil() {

public static boolean shouldRetry(Throwable th) {
Throwable root = ExceptionUtils.getRootCause(th);
return !((root instanceof BucketNotFoundException)
return !((root instanceof BucketNotFoundException) || (root instanceof MasterDcpChannelNotFoundException)
|| (root instanceof CouchbaseException && root.getMessage().contains("Unauthorized"))
|| (root.getMessage().contains("36: No access")));
}
Expand Down

0 comments on commit a12549a

Please sign in to comment.