HIVE-25057: Implement rollback for hive to iceberg migration (#2219) (Laszlo Pinter, reviewed by Marton Bod and Peter Vary)
lcspinter committed May 29, 2021
1 parent 8864082 commit ec7c95db5d38cbb0c21f27c830d2773ea6fde776
Showing 9 changed files with 246 additions and 12 deletions.
@@ -278,7 +278,7 @@ private void commitAndThrowException(HiveTableOperations realOperations, HiveTab
// Simulate a communication error after a successful commit
doAnswer(i -> {
org.apache.hadoop.hive.metastore.api.Table tbl =
- i.getArgumentAt(0, org.apache.hadoop.hive.metastore.api.Table.class);
+ i.getArgument(0, org.apache.hadoop.hive.metastore.api.Table.class);
realOperations.persistTable(tbl, true);
throw new TException("Datacenter on fire");
}).when(spyOperations).persistTable(any(), anyBoolean());
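
Both hunks in this test file make the same one-line change: Mockito 1.x's InvocationOnMock.getArgumentAt(int, Class) is replaced with getArgument, the form available once the poms below move from mockito 1.10.19 to 3.4.4. A minimal, self-contained sketch of the new call, assuming Mockito 3.4.4 on the classpath and using a List mock as a stand-in:

import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;

import java.util.List;

public class GetArgumentSketch {
  public static void main(String[] args) {
    @SuppressWarnings("unchecked")
    List<String> list = mock(List.class);
    doAnswer(invocation -> {
      // Mockito 1.x: invocation.getArgumentAt(0, String.class)
      String arg = invocation.getArgument(0, String.class);
      System.out.println("add() called with " + arg);
      return true; // List.add returns boolean
    }).when(list).add(anyString());
    list.add("value");
  }
}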
@@ -290,7 +290,7 @@ private void concurrentCommitAndThrowException(HiveTableOperations realOperation
// Simulate a communication error after a successful commit
doAnswer(i -> {
org.apache.hadoop.hive.metastore.api.Table tbl =
- i.getArgumentAt(0, org.apache.hadoop.hive.metastore.api.Table.class);
+ i.getArgument(0, org.apache.hadoop.hive.metastore.api.Table.class);
realOperations.persistTable(tbl, true);
// Simulate lock expiration or removal
realOperations.doUnlock(lockId.get());
@@ -91,6 +91,11 @@
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-inline</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
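
The new mockito-inline test dependency (together with the mockito-core to mockito-inline artifact swap further down) is what enables Mockito.mockStatic, which the rollback test in this commit uses to force HiveTableUtil.importFiles to fail; static mocking shipped with the inline mock maker as of Mockito 3.4.0, hence the version bumps to 3.4.4. A minimal sketch of the pattern, with java.util.UUID as a stand-in static target:

import java.util.UUID;

import org.mockito.MockedStatic;
import org.mockito.Mockito;

public class StaticMockSketch {
  public static void main(String[] args) {
    // Static stubbing only applies inside the try-with-resources scope.
    try (MockedStatic<UUID> mocked = Mockito.mockStatic(UUID.class)) {
      mocked.when(UUID::randomUUID).thenThrow(new IllegalStateException("Datacenter on fire"));
      try {
        UUID.randomUUID();
      } catch (IllegalStateException e) {
        System.out.println("caught: " + e.getMessage());
      }
    }
    System.out.println(UUID.randomUUID()); // real behavior restored outside the scope
  }
}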
@@ -26,6 +26,8 @@
import java.util.Properties;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.DefaultHiveMetaHook;
import org.apache.hadoop.hive.metastore.HiveMetaHook;
import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
@@ -45,6 +47,7 @@
import org.apache.hadoop.mapred.JobStatus;
import org.apache.hadoop.mapred.OutputCommitter;
import org.apache.iceberg.BaseMetastoreTableOperations;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.PartitionSpecParser;
@@ -163,11 +166,16 @@ public void commitCreateTable(org.apache.hadoop.hive.metastore.api.Table hmsTabl

@Override
public void preDropTable(org.apache.hadoop.hive.metastore.api.Table hmsTable) {
// do nothing
}

@Override
public void preDropTable(org.apache.hadoop.hive.metastore.api.Table hmsTable, boolean deleteData) {
this.catalogProperties = getCatalogProperties(hmsTable);
this.deleteIcebergTable = hmsTable.getParameters() != null &&
"TRUE".equalsIgnoreCase(hmsTable.getParameters().get(InputFormatConfig.EXTERNAL_TABLE_PURGE));

- if (deleteIcebergTable && Catalogs.hiveCatalog(conf, catalogProperties)) {
+ if (deleteIcebergTable && Catalogs.hiveCatalog(conf, catalogProperties) && deleteData) {
// Store the metadata and the id for deleting the actual table data
String metadataLocation = hmsTable.getParameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP);
this.deleteIo = IcebergTableUtil.getTable(conf, catalogProperties).io();
@@ -210,6 +218,7 @@ public void preAlterTable(org.apache.hadoop.hive.metastore.api.Table hmsTable, E
try {
icebergTable = IcebergTableUtil.getTable(conf, catalogProperties);
} catch (NoSuchTableException nte) {
context.getProperties().put(MIGRATE_HIVE_TO_ICEBERG, "true");
// If the Iceberg table does not exist, and the HMS table is external, not temporary and not ACID,
// we will create it in commitAlterTable
StorageDescriptor sd = hmsTable.getSd();
@@ -273,6 +282,34 @@ public void commitAlterTable(org.apache.hadoop.hive.metastore.api.Table hmsTable
}
}

@Override
public void rollbackAlterTable(org.apache.hadoop.hive.metastore.api.Table hmsTable, EnvironmentContext context)
throws MetaException {
if (Boolean.valueOf(context.getProperties().getOrDefault(MIGRATE_HIVE_TO_ICEBERG, "false"))) {
LOG.debug("Initiating rollback for table {} at location {}",
hmsTable.getTableName(), hmsTable.getSd().getLocation());
context.getProperties().put(INITIALIZE_ROLLBACK_MIGRATION, "true");
this.catalogProperties = getCatalogProperties(hmsTable);
try {
this.icebergTable = Catalogs.loadTable(conf, catalogProperties);
} catch (NoSuchTableException nte) {
// iceberg table was not yet created, no need to delete the metadata dir separately
return;
}

// we want to keep the data files but get rid of the metadata directory
String metadataLocation = ((BaseTable) this.icebergTable).operations().current().metadataFileLocation();
try {
Path path = new Path(metadataLocation).getParent();
FileSystem.get(path.toUri(), conf).delete(path, true);
LOG.debug("Metadata directory of iceberg table {} at location {} was deleted",
icebergTable.name(), path);
} catch (IOException e) {
// the metadata directory no longer exists; nothing to clean up
}
}
}

private void setFileFormat() {
String format = preAlterTableProperties.format.toLowerCase();
if (format.contains("orc")) {
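
rollbackAlterTable above is driven entirely by string flags on the EnvironmentContext: preAlterTable marks a migration as in flight, and the rollback only acts when that marker is set, raising INITIALIZE_ROLLBACK_MIGRATION so the DDL layer (last hunk of this commit) restores the original table. A minimal sketch of the handshake, assuming the hive-metastore API on the classpath; the key strings are placeholders for the HiveMetaHook constants referenced in the diff:

import org.apache.hadoop.hive.metastore.api.EnvironmentContext;

public class RollbackFlagSketch {
  // Placeholder values; the real constants are defined on HiveMetaHook.
  static final String MIGRATE_HIVE_TO_ICEBERG = "migrate_hive_to_iceberg";
  static final String INITIALIZE_ROLLBACK_MIGRATION = "initialize_rollback_migration";

  public static void main(String[] args) {
    EnvironmentContext context = new EnvironmentContext();
    // preAlterTable: record that a Hive-to-Iceberg migration is in flight.
    context.putToProperties(MIGRATE_HIVE_TO_ICEBERG, "true");

    // rollbackAlterTable: act only if the migration marker is present, then
    // signal the DDL layer to restore the original table metadata.
    if (Boolean.parseBoolean(context.getProperties().getOrDefault(MIGRATE_HIVE_TO_ICEBERG, "false"))) {
      context.putToProperties(INITIALIZE_ROLLBACK_MIGRATION, "true");
    }
    System.out.println(context.getProperties());
  }
}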
@@ -26,13 +26,20 @@
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionKey;
@@ -66,6 +73,9 @@
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.ArgumentMatchers;
import org.mockito.MockedStatic;
import org.mockito.Mockito;

import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;
@@ -201,6 +211,95 @@ public void testScanTable() throws IOException {
Assert.assertArrayEquals(new Object[] {"Alice", 0L}, descRows.get(2));
}

@Test
public void testMigrateHiveTableToIceberg() throws TException, InterruptedException {
Assume.assumeTrue(fileFormat == FileFormat.AVRO || fileFormat == FileFormat.PARQUET);
String tableName = "tbl";
String createQuery = "CREATE EXTERNAL TABLE " + tableName + " (a int) STORED AS " + fileFormat.name() + " " +
testTables.locationForCreateTableSQL(TableIdentifier.of("default", tableName));
shell.executeStatement(createQuery);
shell.executeStatement("INSERT INTO " + tableName + " VALUES (1), (2), (3)");
validateMigration(tableName);
}

@Test
public void testMigratePartitionedHiveTableToIceberg() throws TException, InterruptedException {
Assume.assumeTrue(fileFormat == FileFormat.AVRO || fileFormat == FileFormat.PARQUET);
String tableName = "tbl_part";
shell.executeStatement("CREATE EXTERNAL TABLE " + tableName + " (a int) PARTITIONED BY (b string) STORED AS " +
fileFormat.name() + " " + testTables.locationForCreateTableSQL(TableIdentifier.of("default", tableName)));
shell.executeStatement("INSERT INTO " + tableName + " PARTITION (b='aaa') VALUES (1), (2), (3)");
shell.executeStatement("INSERT INTO " + tableName + " PARTITION (b='bbb') VALUES (4), (5)");
shell.executeStatement("INSERT INTO " + tableName + " PARTITION (b='ccc') VALUES (6)");
shell.executeStatement("INSERT INTO " + tableName + " PARTITION (b='ddd') VALUES (7), (8), (9), (10)");
validateMigration(tableName);
}

@Test
public void testMigratePartitionedBucketedHiveTableToIceberg() throws TException, InterruptedException {
Assume.assumeTrue(fileFormat == FileFormat.AVRO || fileFormat == FileFormat.PARQUET);
String tableName = "tbl_part_bucketed";
shell.executeStatement("CREATE EXTERNAL TABLE " + tableName + " (a int) PARTITIONED BY (b string) clustered by " +
"(a) INTO 2 BUCKETS STORED AS " + fileFormat.name() + " " +
testTables.locationForCreateTableSQL(TableIdentifier.of("default", tableName)));
shell.executeStatement("INSERT INTO " + tableName + " PARTITION (b='aaa') VALUES (1), (2), (3)");
shell.executeStatement("INSERT INTO " + tableName + " PARTITION (b='bbb') VALUES (4), (5)");
shell.executeStatement("INSERT INTO " + tableName + " PARTITION (b='ccc') VALUES (6)");
shell.executeStatement("INSERT INTO " + tableName + " PARTITION (b='ddd') VALUES (7), (8), (9), (10)");
validateMigration(tableName);
}

@Test
public void testRollbackMigrateHiveTableToIceberg() throws TException, InterruptedException {
Assume.assumeTrue(fileFormat == FileFormat.AVRO || fileFormat == FileFormat.PARQUET);
String tableName = "tbl_rollback";
shell.executeStatement("CREATE EXTERNAL TABLE " + tableName + " (a int) STORED AS " + fileFormat.name() + " " +
testTables.locationForCreateTableSQL(TableIdentifier.of("default", tableName)));
shell.executeStatement("INSERT INTO " + tableName + " VALUES (1), (2), (3)");
validateMigrationRollback(tableName);
}

@Test
public void testRollbackMigratePartitionedHiveTableToIceberg() throws TException, InterruptedException {
Assume.assumeTrue(fileFormat == FileFormat.AVRO || fileFormat == FileFormat.PARQUET);
String tableName = "tbl_rollback";
shell.executeStatement("CREATE EXTERNAL TABLE " + tableName + " (a int) PARTITIONED BY (b string) STORED AS " +
fileFormat.name() + " " + testTables.locationForCreateTableSQL(TableIdentifier.of("default", tableName)));
shell.executeStatement("INSERT INTO " + tableName + " PARTITION (b='aaa') VALUES (1), (2), (3)");
shell.executeStatement("INSERT INTO " + tableName + " PARTITION (b='bbb') VALUES (4), (5)");
shell.executeStatement("INSERT INTO " + tableName + " PARTITION (b='ccc') VALUES (6)");
shell.executeStatement("INSERT INTO " + tableName + " PARTITION (b='ddd') VALUES (7), (8), (9), (10)");
validateMigrationRollback(tableName);
}

@Test
public void testRollbackMultiPartitionedHiveTableToIceberg() throws TException, InterruptedException {
Assume.assumeTrue(fileFormat == FileFormat.AVRO || fileFormat == FileFormat.PARQUET);
String tableName = "tbl_rollback";
shell.executeStatement("CREATE EXTERNAL TABLE " + tableName + " (a int) PARTITIONED BY (b string, c int) " +
"STORED AS " + fileFormat.name() + " " +
testTables.locationForCreateTableSQL(TableIdentifier.of("default", tableName)));
shell.executeStatement("INSERT INTO " + tableName + " PARTITION (b='aaa', c='111') VALUES (1), (2), (3)");
shell.executeStatement("INSERT INTO " + tableName + " PARTITION (b='bbb', c='111') VALUES (4), (5)");
shell.executeStatement("INSERT INTO " + tableName + " PARTITION (b='aaa', c='222') VALUES (6)");
shell.executeStatement("INSERT INTO " + tableName + " PARTITION (b='ccc', c='333') VALUES (7), (8), (9), (10)");
validateMigrationRollback(tableName);
}

@Test
public void testRollbackMigratePartitionedBucketedHiveTableToIceberg() throws TException, InterruptedException {
Assume.assumeTrue(fileFormat == FileFormat.AVRO || fileFormat == FileFormat.PARQUET);
String tableName = "tbl_part_bucketed";
shell.executeStatement("CREATE EXTERNAL TABLE " + tableName + " (a int) PARTITIONED BY (b string) clustered by " +
"(a) INTO 2 BUCKETS STORED AS " + fileFormat.name() + " " +
testTables.locationForCreateTableSQL(TableIdentifier.of("default", tableName)));
shell.executeStatement("INSERT INTO " + tableName + " PARTITION (b='aaa') VALUES (1), (2), (3)");
shell.executeStatement("INSERT INTO " + tableName + " PARTITION (b='bbb') VALUES (4), (5)");
shell.executeStatement("INSERT INTO " + tableName + " PARTITION (b='ccc') VALUES (6)");
shell.executeStatement("INSERT INTO " + tableName + " PARTITION (b='ddd') VALUES (7), (8), (9), (10)");
validateMigrationRollback(tableName);
}

@Test
public void testAnalyzeTableComputeStatistics() throws IOException, TException, InterruptedException {
String dbName = "default";
@@ -1230,4 +1329,49 @@ private void validateBasicStats(Table icebergTable, String dbName, String tableN
Assert.assertEquals(summary.get(entry.getValue()), hmsParams.get(entry.getKey()));
}
}

private void validateMigration(String tableName) throws TException, InterruptedException {
List<Object[]> originalResult = shell.executeStatement("SELECT * FROM " + tableName + " ORDER BY a");
shell.executeStatement("ALTER TABLE " + tableName + " SET TBLPROPERTIES " +
"('storage_handler'='org.apache.iceberg.mr.hive.HiveIcebergStorageHandler')");
List<Object[]> alterResult = shell.executeStatement("SELECT * FROM " + tableName + " ORDER BY a");
Assert.assertEquals(originalResult.size(), alterResult.size());
for (int i = 0; i < originalResult.size(); i++) {
Assert.assertTrue(Arrays.equals(originalResult.get(i), alterResult.get(i)));
}
validateSd(tableName, "iceberg");
}

private void validateMigrationRollback(String tableName) throws TException, InterruptedException {
List<Object[]> originalResult = shell.executeStatement("SELECT * FROM " + tableName + " ORDER BY a");
try (MockedStatic<HiveTableUtil> mockedTableUtil = Mockito.mockStatic(HiveTableUtil.class)) {
mockedTableUtil.when(() -> HiveTableUtil.importFiles(ArgumentMatchers.anyString(), ArgumentMatchers.anyString(),
ArgumentMatchers.any(PartitionSpecProxy.class), ArgumentMatchers.anyList(),
ArgumentMatchers.any(Properties.class), ArgumentMatchers.any(Configuration.class)))
.thenThrow(new MetaException());
try {
shell.executeStatement("ALTER TABLE " + tableName + " SET TBLPROPERTIES " +
"('storage_handler'='org.apache.iceberg.mr.hive.HiveIcebergStorageHandler')");
} catch (IllegalArgumentException e) {
Assert.assertTrue(e.getMessage().contains("Error occurred during hive table migration to iceberg."));
validateSd(tableName, fileFormat.name());
shell.executeStatement("MSCK REPAIR TABLE " + tableName);
List<Object[]> alterResult = shell.executeStatement("SELECT * FROM " + tableName + " ORDER BY a");
Assert.assertEquals(originalResult.size(), alterResult.size());
for (int i = 0; i < originalResult.size(); i++) {
Assert.assertTrue(Arrays.equals(originalResult.get(i), alterResult.get(i)));
}
return;
}
Assert.fail("Alter table operations should have thrown an exception.");
}
}

private void validateSd(String tableName, String format) throws TException, InterruptedException {
org.apache.hadoop.hive.metastore.api.Table hmsTable = shell.metastore().getTable("default", tableName);
StorageDescriptor sd = hmsTable.getSd();
Assert.assertTrue(sd.getSerdeInfo().getSerializationLib().toLowerCase().contains(format.toLowerCase()));
Assert.assertTrue(sd.getInputFormat().toLowerCase().contains(format.toLowerCase()));
Assert.assertTrue(sd.getOutputFormat().toLowerCase(Locale.ROOT).contains(format.toLowerCase()));
}
}
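
The try/catch-then-Assert.fail shape of validateMigrationRollback is the long-standing JUnit 4 idiom for asserting both an exception and follow-up state; if the project's JUnit is 4.13+, the same flow can be expressed with Assert.assertThrows. A minimal sketch, reusing the message asserted above:

import org.junit.Assert;

public class AssertThrowsSketch {
  public static void main(String[] args) {
    IllegalArgumentException e = Assert.assertThrows(IllegalArgumentException.class, () -> {
      throw new IllegalArgumentException("Error occurred during hive table migration to iceberg.");
    });
    // Follow-up assertions run after the exception has been captured.
    Assert.assertTrue(e.getMessage().contains("Error occurred during hive table migration to iceberg."));
  }
}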
@@ -31,7 +31,7 @@
<path.to.iceberg.root>.</path.to.iceberg.root>
<iceberg.version>0.11.0</iceberg.version>
<kryo-shaded.version>4.0.2</kryo-shaded.version>
- <iceberg.mockito-core.version>1.10.19</iceberg.mockito-core.version>
+ <iceberg.mockito-core.version>3.4.4</iceberg.mockito-core.version>
<iceberg.avro.version>1.9.2</iceberg.avro.version>
<iceberg.kryo.version>4.0.2</iceberg.kryo.version>
<iceberg.checkstyle.plugin.version>3.1.2</iceberg.checkstyle.plugin.version>
@@ -198,7 +198,7 @@
</dependency>
<dependency>
<groupId>org.mockito</groupId>
- <artifactId>mockito-core</artifactId>
+ <artifactId>mockito-inline</artifactId>
<version>${iceberg.mockito-core.version}</version>
</dependency>
</dependencies>
@@ -184,7 +184,7 @@
<postgres.version>42.2.14</postgres.version>
<opencsv.version>2.3</opencsv.version>
<orc.version>1.6.8</orc.version>
- <mockito-core.version>3.3.3</mockito-core.version>
+ <mockito-core.version>3.4.4</mockito-core.version>
<powermock.version>2.0.2</powermock.version>
<mina.version>2.0.0-M5</mina.version>
<netty.version>4.1.48.Final</netty.version>
@@ -22,7 +22,6 @@
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.common.TableName;
@@ -155,8 +154,33 @@ private void finalizeAlterTableWithWriteIdOp(Table table, Table oldTable, List<P
}
if (partitions == null) {
long writeId = desc.getWriteId() != null ? desc.getWriteId() : 0;
- context.getDb().alterTable(desc.getDbTableName(), table, desc.isCascade(), environmentContext, true,
- writeId);
+ try {
+ context.getDb().alterTable(desc.getDbTableName(), table, desc.isCascade(), environmentContext, true, writeId);
+ } catch (HiveException ex) {
+ if (Boolean.valueOf(environmentContext.getProperties()
+ .getOrDefault(HiveMetaHook.INITIALIZE_ROLLBACK_MIGRATION, "false"))) {
+ // in case of rollback of alter table do the following:
+ // 1. restore serde info and input/output format
+ // 2. remove table columns which used to be partition columns
+ // 3. add partition columns
+ table.getSd().setInputFormat(oldTable.getSd().getInputFormat());
+ table.getSd().setOutputFormat(oldTable.getSd().getOutputFormat());
+ table.getSd().setSerdeInfo(oldTable.getSd().getSerdeInfo());
+ table.getSd().getCols().removeAll(oldTable.getPartitionKeys());
+ table.setPartCols(oldTable.getPartitionKeys());
+
+ table.getParameters().clear();
+ table.getParameters().putAll(oldTable.getParameters());
+ context.getDb().alterTable(desc.getDbTableName(), table, desc.isCascade(), environmentContext, true, writeId);
+ throw new HiveException("Error occurred during hive table migration to iceberg. Table properties "
+ + "and serde info were reverted to their original values. Partition info was lost during the migration "
+ + "process, but it can be restored by running MSCK REPAIR on table/partition level.\n"
+ + "Retrying the migration without issuing MSCK REPAIR on a partitioned table will result in an empty "
+ + "iceberg table.");
+ } else {
+ throw ex;
+ }
+ }
} else {
// Note: this is necessary for UPDATE_STATISTICS command, that operates via ADDPROPS (why?).
// For any other updates, we don't want to do txn check on partitions when altering table.
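
The HiveException raised in the rollback branch above is operator-facing: by the time it surfaces, serde info, formats and table properties are already restored, and only the partition metadata needs manual recovery. A sketch of that recovery sequence in the style of the test harness used earlier in this commit (the shell type and flow mirror validateMigrationRollback; names are illustrative):

// Assumes the TestHiveShell harness from the Iceberg MR tests; sketch only.
private void recoverAfterFailedMigration(TestHiveShell shell, String tableName) {
  try {
    shell.executeStatement("ALTER TABLE " + tableName + " SET TBLPROPERTIES " +
        "('storage_handler'='org.apache.iceberg.mr.hive.HiveIcebergStorageHandler')");
  } catch (IllegalArgumentException e) {
    // Serde and table properties were already reverted by the rollback; MSCK
    // re-registers the partitions from the untouched data directories.
    shell.executeStatement("MSCK REPAIR TABLE " + tableName);
  }
}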
