HBASE-19687 Move the logic in ReplicationZKNodeCleaner to ReplicationChecker and remove ReplicationZKNodeCleanerChore
Apache9 committed Jan 9, 2018
1 parent d95ee41 commit 368db31
Showing 13 changed files with 259 additions and 459 deletions.
@@ -50,8 +50,8 @@
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
@@ -345,10 +345,10 @@ public boolean isAborted() {
}
});
ReplicationPeerStorage storage =
ReplicationStorageFactory.getReplicationPeerStorage(localZKW, conf);
ReplicationStorageFactory.getReplicationPeerStorage(localZKW, conf);
ReplicationPeerConfig peerConfig = storage.getPeerConfig(peerId);
return Pair.newPair(peerConfig,
ReplicationPeers.getPeerClusterConfiguration(peerConfig, conf));
ReplicationUtils.getPeerClusterConfiguration(peerConfig, conf));
} catch (ReplicationException e) {
throw new IOException("An error occurred while trying to connect to the remote peer cluster",
e);
@@ -17,14 +17,11 @@
*/
package org.apache.hadoop.hbase.replication;

import java.io.IOException;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompoundConfiguration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
@@ -106,25 +103,6 @@ public Set<String> getAllPeerIds() {
return Collections.unmodifiableSet(peerCache.keySet());
}

public static Configuration getPeerClusterConfiguration(ReplicationPeerConfig peerConfig,
Configuration baseConf) throws ReplicationException {
Configuration otherConf;
try {
otherConf = HBaseConfiguration.createClusterConf(baseConf, peerConfig.getClusterKey());
} catch (IOException e) {
throw new ReplicationException("Can't get peer configuration for peer " + peerConfig, e);
}

if (!peerConfig.getConfiguration().isEmpty()) {
CompoundConfiguration compound = new CompoundConfiguration();
compound.add(otherConf);
compound.addStringMap(peerConfig.getConfiguration());
return compound;
}

return otherConf;
}

public PeerState refreshPeerState(String peerId) throws ReplicationException {
ReplicationPeerImpl peer = peerCache.get(peerId);
if (peer == null) {
@@ -151,7 +129,7 @@ public ReplicationPeerConfig refreshPeerConfig(String peerId) throws Replication
private ReplicationPeerImpl createPeer(String peerId) throws ReplicationException {
ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId);
boolean enabled = peerStorage.isPeerEnabled(peerId);
return new ReplicationPeerImpl(getPeerClusterConfiguration(peerConfig, conf), peerId, enabled,
peerConfig);
return new ReplicationPeerImpl(ReplicationUtils.getPeerClusterConfiguration(peerConfig, conf),
peerId, enabled, peerConfig);
}
}
@@ -0,0 +1,69 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompoundConfiguration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.yetus.audience.InterfaceAudience;

/**
* Helper class for replication.
*/
@InterfaceAudience.Private
public final class ReplicationUtils {

private ReplicationUtils() {
}

public static Configuration getPeerClusterConfiguration(ReplicationPeerConfig peerConfig,
Configuration baseConf) throws ReplicationException {
Configuration otherConf;
try {
otherConf = HBaseConfiguration.createClusterConf(baseConf, peerConfig.getClusterKey());
} catch (IOException e) {
throw new ReplicationException("Can't get peer configuration for peer " + peerConfig, e);
}

if (!peerConfig.getConfiguration().isEmpty()) {
CompoundConfiguration compound = new CompoundConfiguration();
compound.add(otherConf);
compound.addStringMap(peerConfig.getConfiguration());
return compound;
}

return otherConf;
}

public static void removeAllQueues(ReplicationQueueStorage queueStorage, String peerId)
throws ReplicationException {
for (ServerName replicator : queueStorage.getListOfReplicators()) {
List<String> queueIds = queueStorage.getAllQueues(replicator);
for (String queueId : queueIds) {
ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
if (queueInfo.getPeerId().equals(peerId)) {
queueStorage.removeQueue(replicator, queueId);
}
}
queueStorage.removeReplicatorIfQueueIsEmpty(replicator);
}
}
}
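
A minimal usage sketch (not part of this commit) of how the relocated ReplicationUtils helpers might be called together. It assumes a ZKWatcher is already available, that ReplicationStorageFactory also exposes a getReplicationQueueStorage accessor, and the cleanUpPeer method and its peerId argument are purely illustrative:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;

public class ReplicationUtilsUsageSketch {

  // Hypothetical helper: resolve a peer's effective cluster Configuration and
  // then drop any replication queues still registered for that peer.
  static void cleanUpPeer(ZKWatcher zk, Configuration conf, String peerId)
      throws ReplicationException {
    ReplicationPeerStorage peerStorage =
        ReplicationStorageFactory.getReplicationPeerStorage(zk, conf);
    ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId);

    // Base configuration overlaid with the peer's cluster key and per-peer overrides;
    // this is what could be used to open a connection to the peer cluster.
    Configuration peerConf = ReplicationUtils.getPeerClusterConfiguration(peerConfig, conf);

    // Remove every queue belonging to this peer across all replicators
    // (assumes a queue storage accessor on ReplicationStorageFactory).
    ReplicationQueueStorage queueStorage =
        ReplicationStorageFactory.getReplicationQueueStorage(zk, conf);
    ReplicationUtils.removeAllQueues(queueStorage, peerId);
  }
}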
@@ -240,7 +240,7 @@ public void testReplicationPeers() throws Exception {
rp.getPeerStorage().addPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO), true);
assertNumberOfPeers(2);

assertEquals(KEY_ONE, ZKConfig.getZooKeeperClusterKey(ReplicationPeers
assertEquals(KEY_ONE, ZKConfig.getZooKeeperClusterKey(ReplicationUtils
.getPeerClusterConfiguration(rp.getPeerStorage().getPeerConfig(ID_ONE), rp.getConf())));
rp.getPeerStorage().removePeer(ID_ONE);
rp.removePeer(ID_ONE);
@@ -108,8 +108,6 @@
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
import org.apache.hadoop.hbase.master.cleaner.ReplicationMetaCleaner;
import org.apache.hadoop.hbase.master.cleaner.ReplicationZKNodeCleaner;
import org.apache.hadoop.hbase.master.cleaner.ReplicationZKNodeCleanerChore;
import org.apache.hadoop.hbase.master.locking.LockManager;
import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan;
import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
@@ -363,7 +361,6 @@ public void run() {

CatalogJanitor catalogJanitorChore;
private ReplicationMetaCleaner replicationMetaCleaner;
private ReplicationZKNodeCleanerChore replicationZKNodeCleanerChore;
private LogCleaner logCleaner;
private HFileCleaner hfileCleaner;
private ExpiredMobFileCleanerChore expiredMobFileCleanerChore;
@@ -1150,15 +1147,6 @@ this, conf, getMasterWalManager().getFileSystem(),
if (LOG.isTraceEnabled()) {
LOG.trace("Started service threads");
}

// Start replication zk node cleaner
try {
replicationZKNodeCleanerChore = new ReplicationZKNodeCleanerChore(this, cleanerInterval,
new ReplicationZKNodeCleaner(this.conf, this.getZooKeeper(), this));
getChoreService().scheduleChore(replicationZKNodeCleanerChore);
} catch (Exception e) {
LOG.error("start replicationZKNodeCleanerChore failed", e);
}
replicationMetaCleaner = new ReplicationMetaCleaner(this, this, cleanerInterval);
getChoreService().scheduleChore(replicationMetaCleaner);
}
@@ -1183,7 +1171,6 @@ protected void stopServiceThreads() {
// Clean up and close up shop
if (this.logCleaner != null) this.logCleaner.cancel(true);
if (this.hfileCleaner != null) this.hfileCleaner.cancel(true);
if (this.replicationZKNodeCleanerChore != null) this.replicationZKNodeCleanerChore.cancel(true);
if (this.replicationMetaCleaner != null) this.replicationMetaCleaner.cancel(true);
if (this.quotaManager != null) this.quotaManager.stop();


This file was deleted.
