From 4c9e7e968bd8b09d5532fefc51be32223cee97c4 Mon Sep 17 00:00:00 2001 From: Ayush Saxena Date: Tue, 28 Nov 2023 22:30:53 +0530 Subject: [PATCH 1/2] HIVE-27903: Iceberg: Implement Expire Snapshot with default table properties. --- .../mr/hive/HiveIcebergStorageHandler.java | 12 +++++++- .../hive/TestHiveIcebergExpireSnapshots.java | 29 +++++++++++++++++++ .../hadoop/hive/ql/parse/AlterClauseParser.g | 4 +-- .../execute/AlterTableExecuteAnalyzer.java | 5 +++- 4 files changed, 46 insertions(+), 4 deletions(-) diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java index c729fcef4300..9cfb210eec3d 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java @@ -894,7 +894,9 @@ private void expireSnapshot(Table icebergTable, AlterTableExecuteSpec.ExpireSnap LOG.info("Executing expire snapshots on iceberg table {} with {} threads", icebergTable.name(), numThreads); deleteExecutorService = getDeleteExecutorService(icebergTable.name(), numThreads); } - if (expireSnapshotsSpec.isExpireByTimestampRange()) { + if (expireSnapshotsSpec == null) { + expireSnapshotWithDefaultParams(icebergTable, deleteExecutorService); + } else if (expireSnapshotsSpec.isExpireByTimestampRange()) { expireSnapshotByTimestampRange(icebergTable, expireSnapshotsSpec.getFromTimestampMillis(), expireSnapshotsSpec.getTimestampMillis(), deleteExecutorService); } else if (expireSnapshotsSpec.isExpireByIds()) { @@ -911,6 +913,14 @@ private void expireSnapshot(Table icebergTable, AlterTableExecuteSpec.ExpireSnap } } + private void expireSnapshotWithDefaultParams(Table icebergTable, ExecutorService deleteExecutorService) { + ExpireSnapshots expireSnapshots = icebergTable.expireSnapshots(); + if (deleteExecutorService != null) { + expireSnapshots.executeDeleteWith(deleteExecutorService); + } + expireSnapshots.commit(); + } + private void expireSnapshotRetainLast(Table icebergTable, int numRetainLast, ExecutorService deleteExecutorService) { ExpireSnapshots expireSnapshots = icebergTable.expireSnapshots(); expireSnapshots.retainLast(numRetainLast); diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergExpireSnapshots.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergExpireSnapshots.java index 4a3b951bde40..16b0dfa0d6fd 100644 --- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergExpireSnapshots.java +++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergExpireSnapshots.java @@ -36,6 +36,7 @@ import org.junit.Test; import static org.apache.iceberg.TableProperties.MAX_SNAPSHOT_AGE_MS; +import static org.apache.iceberg.TableProperties.MIN_SNAPSHOTS_TO_KEEP; /** * Tests covering the rollback feature @@ -117,6 +118,34 @@ public void testExpireSnapshotsWithRetainLast() throws IOException, InterruptedE Assert.assertEquals(5, IterableUtils.size(table.snapshots())); } + @Test + public void testExpireSnapshotsWithDefaultParams() throws IOException, InterruptedException { + TableIdentifier identifier = TableIdentifier.of("default", "source"); + Table table = testTables.createTableWithVersions(shell, identifier.name(), + HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, fileFormat, + HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS, 10); + // No snapshot should expire, since the max snapshot age to expire is by default 5 days + shell.executeStatement("ALTER TABLE " + identifier.name() + " EXECUTE EXPIRE_SNAPSHOTS RETAIN LAST 5"); + table.refresh(); + Assert.assertEquals(10, IterableUtils.size(table.snapshots())); + + // Change max snapshot age to expire to 1 ms & min snapshots to keep as 4 & re-execute + shell.executeStatement( + "ALTER TABLE " + identifier.name() + " SET TBLPROPERTIES('" + MAX_SNAPSHOT_AGE_MS + "'='1'" + ",'" + + MIN_SNAPSHOTS_TO_KEEP + "'='3')"); + shell.executeStatement("ALTER TABLE " + identifier.name() + " EXECUTE EXPIRE_SNAPSHOTS"); + table.refresh(); + Assert.assertEquals(3, IterableUtils.size(table.snapshots())); + + // Change the min snapshot to keep as 2 + shell.executeStatement( + "ALTER TABLE " + identifier.name() + " SET TBLPROPERTIES('" + MIN_SNAPSHOTS_TO_KEEP + "'='2')"); + shell.executeStatement("ALTER TABLE " + identifier.name() + " EXECUTE EXPIRE_SNAPSHOTS"); + table.refresh(); + Assert.assertEquals(2, IterableUtils.size(table.snapshots())); + + } + @Test public void testDeleteOrphanFiles() throws IOException, InterruptedException { TableIdentifier identifier = TableIdentifier.of("default", "source"); diff --git a/parser/src/java/org/apache/hadoop/hive/ql/parse/AlterClauseParser.g b/parser/src/java/org/apache/hadoop/hive/ql/parse/AlterClauseParser.g index 3e6105957c06..8e8ec4e33f99 100644 --- a/parser/src/java/org/apache/hadoop/hive/ql/parse/AlterClauseParser.g +++ b/parser/src/java/org/apache/hadoop/hive/ql/parse/AlterClauseParser.g @@ -475,8 +475,8 @@ alterStatementSuffixExecute @after { gParent.popMsg(state); } : KW_EXECUTE KW_ROLLBACK LPAREN (rollbackParam=(StringLiteral | Number)) RPAREN -> ^(TOK_ALTERTABLE_EXECUTE KW_ROLLBACK $rollbackParam) - | KW_EXECUTE KW_EXPIRE_SNAPSHOTS LPAREN (expireParam=StringLiteral) RPAREN - -> ^(TOK_ALTERTABLE_EXECUTE KW_EXPIRE_SNAPSHOTS $expireParam) + | KW_EXECUTE KW_EXPIRE_SNAPSHOTS (LPAREN (expireParam=StringLiteral) RPAREN)? + -> ^(TOK_ALTERTABLE_EXECUTE KW_EXPIRE_SNAPSHOTS $expireParam?) | KW_EXECUTE KW_SET_CURRENT_SNAPSHOT LPAREN (snapshotParam=expression) RPAREN -> ^(TOK_ALTERTABLE_EXECUTE KW_SET_CURRENT_SNAPSHOT $snapshotParam) | KW_EXECUTE KW_FAST_FORWARD sourceBranch=StringLiteral (targetBranch=StringLiteral)? diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/execute/AlterTableExecuteAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/execute/AlterTableExecuteAnalyzer.java index 79448df3b2a5..275a0e1a4c56 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/execute/AlterTableExecuteAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/execute/AlterTableExecuteAnalyzer.java @@ -141,7 +141,10 @@ private static AlterTableExecuteDesc getSetCurrentSnapshotDesc(TableName tableNa private static AlterTableExecuteDesc getExpireSnapshotDesc(TableName tableName, Map partitionSpec, List children) throws SemanticException { AlterTableExecuteSpec spec; - + if (children.size() == 1) { + spec = new AlterTableExecuteSpec(EXPIRE_SNAPSHOT, null); + return new AlterTableExecuteDesc(tableName, partitionSpec, spec); + } ZoneId timeZone = SessionState.get() == null ? new HiveConf().getLocalTimeZone() : SessionState.get().getConf().getLocalTimeZone(); From 34b1cb4c128647070018a527ea3f2ba2984d45b0 Mon Sep 17 00:00:00 2001 From: Ayush Saxena Date: Wed, 29 Nov 2023 10:10:00 +0530 Subject: [PATCH 2/2] Address review comments. --- .../apache/iceberg/mr/hive/TestHiveIcebergExpireSnapshots.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergExpireSnapshots.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergExpireSnapshots.java index 16b0dfa0d6fd..9f036a5615a5 100644 --- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergExpireSnapshots.java +++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergExpireSnapshots.java @@ -129,7 +129,7 @@ public void testExpireSnapshotsWithDefaultParams() throws IOException, Interrupt table.refresh(); Assert.assertEquals(10, IterableUtils.size(table.snapshots())); - // Change max snapshot age to expire to 1 ms & min snapshots to keep as 4 & re-execute + // Change max snapshot age to expire to 1 ms & min snapshots to keep as 3 & re-execute shell.executeStatement( "ALTER TABLE " + identifier.name() + " SET TBLPROPERTIES('" + MAX_SNAPSHOT_AGE_MS + "'='1'" + ",'" + MIN_SNAPSHOTS_TO_KEEP + "'='3')");