moved txnList supplier to queryState object
deniskuzZ committed May 24, 2024
1 parent fd92849 commit 9251359
Showing 38 changed files with 187 additions and 260 deletions.
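In short: the supplier that opens a transaction and produces the valid-transaction list is now registered once on the QueryState object (queryState.setValidTxnList(this::openTxnAndGetValidTxnList) in Compiler.java) instead of being pushed into each BaseSemanticAnalyzer, and the compaction-specific plumbing (extra Driver constructors plus the compactionWriteIds/compactorTxnId/analyzeTableWriteId fields on DriverContext) is dropped. A minimal, standalone sketch of that pattern follows; the classes and names below are simplified stand-ins for illustration, not the real Hive types.

import java.util.function.Supplier;

/**
 * Minimal sketch of the pattern in this commit, using simplified stand-in
 * classes rather than the real Hive types: the query-level state object owns
 * a supplier of the valid-transaction list, and consumers pull it lazily from
 * there instead of having the supplier injected into each semantic analyzer.
 */
public class ValidTxnListSupplierSketch {

  /** Stand-in for the QueryState object (assumed shape, not Hive's class). */
  static class QueryStateSketch {
    private Supplier<String> validTxnListSupplier;
    private String cachedValidTxnList;

    /** The compiler registers how a valid txn list can be obtained. */
    void setValidTxnList(Supplier<String> supplier) {
      this.validTxnListSupplier = supplier;
    }

    /** Consumers resolve it lazily, opening a transaction only when needed. */
    String getValidTxnList() {
      if (cachedValidTxnList == null && validTxnListSupplier != null) {
        cachedValidTxnList = validTxnListSupplier.get();
      }
      return cachedValidTxnList;
    }
  }

  public static void main(String[] args) {
    QueryStateSketch queryState = new QueryStateSketch();

    // Analogue of Compiler.analyze(): register the supplier up front ...
    queryState.setValidTxnList(ValidTxnListSupplierSketch::openTxnAndGetValidTxnList);

    // ... and only code paths that actually need the list resolve it.
    System.out.println("valid txns: " + queryState.getValidTxnList());
  }

  /** Stand-in for Compiler#openTxnAndGetValidTxnList (simplified). */
  private static String openTxnAndGetValidTxnList() {
    // In Hive this opens a transaction via the txn manager and writes the
    // ValidTxnList into the conf; here we just return a placeholder string.
    return "9:9223372036854775807::";
  }
}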
@@ -3768,7 +3768,7 @@ public void testEventsDumpedCountWithFilteringOfOpenTransactions() throws Throwa
);

WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName)
.run("create table t1 (id int)")
.run("create table t1 (id int) stored as orc tblproperties (\"transactional\"=\"true\")")
.run("insert into table t1 values (1)")
.dump(primaryDbName, incrementalBatchConfigs);

@@ -147,7 +147,7 @@ private void testRebalanceCompactionWithParallelDeleteAsSecond(boolean optimisti
}
aborted = true;
Assert.assertEquals(LockException.class, e.getCause().getClass());
Assert.assertEquals( "Transaction manager has aborted the transaction txnid:19. Reason: Aborting [txnid:19,19] due to a write conflict on default/rebalance_test committed by [txnid:18,19] d/u", e.getCauseMessage());
Assert.assertEquals( "Transaction manager has aborted the transaction txnid:20. Reason: Aborting [txnid:20,20] due to a write conflict on default/rebalance_test committed by [txnid:19,20] d/u", e.getCauseMessage());
// Delete the record, so the rest of the test can be the same in both cases
executeStatementOnDriver("DELETE FROM " + tableName + " WHERE b = 12", driver);
} finally {
@@ -197,7 +197,7 @@ private void testRebalanceCompactionWithParallelDeleteAsSecond(boolean optimisti
},
};
verifyRebalance(testDataProvider, tableName, null, expectedBuckets,
new String[] {"bucket_00000", "bucket_00001", "bucket_00002", "bucket_00003"}, "base_0000007_v0000018");
new String[] {"bucket_00000", "bucket_00001", "bucket_00002", "bucket_00003"}, "base_0000007_v0000019");
}

@Test
@@ -253,7 +253,7 @@ public void testRebalanceCompactionOfNotPartitionedImplicitlyBucketedTableWithOr
},
};
verifyRebalance(testDataProvider, tableName, null, expectedBuckets,
new String[] {"bucket_00000", "bucket_00001", "bucket_00002","bucket_00003"}, "base_0000007_v0000018");
new String[] {"bucket_00000", "bucket_00001", "bucket_00002","bucket_00003"}, "base_0000007_v0000019");
}

@Test
@@ -307,7 +307,7 @@ public void testRebalanceCompactionOfNotPartitionedImplicitlyBucketedTable() thr
},
};
verifyRebalance(testDataProvider, tableName, null, expectedBuckets,
new String[] {"bucket_00000", "bucket_00001", "bucket_00002","bucket_00003"}, "base_0000007_v0000018");
new String[] {"bucket_00000", "bucket_00001", "bucket_00002","bucket_00003"}, "base_0000007_v0000019");
}

@Test
@@ -416,7 +416,7 @@ public void testRebalanceCompactionOfPartitionedImplicitlyBucketedTable() throws
},
};
verifyRebalance(testDataProvider, tableName, "ds=tomorrow", expectedBuckets,
new String[] {"bucket_00000", "bucket_00001", "bucket_00002"}, "base_0000007_v0000014");
new String[] {"bucket_00000", "bucket_00001", "bucket_00002"}, "base_0000007_v0000015");
}

@Test
@@ -490,7 +490,7 @@ public void testRebalanceCompactionNotPartitionedExplicitBucketNumbers() throws
},
};
verifyRebalance(testDataProvider, tableName, null, expectedBuckets,
new String[] {"bucket_00000", "bucket_00001", "bucket_00002", "bucket_00003"}, "base_0000007_v0000018");
new String[] {"bucket_00000", "bucket_00001", "bucket_00002", "bucket_00003"}, "base_0000007_v0000019");
}

private TestDataProvider prepareRebalanceTestData(String tableName) throws Exception {
@@ -2305,7 +2305,7 @@ public void testCompactionWithSchemaEvolutionNoBucketsMultipleReducers() throws
}

@Test public void testMajorCompactionDb() throws Exception {
testCompactionDb(CompactionType.MAJOR, "base_0000005_v0000008");
testCompactionDb(CompactionType.MAJOR, "base_0000005_v0000009");
}

@Test public void testMinorCompactionDb() throws Exception {
@@ -2764,12 +2764,12 @@ public boolean run(CompactorContext context) throws IOException {
hiveConf.set(ValidTxnList.VALID_TXNS_KEY, "8:9223372036854775807::");

// Check for default case.
qc.runCompactionQueries(hiveConf, null, sdMock, null, ciMock, null, emptyQueries, emptyQueries, emptyQueries, null);
qc.runCompactionQueries(hiveConf, null, ciMock, null, emptyQueries, emptyQueries, emptyQueries, null);
Assert.assertEquals("all", hiveConf.getVar(HiveConf.ConfVars.LLAP_IO_ETL_SKIP_FORMAT));

// Check for case where hive.llap.io.etl.skip.format is explicitly set to none - as to always use cache.
hiveConf.setVar(HiveConf.ConfVars.LLAP_IO_ETL_SKIP_FORMAT, "none");
qc.runCompactionQueries(hiveConf, null, sdMock, null, ciMock, null, emptyQueries, emptyQueries, emptyQueries, null);
qc.runCompactionQueries(hiveConf, null, ciMock, null, emptyQueries, emptyQueries, emptyQueries, null);
Assert.assertEquals("none", hiveConf.getVar(HiveConf.ConfVars.LLAP_IO_ETL_SKIP_FORMAT));
}

@@ -54,6 +54,7 @@ public static void beforeTest() throws Exception {
TestTxnDbUtil.prepDb(conf);
miniHS2 = new MiniHS2(conf, MiniHS2.MiniClusterType.TEZ);
confOverlay.put(ConfVars.HIVE_SERVER2_ENABLE_DOAS.varname, "false");
confOverlay.put(ConfVars.CREATE_TABLES_AS_ACID.varname, "true");
miniHS2.start(confOverlay);
}

@@ -79,7 +80,7 @@ public void testCancelOperation() throws Exception {
CLIServiceClient serviceClient = miniHS2.getServiceClient();
SessionHandle sessHandle = serviceClient.openSession("foo", "bar");
serviceClient.executeStatement(sessHandle, "DROP TABLE IF EXISTS " + tableName, confOverlay);
serviceClient.executeStatement(sessHandle, "CREATE TABLE " + tableName + " (id INT)", confOverlay);
serviceClient.executeStatement(sessHandle, "CREATE TABLE " + tableName + " (id INT) STORED AS ORC", confOverlay);
serviceClient.executeStatement(sessHandle, "insert into " + tableName + " values(5)", confOverlay);

serviceClient.executeStatement(sessHandle,
@@ -110,7 +111,7 @@ public void testAsyncConcurrent() throws Exception {
CLIServiceClient serviceClient = miniHS2.getServiceClient();
SessionHandle sessHandle = serviceClient.openSession("foo", "bar");
serviceClient.executeStatement(sessHandle, "DROP TABLE IF EXISTS " + tableName, confOverlay);
serviceClient.executeStatement(sessHandle, "CREATE TABLE " + tableName + " (id INT)", confOverlay);
serviceClient.executeStatement(sessHandle, "CREATE TABLE " + tableName + " (id INT) STORED AS ORC", confOverlay);
serviceClient.executeStatement(sessHandle, "insert into " + tableName + " values(5)", confOverlay);
OperationHandle opHandle =
serviceClient.executeStatementAsync(sessHandle, "select * from " + tableName, confOverlay);
@@ -128,7 +129,6 @@ public void testAsyncConcurrent() throws Exception {
assertOperationFinished(serviceClient, opHandle2);
rowSet = serviceClient.fetchResults(opHandle2);
assertEquals(1, rowSet.numRows());
serviceClient.executeStatement(sessHandle, "DROP TABLE IF EXISTS " + tableName, confOverlay);
serviceClient.closeSession(sessHandle);
}

38 changes: 18 additions & 20 deletions ql/src/java/org/apache/hadoop/hive/ql/Compiler.java
@@ -186,27 +186,26 @@ private BaseSemanticAnalyzer analyze() throws Exception {
Hive.get().getMSC().flushCache();

boolean executeHooks = driverContext.getHookRunner().hasPreAnalyzeHooks();
QueryState queryState = driverContext.getQueryState();

HiveSemanticAnalyzerHookContext hookCtx = new HiveSemanticAnalyzerHookContextImpl();
if (executeHooks) {
hookCtx.setConf(driverContext.getConf());
hookCtx.setUserName(SessionState.get().getUserName());
hookCtx.setIpAddress(SessionState.get().getUserIpAddress());
hookCtx.setCommand(context.getCmd());
hookCtx.setHiveOperation(driverContext.getQueryState().getHiveOperation());
hookCtx.setHiveOperation(queryState.getHiveOperation());

tree = driverContext.getHookRunner().runPreAnalyzeHooks(hookCtx, tree);
}
queryState.setValidTxnList(this::openTxnAndGetValidTxnList);

// SemanticAnalyzerFactory also sets the hive operation in query state
BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(driverContext.getQueryState(), tree);

HiveOperation hiveOperation = driverContext.getQueryState().getHiveOperation();
sem.setValidTxnList(this::openTxnAndGetValidTxnList);

if (!driverContext.isRetrial() && driverContext.getCompactorTxnId() <= 0) {
if (HiveOperation.REPLDUMP == hiveOperation) {
setLastReplIdForDump(driverContext.getQueryState().getConf());
BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(queryState, tree);

if (!driverContext.isRetrial()) {
if (HiveOperation.REPLDUMP == queryState.getHiveOperation()) {
setLastReplIdForDump(queryState.getConf());
}
driverContext.setValidTxnListsGenerated(false);
}
@@ -234,10 +233,11 @@ private BaseSemanticAnalyzer analyze() throws Exception {
// validate the plan
sem.validate();

if (sem.hasAcidResourcesInQuery() || HiveOperation.START_TRANSACTION == hiveOperation) {
if (HiveOperation.START_TRANSACTION == queryState.getHiveOperation()
|| sem.hasAcidResourcesInQuery()) {
openTxnAndGetValidTxnList();
}
verifyTxnState(hiveOperation);
verifyTxnState(queryState.getHiveOperation());

perfLogger.perfLogEnd(CLASS_NAME, PerfLogger.ANALYZE);
return sem;
@@ -260,10 +260,11 @@ private void setLastReplIdForDump(HiveConf conf) throws HiveException, TExceptio
}

private String openTxnAndGetValidTxnList() {
if (!driverContext.isRetrial() && driverContext.getCompactorTxnId() <= 0) {
if (!driverContext.isRetrial() && !SessionState.get().isCompaction()) {
HiveTxnManager txnMgr = driverContext.getTxnManager();
try {
openTransaction(getTxnMgr());
generateValidTxnList(getTxnMgr());
openTransaction(txnMgr);
generateValidTxnList(txnMgr);
} catch (Exception e) {
throw new RuntimeException("Failed to open a new transaction", e);
}
@@ -323,7 +324,7 @@ private boolean startImplicitTxn() {

private void verifyTxnState(HiveOperation operation) throws LockException {
if (operation != null && operation.isRequiresOpenTransaction()
&& !getTxnMgr().isTxnOpen()) {
&& !driverContext.getTxnManager().isTxnOpen()) {
throw new LockException(null, ErrorMsg.OP_NOT_ALLOWED_WITHOUT_TXN, operation.getOperationName());
}
}
@@ -339,17 +340,14 @@ private void generateValidTxnList(HiveTxnManager txnMgr) throws LockException {
ValidTxnList txnList = txnMgr.getValidTxns();
// Write the current set of valid transactions into the conf file
LOG.debug("Encoding valid txns info " + txnList.toString() + " txnid:" + txnMgr.getCurrentTxnId());
driverContext.setValidTxnList(txnList.toString());
driverContext.getConf().set(ValidTxnList.VALID_TXNS_KEY, txnList.toString());
driverContext.setValidTxnListsGenerated(true);
} catch (LockException e) {
LOG.error("Exception while acquiring valid txn list", e);
throw e;
}
}
}

private HiveTxnManager getTxnMgr(){
return driverContext.getTxnManager();
}

private QueryPlan createPlan(BaseSemanticAnalyzer sem) {
// get the output schema
19 changes: 2 additions & 17 deletions ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -34,7 +34,6 @@
import org.apache.hadoop.hive.metastore.api.Schema;
import org.apache.hadoop.hive.ql.cache.results.CacheUsage;
import org.apache.hadoop.hive.ql.cache.results.QueryResultsCache;
import org.apache.hadoop.hive.ql.ddl.DDLDesc;
import org.apache.hadoop.hive.ql.exec.ExplainTask;
import org.apache.hadoop.hive.ql.exec.FetchTask;
import org.apache.hadoop.hive.ql.exec.Task;
@@ -107,17 +106,6 @@ public Driver(QueryState queryState, QueryInfo queryInfo) {
this(queryState, queryInfo, null);
}

public Driver(QueryState queryState, ValidWriteIdList compactionWriteIds, long compactorTxnId) {
this(queryState);
driverContext.setCompactionWriteIds(compactionWriteIds);
driverContext.setCompactorTxnId(compactorTxnId);
}

public Driver(QueryState queryState, long analyzeTableWriteId) {
this(queryState);
driverContext.setAnalyzeTableWriteId(analyzeTableWriteId);
}

public Driver(QueryState queryState, QueryInfo queryInfo, HiveTxnManager txnManager) {
driverContext = new DriverContext(queryState, queryInfo, new HookRunner(queryState.getConf(), CONSOLE),
txnManager);
@@ -282,11 +270,8 @@ private boolean validateTxnList() throws CommandProcessorException {
driverContext.setRetrial(true);
driverContext.getConf().set(ValidTxnList.VALID_TXNS_KEY,
driverContext.getTxnManager().getValidTxns().toString());

DDLDesc.DDLDescWithWriteId acidDdlDesc = driverContext.getPlan().getAcidDdlDesc();
boolean hasAcidDdl = acidDdlDesc != null && acidDdlDesc.mayNeedWriteId();

if (driverContext.getPlan().hasReadWriteAcidInQuery() || hasAcidDdl) {

if (driverContext.getPlan().hasAcidResourcesInQuery()) {
compileInternal(context.getCmd(), true);
driverTxnHandler.recordValidWriteIds();
driverTxnHandler.setWriteIdForAcidFileSinks();
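The deleted Driver constructors and the changed guard in Compiler#openTxnAndGetValidTxnList (driverContext.getCompactorTxnId() <= 0 replaced by !SessionState.get().isCompaction()) point at the same idea: compactor-issued queries no longer thread their transaction state through constructor overloads but are recognized from the session, so the compile path simply skips opening a new transaction for them. A hedged, hypothetical sketch of that flow follows; the names and the session flag below are stand-ins, not the Hive API.

/**
 * Hypothetical sketch (stand-in names, not the real Hive classes) of why the
 * extra Driver constructors could go away: the compaction worker marks its
 * session, and the compile path consults that flag before opening a txn.
 */
public class CompactionFlagSketch {

  /** Stand-in for a SessionState#isCompaction-style flag (assumed semantics). */
  static final ThreadLocal<Boolean> COMPACTION = ThreadLocal.withInitial(() -> false);

  static void compileQuery(boolean retrial) {
    // Mirrors the updated check in Compiler#openTxnAndGetValidTxnList: skip
    // opening a txn when retrying or when running inside the compactor,
    // which already owns a transaction.
    if (!retrial && !COMPACTION.get()) {
      System.out.println("opening a new transaction for this query");
    } else {
      System.out.println("reusing the caller's transaction");
    }
  }

  public static void main(String[] args) {
    compileQuery(false);          // regular user query -> opens a txn

    COMPACTION.set(true);         // compactor marks its session
    try {
      compileQuery(false);        // compactor-issued query -> no new txn
    } finally {
      COMPACTION.remove();
    }
  }
}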
32 changes: 0 additions & 32 deletions ql/src/java/org/apache/hadoop/hive/ql/DriverContext.java
@@ -69,9 +69,6 @@ public class DriverContext {

private CacheUsage cacheUsage;
private CacheEntry usedCacheEntry;
private ValidWriteIdList compactionWriteIds = null;
private long compactorTxnId = 0;
private long analyzeTableWriteId = 0;

private boolean retrial = false;

@@ -185,11 +182,6 @@ public void setValidTxnListsGenerated(boolean validTxnListsGenerated) {
this.validTxnListsGenerated = validTxnListsGenerated;
}

public void setValidTxnList(String txnList) {
conf.set(ValidTxnList.VALID_TXNS_KEY, txnList);
this.validTxnListsGenerated = true;
}

public CacheUsage getCacheUsage() {
return cacheUsage;
}
@@ -206,30 +198,6 @@ public void setUsedCacheEntry(CacheEntry usedCacheEntry) {
this.usedCacheEntry = usedCacheEntry;
}

public ValidWriteIdList getCompactionWriteIds() {
return compactionWriteIds;
}

public void setCompactionWriteIds(ValidWriteIdList compactionWriteIds) {
this.compactionWriteIds = compactionWriteIds;
}

public long getCompactorTxnId() {
return compactorTxnId;
}

public void setCompactorTxnId(long compactorTxnId) {
this.compactorTxnId = compactorTxnId;
}

public long getAnalyzeTableWriteId() {
return analyzeTableWriteId;
}

public void setAnalyzeTableWriteId(long analyzeTableWriteId) {
this.analyzeTableWriteId = analyzeTableWriteId;
}

public boolean isRetrial() {
return retrial;
}