Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 28 additions & 32 deletions ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -2846,36 +2846,32 @@ public static void setNonTransactional(Map<String, String> tblProps) {
}

private static boolean needsLock(Entity entity, boolean isExternalEnabled) {
switch (entity.getType()) {
case TABLE:
return isLockableTable(entity.getTable(), isExternalEnabled);
case PARTITION:
return isLockableTable(entity.getPartition().getTable(), isExternalEnabled);
default:
return true;
}
return needsLock(entity, isExternalEnabled, false);
}

private static Table getTable(WriteEntity we) {
Table t = we.getTable();
if (t == null) {
throw new IllegalStateException("No table info for " + we);
private static boolean needsLock(Entity entity, boolean isExternalEnabled, boolean isLocklessReads) {
switch (entity.getType()) {
case TABLE:
return isLockableTable(entity.getTable(), isExternalEnabled, isLocklessReads);
case PARTITION:
return isLockableTable(entity.getPartition().getTable(), isExternalEnabled, isLocklessReads);
default:
return true;
}
return t;
}

private static boolean isLockableTable(Table t, boolean isExternalEnabled) {
private static boolean isLockableTable(Table t, boolean isExternalEnabled, boolean isLocklessReads) {
if (t.isTemporary()) {
return false;
}
switch (t.getTableType()) {
case MANAGED_TABLE:
case MATERIALIZED_VIEW:
return true;
case EXTERNAL_TABLE:
return isExternalEnabled;
default:
return false;
case MANAGED_TABLE:
case MATERIALIZED_VIEW:
return !(isLocklessReads && isTransactionalTable(t));
case EXTERNAL_TABLE:
return isExternalEnabled;
default:
return false;
}
}

Expand All @@ -2890,8 +2886,10 @@ public static List<LockComponent> makeLockComponents(Set<WriteEntity> outputs, S
Context.Operation operation, HiveConf conf) {

List<LockComponent> lockComponents = new ArrayList<>();
boolean isLocklessReadsEnabled = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_ACID_LOCKLESS_READS_ENABLED);
boolean skipReadLock = !conf.getBoolVar(ConfVars.HIVE_TXN_READ_LOCKS);
boolean skipNonAcidReadLock = !conf.getBoolVar(ConfVars.HIVE_TXN_NONACID_READ_LOCKS);

boolean sharedWrite = !conf.getBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK);
boolean isExternalEnabled = conf.getBoolVar(HiveConf.ConfVars.HIVE_TXN_EXT_LOCKING_ENABLED);
boolean isMerge = operation == Context.Operation.MERGE;
Expand All @@ -2902,7 +2900,7 @@ public static List<LockComponent> makeLockComponents(Set<WriteEntity> outputs, S
.filter(input -> !input.isDummy()
&& input.needsLock()
&& !input.isUpdateOrDelete()
&& AcidUtils.needsLock(input, isExternalEnabled)
&& AcidUtils.needsLock(input, isExternalEnabled, isLocklessReadsEnabled)
&& !skipReadLock)
.collect(Collectors.toList());

Expand Down Expand Up @@ -2961,9 +2959,8 @@ public static List<LockComponent> makeLockComponents(Set<WriteEntity> outputs, S
// overwrite) than we need a shared. If it's update or delete then we
// need a SHARED_WRITE.
for (WriteEntity output : outputs) {
LOG.debug("output is null " + (output == null));
if (output.getType() == Entity.Type.DFS_DIR || output.getType() == Entity.Type.LOCAL_DIR || !AcidUtils
.needsLock(output, isExternalEnabled)) {
if (output.getType() == Entity.Type.DFS_DIR || output.getType() == Entity.Type.LOCAL_DIR
|| !AcidUtils.needsLock(output, isExternalEnabled)) {
// We don't lock files or directories. We also skip locking temp tables.
continue;
}
Expand Down Expand Up @@ -3015,7 +3012,8 @@ Seems much cleaner if each stmt is identified as a particular HiveOperation (whi
case INSERT_OVERWRITE:
assert t != null;
if (AcidUtils.isTransactionalTable(t)) {
if (conf.getBoolVar(HiveConf.ConfVars.TXN_OVERWRITE_X_LOCK) && !sharedWrite) {
if (conf.getBoolVar(HiveConf.ConfVars.TXN_OVERWRITE_X_LOCK) && !sharedWrite
&& !isLocklessReadsEnabled) {
compBuilder.setExclusive();
} else {
compBuilder.setExclWrite();
Expand All @@ -3030,18 +3028,16 @@ Seems much cleaner if each stmt is identified as a particular HiveOperation (whi
assert t != null;
if (AcidUtils.isTransactionalTable(t)) {
boolean isExclMergeInsert = conf.getBoolVar(ConfVars.TXN_MERGE_INSERT_X_LOCK) && isMerge;
compBuilder.setSharedRead();

if (sharedWrite) {
compBuilder.setSharedWrite();
} else {
if (isExclMergeInsert) {
compBuilder.setExclWrite();
} else {
if (AcidUtils.isLocklessReadsEnabled(t, conf)) {
compBuilder.setSharedWrite();
} else {
compBuilder.setSharedRead();
}

} else if (isLocklessReadsEnabled) {
compBuilder.setSharedWrite();
}
}
if (isExclMergeInsert) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3591,10 +3591,12 @@ private void testDropTable(boolean blocking) throws Exception {

driver.lockAndRespond();
List<ShowLocksResponseElement> locks = getLocks();
Assert.assertEquals("Unexpected lock count", 1, locks.size());
Assert.assertEquals("Unexpected lock count", blocking ? 1 : 0, locks.size());

checkLock(LockType.SHARED_READ,
LockState.ACQUIRED, "default", "tab_acid", null, locks);
if (blocking) {
checkLock(LockType.SHARED_READ,
LockState.ACQUIRED, "default", "tab_acid", null, locks);
}

DbTxnManager txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
swapTxnManager(txnMgr2);
Expand All @@ -3619,7 +3621,7 @@ private void testDropTable(boolean blocking) throws Exception {
}
driver2.lockAndRespond();
locks = getLocks();
Assert.assertEquals("Unexpected lock count", blocking ? 1 : 2, locks.size());
Assert.assertEquals("Unexpected lock count", 1, locks.size());

checkLock(blocking ? LockType.EXCLUSIVE : LockType.EXCL_WRITE,
LockState.ACQUIRED, "default", "tab_acid", null, locks);
Expand Down Expand Up @@ -3722,10 +3724,12 @@ private void testRenameTable(boolean blocking) throws Exception {

driver.lockAndRespond();
List<ShowLocksResponseElement> locks = getLocks();
Assert.assertEquals("Unexpected lock count", 1, locks.size());
Assert.assertEquals("Unexpected lock count", blocking ? 1 : 0, locks.size());

checkLock(LockType.SHARED_READ,
LockState.ACQUIRED, "default", "tab_acid", null, locks);
if (blocking) {
checkLock(LockType.SHARED_READ,
LockState.ACQUIRED, "default", "tab_acid", null, locks);
}

DbTxnManager txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
swapTxnManager(txnMgr2);
Expand All @@ -3750,7 +3754,7 @@ private void testRenameTable(boolean blocking) throws Exception {
}
driver2.lockAndRespond();
locks = getLocks();
Assert.assertEquals("Unexpected lock count", blocking ? 1 : 2, locks.size());
Assert.assertEquals("Unexpected lock count", 1, locks.size());

checkLock(blocking ? LockType.EXCLUSIVE : LockType.EXCL_WRITE,
LockState.ACQUIRED, "default", "tab_acid", null, locks);
Expand Down Expand Up @@ -3859,12 +3863,14 @@ private void testDropMaterializedView(boolean blocking) throws Exception {

driver.lockAndRespond();
List<ShowLocksResponseElement> locks = getLocks();
Assert.assertEquals("Unexpected lock count", 2, locks.size());
Assert.assertEquals("Unexpected lock count", blocking ? 2 : 0, locks.size());

checkLock(LockType.SHARED_READ,
LockState.ACQUIRED, "default", "tab_acid", null, locks);
checkLock(LockType.SHARED_READ,
LockState.ACQUIRED, "default", "mv_tab_acid", null, locks);
if (blocking) {
checkLock(LockType.SHARED_READ,
LockState.ACQUIRED, "default", "tab_acid", null, locks);
checkLock(LockType.SHARED_READ,
LockState.ACQUIRED, "default", "mv_tab_acid", null, locks);
}

DbTxnManager txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
swapTxnManager(txnMgr2);
Expand All @@ -3889,7 +3895,7 @@ private void testDropMaterializedView(boolean blocking) throws Exception {
}
driver2.lockAndRespond();
locks = getLocks();
Assert.assertEquals("Unexpected lock count", blocking ? 1 : 3, locks.size());
Assert.assertEquals("Unexpected lock count", 1, locks.size());

checkLock(blocking ? LockType.EXCLUSIVE : LockType.EXCL_WRITE,
LockState.ACQUIRED, "default", "mv_tab_acid", null, locks);
Expand Down Expand Up @@ -4223,10 +4229,12 @@ private void testAddColumns(boolean blocking) throws Exception {

driver.lockAndRespond();
List<ShowLocksResponseElement> locks = getLocks();
Assert.assertEquals("Unexpected lock count", 1, locks.size());
Assert.assertEquals("Unexpected lock count", blocking ? 1 : 0, locks.size());

checkLock(LockType.SHARED_READ,
LockState.ACQUIRED, "default", "tab_acid", null, locks);
if (blocking) {
checkLock(LockType.SHARED_READ,
LockState.ACQUIRED, "default", "tab_acid", null, locks);
}

DbTxnManager txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
swapTxnManager(txnMgr2);
Expand All @@ -4251,7 +4259,7 @@ private void testAddColumns(boolean blocking) throws Exception {
}
driver2.lockAndRespond();
locks = getLocks();
Assert.assertEquals("Unexpected lock count", blocking ? 1 : 2, locks.size());
Assert.assertEquals("Unexpected lock count", 1, locks.size());

checkLock(blocking ? LockType.EXCLUSIVE : LockType.EXCL_WRITE,
LockState.ACQUIRED, "default", "tab_acid", null, locks);
Expand Down Expand Up @@ -4318,10 +4326,12 @@ private void testReplaceRenameColumns(boolean blocking, String alterSubQuery) th

driver.lockAndRespond();
List<ShowLocksResponseElement> locks = getLocks();
Assert.assertEquals("Unexpected lock count", 1, locks.size());
Assert.assertEquals("Unexpected lock count", blocking ? 1 : 0, locks.size());

checkLock(LockType.SHARED_READ,
LockState.ACQUIRED, "default", "tab_acid", null, locks);
if (blocking) {
checkLock(LockType.SHARED_READ,
LockState.ACQUIRED, "default", "tab_acid", null, locks);
}

DbTxnManager txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
swapTxnManager(txnMgr2);
Expand All @@ -4346,7 +4356,7 @@ private void testReplaceRenameColumns(boolean blocking, String alterSubQuery) th
}
driver2.lockAndRespond();
locks = getLocks();
Assert.assertEquals("Unexpected lock count", blocking ? 1 : 2, locks.size());
Assert.assertEquals("Unexpected lock count", 1, locks.size());

checkLock(blocking ? LockType.EXCLUSIVE : LockType.EXCL_WRITE,
LockState.ACQUIRED, "default", "tab_acid", null, locks);
Expand Down