-
Notifications
You must be signed in to change notification settings - Fork 4.8k
HIVE-28916: Fix unbalannced calls in ObjectStore rollback transaction #5780
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -914,23 +914,24 @@ public void create_catalog(CreateCatalogRequest rqst) | |
throw new InvalidObjectException("You must specify a path for the catalog"); | ||
} | ||
|
||
RawStore ms = getMS(); | ||
Path catPath = new Path(catalog.getLocationUri()); | ||
boolean madeDir = false; | ||
Map<String, String> transactionalListenersResponses = Collections.emptyMap(); | ||
try { | ||
firePreEvent(new PreCreateCatalogEvent(this, catalog)); | ||
if (!wh.isDir(catPath)) { | ||
if (!wh.mkdirs(catPath)) { | ||
throw new MetaException("Unable to create catalog path " + catPath + | ||
", failed to create catalog " + catalog.getName()); | ||
} | ||
madeDir = true; | ||
firePreEvent(new PreCreateCatalogEvent(this, catalog)); | ||
if (!wh.isDir(catPath)) { | ||
if (!wh.mkdirs(catPath)) { | ||
throw new MetaException("Unable to create catalog path " + catPath + | ||
", failed to create catalog " + catalog.getName()); | ||
} | ||
madeDir = true; | ||
} | ||
|
||
RawStore ms = getMS(); | ||
try { | ||
ms.openTransaction(); | ||
// set the create time of catalog | ||
long time = System.currentTimeMillis() / 1000; | ||
catalog.setCreateTime((int) time); | ||
ms.openTransaction(); | ||
ms.createCatalog(catalog); | ||
|
||
// Create a default database inside the catalog | ||
|
@@ -964,7 +965,6 @@ public void create_catalog(CreateCatalogRequest rqst) | |
transactionalListenersResponses, ms); | ||
} | ||
} | ||
success = true; | ||
} catch (AlreadyExistsException|InvalidObjectException|MetaException e) { | ||
ex = e; | ||
throw e; | ||
|
@@ -983,12 +983,11 @@ public void alter_catalog(AlterCatalogRequest rqst) throws TException { | |
GetCatalogResponse oldCat = null; | ||
|
||
try { | ||
ms.openTransaction(); | ||
oldCat = get_catalog(new GetCatalogRequest(rqst.getName())); | ||
// Above should have thrown NoSuchObjectException if there is no such catalog | ||
assert oldCat != null && oldCat.getCatalog() != null; | ||
firePreEvent(new PreAlterCatalogEvent(oldCat.getCatalog(), rqst.getNewCat(), this)); | ||
|
||
ms.openTransaction(); | ||
ms.alterCatalog(rqst.getName(), rqst.getNewCat()); | ||
|
||
if (!transactionalListeners.isEmpty()) { | ||
|
@@ -1134,8 +1133,6 @@ private void dropCatalogCore(String catName, boolean ifExists) | |
} catch (NoSuchObjectException e) { | ||
if (!ifExists) { | ||
throw new NoSuchObjectException(e.getMessage()); | ||
} else { | ||
ms.rollbackTransaction(); | ||
} | ||
} finally { | ||
if (success) { | ||
|
@@ -1213,6 +1210,7 @@ private void create_database_core(RawStore ms, final Database db) | |
boolean madeExternalDir = false; | ||
boolean isReplicated = isDbReplicationTarget(db); | ||
Map<String, String> transactionalListenersResponses = Collections.emptyMap(); | ||
boolean openTransaction = false; | ||
try { | ||
firePreEvent(new PreCreateDatabaseEvent(db, this)); | ||
//reinstate location uri for metastore db. | ||
|
@@ -1286,7 +1284,7 @@ private void create_database_core(RawStore ms, final Database db) | |
} | ||
} | ||
|
||
ms.openTransaction(); | ||
openTransaction = ms.openTransaction(); | ||
ms.createDatabase(db); | ||
|
||
if (!transactionalListeners.isEmpty()) { | ||
|
@@ -1299,7 +1297,9 @@ private void create_database_core(RawStore ms, final Database db) | |
success = ms.commitTransaction(); | ||
} finally { | ||
if (!success) { | ||
ms.rollbackTransaction(); | ||
if (openTransaction) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why are you checking for an
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. won't |
||
ms.rollbackTransaction(); | ||
} | ||
|
||
if (db.getCatalogName() != null && !db.getCatalogName(). | ||
equals(Warehouse.DEFAULT_CATALOG_NAME)) { | ||
|
@@ -1928,9 +1928,8 @@ private void create_dataconnector_core(RawStore ms, final DataConnector connecto | |
boolean success = false; | ||
Map<String, String> transactionalListenersResponses = Collections.emptyMap(); | ||
try { | ||
firePreEvent(new PreCreateDataConnectorEvent(connector, this)); | ||
|
||
ms.openTransaction(); | ||
firePreEvent(new PreCreateDataConnectorEvent(connector, this)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why the change? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ditto, There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What is the point? We open a txn, throw an exception in |
||
ms.createDataConnector(connector); | ||
|
||
if (!transactionalListeners.isEmpty()) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -141,6 +141,22 @@ public void alterTable(RawStore msdb, Warehouse wh, String catName, String dbnam | |
throw new InvalidOperationException(errMsg); | ||
} | ||
|
||
// Switching tables between catalogs is not allowed. | ||
if (!catName.equalsIgnoreCase(newt.getCatName())) { | ||
throw new InvalidOperationException("Tables cannot be moved between catalogs, old catalog" + | ||
catName + ", new catalog " + newt.getCatName()); | ||
} | ||
|
||
boolean rename = false; | ||
// check if table with the new name already exists | ||
if (!newTblName.equals(name) || !newDbName.equals(dbname)) { | ||
if (msdb.getTable(catName, newDbName, newTblName, null) != null) { | ||
throw new InvalidOperationException("new table " + newDbName | ||
+ "." + newTblName + " already exists"); | ||
} | ||
rename = true; | ||
} | ||
|
||
Path srcPath = null; | ||
FileSystem srcFs; | ||
Path destPath = null; | ||
|
@@ -158,29 +174,6 @@ public void alterTable(RawStore msdb, Warehouse wh, String catName, String dbnam | |
Map<String, String> txnAlterTableEventResponses = Collections.emptyMap(); | ||
|
||
try { | ||
boolean rename = false; | ||
List<Partition> parts; | ||
|
||
// Switching tables between catalogs is not allowed. | ||
if (!catName.equalsIgnoreCase(newt.getCatName())) { | ||
throw new InvalidOperationException("Tables cannot be moved between catalogs, old catalog" + | ||
catName + ", new catalog " + newt.getCatName()); | ||
} | ||
|
||
// check if table with the new name already exists | ||
if (!newTblName.equals(name) || !newDbName.equals(dbname)) { | ||
if (msdb.getTable(catName, newDbName, newTblName, null) != null) { | ||
throw new InvalidOperationException("new table " + newDbName | ||
+ "." + newTblName + " already exists"); | ||
} | ||
rename = true; | ||
} | ||
|
||
String expectedKey = environmentContext != null && environmentContext.getProperties() != null ? | ||
environmentContext.getProperties().get(hive_metastoreConstants.EXPECTED_PARAMETER_KEY) : null; | ||
String expectedValue = environmentContext != null && environmentContext.getProperties() != null ? | ||
environmentContext.getProperties().get(hive_metastoreConstants.EXPECTED_PARAMETER_VALUE) : null; | ||
|
||
msdb.openTransaction(); | ||
// get old table | ||
// Note: we don't verify stats here; it's done below in alterTableUpdateTableColumnStats. | ||
|
@@ -191,6 +184,10 @@ public void alterTable(RawStore msdb, Warehouse wh, String catName, String dbnam | |
TableName.getQualified(catName, dbname, name) + " doesn't exist"); | ||
} | ||
|
||
String expectedKey = environmentContext != null && environmentContext.getProperties() != null ? | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why move non-transactional logic under the txn boundary? |
||
environmentContext.getProperties().get(hive_metastoreConstants.EXPECTED_PARAMETER_KEY) : null; | ||
String expectedValue = environmentContext != null && environmentContext.getProperties() != null ? | ||
environmentContext.getProperties().get(hive_metastoreConstants.EXPECTED_PARAMETER_VALUE) : null; | ||
if (expectedKey != null && expectedValue != null) { | ||
String newValue = newt.getParameters().get(expectedKey); | ||
if (newValue == null) { | ||
|
@@ -263,6 +260,7 @@ public void alterTable(RawStore msdb, Warehouse wh, String catName, String dbnam | |
List<ColumnStatistics> columnStatistics = getColumnStats(msdb, oldt); | ||
columnStatistics = deleteTableColumnStats(msdb, oldt, newt, columnStatistics); | ||
|
||
List<Partition> parts; | ||
if (!isRenameIcebergTable && | ||
(replDataLocationChanged || renamedManagedTable || renamedTranslatedToExternalTable || | ||
renamedExternalTable)) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -561,17 +561,10 @@ public void shutdown() { | |
@Override | ||
public boolean openTransaction() { | ||
openTrasactionCalls++; | ||
if (openTrasactionCalls == 1) { | ||
if (currentTransaction == null || !currentTransaction.isActive()){ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do we check for There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To open transaction if it's not opened. It would reopen a new transaction if has already called rollback. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if the interior txn was rolled back, should we rollback everything? |
||
currentTransaction = pm.currentTransaction(); | ||
currentTransaction.begin(); | ||
transactionStatus = TXN_STATUS.OPEN; | ||
} else { | ||
// openTransactionCalls > 1 means this is an interior transaction | ||
// We should already have a transaction created that is active. | ||
if ((currentTransaction == null) || (!currentTransaction.isActive())){ | ||
throw new RuntimeException("openTransaction called in an interior" | ||
+ " transaction scope, but currentTransaction is not active."); | ||
} | ||
} | ||
|
||
boolean result = currentTransaction.isActive(); | ||
|
@@ -629,13 +622,13 @@ public boolean commitTransaction() { | |
LOG.error("Unbalanced calls to open/commit Transaction", e); | ||
throw e; | ||
} | ||
openTrasactionCalls--; | ||
debugLog("Commit transaction: count = " + openTrasactionCalls + ", isactive "+ currentTransaction.isActive()); | ||
|
||
if ((openTrasactionCalls == 0) && currentTransaction.isActive()) { | ||
debugLog("Commit transaction: count = " + openTrasactionCalls + ", isactive "+ currentTransaction.isActive()); | ||
if ((openTrasactionCalls == 1) && currentTransaction.isActive()) { | ||
transactionStatus = TXN_STATUS.COMMITED; | ||
currentTransaction.commit(); | ||
} | ||
openTrasactionCalls--; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what if commit throws RuntimeException, db txn would be closed, but openTrasactionCalls won't be 0 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If commit throw RuntimeException, it should call rollback in the finally block of previous layer where the openTransactionCalls would be decreased. |
||
return true; | ||
} | ||
|
||
|
@@ -662,7 +655,7 @@ public void rollbackTransaction() { | |
currentTransaction.rollback(); | ||
} | ||
} finally { | ||
openTrasactionCalls = 0; | ||
openTrasactionCalls--; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. after rollback openTrasactionCalls should be 0 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Our unit test could not pass if is 0 here, the key point seems that if we should support such usage: openTransaction()
// new function1
{
openTransaction()
commitTransaction() # place1
}
// new function2
{
openTransaction()
commitTransaction()
}
commitTransaction() # place2 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. that is what we support now, isn't ? the inner transactions are somewhat similar to @transactional(REQUIRED) annotation |
||
transactionStatus = TXN_STATUS.ROLLBACK; | ||
// remove all detached objects from the cache, since the transaction is | ||
// being rolled back they are no longer relevant, and this prevents them | ||
|
@@ -3254,31 +3247,38 @@ protected List<Partition> getJdoResult(GetHelper<List<Partition>> ctx) throws Me | |
public Partition getPartitionWithAuth(String catName, String dbName, String tblName, | ||
List<String> partVals, String user_name, List<String> group_names) | ||
throws NoSuchObjectException, MetaException, InvalidObjectException { | ||
boolean success = false; | ||
try { | ||
openTransaction(); | ||
MPartition mpart = getMPartition(catName, dbName, tblName, partVals, null); | ||
if (mpart == null) { | ||
commitTransaction(); | ||
throw new NoSuchObjectException("partition values=" | ||
+ partVals.toString()); | ||
return new GetHelper<Partition>(catName, dbName, tblName, false, true) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why refactor? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It may throw exception, so we need such logic: try {
openTransaction();
success = commitTransaction();
} finally {
if (!success) rollbackTransaction();
} There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. does it even run in txn? where do we open it? |
||
@Override | ||
protected String describeResult() { | ||
return "partition with auth"; | ||
} | ||
MTable mtbl = mpart.getTable(); | ||
|
||
Partition part = convertToPart(catName, dbName, tblName, mpart, TxnUtils.isAcidTable(mtbl.getParameters())); | ||
if ("TRUE".equalsIgnoreCase(mtbl.getParameters().get("PARTITION_LEVEL_PRIVILEGE"))) { | ||
String partName = Warehouse.makePartName(this.convertToFieldSchemas(mtbl | ||
.getPartitionKeys()), partVals); | ||
PrincipalPrivilegeSet partAuth = this.getPartitionPrivilegeSet(catName, dbName, | ||
tblName, partName, user_name, group_names); | ||
part.setPrivileges(partAuth); | ||
@Override | ||
protected Partition getSqlResult(GetHelper<Partition> ctx) throws MetaException { | ||
throw new UnsupportedOperationException("UnsupportedOperationException"); | ||
} | ||
|
||
success = commitTransaction(); | ||
return part; | ||
} finally { | ||
rollbackAndCleanup(success, null); | ||
} | ||
@Override | ||
protected Partition getJdoResult(GetHelper<Partition> ctx) | ||
throws MetaException, NoSuchObjectException, InvalidObjectException, InvalidInputException { | ||
MPartition mpart = getMPartition(catName, dbName, tblName, partVals, null); | ||
if (mpart == null) { | ||
throw new NoSuchObjectException("partition values=" | ||
+ partVals.toString()); | ||
} | ||
MTable mtbl = mpart.getTable(); | ||
|
||
Partition part = convertToPart(catName, dbName, tblName, mpart, TxnUtils.isAcidTable(mtbl.getParameters())); | ||
if ("TRUE".equalsIgnoreCase(mtbl.getParameters().get("PARTITION_LEVEL_PRIVILEGE"))) { | ||
String partName = Warehouse.makePartName(convertToFieldSchemas(mtbl | ||
.getPartitionKeys()), partVals); | ||
PrincipalPrivilegeSet partAuth = getPartitionPrivilegeSet(catName, dbName, | ||
tblName, partName, user_name, group_names); | ||
part.setPrivileges(partAuth); | ||
} | ||
return part; | ||
} | ||
}.run(false); | ||
} | ||
|
||
private List<Partition> convertToParts(String catName, String dbName, String tblName, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we need to open txn before the
PreAlterCatalogEvent
? are those transactional as well?