Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
*/
public class DistributedReadWriteLock implements java.util.concurrent.locks.ReadWriteLock {

public static enum LockType {
public enum LockType {
READ, WRITE,
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.accumulo.core.data.NamespaceId;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.core.fate.zookeeper.DistributedReadWriteLock.LockType;
import org.apache.accumulo.core.manager.state.tables.TableState;
import org.apache.accumulo.manager.Manager;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -52,8 +53,8 @@ public ChangeTableState(NamespaceId namespaceId, TableId tableId, TableOperation
public long isReady(long tid, Manager env) throws Exception {
// reserve the table so that this op does not run concurrently with create, clone, or delete
// table
return Utils.reserveNamespace(env, namespaceId, tid, false, true, top)
+ Utils.reserveTable(env, tableId, tid, true, true, top);
return Utils.reserveNamespace(env, namespaceId, tid, LockType.READ, true, top)
+ Utils.reserveTable(env, tableId, tid, LockType.WRITE, true, top);
}

@Override
Expand All @@ -64,16 +65,16 @@ public Repo<Manager> call(long tid, Manager env) {
}

env.getTableManager().transitionTableState(tableId, ts, expectedCurrStates);
Utils.unreserveNamespace(env, namespaceId, tid, false);
Utils.unreserveTable(env, tableId, tid, true);
Utils.unreserveNamespace(env, namespaceId, tid, LockType.READ);
Utils.unreserveTable(env, tableId, tid, LockType.WRITE);
LoggerFactory.getLogger(ChangeTableState.class).debug("Changed table state {} {}", tableId, ts);
env.getEventCoordinator().event("Set table state of %s to %s", tableId, ts);
return null;
}

@Override
public void undo(long tid, Manager env) {
Utils.unreserveNamespace(env, namespaceId, tid, false);
Utils.unreserveTable(env, tableId, tid, true);
Utils.unreserveNamespace(env, namespaceId, tid, LockType.READ);
Utils.unreserveTable(env, tableId, tid, LockType.WRITE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,9 @@ public static <T extends AbstractId<T>> T getNextId(String name, ServerContext c
static final Lock tableNameLock = new ReentrantLock();
static final Lock idLock = new ReentrantLock();

public static long reserveTable(Manager env, TableId tableId, long tid, boolean writeLock,
public static long reserveTable(Manager env, TableId tableId, long tid, LockType lockType,
boolean tableMustExist, TableOperation op) throws Exception {
if (getLock(env.getContext(), tableId, tid, writeLock).tryLock()) {
if (getLock(env.getContext(), tableId, tid, lockType).tryLock()) {
if (tableMustExist) {
ZooReaderWriter zk = env.getContext().getZooReaderWriter();
if (!zk.exists(env.getContext().getZooKeeperRoot() + Constants.ZTABLES + "/" + tableId)) {
Expand All @@ -101,29 +101,27 @@ public static long reserveTable(Manager env, TableId tableId, long tid, boolean
}
}
log.info("table {} {} locked for {} operation: {}", tableId, FateTxId.formatTid(tid),
(writeLock ? "write" : "read"), op);
lockType, op);
return 0;
} else {
return 100;
}
}

public static void unreserveTable(Manager env, TableId tableId, long tid, boolean writeLock) {
getLock(env.getContext(), tableId, tid, writeLock).unlock();
log.info("table {} {} unlocked for {}", tableId, FateTxId.formatTid(tid),
(writeLock ? "write" : "read"));
public static void unreserveTable(Manager env, TableId tableId, long tid, LockType lockType) {
getLock(env.getContext(), tableId, tid, lockType).unlock();
log.info("table {} {} unlocked for {}", tableId, FateTxId.formatTid(tid), lockType);
}

public static void unreserveNamespace(Manager env, NamespaceId namespaceId, long id,
boolean writeLock) {
getLock(env.getContext(), namespaceId, id, writeLock).unlock();
log.info("namespace {} {} unlocked for {}", namespaceId, FateTxId.formatTid(id),
(writeLock ? "write" : "read"));
LockType lockType) {
getLock(env.getContext(), namespaceId, id, lockType).unlock();
log.info("namespace {} {} unlocked for {}", namespaceId, FateTxId.formatTid(id), lockType);
}

public static long reserveNamespace(Manager env, NamespaceId namespaceId, long id,
boolean writeLock, boolean mustExist, TableOperation op) throws Exception {
if (getLock(env.getContext(), namespaceId, id, writeLock).tryLock()) {
LockType lockType, boolean mustExist, TableOperation op) throws Exception {
if (getLock(env.getContext(), namespaceId, id, lockType).tryLock()) {
if (mustExist) {
ZooReaderWriter zk = env.getContext().getZooReaderWriter();
if (!zk.exists(
Expand All @@ -133,7 +131,7 @@ public static long reserveNamespace(Manager env, NamespaceId namespaceId, long i
}
}
log.info("namespace {} {} locked for {} operation: {}", namespaceId, FateTxId.formatTid(id),
(writeLock ? "write" : "read"), op);
lockType, op);
return 0;
} else {
return 100;
Expand Down Expand Up @@ -163,27 +161,30 @@ public static void unreserveHdfsDirectory(Manager env, String directory, long ti
}

private static Lock getLock(ServerContext context, AbstractId<?> id, long tid,
boolean writeLock) {
LockType lockType) {
byte[] lockData = FastFormat.toZeroPaddedHex(tid);
var fLockPath =
FateLock.path(context.getZooKeeperRoot() + Constants.ZTABLE_LOCKS + "/" + id.canonical());
FateLock qlock = new FateLock(context.getZooReaderWriter(), fLockPath);
DistributedLock lock = DistributedReadWriteLock.recoverLock(qlock, lockData);
if (lock != null) {

// Validate the recovered lock type
boolean isWriteLock = lock.getType() == LockType.WRITE;
if (writeLock != isWriteLock) {
if (lock.getType() != lockType) {
throw new IllegalStateException("Unexpected lock type " + lock.getType()
+ " recovered for transaction " + FateTxId.formatTid(tid) + " on object " + id
+ ". Expected " + (writeLock ? LockType.WRITE : LockType.READ) + " lock instead.");
+ ". Expected " + lockType + " lock instead.");
}
} else {
DistributedReadWriteLock locker = new DistributedReadWriteLock(qlock, lockData);
if (writeLock) {
lock = locker.writeLock();
} else {
lock = locker.readLock();
switch (lockType) {
case WRITE:
lock = locker.writeLock();
break;
case READ:
lock = locker.readLock();
break;
default:
throw new IllegalStateException("Unexpected LockType: " + lockType);
}
}
return lock;
Expand All @@ -198,7 +199,7 @@ public static Lock getTableNameLock() {
}

public static Lock getReadLock(Manager env, AbstractId<?> id, long tid) {
return Utils.getLock(env.getContext(), id, tid, false);
return Utils.getLock(env.getContext(), id, tid, LockType.READ);
}

public static void checkNamespaceDoesNotExist(ServerContext context, String namespace,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.accumulo.core.data.NamespaceId;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.core.fate.zookeeper.DistributedReadWriteLock.LockType;
import org.apache.accumulo.manager.Manager;
import org.apache.accumulo.manager.tableOps.ManagerRepo;
import org.apache.accumulo.manager.tableOps.Utils;
Expand All @@ -48,9 +49,9 @@ public CloneTable(String user, NamespaceId namespaceId, TableId srcTableId, Stri

@Override
public long isReady(long tid, Manager environment) throws Exception {
long val = Utils.reserveNamespace(environment, cloneInfo.srcNamespaceId, tid, false, true,
TableOperation.CLONE);
val += Utils.reserveTable(environment, cloneInfo.srcTableId, tid, false, true,
long val = Utils.reserveNamespace(environment, cloneInfo.srcNamespaceId, tid, LockType.READ,
true, TableOperation.CLONE);
val += Utils.reserveTable(environment, cloneInfo.srcTableId, tid, LockType.READ, true,
TableOperation.CLONE);
return val;
}
Expand All @@ -71,8 +72,8 @@ public Repo<Manager> call(long tid, Manager environment) throws Exception {

@Override
public void undo(long tid, Manager environment) {
Utils.unreserveNamespace(environment, cloneInfo.srcNamespaceId, tid, false);
Utils.unreserveTable(environment, cloneInfo.srcTableId, tid, false);
Utils.unreserveNamespace(environment, cloneInfo.srcNamespaceId, tid, LockType.READ);
Utils.unreserveTable(environment, cloneInfo.srcTableId, tid, LockType.READ);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.accumulo.core.clientImpl.Namespaces;
import org.apache.accumulo.core.clientImpl.thrift.TableOperation;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.core.fate.zookeeper.DistributedReadWriteLock.LockType;
import org.apache.accumulo.core.util.tables.TableNameUtil;
import org.apache.accumulo.manager.Manager;
import org.apache.accumulo.manager.tableOps.ManagerRepo;
Expand All @@ -45,11 +46,11 @@ public CloneZookeeper(CloneInfo cloneInfo, ClientContext context)
public long isReady(long tid, Manager environment) throws Exception {
long val = 0;
if (!cloneInfo.srcNamespaceId.equals(cloneInfo.namespaceId)) {
val += Utils.reserveNamespace(environment, cloneInfo.namespaceId, tid, false, true,
val += Utils.reserveNamespace(environment, cloneInfo.namespaceId, tid, LockType.READ, true,
TableOperation.CLONE);
}
val +=
Utils.reserveTable(environment, cloneInfo.tableId, tid, true, false, TableOperation.CLONE);
val += Utils.reserveTable(environment, cloneInfo.tableId, tid, LockType.WRITE, false,
TableOperation.CLONE);
return val;
}

Expand Down Expand Up @@ -77,9 +78,9 @@ public Repo<Manager> call(long tid, Manager environment) throws Exception {
public void undo(long tid, Manager environment) throws Exception {
environment.getTableManager().removeTable(cloneInfo.tableId);
if (!cloneInfo.srcNamespaceId.equals(cloneInfo.namespaceId)) {
Utils.unreserveNamespace(environment, cloneInfo.namespaceId, tid, false);
Utils.unreserveNamespace(environment, cloneInfo.namespaceId, tid, LockType.READ);
}
Utils.unreserveTable(environment, cloneInfo.tableId, tid, true);
Utils.unreserveTable(environment, cloneInfo.tableId, tid, LockType.WRITE);
environment.getContext().clearTableListCache();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.EnumSet;

import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.core.fate.zookeeper.DistributedReadWriteLock.LockType;
import org.apache.accumulo.core.manager.state.tables.TableState;
import org.apache.accumulo.manager.Manager;
import org.apache.accumulo.manager.tableOps.ManagerRepo;
Expand Down Expand Up @@ -58,12 +59,12 @@ public Repo<Manager> call(long tid, Manager environment) {
expectedCurrStates);
}

Utils.unreserveNamespace(environment, cloneInfo.srcNamespaceId, tid, false);
Utils.unreserveNamespace(environment, cloneInfo.srcNamespaceId, tid, LockType.READ);
if (!cloneInfo.srcNamespaceId.equals(cloneInfo.namespaceId)) {
Utils.unreserveNamespace(environment, cloneInfo.namespaceId, tid, false);
Utils.unreserveNamespace(environment, cloneInfo.namespaceId, tid, LockType.READ);
}
Utils.unreserveTable(environment, cloneInfo.srcTableId, tid, false);
Utils.unreserveTable(environment, cloneInfo.tableId, tid, true);
Utils.unreserveTable(environment, cloneInfo.srcTableId, tid, LockType.READ);
Utils.unreserveTable(environment, cloneInfo.tableId, tid, LockType.WRITE);

environment.getEventCoordinator().event("Cloned table %s from %s", cloneInfo.tableName,
cloneInfo.srcTableId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.accumulo.core.data.NamespaceId;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.core.fate.zookeeper.DistributedReadWriteLock.LockType;
import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.core.util.FastFormat;
import org.apache.accumulo.core.util.TextUtil;
Expand Down Expand Up @@ -89,8 +90,9 @@ public CompactRange(NamespaceId namespaceId, TableId tableId, CompactionConfig c

@Override
public long isReady(long tid, Manager env) throws Exception {
return Utils.reserveNamespace(env, namespaceId, tid, false, true, TableOperation.COMPACT)
+ Utils.reserveTable(env, tableId, tid, false, true, TableOperation.COMPACT);
return Utils.reserveNamespace(env, namespaceId, tid, LockType.READ, true,
TableOperation.COMPACT)
+ Utils.reserveTable(env, tableId, tid, LockType.READ, true, TableOperation.COMPACT);
}

@Override
Expand Down Expand Up @@ -179,8 +181,8 @@ public void undo(long tid, Manager env) throws Exception {
try {
removeIterators(env, tid, tableId);
} finally {
Utils.unreserveNamespace(env, namespaceId, tid, false);
Utils.unreserveTable(env, tableId, tid, false);
Utils.unreserveNamespace(env, namespaceId, tid, LockType.READ);
Utils.unreserveTable(env, tableId, tid, LockType.READ);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.fate.FateTxId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.core.fate.zookeeper.DistributedReadWriteLock.LockType;
import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.manager.Manager;
import org.apache.accumulo.manager.tableOps.ManagerRepo;
Expand All @@ -48,8 +49,9 @@ public CancelCompactions(NamespaceId namespaceId, TableId tableId) {

@Override
public long isReady(long tid, Manager env) throws Exception {
return Utils.reserveNamespace(env, namespaceId, tid, false, true, TableOperation.COMPACT_CANCEL)
+ Utils.reserveTable(env, tableId, tid, false, true, TableOperation.COMPACT_CANCEL);
return Utils.reserveNamespace(env, namespaceId, tid, LockType.READ, true,
TableOperation.COMPACT_CANCEL)
+ Utils.reserveTable(env, tableId, tid, LockType.READ, true, TableOperation.COMPACT_CANCEL);
}

@Override
Expand All @@ -60,8 +62,8 @@ public Repo<Manager> call(long tid, Manager environment) throws Exception {

@Override
public void undo(long tid, Manager env) {
Utils.unreserveTable(env, tableId, tid, false);
Utils.unreserveNamespace(env, namespaceId, tid, false);
Utils.unreserveTable(env, tableId, tid, LockType.READ);
Utils.unreserveNamespace(env, namespaceId, tid, LockType.READ);
}

public static void mutateZooKeeper(long tid, TableId tableId, Manager environment)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.accumulo.core.data.NamespaceId;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.core.fate.zookeeper.DistributedReadWriteLock.LockType;
import org.apache.accumulo.manager.Manager;
import org.apache.accumulo.manager.tableOps.ManagerRepo;
import org.apache.accumulo.manager.tableOps.Utils;
Expand All @@ -37,8 +38,8 @@ public FinishCancelCompaction(NamespaceId namespaceId, TableId tableId) {

@Override
public Repo<Manager> call(long tid, Manager environment) {
Utils.unreserveTable(environment, tableId, tid, false);
Utils.unreserveNamespace(environment, namespaceId, tid, false);
Utils.unreserveTable(environment, tableId, tid, LockType.READ);
Utils.unreserveNamespace(environment, namespaceId, tid, LockType.READ);
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.accumulo.core.data.NamespaceId;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.core.fate.zookeeper.DistributedReadWriteLock.LockType;
import org.apache.accumulo.manager.Manager;
import org.apache.accumulo.manager.tableOps.ManagerRepo;
import org.apache.accumulo.manager.tableOps.TableInfo;
Expand Down Expand Up @@ -60,7 +61,7 @@ public CreateTable(String user, String tableName, TimeType timeType, Map<String,
@Override
public long isReady(long tid, Manager environment) throws Exception {
// reserve the table's namespace to make sure it doesn't change while the table is created
return Utils.reserveNamespace(environment, tableInfo.getNamespaceId(), tid, false, true,
return Utils.reserveNamespace(environment, tableInfo.getNamespaceId(), tid, LockType.READ, true,
TableOperation.CREATE);
}

Expand Down Expand Up @@ -95,7 +96,7 @@ public void undo(long tid, Manager env) throws IOException {
} catch (IOException e) {
log.error("Table failed to be created and failed to clean up split files at {}", p, e);
} finally {
Utils.unreserveNamespace(env, tableInfo.getNamespaceId(), tid, false);
Utils.unreserveNamespace(env, tableInfo.getNamespaceId(), tid, LockType.READ);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import org.apache.accumulo.core.client.admin.InitialTableState;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.core.fate.zookeeper.DistributedReadWriteLock.LockType;
import org.apache.accumulo.core.manager.state.tables.TableState;
import org.apache.accumulo.manager.Manager;
import org.apache.accumulo.manager.tableOps.ManagerRepo;
Expand Down Expand Up @@ -61,8 +62,8 @@ public Repo<Manager> call(long tid, Manager env) throws Exception {
TableState.ONLINE, expectedCurrStates);
}

Utils.unreserveNamespace(env, tableInfo.getNamespaceId(), tid, false);
Utils.unreserveTable(env, tableInfo.getTableId(), tid, true);
Utils.unreserveNamespace(env, tableInfo.getNamespaceId(), tid, LockType.READ);
Utils.unreserveTable(env, tableInfo.getTableId(), tid, LockType.WRITE);

env.getEventCoordinator().event("Created table %s ", tableInfo.getTableName());

Expand Down
Loading