
HIVE-28956 : Use msdb.alterPartitions() API and implement batching for alter table add column cascade command #5814


Open: wants to merge 2 commits into base: master

vikramahuja1001
Contributor

@vikramahuja1001 vikramahuja1001 commented May 14, 2025

What changes were proposed in this pull request?

Currently, ALTER TABLE ADD COLUMN CASCADE uses the msdb.alterPartition API, which has only a JDO-based implementation. This change uses msdb.alterPartitions instead, which supports both the DirectSql and JDO implementations, and applies the partition updates in batches.
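
To illustrate the batching idea only, here is a minimal sketch. It assumes nothing about the real metastore code: `BatchingSketch` and `runInBatches` are hypothetical names, and the helper is a stand-in for illustration, not Hive's `Batchable`. The partition list is handed to a callback in consecutive chunks of at most `batchSize` elements, so each chunk could be passed to a bulk call instead of issuing one call per partition.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

class BatchingSketch {
  /**
   * Hypothetical helper (not part of Hive): passes {@code items} to {@code action}
   * in consecutive sublists of at most {@code batchSize} elements and returns
   * the number of batches that were processed.
   */
  static <T> int runInBatches(int batchSize, List<T> items, Consumer<List<T>> action) {
    if (batchSize <= 0) {
      throw new IllegalArgumentException("batchSize must be positive");
    }
    int batches = 0;
    for (int start = 0; start < items.size(); start += batchSize) {
      int end = Math.min(start + batchSize, items.size());
      // subList is a view over the original list, so no elements are copied here.
      action.accept(items.subList(start, end));
      batches++;
    }
    return batches;
  }

  public static void main(String[] args) {
    // Placeholder partition names; a real caller would pass Partition objects.
    List<String> partitions = Arrays.asList("p=1", "p=2", "p=3", "p=4", "p=5");
    List<Integer> batchSizes = new ArrayList<>();
    // The callback stands in for a bulk update of one batch of partitions.
    int batches = runInBatches(2, partitions, batch -> batchSizes.add(batch.size()));
    System.out.println(batches + " batches with sizes " + batchSizes);
  }
}
```

With a batch size of 2 and five partitions, the callback runs three times, which is the trade-off batching aims for: fewer round trips than one call per partition, without sending an unbounded list in a single call.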

Why are the changes needed?

DirectSql is more optimized than JDO, so it can improve performance for tables with a very high number of partitions and a very high number of columns.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Tested this on a 3-node cluster, on a table with 5000+ partitions and 900+ columns.

@vikramahuja1001 vikramahuja1001 changed the title HIVE-28956 : Implement DirectSql for alter table add column cascade command. HIVE-28956 : Use msdb.alterPartitions() API and implement batching for alter table add column cascade command May 22, 2025
@vikramahuja1001
Contributor Author

@deniskuzZ, I have made the necessary changes. Could you please check the PR once again?

@vikramahuja1001
Contributor Author

Cc @zhangbutao , @chinnaraolalam

msdb.alterPartition(
catName, dbname, name, part.getValues(), part, writeIdList);
} else {
String catalogName = catName;
Member

@vikramahuja1001, why do we need the new local vars? Otherwise looks good to me.

Contributor Author

These variables need to be accessed within the inner class. catName and dbname cannot be used there directly because they are reassigned earlier in the method, so I copied them into effectively final temporary variables.
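
For readers less familiar with this Java rule, a minimal sketch follows. The class and method names (`EffectivelyFinalSketch`, `qualifiedName`) are hypothetical and the lower-casing is only a stand-in for the identifier normalization; it is not the metastore code. A parameter that is reassigned is no longer effectively final, so an anonymous inner class cannot capture it, while a copy that is assigned once can be captured.

```java
import java.util.Locale;
import java.util.function.Supplier;

class EffectivelyFinalSketch {
  static String qualifiedName(String catName, String dbName) {
    // Reassigning the parameters (a stand-in for identifier normalization)
    // means catName and dbName are no longer effectively final.
    catName = catName.toLowerCase(Locale.ROOT);
    dbName = dbName.toLowerCase(Locale.ROOT);

    // Copies that are assigned exactly once can be captured by the inner class.
    final String catalogName = catName;
    final String databaseName = dbName;

    Supplier<String> task = new Supplier<String>() {
      @Override
      public String get() {
        // Referencing catName or dbName here would not compile.
        return catalogName + "." + databaseName;
      }
    };
    return task.get();
  }

  public static void main(String[] args) {
    System.out.println(qualifiedName("HIVE", "Default"));
  }
}
```

Assigning the normalized value to a new variable at the point of normalization, instead of reassigning the parameter, avoids the need for such copies later in the method.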

Member

@deniskuzZ deniskuzZ Jun 12, 2025

@vikramahuja1001, it would be cleaner to do this at the top level.
Try applying the patch below to your branch:

Subject: [PATCH] refactor
---
Index: standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java	(revision 2a5cc01ba722cd0026c774ab118f157b169ee9a4)
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java	(date 1749733306367)
@@ -103,9 +103,9 @@
       String name, Table newt, EnvironmentContext environmentContext,
       IHMSHandler handler, String writeIdList)
           throws InvalidOperationException, MetaException {
-    catName = normalizeIdentifier(catName);
-    name = normalizeIdentifier(name);
-    dbname = normalizeIdentifier(dbname);
+    String catalogName = normalizeIdentifier(catName);
+    String tableName = normalizeIdentifier(name);
+    String databaseName = normalizeIdentifier(dbname);
 
     final boolean cascade;
     final boolean replDataLocationChanged;
@@ -136,7 +136,7 @@
     // Validate bucketedColumns in new table
     List<String> bucketColumns = MetaStoreServerUtils.validateBucketColumns(newt.getSd());
     if (CollectionUtils.isNotEmpty(bucketColumns)) {
-      String errMsg = "Bucket columns - " + bucketColumns.toString() + " doesn't match with any table columns";
+      String errMsg = "Bucket columns - " + bucketColumns + " doesn't match with any table columns";
       LOG.error(errMsg);
       throw new InvalidOperationException(errMsg);
     }
@@ -162,14 +162,14 @@
       List<Partition> parts;
 
       // Switching tables between catalogs is not allowed.
-      if (!catName.equalsIgnoreCase(newt.getCatName())) {
+      if (!catalogName.equalsIgnoreCase(newt.getCatName())) {
         throw new InvalidOperationException("Tables cannot be moved between catalogs, old catalog" +
-            catName + ", new catalog " + newt.getCatName());
+            catalogName + ", new catalog " + newt.getCatName());
       }
 
       // check if table with the new name already exists
-      if (!newTblName.equals(name) || !newDbName.equals(dbname)) {
-        if (msdb.getTable(catName, newDbName, newTblName, null) != null) {
+      if (!newTblName.equals(tableName) || !newDbName.equals(databaseName)) {
+        if (msdb.getTable(catalogName, newDbName, newTblName, null) != null) {
           throw new InvalidOperationException("new table " + newDbName
               + "." + newTblName + " already exists");
         }
@@ -184,11 +184,11 @@
       msdb.openTransaction();
       // get old table
       // Note: we don't verify stats here; it's done below in alterTableUpdateTableColumnStats.
-      olddb = msdb.getDatabase(catName, dbname);
-      oldt = msdb.getTable(catName, dbname, name, null);
+      olddb = msdb.getDatabase(catalogName, databaseName);
+      oldt = msdb.getTable(catalogName, databaseName, tableName, null);
       if (oldt == null) {
         throw new InvalidOperationException("table " +
-            TableName.getQualified(catName, dbname, name) + " doesn't exist");
+            TableName.getQualified(catalogName, databaseName, tableName) + " doesn't exist");
       }
 
       if (expectedKey != null && expectedValue != null) {
@@ -226,11 +226,10 @@
       boolean partKeysPartiallyEqual = checkPartialPartKeysEqual(oldt.getPartitionKeys(),
           newt.getPartitionKeys());
 
-      if(!oldt.getTableType().equals(TableType.VIRTUAL_VIEW.toString())){
+      if (!oldt.getTableType().equals(TableType.VIRTUAL_VIEW.toString())){
         Map<String, String> properties = environmentContext.getProperties();
-        if (properties == null || (properties != null &&
-            !Boolean.parseBoolean(properties.getOrDefault(HiveMetaHook.ALLOW_PARTITION_KEY_CHANGE,
-                "false")))) {
+        if (properties == null || !Boolean.parseBoolean(properties.getOrDefault(HiveMetaHook.ALLOW_PARTITION_KEY_CHANGE,
+                "false"))) {
           if (!partKeysPartiallyEqual) {
             throw new InvalidOperationException("partition keys can not be changed.");
           }
@@ -251,7 +250,7 @@
               || StringUtils.isEmpty(newt.getSd().getLocation()))
           && (!MetaStoreUtils.isExternalTable(oldt));
 
-      Database db = msdb.getDatabase(catName, newDbName);
+      Database db = msdb.getDatabase(catalogName, newDbName);
 
       boolean renamedTranslatedToExternalTable = rename && MetaStoreUtils.isTranslatedToExternalTable(oldt)
           && MetaStoreUtils.isTranslatedToExternalTable(newt);
@@ -286,8 +285,8 @@
           // Same applies to the ACID tables suffixed with the `txnId`, case with `lockless reads`.
           String oldtRelativePath = wh.getDatabaseManagedPath(olddb).toUri()
               .relativize(srcPath.toUri()).toString();
-          boolean tableInSpecifiedLoc = !oldtRelativePath.equalsIgnoreCase(name)
-                  && !oldtRelativePath.equalsIgnoreCase(name + Path.SEPARATOR);
+          boolean tableInSpecifiedLoc = !oldtRelativePath.equalsIgnoreCase(tableName)
+                  && !oldtRelativePath.equalsIgnoreCase(tableName + Path.SEPARATOR);
 
 
           if (renamedTranslatedToExternalTable || !tableInSpecifiedLoc) {
@@ -323,7 +322,7 @@
             try {
               if (destFs.exists(destPath)) {
                 throw new InvalidOperationException("New location for this table " +
-                        TableName.getQualified(catName, newDbName, newTblName) +
+                        TableName.getQualified(catalogName, newDbName, newTblName) +
                         " already exists : " + destPath);
               }
               // check that src exists and also checks permissions necessary, rename src to dest
@@ -332,18 +331,18 @@
                 dataWasMoved = true;
               }
             } catch (IOException | MetaException e) {
-              LOG.error("Alter Table operation for " + dbname + "." + name + " failed.", e);
-              throw new InvalidOperationException("Alter Table operation for " + dbname + "." + name +
+              LOG.error("Alter Table operation for " + databaseName + "." + tableName + " failed.", e);
+              throw new InvalidOperationException("Alter Table operation for " + databaseName + "." + tableName +
                       " failed to move data due to: '" + getSimpleMessage(e)
                       + "' See hive log file for details.");
             }
 
             if (!HiveMetaStore.isRenameAllowed(olddb, db)) {
-              LOG.error("Alter Table operation for " + TableName.getQualified(catName, dbname, name) +
-                      "to new table = " + TableName.getQualified(catName, newDbName, newTblName) + " failed ");
+              LOG.error("Alter Table operation for " + TableName.getQualified(catalogName, databaseName, tableName) +
+                      "to new table = " + TableName.getQualified(catalogName, newDbName, newTblName) + " failed ");
               throw new MetaException("Alter table not allowed for table " +
-                      TableName.getQualified(catName, dbname, name) +
-                      "to new table = " + TableName.getQualified(catName, newDbName, newTblName));
+                      TableName.getQualified(catalogName, databaseName, tableName) +
+                      "to new table = " + TableName.getQualified(catalogName, newDbName, newTblName));
             }
           }
         }
@@ -353,7 +352,7 @@
           String newTblLocPath = dataWasMoved ? destPath.toUri().getPath() : null;
 
           // also the location field in partition
-          parts = msdb.getPartitions(catName, dbname, name, -1);
+          parts = msdb.getPartitions(catalogName, databaseName, tableName, -1);
           for (Partition part : parts) {
             String oldPartLoc = part.getSd().getLocation();
             if (dataWasMoved && oldPartLoc.contains(oldTblLocPath)) {
@@ -366,10 +365,10 @@
             part.setTableName(newTblName);
           }
           // Do not verify stats parameters on a partitioned table.
-          msdb.alterTable(catName, dbname, name, newt, null);
+          msdb.alterTable(catalogName, databaseName, tableName, newt, null);
           int partitionBatchSize = MetastoreConf.getIntVar(handler.getConf(),
               MetastoreConf.ConfVars.BATCH_RETRIEVE_MAX);
-          String catalogName = catName;
+
           // alterPartition is only for changing the partition location in the table rename
           if (dataWasMoved) {
             Batchable.runBatched(partitionBatchSize, parts, new Batchable<Partition, Void>() {
@@ -384,7 +383,7 @@
           }
           Deadline.checkTimeout();
         } else {
-          msdb.alterTable(catName, dbname, name, newt, writeIdList);
+          msdb.alterTable(catalogName, databaseName, tableName, newt, writeIdList);
         }
       } else {
         // operations other than table rename
@@ -408,10 +407,7 @@
 
           if (runPartitionMetadataUpdate) {
             if (cascade || retainOnColRemoval) {
-              parts = msdb.getPartitions(catName, dbname, name, -1);
-              String catalogName = catName;
-              String databaseName = dbname;
-              String tableName = name;
+              parts = msdb.getPartitions(catalogName, databaseName, tableName, -1);
               Table finalOldt = oldt;
               int partitionBatchSize = MetastoreConf.getIntVar(handler.getConf(),
                       MetastoreConf.ConfVars.BATCH_RETRIEVE_MAX);
@@ -446,17 +442,17 @@
               });
             } else {
               // clear all column stats to prevent incorract behaviour in case same column is reintroduced
-              TableName tableName = new TableName(catName, dbname, name);
-              msdb.deleteAllPartitionColumnStatistics(tableName, writeIdList);
+              msdb.deleteAllPartitionColumnStatistics(
+                  new TableName(catalogName, databaseName, tableName), writeIdList);
             }
             // Don't validate table-level stats for a partitoned table.
-            msdb.alterTable(catName, dbname, name, newt, null);
+            msdb.alterTable(catalogName, databaseName, tableName, newt, null);
           } else {
             LOG.warn("Alter table not cascaded to partitions.");
-            msdb.alterTable(catName, dbname, name, newt, writeIdList);
+            msdb.alterTable(catalogName, databaseName, tableName, newt, writeIdList);
           }
         } else {
-          msdb.alterTable(catName, dbname, name, newt, writeIdList);
+          msdb.alterTable(catalogName, databaseName, tableName, newt, writeIdList);
         }
       }
 
@@ -488,24 +484,22 @@
         // Txn was committed successfully.
         // If data location is changed in replication flow, then need to delete the old path.
         if (replDataLocationChanged) {
-          assert(olddb != null);
-          assert(oldt != null);
           Path deleteOldDataLoc = new Path(oldt.getSd().getLocation());
           boolean isSkipTrash = MetaStoreUtils.isSkipTrash(oldt.getParameters());
           try {
             wh.deleteDir(deleteOldDataLoc, true, isSkipTrash,
                     ReplChangeManager.shouldEnableCm(olddb, oldt));
             LOG.info("Deleted the old data location: {} for the table: {}",
-                    deleteOldDataLoc, dbname + "." + name);
+                    deleteOldDataLoc, databaseName + "." + tableName);
           } catch (MetaException ex) {
             // Eat the exception as it doesn't affect the state of existing tables.
             // Expect, user to manually drop this path when exception and so logging a warning.
             LOG.warn("Unable to delete the old data location: {} for the table: {}",
-                    deleteOldDataLoc, dbname + "." + name);
+                    deleteOldDataLoc, databaseName + "." + tableName);
           }
         }
       } else {
-        LOG.error("Failed to alter table " + TableName.getQualified(catName, dbname, name));
+        LOG.error("Failed to alter table " + TableName.getQualified(catalogName, databaseName, tableName));
         msdb.rollbackTransaction();
         if (!replDataLocationChanged && dataWasMoved) {
           try {
@@ -580,7 +574,7 @@
     }
 
     //alter partition
-    if (part_vals == null || part_vals.size() == 0) {
+    if (CollectionUtils.isEmpty(part_vals)) {
       try {
         msdb.openTransaction();
 
@@ -692,66 +686,64 @@
               + " Check metastore logs for detailed stack." + e.getMessage());
         }
 
-        if (destPath != null) {
-          newPartLoc = destPath.toString();
-          oldPartLoc = oldPart.getSd().getLocation();
-          LOG.info("srcPath:" + oldPartLoc);
-          LOG.info("descPath:" + newPartLoc);
-          srcPath = new Path(oldPartLoc);
-          srcFs = wh.getFs(srcPath);
-          destFs = wh.getFs(destPath);
-          // check that src and dest are on the same file system
-          if (!FileUtils.equalsFileSystem(srcFs, destFs)) {
-            throw new InvalidOperationException("New table location " + destPath
-              + " is on a different file system than the old location "
-              + srcPath + ". This operation is not supported.");
-          }
+        newPartLoc = destPath.toString();
+        oldPartLoc = oldPart.getSd().getLocation();
+        LOG.info("srcPath:" + oldPartLoc);
+        LOG.info("descPath:" + newPartLoc);
+        srcPath = new Path(oldPartLoc);
+        srcFs = wh.getFs(srcPath);
+        destFs = wh.getFs(destPath);
+        // check that src and dest are on the same file system
+        if (!FileUtils.equalsFileSystem(srcFs, destFs)) {
+          throw new InvalidOperationException("New table location " + destPath
+            + " is on a different file system than the old location "
+            + srcPath + ". This operation is not supported.");
+        }
 
-          try {
-            if (srcFs.exists(srcPath)) {
-              if (newPartLoc.compareTo(oldPartLoc) != 0 && destFs.exists(destPath)) {
-                throw new InvalidOperationException("New location for this table "
-                  + tbl.getDbName() + "." + tbl.getTableName()
-                  + " already exists : " + destPath);
-              }
-              //if destPath's parent path doesn't exist, we should mkdir it
-              Path destParentPath = destPath.getParent();
-              if (!wh.mkdirs(destParentPath)) {
-                  throw new MetaException("Unable to create path " + destParentPath);
-              }
+        try {
+          if (srcFs.exists(srcPath)) {
+            if (newPartLoc.compareTo(oldPartLoc) != 0 && destFs.exists(destPath)) {
+              throw new InvalidOperationException("New location for this table "
+                + tbl.getDbName() + "." + tbl.getTableName()
+                + " already exists : " + destPath);
+            }
+            //if destPath's parent path doesn't exist, we should mkdir it
+            Path destParentPath = destPath.getParent();
+            if (!wh.mkdirs(destParentPath)) {
+                throw new MetaException("Unable to create path " + destParentPath);
+            }
 
-              boolean clonePart = Optional.ofNullable(environmentContext)
-                  .map(EnvironmentContext::getProperties)
-                  .map(prop -> prop.get(RENAME_PARTITION_MAKE_COPY))
-                  .map(Boolean::parseBoolean)
-                  .orElse(false);
-              long writeId = new_part.getWriteId();
+            boolean clonePart = Optional.ofNullable(environmentContext)
+                .map(EnvironmentContext::getProperties)
+                .map(prop -> prop.get(RENAME_PARTITION_MAKE_COPY))
+                .map(Boolean::parseBoolean)
+                .orElse(false);
+            long writeId = new_part.getWriteId();
 
-              if (writeId > 0 && clonePart) {
-                LOG.debug("Making a copy of the partition directory: {} under a new location: {}", srcPath, destPath);
+            if (writeId > 0 && clonePart) {
+              LOG.debug("Making a copy of the partition directory: {} under a new location: {}", srcPath, destPath);
 
-                if (!wh.copyDir(srcPath, destPath, ReplChangeManager.shouldEnableCm(db, tbl))) {
-                  LOG.error("Copy failed for source: " + srcPath + " to destination: " + destPath);
-                  throw new IOException("File copy failed.");
-                }
-                addTruncateBaseFile(srcPath, writeId, conf, DataFormat.DROPPED);
-              } else {
-                //rename the data directory
-                wh.renameDir(srcPath, destPath, ReplChangeManager.shouldEnableCm(db, tbl));
-              }
-              LOG.info("Partition directory rename from " + srcPath + " to " + destPath + " done.");
-              dataWasMoved = true;
-            }
-          } catch (IOException e) {
-            LOG.error("Cannot rename partition directory from " + srcPath + " to " + destPath, e);
-            throw new InvalidOperationException("Unable to access src or dest location for partition "
-                + tbl.getDbName() + "." + tbl.getTableName() + " " + new_part.getValues());
-          } catch (MetaException me) {
-            LOG.error("Cannot rename partition directory from " + srcPath + " to " + destPath, me);
-            throw me;
-          }
-          new_part.getSd().setLocation(newPartLoc);
-        }
+              if (!wh.copyDir(srcPath, destPath, ReplChangeManager.shouldEnableCm(db, tbl))) {
+                LOG.error("Copy failed for source: " + srcPath + " to destination: " + destPath);
+                throw new IOException("File copy failed.");
+              }
+              addTruncateBaseFile(srcPath, writeId, conf, DataFormat.DROPPED);
+            } else {
+              //rename the data directory
+              wh.renameDir(srcPath, destPath, ReplChangeManager.shouldEnableCm(db, tbl));
+            }
+            LOG.info("Partition directory rename from " + srcPath + " to " + destPath + " done.");
+            dataWasMoved = true;
+          }
+        } catch (IOException e) {
+          LOG.error("Cannot rename partition directory from " + srcPath + " to " + destPath, e);
+          throw new InvalidOperationException("Unable to access src or dest location for partition "
+              + tbl.getDbName() + "." + tbl.getTableName() + " " + new_part.getValues());
+        } catch (MetaException me) {
+          LOG.error("Cannot rename partition directory from " + srcPath + " to " + destPath, me);
+          throw me;
+        }
+        new_part.getSd().setLocation(newPartLoc);
       } else {
         new_part.getSd().setLocation(oldPart.getSd().getLocation());
       }
@@ -798,10 +790,7 @@
             if (destFs.exists(destPath)) {
               wh.renameDir(destPath, srcPath, false);
             }
-          } catch (MetaException me) {
-            LOG.error("Failed to restore partition data from " + destPath + " to " + srcPath
-                +  " in alter partition failure. Manual restore is needed.");
-          } catch (IOException ioe) {
+          } catch (MetaException | IOException me) {
             LOG.error("Failed to restore partition data from " + destPath + " to " + srcPath
                 +  " in alter partition failure. Manual restore is needed.");
           }

Contributor Author

@deniskuzZ, done.
