Skip to content

HIVE-28916: Fix unbalanced calls in ObjectStore rollback transaction #5780

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -914,23 +914,24 @@ public void create_catalog(CreateCatalogRequest rqst)
throw new InvalidObjectException("You must specify a path for the catalog");
}

RawStore ms = getMS();
Path catPath = new Path(catalog.getLocationUri());
boolean madeDir = false;
Map<String, String> transactionalListenersResponses = Collections.emptyMap();
try {
firePreEvent(new PreCreateCatalogEvent(this, catalog));
if (!wh.isDir(catPath)) {
if (!wh.mkdirs(catPath)) {
throw new MetaException("Unable to create catalog path " + catPath +
", failed to create catalog " + catalog.getName());
}
madeDir = true;
firePreEvent(new PreCreateCatalogEvent(this, catalog));
if (!wh.isDir(catPath)) {
if (!wh.mkdirs(catPath)) {
throw new MetaException("Unable to create catalog path " + catPath +
", failed to create catalog " + catalog.getName());
}
madeDir = true;
}

RawStore ms = getMS();
try {
ms.openTransaction();
// set the create time of catalog
long time = System.currentTimeMillis() / 1000;
catalog.setCreateTime((int) time);
ms.openTransaction();
ms.createCatalog(catalog);

// Create a default database inside the catalog
Expand Down Expand Up @@ -964,7 +965,6 @@ public void create_catalog(CreateCatalogRequest rqst)
transactionalListenersResponses, ms);
}
}
success = true;
} catch (AlreadyExistsException|InvalidObjectException|MetaException e) {
ex = e;
throw e;
Expand All @@ -983,12 +983,11 @@ public void alter_catalog(AlterCatalogRequest rqst) throws TException {
GetCatalogResponse oldCat = null;

try {
ms.openTransaction();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need to open txn before the PreAlterCatalogEvent? are those transactional as well?

oldCat = get_catalog(new GetCatalogRequest(rqst.getName()));
// Above should have thrown NoSuchObjectException if there is no such catalog
assert oldCat != null && oldCat.getCatalog() != null;
firePreEvent(new PreAlterCatalogEvent(oldCat.getCatalog(), rqst.getNewCat(), this));

ms.openTransaction();
ms.alterCatalog(rqst.getName(), rqst.getNewCat());

if (!transactionalListeners.isEmpty()) {
Expand Down Expand Up @@ -1134,8 +1133,6 @@ private void dropCatalogCore(String catName, boolean ifExists)
} catch (NoSuchObjectException e) {
if (!ifExists) {
throw new NoSuchObjectException(e.getMessage());
} else {
ms.rollbackTransaction();
}
} finally {
if (success) {
Expand Down Expand Up @@ -1213,6 +1210,7 @@ private void create_database_core(RawStore ms, final Database db)
boolean madeExternalDir = false;
boolean isReplicated = isDbReplicationTarget(db);
Map<String, String> transactionalListenersResponses = Collections.emptyMap();
boolean openTransaction = false;
try {
firePreEvent(new PreCreateDatabaseEvent(db, this));
//reinstate location uri for metastore db.
Expand Down Expand Up @@ -1286,7 +1284,7 @@ private void create_database_core(RawStore ms, final Database db)
}
}

ms.openTransaction();
openTransaction = ms.openTransaction();
ms.createDatabase(db);

if (!transactionalListeners.isEmpty()) {
Expand All @@ -1299,7 +1297,9 @@ private void create_database_core(RawStore ms, final Database db)
success = ms.commitTransaction();
} finally {
if (!success) {
ms.rollbackTransaction();
if (openTransaction) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are you checking for an openTransaction flag here? I don't see the same check in other places.

if (isActiveTransaction() && transactionStatus != TXN_STATUS.ROLLBACK) {
    currentTransaction.rollback();
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

create_database_core may throw an exception before openTransaction() is called, in which case there is no need to call rollbackTransaction() in the finally block.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

won't isActiveTransaction() return false?

ms.rollbackTransaction();
}

if (db.getCatalogName() != null && !db.getCatalogName().
equals(Warehouse.DEFAULT_CATALOG_NAME)) {
Expand Down Expand Up @@ -1928,9 +1928,8 @@ private void create_dataconnector_core(RawStore ms, final DataConnector connecto
boolean success = false;
Map<String, String> transactionalListenersResponses = Collections.emptyMap();
try {
firePreEvent(new PreCreateDataConnectorEvent(connector, this));

ms.openTransaction();
firePreEvent(new PreCreateDataConnectorEvent(connector, this));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why the change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto: firePreEvent may throw an exception, in which case openTransaction() is never called.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the point? We open a txn, throw an exception in firePreEvent, and then abort txn?

ms.createDataConnector(connector);

if (!transactionalListeners.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,22 @@ public void alterTable(RawStore msdb, Warehouse wh, String catName, String dbnam
throw new InvalidOperationException(errMsg);
}

// Switching tables between catalogs is not allowed.
if (!catName.equalsIgnoreCase(newt.getCatName())) {
throw new InvalidOperationException("Tables cannot be moved between catalogs, old catalog" +
catName + ", new catalog " + newt.getCatName());
}

boolean rename = false;
// check if table with the new name already exists
if (!newTblName.equals(name) || !newDbName.equals(dbname)) {
if (msdb.getTable(catName, newDbName, newTblName, null) != null) {
throw new InvalidOperationException("new table " + newDbName
+ "." + newTblName + " already exists");
}
rename = true;
}

Path srcPath = null;
FileSystem srcFs;
Path destPath = null;
Expand All @@ -158,29 +174,6 @@ public void alterTable(RawStore msdb, Warehouse wh, String catName, String dbnam
Map<String, String> txnAlterTableEventResponses = Collections.emptyMap();

try {
boolean rename = false;
List<Partition> parts;

// Switching tables between catalogs is not allowed.
if (!catName.equalsIgnoreCase(newt.getCatName())) {
throw new InvalidOperationException("Tables cannot be moved between catalogs, old catalog" +
catName + ", new catalog " + newt.getCatName());
}

// check if table with the new name already exists
if (!newTblName.equals(name) || !newDbName.equals(dbname)) {
if (msdb.getTable(catName, newDbName, newTblName, null) != null) {
throw new InvalidOperationException("new table " + newDbName
+ "." + newTblName + " already exists");
}
rename = true;
}

String expectedKey = environmentContext != null && environmentContext.getProperties() != null ?
environmentContext.getProperties().get(hive_metastoreConstants.EXPECTED_PARAMETER_KEY) : null;
String expectedValue = environmentContext != null && environmentContext.getProperties() != null ?
environmentContext.getProperties().get(hive_metastoreConstants.EXPECTED_PARAMETER_VALUE) : null;

msdb.openTransaction();
// get old table
// Note: we don't verify stats here; it's done below in alterTableUpdateTableColumnStats.
Expand All @@ -191,6 +184,10 @@ public void alterTable(RawStore msdb, Warehouse wh, String catName, String dbnam
TableName.getQualified(catName, dbname, name) + " doesn't exist");
}

String expectedKey = environmentContext != null && environmentContext.getProperties() != null ?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why move non-transactional logic under the txn boundary?

environmentContext.getProperties().get(hive_metastoreConstants.EXPECTED_PARAMETER_KEY) : null;
String expectedValue = environmentContext != null && environmentContext.getProperties() != null ?
environmentContext.getProperties().get(hive_metastoreConstants.EXPECTED_PARAMETER_VALUE) : null;
if (expectedKey != null && expectedValue != null) {
String newValue = newt.getParameters().get(expectedKey);
if (newValue == null) {
Expand Down Expand Up @@ -263,6 +260,7 @@ public void alterTable(RawStore msdb, Warehouse wh, String catName, String dbnam
List<ColumnStatistics> columnStatistics = getColumnStats(msdb, oldt);
columnStatistics = deleteTableColumnStats(msdb, oldt, newt, columnStatistics);

List<Partition> parts;
if (!isRenameIcebergTable &&
(replDataLocationChanged || renamedManagedTable || renamedTranslatedToExternalTable ||
renamedExternalTable)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -561,17 +561,10 @@ public void shutdown() {
@Override
public boolean openTransaction() {
openTrasactionCalls++;
if (openTrasactionCalls == 1) {
if (currentTransaction == null || !currentTransaction.isActive()){
Copy link
Member

@deniskuzZ deniskuzZ May 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we check for !currentTransaction.isActive()? What if we called rollback on the interior txn?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To open a transaction if one is not already open. It will also open a new transaction if rollback has already been called.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if the interior txn was rolled back, should we rollback everything?

currentTransaction = pm.currentTransaction();
currentTransaction.begin();
transactionStatus = TXN_STATUS.OPEN;
} else {
// openTransactionCalls > 1 means this is an interior transaction
// We should already have a transaction created that is active.
if ((currentTransaction == null) || (!currentTransaction.isActive())){
throw new RuntimeException("openTransaction called in an interior"
+ " transaction scope, but currentTransaction is not active.");
}
}

boolean result = currentTransaction.isActive();
Expand Down Expand Up @@ -629,13 +622,13 @@ public boolean commitTransaction() {
LOG.error("Unbalanced calls to open/commit Transaction", e);
throw e;
}
openTrasactionCalls--;
debugLog("Commit transaction: count = " + openTrasactionCalls + ", isactive "+ currentTransaction.isActive());

if ((openTrasactionCalls == 0) && currentTransaction.isActive()) {
debugLog("Commit transaction: count = " + openTrasactionCalls + ", isactive "+ currentTransaction.isActive());
if ((openTrasactionCalls == 1) && currentTransaction.isActive()) {
transactionStatus = TXN_STATUS.COMMITED;
currentTransaction.commit();
}
openTrasactionCalls--;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what if commit throws RuntimeException, db txn would be closed, but openTrasactionCalls won't be 0

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If commit throws a RuntimeException, rollback should be called in the finally block of the enclosing layer, where openTransactionCalls would be decremented.

return true;
}

Expand All @@ -662,7 +655,7 @@ public void rollbackTransaction() {
currentTransaction.rollback();
}
} finally {
openTrasactionCalls = 0;
openTrasactionCalls--;
Copy link
Member

@deniskuzZ deniskuzZ May 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

after rollback openTrasactionCalls should be 0

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Our unit test does not pass if the counter is set to 0 here. The key question seems to be whether we should support usage such as:

openTransaction()
  // new function1
  {
    openTransaction()
    commitTransaction() # place1
  }

  // new function2
  {
    openTransaction()
    commitTransaction()
  }
commitTransaction() # place2

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is what we support now, isn't it? The inner transactions are somewhat similar to the @transactional(REQUIRED) annotation.

transactionStatus = TXN_STATUS.ROLLBACK;
// remove all detached objects from the cache, since the transaction is
// being rolled back they are no longer relevant, and this prevents them
Expand Down Expand Up @@ -3254,31 +3247,38 @@ protected List<Partition> getJdoResult(GetHelper<List<Partition>> ctx) throws Me
public Partition getPartitionWithAuth(String catName, String dbName, String tblName,
List<String> partVals, String user_name, List<String> group_names)
throws NoSuchObjectException, MetaException, InvalidObjectException {
boolean success = false;
try {
openTransaction();
MPartition mpart = getMPartition(catName, dbName, tblName, partVals, null);
if (mpart == null) {
commitTransaction();
throw new NoSuchObjectException("partition values="
+ partVals.toString());
return new GetHelper<Partition>(catName, dbName, tblName, false, true) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why refactor?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It may throw exception, so we need such logic:

try {
  openTransaction();

  success = commitTransaction();
} finally {
  if (!success) rollbackTransaction();
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does it even run in txn? where do we open it?

@Override
protected String describeResult() {
return "partition with auth";
}
MTable mtbl = mpart.getTable();

Partition part = convertToPart(catName, dbName, tblName, mpart, TxnUtils.isAcidTable(mtbl.getParameters()));
if ("TRUE".equalsIgnoreCase(mtbl.getParameters().get("PARTITION_LEVEL_PRIVILEGE"))) {
String partName = Warehouse.makePartName(this.convertToFieldSchemas(mtbl
.getPartitionKeys()), partVals);
PrincipalPrivilegeSet partAuth = this.getPartitionPrivilegeSet(catName, dbName,
tblName, partName, user_name, group_names);
part.setPrivileges(partAuth);
@Override
protected Partition getSqlResult(GetHelper<Partition> ctx) throws MetaException {
throw new UnsupportedOperationException("UnsupportedOperationException");
}

success = commitTransaction();
return part;
} finally {
rollbackAndCleanup(success, null);
}
@Override
protected Partition getJdoResult(GetHelper<Partition> ctx)
throws MetaException, NoSuchObjectException, InvalidObjectException, InvalidInputException {
MPartition mpart = getMPartition(catName, dbName, tblName, partVals, null);
if (mpart == null) {
throw new NoSuchObjectException("partition values="
+ partVals.toString());
}
MTable mtbl = mpart.getTable();

Partition part = convertToPart(catName, dbName, tblName, mpart, TxnUtils.isAcidTable(mtbl.getParameters()));
if ("TRUE".equalsIgnoreCase(mtbl.getParameters().get("PARTITION_LEVEL_PRIVILEGE"))) {
String partName = Warehouse.makePartName(convertToFieldSchemas(mtbl
.getPartitionKeys()), partVals);
PrincipalPrivilegeSet partAuth = getPartitionPrivilegeSet(catName, dbName,
tblName, partName, user_name, group_names);
part.setPrivileges(partAuth);
}
return part;
}
}.run(false);
}

private List<Partition> convertToParts(String catName, String dbName, String tblName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1842,6 +1842,58 @@ protected Object getJdoResult(ObjectStore.GetHelper ctx) throws MetaException, N
}
}

/**
 * Exercises open/commit calls that are nested inside an outer transaction when an inner
 * operation fails part-way through.
 *
 * <p>Flow of the test as written:
 * <ol>
 *   <li>Create a partitioned table and check that it has 3 partitions.</li>
 *   <li>Open an outer transaction, then run a {@code GetHelper} whose {@code getSqlResult}
 *       drops the three partitions, checks the count is 0, and then always throws a
 *       {@code MetaException}.</li>
 *   <li>In the {@code catch} block, check that the partition count is 3 again.</li>
 *   <li>Open a second transaction level, drop one partition, and commit twice (once for the
 *       inner level, once for the outer one).</li>
 *   <li>Check that 2 partitions remain.</li>
 * </ol>
 *
 * @throws Exception if any store operation fails unexpectedly
 */
@Test
public void testNestedTransaction() throws Exception {
// Names of the three partitions that the helper below drops.
List<String> partNames = Arrays.asList("test_part_col=a0", "test_part_col=a1", "test_part_col=a2");
createPartitionedTable(true, false);
Assert.assertEquals(3, objectStore.getPartitionCount());

// Outer transaction level; it is paired with the last commitTransaction() call in this method.
objectStore.openTransaction();
try {
objectStore.new GetHelper<Object>(DEFAULT_CATALOG_NAME, DB1, TABLE1, true, false) {
@Override
protected String describeResult() {
return "Test nested transaction";
}
// Drops every partition in partNames, verifies none are left, then unconditionally throws
// so that the helper run ends in failure after the store has been modified.
@Override
protected Object getSqlResult(ObjectStore.GetHelper<Object> ctx) throws MetaException {
try (AutoCloseable c = deadline()) {
objectStore.dropPartitionsInternal(ctx.catName, ctx.dbName, ctx.tblName, partNames, true,
false);
Assert.assertEquals(0, objectStore.getPartitionCount());
} catch (Exception e) {
throw new MetaException(e.getMessage());
}
throw new MetaException("fallback it");
}

// Not expected to be used by this test; throws if it is ever invoked.
@Override
protected Object getJdoResult(ObjectStore.GetHelper<Object> ctx)
throws MetaException, NoSuchObjectException, InvalidObjectException, InvalidInputException {
throw new UnsupportedOperationException("Unimplemented method 'getJdoResult'");
}
}.run(false);
} catch (MetaException e) {
// Expected path: the drop performed inside the helper must no longer be visible, so all
// three partitions are counted again.
// NOTE(review): if run(false) returns without throwing, this assertion is skipped and the
// test simply continues -- consider failing explicitly after run(false).
// NOTE(review): presumably the helper rolls the transaction back when getSqlResult throws;
// confirm against GetHelper.run.
Assert.assertEquals(3, objectStore.getPartitionCount());
}

// A new open/commit pair issued after the failed helper run, while the outer level opened at
// the top of the test has not been committed yet.
{
objectStore.openTransaction();
try (AutoCloseable c = deadline()) {
objectStore.dropPartitionsInternal(DEFAULT_CATALOG_NAME, DB1, TABLE1,
Arrays.asList("test_part_col=a1"), true, true);
} catch (Exception e) {
throw new MetaException(e.getMessage());
}
objectStore.commitTransaction();
}
// Commit for the outer level opened before the helper run.
objectStore.commitTransaction();

// Only the single partition dropped in the second block should be gone.
Assert.assertEquals(2, objectStore.getPartitionCount());
}

/**
* Helper method to check whether the Java system properties were set correctly in {@link ObjectStore#configureSSL(Configuration)}
* @param useSSL whether or not SSL is enabled
Expand Down
Loading