Skip to content

Commit

Permalink
HBASE-28521 Use standard ConnectionRegistry and Client API to get reg…
Browse files Browse the repository at this point in the history
…ion server list in in replication (#5825)

Signed-off-by: Guanghao Zhang <zghao@apache.org>
Reviewed-by: Andor Molnár <andor@apache.org>
  • Loading branch information
Apache9 committed May 1, 2024
1 parent e9ced39 commit 3d66866
Show file tree
Hide file tree
Showing 8 changed files with 91 additions and 140 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,28 +19,25 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKListener;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.ReservoirSample;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.AuthFailedException;
import org.apache.zookeeper.KeeperException.ConnectionLossException;
import org.apache.zookeeper.KeeperException.SessionExpiredException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -56,12 +53,11 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint

private static final Logger LOG = LoggerFactory.getLogger(HBaseReplicationEndpoint.class);

private ZKWatcher zkw = null;
private final Object zkwLock = new Object();

protected Configuration conf;

private AsyncClusterConnection conn;
private final Object connLock = new Object();

private volatile AsyncClusterConnection conn;

/**
* Default maximum number of times a replication sink can be reported as bad before it will no
Expand Down Expand Up @@ -106,36 +102,15 @@ public void init(Context context) throws IOException {
this.badReportCounts = Maps.newHashMap();
}

protected void disconnect() {
synchronized (zkwLock) {
if (zkw != null) {
zkw.close();
}
}
if (this.conn != null) {
try {
this.conn.close();
this.conn = null;
} catch (IOException e) {
LOG.warn("{} Failed to close the connection", ctx.getPeerId());
}
}
}

/**
* A private method used to re-establish a zookeeper session with a peer cluster.
*/
private void reconnect(KeeperException ke) {
if (
ke instanceof ConnectionLossException || ke instanceof SessionExpiredException
|| ke instanceof AuthFailedException
) {
String clusterKey = ctx.getPeerConfig().getClusterKey();
LOG.warn("Lost the ZooKeeper connection for peer {}", clusterKey, ke);
try {
reloadZkWatcher();
} catch (IOException io) {
LOG.warn("Creation of ZookeeperWatcher failed for peer {}", clusterKey, io);
private void disconnect() {
synchronized (connLock) {
if (this.conn != null) {
try {
this.conn.close();
this.conn = null;
} catch (IOException e) {
LOG.warn("{} Failed to close the connection", ctx.getPeerId());
}
}
}
}
Expand All @@ -152,13 +127,7 @@ public void stop() {

@Override
protected void doStart() {
try {
reloadZkWatcher();
connectPeerCluster();
notifyStarted();
} catch (IOException e) {
notifyFailed(e);
}
notifyStarted();
}

@Override
Expand All @@ -168,44 +137,40 @@ protected void doStop() {
}

@Override
// Synchronize peer cluster connection attempts to avoid races and rate
// limit connections when multiple replication sources try to connect to
// the peer cluster. If the peer cluster is down we can get out of control
// over time.
public UUID getPeerUUID() {
UUID peerUUID = null;
try {
synchronized (zkwLock) {
peerUUID = ZKClusterId.getUUIDForCluster(zkw);
}
} catch (KeeperException ke) {
reconnect(ke);
AsyncClusterConnection conn = connect();
String clusterId = FutureUtils
.get(conn.getAdmin().getClusterMetrics(EnumSet.of(ClusterMetrics.Option.CLUSTER_ID)))
.getClusterId();
return UUID.fromString(clusterId);
} catch (IOException e) {
LOG.warn("Failed to get cluster id for cluster", e);
return null;
}
return peerUUID;
}

/**
* Closes the current ZKW (if not null) and creates a new one
* @throws IOException If anything goes wrong connecting
*/
private void reloadZkWatcher() throws IOException {
synchronized (zkwLock) {
if (zkw != null) {
zkw.close();
}
zkw =
new ZKWatcher(ctx.getConfiguration(), "connection to cluster: " + ctx.getPeerId(), this);
zkw.registerListener(new PeerRegionServerListener(this));
// do not call this method in doStart method, only initialize the connection to remote cluster
// when you actually wants to make use of it. The problem here is that, starting the replication
// endpoint is part of the region server initialization work, so if the peer cluster is fully
// down and we can not connect to it, we will cause the initialization to fail and crash the
// region server, as we need the cluster id while setting up the AsyncClusterConnection, which
// needs to at least connect to zookeeper or some other servers in the peer cluster based on
// different connection registry implementation
private AsyncClusterConnection connect() throws IOException {
AsyncClusterConnection c = this.conn;
if (c != null) {
return c;
}
}

private void connectPeerCluster() throws IOException {
try {
conn = createConnection(this.conf);
} catch (IOException ioe) {
LOG.warn("{} Failed to create connection for peer cluster", ctx.getPeerId(), ioe);
throw ioe;
synchronized (connLock) {
c = this.conn;
if (c != null) {
return c;
}
c = createConnection(this.conf);
conn = c;
}
return c;
}

@Override
Expand All @@ -224,36 +189,27 @@ public boolean isAborted() {
* Get the list of all the region servers from the specified peer
* @return list of region server addresses or an empty list if the slave is unavailable
*/
protected List<ServerName> fetchSlavesAddresses() {
List<String> children = null;
// will be overrided in tests so protected
protected Collection<ServerName> fetchPeerAddresses() {
try {
synchronized (zkwLock) {
children = ZKUtil.listChildrenAndWatchForNewChildren(zkw, zkw.getZNodePaths().rsZNode);
}
} catch (KeeperException ke) {
if (LOG.isDebugEnabled()) {
LOG.debug("Fetch slaves addresses failed", ke);
}
reconnect(ke);
}
if (children == null) {
return FutureUtils.get(connect().getAdmin().getRegionServers(true));
} catch (IOException e) {
LOG.debug("Fetch peer addresses failed", e);
return Collections.emptyList();
}
List<ServerName> addresses = new ArrayList<>(children.size());
for (String child : children) {
addresses.add(ServerName.parseServerName(child));
}
return addresses;
}

protected synchronized void chooseSinks() {
List<ServerName> slaveAddresses = fetchSlavesAddresses();
Collection<ServerName> slaveAddresses = fetchPeerAddresses();
if (slaveAddresses.isEmpty()) {
LOG.warn("No sinks available at peer. Will not be able to replicate");
this.sinkServers = Collections.emptyList();
} else {
int numSinks = (int) Math.ceil(slaveAddresses.size() * ratio);
ReservoirSample<ServerName> sample = new ReservoirSample<>(numSinks);
sample.add(slaveAddresses.iterator());
this.sinkServers = sample.getSamplingResult();
}
Collections.shuffle(slaveAddresses, ThreadLocalRandom.current());
int numSinks = (int) Math.ceil(slaveAddresses.size() * ratio);
this.sinkServers = slaveAddresses.subList(0, numSinks);
badReportCounts.clear();
}

Expand All @@ -275,7 +231,7 @@ protected synchronized SinkPeer getReplicationSink() throws IOException {
}
ServerName serverName =
sinkServers.get(ThreadLocalRandom.current().nextInt(sinkServers.size()));
return new SinkPeer(serverName, conn.getRegionServerAdmin(serverName));
return new SinkPeer(serverName, connect().getRegionServerAdmin(serverName));
}

/**
Expand Down Expand Up @@ -307,29 +263,6 @@ List<ServerName> getSinkServers() {
return sinkServers;
}

/**
* Tracks changes to the list of region servers in a peer's cluster.
*/
public static class PeerRegionServerListener extends ZKListener {

private final HBaseReplicationEndpoint replicationEndpoint;
private final String regionServerListNode;

public PeerRegionServerListener(HBaseReplicationEndpoint endpoint) {
super(endpoint.zkw);
this.replicationEndpoint = endpoint;
this.regionServerListNode = endpoint.zkw.getZNodePaths().rsZNode;
}

@Override
public synchronized void nodeChildrenChanged(String path) {
if (path.equals(regionServerListNode)) {
LOG.info("Detected change to peer region servers, fetching updated list");
replicationEndpoint.chooseSinks();
}
}
}

/**
* Wraps a replication region server sink to provide the ability to identify it.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
private boolean isSerial = false;
// Initialising as 0 to guarantee at least one logging message
private long lastSinkFetchTime = 0;
private volatile boolean stopping = false;

@Override
public void init(Context context) throws IOException {
Expand Down Expand Up @@ -449,7 +448,7 @@ public boolean replicate(ReplicateContext replicateContext) {
}

List<List<Entry>> batches = createBatches(replicateContext.getEntries());
while (this.isRunning() && !this.stopping) {
while (this.isRunning()) {
if (!isPeerEnabled()) {
if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
sleepMultiplier++;
Expand Down Expand Up @@ -514,14 +513,6 @@ protected boolean isPeerEnabled() {
return ctx.getReplicationPeer().isPeerEnabled();
}

@Override
protected void doStop() {
// Allow currently running replication tasks to finish
this.stopping = true;
disconnect(); // don't call super.doStop()
notifyStopped();
}

protected CompletableFuture<Integer> replicateEntries(List<Entry> entries, int batchIndex,
int timeout) {
int entriesHashCode = System.identityHashCode(entries);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.mockito.Mockito.mock;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
Expand Down Expand Up @@ -166,6 +167,9 @@ public void testReportBadSinkDownToZeroSinks() {
ServerName serverNameA = endpoint.getSinkServers().get(0);
ServerName serverNameB = endpoint.getSinkServers().get(1);

serverNames.remove(serverNameA);
serverNames.remove(serverNameB);

SinkPeer sinkPeerA = new SinkPeer(serverNameA, mock(AsyncRegionServerAdmin.class));
SinkPeer sinkPeerB = new SinkPeer(serverNameB, mock(AsyncRegionServerAdmin.class));

Expand All @@ -191,7 +195,7 @@ public void setRegionServers(List<ServerName> regionServers) {
}

@Override
public List<ServerName> fetchSlavesAddresses() {
protected Collection<ServerName> fetchPeerAddresses() {
return regionServers;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
Expand Down Expand Up @@ -369,6 +371,14 @@ protected static void runSmallBatchTest() throws IOException, InterruptedExcepti
waitForReplication(NB_ROWS_IN_BATCH, NB_RETRIES);
}

protected static void stopAllRegionServers(HBaseTestingUtil util) throws IOException {
List<ServerName> rses = util.getMiniHBaseCluster().getRegionServerThreads().stream()
.map(t -> t.getRegionServer().getServerName()).collect(Collectors.toList());
for (ServerName rs : rses) {
util.getMiniHBaseCluster().stopRegionServer(rs);
}
}

@AfterClass
public static void tearDownAfterClass() throws Exception {
if (htable2 != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,11 @@ public class TestReplicationStatusBothNormalAndRecoveryLagging extends TestRepli

@Test
public void testReplicationStatusBothNormalAndRecoveryLagging() throws Exception {
UTIL2.shutdownMiniHBaseCluster();
// stop all region servers, we need to keep the master up as the below assertions need to get
// cluster id from remote cluster, if master is also down, we can not get any information from
// the remote cluster after source cluster restarts
stopAllRegionServers(UTIL2);

// add some values to cluster 1
for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
Put p = new Put(Bytes.toBytes("row" + i));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,10 @@ public class TestReplicationStatusSourceStartedTargetStoppedNewOp extends TestRe

@Test
public void testReplicationStatusSourceStartedTargetStoppedNewOp() throws Exception {
UTIL2.shutdownMiniHBaseCluster();
// stop all region servers, we need to keep the master up as the below assertions need to get
// cluster id from remote cluster, if master is also down, we can not get any information from
// the remote cluster after source cluster restarts
stopAllRegionServers(UTIL2);
restartSourceCluster(1);
Admin hbaseAdmin = UTIL1.getAdmin();
// add some values to source cluster
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,10 @@ public class TestReplicationStatusSourceStartedTargetStoppedNoOps extends TestRe

@Test
public void testReplicationStatusSourceStartedTargetStoppedNoOps() throws Exception {
UTIL2.shutdownMiniHBaseCluster();
// stop all region servers, we need to keep the master up as the below assertions need to get
// cluster id from remote cluster, if master is also down, we can not get any information from
// the remote cluster after source cluster restarts
stopAllRegionServers(UTIL2);
restartSourceCluster(1);
Admin hbaseAdmin = UTIL1.getAdmin();
ServerName serverName = UTIL1.getHBaseCluster().getRegionServer(0).getServerName();
Expand Down

0 comments on commit 3d66866

Please sign in to comment.