Skip to content

Commit

Permalink
HBASE-25292 Improve InetSocketAddress usage discipline (#2669)
Browse files Browse the repository at this point in the history
Network identities should be bound late. Remote addresses should be
resolved at the last possible moment, just before connect(). Network
identity mappings can change, so our code should not inappropriately
cache them. Otherwise we might miss a change and fail to operate normally.

Revert "HBASE-14544 Allow HConnectionImpl to not refresh the dns on errors"
Removes hbase.resolve.hostnames.on.failure and related code. We always
resolve hostnames, as late as possible.

Preserve InetSocketAddress caching per RPC connection. Avoids potential
lookups per Call.

Replace InetSocketAddress with Address where used as a map key. If we want
to key by hostname and/or resolved address we should be explicit about it.
Using Address chooses mapping by hostname and port only.

Add metrics for potential nameservice resolution attempts, whenever an
InetSocketAddress is instantiated for connect; and metrics for failed
resolution, whenever InetSocketAddress#isUnresolved on the new instance
is true.

* Use ServerName directly to build a stub key

* Resolve and cache ISA on a RpcChannel as late as possible, at first call

* Remove now invalid unit test TestCIBadHostname

We resolve DNS at the latest possible time, at first call, and do not
resolve hostnames for creating stubs at all, so this unit test cannot
work now.

Reviewed-by: Mingliang Liu <liuml07@apache.org>
Signed-off-by: Duo Zhang <zhangduo@apache.org>
  • Loading branch information
apurtell committed Dec 4, 2020
1 parent 23ef0cb commit 8e45165
Show file tree
Hide file tree
Showing 20 changed files with 273 additions and 169 deletions.
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.client.ConnectionUtils.getStubKey;
import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY;

import com.google.protobuf.BlockingRpcChannel;
Expand All @@ -27,8 +28,6 @@
import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashSet;
Expand Down Expand Up @@ -220,7 +219,6 @@ class ConnectionManager {

public static final String RETRIES_BY_SERVER_KEY = "hbase.client.retries.by.server";
private static final String CLIENT_NONCES_ENABLED_KEY = "hbase.client.nonces.enabled";
private static final String RESOLVE_HOSTNAME_ON_FAIL_KEY = "hbase.resolve.hostnames.on.failure";

// An LRU Map of HConnectionKey -> HConnection (TableServer). All
// access must be synchronized. This map is not private because tests
Expand Down Expand Up @@ -564,7 +562,6 @@ public static <T> T execute(HConnectable<T> connectable) throws IOException {
justification="Access to the conncurrent hash map is under a lock so should be fine.")
static class HConnectionImplementation implements ClusterConnection, Closeable {
static final Log LOG = LogFactory.getLog(HConnectionImplementation.class);
private final boolean hostnamesCanChange;
private final long pause;
private final long pauseForCQTBE;// pause for CallQueueTooBigException, if specified
private boolean useMetaReplicas;
Expand Down Expand Up @@ -706,7 +703,6 @@ static class HConnectionImplementation implements ClusterConnection, Closeable {
}
this.metaCache = new MetaCache(this.metrics);

this.hostnamesCanChange = conf.getBoolean(RESOLVE_HOSTNAME_ON_FAIL_KEY, true);
boolean shouldListen = conf.getBoolean(HConstants.STATUS_PUBLISHED,
HConstants.STATUS_PUBLISHED_DEFAULT);
Class<? extends ClusterStatusListener.Listener> listenerClass =
Expand Down Expand Up @@ -1599,8 +1595,7 @@ private Object makeStubNoRetries() throws IOException, ServiceException {
throw new MasterNotRunningException(sn + " is dead.");
}
// Use the security info interface name as our stub key
String key = getStubKey(getServiceName(),
sn.getHostname(), sn.getPort(), hostnamesCanChange);
String key = getStubKey(getServiceName(), sn);
connectionLock.putIfAbsent(key, key);
Object stub = null;
synchronized (connectionLock.get(key)) {
Expand Down Expand Up @@ -1682,8 +1677,7 @@ public AdminService.BlockingInterface getAdmin(final ServerName serverName,
if (isDeadServer(serverName)) {
throw new RegionServerStoppedException(serverName + " is dead.");
}
String key = getStubKey(AdminService.BlockingInterface.class.getName(),
serverName.getHostname(), serverName.getPort(), this.hostnamesCanChange);
String key = getStubKey(AdminService.BlockingInterface.class.getName(), serverName);
this.connectionLock.putIfAbsent(key, key);
AdminService.BlockingInterface stub = null;
synchronized (this.connectionLock.get(key)) {
Expand All @@ -1704,8 +1698,7 @@ public ClientService.BlockingInterface getClient(final ServerName sn)
if (isDeadServer(sn)) {
throw new RegionServerStoppedException(sn + " is dead.");
}
String key = getStubKey(ClientService.BlockingInterface.class.getName(), sn.getHostname(),
sn.getPort(), this.hostnamesCanChange);
String key = getStubKey(ClientService.BlockingInterface.class.getName(), sn);
this.connectionLock.putIfAbsent(key, key);
ClientService.BlockingInterface stub = null;
synchronized (this.connectionLock.get(key)) {
Expand All @@ -1722,26 +1715,6 @@ public ClientService.BlockingInterface getClient(final ServerName sn)
return stub;
}

static String getStubKey(final String serviceName,
final String rsHostname,
int port,
boolean resolveHostnames) {

// Sometimes, servers go down and they come back up with the same hostname but a different
// IP address. Force a resolution of the rsHostname by trying to instantiate an
// InetSocketAddress, and this way we will rightfully get a new stubKey.
// Also, include the hostname in the key so as to take care of those cases where the
// DNS name is different but IP address remains the same.
String address = rsHostname;
if (resolveHostnames) {
InetAddress i = new InetSocketAddress(rsHostname, port).getAddress();
if (i != null) {
address = i.getHostAddress() + "-" + rsHostname;
}
}
return serviceName + "@" + address + ":" + port;
}

private ZooKeeperKeepAliveConnection keepAliveZookeeper;
private AtomicInteger keepAliveZookeeperUserCount = new AtomicInteger(0);
private boolean canCloseZKW = true;
Expand Down
Expand Up @@ -189,6 +189,13 @@ public boolean isTableDisabled(TableName tableName) throws IOException {
}
}

/**
* Get a unique key for the rpc stub to the given server.
*/
static String getStubKey(String serviceName, ServerName serverName) {
return String.format("%s@%s", serviceName, serverName);
}

// A byte array in which all elements are the max byte, and it is used to
// construct closest front row
static final byte[] MAX_BYTE_ARRAY = Bytes.createMaxByteArray(9);
Expand Down
Expand Up @@ -63,6 +63,8 @@ public class MetricsConnection implements StatisticTrackable {
private static final String HEAP_BASE = "heapOccupancy_";
private static final String CACHE_BASE = "cacheDroppingExceptions_";
private static final String UNKNOWN_EXCEPTION = "UnknownException";
private static final String NS_LOOKUPS = "nsLookups";
private static final String NS_LOOKUPS_FAILED = "nsLookupsFailed";
private static final String CLIENT_SVC = ClientService.getDescriptor().getName();

/** A container class for collecting details about the RPC call as it percolates. */
Expand Down Expand Up @@ -287,6 +289,8 @@ private static interface NewMetric<T> {
protected final Counter hedgedReadOps;
protected final Counter hedgedReadWin;
protected final Histogram concurrentCallsPerServerHist;
protected final Counter nsLookups;
protected final Counter nsLookupsFailed;

// dynamic metrics

Expand Down Expand Up @@ -359,6 +363,8 @@ public MetricsConnection(final ConnectionManager.HConnectionImplementation conn)
this.runnerStats = new RunnerStats(this.registry);
this.concurrentCallsPerServerHist = registry.newHistogram(this.getClass(),
"concurrentCallsPerServer", scope);
this.nsLookups = registry.newCounter(this.getClass(), NS_LOOKUPS, scope);
this.nsLookupsFailed = registry.newCounter(this.getClass(), NS_LOOKUPS_FAILED, scope);

this.reporter = new JmxReporter(this.registry);
this.reporter.start();
Expand Down Expand Up @@ -524,4 +530,12 @@ public void incrCacheDroppingExceptions(Object exception) {
(exception == null? UNKNOWN_EXCEPTION : exception.getClass().getSimpleName()),
cacheDroppingExceptions, counterFactory).inc();
}

public void incrNsLookups() {
this.nsLookups.inc();
}

public void incrNsLookupsFailed() {
this.nsLookupsFailed.inc();
}
}
Expand Up @@ -57,6 +57,7 @@
import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.codec.KeyValueCodec;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos.TokenIdentifier.Kind;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
Expand Down Expand Up @@ -142,10 +143,10 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC

private int maxConcurrentCallsPerServer;

private static final LoadingCache<InetSocketAddress, AtomicInteger> concurrentCounterCache =
private static final LoadingCache<Address, AtomicInteger> concurrentCounterCache =
CacheBuilder.newBuilder().expireAfterAccess(1, TimeUnit.HOURS).
build(new CacheLoader<InetSocketAddress, AtomicInteger>() {
@Override public AtomicInteger load(InetSocketAddress key) throws Exception {
build(new CacheLoader<Address, AtomicInteger>() {
@Override public AtomicInteger load(Address key) throws Exception {
return new AtomicInteger(0);
}
});
Expand Down Expand Up @@ -213,7 +214,7 @@ private void cleanupIdleConnections() {
// have some pending calls on connection so we should not shutdown the connection outside.
// The connection itself will disconnect if there is no pending call for maxIdleTime.
if (conn.getLastTouched() < closeBeforeTime && !conn.isActive()) {
LOG.info("Cleanup idle connection to " + conn.remoteId().address);
LOG.info("Cleanup idle connection to " + conn.remoteId().getAddress());
connections.removeValue(conn.remoteId(), conn);
conn.cleanupConnection();
}
Expand Down Expand Up @@ -342,11 +343,11 @@ private Message callBlockingMethod(Descriptors.MethodDescriptor md, HBaseRpcCont
private T getConnection(ConnectionId remoteId) throws IOException {
if (failedServers.isFailedServer(remoteId.getAddress())) {
if (LOG.isDebugEnabled()) {
LOG.debug("Not trying to connect to " + remoteId.address
LOG.debug("Not trying to connect to " + remoteId.getAddress()
+ " this server is in the failed servers list");
}
throw new FailedServerException(
"This server is in the failed servers list: " + remoteId.address);
"This server is in the failed servers list: " + remoteId.getAddress());
}
T conn;
synchronized (connections) {
Expand All @@ -368,7 +369,7 @@ private T getConnection(ConnectionId remoteId) throws IOException {
*/
protected abstract T createConnection(ConnectionId remoteId) throws IOException;

private void onCallFinished(Call call, HBaseRpcController hrc, InetSocketAddress addr,
private void onCallFinished(Call call, HBaseRpcController hrc, Address addr,
RpcCallback<Message> callback) {
call.callStats.setCallTimeMs(EnvironmentEdgeManager.currentTime() - call.getStartTime());
if (metrics != null) {
Expand All @@ -393,10 +394,11 @@ private void onCallFinished(Call call, HBaseRpcController hrc, InetSocketAddress
}

private void callMethod(final Descriptors.MethodDescriptor md, final HBaseRpcController hrc,
final Message param, Message returnType, final User ticket, final InetSocketAddress addr,
final RpcCallback<Message> callback) {
final Message param, Message returnType, final User ticket,
final InetSocketAddress inetAddr, final RpcCallback<Message> callback) {
final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
cs.setStartTime(EnvironmentEdgeManager.currentTime());
final Address addr = Address.fromSocketAddress(inetAddr);
final AtomicInteger counter = concurrentCounterCache.getUnchecked(addr);
Call call = new Call(nextCallId(), md, param, hrc.cellScanner(), returnType,
hrc.getCallTimeout(), hrc.getPriority(), new RpcCallback<Call>() {
Expand All @@ -420,12 +422,8 @@ public void run(Call call) {
}
}

private InetSocketAddress createAddr(ServerName sn) throws UnknownHostException {
InetSocketAddress addr = new InetSocketAddress(sn.getHostname(), sn.getPort());
if (addr.isUnresolved()) {
throw new UnknownHostException("can not resolve " + sn.getServerName());
}
return addr;
private static Address createAddr(ServerName sn) {
return Address.fromParts(sn.getHostname(), sn.getPort());
}

/**
Expand All @@ -440,8 +438,8 @@ public void cancelConnections(ServerName sn) {
synchronized (connections) {
for (T connection : connections.values()) {
ConnectionId remoteId = connection.remoteId();
if (remoteId.address.getPort() == sn.getPort() &&
remoteId.address.getHostName().equals(sn.getHostname())) {
if (remoteId.getAddress().getPort() == sn.getPort() &&
remoteId.getAddress().getHostname().equals(sn.getHostname())) {
LOG.info("The server on " + sn.toString() + " is dead - stopping the connection " +
connection.remoteId);
connections.removeValue(remoteId, connection);
Expand Down Expand Up @@ -500,27 +498,33 @@ public void close() {

@Override
public BlockingRpcChannel createBlockingRpcChannel(final ServerName sn, final User ticket,
int rpcTimeout) throws UnknownHostException {
int rpcTimeout) {
return new BlockingRpcChannelImplementation(this, createAddr(sn), ticket, rpcTimeout);
}

@Override
public RpcChannel createRpcChannel(ServerName sn, User user, int rpcTimeout)
throws UnknownHostException {
public RpcChannel createRpcChannel(ServerName sn, User user, int rpcTimeout) {
return new RpcChannelImplementation(this, createAddr(sn), user, rpcTimeout);
}

private static class AbstractRpcChannel {

protected final InetSocketAddress addr;
protected final Address addr;

// We cache the resolved InetSocketAddress for the channel so we do not do a DNS lookup
// per method call on the channel. If the remote target is removed or reprovisioned and
// its identity changes a new channel with a newly resolved InetSocketAddress will be
// created as part of retry, so caching here is fine.
// Normally, caching an InetSocketAddress is an anti-pattern.
protected InetSocketAddress isa;

protected final AbstractRpcClient<?> rpcClient;

protected final User ticket;

protected final int rpcTimeout;

protected AbstractRpcChannel(AbstractRpcClient<?> rpcClient, InetSocketAddress addr,
protected AbstractRpcChannel(AbstractRpcClient<?> rpcClient, Address addr,
User ticket, int rpcTimeout) {
this.addr = addr;
this.rpcClient = rpcClient;
Expand Down Expand Up @@ -557,15 +561,29 @@ public static class BlockingRpcChannelImplementation extends AbstractRpcChannel
implements BlockingRpcChannel {

protected BlockingRpcChannelImplementation(AbstractRpcClient<?> rpcClient,
InetSocketAddress addr, User ticket, int rpcTimeout) {
Address addr, User ticket, int rpcTimeout) {
super(rpcClient, addr, ticket, rpcTimeout);
}

@Override
public Message callBlockingMethod(Descriptors.MethodDescriptor md, RpcController controller,
Message param, Message returnType) throws ServiceException {
// Look up remote address upon first call
if (isa == null) {
if (this.rpcClient.metrics != null) {
this.rpcClient.metrics.incrNsLookups();
}
isa = Address.toSocketAddress(addr);
if (isa.isUnresolved()) {
if (this.rpcClient.metrics != null) {
this.rpcClient.metrics.incrNsLookupsFailed();
}
isa = null;
throw new ServiceException(new UnknownHostException(addr + " could not be resolved"));
}
}
return rpcClient.callBlockingMethod(md, configureRpcController(controller),
param, returnType, ticket, addr);
param, returnType, ticket, isa);
}
}

Expand All @@ -575,20 +593,35 @@ public Message callBlockingMethod(Descriptors.MethodDescriptor md, RpcController
public static class RpcChannelImplementation extends AbstractRpcChannel implements
RpcChannel {

protected RpcChannelImplementation(AbstractRpcClient<?> rpcClient, InetSocketAddress addr,
User ticket, int rpcTimeout) throws UnknownHostException {
protected RpcChannelImplementation(AbstractRpcClient<?> rpcClient, Address addr,
User ticket, int rpcTimeout) {
super(rpcClient, addr, ticket, rpcTimeout);
}

@Override
public void callMethod(Descriptors.MethodDescriptor md, RpcController controller,
Message param, Message returnType, RpcCallback<Message> done) {
HBaseRpcController configuredController =
configureRpcController(Preconditions.checkNotNull(controller,
"RpcController can not be null for async rpc call"));
// Look up remote address upon first call
if (isa == null || isa.isUnresolved()) {
if (this.rpcClient.metrics != null) {
this.rpcClient.metrics.incrNsLookups();
}
isa = Address.toSocketAddress(addr);
if (isa.isUnresolved()) {
if (this.rpcClient.metrics != null) {
this.rpcClient.metrics.incrNsLookupsFailed();
}
isa = null;
controller.setFailed(addr + " could not be resolved");
return;
}
}
// This method does not throw any exceptions, so the caller must provide a
// HBaseRpcController which is used to pass the exceptions.
this.rpcClient.callMethod(md,
configureRpcController(Preconditions.checkNotNull(controller,
"RpcController can not be null for async rpc call")),
param, returnType, ticket, addr, done);
this.rpcClient.callMethod(md, configuredController, param, returnType, ticket, isa, done);
}
}
}

0 comments on commit 8e45165

Please sign in to comment.