HBASE-19599 Remove ReplicationQueuesClient, use ReplicationQueueStorage directly
Apache9 committed Jan 9, 2018
1 parent 5e6c303 commit c4fa568
Showing 23 changed files with 471 additions and 896 deletions.
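
In caller terms, the change removes the read-only ReplicationQueuesClient interface and its ZK implementation, so readers of replication queue state go through ReplicationQueueStorage directly. Below is a minimal sketch of the new wiring; the ReplicationFactory.getReplicationPeers overload is taken from this diff, while ReplicationStorageFactory.getReplicationQueueStorage is assumed from the companion storage classes and does not appear here.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;

public final class QueueStorageWiringSketch {
  // Obtain the queue storage and hand it to ReplicationPeers, replacing the removed
  // ReplicationFactory.getReplicationQueuesClient(...) call. getReplicationQueueStorage
  // is assumed to come from ReplicationStorageFactory and is not part of this diff.
  static ReplicationPeers peersFor(ZKWatcher zk, Configuration conf, Abortable abortable) {
    ReplicationQueueStorage queueStorage =
        ReplicationStorageFactory.getReplicationQueueStorage(zk, conf);
    return ReplicationFactory.getReplicationPeers(zk, conf, queueStorage, abortable);
  }
}
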
@@ -1,5 +1,4 @@
-/*
- *
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -39,20 +38,14 @@ public static ReplicationQueues getReplicationQueues(ReplicationQueuesArguments
       args);
   }
 
-  public static ReplicationQueuesClient
-      getReplicationQueuesClient(ReplicationQueuesClientArguments args) throws Exception {
-    return (ReplicationQueuesClient) ConstructorUtils
-        .invokeConstructor(ReplicationQueuesClientZKImpl.class, args);
-  }
-
-  public static ReplicationPeers getReplicationPeers(final ZKWatcher zk, Configuration conf,
-      Abortable abortable) {
+  public static ReplicationPeers getReplicationPeers(ZKWatcher zk, Configuration conf,
+      Abortable abortable) {
     return getReplicationPeers(zk, conf, null, abortable);
   }
 
-  public static ReplicationPeers getReplicationPeers(final ZKWatcher zk, Configuration conf,
-      final ReplicationQueuesClient queuesClient, Abortable abortable) {
-    return new ReplicationPeersZKImpl(zk, conf, queuesClient, abortable);
+  public static ReplicationPeers getReplicationPeers(ZKWatcher zk, Configuration conf,
+      ReplicationQueueStorage queueStorage, Abortable abortable) {
+    return new ReplicationPeersZKImpl(zk, conf, queueStorage, abortable);
   }
 
   public static ReplicationTracker getReplicationTracker(ZKWatcher zookeeper,
@@ -32,6 +32,7 @@
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.CompoundConfiguration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
@@ -80,17 +81,17 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
 
   // Map of peer clusters keyed by their id
   private Map<String, ReplicationPeerZKImpl> peerClusters;
-  private final ReplicationQueuesClient queuesClient;
+  private final ReplicationQueueStorage queueStorage;
   private Abortable abortable;
 
   private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeersZKImpl.class);
 
-  public ReplicationPeersZKImpl(final ZKWatcher zk, final Configuration conf,
-      final ReplicationQueuesClient queuesClient, Abortable abortable) {
+  public ReplicationPeersZKImpl(ZKWatcher zk, Configuration conf,
+      ReplicationQueueStorage queueStorage, Abortable abortable) {
     super(zk, conf, abortable);
     this.abortable = abortable;
     this.peerClusters = new ConcurrentHashMap<>();
-    this.queuesClient = queuesClient;
+    this.queueStorage = queueStorage;
   }
 
   @Override
@@ -512,17 +513,16 @@ private ReplicationPeerZKImpl createPeer(String peerId) throws ReplicationExcept
   }
 
   private void checkQueuesDeleted(String peerId) throws ReplicationException {
-    if (queuesClient == null) {
+    if (queueStorage == null) {
       return;
     }
-
     try {
-      List<String> replicators = queuesClient.getListOfReplicators();
+      List<ServerName> replicators = queueStorage.getListOfReplicators();
       if (replicators == null || replicators.isEmpty()) {
         return;
       }
-      for (String replicator : replicators) {
-        List<String> queueIds = queuesClient.getAllQueues(replicator);
+      for (ServerName replicator : replicators) {
+        List<String> queueIds = queueStorage.getAllQueues(replicator);
         for (String queueId : queueIds) {
           ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
           if (queueInfo.getPeerId().equals(peerId)) {
@@ -533,7 +533,7 @@ private void checkQueuesDeleted(String peerId) throws ReplicationException {
       }
       // Check for hfile-refs queue
       if (-1 != ZKUtil.checkExists(zookeeper, hfileRefsZNode)
-          && queuesClient.getAllPeersFromHFileRefsQueue().contains(peerId)) {
+          && queueStorage.getAllPeersFromHFileRefsQueue().contains(peerId)) {
         throw new IllegalArgumentException("Undeleted queue for peerId: " + peerId
             + ", found in hfile-refs node path " + hfileRefsZNode);
       }
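
The checkQueuesDeleted hunks above also retype the replicator identifier from a raw String to ServerName. A small sketch of the resulting caller pattern follows, using only the interface calls shown in this diff; the helper class and method names are illustrative.

import java.util.List;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;

final class QueueScanSketch {
  // Walk every replicator's queues and report whether any queue still belongs to peerId.
  // getListOfReplicators() now returns typed ServerName values instead of znode strings;
  // queue ids remain plain strings and are still parsed with ReplicationQueueInfo.
  static boolean hasQueueForPeer(ReplicationQueueStorage queueStorage, String peerId)
      throws ReplicationException {
    List<ServerName> replicators = queueStorage.getListOfReplicators();
    for (ServerName replicator : replicators) {
      for (String queueId : queueStorage.getAllQueues(replicator)) {
        if (new ReplicationQueueInfo(queueId).getPeerId().equals(peerId)) {
          return true;
        }
      }
    }
    return false;
  }
}
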
@@ -77,6 +77,14 @@ void setWALPosition(ServerName serverName, String queueId, String fileName, long
   long getWALPosition(ServerName serverName, String queueId, String fileName)
       throws ReplicationException;
 
+  /**
+   * Get a list of all WALs in the given queue on the given region server.
+   * @param serverName the server name of the region server that owns the queue
+   * @param queueId a String that identifies the queue
+   * @return a list of WALs
+   */
+  List<String> getWALsInQueue(ServerName serverName, String queueId) throws ReplicationException;
+
   /**
    * Get a list of all queues for the specified region server.
    * @param serverName the server name of the region server that owns the set of queues
@@ -108,8 +116,8 @@ Pair<String, SortedSet<String>> claimQueue(ServerName sourceServerName, String q
 
   /**
    * Load all wals in all replication queues. This method guarantees to return a snapshot which
-   * contains all WALs in the zookeeper at the start of this call even there is concurrent queue
-   * failover. However, some newly created WALs during the call may not be included.
+   * contains all WALs at the start of this call even there is concurrent queue failover. However,
+   * some newly created WALs during the call may not be included.
    */
   Set<String> getAllWALs() throws ReplicationException;
 
@@ -142,13 +150,6 @@ Pair<String, SortedSet<String>> claimQueue(ServerName sourceServerName, String q
    */
   void removeHFileRefs(String peerId, List<String> files) throws ReplicationException;
 
-  /**
-   * Get the change version number of replication hfile references node. This can be used as
-   * optimistic locking to get a consistent snapshot of the replication queues of hfile references.
-   * @return change version number of hfile references node
-   */
-  int getHFileRefsNodeChangeVersion() throws ReplicationException;
-
   /**
    * Get list of all peers from hfile reference queue.
    * @return a list of peer ids
@@ -161,4 +162,11 @@ Pair<String, SortedSet<String>> claimQueue(ServerName sourceServerName, String q
    * @return a list of hfile references
    */
   List<String> getReplicableHFiles(String peerId) throws ReplicationException;
+
+  /**
+   * Load all hfile references in all replication queues. This method guarantees to return a
+   * snapshot which contains all hfile references at the start of this call. However, some newly
+   * created hfile references during the call may not be included.
+   */
+  Set<String> getAllHFileRefs() throws ReplicationException;
 }
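
With getHFileRefsNodeChangeVersion removed, consumers that want a consistent view rely on the snapshot-style methods getAllWALs() and getAllHFileRefs() declared above. Below is a minimal sketch of the membership checks a cleaner-style caller might perform; the class and method names are illustrative, and only the interface calls come from this diff.

import java.util.Set;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;

final class SnapshotCheckSketch {
  // Ask the snapshot views whether a WAL or an hfile is still referenced by any
  // replication queue; both methods are declared in the interface changes above.
  static boolean isWalStillReferenced(ReplicationQueueStorage queueStorage, String walName)
      throws ReplicationException {
    Set<String> referencedWals = queueStorage.getAllWALs();
    return referencedWals.contains(walName);
  }

  static boolean isHFileStillReferenced(ReplicationQueueStorage queueStorage, String hfileName)
      throws ReplicationException {
    Set<String> referencedHFiles = queueStorage.getAllHFileRefs();
    return referencedHFiles.contains(hfileName);
  }
}
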

This file was deleted.

This file was deleted.
