Skip to content

Commit

Permalink
HIVE-26252: Missing locks in case of MERGE with multiple branches (De…
Browse files Browse the repository at this point in the history
…nys Kuzmenko, reviewed by Peter Vary)

Closes #3308
  • Loading branch information
deniskuzZ committed May 24, 2022
1 parent 35d4532 commit 2500b2d
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 2 deletions.
Expand Up @@ -2396,6 +2396,89 @@ private void testConcurrent2MergeUpdatesConflict(boolean slowCompile) throws Exc
Assert.assertEquals("Lost Update", "[earl\t10, amy\t10]", res.toString());
}

@Test
public void testMergeMultipleBranchesOptimistic() throws Exception {
dropTable(new String[]{"target", "src1", "src2"});
conf.setBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK, false);
conf.setBoolVar(HiveConf.ConfVars.TXN_MERGE_INSERT_X_LOCK, false);

driver2.getConf().setBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK, false);
driver2.getConf().setBoolVar(HiveConf.ConfVars.TXN_MERGE_INSERT_X_LOCK, false);

driver.run("create table target (id int, txt string) stored as orc TBLPROPERTIES('transactional'='true')");
driver.run("insert into target values " +
"('0', 'orig_FyZl'), " +
"(5, 'orig_VsbLsaG'), " +
"(10, 'orig_dhhCassOoV')");

driver.run("create table src1 (id int, txt string) stored as orc TBLPROPERTIES('transactional'='true')");
driver.run("insert into src1 values " +
"(0, 'new1_tnlGat'), " +
"(1, 'new1_KulBf'), " +
"(2, 'new1_zkLGuU'), " +
"(3, 'new1_jznZVac')," +
"(4, 'new1_hdyazJXL')," +
"(5, 'new1_gxclXFtP')," +
"(6, 'new1_CNZr')," +
"(7, 'new1_GoBjjuow')," +
"(8, 'new1_vRfY')," +
"(9, 'new1_bdnQA')," +
"(10, 'new1_FNboL')");

driver.run("create table src2 (id int, txt string) stored as orc TBLPROPERTIES('transactional'='true')");
driver.run("insert into src2 values " +
"(0, 'new2_Cjdj'), " +
"(1, 'new2_GysxGF'), " +
"(2, 'new2_ToHyf'), " +
"(3, 'new2_HZjkahVJ')," +
"(4, 'new2_qcySYYUul')," +
"(5, 'new2_FupKyDcVcJ')," +
"(6, 'new2_DAcCwakVr')," +
"(7, 'new2_nZozPAZKI')," +
"(8, 'new2_bjdEmdRp')," +
"(9, 'new2_PkRAwdJeLX')," +
"(10, 'new2_aGSuZHx')");

driver.compileAndRespond("MERGE INTO target t USING src1 s ON t.id = s.id " +
"WHEN MATCHED THEN UPDATE SET txt = CONCAT_WS(' ',t.txt,s.txt) " +
"WHEN NOT MATCHED THEN INSERT VALUES (s.id, s.txt)");

DbTxnManager txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
swapTxnManager(txnMgr2);
driver2.compileAndRespond("MERGE INTO target t USING src2 s ON t.id = s.id " +
"WHEN MATCHED THEN UPDATE SET txt = CONCAT_WS(' ',t.txt,s.txt) " +
"WHEN NOT MATCHED THEN INSERT VALUES (s.id, s.txt)");

swapTxnManager(txnMgr);
driver.run();

swapTxnManager(txnMgr2);
try {
driver2.run();
} catch (Exception ex) {
Assert.assertTrue(ex.getCause() instanceof LockException);
Assert.assertTrue(ex.getMessage().matches(".*Aborting .* due to a write conflict on default/target.*"));
}
swapTxnManager(txnMgr);
driver.run("select * from target order by id");

List<String> res = new ArrayList<>();
driver.getFetchTask().fetch(res);
Assert.assertEquals(11, res.size());
Assert.assertEquals(
"[0\torig_FyZl new1_tnlGat, " +
"1\tnew1_KulBf, " +
"2\tnew1_zkLGuU, " +
"3\tnew1_jznZVac, " +
"4\tnew1_hdyazJXL, " +
"5\torig_VsbLsaG new1_gxclXFtP, " +
"6\tnew1_CNZr, " +
"7\tnew1_GoBjjuow, " +
"8\tnew1_vRfY, " +
"9\tnew1_bdnQA, " +
"10\torig_dhhCassOoV new1_FNboL]", res.toString());
}

@Test
public void testConcurrent2InsertOverwritesDiffPartitions() throws Exception {
testConcurrent2InsertOverwrites(false);
Expand Down
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hive.metastore;

import org.apache.hadoop.hive.metastore.api.DataOperationType;
import org.apache.hadoop.hive.metastore.api.LockComponent;
import org.apache.hadoop.hive.metastore.api.LockRequest;

Expand Down Expand Up @@ -168,8 +169,9 @@ private void setPart(LockComponent comp, PartTrie parts) {
if (existing == null) {
// No existing lock for this partition.
parts.put(comp.getPartitionname(), comp);
} else if (lockTypeComparator.compare(comp.getType(), existing.getType()) > 0) {
// We only need to promote if comp.type is > existing.type.
} else if (lockTypeComparator.compare(comp.getType(), existing.getType()) > 0
|| comp.getType() == existing.getType() && existing.getOperationType() == DataOperationType.INSERT) {
// We only need to promote if comp.type is > existing.type or it's an update/delete
parts.put(comp.getPartitionname(), comp);
}
}
Expand Down

0 comments on commit 2500b2d

Please sign in to comment.