HBASE-15958 Implement ClaimQueues on top of HBase
Building on HBASE-15883, this change implements the claim-queues procedure on top of an HBase table
and adds unit tests for claimQueues.
Peer tracking is still performed by ZooKeeper.
The queueId tracking procedure was also changed so that we no longer need to scan the Replication Table,
which makes the queue naming schema slightly different from the one used by ReplicationQueuesZKImpl.

Signed-off-by: Elliott Clark <eclark@apache.org>
Joseph Hwang authored and elliottneilclark committed Jun 9, 2016
1 parent 108d39a commit babdedc
Showing 9 changed files with 579 additions and 272 deletions.
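
Before the file-by-file diff, here is a minimal caller-side sketch of what the updated claimQueues contract looks like, regardless of whether the ReplicationQueues implementation is ZooKeeper-backed or backed by the new HBase table. This is not code from this commit; the class, method, and variable names below are illustrative, and obtaining a concrete ReplicationQueues instance is assumed to happen elsewhere.

import java.util.Map;
import java.util.Set;

import org.apache.hadoop.hbase.replication.ReplicationQueues;

// Illustrative only: how a failover worker might use the updated interface.
// "queues" is an already-initialized ReplicationQueues instance (ZK- or table-backed).
public class ClaimQueuesSketch {
  public static void claimDeadServerQueues(ReplicationQueues queues, String deadRegionServer) {
    // Take ownership of every queue the dead region server left behind.
    Map<String, Set<String>> claimed = queues.claimQueues(deadRegionServer);
    if (claimed.isEmpty()) {
      return; // nothing was failed over to this server
    }
    for (Map.Entry<String, Set<String>> entry : claimed.entrySet()) {
      String queueId = entry.getKey();     // peer id, possibly carrying extra failover information
      Set<String> wals = entry.getValue(); // WALs still to be replicated for that queue
      // A real caller (e.g. the replication source manager) would now start a
      // recovered replication source for this queue.
    }
  }
}
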
@@ -19,8 +19,8 @@
 package org.apache.hadoop.hbase.replication;

 import java.util.List;
-import java.util.SortedMap;
-import java.util.SortedSet;
+import java.util.Map;
+import java.util.Set;

 import org.apache.hadoop.hbase.classification.InterfaceAudience;

@@ -96,10 +96,10 @@ public interface ReplicationQueues {
   /**
    * Take ownership for the set of queues belonging to a dead region server.
    * @param regionserver the id of the dead region server
-   * @return A SortedMap of the queues that have been claimed, including a SortedSet of WALs in
+   * @return A Map of the queues that have been claimed, including a Set of WALs in
    *         each queue. Returns an empty map if no queues were failed-over.
    */
-  SortedMap<String, SortedSet<String>> claimQueues(String regionserver);
+  Map<String, Set<String>> claimQueues(String regionserver);

   /**
    * Get a list of all region servers that have outstanding replication queues. These servers could
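
One consequence of the relaxed return type above is that callers can no longer rely on iteration order. As a hypothetical usage note, not part of the commit, a caller that still wants deterministic ordering can rebuild the old sorted shape locally:

import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;

public final class SortedQueuesView {
  // Hypothetical helper, not part of the commit: rebuild the old SortedMap/SortedSet
  // shape from the new Map/Set return value when deterministic ordering matters.
  public static SortedMap<String, SortedSet<String>> sortClaimedQueues(Map<String, Set<String>> claimed) {
    SortedMap<String, SortedSet<String>> sorted = new TreeMap<>();
    for (Map.Entry<String, Set<String>> e : claimed.entrySet()) {
      sorted.put(e.getKey(), new TreeSet<>(e.getValue()));
    }
    return sorted;
  }
}
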
@@ -56,11 +56,11 @@ public void setConf(Configuration conf) {
     this.conf = conf;
   }

-  public Abortable getAbort() {
+  public Abortable getAbortable() {
     return abort;
   }

-  public void setAbort(Abortable abort) {
+  public void setAbortable(Abortable abort) {
     this.abort = abort;
   }
 }

Large diffs are not rendered by default.

@@ -19,11 +19,11 @@
 package org.apache.hadoop.hbase.replication;

 import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
-import java.util.SortedMap;
-import java.util.SortedSet;
-import java.util.TreeMap;
-import java.util.TreeSet;
+import java.util.Map;
+import java.util.Set;

 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -73,7 +73,7 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
   private static final Log LOG = LogFactory.getLog(ReplicationQueuesZKImpl.class);

   public ReplicationQueuesZKImpl(ReplicationQueuesArguments args) {
-    this(args.getZk(), args.getConf(), args.getAbort());
+    this(args.getZk(), args.getConf(), args.getAbortable());
   }

   public ReplicationQueuesZKImpl(final ZooKeeperWatcher zk, Configuration conf,
@@ -178,8 +178,8 @@ public boolean isThisOurRegionServer(String regionserver) {
   }

   @Override
-  public SortedMap<String, SortedSet<String>> claimQueues(String regionserverZnode) {
-    SortedMap<String, SortedSet<String>> newQueues = new TreeMap<String, SortedSet<String>>();
+  public Map<String, Set<String>> claimQueues(String regionserverZnode) {
+    Map<String, Set<String>> newQueues = new HashMap<>();
     // check whether there is multi support. If yes, use it.
     if (conf.getBoolean(HConstants.ZOOKEEPER_USEMULTI, true)) {
       LOG.info("Atomically moving " + regionserverZnode + "'s WALs to my queue");
@@ -304,8 +304,8 @@ private void deleteAnotherRSQueues(String regionserverZnode) {
    * @param znode pertaining to the region server to copy the queues from
    * @return WAL queues sorted per peer cluster
    */
-  private SortedMap<String, SortedSet<String>> copyQueuesFromRSUsingMulti(String znode) {
-    SortedMap<String, SortedSet<String>> queues = new TreeMap<String, SortedSet<String>>();
+  private Map<String, Set<String>> copyQueuesFromRSUsingMulti(String znode) {
+    Map<String, Set<String>> queues = new HashMap<>();
     // hbase/replication/rs/deadrs
     String deadRSZnodePath = ZKUtil.joinZNode(this.queuesZNode, znode);
     List<String> peerIdsToProcess = null;
@@ -330,7 +330,7 @@ private SortedMap<String, SortedSet<String>> copyQueuesFromRSUsingMulti(String z
          continue; // empty log queue.
        }
        // create the new cluster znode
-       SortedSet<String> logQueue = new TreeSet<String>();
+       Set<String> logQueue = new HashSet<String>();
        queues.put(newPeerId, logQueue);
        ZKUtilOp op = ZKUtilOp.createAndFailSilent(newPeerZnode, HConstants.EMPTY_BYTE_ARRAY);
        listOfOps.add(op);
@@ -373,10 +373,10 @@ private SortedMap<String, SortedSet<String>> copyQueuesFromRSUsingMulti(String z
    * @param znode server names to copy
    * @return all wals for all peers of that cluster, null if an error occurred
    */
-  private SortedMap<String, SortedSet<String>> copyQueuesFromRS(String znode) {
+  private Map<String, Set<String>> copyQueuesFromRS(String znode) {
     // TODO this method isn't atomic enough, we could start copying and then
     // TODO fail for some reason and we would end up with znodes we don't want.
-    SortedMap<String, SortedSet<String>> queues = new TreeMap<String, SortedSet<String>>();
+    Map<String, Set<String>> queues = new HashMap<>();
     try {
       String nodePath = ZKUtil.joinZNode(this.queuesZNode, znode);
       List<String> clusters = ZKUtil.listChildrenNoWatch(this.zookeeper, nodePath);
@@ -406,7 +406,7 @@ private SortedMap<String, SortedSet<String>> copyQueuesFromRS(String znode) {
        }
        ZKUtil.createNodeIfNotExistsAndWatch(this.zookeeper, newClusterZnode,
          HConstants.EMPTY_BYTE_ARRAY);
-       SortedSet<String> logQueue = new TreeSet<String>();
+       Set<String> logQueue = new HashSet<String>();
        queues.put(newCluster, logQueue);
        for (String wal : wals) {
          String z = ZKUtil.joinZNode(clusterPath, wal);
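
The copyQueuesFromRSUsingMulti changes above keep the existing atomic-claim pattern: build a batch of ZKUtilOp create and delete operations that move a dead region server's queue under the claiming server's znode, then submit the whole batch at once. The following is a simplified sketch of that pattern, not the method from this commit; the znode path parameters are placeholders and error handling is omitted.

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;

public final class AtomicQueueClaimSketch {
  // Simplified illustration: move one queue (a set of WAL znodes and their positions)
  // from a dead region server's znode to this server's znode in a single ZooKeeper multi call.
  static void claimOneQueue(ZooKeeperWatcher zookeeper, String deadQueueZnode,
      String newQueueZnode, List<String> wals) throws KeeperException, InterruptedException {
    List<ZKUtilOp> ops = new ArrayList<>();
    ops.add(ZKUtilOp.createAndFailSilent(newQueueZnode, HConstants.EMPTY_BYTE_ARRAY));
    for (String wal : wals) {
      String oldWalZnode = ZKUtil.joinZNode(deadQueueZnode, wal);
      byte[] position = ZKUtil.getData(zookeeper, oldWalZnode); // last replicated position, may be null
      byte[] data = position == null ? HConstants.EMPTY_BYTE_ARRAY : position;
      ops.add(ZKUtilOp.createAndFailSilent(ZKUtil.joinZNode(newQueueZnode, wal), data));
      ops.add(ZKUtilOp.deleteNodeFailSilent(oldWalZnode));
    }
    ops.add(ZKUtilOp.deleteNodeFailSilent(deadQueueZnode));
    // Submit all creates and deletes as one atomic batch (requires ZooKeeper multi support).
    ZKUtil.multiOrSequential(zookeeper, ops, false);
  }
}
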
@@ -31,7 +31,6 @@
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
-import java.util.SortedMap;
 import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.UUID;
@@ -642,7 +641,7 @@ public void run() {
         LOG.info("Not transferring queue since we are shutting down");
         return;
       }
-      SortedMap<String, SortedSet<String>> newQueues = null;
+      Map<String, Set<String>> newQueues = null;

       newQueues = this.rq.claimQueues(rsZnode);

@@ -653,9 +652,9 @@ public void run() {
         return;
       }

-      for (Map.Entry<String, SortedSet<String>> entry : newQueues.entrySet()) {
+      for (Map.Entry<String, Set<String>> entry : newQueues.entrySet()) {
         String peerId = entry.getKey();
-        SortedSet<String> walsSet = entry.getValue();
+        Set<String> walsSet = entry.getValue();
         try {
           // there is not an actual peer defined corresponding to peerId for the failover.
           ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);

@@ -22,8 +22,8 @@

 import java.util.ArrayList;
 import java.util.List;
-import java.util.SortedMap;
-import java.util.SortedSet;
+import java.util.Map;
+import java.util.Set;

 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -146,7 +146,7 @@ public void testReplicationQueues() throws ReplicationException {
     assertEquals(0, rq3.claimQueues(server1).size());
     assertEquals(2, rq3.getListOfReplicators().size());

-    SortedMap<String, SortedSet<String>> queues = rq2.claimQueues(server3);
+    Map<String, Set<String>> queues = rq2.claimQueues(server3);
     assertEquals(5, queues.size());
     assertEquals(1, rq2.getListOfReplicators().size());
