Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,11 @@
import org.apache.hadoop.hive.metastore.api.LockState;
import org.apache.hadoop.hive.metastore.api.LockType;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.Callable;

Expand All @@ -46,6 +47,8 @@
/** Hive {@link CatalogLock}. */
public class HiveCatalogLock implements CatalogLock {

private static final Logger LOG = LoggerFactory.getLogger(HiveCatalogLock.class);

static final String LOCK_IDENTIFIER = "hive";

private final ClientPool<IMetaStoreClient, TException> clients;
Expand Down Expand Up @@ -77,43 +80,60 @@ private long lock(String database, String table)
new LockComponent(LockType.EXCLUSIVE, LockLevel.TABLE, database);
lockComponent.setTablename(table);
lockComponent.unsetOperationType();

long startMs = System.currentTimeMillis();
final LockRequest lockRequest =
new LockRequest(
Collections.singletonList(lockComponent),
System.getProperty("user.name"),
InetAddress.getLocalHost().getHostName());
LockResponse lockResponse = clients.run(client -> client.lock(lockRequest));
long lockId = lockResponse.getLockid();

long nextSleep = 50;
long startRetry = System.currentTimeMillis();
while (lockResponse.getState() == LockState.WAITING) {
nextSleep *= 2;
if (nextSleep > checkMaxSleep) {
nextSleep = checkMaxSleep;
}
Thread.sleep(nextSleep);

final LockResponse tempLockResponse = lockResponse;
lockResponse = clients.run(client -> client.checkLock(tempLockResponse.getLockid()));
if (System.currentTimeMillis() - startRetry > acquireTimeout) {
break;
try {
while (lockResponse.getState() == LockState.WAITING) {
long elapsed = System.currentTimeMillis() - startMs;
if (elapsed >= acquireTimeout) {
break;
}

nextSleep = Math.min(nextSleep * 2, checkMaxSleep);
Thread.sleep(nextSleep);

lockResponse = clients.run(client -> client.checkLock(lockId));
}
} finally {
if (lockResponse.getState() != LockState.ACQUIRED) {
// unlock if not acquired
unlock(lockId);
}
}
long retryDuration = System.currentTimeMillis() - startRetry;

if (lockResponse.getState() != LockState.ACQUIRED) {
if (lockResponse.getState() == LockState.WAITING) {
final LockResponse tempLockResponse = lockResponse;
clients.execute(client -> client.unlock(tempLockResponse.getLockid()));
}
throw new RuntimeException(
"Acquire lock failed with time: " + Duration.ofMillis(retryDuration));
LockState lockState = lockResponse.getState();
long duration = System.currentTimeMillis() - startMs;
String msg =
String.format(
"for table %s.%s (lockId=%d) after %dms. Final lock state: %s",
database, table, lockId, duration, lockState);
LOG.info("Acquire lock {}", msg);
if (lockState == LockState.ACQUIRED) {
return lockId;
}
return lockResponse.getLockid();

throw new RuntimeException("Acquire lock failed " + msg);
}

private void unlock(long lockId) throws TException, InterruptedException {
clients.execute(client -> client.unlock(lockId));
private void unlock(long lockId) {
if (lockId <= 0) {
return;
}
try {
clients.execute(client -> client.unlock(lockId));
} catch (Exception e) {
LOG.warn("Unlock failed for lockId={}", lockId, e);
}
}

@Override
Expand Down