Skip to content
Permalink
Browse files
HIVE-26228: Implement Iceberg table rollback feature (#3287) (Laszlo Pinter, reviewed by Adam Szita and Peter Vary)
  • Loading branch information
lcspinter committed Jun 3, 2022
1 parent 79f2a22 commit d237a307286ee0431a9a9ad148896041b5a13ebd
Show file tree
Hide file tree
Showing 24 changed files with 595 additions and 58 deletions.
@@ -26,4 +26,4 @@ key int It is a column key
value string It is the column string value

#### A masked pattern was here ####
FAILED: SemanticException [Error 10134]: ALTER TABLE can only be used for [ADDPROPS, DROPPROPS, ADDCOLS] to a non-native table hbase_table_1
FAILED: SemanticException [Error 10134]: ALTER TABLE can only be used for [ADDPROPS, DROPPROPS, ADDCOLS, EXECUTE] to a non-native table hbase_table_1
@@ -108,7 +108,7 @@ public class HiveIcebergMetaHook implements HiveMetaHook {
static final EnumSet<AlterTableType> SUPPORTED_ALTER_OPS = EnumSet.of(
AlterTableType.ADDCOLS, AlterTableType.REPLACE_COLUMNS, AlterTableType.RENAME_COLUMN,
AlterTableType.ADDPROPS, AlterTableType.DROPPROPS, AlterTableType.SETPARTITIONSPEC,
AlterTableType.UPDATE_COLUMNS);
AlterTableType.UPDATE_COLUMNS, AlterTableType.SETPARTITIONSPEC, AlterTableType.EXECUTE);
private static final List<String> MIGRATION_ALLOWED_SOURCE_FORMATS = ImmutableList.of(
FileFormat.PARQUET.name().toLowerCase(),
FileFormat.ORC.name().toLowerCase(),
@@ -53,6 +53,7 @@
import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler;
import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
import org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec;
import org.apache.hadoop.hive.ql.parse.PartitionTransformSpec;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
@@ -451,6 +452,27 @@ public boolean isMetadataTableSupported() {
return true;
}

@Override
public void executeOperation(org.apache.hadoop.hive.ql.metadata.Table hmsTable, AlterTableExecuteSpec executeSpec) {
  switch (executeSpec.getOperationType()) {
    case ROLLBACK:
      // Load the Iceberg table behind the HMS table so we can act on its snapshot history.
      TableDesc tableDesc = Utilities.getTableDesc(hmsTable);
      Table icebergTable = IcebergTableUtil.getTable(conf, tableDesc.getProperties());
      // Log the current metadata location up front: for Hive Catalog tables the user can
      // undo the rollback by pointing 'metadata_location' back at it.
      // (Fixed: the original concatenation was missing a space after "query:",
      // producing "query:ALTER TABLE" in the log output.)
      LOG.info("Executing rollback operation on iceberg table. If you would like to revert rollback you could " +
          "try altering the metadata location to the current metadata location by executing the following query: " +
          "ALTER TABLE {}.{} SET TBLPROPERTIES('metadata_location'='{}'). This operation is supported for Hive " +
          "Catalog tables.", hmsTable.getDbName(), hmsTable.getTableName(),
          ((BaseTable) icebergTable).operations().current().metadataFileLocation());
      AlterTableExecuteSpec.RollbackSpec rollbackSpec =
          (AlterTableExecuteSpec.RollbackSpec) executeSpec.getOperationParams();
      IcebergTableUtil.rollback(icebergTable, rollbackSpec.getRollbackType(), rollbackSpec.getParam());
      break;
    default:
      // Only ROLLBACK is implemented so far; fail loudly on anything else.
      throw new UnsupportedOperationException(
          String.format("Operation type %s is not supported", executeSpec.getOperationType().name()));
  }
}

@Override
public boolean isValidMetadataTable(String metaTableName) {
return IcebergMetadataTables.isValidMetaTable(metaTableName);
@@ -24,8 +24,10 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.QueryState;
import org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec;
import org.apache.hadoop.hive.ql.parse.PartitionTransformSpec;
import org.apache.hadoop.hive.ql.session.SessionStateUtil;
import org.apache.iceberg.ManageSnapshots;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
@@ -184,4 +186,22 @@ public static void updateSpec(Configuration configuration, Table table) {
public static boolean isBucketed(Table table) {
return table.spec().fields().stream().anyMatch(f -> f.transform().toString().startsWith("bucket["));
}

/**
 * Rolls the given Iceberg table back to an earlier snapshot and commits the change.
 * The target snapshot is selected either by its id or as the last snapshot before a timestamp.
 * @param table the iceberg table to roll back
 * @param type whether {@code value} is interpreted as a timestamp ({@code TIME}) or a snapshot id
 * @param value timestamp in millis, or a snapshot id, depending on {@code type}
 */
public static void rollback(Table table, AlterTableExecuteSpec.RollbackSpec.RollbackType type, Long value) {
  ManageSnapshots snapshotManager = table.manageSnapshots();
  if (AlterTableExecuteSpec.RollbackSpec.RollbackType.TIME == type) {
    // Time-based: restore the latest snapshot committed before the given timestamp.
    LOG.debug("Trying to rollback iceberg table to snapshot before timestamp {}", value);
    snapshotManager.rollbackToTime(value);
  } else {
    // Version-based: restore the snapshot with the given id.
    LOG.debug("Trying to rollback iceberg table to snapshot ID {}", value);
    snapshotManager.rollbackTo(value);
  }
  // Nothing happens until the snapshot change is committed.
  snapshotManager.commit();
}
}
@@ -27,6 +27,7 @@
import java.nio.file.Path;
import java.nio.file.Paths;
import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
@@ -35,6 +36,7 @@
import java.time.ZoneOffset;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -59,6 +61,7 @@
import org.apache.hadoop.mapred.JobID;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.HistoryEntry;
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
@@ -379,4 +382,23 @@ public static DeleteFile createPositionalDeleteFile(Table table, String deleteFi
return posWriter.toDeleteFile();
}

/**
 * Builds a timestamp string usable in query text that falls after the given snapshot
 * and before the next one (if a next one exists).
 * @param table The table whose snapshot history is inspected
 * @param snapshotPosition The position of the last snapshot that should be visible in query results
 * @return The formatted timestamp string to embed in queries
 */
public static String timestampAfterSnapshot(Table table, int snapshotPosition) {
  List<HistoryEntry> history = table.history();
  long snapshotMillis = history.get(snapshotPosition).timestampMillis();

  long queryMillis;
  if (snapshotPosition + 1 < history.size()) {
    // A later snapshot exists: aim halfway between the two commit times.
    long nextMillis = history.get(snapshotPosition + 1).timestampMillis();
    queryMillis = snapshotMillis + (nextMillis - snapshotMillis) / 2;
  } else {
    // This is the newest snapshot: step slightly past it.
    queryMillis = snapshotMillis + 100;
  }

  // NOTE(review): SimpleDateFormat is legacy and per-call instantiation avoids its
  // thread-safety pitfalls; the trailing literal zeros pad millis to nanosecond width.
  SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS000000");
  return formatter.format(new Date(queryMillis));
}

}
@@ -0,0 +1,133 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.mr.hive;


import java.io.IOException;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;


/**
 * Tests covering the rollback feature
 */
public class TestHiveIcebergRollback extends HiveIcebergStorageHandlerWithEngineBase {

  @Test
  public void testRollbackToTimestamp() throws IOException, InterruptedException {
    TableIdentifier identifier = TableIdentifier.of("default", "source");
    Table table = testTables.createTableWithVersions(shell, identifier.name(),
        HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, fileFormat,
        HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS, 3);

    // Rolling back "after the newest snapshot" keeps all data visible.
    rollbackToTimestamp(identifier, table, 2);
    Assert.assertEquals(5, rowCount(identifier));
    Assert.assertEquals(3, table.history().size());

    // Each rollback drops one inserted row and appends a new history entry.
    rollbackToTimestamp(identifier, table, 1);
    Assert.assertEquals(4, rowCount(identifier));
    table.refresh();
    Assert.assertEquals(4, table.history().size());

    rollbackToTimestamp(identifier, table, 0);
    Assert.assertEquals(3, rowCount(identifier));
    table.refresh();
    Assert.assertEquals(5, table.history().size());
  }

  @Test
  public void testRollbackToVersion() throws IOException, InterruptedException {
    TableIdentifier identifier = TableIdentifier.of("default", "source");
    Table table = testTables.createTableWithVersions(shell, identifier.name(),
        HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, fileFormat,
        HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS, 3);

    // Rolling back to the current (latest) snapshot keeps all rows.
    rollbackToVersion(identifier, table.history().get(2).snapshotId());
    Assert.assertEquals(5, rowCount(identifier));
    table.refresh();
    Assert.assertEquals(3, table.history().size());

    rollbackToVersion(identifier, table.history().get(1).snapshotId());
    Assert.assertEquals(4, rowCount(identifier));
    table.refresh();
    Assert.assertEquals(4, table.history().size());

    rollbackToVersion(identifier, table.history().get(0).snapshotId());
    Assert.assertEquals(3, rowCount(identifier));
    table.refresh();
    Assert.assertEquals(5, table.history().size());
  }

  @Test
  public void testRevertRollback() throws IOException, InterruptedException {
    Assume.assumeTrue("Rollback revert is only supported for tables from Hive Catalog",
        testTableType.equals(TestTables.TestTableType.HIVE_CATALOG));
    TableIdentifier identifier = TableIdentifier.of("default", "source");
    Table table = testTables.createTableWithVersions(shell, identifier.name(),
        HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, fileFormat,
        HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS, 2);

    // Remember the metadata file so the rollback can be undone afterwards.
    String metadataLocationBeforeRollback = ((BaseTable) table).operations().current().metadataFileLocation();

    rollbackToVersion(identifier, table.history().get(0).snapshotId());
    Assert.assertEquals(3, rowCount(identifier));
    table.refresh();
    Assert.assertEquals(3, table.history().size());

    // Reverting = pointing the table back at the pre-rollback metadata file.
    shell.executeStatement("ALTER TABLE " + identifier.name() + " SET TBLPROPERTIES('metadata_location'='" +
        metadataLocationBeforeRollback + "')");
    Assert.assertEquals(4, rowCount(identifier));
    table.refresh();
    Assert.assertEquals(2, table.history().size());
  }

  @Test
  public void testInvalidRollbackToTimestamp() throws IOException, InterruptedException {
    TableIdentifier identifier = TableIdentifier.of("default", "source");
    testTables.createTableWithVersions(shell, identifier.name(), HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
        fileFormat, HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS, 2);

    // No snapshot can exist before the epoch.
    AssertHelpers.assertThrows("should throw exception", IllegalArgumentException.class,
        "Cannot roll back, no valid snapshot older than", () -> {
          shell.executeStatement("ALTER TABLE " + identifier.name() + " EXECUTE ROLLBACK('1970-01-01 00:00:00')");
        });
  }

  @Test
  public void testInvalidRollbackToVersion() throws IOException, InterruptedException {
    TableIdentifier identifier = TableIdentifier.of("default", "source");
    Table table = testTables.createTableWithVersions(shell, identifier.name(),
        HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, fileFormat,
        HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS, 2);

    // Unknown snapshot id is rejected.
    AssertHelpers.assertThrows("should throw exception", IllegalArgumentException.class,
        "Cannot roll back to unknown snapshot id", () -> {
          rollbackToVersion(identifier, 1111);
        });

    // After rolling back to the first snapshot, the second one is no longer an ancestor.
    rollbackToVersion(identifier, table.history().get(0).snapshotId());
    AssertHelpers.assertThrows("should throw exception", IllegalArgumentException.class,
        "Cannot roll back to snapshot, not an ancestor of the current state", () -> {
          rollbackToVersion(identifier, table.history().get(1).snapshotId());
        });
  }

  // Issues an ALTER TABLE ... EXECUTE ROLLBACK('<timestamp>') using a timestamp placed
  // just after the snapshot at the given history position.
  private void rollbackToTimestamp(TableIdentifier identifier, Table table, int snapshotPosition) {
    shell.executeStatement("ALTER TABLE " + identifier.name() + " EXECUTE ROLLBACK('" +
        HiveIcebergTestUtils.timestampAfterSnapshot(table, snapshotPosition) + "')");
  }

  // Issues an ALTER TABLE ... EXECUTE ROLLBACK(<snapshotId>) statement.
  private void rollbackToVersion(TableIdentifier identifier, long snapshotId) {
    shell.executeStatement("ALTER TABLE " + identifier.name() + " EXECUTE ROLLBACK(" + snapshotId + ")");
  }

  // Counts the rows currently visible in the table.
  private int rowCount(TableIdentifier identifier) {
    return shell.executeStatement("SELECT * FROM " + identifier.name()).size();
  }
}
@@ -20,23 +20,25 @@
package org.apache.iceberg.mr.hive;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.HistoryEntry;
import org.apache.iceberg.Table;
import org.junit.Assert;
import org.junit.Test;

import static org.apache.iceberg.mr.hive.HiveIcebergTestUtils.timestampAfterSnapshot;

/**
* Tests covering the time travel feature, aka reading from a table as of a certain snapshot.
*/
public class TestHiveIcebergTimeTravel extends HiveIcebergStorageHandlerWithEngineBase {

@Test
public void testSelectAsOfTimestamp() throws IOException, InterruptedException {
Table table = prepareTableWithVersions(2);
Table table = testTables.createTableWithVersions(shell, "customers",
HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
fileFormat, HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS, 2);

List<Object[]> rows = shell.executeStatement(
"SELECT * FROM customers FOR SYSTEM_TIME AS OF '" + timestampAfterSnapshot(table, 0) + "'");
@@ -56,7 +58,9 @@ public void testSelectAsOfTimestamp() throws IOException, InterruptedException {

@Test
public void testSelectAsOfVersion() throws IOException, InterruptedException {
Table table = prepareTableWithVersions(2);
Table table = testTables.createTableWithVersions(shell, "customers",
HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
fileFormat, HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS, 2);

HistoryEntry first = table.history().get(0);
List<Object[]> rows =
@@ -77,7 +81,9 @@ public void testSelectAsOfVersion() throws IOException, InterruptedException {

@Test
public void testCTASAsOfVersionAndTimestamp() throws IOException, InterruptedException {
Table table = prepareTableWithVersions(3);
Table table = testTables.createTableWithVersions(shell, "customers",
HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, fileFormat,
HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS, 3);

shell.executeStatement("CREATE TABLE customers2 AS SELECT * FROM customers FOR SYSTEM_VERSION AS OF " +
table.history().get(0).snapshotId());
@@ -106,7 +112,9 @@ public void testCTASAsOfVersionAndTimestamp() throws IOException, InterruptedExc

@Test
public void testAsOfWithJoins() throws IOException, InterruptedException {
Table table = prepareTableWithVersions(4);
Table table = testTables.createTableWithVersions(shell, "customers",
HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, fileFormat,
HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS, 4);

List<Object[]> rows = shell.executeStatement("SELECT * FROM " +
"customers FOR SYSTEM_TIME AS OF '" + timestampAfterSnapshot(table, 0) + "' fv, " +
@@ -136,47 +144,4 @@ public void testAsOfWithJoins() throws IOException, InterruptedException {

Assert.assertEquals(8, rows.size());
}

/**
* Creates the 'customers' table with the default records and creates extra snapshots by inserting one more line
* into the table.
* @param versions The number of history elements we want to create
* @return The table created
* @throws IOException When there is a problem during table creation
* @throws InterruptedException When there is a problem during adding new data to the table
*/
private Table prepareTableWithVersions(int versions) throws IOException, InterruptedException {
Table table = testTables.createTable(shell, "customers", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
fileFormat, HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);

for (int i = 0; i < versions - 1; ++i) {
// Just wait a little so we definitely will not have the same timestamp for the snapshots
Thread.sleep(100);
shell.executeStatement("INSERT INTO customers values(" +
(i + HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS.size()) + ",'Alice','Green_" + i + "')");
}

table.refresh();

return table;
}

/**
* Get the timestamp string which we can use in the queries. The timestamp will be after the given snapshot
* and before the next one
* @param table The table which we want to query
* @param snapshotPosition The position of the last snapshot we want to see in the query results
* @return The timestamp which we can use in the queries
*/
private String timestampAfterSnapshot(Table table, int snapshotPosition) {
List<HistoryEntry> history = table.history();
long snapshotTime = history.get(snapshotPosition).timestampMillis();
long time = snapshotTime + 100;
if (history.size() > snapshotPosition + 1) {
time = snapshotTime + ((history.get(snapshotPosition + 1).timestampMillis() - snapshotTime) / 2);
}

SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS000000");
return simpleDateFormat.format(new Date(time));
}
}

0 comments on commit d237a30

Please sign in to comment.