Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HIVE-28001: Fix the flaky test TestLeaderElection #5011

Merged
merged 1 commit into from Jan 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -580,7 +580,7 @@ public enum ConfVars {
"metastore.housekeeping.leader.election",
"host", new StringSetValidator("host", "lock"),
"Set to host, HMS will choose the leader by the configured metastore.housekeeping.leader.hostname.\n" +
"Set to lock, HMS will use the hive lock to elect the leader."),
"Set to lock, HMS will use the Hive lock to elect the leader."),
METASTORE_HOUSEKEEPING_LEADER_AUDITTABLE("metastore.housekeeping.leader.auditTable",
"metastore.housekeeping.leader.auditTable", "",
"Audit the leader election event to a plain json table when configured."),
Expand All @@ -593,6 +593,9 @@ public enum ConfVars {
"metastore.housekeeping.leader.auditFiles.limit", 10,
"Limit the number of small audit files when metastore.housekeeping.leader.newAuditFile is true.\n" +
"If the number of audit files exceeds the limit, then the oldest will be deleted."),
METASTORE_HOUSEKEEPING_LEADER_LOCK_NAMESPACE("metastore.housekeeping.leader.lock.namespace",
"metastore.housekeeping.leader.lock.namespace", "",
"The database where the Hive lock sits when metastore.housekeeping.leader.election is set to lock."),
METASTORE_HOUSEKEEPING_THREADS_ON("metastore.housekeeping.threads.on",
"hive.metastore.housekeeping.threads.on", false,
"Whether to run the tasks under metastore.task.threads.remote on this metastore instance or not.\n" +
Expand Down
Expand Up @@ -42,12 +42,12 @@ public class LeaderElectionContext {
* For those tasks which belong to the same type, they will be running in the same leader.
*/
public enum TTYPE {
HOUSEKEEPING(new TableName(Warehouse.DEFAULT_CATALOG_NAME, "sys",
"metastore_housekeeping_leader"), "housekeeping"),
WORKER(new TableName(Warehouse.DEFAULT_CATALOG_NAME, "sys",
"metastore_worker_leader"), "compactor_worker"),
ALWAYS_TASKS(new TableName(Warehouse.DEFAULT_CATALOG_NAME, "sys",
"metastore_always_tasks_leader"), "always_tasks");
HOUSEKEEPING(new TableName(Warehouse.DEFAULT_CATALOG_NAME, "__METASTORE_LEADER_ELECTION__",
saihemanth-cloudera marked this conversation as resolved.
Show resolved Hide resolved
"metastore_housekeeping"), "housekeeping"),
WORKER(new TableName(Warehouse.DEFAULT_CATALOG_NAME, "__METASTORE_LEADER_ELECTION__",
"metastore_compactor_worker"), "compactor_worker"),
ALWAYS_TASKS(new TableName(Warehouse.DEFAULT_CATALOG_NAME, "__METASTORE_LEADER_ELECTION__",
"metastore_always_tasks"), "always_tasks");
// Mutex of TTYPE, which can be a nonexistent table
private final TableName mutex;
// Name of TTYPE
Expand Down Expand Up @@ -127,9 +127,10 @@ public void start() throws Exception {
throw new RuntimeException("Error claiming to be leader: " + leaderElection.getName(), e);
}
});
daemon.setName("Metastore Election " + leaderElection.getName());
daemon.setDaemon(true);

if (startAsDaemon) {
daemon.setName("Leader-Election-" + leaderElection.getName());
daemon.setDaemon(true);
daemon.start();
} else {
daemon.run();
Expand All @@ -154,7 +155,13 @@ public static Object getLeaderMutex(Configuration conf, TTYPE ttype, String serv
case "host":
return servHost;
case "lock":
return ttype.getTableName();
TableName mutex = ttype.getTableName();
String namespace =
MetastoreConf.getVar(conf, MetastoreConf.ConfVars.METASTORE_HOUSEKEEPING_LEADER_LOCK_NAMESPACE);
if (StringUtils.isNotEmpty(namespace)) {
return new TableName(mutex.getCat(), namespace, mutex.getTable());
}
return mutex;
default:
throw new UnsupportedOperationException(method + " not supported for leader election");
}
Expand Down
Expand Up @@ -18,6 +18,8 @@

package org.apache.hadoop.hive.metastore.leader;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;

Expand All @@ -26,7 +28,7 @@
*/
public class LeaderElectionFactory {

public static LeaderElection create(Configuration conf) {
public static LeaderElection create(Configuration conf) throws IOException {
String method =
MetastoreConf.getVar(conf, MetastoreConf.ConfVars.METASTORE_HOUSEKEEPING_LEADER_ELECTION);
switch (method.toLowerCase()) {
Expand All @@ -35,7 +37,7 @@ public static LeaderElection create(Configuration conf) {
case "lock":
return new LeaseLeaderElection();
default:
throw new UnsupportedOperationException("Do not support " + method + " now");
throw new UnsupportedOperationException(method + " is not supported for electing the leader");
}
}

Expand Down
Expand Up @@ -42,6 +42,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -92,9 +93,17 @@ public class LeaseLeaderElection implements LeaderElection<TableName> {
public static final String METASTORE_RENEW_LEASE = "metastore.renew.leader.lease";

private String name;
private String userName;
private String hostName;

private void doWork(LockResponse resp, Configuration conf,
public LeaseLeaderElection() throws IOException {
userName = SecurityUtils.getUser();
hostName = InetAddress.getLocalHost().getHostName();
}

private synchronized void doWork(LockResponse resp, Configuration conf,
TableName tableName) throws LeaderException {
long start = System.currentTimeMillis();
lockId = resp.getLockid();
assert resp.getState() == LockState.ACQUIRED || resp.getState() == LockState.WAITING;
shutdownWatcher();
Expand All @@ -121,6 +130,7 @@ private void doWork(LockResponse resp, Configuration conf,
default:
throw new IllegalStateException("Unexpected lock state: " + resp.getState());
}
LOG.debug("Spent {}ms to notify the listeners, isLeader: {}", System.currentTimeMillis() - start, isLeader);
}

private void notifyListener() {
Expand All @@ -142,13 +152,6 @@ private void notifyListener() {
public void tryBeLeader(Configuration conf, TableName table) throws LeaderException {
requireNonNull(conf, "conf is null");
requireNonNull(table, "table is null");
String user, hostName;
try {
user = SecurityUtils.getUser();
hostName = InetAddress.getLocalHost().getHostName();
} catch (Exception e) {
throw new LeaderException("Error while getting the username", e);
}

if (store == null) {
store = TxnUtils.getTxnStore(conf);
Expand All @@ -165,7 +168,7 @@ public void tryBeLeader(Configuration conf, TableName table) throws LeaderExcept
boolean lockable = false;
Exception recentException = null;
long start = System.currentTimeMillis();
LockRequest req = new LockRequest(components, user, hostName);
LockRequest req = new LockRequest(components, userName, hostName);
int numRetries = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.LOCK_NUMRETRIES);
long maxSleep = MetastoreConf.getTimeVar(conf,
MetastoreConf.ConfVars.LOCK_SLEEP_BETWEEN_RETRIES, TimeUnit.MILLISECONDS);
Expand All @@ -175,6 +178,7 @@ public void tryBeLeader(Configuration conf, TableName table) throws LeaderExcept
if (res.getState() == LockState.WAITING || res.getState() == LockState.ACQUIRED) {
lockable = true;
doWork(res, conf, table);
LOG.debug("Spent {}ms to lock the table {}, retries: {}", System.currentTimeMillis() - start, table, i);
break;
}
} catch (NoSuchTxnException | TxnAbortedException e) {
Expand Down Expand Up @@ -324,6 +328,7 @@ public void runInternal() {
} catch (NoSuchTxnException | TxnAbortedException e) {
throw new AssertionError("This should not happen, we didn't open txn", e);
} catch (NoSuchLockException e) {
LOG.info("No such lock {} for NonLeaderWatcher, try to obtain the lock again...", lockId);
reclaim();
} catch (Exception e) {
// Wait for next cycle.
Expand Down Expand Up @@ -379,6 +384,7 @@ public void runInternal() {
} catch (NoSuchTxnException | TxnAbortedException e) {
throw new AssertionError("This should not happen, we didn't open txn", e);
} catch (NoSuchLockException e) {
LOG.info("No such lock {} for Heartbeater, try to obtain the lock again...", lockId);
reclaim();
} catch (Exception e) {
// Wait for next cycle.
Expand All @@ -404,6 +410,7 @@ public ReleaseAndRequireWatcher(Configuration conf,
super(conf, tableName);
timeout = MetastoreConf.getTimeVar(conf,
MetastoreConf.ConfVars.TXN_TIMEOUT, TimeUnit.MILLISECONDS) + 3000;
setName("ReleaseAndRequireWatcher");
}

@Override
Expand Down
Expand Up @@ -27,6 +27,9 @@
import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil;
import org.junit.Test;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

Expand All @@ -52,6 +55,28 @@ public void testConfigLeaderElection() throws Exception {
assertFalse(election.isLeader());
}

static class TestLeaderListener implements LeaderElection.LeadershipStateListener {
AtomicBoolean flag;
TestLeaderListener(AtomicBoolean flag) {
this.flag = flag;
}
@Override
public void takeLeadership(LeaderElection election) throws Exception {
synchronized (flag) {
flag.set(true);
flag.notifyAll();
}
}

@Override
public void lossLeadership(LeaderElection election) throws Exception {
synchronized (flag) {
flag.set(false);
flag.notifyAll();
}
}
}

@Test
public void testLeaseLeaderElection() throws Exception {
Configuration configuration = MetastoreConf.newMetastoreConf();
Expand All @@ -68,56 +93,38 @@ public void testLeaseLeaderElection() throws Exception {
TableName mutex = new TableName("hive", "default", "leader_lease_ms");
LeaseLeaderElection instance1 = new LeaseLeaderElection();
AtomicBoolean flag1 = new AtomicBoolean(false);
instance1.addStateListener(new LeaderElection.LeadershipStateListener() {
@Override
public void takeLeadership(LeaderElection election) {
flag1.set(true);
}
@Override
public void lossLeadership(LeaderElection election) {
flag1.set(false);
}
});
instance1.addStateListener(new TestLeaderListener(flag1));
instance1.tryBeLeader(configuration, mutex);
// elect1 as a leader now
assertTrue(flag1.get() && instance1.isLeader());

configuration.setBoolean(LeaseLeaderElection.METASTORE_RENEW_LEASE, true);
LeaseLeaderElection instance2 = new LeaseLeaderElection();
AtomicBoolean flag2 = new AtomicBoolean(false);
instance2.addStateListener(new LeaderElection.LeadershipStateListener() {
@Override
public void takeLeadership(LeaderElection election) {
flag2.set(true);
}
@Override
public void lossLeadership(LeaderElection election) {
flag2.set(false);
}
});
instance2.addStateListener(new TestLeaderListener(flag2));
instance2.tryBeLeader(configuration, mutex);

// instance2 should not be leader as elect1 holds the lease
assertFalse(flag2.get() || instance2.isLeader());
Thread.sleep(15 * 1000);

ExecutorService service = Executors.newFixedThreadPool(4);
wait(service, flag1, flag2);
// now instance1 lease is timeout, the instance2 should be leader now
assertTrue(instance2.isLeader() && flag2.get());

assertFalse(flag1.get() || instance1.isLeader());
assertTrue(flag2.get() && instance2.isLeader());

// remove leader's lease (instance2)
long lockId2 = instance2.getLockId();
txnStore.unlock(new UnlockRequest(lockId2));
Thread.sleep(4 * 1000);
assertTrue(flag1.get() && instance1.isLeader());
wait(service, flag1, flag2);
assertFalse(flag2.get() || instance2.isLeader());
assertTrue(lockId2 > 0);
assertFalse(instance2.getLockId() == lockId2);

// remove leader's lease(instance1)
long lockId1 = instance1.getLockId();
txnStore.unlock(new UnlockRequest(lockId1));
Thread.sleep(4 * 1000);
wait(service, flag1, flag2);
assertFalse(lockId1 == instance1.getLockId());
assertTrue(lockId1 > 0);

Expand All @@ -128,4 +135,23 @@ public void lossLeadership(LeaderElection election) {
}
}

private void wait(ExecutorService service, Object... obj) throws Exception {
Future[] fs = new Future[obj.length];
for (int i = 0; i < obj.length; i++) {
Object monitor = obj[i];
fs[i] = service.submit(() -> {
try {
synchronized (monitor) {
monitor.wait();
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
}
for (Future f : fs) {
f.get();
}
}

}