Skip to content

Commit

Permalink
Replication queue's lock will live forever if RS acquiring the lock h…
Browse files Browse the repository at this point in the history
…as died prematurely
  • Loading branch information
yangzhe1991 committed Jul 18, 2016
1 parent 9bc7ecf commit a8dfd8c
Show file tree
Hide file tree
Showing 5 changed files with 190 additions and 2 deletions.
Expand Up @@ -31,10 +31,12 @@
@InterfaceAudience.Private
public class ReplicationFactory {

public static final Class defaultReplicationQueueClass = ReplicationQueuesZKImpl.class;

public static ReplicationQueues getReplicationQueues(ReplicationQueuesArguments args)
throws Exception {
Class<?> classToBuild = args.getConf().getClass("hbase.region.replica." +
"replication.replicationQueues.class", ReplicationQueuesZKImpl.class);
"replication.replicationQueues.class", defaultReplicationQueueClass);
return (ReplicationQueues) ConstructorUtils.invokeConstructor(classToBuild, args);
}

Expand Down
Expand Up @@ -25,6 +25,7 @@
import java.util.Map;
import java.util.Set;

import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
Expand Down Expand Up @@ -238,7 +239,8 @@ public List<String> getAllQueues() {
* @param znode the server names of the other server
* @return true if the lock was acquired, false in every other cases
*/
private boolean lockOtherRS(String znode) {
@VisibleForTesting
public boolean lockOtherRS(String znode) {
try {
String parent = ZKUtil.joinZNode(this.queuesZNode, znode);
if (parent.equals(this.myQueuesZnode)) {
Expand All @@ -265,6 +267,15 @@ private boolean lockOtherRS(String znode) {
return true;
}

public String getLockZNode(String znode) {
return this.queuesZNode + "/" + znode + "/" + RS_LOCK_ZNODE;
}

@VisibleForTesting
public boolean checkLockExists(String znode) throws KeeperException {
return ZKUtil.checkExists(zookeeper, getLockZNode(znode)) >= 0;
}

/**
* Delete all the replication queues for a given region server.
* @param regionserverZnode The znode of the region server to delete.
Expand Down
Expand Up @@ -97,6 +97,7 @@
import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
import org.apache.hadoop.hbase.master.cleaner.ReplicationZKLockCleanerChore;
import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan;
import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer;
Expand Down Expand Up @@ -137,6 +138,8 @@
import org.apache.hadoop.hbase.regionserver.RegionSplitPolicy;
import org.apache.hadoop.hbase.regionserver.compactions.ExploringCompactionPolicy;
import org.apache.hadoop.hbase.regionserver.compactions.FIFOCompactionPolicy;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl;
import org.apache.hadoop.hbase.replication.master.TableCFsUpdater;
import org.apache.hadoop.hbase.replication.regionserver.Replication;
import org.apache.hadoop.hbase.security.UserProvider;
Expand Down Expand Up @@ -301,6 +304,7 @@ public void run() {
private PeriodicDoMetrics periodicDoMetricsChore = null;

CatalogJanitor catalogJanitorChore;
private ReplicationZKLockCleanerChore replicationZKLockCleanerChore;
private LogCleaner logCleaner;
private HFileCleaner hfileCleaner;
private ExpiredMobFileCleanerChore expiredMobFileCleanerChore;
Expand Down Expand Up @@ -962,6 +966,17 @@ this, conf, getMasterWalManager().getFileSystem(),
if (LOG.isTraceEnabled()) {
LOG.trace("Started service threads");
}
if (conf.getClass("hbase.region.replica.replication.replicationQueues.class",
ReplicationFactory.defaultReplicationQueueClass) == ReplicationQueuesZKImpl.class && !conf
.getBoolean(HConstants.ZOOKEEPER_USEMULTI, true)) {
try {
replicationZKLockCleanerChore = new ReplicationZKLockCleanerChore(this, this,
cleanerInterval, this.getZooKeeper(), this.conf);
getChoreService().scheduleChore(replicationZKLockCleanerChore);
} catch (Exception e) {
LOG.error("start replicationZKLockCleanerChore failed", e);
}
}
}

@Override
Expand Down Expand Up @@ -995,6 +1010,7 @@ protected void stopServiceThreads() {
// Clean up and close up shop
if (this.logCleaner != null) this.logCleaner.cancel(true);
if (this.hfileCleaner != null) this.hfileCleaner.cancel(true);
if (this.replicationZKLockCleanerChore != null) this.replicationZKLockCleanerChore.cancel(true);
if (this.quotaManager != null) this.quotaManager.stop();
if (this.activeMasterManager != null) this.activeMasterManager.stop();
if (this.serverManager != null) this.serverManager.stop();
Expand Down
@@ -0,0 +1,112 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.cleaner;

import com.google.common.annotations.VisibleForTesting;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl;
import org.apache.hadoop.hbase.replication.ReplicationTracker;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;


/**
* A cleaner that cleans replication locks on zk which is locked by dead region servers
*/
@InterfaceAudience.Private
public class ReplicationZKLockCleanerChore extends ScheduledChore {
private static final Log LOG = LogFactory.getLog(ReplicationZKLockCleanerChore.class);
private ZooKeeperWatcher zk;
private ReplicationTracker tracker;
private long ttl;
private ReplicationQueuesZKImpl queues;

// Wait some times before delete lock to prevent a session expired RS not dead fully.
private static final long DEFAULT_TTL = 60 * 10 * 1000;//10 min

@VisibleForTesting
public static final String TTL_CONFIG_KEY = "hbase.replication.zk.deadrs.lock.ttl";

public ReplicationZKLockCleanerChore(Stoppable stopper, Abortable abortable, int period,
ZooKeeperWatcher zk, Configuration conf) throws Exception {
super("ReplicationZKLockCleanerChore", stopper, period);

this.zk = zk;
this.ttl = conf.getLong(TTL_CONFIG_KEY, DEFAULT_TTL);
tracker = ReplicationFactory.getReplicationTracker(zk,
ReplicationFactory.getReplicationPeers(zk, conf, abortable), conf, abortable, stopper);
queues = new ReplicationQueuesZKImpl(zk, conf, abortable);
}

@Override protected void chore() {
try {
List<String> regionServers = tracker.getListOfRegionServers();
if (regionServers == null) {
return;
}
Set<String> rsSet = new HashSet<String>(regionServers);
List<String> replicators = queues.getListOfReplicators();

for (String replicator: replicators) {
try {
String lockNode = queues.getLockZNode(replicator);
byte[] data = ZKUtil.getData(zk, lockNode);
if (data == null) {
continue;
}
String rsServerNameZnode = Bytes.toString(data);
String[] array = rsServerNameZnode.split("/");
String znode = array[array.length - 1];
if (!rsSet.contains(znode)) {
Stat s = zk.getRecoverableZooKeeper().exists(lockNode, false);
if (s != null && EnvironmentEdgeManager.currentTime() - s.getMtime() > this.ttl) {
// server is dead, but lock is still there, we have to delete the lock.
ZKUtil.deleteNode(zk, lockNode);
LOG.info("Remove lock acquired by dead RS: " + lockNode + " by " + znode);
}
continue;
}
LOG.info("Skip lock acquired by live RS: " + lockNode + " by " + znode);

} catch (KeeperException.NoNodeException ignore) {
} catch (InterruptedException e) {
LOG.warn("zk operation interrupted", e);
Thread.currentThread().interrupt();
}
}
} catch (KeeperException e) {
LOG.warn("zk operation interrupted", e);
}

}
}
Expand Up @@ -21,11 +21,14 @@

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.apache.commons.logging.Log;
Expand All @@ -36,12 +39,15 @@
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.master.cleaner.ReplicationZKLockCleanerChore;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
Expand Down Expand Up @@ -92,10 +98,17 @@ public static void setUpBeforeClass() throws Exception {
conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
conf1.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
"org.apache.hadoop.hbase.replication.TestMasterReplication$CoprocessorCounter");
conf1.setBoolean(HConstants.ZOOKEEPER_USEMULTI , false);// for testZKLockCleaner
conf1.setInt("hbase.master.cleaner.interval", 5 * 1000);
conf1.setClass("hbase.region.replica.replication.replicationQueues.class",
ReplicationQueuesZKImpl.class, ReplicationQueues.class);
conf1.setLong(ReplicationZKLockCleanerChore.TTL_CONFIG_KEY, 0L);


utility1 = new HBaseTestingUtility(conf1);
utility1.startMiniZKCluster();
MiniZooKeeperCluster miniZK = utility1.getZkCluster();
utility1.setZkCluster(miniZK);
new ZooKeeperWatcher(conf1, "cluster1", null, true);

conf2 = new Configuration(conf1);
Expand Down Expand Up @@ -198,6 +211,40 @@ public void testMultiSlaveReplication() throws Exception {
utility1.shutdownMiniCluster();
}

@Test
public void testZKLockCleaner() throws Exception {
MiniHBaseCluster cluster = utility1.startMiniCluster(1, 2);
HBaseAdmin admin = utility1.getHBaseAdmin();
HTableDescriptor table = new HTableDescriptor(TableName.valueOf(Bytes.toBytes("zk")));
HColumnDescriptor fam = new HColumnDescriptor(famName);
fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
table.addFamily(fam);
admin.createTable(table);
ReplicationAdmin replicationAdmin = new ReplicationAdmin(conf1);
ReplicationPeerConfig rpc = new ReplicationPeerConfig();
rpc.setClusterKey(utility2.getClusterKey());
replicationAdmin.addPeer("cluster2", rpc, null);
HRegionServer rs = cluster.getRegionServer(0);
ReplicationQueuesZKImpl zk = new ReplicationQueuesZKImpl(rs.getZooKeeper(), conf1, rs);
zk.init(rs.getServerName().toString());
List<String> replicators = zk.getListOfReplicators();
assertEquals(2, replicators.size());
String zNode = cluster.getRegionServer(1).getServerName().toString();

assertTrue(zk.lockOtherRS(zNode));
assertTrue(zk.checkLockExists(zNode));
Thread.sleep(10000);
assertTrue(zk.checkLockExists(zNode));
cluster.abortRegionServer(0);
Thread.sleep(10000);
HRegionServer rs1 = cluster.getRegionServer(1);
zk = new ReplicationQueuesZKImpl(rs1.getZooKeeper(), conf1, rs1);
zk.init(rs1.getServerName().toString());
assertFalse(zk.checkLockExists(zNode));

utility1.shutdownMiniCluster();
}

private void rollWALAndWait(final HBaseTestingUtility utility, final TableName table,
final byte[] row) throws IOException {
final Admin admin = utility.getHBaseAdmin();
Expand Down

0 comments on commit a8dfd8c

Please sign in to comment.